1mod change_store;
2pub(crate) mod loro_dag;
3mod pending_changes;
4
5use bytes::Bytes;
6use std::borrow::Cow;
7use std::cell::RefCell;
8use std::cmp::Ordering;
9use std::rc::Rc;
10use std::sync::Mutex;
11use tracing::{debug, trace, trace_span};
12
13use self::change_store::iter::MergedChangeIter;
14use self::pending_changes::PendingChanges;
15use super::arena::SharedArena;
16use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
17use crate::configure::Configure;
18use crate::container::list::list_op;
19use crate::dag::{Dag, DagUtils};
20use crate::diff_calc::DiffMode;
21use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
22use crate::encoding::{ImportStatus, ParsedHeaderAndBody};
23use crate::history_cache::ContainerHistoryCache;
24use crate::id::{Counter, PeerID, ID};
25use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
26use crate::span::{HasCounterSpan, HasLamportSpan};
27use crate::version::{Frontiers, ImVersionVector, VersionVector};
28use crate::LoroError;
29use change_store::BlockOpRef;
30use loro_common::{IdLp, IdSpan};
31use rle::{HasLength, RleVec, Sliceable};
32use smallvec::SmallVec;
33
34pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
35pub use change_store::{BlockChangeRef, ChangeStore};
36
/// Holds the recorded history: the change DAG, the shared arena, the change
/// store, a lock-protected history cache, the pending changes and the
/// configuration.
pub struct OpLog {
    /// Causal graph of the changes. The version vector and frontiers exposed by
    /// `OpLog` are read from it.
    pub(crate) dag: AppDag,
    /// Arena handle; `OpLog::new` passes a reference to it into the change
    /// store constructor, and op conversion resolves container ids through it.
    pub(crate) arena: SharedArena,
    /// Storage that the changes are inserted into and looked up from.
    change_store: ChangeStore,
    /// History cache; every access in this file goes through
    /// `try_lock().unwrap()`, so a contended or poisoned lock panics.
    history_cache: Mutex<ContainerHistoryCache>,
    /// Changes received but not yet applied.
    /// NOTE(review): presumably held back because their deps are missing — confirm in `pending_changes`.
    pub(crate) pending_changes: PendingChanges,
    /// NOTE(review): looks like a flag for an in-progress batch import; it is
    /// only initialised (to `false`) in the part of the file visible here.
    pub(crate) batch_importing: bool,
    /// Configuration consulted for the merge interval, detached editing and
    /// timestamp recording.
    pub(crate) configure: Configure,
}
58
59impl std::fmt::Debug for OpLog {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("OpLog")
62 .field("dag", &self.dag)
63 .field("pending_changes", &self.pending_changes)
64 .finish()
65 }
66}
67
68impl OpLog {
69 #[inline]
70 pub(crate) fn new() -> Self {
71 let arena = SharedArena::new();
72 let cfg = Configure::default();
73 let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval_in_s.clone());
74 Self {
75 history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)),
76 dag: AppDag::new(change_store.clone()),
77 change_store,
78 arena,
79 pending_changes: Default::default(),
80 batch_importing: false,
81 configure: cfg,
82 }
83 }
84
85 #[inline]
86 pub fn dag(&self) -> &AppDag {
87 &self.dag
88 }
89
90 pub fn change_store(&self) -> &ChangeStore {
91 &self.change_store
92 }
93
94 pub fn get_change_with_lamport_lte(
98 &self,
99 peer: PeerID,
100 lamport: Lamport,
101 ) -> Option<BlockChangeRef> {
102 let ans = self
103 .change_store
104 .get_change_by_lamport_lte(IdLp::new(peer, lamport))?;
105 debug_assert!(ans.lamport <= lamport);
106 Some(ans)
107 }
108
109 pub fn get_timestamp_of_version(&self, f: &Frontiers) -> Timestamp {
110 let mut timestamp = Timestamp::default();
111 for id in f.iter() {
112 if let Some(change) = self.lookup_change(id) {
113 timestamp = timestamp.max(change.timestamp);
114 }
115 }
116
117 timestamp
118 }
119
120 #[inline]
121 pub fn is_empty(&self) -> bool {
122 self.dag.is_empty() && self.arena.can_import_snapshot()
123 }
124
125 pub(crate) fn insert_new_change(&mut self, change: Change, from_local: bool) {
127 let s = trace_span!(
128 "insert_new_change",
129 id = ?change.id,
130 lamport = change.lamport,
131 deps = ?change.deps
132 );
133 let _enter = s.enter();
134 self.dag.handle_new_change(&change, from_local);
135 self.history_cache
136 .try_lock()
137 .unwrap()
138 .insert_by_new_change(&change, true, true);
139 self.register_container_and_parent_link(&change);
140 self.change_store.insert_change(change, true, from_local);
141 }
142
143 #[inline(always)]
144 pub(crate) fn with_history_cache<F, R>(&self, f: F) -> R
145 where
146 F: FnOnce(&mut ContainerHistoryCache) -> R,
147 {
148 let mut history_cache = self.history_cache.try_lock().unwrap();
149 f(&mut history_cache)
150 }
151
152 pub fn has_history_cache(&self) -> bool {
153 self.history_cache.try_lock().unwrap().has_cache()
154 }
155
156 pub fn free_history_cache(&self) {
157 let mut history_cache = self.history_cache.try_lock().unwrap();
158 history_cache.free();
159 }
160
161 pub(crate) fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
172 self.insert_new_change(change, true);
173 Ok(())
174 }
175
176 pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> {
178 let Some(&end) = self.dag.vv().get(&change.id.peer) else {
179 return Some(change);
180 };
181
182 if change.id.counter >= end {
183 return Some(change);
184 }
185
186 if change.ctr_end() <= end {
187 return None;
188 }
189
190 let offset = (end - change.id.counter) as usize;
191 Some(change.slice(offset, change.atom_len()))
192 }
193
194 #[allow(unused)]
195 fn check_id_is_not_duplicated(&self, id: ID) -> Result<(), LoroError> {
196 let cur_end = self.dag.vv().get(&id.peer).cloned().unwrap_or(0);
197 if cur_end > id.counter {
198 return Err(LoroError::UsedOpID { id });
199 }
200
201 Ok(())
202 }
203
204 pub(crate) fn check_change_greater_than_last_peer_id(
209 &self,
210 peer: PeerID,
211 counter: Counter,
212 deps: &Frontiers,
213 ) -> Result<(), LoroError> {
214 if counter == 0 {
215 return Ok(());
216 }
217
218 if !self.configure.detached_editing() {
219 return Ok(());
220 }
221
222 let mut max_last_counter = -1;
223 for dep in deps.iter() {
224 let dep_vv = self.dag.get_vv(dep).unwrap();
225 max_last_counter = max_last_counter.max(dep_vv.get(&peer).cloned().unwrap_or(0) - 1);
226 }
227
228 if counter != max_last_counter + 1 {
229 return Err(LoroError::ConcurrentOpsWithSamePeerID {
230 peer,
231 last_counter: max_last_counter,
232 current: counter,
233 });
234 }
235
236 Ok(())
237 }
238
239 pub(crate) fn next_id(&self, peer: PeerID) -> ID {
240 let cnt = self.dag.vv().get(&peer).copied().unwrap_or(0);
241 ID::new(peer, cnt)
242 }
243
244 pub(crate) fn vv(&self) -> &VersionVector {
245 self.dag.vv()
246 }
247
248 pub(crate) fn frontiers(&self) -> &Frontiers {
249 self.dag.frontiers()
250 }
251
252 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
256 self.dag.cmp_with_frontiers(other)
257 }
258
259 #[inline]
263 pub fn cmp_frontiers(
264 &self,
265 a: &Frontiers,
266 b: &Frontiers,
267 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
268 self.dag.cmp_frontiers(a, b)
269 }
270
271 pub(crate) fn get_min_lamport_at(&self, id: ID) -> Lamport {
272 self.get_change_at(id).map(|c| c.lamport).unwrap_or(0)
273 }
274
275 pub(crate) fn get_lamport_at(&self, id: ID) -> Option<Lamport> {
276 self.get_change_at(id)
277 .map(|c| c.lamport + (id.counter - c.id.counter) as Lamport)
278 }
279
280 pub(crate) fn iter_ops(&self, id_span: IdSpan) -> impl Iterator<Item = RichOp<'static>> + '_ {
281 let change_iter = self.change_store.iter_changes(id_span);
282 change_iter.flat_map(move |c| RichOp::new_iter_by_cnt_range(c, id_span.counter))
283 }
284
285 pub(crate) fn get_max_lamport_at(&self, id: ID) -> Lamport {
286 self.get_change_at(id)
287 .map(|c| {
288 let change_counter = c.id.counter as u32;
289 c.lamport + c.ops().last().map(|op| op.counter).unwrap_or(0) as u32 - change_counter
290 })
291 .unwrap_or(Lamport::MAX)
292 }
293
294 pub fn get_change_at(&self, id: ID) -> Option<BlockChangeRef> {
295 self.change_store.get_change(id)
296 }
297
298 pub fn get_deps_of(&self, id: ID) -> Option<Frontiers> {
299 self.get_change_at(id).map(|c| {
300 if c.id.counter == id.counter {
301 c.deps.clone()
302 } else {
303 Frontiers::from_id(id.inc(-1))
304 }
305 })
306 }
307
308 pub fn get_remote_change_at(&self, id: ID) -> Option<Change<RemoteOp<'static>>> {
309 let change = self.get_change_at(id)?;
310 Some(convert_change_to_remote(&self.arena, &change))
311 }
312
313 pub(crate) fn import_unknown_lamport_pending_changes(
314 &mut self,
315 remote_changes: Vec<Change>,
316 ) -> Result<(), LoroError> {
317 self.extend_pending_changes_with_unknown_lamport(remote_changes)
318 }
319
320 pub(crate) fn lookup_change(&self, id: ID) -> Option<BlockChangeRef> {
324 self.change_store.get_change(id)
325 }
326
327 #[inline(always)]
328 pub(crate) fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
329 encode_oplog(self, vv, EncodeMode::Auto)
330 }
331
332 #[inline(always)]
333 pub(crate) fn export_change_store_from(&self, vv: &VersionVector, f: &Frontiers) -> Bytes {
334 self.change_store
335 .export_from(vv, f, self.vv(), self.frontiers())
336 }
337
338 #[inline(always)]
339 pub(crate) fn export_change_store_in_range(
340 &self,
341 vv: &VersionVector,
342 f: &Frontiers,
343 to_vv: &VersionVector,
344 to_frontiers: &Frontiers,
345 ) -> Bytes {
346 self.change_store.export_from(vv, f, to_vv, to_frontiers)
347 }
348
349 #[inline(always)]
350 pub(crate) fn export_blocks_from<W: std::io::Write>(&self, vv: &VersionVector, w: &mut W) {
351 self.change_store
352 .export_blocks_from(vv, self.shallow_since_vv(), self.vv(), w)
353 }
354
355 #[inline(always)]
356 pub(crate) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
357 self.change_store.export_blocks_in_range(spans, w)
358 }
359
360 pub(crate) fn fork_changes_up_to(&self, frontiers: &Frontiers) -> Option<Bytes> {
361 let vv = self.dag.frontiers_to_vv(frontiers)?;
362 Some(
363 self.change_store
364 .fork_changes_up_to(self.dag.shallow_since_vv(), frontiers, &vv),
365 )
366 }
367
368 #[inline(always)]
369 pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<ImportStatus, LoroError> {
370 decode_oplog(self, data)
371 }
372
/// Walks the changes between the common ancestors of `from_frontiers` /
/// `to_frontiers` and the merged version of `from` and `to`, in the order
/// produced by the DAG's causal iterator.
///
/// Returns a tuple of:
/// 1. the version vector of the common ancestors,
/// 2. the diff mode reported by `find_common_ancestor` (upgraded from
///    `Checkout` to `Import` when `to > from`),
/// 3. an iterator yielding `(change, (start_counter, dag_node_end), vv)`.
///
/// The yielded `vv` is one shared `Rc<RefCell<_>>` that is rewritten on every
/// step, so a caller must release any borrow of it before asking for the next
/// item (otherwise `borrow_mut` panics).
///
/// The yielded change is looked up by `start_counter`, so it may begin before
/// that counter; `start_counter` tells the caller where the relevant part begins.
///
/// # Panics
/// Panics if the common ancestors cannot be converted to a version vector, if
/// a DAG node has no version vector after `ensure_vv_for`, or if the change
/// store has no change at the computed id.
// The return type is a tuple containing an `impl Iterator` over a nested tuple.
#[allow(clippy::type_complexity)]
pub(crate) fn iter_from_lca_causally(
    &self,
    from: &VersionVector,
    from_frontiers: &Frontiers,
    to: &VersionVector,
    to_frontiers: &Frontiers,
) -> (
    VersionVector,
    DiffMode,
    impl Iterator<
            Item = (
                BlockChangeRef,
                (Counter, Counter),
                Rc<RefCell<VersionVector>>,
            ),
        > + '_,
) {
    // Target of the walk: everything contained in either `from` or `to`.
    let mut merged_vv = from.clone();
    merged_vv.merge(to);
    debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to);
    let (common_ancestors, mut diff_mode) =
        self.dag.find_common_ancestor(from_frontiers, to_frontiers);
    if diff_mode == DiffMode::Checkout && to > from {
        diff_mode = DiffMode::Import;
    }

    let common_ancestors_vv = self.dag.frontiers_to_vv(&common_ancestors).unwrap();
    // Spans that lie between the common ancestors and the merged version.
    let diff = common_ancestors_vv.diff(&merged_vv).forward;
    let mut iter = self.dag.iter_causal(common_ancestors, diff);
    // `node` is the DAG node currently being emitted; `cur_cnt` remembers how
    // far into that node the previous step got (0 = start of a fresh node).
    let mut node = iter.next();
    let mut cur_cnt = 0;
    let vv = Rc::new(RefCell::new(VersionVector::default()));
    (
        common_ancestors_vv.clone(),
        diff_mode,
        std::iter::from_fn(move || {
            if let Some(inner) = &node {
                // Reset the shared vv to the version vector of the current node.
                let mut inner_vv = vv.borrow_mut();
                inner_vv.clear();
                self.dag.ensure_vv_for(&inner.data);
                inner_vv.extend_to_include_vv(inner.data.vv.get().unwrap().iter());
                let peer = inner.data.peer;
                // Start at the node start, but never before the position
                // already emitted nor before what the common ancestors cover.
                let cnt = inner
                    .data
                    .cnt
                    .max(cur_cnt)
                    .max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
                // Stop at the node end, clamped to the merged version.
                let dag_node_end = (inner.data.cnt + inner.data.len as Counter)
                    .min(merged_vv.get(&peer).copied().unwrap_or(0));
                trace!("vv: {:?}", self.dag.vv());
                let change = self.change_store.get_change(ID::new(peer, cnt)).unwrap();

                if change.ctr_end() < dag_node_end {
                    // The node spans several changes: stay on it and resume
                    // right after this change on the next call.
                    cur_cnt = change.ctr_end();
                } else {
                    node = iter.next();
                    cur_cnt = 0;
                }

                // NOTE(review): presumably advances the vv up to the start of
                // `change` — confirm against `extend_to_include_end_id`.
                inner_vv.extend_to_include_end_id(change.id);

                Some((change, (cnt, dag_node_end), vv.clone()))
            } else {
                None
            }
        }),
    )
}
454
455 pub fn len_changes(&self) -> usize {
456 self.change_store.change_num()
457 }
458
459 pub fn diagnose_size(&self) -> SizeInfo {
460 let mut total_changes = 0;
461 let mut total_ops = 0;
462 let mut total_atom_ops = 0;
463 let total_dag_node = self.dag.total_parsed_dag_node();
464 self.change_store.visit_all_changes(&mut |change| {
465 total_changes += 1;
466 total_ops += change.ops.len();
467 total_atom_ops += change.atom_len();
468 });
469
470 println!("total changes: {}", total_changes);
471 println!("total ops: {}", total_ops);
472 println!("total atom ops: {}", total_atom_ops);
473 println!("total dag node: {}", total_dag_node);
474 SizeInfo {
475 total_changes,
476 total_ops,
477 total_atom_ops,
478 total_dag_node,
479 }
480 }
481
482 pub(crate) fn iter_changes_peer_by_peer<'a>(
483 &'a self,
484 from: &VersionVector,
485 to: &VersionVector,
486 ) -> impl Iterator<Item = BlockChangeRef> + 'a {
487 let spans: Vec<_> = from.diff_iter(to).1.collect();
488 spans
489 .into_iter()
490 .flat_map(move |span| self.change_store.iter_changes(span))
491 }
492
493 pub(crate) fn iter_changes_causally_rev<'a>(
494 &'a self,
495 from: &VersionVector,
496 to: &VersionVector,
497 ) -> impl Iterator<Item = BlockChangeRef> + 'a {
498 MergedChangeIter::new_change_iter_rev(self, from, to)
499 }
500
501 pub fn get_timestamp_for_next_txn(&self) -> Timestamp {
502 if self.configure.record_timestamp() {
503 (get_sys_timestamp() as Timestamp + 500) / 1000
504 } else {
505 0
506 }
507 }
508
509 #[inline(never)]
510 pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
511 let change = self.change_store.get_change_by_lamport_lte(id)?;
512
513 if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
514 return None;
515 }
516
517 Some(ID::new(
518 change.id.peer,
519 (id.lamport - change.lamport) as Counter + change.id.counter,
520 ))
521 }
522
523 #[allow(unused)]
524 pub(crate) fn id_to_idlp(&self, id_start: ID) -> IdLp {
525 let change = self.get_change_at(id_start).unwrap();
526 let lamport = change.lamport + (id_start.counter - change.id.counter) as Lamport;
527 let peer = id_start.peer;
528 loro_common::IdLp { peer, lamport }
529 }
530
531 pub(crate) fn get_op_that_includes(&self, id: ID) -> Option<BlockOpRef> {
533 let change = self.get_change_at(id)?;
534 change.get_op_with_counter(id.counter)
535 }
536
537 pub(crate) fn split_span_based_on_deps(&self, id_span: IdSpan) -> Vec<(IdSpan, Frontiers)> {
538 let peer = id_span.peer;
539 let mut counter = id_span.counter.min();
540 let span_end = id_span.counter.norm_end();
541 let mut ans = Vec::new();
542
543 while counter < span_end {
544 let id = ID::new(peer, counter);
545 let node = self.dag.get(id).unwrap();
546
547 let f = if node.cnt == counter {
548 node.deps.clone()
549 } else if counter > 0 {
550 id.inc(-1).into()
551 } else {
552 unreachable!()
553 };
554
555 let cur_end = node.cnt + node.len as Counter;
556 let len = cur_end.min(span_end) - counter;
557 ans.push((id.to_span(len as usize), f));
558 counter += len;
559 }
560
561 ans
562 }
563
564 #[inline]
565 pub fn compact_change_store(&mut self) {
566 self.change_store
567 .flush_and_compact(self.dag.vv(), self.dag.frontiers());
568 }
569
570 #[inline]
571 pub fn change_store_kv_size(&self) -> usize {
572 self.change_store.kv_size()
573 }
574
575 pub fn encode_change_store(&self) -> bytes::Bytes {
576 self.change_store
577 .encode_all(self.dag.vv(), self.dag.frontiers())
578 }
579
580 pub fn check_dag_correctness(&self) {
581 self.dag.check_dag_correctness();
582 }
583
584 pub fn shallow_since_vv(&self) -> &ImVersionVector {
585 self.dag.shallow_since_vv()
586 }
587
588 pub fn shallow_since_frontiers(&self) -> &Frontiers {
589 self.dag.shallow_since_frontiers()
590 }
591
592 pub fn is_shallow(&self) -> bool {
593 !self.dag.shallow_since_vv().is_empty()
594 }
595
596 pub fn get_greatest_timestamp(&self, frontiers: &Frontiers) -> Timestamp {
597 let mut max_timestamp = Timestamp::default();
598 for id in frontiers.iter() {
599 let change = self.get_change_at(id).unwrap();
600 if change.timestamp > max_timestamp {
601 max_timestamp = change.timestamp;
602 }
603 }
604
605 max_timestamp
606 }
607}
608
/// Size statistics returned by `OpLog::diagnose_size`.
///
/// A plain value type of four counters, so it derives the usual value traits:
/// reports can be copied, compared and default-constructed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SizeInfo {
    /// Number of changes visited in the change store.
    pub total_changes: usize,
    /// Sum of `ops.len()` over all visited changes.
    pub total_ops: usize,
    /// Sum of `atom_len()` over all visited changes.
    pub total_atom_ops: usize,
    /// Number of parsed DAG nodes reported by the DAG.
    pub total_dag_node: usize,
}
616
617pub(crate) fn convert_change_to_remote(
618 arena: &SharedArena,
619 change: &Change,
620) -> Change<RemoteOp<'static>> {
621 let mut ops = RleVec::new();
622 for op in change.ops.iter() {
623 for op in local_op_to_remote(arena, op) {
624 ops.push(op);
625 }
626 }
627
628 Change {
629 ops,
630 id: change.id,
631 deps: change.deps.clone(),
632 lamport: change.lamport,
633 timestamp: change.timestamp,
634 commit_msg: change.commit_msg.clone(),
635 }
636}
637
638pub(crate) fn local_op_to_remote(
639 arena: &SharedArena,
640 op: &crate::op::Op,
641) -> SmallVec<[RemoteOp<'static>; 1]> {
642 let container = arena.get_container_id(op.container).unwrap();
643 let mut contents: SmallVec<[_; 1]> = SmallVec::new();
644 match &op.content {
645 crate::op::InnerContent::List(list) => match list {
646 list_op::InnerListOp::Insert { slice, pos } => match container.container_type() {
647 loro_common::ContainerType::Text => {
648 let str = arena
649 .slice_str_by_unicode_range(slice.0.start as usize..slice.0.end as usize);
650 contents.push(RawOpContent::List(list_op::ListOp::Insert {
651 slice: ListSlice::RawStr {
652 unicode_len: str.chars().count(),
653 str: Cow::Owned(str),
654 },
655 pos: *pos,
656 }));
657 }
658 loro_common::ContainerType::List | loro_common::ContainerType::MovableList => {
659 contents.push(RawOpContent::List(list_op::ListOp::Insert {
660 slice: ListSlice::RawData(Cow::Owned(
661 arena.get_values(slice.0.start as usize..slice.0.end as usize),
662 )),
663 pos: *pos,
664 }))
665 }
666 _ => unreachable!(),
667 },
668 list_op::InnerListOp::InsertText {
669 slice,
670 unicode_len: len,
671 unicode_start: _,
672 pos,
673 } => match container.container_type() {
674 loro_common::ContainerType::Text => {
675 contents.push(RawOpContent::List(list_op::ListOp::Insert {
676 slice: ListSlice::RawStr {
677 unicode_len: *len as usize,
678 str: Cow::Owned(std::str::from_utf8(slice).unwrap().to_owned()),
679 },
680 pos: *pos as usize,
681 }));
682 }
683 _ => unreachable!(),
684 },
685 list_op::InnerListOp::Delete(del) => {
686 contents.push(RawOpContent::List(list_op::ListOp::Delete(*del)))
687 }
688 list_op::InnerListOp::StyleStart {
689 start,
690 end,
691 key,
692 value,
693 info,
694 } => contents.push(RawOpContent::List(list_op::ListOp::StyleStart {
695 start: *start,
696 end: *end,
697 key: key.clone(),
698 value: value.clone(),
699 info: *info,
700 })),
701 list_op::InnerListOp::StyleEnd => {
702 contents.push(RawOpContent::List(list_op::ListOp::StyleEnd))
703 }
704 list_op::InnerListOp::Move {
705 from,
706 elem_id: from_id,
707 to,
708 } => contents.push(RawOpContent::List(list_op::ListOp::Move {
709 from: *from,
710 elem_id: *from_id,
711 to: *to,
712 })),
713 list_op::InnerListOp::Set { elem_id, value } => {
714 contents.push(RawOpContent::List(list_op::ListOp::Set {
715 elem_id: *elem_id,
716 value: value.clone(),
717 }))
718 }
719 },
720 crate::op::InnerContent::Map(map) => {
721 let value = map.value.clone();
722 contents.push(RawOpContent::Map(crate::container::map::MapSet {
723 key: map.key.clone(),
724 value,
725 }))
726 }
727 crate::op::InnerContent::Tree(tree) => contents.push(RawOpContent::Tree(tree.clone())),
728 crate::op::InnerContent::Future(f) => match f {
729 #[cfg(feature = "counter")]
730 crate::op::FutureInnerContent::Counter(c) => contents.push(RawOpContent::Counter(*c)),
731 FutureInnerContent::Unknown { prop, value } => {
732 contents.push(crate::op::RawOpContent::Unknown {
733 prop: *prop,
734 value: (**value).clone(),
735 })
736 }
737 },
738 };
739
740 let mut ans = SmallVec::with_capacity(contents.len());
741 for content in contents {
742 ans.push(RemoteOp {
743 container: container.clone(),
744 content,
745 counter: op.counter,
746 })
747 }
748 ans
749}