y_octo/doc/codec/
update.rs

1use std::{collections::VecDeque, ops::Range};
2
3use super::*;
4use crate::doc::StateVector;
5
/// A document update: structs grouped by client plus a delete set.
///
/// While the update is integrated (see `UpdateIterator` / `DeleteSetIterator`),
/// everything that cannot be applied yet is moved into the `pending_*` fields
/// and the lowest missing clocks are recorded in `missing_state`.
#[derive(Debug, Default, Clone)]
pub struct Update {
    /// Structs per client; `Update::read` and `Update::merge_into` store each
    /// client's structs in ascending clock order.
    pub(crate) structs: ClientMap<VecDeque<Node>>,
    /// Deleted clock ranges per client.
    pub(crate) delete_set: DeleteSet,

    /// All structs that could not be integrated into the doc: any item whose
    /// clock is ahead of the local state or whose dependency is missing ends
    /// up here.
    pub(crate) pending_structs: ClientMap<VecDeque<Node>>,
    /// Per client, the lowest clock found missing while applying the update.
    pub(crate) missing_state: StateVector,
    /// Delete ranges that refer to clocks not yet known locally.
    pub(crate) pending_delete_set: DeleteSet,
}
20
21impl<R: CrdtReader> CrdtRead<R> for Update {
22    fn read(decoder: &mut R) -> JwstCodecResult<Self> {
23        let num_of_clients = decoder.read_var_u64()? as usize;
24
25        // See: [HASHMAP_SAFE_CAPACITY]
26        let mut map = ClientMap::with_capacity(num_of_clients.min(HASHMAP_SAFE_CAPACITY));
27        for _ in 0..num_of_clients {
28            let num_of_structs = decoder.read_var_u64()? as usize;
29            let client = decoder.read_var_u64()?;
30            let mut clock = decoder.read_var_u64()?;
31
32            // same reason as above
33            let mut structs = VecDeque::with_capacity(num_of_structs.min(HASHMAP_SAFE_CAPACITY));
34
35            for _ in 0..num_of_structs {
36                let struct_info = Node::read(decoder, Id::new(client, clock))?;
37                clock += struct_info.len();
38                structs.push_back(struct_info);
39            }
40
41            structs.shrink_to_fit();
42            map.insert(client, structs);
43        }
44
45        map.shrink_to_fit();
46
47        let delete_set = DeleteSet::read(decoder)?;
48
49        if !decoder.is_empty() {
50            return Err(JwstCodecError::UpdateNotFullyConsumed(decoder.len() as usize));
51        }
52
53        Ok(Update {
54            structs: map,
55            delete_set,
56            ..Update::default()
57        })
58    }
59}
60
61impl<W: CrdtWriter> CrdtWrite<W> for Update {
62    fn write(&self, encoder: &mut W) -> JwstCodecResult {
63        encoder.write_var_u64(self.structs.len() as u64)?;
64
65        let mut clients = self.structs.keys().copied().collect::<Vec<_>>();
66
67        // Descending
68        clients.sort_by(|a, b| b.cmp(a));
69
70        for client in clients {
71            let structs = self.structs.get(&client).unwrap();
72
73            encoder.write_var_u64(structs.len() as u64)?;
74            encoder.write_var_u64(client)?;
75            encoder.write_var_u64(structs.front().map(|s| s.clock()).unwrap_or(0))?;
76
77            for struct_info in structs {
78                struct_info.write(encoder)?;
79            }
80        }
81
82        self.delete_set.write(encoder)?;
83
84        Ok(())
85    }
86}
87
88impl Update {
89    // decode from ydoc v1
90    pub fn decode_v1<T: AsRef<[u8]>>(buffer: T) -> JwstCodecResult<Update> {
91        Update::read(&mut RawDecoder::new(buffer.as_ref()))
92    }
93
94    pub fn encode_v1(&self) -> JwstCodecResult<Vec<u8>> {
95        let mut encoder = RawEncoder::default();
96        self.write(&mut encoder)?;
97        Ok(encoder.into_inner())
98    }
99
100    pub(crate) fn iter(&mut self, state: StateVector) -> UpdateIterator<'_> {
101        UpdateIterator::new(self, state)
102    }
103
104    pub fn delete_set_iter(&mut self, state: StateVector) -> DeleteSetIterator<'_> {
105        DeleteSetIterator::new(self, state)
106    }
107
108    // take all pending structs and delete set to [self] update struct
109    pub fn drain_pending_state(&mut self) {
110        debug_assert!(self.is_empty());
111
112        std::mem::swap(&mut self.pending_structs, &mut self.structs);
113        std::mem::swap(&mut self.pending_delete_set, &mut self.delete_set);
114    }
115
116    pub fn merge<I: IntoIterator<Item = Update>>(updates: I) -> Update {
117        let mut merged = Update::default();
118
119        Self::merge_into(&mut merged, updates);
120
121        merged
122    }
123
124    pub fn merge_into<I: IntoIterator<Item = Update>>(target: &mut Update, updates: I) {
125        for update in updates {
126            target.delete_set.merge(&update.delete_set);
127
128            for (client, structs) in update.structs {
129                let iter = structs.into_iter().filter(|p| !p.is_skip());
130                if let Some(merged_structs) = target.structs.get_mut(&client) {
131                    merged_structs.extend(iter);
132                } else {
133                    target.structs.insert(client, iter.collect());
134                }
135            }
136        }
137
138        for structs in target.structs.values_mut() {
139            structs.make_contiguous().sort_by_key(|s| s.id().clock);
140
141            // insert [Node::Skip] if structs[index].id().clock + structs[index].len() <
142            // structs[index + 1].id().clock
143            let mut index = 0;
144            let mut merged_index = vec![];
145            while index < structs.len() - 1 {
146                let cur = &structs[index];
147                let next = &structs[index + 1];
148
149                let clock_end = cur.id().clock + cur.len();
150                let next_clock = next.id().clock;
151
152                if next_clock > clock_end {
153                    structs.insert(
154                        index + 1,
155                        Node::new_skip((cur.id().client, clock_end).into(), next_clock - clock_end),
156                    );
157                    index += 1;
158                } else if cur.id().clock == next_clock {
159                    if cur.deleted() == next.deleted()
160                        && cur.last_id() == next.last_id()
161                        && cur.left() == next.left()
162                        && cur.right() == next.right()
163                    {
164                        // merge two nodes, mark the index
165                        merged_index.push(index + 1);
166                    } else {
167                        debug!("merge failed: {:?} {:?}", cur, next)
168                    }
169                }
170
171                index += 1;
172            }
173
174            {
175                // prune the merged nodes
176                let mut new_structs = VecDeque::with_capacity(structs.len() - merged_index.len());
177                let mut next_remove_idx = 0;
178                for (idx, val) in structs.drain(..).enumerate() {
179                    if next_remove_idx < merged_index.len() && idx == merged_index[next_remove_idx] {
180                        next_remove_idx += 1;
181                    } else {
182                        new_structs.push_back(val);
183                    }
184                }
185                structs.extend(new_structs);
186            }
187        }
188    }
189
190    pub fn is_content_empty(&self) -> bool {
191        self.structs.is_empty()
192    }
193
194    pub fn is_empty(&self) -> bool {
195        self.structs.is_empty() && self.delete_set.is_empty()
196    }
197
198    pub fn is_pending_empty(&self) -> bool {
199        self.pending_structs.is_empty() && self.pending_delete_set.is_empty()
200    }
201}
202
/// Iterator that pops the structs of an [`Update`] in an order in which they
/// can be integrated. Structs that cannot be integrated are moved into
/// `Update::pending_structs`.
pub(crate) struct UpdateIterator<'a> {
    /// The update being consumed; structs are popped out of `update.structs`.
    update: &'a mut Update,

    // --- local iterator state ---
    /// State vector the structs are checked against; advanced with `set_max`
    /// whenever a struct is yielded.
    state: StateVector,
    /// Client ids not yet visited, sorted ascending and popped from the back,
    /// so the highest id is visited first.
    client_ids: Vec<Client>,
    /// Client whose structs are currently being consumed.
    cur_client_id: Option<Client>,
    /// Structs waiting for a dependency from another client. They are retried
    /// before any new struct is taken from `update.structs`.
    stack: Vec<Node>,
}
217
218impl<'a> UpdateIterator<'a> {
219    pub fn new(update: &'a mut Update, state: StateVector) -> Self {
220        let mut client_ids = update.structs.keys().cloned().collect::<Vec<_>>();
221        client_ids.sort();
222        let cur_client_id = client_ids.pop();
223
224        UpdateIterator {
225            update,
226            state,
227            client_ids,
228            cur_client_id,
229            stack: Vec::new(),
230        }
231    }
232
233    /// iterate the client ids until we find the next client with left updates
234    /// that can be consumed
235    ///
236    /// note:
237    /// firstly we will check current client id as well to ensure current
238    /// updates queue is not empty yet
239    fn next_client(&mut self) -> Option<Client> {
240        while let Some(client_id) = self.cur_client_id {
241            match self.update.structs.get(&client_id) {
242                Some(refs) if !refs.is_empty() => {
243                    self.cur_client_id.replace(client_id);
244                    return self.cur_client_id;
245                }
246                _ => {
247                    self.update.structs.remove(&client_id);
248                    self.cur_client_id = self.client_ids.pop();
249                }
250            }
251        }
252
253        None
254    }
255
256    /// update the missing state vector
257    /// tell it the smallest clock that missed.
258    fn update_missing_state(&mut self, client: Client, clock: Clock) {
259        self.update.missing_state.set_min(client, clock);
260    }
261
262    /// any time we can't apply an update during the iteration,
263    /// we should put all items in pending stack to rest structs
264    fn add_stack_to_rest(&mut self) {
265        for s in self.stack.drain(..) {
266            let client = s.id().client;
267            let unapplicable_items = self.update.structs.remove(&client);
268            if let Some(mut items) = unapplicable_items {
269                items.push_front(s);
270                self.update.pending_structs.insert(client, items);
271            } else {
272                self.update.pending_structs.insert(client, [s].into());
273            }
274            self.client_ids.retain(|&c| c != client);
275        }
276    }
277
278    /// tell if current update's dependencies(left, right, parent) has already
279    /// been consumed and recorded and return the client of them if not.
280    fn get_missing_dep(&self, struct_info: &Node) -> Option<Client> {
281        if let Some(item) = struct_info.as_item().get() {
282            let id = item.id;
283            if let Some(left) = &item.origin_left_id
284                && left.client != id.client
285                && left.clock >= self.state.get(&left.client)
286            {
287                return Some(left.client);
288            }
289
290            if let Some(right) = &item.origin_right_id
291                && right.client != id.client
292                && right.clock >= self.state.get(&right.client)
293            {
294                return Some(right.client);
295            }
296
297            if let Some(parent) = &item.parent {
298                match parent {
299                    Parent::Id(parent_id)
300                        if parent_id.client != id.client && parent_id.clock >= self.state.get(&parent_id.client) =>
301                    {
302                        return Some(parent_id.client);
303                    }
304                    _ => {}
305                }
306            }
307        }
308
309        None
310    }
311
312    fn next_candidate(&mut self) -> Option<Node> {
313        let mut cur = None;
314
315        if !self.stack.is_empty() {
316            cur.replace(self.stack.pop().unwrap());
317        } else if let Some(client) = self.next_client() {
318            // Safety:
319            // client index of updates and update length are both checked in next_client
320            // safe to use unwrap
321            cur.replace(self.update.structs.get_mut(&client).unwrap().pop_front().unwrap());
322        }
323
324        cur
325    }
326}
327
328impl Iterator for UpdateIterator<'_> {
329    type Item = (Node, u64);
330
331    fn next(&mut self) -> Option<Self::Item> {
332        // fetch the first candidate from stack or updates
333        let mut cur = self.next_candidate();
334
335        while let Some(cur_update) = cur.take() {
336            let id = cur_update.id();
337            if cur_update.is_skip() {
338                cur = self.next_candidate();
339                continue;
340            } else if !self.state.contains(&id) {
341                // missing local state of same client
342                // can't apply the continuous updates from same client
343                // push into the stack and put tell all the items in stack are unapplicable
344                self.stack.push(cur_update);
345                self.update_missing_state(id.client, id.clock - 1);
346                self.add_stack_to_rest();
347            } else {
348                let id = cur_update.id();
349                let dep = self.get_missing_dep(&cur_update);
350                // some dependency is missing, we need to turn to iterate the dependency first.
351                if let Some(dep) = dep {
352                    self.stack.push(cur_update);
353
354                    match self.update.structs.get_mut(&dep) {
355                        Some(updates) if !updates.is_empty() => {
356                            // iterate the dependency client first
357                            cur.replace(updates.pop_front().unwrap());
358                            continue;
359                        }
360                        // but the dependency update is drained
361                        // need to move all stack item to unapplicable store
362                        _ => {
363                            self.update_missing_state(dep, self.state.get(&dep));
364                            self.add_stack_to_rest();
365                        }
366                    }
367                } else {
368                    // we finally find the first applicable update
369                    let local_state = self.state.get(&id.client);
370                    // we've already check the local state is greater or equal to current update's
371                    // clock so offset here will never be negative
372                    let offset = local_state - id.clock;
373                    if offset == 0 || offset < cur_update.len() {
374                        self.state.set_max(id.client, id.clock + cur_update.len());
375                        return Some((cur_update, offset));
376                    }
377                }
378            }
379
380            cur = self.next_candidate();
381        }
382
383        // we all done
384        None
385    }
386}
387
/// Iterator over the delete ranges of an [`Update`] that are covered by a
/// state vector.
///
/// Ranges, or range tails, beyond the state vector are moved into
/// `Update::pending_delete_set` instead of being yielded.
pub struct DeleteSetIterator<'a> {
    /// The update whose `delete_set` is drained.
    update: &'a mut Update,
    /// State vector the delete ranges are checked against.
    state: StateVector,
}
393
394impl<'a> DeleteSetIterator<'a> {
395    pub fn new(update: &'a mut Update, state: StateVector) -> Self {
396        DeleteSetIterator { update, state }
397    }
398}
399
400impl Iterator for DeleteSetIterator<'_> {
401    type Item = (Client, Range<u64>);
402
403    fn next(&mut self) -> Option<Self::Item> {
404        while let Some(client) = self.update.delete_set.keys().next().cloned() {
405            let deletes = self.update.delete_set.get_mut(&client).unwrap();
406            let local_state = self.state.get(&client);
407
408            while let Some(range) = deletes.pop() {
409                let start = range.start;
410                let end = range.end;
411
412                if start < local_state {
413                    if local_state < end {
414                        // partially state missing
415                        // [start..end)
416                        //        ^ local_state in between
417                        // // split
418                        // [start..local_state) [local_state..end)
419                        //                      ^^^^^ unapplicable
420                        self.update
421                            .pending_delete_set
422                            .add(client, local_state, end - local_state);
423
424                        return Some((client, start..local_state));
425                    }
426
427                    return Some((client, range));
428                } else {
429                    // all state missing
430                    self.update.pending_delete_set.add(client, start, end - start);
431                }
432            }
433
434            self.update.delete_set.remove(&client);
435        }
436
437        None
438    }
439}
440
#[cfg(test)]
mod tests {
    //! Tests for decoding, iterating and merging [`Update`]s.

    use std::{num::ParseIntError, path::PathBuf};

    use serde::Deserialize;

    use super::*;
    use crate::doc::common::OrderRange;

    /// Builds an item at `id` whose string content has `len` characters.
    fn struct_item(id: (Client, Clock), len: usize) -> Node {
        Node::Item(Somr::new(
            ItemBuilder::new()
                .id(id.into())
                .content(Content::String("c".repeat(len)))
                .build(),
        ))
    }

    /// Thin wrapper around [`Update::decode_v1`].
    fn parse_doc_update(input: Vec<u8>) -> JwstCodecResult<Update> {
        Update::decode_v1(input)
    }

    /// Decodes the fixture documents and checks the client and struct counts.
    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_parse_doc() {
        // (bytes, expected number of clients, expected total number of structs)
        let docs = [
            (include_bytes!("../../fixtures/basic.bin").to_vec(), 1, 188),
            (include_bytes!("../../fixtures/database.bin").to_vec(), 1, 149),
            (include_bytes!("../../fixtures/large.bin").to_vec(), 1, 9036),
            (include_bytes!("../../fixtures/with-subdoc.bin").to_vec(), 2, 30),
            (
                include_bytes!("../../fixtures/edge-case-left-right-same-node.bin").to_vec(),
                2,
                243,
            ),
        ];

        for (doc, clients, structs) in docs {
            let update = parse_doc_update(doc).unwrap();

            assert_eq!(update.structs.len(), clients);
            assert_eq!(update.structs.iter().map(|s| s.1.len()).sum::<usize>(), structs);
        }
    }

    /// Decodes a hex string into bytes, two characters per byte.
    ///
    /// NOTE(review): slices `s[i..i + 2]`, so it panics on odd-length input
    /// and on non-ASCII characters; only used by the ignored local-data test.
    fn decode_hex(s: &str) -> Result<Vec<u8>, ParseIntError> {
        (0..s.len())
            .step_by(2)
            .map(|i| u8::from_str_radix(&s[i..i + 2], 16))
            .collect()
    }

    /// One exported workspace record from `local_docs.json`.
    #[allow(dead_code)]
    #[derive(Deserialize, Debug)]
    struct Data {
        id: u64,
        workspace: String,
        timestamp: String,
        blob: String,
    }

    /// Tries to decode every workspace in a local export. Blobs that fail to
    /// decode are written to `./src/fixtures/invalid/<workspace>.ydoc`.
    #[ignore = "just for local data test"]
    #[test]
    fn test_parse_local_doc() {
        let json = serde_json::from_slice::<Vec<Data>>(include_bytes!("../../fixtures/local_docs.json")).unwrap();

        for ws in json {
            // Drops the first 5 and the last character of the blob.
            // NOTE(review): presumably a textual wrapper around the hex
            // payload; confirm against the export format.
            let data = &ws.blob[5..=(ws.blob.len() - 2)];
            if let Ok(data) = decode_hex(data) {
                match parse_doc_update(data.clone()) {
                    Ok(update) => {
                        println!(
                            "workspace: {}, global structs: {}, total structs: {}",
                            ws.workspace,
                            update.structs.len(),
                            update.structs.iter().map(|s| s.1.len()).sum::<usize>()
                        );
                    }
                    Err(_e) => {
                        std::fs::write(
                            PathBuf::from("./src/fixtures/invalid").join(format!("{}.ydoc", ws.workspace)),
                            data,
                        )
                        .unwrap();
                        println!("doc error: {}", ws.workspace);
                    }
                }
            } else {
                println!("error origin data: {}", ws.workspace);
            }
        }
    }

    /// Client 1 is visited first (highest id). Its second item has left origin
    /// (0, 1), so client 0's items are pulled in before it. The trailing skip
    /// is never yielded.
    #[test]
    fn test_update_iterator() {
        loom_model!({
            let mut update = Update {
                structs: ClientMap::from_iter([
                    (
                        0,
                        VecDeque::from([
                            struct_item((0, 0), 1),
                            struct_item((0, 1), 1),
                            Node::new_skip((0, 2).into(), 1),
                        ]),
                    ),
                    (
                        1,
                        VecDeque::from([
                            struct_item((1, 0), 1),
                            Node::Item(Somr::new(
                                ItemBuilder::new()
                                    .id((1, 1).into())
                                    .left_id(Some((0, 1).into()))
                                    .content(Content::String("c".repeat(2)))
                                    .build(),
                            )),
                        ]),
                    ),
                ]),
                ..Update::default()
            };

            let mut iter = update.iter(StateVector::default());
            assert_eq!(iter.next().unwrap().0.id(), (1, 0).into());
            assert_eq!(iter.next().unwrap().0.id(), (0, 0).into());
            assert_eq!(iter.next().unwrap().0.id(), (0, 1).into());
            assert_eq!(iter.next().unwrap().0.id(), (1, 1).into());
            assert_eq!(iter.next(), None);
        });
    }

    /// An item ahead of the local state is not yielded. Instead it is moved to
    /// `pending_structs`, and the missing clock is recorded.
    #[test]
    fn test_update_iterator_with_missing_state() {
        loom_model!({
            let mut update = Update {
                // an item with higher sequence id than local state
                structs: ClientMap::from_iter([(0, VecDeque::from([struct_item((0, 4), 1)]))]),
                ..Update::default()
            };

            let mut iter = update.iter(StateVector::from([(0, 3)]));
            assert_eq!(iter.next(), None);
            assert!(!update.pending_structs.is_empty());
            assert_eq!(
                update.pending_structs.get_mut(&0).unwrap().pop_front().unwrap().id(),
                (0, 4).into()
            );
            assert!(!update.missing_state.is_empty());
            assert_eq!(update.missing_state.get(&0), 3);
        });
    }

    /// All ranges lie below the local state, so all of them are yielded.
    #[test]
    fn test_delete_set_iterator() {
        let mut update = Update {
            delete_set: DeleteSet::from([(0, vec![(0..2), (3..5)])]),
            ..Update::default()
        };

        let mut iter = update.delete_set_iter(StateVector::from([(0, 10)]));
        assert_eq!(iter.next().unwrap(), (0, 0..2));
        assert_eq!(iter.next().unwrap(), (0, 3..5));
        assert_eq!(iter.next(), None);
    }

    /// The range 7..12 straddles the state (10) and is split. The range 13..15
    /// lies fully beyond the state and becomes pending.
    #[test]
    fn test_delete_set_with_missing_state() {
        let mut update = Update {
            delete_set: DeleteSet::from([(0, vec![(3..5), (7..12), (13..15)])]),
            ..Update::default()
        };

        let mut iter = update.delete_set_iter(StateVector::from([(0, 10)]));
        assert_eq!(iter.next().unwrap(), (0, 3..5));
        assert_eq!(iter.next().unwrap(), (0, 7..10));
        assert_eq!(iter.next(), None);

        assert!(!update.pending_delete_set.is_empty());
        assert_eq!(
            update.pending_delete_set.get(&0).unwrap(),
            &OrderRange::from(vec![(10..12), (13..15)])
        );
    }

    /// Merging fills every clock gap with a skip node.
    #[test]
    fn should_add_skip_when_clock_not_continuous() {
        loom_model!({
            let update = Update {
                structs: ClientMap::from_iter([(
                    0,
                    VecDeque::from([
                        struct_item((0, 0), 1),
                        struct_item((0, 1), 1),
                        struct_item((0, 10), 1),
                        Node::new_gc((0, 20).into(), 10),
                    ]),
                )]),
                ..Default::default()
            };

            let merged = Update::merge([update]);

            assert_eq!(
                merged.structs.get(&0).unwrap(),
                &VecDeque::from([
                    struct_item((0, 0), 1),
                    struct_item((0, 1), 1),
                    Node::new_skip((0, 2).into(), 8),
                    struct_item((0, 10), 1),
                    Node::new_skip((0, 11).into(), 9),
                    Node::new_gc((0, 20).into(), 10),
                ])
            );
        });
    }

    /// Re-merging an already merged update (which contains skips) drops its
    /// old skips and recomputes the gaps. Result: 6 real structs and 3 skips.
    #[test]
    fn merged_update_should_not_be_released_in_next_turn() {
        loom_model!({
            let update = Update {
                structs: ClientMap::from_iter([(
                    0,
                    VecDeque::from([
                        struct_item((0, 0), 1),
                        struct_item((0, 1), 1),
                        struct_item((0, 10), 1),
                        Node::new_gc((0, 20).into(), 10),
                    ]),
                )]),
                ..Default::default()
            };

            let merged = Update::merge([update]);

            let update2 = Update {
                structs: ClientMap::from_iter([(
                    0,
                    VecDeque::from([struct_item((0, 30), 1), Node::new_gc((0, 32).into(), 1)]),
                )]),
                ..Default::default()
            };

            let merged2 = Update::merge([update2, merged]);

            assert_eq!(merged2.structs.get(&0).unwrap().len(), 9);
        });
    }
}