loro_internal/
encoding.rs

1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9    decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::op::OpWithId;
15use crate::version::{Frontiers, VersionRange};
16use crate::LoroDoc;
17use crate::{oplog::OpLog, LoroError, VersionVector};
18use loro_common::{HasIdSpan, IdLpSpan, IdSpan, LoroEncodeError, LoroResult, PeerID, ID};
19use num_traits::{FromPrimitive, ToPrimitive};
20use rle::{HasLength, Sliceable};
21use std::borrow::Cow;
22
23/// The mode of the export.
24///
25/// Loro CRDT internally consists of two parts: document history and current document state.
26/// The export modes offer various options to meet different requirements.
27///
28/// - CRDT property: Documents maintain consistent states when they receive the same set of updates.
29/// - In real-time collaboration, peers typically only need to synchronize updates
30///   (operations/history) to achieve consistency.
31///
32/// ## Update Export
33///
34/// - Exports only the history part, containing multiple operations.
35/// - Suitable for real-time collaboration scenarios where peers only need to synchronize updates.
36///
37/// ## Snapshot Export
38///
39/// ### Default Snapshot
40///
41/// - Includes complete history and current full state.
42///
43/// ### Shallow Snapshot
44///
45/// - Contains the complete current state.
46/// - Retains partial history starting from a specified version.
47///
48/// ### State-only Snapshot
49///
50/// - Exports the state of the target version.
51/// - Includes a minimal set of operation history.
52#[non_exhaustive]
53#[derive(Debug, Clone)]
54pub enum ExportMode<'a> {
55    /// It contains the full history and the current state of the document.
56    Snapshot,
57    /// It contains the history since the `from` version vector.
58    Updates { from: Cow<'a, VersionVector> },
59    /// This mode exports the history in the specified range.
60    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
61    /// The shallow snapshot only contains the history since the target frontiers
62    ShallowSnapshot(Cow<'a, Frontiers>),
63    /// The state only snapshot exports the state of the target version
64    /// with a minimal set of history (a few ops).
65    ///
66    /// It's a shallow snapshot with depth=1 at the target version.
67    /// If the target version is None, it will use the latest version as the target version.
68    StateOnly(Option<Cow<'a, Frontiers>>),
69    /// The snapshot at the specified frontiers. It contains the full history
70    /// till the target frontiers and the state at the target frontiers.
71    SnapshotAt { version: Cow<'a, Frontiers> },
72}
73
74impl<'a> ExportMode<'a> {
75    /// It contains the full history and the current state of the document.
76    pub fn snapshot() -> Self {
77        ExportMode::Snapshot
78    }
79
80    /// It contains the history since the `from` version vector.
81    pub fn updates(from: &'a VersionVector) -> Self {
82        ExportMode::Updates {
83            from: Cow::Borrowed(from),
84        }
85    }
86
87    /// It contains the history since the `from` version vector.
88    pub fn updates_owned(from: VersionVector) -> Self {
89        ExportMode::Updates {
90            from: Cow::Owned(from),
91        }
92    }
93
94    /// It contains all the history of the document.
95    pub fn all_updates() -> Self {
96        ExportMode::Updates {
97            from: Cow::Owned(Default::default()),
98        }
99    }
100
101    /// This mode exports the history in the specified range.
102    pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
103        ExportMode::UpdatesInRange {
104            spans: spans.into(),
105        }
106    }
107
108    /// The shallow snapshot only contains the history since the target frontiers.
109    pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
110        ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
111    }
112
113    /// The shallow snapshot only contains the history since the target frontiers.
114    pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
115        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
116    }
117
118    /// The shallow snapshot only contains the history since the target frontiers.
119    pub fn shallow_snapshot_since(id: ID) -> Self {
120        let frontiers = Frontiers::from_id(id);
121        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
122    }
123
124    /// The state only snapshot exports the state of the target version
125    /// with a minimal set of history (a few ops).
126    ///
127    /// It's a shallow snapshot with depth=1 at the target version.
128    /// If the target version is None, it will use the latest version as the target version.
129    pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
130        ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
131    }
132
133    /// The snapshot at the specified frontiers. It contains the full history
134    /// till the target frontiers and the state at the target frontiers.
135    pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
136        ExportMode::SnapshotAt {
137            version: Cow::Borrowed(frontiers),
138        }
139    }
140
141    /// This mode exports the history within the specified version vector.
142    pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
143        let mut spans = Vec::with_capacity(vv.len());
144        for (peer, counter) in vv.iter() {
145            if *counter > 0 {
146                spans.push(IdSpan::new(*peer, 0, *counter));
147            }
148        }
149
150        ExportMode::UpdatesInRange {
151            spans: Cow::Owned(spans),
152        }
153    }
154}
155
/// Leading bytes of every blob produced by the encoders in this module.
const MAGIC_BYTES: [u8; 4] = *b"loro";

/// Encoding-format tag, stored big-endian in 2 bytes right after the header checksum.
///
/// The discriminants are the on-wire tag values, so changing them breaks compatibility
/// with existing blobs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Configuration-only value meaning "pick the default". It is resolved to a concrete
    /// mode before encoding and is not meant to appear in encoded data.
    Auto = 255,
    /// Legacy update encoding (MD5 checksum).
    OutdatedRle = 1,
    /// Legacy snapshot encoding (MD5 checksum).
    OutdatedSnapshot = 2,
    /// Snapshot encoding with an xxHash32 checksum.
    FastSnapshot = 3,
    /// Update encoding with an xxHash32 checksum.
    FastUpdates = 4,
}
167
168impl num_traits::FromPrimitive for EncodeMode {
169    #[allow(trivial_numeric_casts)]
170    #[inline]
171    fn from_i64(n: i64) -> Option<Self> {
172        match n {
173            n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
174            n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
175            n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
176            n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
177            n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
178            _ => None,
179        }
180    }
181    #[inline]
182    fn from_u64(n: u64) -> Option<Self> {
183        Self::from_i64(n as i64)
184    }
185}
186
187impl num_traits::ToPrimitive for EncodeMode {
188    #[inline]
189    #[allow(trivial_numeric_casts)]
190    fn to_i64(&self) -> Option<i64> {
191        Some(match *self {
192            EncodeMode::Auto => EncodeMode::Auto as i64,
193            EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
194            EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
195            EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
196            EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
197        })
198    }
199    #[inline]
200    fn to_u64(&self) -> Option<u64> {
201        self.to_i64().map(|x| x as u64)
202    }
203}
204
205impl EncodeMode {
206    pub fn to_bytes(self) -> [u8; 2] {
207        let value = self.to_u16().unwrap();
208        value.to_be_bytes()
209    }
210
211    pub fn is_snapshot(self) -> bool {
212        matches!(
213            self,
214            EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
215        )
216    }
217}
218
219impl TryFrom<[u8; 2]> for EncodeMode {
220    type Error = LoroError;
221
222    fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
223        let value = u16::from_be_bytes(value);
224        Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
225    }
226}
227
228#[derive(Debug, Clone, PartialEq, Eq, Default)]
229pub struct ImportStatus {
230    pub success: VersionRange,
231    pub pending: Option<VersionRange>,
232}
233
234/// The encoder used to encode the container states.
235///
236/// Each container state can be represented by a sequence of operations.
237/// For example, a list state can be represented by a sequence of insert
238/// operations that form its current state.
239/// We ignore the delete operations.
240///
241/// We will use a new encoder for each container state.
242/// Each container state should call encode_op multiple times until all the
243/// operations constituting its current state are encoded.
244pub(crate) struct StateSnapshotEncoder<'a> {
245    /// The `check_idspan` function is used to check if the id span is valid.
246    /// If the id span is invalid, the function should return an error that
247    /// contains the missing id span.
248    check_idspan: &'a dyn Fn(IdLpSpan) -> Result<(), IdLpSpan>,
249    /// The `encoder_by_op` function is used to encode an operation.
250    encoder_by_op: &'a mut dyn FnMut(OpWithId),
251    /// The `record_idspan` function is used to record the id span to track the
252    /// encoded order.
253    record_idspan: &'a mut dyn FnMut(IdLpSpan),
254    register_peer: &'a mut dyn FnMut(PeerID) -> usize,
255    #[allow(unused)]
256    mode: EncodeMode,
257}
258
259impl StateSnapshotEncoder<'_> {
260    pub fn encode_op(&mut self, id_span: IdLpSpan, get_op: impl FnOnce() -> OpWithId) {
261        if let Err(span) = (self.check_idspan)(id_span) {
262            let mut op = get_op();
263            if span == id_span {
264                (self.encoder_by_op)(op);
265            } else {
266                debug_assert_eq!(span.lamport.start, id_span.lamport.start);
267                op.op = op.op.slice(span.atom_len(), op.op.atom_len());
268                (self.encoder_by_op)(op);
269            }
270        }
271
272        (self.record_idspan)(id_span);
273    }
274
275    #[allow(unused)]
276    pub fn mode(&self) -> EncodeMode {
277        self.mode
278    }
279
280    pub(crate) fn register_peer(&mut self, peer: PeerID) -> usize {
281        (self.register_peer)(peer)
282    }
283}
284
285pub(crate) struct StateSnapshotDecodeContext<'a> {
286    pub oplog: &'a OpLog,
287    pub peers: &'a [PeerID],
288    pub ops: &'a mut dyn Iterator<Item = OpWithId>,
289    #[allow(unused)]
290    pub blob: &'a [u8],
291    pub mode: EncodeMode,
292}
293
294pub(crate) fn encode_oplog(oplog: &OpLog, vv: &VersionVector, mode: EncodeMode) -> Vec<u8> {
295    let mode = match mode {
296        EncodeMode::Auto => EncodeMode::OutdatedRle,
297        mode => mode,
298    };
299
300    let body = match &mode {
301        EncodeMode::OutdatedRle => outdated_encode_reordered::encode_updates(oplog, vv),
302        _ => unreachable!(),
303    };
304
305    encode_header_and_body(mode, body)
306}
307
308pub(crate) fn decode_oplog(
309    oplog: &mut OpLog,
310    parsed: ParsedHeaderAndBody,
311) -> Result<ImportStatus, LoroError> {
312    let ParsedHeaderAndBody { mode, body, .. } = parsed;
313    let changes = match mode {
314        EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
315            outdated_encode_reordered::decode_updates(oplog, body)
316        }
317        EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
318        EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
319        EncodeMode::Auto => unreachable!(),
320    }?;
321    let ImportChangesResult {
322        mut imported,
323        latest_ids,
324        pending_changes,
325        changes_that_have_deps_before_shallow_root,
326    } = import_changes_to_oplog(changes, oplog);
327
328    let mut pending = VersionRange::default();
329    pending_changes.iter().for_each(|c| {
330        pending.extends_to_include_id_span(c.id_span());
331    });
332    // TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
333    oplog.try_apply_pending(latest_ids, Some(&mut imported));
334    oplog.import_unknown_lamport_pending_changes(pending_changes)?;
335    if !changes_that_have_deps_before_shallow_root.is_empty() {
336        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
337    }
338    Ok(ImportStatus {
339        success: imported,
340        pending: (!pending.is_empty()).then_some(pending),
341    })
342}
343
344pub(crate) struct ParsedHeaderAndBody<'a> {
345    pub checksum: [u8; 16],
346    pub checksum_body: &'a [u8],
347    pub mode: EncodeMode,
348    pub body: &'a [u8],
349}
350
351const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
352impl ParsedHeaderAndBody<'_> {
353    /// Return if the checksum is correct.
354    fn check_checksum(&self) -> LoroResult<()> {
355        match self.mode {
356            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
357                if md5::compute(self.checksum_body).0 != self.checksum {
358                    return Err(LoroError::DecodeChecksumMismatchError);
359                }
360            }
361            EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
362                let expected = u32::from_le_bytes(self.checksum[12..16].try_into().unwrap());
363                if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
364                    return Err(LoroError::DecodeChecksumMismatchError);
365                }
366            }
367            EncodeMode::Auto => unreachable!(),
368        }
369
370        Ok(())
371    }
372}
373
374const MIN_HEADER_SIZE: usize = 22;
375pub(crate) fn parse_header_and_body(
376    bytes: &[u8],
377    check_checksum: bool,
378) -> Result<ParsedHeaderAndBody, LoroError> {
379    let reader = &bytes;
380    if bytes.len() < MIN_HEADER_SIZE {
381        return Err(LoroError::DecodeError("Invalid import data".into()));
382    }
383
384    let (magic_bytes, reader) = reader.split_at(4);
385    let magic_bytes: [u8; 4] = magic_bytes.try_into().unwrap();
386    if magic_bytes != MAGIC_BYTES {
387        return Err(LoroError::DecodeError("Invalid magic bytes".into()));
388    }
389
390    let (checksum, reader) = reader.split_at(16);
391    let checksum_body = reader;
392    let (mode_bytes, reader) = reader.split_at(2);
393    let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
394
395    let ans = ParsedHeaderAndBody {
396        mode,
397        checksum_body,
398        checksum: checksum.try_into().unwrap(),
399        body: reader,
400    };
401
402    if check_checksum {
403        ans.check_checksum()?;
404    }
405    Ok(ans)
406}
407
408fn encode_header_and_body(mode: EncodeMode, body: Vec<u8>) -> Vec<u8> {
409    let mut ans = Vec::new();
410    ans.extend(MAGIC_BYTES);
411    let checksum = [0; 16];
412    ans.extend(checksum);
413    ans.extend(mode.to_bytes());
414    ans.extend(body);
415    let checksum_body = &ans[20..];
416    let checksum = md5::compute(checksum_body).0;
417    ans[4..20].copy_from_slice(&checksum);
418    ans
419}
420
421pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec<u8> {
422    let body = outdated_encode_reordered::encode_snapshot(
423        &doc.oplog().try_lock().unwrap(),
424        &mut doc.app_state().try_lock().unwrap(),
425        &Default::default(),
426    );
427
428    encode_header_and_body(EncodeMode::OutdatedSnapshot, body)
429}
430
431pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
432    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
433        fast_snapshot::encode_snapshot(doc, ans);
434        Ok(())
435    })
436    .unwrap()
437}
438
439pub(crate) fn export_snapshot_at(
440    doc: &LoroDoc,
441    frontiers: &Frontiers,
442) -> Result<Vec<u8>, LoroEncodeError> {
443    check_target_version_reachable(doc, frontiers)?;
444    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
445        shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
446    })
447}
448
449pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
450    encode_with(EncodeMode::FastUpdates, &mut |ans| {
451        fast_snapshot::encode_updates(doc, vv, ans);
452        Ok(())
453    })
454    .unwrap()
455}
456
457pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
458    encode_with(EncodeMode::FastUpdates, &mut |ans| {
459        fast_snapshot::encode_updates_in_range(oplog, spans, ans);
460        Ok(())
461    })
462    .unwrap()
463}
464
465pub(crate) fn export_shallow_snapshot(
466    doc: &LoroDoc,
467    f: &Frontiers,
468) -> Result<Vec<u8>, LoroEncodeError> {
469    check_target_version_reachable(doc, f)?;
470    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
471        shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
472        Ok(())
473    })
474}
475
476fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
477    let oplog = doc.oplog.try_lock().unwrap();
478    if !oplog.dag.can_export_shallow_snapshot_on(f) {
479        return Err(LoroEncodeError::FrontiersNotFound(format!("{:?}", f)));
480    }
481
482    Ok(())
483}
484
485pub(crate) fn export_state_only_snapshot(
486    doc: &LoroDoc,
487    f: &Frontiers,
488) -> Result<Vec<u8>, LoroEncodeError> {
489    check_target_version_reachable(doc, f)?;
490    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
491        shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
492        Ok(())
493    })
494}
495
496fn encode_with(
497    mode: EncodeMode,
498    f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
499) -> Result<Vec<u8>, LoroEncodeError> {
500    // HEADER
501    let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
502    ans.extend(MAGIC_BYTES);
503    let checksum = [0; 16];
504    ans.extend(checksum);
505    ans.extend(mode.to_bytes());
506
507    // BODY
508    f(&mut ans)?;
509
510    // CHECKSUM in HEADER
511    let checksum_body = &ans[20..];
512    let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
513    ans[16..20].copy_from_slice(&checksum.to_le_bytes());
514    Ok(ans)
515}
516
517pub(crate) fn decode_snapshot(
518    doc: &LoroDoc,
519    mode: EncodeMode,
520    body: &[u8],
521) -> Result<ImportStatus, LoroError> {
522    match mode {
523        EncodeMode::OutdatedSnapshot => outdated_encode_reordered::decode_snapshot(doc, body)?,
524        EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot(doc, body.to_vec().into())?,
525        _ => unreachable!(),
526    };
527    Ok(ImportStatus {
528        success: VersionRange::from_vv(&doc.oplog_vv()),
529        pending: None,
530    })
531}
532
/// The kind of payload an encoded blob carries, as reported in `ImportBlobMetadata::mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    Snapshot,
    OutdatedSnapshot,
    ShallowSnapshot,
    OutdatedRle,
    Updates,
}

impl std::fmt::Display for EncodedBlobMode {
    /// Writes the kebab-case name of the mode, e.g. `shallow-snapshot`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            EncodedBlobMode::Snapshot => "snapshot",
            EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
            EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
            EncodedBlobMode::OutdatedRle => "outdated-update",
            EncodedBlobMode::Updates => "update",
        };
        f.write_str(name)
    }
}

impl EncodedBlobMode {
    /// Whether the blob is any kind of snapshot rather than an update blob.
    pub fn is_snapshot(&self) -> bool {
        match self {
            EncodedBlobMode::Snapshot
            | EncodedBlobMode::OutdatedSnapshot
            | EncodedBlobMode::ShallowSnapshot => true,
            EncodedBlobMode::OutdatedRle | EncodedBlobMode::Updates => false,
        }
    }
}
564
565#[derive(Debug, Clone)]
566pub struct ImportBlobMetadata {
567    /// The partial start version vector.
568    ///
569    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
570    /// However, it does not constitute a complete version vector, as it only contains counters
571    /// from peers included within the import blob.
572    pub partial_start_vv: VersionVector,
573    /// The partial end version vector.
574    ///
575    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
576    /// However, it does not constitute a complete version vector, as it only contains counters
577    /// from peers included within the import blob.
578    pub partial_end_vv: VersionVector,
579    pub start_timestamp: i64,
580    pub start_frontiers: Frontiers,
581    pub end_timestamp: i64,
582    pub change_num: u32,
583    pub mode: EncodedBlobMode,
584}
585
586impl LoroDoc {
587    /// Decodes the metadata for an imported blob from the provided bytes.
588    pub fn decode_import_blob_meta(
589        blob: &[u8],
590        check_checksum: bool,
591    ) -> LoroResult<ImportBlobMetadata> {
592        let parsed = parse_header_and_body(blob, check_checksum)?;
593        match parsed.mode {
594            EncodeMode::Auto => unreachable!(),
595            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
596                outdated_encode_reordered::decode_import_blob_meta(parsed)
597            }
598            EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
599            EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
600        }
601    }
602}
603
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the compactness of the postcard encoding of `LoroValue`. Each case pairs a
    /// value with the maximum number of bytes its encoding may take.
    #[test]
    fn test_value_encode_size() {
        let cases = [
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}