1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9 decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::op::OpWithId;
15use crate::version::{Frontiers, VersionRange};
16use crate::LoroDoc;
17use crate::{oplog::OpLog, LoroError, VersionVector};
18use loro_common::{
19 HasIdSpan, IdLpSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, PeerID, ID,
20};
21use num_traits::{FromPrimitive, ToPrimitive};
22use rle::{HasLength, Sliceable};
23use std::borrow::Cow;
24
/// Selects what an export should contain.
///
/// Every variant that takes a version argument stores it in a [`Cow`], so a mode can
/// either borrow the argument for `'a` or own it (see the `*_owned` constructors).
///
/// NOTE(review): the exporter that interprets these variants is not in this file; the
/// per-variant notes are inferred from the variant names and the constructors below.
/// Confirm them against the dispatching code.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum ExportMode<'a> {
    /// Presumably a full snapshot of the document; takes no arguments.
    Snapshot,
    /// Updates relative to the version vector `from`. `ExportMode::all_updates` passes a
    /// default-constructed version vector here.
    Updates { from: Cow<'a, VersionVector> },
    /// Updates restricted to the listed id spans.
    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
    /// Shallow snapshot anchored at the given frontiers.
    ShallowSnapshot(Cow<'a, Frontiers>),
    /// State-only export at the given frontiers; `None` presumably selects the latest
    /// version (TODO confirm).
    StateOnly(Option<Cow<'a, Frontiers>>),
    /// Snapshot of the document as of `version`.
    SnapshotAt { version: Cow<'a, Frontiers> },
}
75
76impl<'a> ExportMode<'a> {
77 pub fn snapshot() -> Self {
79 ExportMode::Snapshot
80 }
81
82 pub fn updates(from: &'a VersionVector) -> Self {
84 ExportMode::Updates {
85 from: Cow::Borrowed(from),
86 }
87 }
88
89 pub fn updates_owned(from: VersionVector) -> Self {
91 ExportMode::Updates {
92 from: Cow::Owned(from),
93 }
94 }
95
96 pub fn all_updates() -> Self {
98 ExportMode::Updates {
99 from: Cow::Owned(Default::default()),
100 }
101 }
102
103 pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
105 ExportMode::UpdatesInRange {
106 spans: spans.into(),
107 }
108 }
109
110 pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
112 ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
113 }
114
115 pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
117 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
118 }
119
120 pub fn shallow_snapshot_since(id: ID) -> Self {
122 let frontiers = Frontiers::from_id(id);
123 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
124 }
125
126 pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
132 ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
133 }
134
135 pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
138 ExportMode::SnapshotAt {
139 version: Cow::Borrowed(frontiers),
140 }
141 }
142
143 pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
145 let mut spans = Vec::with_capacity(vv.len());
146 for (peer, counter) in vv.iter() {
147 if *counter > 0 {
148 spans.push(IdSpan::new(*peer, 0, *counter));
149 }
150 }
151
152 ExportMode::UpdatesInRange {
153 spans: Cow::Owned(spans),
154 }
155 }
156}
157
/// Four-byte magic prefix written at the start of every encoded blob;
/// `parse_header_and_body` rejects input that does not begin with it.
const MAGIC_BYTES: [u8; 4] = *b"loro";
159
/// Encoding format tag stored in the blob header.
///
/// The discriminant is the value `to_bytes` serializes as a big-endian `u16`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Placeholder for "let the encoder choose": `encode_oplog` resolves it to
    /// `OutdatedRle` before encoding, and the decode paths treat it as unreachable.
    Auto = 255,
    /// Outdated updates format, handled by `outdated_encode_reordered`.
    OutdatedRle = 1,
    /// Outdated snapshot format, handled by `outdated_encode_reordered`.
    OutdatedSnapshot = 2,
    /// Snapshot format handled by `fast_snapshot`; the shallow and state-only exports in
    /// this file are tagged with it as well.
    FastSnapshot = 3,
    /// Updates format handled by `fast_snapshot`.
    FastUpdates = 4,
}
169
170impl num_traits::FromPrimitive for EncodeMode {
171 #[allow(trivial_numeric_casts)]
172 #[inline]
173 fn from_i64(n: i64) -> Option<Self> {
174 match n {
175 n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
176 n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
177 n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
178 n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
179 n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
180 _ => None,
181 }
182 }
183 #[inline]
184 fn from_u64(n: u64) -> Option<Self> {
185 Self::from_i64(n as i64)
186 }
187}
188
189impl num_traits::ToPrimitive for EncodeMode {
190 #[inline]
191 #[allow(trivial_numeric_casts)]
192 fn to_i64(&self) -> Option<i64> {
193 Some(match *self {
194 EncodeMode::Auto => EncodeMode::Auto as i64,
195 EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
196 EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
197 EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
198 EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
199 })
200 }
201 #[inline]
202 fn to_u64(&self) -> Option<u64> {
203 self.to_i64().map(|x| x as u64)
204 }
205}
206
207impl EncodeMode {
208 pub fn to_bytes(self) -> [u8; 2] {
209 let value = self.to_u16().unwrap();
210 value.to_be_bytes()
211 }
212
213 pub fn is_snapshot(self) -> bool {
214 matches!(
215 self,
216 EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
217 )
218 }
219}
220
221impl TryFrom<[u8; 2]> for EncodeMode {
222 type Error = LoroError;
223
224 fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
225 let value = u16::from_be_bytes(value);
226 Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
227 }
228}
229
/// Outcome of an import, as produced by `decode_oplog` and `decode_snapshot`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ImportStatus {
    /// Version range that was imported. `decode_snapshot` sets it to the range derived
    /// from the document's oplog version vector.
    pub success: VersionRange,
    /// Range covering the changes that were left pending, or `None` when there are none.
    pub pending: Option<VersionRange>,
}
235
/// Callback bundle used while encoding a state snapshot (per the type name).
///
/// NOTE(review): the callbacks are supplied by whoever constructs this struct, which is
/// not shown in this file; the field notes only describe how the methods here use them.
pub(crate) struct StateSnapshotEncoder<'a> {
    /// Called first by `encode_op`. `Ok(())` means no op is emitted for the span.
    /// `Err(span)` makes `encode_op` fetch the op and emit it: whole when `span` equals
    /// the queried span, otherwise with its first `span.atom_len()` atoms sliced off.
    check_idspan: &'a dyn Fn(IdLpSpan) -> Result<(), IdLpSpan>,
    /// Sink that receives each op `encode_op` decides to emit.
    encoder_by_op: &'a mut dyn FnMut(OpWithId),
    /// Called by `encode_op` for every span, whether or not an op was emitted for it.
    record_idspan: &'a mut dyn FnMut(IdLpSpan),
    /// Backs `register_peer`: maps a peer id to a `usize` (presumably its index in the
    /// encoded peer table; TODO confirm).
    register_peer: &'a mut dyn FnMut(PeerID) -> usize,
    /// Encoding mode, exposed through `mode()`.
    #[allow(unused)]
    mode: EncodeMode,
}
260
261impl StateSnapshotEncoder<'_> {
262 pub fn encode_op(&mut self, id_span: IdLpSpan, get_op: impl FnOnce() -> OpWithId) {
263 if let Err(span) = (self.check_idspan)(id_span) {
264 let mut op = get_op();
265 if span == id_span {
266 (self.encoder_by_op)(op);
267 } else {
268 debug_assert_eq!(span.lamport.start, id_span.lamport.start);
269 op.op = op.op.slice(span.atom_len(), op.op.atom_len());
270 (self.encoder_by_op)(op);
271 }
272 }
273
274 (self.record_idspan)(id_span);
275 }
276
277 #[allow(unused)]
278 pub fn mode(&self) -> EncodeMode {
279 self.mode
280 }
281
282 pub(crate) fn register_peer(&mut self, peer: PeerID) -> usize {
283 (self.register_peer)(peer)
284 }
285}
286
/// Inputs made available while decoding a state snapshot (per the type name).
///
/// NOTE(review): nothing in this file constructs or consumes this struct; the field notes
/// are inferred from names and types. Confirm against the decoders that use it.
pub(crate) struct StateSnapshotDecodeContext<'a> {
    /// Read-only access to the oplog.
    pub oplog: &'a OpLog,
    /// Peer ids; presumably the table that encoded peer indices refer to (TODO confirm).
    pub peers: &'a [PeerID],
    /// Iterator yielding the ops to decode.
    pub ops: &'a mut dyn Iterator<Item = OpWithId>,
    /// Raw bytes associated with the state being decoded.
    #[allow(unused)]
    pub blob: &'a [u8],
    /// Encoding mode of the data being decoded.
    pub mode: EncodeMode,
}
295
296pub(crate) fn encode_oplog(oplog: &OpLog, vv: &VersionVector, mode: EncodeMode) -> Vec<u8> {
297 let mode = match mode {
298 EncodeMode::Auto => EncodeMode::OutdatedRle,
299 mode => mode,
300 };
301
302 let body = match &mode {
303 EncodeMode::OutdatedRle => outdated_encode_reordered::encode_updates(oplog, vv),
304 _ => unreachable!(),
305 };
306
307 encode_header_and_body(mode, body)
308}
309
310pub(crate) fn decode_oplog(
311 oplog: &mut OpLog,
312 parsed: ParsedHeaderAndBody,
313) -> Result<ImportStatus, LoroError> {
314 let ParsedHeaderAndBody { mode, body, .. } = parsed;
315 let changes = match mode {
316 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
317 outdated_encode_reordered::decode_updates(oplog, body)
318 }
319 EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
320 EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
321 EncodeMode::Auto => unreachable!(),
322 }?;
323 let ImportChangesResult {
324 mut imported,
325 latest_ids,
326 pending_changes,
327 changes_that_have_deps_before_shallow_root,
328 } = import_changes_to_oplog(changes, oplog);
329
330 let mut pending = VersionRange::default();
331 pending_changes.iter().for_each(|c| {
332 pending.extends_to_include_id_span(c.id_span());
333 });
334 oplog.try_apply_pending(latest_ids, Some(&mut imported));
336 oplog.import_unknown_lamport_pending_changes(pending_changes)?;
337 if !changes_that_have_deps_before_shallow_root.is_empty() {
338 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
339 }
340 Ok(ImportStatus {
341 success: imported,
342 pending: (!pending.is_empty()).then_some(pending),
343 })
344}
345
/// Borrowed view of an encoded blob after `parse_header_and_body` has split it up.
pub(crate) struct ParsedHeaderAndBody<'a> {
    /// The 16-byte checksum field that follows the magic bytes.
    pub checksum: [u8; 16],
    /// The bytes the checksum is computed over: everything after the checksum field,
    /// i.e. the two mode bytes followed by the body.
    pub checksum_body: &'a [u8],
    /// Encoding mode decoded from the two bytes after the checksum field.
    pub mode: EncodeMode,
    /// Payload following the mode bytes.
    pub body: &'a [u8],
}
352
353const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
354impl ParsedHeaderAndBody<'_> {
355 fn check_checksum(&self) -> LoroResult<()> {
357 match self.mode {
358 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
359 if md5::compute(self.checksum_body).0 != self.checksum {
360 return Err(LoroError::DecodeChecksumMismatchError);
361 }
362 }
363 EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
364 let expected = u32::from_le_bytes(self.checksum[12..16].try_into().unwrap());
365 if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
366 return Err(LoroError::DecodeChecksumMismatchError);
367 }
368 }
369 EncodeMode::Auto => unreachable!(),
370 }
371
372 Ok(())
373 }
374}
375
376const MIN_HEADER_SIZE: usize = 22;
377pub(crate) fn parse_header_and_body(
378 bytes: &[u8],
379 check_checksum: bool,
380) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
381 let reader = &bytes;
382 if bytes.len() < MIN_HEADER_SIZE {
383 return Err(LoroError::DecodeError("Invalid import data".into()));
384 }
385
386 let (magic_bytes, reader) = reader.split_at(4);
387 let magic_bytes: [u8; 4] = magic_bytes.try_into().unwrap();
388 if magic_bytes != MAGIC_BYTES {
389 return Err(LoroError::DecodeError("Invalid magic bytes".into()));
390 }
391
392 let (checksum, reader) = reader.split_at(16);
393 let checksum_body = reader;
394 let (mode_bytes, reader) = reader.split_at(2);
395 let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
396
397 let ans = ParsedHeaderAndBody {
398 mode,
399 checksum_body,
400 checksum: checksum.try_into().unwrap(),
401 body: reader,
402 };
403
404 if check_checksum {
405 ans.check_checksum()?;
406 }
407 Ok(ans)
408}
409
410fn encode_header_and_body(mode: EncodeMode, body: Vec<u8>) -> Vec<u8> {
411 let mut ans = Vec::new();
412 ans.extend(MAGIC_BYTES);
413 let checksum = [0; 16];
414 ans.extend(checksum);
415 ans.extend(mode.to_bytes());
416 ans.extend(body);
417 let checksum_body = &ans[20..];
418 let checksum = md5::compute(checksum_body).0;
419 ans[4..20].copy_from_slice(&checksum);
420 ans
421}
422
423pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec<u8> {
424 let body = outdated_encode_reordered::encode_snapshot(
425 &doc.oplog().lock().unwrap(),
426 &mut doc.app_state().lock().unwrap(),
427 &Default::default(),
428 );
429
430 encode_header_and_body(EncodeMode::OutdatedSnapshot, body)
431}
432
433pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
434 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
435 fast_snapshot::encode_snapshot(doc, ans);
436 Ok(())
437 })
438 .unwrap()
439}
440
441pub(crate) fn export_snapshot_at(
442 doc: &LoroDoc,
443 frontiers: &Frontiers,
444) -> Result<Vec<u8>, LoroEncodeError> {
445 check_target_version_reachable(doc, frontiers)?;
446 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
447 shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
448 })
449}
450
451pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
452 encode_with(EncodeMode::FastUpdates, &mut |ans| {
453 fast_snapshot::encode_updates(doc, vv, ans);
454 Ok(())
455 })
456 .unwrap()
457}
458
459pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
460 encode_with(EncodeMode::FastUpdates, &mut |ans| {
461 fast_snapshot::encode_updates_in_range(oplog, spans, ans);
462 Ok(())
463 })
464 .unwrap()
465}
466
467pub(crate) fn export_shallow_snapshot(
468 doc: &LoroDoc,
469 f: &Frontiers,
470) -> Result<Vec<u8>, LoroEncodeError> {
471 check_target_version_reachable(doc, f)?;
472 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
473 shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
474 Ok(())
475 })
476}
477
478fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
479 let oplog = doc.oplog.lock().unwrap();
480 if !oplog.dag.can_export_shallow_snapshot_on(f) {
481 return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
482 }
483
484 Ok(())
485}
486
487pub(crate) fn export_state_only_snapshot(
488 doc: &LoroDoc,
489 f: &Frontiers,
490) -> Result<Vec<u8>, LoroEncodeError> {
491 check_target_version_reachable(doc, f)?;
492 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
493 shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
494 Ok(())
495 })
496}
497
498fn encode_with(
499 mode: EncodeMode,
500 f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
501) -> Result<Vec<u8>, LoroEncodeError> {
502 let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
504 ans.extend(MAGIC_BYTES);
505 let checksum = [0; 16];
506 ans.extend(checksum);
507 ans.extend(mode.to_bytes());
508
509 f(&mut ans)?;
511
512 let checksum_body = &ans[20..];
514 let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
515 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
516 Ok(ans)
517}
518
519pub(crate) fn decode_snapshot(
520 doc: &LoroDoc,
521 mode: EncodeMode,
522 body: &[u8],
523 origin: InternalString,
524) -> Result<ImportStatus, LoroError> {
525 match mode {
526 EncodeMode::OutdatedSnapshot => {
527 outdated_encode_reordered::decode_snapshot(doc, body, origin)?
528 }
529 EncodeMode::FastSnapshot => {
530 fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
531 }
532 _ => unreachable!(),
533 };
534 Ok(ImportStatus {
535 success: VersionRange::from_vv(&doc.oplog_vv()),
536 pending: None,
537 })
538}
539
/// Kind of an encoded blob, as reported in `ImportBlobMetadata::mode`.
///
/// The derived `PartialOrd`/`Ord` follow the declaration order of the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    /// Displayed as `"snapshot"`.
    Snapshot,
    /// Displayed as `"outdated-snapshot"`.
    OutdatedSnapshot,
    /// Displayed as `"shallow-snapshot"`.
    ShallowSnapshot,
    /// Displayed as `"outdated-update"`.
    OutdatedRle,
    /// Displayed as `"update"`.
    Updates,
}
548
549impl std::fmt::Display for EncodedBlobMode {
550 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551 f.write_str(match self {
552 EncodedBlobMode::OutdatedRle => "outdated-update",
553 EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
554 EncodedBlobMode::Snapshot => "snapshot",
555 EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
556 EncodedBlobMode::Updates => "update",
557 })
558 }
559}
560
561impl EncodedBlobMode {
562 pub fn is_snapshot(&self) -> bool {
563 matches!(
564 self,
565 EncodedBlobMode::Snapshot
566 | EncodedBlobMode::ShallowSnapshot
567 | EncodedBlobMode::OutdatedSnapshot
568 )
569 }
570}
571
/// Metadata describing an encoded blob, returned by `LoroDoc::decode_import_blob_meta`.
///
/// NOTE(review): the fields are filled in by the format-specific decoders, which are not
/// in this file; the notes below are inferred from field names and types. Confirm them
/// against those decoders.
#[derive(Debug, Clone)]
pub struct ImportBlobMetadata {
    /// Presumably the version vector at which the blob's content starts; "partial"
    /// suggests it may only list some peers (TODO confirm).
    pub partial_start_vv: VersionVector,
    /// Presumably the version vector at which the blob's content ends; "partial" suggests
    /// it may only list some peers (TODO confirm).
    pub partial_end_vv: VersionVector,
    /// Timestamp associated with the start of the blob (unit not visible here).
    pub start_timestamp: i64,
    /// Frontiers associated with the start of the blob.
    pub start_frontiers: Frontiers,
    /// Timestamp associated with the end of the blob (unit not visible here).
    pub end_timestamp: i64,
    /// Number of changes reported for the blob.
    pub change_num: u32,
    /// Kind of the blob.
    pub mode: EncodedBlobMode,
}
592
593impl LoroDoc {
594 pub fn decode_import_blob_meta(
596 blob: &[u8],
597 check_checksum: bool,
598 ) -> LoroResult<ImportBlobMetadata> {
599 let parsed = parse_header_and_body(blob, check_checksum)?;
600 match parsed.mode {
601 EncodeMode::Auto => unreachable!(),
602 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
603 outdated_encode_reordered::decode_import_blob_meta(parsed)
604 }
605 EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
606 EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
607 }
608 }
609}
610
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the postcard-encoded size of representative `LoroValue`s against an upper
    /// bound per value.
    #[test]
    fn test_value_encode_size() {
        // (value, maximum allowed encoded size in bytes)
        let cases: Vec<(LoroValue, usize)> = vec![
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}