y_octo/doc/codec/
update.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    ops::Range,
4};
5
6use super::*;
7use crate::doc::StateVector;
8
9#[derive(Debug, Default, Clone)]
10pub struct Update {
11    pub(crate) structs: HashMap<u64, VecDeque<Node>>,
12    pub(crate) delete_set: DeleteSet,
13
14    /// all unapplicable items that we can't integrate into doc
15    /// any item with inconsistent id clock or missing dependency will be put
16    /// here
17    pub(crate) pending_structs: HashMap<Client, VecDeque<Node>>,
18    /// missing state vector after applying updates
19    pub(crate) missing_state: StateVector,
20    /// all unapplicable delete set
21    pub(crate) pending_delete_set: DeleteSet,
22}
23
24impl<R: CrdtReader> CrdtRead<R> for Update {
25    fn read(decoder: &mut R) -> JwstCodecResult<Self> {
26        let num_of_clients = decoder.read_var_u64()?;
27
28        let mut map = HashMap::with_capacity(num_of_clients as usize);
29        for _ in 0..num_of_clients {
30            let num_of_structs = decoder.read_var_u64()?;
31            let client = decoder.read_var_u64()?;
32            let mut clock = decoder.read_var_u64()?;
33
34            let mut structs = VecDeque::with_capacity(num_of_structs as usize);
35
36            for _ in 0..num_of_structs {
37                let struct_info = Node::read(decoder, Id::new(client, clock))?;
38                clock += struct_info.len();
39                structs.push_back(struct_info);
40            }
41
42            map.insert(client, structs);
43        }
44
45        let delete_set = DeleteSet::read(decoder)?;
46
47        if !decoder.is_empty() {
48            return Err(JwstCodecError::UpdateNotFullyConsumed(decoder.len() as usize));
49        }
50
51        Ok(Update {
52            structs: map,
53            delete_set,
54            ..Update::default()
55        })
56    }
57}
58
59impl<W: CrdtWriter> CrdtWrite<W> for Update {
60    fn write(&self, encoder: &mut W) -> JwstCodecResult {
61        encoder.write_var_u64(self.structs.len() as u64)?;
62
63        let mut clients = self.structs.keys().copied().collect::<Vec<_>>();
64
65        // Descending
66        clients.sort_by(|a, b| b.cmp(a));
67
68        for client in clients {
69            let structs = self.structs.get(&client).unwrap();
70
71            encoder.write_var_u64(structs.len() as u64)?;
72            encoder.write_var_u64(client)?;
73            encoder.write_var_u64(structs.front().map(|s| s.clock()).unwrap_or(0))?;
74
75            for struct_info in structs {
76                struct_info.write(encoder)?;
77            }
78        }
79
80        self.delete_set.write(encoder)?;
81
82        Ok(())
83    }
84}
85
86impl Update {
87    // decode from ydoc v1
88    pub fn from_ybinary1(buffer: Vec<u8>) -> JwstCodecResult<Update> {
89        Update::read(&mut RawDecoder::new(buffer))
90    }
91
92    pub fn into_ybinary1(self) -> JwstCodecResult<Vec<u8>> {
93        let mut encoder = RawEncoder::default();
94        self.write(&mut encoder)?;
95        Ok(encoder.into_inner())
96    }
97
98    pub(crate) fn iter(&mut self, state: StateVector) -> UpdateIterator {
99        UpdateIterator::new(self, state)
100    }
101
102    pub fn delete_set_iter(&mut self, state: StateVector) -> DeleteSetIterator {
103        DeleteSetIterator::new(self, state)
104    }
105
106    // take all pending structs and delete set to [self] update struct
107    pub fn drain_pending_state(&mut self) {
108        debug_assert!(self.is_empty());
109
110        std::mem::swap(&mut self.pending_structs, &mut self.structs);
111        std::mem::swap(&mut self.pending_delete_set, &mut self.delete_set);
112    }
113
114    pub fn merge<I: IntoIterator<Item = Update>>(updates: I) -> Update {
115        let mut merged = Update::default();
116
117        Self::merge_into(&mut merged, updates);
118
119        merged
120    }
121
122    pub fn merge_into<I: IntoIterator<Item = Update>>(target: &mut Update, updates: I) {
123        for update in updates {
124            target.delete_set.merge(&update.delete_set);
125
126            for (client, structs) in update.structs {
127                let iter = structs.into_iter().filter(|p| !p.is_skip());
128                if let Some(merged_structs) = target.structs.get_mut(&client) {
129                    merged_structs.extend(iter);
130                } else {
131                    target.structs.insert(client, iter.collect());
132                }
133            }
134        }
135
136        for structs in target.structs.values_mut() {
137            structs.make_contiguous().sort_by_key(|s| s.id().clock);
138
139            // insert [Node::Skip] if structs[index].id().clock + structs[index].len() <
140            // structs[index + 1].id().clock
141            let mut index = 0;
142            while index < structs.len() - 1 {
143                let cur = &structs[index];
144                let next = &structs[index + 1];
145
146                let clock_end = cur.id().clock + cur.len();
147                let next_clock = next.id().clock;
148
149                if next_clock > clock_end {
150                    structs.insert(
151                        index + 1,
152                        Node::new_skip((cur.id().client, clock_end).into(), next_clock - clock_end),
153                    );
154                    index += 1;
155                }
156
157                index += 1;
158            }
159        }
160    }
161
162    pub fn is_empty(&self) -> bool {
163        self.structs.is_empty() && self.delete_set.is_empty()
164    }
165
166    pub fn is_pending_empty(&self) -> bool {
167        self.pending_structs.is_empty() && self.pending_delete_set.is_empty()
168    }
169}
170
171pub(crate) struct UpdateIterator<'a> {
172    update: &'a mut Update,
173
174    // --- local iterator state ---
175    /// current state vector from store
176    state: StateVector,
177    /// all client ids sorted ascending
178    client_ids: Vec<Client>,
179    /// current id of client of the updates we're processing
180    cur_client_id: Option<Client>,
181    /// stack of previous iterating item with higher priority than updates in
182    /// next iteration
183    stack: Vec<Node>,
184}
185
186impl<'a> UpdateIterator<'a> {
187    pub fn new(update: &'a mut Update, state: StateVector) -> Self {
188        let mut client_ids = update.structs.keys().cloned().collect::<Vec<_>>();
189        client_ids.sort();
190        let cur_client_id = client_ids.pop();
191
192        UpdateIterator {
193            update,
194            state,
195            client_ids,
196            cur_client_id,
197            stack: Vec::new(),
198        }
199    }
200
201    /// iterate the client ids until we find the next client with left updates
202    /// that can be consumed
203    ///
204    /// note:
205    /// firstly we will check current client id as well to ensure current
206    /// updates queue is not empty yet
207    fn next_client(&mut self) -> Option<Client> {
208        while let Some(client_id) = self.cur_client_id {
209            match self.update.structs.get(&client_id) {
210                Some(refs) if !refs.is_empty() => {
211                    self.cur_client_id.replace(client_id);
212                    return self.cur_client_id;
213                }
214                _ => {
215                    self.update.structs.remove(&client_id);
216                    self.cur_client_id = self.client_ids.pop();
217                }
218            }
219        }
220
221        None
222    }
223
224    /// update the missing state vector
225    /// tell it the smallest clock that missed.
226    fn update_missing_state(&mut self, client: Client, clock: Clock) {
227        self.update.missing_state.set_min(client, clock);
228    }
229
230    /// any time we can't apply an update during the iteration,
231    /// we should put all items in pending stack to rest structs
232    fn add_stack_to_rest(&mut self) {
233        for s in self.stack.drain(..) {
234            let client = s.id().client;
235            let unapplicable_items = self.update.structs.remove(&client);
236            if let Some(mut items) = unapplicable_items {
237                items.push_front(s);
238                self.update.pending_structs.insert(client, items);
239            } else {
240                self.update.pending_structs.insert(client, [s].into());
241            }
242            self.client_ids.retain(|&c| c != client);
243        }
244    }
245
246    /// tell if current update's dependencies(left, right, parent) has already
247    /// been consumed and recorded and return the client of them if not.
248    fn get_missing_dep(&self, struct_info: &Node) -> Option<Client> {
249        if let Some(item) = struct_info.as_item().get() {
250            let id = item.id;
251            if let Some(left) = &item.origin_left_id {
252                if left.client != id.client && left.clock >= self.state.get(&left.client) {
253                    return Some(left.client);
254                }
255            }
256
257            if let Some(right) = &item.origin_right_id {
258                if right.client != id.client && right.clock >= self.state.get(&right.client) {
259                    return Some(right.client);
260                }
261            }
262
263            if let Some(parent) = &item.parent {
264                match parent {
265                    Parent::Id(parent_id)
266                        if parent_id.client != id.client && parent_id.clock >= self.state.get(&parent_id.client) =>
267                    {
268                        return Some(parent_id.client);
269                    }
270                    _ => {}
271                }
272            }
273        }
274
275        None
276    }
277
278    fn next_candidate(&mut self) -> Option<Node> {
279        let mut cur = None;
280
281        if !self.stack.is_empty() {
282            cur.replace(self.stack.pop().unwrap());
283        } else if let Some(client) = self.next_client() {
284            // Safety:
285            // client index of updates and update length are both checked in next_client
286            // safe to use unwrap
287            cur.replace(self.update.structs.get_mut(&client).unwrap().pop_front().unwrap());
288        }
289
290        cur
291    }
292}
293
294impl Iterator for UpdateIterator<'_> {
295    type Item = (Node, u64);
296
297    fn next(&mut self) -> Option<Self::Item> {
298        // fetch the first candidate from stack or updates
299        let mut cur = self.next_candidate();
300
301        while let Some(cur_update) = cur.take() {
302            let id = cur_update.id();
303            if cur_update.is_skip() {
304                cur = self.next_candidate();
305                continue;
306            } else if !self.state.contains(&id) {
307                // missing local state of same client
308                // can't apply the continuous updates from same client
309                // push into the stack and put tell all the items in stack are unapplicable
310                self.stack.push(cur_update);
311                self.update_missing_state(id.client, id.clock - 1);
312                self.add_stack_to_rest();
313            } else {
314                let id = cur_update.id();
315                let dep = self.get_missing_dep(&cur_update);
316                // some dependency is missing, we need to turn to iterate the dependency first.
317                if let Some(dep) = dep {
318                    self.stack.push(cur_update);
319
320                    match self.update.structs.get_mut(&dep) {
321                        Some(updates) if !updates.is_empty() => {
322                            // iterate the dependency client first
323                            cur.replace(updates.pop_front().unwrap());
324                            continue;
325                        }
326                        // but the dependency update is drained
327                        // need to move all stack item to unapplicable store
328                        _ => {
329                            self.update_missing_state(dep, self.state.get(&dep));
330                            self.add_stack_to_rest();
331                        }
332                    }
333                } else {
334                    // we finally find the first applicable update
335                    let local_state = self.state.get(&id.client);
336                    // we've already check the local state is greater or equal to current update's
337                    // clock so offset here will never be negative
338                    let offset = local_state - id.clock;
339                    if offset == 0 || offset < cur_update.len() {
340                        self.state.set_max(id.client, id.clock + cur_update.len());
341                        return Some((cur_update, offset));
342                    }
343                }
344            }
345
346            cur = self.next_candidate();
347        }
348
349        // we all done
350        None
351    }
352}
353
354pub struct DeleteSetIterator<'a> {
355    update: &'a mut Update,
356    /// current state vector from store
357    state: StateVector,
358}
359
360impl<'a> DeleteSetIterator<'a> {
361    pub fn new(update: &'a mut Update, state: StateVector) -> Self {
362        DeleteSetIterator { update, state }
363    }
364}
365
366impl Iterator for DeleteSetIterator<'_> {
367    type Item = (Client, Range<u64>);
368
369    fn next(&mut self) -> Option<Self::Item> {
370        while let Some(client) = self.update.delete_set.keys().next().cloned() {
371            let deletes = self.update.delete_set.get_mut(&client).unwrap();
372            let local_state = self.state.get(&client);
373
374            while let Some(range) = deletes.pop() {
375                let start = range.start;
376                let end = range.end;
377
378                if start < local_state {
379                    if local_state < end {
380                        // partially state missing
381                        // [start..end)
382                        //        ^ local_state in between
383                        // // split
384                        // [start..local_state) [local_state..end)
385                        //                      ^^^^^ unapplicable
386                        self.update
387                            .pending_delete_set
388                            .add(client, local_state, end - local_state);
389
390                        return Some((client, start..local_state));
391                    }
392
393                    return Some((client, range));
394                } else {
395                    // all state missing
396                    self.update.pending_delete_set.add(client, start, end - start);
397                }
398            }
399
400            self.update.delete_set.remove(&client);
401        }
402
403        None
404    }
405}
406
#[cfg(test)]
mod tests {
    use std::{num::ParseIntError, path::PathBuf};

    use serde::Deserialize;

    use super::*;
    use crate::doc::common::OrderRange;

    /// Builds an item node with the given id whose content is the string `"c"`
    /// repeated `len` times.
    fn struct_item(id: (Client, Clock), len: usize) -> Node {
        let item = ItemBuilder::new()
            .id(id.into())
            .content(Content::String("c".repeat(len)))
            .build();

        Node::Item(Somr::new(item))
    }

    /// Decodes `input` as a ydoc v1 update.
    fn parse_doc_update(input: Vec<u8>) -> JwstCodecResult<Update> {
        Update::from_ybinary1(input)
    }

    /// Update of client `0` with clock gaps between its structs, shared by the
    /// merge tests.
    fn gapped_update() -> Update {
        Update {
            structs: HashMap::from([(
                0,
                VecDeque::from([
                    struct_item((0, 0), 1),
                    struct_item((0, 1), 1),
                    struct_item((0, 10), 1),
                    Node::new_gc((0, 20).into(), 10),
                ]),
            )]),
            ..Default::default()
        }
    }

    #[test]
    #[cfg_attr(any(miri, loom), ignore)]
    fn test_parse_doc() {
        // (binary, expected number of clients, expected number of structs)
        let docs = [
            (include_bytes!("../../fixtures/basic.bin").to_vec(), 1, 188),
            (include_bytes!("../../fixtures/database.bin").to_vec(), 1, 149),
            (include_bytes!("../../fixtures/large.bin").to_vec(), 1, 9036),
            (include_bytes!("../../fixtures/with-subdoc.bin").to_vec(), 2, 30),
        ];

        for (doc, clients, structs) in docs {
            let update = parse_doc_update(doc).unwrap();
            let total_structs = update.structs.values().map(|queue| queue.len()).sum::<usize>();

            assert_eq!(update.structs.len(), clients);
            assert_eq!(total_structs, structs);
        }
    }

    /// Decodes a string of hex digit pairs into bytes.
    ///
    /// Returns an error instead of panicking when the input has a dangling
    /// final digit or contains characters that are not hex digits.
    fn decode_hex(s: &str) -> Result<Vec<u8>, ParseIntError> {
        (0..s.len())
            .step_by(2)
            .map(|start| {
                // `str::get` yields `None` for a dangling final digit and for a
                // range that would split a multi-byte character; a non-hex
                // placeholder turns both into a regular parse error
                let pair = s.get(start..start + 2).unwrap_or("?");
                u8::from_str_radix(pair, 16)
            })
            .collect()
    }

    // only `workspace` and `blob` are read, the other fields just mirror the
    // records of the local dump
    #[allow(dead_code)]
    #[derive(Deserialize, Debug)]
    struct Data {
        id: u64,
        workspace: String,
        timestamp: String,
        blob: String,
    }

    #[ignore = "just for local data test"]
    #[test]
    fn test_parse_local_doc() {
        let json = serde_json::from_slice::<Vec<Data>>(include_bytes!("../../fixtures/local_docs.json")).unwrap();

        for ws in json {
            // NOTE(review): drops the first 5 characters and the last one of
            // the blob, presumably a wrapper around the hex payload - confirm
            // against the dump format
            let hex = &ws.blob[5..=(ws.blob.len() - 2)];

            let data = match decode_hex(hex) {
                Ok(data) => data,
                Err(_) => {
                    println!("error origin data: {}", ws.workspace);
                    continue;
                }
            };

            match parse_doc_update(data.clone()) {
                Ok(update) => {
                    println!(
                        "workspace: {}, global structs: {}, total structs: {}",
                        ws.workspace,
                        update.structs.len(),
                        update.structs.iter().map(|s| s.1.len()).sum::<usize>()
                    );
                }
                Err(_e) => {
                    // keep the bytes that failed to parse for later inspection
                    std::fs::write(
                        PathBuf::from("./src/fixtures/invalid").join(format!("{}.ydoc", ws.workspace)),
                        data,
                    )
                    .unwrap();
                    println!("doc error: {}", ws.workspace);
                }
            }
        }
    }

    #[test]
    fn test_update_iterator() {
        loom_model!({
            // (1, 1) has its left origin at (0, 1), so client 0 has to be
            // handed out before it
            let dependent = Node::Item(Somr::new(
                ItemBuilder::new()
                    .id((1, 1).into())
                    .left_id(Some((0, 1).into()))
                    .content(Content::String("c".repeat(2)))
                    .build(),
            ));

            let mut update = Update {
                structs: HashMap::from([
                    (
                        0,
                        VecDeque::from([
                            struct_item((0, 0), 1),
                            struct_item((0, 1), 1),
                            Node::new_skip((0, 2).into(), 1),
                        ]),
                    ),
                    (1, VecDeque::from([struct_item((1, 0), 1), dependent])),
                ]),
                ..Update::default()
            };

            let mut iter = update.iter(StateVector::default());

            let expected_order: [(Client, Clock); 4] = [(1, 0), (0, 0), (0, 1), (1, 1)];
            for expected in expected_order {
                assert_eq!(iter.next().unwrap().0.id(), expected.into());
            }
            assert_eq!(iter.next(), None);
        });
    }

    #[test]
    fn test_update_iterator_with_missing_state() {
        loom_model!({
            let mut update = Update {
                // an item with higher sequence id than local state
                structs: HashMap::from([(0, VecDeque::from([struct_item((0, 4), 1)]))]),
                ..Update::default()
            };

            let mut iter = update.iter(StateVector::from([(0, 3)]));
            assert_eq!(iter.next(), None);

            assert!(!update.pending_structs.is_empty());
            let pending_front = update.pending_structs.get_mut(&0).unwrap().pop_front().unwrap();
            assert_eq!(pending_front.id(), (0, 4).into());

            assert!(!update.missing_state.is_empty());
            assert_eq!(update.missing_state.get(&0), 3);
        });
    }

    #[test]
    fn test_delete_set_iterator() {
        let mut update = Update {
            delete_set: DeleteSet::from([(0, vec![(0..2), (3..5)])]),
            ..Update::default()
        };

        let mut iter = update.delete_set_iter(StateVector::from([(0, 10)]));
        assert_eq!(iter.next().unwrap(), (0, 0..2));
        assert_eq!(iter.next().unwrap(), (0, 3..5));
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn test_delete_set_with_missing_state() {
        let mut update = Update {
            delete_set: DeleteSet::from([(0, vec![(3..5), (7..12), (13..15)])]),
            ..Update::default()
        };

        // the local state ends at 10: 7..12 is split and 13..15 stays pending
        let mut iter = update.delete_set_iter(StateVector::from([(0, 10)]));
        assert_eq!(iter.next().unwrap(), (0, 3..5));
        assert_eq!(iter.next().unwrap(), (0, 7..10));
        assert_eq!(iter.next(), None);

        assert!(!update.pending_delete_set.is_empty());
        assert_eq!(
            update.pending_delete_set.get(&0).unwrap(),
            &OrderRange::from(vec![(10..12), (13..15)])
        );
    }

    #[test]
    fn should_add_skip_when_clock_not_continuous() {
        loom_model!({
            let merged = Update::merge([gapped_update()]);

            let expected = VecDeque::from([
                struct_item((0, 0), 1),
                struct_item((0, 1), 1),
                Node::new_skip((0, 2).into(), 8),
                struct_item((0, 10), 1),
                Node::new_skip((0, 11).into(), 9),
                Node::new_gc((0, 20).into(), 10),
            ]);

            assert_eq!(merged.structs.get(&0).unwrap(), &expected);
        });
    }

    #[test]
    fn merged_update_should_not_be_released_in_next_turn() {
        loom_model!({
            let merged = Update::merge([gapped_update()]);

            let update2 = Update {
                structs: HashMap::from([(
                    0,
                    VecDeque::from([struct_item((0, 30), 1), Node::new_gc((0, 32).into(), 1)]),
                )]),
                ..Default::default()
            };

            let merged2 = Update::merge([update2, merged]);

            assert_eq!(merged2.structs.get(&0).unwrap().len(), 9);
        });
    }
}