1mod change_store;
2pub(crate) mod loro_dag;
3mod pending_changes;
4
5use crate::sync::Mutex;
6use bytes::Bytes;
7use std::borrow::Cow;
8use std::cell::RefCell;
9use std::cmp::Ordering;
10use std::rc::Rc;
11use tracing::trace_span;
12
13use self::change_store::iter::MergedChangeIter;
14use self::pending_changes::PendingChanges;
15use super::arena::SharedArena;
16use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
17use crate::configure::Configure;
18use crate::container::list::list_op;
19use crate::dag::{Dag, DagUtils};
20use crate::diff_calc::DiffMode;
21use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
22use crate::encoding::{ImportStatus, ParsedHeaderAndBody};
23use crate::history_cache::ContainerHistoryCache;
24use crate::id::{Counter, PeerID, ID};
25use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
26use crate::span::{HasCounterSpan, HasLamportSpan};
27use crate::version::{Frontiers, ImVersionVector, VersionVector};
28use crate::LoroError;
29use change_store::BlockOpRef;
30use loro_common::{HasIdSpan, IdLp, IdSpan};
31use rle::{HasLength, RleVec, Sliceable};
32use smallvec::SmallVec;
33
34pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
35pub use change_store::{BlockChangeRef, ChangeStore};
36
37pub struct OpLog {
45 pub(crate) dag: AppDag,
46 pub(crate) arena: SharedArena,
47 change_store: ChangeStore,
48 history_cache: Mutex<ContainerHistoryCache>,
49 pub(crate) pending_changes: PendingChanges,
53 pub(crate) batch_importing: bool,
56 pub(crate) configure: Configure,
57 pub(crate) uncommitted_change: Option<Change>,
60}
61
62impl std::fmt::Debug for OpLog {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("OpLog")
65 .field("dag", &self.dag)
66 .field("pending_changes", &self.pending_changes)
67 .finish()
68 }
69}
70
71impl OpLog {
72 #[inline]
73 pub(crate) fn new() -> Self {
74 let arena = SharedArena::new();
75 let cfg = Configure::default();
76 let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval_in_s.clone());
77 Self {
78 history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)),
79 dag: AppDag::new(change_store.clone()),
80 change_store,
81 arena,
82 pending_changes: Default::default(),
83 batch_importing: false,
84 configure: cfg,
85 uncommitted_change: None,
86 }
87 }
88
89 #[inline]
90 pub fn dag(&self) -> &AppDag {
91 &self.dag
92 }
93
94 pub fn change_store(&self) -> &ChangeStore {
95 &self.change_store
96 }
97
98 pub fn get_change_with_lamport_lte(
102 &self,
103 peer: PeerID,
104 lamport: Lamport,
105 ) -> Option<BlockChangeRef> {
106 let ans = self
107 .change_store
108 .get_change_by_lamport_lte(IdLp::new(peer, lamport))?;
109 debug_assert!(ans.lamport <= lamport);
110 Some(ans)
111 }
112
113 pub fn get_timestamp_of_version(&self, f: &Frontiers) -> Timestamp {
114 let mut timestamp = Timestamp::default();
115 for id in f.iter() {
116 if let Some(change) = self.lookup_change(id) {
117 timestamp = timestamp.max(change.timestamp);
118 }
119 }
120
121 timestamp
122 }
123
124 #[inline]
125 pub fn is_empty(&self) -> bool {
126 self.dag.is_empty() && self.arena.can_import_snapshot()
127 }
128
129 pub(crate) fn insert_new_change(&mut self, change: Change, from_local: bool) {
131 let s = trace_span!(
132 "insert_new_change",
133 id = ?change.id,
134 lamport = change.lamport,
135 deps = ?change.deps
136 );
137 let _enter = s.enter();
138 self.dag.handle_new_change(&change, from_local);
139 self.history_cache
140 .lock()
141 .unwrap()
142 .insert_by_new_change(&change, true, true);
143 self.register_container_and_parent_link(&change);
144 self.change_store.insert_change(change, true, from_local);
145 }
146
147 #[inline(always)]
148 pub(crate) fn with_history_cache<F, R>(&self, f: F) -> R
149 where
150 F: FnOnce(&mut ContainerHistoryCache) -> R,
151 {
152 let mut history_cache = self.history_cache.lock().unwrap();
153 f(&mut history_cache)
154 }
155
156 pub fn has_history_cache(&self) -> bool {
157 self.history_cache.lock().unwrap().has_cache()
158 }
159
160 pub fn free_history_cache(&self) {
161 let mut history_cache = self.history_cache.lock().unwrap();
162 history_cache.free();
163 }
164
165 pub(crate) fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
176 self.insert_new_change(change, true);
177 Ok(())
178 }
179
180 pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> {
182 let Some(&end) = self.dag.vv().get(&change.id.peer) else {
183 return Some(change);
184 };
185
186 if change.id.counter >= end {
187 return Some(change);
188 }
189
190 if change.ctr_end() <= end {
191 return None;
192 }
193
194 let offset = (end - change.id.counter) as usize;
195 Some(change.slice(offset, change.atom_len()))
196 }
197
198 #[allow(unused)]
199 fn check_id_is_not_duplicated(&self, id: ID) -> Result<(), LoroError> {
200 let cur_end = self.dag.vv().get(&id.peer).cloned().unwrap_or(0);
201 if cur_end > id.counter {
202 return Err(LoroError::UsedOpID { id });
203 }
204
205 Ok(())
206 }
207
208 pub(crate) fn check_change_greater_than_last_peer_id(
213 &self,
214 peer: PeerID,
215 counter: Counter,
216 deps: &Frontiers,
217 ) -> Result<(), LoroError> {
218 if counter == 0 {
219 return Ok(());
220 }
221
222 if !self.configure.detached_editing() {
223 return Ok(());
224 }
225
226 let mut max_last_counter = -1;
227 for dep in deps.iter() {
228 let dep_vv = self.dag.get_vv(dep).unwrap();
229 max_last_counter = max_last_counter.max(dep_vv.get(&peer).cloned().unwrap_or(0) - 1);
230 }
231
232 if counter != max_last_counter + 1 {
233 return Err(LoroError::ConcurrentOpsWithSamePeerID {
234 peer,
235 last_counter: max_last_counter,
236 current: counter,
237 });
238 }
239
240 Ok(())
241 }
242
243 pub(crate) fn next_id(&self, peer: PeerID) -> ID {
244 let cnt = self.dag.vv().get(&peer).copied().unwrap_or(0);
245 ID::new(peer, cnt)
246 }
247
248 pub(crate) fn vv(&self) -> &VersionVector {
249 self.dag.vv()
250 }
251
252 pub(crate) fn frontiers(&self) -> &Frontiers {
253 self.dag.frontiers()
254 }
255
256 pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
260 self.dag.cmp_with_frontiers(other)
261 }
262
263 #[inline]
267 pub fn cmp_frontiers(
268 &self,
269 a: &Frontiers,
270 b: &Frontiers,
271 ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
272 self.dag.cmp_frontiers(a, b)
273 }
274
275 pub(crate) fn get_min_lamport_at(&self, id: ID) -> Lamport {
276 self.get_change_at(id).map(|c| c.lamport).unwrap_or(0)
277 }
278
279 pub(crate) fn get_lamport_at(&self, id: ID) -> Option<Lamport> {
280 self.get_change_at(id)
281 .map(|c| c.lamport + (id.counter - c.id.counter) as Lamport)
282 }
283
284 pub(crate) fn iter_ops(&self, id_span: IdSpan) -> impl Iterator<Item = RichOp<'static>> + '_ {
285 let change_iter = self.change_store.iter_changes(id_span);
286 change_iter.flat_map(move |c| RichOp::new_iter_by_cnt_range(c, id_span.counter))
287 }
288
289 pub(crate) fn get_max_lamport_at(&self, id: ID) -> Lamport {
290 self.get_change_at(id)
291 .map(|c| {
292 let change_counter = c.id.counter as u32;
293 c.lamport + c.ops().last().map(|op| op.counter).unwrap_or(0) as u32 - change_counter
294 })
295 .unwrap_or(Lamport::MAX)
296 }
297
298 pub fn get_change_at(&self, id: ID) -> Option<BlockChangeRef> {
299 self.change_store.get_change(id)
300 }
301
302 pub(crate) fn set_uncommitted_change(&mut self, change: Change) {
303 self.uncommitted_change = Some(change);
304 }
305
306 pub(crate) fn get_uncommitted_change_in_span(&self, id_span: IdSpan) -> Option<Cow<'_, Change>> {
307 self.uncommitted_change.as_ref().and_then(|c| {
308 if c.id_span() == id_span {
309 Some(Cow::Borrowed(c))
310 } else if let Some((start, end)) = id_span.get_slice_range_on(&c.id_span()) {
311 Some(Cow::Owned(c.slice(start, end)))
312 } else {
313 None
314 }
315 })
316 }
317
318 pub fn get_deps_of(&self, id: ID) -> Option<Frontiers> {
319 self.get_change_at(id).map(|c| {
320 if c.id.counter == id.counter {
321 c.deps.clone()
322 } else {
323 Frontiers::from_id(id.inc(-1))
324 }
325 })
326 }
327
328 pub fn get_remote_change_at(&self, id: ID) -> Option<Change<RemoteOp<'static>>> {
329 let change = self.get_change_at(id)?;
330 Some(convert_change_to_remote(&self.arena, &change))
331 }
332
333 pub(crate) fn import_unknown_lamport_pending_changes(
334 &mut self,
335 remote_changes: Vec<Change>,
336 ) -> Result<(), LoroError> {
337 self.extend_pending_changes_with_unknown_lamport(remote_changes)
338 }
339
340 pub(crate) fn lookup_change(&self, id: ID) -> Option<BlockChangeRef> {
344 self.change_store.get_change(id)
345 }
346
347 #[inline(always)]
348 pub(crate) fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
349 encode_oplog(self, vv, EncodeMode::Auto)
350 }
351
352 #[inline(always)]
353 pub(crate) fn export_change_store_from(&self, vv: &VersionVector, f: &Frontiers) -> Bytes {
354 self.change_store
355 .export_from(vv, f, self.vv(), self.frontiers())
356 }
357
358 #[inline(always)]
359 pub(crate) fn export_change_store_in_range(
360 &self,
361 vv: &VersionVector,
362 f: &Frontiers,
363 to_vv: &VersionVector,
364 to_frontiers: &Frontiers,
365 ) -> Bytes {
366 self.change_store.export_from(vv, f, to_vv, to_frontiers)
367 }
368
369 #[inline(always)]
370 pub(crate) fn export_blocks_from<W: std::io::Write>(&self, vv: &VersionVector, w: &mut W) {
371 self.change_store
372 .export_blocks_from(vv, self.shallow_since_vv(), self.vv(), w)
373 }
374
375 #[inline(always)]
376 pub(crate) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
377 self.change_store.export_blocks_in_range(spans, w)
378 }
379
380 pub(crate) fn fork_changes_up_to(&self, frontiers: &Frontiers) -> Option<Bytes> {
381 let vv = self.dag.frontiers_to_vv(frontiers)?;
382 Some(
383 self.change_store
384 .fork_changes_up_to(self.dag.shallow_since_vv(), frontiers, &vv),
385 )
386 }
387
388 #[inline(always)]
389 pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<ImportStatus, LoroError> {
390 decode_oplog(self, data)
391 }
392
393 #[allow(clippy::type_complexity)]
404 pub(crate) fn iter_from_lca_causally(
405 &self,
406 from: &VersionVector,
407 from_frontiers: &Frontiers,
408 to: &VersionVector,
409 to_frontiers: &Frontiers,
410 ) -> (
411 VersionVector,
412 DiffMode,
413 impl Iterator<
414 Item = (
415 BlockChangeRef,
416 (Counter, Counter),
417 Rc<RefCell<VersionVector>>,
418 ),
419 > + '_,
420 ) {
421 let mut merged_vv = from.clone();
422 merged_vv.merge(to);
423 loro_common::debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to);
424 let (common_ancestors, mut diff_mode) =
425 self.dag.find_common_ancestor(from_frontiers, to_frontiers);
426 if diff_mode == DiffMode::Checkout && to > from {
427 diff_mode = DiffMode::Import;
428 }
429
430 let common_ancestors_vv = self.dag.frontiers_to_vv(&common_ancestors).unwrap();
431 let diff = common_ancestors_vv.diff(&merged_vv).forward;
433 let mut iter = self.dag.iter_causal(common_ancestors, diff);
434 let mut node = iter.next();
435 let mut cur_cnt = 0;
436 let vv = Rc::new(RefCell::new(VersionVector::default()));
437 (
438 common_ancestors_vv.clone(),
439 diff_mode,
440 std::iter::from_fn(move || {
441 if let Some(inner) = &node {
442 let mut inner_vv = vv.borrow_mut();
443 inner_vv.clear();
445 self.dag.ensure_vv_for(&inner.data);
446 inner_vv.extend_to_include_vv(inner.data.vv.get().unwrap().iter());
447 let peer = inner.data.peer;
448 let cnt = inner
449 .data
450 .cnt
451 .max(cur_cnt)
452 .max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
453 let dag_node_end = (inner.data.cnt + inner.data.len as Counter)
454 .min(merged_vv.get(&peer).copied().unwrap_or(0));
455 let change = self.change_store.get_change(ID::new(peer, cnt)).unwrap();
456
457 if change.ctr_end() < dag_node_end {
458 cur_cnt = change.ctr_end();
459 } else {
460 node = iter.next();
461 cur_cnt = 0;
462 }
463
464 inner_vv.extend_to_include_end_id(change.id);
465
466 Some((change, (cnt, dag_node_end), vv.clone()))
467 } else {
468 None
469 }
470 }),
471 )
472 }
473
474 pub fn len_changes(&self) -> usize {
475 self.change_store.change_num()
476 }
477
478 pub fn diagnose_size(&self) -> SizeInfo {
479 let mut total_changes = 0;
480 let mut total_ops = 0;
481 let mut total_atom_ops = 0;
482 let total_dag_node = self.dag.total_parsed_dag_node();
483 self.change_store.visit_all_changes(&mut |change| {
484 total_changes += 1;
485 total_ops += change.ops.len();
486 total_atom_ops += change.atom_len();
487 });
488
489 println!("total changes: {}", total_changes);
490 println!("total ops: {}", total_ops);
491 println!("total atom ops: {}", total_atom_ops);
492 println!("total dag node: {}", total_dag_node);
493 SizeInfo {
494 total_changes,
495 total_ops,
496 total_atom_ops,
497 total_dag_node,
498 }
499 }
500
501 pub(crate) fn iter_changes_peer_by_peer<'a>(
502 &'a self,
503 from: &VersionVector,
504 to: &VersionVector,
505 ) -> impl Iterator<Item = BlockChangeRef> + 'a {
506 let spans: Vec<_> = from.diff_iter(to).1.collect();
507 spans
508 .into_iter()
509 .flat_map(move |span| self.change_store.iter_changes(span))
510 }
511
512 pub(crate) fn iter_changes_causally_rev<'a>(
513 &'a self,
514 from: &VersionVector,
515 to: &VersionVector,
516 ) -> impl Iterator<Item = BlockChangeRef> + 'a {
517 MergedChangeIter::new_change_iter_rev(self, from, to)
518 }
519
520 pub fn get_timestamp_for_next_txn(&self) -> Timestamp {
521 if self.configure.record_timestamp() {
522 get_timestamp_now_txn()
523 } else {
524 0
525 }
526 }
527
528 #[inline(never)]
529 pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
530 let change = self.change_store.get_change_by_lamport_lte(id)?;
531
532 if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
533 return None;
534 }
535
536 Some(ID::new(
537 change.id.peer,
538 (id.lamport - change.lamport) as Counter + change.id.counter,
539 ))
540 }
541
542 #[allow(unused)]
543 pub(crate) fn id_to_idlp(&self, id_start: ID) -> IdLp {
544 let change = self.get_change_at(id_start).unwrap();
545 let lamport = change.lamport + (id_start.counter - change.id.counter) as Lamport;
546 let peer = id_start.peer;
547 loro_common::IdLp { peer, lamport }
548 }
549
550 pub(crate) fn get_op_that_includes(&self, id: ID) -> Option<BlockOpRef> {
552 let change = self.get_change_at(id)?;
553 change.get_op_with_counter(id.counter)
554 }
555
556 pub(crate) fn split_span_based_on_deps(&self, id_span: IdSpan) -> Vec<(IdSpan, Frontiers)> {
557 let peer = id_span.peer;
558 let mut counter = id_span.counter.min();
559 let span_end = id_span.counter.norm_end();
560 let mut ans = Vec::new();
561
562 while counter < span_end {
563 let id = ID::new(peer, counter);
564 let node = self.dag.get(id).unwrap();
565
566 let f = if node.cnt == counter {
567 node.deps.clone()
568 } else if counter > 0 {
569 id.inc(-1).into()
570 } else {
571 unreachable!()
572 };
573
574 let cur_end = node.cnt + node.len as Counter;
575 let len = cur_end.min(span_end) - counter;
576 ans.push((id.to_span(len as usize), f));
577 counter += len;
578 }
579
580 ans
581 }
582
583 #[inline]
584 pub fn compact_change_store(&mut self) {
585 self.change_store
586 .flush_and_compact(self.dag.vv(), self.dag.frontiers());
587 }
588
589 #[inline]
590 pub fn change_store_kv_size(&self) -> usize {
591 self.change_store.kv_size()
592 }
593
594 pub fn encode_change_store(&self) -> bytes::Bytes {
595 self.change_store
596 .encode_all(self.dag.vv(), self.dag.frontiers())
597 }
598
599 pub fn check_dag_correctness(&self) {
600 self.dag.check_dag_correctness();
601 }
602
603 pub fn shallow_since_vv(&self) -> &ImVersionVector {
604 self.dag.shallow_since_vv()
605 }
606
607 pub fn shallow_since_frontiers(&self) -> &Frontiers {
608 self.dag.shallow_since_frontiers()
609 }
610
611 pub fn is_shallow(&self) -> bool {
612 !self.dag.shallow_since_vv().is_empty()
613 }
614
615 pub fn get_greatest_timestamp(&self, frontiers: &Frontiers) -> Timestamp {
616 let mut max_timestamp = Timestamp::default();
617 for id in frontiers.iter() {
618 let change = self.get_change_at(id).unwrap();
619 if change.timestamp > max_timestamp {
620 max_timestamp = change.timestamp;
621 }
622 }
623
624 max_timestamp
625 }
626}
627
628#[derive(Debug)]
629pub struct SizeInfo {
630 pub total_changes: usize,
631 pub total_ops: usize,
632 pub total_atom_ops: usize,
633 pub total_dag_node: usize,
634}
635
636pub(crate) fn convert_change_to_remote(
637 arena: &SharedArena,
638 change: &Change,
639) -> Change<RemoteOp<'static>> {
640 let mut ops = RleVec::new();
641 for op in change.ops.iter() {
642 for op in local_op_to_remote(arena, op) {
643 ops.push(op);
644 }
645 }
646
647 Change {
648 ops,
649 id: change.id,
650 deps: change.deps.clone(),
651 lamport: change.lamport,
652 timestamp: change.timestamp,
653 commit_msg: change.commit_msg.clone(),
654 }
655}
656
657pub(crate) fn local_op_to_remote(
658 arena: &SharedArena,
659 op: &crate::op::Op,
660) -> SmallVec<[RemoteOp<'static>; 1]> {
661 let container = arena.get_container_id(op.container).unwrap();
662 let mut contents: SmallVec<[_; 1]> = SmallVec::new();
663 match &op.content {
664 crate::op::InnerContent::List(list) => match list {
665 list_op::InnerListOp::Insert { slice, pos } => match container.container_type() {
666 loro_common::ContainerType::Text => {
667 let str = arena
668 .slice_str_by_unicode_range(slice.0.start as usize..slice.0.end as usize);
669 contents.push(RawOpContent::List(list_op::ListOp::Insert {
670 slice: ListSlice::RawStr {
671 unicode_len: str.chars().count(),
672 str: Cow::Owned(str),
673 },
674 pos: *pos,
675 }));
676 }
677 loro_common::ContainerType::List | loro_common::ContainerType::MovableList => {
678 contents.push(RawOpContent::List(list_op::ListOp::Insert {
679 slice: ListSlice::RawData(Cow::Owned(
680 arena.get_values(slice.0.start as usize..slice.0.end as usize),
681 )),
682 pos: *pos,
683 }))
684 }
685 _ => unreachable!(),
686 },
687 list_op::InnerListOp::InsertText {
688 slice,
689 unicode_len: len,
690 unicode_start: _,
691 pos,
692 } => match container.container_type() {
693 loro_common::ContainerType::Text => {
694 contents.push(RawOpContent::List(list_op::ListOp::Insert {
695 slice: ListSlice::RawStr {
696 unicode_len: *len as usize,
697 str: Cow::Owned(std::str::from_utf8(slice).unwrap().to_owned()),
698 },
699 pos: *pos as usize,
700 }));
701 }
702 _ => unreachable!(),
703 },
704 list_op::InnerListOp::Delete(del) => {
705 contents.push(RawOpContent::List(list_op::ListOp::Delete(*del)))
706 }
707 list_op::InnerListOp::StyleStart {
708 start,
709 end,
710 key,
711 value,
712 info,
713 } => contents.push(RawOpContent::List(list_op::ListOp::StyleStart {
714 start: *start,
715 end: *end,
716 key: key.clone(),
717 value: value.clone(),
718 info: *info,
719 })),
720 list_op::InnerListOp::StyleEnd => {
721 contents.push(RawOpContent::List(list_op::ListOp::StyleEnd))
722 }
723 list_op::InnerListOp::Move {
724 from,
725 elem_id: from_id,
726 to,
727 } => contents.push(RawOpContent::List(list_op::ListOp::Move {
728 from: *from,
729 elem_id: *from_id,
730 to: *to,
731 })),
732 list_op::InnerListOp::Set { elem_id, value } => {
733 contents.push(RawOpContent::List(list_op::ListOp::Set {
734 elem_id: *elem_id,
735 value: value.clone(),
736 }))
737 }
738 },
739 crate::op::InnerContent::Map(map) => {
740 let value = map.value.clone();
741 contents.push(RawOpContent::Map(crate::container::map::MapSet {
742 key: map.key.clone(),
743 value,
744 }))
745 }
746 crate::op::InnerContent::Tree(tree) => contents.push(RawOpContent::Tree(tree.clone())),
747 crate::op::InnerContent::Future(f) => match f {
748 #[cfg(feature = "counter")]
749 crate::op::FutureInnerContent::Counter(c) => contents.push(RawOpContent::Counter(*c)),
750 FutureInnerContent::Unknown { prop, value } => {
751 contents.push(crate::op::RawOpContent::Unknown {
752 prop: *prop,
753 value: (**value).clone(),
754 })
755 }
756 },
757 };
758
759 let mut ans = SmallVec::with_capacity(contents.len());
760 for content in contents {
761 ans.push(RemoteOp {
762 container: container.clone(),
763 content,
764 counter: op.counter,
765 })
766 }
767 ans
768}
769
770pub(crate) fn get_timestamp_now_txn() -> Timestamp {
771 (get_sys_timestamp() as Timestamp + 500) / 1000
772}