loro_internal/
txn.rs

1use core::panic;
2use std::{
3    borrow::Cow,
4    mem::take,
5    sync::{Arc, Weak},
6};
7
8use enum_as_inner::EnumAsInner;
9use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
10use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
11use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
12use rle::{HasLength, Mergable, RleVec};
13use smallvec::{smallvec, SmallVec};
14
15use crate::{
16    change::{Change, Lamport, Timestamp},
17    container::{
18        idx::ContainerIdx,
19        list::list_op::{DeleteSpan, InnerListOp},
20        richtext::Style,
21        IntoContainerId,
22    },
23    delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
24    encoding::export_fast_updates_in_range,
25    event::{Diff, ListDeltaMeta, TextDiff},
26    handler::{Handler, ValueOrHandler},
27    id::{Counter, PeerID, ID},
28    lock::LoroMutex,
29    loro::CommitOptions,
30    op::{Op, RawOp, RawOpContent},
31    pre_commit::{ChangeModifier, PreCommitCallbackPayload},
32    span::HasIdSpan,
33    version::Frontiers,
34    ChangeMeta, InternalString, LoroDoc, LoroDocInner, LoroError, LoroValue,
35};
36
37use super::{
38    arena::SharedArena,
39    event::{InternalContainerDiff, InternalDocDiff},
40    handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
41    oplog::OpLog,
42    state::DocState,
43};
44
45impl crate::LoroDoc {
46    /// Create a new transaction.
47    /// Every ops created inside one transaction will be packed into a single
48    /// [Change].
49    ///
50    /// There can only be one active transaction at a time for a [LoroDoc].
51    #[inline(always)]
52    pub fn txn(&self) -> Result<Transaction, LoroError> {
53        self.txn_with_origin("")
54    }
55
56    /// Create a new transaction with specified origin.
57    ///
58    /// The origin will be propagated to the events.
59    /// There can only be one active transaction at a time for a [LoroDoc].
60    pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
61        if !self.can_edit() {
62            return Err(LoroError::TransactionError(
63                String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
64            ));
65        }
66
67        let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into());
68
69        let obs = self.observer.clone();
70        let local_update_subs_weak = self.local_update_subs.downgrade();
71        txn.set_on_commit(Box::new(move |state, oplog, id_span| {
72            let mut state = state.lock().unwrap();
73            let events = state.take_events();
74            drop(state);
75            for event in events {
76                obs.emit(event);
77            }
78
79            if id_span.atom_len() == 0 {
80                return;
81            }
82
83            if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
84                if !local_update_subs.inner().is_empty() {
85                    let bytes =
86                        { export_fast_updates_in_range(&oplog.lock().unwrap(), &[id_span]) };
87                    local_update_subs.emit(&(), bytes);
88                }
89            }
90        }));
91
92        Ok(txn)
93    }
94
95    pub fn start_auto_commit(&self) {
96        self.auto_commit
97            .store(true, std::sync::atomic::Ordering::Release);
98        let mut self_txn = self.txn.lock().unwrap();
99        if self_txn.is_some() || !self.can_edit() {
100            return;
101        }
102
103        let txn = self.txn().unwrap();
104        self_txn.replace(txn);
105    }
106
107    #[inline]
108    pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
109        if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
110            let mut self_txn = self.txn.lock().unwrap();
111            if self_txn.is_some() {
112                return;
113            }
114
115            let mut txn = self.txn().unwrap();
116            if let Some(options) = options {
117                txn.set_options(options);
118            }
119            self_txn.replace(txn);
120        }
121    }
122}
123
124pub(crate) type OnCommitFn =
125    Box<dyn FnOnce(&Arc<LoroMutex<DocState>>, &Arc<LoroMutex<OpLog>>, IdSpan) + Sync + Send>;
126
127pub struct Transaction {
128    peer: PeerID,
129    origin: InternalString,
130    start_counter: Counter,
131    next_counter: Counter,
132    start_lamport: Lamport,
133    next_lamport: Lamport,
134    doc: Weak<LoroDocInner>,
135    frontiers: Frontiers,
136    local_ops: RleVec<[Op; 1]>, // TODO: use a more efficient data structure
137    event_hints: Vec<EventHint>,
138    pub(super) arena: SharedArena,
139    finished: bool,
140    on_commit: Option<OnCommitFn>,
141    timestamp: Option<Timestamp>,
142    msg: Option<Arc<str>>,
143    latest_timestamp: Timestamp,
144    pub(super) is_peer_first_appearance: bool,
145}
146
147impl std::fmt::Debug for Transaction {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct("Transaction")
150            .field("peer", &self.peer)
151            .field("origin", &self.origin)
152            .field("start_counter", &self.start_counter)
153            .field("next_counter", &self.next_counter)
154            .field("start_lamport", &self.start_lamport)
155            .field("next_lamport", &self.next_lamport)
156            .field("frontiers", &self.frontiers)
157            .field("local_ops", &self.local_ops)
158            .field("event_hints", &self.event_hints)
159            .field("arena", &self.arena)
160            .field("finished", &self.finished)
161            .field("on_commit", &self.on_commit.is_some())
162            .field("timestamp", &self.timestamp)
163            .finish()
164    }
165}
166
167/// We can infer local events directly from the local behavior. This enum is used to
168/// record them, so that we can avoid recalculate them when we commit the transaction.
169///
170/// For example, when we insert a text in wasm, users use the utf16 index to send the
171/// command. However, internally loro will convert it to unicode index. But the users
172/// still need events that are in utf16 index. To avoid the round trip, we record the
173/// events here.
174#[derive(Debug, Clone, EnumAsInner)]
175pub(super) enum EventHint {
176    Mark {
177        start: u32,
178        end: u32,
179        style: Style,
180    },
181    InsertText {
182        /// pos is a Unicode index. If wasm, it's a UTF-16 index.
183        pos: u32,
184        event_len: u32,
185        unicode_len: u32,
186        styles: StyleMeta,
187    },
188    /// pos is a Unicode index. If wasm, it's a UTF-16 index.
189    DeleteText {
190        span: DeleteSpan,
191        unicode_len: usize,
192    },
193    InsertList {
194        len: u32,
195        pos: usize,
196    },
197    SetList {
198        index: usize,
199        value: LoroValue,
200    },
201    Move {
202        value: LoroValue,
203        from: u32,
204        to: u32,
205    },
206    DeleteList(DeleteSpan),
207    Map {
208        key: InternalString,
209        value: Option<LoroValue>,
210    },
211    // use vec because we could bring back some node that has children
212    Tree(SmallVec<[TreeDiffItem; 1]>),
213    MarkEnd,
214    #[cfg(feature = "counter")]
215    Counter(f64),
216}
217
218impl generic_btree::rle::HasLength for EventHint {
219    fn rle_len(&self) -> usize {
220        match self {
221            EventHint::Mark { .. } => 1,
222            EventHint::InsertText {
223                unicode_len: len, ..
224            } => *len as usize,
225            EventHint::DeleteText { unicode_len, .. } => *unicode_len,
226            EventHint::InsertList { len, .. } => *len as usize,
227            EventHint::DeleteList(d) => d.len(),
228            EventHint::Map { .. } => 1,
229            EventHint::Tree(_) => 1,
230            EventHint::MarkEnd => 1,
231            EventHint::Move { .. } => 1,
232            EventHint::SetList { .. } => 1,
233            #[cfg(feature = "counter")]
234            EventHint::Counter(_) => 1,
235        }
236    }
237}
238
239impl generic_btree::rle::Mergeable for EventHint {
240    fn can_merge(&self, rhs: &Self) -> bool {
241        match (self, rhs) {
242            (
243                EventHint::InsertText {
244                    pos,
245                    unicode_len: _,
246                    event_len,
247                    styles,
248                },
249                EventHint::InsertText {
250                    pos: r_pos,
251                    styles: r_styles,
252                    ..
253                },
254            ) => *pos + *event_len == *r_pos && styles == r_styles,
255            (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
256                pos + *len as usize == *pos_right
257            }
258            // We don't merge delete text because it's hard to infer the correct pos to split:
259            // `range` param is in unicode range, but the delete text event is in UTF-16 range.
260            // Without the original text, it's impossible to convert the range.
261            (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
262                span.is_mergable(r, &())
263            }
264            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
265            _ => false,
266        }
267    }
268
269    fn merge_right(&mut self, rhs: &Self) {
270        match (self, rhs) {
271            (
272                EventHint::InsertText {
273                    event_len,
274                    unicode_len: len,
275                    ..
276                },
277                EventHint::InsertText {
278                    event_len: r_event_len,
279                    unicode_len: r_len,
280                    ..
281                },
282            ) => {
283                *len += *r_len;
284                *event_len += *r_event_len;
285            }
286            (
287                EventHint::InsertList { len, pos: _ },
288                EventHint::InsertList { len: r_len, pos: _ },
289            ) => *len += *r_len,
290            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
291            (
292                EventHint::DeleteText { span, unicode_len },
293                EventHint::DeleteText {
294                    span: r_span,
295                    unicode_len: r_len,
296                },
297            ) => {
298                *unicode_len += *r_len;
299                span.merge(r_span, &());
300            }
301            _ => unreachable!(),
302        }
303    }
304
305    fn merge_left(&mut self, _: &Self) {
306        unreachable!()
307    }
308}
309
310impl Transaction {
311    #[inline]
312    pub fn new(doc: Arc<LoroDocInner>) -> Self {
313        Self::new_with_origin(doc.clone(), "".into())
314    }
315
316    pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> Self {
317        let oplog_lock = doc.oplog.lock().unwrap();
318        let mut state_lock = doc.state.lock().unwrap();
319        if state_lock.is_in_txn() {
320            panic!("Cannot start a transaction while another one is in progress");
321        }
322
323        state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
324        let arena = state_lock.arena.clone();
325        let frontiers = state_lock.frontiers.clone();
326        let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
327        let next_counter = oplog_lock.next_id(peer).counter;
328        let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
329        let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
330        oplog_lock
331            .check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
332            .unwrap();
333        drop(state_lock);
334        drop(oplog_lock);
335        Self {
336            peer,
337            doc: Arc::downgrade(&doc),
338            arena,
339            frontiers,
340            timestamp: None,
341            next_counter,
342            next_lamport,
343            origin: Default::default(),
344            start_counter: next_counter,
345            start_lamport: next_lamport,
346            event_hints: Default::default(),
347            local_ops: RleVec::new(),
348            finished: false,
349            on_commit: None,
350            msg: None,
351            latest_timestamp,
352            is_peer_first_appearance: false,
353        }
354    }
355
356    pub fn set_origin(&mut self, origin: InternalString) {
357        self.origin = origin;
358    }
359
360    pub fn set_timestamp(&mut self, time: Timestamp) {
361        self.timestamp = Some(time);
362    }
363
364    pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
365        self.msg = msg;
366    }
367
368    pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
369        &self.local_ops
370    }
371
372    pub fn peer(&self) -> &PeerID {
373        &self.peer
374    }
375
376    pub fn timestamp(&self) -> &Option<Timestamp> {
377        &self.timestamp
378    }
379
380    pub fn frontiers(&self) -> &Frontiers {
381        &self.frontiers
382    }
383    pub fn msg(&self) -> &Option<Arc<str>> {
384        &self.msg
385    }
386
387    pub fn lamport(&self) -> &Lamport {
388        &self.start_lamport
389    }
390
391    pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
392        self.on_commit = Some(f);
393    }
394
395    pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
396        self.on_commit.take()
397    }
398
399    pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
400        self._commit()
401    }
402
403    #[tracing::instrument(level = "debug", skip(self))]
404    fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
405        if self.finished {
406            return Ok(None);
407        }
408
409        let Some(doc) = self.doc.upgrade() else {
410            return Ok(None);
411        };
412        self.finished = true;
413        if self.local_ops.is_empty() {
414            let mut state = doc.state.lock().unwrap();
415            state.abort_txn();
416            return Ok(Some(self.take_options()));
417        }
418
419        let ops = std::mem::take(&mut self.local_ops);
420        let deps = take(&mut self.frontiers);
421        let change = Change {
422            lamport: self.start_lamport,
423            ops,
424            deps,
425            id: ID::new(self.peer, self.start_counter),
426            timestamp: self.latest_timestamp.max(
427                self.timestamp
428                    .unwrap_or_else(|| doc.oplog.lock().unwrap().get_timestamp_for_next_txn()),
429            ),
430            commit_msg: take(&mut self.msg),
431        };
432
433        let change_meta = ChangeMeta::from_change(&change);
434        {
435            // add change to uncommit field of oplog
436            let mut oplog = doc.oplog.lock().unwrap();
437            oplog.set_uncommitted_change(change);
438        }
439
440        let modifier = ChangeModifier::default();
441        doc.pre_commit_subs.emit(
442            &(),
443            PreCommitCallbackPayload {
444                change_meta,
445                origin: self.origin.to_string(),
446                modifier: modifier.clone(),
447            },
448        );
449
450        let mut oplog = doc.oplog.lock().unwrap();
451        let mut state = doc.state.lock().unwrap();
452
453        let mut change = oplog.uncommitted_change.take().unwrap();
454        modifier.modify_change(&mut change);
455        let diff = if state.is_recording() {
456            Some(change_to_diff(
457                &change,
458                doc.clone(),
459                std::mem::take(&mut self.event_hints),
460            ))
461        } else {
462            None
463        };
464
465        let last_id = change.id_last();
466        if let Err(err) = oplog.import_local_change(change) {
467            state.abort_txn();
468            drop(state);
469            drop(oplog);
470            return Err(err);
471        }
472
473        state.commit_txn(
474            Frontiers::from_id(last_id),
475            diff.map(|arr| InternalDocDiff {
476                by: crate::event::EventTriggerKind::Local,
477                origin: self.origin.clone(),
478                diff: Cow::Owned(
479                    arr.into_iter()
480                        .map(|x| InternalContainerDiff {
481                            idx: x.idx,
482                            bring_back: false,
483                            is_container_deleted: false,
484                            diff: (x.diff.into()),
485                            diff_mode: crate::diff_calc::DiffMode::Linear,
486                        })
487                        .collect(),
488                ),
489                new_version: Cow::Borrowed(oplog.frontiers()),
490            }),
491        );
492        drop(state);
493        drop(oplog);
494        if let Some(on_commit) = self.on_commit.take() {
495            assert!(!doc.txn.is_locked());
496            on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
497        }
498        Ok(None)
499    }
500
501    fn take_options(&self) -> CommitOptions {
502        let mut options = CommitOptions::new();
503        if !self.origin.is_empty() {
504            options = options.origin(self.origin.as_str());
505        }
506        if let Some(msg) = self.msg.as_ref() {
507            options = options.commit_msg(msg);
508        }
509        if let Some(timestamp) = self.timestamp {
510            options = options.timestamp(timestamp);
511        }
512        options
513    }
514
515    pub(super) fn apply_local_op(
516        &mut self,
517        container: ContainerIdx,
518        content: RawOpContent,
519        event: EventHint,
520        // check whether context and txn are referring to the same state context
521        doc: &LoroDoc,
522    ) -> LoroResult<()> {
523        // TODO: need to check if the doc is the same
524        let this_doc = self.doc.upgrade().unwrap();
525        if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
526            return Err(LoroError::UnmatchedContext {
527                expected: this_doc
528                    .state
529                    .lock()
530                    .unwrap()
531                    .peer
532                    .load(std::sync::atomic::Ordering::Relaxed),
533                found: doc
534                    .state
535                    .lock()
536                    .unwrap()
537                    .peer
538                    .load(std::sync::atomic::Ordering::Relaxed),
539            });
540        }
541
542        let len = content.content_len();
543        assert!(len > 0);
544        let raw_op = RawOp {
545            id: ID {
546                peer: self.peer,
547                counter: self.next_counter,
548            },
549            lamport: self.next_lamport,
550            container,
551            content,
552        };
553
554        let mut oplog = doc.oplog.lock().unwrap();
555        let mut state = doc.state.lock().unwrap();
556        if state.is_deleted(container) {
557            return Err(LoroError::ContainerDeleted {
558                container: Box::new(state.arena.idx_to_id(container).unwrap()),
559            });
560        }
561
562        let op = self.arena.convert_raw_op(&raw_op);
563        state.apply_local_op(&raw_op, &op)?;
564        {
565            if !self.is_peer_first_appearance && !oplog.dag.latest_vv_contains_peer(self.peer) {
566                self.is_peer_first_appearance = true;
567            }
568            // update version info
569            let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
570            let start_id = ID::new(self.peer, self.next_counter);
571            self.next_counter += len as Counter;
572            oplog.dag.update_version_on_new_local_op(
573                if self.local_ops.is_empty() {
574                    &self.frontiers
575                } else {
576                    &dep_id
577                },
578                start_id,
579                self.next_lamport,
580                len,
581            );
582            self.next_lamport += len as Lamport;
583            // set frontiers to the last op id
584            let last_id = start_id.inc(len as Counter - 1);
585            state.frontiers = Frontiers::from_id(last_id);
586        };
587        drop(state);
588        drop(oplog);
589        debug_assert_eq!(
590            event.rle_len(),
591            op.atom_len(),
592            "event:{:#?} \nop:{:#?}",
593            &event,
594            &op
595        );
596
597        match self.event_hints.last_mut() {
598            Some(last) if last.can_merge(&event) => {
599                last.merge_right(&event);
600            }
601            _ => {
602                self.event_hints.push(event);
603            }
604        }
605        self.local_ops.push(op);
606        Ok(())
607    }
608
609    /// id can be a str, ContainerID, or ContainerIdRaw.
610    /// if it's str it will use Root container, which will not be None
611    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
612        let id = id.into_container_id(&self.arena, ContainerType::Text);
613        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
614            .into_text()
615            .unwrap()
616    }
617
618    /// id can be a str, ContainerID, or ContainerIdRaw.
619    /// if it's str it will use Root container, which will not be None
620    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
621        let id = id.into_container_id(&self.arena, ContainerType::List);
622        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
623            .into_list()
624            .unwrap()
625    }
626
627    /// id can be a str, ContainerID, or ContainerIdRaw.
628    /// if it's str it will use Root container, which will not be None
629    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
630        let id = id.into_container_id(&self.arena, ContainerType::Map);
631        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
632            .into_map()
633            .unwrap()
634    }
635
636    /// id can be a str, ContainerID, or ContainerIdRaw.
637    /// if it's str it will use Root container, which will not be None
638    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
639        let id = id.into_container_id(&self.arena, ContainerType::Tree);
640        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
641            .into_tree()
642            .unwrap()
643    }
644    pub fn next_id(&self) -> ID {
645        ID {
646            peer: self.peer,
647            counter: self.next_counter,
648        }
649    }
650
651    #[inline]
652    pub fn id_span(&self) -> IdSpan {
653        IdSpan::new(self.peer, self.start_counter, self.next_counter)
654    }
655
656    pub fn next_idlp(&self) -> IdLp {
657        IdLp {
658            peer: self.peer,
659            lamport: self.next_lamport,
660        }
661    }
662
663    pub fn is_empty(&self) -> bool {
664        self.local_ops.is_empty()
665    }
666
667    pub(crate) fn len(&self) -> usize {
668        (self.next_counter - self.start_counter) as usize
669    }
670
671    pub(crate) fn set_options(&mut self, options: CommitOptions) {
672        self.origin = options.origin.unwrap_or_default();
673        self.msg = options.commit_msg;
674        self.timestamp = options.timestamp;
675    }
676
677    pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
678        if self.origin.is_empty() {
679            self.origin = default_options.origin.unwrap_or_default();
680        }
681        if self.msg.is_none() {
682            self.msg = default_options.commit_msg;
683        }
684        if self.timestamp.is_none() {
685            self.timestamp = default_options.timestamp;
686        }
687    }
688}
689
690impl Drop for Transaction {
691    #[tracing::instrument(level = "debug", skip(self))]
692    fn drop(&mut self) {
693        if !self.finished {
694            // TODO: should we abort here or commit here?
695            // what if commit fails?
696            self._commit().unwrap();
697        }
698    }
699}
700
701#[derive(Debug, Clone)]
702pub(crate) struct TxnContainerDiff {
703    pub(crate) idx: ContainerIdx,
704    pub(crate) diff: Diff,
705}
706
707// PERF: could be compacter
708fn change_to_diff(
709    change: &Change,
710    doc: Arc<LoroDocInner>,
711    event_hints: Vec<EventHint>,
712) -> Vec<TxnContainerDiff> {
713    let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
714    let peer = change.id.peer;
715    let mut lamport = change.lamport;
716    let mut event_hint_iter = event_hints.into_iter();
717    let mut o_hint = event_hint_iter.next();
718    let mut op_iter = change.ops.iter();
719    while let Some(op) = op_iter.next() {
720        let Some(hint) = o_hint.as_mut() else {
721            unreachable!()
722        };
723
724        let mut ops: SmallVec<[&Op; 1]> = smallvec![op];
725        let hint = match op.atom_len().cmp(&hint.rle_len()) {
726            std::cmp::Ordering::Less => {
727                let mut len = op.atom_len();
728                while len < hint.rle_len() {
729                    let next = op_iter.next().unwrap();
730                    len += next.atom_len();
731                    ops.push(next);
732                }
733                assert!(len == hint.rle_len());
734                match event_hint_iter.next() {
735                    Some(n) => o_hint.replace(n).unwrap(),
736                    None => o_hint.take().unwrap(),
737                }
738            }
739            std::cmp::Ordering::Equal => match event_hint_iter.next() {
740                Some(n) => o_hint.replace(n).unwrap(),
741                None => o_hint.take().unwrap(),
742            },
743            std::cmp::Ordering::Greater => {
744                unreachable!("{:#?}", &op)
745            }
746        };
747
748        match &hint {
749            EventHint::InsertText { .. }
750            | EventHint::InsertList { .. }
751            | EventHint::DeleteText { .. }
752            | EventHint::DeleteList(_) => {}
753            _ => {
754                assert_eq!(ops.len(), 1);
755            }
756        }
757        match hint {
758            EventHint::Mark { start, end, style } => {
759                let mut meta = StyleMeta::default();
760                meta.insert(
761                    style.key.clone(),
762                    StyleMetaItem {
763                        lamport,
764                        peer: change.id.peer,
765                        value: style.data,
766                    },
767                );
768                let diff = DeltaRopeBuilder::new()
769                    .retain(start as usize, Default::default())
770                    .retain(
771                        (end - start) as usize,
772                        meta.to_option_map().unwrap_or_default().into(),
773                    )
774                    .build();
775                ans.push(TxnContainerDiff {
776                    idx: op.container,
777                    diff: Diff::Text(diff),
778                });
779            }
780            EventHint::InsertText { styles, pos, .. } => {
781                let mut delta: TextDiff = DeltaRopeBuilder::new()
782                    .retain(pos as usize, Default::default())
783                    .build();
784                for op in ops.iter() {
785                    let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
786                    else {
787                        unreachable!()
788                    };
789
790                    delta.push_insert(
791                        slice.clone().into(),
792                        styles.to_option_map().unwrap_or_default().into(),
793                    );
794                }
795                ans.push(TxnContainerDiff {
796                    idx: op.container,
797                    diff: Diff::Text(delta),
798                })
799            }
800            EventHint::DeleteText {
801                span,
802                unicode_len: _,
803                // we don't need to iter over ops here, because we already
804                // know what the events should be
805            } => ans.push(TxnContainerDiff {
806                idx: op.container,
807                diff: Diff::Text(
808                    DeltaRopeBuilder::new()
809                        .retain(span.start() as usize, Default::default())
810                        .delete(span.len())
811                        .build(),
812                ),
813            }),
814            EventHint::InsertList { pos, .. } => {
815                // We should use pos from event hint because index in op may
816                // be using op index for the MovableList
817                for op in ops.iter() {
818                    let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
819                    let values = doc
820                        .arena
821                        .get_values(range.to_range())
822                        .into_iter()
823                        .map(|v| ValueOrHandler::from_value(v, &doc));
824                    ans.push(TxnContainerDiff {
825                        idx: op.container,
826                        diff: Diff::List(
827                            DeltaRopeBuilder::new()
828                                .retain(pos, Default::default())
829                                .insert_many(values, Default::default())
830                                .build(),
831                        ),
832                    })
833                }
834            }
835            EventHint::DeleteList(s) => {
836                ans.push(TxnContainerDiff {
837                    idx: op.container,
838                    diff: Diff::List(
839                        DeltaRopeBuilder::new()
840                            .retain(s.start() as usize, Default::default())
841                            .delete(s.len())
842                            .build(),
843                    ),
844                });
845            }
846            EventHint::Map { key, value } => ans.push(TxnContainerDiff {
847                idx: op.container,
848                diff: Diff::Map(ResolvedMapDelta::new().with_entry(
849                    key,
850                    ResolvedMapValue {
851                        value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
852                        idlp: IdLp::new(peer, lamport),
853                    },
854                )),
855            }),
856            EventHint::Tree(tree_diff) => {
857                let mut diff = TreeDiff::default();
858                diff.diff.extend(tree_diff.into_iter());
859                ans.push(TxnContainerDiff {
860                    idx: op.container,
861                    diff: Diff::Tree(diff),
862                });
863            }
864            EventHint::Move { from, to, value } => {
865                let mut a = DeltaRopeBuilder::new()
866                    .retain(from as usize, Default::default())
867                    .delete(1)
868                    .build();
869                a.compose(
870                    &DeltaRopeBuilder::new()
871                        .retain(to as usize, Default::default())
872                        .insert(
873                            ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
874                            ListDeltaMeta { from_move: true },
875                        )
876                        .build(),
877                );
878                ans.push(TxnContainerDiff {
879                    idx: op.container,
880                    diff: Diff::List(a),
881                });
882            }
883            EventHint::SetList { index, value } => {
884                ans.push(TxnContainerDiff {
885                    idx: op.container,
886                    diff: Diff::List(
887                        DeltaRopeBuilder::new()
888                            .retain(index, Default::default())
889                            .delete(1)
890                            .insert(
891                                ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
892                                Default::default(),
893                            )
894                            .build(),
895                    ),
896                });
897            }
898            EventHint::MarkEnd => {
899                // do nothing
900            }
901            #[cfg(feature = "counter")]
902            EventHint::Counter(diff) => {
903                ans.push(TxnContainerDiff {
904                    idx: op.container,
905                    diff: Diff::Counter(diff),
906                });
907            }
908        }
909
910        lamport += ops
911            .iter()
912            .map(|x| x.content_len() as Lamport)
913            .sum::<Lamport>();
914    }
915    ans
916}