loro_internal/oplog.rs

mod change_store;
pub(crate) mod loro_dag;
mod pending_changes;

use crate::sync::Mutex;
use bytes::Bytes;
use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::rc::Rc;
use tracing::trace_span;

use self::change_store::iter::MergedChangeIter;
use self::pending_changes::PendingChanges;
use super::arena::SharedArena;
use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
use crate::configure::Configure;
use crate::container::list::list_op;
use crate::dag::{Dag, DagUtils};
use crate::diff_calc::DiffMode;
use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
use crate::encoding::{ImportStatus, ParsedHeaderAndBody};
use crate::history_cache::ContainerHistoryCache;
use crate::id::{Counter, PeerID, ID};
use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
use crate::span::{HasCounterSpan, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use change_store::BlockOpRef;
use loro_common::{HasIdSpan, IdLp, IdSpan};
use rle::{HasLength, RleVec, Sliceable};
use smallvec::SmallVec;

pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
pub use change_store::{BlockChangeRef, ChangeStore};

/// [OpLog] stores all the ops, i.e. the history.
/// It allows multiple [AppState]s to attach to it,
/// so you can derive different versions of the state from the [OpLog].
/// This is what makes it possible to build a version control system on top of it.
///
/// The causal graph should always be a complete DAG, so we can always find the LCA.
/// If a change's deps are missing, we can't import it yet; it will be put into `pending_changes`.
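///
/// For example (illustrative, not from the original doc): if we import a change from peer B
/// whose deps reference an ID from peer A that we haven't seen yet, the change is parked in
/// `pending_changes` under that missing ID and only applied once the dep arrives.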
pub struct OpLog {
    pub(crate) dag: AppDag,
    pub(crate) arena: SharedArena,
    change_store: ChangeStore,
    history_cache: Mutex<ContainerHistoryCache>,
    /// Pending changes that haven't been applied to the dag.
    /// A change can be imported only when all its deps are already imported.
    /// The key is the ID of the missing dep.
    pub(crate) pending_changes: PendingChanges,
    /// Whether we are importing a batch of changes.
    /// If so, the Dag's frontiers won't be updated until the batch is finished.
    pub(crate) batch_importing: bool,
    pub(crate) configure: Configure,
    /// The uncommitted change. It's a placeholder for the change
    /// that is being edited in the pre-commit callback.
    pub(crate) uncommitted_change: Option<Change>,
}

impl std::fmt::Debug for OpLog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpLog")
            .field("dag", &self.dag)
            .field("pending_changes", &self.pending_changes)
            .finish()
    }
}

impl OpLog {
    #[inline]
    pub(crate) fn new() -> Self {
        let arena = SharedArena::new();
        let cfg = Configure::default();
        let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval_in_s.clone());
        Self {
            history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)),
            dag: AppDag::new(change_store.clone()),
            change_store,
            arena,
            pending_changes: Default::default(),
            batch_importing: false,
            configure: cfg,
            uncommitted_change: None,
        }
    }

    #[inline]
    pub fn dag(&self) -> &AppDag {
        &self.dag
    }

    pub fn change_store(&self) -> &ChangeStore {
        &self.change_store
    }

    /// Get the change with the given peer and lamport.
    ///
    /// If not found, return the change with the greatest lamport that is smaller than the given lamport.
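    ///
    /// Illustrative example (assumption, not from the original doc): if this peer has changes
    /// whose lamports start at 0 and 10, querying lamport 7 returns the change starting at
    /// lamport 0, and querying lamport 12 returns the change starting at lamport 10.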
    pub fn get_change_with_lamport_lte(
        &self,
        peer: PeerID,
        lamport: Lamport,
    ) -> Option<BlockChangeRef> {
        let ans = self
            .change_store
            .get_change_by_lamport_lte(IdLp::new(peer, lamport))?;
        debug_assert!(ans.lamport <= lamport);
        Some(ans)
    }

    pub fn get_timestamp_of_version(&self, f: &Frontiers) -> Timestamp {
        let mut timestamp = Timestamp::default();
        for id in f.iter() {
            if let Some(change) = self.lookup_change(id) {
                timestamp = timestamp.max(change.timestamp);
            }
        }

        timestamp
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.dag.is_empty() && self.arena.can_import_snapshot()
    }

    /// This is the **only** place to update the `OpLog.changes`
    pub(crate) fn insert_new_change(&mut self, change: Change, from_local: bool) {
        let s = trace_span!(
            "insert_new_change",
            id = ?change.id,
            lamport = change.lamport,
            deps = ?change.deps
        );
        let _enter = s.enter();
        self.dag.handle_new_change(&change, from_local);
        self.history_cache
            .lock()
            .unwrap()
            .insert_by_new_change(&change, true, true);
        self.register_container_and_parent_link(&change);
        self.change_store.insert_change(change, true, from_local);
    }

    #[inline(always)]
    pub(crate) fn with_history_cache<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut ContainerHistoryCache) -> R,
    {
        let mut history_cache = self.history_cache.lock().unwrap();
        f(&mut history_cache)
    }

    pub fn has_history_cache(&self) -> bool {
        self.history_cache.lock().unwrap().has_cache()
    }

    pub fn free_history_cache(&self) {
        let mut history_cache = self.history_cache.lock().unwrap();
        history_cache.free();
    }

    /// Import a change.
    ///
    /// A change can be imported only when all its deps have already been imported.
    /// Otherwise it is put into the pending changes, keyed by the ID of the missing dep.
    ///
    /// # Err
    ///
    /// - Return Err(LoroError::UsedOpID) when the change's id is occupied
    /// - Return Err(LoroError::DecodeError) when the change's deps are missing
    pub(crate) fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
        self.insert_new_change(change, true);
        Ok(())
    }

    /// Trim the already-known part of the change.
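    ///
    /// Illustrative example (assumption, not from the original doc): if the local version vector
    /// already covers counters 0..10 for this peer and the change spans counters 5..15, the
    /// returned change covers only 10..15; a fully known change yields `None`, and a fully new
    /// change is returned unchanged.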
    pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> {
        let Some(&end) = self.dag.vv().get(&change.id.peer) else {
            // We know nothing about this peer, so the whole change is new.
            return Some(change);
        };

        if change.id.counter >= end {
            // The change starts after the known range, so it's entirely new.
            return Some(change);
        }

        if change.ctr_end() <= end {
            // The change is entirely within the known range.
            return None;
        }

        // The change partially overlaps the known range; keep only the unknown suffix.
        let offset = (end - change.id.counter) as usize;
        Some(change.slice(offset, change.atom_len()))
    }

    #[allow(unused)]
    fn check_id_is_not_duplicated(&self, id: ID) -> Result<(), LoroError> {
        let cur_end = self.dag.vv().get(&id.peer).cloned().unwrap_or(0);
        if cur_end > id.counter {
            return Err(LoroError::UsedOpID { id });
        }

        Ok(())
    }

    /// Ensure the new change's id is greater than the peer's last id and that the counter is continuous.
    ///
    /// This can be violated when users use detached editing mode together with a custom peer id.
    // This method might be slow and can be optimized if needed in the future.
    pub(crate) fn check_change_greater_than_last_peer_id(
        &self,
        peer: PeerID,
        counter: Counter,
        deps: &Frontiers,
    ) -> Result<(), LoroError> {
        if counter == 0 {
            return Ok(());
        }

        if !self.configure.detached_editing() {
            return Ok(());
        }

        let mut max_last_counter = -1;
        for dep in deps.iter() {
            let dep_vv = self.dag.get_vv(dep).unwrap();
            max_last_counter = max_last_counter.max(dep_vv.get(&peer).cloned().unwrap_or(0) - 1);
        }

        if counter != max_last_counter + 1 {
            return Err(LoroError::ConcurrentOpsWithSamePeerID {
                peer,
                last_counter: max_last_counter,
                current: counter,
            });
        }

        Ok(())
    }

    pub(crate) fn next_id(&self, peer: PeerID) -> ID {
        let cnt = self.dag.vv().get(&peer).copied().unwrap_or(0);
        ID::new(peer, cnt)
    }

    pub(crate) fn vv(&self) -> &VersionVector {
        self.dag.vv()
    }

    pub(crate) fn frontiers(&self) -> &Frontiers {
        self.dag.frontiers()
    }

    /// - Ordering::Less means self's version is less than the target or parallel to it
    /// - Ordering::Equal means the versions are equal
    /// - Ordering::Greater means self's version is greater than the target
    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
        self.dag.cmp_with_frontiers(other)
    }

    /// Compare two [Frontiers] causally.
    ///
    /// If one of the [Frontiers] is not included, it will return [FrontiersNotIncluded].
    #[inline]
    pub fn cmp_frontiers(
        &self,
        a: &Frontiers,
        b: &Frontiers,
    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
        self.dag.cmp_frontiers(a, b)
    }

    pub(crate) fn get_min_lamport_at(&self, id: ID) -> Lamport {
        self.get_change_at(id).map(|c| c.lamport).unwrap_or(0)
    }

    pub(crate) fn get_lamport_at(&self, id: ID) -> Option<Lamport> {
        self.get_change_at(id)
            .map(|c| c.lamport + (id.counter - c.id.counter) as Lamport)
    }

    pub(crate) fn iter_ops(&self, id_span: IdSpan) -> impl Iterator<Item = RichOp<'static>> + '_ {
        let change_iter = self.change_store.iter_changes(id_span);
        change_iter.flat_map(move |c| RichOp::new_iter_by_cnt_range(c, id_span.counter))
    }

    pub(crate) fn get_max_lamport_at(&self, id: ID) -> Lamport {
        self.get_change_at(id)
            .map(|c| {
                let change_counter = c.id.counter as u32;
                c.lamport + c.ops().last().map(|op| op.counter).unwrap_or(0) as u32 - change_counter
            })
            .unwrap_or(Lamport::MAX)
    }

    pub fn get_change_at(&self, id: ID) -> Option<BlockChangeRef> {
        self.change_store.get_change(id)
    }

    pub(crate) fn set_uncommitted_change(&mut self, change: Change) {
        self.uncommitted_change = Some(change);
    }

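    /// Get the part of the uncommitted change that falls within `id_span`.
    ///
    /// Returns a borrowed change when the span matches exactly, an owned slice when the span
    /// only partially overlaps, and `None` when there is no overlap.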
    pub(crate) fn get_uncommitted_change_in_span(&self, id_span: IdSpan) -> Option<Cow<'_, Change>> {
        self.uncommitted_change.as_ref().and_then(|c| {
            if c.id_span() == id_span {
                Some(Cow::Borrowed(c))
            } else if let Some((start, end)) = id_span.get_slice_range_on(&c.id_span()) {
                Some(Cow::Owned(c.slice(start, end)))
            } else {
                None
            }
        })
    }

    pub fn get_deps_of(&self, id: ID) -> Option<Frontiers> {
        self.get_change_at(id).map(|c| {
            if c.id.counter == id.counter {
                c.deps.clone()
            } else {
                Frontiers::from_id(id.inc(-1))
            }
        })
    }

    pub fn get_remote_change_at(&self, id: ID) -> Option<Change<RemoteOp<'static>>> {
        let change = self.get_change_at(id)?;
        Some(convert_change_to_remote(&self.arena, &change))
    }

    pub(crate) fn import_unknown_lamport_pending_changes(
        &mut self,
        remote_changes: Vec<Change>,
    ) -> Result<(), LoroError> {
        self.extend_pending_changes_with_unknown_lamport(remote_changes)
    }

    /// Lookup the change by id.
    ///
    /// If the id is not included in this oplog, return None.
    pub(crate) fn lookup_change(&self, id: ID) -> Option<BlockChangeRef> {
        self.change_store.get_change(id)
    }

    #[inline(always)]
    pub(crate) fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
        encode_oplog(self, vv, EncodeMode::Auto)
    }

    #[inline(always)]
    pub(crate) fn export_change_store_from(&self, vv: &VersionVector, f: &Frontiers) -> Bytes {
        self.change_store
            .export_from(vv, f, self.vv(), self.frontiers())
    }

    #[inline(always)]
    pub(crate) fn export_change_store_in_range(
        &self,
        vv: &VersionVector,
        f: &Frontiers,
        to_vv: &VersionVector,
        to_frontiers: &Frontiers,
    ) -> Bytes {
        self.change_store.export_from(vv, f, to_vv, to_frontiers)
    }

    #[inline(always)]
    pub(crate) fn export_blocks_from<W: std::io::Write>(&self, vv: &VersionVector, w: &mut W) {
        self.change_store
            .export_blocks_from(vv, self.shallow_since_vv(), self.vv(), w)
    }

    #[inline(always)]
    pub(crate) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
        self.change_store.export_blocks_in_range(spans, w)
    }

    pub(crate) fn fork_changes_up_to(&self, frontiers: &Frontiers) -> Option<Bytes> {
        let vv = self.dag.frontiers_to_vv(frontiers)?;
        Some(
            self.change_store
                .fork_changes_up_to(self.dag.shallow_since_vv(), frontiers, &vv),
        )
    }

    #[inline(always)]
    pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<ImportStatus, LoroError> {
        decode_oplog(self, data)
    }

    /// Iterates causally over all changes from the LCA (common ancestors) to the merged version of `from` and `to`.
    ///
    /// The iterator also yields the version vector at the point when each change is applied.
    ///
    /// Returns: (common_ancestor_vv, diff_mode, iterator)
    ///
    /// Note: the change returned by the iterator may include redundant ops at the beginning; you should trim them yourself.
    /// You can trim them using the provided counter range: the relevant ops start at that counter.
    ///
    /// If frontiers are provided, it will be faster (because we don't need to calculate them from the version vectors).
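    ///
    /// Each iterator item is `(change, (counter_start, counter_end), vv)`: the counter range bounds
    /// the part of `change` that is relevant for this step, and `vv` is a shared
    /// `Rc<RefCell<VersionVector>>` that is updated in place on every step, so read it before
    /// advancing the iterator.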
    #[allow(clippy::type_complexity)]
    pub(crate) fn iter_from_lca_causally(
        &self,
        from: &VersionVector,
        from_frontiers: &Frontiers,
        to: &VersionVector,
        to_frontiers: &Frontiers,
    ) -> (
        VersionVector,
        DiffMode,
        impl Iterator<
                Item = (
                    BlockChangeRef,
                    (Counter, Counter),
                    Rc<RefCell<VersionVector>>,
                ),
            > + '_,
    ) {
        let mut merged_vv = from.clone();
        merged_vv.merge(to);
        loro_common::debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to);
        let (common_ancestors, mut diff_mode) =
            self.dag.find_common_ancestor(from_frontiers, to_frontiers);
        if diff_mode == DiffMode::Checkout && to > from {
            diff_mode = DiffMode::Import;
        }

        let common_ancestors_vv = self.dag.frontiers_to_vv(&common_ancestors).unwrap();
        // go from lca to merged_vv
        let diff = common_ancestors_vv.diff(&merged_vv).forward;
        let mut iter = self.dag.iter_causal(common_ancestors, diff);
        let mut node = iter.next();
        let mut cur_cnt = 0;
        let vv = Rc::new(RefCell::new(VersionVector::default()));
        (
            common_ancestors_vv.clone(),
            diff_mode,
            std::iter::from_fn(move || {
                if let Some(inner) = &node {
                    let mut inner_vv = vv.borrow_mut();
                    // FIXME: PERF: it looks slow for large vv, like 10000+ entries
                    inner_vv.clear();
                    self.dag.ensure_vv_for(&inner.data);
                    inner_vv.extend_to_include_vv(inner.data.vv.get().unwrap().iter());
                    let peer = inner.data.peer;
                    // Start from the later of the dag node start, the last yielded position,
                    // and the counter already covered by the common ancestors.
                    let cnt = inner
                        .data
                        .cnt
                        .max(cur_cnt)
                        .max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
                    // End of this dag node, clamped to the merged target version.
                    let dag_node_end = (inner.data.cnt + inner.data.len as Counter)
                        .min(merged_vv.get(&peer).copied().unwrap_or(0));
                    let change = self.change_store.get_change(ID::new(peer, cnt)).unwrap();

                    if change.ctr_end() < dag_node_end {
                        // The change block is shorter than the dag node; keep yielding from
                        // the same node on the next step.
                        cur_cnt = change.ctr_end();
                    } else {
                        node = iter.next();
                        cur_cnt = 0;
                    }

                    inner_vv.extend_to_include_end_id(change.id);

                    Some((change, (cnt, dag_node_end), vv.clone()))
                } else {
                    None
                }
            }),
        )
    }

    pub fn len_changes(&self) -> usize {
        self.change_store.change_num()
    }

    pub fn diagnose_size(&self) -> SizeInfo {
        let mut total_changes = 0;
        let mut total_ops = 0;
        let mut total_atom_ops = 0;
        let total_dag_node = self.dag.total_parsed_dag_node();
        self.change_store.visit_all_changes(&mut |change| {
            total_changes += 1;
            total_ops += change.ops.len();
            total_atom_ops += change.atom_len();
        });

        println!("total changes: {}", total_changes);
        println!("total ops: {}", total_ops);
        println!("total atom ops: {}", total_atom_ops);
        println!("total dag node: {}", total_dag_node);
        SizeInfo {
            total_changes,
            total_ops,
            total_atom_ops,
            total_dag_node,
        }
    }

    pub(crate) fn iter_changes_peer_by_peer<'a>(
        &'a self,
        from: &VersionVector,
        to: &VersionVector,
    ) -> impl Iterator<Item = BlockChangeRef> + 'a {
        let spans: Vec<_> = from.diff_iter(to).1.collect();
        spans
            .into_iter()
            .flat_map(move |span| self.change_store.iter_changes(span))
    }

    pub(crate) fn iter_changes_causally_rev<'a>(
        &'a self,
        from: &VersionVector,
        to: &VersionVector,
    ) -> impl Iterator<Item = BlockChangeRef> + 'a {
        MergedChangeIter::new_change_iter_rev(self, from, to)
    }

    pub fn get_timestamp_for_next_txn(&self) -> Timestamp {
        if self.configure.record_timestamp() {
            get_timestamp_now_txn()
        } else {
            0
        }
    }

    #[inline(never)]
    pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
        let change = self.change_store.get_change_by_lamport_lte(id)?;

        if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
            return None;
        }

        Some(ID::new(
            change.id.peer,
            (id.lamport - change.lamport) as Counter + change.id.counter,
        ))
    }

    #[allow(unused)]
    pub(crate) fn id_to_idlp(&self, id_start: ID) -> IdLp {
        let change = self.get_change_at(id_start).unwrap();
        let lamport = change.lamport + (id_start.counter - change.id.counter) as Lamport;
        let peer = id_start.peer;
        loro_common::IdLp { peer, lamport }
    }

    /// NOTE: This may return an op that includes the given id, not necessarily one that starts with the given id.
    pub(crate) fn get_op_that_includes(&self, id: ID) -> Option<BlockOpRef> {
        let change = self.get_change_at(id)?;
        change.get_op_with_counter(id.counter)
    }

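    /// Split `id_span` at the dag node boundaries it crosses, pairing each sub-span with the
    /// [Frontiers] it depends on: the node's own deps when the sub-span starts at the node's
    /// first counter, or the previous ID within the same node otherwise.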
    pub(crate) fn split_span_based_on_deps(&self, id_span: IdSpan) -> Vec<(IdSpan, Frontiers)> {
        let peer = id_span.peer;
        let mut counter = id_span.counter.min();
        let span_end = id_span.counter.norm_end();
        let mut ans = Vec::new();

        while counter < span_end {
            let id = ID::new(peer, counter);
            let node = self.dag.get(id).unwrap();

            let f = if node.cnt == counter {
                node.deps.clone()
            } else if counter > 0 {
                id.inc(-1).into()
            } else {
                unreachable!()
            };

            let cur_end = node.cnt + node.len as Counter;
            let len = cur_end.min(span_end) - counter;
            ans.push((id.to_span(len as usize), f));
            counter += len;
        }

        ans
    }

    #[inline]
    pub fn compact_change_store(&mut self) {
        self.change_store
            .flush_and_compact(self.dag.vv(), self.dag.frontiers());
    }

    #[inline]
    pub fn change_store_kv_size(&self) -> usize {
        self.change_store.kv_size()
    }

    pub fn encode_change_store(&self) -> bytes::Bytes {
        self.change_store
            .encode_all(self.dag.vv(), self.dag.frontiers())
    }

    pub fn check_dag_correctness(&self) {
        self.dag.check_dag_correctness();
    }

    pub fn shallow_since_vv(&self) -> &ImVersionVector {
        self.dag.shallow_since_vv()
    }

    pub fn shallow_since_frontiers(&self) -> &Frontiers {
        self.dag.shallow_since_frontiers()
    }

    pub fn is_shallow(&self) -> bool {
        !self.dag.shallow_since_vv().is_empty()
    }

    pub fn get_greatest_timestamp(&self, frontiers: &Frontiers) -> Timestamp {
        let mut max_timestamp = Timestamp::default();
        for id in frontiers.iter() {
            let change = self.get_change_at(id).unwrap();
            if change.timestamp > max_timestamp {
                max_timestamp = change.timestamp;
            }
        }

        max_timestamp
    }
}

#[derive(Debug)]
pub struct SizeInfo {
    pub total_changes: usize,
    pub total_ops: usize,
    pub total_atom_ops: usize,
    pub total_dag_node: usize,
}

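/// Convert an internal [Change] into a self-contained `Change<RemoteOp>` by resolving the
/// container ID and materializing op contents (text slices, list values, ...) out of the
/// shared arena.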
pub(crate) fn convert_change_to_remote(
    arena: &SharedArena,
    change: &Change,
) -> Change<RemoteOp<'static>> {
    let mut ops = RleVec::new();
    for op in change.ops.iter() {
        for op in local_op_to_remote(arena, op) {
            ops.push(op);
        }
    }

    Change {
        ops,
        id: change.id,
        deps: change.deps.clone(),
        lamport: change.lamport,
        timestamp: change.timestamp,
        commit_msg: change.commit_msg.clone(),
    }
}

pub(crate) fn local_op_to_remote(
    arena: &SharedArena,
    op: &crate::op::Op,
) -> SmallVec<[RemoteOp<'static>; 1]> {
    let container = arena.get_container_id(op.container).unwrap();
    let mut contents: SmallVec<[_; 1]> = SmallVec::new();
    match &op.content {
        crate::op::InnerContent::List(list) => match list {
            list_op::InnerListOp::Insert { slice, pos } => match container.container_type() {
                loro_common::ContainerType::Text => {
                    let str = arena
                        .slice_str_by_unicode_range(slice.0.start as usize..slice.0.end as usize);
                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
                        slice: ListSlice::RawStr {
                            unicode_len: str.chars().count(),
                            str: Cow::Owned(str),
                        },
                        pos: *pos,
                    }));
                }
                loro_common::ContainerType::List | loro_common::ContainerType::MovableList => {
                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
                        slice: ListSlice::RawData(Cow::Owned(
                            arena.get_values(slice.0.start as usize..slice.0.end as usize),
                        )),
                        pos: *pos,
                    }))
                }
                _ => unreachable!(),
            },
            list_op::InnerListOp::InsertText {
                slice,
                unicode_len: len,
                unicode_start: _,
                pos,
            } => match container.container_type() {
                loro_common::ContainerType::Text => {
                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
                        slice: ListSlice::RawStr {
                            unicode_len: *len as usize,
                            str: Cow::Owned(std::str::from_utf8(slice).unwrap().to_owned()),
                        },
                        pos: *pos as usize,
                    }));
                }
                _ => unreachable!(),
            },
            list_op::InnerListOp::Delete(del) => {
                contents.push(RawOpContent::List(list_op::ListOp::Delete(*del)))
            }
            list_op::InnerListOp::StyleStart {
                start,
                end,
                key,
                value,
                info,
            } => contents.push(RawOpContent::List(list_op::ListOp::StyleStart {
                start: *start,
                end: *end,
                key: key.clone(),
                value: value.clone(),
                info: *info,
            })),
            list_op::InnerListOp::StyleEnd => {
                contents.push(RawOpContent::List(list_op::ListOp::StyleEnd))
            }
            list_op::InnerListOp::Move {
                from,
                elem_id: from_id,
                to,
            } => contents.push(RawOpContent::List(list_op::ListOp::Move {
                from: *from,
                elem_id: *from_id,
                to: *to,
            })),
            list_op::InnerListOp::Set { elem_id, value } => {
                contents.push(RawOpContent::List(list_op::ListOp::Set {
                    elem_id: *elem_id,
                    value: value.clone(),
                }))
            }
        },
        crate::op::InnerContent::Map(map) => {
            let value = map.value.clone();
            contents.push(RawOpContent::Map(crate::container::map::MapSet {
                key: map.key.clone(),
                value,
            }))
        }
        crate::op::InnerContent::Tree(tree) => contents.push(RawOpContent::Tree(tree.clone())),
        crate::op::InnerContent::Future(f) => match f {
            #[cfg(feature = "counter")]
            crate::op::FutureInnerContent::Counter(c) => contents.push(RawOpContent::Counter(*c)),
            FutureInnerContent::Unknown { prop, value } => {
                contents.push(crate::op::RawOpContent::Unknown {
                    prop: *prop,
                    value: (**value).clone(),
                })
            }
        },
    };

    let mut ans = SmallVec::with_capacity(contents.len());
    for content in contents {
        ans.push(RemoteOp {
            container: container.clone(),
            content,
            counter: op.counter,
        })
    }
    ans
}

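// Assuming `get_sys_timestamp()` returns milliseconds, adding 500 before the integer division
// rounds to the nearest second rather than truncating (e.g. ..._499 ms rounds down, ..._500 ms
// rounds up).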
pub(crate) fn get_timestamp_now_txn() -> Timestamp {
    (get_sys_timestamp() as Timestamp + 500) / 1000
}