loro_internal/
txn.rs

1use core::panic;
2use std::{
3    borrow::Cow,
4    mem::take,
5    sync::{Arc, Weak},
6};
7
8use enum_as_inner::EnumAsInner;
9use fxhash::FxHashMap;
10use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
11use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
12use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
13use rle::{HasLength, Mergable, RleVec, Sliceable};
14use smallvec::{smallvec, SmallVec};
15
16use crate::{
17    change::{Change, Lamport, Timestamp},
18    container::{
19        idx::ContainerIdx,
20        list::list_op::{DeleteSpan, InnerListOp},
21        richtext::Style,
22        IntoContainerId,
23    },
24    delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
25    encoding::export_fast_updates_in_range,
26    event::{Diff, ListDeltaMeta, TextDiff},
27    handler::{Handler, ValueOrHandler},
28    id::{Counter, PeerID, ID},
29    lock::LoroMutex,
30    loro::CommitOptions,
31    op::{Op, RawOp, RawOpContent},
32    pre_commit::{ChangeModifier, PreCommitCallbackPayload},
33    span::HasIdSpan,
34    version::Frontiers,
35    ChangeMeta, InternalString, LoroDoc, LoroDocInner, LoroError, LoroValue,
36};
37
38use super::{
39    arena::SharedArena,
40    event::{InternalContainerDiff, InternalDocDiff},
41    handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
42    oplog::OpLog,
43    state::DocState,
44};
45
46impl crate::LoroDoc {
    /// Create a new transaction.
    ///
    /// Every op created inside one transaction will be packed into a single
    /// [Change].
    ///
    /// There can only be one active transaction at a time for a [LoroDoc].
    ///
    /// This is shorthand for [`Self::txn_with_origin`] with an empty origin.
    ///
    /// # Errors
    ///
    /// Forwards whatever error `txn_with_origin` returns.
    #[inline(always)]
    pub fn txn(&self) -> Result<Transaction, LoroError> {
        self.txn_with_origin("")
    }
56
57    /// Create a new transaction with specified origin.
58    ///
59    /// The origin will be propagated to the events.
60    /// There can only be one active transaction at a time for a [LoroDoc].
61    pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
62        if !self.can_edit() {
63            return Err(LoroError::TransactionError(
64                String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
65            ));
66        }
67
68        let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into());
69
70        let obs = self.observer.clone();
71        let local_update_subs_weak = self.local_update_subs.downgrade();
72        txn.set_on_commit(Box::new(move |state, oplog, id_span| {
73            let mut state = state.lock().unwrap();
74            let events = state.take_events();
75            drop(state);
76            for event in events {
77                obs.emit(event);
78            }
79
80            if id_span.atom_len() == 0 {
81                return;
82            }
83
84            if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
85                if !local_update_subs.inner().is_empty() {
86                    let bytes =
87                        { export_fast_updates_in_range(&oplog.lock().unwrap(), &[id_span]) };
88                    local_update_subs.emit(&(), bytes);
89                }
90            }
91        }));
92
93        Ok(txn)
94    }
95
96    pub fn start_auto_commit(&self) {
97        self.auto_commit
98            .store(true, std::sync::atomic::Ordering::Release);
99        let mut self_txn = self.txn.lock().unwrap();
100        if self_txn.is_some() || !self.can_edit() {
101            return;
102        }
103
104        let txn = self.txn().unwrap();
105        self_txn.replace(txn);
106    }
107
108    #[inline]
109    pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
110        if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
111            let mut self_txn = self.txn.lock().unwrap();
112            if self_txn.is_some() {
113                return;
114            }
115
116            let mut txn = self.txn().unwrap();
117            if let Some(options) = options {
118                txn.set_options(options);
119            }
120            self_txn.replace(txn);
121        }
122    }
123}
124
/// Hook run by a [`Transaction`] after it has committed successfully.
///
/// It is called at most once, with the doc state, the oplog and the id span
/// covered by the transaction.
pub(crate) type OnCommitFn =
    Box<dyn FnOnce(&Arc<LoroMutex<DocState>>, &Arc<LoroMutex<OpLog>>, IdSpan) + Sync + Send>;
127
/// A batch of local ops that is turned into a single [`Change`] on commit.
///
/// A transaction that is dropped without an explicit commit is committed by
/// its `Drop` implementation.
pub struct Transaction {
    /// Peer id read from the doc state when the transaction was created.
    peer: PeerID,
    /// Origin attached to the pre-commit payload and to the emitted events.
    origin: InternalString,
    /// Counter of the first op of this transaction.
    start_counter: Counter,
    /// Counter the next op will get; advanced by the op length on each op.
    next_counter: Counter,
    /// Lamport of the first op of this transaction.
    start_lamport: Lamport,
    /// Lamport the next op will get; advanced by the op length on each op.
    next_lamport: Lamport,
    /// Weak handle to the document this transaction belongs to.
    doc: Weak<LoroDocInner>,
    /// Frontiers of the doc state at creation; they become the change's deps.
    frontiers: Frontiers,
    /// Ops applied so far through `apply_local_op`.
    local_ops: RleVec<[Op; 1]>, // TODO: use a more efficient data structure
    /// Event hints recorded per container, in the order the ops were applied.
    event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
    /// Arena cloned from the doc state at creation.
    pub(super) arena: SharedArena,
    /// Set once `_commit` has started finishing the transaction.
    finished: bool,
    /// Hook invoked after a successful commit.
    on_commit: Option<OnCommitFn>,
    /// Explicit timestamp for the change; when `None` the oplog provides one.
    timestamp: Option<Timestamp>,
    /// Optional commit message stored on the change.
    msg: Option<Arc<str>>,
    /// Greatest timestamp the oplog reported for `frontiers` at creation; the
    /// change's timestamp is never smaller than this.
    latest_timestamp: Timestamp,
    /// Set by `apply_local_op` when the oplog's latest version vector did not
    /// contain this peer yet.
    pub(super) is_peer_first_appearance: bool,
}
147
148impl std::fmt::Debug for Transaction {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("Transaction")
151            .field("peer", &self.peer)
152            .field("origin", &self.origin)
153            .field("start_counter", &self.start_counter)
154            .field("next_counter", &self.next_counter)
155            .field("start_lamport", &self.start_lamport)
156            .field("next_lamport", &self.next_lamport)
157            .field("frontiers", &self.frontiers)
158            .field("local_ops", &self.local_ops)
159            .field("event_hints", &self.event_hints)
160            .field("arena", &self.arena)
161            .field("finished", &self.finished)
162            .field("on_commit", &self.on_commit.is_some())
163            .field("timestamp", &self.timestamp)
164            .finish()
165    }
166}
167
/// We can infer local events directly from the local behavior. This enum is used to
/// record them, so that we can avoid recalculating them when we commit the transaction.
///
/// For example, when we insert a text in wasm, users use the utf16 index to send the
/// command. However, internally loro will convert it to unicode index. But the users
/// still need events that are in utf16 index. To avoid the round trip, we record the
/// events here.
///
/// One hint is recorded per applied op; adjacent hints of the same container
/// may be merged (see the `Mergeable` implementation).
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EventHint {
    /// A style applied to the range `start..end`.
    Mark {
        start: u32,
        end: u32,
        style: Style,
    },
    /// Text inserted at `pos` with the given styles.
    InsertText {
        /// pos is a Unicode index. If wasm, it's a UTF-16 index.
        pos: u32,
        /// Length of the insertion in event units (the units `pos` is in).
        event_len: u32,
        /// Length of the insertion in Unicode units; this is the hint's rle length.
        unicode_len: u32,
        styles: StyleMeta,
    },
    /// pos is a Unicode index. If wasm, it's a UTF-16 index.
    DeleteText {
        span: DeleteSpan,
        /// Length of the deletion in Unicode units; this is the hint's rle length.
        unicode_len: usize,
    },
    /// `len` list items inserted starting at `pos`.
    InsertList {
        len: u32,
        pos: usize,
    },
    /// The list item at `index` replaced by `value`.
    SetList {
        index: usize,
        value: LoroValue,
    },
    /// A list item carrying `value` moved from index `from` to index `to`.
    Move {
        value: LoroValue,
        from: u32,
        to: u32,
    },
    /// List items removed over the given span.
    DeleteList(DeleteSpan),
    /// A map entry written; NOTE(review): `None` presumably means the key was
    /// deleted — confirm against the map handler.
    Map {
        key: InternalString,
        value: Option<LoroValue>,
    },
    // use vec because we could bring back some node that has children
    Tree(SmallVec<[TreeDiffItem; 1]>),
    /// Produces no diff of its own when the change is converted to events.
    MarkEnd,
    #[cfg(feature = "counter")]
    Counter(f64),
}
218
219impl generic_btree::rle::HasLength for EventHint {
220    fn rle_len(&self) -> usize {
221        match self {
222            EventHint::Mark { .. } => 1,
223            EventHint::InsertText {
224                unicode_len: len, ..
225            } => *len as usize,
226            EventHint::DeleteText { unicode_len, .. } => *unicode_len,
227            EventHint::InsertList { len, .. } => *len as usize,
228            EventHint::DeleteList(d) => d.len(),
229            EventHint::Map { .. } => 1,
230            EventHint::Tree(_) => 1,
231            EventHint::MarkEnd => 1,
232            EventHint::Move { .. } => 1,
233            EventHint::SetList { .. } => 1,
234            #[cfg(feature = "counter")]
235            EventHint::Counter(_) => 1,
236        }
237    }
238}
239
240impl generic_btree::rle::Mergeable for EventHint {
241    fn can_merge(&self, rhs: &Self) -> bool {
242        match (self, rhs) {
243            (
244                EventHint::InsertText {
245                    pos,
246                    unicode_len: _,
247                    event_len,
248                    styles,
249                },
250                EventHint::InsertText {
251                    pos: r_pos,
252                    styles: r_styles,
253                    ..
254                },
255            ) => *pos + *event_len == *r_pos && styles == r_styles,
256            (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
257                pos + *len as usize == *pos_right
258            }
259            // We don't merge delete text because it's hard to infer the correct pos to split:
260            // `range` param is in unicode range, but the delete text event is in UTF-16 range.
261            // Without the original text, it's impossible to convert the range.
262            (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
263                span.is_mergable(r, &())
264            }
265            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
266            _ => false,
267        }
268    }
269
270    fn merge_right(&mut self, rhs: &Self) {
271        match (self, rhs) {
272            (
273                EventHint::InsertText {
274                    event_len,
275                    unicode_len: len,
276                    ..
277                },
278                EventHint::InsertText {
279                    event_len: r_event_len,
280                    unicode_len: r_len,
281                    ..
282                },
283            ) => {
284                *len += *r_len;
285                *event_len += *r_event_len;
286            }
287            (
288                EventHint::InsertList { len, pos: _ },
289                EventHint::InsertList { len: r_len, pos: _ },
290            ) => *len += *r_len,
291            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
292            (
293                EventHint::DeleteText { span, unicode_len },
294                EventHint::DeleteText {
295                    span: r_span,
296                    unicode_len: r_len,
297                },
298            ) => {
299                *unicode_len += *r_len;
300                span.merge(r_span, &());
301            }
302            _ => unreachable!(),
303        }
304    }
305
306    fn merge_left(&mut self, _: &Self) {
307        unreachable!()
308    }
309}
310
311impl Transaction {
312    #[inline]
313    pub fn new(doc: Arc<LoroDocInner>) -> Self {
314        Self::new_with_origin(doc.clone(), "".into())
315    }
316
317    pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> Self {
318        let oplog_lock = doc.oplog.lock().unwrap();
319        let mut state_lock = doc.state.lock().unwrap();
320        if state_lock.is_in_txn() {
321            panic!("Cannot start a transaction while another one is in progress");
322        }
323
324        state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
325        let arena = state_lock.arena.clone();
326        let frontiers = state_lock.frontiers.clone();
327        let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
328        let next_counter = oplog_lock.next_id(peer).counter;
329        let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
330        let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
331        oplog_lock
332            .check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
333            .unwrap();
334        drop(state_lock);
335        drop(oplog_lock);
336        Self {
337            peer,
338            doc: Arc::downgrade(&doc),
339            arena,
340            frontiers,
341            timestamp: None,
342            next_counter,
343            next_lamport,
344            origin: Default::default(),
345            start_counter: next_counter,
346            start_lamport: next_lamport,
347            event_hints: Default::default(),
348            local_ops: RleVec::new(),
349            finished: false,
350            on_commit: None,
351            msg: None,
352            latest_timestamp,
353            is_peer_first_appearance: false,
354        }
355    }
356
    /// Replace the origin attached to the events of this transaction.
    pub fn set_origin(&mut self, origin: InternalString) {
        self.origin = origin;
    }
360
    /// Set an explicit timestamp to use for the change created on commit.
    pub fn set_timestamp(&mut self, time: Timestamp) {
        self.timestamp = Some(time);
    }
364
    /// Set or clear (`None`) the commit message of this transaction.
    pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
        self.msg = msg;
    }
368
    /// The ops applied in this transaction so far.
    pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
        &self.local_ops
    }
372
    /// The peer id the ops of this transaction are created with.
    pub fn peer(&self) -> &PeerID {
        &self.peer
    }
376
    /// The explicitly configured timestamp, if any.
    pub fn timestamp(&self) -> &Option<Timestamp> {
        &self.timestamp
    }
380
    /// The frontiers captured from the doc state when the transaction was
    /// created. They are moved into the change's deps on commit.
    pub fn frontiers(&self) -> &Frontiers {
        &self.frontiers
    }
    /// The commit message configured for this transaction, if any.
    pub fn msg(&self) -> &Option<Arc<str>> {
        &self.msg
    }
387
    /// The lamport of the first op of this transaction (not the next one).
    pub fn lamport(&self) -> &Lamport {
        &self.start_lamport
    }
391
    /// Install the hook to run after a successful commit, replacing any
    /// previously installed one.
    pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
        self.on_commit = Some(f);
    }
395
    /// Remove and return the on-commit hook, leaving none installed.
    pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
        self.on_commit.take()
    }
399
    /// Commit the transaction, consuming it.
    ///
    /// Returns `Ok(Some(options))` when the transaction contained no ops (it
    /// is aborted and its origin/message/timestamp are handed back), and
    /// `Ok(None)` otherwise.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the oplog when importing the change fails.
    pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
        self._commit()
    }
403
404    #[tracing::instrument(level = "debug", skip(self))]
405    fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
406        if self.finished {
407            return Ok(None);
408        }
409
410        let Some(doc) = self.doc.upgrade() else {
411            return Ok(None);
412        };
413        self.finished = true;
414        if self.local_ops.is_empty() {
415            let mut state = doc.state.lock().unwrap();
416            state.abort_txn();
417            return Ok(Some(self.take_options()));
418        }
419
420        let ops = std::mem::take(&mut self.local_ops);
421        let deps = take(&mut self.frontiers);
422        let change = Change {
423            lamport: self.start_lamport,
424            ops,
425            deps,
426            id: ID::new(self.peer, self.start_counter),
427            timestamp: self.latest_timestamp.max(
428                self.timestamp
429                    .unwrap_or_else(|| doc.oplog.lock().unwrap().get_timestamp_for_next_txn()),
430            ),
431            commit_msg: take(&mut self.msg),
432        };
433
434        let change_meta = ChangeMeta::from_change(&change);
435        {
436            // add change to uncommit field of oplog
437            let mut oplog = doc.oplog.lock().unwrap();
438            oplog.set_uncommitted_change(change);
439        }
440
441        let modifier = ChangeModifier::default();
442        doc.pre_commit_subs.emit(
443            &(),
444            PreCommitCallbackPayload {
445                change_meta,
446                origin: self.origin.to_string(),
447                modifier: modifier.clone(),
448            },
449        );
450
451        let mut oplog = doc.oplog.lock().unwrap();
452        let mut state = doc.state.lock().unwrap();
453
454        let mut change = oplog.uncommitted_change.take().unwrap();
455        modifier.modify_change(&mut change);
456        let diff = if state.is_recording() {
457            Some(change_to_diff(
458                &change,
459                doc.clone(),
460                std::mem::take(&mut self.event_hints),
461            ))
462        } else {
463            None
464        };
465
466        let last_id = change.id_last();
467        if let Err(err) = oplog.import_local_change(change) {
468            state.abort_txn();
469            drop(state);
470            drop(oplog);
471            return Err(err);
472        }
473
474        state.commit_txn(
475            Frontiers::from_id(last_id),
476            diff.map(|arr| InternalDocDiff {
477                by: crate::event::EventTriggerKind::Local,
478                origin: self.origin.clone(),
479                diff: Cow::Owned(
480                    arr.into_iter()
481                        .map(|x| InternalContainerDiff {
482                            idx: x.idx,
483                            bring_back: false,
484                            diff: (x.diff.into()),
485                            diff_mode: crate::diff_calc::DiffMode::Linear,
486                        })
487                        .collect(),
488                ),
489                new_version: Cow::Borrowed(oplog.frontiers()),
490            }),
491        );
492        drop(state);
493        drop(oplog);
494        if let Some(on_commit) = self.on_commit.take() {
495            assert!(!doc.txn.is_locked());
496            on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
497        }
498        Ok(None)
499    }
500
501    fn take_options(&self) -> CommitOptions {
502        let mut options = CommitOptions::new();
503        if !self.origin.is_empty() {
504            options = options.origin(self.origin.as_str());
505        }
506        if let Some(msg) = self.msg.as_ref() {
507            options = options.commit_msg(msg);
508        }
509        if let Some(timestamp) = self.timestamp {
510            options = options.timestamp(timestamp);
511        }
512        options
513    }
514
    /// Apply one local op to the doc state and record it in this transaction.
    ///
    /// `content` is turned into a raw op at the transaction's next id and
    /// lamport, applied to the state, and pushed to `local_ops`. `event` is the
    /// hint used later to build the event for this op; it is merged into the
    /// previous hint of the same container when possible.
    ///
    /// # Errors
    ///
    /// - [`LoroError::UnmatchedContext`] if `doc` is not the doc this
    ///   transaction was created for.
    /// - [`LoroError::ContainerDeleted`] if the state reports `container` as deleted.
    /// - Any error returned by the state while applying the op.
    ///
    /// # Panics
    ///
    /// Panics if the owning doc has been dropped or if `content` has length 0.
    pub(super) fn apply_local_op(
        &mut self,
        container: ContainerIdx,
        content: RawOpContent,
        event: EventHint,
        // check whether context and txn are referring to the same state context
        doc: &LoroDoc,
    ) -> LoroResult<()> {
        // TODO: need to check if the doc is the same
        let this_doc = self.doc.upgrade().unwrap();
        // Docs are compared by the identity of their state handle.
        if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
            return Err(LoroError::UnmatchedContext {
                expected: this_doc
                    .state
                    .lock()
                    .unwrap()
                    .peer
                    .load(std::sync::atomic::Ordering::Relaxed),
                found: doc
                    .state
                    .lock()
                    .unwrap()
                    .peer
                    .load(std::sync::atomic::Ordering::Relaxed),
            });
        }

        let len = content.content_len();
        assert!(len > 0);
        let raw_op = RawOp {
            id: ID {
                peer: self.peer,
                counter: self.next_counter,
            },
            lamport: self.next_lamport,
            container,
            content,
        };

        // Lock order: oplog first, then state.
        let mut oplog = doc.oplog.lock().unwrap();
        let mut state = doc.state.lock().unwrap();
        if state.is_deleted(container) {
            return Err(LoroError::ContainerDeleted {
                container: Box::new(state.arena.idx_to_id(container).unwrap()),
            });
        }

        let op = self.arena.convert_raw_op(&raw_op);
        state.apply_local_op(&raw_op, &op)?;
        {
            if !self.is_peer_first_appearance && !oplog.dag.latest_vv_contains_peer(self.peer) {
                self.is_peer_first_appearance = true;
            }
            // update version info
            // The first op of the transaction depends on the transaction's
            // frontiers; every later op depends on the op right before it.
            let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
            let start_id = ID::new(self.peer, self.next_counter);
            self.next_counter += len as Counter;
            oplog.dag.update_version_on_new_local_op(
                if self.local_ops.is_empty() {
                    &self.frontiers
                } else {
                    &dep_id
                },
                start_id,
                self.next_lamport,
                len,
            );
            self.next_lamport += len as Lamport;
            // set frontiers to the last op id
            let last_id = start_id.inc(len as Counter - 1);
            state.frontiers = Frontiers::from_id(last_id);
        };
        drop(state);
        drop(oplog);
        // A hint must cover exactly as many atoms as the op it describes.
        debug_assert_eq!(
            event.rle_len(),
            op.atom_len(),
            "event:{:#?} \nop:{:#?}",
            &event,
            &op
        );

        let container_hints = self.event_hints.entry(container).or_default();

        // Extend the previous hint of this container when possible.
        match container_hints.last_mut() {
            Some(last) if last.can_merge(&event) => {
                last.merge_right(&event);
            }
            _ => {
                container_hints.push(event);
            }
        }
        self.local_ops.push(op);
        Ok(())
    }
610
611    /// id can be a str, ContainerID, or ContainerIdRaw.
612    /// if it's str it will use Root container, which will not be None
613    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
614        let id = id.into_container_id(&self.arena, ContainerType::Text);
615        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
616            .into_text()
617            .unwrap()
618    }
619
620    /// id can be a str, ContainerID, or ContainerIdRaw.
621    /// if it's str it will use Root container, which will not be None
622    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
623        let id = id.into_container_id(&self.arena, ContainerType::List);
624        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
625            .into_list()
626            .unwrap()
627    }
628
629    /// id can be a str, ContainerID, or ContainerIdRaw.
630    /// if it's str it will use Root container, which will not be None
631    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
632        let id = id.into_container_id(&self.arena, ContainerType::Map);
633        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
634            .into_map()
635            .unwrap()
636    }
637
638    /// id can be a str, ContainerID, or ContainerIdRaw.
639    /// if it's str it will use Root container, which will not be None
640    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
641        let id = id.into_container_id(&self.arena, ContainerType::Tree);
642        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
643            .into_tree()
644            .unwrap()
645    }
646    pub fn next_id(&self) -> ID {
647        ID {
648            peer: self.peer,
649            counter: self.next_counter,
650        }
651    }
652
    /// The span of ids used by this transaction so far: from its first counter
    /// up to the counter the next op would get.
    #[inline]
    pub fn id_span(&self) -> IdSpan {
        IdSpan::new(self.peer, self.start_counter, self.next_counter)
    }
657
658    pub fn next_idlp(&self) -> IdLp {
659        IdLp {
660            peer: self.peer,
661            lamport: self.next_lamport,
662        }
663    }
664
    /// Whether no op has been recorded in this transaction.
    pub fn is_empty(&self) -> bool {
        self.local_ops.is_empty()
    }
668
    /// Number of counters consumed by this transaction so far, i.e. the
    /// distance between the next counter and the starting counter.
    pub(crate) fn len(&self) -> usize {
        (self.next_counter - self.start_counter) as usize
    }
672
673    pub(crate) fn set_options(&mut self, options: CommitOptions) {
674        self.origin = options.origin.unwrap_or_default();
675        self.msg = options.commit_msg;
676        self.timestamp = options.timestamp;
677    }
678
679    pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
680        if self.origin.is_empty() {
681            self.origin = default_options.origin.unwrap_or_default();
682        }
683        if self.msg.is_none() {
684            self.msg = default_options.commit_msg;
685        }
686        if self.timestamp.is_none() {
687            self.timestamp = default_options.timestamp;
688        }
689    }
690}
691
692impl Drop for Transaction {
693    #[tracing::instrument(level = "debug", skip(self))]
694    fn drop(&mut self) {
695        if !self.finished {
696            // TODO: should we abort here or commit here?
697            // what if commit fails?
698            self._commit().unwrap();
699        }
700    }
701}
702
/// A diff produced by a local transaction for one container.
#[derive(Debug, Clone)]
pub(crate) struct TxnContainerDiff {
    /// The container the diff applies to.
    pub(crate) idx: ContainerIdx,
    /// The change to report for that container.
    pub(crate) diff: Diff,
}
708
709// PERF: could be compacter
710fn change_to_diff(
711    change: &Change,
712    doc: Arc<LoroDocInner>,
713    event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
714) -> Vec<TxnContainerDiff> {
715    let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
716    let peer = change.id.peer;
717    let mut lamport = change.lamport;
718
719    // Group ops by container first to match our new structure
720    let mut ops_by_container: FxHashMap<ContainerIdx, Vec<Op>> = FxHashMap::default();
721    for op in change.ops.iter() {
722        ops_by_container
723            .entry(op.container)
724            .or_default()
725            .push(op.clone());
726    }
727
728    // Process each container's hints and ops together
729    for (container_idx, hints) in event_hints {
730        let Some(container_ops) = ops_by_container.get_mut(&container_idx) else {
731            continue;
732        };
733
734        let mut op_index = 0;
735        let mut hint_iter = hints.into_iter();
736        let mut current_hint = hint_iter.next();
737
738        while op_index < container_ops.len() {
739            let Some(hint) = current_hint.take() else {
740                unreachable!("Missing hint for op");
741            };
742
743            // Collect ops that belong to this hint
744            let mut ops_for_hint: SmallVec<[Op; 1]> = smallvec![container_ops[op_index].clone()];
745            let mut total_len = container_ops[op_index].atom_len();
746
747            // If hint spans multiple ops, collect them
748            while total_len < hint.rle_len() {
749                op_index += 1;
750                let next_op_len = container_ops[op_index].atom_len();
751                let op = if next_op_len + total_len > hint.rle_len() {
752                    let new_len = hint.rle_len() - total_len;
753                    let left = container_ops[op_index].slice(0, new_len);
754                    let right = container_ops[op_index].slice(new_len, next_op_len);
755                    container_ops[op_index] = right;
756                    op_index -= 1;
757                    left
758                } else {
759                    container_ops[op_index].clone()
760                };
761
762                total_len += op.atom_len();
763                ops_for_hint.push(op);
764            }
765
766            op_index += 1;
767            assert_eq!(total_len, hint.rle_len(), "Op/hint length mismatch");
768
769            // Move to next hint
770            current_hint = hint_iter.next();
771
772            // Generate diff based on hint type
773            match hint {
774                EventHint::Mark { start, end, style } => {
775                    let mut meta = StyleMeta::default();
776                    meta.insert(
777                        style.key.clone(),
778                        StyleMetaItem {
779                            lamport,
780                            peer: change.id.peer,
781                            value: style.data,
782                        },
783                    );
784                    let diff = DeltaRopeBuilder::new()
785                        .retain(start as usize, Default::default())
786                        .retain(
787                            (end - start) as usize,
788                            meta.to_option_map().unwrap_or_default().into(),
789                        )
790                        .build();
791                    ans.push(TxnContainerDiff {
792                        idx: container_idx,
793                        diff: Diff::Text(diff),
794                    });
795                }
796                EventHint::InsertText { styles, pos, .. } => {
797                    let mut delta: TextDiff = DeltaRopeBuilder::new()
798                        .retain(pos as usize, Default::default())
799                        .build();
800                    for op in ops_for_hint.iter() {
801                        let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
802                        else {
803                            unreachable!()
804                        };
805
806                        delta.push_insert(
807                            slice.clone().into(),
808                            styles.to_option_map().unwrap_or_default().into(),
809                        );
810                    }
811                    ans.push(TxnContainerDiff {
812                        idx: container_idx,
813                        diff: Diff::Text(delta),
814                    })
815                }
816                EventHint::DeleteText {
817                    span,
818                    unicode_len: _,
819                    // we don't need to iter over ops here, because we already
820                    // know what the events should be
821                } => ans.push(TxnContainerDiff {
822                    idx: container_idx,
823                    diff: Diff::Text(
824                        DeltaRopeBuilder::new()
825                            .retain(span.start() as usize, Default::default())
826                            .delete(span.len())
827                            .build(),
828                    ),
829                }),
830                EventHint::InsertList { pos, .. } => {
831                    // We should use pos from event hint because index in op may
832                    // be using op index for the MovableList
833                    let mut index = pos;
834                    for op in ops_for_hint.iter() {
835                        let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
836                        let values = doc
837                            .arena
838                            .get_values(range.to_range())
839                            .into_iter()
840                            .map(|v| ValueOrHandler::from_value(v, &doc));
841                        let len = values.len();
842                        ans.push(TxnContainerDiff {
843                            idx: container_idx,
844                            diff: Diff::List(
845                                DeltaRopeBuilder::new()
846                                    .retain(index, Default::default())
847                                    .insert_many(values, Default::default())
848                                    .build(),
849                            ),
850                        });
851                        index += len;
852                    }
853                }
854                EventHint::DeleteList(s) => {
855                    ans.push(TxnContainerDiff {
856                        idx: container_idx,
857                        diff: Diff::List(
858                            DeltaRopeBuilder::new()
859                                .retain(s.start() as usize, Default::default())
860                                .delete(s.len())
861                                .build(),
862                        ),
863                    });
864                }
865                EventHint::Map { key, value } => ans.push(TxnContainerDiff {
866                    idx: container_idx,
867                    diff: Diff::Map(ResolvedMapDelta::new().with_entry(
868                        key,
869                        ResolvedMapValue {
870                            value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
871                            idlp: IdLp::new(peer, lamport),
872                        },
873                    )),
874                }),
875                EventHint::Tree(tree_diff) => {
876                    let mut diff = TreeDiff::default();
877                    diff.diff.extend(tree_diff.into_iter());
878                    ans.push(TxnContainerDiff {
879                        idx: container_idx,
880                        diff: Diff::Tree(diff),
881                    });
882                }
883                EventHint::Move { from, to, value } => {
884                    let mut a = DeltaRopeBuilder::new()
885                        .retain(from as usize, Default::default())
886                        .delete(1)
887                        .build();
888                    a.compose(
889                        &DeltaRopeBuilder::new()
890                            .retain(to as usize, Default::default())
891                            .insert(
892                                ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
893                                ListDeltaMeta { from_move: true },
894                            )
895                            .build(),
896                    );
897                    ans.push(TxnContainerDiff {
898                        idx: container_idx,
899                        diff: Diff::List(a),
900                    });
901                }
902                EventHint::SetList { index, value } => {
903                    ans.push(TxnContainerDiff {
904                        idx: container_idx,
905                        diff: Diff::List(
906                            DeltaRopeBuilder::new()
907                                .retain(index, Default::default())
908                                .delete(1)
909                                .insert(
910                                    ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
911                                    Default::default(),
912                                )
913                                .build(),
914                        ),
915                    });
916                }
917                EventHint::MarkEnd => {
918                    // do nothing
919                }
920                #[cfg(feature = "counter")]
921                EventHint::Counter(diff) => {
922                    ans.push(TxnContainerDiff {
923                        idx: container_idx,
924                        diff: Diff::Counter(diff),
925                    });
926                }
927            }
928
929            // Update lamport for this hint's operations
930            lamport += ops_for_hint
931                .iter()
932                .map(|x| x.content_len() as Lamport)
933                .sum::<Lamport>();
934        }
935    }
936
937    ans
938}