// Source file: loro_internal/encoding.rs

1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9    decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::change::Change;
15use crate::version::{Frontiers, VersionRange};
16use crate::LoroDoc;
17use crate::{oplog::OpLog, LoroError, VersionVector};
18use loro_common::{HasIdSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, ID};
19use num_traits::{FromPrimitive, ToPrimitive};
20use std::borrow::Cow;
21
22/// The mode of the export.
23///
24/// Loro CRDT internally consists of two parts: document history and current document state.
25/// The export modes offer various options to meet different requirements.
26///
27/// - CRDT property: Documents maintain consistent states when they receive the same set of updates.
28/// - In real-time collaboration, peers typically only need to synchronize updates
29///   (operations/history) to achieve consistency.
30///
31/// ## Update Export
32///
33/// - Exports only the history part, containing multiple operations.
34/// - Suitable for real-time collaboration scenarios where peers only need to synchronize updates.
35///
36/// ## Snapshot Export
37///
38/// ### Default Snapshot
39///
40/// - Includes complete history and current full state.
41///
42/// ### Shallow Snapshot
43///
44/// - Contains the complete current state.
45/// - Retains partial history starting from a specified version.
46///
47/// ### State-only Snapshot
48///
49/// - Exports the state of the target version.
50/// - Includes a minimal set of operation history.
51#[non_exhaustive]
52#[derive(Debug, Clone)]
53pub enum ExportMode<'a> {
54    /// It contains the full history and the current state of the document.
55    Snapshot,
56    /// It contains the history since the `from` version vector.
57    Updates { from: Cow<'a, VersionVector> },
58    /// This mode exports the history in the specified range.
59    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
60    /// The shallow snapshot only contains the history since the target frontiers
61    ShallowSnapshot(Cow<'a, Frontiers>),
62    /// The state only snapshot exports the state of the target version
63    /// with a minimal set of history (a few ops).
64    ///
65    /// It's a shallow snapshot with depth=1 at the target version.
66    /// If the target version is None, it will use the latest version as the target version.
67    StateOnly(Option<Cow<'a, Frontiers>>),
68    /// The snapshot at the specified frontiers. It contains the full history
69    /// till the target frontiers and the state at the target frontiers.
70    SnapshotAt { version: Cow<'a, Frontiers> },
71}
72
73impl<'a> ExportMode<'a> {
74    /// It contains the full history and the current state of the document.
75    pub fn snapshot() -> Self {
76        ExportMode::Snapshot
77    }
78
79    /// It contains the history since the `from` version vector.
80    pub fn updates(from: &'a VersionVector) -> Self {
81        ExportMode::Updates {
82            from: Cow::Borrowed(from),
83        }
84    }
85
86    /// It contains the history since the `from` version vector.
87    pub fn updates_owned(from: VersionVector) -> Self {
88        ExportMode::Updates {
89            from: Cow::Owned(from),
90        }
91    }
92
93    /// It contains all the history of the document.
94    pub fn all_updates() -> Self {
95        ExportMode::Updates {
96            from: Cow::Owned(Default::default()),
97        }
98    }
99
100    /// This mode exports the history in the specified range.
101    pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
102        ExportMode::UpdatesInRange {
103            spans: spans.into(),
104        }
105    }
106
107    /// The shallow snapshot only contains the history since the target frontiers.
108    pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
109        ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
110    }
111
112    /// The shallow snapshot only contains the history since the target frontiers.
113    pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
114        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
115    }
116
117    /// The shallow snapshot only contains the history since the target frontiers.
118    pub fn shallow_snapshot_since(id: ID) -> Self {
119        let frontiers = Frontiers::from_id(id);
120        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
121    }
122
123    /// The state only snapshot exports the state of the target version
124    /// with a minimal set of history (a few ops).
125    ///
126    /// It's a shallow snapshot with depth=1 at the target version.
127    /// If the target version is None, it will use the latest version as the target version.
128    pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
129        ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
130    }
131
132    /// The snapshot at the specified frontiers. It contains the full history
133    /// till the target frontiers and the state at the target frontiers.
134    pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
135        ExportMode::SnapshotAt {
136            version: Cow::Borrowed(frontiers),
137        }
138    }
139
140    /// This mode exports the history within the specified version vector.
141    pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
142        let mut spans = Vec::with_capacity(vv.len());
143        for (peer, counter) in vv.iter() {
144            if *counter > 0 {
145                spans.push(IdSpan::new(*peer, 0, *counter));
146            }
147        }
148
149        ExportMode::UpdatesInRange {
150            spans: Cow::Owned(spans),
151        }
152    }
153}
154
/// Prefix that every encoded Loro blob starts with.
const MAGIC_BYTES: [u8; 4] = *b"loro";

/// Encoding format tag; written into the blob header as a big-endian `u16`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Configuration-only value; it is never used when encoding a blob.
    Auto = 255,
    /// Legacy run-length encoded updates.
    OutdatedRle = 1,
    /// Legacy snapshot format.
    OutdatedSnapshot = 2,
    /// Current snapshot format.
    FastSnapshot = 3,
    /// Current updates format.
    FastUpdates = 4,
}
166
167impl num_traits::FromPrimitive for EncodeMode {
168    #[allow(trivial_numeric_casts)]
169    #[inline]
170    fn from_i64(n: i64) -> Option<Self> {
171        match n {
172            n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
173            n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
174            n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
175            n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
176            n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
177            _ => None,
178        }
179    }
180    #[inline]
181    fn from_u64(n: u64) -> Option<Self> {
182        Self::from_i64(n as i64)
183    }
184}
185
186impl num_traits::ToPrimitive for EncodeMode {
187    #[inline]
188    #[allow(trivial_numeric_casts)]
189    fn to_i64(&self) -> Option<i64> {
190        Some(match *self {
191            EncodeMode::Auto => EncodeMode::Auto as i64,
192            EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
193            EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
194            EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
195            EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
196        })
197    }
198    #[inline]
199    fn to_u64(&self) -> Option<u64> {
200        self.to_i64().map(|x| x as u64)
201    }
202}
203
204impl EncodeMode {
205    pub fn to_bytes(self) -> [u8; 2] {
206        let value = self.to_u16().unwrap();
207        value.to_be_bytes()
208    }
209
210    pub fn is_snapshot(self) -> bool {
211        matches!(
212            self,
213            EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
214        )
215    }
216}
217
218impl TryFrom<[u8; 2]> for EncodeMode {
219    type Error = LoroError;
220
221    fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
222        let value = u16::from_be_bytes(value);
223        Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
224    }
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Default)]
228pub struct ImportStatus {
229    pub success: VersionRange,
230    pub pending: Option<VersionRange>,
231}
232
233pub(crate) fn decode_oplog(
234    oplog: &mut OpLog,
235    parsed: ParsedHeaderAndBody,
236) -> Result<ImportStatus, LoroError> {
237    let changes = decode_oplog_changes(oplog, parsed)?;
238    let result = apply_decoded_changes_to_oplog(oplog, changes);
239    if result.has_deps_before_shallow_root {
240        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
241    }
242
243    Ok(result.status)
244}
245
246pub(crate) fn decode_oplog_changes(
247    oplog: &mut OpLog,
248    parsed: ParsedHeaderAndBody,
249) -> Result<Vec<Change>, LoroError> {
250    let ParsedHeaderAndBody { mode, body, .. } = parsed;
251    match mode {
252        EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
253            Err(LoroError::ImportUnsupportedEncodingMode)
254        }
255        EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
256        EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
257        EncodeMode::Auto => unreachable!(),
258    }
259}
260
261pub(crate) struct ApplyDecodedChangesResult {
262    pub status: ImportStatus,
263    pub has_deps_before_shallow_root: bool,
264}
265
266pub(crate) fn apply_decoded_changes_to_oplog(
267    oplog: &mut OpLog,
268    changes: Vec<Change>,
269) -> ApplyDecodedChangesResult {
270    let ImportChangesResult {
271        mut imported,
272        latest_ids,
273        pending_changes,
274        changes_that_have_deps_before_shallow_root,
275    } = import_changes_to_oplog(changes, oplog);
276
277    let mut pending = VersionRange::default();
278    pending_changes.iter().for_each(|c| {
279        pending.extends_to_include_id_span(c.id_span());
280    });
281    // TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
282    oplog.try_apply_pending(latest_ids, Some(&mut imported));
283    oplog
284        .import_unknown_lamport_pending_changes(pending_changes)
285        .expect("importing unknown-lamport pending changes is infallible");
286    ApplyDecodedChangesResult {
287        status: ImportStatus {
288            success: imported,
289            pending: (!pending.is_empty()).then_some(pending),
290        },
291        has_deps_before_shallow_root: !changes_that_have_deps_before_shallow_root.is_empty(),
292    }
293}
294
295pub(crate) struct ParsedHeaderAndBody<'a> {
296    pub checksum: [u8; 16],
297    pub checksum_body: &'a [u8],
298    pub mode: EncodeMode,
299    pub body: &'a [u8],
300}
301
302const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
303impl ParsedHeaderAndBody<'_> {
304    /// Return if the checksum is correct.
305    fn check_checksum(&self) -> LoroResult<()> {
306        match self.mode {
307            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
308                if md5::compute(self.checksum_body).0 != self.checksum {
309                    return Err(LoroError::DecodeChecksumMismatchError);
310                }
311            }
312            EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
313                let mut expected_bytes = [0; 4];
314                expected_bytes.copy_from_slice(&self.checksum[12..16]);
315                let expected = u32::from_le_bytes(expected_bytes);
316                if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
317                    return Err(LoroError::DecodeChecksumMismatchError);
318                }
319            }
320            EncodeMode::Auto => {
321                return Err(LoroError::DecodeError(
322                    "Invalid import mode `Auto` in encoded blob"
323                        .to_string()
324                        .into_boxed_str(),
325                ));
326            }
327        }
328
329        Ok(())
330    }
331}
332
333const MIN_HEADER_SIZE: usize = 22;
334pub(crate) fn parse_header_and_body(
335    bytes: &[u8],
336    check_checksum: bool,
337) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
338    let reader = &bytes;
339    if bytes.len() < MIN_HEADER_SIZE {
340        return Err(LoroError::DecodeError("Invalid import data".into()));
341    }
342
343    let (magic_bytes, reader) = reader.split_at(4);
344    if magic_bytes != MAGIC_BYTES {
345        return Err(LoroError::DecodeError("Invalid magic bytes".into()));
346    }
347
348    let (checksum, reader) = reader.split_at(16);
349    let checksum_body = reader;
350    let (mode_bytes, reader) = reader.split_at(2);
351    let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
352    let mut checksum_arr = [0; 16];
353    checksum_arr.copy_from_slice(checksum);
354
355    let ans = ParsedHeaderAndBody {
356        mode,
357        checksum_body,
358        checksum: checksum_arr,
359        body: reader,
360    };
361
362    if check_checksum {
363        ans.check_checksum()?;
364    }
365    Ok(ans)
366}
367
368pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
369    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
370        fast_snapshot::encode_snapshot(doc, ans);
371        Ok(())
372    })
373    .unwrap()
374}
375
376pub(crate) fn export_snapshot_at(
377    doc: &LoroDoc,
378    frontiers: &Frontiers,
379) -> Result<Vec<u8>, LoroEncodeError> {
380    check_target_version_reachable(doc, frontiers)?;
381    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
382        shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
383    })
384}
385
386pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
387    encode_with(EncodeMode::FastUpdates, &mut |ans| {
388        fast_snapshot::encode_updates(doc, vv, ans);
389        Ok(())
390    })
391    .unwrap()
392}
393
394pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
395    encode_with(EncodeMode::FastUpdates, &mut |ans| {
396        fast_snapshot::encode_updates_in_range(oplog, spans, ans);
397        Ok(())
398    })
399    .unwrap()
400}
401
402pub(crate) fn export_shallow_snapshot(
403    doc: &LoroDoc,
404    f: &Frontiers,
405) -> Result<Vec<u8>, LoroEncodeError> {
406    check_target_version_reachable(doc, f)?;
407    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
408        shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
409        Ok(())
410    })
411}
412
413fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
414    let oplog = doc.oplog.lock();
415    if !oplog.dag.can_export_shallow_snapshot_on(f) {
416        return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
417    }
418
419    Ok(())
420}
421
422pub(crate) fn export_state_only_snapshot(
423    doc: &LoroDoc,
424    f: &Frontiers,
425) -> Result<Vec<u8>, LoroEncodeError> {
426    check_target_version_reachable(doc, f)?;
427    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
428        shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
429        Ok(())
430    })
431}
432
433fn encode_with(
434    mode: EncodeMode,
435    f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
436) -> Result<Vec<u8>, LoroEncodeError> {
437    // HEADER
438    let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
439    ans.extend(MAGIC_BYTES);
440    let checksum = [0; 16];
441    ans.extend(checksum);
442    ans.extend(mode.to_bytes());
443
444    // BODY
445    f(&mut ans)?;
446
447    // CHECKSUM in HEADER
448    let checksum_body = &ans[20..];
449    let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
450    ans[16..20].copy_from_slice(&checksum.to_le_bytes());
451    Ok(ans)
452}
453
454pub(crate) fn decode_snapshot(
455    doc: &LoroDoc,
456    mode: EncodeMode,
457    body: &[u8],
458    origin: InternalString,
459) -> Result<ImportStatus, LoroError> {
460    match mode {
461        EncodeMode::OutdatedSnapshot => {
462            return Err(LoroError::ImportUnsupportedEncodingMode);
463        }
464        EncodeMode::FastSnapshot => {
465            fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
466        }
467        _ => {
468            return Err(LoroError::DecodeError(
469                format!("Invalid snapshot encoding mode: {mode:?}").into_boxed_str(),
470            ));
471        }
472    };
473    Ok(ImportStatus {
474        success: VersionRange::from_vv(&doc.oplog_vv()),
475        pending: None,
476    })
477}
478
/// The kind of content an encoded blob carries, as reported by its metadata.
///
/// The variant order is significant: it defines the derived `Ord`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    Snapshot,
    OutdatedSnapshot,
    ShallowSnapshot,
    OutdatedRle,
    Updates,
}

impl std::fmt::Display for EncodedBlobMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            EncodedBlobMode::Snapshot => "snapshot",
            EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
            EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
            EncodedBlobMode::OutdatedRle => "outdated-update",
            EncodedBlobMode::Updates => "update",
        };
        f.write_str(label)
    }
}

impl EncodedBlobMode {
    /// `true` for every snapshot flavour (full, shallow, outdated); `false` for updates.
    pub fn is_snapshot(&self) -> bool {
        match self {
            EncodedBlobMode::Snapshot
            | EncodedBlobMode::ShallowSnapshot
            | EncodedBlobMode::OutdatedSnapshot => true,
            EncodedBlobMode::OutdatedRle | EncodedBlobMode::Updates => false,
        }
    }
}
510
511#[derive(Debug, Clone)]
512pub struct ImportBlobMetadata {
513    /// The partial start version vector.
514    ///
515    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
516    /// However, it does not constitute a complete version vector, as it only contains counters
517    /// from peers included within the import blob.
518    pub partial_start_vv: VersionVector,
519    /// The partial end version vector.
520    ///
521    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
522    /// However, it does not constitute a complete version vector, as it only contains counters
523    /// from peers included within the import blob.
524    pub partial_end_vv: VersionVector,
525    pub start_timestamp: i64,
526    pub start_frontiers: Frontiers,
527    pub end_timestamp: i64,
528    pub change_num: u32,
529    pub mode: EncodedBlobMode,
530}
531
532impl LoroDoc {
533    /// Decodes the metadata for an imported blob from the provided bytes.
534    pub fn decode_import_blob_meta(
535        blob: &[u8],
536        check_checksum: bool,
537    ) -> LoroResult<ImportBlobMetadata> {
538        let parsed = parse_header_and_body(blob, check_checksum)?;
539        match parsed.mode {
540            EncodeMode::Auto => unreachable!(),
541            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
542                Err(LoroError::ImportUnsupportedEncodingMode)
543            }
544            EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
545            EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
546        }
547    }
548}
549
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the compactness of the postcard encoding of `LoroValue`: each sample value must
    /// serialize to at most its byte budget.
    #[test]
    fn test_value_encode_size() {
        let cases: Vec<(LoroValue, usize)> = vec![
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}
587            LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
588            5,
589        );
590    }
591}