1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9 decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::change::Change;
15use crate::version::{Frontiers, VersionRange};
16use crate::LoroDoc;
17use crate::{oplog::OpLog, LoroError, VersionVector};
18use loro_common::{HasIdSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, ID};
19use num_traits::{FromPrimitive, ToPrimitive};
20use std::borrow::Cow;
21
22#[non_exhaustive]
52#[derive(Debug, Clone)]
53pub enum ExportMode<'a> {
54 Snapshot,
56 Updates { from: Cow<'a, VersionVector> },
58 UpdatesInRange { spans: Cow<'a, [IdSpan]> },
60 ShallowSnapshot(Cow<'a, Frontiers>),
62 StateOnly(Option<Cow<'a, Frontiers>>),
68 SnapshotAt { version: Cow<'a, Frontiers> },
71}
72
73impl<'a> ExportMode<'a> {
74 pub fn snapshot() -> Self {
76 ExportMode::Snapshot
77 }
78
79 pub fn updates(from: &'a VersionVector) -> Self {
81 ExportMode::Updates {
82 from: Cow::Borrowed(from),
83 }
84 }
85
86 pub fn updates_owned(from: VersionVector) -> Self {
88 ExportMode::Updates {
89 from: Cow::Owned(from),
90 }
91 }
92
93 pub fn all_updates() -> Self {
95 ExportMode::Updates {
96 from: Cow::Owned(Default::default()),
97 }
98 }
99
100 pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
102 ExportMode::UpdatesInRange {
103 spans: spans.into(),
104 }
105 }
106
107 pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
109 ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
110 }
111
112 pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
114 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
115 }
116
117 pub fn shallow_snapshot_since(id: ID) -> Self {
119 let frontiers = Frontiers::from_id(id);
120 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
121 }
122
123 pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
129 ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
130 }
131
132 pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
135 ExportMode::SnapshotAt {
136 version: Cow::Borrowed(frontiers),
137 }
138 }
139
140 pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
142 let mut spans = Vec::with_capacity(vv.len());
143 for (peer, counter) in vv.iter() {
144 if *counter > 0 {
145 spans.push(IdSpan::new(*peer, 0, *counter));
146 }
147 }
148
149 ExportMode::UpdatesInRange {
150 spans: Cow::Owned(spans),
151 }
152 }
153}
154
/// The four ASCII bytes `loro` that open every encoded blob.
///
/// Written first by `encode_with` and checked first by `parse_header_and_body`.
const MAGIC_BYTES: [u8; 4] = *b"loro";
156
/// The encoding mode tag stored in a blob header.
///
/// The explicit discriminants are the on-wire values: `to_bytes` writes them
/// as a big-endian `u16` and `TryFrom<[u8; 2]>` reads them back.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Placeholder mode; the parser rejects it when found in an encoded blob.
    Auto = 255,
    /// Outdated update encoding; importing it is reported as unsupported.
    OutdatedRle = 1,
    /// Outdated snapshot encoding; importing it is reported as unsupported.
    OutdatedSnapshot = 2,
    /// Current snapshot encoding.
    FastSnapshot = 3,
    /// Current updates encoding.
    FastUpdates = 4,
}
166
167impl num_traits::FromPrimitive for EncodeMode {
168 #[allow(trivial_numeric_casts)]
169 #[inline]
170 fn from_i64(n: i64) -> Option<Self> {
171 match n {
172 n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
173 n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
174 n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
175 n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
176 n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
177 _ => None,
178 }
179 }
180 #[inline]
181 fn from_u64(n: u64) -> Option<Self> {
182 Self::from_i64(n as i64)
183 }
184}
185
186impl num_traits::ToPrimitive for EncodeMode {
187 #[inline]
188 #[allow(trivial_numeric_casts)]
189 fn to_i64(&self) -> Option<i64> {
190 Some(match *self {
191 EncodeMode::Auto => EncodeMode::Auto as i64,
192 EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
193 EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
194 EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
195 EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
196 })
197 }
198 #[inline]
199 fn to_u64(&self) -> Option<u64> {
200 self.to_i64().map(|x| x as u64)
201 }
202}
203
204impl EncodeMode {
205 pub fn to_bytes(self) -> [u8; 2] {
206 let value = self.to_u16().unwrap();
207 value.to_be_bytes()
208 }
209
210 pub fn is_snapshot(self) -> bool {
211 matches!(
212 self,
213 EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
214 )
215 }
216}
217
218impl TryFrom<[u8; 2]> for EncodeMode {
219 type Error = LoroError;
220
221 fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
222 let value = u16::from_be_bytes(value);
223 Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
224 }
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Default)]
228pub struct ImportStatus {
229 pub success: VersionRange,
230 pub pending: Option<VersionRange>,
231}
232
233pub(crate) fn decode_oplog(
234 oplog: &mut OpLog,
235 parsed: ParsedHeaderAndBody,
236) -> Result<ImportStatus, LoroError> {
237 let changes = decode_oplog_changes(oplog, parsed)?;
238 let result = apply_decoded_changes_to_oplog(oplog, changes);
239 if result.has_deps_before_shallow_root {
240 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
241 }
242
243 Ok(result.status)
244}
245
246pub(crate) fn decode_oplog_changes(
247 oplog: &mut OpLog,
248 parsed: ParsedHeaderAndBody,
249) -> Result<Vec<Change>, LoroError> {
250 let ParsedHeaderAndBody { mode, body, .. } = parsed;
251 match mode {
252 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
253 Err(LoroError::ImportUnsupportedEncodingMode)
254 }
255 EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
256 EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
257 EncodeMode::Auto => unreachable!(),
258 }
259}
260
261pub(crate) struct ApplyDecodedChangesResult {
262 pub status: ImportStatus,
263 pub has_deps_before_shallow_root: bool,
264}
265
266pub(crate) fn apply_decoded_changes_to_oplog(
267 oplog: &mut OpLog,
268 changes: Vec<Change>,
269) -> ApplyDecodedChangesResult {
270 let ImportChangesResult {
271 mut imported,
272 latest_ids,
273 pending_changes,
274 changes_that_have_deps_before_shallow_root,
275 } = import_changes_to_oplog(changes, oplog);
276
277 let mut pending = VersionRange::default();
278 pending_changes.iter().for_each(|c| {
279 pending.extends_to_include_id_span(c.id_span());
280 });
281 oplog.try_apply_pending(latest_ids, Some(&mut imported));
283 oplog
284 .import_unknown_lamport_pending_changes(pending_changes)
285 .expect("importing unknown-lamport pending changes is infallible");
286 ApplyDecodedChangesResult {
287 status: ImportStatus {
288 success: imported,
289 pending: (!pending.is_empty()).then_some(pending),
290 },
291 has_deps_before_shallow_root: !changes_that_have_deps_before_shallow_root.is_empty(),
292 }
293}
294
295pub(crate) struct ParsedHeaderAndBody<'a> {
296 pub checksum: [u8; 16],
297 pub checksum_body: &'a [u8],
298 pub mode: EncodeMode,
299 pub body: &'a [u8],
300}
301
302const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
303impl ParsedHeaderAndBody<'_> {
304 fn check_checksum(&self) -> LoroResult<()> {
306 match self.mode {
307 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
308 if md5::compute(self.checksum_body).0 != self.checksum {
309 return Err(LoroError::DecodeChecksumMismatchError);
310 }
311 }
312 EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
313 let mut expected_bytes = [0; 4];
314 expected_bytes.copy_from_slice(&self.checksum[12..16]);
315 let expected = u32::from_le_bytes(expected_bytes);
316 if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
317 return Err(LoroError::DecodeChecksumMismatchError);
318 }
319 }
320 EncodeMode::Auto => {
321 return Err(LoroError::DecodeError(
322 "Invalid import mode `Auto` in encoded blob"
323 .to_string()
324 .into_boxed_str(),
325 ));
326 }
327 }
328
329 Ok(())
330 }
331}
332
333const MIN_HEADER_SIZE: usize = 22;
334pub(crate) fn parse_header_and_body(
335 bytes: &[u8],
336 check_checksum: bool,
337) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
338 let reader = &bytes;
339 if bytes.len() < MIN_HEADER_SIZE {
340 return Err(LoroError::DecodeError("Invalid import data".into()));
341 }
342
343 let (magic_bytes, reader) = reader.split_at(4);
344 if magic_bytes != MAGIC_BYTES {
345 return Err(LoroError::DecodeError("Invalid magic bytes".into()));
346 }
347
348 let (checksum, reader) = reader.split_at(16);
349 let checksum_body = reader;
350 let (mode_bytes, reader) = reader.split_at(2);
351 let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
352 if mode == EncodeMode::Auto {
353 return Err(LoroError::DecodeError(
354 "Invalid import mode `Auto` in encoded blob"
355 .to_string()
356 .into_boxed_str(),
357 ));
358 }
359 let mut checksum_arr = [0; 16];
360 checksum_arr.copy_from_slice(checksum);
361
362 let ans = ParsedHeaderAndBody {
363 mode,
364 checksum_body,
365 checksum: checksum_arr,
366 body: reader,
367 };
368
369 if check_checksum {
370 ans.check_checksum()?;
371 }
372 Ok(ans)
373}
374
375pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
376 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
377 fast_snapshot::encode_snapshot(doc, ans);
378 Ok(())
379 })
380 .unwrap()
381}
382
383pub(crate) fn export_snapshot_at(
384 doc: &LoroDoc,
385 frontiers: &Frontiers,
386) -> Result<Vec<u8>, LoroEncodeError> {
387 check_target_version_reachable(doc, frontiers)?;
388 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
389 shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
390 })
391}
392
393pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
394 encode_with(EncodeMode::FastUpdates, &mut |ans| {
395 fast_snapshot::encode_updates(doc, vv, ans);
396 Ok(())
397 })
398 .unwrap()
399}
400
401pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
402 encode_with(EncodeMode::FastUpdates, &mut |ans| {
403 fast_snapshot::encode_updates_in_range(oplog, spans, ans);
404 Ok(())
405 })
406 .unwrap()
407}
408
409pub(crate) fn export_shallow_snapshot(
410 doc: &LoroDoc,
411 f: &Frontiers,
412) -> Result<Vec<u8>, LoroEncodeError> {
413 check_target_version_reachable(doc, f)?;
414 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
415 shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
416 Ok(())
417 })
418}
419
420fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
421 let oplog = doc.oplog.lock();
422 if !oplog.dag.can_export_shallow_snapshot_on(f) {
423 return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
424 }
425
426 Ok(())
427}
428
429pub(crate) fn export_state_only_snapshot(
430 doc: &LoroDoc,
431 f: &Frontiers,
432) -> Result<Vec<u8>, LoroEncodeError> {
433 check_target_version_reachable(doc, f)?;
434 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
435 shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
436 Ok(())
437 })
438}
439
440fn encode_with(
441 mode: EncodeMode,
442 f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
443) -> Result<Vec<u8>, LoroEncodeError> {
444 let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
446 ans.extend(MAGIC_BYTES);
447 let checksum = [0; 16];
448 ans.extend(checksum);
449 ans.extend(mode.to_bytes());
450
451 f(&mut ans)?;
453
454 let checksum_body = &ans[20..];
456 let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
457 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
458 Ok(ans)
459}
460
461pub(crate) fn decode_snapshot(
462 doc: &LoroDoc,
463 mode: EncodeMode,
464 body: &[u8],
465 origin: InternalString,
466) -> Result<ImportStatus, LoroError> {
467 match mode {
468 EncodeMode::OutdatedSnapshot => {
469 return Err(LoroError::ImportUnsupportedEncodingMode);
470 }
471 EncodeMode::FastSnapshot => {
472 fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
473 }
474 _ => {
475 return Err(LoroError::DecodeError(
476 format!("Invalid snapshot encoding mode: {mode:?}").into_boxed_str(),
477 ));
478 }
479 };
480 Ok(ImportStatus {
481 success: VersionRange::from_vv(&doc.oplog_vv()),
482 pending: None,
483 })
484}
485
/// The kind of content an encoded blob carries.
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    /// Displayed as `snapshot`.
    Snapshot,
    /// Displayed as `outdated-snapshot`.
    OutdatedSnapshot,
    /// Displayed as `shallow-snapshot`.
    ShallowSnapshot,
    /// Displayed as `outdated-update`.
    OutdatedRle,
    /// Displayed as `update`.
    Updates,
}

/// Renders the mode as a short lowercase label.
impl std::fmt::Display for EncodedBlobMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            EncodedBlobMode::Snapshot => "snapshot",
            EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
            EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
            EncodedBlobMode::OutdatedRle => "outdated-update",
            EncodedBlobMode::Updates => "update",
        };
        // `write_str` emits the label verbatim; width/fill flags are ignored.
        f.write_str(label)
    }
}

impl EncodedBlobMode {
    /// Returns `true` for the three snapshot variants and `false` for the two
    /// update variants.
    pub fn is_snapshot(&self) -> bool {
        match self {
            EncodedBlobMode::Snapshot
            | EncodedBlobMode::ShallowSnapshot
            | EncodedBlobMode::OutdatedSnapshot => true,
            EncodedBlobMode::OutdatedRle | EncodedBlobMode::Updates => false,
        }
    }
}
517
518#[derive(Debug, Clone)]
519pub struct ImportBlobMetadata {
520 pub partial_start_vv: VersionVector,
526 pub partial_end_vv: VersionVector,
532 pub start_timestamp: i64,
533 pub start_frontiers: Frontiers,
534 pub end_timestamp: i64,
535 pub change_num: u32,
536 pub mode: EncodedBlobMode,
537}
538
539impl LoroDoc {
540 pub fn decode_import_blob_meta(
542 blob: &[u8],
543 check_checksum: bool,
544 ) -> LoroResult<ImportBlobMetadata> {
545 let parsed = parse_header_and_body(blob, check_checksum)?;
546 match parsed.mode {
547 EncodeMode::Auto => unreachable!(),
548 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
549 Err(LoroError::ImportUnsupportedEncodingMode)
550 }
551 EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
552 EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
553 }
554 }
555}
556
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the postcard-encoded size of representative `LoroValue`s: each
    /// value must serialize to at most the listed number of bytes.
    #[test]
    fn test_value_encode_size() {
        let cases: Vec<(LoroValue, usize)> = vec![
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}
598}