loro_internal/
encoding.rs

1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9    decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::version::{Frontiers, VersionRange};
15use crate::LoroDoc;
16use crate::{oplog::OpLog, LoroError, VersionVector};
17use loro_common::{HasIdSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, ID};
18use num_traits::{FromPrimitive, ToPrimitive};
19use std::borrow::Cow;
20
/// The mode of the export.
///
/// Loro CRDT internally consists of two parts: document history and current document state.
/// The export modes offer various options to meet different requirements.
///
/// - CRDT property: Documents maintain consistent states when they receive the same set of updates.
/// - In real-time collaboration, peers typically only need to synchronize updates
///   (operations/history) to achieve consistency.
///
/// Every payload is held in a [`Cow`], so a mode can either borrow the caller's
/// version data for `'a` or own it.
///
/// ## Update Export
///
/// - Exports only the history part, containing multiple operations.
/// - Suitable for real-time collaboration scenarios where peers only need to synchronize updates.
///
/// ## Snapshot Export
///
/// ### Default Snapshot
///
/// - Includes complete history and current full state.
///
/// ### Shallow Snapshot
///
/// - Contains the complete current state.
/// - Retains partial history starting from a specified version.
///
/// ### State-only Snapshot
///
/// - Exports the state of the target version.
/// - Includes a minimal set of operation history.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum ExportMode<'a> {
    /// It contains the full history and the current state of the document.
    Snapshot,
    /// It contains the history since the `from` version vector.
    Updates { from: Cow<'a, VersionVector> },
    /// This mode exports the history in the specified range.
    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
    /// The shallow snapshot only contains the history since the target frontiers.
    ShallowSnapshot(Cow<'a, Frontiers>),
    /// The state-only snapshot exports the state of the target version
    /// with a minimal set of history (a few ops).
    ///
    /// It's a shallow snapshot with depth=1 at the target version.
    /// If the target version is `None`, it will use the latest version as the target version.
    StateOnly(Option<Cow<'a, Frontiers>>),
    /// The snapshot at the specified frontiers. It contains the full history
    /// till the target frontiers and the state at the target frontiers.
    SnapshotAt { version: Cow<'a, Frontiers> },
}
71
72impl<'a> ExportMode<'a> {
73    /// It contains the full history and the current state of the document.
74    pub fn snapshot() -> Self {
75        ExportMode::Snapshot
76    }
77
78    /// It contains the history since the `from` version vector.
79    pub fn updates(from: &'a VersionVector) -> Self {
80        ExportMode::Updates {
81            from: Cow::Borrowed(from),
82        }
83    }
84
85    /// It contains the history since the `from` version vector.
86    pub fn updates_owned(from: VersionVector) -> Self {
87        ExportMode::Updates {
88            from: Cow::Owned(from),
89        }
90    }
91
92    /// It contains all the history of the document.
93    pub fn all_updates() -> Self {
94        ExportMode::Updates {
95            from: Cow::Owned(Default::default()),
96        }
97    }
98
99    /// This mode exports the history in the specified range.
100    pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
101        ExportMode::UpdatesInRange {
102            spans: spans.into(),
103        }
104    }
105
106    /// The shallow snapshot only contains the history since the target frontiers.
107    pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
108        ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
109    }
110
111    /// The shallow snapshot only contains the history since the target frontiers.
112    pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
113        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
114    }
115
116    /// The shallow snapshot only contains the history since the target frontiers.
117    pub fn shallow_snapshot_since(id: ID) -> Self {
118        let frontiers = Frontiers::from_id(id);
119        ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
120    }
121
122    /// The state only snapshot exports the state of the target version
123    /// with a minimal set of history (a few ops).
124    ///
125    /// It's a shallow snapshot with depth=1 at the target version.
126    /// If the target version is None, it will use the latest version as the target version.
127    pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
128        ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
129    }
130
131    /// The snapshot at the specified frontiers. It contains the full history
132    /// till the target frontiers and the state at the target frontiers.
133    pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
134        ExportMode::SnapshotAt {
135            version: Cow::Borrowed(frontiers),
136        }
137    }
138
139    /// This mode exports the history within the specified version vector.
140    pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
141        let mut spans = Vec::with_capacity(vv.len());
142        for (peer, counter) in vv.iter() {
143            if *counter > 0 {
144                spans.push(IdSpan::new(*peer, 0, *counter));
145            }
146        }
147
148        ExportMode::UpdatesInRange {
149            spans: Cow::Owned(spans),
150        }
151    }
152}
153
/// Four-byte magic prefix of an encoded blob: `encode_with` writes it first and
/// `parse_header_and_body` rejects input that does not start with it.
const MAGIC_BYTES: [u8; 4] = *b"loro";
155
/// Encoding mode tag stored in the blob header.
///
/// The explicit discriminants are the values that get serialized, so they must
/// stay stable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    OutdatedRle = 1,
    OutdatedSnapshot = 2,
    FastSnapshot = 3,
    FastUpdates = 4,
    /// This is a config option, it won't be used in encoding.
    Auto = 255,
}
165
166impl num_traits::FromPrimitive for EncodeMode {
167    #[allow(trivial_numeric_casts)]
168    #[inline]
169    fn from_i64(n: i64) -> Option<Self> {
170        match n {
171            n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
172            n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
173            n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
174            n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
175            n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
176            _ => None,
177        }
178    }
179    #[inline]
180    fn from_u64(n: u64) -> Option<Self> {
181        Self::from_i64(n as i64)
182    }
183}
184
185impl num_traits::ToPrimitive for EncodeMode {
186    #[inline]
187    #[allow(trivial_numeric_casts)]
188    fn to_i64(&self) -> Option<i64> {
189        Some(match *self {
190            EncodeMode::Auto => EncodeMode::Auto as i64,
191            EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
192            EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
193            EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
194            EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
195        })
196    }
197    #[inline]
198    fn to_u64(&self) -> Option<u64> {
199        self.to_i64().map(|x| x as u64)
200    }
201}
202
203impl EncodeMode {
204    pub fn to_bytes(self) -> [u8; 2] {
205        let value = self.to_u16().unwrap();
206        value.to_be_bytes()
207    }
208
209    pub fn is_snapshot(self) -> bool {
210        matches!(
211            self,
212            EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
213        )
214    }
215}
216
217impl TryFrom<[u8; 2]> for EncodeMode {
218    type Error = LoroError;
219
220    fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
221        let value = u16::from_be_bytes(value);
222        Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
223    }
224}
225
/// Outcome of an import, as returned by `decode_oplog` and `decode_snapshot`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ImportStatus {
    /// Version range reported as imported. `decode_snapshot` fills it from the
    /// document's oplog version vector after decoding.
    pub success: VersionRange,
    /// Range covering the id spans of the changes left pending by
    /// `decode_oplog`, or `None` when that range is empty.
    pub pending: Option<VersionRange>,
}
231
232pub(crate) fn decode_oplog(
233    oplog: &mut OpLog,
234    parsed: ParsedHeaderAndBody,
235) -> Result<ImportStatus, LoroError> {
236    let ParsedHeaderAndBody { mode, body, .. } = parsed;
237    let changes = match mode {
238        EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
239            return Err(LoroError::ImportUnsupportedEncodingMode);
240        }
241        EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
242        EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
243        EncodeMode::Auto => unreachable!(),
244    }?;
245    let ImportChangesResult {
246        mut imported,
247        latest_ids,
248        pending_changes,
249        changes_that_have_deps_before_shallow_root,
250    } = import_changes_to_oplog(changes, oplog);
251
252    let mut pending = VersionRange::default();
253    pending_changes.iter().for_each(|c| {
254        pending.extends_to_include_id_span(c.id_span());
255    });
256    // TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
257    oplog.try_apply_pending(latest_ids, Some(&mut imported));
258    oplog.import_unknown_lamport_pending_changes(pending_changes)?;
259    if !changes_that_have_deps_before_shallow_root.is_empty() {
260        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
261    }
262    Ok(ImportStatus {
263        success: imported,
264        pending: (!pending.is_empty()).then_some(pending),
265    })
266}
267
/// Borrowed view of an encoded blob after its header has been split off by
/// `parse_header_and_body`.
pub(crate) struct ParsedHeaderAndBody<'a> {
    /// The 16 checksum bytes that follow the magic bytes.
    pub checksum: [u8; 16],
    /// Everything after the checksum field (mode bytes followed by the body);
    /// this is the data the checksum is computed over.
    pub checksum_body: &'a [u8],
    /// Encoding mode decoded from the two bytes after the checksum field.
    pub mode: EncodeMode,
    /// The bytes that follow the mode bytes.
    pub body: &'a [u8],
}
274
/// Seed passed to xxh32 when computing and verifying blob checksums: the bytes
/// `b"LORO"` read as a little-endian `u32`.
const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
276impl ParsedHeaderAndBody<'_> {
277    /// Return if the checksum is correct.
278    fn check_checksum(&self) -> LoroResult<()> {
279        match self.mode {
280            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
281                if md5::compute(self.checksum_body).0 != self.checksum {
282                    return Err(LoroError::DecodeChecksumMismatchError);
283                }
284            }
285            EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
286                let expected = u32::from_le_bytes(self.checksum[12..16].try_into().unwrap());
287                if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
288                    return Err(LoroError::DecodeChecksumMismatchError);
289                }
290            }
291            EncodeMode::Auto => unreachable!(),
292        }
293
294        Ok(())
295    }
296}
297
/// Smallest possible blob: magic bytes (4) + checksum field (16) + mode bytes (2).
const MIN_HEADER_SIZE: usize = 4 + 16 + 2;
299pub(crate) fn parse_header_and_body(
300    bytes: &[u8],
301    check_checksum: bool,
302) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
303    let reader = &bytes;
304    if bytes.len() < MIN_HEADER_SIZE {
305        return Err(LoroError::DecodeError("Invalid import data".into()));
306    }
307
308    let (magic_bytes, reader) = reader.split_at(4);
309    let magic_bytes: [u8; 4] = magic_bytes.try_into().unwrap();
310    if magic_bytes != MAGIC_BYTES {
311        return Err(LoroError::DecodeError("Invalid magic bytes".into()));
312    }
313
314    let (checksum, reader) = reader.split_at(16);
315    let checksum_body = reader;
316    let (mode_bytes, reader) = reader.split_at(2);
317    let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
318
319    let ans = ParsedHeaderAndBody {
320        mode,
321        checksum_body,
322        checksum: checksum.try_into().unwrap(),
323        body: reader,
324    };
325
326    if check_checksum {
327        ans.check_checksum()?;
328    }
329    Ok(ans)
330}
331
332pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
333    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
334        fast_snapshot::encode_snapshot(doc, ans);
335        Ok(())
336    })
337    .unwrap()
338}
339
340pub(crate) fn export_snapshot_at(
341    doc: &LoroDoc,
342    frontiers: &Frontiers,
343) -> Result<Vec<u8>, LoroEncodeError> {
344    check_target_version_reachable(doc, frontiers)?;
345    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
346        shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
347    })
348}
349
350pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
351    encode_with(EncodeMode::FastUpdates, &mut |ans| {
352        fast_snapshot::encode_updates(doc, vv, ans);
353        Ok(())
354    })
355    .unwrap()
356}
357
358pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
359    encode_with(EncodeMode::FastUpdates, &mut |ans| {
360        fast_snapshot::encode_updates_in_range(oplog, spans, ans);
361        Ok(())
362    })
363    .unwrap()
364}
365
366pub(crate) fn export_shallow_snapshot(
367    doc: &LoroDoc,
368    f: &Frontiers,
369) -> Result<Vec<u8>, LoroEncodeError> {
370    check_target_version_reachable(doc, f)?;
371    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
372        shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
373        Ok(())
374    })
375}
376
377fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
378    let oplog = doc.oplog.lock().unwrap();
379    if !oplog.dag.can_export_shallow_snapshot_on(f) {
380        return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
381    }
382
383    Ok(())
384}
385
386pub(crate) fn export_state_only_snapshot(
387    doc: &LoroDoc,
388    f: &Frontiers,
389) -> Result<Vec<u8>, LoroEncodeError> {
390    check_target_version_reachable(doc, f)?;
391    encode_with(EncodeMode::FastSnapshot, &mut |ans| {
392        shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
393        Ok(())
394    })
395}
396
397fn encode_with(
398    mode: EncodeMode,
399    f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
400) -> Result<Vec<u8>, LoroEncodeError> {
401    // HEADER
402    let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
403    ans.extend(MAGIC_BYTES);
404    let checksum = [0; 16];
405    ans.extend(checksum);
406    ans.extend(mode.to_bytes());
407
408    // BODY
409    f(&mut ans)?;
410
411    // CHECKSUM in HEADER
412    let checksum_body = &ans[20..];
413    let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
414    ans[16..20].copy_from_slice(&checksum.to_le_bytes());
415    Ok(ans)
416}
417
418pub(crate) fn decode_snapshot(
419    doc: &LoroDoc,
420    mode: EncodeMode,
421    body: &[u8],
422    origin: InternalString,
423) -> Result<ImportStatus, LoroError> {
424    match mode {
425        EncodeMode::OutdatedSnapshot => {
426            return Err(LoroError::ImportUnsupportedEncodingMode);
427        }
428        EncodeMode::FastSnapshot => {
429            fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
430        }
431        _ => unreachable!(),
432    };
433    Ok(ImportStatus {
434        success: VersionRange::from_vv(&doc.oplog_vv()),
435        pending: None,
436    })
437}
438
/// Kind of an encoded blob, as exposed in [`ImportBlobMetadata`].
///
/// `PartialOrd`/`Ord` are derived, so the ordering between variants follows
/// their declaration order below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    /// Displayed as `"snapshot"`.
    Snapshot,
    /// Displayed as `"outdated-snapshot"`.
    OutdatedSnapshot,
    /// Displayed as `"shallow-snapshot"`.
    ShallowSnapshot,
    /// Displayed as `"outdated-update"`.
    OutdatedRle,
    /// Displayed as `"update"`.
    Updates,
}
447
448impl std::fmt::Display for EncodedBlobMode {
449    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450        f.write_str(match self {
451            EncodedBlobMode::OutdatedRle => "outdated-update",
452            EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
453            EncodedBlobMode::Snapshot => "snapshot",
454            EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
455            EncodedBlobMode::Updates => "update",
456        })
457    }
458}
459
460impl EncodedBlobMode {
461    pub fn is_snapshot(&self) -> bool {
462        matches!(
463            self,
464            EncodedBlobMode::Snapshot
465                | EncodedBlobMode::ShallowSnapshot
466                | EncodedBlobMode::OutdatedSnapshot
467        )
468    }
469}
470
/// Metadata describing an encoded blob, produced by
/// `LoroDoc::decode_import_blob_meta`.
#[derive(Debug, Clone)]
pub struct ImportBlobMetadata {
    /// The partial start version vector.
    ///
    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
    /// However, it does not constitute a complete version vector, as it only contains counters
    /// from peers included within the import blob.
    pub partial_start_vv: VersionVector,
    /// The partial end version vector.
    ///
    /// Import blob includes all the ops from `partial_start_vv` to `partial_end_vv`.
    /// However, it does not constitute a complete version vector, as it only contains counters
    /// from peers included within the import blob.
    pub partial_end_vv: VersionVector,
    // NOTE(review): presumably the timestamp of the earliest change in the blob;
    // confirm the exact meaning and unit against the blob-meta decoders.
    pub start_timestamp: i64,
    // NOTE(review): presumably the frontiers the blob's history starts from;
    // confirm against the blob-meta decoders.
    pub start_frontiers: Frontiers,
    // NOTE(review): presumably the timestamp of the latest change in the blob;
    // confirm the exact meaning and unit against the blob-meta decoders.
    pub end_timestamp: i64,
    // NOTE(review): presumably the number of changes contained in the blob; confirm.
    pub change_num: u32,
    /// The kind of the encoded blob.
    pub mode: EncodedBlobMode,
}
491
492impl LoroDoc {
493    /// Decodes the metadata for an imported blob from the provided bytes.
494    pub fn decode_import_blob_meta(
495        blob: &[u8],
496        check_checksum: bool,
497    ) -> LoroResult<ImportBlobMetadata> {
498        let parsed = parse_header_and_body(blob, check_checksum)?;
499        match parsed.mode {
500            EncodeMode::Auto => unreachable!(),
501            EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
502                Err(LoroError::ImportUnsupportedEncodingMode)
503            }
504            EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
505            EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
506        }
507    }
508}
509
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the postcard-encoded size of each `LoroValue` kind against an
    /// upper bound, so an accidental size regression is caught.
    #[test]
    fn test_value_encode_size() {
        // (value, maximum encoded size in bytes)
        let cases: Vec<(LoroValue, usize)> = vec![
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}