// y_octo/doc/document.rs

1#[cfg(feature = "events")]
2use publisher::DocPublisher;
3
4use super::{
5    history::StoreHistory,
6    store::{ChangedTypeRefs, StoreRef},
7    *,
8};
9use crate::sync::{Arc, RwLock};
10
/// Snapshot of the document store's internal counters, returned by
/// `Doc::store_status`.
///
/// Only compiled with the `debug` feature. Every field is copied from the
/// matching `total_*` accessor of the store at the time of the call.
#[cfg(feature = "debug")]
#[derive(Debug, Clone)]
pub struct DocStoreStatus {
    /// Value of the store's `total_nodes()`.
    pub nodes: usize,
    /// Value of the store's `total_delete_sets()`.
    pub delete_sets: usize,
    /// Value of the store's `total_types()`.
    pub types: usize,
    /// Value of the store's `total_dangling_types()`.
    pub dangling_types: usize,
    /// Value of the store's `total_pending_nodes()`.
    pub pending_nodes: usize,
}
20
/// Options used to create a new [Doc].
///
/// Builder-style configuration: start from [`DocOptions::new`], override the
/// fields you care about, then call [`DocOptions::build`]. Fields you do not set
/// keep their `Default` values:
/// - when this crate is compiled with `cfg(test)` or the `bench` feature:
///   client id `1`, guid `"test"`;
/// - otherwise: a client id from `prefer_small_random()` and a `nanoid` guid;
/// - `gc` is `true` in both cases.
///
/// ```
/// use y_octo::DocOptions;
///
/// let doc = DocOptions::new()
///     .with_client_id(1)
///     .with_guid("guid".into())
///     .auto_gc(true)
///     .build();
///
/// assert_eq!(doc.guid(), "guid")
/// ```
#[derive(Clone, Debug)]
pub struct DocOptions {
    /// Document identifier; returned by `Doc::guid`.
    pub guid: String,
    /// Client id the document (and its store) is created with.
    pub client_id: u64,
    /// When `true`, `Doc::apply_update` runs the store's `optimize()` pass after
    /// integrating each update.
    pub gc: bool,
}
40
41impl Default for DocOptions {
42    fn default() -> Self {
43        if cfg!(any(test, feature = "bench")) {
44            Self {
45                client_id: 1,
46                guid: "test".into(),
47                gc: true,
48            }
49        } else {
50            Self {
51                client_id: prefer_small_random(),
52                guid: nanoid::nanoid!(),
53                gc: true,
54            }
55        }
56    }
57}
58
59impl DocOptions {
60    pub fn new() -> Self {
61        Self::default()
62    }
63
64    pub fn with_client_id(mut self, client_id: u64) -> Self {
65        self.client_id = client_id;
66        self
67    }
68
69    pub fn with_guid(mut self, guid: String) -> Self {
70        self.guid = guid;
71        self
72    }
73
74    pub fn auto_gc(mut self, gc: bool) -> Self {
75        self.gc = gc;
76        self
77    }
78
79    pub fn build(self) -> Doc {
80        Doc::with_options(self)
81    }
82}
83
84impl From<DocOptions> for Any {
85    fn from(value: DocOptions) -> Self {
86        Any::Object(HashMap::from_iter([
87            ("gc".into(), value.gc.into()),
88            ("guid".into(), value.guid.into()),
89        ]))
90    }
91}
92
93impl TryFrom<Any> for DocOptions {
94    type Error = JwstCodecError;
95
96    fn try_from(value: Any) -> Result<Self, Self::Error> {
97        match value {
98            Any::Object(map) => {
99                let mut options = DocOptions::default();
100                for (key, value) in map {
101                    match key.as_str() {
102                        "gc" => {
103                            options.gc = bool::try_from(value)?;
104                        }
105                        "guid" => {
106                            options.guid = String::try_from(value)?;
107                        }
108                        _ => {}
109                    }
110                }
111
112                Ok(options)
113            }
114            _ => Err(JwstCodecError::UnexpectedType("Object")),
115        }
116    }
117}
118
/// Handle to a document: a client id, the options it was built with, and a
/// shared reference to the underlying store.
///
/// `Clone` is shallow for the store (and, with the `events` feature, for the
/// publisher): both are `Arc`s, so clones observe each other's edits.
#[derive(Debug, Clone)]
pub struct Doc {
    /// Client id reported by `client()`. It starts as `opts.client_id` but is
    /// changed by `set_client` / `renew_client` independently of `opts`.
    client_id: u64,
    /// Options this document was created with.
    opts: DocOptions,

    /// Shared, lock-protected store created in `with_options`.
    pub(crate) store: StoreRef,
    /// Update publisher bound to `store`, shared by every clone of this handle.
    #[cfg(feature = "events")]
    pub publisher: Arc<DocPublisher>,
    // Starts as `Somr::none()` in `with_options`; NOTE(review): presumably tracks
    // an in-progress batch of changes — its users are outside this file view.
    pub(crate) batch: Somr<Batch>,
}
129
// SAFETY: NOTE(review) — these impls promise that a `Doc` can be sent to and
// shared between threads. They are presumably required because some field
// (possibly `Somr<Batch>`) is not automatically `Send`/`Sync`. Store state in
// this file is only touched through `RwLock` guards; confirm that `batch` is
// never accessed concurrently before relying on these impls.
unsafe impl Send for Doc {}
unsafe impl Sync for Doc {}
132
133impl Default for Doc {
134    fn default() -> Self {
135        Doc::new()
136    }
137}
138
139impl PartialEq for Doc {
140    fn eq(&self, other: &Self) -> bool {
141        self.client_id == other.client_id
142    }
143}
144
145impl Doc {
146    pub fn new() -> Self {
147        Self::with_options(DocOptions::default())
148    }
149
150    pub fn with_options(options: DocOptions) -> Self {
151        let store = Arc::new(RwLock::new(DocStore::with_client(options.client_id)));
152        #[cfg(feature = "events")]
153        let publisher = Arc::new(DocPublisher::new(store.clone()));
154
155        Self {
156            client_id: options.client_id,
157            opts: options,
158            store,
159            #[cfg(feature = "events")]
160            publisher,
161            batch: Somr::none(),
162        }
163    }
164
165    pub fn with_client(client_id: u64) -> Self {
166        DocOptions::new().with_client_id(client_id).build()
167    }
168
169    pub fn client(&self) -> Client {
170        self.client_id
171    }
172
173    pub fn set_client(&mut self, client_id: u64) {
174        self.client_id = client_id;
175    }
176
177    pub fn renew_client(&mut self) {
178        self.client_id = prefer_small_random();
179    }
180
181    pub fn clients(&self) -> Vec<u64> {
182        self.store.read().unwrap().clients()
183    }
184
185    pub fn history(&self) -> StoreHistory {
186        let history = StoreHistory::new(&self.store);
187        history.resolve();
188        history
189    }
190
191    #[cfg(feature = "debug")]
192    pub fn store_status(&self) -> DocStoreStatus {
193        let store = self.store.read().unwrap();
194
195        DocStoreStatus {
196            nodes: store.total_nodes(),
197            delete_sets: store.total_delete_sets(),
198            types: store.total_types(),
199            dangling_types: store.total_dangling_types(),
200            pending_nodes: store.total_pending_nodes(),
201        }
202    }
203
204    pub(crate) fn get_changed(&self) -> ChangedTypeRefs {
205        self.store.write().unwrap().get_changed()
206    }
207
208    pub fn store_compare(&self, other: &Doc) -> bool {
209        let store = self.store.read().unwrap();
210        let other_store = other.store.read().unwrap();
211
212        store.deep_compare(&other_store)
213    }
214
215    pub fn options(&self) -> &DocOptions {
216        &self.opts
217    }
218
219    pub fn guid(&self) -> &str {
220        self.opts.guid.as_str()
221    }
222
223    // TODO:
224    //   provide a better way instead of `_v1` methods
225    //   when implementing `v2` binary format
226    pub fn try_from_binary_v1<T: AsRef<[u8]>>(binary: T) -> JwstCodecResult<Self> {
227        Self::try_from_binary_v1_with_options(binary, DocOptions::default())
228    }
229
230    pub fn try_from_binary_v1_with_options<T: AsRef<[u8]>>(binary: T, options: DocOptions) -> JwstCodecResult<Self> {
231        let mut doc = Doc::with_options(options);
232        doc.apply_update_from_binary_v1(binary)?;
233        Ok(doc)
234    }
235
236    pub fn apply_update_from_binary_v1<T: AsRef<[u8]>>(&mut self, binary: T) -> JwstCodecResult {
237        let mut decoder = RawDecoder::new(binary.as_ref());
238        let update = Update::read(&mut decoder)?;
239        self.apply_update(update)
240    }
241
242    pub fn apply_update(&mut self, mut update: Update) -> JwstCodecResult {
243        let mut store = self.store.write().unwrap();
244        let mut retry = false;
245
246        loop {
247            // clone every time to avoid ref count issue
248            let pending_types = update
249                .structs
250                .values()
251                .flatten()
252                .filter_map(|n| {
253                    if let Node::Item(item_ref) = n
254                        && let Some(item) = item_ref.get()
255                        && let Content::Type(ty) = &item.content
256                    {
257                        Some((item.id, ty.clone()))
258                    } else {
259                        None
260                    }
261                })
262                .collect();
263            for (mut s, offset) in update.iter(store.get_state_vector()) {
264                if let Node::Item(item) = &mut s {
265                    debug_assert!(item.is_owned());
266                    let mut item = unsafe { item.get_mut_unchecked() };
267                    store.repair(&mut item, self.store.clone(), &pending_types)?;
268                }
269                store.integrate(s, offset, None)?;
270            }
271
272            for (client, range) in update.delete_set_iter(store.get_state_vector()) {
273                store.delete_range(client, range)?;
274            }
275
276            if let Some(mut pending_update) = store.pending.take() {
277                if pending_update
278                    .missing_state
279                    .iter()
280                    .any(|(client, clock)| *clock < store.get_state(*client))
281                {
282                    // new update has been applied to the doc, need to re-integrate
283                    retry = true;
284                }
285
286                for (client, range) in pending_update.delete_set_iter(store.get_state_vector()) {
287                    store.delete_range(client, range)?;
288                }
289
290                if update.is_pending_empty() {
291                    update = pending_update;
292                } else {
293                    // drain all pending state to pending update for later iteration
294                    update.drain_pending_state();
295                    Update::merge_into(&mut update, [pending_update]);
296                }
297            } else {
298                // no pending update at store
299
300                // no pending update in current iteration
301                // thank god, all clean
302                if update.is_pending_empty() {
303                    break;
304                } else {
305                    // need to turn all pending state into update for later iteration
306                    update.drain_pending_state();
307                    retry = false;
308                };
309            }
310
311            // can't integrate any more, save the pending update
312            if !retry {
313                if !update.is_empty() {
314                    store.pending.replace(update);
315                }
316                break;
317            }
318        }
319
320        if self.opts.gc {
321            store.optimize()?;
322        }
323
324        Ok(())
325    }
326
327    pub fn keys(&self) -> Vec<String> {
328        let store = self.store.read().unwrap();
329        store.types.keys().cloned().collect()
330    }
331
332    pub fn get_or_create_text<S: AsRef<str>>(&self, name: S) -> JwstCodecResult<Text> {
333        YTypeBuilder::new(self.store.clone())
334            .with_kind(YTypeKind::Text)
335            .set_name(name.as_ref().to_string())
336            .build()
337    }
338
339    pub fn create_text(&self) -> JwstCodecResult<Text> {
340        YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Text).build()
341    }
342
343    pub fn get_or_create_array<S: AsRef<str>>(&self, str: S) -> JwstCodecResult<Array> {
344        YTypeBuilder::new(self.store.clone())
345            .with_kind(YTypeKind::Array)
346            .set_name(str.as_ref().to_string())
347            .build()
348    }
349
350    pub fn create_array(&self) -> JwstCodecResult<Array> {
351        YTypeBuilder::new(self.store.clone())
352            .with_kind(YTypeKind::Array)
353            .build()
354    }
355
356    pub fn get_or_create_map<S: AsRef<str>>(&self, str: S) -> JwstCodecResult<Map> {
357        YTypeBuilder::new(self.store.clone())
358            .with_kind(YTypeKind::Map)
359            .set_name(str.as_ref().to_string())
360            .build()
361    }
362
363    pub fn create_map(&self) -> JwstCodecResult<Map> {
364        YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Map).build()
365    }
366
367    pub fn get_map(&self, str: &str) -> JwstCodecResult<Map> {
368        YTypeBuilder::new(self.store.clone())
369            .with_kind(YTypeKind::Map)
370            .set_name(str.to_string())
371            .build_exists()
372    }
373
374    pub fn encode_update_v1(&self) -> JwstCodecResult<Vec<u8>> {
375        self.encode_state_as_update_v1(&StateVector::default())
376    }
377
378    pub fn encode_state_as_update_v1(&self, sv: &StateVector) -> JwstCodecResult<Vec<u8>> {
379        let update = self.encode_state_as_update(sv)?;
380
381        let mut encoder = RawEncoder::default();
382        update.write(&mut encoder)?;
383        Ok(encoder.into_inner())
384    }
385
386    pub fn encode_update(&self) -> JwstCodecResult<Update> {
387        self.encode_state_as_update(&StateVector::default())
388    }
389
390    pub fn encode_state_as_update(&self, sv: &StateVector) -> JwstCodecResult<Update> {
391        self.store.read().unwrap().diff_state_vector(sv, true)
392    }
393
394    pub fn get_state_vector(&self) -> StateVector {
395        self.store.read().unwrap().get_state_vector()
396    }
397
398    pub fn get_delete_sets(&self) -> DeleteSet {
399        self.store.read().unwrap().get_delete_sets()
400    }
401
402    #[cfg(feature = "events")]
403    pub fn subscribe(&self, cb: impl Fn(&[u8], &[History]) + Sync + Send + 'static) {
404        self.publisher.subscribe(cb);
405    }
406
407    #[cfg(feature = "events")]
408    pub fn unsubscribe_all(&self) {
409        self.publisher.unsubscribe_all();
410    }
411
412    #[cfg(feature = "events")]
413    pub fn subscribe_count(&self) -> usize {
414        self.publisher.count()
415    }
416
417    #[cfg(feature = "events")]
418    pub fn subscriber_count(&self) -> usize {
419        Arc::<DocPublisher>::strong_count(&self.publisher)
420    }
421
422    pub fn gc(&self) -> JwstCodecResult<()> {
423        self.store.write().unwrap().optimize()
424    }
425}
426
#[cfg(test)]
mod tests {
    use yrs::{Array, Map, Options, Transact, types::ToJson, updates::decoder::Decode};

    use super::*;

    #[test]
    fn test_encode_state_as_update() {
        let yrs_options_left = Options::default();
        let yrs_options_right = Options::default();

        loom_model!({
            let (binary, binary_new) = if cfg!(miri) {
                // Two independent docs with distinct client ids, mirroring the yrs
                // branch below where each doc gets its own generated client id.
                let doc = Doc::with_client(1);

                let mut map = doc.get_or_create_map("abc").unwrap();
                map.insert("a".to_string(), 1).unwrap();
                let binary = doc.encode_update_v1().unwrap();

                let doc_new = Doc::with_client(2);
                let mut array = doc_new.get_or_create_array("array").unwrap();
                array.insert(0, "array_value").unwrap();
                // Encode `doc_new` (not `doc`); otherwise both binaries are the same
                // and the diff/merge below is never exercised under miri.
                let binary_new = doc_new.encode_update_v1().unwrap();

                (binary, binary_new)
            } else {
                let yrs_doc = yrs::Doc::with_options(yrs_options_left.clone());

                let map = yrs_doc.get_or_insert_map("abc");
                let mut trx = yrs_doc.transact_mut();
                map.insert(&mut trx, "a", 1);
                let binary = trx.encode_update_v1();

                let yrs_doc_new = yrs::Doc::with_options(yrs_options_right.clone());
                let array = yrs_doc_new.get_or_insert_array("array");
                let mut trx = yrs_doc_new.transact_mut();
                array.insert(&mut trx, 0, "array_value");
                let binary_new = trx.encode_update_v1();

                (binary, binary_new)
            };

            let mut doc = Doc::try_from_binary_v1(binary).unwrap();
            let mut doc_new = Doc::try_from_binary_v1(binary_new).unwrap();

            let diff_update = doc_new.encode_state_as_update_v1(&doc.get_state_vector()).unwrap();

            let diff_update_reverse = doc.encode_state_as_update_v1(&doc_new.get_state_vector()).unwrap();

            doc.apply_update_from_binary_v1(diff_update).unwrap();
            doc_new.apply_update_from_binary_v1(diff_update_reverse).unwrap();

            assert_eq!(doc.encode_update_v1().unwrap(), doc_new.encode_update_v1().unwrap());
        });
    }

    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_array_create() {
        let yrs_options = yrs::Options::default();

        let json = serde_json::json!([42.0, -42.0, true, false, "hello", "world", [1.0]]);

        {
            let doc = yrs::Doc::with_options(yrs_options.clone());
            let array = doc.get_or_insert_array("abc");
            let mut trx = doc.transact_mut();
            array.insert(&mut trx, 0, 42);
            array.insert(&mut trx, 1, -42);
            array.insert(&mut trx, 2, true);
            array.insert(&mut trx, 3, false);
            array.insert(&mut trx, 4, "hello");
            array.insert(&mut trx, 5, "world");

            let sub_array = yrs::ArrayPrelim::default();
            let sub_array = array.insert(&mut trx, 6, sub_array);
            sub_array.insert(&mut trx, 0, 1);

            drop(trx);
            let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
                .numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
            assert_json_diff::assert_json_matches!(array.to_json(&doc.transact()), json, config);
        };

        {
            let binary = {
                let doc = Doc::new();
                let mut array = doc.get_or_create_array("abc").unwrap();
                array.insert(0, 42).unwrap();
                array.insert(1, -42).unwrap();
                array.insert(2, true).unwrap();
                array.insert(3, false).unwrap();
                array.insert(4, "hello").unwrap();
                array.insert(5, "world").unwrap();

                let mut sub_array = doc.create_array().unwrap();
                array.insert(6, sub_array.clone()).unwrap();
                // FIXME: array need insert first to compatible with yrs
                sub_array.insert(0, 1).unwrap();

                doc.encode_update_v1().unwrap()
            };

            let ydoc = yrs::Doc::with_options(yrs_options);
            let array = ydoc.get_or_insert_array("abc");
            let mut trx = ydoc.transact_mut();
            trx.apply_update(yrs::Update::decode_v1(&binary).unwrap()).unwrap();

            let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
                .numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
            assert_json_diff::assert_json_matches!(array.to_json(&trx), json, config);

            let mut doc = Doc::new();
            let array = doc.get_or_create_array("abc").unwrap();
            doc.apply_update_from_binary_v1(binary).unwrap();

            let list = array.iter().collect::<Vec<_>>();

            assert!(list.len() == 7);
            assert!(matches!(list[6], Value::Array(_)));
        }

        {
            let binary_detached = {
                let doc = Doc::new();
                let mut array = doc.get_or_create_array("abc").unwrap();
                array.insert(0, 42).unwrap();
                array.insert(1, -42).unwrap();
                array.insert(2, true).unwrap();
                array.insert(3, false).unwrap();
                array.insert(4, "hello").unwrap();
                array.insert(5, "world").unwrap();

                let mut sub_array = doc.create_array().unwrap();
                sub_array.insert(0, 1).unwrap();
                array.insert(6, sub_array.clone()).unwrap();

                doc.encode_update_v1().unwrap()
            };

            let detached_doc = Doc::try_from_binary_v1(binary_detached).unwrap();
            let detached_array = detached_doc.get_or_create_array("abc").unwrap();
            let detached_sub_array = match detached_array.get(6).unwrap() {
                Value::Array(arr) => arr,
                _ => panic!("expected array at index 6"),
            };
            assert_eq!(detached_sub_array.get(0).unwrap(), Value::Any(1.0.into()));
        }
    }

    #[test]
    #[cfg(feature = "events")]
    #[ignore = "inaccurate timing on ci, need for more accurate timing testing"]
    fn test_subscribe() {
        use crate::sync::{AtomicU8, Ordering};

        loom_model!({
            let doc = Doc::default();
            let doc_clone = doc.clone();

            let count = Arc::new(AtomicU8::new(0));
            let count_clone1 = count.clone();
            let count_clone2 = count.clone();
            doc.subscribe(move |_, _| {
                count_clone1.fetch_add(1, Ordering::SeqCst);
            });

            doc_clone.subscribe(move |_, _| {
                count_clone2.fetch_add(1, Ordering::SeqCst);
            });

            doc_clone.get_or_create_array("abc").unwrap().insert(0, 42).unwrap();

            // wait for the observer, which cycles once every 100ms
            std::thread::sleep(std::time::Duration::from_millis(200));

            assert_eq!(count.load(Ordering::SeqCst), 2);
        });
    }

    #[test]
    fn test_repeated_applied_pending_update() {
        // generate a pending update
        // update: [1, 1, 1, 0, 39, 1, 4, 116, 101, 115, 116, 3, 109, 97, 112, 1, 0]
        // update: [1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95,
        // 107, 101, 121, 1, 119, 13, 115, 117, 98, 95, 109, 97, 112, 95, 118, 97, 108,
        // 117, 101, 0]
        // {
        //     let doc1 = Doc::default();

        //     doc1.subscribe(|update, _history| {
        //         println!("update: {:?}", update);
        //     });

        //     let mut map = doc1.get_or_create_map("test").unwrap();
        //     std::thread::sleep(std::time::Duration::from_millis(500));

        //     let mut sub_map = doc1.create_map().unwrap();
        //     map.insert("map", sub_map.clone()).unwrap();
        //     std::thread::sleep(std::time::Duration::from_millis(500));

        //     sub_map.insert("sub_map_key", "sub_map_value").unwrap();
        //     std::thread::sleep(std::time::Duration::from_millis(500));
        // }

        loom_model!({
            let mut doc = Doc::default();

            doc.apply_update_from_binary_v1(vec![
                1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
                98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
            ])
            .unwrap();

            let pending_size = doc
                .store
                .read()
                .unwrap()
                .pending
                .as_ref()
                .unwrap()
                .structs
                .iter()
                .map(|s| s.1.len())
                .sum::<usize>();
            doc.apply_update_from_binary_v1(vec![
                1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
                98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
            ])
            .unwrap();

            // pending nodes should not grow up after apply same pending update
            assert_eq!(
                pending_size,
                doc.store
                    .read()
                    .unwrap()
                    .pending
                    .as_ref()
                    .unwrap()
                    .structs
                    .iter()
                    .map(|s| s.1.len())
                    .sum::<usize>()
            );
        });
    }

    #[test]
    fn test_update_from_vec_ref() {
        loom_model!({
            let doc = Doc::new();

            let mut text = doc.get_or_create_text("text").unwrap();
            text.insert(0, "hello world").unwrap();

            let mut root = doc.get_or_create_map("root").unwrap();
            let mut child = doc.create_map().unwrap();
            child.insert("k".to_string(), "v").unwrap();
            root.insert("child".to_string(), child.clone()).unwrap();

            let update = doc.encode_update_v1().unwrap();

            let doc = Doc::try_from_binary_v1(update).unwrap();
            let text = doc.get_or_create_text("text").unwrap();

            assert_eq!(&text.to_string(), "hello world");

            let root = doc.get_or_create_map("root").unwrap();
            if let Some(Value::Map(child)) = root.get("child") {
                assert!(
                    matches!(child.get("k"), Some(Value::Any(Any::String(s))) if s == "v"),
                    "expected nested map value to survive apply_update"
                );
            } else {
                panic!("expected nested map to survive apply_update");
            }
        });
    }

    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_apply_update() {
        let updates = [
            include_bytes!("../fixtures/basic.bin").to_vec(),
            include_bytes!("../fixtures/database.bin").to_vec(),
            include_bytes!("../fixtures/large.bin").to_vec(),
            include_bytes!("../fixtures/with-subdoc.bin").to_vec(),
            include_bytes!("../fixtures/edge-case-left-right-same-node.bin").to_vec(),
        ];

        for update in updates {
            let mut doc = Doc::new();
            doc.apply_update_from_binary_v1(&update).unwrap();
        }
    }
}
719            let mut doc = Doc::new();
720            doc.apply_update_from_binary_v1(&update).unwrap();
721        }
722    }
723}