1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9 decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::op::OpWithId;
15use crate::version::{Frontiers, VersionRange};
16use crate::LoroDoc;
17use crate::{oplog::OpLog, LoroError, VersionVector};
18use loro_common::{HasIdSpan, IdLpSpan, IdSpan, LoroEncodeError, LoroResult, PeerID, ID};
19use num_traits::{FromPrimitive, ToPrimitive};
20use rle::{HasLength, Sliceable};
21use std::borrow::Cow;
22
/// Describes which kind of export a caller is requesting.
///
/// Payloads are stored in [`Cow`], so a mode can either borrow the caller's
/// version data or own it. How each variant is turned into bytes is decided
/// by the exporting code, which is not part of this definition.
///
/// The enum is `#[non_exhaustive]`: matches outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum ExportMode<'a> {
    /// A snapshot request that carries no parameters.
    Snapshot,
    /// An updates request parameterised by the version vector `from`.
    Updates { from: Cow<'a, VersionVector> },
    /// An updates request limited to the listed id spans.
    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
    /// A shallow-snapshot request carrying a set of frontiers.
    ShallowSnapshot(Cow<'a, Frontiers>),
    /// A state-only request; the frontiers are optional.
    StateOnly(Option<Cow<'a, Frontiers>>),
    /// A snapshot request at the frontiers stored in `version`.
    SnapshotAt { version: Cow<'a, Frontiers> },
}
73
74impl<'a> ExportMode<'a> {
75 pub fn snapshot() -> Self {
77 ExportMode::Snapshot
78 }
79
80 pub fn updates(from: &'a VersionVector) -> Self {
82 ExportMode::Updates {
83 from: Cow::Borrowed(from),
84 }
85 }
86
87 pub fn updates_owned(from: VersionVector) -> Self {
89 ExportMode::Updates {
90 from: Cow::Owned(from),
91 }
92 }
93
94 pub fn all_updates() -> Self {
96 ExportMode::Updates {
97 from: Cow::Owned(Default::default()),
98 }
99 }
100
101 pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
103 ExportMode::UpdatesInRange {
104 spans: spans.into(),
105 }
106 }
107
108 pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
110 ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
111 }
112
113 pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
115 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
116 }
117
118 pub fn shallow_snapshot_since(id: ID) -> Self {
120 let frontiers = Frontiers::from_id(id);
121 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
122 }
123
124 pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
130 ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
131 }
132
133 pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
136 ExportMode::SnapshotAt {
137 version: Cow::Borrowed(frontiers),
138 }
139 }
140
141 pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
143 let mut spans = Vec::with_capacity(vv.len());
144 for (peer, counter) in vv.iter() {
145 if *counter > 0 {
146 spans.push(IdSpan::new(*peer, 0, *counter));
147 }
148 }
149
150 ExportMode::UpdatesInRange {
151 spans: Cow::Owned(spans),
152 }
153 }
154}
155
/// The four ASCII bytes `loro` written at the very start of every encoded
/// blob and checked when a blob header is parsed.
const MAGIC_BYTES: [u8; 4] = *b"loro";
157
/// Identifies the encoding used for a blob body.
///
/// The explicit discriminants are the numeric values used when a mode is
/// converted to and from its two-byte representation, so they must not be
/// changed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Placeholder that `encode_oplog` resolves to `OutdatedRle` before
    /// anything is written.
    Auto = 255,
    /// Updates body in the outdated encoding; checksummed with MD5.
    OutdatedRle = 1,
    /// Snapshot body in the outdated encoding; checksummed with MD5.
    OutdatedSnapshot = 2,
    /// Snapshot body in the fast encoding; checksummed with xxHash32.
    FastSnapshot = 3,
    /// Updates body in the fast encoding; checksummed with xxHash32.
    FastUpdates = 4,
}
167
168impl num_traits::FromPrimitive for EncodeMode {
169 #[allow(trivial_numeric_casts)]
170 #[inline]
171 fn from_i64(n: i64) -> Option<Self> {
172 match n {
173 n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
174 n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
175 n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
176 n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
177 n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
178 _ => None,
179 }
180 }
181 #[inline]
182 fn from_u64(n: u64) -> Option<Self> {
183 Self::from_i64(n as i64)
184 }
185}
186
187impl num_traits::ToPrimitive for EncodeMode {
188 #[inline]
189 #[allow(trivial_numeric_casts)]
190 fn to_i64(&self) -> Option<i64> {
191 Some(match *self {
192 EncodeMode::Auto => EncodeMode::Auto as i64,
193 EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
194 EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
195 EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
196 EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
197 })
198 }
199 #[inline]
200 fn to_u64(&self) -> Option<u64> {
201 self.to_i64().map(|x| x as u64)
202 }
203}
204
205impl EncodeMode {
206 pub fn to_bytes(self) -> [u8; 2] {
207 let value = self.to_u16().unwrap();
208 value.to_be_bytes()
209 }
210
211 pub fn is_snapshot(self) -> bool {
212 matches!(
213 self,
214 EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
215 )
216 }
217}
218
219impl TryFrom<[u8; 2]> for EncodeMode {
220 type Error = LoroError;
221
222 fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
223 let value = u16::from_be_bytes(value);
224 Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
225 }
226}
227
/// Outcome of importing a blob.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ImportStatus {
    /// Version range reported as imported.
    pub success: VersionRange,
    /// Version range covered by changes that were left pending, or `None`
    /// when there are none.
    pub pending: Option<VersionRange>,
}
233
/// Bundle of callbacks handed to code that encodes container state into a
/// snapshot. All behaviour lives in the callbacks; see `encode_op` for how
/// they are combined.
pub(crate) struct StateSnapshotEncoder<'a> {
    /// Asked about each id span passed to `encode_op`. `Ok(())` means no op
    /// has to be emitted for it; `Err(span)` makes `encode_op` emit the op,
    /// sliced when `span` differs from the queried span.
    check_idspan: &'a dyn Fn(IdLpSpan) -> Result<(), IdLpSpan>,
    /// Receives every op that `encode_op` decides to emit.
    encoder_by_op: &'a mut dyn FnMut(OpWithId),
    /// Called with every id span passed to `encode_op`, emitted or not.
    record_idspan: &'a mut dyn FnMut(IdLpSpan),
    /// Maps a peer id to a `usize` (presumably its index in a peer table —
    /// TODO confirm against the callback's implementation).
    register_peer: &'a mut dyn FnMut(PeerID) -> usize,
    /// Encoding mode this encoder was created for.
    #[allow(unused)]
    mode: EncodeMode,
}
258
259impl StateSnapshotEncoder<'_> {
260 pub fn encode_op(&mut self, id_span: IdLpSpan, get_op: impl FnOnce() -> OpWithId) {
261 if let Err(span) = (self.check_idspan)(id_span) {
262 let mut op = get_op();
263 if span == id_span {
264 (self.encoder_by_op)(op);
265 } else {
266 debug_assert_eq!(span.lamport.start, id_span.lamport.start);
267 op.op = op.op.slice(span.atom_len(), op.op.atom_len());
268 (self.encoder_by_op)(op);
269 }
270 }
271
272 (self.record_idspan)(id_span);
273 }
274
275 #[allow(unused)]
276 pub fn mode(&self) -> EncodeMode {
277 self.mode
278 }
279
280 pub(crate) fn register_peer(&mut self, peer: PeerID) -> usize {
281 (self.register_peer)(peer)
282 }
283}
284
/// Inputs made available to code that decodes container state from a
/// snapshot.
pub(crate) struct StateSnapshotDecodeContext<'a> {
    /// The op log the snapshot is decoded against.
    pub oplog: &'a OpLog,
    /// Peer ids available to the decoder (presumably the table that encoded
    /// peer indices refer to — TODO confirm).
    pub peers: &'a [PeerID],
    /// Iterator yielding the ops for the decoder to consume.
    pub ops: &'a mut dyn Iterator<Item = OpWithId>,
    /// Raw bytes associated with the state being decoded.
    #[allow(unused)]
    pub blob: &'a [u8],
    /// Encoding mode of the blob being decoded.
    pub mode: EncodeMode,
}
293
294pub(crate) fn encode_oplog(oplog: &OpLog, vv: &VersionVector, mode: EncodeMode) -> Vec<u8> {
295 let mode = match mode {
296 EncodeMode::Auto => EncodeMode::OutdatedRle,
297 mode => mode,
298 };
299
300 let body = match &mode {
301 EncodeMode::OutdatedRle => outdated_encode_reordered::encode_updates(oplog, vv),
302 _ => unreachable!(),
303 };
304
305 encode_header_and_body(mode, body)
306}
307
308pub(crate) fn decode_oplog(
309 oplog: &mut OpLog,
310 parsed: ParsedHeaderAndBody,
311) -> Result<ImportStatus, LoroError> {
312 let ParsedHeaderAndBody { mode, body, .. } = parsed;
313 let changes = match mode {
314 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
315 outdated_encode_reordered::decode_updates(oplog, body)
316 }
317 EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
318 EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
319 EncodeMode::Auto => unreachable!(),
320 }?;
321 let ImportChangesResult {
322 mut imported,
323 latest_ids,
324 pending_changes,
325 changes_that_have_deps_before_shallow_root,
326 } = import_changes_to_oplog(changes, oplog);
327
328 let mut pending = VersionRange::default();
329 pending_changes.iter().for_each(|c| {
330 pending.extends_to_include_id_span(c.id_span());
331 });
332 oplog.try_apply_pending(latest_ids, Some(&mut imported));
334 oplog.import_unknown_lamport_pending_changes(pending_changes)?;
335 if !changes_that_have_deps_before_shallow_root.is_empty() {
336 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
337 }
338 Ok(ImportStatus {
339 success: imported,
340 pending: (!pending.is_empty()).then_some(pending),
341 })
342}
343
/// A blob split into its header fields and body, borrowing from the input
/// bytes.
pub(crate) struct ParsedHeaderAndBody<'a> {
    /// The 16-byte checksum field that follows the magic bytes.
    pub checksum: [u8; 16],
    /// The bytes the checksum is computed over: everything after the checksum
    /// field, i.e. the two mode bytes followed by the body.
    pub checksum_body: &'a [u8],
    /// Encoding mode decoded from the two mode bytes.
    pub mode: EncodeMode,
    /// Everything after the mode bytes.
    pub body: &'a [u8],
}
350
351const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
352impl ParsedHeaderAndBody<'_> {
353 fn check_checksum(&self) -> LoroResult<()> {
355 match self.mode {
356 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
357 if md5::compute(self.checksum_body).0 != self.checksum {
358 return Err(LoroError::DecodeChecksumMismatchError);
359 }
360 }
361 EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
362 let expected = u32::from_le_bytes(self.checksum[12..16].try_into().unwrap());
363 if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
364 return Err(LoroError::DecodeChecksumMismatchError);
365 }
366 }
367 EncodeMode::Auto => unreachable!(),
368 }
369
370 Ok(())
371 }
372}
373
374const MIN_HEADER_SIZE: usize = 22;
375pub(crate) fn parse_header_and_body(
376 bytes: &[u8],
377 check_checksum: bool,
378) -> Result<ParsedHeaderAndBody, LoroError> {
379 let reader = &bytes;
380 if bytes.len() < MIN_HEADER_SIZE {
381 return Err(LoroError::DecodeError("Invalid import data".into()));
382 }
383
384 let (magic_bytes, reader) = reader.split_at(4);
385 let magic_bytes: [u8; 4] = magic_bytes.try_into().unwrap();
386 if magic_bytes != MAGIC_BYTES {
387 return Err(LoroError::DecodeError("Invalid magic bytes".into()));
388 }
389
390 let (checksum, reader) = reader.split_at(16);
391 let checksum_body = reader;
392 let (mode_bytes, reader) = reader.split_at(2);
393 let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
394
395 let ans = ParsedHeaderAndBody {
396 mode,
397 checksum_body,
398 checksum: checksum.try_into().unwrap(),
399 body: reader,
400 };
401
402 if check_checksum {
403 ans.check_checksum()?;
404 }
405 Ok(ans)
406}
407
408fn encode_header_and_body(mode: EncodeMode, body: Vec<u8>) -> Vec<u8> {
409 let mut ans = Vec::new();
410 ans.extend(MAGIC_BYTES);
411 let checksum = [0; 16];
412 ans.extend(checksum);
413 ans.extend(mode.to_bytes());
414 ans.extend(body);
415 let checksum_body = &ans[20..];
416 let checksum = md5::compute(checksum_body).0;
417 ans[4..20].copy_from_slice(&checksum);
418 ans
419}
420
421pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec<u8> {
422 let body = outdated_encode_reordered::encode_snapshot(
423 &doc.oplog().try_lock().unwrap(),
424 &mut doc.app_state().try_lock().unwrap(),
425 &Default::default(),
426 );
427
428 encode_header_and_body(EncodeMode::OutdatedSnapshot, body)
429}
430
431pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
432 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
433 fast_snapshot::encode_snapshot(doc, ans);
434 Ok(())
435 })
436 .unwrap()
437}
438
439pub(crate) fn export_snapshot_at(
440 doc: &LoroDoc,
441 frontiers: &Frontiers,
442) -> Result<Vec<u8>, LoroEncodeError> {
443 check_target_version_reachable(doc, frontiers)?;
444 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
445 shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
446 })
447}
448
449pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
450 encode_with(EncodeMode::FastUpdates, &mut |ans| {
451 fast_snapshot::encode_updates(doc, vv, ans);
452 Ok(())
453 })
454 .unwrap()
455}
456
457pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
458 encode_with(EncodeMode::FastUpdates, &mut |ans| {
459 fast_snapshot::encode_updates_in_range(oplog, spans, ans);
460 Ok(())
461 })
462 .unwrap()
463}
464
465pub(crate) fn export_shallow_snapshot(
466 doc: &LoroDoc,
467 f: &Frontiers,
468) -> Result<Vec<u8>, LoroEncodeError> {
469 check_target_version_reachable(doc, f)?;
470 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
471 shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
472 Ok(())
473 })
474}
475
476fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
477 let oplog = doc.oplog.try_lock().unwrap();
478 if !oplog.dag.can_export_shallow_snapshot_on(f) {
479 return Err(LoroEncodeError::FrontiersNotFound(format!("{:?}", f)));
480 }
481
482 Ok(())
483}
484
485pub(crate) fn export_state_only_snapshot(
486 doc: &LoroDoc,
487 f: &Frontiers,
488) -> Result<Vec<u8>, LoroEncodeError> {
489 check_target_version_reachable(doc, f)?;
490 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
491 shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
492 Ok(())
493 })
494}
495
496fn encode_with(
497 mode: EncodeMode,
498 f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
499) -> Result<Vec<u8>, LoroEncodeError> {
500 let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
502 ans.extend(MAGIC_BYTES);
503 let checksum = [0; 16];
504 ans.extend(checksum);
505 ans.extend(mode.to_bytes());
506
507 f(&mut ans)?;
509
510 let checksum_body = &ans[20..];
512 let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
513 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
514 Ok(ans)
515}
516
517pub(crate) fn decode_snapshot(
518 doc: &LoroDoc,
519 mode: EncodeMode,
520 body: &[u8],
521) -> Result<ImportStatus, LoroError> {
522 match mode {
523 EncodeMode::OutdatedSnapshot => outdated_encode_reordered::decode_snapshot(doc, body)?,
524 EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot(doc, body.to_vec().into())?,
525 _ => unreachable!(),
526 };
527 Ok(ImportStatus {
528 success: VersionRange::from_vv(&doc.oplog_vv()),
529 pending: None,
530 })
531}
532
/// The kind of content an encoded blob holds, as reported in blob metadata.
///
/// Variant order is significant because `PartialOrd` and `Ord` are derived.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    Snapshot,
    OutdatedSnapshot,
    ShallowSnapshot,
    OutdatedRle,
    Updates,
}

impl std::fmt::Display for EncodedBlobMode {
    /// Writes the mode's kebab-case label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Snapshot => "snapshot",
            Self::OutdatedSnapshot => "outdated-snapshot",
            Self::ShallowSnapshot => "shallow-snapshot",
            Self::OutdatedRle => "outdated-update",
            Self::Updates => "update",
        };
        f.write_str(label)
    }
}

impl EncodedBlobMode {
    /// Returns `true` for the three snapshot kinds and `false` for the two
    /// update kinds.
    pub fn is_snapshot(&self) -> bool {
        match self {
            Self::Snapshot | Self::ShallowSnapshot | Self::OutdatedSnapshot => true,
            Self::OutdatedRle | Self::Updates => false,
        }
    }
}
564
/// Metadata describing an encoded blob, as returned by
/// `LoroDoc::decode_import_blob_meta`.
#[derive(Debug, Clone)]
pub struct ImportBlobMetadata {
    /// Version vector at the start of the blob's content. NOTE(review): the
    /// `partial_` prefix suggests it may not be a complete version vector —
    /// confirm against the decoders.
    pub partial_start_vv: VersionVector,
    /// Version vector at the end of the blob's content; same caveat as
    /// `partial_start_vv`.
    pub partial_end_vv: VersionVector,
    /// Timestamp associated with the start of the blob's content (unit not
    /// visible here — TODO confirm).
    pub start_timestamp: i64,
    /// Frontiers at the start of the blob's content.
    pub start_frontiers: Frontiers,
    /// Timestamp associated with the end of the blob's content.
    pub end_timestamp: i64,
    /// Number of changes reported for the blob.
    pub change_num: u32,
    /// The kind of content the blob holds.
    pub mode: EncodedBlobMode,
}
585
586impl LoroDoc {
587 pub fn decode_import_blob_meta(
589 blob: &[u8],
590 check_checksum: bool,
591 ) -> LoroResult<ImportBlobMetadata> {
592 let parsed = parse_header_and_body(blob, check_checksum)?;
593 match parsed.mode {
594 EncodeMode::Auto => unreachable!(),
595 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
596 outdated_encode_reordered::decode_import_blob_meta(parsed)
597 }
598 EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
599 EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
600 }
601 }
602}
603
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the postcard-encoded size of representative `LoroValue`s: each
    /// value must serialise to at most the number of bytes listed beside it.
    #[test]
    fn test_value_encode_size() {
        let cases: Vec<(LoroValue, usize)> = vec![
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}
645}