Skip to main content

loro_internal/
encoding.rs

1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9    decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::change::Change;
15use crate::version::{Frontiers, VersionRange};
16use crate::LoroDoc;
17use crate::{oplog::OpLog, LoroError, VersionVector};
18use loro_common::{HasIdSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, ID};
19use num_traits::{FromPrimitive, ToPrimitive};
20use std::borrow::Cow;
21
22/// The mode of the export.
23///
24/// Loro CRDT internally consists of two parts: document history and current document state.
25/// The export modes offer various options to meet different requirements.
26///
27/// - CRDT property: Documents maintain consistent states when they receive the same set of updates.
28/// - In real-time collaboration, peers typically only need to synchronize updates
29///   (operations/history) to achieve consistency.
30///
31/// ## Update Export
32///
33/// - Exports only the history part, containing multiple operations.
34/// - Suitable for real-time collaboration scenarios where peers only need to synchronize updates.
35///
36/// ## Snapshot Export
37///
38/// ### Default Snapshot
39///
40/// - Includes complete history and current full state.
41///
42/// ### Shallow Snapshot
43///
44/// - Contains the complete current state.
45/// - Retains partial history starting from a specified version.
46///
47/// ### State-only Snapshot
48///
49/// - Exports the state of the target version.
50/// - Includes a minimal set of operation history.
51#[non_exhaustive]
52#[derive(Debug, Clone)]
53pub enum ExportMode<'a> {
54    /// It contains the full history and the current state of the document.
55    Snapshot,
56    /// It contains the history since the `from` version vector.
57    Updates { from: Cow<'a, VersionVector> },
58    /// This mode exports the history in the specified range.
59    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
60    /// The shallow snapshot only contains the history since the target frontiers
61    ShallowSnapshot(Cow<'a, Frontiers>),
62    /// The state only snapshot exports the state of the target version
63    /// with a minimal set of history (a few ops).
64    ///
65    /// It's a shallow snapshot with depth=1 at the target version.
66    /// If the target version is None, it will use the latest version as the target version.
67    StateOnly(Option<Cow<'a, Frontiers>>),
68    /// The snapshot at the specified frontiers. It contains the full history
69    /// till the target frontiers and the state at the target frontiers.
70    SnapshotAt { version: Cow<'a, Frontiers> },
71}
72
73impl<'a> ExportMode<'a> {
74    /// It contains the full history and the current state of the document.
75    pub fn snapshot() -> Self {
76        ExportMode::Snapshot
77    }
78
79    /// It contains the history since the `from` version vector.
80    pub fn updates(from: &'a VersionVector) -> Self {
81        ExportMode::Updates {
82            from: Cow::Borrowed(from),
83        }
84    }
85
86    /// It contains the history since the `from` version vector.
87    pub fn updates_owned(from: VersionVector) -> Self {
88        ExportMode::Updates {
89            from: Cow::Owned(from),
90        }
91    }
92
93    /// It contains all the history of the document.
94    pub fn all_updates() -> Self {
95        ExportMode::Updates {
96            from: Cow::Owned(Default::default()),
97        }
98    }
99
100    /// This mode exports the history in the specified range.
101    pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
102        ExportMode::UpdatesInRange {
103            spans: spans.into(),
104        }
105    }
106
107    /// The shallow snapshot only contains the history since the target frontiers.
108    pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
109        ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
110    }
111
112    /// The shallow snapshot only contains the history since the target frontiers.
113    pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
114        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
115    }
116
117    /// The shallow snapshot only contains the history since the target frontiers.
118    pub fn shallow_snapshot_since(id: ID) -> Self {
119        let frontiers = Frontiers::from_id(id);
120        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
121    }
122
123    /// The state only snapshot exports the state of the target version
124    /// with a minimal set of history (a few ops).
125    ///
126    /// It's a shallow snapshot with depth=1 at the target version.
127    /// If the target version is None, it will use the latest version as the target version.
128    pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
129        ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
130    }
131
132    /// The snapshot at the specified frontiers. It contains the full history
133    /// till the target frontiers and the state at the target frontiers.
134    pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
135        ExportMode::SnapshotAt {
136            version: Cow::Borrowed(frontiers),
137        }
138    }
139
140    /// This mode exports the history within the specified version vector.
141    pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
142        let mut spans = Vec::with_capacity(vv.len());
143        for (peer, counter) in vv.iter() {
144            if *counter > 0 {
145                spans.push(IdSpan::new(*peer, 0, *counter));
146            }
147        }
148
149        ExportMode::UpdatesInRange {
150            spans: Cow::Owned(spans),
151        }
152    }
153}
154
/// Four-byte tag written at the start of every encoded blob and checked when parsing.
const MAGIC_BYTES: [u8; 4] = [b'l', b'o', b'r', b'o'];
156
/// Encoding mode recorded in the 2-byte mode field of an encoded blob's header.
///
/// Discriminants are explicit because they are the on-wire values.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Legacy run-length-encoded updates; importing them is no longer supported.
    OutdatedRle = 1,
    /// Legacy snapshot encoding; importing it is no longer supported.
    OutdatedSnapshot = 2,
    /// Current snapshot encoding.
    FastSnapshot = 3,
    /// Current updates encoding.
    FastUpdates = 4,
    /// Configuration sentinel only: never written into an encoded blob, and rejected
    /// when found in one.
    Auto = 255,
}
166
167impl num_traits::FromPrimitive for EncodeMode {
168    #[allow(trivial_numeric_casts)]
169    #[inline]
170    fn from_i64(n: i64) -> Option<Self> {
171        match n {
172            n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
173            n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
174            n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
175            n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
176            n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
177            _ => None,
178        }
179    }
180    #[inline]
181    fn from_u64(n: u64) -> Option<Self> {
182        Self::from_i64(n as i64)
183    }
184}
185
186impl num_traits::ToPrimitive for EncodeMode {
187    #[inline]
188    #[allow(trivial_numeric_casts)]
189    fn to_i64(&self) -> Option<i64> {
190        Some(match *self {
191            EncodeMode::Auto => EncodeMode::Auto as i64,
192            EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
193            EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
194            EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
195            EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
196        })
197    }
198    #[inline]
199    fn to_u64(&self) -> Option<u64> {
200        self.to_i64().map(|x| x as u64)
201    }
202}
203
204impl EncodeMode {
205    pub fn to_bytes(self) -> [u8; 2] {
206        let value = self.to_u16().unwrap();
207        value.to_be_bytes()
208    }
209
210    pub fn is_snapshot(self) -> bool {
211        matches!(
212            self,
213            EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
214        )
215    }
216}
217
218impl TryFrom<[u8; 2]> for EncodeMode {
219    type Error = LoroError;
220
221    fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
222        let value = u16::from_be_bytes(value);
223        Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
224    }
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Default)]
228pub struct ImportStatus {
229    pub success: VersionRange,
230    pub pending: Option<VersionRange>,
231}
232
233pub(crate) fn decode_oplog(
234    oplog: &mut OpLog,
235    parsed: ParsedHeaderAndBody,
236) -> Result<ImportStatus, LoroError> {
237    let changes = decode_oplog_changes(oplog, parsed)?;
238    let result = apply_decoded_changes_to_oplog(oplog, changes);
239    if result.has_deps_before_shallow_root {
240        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
241    }
242
243    Ok(result.status)
244}
245
246pub(crate) fn decode_oplog_changes(
247    oplog: &mut OpLog,
248    parsed: ParsedHeaderAndBody,
249) -> Result<Vec<Change>, LoroError> {
250    let ParsedHeaderAndBody { mode, body, .. } = parsed;
251    match mode {
252        EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
253            Err(LoroError::ImportUnsupportedEncodingMode)
254        }
255        EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
256        EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
257        EncodeMode::Auto => unreachable!(),
258    }
259}
260
261pub(crate) struct ApplyDecodedChangesResult {
262    pub status: ImportStatus,
263    pub has_deps_before_shallow_root: bool,
264}
265
266pub(crate) fn apply_decoded_changes_to_oplog(
267    oplog: &mut OpLog,
268    changes: Vec<Change>,
269) -> ApplyDecodedChangesResult {
270    let ImportChangesResult {
271        mut imported,
272        latest_ids,
273        pending_changes,
274        changes_that_have_deps_before_shallow_root,
275    } = import_changes_to_oplog(changes, oplog);
276
277    let mut pending = VersionRange::default();
278    pending_changes.iter().for_each(|c| {
279        pending.extends_to_include_id_span(c.id_span());
280    });
281    // TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
282    oplog.try_apply_pending(latest_ids, Some(&mut imported));
283    oplog
284        .import_unknown_lamport_pending_changes(pending_changes)
285        .expect("importing unknown-lamport pending changes is infallible");
286    ApplyDecodedChangesResult {
287        status: ImportStatus {
288            success: imported,
289            pending: (!pending.is_empty()).then_some(pending),
290        },
291        has_deps_before_shallow_root: !changes_that_have_deps_before_shallow_root.is_empty(),
292    }
293}
294
295pub(crate) struct ParsedHeaderAndBody<'a> {
296    pub checksum: [u8; 16],
297    pub checksum_body: &'a [u8],
298    pub mode: EncodeMode,
299    pub body: &'a [u8],
300}
301
302const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
303impl ParsedHeaderAndBody<'_> {
304    /// Return if the checksum is correct.
305    fn check_checksum(&self) -> LoroResult<()> {
306        match self.mode {
307            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
308                if md5::compute(self.checksum_body).0 != self.checksum {
309                    return Err(LoroError::DecodeChecksumMismatchError);
310                }
311            }
312            EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
313                let mut expected_bytes = [0; 4];
314                expected_bytes.copy_from_slice(&self.checksum[12..16]);
315                let expected = u32::from_le_bytes(expected_bytes);
316                if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
317                    return Err(LoroError::DecodeChecksumMismatchError);
318                }
319            }
320            EncodeMode::Auto => {
321                return Err(LoroError::DecodeError(
322                    "Invalid import mode `Auto` in encoded blob"
323                        .to_string()
324                        .into_boxed_str(),
325                ));
326            }
327        }
328
329        Ok(())
330    }
331}
332
333const MIN_HEADER_SIZE: usize = 22;
334pub(crate) fn parse_header_and_body(
335    bytes: &[u8],
336    check_checksum: bool,
337) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
338    let reader = &bytes;
339    if bytes.len() < MIN_HEADER_SIZE {
340        return Err(LoroError::DecodeError("Invalid import data".into()));
341    }
342
343    let (magic_bytes, reader) = reader.split_at(4);
344    if magic_bytes != MAGIC_BYTES {
345        return Err(LoroError::DecodeError("Invalid magic bytes".into()));
346    }
347
348    let (checksum, reader) = reader.split_at(16);
349    let checksum_body = reader;
350    let (mode_bytes, reader) = reader.split_at(2);
351    let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
352    if mode == EncodeMode::Auto {
353        return Err(LoroError::DecodeError(
354            "Invalid import mode `Auto` in encoded blob"
355                .to_string()
356                .into_boxed_str(),
357        ));
358    }
359    let mut checksum_arr = [0; 16];
360    checksum_arr.copy_from_slice(checksum);
361
362    let ans = ParsedHeaderAndBody {
363        mode,
364        checksum_body,
365        checksum: checksum_arr,
366        body: reader,
367    };
368
369    if check_checksum {
370        ans.check_checksum()?;
371    }
372    Ok(ans)
373}
374
375pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
376    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
377        fast_snapshot::encode_snapshot(doc, ans);
378        Ok(())
379    })
380    .unwrap()
381}
382
383pub(crate) fn export_snapshot_at(
384    doc: &LoroDoc,
385    frontiers: &Frontiers,
386) -> Result<Vec<u8>, LoroEncodeError> {
387    check_target_version_reachable(doc, frontiers)?;
388    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
389        shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
390    })
391}
392
393pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
394    encode_with(EncodeMode::FastUpdates, &mut |ans| {
395        fast_snapshot::encode_updates(doc, vv, ans);
396        Ok(())
397    })
398    .unwrap()
399}
400
401pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
402    encode_with(EncodeMode::FastUpdates, &mut |ans| {
403        fast_snapshot::encode_updates_in_range(oplog, spans, ans);
404        Ok(())
405    })
406    .unwrap()
407}
408
409pub(crate) fn export_shallow_snapshot(
410    doc: &LoroDoc,
411    f: &Frontiers,
412) -> Result<Vec<u8>, LoroEncodeError> {
413    check_target_version_reachable(doc, f)?;
414    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
415        shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
416        Ok(())
417    })
418}
419
420fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
421    let oplog = doc.oplog.lock();
422    if !oplog.dag.can_export_shallow_snapshot_on(f) {
423        return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
424    }
425
426    Ok(())
427}
428
429pub(crate) fn export_state_only_snapshot(
430    doc: &LoroDoc,
431    f: &Frontiers,
432) -> Result<Vec<u8>, LoroEncodeError> {
433    check_target_version_reachable(doc, f)?;
434    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
435        shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
436        Ok(())
437    })
438}
439
440fn encode_with(
441    mode: EncodeMode,
442    f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
443) -> Result<Vec<u8>, LoroEncodeError> {
444    // HEADER
445    let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
446    ans.extend(MAGIC_BYTES);
447    let checksum = [0; 16];
448    ans.extend(checksum);
449    ans.extend(mode.to_bytes());
450
451    // BODY
452    f(&mut ans)?;
453
454    // CHECKSUM in HEADER
455    let checksum_body = &ans[20..];
456    let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
457    ans[16..20].copy_from_slice(&checksum.to_le_bytes());
458    Ok(ans)
459}
460
461pub(crate) fn decode_snapshot(
462    doc: &LoroDoc,
463    mode: EncodeMode,
464    body: &[u8],
465    origin: InternalString,
466) -> Result<ImportStatus, LoroError> {
467    match mode {
468        EncodeMode::OutdatedSnapshot => {
469            return Err(LoroError::ImportUnsupportedEncodingMode);
470        }
471        EncodeMode::FastSnapshot => {
472            fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
473        }
474        _ => {
475            return Err(LoroError::DecodeError(
476                format!("Invalid snapshot encoding mode: {mode:?}").into_boxed_str(),
477            ));
478        }
479    };
480    Ok(ImportStatus {
481        success: VersionRange::from_vv(&doc.oplog_vv()),
482        pending: None,
483    })
484}
485
486#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
487pub enum EncodedBlobMode {
488    Snapshot,
489    OutdatedSnapshot,
490    ShallowSnapshot,
491    OutdatedRle,
492    Updates,
493}
494
495impl std::fmt::Display for EncodedBlobMode {
496    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497        f.write_str(match self {
498            EncodedBlobMode::OutdatedRle => "outdated-update",
499            EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
500            EncodedBlobMode::Snapshot => "snapshot",
501            EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
502            EncodedBlobMode::Updates => "update",
503        })
504    }
505}
506
507impl EncodedBlobMode {
508    pub fn is_snapshot(&self) -> bool {
509        matches!(
510            self,
511            EncodedBlobMode::Snapshot
512                | EncodedBlobMode::ShallowSnapshot
513                | EncodedBlobMode::OutdatedSnapshot
514        )
515    }
516}
517
518#[derive(Debug, Clone)]
519pub struct ImportBlobMetadata {
520    /// The partial start version vector.
521    ///
522    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
523    /// However, it does not constitute a complete version vector, as it only contains counters
524    /// from peers included within the import blob.
525    pub partial_start_vv: VersionVector,
526    /// The partial end version vector.
527    ///
528    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
529    /// However, it does not constitute a complete version vector, as it only contains counters
530    /// from peers included within the import blob.
531    pub partial_end_vv: VersionVector,
532    pub start_timestamp: i64,
533    pub start_frontiers: Frontiers,
534    pub end_timestamp: i64,
535    pub change_num: u32,
536    pub mode: EncodedBlobMode,
537}
538
539impl LoroDoc {
540    /// Decodes the metadata for an imported blob from the provided bytes.
541    pub fn decode_import_blob_meta(
542        blob: &[u8],
543        check_checksum: bool,
544    ) -> LoroResult<ImportBlobMetadata> {
545        let parsed = parse_header_and_body(blob, check_checksum)?;
546        match parsed.mode {
547            EncodeMode::Auto => unreachable!(),
548            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
549                Err(LoroError::ImportUnsupportedEncodingMode)
550            }
551            EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
552            EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
553        }
554    }
555}
556
557#[cfg(test)]
558mod test {
559
560    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};
561
562    #[test]
563    fn test_value_encode_size() {
564        fn assert_size(value: LoroValue, max_size: usize) {
565            let size = postcard::to_allocvec(&value).unwrap().len();
566            assert!(
567                size <= max_size,
568                "value: {:?}, size: {}, max_size: {}",
569                value,
570                size,
571                max_size
572            );
573        }
574
575        assert_size(LoroValue::Null, 1);
576        assert_size(LoroValue::I64(1), 2);
577        assert_size(LoroValue::Double(1.), 9);
578        assert_size(LoroValue::Bool(true), 2);
579        assert_size(LoroValue::String("123".to_string().into()), 5);
580        assert_size(LoroValue::Binary(vec![1, 2, 3].into()), 5);
581        assert_size(
582            loro_value!({
583                "a": 1,
584                "b": 2,
585            }),
586            10,
587        );
588        assert_size(loro_value!([1, 2, 3]), 8);
589        assert_size(
590            LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
591            5,
592        );
593        assert_size(
594            LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
595            5,
596        );
597    }
598}