1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9 decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::version::{Frontiers, VersionRange};
15use crate::LoroDoc;
16use crate::{oplog::OpLog, LoroError, VersionVector};
17use loro_common::{HasIdSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, ID};
18use num_traits::{FromPrimitive, ToPrimitive};
19use std::borrow::Cow;
20
21#[non_exhaustive]
51#[derive(Debug, Clone)]
52pub enum ExportMode<'a> {
53 Snapshot,
55 Updates { from: Cow<'a, VersionVector> },
57 UpdatesInRange { spans: Cow<'a, [IdSpan]> },
59 ShallowSnapshot(Cow<'a, Frontiers>),
61 StateOnly(Option<Cow<'a, Frontiers>>),
67 SnapshotAt { version: Cow<'a, Frontiers> },
70}
71
72impl<'a> ExportMode<'a> {
73 pub fn snapshot() -> Self {
75 ExportMode::Snapshot
76 }
77
78 pub fn updates(from: &'a VersionVector) -> Self {
80 ExportMode::Updates {
81 from: Cow::Borrowed(from),
82 }
83 }
84
85 pub fn updates_owned(from: VersionVector) -> Self {
87 ExportMode::Updates {
88 from: Cow::Owned(from),
89 }
90 }
91
92 pub fn all_updates() -> Self {
94 ExportMode::Updates {
95 from: Cow::Owned(Default::default()),
96 }
97 }
98
99 pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
101 ExportMode::UpdatesInRange {
102 spans: spans.into(),
103 }
104 }
105
106 pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
108 ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
109 }
110
111 pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
113 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
114 }
115
116 pub fn shallow_snapshot_since(id: ID) -> Self {
118 let frontiers = Frontiers::from_id(id);
119 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
120 }
121
122 pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
128 ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
129 }
130
131 pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
134 ExportMode::SnapshotAt {
135 version: Cow::Borrowed(frontiers),
136 }
137 }
138
139 pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
141 let mut spans = Vec::with_capacity(vv.len());
142 for (peer, counter) in vv.iter() {
143 if *counter > 0 {
144 spans.push(IdSpan::new(*peer, 0, *counter));
145 }
146 }
147
148 ExportMode::UpdatesInRange {
149 spans: Cow::Owned(spans),
150 }
151 }
152}
153
/// The four ASCII bytes `loro` that open every encoded blob.
const MAGIC_BYTES: [u8; 4] = [b'l', b'o', b'r', b'o'];
155
/// Encoding mode tag stored in a blob header as a big-endian `u16`.
///
/// The discriminants are part of the wire format and must not change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Placeholder value; rejected when found in an encoded blob.
    Auto = 255,
    /// Legacy update encoding; importing it is reported as unsupported.
    OutdatedRle = 1,
    /// Legacy snapshot encoding; importing it is reported as unsupported.
    OutdatedSnapshot = 2,
    /// Snapshot encoding written by the snapshot exporters in this module.
    FastSnapshot = 3,
    /// Update encoding written by the update exporters in this module.
    FastUpdates = 4,
}
165
166impl num_traits::FromPrimitive for EncodeMode {
167 #[allow(trivial_numeric_casts)]
168 #[inline]
169 fn from_i64(n: i64) -> Option<Self> {
170 match n {
171 n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
172 n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
173 n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
174 n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
175 n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
176 _ => None,
177 }
178 }
179 #[inline]
180 fn from_u64(n: u64) -> Option<Self> {
181 Self::from_i64(n as i64)
182 }
183}
184
185impl num_traits::ToPrimitive for EncodeMode {
186 #[inline]
187 #[allow(trivial_numeric_casts)]
188 fn to_i64(&self) -> Option<i64> {
189 Some(match *self {
190 EncodeMode::Auto => EncodeMode::Auto as i64,
191 EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
192 EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
193 EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
194 EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
195 })
196 }
197 #[inline]
198 fn to_u64(&self) -> Option<u64> {
199 self.to_i64().map(|x| x as u64)
200 }
201}
202
203impl EncodeMode {
204 pub fn to_bytes(self) -> [u8; 2] {
205 let value = self.to_u16().unwrap();
206 value.to_be_bytes()
207 }
208
209 pub fn is_snapshot(self) -> bool {
210 matches!(
211 self,
212 EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
213 )
214 }
215}
216
217impl TryFrom<[u8; 2]> for EncodeMode {
218 type Error = LoroError;
219
220 fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
221 let value = u16::from_be_bytes(value);
222 Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
223 }
224}
225
226#[derive(Debug, Clone, PartialEq, Eq, Default)]
227pub struct ImportStatus {
228 pub success: VersionRange,
229 pub pending: Option<VersionRange>,
230}
231
232pub(crate) fn decode_oplog(
233 oplog: &mut OpLog,
234 parsed: ParsedHeaderAndBody,
235) -> Result<ImportStatus, LoroError> {
236 let ParsedHeaderAndBody { mode, body, .. } = parsed;
237 let changes = match mode {
238 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
239 return Err(LoroError::ImportUnsupportedEncodingMode);
240 }
241 EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
242 EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
243 EncodeMode::Auto => unreachable!(),
244 }?;
245 let ImportChangesResult {
246 mut imported,
247 latest_ids,
248 pending_changes,
249 changes_that_have_deps_before_shallow_root,
250 } = import_changes_to_oplog(changes, oplog);
251
252 let mut pending = VersionRange::default();
253 pending_changes.iter().for_each(|c| {
254 pending.extends_to_include_id_span(c.id_span());
255 });
256 oplog.try_apply_pending(latest_ids, Some(&mut imported));
258 oplog.import_unknown_lamport_pending_changes(pending_changes)?;
259 if !changes_that_have_deps_before_shallow_root.is_empty() {
260 return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
261 }
262 Ok(ImportStatus {
263 success: imported,
264 pending: (!pending.is_empty()).then_some(pending),
265 })
266}
267
268pub(crate) struct ParsedHeaderAndBody<'a> {
269 pub checksum: [u8; 16],
270 pub checksum_body: &'a [u8],
271 pub mode: EncodeMode,
272 pub body: &'a [u8],
273}
274
275const XXH_SEED: u32 = u32::from_le_bytes(*b"LORO");
276impl ParsedHeaderAndBody<'_> {
277 fn check_checksum(&self) -> LoroResult<()> {
279 match self.mode {
280 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
281 if md5::compute(self.checksum_body).0 != self.checksum {
282 return Err(LoroError::DecodeChecksumMismatchError);
283 }
284 }
285 EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
286 let mut expected_bytes = [0; 4];
287 expected_bytes.copy_from_slice(&self.checksum[12..16]);
288 let expected = u32::from_le_bytes(expected_bytes);
289 if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
290 return Err(LoroError::DecodeChecksumMismatchError);
291 }
292 }
293 EncodeMode::Auto => {
294 return Err(LoroError::DecodeError(
295 "Invalid import mode `Auto` in encoded blob"
296 .to_string()
297 .into_boxed_str(),
298 ));
299 }
300 }
301
302 Ok(())
303 }
304}
305
306const MIN_HEADER_SIZE: usize = 22;
307pub(crate) fn parse_header_and_body(
308 bytes: &[u8],
309 check_checksum: bool,
310) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
311 let reader = &bytes;
312 if bytes.len() < MIN_HEADER_SIZE {
313 return Err(LoroError::DecodeError("Invalid import data".into()));
314 }
315
316 let (magic_bytes, reader) = reader.split_at(4);
317 if magic_bytes != MAGIC_BYTES {
318 return Err(LoroError::DecodeError("Invalid magic bytes".into()));
319 }
320
321 let (checksum, reader) = reader.split_at(16);
322 let checksum_body = reader;
323 let (mode_bytes, reader) = reader.split_at(2);
324 let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
325 let mut checksum_arr = [0; 16];
326 checksum_arr.copy_from_slice(checksum);
327
328 let ans = ParsedHeaderAndBody {
329 mode,
330 checksum_body,
331 checksum: checksum_arr,
332 body: reader,
333 };
334
335 if check_checksum {
336 ans.check_checksum()?;
337 }
338 Ok(ans)
339}
340
341pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
342 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
343 fast_snapshot::encode_snapshot(doc, ans);
344 Ok(())
345 })
346 .unwrap()
347}
348
349pub(crate) fn export_snapshot_at(
350 doc: &LoroDoc,
351 frontiers: &Frontiers,
352) -> Result<Vec<u8>, LoroEncodeError> {
353 check_target_version_reachable(doc, frontiers)?;
354 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
355 shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
356 })
357}
358
359pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
360 encode_with(EncodeMode::FastUpdates, &mut |ans| {
361 fast_snapshot::encode_updates(doc, vv, ans);
362 Ok(())
363 })
364 .unwrap()
365}
366
367pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
368 encode_with(EncodeMode::FastUpdates, &mut |ans| {
369 fast_snapshot::encode_updates_in_range(oplog, spans, ans);
370 Ok(())
371 })
372 .unwrap()
373}
374
375pub(crate) fn export_shallow_snapshot(
376 doc: &LoroDoc,
377 f: &Frontiers,
378) -> Result<Vec<u8>, LoroEncodeError> {
379 check_target_version_reachable(doc, f)?;
380 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
381 shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
382 Ok(())
383 })
384}
385
386fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
387 let oplog = doc.oplog.lock();
388 if !oplog.dag.can_export_shallow_snapshot_on(f) {
389 return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
390 }
391
392 Ok(())
393}
394
395pub(crate) fn export_state_only_snapshot(
396 doc: &LoroDoc,
397 f: &Frontiers,
398) -> Result<Vec<u8>, LoroEncodeError> {
399 check_target_version_reachable(doc, f)?;
400 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
401 shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
402 Ok(())
403 })
404}
405
406fn encode_with(
407 mode: EncodeMode,
408 f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
409) -> Result<Vec<u8>, LoroEncodeError> {
410 let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
412 ans.extend(MAGIC_BYTES);
413 let checksum = [0; 16];
414 ans.extend(checksum);
415 ans.extend(mode.to_bytes());
416
417 f(&mut ans)?;
419
420 let checksum_body = &ans[20..];
422 let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
423 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
424 Ok(ans)
425}
426
427pub(crate) fn decode_snapshot(
428 doc: &LoroDoc,
429 mode: EncodeMode,
430 body: &[u8],
431 origin: InternalString,
432) -> Result<ImportStatus, LoroError> {
433 match mode {
434 EncodeMode::OutdatedSnapshot => {
435 return Err(LoroError::ImportUnsupportedEncodingMode);
436 }
437 EncodeMode::FastSnapshot => {
438 fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
439 }
440 _ => {
441 return Err(LoroError::DecodeError(
442 format!("Invalid snapshot encoding mode: {mode:?}").into_boxed_str(),
443 ));
444 }
445 };
446 Ok(ImportStatus {
447 success: VersionRange::from_vv(&doc.oplog_vv()),
448 pending: None,
449 })
450}
451
/// Kind of an encoded blob, as reported in [`ImportBlobMetadata`].
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants defines their
/// ordering and must be preserved.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    /// Snapshot blob; displayed as `snapshot`.
    Snapshot,
    /// Snapshot in the outdated encoding; displayed as `outdated-snapshot`.
    OutdatedSnapshot,
    /// Shallow snapshot blob; displayed as `shallow-snapshot`.
    ShallowSnapshot,
    /// Updates in the outdated encoding; displayed as `outdated-update`.
    OutdatedRle,
    /// Updates blob; displayed as `update`.
    Updates,
}
460
461impl std::fmt::Display for EncodedBlobMode {
462 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
463 f.write_str(match self {
464 EncodedBlobMode::OutdatedRle => "outdated-update",
465 EncodedBlobMode::OutdatedSnapshot => "outdated-snapshot",
466 EncodedBlobMode::Snapshot => "snapshot",
467 EncodedBlobMode::ShallowSnapshot => "shallow-snapshot",
468 EncodedBlobMode::Updates => "update",
469 })
470 }
471}
472
473impl EncodedBlobMode {
474 pub fn is_snapshot(&self) -> bool {
475 matches!(
476 self,
477 EncodedBlobMode::Snapshot
478 | EncodedBlobMode::ShallowSnapshot
479 | EncodedBlobMode::OutdatedSnapshot
480 )
481 }
482}
483
484#[derive(Debug, Clone)]
485pub struct ImportBlobMetadata {
486 pub partial_start_vv: VersionVector,
492 pub partial_end_vv: VersionVector,
498 pub start_timestamp: i64,
499 pub start_frontiers: Frontiers,
500 pub end_timestamp: i64,
501 pub change_num: u32,
502 pub mode: EncodedBlobMode,
503}
504
505impl LoroDoc {
506 pub fn decode_import_blob_meta(
508 blob: &[u8],
509 check_checksum: bool,
510 ) -> LoroResult<ImportBlobMetadata> {
511 let parsed = parse_header_and_body(blob, check_checksum)?;
512 match parsed.mode {
513 EncodeMode::Auto => unreachable!(),
514 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
515 Err(LoroError::ImportUnsupportedEncodingMode)
516 }
517 EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
518 EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
519 }
520 }
521}
522
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the postcard-encoded size of representative `LoroValue`s against regressions.
    #[test]
    fn test_value_encode_size() {
        // Each entry pairs a value with the largest encoded size (in bytes) it may take.
        let cases: Vec<(LoroValue, usize)> = vec![
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}