1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9 decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::change::Change;
15use crate::version::{Frontiers, VersionRange};
16use crate::LoroDoc;
17use crate::{oplog::OpLog, LoroError, VersionVector};
18use loro_common::{HasIdSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, ID};
19use num_traits::{FromPrimitive, ToPrimitive};
20use std::borrow::Cow;
21
/// Selects what a document export should contain.
///
/// The `Cow` fields let each mode either borrow its version data (the
/// constructors taking references, e.g. `ExportMode::updates`) or own it
/// (the `*_owned` constructors).
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum ExportMode<'a> {
    /// A snapshot of the whole document.
    Snapshot,
    /// Updates relative to the version vector `from` (presumably everything a
    /// peer at version `from` is missing — TODO confirm against the exporter).
    Updates { from: Cow<'a, VersionVector> },
    /// The changes covered by the given ID spans.
    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
    /// A shallow snapshot rooted at the given frontiers.
    ShallowSnapshot(Cow<'a, Frontiers>),
    /// A state-only snapshot at the given frontiers; `None` presumably means
    /// "the current version" — TODO confirm.
    StateOnly(Option<Cow<'a, Frontiers>>),
    /// A snapshot of the document as of `version`.
    SnapshotAt { version: Cow<'a, Frontiers> },
}
72
73impl<'a> ExportMode<'a> {
74 pub fn snapshot() -> Self {
76 ExportMode::Snapshot
77 }
78
79 pub fn updates(from: &'a VersionVector) -> Self {
81 ExportMode::Updates {
82 from: Cow::Borrowed(from),
83 }
84 }
85
86 pub fn updates_owned(from: VersionVector) -> Self {
88 ExportMode::Updates {
89 from: Cow::Owned(from),
90 }
91 }
92
93 pub fn all_updates() -> Self {
95 ExportMode::Updates {
96 from: Cow::Owned(Default::default()),
97 }
98 }
99
100 pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
102 ExportMode::UpdatesInRange {
103 spans: spans.into(),
104 }
105 }
106
107 pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
109 ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
110 }
111
112 pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
114 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
115 }
116
117 pub fn shallow_snapshot_since(id: ID) -> Self {
119 let frontiers = Frontiers::from_id(id);
120 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
121 }
122
123 pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
129 ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
130 }
131
132 pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
135 ExportMode::SnapshotAt {
136 version: Cow::Borrowed(frontiers),
137 }
138 }
139
140 pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
142 let mut spans = Vec::with_capacity(vv.len());
143 for (peer, counter) in vv.iter() {
144 if *counter > 0 {
145 spans.push(IdSpan::new(*peer, 0, *counter));
146 }
147 }
148
149 ExportMode::UpdatesInRange {
150 spans: Cow::Owned(spans),
151 }
152 }
153}
154
/// Leading bytes of every encoded blob (ASCII `"loro"`).
const MAGIC_BYTES: [u8; 4] = [b'l', b'o', b'r', b'o'];
156
/// Encoding mode of a blob. Its discriminant is written as two big-endian
/// bytes right after the magic bytes and the 16-byte checksum field
/// (see `EncodeMode::to_bytes` and `parse_header_and_body`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Not a real blob encoding; `ParsedHeaderAndBody::check_checksum` rejects
    /// it with a `DecodeError` when found in a blob.
    Auto = 255,
    /// Legacy RLE update encoding; decoding it yields
    /// `ImportUnsupportedEncodingMode`.
    OutdatedRle = 1,
    /// Legacy snapshot encoding; decoding it yields
    /// `ImportUnsupportedEncodingMode`.
    OutdatedSnapshot = 2,
    /// Current snapshot encoding (xxh32-checksummed).
    FastSnapshot = 3,
    /// Current updates encoding (xxh32-checksummed).
    FastUpdates = 4,
}
166
167impl num_traits::FromPrimitive for EncodeMode {
168 #[allow(trivial_numeric_casts)]
169 #[inline]
170 fn from_i64(n: i64) -> Option<Self> {
171 match n {
172 n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
173 n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
174 n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
175 n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
176 n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
177 _ => None,
178 }
179 }
180 #[inline]
181 fn from_u64(n: u64) -> Option<Self> {
182 Self::from_i64(n as i64)
183 }
184}
185
186impl num_traits::ToPrimitive for EncodeMode {
187 #[inline]
188 #[allow(trivial_numeric_casts)]
189 fn to_i64(&self) -> Option<i64> {
190 Some(match *self {
191 EncodeMode::Auto => EncodeMode::Auto as i64,
192 EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
193 EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
194 EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
195 EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
196 })
197 }
198 #[inline]
199 fn to_u64(&self) -> Option<u64> {
200 self.to_i64().map(|x| x as u64)
201 }
202}
203
204impl EncodeMode {
205 pub fn to_bytes(self) -> [u8; 2] {
206 let value = self.to_u16().unwrap();
207 value.to_be_bytes()
208 }
209
210 pub fn is_snapshot(self) -> bool {
211 matches!(
212 self,
213 EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
214 )
215 }
216}
217
218impl TryFrom<[u8; 2]> for EncodeMode {
219 type Error = LoroError;
220
221 fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
222 let value = u16::from_be_bytes(value);
223 Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
224 }
225}
226
/// Outcome of importing encoded data.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ImportStatus {
    /// Version ranges that were imported into the oplog.
    pub success: VersionRange,
    /// Ranges of changes that were left pending (presumably waiting for
    /// missing dependencies — TODO confirm); `None` when nothing is pending.
    pub pending: Option<VersionRange>,
}
232
233pub(crate) fn decode_oplog(
234 oplog: &mut OpLog,
235 parsed: ParsedHeaderAndBody,
236) -> Result<ImportStatus, LoroError> {
237 let changes = decode_oplog_changes(oplog, parsed)?;
238 let result = apply_decoded_changes_to_oplog(oplog, changes);
239 if result.has_deps_before_shallow_root {
240 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
241 }
242
243 Ok(result.status)
244}
245
246pub(crate) fn decode_oplog_changes(
247 oplog: &mut OpLog,
248 parsed: ParsedHeaderAndBody,
249) -> Result<Vec<Change>, LoroError> {
250 let ParsedHeaderAndBody { mode, body, .. } = parsed;
251 match mode {
252 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
253 Err(LoroError::ImportUnsupportedEncodingMode)
254 }
255 EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
256 EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
257 EncodeMode::Auto => unreachable!(),
258 }
259}
260
/// Result of `apply_decoded_changes_to_oplog`.
pub(crate) struct ApplyDecodedChangesResult {
    /// Imported and pending ranges.
    pub status: ImportStatus,
    /// `true` if at least one decoded change depends on history before the
    /// oplog's shallow root (such changes are reported by
    /// `import_changes_to_oplog`).
    pub has_deps_before_shallow_root: bool,
}
265
266pub(crate) fn apply_decoded_changes_to_oplog(
267 oplog: &mut OpLog,
268 changes: Vec<Change>,
269) -> ApplyDecodedChangesResult {
270 let ImportChangesResult {
271 mut imported,
272 latest_ids,
273 pending_changes,
274 changes_that_have_deps_before_shallow_root,
275 } = import_changes_to_oplog(changes, oplog);
276
277 let mut pending = VersionRange::default();
278 pending_changes.iter().for_each(|c| {
279 pending.extends_to_include_id_span(c.id_span());
280 });
281 oplog.try_apply_pending(latest_ids, Some(&mut imported));
283 oplog
284 .import_unknown_lamport_pending_changes(pending_changes)
285 .expect("importing unknown-lamport pending changes is infallible");
286 ApplyDecodedChangesResult {
287 status: ImportStatus {
288 success: imported,
289 pending: (!pending.is_empty()).then_some(pending),
290 },
291 has_deps_before_shallow_root: !changes_that_have_deps_before_shallow_root.is_empty(),
292 }
293}
294
/// An encoded blob split into its header fields and payload by
/// `parse_header_and_body`.
///
/// Blob layout: `magic (4) | checksum (16) | mode (2, big-endian) | body`.
pub(crate) struct ParsedHeaderAndBody<'a> {
    /// Raw 16-byte checksum field. Outdated modes compare it against an MD5
    /// digest; fast modes read an xxh32 hash from its last 4 bytes
    /// (little-endian).
    pub checksum: [u8; 16],
    /// Bytes covered by the checksum: everything after the checksum field,
    /// i.e. the two mode bytes followed by the body.
    pub checksum_body: &'a [u8],
    /// Encoding mode decoded from the two mode bytes.
    pub mode: EncodeMode,
    /// Payload following the mode bytes.
    pub body: &'a [u8],
}
301
/// Seed for the xxh32 checksum of fast-mode blobs: the ASCII bytes `"LORO"`
/// read as a little-endian `u32`.
const XXH_SEED: u32 = u32::from_le_bytes([b'L', b'O', b'R', b'O']);
303impl ParsedHeaderAndBody<'_> {
304 fn check_checksum(&self) -> LoroResult<()> {
306 match self.mode {
307 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
308 if md5::compute(self.checksum_body).0 != self.checksum {
309 return Err(LoroError::DecodeChecksumMismatchError);
310 }
311 }
312 EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
313 let mut expected_bytes = [0; 4];
314 expected_bytes.copy_from_slice(&self.checksum[12..16]);
315 let expected = u32::from_le_bytes(expected_bytes);
316 if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
317 return Err(LoroError::DecodeChecksumMismatchError);
318 }
319 }
320 EncodeMode::Auto => {
321 return Err(LoroError::DecodeError(
322 "Invalid import mode `Auto` in encoded blob"
323 .to_string()
324 .into_boxed_str(),
325 ));
326 }
327 }
328
329 Ok(())
330 }
331}
332
/// Smallest valid blob: magic (4) + checksum field (16) + mode (2).
const MIN_HEADER_SIZE: usize = 4 + 16 + 2;
334pub(crate) fn parse_header_and_body(
335 bytes: &[u8],
336 check_checksum: bool,
337) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
338 let reader = &bytes;
339 if bytes.len() < MIN_HEADER_SIZE {
340 return Err(LoroError::DecodeError("Invalid import data".into()));
341 }
342
343 let (magic_bytes, reader) = reader.split_at(4);
344 if magic_bytes != MAGIC_BYTES {
345 return Err(LoroError::DecodeError("Invalid magic bytes".into()));
346 }
347
348 let (checksum, reader) = reader.split_at(16);
349 let checksum_body = reader;
350 let (mode_bytes, reader) = reader.split_at(2);
351 let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
352 let mut checksum_arr = [0; 16];
353 checksum_arr.copy_from_slice(checksum);
354
355 let ans = ParsedHeaderAndBody {
356 mode,
357 checksum_body,
358 checksum: checksum_arr,
359 body: reader,
360 };
361
362 if check_checksum {
363 ans.check_checksum()?;
364 }
365 Ok(ans)
366}
367
368pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
369 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
370 fast_snapshot::encode_snapshot(doc, ans);
371 Ok(())
372 })
373 .unwrap()
374}
375
376pub(crate) fn export_snapshot_at(
377 doc: &LoroDoc,
378 frontiers: &Frontiers,
379) -> Result<Vec<u8>, LoroEncodeError> {
380 check_target_version_reachable(doc, frontiers)?;
381 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
382 shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
383 })
384}
385
386pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
387 encode_with(EncodeMode::FastUpdates, &mut |ans| {
388 fast_snapshot::encode_updates(doc, vv, ans);
389 Ok(())
390 })
391 .unwrap()
392}
393
394pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
395 encode_with(EncodeMode::FastUpdates, &mut |ans| {
396 fast_snapshot::encode_updates_in_range(oplog, spans, ans);
397 Ok(())
398 })
399 .unwrap()
400}
401
402pub(crate) fn export_shallow_snapshot(
403 doc: &LoroDoc,
404 f: &Frontiers,
405) -> Result<Vec<u8>, LoroEncodeError> {
406 check_target_version_reachable(doc, f)?;
407 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
408 shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
409 Ok(())
410 })
411}
412
413fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
414 let oplog = doc.oplog.lock();
415 if !oplog.dag.can_export_shallow_snapshot_on(f) {
416 return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
417 }
418
419 Ok(())
420}
421
422pub(crate) fn export_state_only_snapshot(
423 doc: &LoroDoc,
424 f: &Frontiers,
425) -> Result<Vec<u8>, LoroEncodeError> {
426 check_target_version_reachable(doc, f)?;
427 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
428 shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
429 Ok(())
430 })
431}
432
433fn encode_with(
434 mode: EncodeMode,
435 f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
436) -> Result<Vec<u8>, LoroEncodeError> {
437 let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
439 ans.extend(MAGIC_BYTES);
440 let checksum = [0; 16];
441 ans.extend(checksum);
442 ans.extend(mode.to_bytes());
443
444 f(&mut ans)?;
446
447 let checksum_body = &ans[20..];
449 let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
450 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
451 Ok(ans)
452}
453
454pub(crate) fn decode_snapshot(
455 doc: &LoroDoc,
456 mode: EncodeMode,
457 body: &[u8],
458 origin: InternalString,
459) -> Result<ImportStatus, LoroError> {
460 match mode {
461 EncodeMode::OutdatedSnapshot => {
462 return Err(LoroError::ImportUnsupportedEncodingMode);
463 }
464 EncodeMode::FastSnapshot => {
465 fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
466 }
467 _ => {
468 return Err(LoroError::DecodeError(
469 format!("Invalid snapshot encoding mode: {mode:?}").into_boxed_str(),
470 ));
471 }
472 };
473 Ok(ImportStatus {
474 success: VersionRange::from_vv(&doc.oplog_vv()),
475 pending: None,
476 })
477}
478
/// Kind of payload carried by an encoded blob, as reported in blob metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    Snapshot,
    OutdatedSnapshot,
    ShallowSnapshot,
    OutdatedRle,
    Updates,
}

impl std::fmt::Display for EncodedBlobMode {
    /// Writes a short kebab-case label for the mode.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            EncodedBlobMode::Snapshot => "snapshot",
            EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
            EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
            EncodedBlobMode::OutdatedRle => "outdated-update",
            EncodedBlobMode::Updates => "update",
        };
        f.write_str(label)
    }
}

impl EncodedBlobMode {
    /// Whether the blob is any kind of snapshot (regular, shallow, or outdated).
    pub fn is_snapshot(&self) -> bool {
        match self {
            EncodedBlobMode::Snapshot
            | EncodedBlobMode::ShallowSnapshot
            | EncodedBlobMode::OutdatedSnapshot => true,
            EncodedBlobMode::OutdatedRle | EncodedBlobMode::Updates => false,
        }
    }
}
510
/// Metadata decoded from an encoded blob (see
/// `LoroDoc::decode_import_blob_meta`) without importing it.
#[derive(Debug, Clone)]
pub struct ImportBlobMetadata {
    /// Version vector at which the blob's contents start. "Partial"
    /// presumably means it only covers the peers present in the blob —
    /// TODO confirm against the fast_snapshot decoders.
    pub partial_start_vv: VersionVector,
    /// Version vector at which the blob's contents end (same caveat as
    /// `partial_start_vv`).
    pub partial_end_vv: VersionVector,
    /// Timestamp of the earliest change in the blob (presumably; units not
    /// verified here).
    pub start_timestamp: i64,
    /// Frontiers the blob's changes start from (presumably their external
    /// dependencies — TODO confirm).
    pub start_frontiers: Frontiers,
    /// Timestamp of the latest change in the blob (presumably).
    pub end_timestamp: i64,
    /// Number of changes in the blob.
    pub change_num: u32,
    /// What kind of blob this is.
    pub mode: EncodedBlobMode,
}
531
532impl LoroDoc {
533 pub fn decode_import_blob_meta(
535 blob: &[u8],
536 check_checksum: bool,
537 ) -> LoroResult<ImportBlobMetadata> {
538 let parsed = parse_header_and_body(blob, check_checksum)?;
539 match parsed.mode {
540 EncodeMode::Auto => unreachable!(),
541 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
542 Err(LoroError::ImportUnsupportedEncodingMode)
543 }
544 EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
545 EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
546 }
547 }
548}
549
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    #[test]
    fn test_value_encode_size() {
        // Each case pairs a value with the largest postcard encoding we accept.
        let cases: Vec<(LoroValue, usize)> = vec![
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {value:?}, size: {size}, max_size: {max_size}"
            );
        }
    }
}