loro_internal/
txn.rs

1use core::panic;
2use std::{
3    borrow::Cow,
4    mem::take,
5    sync::{Arc, Weak},
6};
7
8use enum_as_inner::EnumAsInner;
9use rustc_hash::FxHashMap;
10use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
11use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
12use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
13use rle::{HasLength, Mergable, RleVec, Sliceable};
14use smallvec::{smallvec, SmallVec};
15
16use crate::{
17    change::{Change, Lamport, Timestamp},
18    container::{
19        idx::ContainerIdx,
20        list::list_op::{DeleteSpan, InnerListOp},
21        richtext::Style,
22        IntoContainerId,
23    },
24    delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
25    encoding::export_fast_updates_in_range,
26    event::{Diff, ListDeltaMeta, TextDiff},
27    handler::{Handler, ValueOrHandler},
28    id::{Counter, PeerID, ID},
29    lock::{LoroMutex, LoroMutexGuard},
30    loro::CommitOptions,
31    op::{Op, RawOp, RawOpContent},
32    pre_commit::{ChangeModifier, PreCommitCallbackPayload},
33    span::HasIdSpan,
34    version::Frontiers,
35    ChangeMeta, InternalString, LoroDoc, LoroDocInner, LoroError, LoroValue,
36};
37
38use super::{
39    arena::SharedArena,
40    event::{InternalContainerDiff, InternalDocDiff},
41    handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
42    oplog::OpLog,
43    state::DocState,
44};
45
46impl crate::LoroDoc {
47    /// Create a new transaction.
48    /// Every ops created inside one transaction will be packed into a single
49    /// [Change].
50    ///
51    /// There can only be one active transaction at a time for a [LoroDoc].
52    #[inline(always)]
53    pub fn txn(&self) -> Result<Transaction, LoroError> {
54        self.txn_with_origin("")
55    }
56
57    /// Create a new transaction with specified origin.
58    ///
59    /// The origin will be propagated to the events.
60    /// There can only be one active transaction at a time for a [LoroDoc].
61    pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
62        if !self.can_edit() {
63            return Err(LoroError::TransactionError(
64                String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
65            ));
66        }
67
68        let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into());
69
70        let obs = self.observer.clone();
71        let local_update_subs_weak = self.local_update_subs.downgrade();
72        txn.set_on_commit(Box::new(move |state, oplog, id_span| {
73            let mut state = state.lock().unwrap();
74            let events = state.take_events();
75            drop(state);
76            for event in events {
77                obs.emit(event);
78            }
79
80            if id_span.atom_len() == 0 {
81                return;
82            }
83
84            if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
85                if !local_update_subs.inner().is_empty() {
86                    let bytes =
87                        { export_fast_updates_in_range(&oplog.lock().unwrap(), &[id_span]) };
88                    local_update_subs.emit(&(), bytes);
89                }
90            }
91        }));
92
93        Ok(txn)
94    }
95
96    pub fn start_auto_commit(&self) {
97        self.auto_commit
98            .store(true, std::sync::atomic::Ordering::Release);
99        let mut self_txn = self.txn.lock().unwrap();
100        if self_txn.is_some() || !self.can_edit() {
101            return;
102        }
103
104        let txn = self.txn().unwrap();
105        self_txn.replace(txn);
106    }
107
108    #[inline]
109    pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
110        if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
111            let mut self_txn = self.txn.lock().unwrap();
112            if self_txn.is_some() {
113                return;
114            }
115
116            let mut txn = self.txn().unwrap();
117            if let Some(options) = options {
118                txn.set_options(options);
119            }
120            self_txn.replace(txn);
121        }
122    }
123
124    #[inline]
125    pub(crate) fn _renew_txn_if_auto_commit_with_guard(
126        &self,
127        options: Option<CommitOptions>,
128        mut guard: LoroMutexGuard<Option<Transaction>>,
129    ) {
130        if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
131            if guard.is_some() {
132                return;
133            }
134
135            let mut txn = self.txn().unwrap();
136            if let Some(options) = options {
137                txn.set_options(options);
138            }
139            guard.replace(txn);
140        }
141    }
142}
143
/// Hook stored on a [`Transaction`] and run at most once by `_commit` after a
/// non-empty transaction has been committed.
///
/// It is called with the doc state, the oplog and the [`IdSpan`] covered by the
/// transaction, after the state and oplog guards held by `_commit` have been
/// dropped.
pub(crate) type OnCommitFn =
    Box<dyn FnOnce(&Arc<LoroMutex<DocState>>, &Arc<LoroMutex<OpLog>>, IdSpan) + Sync + Send>;
146
/// A batch of local ops that is committed to the oplog as a single [`Change`].
///
/// Dropping an unfinished transaction commits it (see the `Drop` impl).
pub struct Transaction {
    /// Peer id read from the doc state when the transaction was created.
    peer: PeerID,
    /// Origin label attached to the pre-commit payload and to the diff
    /// recorded on commit.
    origin: InternalString,
    /// Counter of the first op of this transaction.
    start_counter: Counter,
    /// Counter the next applied op will get; advanced by each op's length.
    next_counter: Counter,
    /// Lamport of the first op of this transaction; becomes the change's lamport.
    start_lamport: Lamport,
    /// Lamport the next applied op will get; advanced by each op's length.
    next_lamport: Lamport,
    /// Weak handle to the owning doc; committing is a no-op once it is gone.
    doc: Weak<LoroDocInner>,
    /// Doc state frontiers at creation time; used as the deps of the change.
    frontiers: Frontiers,
    /// Ops applied so far, in application order.
    local_ops: RleVec<[Op; 1]>, // TODO: use a more efficient data structure
    /// Per-container event hints recorded alongside the ops; consumed on
    /// commit to build the diff when the state is recording events.
    event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
    /// Arena cloned from the doc state at creation time.
    pub(super) arena: SharedArena,
    /// Set once `_commit` has started processing; later commits are no-ops.
    finished: bool,
    /// Optional hook run after a successful non-empty commit.
    on_commit: Option<OnCommitFn>,
    /// Explicit timestamp for the change; when `None` the oplog supplies one.
    timestamp: Option<Timestamp>,
    /// Optional commit message stored on the change.
    msg: Option<Arc<str>>,
    /// Greatest timestamp the oplog reported for `frontiers` at creation time;
    /// the change's timestamp is never smaller than this.
    latest_timestamp: Timestamp,
    /// Set when an op is applied while the oplog's latest version vector does
    /// not yet contain `peer`.
    pub(super) is_peer_first_appearance: bool,
}
166
167impl std::fmt::Debug for Transaction {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        f.debug_struct("Transaction")
170            .field("peer", &self.peer)
171            .field("origin", &self.origin)
172            .field("start_counter", &self.start_counter)
173            .field("next_counter", &self.next_counter)
174            .field("start_lamport", &self.start_lamport)
175            .field("next_lamport", &self.next_lamport)
176            .field("frontiers", &self.frontiers)
177            .field("local_ops", &self.local_ops)
178            .field("event_hints", &self.event_hints)
179            .field("arena", &self.arena)
180            .field("finished", &self.finished)
181            .field("on_commit", &self.on_commit.is_some())
182            .field("timestamp", &self.timestamp)
183            .finish()
184    }
185}
186
/// We can infer local events directly from the local behavior. This enum is used to
/// record them, so that we can avoid recalculating them when we commit the transaction.
///
/// For example, when we insert a text in wasm, users use the utf16 index to send the
/// command. However, internally loro will convert it to unicode index. But the users
/// still need events that are in utf16 index. To avoid the round trip, we record the
/// events here.
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EventHint {
    /// A style applied to a text range; on commit it becomes a text diff that
    /// retains `start` and then retains `end - start` with the style attached.
    Mark {
        start: u32,
        end: u32,
        style: Style,
    },
    /// Text inserted at `pos` with the given styles.
    InsertText {
        /// pos is a Unicode index. If wasm, it's a UTF-16 index.
        pos: u32,
        /// Length in the same unit as `pos`; used to detect adjacent inserts
        /// when merging hints.
        event_len: u32,
        /// Length reported by `rle_len`, i.e. the length matched against ops.
        unicode_len: u32,
        styles: StyleMeta,
    },
    /// pos is a Unicode index. If wasm, it's a UTF-16 index.
    DeleteText {
        span: DeleteSpan,
        /// Length reported by `rle_len`, i.e. the length matched against ops.
        unicode_len: usize,
    },
    /// `len` values inserted into a list starting at `pos`.
    InsertList {
        len: u32,
        pos: usize,
    },
    /// The list element at `index` replaced by `value`.
    SetList {
        index: usize,
        value: LoroValue,
    },
    /// A list element moved from index `from` to index `to`.
    Move {
        value: LoroValue,
        from: u32,
        to: u32,
    },
    /// A span of list elements deleted.
    DeleteList(DeleteSpan),
    /// A map entry set to `value`; `None` carries no value into the event.
    Map {
        key: InternalString,
        value: Option<LoroValue>,
    },
    // use vec because we could bring back some node that has children
    Tree(SmallVec<[TreeDiffItem; 1]>),
    /// Consumes one op but produces no diff on commit.
    MarkEnd,
    /// A counter change; the value is forwarded as-is into the counter diff.
    #[cfg(feature = "counter")]
    Counter(f64),
}
237
238impl generic_btree::rle::HasLength for EventHint {
239    fn rle_len(&self) -> usize {
240        match self {
241            EventHint::Mark { .. } => 1,
242            EventHint::InsertText {
243                unicode_len: len, ..
244            } => *len as usize,
245            EventHint::DeleteText { unicode_len, .. } => *unicode_len,
246            EventHint::InsertList { len, .. } => *len as usize,
247            EventHint::DeleteList(d) => d.len(),
248            EventHint::Map { .. } => 1,
249            EventHint::Tree(_) => 1,
250            EventHint::MarkEnd => 1,
251            EventHint::Move { .. } => 1,
252            EventHint::SetList { .. } => 1,
253            #[cfg(feature = "counter")]
254            EventHint::Counter(_) => 1,
255        }
256    }
257}
258
259impl generic_btree::rle::Mergeable for EventHint {
260    fn can_merge(&self, rhs: &Self) -> bool {
261        match (self, rhs) {
262            (
263                EventHint::InsertText {
264                    pos,
265                    unicode_len: _,
266                    event_len,
267                    styles,
268                },
269                EventHint::InsertText {
270                    pos: r_pos,
271                    styles: r_styles,
272                    ..
273                },
274            ) => *pos + *event_len == *r_pos && styles == r_styles,
275            (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
276                pos + *len as usize == *pos_right
277            }
278            // We don't merge delete text because it's hard to infer the correct pos to split:
279            // `range` param is in unicode range, but the delete text event is in UTF-16 range.
280            // Without the original text, it's impossible to convert the range.
281            (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
282                span.is_mergable(r, &())
283            }
284            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
285            _ => false,
286        }
287    }
288
289    fn merge_right(&mut self, rhs: &Self) {
290        match (self, rhs) {
291            (
292                EventHint::InsertText {
293                    event_len,
294                    unicode_len: len,
295                    ..
296                },
297                EventHint::InsertText {
298                    event_len: r_event_len,
299                    unicode_len: r_len,
300                    ..
301                },
302            ) => {
303                *len += *r_len;
304                *event_len += *r_event_len;
305            }
306            (
307                EventHint::InsertList { len, pos: _ },
308                EventHint::InsertList { len: r_len, pos: _ },
309            ) => *len += *r_len,
310            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
311            (
312                EventHint::DeleteText { span, unicode_len },
313                EventHint::DeleteText {
314                    span: r_span,
315                    unicode_len: r_len,
316                },
317            ) => {
318                *unicode_len += *r_len;
319                span.merge(r_span, &());
320            }
321            _ => unreachable!(),
322        }
323    }
324
325    fn merge_left(&mut self, _: &Self) {
326        unreachable!()
327    }
328}
329
330impl Transaction {
331    #[inline]
332    pub fn new(doc: Arc<LoroDocInner>) -> Self {
333        Self::new_with_origin(doc.clone(), "".into())
334    }
335
336    pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> Self {
337        let oplog_lock = doc.oplog.lock().unwrap();
338        let mut state_lock = doc.state.lock().unwrap();
339        if state_lock.is_in_txn() {
340            panic!("Cannot start a transaction while another one is in progress");
341        }
342
343        state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
344        let arena = state_lock.arena.clone();
345        let frontiers = state_lock.frontiers.clone();
346        let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
347        let next_counter = oplog_lock.next_id(peer).counter;
348        let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
349        let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
350        oplog_lock
351            .check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
352            .unwrap();
353        drop(state_lock);
354        drop(oplog_lock);
355        Self {
356            peer,
357            doc: Arc::downgrade(&doc),
358            arena,
359            frontiers,
360            timestamp: None,
361            next_counter,
362            next_lamport,
363            origin: Default::default(),
364            start_counter: next_counter,
365            start_lamport: next_lamport,
366            event_hints: Default::default(),
367            local_ops: RleVec::new(),
368            finished: false,
369            on_commit: None,
370            msg: None,
371            latest_timestamp,
372            is_peer_first_appearance: false,
373        }
374    }
375
376    pub fn set_origin(&mut self, origin: InternalString) {
377        self.origin = origin;
378    }
379
380    pub fn set_timestamp(&mut self, time: Timestamp) {
381        self.timestamp = Some(time);
382    }
383
384    pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
385        self.msg = msg;
386    }
387
388    pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
389        &self.local_ops
390    }
391
392    pub fn peer(&self) -> &PeerID {
393        &self.peer
394    }
395
396    pub fn timestamp(&self) -> &Option<Timestamp> {
397        &self.timestamp
398    }
399
400    pub fn frontiers(&self) -> &Frontiers {
401        &self.frontiers
402    }
403    pub fn msg(&self) -> &Option<Arc<str>> {
404        &self.msg
405    }
406
407    pub fn lamport(&self) -> &Lamport {
408        &self.start_lamport
409    }
410
411    pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
412        self.on_commit = Some(f);
413    }
414
415    pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
416        self.on_commit.take()
417    }
418
419    pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
420        self._commit()
421    }
422
423    #[tracing::instrument(level = "debug", skip(self))]
424    fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
425        if self.finished {
426            return Ok(None);
427        }
428
429        let Some(doc) = self.doc.upgrade() else {
430            return Ok(None);
431        };
432        self.finished = true;
433        if self.local_ops.is_empty() {
434            let mut state = doc.state.lock().unwrap();
435            state.abort_txn();
436            return Ok(Some(self.take_options()));
437        }
438
439        let ops = std::mem::take(&mut self.local_ops);
440        let deps = take(&mut self.frontiers);
441        let change = Change {
442            lamport: self.start_lamport,
443            ops,
444            deps,
445            id: ID::new(self.peer, self.start_counter),
446            timestamp: self.latest_timestamp.max(
447                self.timestamp
448                    .unwrap_or_else(|| doc.oplog.lock().unwrap().get_timestamp_for_next_txn()),
449            ),
450            commit_msg: take(&mut self.msg),
451        };
452
453        let change_meta = ChangeMeta::from_change(&change);
454        {
455            // add change to uncommit field of oplog
456            let mut oplog = doc.oplog.lock().unwrap();
457            oplog.set_uncommitted_change(change);
458        }
459
460        let modifier = ChangeModifier::default();
461        doc.pre_commit_subs.emit(
462            &(),
463            PreCommitCallbackPayload {
464                change_meta,
465                origin: self.origin.to_string(),
466                modifier: modifier.clone(),
467            },
468        );
469
470        let mut oplog = doc.oplog.lock().unwrap();
471        let mut state = doc.state.lock().unwrap();
472
473        let mut change = oplog.uncommitted_change.take().unwrap();
474        modifier.modify_change(&mut change);
475        let diff = if state.is_recording() {
476            Some(change_to_diff(
477                &change,
478                doc.clone(),
479                std::mem::take(&mut self.event_hints),
480            ))
481        } else {
482            None
483        };
484
485        let last_id = change.id_last();
486        if let Err(err) = oplog.import_local_change(change) {
487            state.abort_txn();
488            drop(state);
489            drop(oplog);
490            return Err(err);
491        }
492
493        state.commit_txn(
494            Frontiers::from_id(last_id),
495            diff.map(|arr| InternalDocDiff {
496                by: crate::event::EventTriggerKind::Local,
497                origin: self.origin.clone(),
498                diff: Cow::Owned(
499                    arr.into_iter()
500                        .map(|x| InternalContainerDiff {
501                            idx: x.idx,
502                            bring_back: false,
503                            diff: (x.diff.into()),
504                            diff_mode: crate::diff_calc::DiffMode::Linear,
505                        })
506                        .collect(),
507                ),
508                new_version: Cow::Borrowed(oplog.frontiers()),
509            }),
510        );
511        drop(state);
512        drop(oplog);
513        if let Some(on_commit) = self.on_commit.take() {
514            assert!(!doc.txn.is_locked());
515            on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
516        }
517        Ok(None)
518    }
519
520    fn take_options(&self) -> CommitOptions {
521        let mut options = CommitOptions::new();
522        if !self.origin.is_empty() {
523            options = options.origin(self.origin.as_str());
524        }
525        if let Some(msg) = self.msg.as_ref() {
526            options = options.commit_msg(msg);
527        }
528        if let Some(timestamp) = self.timestamp {
529            options = options.timestamp(timestamp);
530        }
531        options
532    }
533
534    pub(super) fn apply_local_op(
535        &mut self,
536        container: ContainerIdx,
537        content: RawOpContent,
538        event: EventHint,
539        // check whether context and txn are referring to the same state context
540        doc: &LoroDoc,
541    ) -> LoroResult<()> {
542        // TODO: need to check if the doc is the same
543        let this_doc = self.doc.upgrade().unwrap();
544        if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
545            return Err(LoroError::UnmatchedContext {
546                expected: this_doc
547                    .state
548                    .lock()
549                    .unwrap()
550                    .peer
551                    .load(std::sync::atomic::Ordering::Relaxed),
552                found: doc
553                    .state
554                    .lock()
555                    .unwrap()
556                    .peer
557                    .load(std::sync::atomic::Ordering::Relaxed),
558            });
559        }
560
561        let len = content.content_len();
562        assert!(len > 0);
563        let raw_op = RawOp {
564            id: ID {
565                peer: self.peer,
566                counter: self.next_counter,
567            },
568            lamport: self.next_lamport,
569            container,
570            content,
571        };
572
573        let mut oplog = doc.oplog.lock().unwrap();
574        let mut state = doc.state.lock().unwrap();
575        if state.is_deleted(container) {
576            return Err(LoroError::ContainerDeleted {
577                container: Box::new(state.arena.idx_to_id(container).unwrap()),
578            });
579        }
580
581        let op = self.arena.convert_raw_op(&raw_op);
582        state.apply_local_op(&raw_op, &op)?;
583        {
584            if !self.is_peer_first_appearance && !oplog.dag.latest_vv_contains_peer(self.peer) {
585                self.is_peer_first_appearance = true;
586            }
587            // update version info
588            let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
589            let start_id = ID::new(self.peer, self.next_counter);
590            self.next_counter += len as Counter;
591            oplog.dag.update_version_on_new_local_op(
592                if self.local_ops.is_empty() {
593                    &self.frontiers
594                } else {
595                    &dep_id
596                },
597                start_id,
598                self.next_lamport,
599                len,
600            );
601            self.next_lamport += len as Lamport;
602            // set frontiers to the last op id
603            let last_id = start_id.inc(len as Counter - 1);
604            state.frontiers = Frontiers::from_id(last_id);
605        };
606        drop(state);
607        drop(oplog);
608        debug_assert_eq!(
609            event.rle_len(),
610            op.atom_len(),
611            "event:{:#?} \nop:{:#?}",
612            &event,
613            &op
614        );
615
616        let container_hints = self.event_hints.entry(container).or_default();
617
618        match container_hints.last_mut() {
619            Some(last) if last.can_merge(&event) => {
620                last.merge_right(&event);
621            }
622            _ => {
623                container_hints.push(event);
624            }
625        }
626        self.local_ops.push(op);
627        Ok(())
628    }
629
630    /// id can be a str, ContainerID, or ContainerIdRaw.
631    /// if it's str it will use Root container, which will not be None
632    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
633        let id = id.into_container_id(&self.arena, ContainerType::Text);
634        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
635            .into_text()
636            .unwrap()
637    }
638
639    /// id can be a str, ContainerID, or ContainerIdRaw.
640    /// if it's str it will use Root container, which will not be None
641    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
642        let id = id.into_container_id(&self.arena, ContainerType::List);
643        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
644            .into_list()
645            .unwrap()
646    }
647
648    /// id can be a str, ContainerID, or ContainerIdRaw.
649    /// if it's str it will use Root container, which will not be None
650    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
651        let id = id.into_container_id(&self.arena, ContainerType::Map);
652        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
653            .into_map()
654            .unwrap()
655    }
656
657    /// id can be a str, ContainerID, or ContainerIdRaw.
658    /// if it's str it will use Root container, which will not be None
659    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
660        let id = id.into_container_id(&self.arena, ContainerType::Tree);
661        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
662            .into_tree()
663            .unwrap()
664    }
665    pub fn next_id(&self) -> ID {
666        ID {
667            peer: self.peer,
668            counter: self.next_counter,
669        }
670    }
671
672    #[inline]
673    pub fn id_span(&self) -> IdSpan {
674        IdSpan::new(self.peer, self.start_counter, self.next_counter)
675    }
676
677    pub fn next_idlp(&self) -> IdLp {
678        IdLp {
679            peer: self.peer,
680            lamport: self.next_lamport,
681        }
682    }
683
684    pub fn is_empty(&self) -> bool {
685        self.local_ops.is_empty()
686    }
687
688    pub(crate) fn len(&self) -> usize {
689        (self.next_counter - self.start_counter) as usize
690    }
691
692    pub(crate) fn set_options(&mut self, options: CommitOptions) {
693        self.origin = options.origin.unwrap_or_default();
694        self.msg = options.commit_msg;
695        self.timestamp = options.timestamp;
696    }
697
698    pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
699        if self.origin.is_empty() {
700            self.origin = default_options.origin.unwrap_or_default();
701        }
702        if self.msg.is_none() {
703            self.msg = default_options.commit_msg;
704        }
705        if self.timestamp.is_none() {
706            self.timestamp = default_options.timestamp;
707        }
708    }
709}
710
711impl Drop for Transaction {
712    #[tracing::instrument(level = "debug", skip(self))]
713    fn drop(&mut self) {
714        if !self.finished {
715            // TODO: should we abort here or commit here?
716            // what if commit fails?
717            self._commit().unwrap();
718        }
719    }
720}
721
/// One diff produced for a single container while committing a transaction.
///
/// A container may appear in several entries; each is turned into an
/// `InternalContainerDiff` by the commit.
#[derive(Debug, Clone)]
pub(crate) struct TxnContainerDiff {
    /// Container the diff applies to.
    pub(crate) idx: ContainerIdx,
    /// The diff itself.
    pub(crate) diff: Diff,
}
727
728// PERF: could be compacter
729fn change_to_diff(
730    change: &Change,
731    doc: Arc<LoroDocInner>,
732    event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
733) -> Vec<TxnContainerDiff> {
734    let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
735    let peer = change.id.peer;
736    let mut lamport = change.lamport;
737
738    // Group ops by container first to match our new structure
739    let mut ops_by_container: FxHashMap<ContainerIdx, Vec<Op>> = FxHashMap::default();
740    for op in change.ops.iter() {
741        ops_by_container
742            .entry(op.container)
743            .or_default()
744            .push(op.clone());
745    }
746
747    // Process each container's hints and ops together
748    for (container_idx, hints) in event_hints {
749        let Some(container_ops) = ops_by_container.get_mut(&container_idx) else {
750            continue;
751        };
752
753        let mut op_index = 0;
754        let mut hint_iter = hints.into_iter();
755        let mut current_hint = hint_iter.next();
756
757        while op_index < container_ops.len() {
758            let Some(hint) = current_hint.take() else {
759                unreachable!("Missing hint for op");
760            };
761
762            // Collect ops that belong to this hint
763            let mut ops_for_hint: SmallVec<[Op; 1]> = smallvec![container_ops[op_index].clone()];
764            let mut total_len = container_ops[op_index].atom_len();
765
766            // If hint spans multiple ops, collect them
767            while total_len < hint.rle_len() {
768                op_index += 1;
769                let next_op_len = container_ops[op_index].atom_len();
770                let op = if next_op_len + total_len > hint.rle_len() {
771                    let new_len = hint.rle_len() - total_len;
772                    let left = container_ops[op_index].slice(0, new_len);
773                    let right = container_ops[op_index].slice(new_len, next_op_len);
774                    container_ops[op_index] = right;
775                    op_index -= 1;
776                    left
777                } else {
778                    container_ops[op_index].clone()
779                };
780
781                total_len += op.atom_len();
782                ops_for_hint.push(op);
783            }
784
785            op_index += 1;
786            assert_eq!(total_len, hint.rle_len(), "Op/hint length mismatch");
787
788            // Move to next hint
789            current_hint = hint_iter.next();
790
791            // Generate diff based on hint type
792            match hint {
793                EventHint::Mark { start, end, style } => {
794                    let mut meta = StyleMeta::default();
795                    meta.insert(
796                        style.key.clone(),
797                        StyleMetaItem {
798                            lamport,
799                            peer: change.id.peer,
800                            value: style.data,
801                        },
802                    );
803                    let diff = DeltaRopeBuilder::new()
804                        .retain(start as usize, Default::default())
805                        .retain(
806                            (end - start) as usize,
807                            meta.to_option_map().unwrap_or_default().into(),
808                        )
809                        .build();
810                    ans.push(TxnContainerDiff {
811                        idx: container_idx,
812                        diff: Diff::Text(diff),
813                    });
814                }
815                EventHint::InsertText { styles, pos, .. } => {
816                    let mut delta: TextDiff = DeltaRopeBuilder::new()
817                        .retain(pos as usize, Default::default())
818                        .build();
819                    for op in ops_for_hint.iter() {
820                        let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
821                        else {
822                            unreachable!()
823                        };
824
825                        delta.push_insert(
826                            slice.clone().into(),
827                            styles.to_option_map().unwrap_or_default().into(),
828                        );
829                    }
830                    ans.push(TxnContainerDiff {
831                        idx: container_idx,
832                        diff: Diff::Text(delta),
833                    })
834                }
835                EventHint::DeleteText {
836                    span,
837                    unicode_len: _,
838                    // we don't need to iter over ops here, because we already
839                    // know what the events should be
840                } => ans.push(TxnContainerDiff {
841                    idx: container_idx,
842                    diff: Diff::Text(
843                        DeltaRopeBuilder::new()
844                            .retain(span.start() as usize, Default::default())
845                            .delete(span.len())
846                            .build(),
847                    ),
848                }),
849                EventHint::InsertList { pos, .. } => {
850                    // We should use pos from event hint because index in op may
851                    // be using op index for the MovableList
852                    let mut index = pos;
853                    for op in ops_for_hint.iter() {
854                        let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
855                        let values = doc
856                            .arena
857                            .get_values(range.to_range())
858                            .into_iter()
859                            .map(|v| ValueOrHandler::from_value(v, &doc));
860                        let len = values.len();
861                        ans.push(TxnContainerDiff {
862                            idx: container_idx,
863                            diff: Diff::List(
864                                DeltaRopeBuilder::new()
865                                    .retain(index, Default::default())
866                                    .insert_many(values, Default::default())
867                                    .build(),
868                            ),
869                        });
870                        index += len;
871                    }
872                }
873                EventHint::DeleteList(s) => {
874                    ans.push(TxnContainerDiff {
875                        idx: container_idx,
876                        diff: Diff::List(
877                            DeltaRopeBuilder::new()
878                                .retain(s.start() as usize, Default::default())
879                                .delete(s.len())
880                                .build(),
881                        ),
882                    });
883                }
884                EventHint::Map { key, value } => ans.push(TxnContainerDiff {
885                    idx: container_idx,
886                    diff: Diff::Map(ResolvedMapDelta::new().with_entry(
887                        key,
888                        ResolvedMapValue {
889                            value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
890                            idlp: IdLp::new(peer, lamport),
891                        },
892                    )),
893                }),
894                EventHint::Tree(tree_diff) => {
895                    let mut diff = TreeDiff::default();
896                    diff.diff.extend(tree_diff.into_iter());
897                    ans.push(TxnContainerDiff {
898                        idx: container_idx,
899                        diff: Diff::Tree(diff),
900                    });
901                }
902                EventHint::Move { from, to, value } => {
903                    let mut a = DeltaRopeBuilder::new()
904                        .retain(from as usize, Default::default())
905                        .delete(1)
906                        .build();
907                    a.compose(
908                        &DeltaRopeBuilder::new()
909                            .retain(to as usize, Default::default())
910                            .insert(
911                                ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
912                                ListDeltaMeta { from_move: true },
913                            )
914                            .build(),
915                    );
916                    ans.push(TxnContainerDiff {
917                        idx: container_idx,
918                        diff: Diff::List(a),
919                    });
920                }
921                EventHint::SetList { index, value } => {
922                    ans.push(TxnContainerDiff {
923                        idx: container_idx,
924                        diff: Diff::List(
925                            DeltaRopeBuilder::new()
926                                .retain(index, Default::default())
927                                .delete(1)
928                                .insert(
929                                    ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
930                                    Default::default(),
931                                )
932                                .build(),
933                        ),
934                    });
935                }
936                EventHint::MarkEnd => {
937                    // do nothing
938                }
939                #[cfg(feature = "counter")]
940                EventHint::Counter(diff) => {
941                    ans.push(TxnContainerDiff {
942                        idx: container_idx,
943                        diff: Diff::Counter(diff),
944                    });
945                }
946            }
947
948            // Update lamport for this hint's operations
949            lamport += ops_for_hint
950                .iter()
951                .map(|x| x.content_len() as Lamport)
952                .sum::<Lamport>();
953        }
954    }
955
956    ans
957}