y_octo/doc/
document.rs

1use super::{publisher::DocPublisher, store::StoreRef, *};
2use crate::sync::{Arc, RwLock};
3
4#[derive(Clone, Default)]
5pub struct DocOptions {
6    pub guid: Option<String>,
7    pub client: Option<u64>,
8}
9
10#[derive(Debug, Clone)]
11pub struct Doc {
12    client_id: u64,
13    // random id for each doc, use in sub doc
14    // TODO: use function in code
15    #[allow(dead_code)]
16    guid: String,
17    pub(super) store: StoreRef,
18    pub publisher: Arc<DocPublisher>,
19}
20
21unsafe impl Send for Doc {}
22unsafe impl Sync for Doc {}
23
24impl Default for Doc {
25    fn default() -> Self {
26        let client_id = rand::random();
27        let store = Arc::new(RwLock::new(DocStore::with_client(client_id)));
28        let publisher = Arc::new(DocPublisher::new(store.clone()));
29
30        Self {
31            client_id,
32            guid: nanoid!(),
33            store,
34            publisher,
35        }
36    }
37}
38
39impl PartialEq for Doc {
40    fn eq(&self, other: &Self) -> bool {
41        self.client_id == other.client_id
42    }
43}
44
45impl Doc {
46    pub fn with_options(options: DocOptions) -> Self {
47        let client = options.client.unwrap_or_else(rand::random);
48        let store = Arc::new(RwLock::new(DocStore::with_client(client)));
49        let publisher = Arc::new(DocPublisher::new(store.clone()));
50
51        Self {
52            client_id: client,
53            store,
54            guid: options.guid.unwrap_or_else(|| nanoid!()),
55            publisher,
56        }
57    }
58
59    pub fn with_client(client_id: u64) -> Self {
60        let store = Arc::new(RwLock::new(DocStore::with_client(client_id)));
61        let publisher = Arc::new(DocPublisher::new(store.clone()));
62        Self {
63            client_id,
64            store,
65            guid: nanoid!(),
66            publisher,
67        }
68    }
69
70    pub fn client(&self) -> Client {
71        self.client_id
72    }
73
74    pub fn guid(&self) -> &str {
75        self.guid.as_str()
76    }
77
78    pub fn new_from_binary(binary: Vec<u8>) -> JwstCodecResult<Self> {
79        let mut doc = Doc::default();
80        doc.apply_update_from_binary(binary)?;
81        Ok(doc)
82    }
83
84    pub fn new_from_binary_with_options(binary: Vec<u8>, options: DocOptions) -> JwstCodecResult<Self> {
85        let mut doc = Doc::with_options(options);
86        doc.apply_update_from_binary(binary)?;
87        Ok(doc)
88    }
89
90    pub fn apply_update_from_binary(&mut self, update: Vec<u8>) -> JwstCodecResult {
91        let mut decoder = RawDecoder::new(update);
92        let update = Update::read(&mut decoder)?;
93        self.apply_update(update)?;
94        Ok(())
95    }
96
97    pub fn apply_update(&mut self, mut update: Update) -> JwstCodecResult {
98        let mut store = self.store.write().unwrap();
99        let mut retry = false;
100        loop {
101            for (mut s, offset) in update.iter(store.get_state_vector()) {
102                if let Node::Item(item) = &mut s {
103                    debug_assert!(item.is_owned());
104                    let item = unsafe { item.get_mut_unchecked() };
105                    store.repair(item, self.store.clone())?;
106                }
107                store.integrate(s, offset, None)?;
108            }
109
110            for (client, range) in update.delete_set_iter(store.get_state_vector()) {
111                store.delete_range(client, range)?;
112            }
113
114            if let Some(mut pending_update) = store.pending.take() {
115                if pending_update
116                    .missing_state
117                    .iter()
118                    .any(|(client, clock)| store.get_state(*client) > *clock)
119                {
120                    // new update has been applied to the doc, need to re-integrate
121                    retry = true;
122                }
123
124                for (client, range) in pending_update.delete_set_iter(store.get_state_vector()) {
125                    store.delete_range(client, range)?;
126                }
127
128                if update.is_pending_empty() {
129                    update = pending_update;
130                } else {
131                    // drain all pending state to pending update for later iteration
132                    update.drain_pending_state();
133                    Update::merge_into(&mut update, [pending_update]);
134                }
135            } else {
136                // no pending update at store
137
138                // no pending update in current iteration
139                // thank god, all clean
140                if update.is_pending_empty() {
141                    break;
142                } else {
143                    // need to turn all pending state into update for later iteration
144                    update.drain_pending_state();
145                };
146            }
147
148            // can't integrate any more, save the pending update
149            if !retry {
150                if !update.is_empty() {
151                    store.pending.replace(update);
152                }
153                break;
154            }
155        }
156
157        Ok(())
158    }
159
160    pub fn keys(&self) -> Vec<String> {
161        let store = self.store.read().unwrap();
162        store.types.keys().cloned().collect()
163    }
164
165    pub fn get_or_create_text(&self, name: &str) -> JwstCodecResult<Text> {
166        YTypeBuilder::new(self.store.clone())
167            .with_kind(YTypeKind::Text)
168            .set_name(name.to_string())
169            .build()
170    }
171
172    pub fn create_text(&self) -> JwstCodecResult<Text> {
173        YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Text).build()
174    }
175
176    pub fn get_or_create_array(&self, str: &str) -> JwstCodecResult<Array> {
177        YTypeBuilder::new(self.store.clone())
178            .with_kind(YTypeKind::Array)
179            .set_name(str.to_string())
180            .build()
181    }
182
183    pub fn create_array(&self) -> JwstCodecResult<Array> {
184        YTypeBuilder::new(self.store.clone())
185            .with_kind(YTypeKind::Array)
186            .build()
187    }
188
189    pub fn get_or_create_map(&self, str: &str) -> JwstCodecResult<Map> {
190        YTypeBuilder::new(self.store.clone())
191            .with_kind(YTypeKind::Map)
192            .set_name(str.to_string())
193            .build()
194    }
195
196    pub fn create_map(&self) -> JwstCodecResult<Map> {
197        YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Map).build()
198    }
199
200    pub fn get_map(&self, str: &str) -> JwstCodecResult<Map> {
201        YTypeBuilder::new(self.store.clone())
202            .with_kind(YTypeKind::Map)
203            .set_name(str.to_string())
204            .build_exists()
205    }
206
207    pub fn encode_update_v1(&self) -> JwstCodecResult<Vec<u8>> {
208        self.encode_state_as_update_v1(&StateVector::default())
209    }
210
211    pub fn encode_state_as_update_v1(&self, sv: &StateVector) -> JwstCodecResult<Vec<u8>> {
212        let update = self.store.read().unwrap().diff_state_vector(sv)?;
213
214        let mut encoder = RawEncoder::default();
215        update.write(&mut encoder)?;
216        Ok(encoder.into_inner())
217    }
218
219    pub fn get_state_vector(&self) -> StateVector {
220        self.store.read().unwrap().get_state_vector()
221    }
222
223    pub fn subscribe(&self, cb: impl Fn(&[u8]) + Sync + Send + 'static) {
224        self.publisher.subscribe(cb);
225    }
226
227    pub fn unsubscribe_all(&self) {
228        self.publisher.unsubscribe_all();
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use yrs::{types::ToJson, updates::decoder::Decode, Array, Map, Options, Transact};
235
236    use super::*;
237    use crate::sync::{AtomicU8, Ordering};
238
239    #[test]
240    #[cfg_attr(miri, ignore)]
241    fn test_double_run_with_yrs_basic() {
242        let yrs_doc = yrs::Doc::new();
243
244        let map = yrs_doc.get_or_insert_map("abc");
245        let mut trx = yrs_doc.transact_mut();
246        map.insert(&mut trx, "a", 1).unwrap();
247
248        let binary_from_yrs = trx.encode_update_v1().unwrap();
249
250        let options = DocOptions {
251            client: Some(rand::random()),
252            guid: Some(nanoid::nanoid!()),
253        };
254
255        loom_model!({
256            let doc = Doc::new_from_binary_with_options(binary_from_yrs.clone(), options.clone()).unwrap();
257            let binary = doc.encode_update_v1().unwrap();
258
259            assert_eq!(binary_from_yrs, binary);
260        });
261    }
262
263    #[test]
264    fn test_encode_state_as_update() {
265        let options_left = DocOptions {
266            client: Some(rand::random()),
267            guid: Some(nanoid::nanoid!()),
268        };
269        let options_right = DocOptions {
270            client: Some(rand::random()),
271            guid: Some(nanoid::nanoid!()),
272        };
273
274        let yrs_options_left = Options {
275            client_id: rand::random(),
276            guid: nanoid::nanoid!().into(),
277            ..Default::default()
278        };
279
280        let yrs_options_right = Options {
281            client_id: rand::random(),
282            guid: nanoid::nanoid!().into(),
283            ..Default::default()
284        };
285
286        loom_model!({
287            let (binary, binary_new) = if cfg!(miri) {
288                let doc = Doc::with_options(options_left.clone());
289
290                let mut map = doc.get_or_create_map("abc").unwrap();
291                map.insert("a", 1).unwrap();
292                let binary = doc.encode_update_v1().unwrap();
293
294                let doc_new = Doc::with_options(options_right.clone());
295                let mut array = doc_new.get_or_create_array("array").unwrap();
296                array.insert(0, "array_value").unwrap();
297                let binary_new = doc.encode_update_v1().unwrap();
298
299                (binary, binary_new)
300            } else {
301                let yrs_doc = yrs::Doc::with_options(yrs_options_left.clone());
302
303                let map = yrs_doc.get_or_insert_map("abc");
304                let mut trx = yrs_doc.transact_mut();
305                map.insert(&mut trx, "a", 1).unwrap();
306                let binary = trx.encode_update_v1().unwrap();
307
308                let yrs_doc_new = yrs::Doc::with_options(yrs_options_right.clone());
309                let array = yrs_doc_new.get_or_insert_array("array");
310                let mut trx = yrs_doc_new.transact_mut();
311                array.insert(&mut trx, 0, "array_value").unwrap();
312                let binary_new = trx.encode_update_v1().unwrap();
313
314                (binary, binary_new)
315            };
316
317            let mut doc = Doc::new_from_binary_with_options(binary.clone(), options_left.clone()).unwrap();
318            let mut doc_new = Doc::new_from_binary_with_options(binary_new.clone(), options_right.clone()).unwrap();
319
320            let diff_update = doc_new.encode_state_as_update_v1(&doc.get_state_vector()).unwrap();
321
322            let diff_update_reverse = doc.encode_state_as_update_v1(&doc_new.get_state_vector()).unwrap();
323
324            doc.apply_update_from_binary(diff_update).unwrap();
325            doc_new.apply_update_from_binary(diff_update_reverse).unwrap();
326
327            assert_eq!(doc.encode_update_v1().unwrap(), doc_new.encode_update_v1().unwrap());
328        });
329    }
330
331    #[test]
332    #[cfg_attr(any(miri, loom), ignore)]
333    fn test_array_create() {
334        let options = DocOptions {
335            client: Some(rand::random()),
336            guid: Some(nanoid::nanoid!()),
337        };
338
339        let yrs_options =
340            yrs::Options::with_guid_and_client_id(options.guid.clone().unwrap().into(), options.client.unwrap());
341
342        let json = serde_json::json!([42.0, -42.0, true, false, "hello", "world", [1.0]]);
343
344        {
345            let doc = yrs::Doc::with_options(yrs_options.clone());
346            let array = doc.get_or_insert_array("abc");
347            let mut trx = doc.transact_mut();
348            array.insert(&mut trx, 0, 42).unwrap();
349            array.insert(&mut trx, 1, -42).unwrap();
350            array.insert(&mut trx, 2, true).unwrap();
351            array.insert(&mut trx, 3, false).unwrap();
352            array.insert(&mut trx, 4, "hello").unwrap();
353            array.insert(&mut trx, 5, "world").unwrap();
354
355            let sub_array = yrs::ArrayPrelim::default();
356            let sub_array = array.insert(&mut trx, 6, sub_array).unwrap();
357            sub_array.insert(&mut trx, 0, 1).unwrap();
358
359            drop(trx);
360
361            assert_json_diff::assert_json_eq!(array.to_json(&doc.transact()), json);
362        };
363
364        let binary = {
365            let doc = Doc::with_options(options.clone());
366            let mut array = doc.get_or_create_array("abc").unwrap();
367            array.insert(0, 42).unwrap();
368            array.insert(1, -42).unwrap();
369            array.insert(2, true).unwrap();
370            array.insert(3, false).unwrap();
371            array.insert(4, "hello").unwrap();
372            array.insert(5, "world").unwrap();
373
374            let mut sub_array = doc.create_array().unwrap();
375            array.insert(6, sub_array.clone()).unwrap();
376            // FIXME: array need insert first to compatible with yrs
377            sub_array.insert(0, 1).unwrap();
378
379            doc.encode_update_v1().unwrap()
380        };
381
382        let ydoc = yrs::Doc::with_options(yrs_options);
383        let array = ydoc.get_or_insert_array("abc");
384        let mut trx = ydoc.transact_mut();
385        trx.apply_update(yrs::Update::decode_v1(&binary).unwrap());
386
387        assert_json_diff::assert_json_eq!(array.to_json(&trx), json);
388
389        let mut doc = Doc::with_options(options);
390        let array = doc.get_or_create_array("abc").unwrap();
391        doc.apply_update_from_binary(binary).unwrap();
392
393        let list = array.iter().collect::<Vec<_>>();
394
395        assert!(list.len() == 7);
396        assert!(matches!(list[6], Value::Array(_)));
397    }
398
399    #[test]
400    #[ignore = "inaccurate timing on ci, need for more accurate timing testing"]
401    fn test_subscribe() {
402        loom_model!({
403            let doc = Doc::default();
404            let doc_clone = doc.clone();
405
406            let count = Arc::new(AtomicU8::new(0));
407            let count_clone1 = count.clone();
408            let count_clone2 = count.clone();
409            doc.subscribe(move |_| {
410                count_clone1.fetch_add(1, Ordering::SeqCst);
411            });
412
413            doc_clone.subscribe(move |_| {
414                count_clone2.fetch_add(1, Ordering::SeqCst);
415            });
416
417            doc_clone.get_or_create_array("abc").unwrap().insert(0, 42).unwrap();
418
419            // wait observer, cycle once every 100mm
420            std::thread::sleep(std::time::Duration::from_millis(200));
421
422            assert_eq!(count.load(Ordering::SeqCst), 2);
423        });
424    }
425}