y_octo/doc/
document.rs

1use std::collections::HashMap;
2
3#[cfg(feature = "events")]
4use publisher::DocPublisher;
5
6use super::{
7    history::StoreHistory,
8    store::{ChangedTypeRefs, StoreRef},
9    *,
10};
11use crate::sync::{Arc, RwLock};
12
/// Snapshot of store counters, produced by `Doc::store_status`.
///
/// Only compiled with the `debug` feature.
#[cfg(feature = "debug")]
#[derive(Debug, Clone)]
pub struct DocStoreStatus {
    /// Result of `DocStore::total_nodes`.
    pub nodes: usize,
    /// Result of `DocStore::total_delete_sets`.
    pub delete_sets: usize,
    /// Result of `DocStore::total_types`.
    pub types: usize,
    /// Result of `DocStore::total_dangling_types`.
    pub dangling_types: usize,
    /// Result of `DocStore::total_pending_nodes`.
    pub pending_nodes: usize,
}
22
/// Construction options for a [Doc]; build one with [DocOptions::build].
///
/// ```
/// use y_octo::DocOptions;
///
/// let doc = DocOptions::new()
///     .with_client_id(1)
///     .with_guid("guid".into())
///     .auto_gc(true)
///     .build();
///
/// assert_eq!(doc.guid(), "guid")
/// ```
#[derive(Clone, Debug)]
pub struct DocOptions {
    /// Document identifier, returned by `Doc::guid`.
    pub guid: String,
    /// Client id the document (and its store) is created with.
    pub client_id: u64,
    /// When `true`, `Doc::apply_update` runs the store's `optimize` pass
    /// after every applied update.
    pub gc: bool,
}
42
43impl Default for DocOptions {
44    fn default() -> Self {
45        if cfg!(any(test, feature = "bench")) {
46            Self {
47                client_id: 1,
48                guid: "test".into(),
49                gc: true,
50            }
51        } else {
52            Self {
53                client_id: prefer_small_random(),
54                guid: nanoid::nanoid!(),
55                gc: true,
56            }
57        }
58    }
59}
60
61impl DocOptions {
62    pub fn new() -> Self {
63        Self::default()
64    }
65
66    pub fn with_client_id(mut self, client_id: u64) -> Self {
67        self.client_id = client_id;
68        self
69    }
70
71    pub fn with_guid(mut self, guid: String) -> Self {
72        self.guid = guid;
73        self
74    }
75
76    pub fn auto_gc(mut self, gc: bool) -> Self {
77        self.gc = gc;
78        self
79    }
80
81    pub fn build(self) -> Doc {
82        Doc::with_options(self)
83    }
84}
85
86impl From<DocOptions> for Any {
87    fn from(value: DocOptions) -> Self {
88        Any::Object(HashMap::from_iter([
89            ("gc".into(), value.gc.into()),
90            ("guid".into(), value.guid.into()),
91        ]))
92    }
93}
94
95impl TryFrom<Any> for DocOptions {
96    type Error = JwstCodecError;
97
98    fn try_from(value: Any) -> Result<Self, Self::Error> {
99        match value {
100            Any::Object(map) => {
101                let mut options = DocOptions::default();
102                for (key, value) in map {
103                    match key.as_str() {
104                        "gc" => {
105                            options.gc = bool::try_from(value)?;
106                        }
107                        "guid" => {
108                            options.guid = String::try_from(value)?;
109                        }
110                        _ => {}
111                    }
112                }
113
114                Ok(options)
115            }
116            _ => Err(JwstCodecError::UnexpectedType("Object")),
117        }
118    }
119}
120
121#[derive(Debug, Clone)]
122pub struct Doc {
123    client_id: u64,
124    opts: DocOptions,
125
126    pub(crate) store: StoreRef,
127    #[cfg(feature = "events")]
128    pub publisher: Arc<DocPublisher>,
129    pub(crate) batch: Somr<Batch>,
130}
131
132unsafe impl Send for Doc {}
133unsafe impl Sync for Doc {}
134
135impl Default for Doc {
136    fn default() -> Self {
137        Doc::new()
138    }
139}
140
141impl PartialEq for Doc {
142    fn eq(&self, other: &Self) -> bool {
143        self.client_id == other.client_id
144    }
145}
146
147impl Doc {
148    pub fn new() -> Self {
149        Self::with_options(DocOptions::default())
150    }
151
152    pub fn with_options(options: DocOptions) -> Self {
153        let store = Arc::new(RwLock::new(DocStore::with_client(options.client_id)));
154        #[cfg(feature = "events")]
155        let publisher = Arc::new(DocPublisher::new(store.clone()));
156
157        Self {
158            client_id: options.client_id,
159            opts: options,
160            store,
161            #[cfg(feature = "events")]
162            publisher,
163            batch: Somr::none(),
164        }
165    }
166
167    pub fn with_client(client_id: u64) -> Self {
168        DocOptions::new().with_client_id(client_id).build()
169    }
170
171    pub fn client(&self) -> Client {
172        self.client_id
173    }
174
175    pub fn set_client(&mut self, client_id: u64) {
176        self.client_id = client_id;
177    }
178
179    pub fn renew_client(&mut self) {
180        self.client_id = prefer_small_random();
181    }
182
183    pub fn clients(&self) -> Vec<u64> {
184        self.store.read().unwrap().clients()
185    }
186
187    pub fn history(&self) -> StoreHistory {
188        let history = StoreHistory::new(&self.store);
189        history.resolve();
190        history
191    }
192
193    #[cfg(feature = "debug")]
194    pub fn store_status(&self) -> DocStoreStatus {
195        let store = self.store.read().unwrap();
196
197        DocStoreStatus {
198            nodes: store.total_nodes(),
199            delete_sets: store.total_delete_sets(),
200            types: store.total_types(),
201            dangling_types: store.total_dangling_types(),
202            pending_nodes: store.total_pending_nodes(),
203        }
204    }
205
206    pub(crate) fn get_changed(&self) -> ChangedTypeRefs {
207        self.store.write().unwrap().get_changed()
208    }
209
210    pub fn store_compare(&self, other: &Doc) -> bool {
211        let store = self.store.read().unwrap();
212        let other_store = other.store.read().unwrap();
213
214        store.deep_compare(&other_store)
215    }
216
217    pub fn options(&self) -> &DocOptions {
218        &self.opts
219    }
220
221    pub fn guid(&self) -> &str {
222        self.opts.guid.as_str()
223    }
224
225    // TODO:
226    //   provide a better way instead of `_v1` methods
227    //   when implementing `v2` binary format
228    pub fn try_from_binary_v1<T: AsRef<[u8]>>(binary: T) -> JwstCodecResult<Self> {
229        Self::try_from_binary_v1_with_options(binary, DocOptions::default())
230    }
231
232    pub fn try_from_binary_v1_with_options<T: AsRef<[u8]>>(binary: T, options: DocOptions) -> JwstCodecResult<Self> {
233        let mut doc = Doc::with_options(options);
234        doc.apply_update_from_binary_v1(binary)?;
235        Ok(doc)
236    }
237
238    pub fn apply_update_from_binary_v1<T: AsRef<[u8]>>(&mut self, binary: T) -> JwstCodecResult {
239        let mut decoder = RawDecoder::new(binary.as_ref());
240        let update = Update::read(&mut decoder)?;
241        self.apply_update(update)
242    }
243
244    pub fn apply_update(&mut self, mut update: Update) -> JwstCodecResult {
245        let mut store = self.store.write().unwrap();
246        let mut retry = false;
247        loop {
248            for (mut s, offset) in update.iter(store.get_state_vector()) {
249                if let Node::Item(item) = &mut s {
250                    debug_assert!(item.is_owned());
251                    let mut item = unsafe { item.get_mut_unchecked() };
252                    store.repair(&mut item, self.store.clone())?;
253                }
254                store.integrate(s, offset, None)?;
255            }
256
257            for (client, range) in update.delete_set_iter(store.get_state_vector()) {
258                store.delete_range(client, range)?;
259            }
260
261            if let Some(mut pending_update) = store.pending.take() {
262                if pending_update
263                    .missing_state
264                    .iter()
265                    .any(|(client, clock)| *clock < store.get_state(*client))
266                {
267                    // new update has been applied to the doc, need to re-integrate
268                    retry = true;
269                }
270
271                for (client, range) in pending_update.delete_set_iter(store.get_state_vector()) {
272                    store.delete_range(client, range)?;
273                }
274
275                if update.is_pending_empty() {
276                    update = pending_update;
277                } else {
278                    // drain all pending state to pending update for later iteration
279                    update.drain_pending_state();
280                    Update::merge_into(&mut update, [pending_update]);
281                }
282            } else {
283                // no pending update at store
284
285                // no pending update in current iteration
286                // thank god, all clean
287                if update.is_pending_empty() {
288                    break;
289                } else {
290                    // need to turn all pending state into update for later iteration
291                    update.drain_pending_state();
292                    retry = false;
293                };
294            }
295
296            // can't integrate any more, save the pending update
297            if !retry {
298                if !update.is_empty() {
299                    store.pending.replace(update);
300                }
301                break;
302            }
303        }
304
305        if self.opts.gc {
306            store.optimize()?;
307        }
308
309        Ok(())
310    }
311
312    pub fn keys(&self) -> Vec<String> {
313        let store = self.store.read().unwrap();
314        store.types.keys().cloned().collect()
315    }
316
317    pub fn get_or_create_text<S: AsRef<str>>(&self, name: S) -> JwstCodecResult<Text> {
318        YTypeBuilder::new(self.store.clone())
319            .with_kind(YTypeKind::Text)
320            .set_name(name.as_ref().to_string())
321            .build()
322    }
323
324    pub fn create_text(&self) -> JwstCodecResult<Text> {
325        YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Text).build()
326    }
327
328    pub fn get_or_create_array<S: AsRef<str>>(&self, str: S) -> JwstCodecResult<Array> {
329        YTypeBuilder::new(self.store.clone())
330            .with_kind(YTypeKind::Array)
331            .set_name(str.as_ref().to_string())
332            .build()
333    }
334
335    pub fn create_array(&self) -> JwstCodecResult<Array> {
336        YTypeBuilder::new(self.store.clone())
337            .with_kind(YTypeKind::Array)
338            .build()
339    }
340
341    pub fn get_or_create_map<S: AsRef<str>>(&self, str: S) -> JwstCodecResult<Map> {
342        YTypeBuilder::new(self.store.clone())
343            .with_kind(YTypeKind::Map)
344            .set_name(str.as_ref().to_string())
345            .build()
346    }
347
348    pub fn create_map(&self) -> JwstCodecResult<Map> {
349        YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Map).build()
350    }
351
352    pub fn get_map(&self, str: &str) -> JwstCodecResult<Map> {
353        YTypeBuilder::new(self.store.clone())
354            .with_kind(YTypeKind::Map)
355            .set_name(str.to_string())
356            .build_exists()
357    }
358
359    pub fn encode_update_v1(&self) -> JwstCodecResult<Vec<u8>> {
360        self.encode_state_as_update_v1(&StateVector::default())
361    }
362
363    pub fn encode_state_as_update_v1(&self, sv: &StateVector) -> JwstCodecResult<Vec<u8>> {
364        let update = self.encode_state_as_update(sv)?;
365
366        let mut encoder = RawEncoder::default();
367        update.write(&mut encoder)?;
368        Ok(encoder.into_inner())
369    }
370
371    pub fn encode_update(&self) -> JwstCodecResult<Update> {
372        self.encode_state_as_update(&StateVector::default())
373    }
374
375    pub fn encode_state_as_update(&self, sv: &StateVector) -> JwstCodecResult<Update> {
376        self.store.read().unwrap().diff_state_vector(sv, true)
377    }
378
379    pub fn get_state_vector(&self) -> StateVector {
380        self.store.read().unwrap().get_state_vector()
381    }
382
383    pub fn get_delete_sets(&self) -> DeleteSet {
384        self.store.read().unwrap().get_delete_sets()
385    }
386
387    #[cfg(feature = "events")]
388    pub fn subscribe(&self, cb: impl Fn(&[u8], &[History]) + Sync + Send + 'static) {
389        self.publisher.subscribe(cb);
390    }
391
392    #[cfg(feature = "events")]
393    pub fn unsubscribe_all(&self) {
394        self.publisher.unsubscribe_all();
395    }
396
397    #[cfg(feature = "events")]
398    pub fn subscribe_count(&self) -> usize {
399        self.publisher.count()
400    }
401
402    #[cfg(feature = "events")]
403    pub fn subscriber_count(&self) -> usize {
404        Arc::<DocPublisher>::strong_count(&self.publisher)
405    }
406
407    pub fn gc(&self) -> JwstCodecResult<()> {
408        self.store.write().unwrap().optimize()
409    }
410}
411
#[cfg(test)]
mod tests {
    use yrs::{Array, Map, Options, Transact, types::ToJson, updates::decoder::Decode};

    use super::*;

    #[test]
    fn test_encode_state_as_update() {
        let yrs_options_left = Options::default();
        let yrs_options_right = Options::default();

        loom_model!({
            let (binary, binary_new) = if cfg!(miri) {
                let doc = Doc::new();

                let mut map = doc.get_or_create_map("abc").unwrap();
                map.insert("a".to_string(), 1).unwrap();
                let binary = doc.encode_update_v1().unwrap();

                // Under `cfg(test)` `Doc::new()` always uses client id 1, so the second
                // doc needs its own id to avoid colliding item ids; and it is `doc_new`
                // (not `doc`) whose state must be encoded here.
                let doc_new = Doc::with_client(2);
                let mut array = doc_new.get_or_create_array("array").unwrap();
                array.insert(0, "array_value").unwrap();
                let binary_new = doc_new.encode_update_v1().unwrap();

                (binary, binary_new)
            } else {
                let yrs_doc = yrs::Doc::with_options(yrs_options_left.clone());

                let map = yrs_doc.get_or_insert_map("abc");
                let mut trx = yrs_doc.transact_mut();
                map.insert(&mut trx, "a", 1);
                let binary = trx.encode_update_v1();

                let yrs_doc_new = yrs::Doc::with_options(yrs_options_right.clone());
                let array = yrs_doc_new.get_or_insert_array("array");
                let mut trx = yrs_doc_new.transact_mut();
                array.insert(&mut trx, 0, "array_value");
                let binary_new = trx.encode_update_v1();

                (binary, binary_new)
            };

            let mut doc = Doc::try_from_binary_v1(binary).unwrap();
            let mut doc_new = Doc::try_from_binary_v1(binary_new).unwrap();

            let diff_update = doc_new.encode_state_as_update_v1(&doc.get_state_vector()).unwrap();

            let diff_update_reverse = doc.encode_state_as_update_v1(&doc_new.get_state_vector()).unwrap();

            doc.apply_update_from_binary_v1(diff_update).unwrap();
            doc_new.apply_update_from_binary_v1(diff_update_reverse).unwrap();

            assert_eq!(doc.encode_update_v1().unwrap(), doc_new.encode_update_v1().unwrap());
        });
    }

    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_array_create() {
        let yrs_options = yrs::Options::default();

        let json = serde_json::json!([42.0, -42.0, true, false, "hello", "world", [1.0]]);

        {
            let doc = yrs::Doc::with_options(yrs_options.clone());
            let array = doc.get_or_insert_array("abc");
            let mut trx = doc.transact_mut();
            array.insert(&mut trx, 0, 42);
            array.insert(&mut trx, 1, -42);
            array.insert(&mut trx, 2, true);
            array.insert(&mut trx, 3, false);
            array.insert(&mut trx, 4, "hello");
            array.insert(&mut trx, 5, "world");

            let sub_array = yrs::ArrayPrelim::default();
            let sub_array = array.insert(&mut trx, 6, sub_array);
            sub_array.insert(&mut trx, 0, 1);

            drop(trx);
            let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
                .numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
            assert_json_diff::assert_json_matches!(array.to_json(&doc.transact()), json, config);
        };

        let binary = {
            let doc = Doc::new();
            let mut array = doc.get_or_create_array("abc").unwrap();
            array.insert(0, 42).unwrap();
            array.insert(1, -42).unwrap();
            array.insert(2, true).unwrap();
            array.insert(3, false).unwrap();
            array.insert(4, "hello").unwrap();
            array.insert(5, "world").unwrap();

            let mut sub_array = doc.create_array().unwrap();
            array.insert(6, sub_array.clone()).unwrap();
            // FIXME: array need insert first to compatible with yrs
            sub_array.insert(0, 1).unwrap();

            doc.encode_update_v1().unwrap()
        };

        let ydoc = yrs::Doc::with_options(yrs_options);
        let array = ydoc.get_or_insert_array("abc");
        let mut trx = ydoc.transact_mut();
        trx.apply_update(yrs::Update::decode_v1(&binary).unwrap()).unwrap();

        let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
            .numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
        assert_json_diff::assert_json_matches!(array.to_json(&trx), json, config);

        let mut doc = Doc::new();
        let array = doc.get_or_create_array("abc").unwrap();
        doc.apply_update_from_binary_v1(binary).unwrap();

        let list = array.iter().collect::<Vec<_>>();

        assert_eq!(list.len(), 7);
        assert!(matches!(list[6], Value::Array(_)));
    }

    #[test]
    #[cfg(feature = "events")]
    #[ignore = "inaccurate timing on ci, need for more accurate timing testing"]
    fn test_subscribe() {
        use crate::sync::{AtomicU8, Ordering};

        loom_model!({
            let doc = Doc::default();
            let doc_clone = doc.clone();

            let count = Arc::new(AtomicU8::new(0));
            let count_clone1 = count.clone();
            let count_clone2 = count.clone();
            doc.subscribe(move |_, _| {
                count_clone1.fetch_add(1, Ordering::SeqCst);
            });

            doc_clone.subscribe(move |_, _| {
                count_clone2.fetch_add(1, Ordering::SeqCst);
            });

            doc_clone.get_or_create_array("abc").unwrap().insert(0, 42).unwrap();

            // wait for the observer, which cycles once every 100ms
            std::thread::sleep(std::time::Duration::from_millis(200));

            assert_eq!(count.load(Ordering::SeqCst), 2);
        });
    }

    #[test]
    fn test_repeated_applied_pending_update() {
        // generate a pending update
        // update: [1, 1, 1, 0, 39, 1, 4, 116, 101, 115, 116, 3, 109, 97, 112, 1, 0]
        // update: [1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95,
        // 107, 101, 121, 1, 119, 13, 115, 117, 98, 95, 109, 97, 112, 95, 118, 97, 108,
        // 117, 101, 0]
        // {
        //     let doc1 = Doc::default();

        //     doc1.subscribe(|update| {
        //         println!("update: {:?}", update);
        //     });

        //     let mut map = doc1.get_or_create_map("test").unwrap();
        //     std::thread::sleep(std::time::Duration::from_millis(500));

        //     let mut sub_map = doc1.create_map().unwrap();
        //     map.insert("map", sub_map.clone()).unwrap();
        //     std::thread::sleep(std::time::Duration::from_millis(500));

        //     sub_map.insert("sub_map_key", "sub_map_value").unwrap();
        //     std::thread::sleep(std::time::Duration::from_millis(500));
        // }

        loom_model!({
            let mut doc = Doc::default();

            doc.apply_update_from_binary_v1(vec![
                1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
                98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
            ])
            .unwrap();

            let pending_size = doc
                .store
                .read()
                .unwrap()
                .pending
                .as_ref()
                .unwrap()
                .structs
                .iter()
                .map(|s| s.1.len())
                .sum::<usize>();
            doc.apply_update_from_binary_v1(vec![
                1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
                98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
            ])
            .unwrap();

            // pending nodes should not grow up after apply same pending update
            assert_eq!(
                pending_size,
                doc.store
                    .read()
                    .unwrap()
                    .pending
                    .as_ref()
                    .unwrap()
                    .structs
                    .iter()
                    .map(|s| s.1.len())
                    .sum::<usize>()
            );
        });
    }

    #[test]
    fn test_update_from_vec_ref() {
        loom_model!({
            let doc = Doc::new();

            let mut text = doc.get_or_create_text("text").unwrap();
            text.insert(0, "hello world").unwrap();

            let update = doc.encode_update_v1().unwrap();

            let doc = Doc::try_from_binary_v1(update).unwrap();
            let text = doc.get_or_create_text("text").unwrap();

            assert_eq!(&text.to_string(), "hello world");
        });
    }

    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_apply_update() {
        let updates = [
            include_bytes!("../fixtures/basic.bin").to_vec(),
            include_bytes!("../fixtures/database.bin").to_vec(),
            include_bytes!("../fixtures/large.bin").to_vec(),
            include_bytes!("../fixtures/with-subdoc.bin").to_vec(),
            include_bytes!("../fixtures/edge-case-left-right-same-node.bin").to_vec(),
        ];

        for update in updates {
            let mut doc = Doc::new();
            doc.apply_update_from_binary_v1(&update).unwrap();
        }
    }
}