1pub(crate) mod arena;
2pub(crate) mod fast_snapshot;
3pub(crate) mod json_schema;
4mod outdated_encode_reordered;
5mod shallow_snapshot;
6pub(crate) mod value;
7pub(crate) mod value_register;
8pub(crate) use outdated_encode_reordered::{
9 decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
10};
11use outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
12pub(crate) use value::OwnedValue;
13
14use crate::version::{Frontiers, VersionRange};
15use crate::LoroDoc;
16use crate::{oplog::OpLog, LoroError, VersionVector};
17use loro_common::{HasIdSpan, IdSpan, InternalString, LoroEncodeError, LoroResult, ID};
18use num_traits::{FromPrimitive, ToPrimitive};
19use std::borrow::Cow;
20
/// Selects what a document export should contain.
///
/// Each payload is a `Cow`, so callers can hand in either borrowed or owned data
/// (see the `ExportMode::*` / `ExportMode::*_owned` constructors).
///
/// Marked `#[non_exhaustive]`: code outside this crate must keep a wildcard arm when
/// matching on it, so new export kinds can be added without a breaking change.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum ExportMode<'a> {
    /// Snapshot export (built by `ExportMode::snapshot`).
    Snapshot,
    /// Updates relative to the version vector `from`; `ExportMode::all_updates` uses an
    /// empty (default) vector.
    Updates { from: Cow<'a, VersionVector> },
    /// Only the changes covered by the listed ID spans.
    UpdatesInRange { spans: Cow<'a, [IdSpan]> },
    /// Shallow snapshot at the given frontiers.
    ShallowSnapshot(Cow<'a, Frontiers>),
    /// State-only export, optionally at the given frontiers.
    /// NOTE(review): presumably `None` means "current version" — confirm in the exporter.
    StateOnly(Option<Cow<'a, Frontiers>>),
    /// Snapshot of the document as of `version`.
    SnapshotAt { version: Cow<'a, Frontiers> },
}
71
72impl<'a> ExportMode<'a> {
73 pub fn snapshot() -> Self {
75 ExportMode::Snapshot
76 }
77
78 pub fn updates(from: &'a VersionVector) -> Self {
80 ExportMode::Updates {
81 from: Cow::Borrowed(from),
82 }
83 }
84
85 pub fn updates_owned(from: VersionVector) -> Self {
87 ExportMode::Updates {
88 from: Cow::Owned(from),
89 }
90 }
91
92 pub fn all_updates() -> Self {
94 ExportMode::Updates {
95 from: Cow::Owned(Default::default()),
96 }
97 }
98
99 pub fn updates_in_range(spans: impl Into<Cow<'a, [IdSpan]>>) -> Self {
101 ExportMode::UpdatesInRange {
102 spans: spans.into(),
103 }
104 }
105
106 pub fn shallow_snapshot(frontiers: &'a Frontiers) -> Self {
108 ExportMode::ShallowSnapshot(Cow::Borrowed(frontiers))
109 }
110
111 pub fn shallow_snapshot_owned(frontiers: Frontiers) -> Self {
113 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
114 }
115
116 pub fn shallow_snapshot_since(id: ID) -> Self {
118 let frontiers = Frontiers::from_id(id);
119 ExportMode::ShallowSnapshot(Cow::Owned(frontiers))
120 }
121
122 pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
128 ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
129 }
130
131 pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
134 ExportMode::SnapshotAt {
135 version: Cow::Borrowed(frontiers),
136 }
137 }
138
139 pub fn updates_till(vv: &VersionVector) -> ExportMode<'static> {
141 let mut spans = Vec::with_capacity(vv.len());
142 for (peer, counter) in vv.iter() {
143 if *counter > 0 {
144 spans.push(IdSpan::new(*peer, 0, *counter));
145 }
146 }
147
148 ExportMode::UpdatesInRange {
149 spans: Cow::Owned(spans),
150 }
151 }
152}
153
/// Every encoded blob starts with these four ASCII bytes: `loro`.
const MAGIC_BYTES: [u8; 4] = [b'l', b'o', b'r', b'o'];
155
/// Encoding format of a blob, stored as a big-endian two-byte tag right after the checksum.
///
/// The explicit discriminants are the on-wire tag values, so they must never be renumbered.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EncodeMode {
    /// Outdated updates format; imports reject it.
    OutdatedRle = 0x01,
    /// Outdated snapshot format; imports reject it.
    OutdatedSnapshot = 0x02,
    /// Snapshot format written by the `export_*snapshot*` functions.
    FastSnapshot = 0x03,
    /// Updates format written by the `export_fast_updates*` functions.
    FastUpdates = 0x04,
    /// Placeholder tag; never written by the encoders in this module.
    Auto = 0xFF,
}
165
166impl num_traits::FromPrimitive for EncodeMode {
167 #[allow(trivial_numeric_casts)]
168 #[inline]
169 fn from_i64(n: i64) -> Option<Self> {
170 match n {
171 n if n == EncodeMode::Auto as i64 => Some(EncodeMode::Auto),
172 n if n == EncodeMode::OutdatedRle as i64 => Some(EncodeMode::OutdatedRle),
173 n if n == EncodeMode::OutdatedSnapshot as i64 => Some(EncodeMode::OutdatedSnapshot),
174 n if n == EncodeMode::FastSnapshot as i64 => Some(EncodeMode::FastSnapshot),
175 n if n == EncodeMode::FastUpdates as i64 => Some(EncodeMode::FastUpdates),
176 _ => None,
177 }
178 }
179 #[inline]
180 fn from_u64(n: u64) -> Option<Self> {
181 Self::from_i64(n as i64)
182 }
183}
184
185impl num_traits::ToPrimitive for EncodeMode {
186 #[inline]
187 #[allow(trivial_numeric_casts)]
188 fn to_i64(&self) -> Option<i64> {
189 Some(match *self {
190 EncodeMode::Auto => EncodeMode::Auto as i64,
191 EncodeMode::OutdatedRle => EncodeMode::OutdatedRle as i64,
192 EncodeMode::OutdatedSnapshot => EncodeMode::OutdatedSnapshot as i64,
193 EncodeMode::FastSnapshot => EncodeMode::FastSnapshot as i64,
194 EncodeMode::FastUpdates => EncodeMode::FastUpdates as i64,
195 })
196 }
197 #[inline]
198 fn to_u64(&self) -> Option<u64> {
199 self.to_i64().map(|x| x as u64)
200 }
201}
202
203impl EncodeMode {
204 pub fn to_bytes(self) -> [u8; 2] {
205 let value = self.to_u16().unwrap();
206 value.to_be_bytes()
207 }
208
209 pub fn is_snapshot(self) -> bool {
210 matches!(
211 self,
212 EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
213 )
214 }
215}
216
217impl TryFrom<[u8; 2]> for EncodeMode {
218 type Error = LoroError;
219
220 fn try_from(value: [u8; 2]) -> Result<Self, Self::Error> {
221 let value = u16::from_be_bytes(value);
222 Self::from_u16(value).ok_or(LoroError::IncompatibleFutureEncodingError(value as usize))
223 }
224}
225
/// Outcome of an import.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ImportStatus {
    /// Version range reported as successfully imported.
    pub success: VersionRange,
    /// ID spans of changes that were received but left pending (presumably because their
    /// dependencies are not yet present — TODO confirm); `None` when nothing is pending.
    pub pending: Option<VersionRange>,
}
231
/// Decodes the body of a fast-format blob and imports its changes into `oplog`.
///
/// `FastSnapshot` bodies are decoded by `fast_snapshot::decode_oplog`. `FastUpdates`
/// bodies are copied and decoded by `fast_snapshot::decode_updates`. Outdated formats
/// are rejected.
///
/// # Errors
/// - `ImportUnsupportedEncodingMode` for `OutdatedRle` / `OutdatedSnapshot`;
/// - any error from the fast decoders or from `import_unknown_lamport_pending_changes`;
/// - `ImportUpdatesThatDependsOnOutdatedVersion` if some changes depend on history before
///   the shallow root. This check runs only after `import_changes_to_oplog`,
///   `try_apply_pending` and `import_unknown_lamport_pending_changes` have been called on
///   `oplog`, so the oplog may already have been modified when this error is returned.
///
/// # Panics
/// On `EncodeMode::Auto` (`unreachable!()`).
pub(crate) fn decode_oplog(
    oplog: &mut OpLog,
    parsed: ParsedHeaderAndBody,
) -> Result<ImportStatus, LoroError> {
    let ParsedHeaderAndBody { mode, body, .. } = parsed;
    let changes = match mode {
        EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
            return Err(LoroError::ImportUnsupportedEncodingMode);
        }
        EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
        EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
        EncodeMode::Auto => unreachable!(),
    }?;
    let ImportChangesResult {
        mut imported,
        latest_ids,
        pending_changes,
        changes_that_have_deps_before_shallow_root,
    } = import_changes_to_oplog(changes, oplog);

    // Collect the ID spans of `pending_changes`; this is what gets reported as `pending`.
    let mut pending = VersionRange::default();
    pending_changes.iter().for_each(|c| {
        pending.extends_to_include_id_span(c.id_span());
    });
    // `imported` is passed mutably, presumably so changes unblocked here are added to the
    // success range — TODO confirm in `OpLog::try_apply_pending`.
    oplog.try_apply_pending(latest_ids, Some(&mut imported));
    oplog.import_unknown_lamport_pending_changes(pending_changes)?;
    if !changes_that_have_deps_before_shallow_root.is_empty() {
        return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
    }
    Ok(ImportStatus {
        success: imported,
        pending: (!pending.is_empty()).then_some(pending),
    })
}
267
/// Header fields and body of an encoded blob, borrowing from the input bytes.
pub(crate) struct ParsedHeaderAndBody<'a> {
    /// The 16 raw checksum bytes from the header. Outdated modes compare all 16 against an
    /// MD5 digest; fast modes read a little-endian xxh32 value from the last 4 bytes.
    pub checksum: [u8; 16],
    /// The bytes the checksum covers. When built by `parse_header_and_body`, this is
    /// everything after the checksum field (mode tag + body).
    pub checksum_body: &'a [u8],
    /// The encoding mode decoded from the 2-byte header tag.
    pub mode: EncodeMode,
    /// The payload that follows the mode tag.
    pub body: &'a [u8],
}
274
/// Seed for the xxh32 checksum of fast-format blobs: the ASCII bytes `LORO`, read little-endian.
const XXH_SEED: u32 = u32::from_le_bytes([b'L', b'O', b'R', b'O']);
276impl ParsedHeaderAndBody<'_> {
277 fn check_checksum(&self) -> LoroResult<()> {
279 match self.mode {
280 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
281 if md5::compute(self.checksum_body).0 != self.checksum {
282 return Err(LoroError::DecodeChecksumMismatchError);
283 }
284 }
285 EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
286 let expected = u32::from_le_bytes(self.checksum[12..16].try_into().unwrap());
287 if xxhash_rust::xxh32::xxh32(self.checksum_body, XXH_SEED) != expected {
288 return Err(LoroError::DecodeChecksumMismatchError);
289 }
290 }
291 EncodeMode::Auto => unreachable!(),
292 }
293
294 Ok(())
295 }
296}
297
/// Fixed header length: 4 magic bytes + 16 checksum bytes + 2 mode-tag bytes.
const MIN_HEADER_SIZE: usize = 4 + 16 + 2;
299pub(crate) fn parse_header_and_body(
300 bytes: &[u8],
301 check_checksum: bool,
302) -> Result<ParsedHeaderAndBody<'_>, LoroError> {
303 let reader = &bytes;
304 if bytes.len() < MIN_HEADER_SIZE {
305 return Err(LoroError::DecodeError("Invalid import data".into()));
306 }
307
308 let (magic_bytes, reader) = reader.split_at(4);
309 let magic_bytes: [u8; 4] = magic_bytes.try_into().unwrap();
310 if magic_bytes != MAGIC_BYTES {
311 return Err(LoroError::DecodeError("Invalid magic bytes".into()));
312 }
313
314 let (checksum, reader) = reader.split_at(16);
315 let checksum_body = reader;
316 let (mode_bytes, reader) = reader.split_at(2);
317 let mode: EncodeMode = [mode_bytes[0], mode_bytes[1]].try_into()?;
318
319 let ans = ParsedHeaderAndBody {
320 mode,
321 checksum_body,
322 checksum: checksum.try_into().unwrap(),
323 body: reader,
324 };
325
326 if check_checksum {
327 ans.check_checksum()?;
328 }
329 Ok(ans)
330}
331
332pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
333 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
334 fast_snapshot::encode_snapshot(doc, ans);
335 Ok(())
336 })
337 .unwrap()
338}
339
340pub(crate) fn export_snapshot_at(
341 doc: &LoroDoc,
342 frontiers: &Frontiers,
343) -> Result<Vec<u8>, LoroEncodeError> {
344 check_target_version_reachable(doc, frontiers)?;
345 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
346 shallow_snapshot::encode_snapshot_at(doc, frontiers, ans)
347 })
348}
349
350pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
351 encode_with(EncodeMode::FastUpdates, &mut |ans| {
352 fast_snapshot::encode_updates(doc, vv, ans);
353 Ok(())
354 })
355 .unwrap()
356}
357
358pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
359 encode_with(EncodeMode::FastUpdates, &mut |ans| {
360 fast_snapshot::encode_updates_in_range(oplog, spans, ans);
361 Ok(())
362 })
363 .unwrap()
364}
365
366pub(crate) fn export_shallow_snapshot(
367 doc: &LoroDoc,
368 f: &Frontiers,
369) -> Result<Vec<u8>, LoroEncodeError> {
370 check_target_version_reachable(doc, f)?;
371 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
372 shallow_snapshot::export_shallow_snapshot(doc, f, ans)?;
373 Ok(())
374 })
375}
376
377fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
378 let oplog = doc.oplog.lock().unwrap();
379 if !oplog.dag.can_export_shallow_snapshot_on(f) {
380 return Err(LoroEncodeError::FrontiersNotFound(format!("{f:?}")));
381 }
382
383 Ok(())
384}
385
386pub(crate) fn export_state_only_snapshot(
387 doc: &LoroDoc,
388 f: &Frontiers,
389) -> Result<Vec<u8>, LoroEncodeError> {
390 check_target_version_reachable(doc, f)?;
391 encode_with(EncodeMode::FastSnapshot, &mut |ans| {
392 shallow_snapshot::export_state_only_snapshot(doc, f, ans)?;
393 Ok(())
394 })
395}
396
397fn encode_with(
398 mode: EncodeMode,
399 f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
400) -> Result<Vec<u8>, LoroEncodeError> {
401 let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
403 ans.extend(MAGIC_BYTES);
404 let checksum = [0; 16];
405 ans.extend(checksum);
406 ans.extend(mode.to_bytes());
407
408 f(&mut ans)?;
410
411 let checksum_body = &ans[20..];
413 let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
414 ans[16..20].copy_from_slice(&checksum.to_le_bytes());
415 Ok(ans)
416}
417
418pub(crate) fn decode_snapshot(
419 doc: &LoroDoc,
420 mode: EncodeMode,
421 body: &[u8],
422 origin: InternalString,
423) -> Result<ImportStatus, LoroError> {
424 match mode {
425 EncodeMode::OutdatedSnapshot => {
426 return Err(LoroError::ImportUnsupportedEncodingMode);
427 }
428 EncodeMode::FastSnapshot => {
429 fast_snapshot::decode_snapshot(doc, body.to_vec().into(), origin)?
430 }
431 _ => unreachable!(),
432 };
433 Ok(ImportStatus {
434 success: VersionRange::from_vv(&doc.oplog_vv()),
435 pending: None,
436 })
437}
438
/// Kind of an encoded blob, as reported in `ImportBlobMetadata::mode`.
///
/// Derives `PartialOrd`/`Ord`, so the declaration order of the variants is observable and
/// must be preserved.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum EncodedBlobMode {
    Snapshot,
    OutdatedSnapshot,
    ShallowSnapshot,
    OutdatedRle,
    Updates,
}

impl std::fmt::Display for EncodedBlobMode {
    /// Writes the kebab-case label of the mode, e.g. `shallow-snapshot`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Snapshot => "snapshot",
            Self::OutdatedSnapshot => "outdated-snapshot",
            Self::ShallowSnapshot => "shallow-snapshot",
            Self::OutdatedRle => "outdated-update",
            Self::Updates => "update",
        };
        f.write_str(label)
    }
}

impl EncodedBlobMode {
    /// Returns `true` for any snapshot flavour (full, outdated or shallow).
    pub fn is_snapshot(&self) -> bool {
        matches!(
            *self,
            Self::Snapshot | Self::OutdatedSnapshot | Self::ShallowSnapshot
        )
    }
}
470
/// Metadata decoded from an export blob without importing it
/// (returned by `LoroDoc::decode_import_blob_meta`).
#[derive(Debug, Clone)]
pub struct ImportBlobMetadata {
    /// Version vector at the start of the range the blob covers.
    /// NOTE(review): "partial" presumably means the blob may cover only part of the history
    /// after this version — confirm against `fast_snapshot::decode_*_blob_meta`.
    pub partial_start_vv: VersionVector,
    /// Version vector at the end of the range the blob covers (see note on
    /// `partial_start_vv`).
    pub partial_end_vv: VersionVector,
    /// Presumably the timestamp of the earliest change in the blob; units not visible
    /// here — TODO confirm.
    pub start_timestamp: i64,
    /// Presumably the frontiers at the start of the blob's range — verify with the decoder.
    pub start_frontiers: Frontiers,
    /// Presumably the timestamp of the latest change in the blob; units not visible
    /// here — TODO confirm.
    pub end_timestamp: i64,
    /// Presumably the number of changes contained in the blob.
    pub change_num: u32,
    /// Which kind of blob this is.
    pub mode: EncodedBlobMode,
}
491
492impl LoroDoc {
493 pub fn decode_import_blob_meta(
495 blob: &[u8],
496 check_checksum: bool,
497 ) -> LoroResult<ImportBlobMetadata> {
498 let parsed = parse_header_and_body(blob, check_checksum)?;
499 match parsed.mode {
500 EncodeMode::Auto => unreachable!(),
501 EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
502 Err(LoroError::ImportUnsupportedEncodingMode)
503 }
504 EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(parsed),
505 EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(parsed),
506 }
507 }
508}
509
#[cfg(test)]
mod test {

    use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

    /// Guards the compactness of the postcard encoding of `LoroValue`: each representative
    /// value must serialize to at most the paired number of bytes.
    #[test]
    fn test_value_encode_size() {
        let cases = [
            (LoroValue::Null, 1),
            (LoroValue::I64(1), 2),
            (LoroValue::Double(1.), 9),
            (LoroValue::Bool(true), 2),
            (LoroValue::String("123".to_string().into()), 5),
            (LoroValue::Binary(vec![1, 2, 3].into()), 5),
            (
                loro_value!({
                    "a": 1,
                    "b": 2,
                }),
                10,
            ),
            (loro_value!([1, 2, 3]), 8),
            (
                LoroValue::Container(ContainerID::new_normal(ID::new(1, 1), ContainerType::Map)),
                5,
            ),
            (
                LoroValue::Container(ContainerID::new_root("a", ContainerType::Map)),
                5,
            ),
        ];

        for (value, max_size) in cases {
            let size = postcard::to_allocvec(&value).unwrap().len();
            assert!(
                size <= max_size,
                "value: {:?}, size: {}, max_size: {}",
                value,
                size,
                max_size
            );
        }
    }
}