loro_internal/
oplog.rs

1mod change_store;
2pub(crate) mod loro_dag;
3mod pending_changes;
4
5use bytes::Bytes;
6use std::borrow::Cow;
7use std::cell::RefCell;
8use std::cmp::Ordering;
9use std::rc::Rc;
10use std::sync::Mutex;
11use tracing::{debug, trace, trace_span};
12
13use self::change_store::iter::MergedChangeIter;
14use self::pending_changes::PendingChanges;
15use super::arena::SharedArena;
16use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
17use crate::configure::Configure;
18use crate::container::list::list_op;
19use crate::dag::{Dag, DagUtils};
20use crate::diff_calc::DiffMode;
21use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
22use crate::encoding::{ImportStatus, ParsedHeaderAndBody};
23use crate::history_cache::ContainerHistoryCache;
24use crate::id::{Counter, PeerID, ID};
25use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
26use crate::span::{HasCounterSpan, HasLamportSpan};
27use crate::version::{Frontiers, ImVersionVector, VersionVector};
28use crate::LoroError;
29use change_store::BlockOpRef;
30use loro_common::{IdLp, IdSpan};
31use rle::{HasLength, RleVec, Sliceable};
32use smallvec::SmallVec;
33
34pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
35pub use change_store::{BlockChangeRef, ChangeStore};
36
/// [OpLog] stores all the ops, i.e. the history.
/// It allows multiple [AppState] to attach to it,
/// so you can derive different versions of the state from the [OpLog].
/// It allows us to build a version control system.
///
/// The causal graph should always be a DAG and complete, so we can always find the LCA.
/// If deps are missing, we can't import the change. It will be put into the `pending_changes`.
pub struct OpLog {
    /// Causal graph of the changes known to this oplog.
    pub(crate) dag: AppDag,
    /// Arena handed to the change store when it is created (see `OpLog::new`).
    pub(crate) arena: SharedArena,
    /// Storage for the changes themselves.
    change_store: ChangeStore,
    /// History cache, kept behind a mutex so it can be reached through `&self`.
    history_cache: Mutex<ContainerHistoryCache>,
    /// Pending changes that haven't been applied to the dag.
    /// A change can be imported only when all its deps are already imported.
    /// Key is the ID of the missing dep
    pub(crate) pending_changes: PendingChanges,
    /// Whether we are importing a batch of changes.
    /// If so the Dag's frontiers won't be updated until the batch is finished.
    pub(crate) batch_importing: bool,
    /// Configuration consulted by this oplog (e.g. detached editing, timestamp recording).
    pub(crate) configure: Configure,
}
58
59impl std::fmt::Debug for OpLog {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("OpLog")
62            .field("dag", &self.dag)
63            .field("pending_changes", &self.pending_changes)
64            .finish()
65    }
66}
67
68impl OpLog {
69    #[inline]
70    pub(crate) fn new() -> Self {
71        let arena = SharedArena::new();
72        let cfg = Configure::default();
73        let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval_in_s.clone());
74        Self {
75            history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)),
76            dag: AppDag::new(change_store.clone()),
77            change_store,
78            arena,
79            pending_changes: Default::default(),
80            batch_importing: false,
81            configure: cfg,
82        }
83    }
84
85    #[inline]
86    pub fn dag(&self) -> &AppDag {
87        &self.dag
88    }
89
90    pub fn change_store(&self) -> &ChangeStore {
91        &self.change_store
92    }
93
94    /// Get the change with the given peer and lamport.
95    ///
96    /// If not found, return the change with the greatest lamport that is smaller than the given lamport.
97    pub fn get_change_with_lamport_lte(
98        &self,
99        peer: PeerID,
100        lamport: Lamport,
101    ) -> Option<BlockChangeRef> {
102        let ans = self
103            .change_store
104            .get_change_by_lamport_lte(IdLp::new(peer, lamport))?;
105        debug_assert!(ans.lamport <= lamport);
106        Some(ans)
107    }
108
109    pub fn get_timestamp_of_version(&self, f: &Frontiers) -> Timestamp {
110        let mut timestamp = Timestamp::default();
111        for id in f.iter() {
112            if let Some(change) = self.lookup_change(id) {
113                timestamp = timestamp.max(change.timestamp);
114            }
115        }
116
117        timestamp
118    }
119
120    #[inline]
121    pub fn is_empty(&self) -> bool {
122        self.dag.is_empty() && self.arena.can_import_snapshot()
123    }
124
125    /// This is the **only** place to update the `OpLog.changes`
126    pub(crate) fn insert_new_change(&mut self, change: Change, from_local: bool) {
127        let s = trace_span!(
128            "insert_new_change",
129            id = ?change.id,
130            lamport = change.lamport,
131            deps = ?change.deps
132        );
133        let _enter = s.enter();
134        self.dag.handle_new_change(&change, from_local);
135        self.history_cache
136            .try_lock()
137            .unwrap()
138            .insert_by_new_change(&change, true, true);
139        self.register_container_and_parent_link(&change);
140        self.change_store.insert_change(change, true, from_local);
141    }
142
143    #[inline(always)]
144    pub(crate) fn with_history_cache<F, R>(&self, f: F) -> R
145    where
146        F: FnOnce(&mut ContainerHistoryCache) -> R,
147    {
148        let mut history_cache = self.history_cache.try_lock().unwrap();
149        f(&mut history_cache)
150    }
151
152    pub fn has_history_cache(&self) -> bool {
153        self.history_cache.try_lock().unwrap().has_cache()
154    }
155
156    pub fn free_history_cache(&self) {
157        let mut history_cache = self.history_cache.try_lock().unwrap();
158        history_cache.free();
159    }
160
    /// Import a locally created change.
    ///
    /// The change is inserted right away through `insert_new_change` with
    /// `from_local = true`; no dependency or id check is performed here.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`. The `Result` return
    /// type is kept for callers that propagate errors with `?`.
    pub(crate) fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
        self.insert_new_change(change, true);
        Ok(())
    }
175
176    /// Trim the known part of change
177    pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> {
178        let Some(&end) = self.dag.vv().get(&change.id.peer) else {
179            return Some(change);
180        };
181
182        if change.id.counter >= end {
183            return Some(change);
184        }
185
186        if change.ctr_end() <= end {
187            return None;
188        }
189
190        let offset = (end - change.id.counter) as usize;
191        Some(change.slice(offset, change.atom_len()))
192    }
193
194    #[allow(unused)]
195    fn check_id_is_not_duplicated(&self, id: ID) -> Result<(), LoroError> {
196        let cur_end = self.dag.vv().get(&id.peer).cloned().unwrap_or(0);
197        if cur_end > id.counter {
198            return Err(LoroError::UsedOpID { id });
199        }
200
201        Ok(())
202    }
203
204    /// Ensure the new change is greater than the last peer's id and the counter is continuous.
205    ///
206    /// It can be false when users use detached editing mode and use a custom peer id.
207    // This method might be slow and can be optimized if needed in the future.
208    pub(crate) fn check_change_greater_than_last_peer_id(
209        &self,
210        peer: PeerID,
211        counter: Counter,
212        deps: &Frontiers,
213    ) -> Result<(), LoroError> {
214        if counter == 0 {
215            return Ok(());
216        }
217
218        if !self.configure.detached_editing() {
219            return Ok(());
220        }
221
222        let mut max_last_counter = -1;
223        for dep in deps.iter() {
224            let dep_vv = self.dag.get_vv(dep).unwrap();
225            max_last_counter = max_last_counter.max(dep_vv.get(&peer).cloned().unwrap_or(0) - 1);
226        }
227
228        if counter != max_last_counter + 1 {
229            return Err(LoroError::ConcurrentOpsWithSamePeerID {
230                peer,
231                last_counter: max_last_counter,
232                current: counter,
233            });
234        }
235
236        Ok(())
237    }
238
239    pub(crate) fn next_id(&self, peer: PeerID) -> ID {
240        let cnt = self.dag.vv().get(&peer).copied().unwrap_or(0);
241        ID::new(peer, cnt)
242    }
243
244    pub(crate) fn vv(&self) -> &VersionVector {
245        self.dag.vv()
246    }
247
248    pub(crate) fn frontiers(&self) -> &Frontiers {
249        self.dag.frontiers()
250    }
251
252    /// - Ordering::Less means self is less than target or parallel
253    /// - Ordering::Equal means versions equal
254    /// - Ordering::Greater means self's version is greater than target
255    pub fn cmp_with_frontiers(&self, other: &Frontiers) -> Ordering {
256        self.dag.cmp_with_frontiers(other)
257    }
258
259    /// Compare two [Frontiers] causally.
260    ///
261    /// If one of the [Frontiers] are not included, it will return [FrontiersNotIncluded].
262    #[inline]
263    pub fn cmp_frontiers(
264        &self,
265        a: &Frontiers,
266        b: &Frontiers,
267    ) -> Result<Option<Ordering>, FrontiersNotIncluded> {
268        self.dag.cmp_frontiers(a, b)
269    }
270
271    pub(crate) fn get_min_lamport_at(&self, id: ID) -> Lamport {
272        self.get_change_at(id).map(|c| c.lamport).unwrap_or(0)
273    }
274
275    pub(crate) fn get_lamport_at(&self, id: ID) -> Option<Lamport> {
276        self.get_change_at(id)
277            .map(|c| c.lamport + (id.counter - c.id.counter) as Lamport)
278    }
279
280    pub(crate) fn iter_ops(&self, id_span: IdSpan) -> impl Iterator<Item = RichOp<'static>> + '_ {
281        let change_iter = self.change_store.iter_changes(id_span);
282        change_iter.flat_map(move |c| RichOp::new_iter_by_cnt_range(c, id_span.counter))
283    }
284
285    pub(crate) fn get_max_lamport_at(&self, id: ID) -> Lamport {
286        self.get_change_at(id)
287            .map(|c| {
288                let change_counter = c.id.counter as u32;
289                c.lamport + c.ops().last().map(|op| op.counter).unwrap_or(0) as u32 - change_counter
290            })
291            .unwrap_or(Lamport::MAX)
292    }
293
294    pub fn get_change_at(&self, id: ID) -> Option<BlockChangeRef> {
295        self.change_store.get_change(id)
296    }
297
298    pub fn get_deps_of(&self, id: ID) -> Option<Frontiers> {
299        self.get_change_at(id).map(|c| {
300            if c.id.counter == id.counter {
301                c.deps.clone()
302            } else {
303                Frontiers::from_id(id.inc(-1))
304            }
305        })
306    }
307
308    pub fn get_remote_change_at(&self, id: ID) -> Option<Change<RemoteOp<'static>>> {
309        let change = self.get_change_at(id)?;
310        Some(convert_change_to_remote(&self.arena, &change))
311    }
312
313    pub(crate) fn import_unknown_lamport_pending_changes(
314        &mut self,
315        remote_changes: Vec<Change>,
316    ) -> Result<(), LoroError> {
317        self.extend_pending_changes_with_unknown_lamport(remote_changes)
318    }
319
320    /// lookup change by id.
321    ///
322    /// if id does not included in this oplog, return None
323    pub(crate) fn lookup_change(&self, id: ID) -> Option<BlockChangeRef> {
324        self.change_store.get_change(id)
325    }
326
327    #[inline(always)]
328    pub(crate) fn export_from(&self, vv: &VersionVector) -> Vec<u8> {
329        encode_oplog(self, vv, EncodeMode::Auto)
330    }
331
332    #[inline(always)]
333    pub(crate) fn export_change_store_from(&self, vv: &VersionVector, f: &Frontiers) -> Bytes {
334        self.change_store
335            .export_from(vv, f, self.vv(), self.frontiers())
336    }
337
338    #[inline(always)]
339    pub(crate) fn export_change_store_in_range(
340        &self,
341        vv: &VersionVector,
342        f: &Frontiers,
343        to_vv: &VersionVector,
344        to_frontiers: &Frontiers,
345    ) -> Bytes {
346        self.change_store.export_from(vv, f, to_vv, to_frontiers)
347    }
348
349    #[inline(always)]
350    pub(crate) fn export_blocks_from<W: std::io::Write>(&self, vv: &VersionVector, w: &mut W) {
351        self.change_store
352            .export_blocks_from(vv, self.shallow_since_vv(), self.vv(), w)
353    }
354
355    #[inline(always)]
356    pub(crate) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
357        self.change_store.export_blocks_in_range(spans, w)
358    }
359
360    pub(crate) fn fork_changes_up_to(&self, frontiers: &Frontiers) -> Option<Bytes> {
361        let vv = self.dag.frontiers_to_vv(frontiers)?;
362        Some(
363            self.change_store
364                .fork_changes_up_to(self.dag.shallow_since_vv(), frontiers, &vv),
365        )
366    }
367
368    #[inline(always)]
369    pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<ImportStatus, LoroError> {
370        decode_oplog(self, data)
371    }
372
373    /// iterates over all changes between LCA(common ancestors) to the merged version of (`from` and `to`) causally
374    ///
375    /// Tht iterator will include a version vector when the change is applied
376    ///
377    /// returns: (common_ancestor_vv, iterator)
378    ///
379    /// Note: the change returned by the iterator may include redundant ops at the beginning, you should trim it by yourself.
380    /// You can trim it by the provided counter value. It should start with the counter.
381    ///
382    /// If frontiers are provided, it will be faster (because we don't need to calculate it from version vector
383    #[allow(clippy::type_complexity)]
384    pub(crate) fn iter_from_lca_causally(
385        &self,
386        from: &VersionVector,
387        from_frontiers: &Frontiers,
388        to: &VersionVector,
389        to_frontiers: &Frontiers,
390    ) -> (
391        VersionVector,
392        DiffMode,
393        impl Iterator<
394                Item = (
395                    BlockChangeRef,
396                    (Counter, Counter),
397                    Rc<RefCell<VersionVector>>,
398                ),
399            > + '_,
400    ) {
401        let mut merged_vv = from.clone();
402        merged_vv.merge(to);
403        debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to);
404        let (common_ancestors, mut diff_mode) =
405            self.dag.find_common_ancestor(from_frontiers, to_frontiers);
406        if diff_mode == DiffMode::Checkout && to > from {
407            diff_mode = DiffMode::Import;
408        }
409
410        let common_ancestors_vv = self.dag.frontiers_to_vv(&common_ancestors).unwrap();
411        // go from lca to merged_vv
412        let diff = common_ancestors_vv.diff(&merged_vv).forward;
413        let mut iter = self.dag.iter_causal(common_ancestors, diff);
414        let mut node = iter.next();
415        let mut cur_cnt = 0;
416        let vv = Rc::new(RefCell::new(VersionVector::default()));
417        (
418            common_ancestors_vv.clone(),
419            diff_mode,
420            std::iter::from_fn(move || {
421                if let Some(inner) = &node {
422                    let mut inner_vv = vv.borrow_mut();
423                    // FIXME: PERF: it looks slow for large vv, like 10000+ entries
424                    inner_vv.clear();
425                    self.dag.ensure_vv_for(&inner.data);
426                    inner_vv.extend_to_include_vv(inner.data.vv.get().unwrap().iter());
427                    let peer = inner.data.peer;
428                    let cnt = inner
429                        .data
430                        .cnt
431                        .max(cur_cnt)
432                        .max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
433                    let dag_node_end = (inner.data.cnt + inner.data.len as Counter)
434                        .min(merged_vv.get(&peer).copied().unwrap_or(0));
435                    trace!("vv: {:?}", self.dag.vv());
436                    let change = self.change_store.get_change(ID::new(peer, cnt)).unwrap();
437
438                    if change.ctr_end() < dag_node_end {
439                        cur_cnt = change.ctr_end();
440                    } else {
441                        node = iter.next();
442                        cur_cnt = 0;
443                    }
444
445                    inner_vv.extend_to_include_end_id(change.id);
446
447                    Some((change, (cnt, dag_node_end), vv.clone()))
448                } else {
449                    None
450                }
451            }),
452        )
453    }
454
455    pub fn len_changes(&self) -> usize {
456        self.change_store.change_num()
457    }
458
459    pub fn diagnose_size(&self) -> SizeInfo {
460        let mut total_changes = 0;
461        let mut total_ops = 0;
462        let mut total_atom_ops = 0;
463        let total_dag_node = self.dag.total_parsed_dag_node();
464        self.change_store.visit_all_changes(&mut |change| {
465            total_changes += 1;
466            total_ops += change.ops.len();
467            total_atom_ops += change.atom_len();
468        });
469
470        println!("total changes: {}", total_changes);
471        println!("total ops: {}", total_ops);
472        println!("total atom ops: {}", total_atom_ops);
473        println!("total dag node: {}", total_dag_node);
474        SizeInfo {
475            total_changes,
476            total_ops,
477            total_atom_ops,
478            total_dag_node,
479        }
480    }
481
482    pub(crate) fn iter_changes_peer_by_peer<'a>(
483        &'a self,
484        from: &VersionVector,
485        to: &VersionVector,
486    ) -> impl Iterator<Item = BlockChangeRef> + 'a {
487        let spans: Vec<_> = from.diff_iter(to).1.collect();
488        spans
489            .into_iter()
490            .flat_map(move |span| self.change_store.iter_changes(span))
491    }
492
493    pub(crate) fn iter_changes_causally_rev<'a>(
494        &'a self,
495        from: &VersionVector,
496        to: &VersionVector,
497    ) -> impl Iterator<Item = BlockChangeRef> + 'a {
498        MergedChangeIter::new_change_iter_rev(self, from, to)
499    }
500
501    pub fn get_timestamp_for_next_txn(&self) -> Timestamp {
502        if self.configure.record_timestamp() {
503            (get_sys_timestamp() as Timestamp + 500) / 1000
504        } else {
505            0
506        }
507    }
508
509    #[inline(never)]
510    pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
511        let change = self.change_store.get_change_by_lamport_lte(id)?;
512
513        if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
514            return None;
515        }
516
517        Some(ID::new(
518            change.id.peer,
519            (id.lamport - change.lamport) as Counter + change.id.counter,
520        ))
521    }
522
523    #[allow(unused)]
524    pub(crate) fn id_to_idlp(&self, id_start: ID) -> IdLp {
525        let change = self.get_change_at(id_start).unwrap();
526        let lamport = change.lamport + (id_start.counter - change.id.counter) as Lamport;
527        let peer = id_start.peer;
528        loro_common::IdLp { peer, lamport }
529    }
530
531    /// NOTE: This may return a op that includes the given id, not necessarily start with the given id
532    pub(crate) fn get_op_that_includes(&self, id: ID) -> Option<BlockOpRef> {
533        let change = self.get_change_at(id)?;
534        change.get_op_with_counter(id.counter)
535    }
536
537    pub(crate) fn split_span_based_on_deps(&self, id_span: IdSpan) -> Vec<(IdSpan, Frontiers)> {
538        let peer = id_span.peer;
539        let mut counter = id_span.counter.min();
540        let span_end = id_span.counter.norm_end();
541        let mut ans = Vec::new();
542
543        while counter < span_end {
544            let id = ID::new(peer, counter);
545            let node = self.dag.get(id).unwrap();
546
547            let f = if node.cnt == counter {
548                node.deps.clone()
549            } else if counter > 0 {
550                id.inc(-1).into()
551            } else {
552                unreachable!()
553            };
554
555            let cur_end = node.cnt + node.len as Counter;
556            let len = cur_end.min(span_end) - counter;
557            ans.push((id.to_span(len as usize), f));
558            counter += len;
559        }
560
561        ans
562    }
563
564    #[inline]
565    pub fn compact_change_store(&mut self) {
566        self.change_store
567            .flush_and_compact(self.dag.vv(), self.dag.frontiers());
568    }
569
570    #[inline]
571    pub fn change_store_kv_size(&self) -> usize {
572        self.change_store.kv_size()
573    }
574
575    pub fn encode_change_store(&self) -> bytes::Bytes {
576        self.change_store
577            .encode_all(self.dag.vv(), self.dag.frontiers())
578    }
579
580    pub fn check_dag_correctness(&self) {
581        self.dag.check_dag_correctness();
582    }
583
584    pub fn shallow_since_vv(&self) -> &ImVersionVector {
585        self.dag.shallow_since_vv()
586    }
587
588    pub fn shallow_since_frontiers(&self) -> &Frontiers {
589        self.dag.shallow_since_frontiers()
590    }
591
592    pub fn is_shallow(&self) -> bool {
593        !self.dag.shallow_since_vv().is_empty()
594    }
595
596    pub fn get_greatest_timestamp(&self, frontiers: &Frontiers) -> Timestamp {
597        let mut max_timestamp = Timestamp::default();
598        for id in frontiers.iter() {
599            let change = self.get_change_at(id).unwrap();
600            if change.timestamp > max_timestamp {
601                max_timestamp = change.timestamp;
602            }
603        }
604
605        max_timestamp
606    }
607}
608
/// Totals gathered by `OpLog::diagnose_size`.
#[derive(Debug)]
pub struct SizeInfo {
    /// Number of changes visited in the change store.
    pub total_changes: usize,
    /// Sum of the op counts (`ops.len()`) of all visited changes.
    pub total_ops: usize,
    /// Sum of the atom lengths (`atom_len()`) of all visited changes.
    pub total_atom_ops: usize,
    /// Number of parsed dag nodes as reported by the dag.
    pub total_dag_node: usize,
}
616
617pub(crate) fn convert_change_to_remote(
618    arena: &SharedArena,
619    change: &Change,
620) -> Change<RemoteOp<'static>> {
621    let mut ops = RleVec::new();
622    for op in change.ops.iter() {
623        for op in local_op_to_remote(arena, op) {
624            ops.push(op);
625        }
626    }
627
628    Change {
629        ops,
630        id: change.id,
631        deps: change.deps.clone(),
632        lamport: change.lamport,
633        timestamp: change.timestamp,
634        commit_msg: change.commit_msg.clone(),
635    }
636}
637
638pub(crate) fn local_op_to_remote(
639    arena: &SharedArena,
640    op: &crate::op::Op,
641) -> SmallVec<[RemoteOp<'static>; 1]> {
642    let container = arena.get_container_id(op.container).unwrap();
643    let mut contents: SmallVec<[_; 1]> = SmallVec::new();
644    match &op.content {
645        crate::op::InnerContent::List(list) => match list {
646            list_op::InnerListOp::Insert { slice, pos } => match container.container_type() {
647                loro_common::ContainerType::Text => {
648                    let str = arena
649                        .slice_str_by_unicode_range(slice.0.start as usize..slice.0.end as usize);
650                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
651                        slice: ListSlice::RawStr {
652                            unicode_len: str.chars().count(),
653                            str: Cow::Owned(str),
654                        },
655                        pos: *pos,
656                    }));
657                }
658                loro_common::ContainerType::List | loro_common::ContainerType::MovableList => {
659                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
660                        slice: ListSlice::RawData(Cow::Owned(
661                            arena.get_values(slice.0.start as usize..slice.0.end as usize),
662                        )),
663                        pos: *pos,
664                    }))
665                }
666                _ => unreachable!(),
667            },
668            list_op::InnerListOp::InsertText {
669                slice,
670                unicode_len: len,
671                unicode_start: _,
672                pos,
673            } => match container.container_type() {
674                loro_common::ContainerType::Text => {
675                    contents.push(RawOpContent::List(list_op::ListOp::Insert {
676                        slice: ListSlice::RawStr {
677                            unicode_len: *len as usize,
678                            str: Cow::Owned(std::str::from_utf8(slice).unwrap().to_owned()),
679                        },
680                        pos: *pos as usize,
681                    }));
682                }
683                _ => unreachable!(),
684            },
685            list_op::InnerListOp::Delete(del) => {
686                contents.push(RawOpContent::List(list_op::ListOp::Delete(*del)))
687            }
688            list_op::InnerListOp::StyleStart {
689                start,
690                end,
691                key,
692                value,
693                info,
694            } => contents.push(RawOpContent::List(list_op::ListOp::StyleStart {
695                start: *start,
696                end: *end,
697                key: key.clone(),
698                value: value.clone(),
699                info: *info,
700            })),
701            list_op::InnerListOp::StyleEnd => {
702                contents.push(RawOpContent::List(list_op::ListOp::StyleEnd))
703            }
704            list_op::InnerListOp::Move {
705                from,
706                elem_id: from_id,
707                to,
708            } => contents.push(RawOpContent::List(list_op::ListOp::Move {
709                from: *from,
710                elem_id: *from_id,
711                to: *to,
712            })),
713            list_op::InnerListOp::Set { elem_id, value } => {
714                contents.push(RawOpContent::List(list_op::ListOp::Set {
715                    elem_id: *elem_id,
716                    value: value.clone(),
717                }))
718            }
719        },
720        crate::op::InnerContent::Map(map) => {
721            let value = map.value.clone();
722            contents.push(RawOpContent::Map(crate::container::map::MapSet {
723                key: map.key.clone(),
724                value,
725            }))
726        }
727        crate::op::InnerContent::Tree(tree) => contents.push(RawOpContent::Tree(tree.clone())),
728        crate::op::InnerContent::Future(f) => match f {
729            #[cfg(feature = "counter")]
730            crate::op::FutureInnerContent::Counter(c) => contents.push(RawOpContent::Counter(*c)),
731            FutureInnerContent::Unknown { prop, value } => {
732                contents.push(crate::op::RawOpContent::Unknown {
733                    prop: *prop,
734                    value: (**value).clone(),
735                })
736            }
737        },
738    };
739
740    let mut ans = SmallVec::with_capacity(contents.len());
741    for content in contents {
742        ans.push(RemoteOp {
743            container: container.clone(),
744            content,
745            counter: op.counter,
746        })
747    }
748    ans
749}