1use super::{publisher::DocPublisher, store::StoreRef, *};
2use crate::sync::{Arc, RwLock};
3
/// Optional settings used when constructing a [`Doc`].
///
/// Every field left as `None` is replaced by a freshly generated value in
/// [`Doc::with_options`].
#[derive(Clone, Debug, Default)]
pub struct DocOptions {
    /// Document identifier; a random `nanoid` is generated when `None`.
    pub guid: Option<String>,
    /// Client id of the local peer; a random `u64` is generated when `None`.
    pub client: Option<u64>,
}
9
10#[derive(Debug, Clone)]
11pub struct Doc {
12 client_id: u64,
13 #[allow(dead_code)]
16 guid: String,
17 pub(super) store: StoreRef,
18 pub publisher: Arc<DocPublisher>,
19}
20
21unsafe impl Send for Doc {}
22unsafe impl Sync for Doc {}
23
24impl Default for Doc {
25 fn default() -> Self {
26 let client_id = rand::random();
27 let store = Arc::new(RwLock::new(DocStore::with_client(client_id)));
28 let publisher = Arc::new(DocPublisher::new(store.clone()));
29
30 Self {
31 client_id,
32 guid: nanoid!(),
33 store,
34 publisher,
35 }
36 }
37}
38
39impl PartialEq for Doc {
40 fn eq(&self, other: &Self) -> bool {
41 self.client_id == other.client_id
42 }
43}
44
45impl Doc {
46 pub fn with_options(options: DocOptions) -> Self {
47 let client = options.client.unwrap_or_else(rand::random);
48 let store = Arc::new(RwLock::new(DocStore::with_client(client)));
49 let publisher = Arc::new(DocPublisher::new(store.clone()));
50
51 Self {
52 client_id: client,
53 store,
54 guid: options.guid.unwrap_or_else(|| nanoid!()),
55 publisher,
56 }
57 }
58
59 pub fn with_client(client_id: u64) -> Self {
60 let store = Arc::new(RwLock::new(DocStore::with_client(client_id)));
61 let publisher = Arc::new(DocPublisher::new(store.clone()));
62 Self {
63 client_id,
64 store,
65 guid: nanoid!(),
66 publisher,
67 }
68 }
69
70 pub fn client(&self) -> Client {
71 self.client_id
72 }
73
74 pub fn guid(&self) -> &str {
75 self.guid.as_str()
76 }
77
78 pub fn new_from_binary(binary: Vec<u8>) -> JwstCodecResult<Self> {
79 let mut doc = Doc::default();
80 doc.apply_update_from_binary(binary)?;
81 Ok(doc)
82 }
83
84 pub fn new_from_binary_with_options(binary: Vec<u8>, options: DocOptions) -> JwstCodecResult<Self> {
85 let mut doc = Doc::with_options(options);
86 doc.apply_update_from_binary(binary)?;
87 Ok(doc)
88 }
89
90 pub fn apply_update_from_binary(&mut self, update: Vec<u8>) -> JwstCodecResult {
91 let mut decoder = RawDecoder::new(update);
92 let update = Update::read(&mut decoder)?;
93 self.apply_update(update)?;
94 Ok(())
95 }
96
97 pub fn apply_update(&mut self, mut update: Update) -> JwstCodecResult {
98 let mut store = self.store.write().unwrap();
99 let mut retry = false;
100 loop {
101 for (mut s, offset) in update.iter(store.get_state_vector()) {
102 if let Node::Item(item) = &mut s {
103 debug_assert!(item.is_owned());
104 let item = unsafe { item.get_mut_unchecked() };
105 store.repair(item, self.store.clone())?;
106 }
107 store.integrate(s, offset, None)?;
108 }
109
110 for (client, range) in update.delete_set_iter(store.get_state_vector()) {
111 store.delete_range(client, range)?;
112 }
113
114 if let Some(mut pending_update) = store.pending.take() {
115 if pending_update
116 .missing_state
117 .iter()
118 .any(|(client, clock)| store.get_state(*client) > *clock)
119 {
120 retry = true;
122 }
123
124 for (client, range) in pending_update.delete_set_iter(store.get_state_vector()) {
125 store.delete_range(client, range)?;
126 }
127
128 if update.is_pending_empty() {
129 update = pending_update;
130 } else {
131 update.drain_pending_state();
133 Update::merge_into(&mut update, [pending_update]);
134 }
135 } else {
136 if update.is_pending_empty() {
141 break;
142 } else {
143 update.drain_pending_state();
145 };
146 }
147
148 if !retry {
150 if !update.is_empty() {
151 store.pending.replace(update);
152 }
153 break;
154 }
155 }
156
157 Ok(())
158 }
159
160 pub fn keys(&self) -> Vec<String> {
161 let store = self.store.read().unwrap();
162 store.types.keys().cloned().collect()
163 }
164
165 pub fn get_or_create_text(&self, name: &str) -> JwstCodecResult<Text> {
166 YTypeBuilder::new(self.store.clone())
167 .with_kind(YTypeKind::Text)
168 .set_name(name.to_string())
169 .build()
170 }
171
172 pub fn create_text(&self) -> JwstCodecResult<Text> {
173 YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Text).build()
174 }
175
176 pub fn get_or_create_array(&self, str: &str) -> JwstCodecResult<Array> {
177 YTypeBuilder::new(self.store.clone())
178 .with_kind(YTypeKind::Array)
179 .set_name(str.to_string())
180 .build()
181 }
182
183 pub fn create_array(&self) -> JwstCodecResult<Array> {
184 YTypeBuilder::new(self.store.clone())
185 .with_kind(YTypeKind::Array)
186 .build()
187 }
188
189 pub fn get_or_create_map(&self, str: &str) -> JwstCodecResult<Map> {
190 YTypeBuilder::new(self.store.clone())
191 .with_kind(YTypeKind::Map)
192 .set_name(str.to_string())
193 .build()
194 }
195
196 pub fn create_map(&self) -> JwstCodecResult<Map> {
197 YTypeBuilder::new(self.store.clone()).with_kind(YTypeKind::Map).build()
198 }
199
200 pub fn get_map(&self, str: &str) -> JwstCodecResult<Map> {
201 YTypeBuilder::new(self.store.clone())
202 .with_kind(YTypeKind::Map)
203 .set_name(str.to_string())
204 .build_exists()
205 }
206
207 pub fn encode_update_v1(&self) -> JwstCodecResult<Vec<u8>> {
208 self.encode_state_as_update_v1(&StateVector::default())
209 }
210
211 pub fn encode_state_as_update_v1(&self, sv: &StateVector) -> JwstCodecResult<Vec<u8>> {
212 let update = self.store.read().unwrap().diff_state_vector(sv)?;
213
214 let mut encoder = RawEncoder::default();
215 update.write(&mut encoder)?;
216 Ok(encoder.into_inner())
217 }
218
219 pub fn get_state_vector(&self) -> StateVector {
220 self.store.read().unwrap().get_state_vector()
221 }
222
223 pub fn subscribe(&self, cb: impl Fn(&[u8]) + Sync + Send + 'static) {
224 self.publisher.subscribe(cb);
225 }
226
227 pub fn unsubscribe_all(&self) {
228 self.publisher.unsubscribe_all();
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use yrs::{types::ToJson, updates::decoder::Decode, Array, Map, Options, Transact};
235
236 use super::*;
237 use crate::sync::{AtomicU8, Ordering};
238
/// Round-trip check: an update produced by yrs, once loaded into a `Doc`,
/// must re-encode to exactly the same bytes.
#[test]
#[cfg_attr(miri, ignore)]
fn test_double_run_with_yrs_basic() {
    // Reference update from yrs: root map "abc" holding `a -> 1`.
    let reference_doc = yrs::Doc::new();
    let root_map = reference_doc.get_or_insert_map("abc");
    let mut txn = reference_doc.transact_mut();
    root_map.insert(&mut txn, "a", 1).unwrap();
    let binary_from_yrs = txn.encode_update_v1().unwrap();

    let options = DocOptions {
        client: Some(rand::random()),
        guid: Some(nanoid::nanoid!()),
    };

    loom_model!({
        let decoded = Doc::new_from_binary_with_options(binary_from_yrs.clone(), options.clone()).unwrap();
        let reencoded = decoded.encode_update_v1().unwrap();

        assert_eq!(binary_from_yrs, reencoded);
    });
}
262
/// Two documents with disjoint content exchange state-vector diffs and must
/// end up encoding to the same bytes.
#[test]
fn test_encode_state_as_update() {
    let options_left = DocOptions {
        client: Some(rand::random()),
        guid: Some(nanoid::nanoid!()),
    };
    let options_right = DocOptions {
        client: Some(rand::random()),
        guid: Some(nanoid::nanoid!()),
    };

    let yrs_options_left = Options {
        client_id: rand::random(),
        guid: nanoid::nanoid!().into(),
        ..Default::default()
    };

    let yrs_options_right = Options {
        client_id: rand::random(),
        guid: nanoid::nanoid!().into(),
        ..Default::default()
    };

    loom_model!({
        // Under miri the fixtures are produced with `Doc` itself; otherwise yrs is
        // used as the reference implementation.
        let (binary, binary_new) = if cfg!(miri) {
            let doc = Doc::with_options(options_left.clone());

            let mut map = doc.get_or_create_map("abc").unwrap();
            map.insert("a", 1).unwrap();
            let binary = doc.encode_update_v1().unwrap();

            let doc_new = Doc::with_options(options_right.clone());
            let mut array = doc_new.get_or_create_array("array").unwrap();
            array.insert(0, "array_value").unwrap();
            // Encode the right-hand document. Encoding `doc` here would yield the
            // map update a second time and leave the array content untested.
            let binary_new = doc_new.encode_update_v1().unwrap();

            (binary, binary_new)
        } else {
            let yrs_doc = yrs::Doc::with_options(yrs_options_left.clone());

            let map = yrs_doc.get_or_insert_map("abc");
            let mut trx = yrs_doc.transact_mut();
            map.insert(&mut trx, "a", 1).unwrap();
            let binary = trx.encode_update_v1().unwrap();

            let yrs_doc_new = yrs::Doc::with_options(yrs_options_right.clone());
            let array = yrs_doc_new.get_or_insert_array("array");
            let mut trx = yrs_doc_new.transact_mut();
            array.insert(&mut trx, 0, "array_value").unwrap();
            let binary_new = trx.encode_update_v1().unwrap();

            (binary, binary_new)
        };

        let mut doc = Doc::new_from_binary_with_options(binary.clone(), options_left.clone()).unwrap();
        let mut doc_new = Doc::new_from_binary_with_options(binary_new.clone(), options_right.clone()).unwrap();

        // Each side encodes only what the other side is missing.
        let diff_update = doc_new.encode_state_as_update_v1(&doc.get_state_vector()).unwrap();

        let diff_update_reverse = doc.encode_state_as_update_v1(&doc_new.get_state_vector()).unwrap();

        doc.apply_update_from_binary(diff_update).unwrap();
        doc_new.apply_update_from_binary(diff_update_reverse).unwrap();

        assert_eq!(doc.encode_update_v1().unwrap(), doc_new.encode_update_v1().unwrap());
    });
}
330
/// Builds the same array with yrs and with `Doc`, then checks that yrs can
/// read the `Doc` encoding and that `Doc` can re-apply its own encoding.
#[test]
#[cfg_attr(any(miri, loom), ignore)]
fn test_array_create() {
    let options = DocOptions {
        client: Some(rand::random()),
        guid: Some(nanoid::nanoid!()),
    };

    let yrs_options =
        yrs::Options::with_guid_and_client_id(options.guid.clone().unwrap().into(), options.client.unwrap());

    let json = serde_json::json!([42.0, -42.0, true, false, "hello", "world", [1.0]]);

    // Sanity check: yrs itself produces the expected JSON for this content.
    {
        let doc = yrs::Doc::with_options(yrs_options.clone());
        let array = doc.get_or_insert_array("abc");
        let mut trx = doc.transact_mut();
        array.insert(&mut trx, 0, 42).unwrap();
        array.insert(&mut trx, 1, -42).unwrap();
        array.insert(&mut trx, 2, true).unwrap();
        array.insert(&mut trx, 3, false).unwrap();
        array.insert(&mut trx, 4, "hello").unwrap();
        array.insert(&mut trx, 5, "world").unwrap();

        let sub_array = yrs::ArrayPrelim::default();
        let sub_array = array.insert(&mut trx, 6, sub_array).unwrap();
        sub_array.insert(&mut trx, 0, 1).unwrap();

        drop(trx);

        assert_json_diff::assert_json_eq!(array.to_json(&doc.transact()), json);
    };

    // Same content built with `Doc`, encoded to bytes.
    let binary = {
        let doc = Doc::with_options(options.clone());
        let mut array = doc.get_or_create_array("abc").unwrap();
        array.insert(0, 42).unwrap();
        array.insert(1, -42).unwrap();
        array.insert(2, true).unwrap();
        array.insert(3, false).unwrap();
        array.insert(4, "hello").unwrap();
        array.insert(5, "world").unwrap();

        let mut sub_array = doc.create_array().unwrap();
        array.insert(6, sub_array.clone()).unwrap();
        sub_array.insert(0, 1).unwrap();

        doc.encode_update_v1().unwrap()
    };

    // yrs must decode the `Doc` encoding to the same JSON.
    let ydoc = yrs::Doc::with_options(yrs_options);
    let array = ydoc.get_or_insert_array("abc");
    let mut trx = ydoc.transact_mut();
    trx.apply_update(yrs::Update::decode_v1(&binary).unwrap());

    assert_json_diff::assert_json_eq!(array.to_json(&trx), json);

    // `Doc` must also be able to re-apply its own encoding.
    let mut doc = Doc::with_options(options);
    let array = doc.get_or_create_array("abc").unwrap();
    doc.apply_update_from_binary(binary).unwrap();

    let list = array.iter().collect::<Vec<_>>();

    // `assert_eq!` reports the actual length when the check fails.
    assert_eq!(list.len(), 7);
    assert!(matches!(list[6], Value::Array(_)));
}
398
/// Subscribes through a document and through its clone, makes one change,
/// and expects both callbacks to have fired once the wait is over.
#[test]
#[ignore = "inaccurate timing on ci, need for more accurate timing testing"]
fn test_subscribe() {
    loom_model!({
        let doc = Doc::default();
        let doc_clone = doc.clone();

        let count = Arc::new(AtomicU8::new(0));

        let first_counter = Arc::clone(&count);
        doc.subscribe(move |_| {
            first_counter.fetch_add(1, Ordering::SeqCst);
        });

        let second_counter = Arc::clone(&count);
        doc_clone.subscribe(move |_| {
            second_counter.fetch_add(1, Ordering::SeqCst);
        });

        let mut array = doc_clone.get_or_create_array("abc").unwrap();
        array.insert(0, 42).unwrap();

        // Wait before inspecting the counter (see the `ignore` reason above).
        std::thread::sleep(std::time::Duration::from_millis(200));

        assert_eq!(count.load(Ordering::SeqCst), 2);
    });
}
425}