loro_internal/
txn.rs

1use core::panic;
2use std::{
3    borrow::Cow,
4    mem::take,
5    sync::{Arc, Mutex, Weak},
6};
7
8use enum_as_inner::EnumAsInner;
9use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
10use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
11use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
12use rle::{HasLength, Mergable, RleVec};
13use smallvec::{smallvec, SmallVec};
14
15use crate::{
16    change::{Change, Lamport, Timestamp},
17    container::{
18        idx::ContainerIdx,
19        list::list_op::{DeleteSpan, InnerListOp},
20        richtext::Style,
21        IntoContainerId,
22    },
23    delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
24    encoding::export_fast_updates_in_range,
25    event::{Diff, ListDeltaMeta, TextDiff},
26    handler::{Handler, ValueOrHandler},
27    id::{Counter, PeerID, ID},
28    loro::CommitOptions,
29    op::{Op, RawOp, RawOpContent},
30    span::HasIdSpan,
31    version::Frontiers,
32    InternalString, LoroDocInner, LoroError, LoroValue,
33};
34
35use super::{
36    arena::SharedArena,
37    event::{InternalContainerDiff, InternalDocDiff},
38    handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
39    oplog::OpLog,
40    state::DocState,
41};
42
43impl crate::LoroDoc {
44    /// Create a new transaction.
45    /// Every ops created inside one transaction will be packed into a single
46    /// [Change].
47    ///
48    /// There can only be one active transaction at a time for a [LoroDoc].
49    #[inline(always)]
50    pub fn txn(&self) -> Result<Transaction, LoroError> {
51        self.txn_with_origin("")
52    }
53
54    /// Create a new transaction with specified origin.
55    ///
56    /// The origin will be propagated to the events.
57    /// There can only be one active transaction at a time for a [LoroDoc].
58    pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
59        if !self.can_edit() {
60            return Err(LoroError::TransactionError(
61                String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
62            ));
63        }
64
65        let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into());
66
67        let obs = self.observer.clone();
68        let local_update_subs_weak = self.local_update_subs.downgrade();
69        txn.set_on_commit(Box::new(move |state, oplog, id_span| {
70            let mut state = state.try_lock().unwrap();
71            let events = state.take_events();
72            drop(state);
73            for event in events {
74                obs.emit(event);
75            }
76
77            if id_span.atom_len() == 0 {
78                return;
79            }
80
81            if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
82                if !local_update_subs.inner().is_empty() {
83                    let bytes =
84                        { export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
85                    local_update_subs.emit(&(), bytes);
86                }
87            }
88        }));
89
90        Ok(txn)
91    }
92
93    #[inline(always)]
94    pub fn with_txn<F, R>(&self, f: F) -> LoroResult<R>
95    where
96        F: FnOnce(&mut Transaction) -> LoroResult<R>,
97    {
98        let mut txn = self.txn().unwrap();
99        let v = f(&mut txn)?;
100        txn.commit()?;
101        Ok(v)
102    }
103
104    pub fn start_auto_commit(&self) {
105        self.auto_commit
106            .store(true, std::sync::atomic::Ordering::Release);
107        let mut self_txn = self.txn.try_lock().unwrap();
108        if self_txn.is_some() || !self.can_edit() {
109            return;
110        }
111
112        let txn = self.txn().unwrap();
113        self_txn.replace(txn);
114    }
115
116    #[inline]
117    pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
118        if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
119            let mut self_txn = self.txn.try_lock().unwrap();
120            if self_txn.is_some() {
121                return;
122            }
123
124            let mut txn = self.txn().unwrap();
125            if let Some(options) = options {
126                txn.set_options(options);
127            }
128            self_txn.replace(txn);
129        }
130    }
131}
132
133pub(crate) type OnCommitFn =
134    Box<dyn FnOnce(&Arc<Mutex<DocState>>, &Arc<Mutex<OpLog>>, IdSpan) + Sync + Send>;
135
136pub struct Transaction {
137    peer: PeerID,
138    origin: InternalString,
139    start_counter: Counter,
140    next_counter: Counter,
141    start_lamport: Lamport,
142    next_lamport: Lamport,
143    doc: Weak<LoroDocInner>,
144    frontiers: Frontiers,
145    local_ops: RleVec<[Op; 1]>, // TODO: use a more efficient data structure
146    event_hints: Vec<EventHint>,
147    pub(super) arena: SharedArena,
148    finished: bool,
149    on_commit: Option<OnCommitFn>,
150    timestamp: Option<Timestamp>,
151    msg: Option<Arc<str>>,
152    latest_timestamp: Timestamp,
153}
154
155impl std::fmt::Debug for Transaction {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        f.debug_struct("Transaction")
158            .field("peer", &self.peer)
159            .field("origin", &self.origin)
160            .field("start_counter", &self.start_counter)
161            .field("next_counter", &self.next_counter)
162            .field("start_lamport", &self.start_lamport)
163            .field("next_lamport", &self.next_lamport)
164            .field("frontiers", &self.frontiers)
165            .field("local_ops", &self.local_ops)
166            .field("event_hints", &self.event_hints)
167            .field("arena", &self.arena)
168            .field("finished", &self.finished)
169            .field("on_commit", &self.on_commit.is_some())
170            .field("timestamp", &self.timestamp)
171            .finish()
172    }
173}
174
175/// We can infer local events directly from the local behavior. This enum is used to
176/// record them, so that we can avoid recalculate them when we commit the transaction.
177///
178/// For example, when we insert a text in wasm, users use the utf16 index to send the
179/// command. However, internally loro will convert it to unicode index. But the users
180/// still need events that are in utf16 index. To avoid the round trip, we record the
181/// events here.
182#[derive(Debug, Clone, EnumAsInner)]
183pub(super) enum EventHint {
184    Mark {
185        start: u32,
186        end: u32,
187        style: Style,
188    },
189    InsertText {
190        /// pos is a Unicode index. If wasm, it's a UTF-16 index.
191        pos: u32,
192        event_len: u32,
193        unicode_len: u32,
194        styles: StyleMeta,
195    },
196    /// pos is a Unicode index. If wasm, it's a UTF-16 index.
197    DeleteText {
198        span: DeleteSpan,
199        unicode_len: usize,
200    },
201    InsertList {
202        len: u32,
203        pos: usize,
204    },
205    SetList {
206        index: usize,
207        value: LoroValue,
208    },
209    Move {
210        value: LoroValue,
211        from: u32,
212        to: u32,
213    },
214    DeleteList(DeleteSpan),
215    Map {
216        key: InternalString,
217        value: Option<LoroValue>,
218    },
219    // use vec because we could bring back some node that has children
220    Tree(SmallVec<[TreeDiffItem; 1]>),
221    MarkEnd,
222    #[cfg(feature = "counter")]
223    Counter(f64),
224}
225
226impl generic_btree::rle::HasLength for EventHint {
227    fn rle_len(&self) -> usize {
228        match self {
229            EventHint::Mark { .. } => 1,
230            EventHint::InsertText {
231                unicode_len: len, ..
232            } => *len as usize,
233            EventHint::DeleteText { unicode_len, .. } => *unicode_len,
234            EventHint::InsertList { len, .. } => *len as usize,
235            EventHint::DeleteList(d) => d.len(),
236            EventHint::Map { .. } => 1,
237            EventHint::Tree(_) => 1,
238            EventHint::MarkEnd => 1,
239            EventHint::Move { .. } => 1,
240            EventHint::SetList { .. } => 1,
241            #[cfg(feature = "counter")]
242            EventHint::Counter(_) => 1,
243        }
244    }
245}
246
247impl generic_btree::rle::Mergeable for EventHint {
248    fn can_merge(&self, rhs: &Self) -> bool {
249        match (self, rhs) {
250            (
251                EventHint::InsertText {
252                    pos,
253                    unicode_len: _,
254                    event_len,
255                    styles,
256                },
257                EventHint::InsertText {
258                    pos: r_pos,
259                    styles: r_styles,
260                    ..
261                },
262            ) => *pos + *event_len == *r_pos && styles == r_styles,
263            (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
264                pos + *len as usize == *pos_right
265            }
266            // We don't merge delete text because it's hard to infer the correct pos to split:
267            // `range` param is in unicode range, but the delete text event is in UTF-16 range.
268            // Without the original text, it's impossible to convert the range.
269            (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
270                span.is_mergable(r, &())
271            }
272            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
273            _ => false,
274        }
275    }
276
277    fn merge_right(&mut self, rhs: &Self) {
278        match (self, rhs) {
279            (
280                EventHint::InsertText {
281                    event_len,
282                    unicode_len: len,
283                    ..
284                },
285                EventHint::InsertText {
286                    event_len: r_event_len,
287                    unicode_len: r_len,
288                    ..
289                },
290            ) => {
291                *len += *r_len;
292                *event_len += *r_event_len;
293            }
294            (
295                EventHint::InsertList { len, pos: _ },
296                EventHint::InsertList { len: r_len, pos: _ },
297            ) => *len += *r_len,
298            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
299            (
300                EventHint::DeleteText { span, unicode_len },
301                EventHint::DeleteText {
302                    span: r_span,
303                    unicode_len: r_len,
304                },
305            ) => {
306                *unicode_len += *r_len;
307                span.merge(r_span, &());
308            }
309            _ => unreachable!(),
310        }
311    }
312
313    fn merge_left(&mut self, _: &Self) {
314        unreachable!()
315    }
316}
317
318impl Transaction {
319    #[inline]
320    pub fn new(doc: Arc<LoroDocInner>) -> Self {
321        Self::new_with_origin(doc.clone(), "".into())
322    }
323
324    pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> Self {
325        let mut state_lock = doc.state.try_lock().unwrap();
326        if state_lock.is_in_txn() {
327            panic!("Cannot start a transaction while another one is in progress");
328        }
329
330        let oplog_lock = doc.oplog.try_lock().unwrap();
331        state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
332        let arena = state_lock.arena.clone();
333        let frontiers = state_lock.frontiers.clone();
334        let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
335        let next_counter = oplog_lock.next_id(peer).counter;
336        let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
337        let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
338        oplog_lock
339            .check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
340            .unwrap();
341        drop(state_lock);
342        drop(oplog_lock);
343        Self {
344            peer,
345            doc: Arc::downgrade(&doc),
346            arena,
347            frontiers,
348            timestamp: None,
349            next_counter,
350            next_lamport,
351            origin: Default::default(),
352            start_counter: next_counter,
353            start_lamport: next_lamport,
354            event_hints: Default::default(),
355            local_ops: RleVec::new(),
356            finished: false,
357            on_commit: None,
358            msg: None,
359            latest_timestamp,
360        }
361    }
362
363    pub fn set_origin(&mut self, origin: InternalString) {
364        self.origin = origin;
365    }
366
367    pub fn set_timestamp(&mut self, time: Timestamp) {
368        self.timestamp = Some(time);
369    }
370
371    pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
372        self.msg = msg;
373    }
374
375    pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
376        &self.local_ops
377    }
378
379    pub fn peer(&self) -> &PeerID {
380        &self.peer
381    }
382
383    pub fn timestamp(&self) -> &Option<Timestamp> {
384        &self.timestamp
385    }
386
387    pub fn frontiers(&self) -> &Frontiers {
388        &self.frontiers
389    }
390    pub fn msg(&self) -> &Option<Arc<str>> {
391        &self.msg
392    }
393
394    pub fn lamport(&self) -> &Lamport {
395        &self.start_lamport
396    }
397
398    pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
399        self.on_commit = Some(f);
400    }
401
402    pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
403        self.on_commit.take()
404    }
405
406    pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
407        self._commit()
408    }
409
410    #[tracing::instrument(level = "debug", skip(self))]
411    fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
412        if self.finished {
413            return Ok(None);
414        }
415
416        let Some(doc) = self.doc.upgrade() else {
417            return Ok(None);
418        };
419        self.finished = true;
420        let mut state = doc.state.try_lock().unwrap();
421        if self.local_ops.is_empty() {
422            state.abort_txn();
423            return Ok(Some(self.take_options()));
424        }
425
426        let ops = std::mem::take(&mut self.local_ops);
427        let mut oplog = doc.oplog.try_lock().unwrap();
428        let deps = take(&mut self.frontiers);
429        let change = Change {
430            lamport: self.start_lamport,
431            ops,
432            deps,
433            id: ID::new(self.peer, self.start_counter),
434            timestamp: self.latest_timestamp.max(
435                self.timestamp
436                    .unwrap_or_else(|| oplog.get_timestamp_for_next_txn()),
437            ),
438            commit_msg: take(&mut self.msg),
439        };
440
441        let diff = if state.is_recording() {
442            Some(change_to_diff(
443                &change,
444                doc.clone(),
445                std::mem::take(&mut self.event_hints),
446            ))
447        } else {
448            None
449        };
450
451        let last_id = change.id_last();
452        if let Err(err) = oplog.import_local_change(change) {
453            state.abort_txn();
454            drop(state);
455            drop(oplog);
456            return Err(err);
457        }
458
459        state.commit_txn(
460            Frontiers::from_id(last_id),
461            diff.map(|arr| InternalDocDiff {
462                by: crate::event::EventTriggerKind::Local,
463                origin: self.origin.clone(),
464                diff: Cow::Owned(
465                    arr.into_iter()
466                        .map(|x| InternalContainerDiff {
467                            idx: x.idx,
468                            bring_back: false,
469                            is_container_deleted: false,
470                            diff: (x.diff.into()),
471                            diff_mode: crate::diff_calc::DiffMode::Linear,
472                        })
473                        .collect(),
474                ),
475                new_version: Cow::Borrowed(oplog.frontiers()),
476            }),
477        );
478        drop(state);
479        drop(oplog);
480        if let Some(on_commit) = self.on_commit.take() {
481            on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
482        }
483        Ok(None)
484    }
485
486    fn take_options(&self) -> CommitOptions {
487        let mut options = CommitOptions::new();
488        if !self.origin.is_empty() {
489            options = options.origin(self.origin.as_str());
490        }
491        if let Some(msg) = self.msg.as_ref() {
492            options = options.commit_msg(msg);
493        }
494        if let Some(timestamp) = self.timestamp {
495            options = options.timestamp(timestamp);
496        }
497        options
498    }
499
500    pub(super) fn apply_local_op(
501        &mut self,
502        container: ContainerIdx,
503        content: RawOpContent,
504        event: EventHint,
505        // check whether context and txn are referring to the same state context
506        doc: &Arc<LoroDocInner>,
507    ) -> LoroResult<()> {
508        // TODO: need to check if the doc is the same
509        let this_doc = self.doc.upgrade().unwrap();
510        if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
511            return Err(LoroError::UnmatchedContext {
512                expected: this_doc
513                    .state
514                    .try_lock()
515                    .unwrap()
516                    .peer
517                    .load(std::sync::atomic::Ordering::Relaxed),
518                found: doc
519                    .state
520                    .try_lock()
521                    .unwrap()
522                    .peer
523                    .load(std::sync::atomic::Ordering::Relaxed),
524            });
525        }
526
527        let len = content.content_len();
528        assert!(len > 0);
529        let raw_op = RawOp {
530            id: ID {
531                peer: self.peer,
532                counter: self.next_counter,
533            },
534            lamport: self.next_lamport,
535            container,
536            content,
537        };
538
539        let mut state = doc.state.try_lock().unwrap();
540        if state.is_deleted(container) {
541            return Err(LoroError::ContainerDeleted {
542                container: Box::new(state.arena.idx_to_id(container).unwrap()),
543            });
544        }
545
546        let op = self.arena.convert_raw_op(&raw_op);
547        state.apply_local_op(&raw_op, &op)?;
548        {
549            // update version info
550            let mut oplog = doc.oplog.try_lock().unwrap();
551            let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
552            let start_id = ID::new(self.peer, self.next_counter);
553            self.next_counter += len as Counter;
554            oplog.dag.update_version_on_new_local_op(
555                if self.local_ops.is_empty() {
556                    &self.frontiers
557                } else {
558                    &dep_id
559                },
560                start_id,
561                self.next_lamport,
562                len,
563            );
564            self.next_lamport += len as Lamport;
565            // set frontiers to the last op id
566            let last_id = start_id.inc(len as Counter - 1);
567            state.frontiers = Frontiers::from_id(last_id);
568        };
569        drop(state);
570        debug_assert_eq!(
571            event.rle_len(),
572            op.atom_len(),
573            "event:{:#?} \nop:{:#?}",
574            &event,
575            &op
576        );
577
578        match self.event_hints.last_mut() {
579            Some(last) if last.can_merge(&event) => {
580                last.merge_right(&event);
581            }
582            _ => {
583                self.event_hints.push(event);
584            }
585        }
586        self.local_ops.push(op);
587        Ok(())
588    }
589
590    /// id can be a str, ContainerID, or ContainerIdRaw.
591    /// if it's str it will use Root container, which will not be None
592    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
593        let id = id.into_container_id(&self.arena, ContainerType::Text);
594        Handler::new_attached(id, self.doc.upgrade().unwrap())
595            .into_text()
596            .unwrap()
597    }
598
599    /// id can be a str, ContainerID, or ContainerIdRaw.
600    /// if it's str it will use Root container, which will not be None
601    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
602        let id = id.into_container_id(&self.arena, ContainerType::List);
603        Handler::new_attached(id, self.doc.upgrade().unwrap())
604            .into_list()
605            .unwrap()
606    }
607
608    /// id can be a str, ContainerID, or ContainerIdRaw.
609    /// if it's str it will use Root container, which will not be None
610    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
611        let id = id.into_container_id(&self.arena, ContainerType::Map);
612        Handler::new_attached(id, self.doc.upgrade().unwrap())
613            .into_map()
614            .unwrap()
615    }
616
617    /// id can be a str, ContainerID, or ContainerIdRaw.
618    /// if it's str it will use Root container, which will not be None
619    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
620        let id = id.into_container_id(&self.arena, ContainerType::Tree);
621        Handler::new_attached(id, self.doc.upgrade().unwrap())
622            .into_tree()
623            .unwrap()
624    }
625    pub fn next_id(&self) -> ID {
626        ID {
627            peer: self.peer,
628            counter: self.next_counter,
629        }
630    }
631
632    #[inline]
633    pub fn id_span(&self) -> IdSpan {
634        IdSpan::new(self.peer, self.start_counter, self.next_counter)
635    }
636
637    pub fn next_idlp(&self) -> IdLp {
638        IdLp {
639            peer: self.peer,
640            lamport: self.next_lamport,
641        }
642    }
643
644    pub fn is_empty(&self) -> bool {
645        self.local_ops.is_empty()
646    }
647
648    pub(crate) fn len(&self) -> usize {
649        (self.next_counter - self.start_counter) as usize
650    }
651
652    pub(crate) fn set_options(&mut self, options: CommitOptions) {
653        self.origin = options.origin.unwrap_or_default();
654        self.msg = options.commit_msg;
655        self.timestamp = options.timestamp;
656    }
657
658    pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
659        if self.origin.is_empty() {
660            self.origin = default_options.origin.unwrap_or_default();
661        }
662        if self.msg.is_none() {
663            self.msg = default_options.commit_msg;
664        }
665        if self.timestamp.is_none() {
666            self.timestamp = default_options.timestamp;
667        }
668    }
669}
670
671impl Drop for Transaction {
672    #[tracing::instrument(level = "debug", skip(self))]
673    fn drop(&mut self) {
674        if !self.finished {
675            // TODO: should we abort here or commit here?
676            // what if commit fails?
677            self._commit().unwrap();
678        }
679    }
680}
681
682#[derive(Debug, Clone)]
683pub(crate) struct TxnContainerDiff {
684    pub(crate) idx: ContainerIdx,
685    pub(crate) diff: Diff,
686}
687
688// PERF: could be compacter
689fn change_to_diff(
690    change: &Change,
691    doc: Arc<LoroDocInner>,
692    event_hints: Vec<EventHint>,
693) -> Vec<TxnContainerDiff> {
694    let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
695    let peer = change.id.peer;
696    let mut lamport = change.lamport;
697    let mut event_hint_iter = event_hints.into_iter();
698    let mut o_hint = event_hint_iter.next();
699    let mut op_iter = change.ops.iter();
700    while let Some(op) = op_iter.next() {
701        let Some(hint) = o_hint.as_mut() else {
702            unreachable!()
703        };
704
705        let mut ops: SmallVec<[&Op; 1]> = smallvec![op];
706        let hint = match op.atom_len().cmp(&hint.rle_len()) {
707            std::cmp::Ordering::Less => {
708                let mut len = op.atom_len();
709                while len < hint.rle_len() {
710                    let next = op_iter.next().unwrap();
711                    len += next.atom_len();
712                    ops.push(next);
713                }
714                assert!(len == hint.rle_len());
715                match event_hint_iter.next() {
716                    Some(n) => o_hint.replace(n).unwrap(),
717                    None => o_hint.take().unwrap(),
718                }
719            }
720            std::cmp::Ordering::Equal => match event_hint_iter.next() {
721                Some(n) => o_hint.replace(n).unwrap(),
722                None => o_hint.take().unwrap(),
723            },
724            std::cmp::Ordering::Greater => {
725                unreachable!("{:#?}", &op)
726            }
727        };
728
729        match &hint {
730            EventHint::InsertText { .. }
731            | EventHint::InsertList { .. }
732            | EventHint::DeleteText { .. }
733            | EventHint::DeleteList(_) => {}
734            _ => {
735                assert_eq!(ops.len(), 1);
736            }
737        }
738        match hint {
739            EventHint::Mark { start, end, style } => {
740                let mut meta = StyleMeta::default();
741                meta.insert(
742                    style.key.clone(),
743                    StyleMetaItem {
744                        lamport,
745                        peer: change.id.peer,
746                        value: style.data,
747                    },
748                );
749                let diff = DeltaRopeBuilder::new()
750                    .retain(start as usize, Default::default())
751                    .retain(
752                        (end - start) as usize,
753                        meta.to_option_map().unwrap_or_default().into(),
754                    )
755                    .build();
756                ans.push(TxnContainerDiff {
757                    idx: op.container,
758                    diff: Diff::Text(diff),
759                });
760            }
761            EventHint::InsertText { styles, pos, .. } => {
762                let mut delta: TextDiff = DeltaRopeBuilder::new()
763                    .retain(pos as usize, Default::default())
764                    .build();
765                for op in ops.iter() {
766                    let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
767                    else {
768                        unreachable!()
769                    };
770
771                    delta.push_insert(
772                        slice.clone().into(),
773                        styles.to_option_map().unwrap_or_default().into(),
774                    );
775                }
776                ans.push(TxnContainerDiff {
777                    idx: op.container,
778                    diff: Diff::Text(delta),
779                })
780            }
781            EventHint::DeleteText {
782                span,
783                unicode_len: _,
784                // we don't need to iter over ops here, because we already
785                // know what the events should be
786            } => ans.push(TxnContainerDiff {
787                idx: op.container,
788                diff: Diff::Text(
789                    DeltaRopeBuilder::new()
790                        .retain(span.start() as usize, Default::default())
791                        .delete(span.len())
792                        .build(),
793                ),
794            }),
795            EventHint::InsertList { pos, .. } => {
796                // We should use pos from event hint because index in op may
797                // be using op index for the MovableList
798                for op in ops.iter() {
799                    let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
800                    let values = doc
801                        .arena
802                        .get_values(range.to_range())
803                        .into_iter()
804                        .map(|v| ValueOrHandler::from_value(v, &doc));
805                    ans.push(TxnContainerDiff {
806                        idx: op.container,
807                        diff: Diff::List(
808                            DeltaRopeBuilder::new()
809                                .retain(pos, Default::default())
810                                .insert_many(values, Default::default())
811                                .build(),
812                        ),
813                    })
814                }
815            }
816            EventHint::DeleteList(s) => {
817                ans.push(TxnContainerDiff {
818                    idx: op.container,
819                    diff: Diff::List(
820                        DeltaRopeBuilder::new()
821                            .retain(s.start() as usize, Default::default())
822                            .delete(s.len())
823                            .build(),
824                    ),
825                });
826            }
827            EventHint::Map { key, value } => ans.push(TxnContainerDiff {
828                idx: op.container,
829                diff: Diff::Map(ResolvedMapDelta::new().with_entry(
830                    key,
831                    ResolvedMapValue {
832                        value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
833                        idlp: IdLp::new(peer, lamport),
834                    },
835                )),
836            }),
837            EventHint::Tree(tree_diff) => {
838                let mut diff = TreeDiff::default();
839                diff.diff.extend(tree_diff.into_iter());
840                ans.push(TxnContainerDiff {
841                    idx: op.container,
842                    diff: Diff::Tree(diff),
843                });
844            }
845            EventHint::Move { from, to, value } => {
846                let mut a = DeltaRopeBuilder::new()
847                    .retain(from as usize, Default::default())
848                    .delete(1)
849                    .build();
850                a.compose(
851                    &DeltaRopeBuilder::new()
852                        .retain(to as usize, Default::default())
853                        .insert(
854                            ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
855                            ListDeltaMeta { from_move: true },
856                        )
857                        .build(),
858                );
859                ans.push(TxnContainerDiff {
860                    idx: op.container,
861                    diff: Diff::List(a),
862                });
863            }
864            EventHint::SetList { index, value } => {
865                ans.push(TxnContainerDiff {
866                    idx: op.container,
867                    diff: Diff::List(
868                        DeltaRopeBuilder::new()
869                            .retain(index, Default::default())
870                            .delete(1)
871                            .insert(
872                                ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
873                                Default::default(),
874                            )
875                            .build(),
876                    ),
877                });
878            }
879            EventHint::MarkEnd => {
880                // do nothing
881            }
882            #[cfg(feature = "counter")]
883            EventHint::Counter(diff) => {
884                ans.push(TxnContainerDiff {
885                    idx: op.container,
886                    diff: Diff::Counter(diff),
887                });
888            }
889        }
890
891        lamport += ops
892            .iter()
893            .map(|x| x.content_len() as Lamport)
894            .sum::<Lamport>();
895    }
896    ans
897}