Skip to main content

loro_internal/
encoding.rs

1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9    decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::version::{Frontiers, VersionRange};
15use crate::LoroDoc;
16use crate::{oplog::OpLog, LoroError, VersionVector};
17use loro_common::{HasIdSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, ID};
18use num_traits::{FromPrimitive, ToPrimitive};
19use std::borrow::Cow;
20
/// The mode of the export.
///
/// Loro CRDT internally consists of two parts: document history and current document state.
/// The export modes offer various options to meet different requirements.
///
/// - CRDT property: Documents maintain consistent states when they receive the same set of updates.
/// - In real-time collaboration, peers typically only need to synchronize updates
///   (operations/history) to achieve consistency.
///
/// ## Update Export
///
/// - Exports only the history part, containing multiple operations.
/// - Suitable for real-time collaboration scenarios where peers only need to synchronize updates.
///
/// ## Snapshot Export
///
/// ### Default Snapshot
///
/// - Includes complete history and current full state.
///
/// ### Shallow Snapshot
///
/// - Contains the complete current state.
/// - Retains partial history starting from a specified version.
///
/// ### State-only Snapshot
///
/// - Exports the state of the target version.
/// - Includes a minimal set of operation history.
///
/// ### Snapshot At
///
/// - Contains the full history up to the target frontiers and the state at those frontiers.
///
/// Every variant that carries data wraps it in a [`Cow`], so a mode can either borrow
/// caller-owned data for `'a` or own it outright.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum ExportMode<'a> {
    /// It contains the full history and the current state of the document.
    Snapshot,
    /// It contains the history since the `from` version vector.
    Updates { from: Cow<'a, VersionVector> },
    /// This mode exports the history in the specified range.
    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
    /// The shallow snapshot only contains the history since the target frontiers.
    ShallowSnapshot(Cow<'a, Frontiers>),
    /// The state only snapshot exports the state of the target version
    /// with a minimal set of history (a few ops).
    ///
    /// It's a shallow snapshot with depth=1 at the target version.
    /// If the target version is None, it will use the latest version as the target version.
    StateOnly(Option<Cow<'a, Frontiers>>),
    /// The snapshot at the specified frontiers. It contains the full history
    /// till the target frontiers and the state at the target frontiers.
    SnapshotAt { version: Cow<'a, Frontiers> },
}
71
72impl<'a> ExportMode<'a> {
73    /// It contains the full history and the current state of the document.
74    pub fn snapshot() -> Self {
75        ExportMode::Snapshot
76    }
77
78    /// It contains the history since the `from` version vector.
79    pub fn updates(from: &'a VersionVector) -> Self {
80        ExportMode::Updates {
81            from: Cow::Borrowed(from),
82        }
83    }
84
85    /// It contains the history since the `from` version vector.
86    pub fn updates_owned(from: VersionVector) -> Self {
87        ExportMode::Updates {
88            from: Cow::Owned(from),
89        }
90    }
91
92    /// It contains all the history of the document.
93    pub fn all_updates() -> Self {
94        ExportMode::Updates {
95            from: Cow::Owned(Default::default()),
96        }
97    }
98
99    /// This mode exports the history in the specified range.
100    pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
101        ExportMode::UpdatesInRange {
102            spans: spans.into(),
103        }
104    }
105
106    /// The shallow snapshot only contains the history since the target frontiers.
107    pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
108        ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
109    }
110
111    /// The shallow snapshot only contains the history since the target frontiers.
112    pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
113        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
114    }
115
116    /// The shallow snapshot only contains the history since the target frontiers.
117    pub fn shallow_snapshot_since(id: ID) -> Self {
118        let frontiers = Frontiers::from_id(id);
119        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
120    }
121
122    /// The state only snapshot exports the state of the target version
123    /// with a minimal set of history (a few ops).
124    ///
125    /// It's a shallow snapshot with depth=1 at the target version.
126    /// If the target version is None, it will use the latest version as the target version.
127    pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
128        ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
129    }
130
131    /// The snapshot at the specified frontiers. It contains the full history
132    /// till the target frontiers and the state at the target frontiers.
133    pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
134        ExportMode::SnapshotAt {
135            version: Cow::Borrowed(frontiers),
136        }
137    }
138
139    /// This mode exports the history within the specified version vector.
140    pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
141        let mut spans = Vec::with_capacity(vv.len());
142        for (peer, counter) in vv.iter() {
143            if *counter > 0 {
144                spans.push(IdSpan::new(*peer, 0, *counter));
145            }
146        }
147
148        ExportMode::UpdatesInRange {
149            spans: Cow::Owned(spans),
150        }
151    }
152}
153
/// Leading bytes of every encoded blob: the ASCII string `loro`.
const MAGIC_BYTES: [u8; 4] = [b'l', b'o', b'r', b'o'];
155
/// Encoding format tag written as a big-endian `u16` right after the checksum field
/// of every blob header.
///
/// The explicit discriminants are the on-the-wire values and must not change.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Outdated run-length-encoded updates format.
    OutdatedRle = 1,
    /// Outdated snapshot format.
    OutdatedSnapshot = 2,
    /// Current snapshot format.
    FastSnapshot = 3,
    /// Current updates format.
    FastUpdates = 4,
    // This is a config option, it won't be used in encoding.
    Auto = 255,
}
165
166impl num_traits::FromPrimitive for EncodeMode {
167    #[allow(trivial_numeric_casts)]
168    #[inline]
169    fn from_i64(n: i64) -> Option<Self> {
170        match n {
171            n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
172            n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
173            n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
174            n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
175            n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
176            _ => None,
177        }
178    }
179    #[inline]
180    fn from_u64(n: u64) -> Option<Self> {
181        Self::from_i64(n as i64)
182    }
183}
184
185impl num_traits::ToPrimitive for EncodeMode {
186    #[inline]
187    #[allow(trivial_numeric_casts)]
188    fn to_i64(&self) -> Option<i64> {
189        Some(match *self {
190            EncodeMode::Auto => EncodeMode::Auto as i64,
191            EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
192            EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
193            EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
194            EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
195        })
196    }
197    #[inline]
198    fn to_u64(&self) -> Option<u64> {
199        self.to_i64().map(|x| x as u64)
200    }
201}
202
203impl EncodeMode {
204    pub fn to_bytes(self) -> [u8; 2] {
205        let value = self.to_u16().unwrap();
206        value.to_be_bytes()
207    }
208
209    pub fn is_snapshot(self) -> bool {
210        matches!(
211            self,
212            EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
213        )
214    }
215}
216
217impl TryFrom<[u8; 2]> for EncodeMode {
218    type Error = LoroError;
219
220    fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
221        let value = u16::from_be_bytes(value);
222        Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
223    }
224}
225
/// Outcome of importing an encoded blob.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ImportStatus {
    /// Version range of the changes that were imported.
    ///
    /// For a snapshot import this is the document's entire oplog version vector
    /// after the import.
    pub success: VersionRange,
    /// Version range covering the changes that were decoded but reported as pending,
    /// or `None` when there are none. NOTE(review): presumably these are changes whose
    /// dependencies are not yet known to the document — confirm.
    pub pending: Option<VersionRange>,
}
231
232pub(crate) fn decode_oplog(
233    oplog: &mut OpLog,
234    parsed: ParsedHeaderAndBody,
235) -> Result<ImportStatus, LoroError> {
236    let ParsedHeaderAndBody { mode, body, .. } = parsed;
237    let changes = match mode {
238        EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
239            return Err(LoroError::ImportUnsupportedEncodingMode);
240        }
241        EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
242        EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
243        EncodeMode::Auto => unreachable!(),
244    }?;
245    let ImportChangesResult {
246        mut imported,
247        latest_ids,
248        pending_changes,
249        changes_that_have_deps_before_shallow_root,
250    } = import_changes_to_oplog(changes, oplog);
251
252    let mut pending = VersionRange::default();
253    pending_changes.iter().for_each(|c| {
254        pending.extends_to_include_id_span(c.id_span());
255    });
256    // TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
257    oplog.try_apply_pending(latest_ids, Some(&mut imported));
258    oplog.import_unknown_lamport_pending_changes(pending_changes)?;
259    if !changes_that_have_deps_before_shallow_root.is_empty() {
260        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
261    }
262    Ok(ImportStatus {
263        success: imported,
264        pending: (!pending.is_empty()).then_some(pending),
265    })
266}
267
/// An encoded blob split into its header fields and its body.
///
/// Produced by `parse_header_and_body`. The blob layout is
/// `[magic: 4][checksum: 16][mode: 2][body ..]`.
pub(crate) struct ParsedHeaderAndBody<'a> {
    /// The raw 16-byte checksum field from the header.
    ///
    /// The outdated modes compare all 16 bytes against an MD5 digest. The fast modes only use
    /// the last 4 bytes, read as a little-endian xxh32 value.
    pub checksum: [u8; 16],
    /// The bytes covered by the checksum: everything after the checksum field, mode tag included.
    pub checksum_body: &'a [u8],
    /// The encoding mode decoded from the 2-byte mode tag.
    pub mode: EncodeMode,
    /// The payload that follows the mode tag.
    pub body: &'a [u8],
}
274
/// Seed for the xxh32 checksum of the fast formats: ASCII `LORO` read as a little-endian `u32`.
const XXH_SEED: u32 = u32::from_le_bytes([b'L', b'O', b'R', b'O']);
276impl ParsedHeaderAndBody<'_> {
277    /// Return if the checksum is correct.
278    fn check_checksum(&self) -> LoroResult<()> {
279        match self.mode {
280            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
281                if md5::compute(self.checksum_body).0 != self.checksum {
282                    return Err(LoroError::DecodeChecksumMismatchError);
283                }
284            }
285            EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
286                let mut expected_bytes = [0; 4];
287                expected_bytes.copy_from_slice(&self.checksum[12..16]);
288                let expected = u32::from_le_bytes(expected_bytes);
289                if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
290                    return Err(LoroError::DecodeChecksumMismatchError);
291                }
292            }
293            EncodeMode::Auto => {
294                return Err(LoroError::DecodeError(
295                    "Invalid import mode `Auto` in encoded blob"
296                        .to_string()
297                        .into_boxed_str(),
298                ));
299            }
300        }
301
302        Ok(())
303    }
304}
305
/// Size of the fixed blob header: magic (4) + checksum field (16) + mode tag (2).
const MIN_HEADER_SIZE: usize = 4 + 16 + 2;
307pub(crate) fn parse_header_and_body(
308    bytes: &[u8],
309    check_checksum: bool,
310) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
311    let reader = &bytes;
312    if bytes.len() < MIN_HEADER_SIZE {
313        return Err(LoroError::DecodeError("Invalid import data".into()));
314    }
315
316    let (magic_bytes, reader) = reader.split_at(4);
317    if magic_bytes != MAGIC_BYTES {
318        return Err(LoroError::DecodeError("Invalid magic bytes".into()));
319    }
320
321    let (checksum, reader) = reader.split_at(16);
322    let checksum_body = reader;
323    let (mode_bytes, reader) = reader.split_at(2);
324    let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
325    let mut checksum_arr = [0; 16];
326    checksum_arr.copy_from_slice(checksum);
327
328    let ans = ParsedHeaderAndBody {
329        mode,
330        checksum_body,
331        checksum: checksum_arr,
332        body: reader,
333    };
334
335    if check_checksum {
336        ans.check_checksum()?;
337    }
338    Ok(ans)
339}
340
341pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
342    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
343        fast_snapshot::encode_snapshot(doc, ans);
344        Ok(())
345    })
346    .unwrap()
347}
348
349pub(crate) fn export_snapshot_at(
350    doc: &LoroDoc,
351    frontiers: &Frontiers,
352) -> Result<Vec<u8>, LoroEncodeError> {
353    check_target_version_reachable(doc, frontiers)?;
354    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
355        shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
356    })
357}
358
359pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
360    encode_with(EncodeMode::FastUpdates, &mut |ans| {
361        fast_snapshot::encode_updates(doc, vv, ans);
362        Ok(())
363    })
364    .unwrap()
365}
366
367pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
368    encode_with(EncodeMode::FastUpdates, &mut |ans| {
369        fast_snapshot::encode_updates_in_range(oplog, spans, ans);
370        Ok(())
371    })
372    .unwrap()
373}
374
375pub(crate) fn export_shallow_snapshot(
376    doc: &LoroDoc,
377    f: &Frontiers,
378) -> Result<Vec<u8>, LoroEncodeError> {
379    check_target_version_reachable(doc, f)?;
380    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
381        shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
382        Ok(())
383    })
384}
385
386fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
387    let oplog = doc.oplog.lock();
388    if !oplog.dag.can_export_shallow_snapshot_on(f) {
389        return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
390    }
391
392    Ok(())
393}
394
395pub(crate) fn export_state_only_snapshot(
396    doc: &LoroDoc,
397    f: &Frontiers,
398) -> Result<Vec<u8>, LoroEncodeError> {
399    check_target_version_reachable(doc, f)?;
400    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
401        shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
402        Ok(())
403    })
404}
405
406fn encode_with(
407    mode: EncodeMode,
408    f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
409) -> Result<Vec<u8>, LoroEncodeError> {
410    // HEADER
411    let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
412    ans.extend(MAGIC_BYTES);
413    let checksum = [0; 16];
414    ans.extend(checksum);
415    ans.extend(mode.to_bytes());
416
417    // BODY
418    f(&mut ans)?;
419
420    // CHECKSUM in HEADER
421    let checksum_body = &ans[20..];
422    let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
423    ans[16..20].copy_from_slice(&checksum.to_le_bytes());
424    Ok(ans)
425}
426
427pub(crate) fn decode_snapshot(
428    doc: &LoroDoc,
429    mode: EncodeMode,
430    body: &[u8],
431    origin: InternalString,
432) -> Result<ImportStatus, LoroError> {
433    match mode {
434        EncodeMode::OutdatedSnapshot => {
435            return Err(LoroError::ImportUnsupportedEncodingMode);
436        }
437        EncodeMode::FastSnapshot => {
438            fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
439        }
440        _ => {
441            return Err(LoroError::DecodeError(
442                format!("Invalid snapshot encoding mode: {mode:?}").into_boxed_str(),
443            ));
444        }
445    };
446    Ok(ImportStatus {
447        success: VersionRange::from_vv(&doc.oplog_vv()),
448        pending: None,
449    })
450}
451
/// Kind of an encoded blob, as reported in `ImportBlobMetadata::mode`.
///
/// NOTE: the derived `PartialOrd`/`Ord` follow the variant declaration order,
/// so reordering the variants changes how modes compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    /// A snapshot in the current format (displayed as `snapshot`).
    Snapshot,
    /// A snapshot in an outdated format (displayed as `outdated-snapshot`).
    OutdatedSnapshot,
    /// A shallow snapshot that keeps only part of the history (displayed as `shallow-snapshot`).
    ShallowSnapshot,
    /// Updates in the outdated run-length-encoded format (displayed as `outdated-update`).
    OutdatedRle,
    /// Updates in the current format (displayed as `update`).
    Updates,
}
460
461impl std::fmt::Display for EncodedBlobMode {
462    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
463        f.write_str(match self {
464            EncodedBlobMode::OutdatedRle => "outdated-update",
465            EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
466            EncodedBlobMode::Snapshot => "snapshot",
467            EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
468            EncodedBlobMode::Updates => "update",
469        })
470    }
471}
472
473impl EncodedBlobMode {
474    pub fn is_snapshot(&self) -> bool {
475        matches!(
476            self,
477            EncodedBlobMode::Snapshot
478                | EncodedBlobMode::ShallowSnapshot
479                | EncodedBlobMode::OutdatedSnapshot
480        )
481    }
482}
483
/// Metadata read from an encoded blob without importing it
/// (see `LoroDoc::decode_import_blob_meta`).
#[derive(Debug, Clone)]
pub struct ImportBlobMetadata {
    /// The partial start version vector.
    ///
    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
    /// However, it does not constitute a complete version vector, as it only contains counters
    /// from peers included within the import blob.
    pub partial_start_vv: VersionVector,
    /// The partial end version vector.
    ///
    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
    /// However, it does not constitute a complete version vector, as it only contains counters
    /// from peers included within the import blob.
    pub partial_end_vv: VersionVector,
    /// Start timestamp of the blob. NOTE(review): presumably the timestamp of its
    /// earliest change; filled in by the `fast_snapshot` metadata decoders — confirm there.
    pub start_timestamp: i64,
    /// Frontiers the blob starts from. NOTE(review): presumably the dependencies of
    /// its earliest changes — confirm in `fast_snapshot`.
    pub start_frontiers: Frontiers,
    /// End timestamp of the blob. NOTE(review): presumably the timestamp of its
    /// latest change — confirm in `fast_snapshot`.
    pub end_timestamp: i64,
    /// Number of changes in the blob (by name; confirm in `fast_snapshot`).
    pub change_num: u32,
    /// Kind of blob (snapshot, shallow snapshot, updates, ...).
    pub mode: EncodedBlobMode,
}
504
505impl LoroDoc {
506    /// Decodes the metadata for an imported blob from the provided bytes.
507    pub fn decode_import_blob_meta(
508        blob: &[u8],
509        check_checksum: bool,
510    ) -> LoroResult<ImportBlobMetadata> {
511        let parsed = parse_header_and_body(blob, check_checksum)?;
512        match parsed.mode {
513            EncodeMode::Auto => unreachable!(),
514            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
515                Err(LoroError::ImportUnsupportedEncodingMode)
516            }
517            EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
518            EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
519        }
520    }
521}
522
#[cfg(test)]
mod test {
    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the postcard-encoded size of common `LoroValue`s against regressions.
    #[test]
    fn test_value_encode_size() {
        // (value, maximum allowed encoded size in bytes)
        let cases: Vec<(LoroValue, usize)> = vec![
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}