1use std::collections::HashMap;
2
3#[cfg(feature = "events")]
4use publisher::DocPublisher;
5
6use super::{
7 history::StoreHistory,
8 store::{ChangedTypeRefs, StoreRef},
9 *,
10};
11use crate::sync::{Arc, RwLock};
12
/// Counters describing the current contents of a document's store, as
/// produced by `Doc::store_status` (only built with the `debug` feature).
#[cfg(feature = "debug")]
#[derive(Debug, Clone)]
pub struct DocStoreStatus {
    /// Value reported by the store's `total_nodes()`.
    pub nodes: usize,
    /// Value reported by the store's `total_delete_sets()`.
    pub delete_sets: usize,
    /// Value reported by the store's `total_types()`.
    pub types: usize,
    /// Value reported by the store's `total_dangling_types()`.
    pub dangling_types: usize,
    /// Value reported by the store's `total_pending_nodes()`.
    pub pending_nodes: usize,
}
22
/// Options used to build a `Doc`.
///
/// `Default` yields fixed values (`client_id = 1`, `guid = "test"`) in test
/// and `bench` builds, and a `prefer_small_random()` client id plus a
/// `nanoid` guid otherwise; `gc` defaults to `true` in both cases.
#[derive(Clone, Debug)]
pub struct DocOptions {
    /// Identifier string of the document, exposed through `Doc::guid`.
    pub guid: String,
    /// Client id handed to `DocStore::with_client` when the `Doc` is built.
    pub client_id: u64,
    /// When `true`, `Doc::apply_update` calls `store.optimize()` after the
    /// update has been applied.
    pub gc: bool,
}
42
43impl Default for DocOptions {
44 fn default() -> Self {
45 if cfg!(any(test, feature = "bench")) {
46 Self {
47 client_id: 1,
48 guid: "test".into(),
49 gc: true,
50 }
51 } else {
52 Self {
53 client_id: prefer_small_random(),
54 guid: nanoid::nanoid!(),
55 gc: true,
56 }
57 }
58 }
59}
60
61impl DocOptions {
62 pub fn new() -> Self {
63 Self::default()
64 }
65
66 pub fn with_client_id(mut self, client_id: u64) -> Self {
67 self.client_id = client_id;
68 self
69 }
70
71 pub fn with_guid(mut self, guid: String) -> Self {
72 self.guid = guid;
73 self
74 }
75
76 pub fn auto_gc(mut self, gc: bool) -> Self {
77 self.gc = gc;
78 self
79 }
80
81 pub fn build(self) -> Doc {
82 Doc::with_options(self)
83 }
84}
85
86impl From<DocOptions> for Any {
87 fn from(value: DocOptions) -> Self {
88 Any::Object(HashMap::from_iter([
89 ("gc".into(), value.gc.into()),
90 ("guid".into(), value.guid.into()),
91 ]))
92 }
93}
94
95impl TryFrom<Any> for DocOptions {
96 type Error = JwstCodecError;
97
98 fn try_from(value: Any) -> Result<Self, Self::Error> {
99 match value {
100 Any::Object(map) => {
101 let mut options = DocOptions::default();
102 for (key, value) in map {
103 match key.as_str() {
104 "gc" => {
105 options.gc = bool::try_from(value)?;
106 }
107 "guid" => {
108 options.guid = String::try_from(value)?;
109 }
110 _ => {}
111 }
112 }
113
114 Ok(options)
115 }
116 _ => Err(JwstCodecError::UnexpectedType("Object")),
117 }
118 }
119}
120
/// A document handle.
///
/// The store is held behind a shared handle (`with_options` builds it as
/// `Arc<RwLock<DocStore>>`), so cloning a `Doc` clones the handle rather than
/// the store contents.
#[derive(Debug, Clone)]
pub struct Doc {
    /// Client id returned by `Doc::client`; initialised from
    /// `DocOptions::client_id` and changeable through `set_client` /
    /// `renew_client`.
    client_id: u64,
    /// The options this document was built with.
    opts: DocOptions,

    /// Shared handle to the document store.
    pub(crate) store: StoreRef,
    /// Publisher used by the `subscribe*` methods (`events` feature only).
    #[cfg(feature = "events")]
    pub publisher: Arc<DocPublisher>,
    /// Initialised to `Somr::none()` by `with_options`.
    pub(crate) batch: Somr<Batch>,
}
131
// NOTE(review): these manual impls assert that a `Doc` may be moved to and
// shared between threads. Whether that is sound depends on the field types
// (`StoreRef`, `Somr<Batch>`, and `DocPublisher` with the `events` feature),
// which are defined outside this file — confirm and record the invariant in a
// `// SAFETY:` comment.
unsafe impl Send for Doc {}
unsafe impl Sync for Doc {}
134
135impl Default for Doc {
136 fn default() -> Self {
137 Doc::new()
138 }
139}
140
141impl PartialEq for Doc {
142 fn eq(&self, other: &Self) -> bool {
143 self.client_id == other.client_id
144 }
145}
146
147impl Doc {
148 pub fn new() -> Self {
149 Self::with_options(DocOptions::default())
150 }
151
152 pub fn with_options(options: DocOptions) -> Self {
153 let store = Arc::new(RwLock::new(DocStore::with_client(options.client_id)));
154 #[cfg(feature = "events")]
155 let publisher = Arc::new(DocPublisher::new(store.clone()));
156
157 Self {
158 client_id: options.client_id,
159 opts: options,
160 store,
161 #[cfg(feature = "events")]
162 publisher,
163 batch: Somr::none(),
164 }
165 }
166
167 pub fn with_client(client_id: u64) -> Self {
168 DocOptions::new().with_client_id(client_id).build()
169 }
170
171 pub fn client(&self) -> Client {
172 self.client_id
173 }
174
175 pub fn set_client(&mut self, client_id: u64) {
176 self.client_id = client_id;
177 }
178
179 pub fn renew_client(&mut self) {
180 self.client_id = prefer_small_random();
181 }
182
183 pub fn clients(&self) -> Vec<u64> {
184 self.store.read().unwrap().clients()
185 }
186
187 pub fn history(&self) -> StoreHistory {
188 let history = StoreHistory::new(&self.store);
189 history.resolve();
190 history
191 }
192
193 #[cfg(feature = "debug")]
194 pub fn store_status(&self) -> DocStoreStatus {
195 let store = self.store.read().unwrap();
196
197 DocStoreStatus {
198 nodes: store.total_nodes(),
199 delete_sets: store.total_delete_sets(),
200 types: store.total_types(),
201 dangling_types: store.total_dangling_types(),
202 pending_nodes: store.total_pending_nodes(),
203 }
204 }
205
206 pub(crate) fn get_changed(&self) -> ChangedTypeRefs {
207 self.store.write().unwrap().get_changed()
208 }
209
210 pub fn store_compare(&self, other: &Doc) -> bool {
211 let store = self.store.read().unwrap();
212 let other_store = other.store.read().unwrap();
213
214 store.deep_compare(&other_store)
215 }
216
217 pub fn options(&self) -> &DocOptions {
218 &self.opts
219 }
220
221 pub fn guid(&self) -> &str {
222 self.opts.guid.as_str()
223 }
224
225 pub fn try_from_binary_v1<T: AsRef<[u8]>>(binary: T) -> JwstCodecResult<Self> {
229 Self::try_from_binary_v1_with_options(binary, DocOptions::default())
230 }
231
232 pub fn try_from_binary_v1_with_options<T: AsRef<[u8]>>(binary: T, options: DocOptions) -> JwstCodecResult<Self> {
233 let mut doc = Doc::with_options(options);
234 doc.apply_update_from_binary_v1(binary)?;
235 Ok(doc)
236 }
237
238 pub fn apply_update_from_binary_v1<T: AsRef<[u8]>>(&mut self, binary: T) -> JwstCodecResult {
239 let mut decoder = RawDecoder::new(binary.as_ref());
240 let update = Update::read(&mut decoder)?;
241 self.apply_update(update)
242 }
243
244 pub fn apply_update(&mut self, mut update: Update) -> JwstCodecResult {
245 let mut store = self.store.write().unwrap();
246 let mut retry = false;
247 loop {
248 for (mut s, offset) in update.iter(store.get_state_vector()) {
249 if let Node::Item(item) = &mut s {
250 debug_assert!(item.is_owned());
251 let mut item = unsafe { item.get_mut_unchecked() };
252 store.repair(&mut item, self.store.clone())?;
253 }
254 store.integrate(s, offset, None)?;
255 }
256
257 for (client, range) in update.delete_set_iter(store.get_state_vector()) {
258 store.delete_range(client, range)?;
259 }
260
261 if let Some(mut pending_update) = store.pending.take() {
262 if pending_update
263 .missing_state
264 .iter()
265 .any(|(client, clock)| *clock < store.get_state(*client))
266 {
267 retry = true;
269 }
270
271 for (client, range) in pending_update.delete_set_iter(store.get_state_vector()) {
272 store.delete_range(client, range)?;
273 }
274
275 if update.is_pending_empty() {
276 update = pending_update;
277 } else {
278 update.drain_pending_state();
280 Update::merge_into(&mut update, [pending_update]);
281 }
282 } else {
283 if update.is_pending_empty() {
288 break;
289 } else {
290 update.drain_pending_state();
292 retry = false;
293 };
294 }
295
296 if !retry {
298 if !update.is_empty() {
299 store.pending.replace(update);
300 }
301 break;
302 }
303 }
304
305 if self.opts.gc {
306 store.optimize()?;
307 }
308
309 Ok(())
310 }
311
312 pub fn keys(&self) -> Vec<String> {
313 let store = self.store.read().unwrap();
314 store.types.keys().cloned().collect()
315 }
316
317 pub fn get_or_create_text<S: AsRef<str>>(&self, name: S) -> JwstCodecResult<Text> {
318 YTypeBuilder::new(self.store.clone())
319 .with_kind(YTypeKind::Text)
320 .set_name(name.as_ref().to_string())
321 .build()
322 }
323
324 pub fn create_text(&self) -> JwstCodecResult<Text> {
325 YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Text).build()
326 }
327
328 pub fn get_or_create_array<S: AsRef<str>>(&self, str: S) -> JwstCodecResult<Array> {
329 YTypeBuilder::new(self.store.clone())
330 .with_kind(YTypeKind::Array)
331 .set_name(str.as_ref().to_string())
332 .build()
333 }
334
335 pub fn create_array(&self) -> JwstCodecResult<Array> {
336 YTypeBuilder::new(self.store.clone())
337 .with_kind(YTypeKind::Array)
338 .build()
339 }
340
341 pub fn get_or_create_map<S: AsRef<str>>(&self, str: S) -> JwstCodecResult<Map> {
342 YTypeBuilder::new(self.store.clone())
343 .with_kind(YTypeKind::Map)
344 .set_name(str.as_ref().to_string())
345 .build()
346 }
347
348 pub fn create_map(&self) -> JwstCodecResult<Map> {
349 YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Map).build()
350 }
351
352 pub fn get_map(&self, str: &str) -> JwstCodecResult<Map> {
353 YTypeBuilder::new(self.store.clone())
354 .with_kind(YTypeKind::Map)
355 .set_name(str.to_string())
356 .build_exists()
357 }
358
359 pub fn encode_update_v1(&self) -> JwstCodecResult<Vec<u8>> {
360 self.encode_state_as_update_v1(&StateVector::default())
361 }
362
363 pub fn encode_state_as_update_v1(&self, sv: &StateVector) -> JwstCodecResult<Vec<u8>> {
364 let update = self.encode_state_as_update(sv)?;
365
366 let mut encoder = RawEncoder::default();
367 update.write(&mut encoder)?;
368 Ok(encoder.into_inner())
369 }
370
371 pub fn encode_update(&self) -> JwstCodecResult<Update> {
372 self.encode_state_as_update(&StateVector::default())
373 }
374
375 pub fn encode_state_as_update(&self, sv: &StateVector) -> JwstCodecResult<Update> {
376 self.store.read().unwrap().diff_state_vector(sv, true)
377 }
378
379 pub fn get_state_vector(&self) -> StateVector {
380 self.store.read().unwrap().get_state_vector()
381 }
382
383 pub fn get_delete_sets(&self) -> DeleteSet {
384 self.store.read().unwrap().get_delete_sets()
385 }
386
387 #[cfg(feature = "events")]
388 pub fn subscribe(&self, cb: impl Fn(&[u8], &[History]) + Sync + Send + 'static) {
389 self.publisher.subscribe(cb);
390 }
391
392 #[cfg(feature = "events")]
393 pub fn unsubscribe_all(&self) {
394 self.publisher.unsubscribe_all();
395 }
396
397 #[cfg(feature = "events")]
398 pub fn subscribe_count(&self) -> usize {
399 self.publisher.count()
400 }
401
402 #[cfg(feature = "events")]
403 pub fn subscriber_count(&self) -> usize {
404 Arc::<DocPublisher>::strong_count(&self.publisher)
405 }
406
407 pub fn gc(&self) -> JwstCodecResult<()> {
408 self.store.write().unwrap().optimize()
409 }
410}
411
#[cfg(test)]
mod tests {
    use yrs::{Array, Map, Options, Transact, types::ToJson, updates::decoder::Decode};

    use super::*;

    /// Two documents with different content exchange diffs in both
    /// directions and must end up with identical encoded state.
    #[test]
    fn test_encode_state_as_update() {
        let yrs_options_left = Options::default();
        let yrs_options_right = Options::default();

        loom_model!({
            let (binary, binary_new) = if cfg!(miri) {
                // Test builds default every `Doc` to client id 1, so the two source
                // documents get explicit, distinct client ids; otherwise their
                // updates would collide on the same (client, clock) pair.
                let doc = Doc::with_client(1);

                let mut map = doc.get_or_create_map("abc").unwrap();
                map.insert("a".to_string(), 1).unwrap();
                let binary = doc.encode_update_v1().unwrap();

                let doc_new = Doc::with_client(2);
                let mut array = doc_new.get_or_create_array("array").unwrap();
                array.insert(0, "array_value").unwrap();
                // Encode the second document (this previously encoded `doc` again,
                // which left `doc_new` unused and made both binaries identical).
                let binary_new = doc_new.encode_update_v1().unwrap();

                (binary, binary_new)
            } else {
                let yrs_doc = yrs::Doc::with_options(yrs_options_left.clone());

                let map = yrs_doc.get_or_insert_map("abc");
                let mut trx = yrs_doc.transact_mut();
                map.insert(&mut trx, "a", 1);
                let binary = trx.encode_update_v1();

                let yrs_doc_new = yrs::Doc::with_options(yrs_options_right.clone());
                let array = yrs_doc_new.get_or_insert_array("array");
                let mut trx = yrs_doc_new.transact_mut();
                array.insert(&mut trx, 0, "array_value");
                let binary_new = trx.encode_update_v1();

                (binary, binary_new)
            };

            let mut doc = Doc::try_from_binary_v1(binary).unwrap();
            let mut doc_new = Doc::try_from_binary_v1(binary_new).unwrap();

            let diff_update = doc_new.encode_state_as_update_v1(&doc.get_state_vector()).unwrap();

            let diff_update_reverse = doc.encode_state_as_update_v1(&doc_new.get_state_vector()).unwrap();

            doc.apply_update_from_binary_v1(diff_update).unwrap();
            doc_new.apply_update_from_binary_v1(diff_update_reverse).unwrap();

            assert_eq!(doc.encode_update_v1().unwrap(), doc_new.encode_update_v1().unwrap());
        });
    }

    /// Builds the same array with `yrs` and with `Doc`, then checks that the
    /// `Doc` encoding decodes to the expected JSON in `yrs` and can be applied
    /// back onto a fresh `Doc`.
    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_array_create() {
        let yrs_options = yrs::Options::default();

        let json = serde_json::json!([42.0, -42.0, true, false, "hello", "world", [1.0]]);

        // Reference: the same content built with yrs matches the expected JSON.
        {
            let doc = yrs::Doc::with_options(yrs_options.clone());
            let array = doc.get_or_insert_array("abc");
            let mut trx = doc.transact_mut();
            array.insert(&mut trx, 0, 42);
            array.insert(&mut trx, 1, -42);
            array.insert(&mut trx, 2, true);
            array.insert(&mut trx, 3, false);
            array.insert(&mut trx, 4, "hello");
            array.insert(&mut trx, 5, "world");

            let sub_array = yrs::ArrayPrelim::default();
            let sub_array = array.insert(&mut trx, 6, sub_array);
            sub_array.insert(&mut trx, 0, 1);

            drop(trx);
            let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
                .numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
            assert_json_diff::assert_json_matches!(array.to_json(&doc.transact()), json, config);
        };

        // Build the same content with `Doc` and encode it.
        let binary = {
            let doc = Doc::new();
            let mut array = doc.get_or_create_array("abc").unwrap();
            array.insert(0, 42).unwrap();
            array.insert(1, -42).unwrap();
            array.insert(2, true).unwrap();
            array.insert(3, false).unwrap();
            array.insert(4, "hello").unwrap();
            array.insert(5, "world").unwrap();

            let mut sub_array = doc.create_array().unwrap();
            array.insert(6, sub_array.clone()).unwrap();
            sub_array.insert(0, 1).unwrap();

            doc.encode_update_v1().unwrap()
        };

        // yrs must decode the `Doc` encoding to the expected JSON.
        let ydoc = yrs::Doc::with_options(yrs_options);
        let array = ydoc.get_or_insert_array("abc");
        let mut trx = ydoc.transact_mut();
        trx.apply_update(yrs::Update::decode_v1(&binary).unwrap()).unwrap();

        let config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict)
            .numeric_mode(assert_json_diff::NumericMode::AssumeFloat);
        assert_json_diff::assert_json_matches!(array.to_json(&trx), json, config);

        // A fresh `Doc` must accept the same encoding.
        let mut doc = Doc::new();
        let array = doc.get_or_create_array("abc").unwrap();
        doc.apply_update_from_binary_v1(binary).unwrap();

        let list = array.iter().collect::<Vec<_>>();

        assert!(list.len() == 7);
        assert!(matches!(list[6], Value::Array(_)));
    }

    /// Subscribes through a document and its clone and expects both
    /// callbacks to have fired after one insert.
    #[test]
    #[cfg(feature = "events")]
    #[ignore = "inaccurate timing on ci, need for more accurate timing testing"]
    fn test_subscribe() {
        use crate::sync::{AtomicU8, Ordering};

        loom_model!({
            let doc = Doc::default();
            let doc_clone = doc.clone();

            let count = Arc::new(AtomicU8::new(0));
            let count_clone1 = count.clone();
            let count_clone2 = count.clone();
            doc.subscribe(move |_, _| {
                count_clone1.fetch_add(1, Ordering::SeqCst);
            });

            doc_clone.subscribe(move |_, _| {
                count_clone2.fetch_add(1, Ordering::SeqCst);
            });

            doc_clone.get_or_create_array("abc").unwrap().insert(0, 42).unwrap();

            // Wait before checking the counter; the callbacks are expected to have
            // run by then (this timing dependence is why the test is ignored).
            std::thread::sleep(std::time::Duration::from_millis(200));

            assert_eq!(count.load(Ordering::SeqCst), 2);
        });
    }

    /// Applying the same update twice must leave the total number of pending
    /// structs in the store unchanged.
    #[test]
    fn test_repeated_applied_pending_update() {
        loom_model!({
            let mut doc = Doc::default();

            doc.apply_update_from_binary_v1(vec![
                1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
                98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
            ])
            .unwrap();

            // Total length of the pending structs after the first application.
            let pending_size = doc
                .store
                .read()
                .unwrap()
                .pending
                .as_ref()
                .unwrap()
                .structs
                .iter()
                .map(|s| s.1.len())
                .sum::<usize>();
            doc.apply_update_from_binary_v1(vec![
                1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
                98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
            ])
            .unwrap();

            // The second application must not grow the pending structs.
            assert_eq!(
                pending_size,
                doc.store
                    .read()
                    .unwrap()
                    .pending
                    .as_ref()
                    .unwrap()
                    .structs
                    .iter()
                    .map(|s| s.1.len())
                    .sum::<usize>()
            );
        });
    }

    /// Round-trips a text document through `encode_update_v1` and
    /// `try_from_binary_v1`.
    #[test]
    fn test_update_from_vec_ref() {
        loom_model!({
            let doc = Doc::new();

            let mut text = doc.get_or_create_text("text").unwrap();
            text.insert(0, "hello world").unwrap();

            let update = doc.encode_update_v1().unwrap();

            let doc = Doc::try_from_binary_v1(update).unwrap();
            let text = doc.get_or_create_text("text").unwrap();

            assert_eq!(&text.to_string(), "hello world");
        });
    }

    /// Every fixture update must apply to a fresh document without error.
    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_apply_update() {
        let updates = [
            include_bytes!("../fixtures/basic.bin").to_vec(),
            include_bytes!("../fixtures/database.bin").to_vec(),
            include_bytes!("../fixtures/large.bin").to_vec(),
            include_bytes!("../fixtures/with-subdoc.bin").to_vec(),
            include_bytes!("../fixtures/edge-case-left-right-same-node.bin").to_vec(),
        ];

        for update in updates {
            let mut doc = Doc::new();
            doc.apply_update_from_binary_v1(&update).unwrap();
        }
    }
}