1mod change_store;
2pub(crate) mod loro_dag;
3mod pending_changes;
4
5use crate::sync::{AtomicUsize, Mutex};
6use bytes::Bytes;
7use std::borrow::Cow;
8use std::cell::RefCell;
9use std::cmp::Ordering;
10use std::rc::Rc;
11use std::sync::Arc;
12use tracing::trace_span;
13
14use self::change_store::iter::MergedChangeIter;
15use self::pending_changes::{PendingChanges, PendingChangesRollback};
16use super::arena::{SharedArena, SharedArenaRollback};
17use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
18use crate::configure::Configure;
19use crate::container::list::list_op;
20use crate::dag::{Dag, DagUtils};
21use crate::diff_calc::DiffMode;
22use crate::encoding::decode_oplog;
23use crate::encoding::{ImportStatus, ParsedHeaderAndBody};
24use crate::history_cache::ContainerHistoryCache;
25use crate::id::{Counter, PeerID, ID};
26use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
27use crate::span::{HasCounterSpan, HasLamportSpan};
28use crate::version::{Frontiers, ImVersionVector, VersionVector};
29use crate::LoroError;
30use change_store::{BlockOpRef, ChangeStoreRollback};
31use loro_common::{ContainerType, HasIdSpan, IdLp, IdSpan};
32use rle::{HasLength, RleVec, Sliceable};
33use smallvec::SmallVec;
34
35pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
36pub use change_store::{BlockChangeRef, ChangeStore};
37
/// The operation log: owns the change storage, the DAG built over the stored
/// changes, and the bookkeeping used to import, roll back and export them.
pub struct OpLog {
    /// DAG over the stored changes. Every inserted change is forwarded to it,
    /// and it is the source of the current version vector and frontiers.
    pub(crate) dag: AppDag,
    /// Arena used to construct the change store and to resolve container ids
    /// when ops are converted to their remote form.
    pub(crate) arena: SharedArena,
    /// Shared counter that `refresh_visible_op_count` overwrites with the
    /// freshly computed visible-op count.
    visible_op_count: Arc<AtomicUsize>,
    /// Storage for the changes themselves.
    change_store: ChangeStore,
    /// History cache that is updated on every inserted change and freed
    /// entirely when an import is rolled back.
    history_cache: Mutex<ContainerHistoryCache>,
    /// Changes held in a pending state.
    /// NOTE(review): presumably changes whose dependencies are not available
    /// yet — confirm against the `pending_changes` module.
    pub(crate) pending_changes: PendingChanges,
    /// `false` on construction and after a reset.
    /// NOTE(review): presumably raised while a batch import is running; the
    /// writers are outside this section — confirm.
    pub(crate) batch_importing: bool,
    /// Configuration; consulted here for the merge interval, detached-editing
    /// mode and timestamp recording.
    pub(crate) configure: Configure,
    /// Change stored by `set_uncommitted_change` and served (whole or sliced)
    /// by `get_uncommitted_change_in_span`.
    pub(crate) uncommitted_change: Option<Change>,
    /// Rollback checkpoint; `Some` between `begin_import_rollback*` and the
    /// matching `commit_import_rollback` / `rollback_import`.
    pub(crate) import_rollback: Option<ImportRollback>,
}
64
/// Checkpoint captured when an import rollback scope begins; consumed by
/// `OpLog::rollback_import` to undo the import.
pub(crate) struct ImportRollback {
    /// Version vector of the oplog at the moment the checkpoint was taken.
    old_vv: VersionVector,
    /// Arena checkpoint handed back to `SharedArena::rollback`.
    arena: SharedArenaRollback,
    /// Change-store rollback record, filled in while changes are inserted.
    change_store: ChangeStoreRollback,
    /// Rollback record applied to the pending changes.
    pending: PendingChangesRollback,
}
71
/// Summary produced by `OpLog::preflight_import_changes` describing what an
/// import of a batch of changes would involve. All flags default to `false`.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ImportChangesPreflight {
    /// At least one change is new, is not before the shallow root, and has a
    /// lamport that can be resolved from its deps.
    pub applies_to_dag: bool,
    /// At least one new change has deps that lie before the shallow root.
    pub has_deps_before_shallow_root: bool,
    /// Set when an applicable change contains a `List` or `Tree` op, or when
    /// something applies and the pending changes report such ops.
    /// In test builds it is set whenever `applies_to_dag` is set.
    pub needs_state_apply_rollback: bool,
}
78
79impl std::fmt::Debug for OpLog {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("OpLog")
82 .field("dag", &self.dag)
83 .field("pending_changes", &self.pending_changes)
84 .finish()
85 }
86}
87
88impl OpLog {
89 #[inline]
90 pub(crate) fn new(visible_op_count: Arc<AtomicUsize>) -> Self {
91 let arena = SharedArena::new();
92 let cfg = Configure::default();
93 let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval_in_s.clone());
94 Self {
95 visible_op_count,
96 history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)),
97 dag: AppDag::new(change_store.clone()),
98 change_store,
99 arena,
100 pending_changes: Default::default(),
101 batch_importing: false,
102 configure: cfg,
103 uncommitted_change: None,
104 import_rollback: None,
105 }
106 }
107
108 #[inline]
109 fn calc_visible_op_count(&self) -> usize {
110 let total = self.dag.vv().values().sum::<i32>() as usize;
111 let shallow = self
112 .dag
113 .shallow_since_vv()
114 .iter()
115 .map(|(_, ops)| *ops)
116 .sum::<i32>() as usize;
117 total - shallow
118 }
119
    /// Returns the visible-op count recomputed from the DAG, without touching
    /// the shared `visible_op_count` atomic.
    #[inline]
    pub(crate) fn visible_op_count_exact(&self) -> usize {
        self.calc_visible_op_count()
    }
124
125 #[inline]
126 pub(crate) fn refresh_visible_op_count(&self) -> usize {
127 let count = self.calc_visible_op_count();
128 self.visible_op_count
129 .store(count, std::sync::atomic::Ordering::Release);
130 count
131 }
132
    /// Borrows the DAG owned by this oplog.
    #[inline]
    pub fn dag(&self) -> &AppDag {
        &self.dag
    }
137
    /// Borrows the change store owned by this oplog.
    pub fn change_store(&self) -> &ChangeStore {
        &self.change_store
    }
141
142 pub fn get_change_with_lamport_lte(
146 &self,
147 peer: PeerID,
148 lamport: Lamport,
149 ) -> Option<BlockChangeRef> {
150 let ans = self
151 .change_store
152 .get_change_by_lamport_lte(IdLp::new(peer, lamport))?;
153 debug_assert!(ans.lamport <= lamport);
154 Some(ans)
155 }
156
157 pub fn get_timestamp_of_version(&self, f: &Frontiers) -> Timestamp {
158 let mut timestamp = Timestamp::default();
159 for id in f.iter() {
160 if let Some(change) = self.lookup_change(id) {
161 timestamp = timestamp.max(change.timestamp);
162 }
163 }
164
165 timestamp
166 }
167
    /// Returns `true` when the DAG is empty and the arena reports that it can
    /// still import a snapshot.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.dag.is_empty() && self.arena.can_import_snapshot()
    }
172
173 pub(crate) fn insert_new_change(&mut self, change: Change, from_local: bool) {
175 let s = trace_span!(
176 "insert_new_change",
177 id = ?change.id,
178 lamport = change.lamport,
179 deps = ?change.deps
180 );
181 let _enter = s.enter();
182 let rollback_old_vv = self
183 .import_rollback
184 .as_ref()
185 .and_then(|x| (!x.old_vv.is_empty()).then_some(&x.old_vv));
186 self.dag
187 .handle_new_change(&change, from_local, rollback_old_vv);
188 self.history_cache
189 .lock()
190 .insert_by_new_change(&change, true, true);
191 self.register_container_and_parent_link(&change);
192 if let Some(rollback) = self.import_rollback.as_mut() {
193 self.change_store.insert_change_with_rollback(
194 change,
195 true,
196 from_local,
197 &mut rollback.change_store,
198 );
199 } else {
200 self.change_store.insert_change(change, true, from_local);
201 }
202 self.refresh_visible_op_count();
203 }
204
    /// Opens an import rollback scope, taking a fresh arena checkpoint and
    /// delegating to `begin_import_rollback_with_arena`.
    pub(crate) fn begin_import_rollback(&mut self) {
        let arena = self.arena.checkpoint_for_rollback();
        self.begin_import_rollback_with_arena(arena);
    }
209
210 pub(crate) fn begin_import_rollback_with_arena(&mut self, arena: SharedArenaRollback) {
211 debug_assert!(self.import_rollback.is_none());
212 let old_vv = self.vv().clone();
213 self.dag.begin_import_rollback();
214 self.import_rollback = Some(ImportRollback {
215 old_vv: old_vv.clone(),
216 arena,
217 change_store: ChangeStoreRollback::new(old_vv),
218 pending: Default::default(),
219 });
220 }
221
    /// Closes the import rollback scope keeping everything that was imported:
    /// the DAG commits its rollback tracking and the checkpoint is dropped.
    pub(crate) fn commit_import_rollback(&mut self) {
        self.dag.commit_import_rollback();
        self.import_rollback = None;
    }
226
227 pub(crate) fn preflight_import_changes(&self, changes: &[Change]) -> ImportChangesPreflight {
228 let mut ans = ImportChangesPreflight::default();
229 let pending_needs_state_apply_rollback =
230 self.pending_changes.has_state_apply_rollback_ops();
231 for change in changes {
232 if change.ctr_end() <= self.vv().get(&change.id.peer).copied().unwrap_or(0) {
233 continue;
234 }
235
236 if self.dag.is_before_shallow_root(&change.deps) {
237 ans.has_deps_before_shallow_root = true;
238 continue;
239 }
240
241 if self
242 .dag
243 .get_change_lamport_from_deps(&change.deps)
244 .is_none()
245 {
246 continue;
247 }
248
249 ans.applies_to_dag = true;
250 if change.ops.iter().any(|op| {
251 matches!(
252 op.container.get_type(),
253 ContainerType::List | ContainerType::Tree
254 )
255 }) {
256 ans.needs_state_apply_rollback = true;
257 }
258 }
259
260 if ans.applies_to_dag && pending_needs_state_apply_rollback {
266 ans.needs_state_apply_rollback = true;
267 }
268
269 #[cfg(test)]
270 if ans.applies_to_dag {
271 ans.needs_state_apply_rollback = true;
272 }
273
274 ans
275 }
276
277 pub(crate) fn rollback_import(&mut self) {
278 let Some(rollback) = self.import_rollback.take() else {
279 return;
280 };
281
282 self.change_store.rollback_import(rollback.change_store);
283 self.dag.rollback_import();
284 rollback.pending.rollback(&mut self.pending_changes);
285 self.history_cache.lock().free_all();
286 self.arena.rollback(rollback.arena);
287 self.refresh_visible_op_count();
288 }
289
    /// Resets the oplog to an empty state after a snapshot import failed.
    ///
    /// The arena is rolled back to `arena_checkpoint`, then the change store,
    /// history cache and DAG are rebuilt from scratch on top of it. Pending
    /// changes, the uncommitted change and any rollback checkpoint are
    /// discarded, and the shared visible-op counter is set to 0. The
    /// configuration is carried over.
    pub(crate) fn reset_to_empty_for_failed_snapshot_import(
        &mut self,
        arena_checkpoint: SharedArenaRollback,
    ) {
        let arena = self.arena.clone();
        let configure = self.configure.clone();
        // Roll the arena back before any new store is created on top of it.
        arena.rollback(arena_checkpoint);
        let change_store = ChangeStore::new_mem(&arena, configure.merge_interval_in_s.clone());
        self.history_cache = Mutex::new(ContainerHistoryCache::new(change_store.clone(), None));
        self.dag = AppDag::new(change_store.clone());
        self.change_store = change_store;
        self.pending_changes = Default::default();
        self.batch_importing = false;
        self.configure = configure;
        self.uncommitted_change = None;
        self.import_rollback = None;
        // The oplog is empty again, so nothing is visible.
        self.visible_op_count
            .store(0, std::sync::atomic::Ordering::Release);
    }
309
310 #[inline(always)]
311 pub(crate) fn with_history_cache<F, R>(&self, f: F) -> R
312 where
313 F: FnOnce(&mut ContainerHistoryCache) -> R,
314 {
315 let mut history_cache = self.history_cache.lock();
316 f(&mut history_cache)
317 }
318
    /// Returns whether the history cache reports that it currently holds a
    /// cache (`ContainerHistoryCache::has_cache`).
    pub fn has_history_cache(&self) -> bool {
        self.history_cache.lock().has_cache()
    }
322
323 pub fn free_history_cache(&self) {
324 let mut history_cache = self.history_cache.lock();
325 history_cache.free();
326 }
327
    /// Test-only helper returning the length reported by the pending changes.
    #[cfg(test)]
    #[allow(dead_code)]
    pub(crate) fn pending_changes_len(&self) -> usize {
        self.pending_changes.len()
    }
333
    /// Inserts a locally produced change (`from_local = true`).
    ///
    /// As written this never returns an error; the `Result` is always `Ok`.
    pub(crate) fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
        self.insert_new_change(change, true);
        Ok(())
    }
348
349 pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> {
351 let Some(&end) = self.dag.vv().get(&change.id.peer) else {
352 return Some(change);
353 };
354
355 if change.id.counter >= end {
356 return Some(change);
357 }
358
359 if change.ctr_end() <= end {
360 return None;
361 }
362
363 let offset = (end - change.id.counter) as usize;
364 Some(change.slice(offset, change.atom_len()))
365 }
366
367 #[allow(unused)]
368 fn check_id_is_not_duplicated(&self, id: ID) -> Result<(), LoroError> {
369 let cur_end = self.dag.vv().get(&id.peer).cloned().unwrap_or(0);
370 if cur_end > id.counter {
371 return Err(LoroError::UsedOpID { id });
372 }
373
374 Ok(())
375 }
376
377 pub(crate) fn check_change_greater_than_last_peer_id(
382 &self,
383 peer: PeerID,
384 counter: Counter,
385 deps: &Frontiers,
386 ) -> Result<(), LoroError> {
387 if counter == 0 {
388 return Ok(());
389 }
390
391 if !self.configure.detached_editing() {
392 return Ok(());
393 }
394
395 let mut max_last_counter = -1;
396 for dep in deps.iter() {
397 let dep_vv = self
398 .dag
399 .get_vv(dep)
400 .ok_or(LoroError::FrontiersNotFound(dep))?;
401 max_last_counter = max_last_counter.max(dep_vv.get(&peer).cloned().unwrap_or(0) - 1);
402 }
403
404 if counter != max_last_counter + 1 {
405 return Err(LoroError::ConcurrentOpsWithSamePeerID {
406 peer,
407 last_counter: max_last_counter,
408 current: counter,
409 });
410 }
411
412 Ok(())
413 }
414
415 pub(crate) fn next_id(&self, peer: PeerID) -> ID {
416 let cnt = self.dag.vv().get(&peer).copied().unwrap_or(0);
417 ID::new(peer, cnt)
418 }
419
    /// Borrows the DAG's current version vector.
    pub(crate) fn vv(&self) -> &VersionVector {
        self.dag.vv()
    }
423
    /// Borrows the DAG's current frontiers.
    pub(crate) fn frontiers(&self) -> &Frontiers {
        self.dag.frontiers()
    }
427
    /// Compares the oplog's version with `other`; delegates to
    /// `AppDag::cmp_with_frontiers`.
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        self.dag.cmp_with_frontiers(other)
    }
434
    /// Compares two frontiers; delegates to `AppDag::cmp_frontiers`.
    ///
    /// NOTE(review): `Ok(None)` presumably means the two versions are
    /// concurrent — confirm against `AppDag::cmp_frontiers`.
    ///
    /// # Errors
    /// Propagates `FrontiersNotIncluded` from the DAG.
    #[inline]
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        self.dag.cmp_frontiers(a, b)
    }
446
447 pub(crate) fn get_min_lamport_at(&self, id: ID) -> Lamport {
448 self.get_change_at(id).map(|c| c.lamport).unwrap_or(0)
449 }
450
451 pub(crate) fn get_lamport_at(&self, id: ID) -> Option<Lamport> {
452 self.get_change_at(id)
453 .map(|c| c.lamport + (id.counter - c.id.counter) as Lamport)
454 }
455
    /// Iterates over the ops inside `id_span`.
    ///
    /// The change store yields the changes for the span, and each change is
    /// expanded into the ops that fall inside the span's counter range.
    pub(crate) fn iter_ops(&self, id_span: IdSpan) -> impl Iterator<Item = RichOp<'static>> + '_ {
        let change_iter = self.change_store.iter_changes(id_span);
        change_iter.flat_map(move |c| RichOp::new_iter_by_cnt_range(c, id_span.counter))
    }
460
461 pub(crate) fn get_max_lamport_at(&self, id: ID) -> Lamport {
462 self.get_change_at(id)
463 .map(|c| {
464 let change_counter = c.id.counter as u32;
465 c.lamport + c.ops().last().map(|op| op.counter).unwrap_or(0) as u32 - change_counter
466 })
467 .unwrap_or(Lamport::MAX)
468 }
469
    /// Looks up the change for `id` in the change store
    /// (`ChangeStore::get_change`); `None` when the store has no match.
    pub fn get_change_at(&self, id: ID) -> Option<BlockChangeRef> {
        self.change_store.get_change(id)
    }
473
    /// Stores `change` as the uncommitted change, replacing any previous one.
    pub(crate) fn set_uncommitted_change(&mut self, change: Change) {
        self.uncommitted_change = Some(change);
    }
477
478 pub(crate) fn get_uncommitted_change_in_span(
479 &self,
480 id_span: IdSpan,
481 ) -> Option<Cow<'_, Change>> {
482 self.uncommitted_change.as_ref().and_then(|c| {
483 if c.id_span() == id_span {
484 Some(Cow::Borrowed(c))
485 } else if let Some((start, end)) = id_span.get_slice_range_on(&c.id_span()) {
486 Some(Cow::Owned(c.slice(start, end)))
487 } else {
488 None
489 }
490 })
491 }
492
493 pub fn get_deps_of(&self, id: ID) -> Option<Frontiers> {
494 self.get_change_at(id).map(|c| {
495 if c.id.counter == id.counter {
496 c.deps.clone()
497 } else {
498 Frontiers::from_id(id.inc(-1))
499 }
500 })
501 }
502
503 pub fn get_remote_change_at(&self, id: ID) -> Option<Change<RemoteOp<'static>>> {
504 let change = self.get_change_at(id)?;
505 Some(convert_change_to_remote(&self.arena, &change))
506 }
507
    /// Forwards `remote_changes` to
    /// `extend_pending_changes_with_unknown_lamport` and returns its result.
    pub(crate) fn import_unknown_lamport_pending_changes(
        &mut self,
        remote_changes: Vec<Change>,
    ) -> Result<(), LoroError> {
        self.extend_pending_changes_with_unknown_lamport(remote_changes)
    }
514
    /// Looks up the change for `id` in the change store. Performs the same
    /// call as `get_change_at`.
    pub(crate) fn lookup_change(&self, id: ID) -> Option<BlockChangeRef> {
        self.change_store.get_change(id)
    }
521
    /// Exports the change store via `ChangeStore::export_from`, passing `vv`
    /// and `f` followed by the oplog's current version vector and frontiers.
    #[inline(always)]
    pub(crate) fn export_change_store_from(&self, vv: &VersionVector, f: &Frontiers) -> Bytes {
        self.change_store
            .export_from(vv, f, self.vv(), self.frontiers())
    }
527
    /// Exports the change store via `ChangeStore::export_from`, passing
    /// `vv`/`f` followed by the caller-supplied `to_vv`/`to_frontiers`
    /// instead of the oplog's current version.
    #[inline(always)]
    pub(crate) fn export_change_store_in_range(
        &self,
        vv: &VersionVector,
        f: &Frontiers,
        to_vv: &VersionVector,
        to_frontiers: &Frontiers,
    ) -> Bytes {
        self.change_store.export_from(vv, f, to_vv, to_frontiers)
    }
538
    /// Writes blocks into `w` via `ChangeStore::export_blocks_from`, passing
    /// `vv`, the shallow-since version vector and the oplog's current
    /// version vector.
    #[inline(always)]
    pub(crate) fn export_blocks_from<W: std::io::Write>(&self, vv: &VersionVector, w: &mut W) {
        self.change_store
            .export_blocks_from(vv, self.shallow_since_vv(), self.vv(), w)
    }
544
    /// Writes the blocks for `spans` into `w`; delegates to
    /// `ChangeStore::export_blocks_in_range`.
    #[inline(always)]
    pub(crate) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
        self.change_store.export_blocks_in_range(spans, w)
    }
549
550 pub(crate) fn fork_changes_up_to(&self, frontiers: &Frontiers) -> Option<Bytes> {
551 let vv = self.dag.frontiers_to_vv(frontiers)?;
552 Some(
553 self.change_store
554 .fork_changes_up_to(self.dag.shallow_since_vv(), frontiers, &vv),
555 )
556 }
557
    /// Decodes `data` into this oplog; delegates to
    /// `crate::encoding::decode_oplog` and returns its result.
    #[inline(always)]
    pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<ImportStatus, LoroError> {
        decode_oplog(self, data)
    }
562
    /// Walks, in the order produced by the DAG's causal iterator, the changes
    /// between the common ancestors of two versions and the merge of those
    /// versions.
    ///
    /// Returns a tuple of:
    /// - the version vector of the common ancestors of `from_frontiers` and
    ///   `to_frontiers`;
    /// - the `DiffMode` reported by `find_common_ancestor`, switched from
    ///   `Checkout` to `Import` when `to > from`;
    /// - an iterator yielding `(change, (start_counter, end_counter), vv)`.
    ///
    /// The `Rc<RefCell<VersionVector>>` in every item is the same shared
    /// cell: it is cleared and refilled on each step, so its content is only
    /// meaningful until the iterator is advanced again.
    ///
    /// # Panics
    /// Panics when the common ancestors cannot be converted to a version
    /// vector, when a DAG node has no version vector after `ensure_vv_for`,
    /// or when the change for the computed start id is missing.
    #[allow(clippy::type_complexity)]
    pub(crate) fn iter_from_lca_causally(
        &self,
        from: &VersionVector,
        from_frontiers: &Frontiers,
        to: &VersionVector,
        to_frontiers: &Frontiers,
    ) -> (
        VersionVector,
        DiffMode,
        impl Iterator<
                Item = (
                    BlockChangeRef,
                    (Counter, Counter),
                    Rc<RefCell<VersionVector>>,
                ),
            > + '_,
    ) {
        // Union of both versions: the upper bound of what the walk may visit.
        let mut merged_vv = from.clone();
        merged_vv.merge(to);
        loro_common::debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to);
        let (common_ancestors, mut diff_mode) =
            self.dag.find_common_ancestor(from_frontiers, to_frontiers);
        if diff_mode == DiffMode::Checkout && to > from {
            diff_mode = DiffMode::Import;
        }

        let common_ancestors_vv = self.dag.frontiers_to_vv(&common_ancestors).unwrap();
        // NOTE(review): `forward` is presumably the set of spans that
        // `merged_vv` has beyond the common ancestors — confirm.
        let diff = common_ancestors_vv.diff(&merged_vv).forward;
        let mut iter = self.dag.iter_causal(common_ancestors, diff);
        let mut node = iter.next();
        // Counter to resume from inside the current DAG node when the node
        // covers more than one change; reset to 0 when moving to a new node.
        let mut cur_cnt = 0;
        let vv = Rc::new(RefCell::new(VersionVector::default()));
        (
            common_ancestors_vv.clone(),
            diff_mode,
            std::iter::from_fn(move || {
                if let Some(inner) = &node {
                    let mut inner_vv = vv.borrow_mut();
                    inner_vv.clear();
                    self.dag.ensure_vv_for(&inner.data);
                    inner_vv.extend_to_include_vv(inner.data.vv.get().unwrap().iter());
                    let peer = inner.data.peer;
                    // Start at the latest of: the node start, the resume
                    // point, and what the common ancestors already cover.
                    let cnt = inner
                        .data
                        .cnt
                        .max(cur_cnt)
                        .max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
                    // Never go past what the merged version contains.
                    let dag_node_end = (inner.data.cnt + inner.data.len as Counter)
                        .min(merged_vv.get(&peer).copied().unwrap_or(0));
                    let change = self.change_store.get_change(ID::new(peer, cnt)).unwrap();

                    if change.ctr_end() < dag_node_end {
                        // The node continues after this change: stay on it.
                        cur_cnt = change.ctr_end();
                    } else {
                        node = iter.next();
                        cur_cnt = 0;
                    }

                    inner_vv.extend_to_include_end_id(change.id);

                    Some((change, (cnt, dag_node_end), vv.clone()))
                } else {
                    None
                }
            }),
        )
    }
643
    /// Returns the number of changes reported by the change store
    /// (`ChangeStore::change_num`).
    pub fn len_changes(&self) -> usize {
        self.change_store.change_num()
    }
647
648 pub fn diagnose_size(&self) -> SizeInfo {
649 let mut total_changes = 0;
650 let mut total_ops = 0;
651 let mut total_atom_ops = 0;
652 let total_dag_node = self.dag.total_parsed_dag_node();
653 self.change_store.visit_all_changes(&mut |change| {
654 total_changes += 1;
655 total_ops += change.ops.len();
656 total_atom_ops += change.atom_len();
657 });
658
659 println!("total changes: {}", total_changes);
660 println!("total ops: {}", total_ops);
661 println!("total atom ops: {}", total_atom_ops);
662 println!("total dag node: {}", total_dag_node);
663 SizeInfo {
664 total_changes,
665 total_ops,
666 total_atom_ops,
667 total_dag_node,
668 }
669 }
670
    /// Iterates over the changes in the spans produced by the second half of
    /// `from.diff_iter(to)`, one span after another.
    ///
    /// NOTE(review): that second half is presumably the part `to` has beyond
    /// `from` — confirm against `VersionVector::diff_iter`.
    pub(crate) fn iter_changes_peer_by_peer<'a>(
        &'a self,
        from: &VersionVector,
        to: &VersionVector,
    ) -> impl Iterator<Item = BlockChangeRef> + 'a {
        // Collected eagerly so the returned iterator does not borrow `from`
        // or `to`.
        let spans: Vec<_> = from.diff_iter(to).1.collect();
        spans
            .into_iter()
            .flat_map(move |span| self.change_store.iter_changes(span))
    }
681
    /// Iterates over the changes between `from` and `to` using
    /// `MergedChangeIter::new_change_iter_rev`.
    ///
    /// NOTE(review): judging by the name, the order is reverse causal —
    /// confirm against `MergedChangeIter`.
    pub(crate) fn iter_changes_causally_rev<'a>(
        &'a self,
        from: &VersionVector,
        to: &VersionVector,
    ) -> impl Iterator<Item = BlockChangeRef> + 'a {
        MergedChangeIter::new_change_iter_rev(self, from, to)
    }
689
690 pub fn get_timestamp_for_next_txn(&self) -> Timestamp {
691 if self.configure.record_timestamp() {
692 get_timestamp_now_txn()
693 } else {
694 0
695 }
696 }
697
698 #[inline(never)]
699 pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
700 let change = self.change_store.get_change_by_lamport_lte(id)?;
701
702 if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
703 return None;
704 }
705
706 Some(ID::new(
707 change.id.peer,
708 (id.lamport - change.lamport) as Counter + change.id.counter,
709 ))
710 }
711
712 #[allow(unused)]
713 pub(crate) fn id_to_idlp(&self, id_start: ID) -> IdLp {
714 let change = self.get_change_at(id_start).unwrap();
715 let lamport = change.lamport + (id_start.counter - change.id.counter) as Lamport;
716 let peer = id_start.peer;
717 loro_common::IdLp { peer, lamport }
718 }
719
    /// Returns the op with counter `id.counter` from the change containing
    /// `id` (via `get_op_with_counter`).
    ///
    /// `None` when no change contains `id` or the change yields no such op.
    pub(crate) fn get_op_that_includes(&self, id: ID) -> Option<BlockOpRef> {
        let change = self.get_change_at(id)?;
        change.get_op_with_counter(id.counter)
    }
725
726 pub(crate) fn split_span_based_on_deps(&self, id_span: IdSpan) -> Vec<(IdSpan, Frontiers)> {
727 let peer = id_span.peer;
728 let mut counter = id_span.counter.min();
729 let span_end = id_span.counter.norm_end();
730 let mut ans = Vec::new();
731
732 while counter < span_end {
733 let id = ID::new(peer, counter);
734 let node = self.dag.get(id).unwrap();
735
736 let f = if node.cnt == counter {
737 node.deps.clone()
738 } else if counter > 0 {
739 id.inc(-1).into()
740 } else {
741 unreachable!()
742 };
743
744 let cur_end = node.cnt + node.len as Counter;
745 let len = cur_end.min(span_end) - counter;
746 ans.push((id.to_span(len as usize), f));
747 counter += len;
748 }
749
750 ans
751 }
752
    /// Calls `ChangeStore::flush_and_compact` with the DAG's current version
    /// vector and frontiers.
    #[inline]
    pub fn compact_change_store(&mut self) {
        self.change_store
            .flush_and_compact(self.dag.vv(), self.dag.frontiers());
    }
758
    /// Returns the size reported by `ChangeStore::kv_size`.
    #[inline]
    pub fn change_store_kv_size(&self) -> usize {
        self.change_store.kv_size()
    }
763
    /// Encodes the whole change store via `ChangeStore::encode_all`, passing
    /// the DAG's current version vector and frontiers.
    pub fn encode_change_store(&self) -> bytes::Bytes {
        self.change_store
            .encode_all(self.dag.vv(), self.dag.frontiers())
    }
768
    /// Runs the DAG's own consistency check (`AppDag::check_dag_correctness`).
    pub fn check_dag_correctness(&self) {
        self.dag.check_dag_correctness();
    }
772
    /// Borrows the DAG's shallow-since version vector.
    pub fn shallow_since_vv(&self) -> &ImVersionVector {
        self.dag.shallow_since_vv()
    }
776
    /// Borrows the DAG's shallow-since frontiers.
    pub fn shallow_since_frontiers(&self) -> &Frontiers {
        self.dag.shallow_since_frontiers()
    }
780
    /// Returns `true` when the DAG's shallow-since version vector is not
    /// empty.
    pub fn is_shallow(&self) -> bool {
        !self.dag.shallow_since_vv().is_empty()
    }
784
785 pub fn get_greatest_timestamp(&self, frontiers: &Frontiers) -> Timestamp {
786 let mut max_timestamp = Timestamp::default();
787 for id in frontiers.iter() {
788 let change = self.get_change_at(id).unwrap();
789 if change.timestamp > max_timestamp {
790 max_timestamp = change.timestamp;
791 }
792 }
793
794 max_timestamp
795 }
796}
797
/// Totals gathered by `OpLog::diagnose_size`.
#[derive(Debug)]
pub struct SizeInfo {
    /// Number of changes visited in the change store.
    pub total_changes: usize,
    /// Sum of `ops.len()` over all visited changes.
    pub total_ops: usize,
    /// Sum of `atom_len()` over all visited changes.
    pub total_atom_ops: usize,
    /// Value reported by `AppDag::total_parsed_dag_node`.
    pub total_dag_node: usize,
}
805
806pub(crate) fn convert_change_to_remote(
807 arena: &SharedArena,
808 change: &Change,
809) -> Change<RemoteOp<'static>> {
810 let mut ops = RleVec::new();
811 for op in change.ops.iter() {
812 for op in local_op_to_remote(arena, op) {
813 ops.push(op);
814 }
815 }
816
817 Change {
818 ops,
819 id: change.id,
820 deps: change.deps.clone(),
821 lamport: change.lamport,
822 timestamp: change.timestamp,
823 commit_msg: change.commit_msg.clone(),
824 }
825}
826
827pub(crate) fn local_op_to_remote(
828 arena: &SharedArena,
829 op: &crate::op::Op,
830) -> SmallVec<[RemoteOp<'static>; 1]> {
831 let container = arena.get_container_id(op.container).unwrap();
832 let mut contents: SmallVec<[_; 1]> = SmallVec::new();
833 match &op.content {
834 crate::op::InnerContent::List(list) => match list {
835 list_op::InnerListOp::Insert { slice, pos } => match container.container_type() {
836 loro_common::ContainerType::Text => {
837 let str = arena
838 .slice_str_by_unicode_range(slice.0.start as usize..slice.0.end as usize);
839 contents.push(RawOpContent::List(list_op::ListOp::Insert {
840 slice: ListSlice::RawStr {
841 unicode_len: str.chars().count(),
842 str: Cow::Owned(str),
843 },
844 pos: *pos,
845 }));
846 }
847 loro_common::ContainerType::List | loro_common::ContainerType::MovableList => {
848 contents.push(RawOpContent::List(list_op::ListOp::Insert {
849 slice: ListSlice::RawData(Cow::Owned(
850 arena.get_values(slice.0.start as usize..slice.0.end as usize),
851 )),
852 pos: *pos,
853 }))
854 }
855 _ => unreachable!(),
856 },
857 list_op::InnerListOp::InsertText {
858 slice,
859 unicode_len: len,
860 unicode_start: _,
861 pos,
862 } => match container.container_type() {
863 loro_common::ContainerType::Text => {
864 contents.push(RawOpContent::List(list_op::ListOp::Insert {
865 slice: ListSlice::RawStr {
866 unicode_len: *len as usize,
867 str: Cow::Owned(std::str::from_utf8(slice).unwrap().to_owned()),
868 },
869 pos: *pos as usize,
870 }));
871 }
872 _ => unreachable!(),
873 },
874 list_op::InnerListOp::Delete(del) => {
875 contents.push(RawOpContent::List(list_op::ListOp::Delete(*del)))
876 }
877 list_op::InnerListOp::StyleStart {
878 start,
879 end,
880 key,
881 value,
882 info,
883 } => contents.push(RawOpContent::List(list_op::ListOp::StyleStart {
884 start: *start,
885 end: *end,
886 key: key.clone(),
887 value: value.clone(),
888 info: *info,
889 })),
890 list_op::InnerListOp::StyleEnd => {
891 contents.push(RawOpContent::List(list_op::ListOp::StyleEnd))
892 }
893 list_op::InnerListOp::Move {
894 from,
895 elem_id: from_id,
896 to,
897 } => contents.push(RawOpContent::List(list_op::ListOp::Move {
898 from: *from,
899 elem_id: *from_id,
900 to: *to,
901 })),
902 list_op::InnerListOp::Set { elem_id, value } => {
903 contents.push(RawOpContent::List(list_op::ListOp::Set {
904 elem_id: *elem_id,
905 value: value.clone(),
906 }))
907 }
908 },
909 crate::op::InnerContent::Map(map) => {
910 let value = map.value.clone();
911 contents.push(RawOpContent::Map(crate::container::map::MapSet {
912 key: map.key.clone(),
913 value,
914 }))
915 }
916 crate::op::InnerContent::Tree(tree) => contents.push(RawOpContent::Tree(tree.clone())),
917 crate::op::InnerContent::Future(f) => match f {
918 #[cfg(feature = "counter")]
919 crate::op::FutureInnerContent::Counter(c) => contents.push(RawOpContent::Counter(*c)),
920 FutureInnerContent::Unknown { prop, value } => {
921 contents.push(crate::op::RawOpContent::Unknown {
922 prop: *prop,
923 value: (**value).clone(),
924 })
925 }
926 },
927 };
928
929 let mut ans = SmallVec::with_capacity(contents.len());
930 for content in contents {
931 ans.push(RemoteOp {
932 container: container.clone(),
933 content,
934 counter: op.counter,
935 })
936 }
937 ans
938}
939
940pub(crate) fn get_timestamp_now_txn() -> Timestamp {
941 (get_sys_timestamp() as Timestamp + 500) / 1000
942}