1use std::{collections::VecDeque, ops::Range};
2
3use super::*;
4use crate::doc::StateVector;
5
6#[derive(Debug, Default, Clone)]
7pub struct Update {
8 pub(crate) structs: ClientMap<VecDeque<Node>>,
9 pub(crate) delete_set: DeleteSet,
10
11 pub(crate) pending_structs: ClientMap<VecDeque<Node>>,
15 pub(crate) missing_state: StateVector,
17 pub(crate) pending_delete_set: DeleteSet,
19}
20
21impl<R: CrdtReader> CrdtRead<R> for Update {
22 fn read(decoder: &mut R) -> JwstCodecResult<Self> {
23 let num_of_clients = decoder.read_var_u64()? as usize;
24
25 let mut map = ClientMap::with_capacity(num_of_clients.min(HASHMAP_SAFE_CAPACITY));
27 for _ in 0..num_of_clients {
28 let num_of_structs = decoder.read_var_u64()? as usize;
29 let client = decoder.read_var_u64()?;
30 let mut clock = decoder.read_var_u64()?;
31
32 let mut structs = VecDeque::with_capacity(num_of_structs.min(HASHMAP_SAFE_CAPACITY));
34
35 for _ in 0..num_of_structs {
36 let struct_info = Node::read(decoder, Id::new(client, clock))?;
37 clock += struct_info.len();
38 structs.push_back(struct_info);
39 }
40
41 structs.shrink_to_fit();
42 map.insert(client, structs);
43 }
44
45 map.shrink_to_fit();
46
47 let delete_set = DeleteSet::read(decoder)?;
48
49 if !decoder.is_empty() {
50 return Err(JwstCodecError::UpdateNotFullyConsumed(decoder.len() as usize));
51 }
52
53 Ok(Update {
54 structs: map,
55 delete_set,
56 ..Update::default()
57 })
58 }
59}
60
61impl<W: CrdtWriter> CrdtWrite<W> for Update {
62 fn write(&self, encoder: &mut W) -> JwstCodecResult {
63 encoder.write_var_u64(self.structs.len() as u64)?;
64
65 let mut clients = self.structs.keys().copied().collect::<Vec<_>>();
66
67 clients.sort_by(|a, b| b.cmp(a));
69
70 for client in clients {
71 let structs = self.structs.get(&client).unwrap();
72
73 encoder.write_var_u64(structs.len() as u64)?;
74 encoder.write_var_u64(client)?;
75 encoder.write_var_u64(structs.front().map(|s| s.clock()).unwrap_or(0))?;
76
77 for struct_info in structs {
78 struct_info.write(encoder)?;
79 }
80 }
81
82 self.delete_set.write(encoder)?;
83
84 Ok(())
85 }
86}
87
88impl Update {
89 pub fn decode_v1<T: AsRef<[u8]>>(buffer: T) -> JwstCodecResult<Update> {
91 Update::read(&mut RawDecoder::new(buffer.as_ref()))
92 }
93
94 pub fn encode_v1(&self) -> JwstCodecResult<Vec<u8>> {
95 let mut encoder = RawEncoder::default();
96 self.write(&mut encoder)?;
97 Ok(encoder.into_inner())
98 }
99
100 pub(crate) fn iter(&mut self, state: StateVector) -> UpdateIterator<'_> {
101 UpdateIterator::new(self, state)
102 }
103
104 pub fn delete_set_iter(&mut self, state: StateVector) -> DeleteSetIterator<'_> {
105 DeleteSetIterator::new(self, state)
106 }
107
108 pub fn drain_pending_state(&mut self) {
110 debug_assert!(self.is_empty());
111
112 std::mem::swap(&mut self.pending_structs, &mut self.structs);
113 std::mem::swap(&mut self.pending_delete_set, &mut self.delete_set);
114 }
115
116 pub fn merge<I: IntoIterator<Item = Update>>(updates: I) -> Update {
117 let mut merged = Update::default();
118
119 Self::merge_into(&mut merged, updates);
120
121 merged
122 }
123
124 pub fn merge_into<I: IntoIterator<Item = Update>>(target: &mut Update, updates: I) {
125 for update in updates {
126 target.delete_set.merge(&update.delete_set);
127
128 for (client, structs) in update.structs {
129 let iter = structs.into_iter().filter(|p| !p.is_skip());
130 if let Some(merged_structs) = target.structs.get_mut(&client) {
131 merged_structs.extend(iter);
132 } else {
133 target.structs.insert(client, iter.collect());
134 }
135 }
136 }
137
138 for structs in target.structs.values_mut() {
139 structs.make_contiguous().sort_by_key(|s| s.id().clock);
140
141 let mut index = 0;
144 let mut merged_index = vec![];
145 while index < structs.len() - 1 {
146 let cur = &structs[index];
147 let next = &structs[index + 1];
148
149 let clock_end = cur.id().clock + cur.len();
150 let next_clock = next.id().clock;
151
152 if next_clock > clock_end {
153 structs.insert(
154 index + 1,
155 Node::new_skip((cur.id().client, clock_end).into(), next_clock - clock_end),
156 );
157 index += 1;
158 } else if cur.id().clock == next_clock {
159 if cur.deleted() == next.deleted()
160 && cur.last_id() == next.last_id()
161 && cur.left() == next.left()
162 && cur.right() == next.right()
163 {
164 merged_index.push(index + 1);
166 } else {
167 debug!("merge failed: {:?} {:?}", cur, next)
168 }
169 }
170
171 index += 1;
172 }
173
174 {
175 let mut new_structs = VecDeque::with_capacity(structs.len() - merged_index.len());
177 let mut next_remove_idx = 0;
178 for (idx, val) in structs.drain(..).enumerate() {
179 if next_remove_idx < merged_index.len() && idx == merged_index[next_remove_idx] {
180 next_remove_idx += 1;
181 } else {
182 new_structs.push_back(val);
183 }
184 }
185 structs.extend(new_structs);
186 }
187 }
188 }
189
190 pub fn is_content_empty(&self) -> bool {
191 self.structs.is_empty()
192 }
193
194 pub fn is_empty(&self) -> bool {
195 self.structs.is_empty() && self.delete_set.is_empty()
196 }
197
198 pub fn is_pending_empty(&self) -> bool {
199 self.pending_structs.is_empty() && self.pending_delete_set.is_empty()
200 }
201}
202
203pub(crate) struct UpdateIterator<'a> {
204 update: &'a mut Update,
205
206 state: StateVector,
209 client_ids: Vec<Client>,
211 cur_client_id: Option<Client>,
213 stack: Vec<Node>,
216}
217
218impl<'a> UpdateIterator<'a> {
219 pub fn new(update: &'a mut Update, state: StateVector) -> Self {
220 let mut client_ids = update.structs.keys().cloned().collect::<Vec<_>>();
221 client_ids.sort();
222 let cur_client_id = client_ids.pop();
223
224 UpdateIterator {
225 update,
226 state,
227 client_ids,
228 cur_client_id,
229 stack: Vec::new(),
230 }
231 }
232
233 fn next_client(&mut self) -> Option<Client> {
240 while let Some(client_id) = self.cur_client_id {
241 match self.update.structs.get(&client_id) {
242 Some(refs) if !refs.is_empty() => {
243 self.cur_client_id.replace(client_id);
244 return self.cur_client_id;
245 }
246 _ => {
247 self.update.structs.remove(&client_id);
248 self.cur_client_id = self.client_ids.pop();
249 }
250 }
251 }
252
253 None
254 }
255
256 fn update_missing_state(&mut self, client: Client, clock: Clock) {
259 self.update.missing_state.set_min(client, clock);
260 }
261
262 fn add_stack_to_rest(&mut self) {
265 for s in self.stack.drain(..) {
266 let client = s.id().client;
267 let unapplicable_items = self.update.structs.remove(&client);
268 if let Some(mut items) = unapplicable_items {
269 items.push_front(s);
270 self.update.pending_structs.insert(client, items);
271 } else {
272 self.update.pending_structs.insert(client, [s].into());
273 }
274 self.client_ids.retain(|&c| c != client);
275 }
276 }
277
278 fn get_missing_dep(&self, struct_info: &Node) -> Option<Client> {
281 if let Some(item) = struct_info.as_item().get() {
282 let id = item.id;
283 if let Some(left) = &item.origin_left_id
284 && left.client != id.client
285 && left.clock >= self.state.get(&left.client)
286 {
287 return Some(left.client);
288 }
289
290 if let Some(right) = &item.origin_right_id
291 && right.client != id.client
292 && right.clock >= self.state.get(&right.client)
293 {
294 return Some(right.client);
295 }
296
297 if let Some(parent) = &item.parent {
298 match parent {
299 Parent::Id(parent_id)
300 if parent_id.client != id.client && parent_id.clock >= self.state.get(&parent_id.client) =>
301 {
302 return Some(parent_id.client);
303 }
304 _ => {}
305 }
306 }
307 }
308
309 None
310 }
311
312 fn next_candidate(&mut self) -> Option<Node> {
313 let mut cur = None;
314
315 if !self.stack.is_empty() {
316 cur.replace(self.stack.pop().unwrap());
317 } else if let Some(client) = self.next_client() {
318 cur.replace(self.update.structs.get_mut(&client).unwrap().pop_front().unwrap());
322 }
323
324 cur
325 }
326}
327
328impl Iterator for UpdateIterator<'_> {
329 type Item = (Node, u64);
330
331 fn next(&mut self) -> Option<Self::Item> {
332 let mut cur = self.next_candidate();
334
335 while let Some(cur_update) = cur.take() {
336 let id = cur_update.id();
337 if cur_update.is_skip() {
338 cur = self.next_candidate();
339 continue;
340 } else if !self.state.contains(&id) {
341 self.stack.push(cur_update);
345 self.update_missing_state(id.client, id.clock - 1);
346 self.add_stack_to_rest();
347 } else {
348 let id = cur_update.id();
349 let dep = self.get_missing_dep(&cur_update);
350 if let Some(dep) = dep {
352 self.stack.push(cur_update);
353
354 match self.update.structs.get_mut(&dep) {
355 Some(updates) if !updates.is_empty() => {
356 cur.replace(updates.pop_front().unwrap());
358 continue;
359 }
360 _ => {
363 self.update_missing_state(dep, self.state.get(&dep));
364 self.add_stack_to_rest();
365 }
366 }
367 } else {
368 let local_state = self.state.get(&id.client);
370 let offset = local_state - id.clock;
373 if offset == 0 || offset < cur_update.len() {
374 self.state.set_max(id.client, id.clock + cur_update.len());
375 return Some((cur_update, offset));
376 }
377 }
378 }
379
380 cur = self.next_candidate();
381 }
382
383 None
385 }
386}
387
388pub struct DeleteSetIterator<'a> {
389 update: &'a mut Update,
390 state: StateVector,
392}
393
394impl<'a> DeleteSetIterator<'a> {
395 pub fn new(update: &'a mut Update, state: StateVector) -> Self {
396 DeleteSetIterator { update, state }
397 }
398}
399
400impl Iterator for DeleteSetIterator<'_> {
401 type Item = (Client, Range<u64>);
402
403 fn next(&mut self) -> Option<Self::Item> {
404 while let Some(client) = self.update.delete_set.keys().next().cloned() {
405 let deletes = self.update.delete_set.get_mut(&client).unwrap();
406 let local_state = self.state.get(&client);
407
408 while let Some(range) = deletes.pop() {
409 let start = range.start;
410 let end = range.end;
411
412 if start < local_state {
413 if local_state < end {
414 self.update
421 .pending_delete_set
422 .add(client, local_state, end - local_state);
423
424 return Some((client, start..local_state));
425 }
426
427 return Some((client, range));
428 } else {
429 self.update.pending_delete_set.add(client, start, end - start);
431 }
432 }
433
434 self.update.delete_set.remove(&client);
435 }
436
437 None
438 }
439}
440
#[cfg(test)]
mod tests {
    use std::{num::ParseIntError, path::PathBuf};

    use serde::Deserialize;

    use super::*;
    use crate::doc::common::OrderRange;

    /// Builds an item node with the given id whose content is a string of
    /// `len` characters.
    fn struct_item(id: (Client, Clock), len: usize) -> Node {
        let item = ItemBuilder::new()
            .id(id.into())
            .content(Content::String("c".repeat(len)))
            .build();

        Node::Item(Somr::new(item))
    }

    /// Decodes `input` as a v1 update.
    fn parse_doc_update(input: Vec<u8>) -> JwstCodecResult<Update> {
        Update::decode_v1(input)
    }

    /// Update of client 0 with items at clocks 0, 1 and 10 and a GC node
    /// covering 20..30, which leaves the clock ranges 2..10 and 11..20 empty.
    fn update_with_clock_gaps() -> Update {
        Update {
            structs: ClientMap::from_iter([(
                0,
                VecDeque::from([
                    struct_item((0, 0), 1),
                    struct_item((0, 1), 1),
                    struct_item((0, 10), 1),
                    Node::new_gc((0, 20).into(), 10),
                ]),
            )]),
            ..Default::default()
        }
    }

    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_parse_doc() {
        // (encoded update, expected client count, expected total struct count)
        let fixtures = [
            (include_bytes!("../../fixtures/basic.bin").to_vec(), 1, 188),
            (include_bytes!("../../fixtures/database.bin").to_vec(), 1, 149),
            (include_bytes!("../../fixtures/large.bin").to_vec(), 1, 9036),
            (include_bytes!("../../fixtures/with-subdoc.bin").to_vec(), 2, 30),
            (
                include_bytes!("../../fixtures/edge-case-left-right-same-node.bin").to_vec(),
                2,
                243,
            ),
        ];

        for (bytes, expected_clients, expected_structs) in fixtures {
            let update = parse_doc_update(bytes).unwrap();
            let total_structs = update.structs.iter().map(|(_, nodes)| nodes.len()).sum::<usize>();

            assert_eq!(update.structs.len(), expected_clients);
            assert_eq!(total_structs, expected_structs);
        }
    }

    /// Decodes a string of two-character hex pairs into bytes.
    fn decode_hex(s: &str) -> Result<Vec<u8>, ParseIntError> {
        let mut bytes = Vec::with_capacity(s.len() / 2);

        let mut at = 0;
        while at < s.len() {
            bytes.push(u8::from_str_radix(&s[at..at + 2], 16)?);
            at += 2;
        }

        Ok(bytes)
    }

    /// One record of the local fixture file.
    #[allow(dead_code)]
    #[derive(Deserialize, Debug)]
    struct Data {
        id: u64,
        workspace: String,
        timestamp: String,
        blob: String,
    }

    #[ignore = "just for local data test"]
    #[test]
    fn test_parse_local_doc() {
        let records = serde_json::from_slice::<Vec<Data>>(include_bytes!("../../fixtures/local_docs.json")).unwrap();

        for ws in records {
            // Drop the first five characters and the last one around the hex
            // payload.
            let hex = &ws.blob[5..=(ws.blob.len() - 2)];

            let Ok(bytes) = decode_hex(hex) else {
                println!("error origin data: {}", ws.workspace);
                continue;
            };

            match parse_doc_update(bytes.clone()) {
                Ok(update) => {
                    println!(
                        "workspace: {}, global structs: {}, total structs: {}",
                        ws.workspace,
                        update.structs.len(),
                        update.structs.iter().map(|s| s.1.len()).sum::<usize>()
                    );
                }
                Err(_e) => {
                    // Keep the undecodable document around for inspection.
                    let target = PathBuf::from("./src/fixtures/invalid").join(format!("{}.ydoc", ws.workspace));
                    std::fs::write(target, bytes).unwrap();
                    println!("doc error: {}", ws.workspace);
                }
            }
        }
    }

    #[test]
    fn test_update_iterator() {
        loom_model!({
            // (1, 1) has its left origin at (0, 1), so it can only be yielded
            // after client 0 has been applied up to that point.
            let dependent = Node::Item(Somr::new(
                ItemBuilder::new()
                    .id((1, 1).into())
                    .left_id(Some((0, 1).into()))
                    .content(Content::String("c".repeat(2)))
                    .build(),
            ));

            let mut update = Update {
                structs: ClientMap::from_iter([
                    (
                        0,
                        VecDeque::from([
                            struct_item((0, 0), 1),
                            struct_item((0, 1), 1),
                            Node::new_skip((0, 2).into(), 1),
                        ]),
                    ),
                    (1, VecDeque::from([struct_item((1, 0), 1), dependent])),
                ]),
                ..Update::default()
            };

            let mut iter = update.iter(StateVector::default());

            let expected_order: [(Client, Clock); 4] = [(1, 0), (0, 0), (0, 1), (1, 1)];
            for expected in expected_order {
                assert_eq!(iter.next().unwrap().0.id(), expected.into());
            }
            assert_eq!(iter.next(), None);
        });
    }

    #[test]
    fn test_update_iterator_with_missing_state() {
        loom_model!({
            // The local state of client 0 stops at 3 while the update starts
            // at 4.
            let mut update = Update {
                structs: ClientMap::from_iter([(0, VecDeque::from([struct_item((0, 4), 1)]))]),
                ..Update::default()
            };

            let mut iter = update.iter(StateVector::from([(0, 3)]));
            assert_eq!(iter.next(), None);

            assert!(!update.pending_structs.is_empty());
            let first_pending = update.pending_structs.get_mut(&0).unwrap().pop_front().unwrap();
            assert_eq!(first_pending.id(), (0, 4).into());

            assert!(!update.missing_state.is_empty());
            assert_eq!(update.missing_state.get(&0), 3);
        });
    }

    #[test]
    fn test_delete_set_iterator() {
        let mut update = Update {
            delete_set: DeleteSet::from([(0, vec![0..2, 3..5])]),
            ..Update::default()
        };

        let mut iter = update.delete_set_iter(StateVector::from([(0, 10)]));
        assert_eq!(iter.next().unwrap(), (0, 0..2));
        assert_eq!(iter.next().unwrap(), (0, 3..5));
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_delete_set_with_missing_state() {
        let mut update = Update {
            delete_set: DeleteSet::from([(0, vec![3..5, 7..12, 13..15])]),
            ..Update::default()
        };

        // Local state is 10: 7..12 gets split and 13..15 is deferred.
        let mut iter = update.delete_set_iter(StateVector::from([(0, 10)]));
        assert_eq!(iter.next().unwrap(), (0, 3..5));
        assert_eq!(iter.next().unwrap(), (0, 7..10));
        assert_eq!(iter.next(), None);

        assert!(!update.pending_delete_set.is_empty());
        assert_eq!(
            update.pending_delete_set.get(&0).unwrap(),
            &OrderRange::from(vec![10..12, 13..15])
        );
    }

    #[test]
    fn should_add_skip_when_clock_not_continuous() {
        loom_model!({
            let merged = Update::merge([update_with_clock_gaps()]);

            let expected = VecDeque::from([
                struct_item((0, 0), 1),
                struct_item((0, 1), 1),
                Node::new_skip((0, 2).into(), 8),
                struct_item((0, 10), 1),
                Node::new_skip((0, 11).into(), 9),
                Node::new_gc((0, 20).into(), 10),
            ]);

            assert_eq!(merged.structs.get(&0).unwrap(), &expected);
        });
    }

    #[test]
    fn merged_update_should_not_be_released_in_next_turn() {
        loom_model!({
            let merged = Update::merge([update_with_clock_gaps()]);

            let update2 = Update {
                structs: ClientMap::from_iter([(
                    0,
                    VecDeque::from([struct_item((0, 30), 1), Node::new_gc((0, 32).into(), 1)]),
                )]),
                ..Default::default()
            };

            let merged2 = Update::merge([update2, merged]);

            // Six real nodes plus skips for 2..10, 11..20 and 31..32.
            assert_eq!(merged2.structs.get(&0).unwrap().len(), 9);
        });
    }
}