1use std::{
2 collections::{HashMap, VecDeque},
3 ops::Range,
4};
5
6use super::*;
7use crate::doc::StateVector;
8
9#[derive(Debug, Default, Clone)]
10pub struct Update {
11 pub(crate) structs: HashMap<u64, VecDeque<Node>>,
12 pub(crate) delete_set: DeleteSet,
13
14 pub(crate) pending_structs: HashMap<Client, VecDeque<Node>>,
18 pub(crate) missing_state: StateVector,
20 pub(crate) pending_delete_set: DeleteSet,
22}
23
24impl<R: CrdtReader> CrdtRead<R> for Update {
25 fn read(decoder: &mut R) -> JwstCodecResult<Self> {
26 let num_of_clients = decoder.read_var_u64()?;
27
28 let mut map = HashMap::with_capacity(num_of_clients as usize);
29 for _ in 0..num_of_clients {
30 let num_of_structs = decoder.read_var_u64()?;
31 let client = decoder.read_var_u64()?;
32 let mut clock = decoder.read_var_u64()?;
33
34 let mut structs = VecDeque::with_capacity(num_of_structs as usize);
35
36 for _ in 0..num_of_structs {
37 let struct_info = Node::read(decoder, Id::new(client, clock))?;
38 clock += struct_info.len();
39 structs.push_back(struct_info);
40 }
41
42 map.insert(client, structs);
43 }
44
45 let delete_set = DeleteSet::read(decoder)?;
46
47 if !decoder.is_empty() {
48 return Err(JwstCodecError::UpdateNotFullyConsumed(decoder.len() as usize));
49 }
50
51 Ok(Update {
52 structs: map,
53 delete_set,
54 ..Update::default()
55 })
56 }
57}
58
59impl<W: CrdtWriter> CrdtWrite<W> for Update {
60 fn write(&self, encoder: &mut W) -> JwstCodecResult {
61 encoder.write_var_u64(self.structs.len() as u64)?;
62
63 let mut clients = self.structs.keys().copied().collect::<Vec<_>>();
64
65 clients.sort_by(|a, b| b.cmp(a));
67
68 for client in clients {
69 let structs = self.structs.get(&client).unwrap();
70
71 encoder.write_var_u64(structs.len() as u64)?;
72 encoder.write_var_u64(client)?;
73 encoder.write_var_u64(structs.front().map(|s| s.clock()).unwrap_or(0))?;
74
75 for struct_info in structs {
76 struct_info.write(encoder)?;
77 }
78 }
79
80 self.delete_set.write(encoder)?;
81
82 Ok(())
83 }
84}
85
86impl Update {
87 pub fn from_ybinary1(buffer: Vec<u8>) -> JwstCodecResult<Update> {
89 Update::read(&mut RawDecoder::new(buffer))
90 }
91
92 pub fn into_ybinary1(self) -> JwstCodecResult<Vec<u8>> {
93 let mut encoder = RawEncoder::default();
94 self.write(&mut encoder)?;
95 Ok(encoder.into_inner())
96 }
97
98 pub(crate) fn iter(&mut self, state: StateVector) -> UpdateIterator {
99 UpdateIterator::new(self, state)
100 }
101
102 pub fn delete_set_iter(&mut self, state: StateVector) -> DeleteSetIterator {
103 DeleteSetIterator::new(self, state)
104 }
105
106 pub fn drain_pending_state(&mut self) {
108 debug_assert!(self.is_empty());
109
110 std::mem::swap(&mut self.pending_structs, &mut self.structs);
111 std::mem::swap(&mut self.pending_delete_set, &mut self.delete_set);
112 }
113
114 pub fn merge<I: IntoIterator<Item = Update>>(updates: I) -> Update {
115 let mut merged = Update::default();
116
117 Self::merge_into(&mut merged, updates);
118
119 merged
120 }
121
122 pub fn merge_into<I: IntoIterator<Item = Update>>(target: &mut Update, updates: I) {
123 for update in updates {
124 target.delete_set.merge(&update.delete_set);
125
126 for (client, structs) in update.structs {
127 let iter = structs.into_iter().filter(|p| !p.is_skip());
128 if let Some(merged_structs) = target.structs.get_mut(&client) {
129 merged_structs.extend(iter);
130 } else {
131 target.structs.insert(client, iter.collect());
132 }
133 }
134 }
135
136 for structs in target.structs.values_mut() {
137 structs.make_contiguous().sort_by_key(|s| s.id().clock);
138
139 let mut index = 0;
142 while index < structs.len() - 1 {
143 let cur = &structs[index];
144 let next = &structs[index + 1];
145
146 let clock_end = cur.id().clock + cur.len();
147 let next_clock = next.id().clock;
148
149 if next_clock > clock_end {
150 structs.insert(
151 index + 1,
152 Node::new_skip((cur.id().client, clock_end).into(), next_clock - clock_end),
153 );
154 index += 1;
155 }
156
157 index += 1;
158 }
159 }
160 }
161
162 pub fn is_empty(&self) -> bool {
163 self.structs.is_empty() && self.delete_set.is_empty()
164 }
165
166 pub fn is_pending_empty(&self) -> bool {
167 self.pending_structs.is_empty() && self.pending_delete_set.is_empty()
168 }
169}
170
171pub(crate) struct UpdateIterator<'a> {
172 update: &'a mut Update,
173
174 state: StateVector,
177 client_ids: Vec<Client>,
179 cur_client_id: Option<Client>,
181 stack: Vec<Node>,
184}
185
186impl<'a> UpdateIterator<'a> {
187 pub fn new(update: &'a mut Update, state: StateVector) -> Self {
188 let mut client_ids = update.structs.keys().cloned().collect::<Vec<_>>();
189 client_ids.sort();
190 let cur_client_id = client_ids.pop();
191
192 UpdateIterator {
193 update,
194 state,
195 client_ids,
196 cur_client_id,
197 stack: Vec::new(),
198 }
199 }
200
201 fn next_client(&mut self) -> Option<Client> {
208 while let Some(client_id) = self.cur_client_id {
209 match self.update.structs.get(&client_id) {
210 Some(refs) if !refs.is_empty() => {
211 self.cur_client_id.replace(client_id);
212 return self.cur_client_id;
213 }
214 _ => {
215 self.update.structs.remove(&client_id);
216 self.cur_client_id = self.client_ids.pop();
217 }
218 }
219 }
220
221 None
222 }
223
224 fn update_missing_state(&mut self, client: Client, clock: Clock) {
227 self.update.missing_state.set_min(client, clock);
228 }
229
230 fn add_stack_to_rest(&mut self) {
233 for s in self.stack.drain(..) {
234 let client = s.id().client;
235 let unapplicable_items = self.update.structs.remove(&client);
236 if let Some(mut items) = unapplicable_items {
237 items.push_front(s);
238 self.update.pending_structs.insert(client, items);
239 } else {
240 self.update.pending_structs.insert(client, [s].into());
241 }
242 self.client_ids.retain(|&c| c != client);
243 }
244 }
245
246 fn get_missing_dep(&self, struct_info: &Node) -> Option<Client> {
249 if let Some(item) = struct_info.as_item().get() {
250 let id = item.id;
251 if let Some(left) = &item.origin_left_id {
252 if left.client != id.client && left.clock >= self.state.get(&left.client) {
253 return Some(left.client);
254 }
255 }
256
257 if let Some(right) = &item.origin_right_id {
258 if right.client != id.client && right.clock >= self.state.get(&right.client) {
259 return Some(right.client);
260 }
261 }
262
263 if let Some(parent) = &item.parent {
264 match parent {
265 Parent::Id(parent_id)
266 if parent_id.client != id.client && parent_id.clock >= self.state.get(&parent_id.client) =>
267 {
268 return Some(parent_id.client);
269 }
270 _ => {}
271 }
272 }
273 }
274
275 None
276 }
277
278 fn next_candidate(&mut self) -> Option<Node> {
279 let mut cur = None;
280
281 if !self.stack.is_empty() {
282 cur.replace(self.stack.pop().unwrap());
283 } else if let Some(client) = self.next_client() {
284 cur.replace(self.update.structs.get_mut(&client).unwrap().pop_front().unwrap());
288 }
289
290 cur
291 }
292}
293
294impl Iterator for UpdateIterator<'_> {
295 type Item = (Node, u64);
296
297 fn next(&mut self) -> Option<Self::Item> {
298 let mut cur = self.next_candidate();
300
301 while let Some(cur_update) = cur.take() {
302 let id = cur_update.id();
303 if cur_update.is_skip() {
304 cur = self.next_candidate();
305 continue;
306 } else if !self.state.contains(&id) {
307 self.stack.push(cur_update);
311 self.update_missing_state(id.client, id.clock - 1);
312 self.add_stack_to_rest();
313 } else {
314 let id = cur_update.id();
315 let dep = self.get_missing_dep(&cur_update);
316 if let Some(dep) = dep {
318 self.stack.push(cur_update);
319
320 match self.update.structs.get_mut(&dep) {
321 Some(updates) if !updates.is_empty() => {
322 cur.replace(updates.pop_front().unwrap());
324 continue;
325 }
326 _ => {
329 self.update_missing_state(dep, self.state.get(&dep));
330 self.add_stack_to_rest();
331 }
332 }
333 } else {
334 let local_state = self.state.get(&id.client);
336 let offset = local_state - id.clock;
339 if offset == 0 || offset < cur_update.len() {
340 self.state.set_max(id.client, id.clock + cur_update.len());
341 return Some((cur_update, offset));
342 }
343 }
344 }
345
346 cur = self.next_candidate();
347 }
348
349 None
351 }
352}
353
354pub struct DeleteSetIterator<'a> {
355 update: &'a mut Update,
356 state: StateVector,
358}
359
360impl<'a> DeleteSetIterator<'a> {
361 pub fn new(update: &'a mut Update, state: StateVector) -> Self {
362 DeleteSetIterator { update, state }
363 }
364}
365
366impl Iterator for DeleteSetIterator<'_> {
367 type Item = (Client, Range<u64>);
368
369 fn next(&mut self) -> Option<Self::Item> {
370 while let Some(client) = self.update.delete_set.keys().next().cloned() {
371 let deletes = self.update.delete_set.get_mut(&client).unwrap();
372 let local_state = self.state.get(&client);
373
374 while let Some(range) = deletes.pop() {
375 let start = range.start;
376 let end = range.end;
377
378 if start < local_state {
379 if local_state < end {
380 self.update
387 .pending_delete_set
388 .add(client, local_state, end - local_state);
389
390 return Some((client, start..local_state));
391 }
392
393 return Some((client, range));
394 } else {
395 self.update.pending_delete_set.add(client, start, end - start);
397 }
398 }
399
400 self.update.delete_set.remove(&client);
401 }
402
403 None
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use std::{num::ParseIntError, path::PathBuf};
410
411 use serde::Deserialize;
412
413 use super::*;
414 use crate::doc::common::OrderRange;
415
416 fn struct_item(id: (Client, Clock), len: usize) -> Node {
417 Node::Item(Somr::new(
418 ItemBuilder::new()
419 .id(id.into())
420 .content(Content::String("c".repeat(len)))
421 .build(),
422 ))
423 }
424
425 fn parse_doc_update(input: Vec<u8>) -> JwstCodecResult<Update> {
426 Update::from_ybinary1(input)
427 }
428
429 #[test]
430 #[cfg_attr(any(miri, loom), ignore)]
431 fn test_parse_doc() {
432 let docs = [
433 (include_bytes!("../../fixtures/basic.bin").to_vec(), 1, 188),
434 (include_bytes!("../../fixtures/database.bin").to_vec(), 1, 149),
435 (include_bytes!("../../fixtures/large.bin").to_vec(), 1, 9036),
436 (include_bytes!("../../fixtures/with-subdoc.bin").to_vec(), 2, 30),
437 ];
438
439 for (doc, clients, structs) in docs {
440 let update = parse_doc_update(doc).unwrap();
441
442 assert_eq!(update.structs.len(), clients);
443 assert_eq!(update.structs.iter().map(|s| s.1.len()).sum::<usize>(), structs);
444 }
445 }
446
447 fn decode_hex(s: &str) -> Result<Vec<u8>, ParseIntError> {
448 (0..s.len())
449 .step_by(2)
450 .map(|i| u8::from_str_radix(&s[i..i + 2], 16))
451 .collect()
452 }
453
454 #[allow(dead_code)]
455 #[derive(Deserialize, Debug)]
456 struct Data {
457 id: u64,
458 workspace: String,
459 timestamp: String,
460 blob: String,
461 }
462
463 #[ignore = "just for local data test"]
464 #[test]
465 fn test_parse_local_doc() {
466 let json = serde_json::from_slice::<Vec<Data>>(include_bytes!("../../fixtures/local_docs.json")).unwrap();
467
468 for ws in json {
469 let data = &ws.blob[5..=(ws.blob.len() - 2)];
470 if let Ok(data) = decode_hex(data) {
471 match parse_doc_update(data.clone()) {
472 Ok(update) => {
473 println!(
474 "workspace: {}, global structs: {}, total structs: {}",
475 ws.workspace,
476 update.structs.len(),
477 update.structs.iter().map(|s| s.1.len()).sum::<usize>()
478 );
479 }
480 Err(_e) => {
481 std::fs::write(
482 PathBuf::from("./src/fixtures/invalid").join(format!("{}.ydoc", ws.workspace)),
483 data,
484 )
485 .unwrap();
486 println!("doc error: {}", ws.workspace);
487 }
488 }
489 } else {
490 println!("error origin data: {}", ws.workspace);
491 }
492 }
493 }
494
495 #[test]
496 fn test_update_iterator() {
497 loom_model!({
498 let mut update = Update {
499 structs: HashMap::from([
500 (
501 0,
502 VecDeque::from([
503 struct_item((0, 0), 1),
504 struct_item((0, 1), 1),
505 Node::new_skip((0, 2).into(), 1),
506 ]),
507 ),
508 (
509 1,
510 VecDeque::from([
511 struct_item((1, 0), 1),
512 Node::Item(Somr::new(
513 ItemBuilder::new()
514 .id((1, 1).into())
515 .left_id(Some((0, 1).into()))
516 .content(Content::String("c".repeat(2)))
517 .build(),
518 )),
519 ]),
520 ),
521 ]),
522 ..Update::default()
523 };
524
525 let mut iter = update.iter(StateVector::default());
526 assert_eq!(iter.next().unwrap().0.id(), (1, 0).into());
527 assert_eq!(iter.next().unwrap().0.id(), (0, 0).into());
528 assert_eq!(iter.next().unwrap().0.id(), (0, 1).into());
529 assert_eq!(iter.next().unwrap().0.id(), (1, 1).into());
530 assert_eq!(iter.next(), None);
531 });
532 }
533
534 #[test]
535 fn test_update_iterator_with_missing_state() {
536 loom_model!({
537 let mut update = Update {
538 structs: HashMap::from([(0, VecDeque::from([struct_item((0, 4), 1)]))]),
540 ..Update::default()
541 };
542
543 let mut iter = update.iter(StateVector::from([(0, 3)]));
544 assert_eq!(iter.next(), None);
545 assert!(!update.pending_structs.is_empty());
546 assert_eq!(
547 update.pending_structs.get_mut(&0).unwrap().pop_front().unwrap().id(),
548 (0, 4).into()
549 );
550 assert!(!update.missing_state.is_empty());
551 assert_eq!(update.missing_state.get(&0), 3);
552 });
553 }
554
555 #[test]
556 fn test_delete_set_iterator() {
557 let mut update = Update {
558 delete_set: DeleteSet::from([(0, vec![(0..2), (3..5)])]),
559 ..Update::default()
560 };
561
562 let mut iter = update.delete_set_iter(StateVector::from([(0, 10)]));
563 assert_eq!(iter.next().unwrap(), (0, 0..2));
564 assert_eq!(iter.next().unwrap(), (0, 3..5));
565 assert_eq!(iter.next(), None);
566 }
567
568 #[test]
569 fn test_delete_set_with_missing_state() {
570 let mut update = Update {
571 delete_set: DeleteSet::from([(0, vec![(3..5), (7..12), (13..15)])]),
572 ..Update::default()
573 };
574
575 let mut iter = update.delete_set_iter(StateVector::from([(0, 10)]));
576 assert_eq!(iter.next().unwrap(), (0, 3..5));
577 assert_eq!(iter.next().unwrap(), (0, 7..10));
578 assert_eq!(iter.next(), None);
579
580 assert!(!update.pending_delete_set.is_empty());
581 assert_eq!(
582 update.pending_delete_set.get(&0).unwrap(),
583 &OrderRange::from(vec![(10..12), (13..15)])
584 );
585 }
586
587 #[test]
588 fn should_add_skip_when_clock_not_continuous() {
589 loom_model!({
590 let update = Update {
591 structs: HashMap::from([(
592 0,
593 VecDeque::from([
594 struct_item((0, 0), 1),
595 struct_item((0, 1), 1),
596 struct_item((0, 10), 1),
597 Node::new_gc((0, 20).into(), 10),
598 ]),
599 )]),
600 ..Default::default()
601 };
602
603 let merged = Update::merge([update]);
604
605 assert_eq!(
606 merged.structs.get(&0).unwrap(),
607 &VecDeque::from([
608 struct_item((0, 0), 1),
609 struct_item((0, 1), 1),
610 Node::new_skip((0, 2).into(), 8),
611 struct_item((0, 10), 1),
612 Node::new_skip((0, 11).into(), 9),
613 Node::new_gc((0, 20).into(), 10),
614 ])
615 );
616 });
617 }
618
619 #[test]
620 fn merged_update_should_not_be_released_in_next_turn() {
621 loom_model!({
622 let update = Update {
623 structs: HashMap::from([(
624 0,
625 VecDeque::from([
626 struct_item((0, 0), 1),
627 struct_item((0, 1), 1),
628 struct_item((0, 10), 1),
629 Node::new_gc((0, 20).into(), 10),
630 ]),
631 )]),
632 ..Default::default()
633 };
634
635 let merged = Update::merge([update]);
636
637 let update2 = Update {
638 structs: HashMap::from([(
639 0,
640 VecDeque::from([struct_item((0, 30), 1), Node::new_gc((0, 32).into(), 1)]),
641 )]),
642 ..Default::default()
643 };
644
645 let merged2 = Update::merge([update2, merged]);
646
647 assert_eq!(merged2.structs.get(&0).unwrap().len(), 9);
648 });
649 }
650}