1#[cfg(feature = "events")]
2use publisher::DocPublisher;
3
4use super::{
5 history::StoreHistory,
6 store::{ChangedTypeRefs, StoreRef},
7 *,
8};
9use crate::sync::{Arc, RwLock};
10
11#[cfg(feature = "debug")]
12#[derive(Debug, Clone)]
13pub struct DocStoreStatus {
14 pub nodes: usize,
15 pub delete_sets: usize,
16 pub types: usize,
17 pub dangling_types: usize,
18 pub pending_nodes: usize,
19}
20
21#[derive(Clone, Debug)]
35pub struct DocOptions {
36 pub guid: String,
37 pub client_id: u64,
38 pub gc: bool,
39}
40
41impl Default for DocOptions {
42 fn default() -> Self {
43 if cfg!(any(test, feature = "bench")) {
44 Self {
45 client_id: 1,
46 guid: "test".into(),
47 gc: true,
48 }
49 } else {
50 Self {
51 client_id: prefer_small_random(),
52 guid: nanoid::nanoid!(),
53 gc: true,
54 }
55 }
56 }
57}
58
59impl DocOptions {
60 pub fn new() -> Self {
61 Self::default()
62 }
63
64 pub fn with_client_id(mut self, client_id: u64) -> Self {
65 self.client_id = client_id;
66 self
67 }
68
69 pub fn with_guid(mut self, guid: String) -> Self {
70 self.guid = guid;
71 self
72 }
73
74 pub fn auto_gc(mut self, gc: bool) -> Self {
75 self.gc = gc;
76 self
77 }
78
79 pub fn build(self) -> Doc {
80 Doc::with_options(self)
81 }
82}
83
84impl From<DocOptions> for Any {
85 fn from(value: DocOptions) -> Self {
86 Any::Object(HashMap::from_iter([
87 ("gc".into(), value.gc.into()),
88 ("guid".into(), value.guid.into()),
89 ]))
90 }
91}
92
93impl TryFrom<Any> for DocOptions {
94 type Error = JwstCodecError;
95
96 fn try_from(value: Any) -> Result<Self, Self::Error> {
97 match value {
98 Any::Object(map) => {
99 let mut options = DocOptions::default();
100 for (key, value) in map {
101 match key.as_str() {
102 "gc" => {
103 options.gc = bool::try_from(value)?;
104 }
105 "guid" => {
106 options.guid = String::try_from(value)?;
107 }
108 _ => {}
109 }
110 }
111
112 Ok(options)
113 }
114 _ => Err(JwstCodecError::UnexpectedType("Object")),
115 }
116 }
117}
118
119#[derive(Debug, Clone)]
120pub struct Doc {
121 client_id: u64,
122 opts: DocOptions,
123
124 pub(crate) store: StoreRef,
125 #[cfg(feature = "events")]
126 pub publisher: Arc<DocPublisher>,
127 pub(crate) batch: Somr<Batch>,
128}
129
130unsafe impl Send for Doc {}
131unsafe impl Sync for Doc {}
132
133impl Default for Doc {
134 fn default() -> Self {
135 Doc::new()
136 }
137}
138
139impl PartialEq for Doc {
140 fn eq(&self, other: &Self) -> bool {
141 self.client_id == other.client_id
142 }
143}
144
145impl Doc {
146 pub fn new() -> Self {
147 Self::with_options(DocOptions::default())
148 }
149
150 pub fn with_options(options: DocOptions) -> Self {
151 let store = Arc::new(RwLock::new(DocStore::with_client(options.client_id)));
152 #[cfg(feature = "events")]
153 let publisher = Arc::new(DocPublisher::new(store.clone()));
154
155 Self {
156 client_id: options.client_id,
157 opts: options,
158 store,
159 #[cfg(feature = "events")]
160 publisher,
161 batch: Somr::none(),
162 }
163 }
164
165 pub fn with_client(client_id: u64) -> Self {
166 DocOptions::new().with_client_id(client_id).build()
167 }
168
169 pub fn client(&self) -> Client {
170 self.client_id
171 }
172
173 pub fn set_client(&mut self, client_id: u64) {
174 self.client_id = client_id;
175 }
176
177 pub fn renew_client(&mut self) {
178 self.client_id = prefer_small_random();
179 }
180
181 pub fn clients(&self) -> Vec<u64> {
182 self.store.read().unwrap().clients()
183 }
184
185 pub fn history(&self) -> StoreHistory {
186 let history = StoreHistory::new(&self.store);
187 history.resolve();
188 history
189 }
190
191 #[cfg(feature = "debug")]
192 pub fn store_status(&self) -> DocStoreStatus {
193 let store = self.store.read().unwrap();
194
195 DocStoreStatus {
196 nodes: store.total_nodes(),
197 delete_sets: store.total_delete_sets(),
198 types: store.total_types(),
199 dangling_types: store.total_dangling_types(),
200 pending_nodes: store.total_pending_nodes(),
201 }
202 }
203
204 pub(crate) fn get_changed(&self) -> ChangedTypeRefs {
205 self.store.write().unwrap().get_changed()
206 }
207
208 pub fn store_compare(&self, other: &Doc) -> bool {
209 let store = self.store.read().unwrap();
210 let other_store = other.store.read().unwrap();
211
212 store.deep_compare(&other_store)
213 }
214
215 pub fn options(&self) -> &DocOptions {
216 &self.opts
217 }
218
219 pub fn guid(&self) -> &str {
220 self.opts.guid.as_str()
221 }
222
223 pub fn try_from_binary_v1<T: AsRef<[u8]>>(binary: T) -> JwstCodecResult<Self> {
227 Self::try_from_binary_v1_with_options(binary, DocOptions::default())
228 }
229
230 pub fn try_from_binary_v1_with_options<T: AsRef<[u8]>>(binary: T, options: DocOptions) -> JwstCodecResult<Self> {
231 let mut doc = Doc::with_options(options);
232 doc.apply_update_from_binary_v1(binary)?;
233 Ok(doc)
234 }
235
236 pub fn apply_update_from_binary_v1<T: AsRef<[u8]>>(&mut self, binary: T) -> JwstCodecResult {
237 let mut decoder = RawDecoder::new(binary.as_ref());
238 let update = Update::read(&mut decoder)?;
239 self.apply_update(update)
240 }
241
242 pub fn apply_update(&mut self, mut update: Update) -> JwstCodecResult {
243 let mut store = self.store.write().unwrap();
244 let mut retry = false;
245
246 loop {
247 let pending_types = update
249 .structs
250 .values()
251 .flatten()
252 .filter_map(|n| {
253 if let Node::Item(item_ref) = n
254 && let Some(item) = item_ref.get()
255 && let Content::Type(ty) = &item.content
256 {
257 Some((item.id, ty.clone()))
258 } else {
259 None
260 }
261 })
262 .collect();
263 for (mut s, offset) in update.iter(store.get_state_vector()) {
264 if let Node::Item(item) = &mut s {
265 debug_assert!(item.is_owned());
266 let mut item = unsafe { item.get_mut_unchecked() };
267 store.repair(&mut item, self.store.clone(), &pending_types)?;
268 }
269 store.integrate(s, offset, None)?;
270 }
271
272 for (client, range) in update.delete_set_iter(store.get_state_vector()) {
273 store.delete_range(client, range)?;
274 }
275
276 if let Some(mut pending_update) = store.pending.take() {
277 if pending_update
278 .missing_state
279 .iter()
280 .any(|(client, clock)| *clock < store.get_state(*client))
281 {
282 retry = true;
284 }
285
286 for (client, range) in pending_update.delete_set_iter(store.get_state_vector()) {
287 store.delete_range(client, range)?;
288 }
289
290 if update.is_pending_empty() {
291 update = pending_update;
292 } else {
293 update.drain_pending_state();
295 Update::merge_into(&mut update, [pending_update]);
296 }
297 } else {
298 if update.is_pending_empty() {
303 break;
304 } else {
305 update.drain_pending_state();
307 retry = false;
308 };
309 }
310
311 if !retry {
313 if !update.is_empty() {
314 store.pending.replace(update);
315 }
316 break;
317 }
318 }
319
320 if self.opts.gc {
321 store.optimize()?;
322 }
323
324 Ok(())
325 }
326
327 pub fn keys(&self) -> Vec<String> {
328 let store = self.store.read().unwrap();
329 store.types.keys().cloned().collect()
330 }
331
332 pub fn get_or_create_text<S: AsRef<str>>(&self, name: S) -> JwstCodecResult<Text> {
333 YTypeBuilder::new(self.store.clone())
334 .with_kind(YTypeKind::Text)
335 .set_name(name.as_ref().to_string())
336 .build()
337 }
338
339 pub fn create_text(&self) -> JwstCodecResult<Text> {
340 YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Text).build()
341 }
342
343 pub fn get_or_create_array<S: AsRef<str>>(&self, str: S) -> JwstCodecResult<Array> {
344 YTypeBuilder::new(self.store.clone())
345 .with_kind(YTypeKind::Array)
346 .set_name(str.as_ref().to_string())
347 .build()
348 }
349
350 pub fn create_array(&self) -> JwstCodecResult<Array> {
351 YTypeBuilder::new(self.store.clone())
352 .with_kind(YTypeKind::Array)
353 .build()
354 }
355
356 pub fn get_or_create_map<S: AsRef<str>>(&self, str: S) -> JwstCodecResult<Map> {
357 YTypeBuilder::new(self.store.clone())
358 .with_kind(YTypeKind::Map)
359 .set_name(str.as_ref().to_string())
360 .build()
361 }
362
363 pub fn create_map(&self) -> JwstCodecResult<Map> {
364 YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Map).build()
365 }
366
367 pub fn get_map(&self, str: &str) -> JwstCodecResult<Map> {
368 YTypeBuilder::new(self.store.clone())
369 .with_kind(YTypeKind::Map)
370 .set_name(str.to_string())
371 .build_exists()
372 }
373
374 pub fn encode_update_v1(&self) -> JwstCodecResult<Vec<u8>> {
375 self.encode_state_as_update_v1(&StateVector::default())
376 }
377
378 pub fn encode_state_as_update_v1(&self, sv: &StateVector) -> JwstCodecResult<Vec<u8>> {
379 let update = self.encode_state_as_update(sv)?;
380
381 let mut encoder = RawEncoder::default();
382 update.write(&mut encoder)?;
383 Ok(encoder.into_inner())
384 }
385
386 pub fn encode_update(&self) -> JwstCodecResult<Update> {
387 self.encode_state_as_update(&StateVector::default())
388 }
389
390 pub fn encode_state_as_update(&self, sv: &StateVector) -> JwstCodecResult<Update> {
391 self.store.read().unwrap().diff_state_vector(sv, true)
392 }
393
394 pub fn get_state_vector(&self) -> StateVector {
395 self.store.read().unwrap().get_state_vector()
396 }
397
398 pub fn get_delete_sets(&self) -> DeleteSet {
399 self.store.read().unwrap().get_delete_sets()
400 }
401
402 #[cfg(feature = "events")]
403 pub fn subscribe(&self, cb: impl Fn(&[u8], &[History]) + Sync + Send + 'static) {
404 self.publisher.subscribe(cb);
405 }
406
407 #[cfg(feature = "events")]
408 pub fn unsubscribe_all(&self) {
409 self.publisher.unsubscribe_all();
410 }
411
412 #[cfg(feature = "events")]
413 pub fn subscribe_count(&self) -> usize {
414 self.publisher.count()
415 }
416
417 #[cfg(feature = "events")]
418 pub fn subscriber_count(&self) -> usize {
419 Arc::<DocPublisher>::strong_count(&self.publisher)
420 }
421
422 pub fn gc(&self) -> JwstCodecResult<()> {
423 self.store.write().unwrap().optimize()
424 }
425}
426
427#[cfg(test)]
428mod tests {
429 use yrs::{Array, Map, Options, Transact, types::ToJson, updates::decoder::Decode};
430
431 use super::*;
432
433 #[test]
434 fn test_encode_state_as_update() {
435 let yrs_options_left = Options::default();
436 let yrs_options_right = Options::default();
437
438 loom_model!({
439 let (binary, binary_new) = if cfg!(miri) {
440 let doc = Doc::new();
441
442 let mut map = doc.get_or_create_map("abc").unwrap();
443 map.insert("a".to_string(), 1).unwrap();
444 let binary = doc.encode_update_v1().unwrap();
445
446 let doc_new = Doc::new();
447 let mut array = doc_new.get_or_create_array("array").unwrap();
448 array.insert(0, "array_value").unwrap();
449 let binary_new = doc.encode_update_v1().unwrap();
450
451 (binary, binary_new)
452 } else {
453 let yrs_doc = yrs::Doc::with_options(yrs_options_left.clone());
454
455 let map = yrs_doc.get_or_insert_map("abc");
456 let mut trx = yrs_doc.transact_mut();
457 map.insert(&mut trx, "a", 1);
458 let binary = trx.encode_update_v1();
459
460 let yrs_doc_new = yrs::Doc::with_options(yrs_options_right.clone());
461 let array = yrs_doc_new.get_or_insert_array("array");
462 let mut trx = yrs_doc_new.transact_mut();
463 array.insert(&mut trx, 0, "array_value");
464 let binary_new = trx.encode_update_v1();
465
466 (binary, binary_new)
467 };
468
469 let mut doc = Doc::try_from_binary_v1(binary).unwrap();
470 let mut doc_new = Doc::try_from_binary_v1(binary_new).unwrap();
471
472 let diff_update = doc_new.encode_state_as_update_v1(&doc.get_state_vector()).unwrap();
473
474 let diff_update_reverse = doc.encode_state_as_update_v1(&doc_new.get_state_vector()).unwrap();
475
476 doc.apply_update_from_binary_v1(diff_update).unwrap();
477 doc_new.apply_update_from_binary_v1(diff_update_reverse).unwrap();
478
479 assert_eq!(doc.encode_update_v1().unwrap(), doc_new.encode_update_v1().unwrap());
480 });
481 }
482
483 #[test]
484 #[cfg_attr(any(miri, loom), ignore)]
485 fn test_array_create() {
486 let yrs_options = yrs::Options::default();
487
488 let json = serde_json::json!([42.0, -42.0, true, false, "hello", "world", [1.0]]);
489
490 {
491 let doc = yrs::Doc::with_options(yrs_options.clone());
492 let array = doc.get_or_insert_array("abc");
493 let mut trx = doc.transact_mut();
494 array.insert(&mut trx, 0, 42);
495 array.insert(&mut trx, 1, -42);
496 array.insert(&mut trx, 2, true);
497 array.insert(&mut trx, 3, false);
498 array.insert(&mut trx, 4, "hello");
499 array.insert(&mut trx, 5, "world");
500
501 let sub_array = yrs::ArrayPrelim::default();
502 let sub_array = array.insert(&mut trx, 6, sub_array);
503 sub_array.insert(&mut trx, 0, 1);
504
505 drop(trx);
506 let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
507 .numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
508 assert_json_diff::assert_json_matches!(array.to_json(&doc.transact()), json, config);
509 };
510
511 {
512 let binary = {
513 let doc = Doc::new();
514 let mut array = doc.get_or_create_array("abc").unwrap();
515 array.insert(0, 42).unwrap();
516 array.insert(1, -42).unwrap();
517 array.insert(2, true).unwrap();
518 array.insert(3, false).unwrap();
519 array.insert(4, "hello").unwrap();
520 array.insert(5, "world").unwrap();
521
522 let mut sub_array = doc.create_array().unwrap();
523 array.insert(6, sub_array.clone()).unwrap();
524 sub_array.insert(0, 1).unwrap();
526
527 doc.encode_update_v1().unwrap()
528 };
529
530 let ydoc = yrs::Doc::with_options(yrs_options);
531 let array = ydoc.get_or_insert_array("abc");
532 let mut trx = ydoc.transact_mut();
533 trx.apply_update(yrs::Update::decode_v1(&binary).unwrap()).unwrap();
534
535 let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
536 .numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
537 assert_json_diff::assert_json_matches!(array.to_json(&trx), json, config);
538
539 let mut doc = Doc::new();
540 let array = doc.get_or_create_array("abc").unwrap();
541 doc.apply_update_from_binary_v1(binary).unwrap();
542
543 let list = array.iter().collect::<Vec<_>>();
544
545 assert!(list.len() == 7);
546 assert!(matches!(list[6], Value::Array(_)));
547 }
548
549 {
550 let binary_detached = {
551 let doc = Doc::new();
552 let mut array = doc.get_or_create_array("abc").unwrap();
553 array.insert(0, 42).unwrap();
554 array.insert(1, -42).unwrap();
555 array.insert(2, true).unwrap();
556 array.insert(3, false).unwrap();
557 array.insert(4, "hello").unwrap();
558 array.insert(5, "world").unwrap();
559
560 let mut sub_array = doc.create_array().unwrap();
561 sub_array.insert(0, 1).unwrap();
562 array.insert(6, sub_array.clone()).unwrap();
563
564 doc.encode_update_v1().unwrap()
565 };
566
567 let detached_doc = Doc::try_from_binary_v1(binary_detached).unwrap();
568 let detached_array = detached_doc.get_or_create_array("abc").unwrap();
569 let detached_sub_array = match detached_array.get(6).unwrap() {
570 Value::Array(arr) => arr,
571 _ => panic!("expected array at index 6"),
572 };
573 assert_eq!(detached_sub_array.get(0).unwrap(), Value::Any(1.0.into()));
574 }
575 }
576
577 #[test]
578 #[cfg(feature = "events")]
579 #[ignore = "inaccurate timing on ci, need for more accurate timing testing"]
580 fn test_subscribe() {
581 use crate::sync::{AtomicU8, Ordering};
582
583 loom_model!({
584 let doc = Doc::default();
585 let doc_clone = doc.clone();
586
587 let count = Arc::new(AtomicU8::new(0));
588 let count_clone1 = count.clone();
589 let count_clone2 = count.clone();
590 doc.subscribe(move |_, _| {
591 count_clone1.fetch_add(1, Ordering::SeqCst);
592 });
593
594 doc_clone.subscribe(move |_, _| {
595 count_clone2.fetch_add(1, Ordering::SeqCst);
596 });
597
598 doc_clone.get_or_create_array("abc").unwrap().insert(0, 42).unwrap();
599
600 std::thread::sleep(std::time::Duration::from_millis(200));
602
603 assert_eq!(count.load(Ordering::SeqCst), 2);
604 });
605 }
606
607 #[test]
608 fn test_repeated_applied_pending_update() {
609 loom_model!({
633 let mut doc = Doc::default();
634
635 doc.apply_update_from_binary_v1(vec![
636 1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
637 98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
638 ])
639 .unwrap();
640
641 let pending_size = doc
642 .store
643 .read()
644 .unwrap()
645 .pending
646 .as_ref()
647 .unwrap()
648 .structs
649 .iter()
650 .map(|s| s.1.len())
651 .sum::<usize>();
652 doc.apply_update_from_binary_v1(vec![
653 1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
654 98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
655 ])
656 .unwrap();
657
658 assert_eq!(
660 pending_size,
661 doc.store
662 .read()
663 .unwrap()
664 .pending
665 .as_ref()
666 .unwrap()
667 .structs
668 .iter()
669 .map(|s| s.1.len())
670 .sum::<usize>()
671 );
672 });
673 }
674
675 #[test]
676 fn test_update_from_vec_ref() {
677 loom_model!({
678 let doc = Doc::new();
679
680 let mut text = doc.get_or_create_text("text").unwrap();
681 text.insert(0, "hello world").unwrap();
682
683 let mut root = doc.get_or_create_map("root").unwrap();
684 let mut child = doc.create_map().unwrap();
685 child.insert("k".to_string(), "v").unwrap();
686 root.insert("child".to_string(), child.clone()).unwrap();
687
688 let update = doc.encode_update_v1().unwrap();
689
690 let doc = Doc::try_from_binary_v1(update).unwrap();
691 let text = doc.get_or_create_text("text").unwrap();
692
693 assert_eq!(&text.to_string(), "hello world");
694
695 let root = doc.get_or_create_map("root").unwrap();
696 if let Some(Value::Map(child)) = root.get("child") {
697 assert!(
698 matches!(child.get("k"), Some(Value::Any(Any::String(s))) if s == "v"),
699 "expected nested map value to survive apply_update"
700 );
701 } else {
702 panic!("expected nested map to survive apply_update");
703 }
704 });
705 }
706
707 #[test]
708 #[cfg_attr(any(miri, loom), ignore)]
709 fn test_apply_update() {
710 let updates = [
711 include_bytes!("../fixtures/basic.bin").to_vec(),
712 include_bytes!("../fixtures/database.bin").to_vec(),
713 include_bytes!("../fixtures/large.bin").to_vec(),
714 include_bytes!("../fixtures/with-subdoc.bin").to_vec(),
715 include_bytes!("../fixtures/edge-case-left-right-same-node.bin").to_vec(),
716 ];
717
718 for update in updates {
719 let mut doc = Doc::new();
720 doc.apply_update_from_binary_v1(&update).unwrap();
721 }
722 }
723}