Skip to main content

loro_internal/
txn.rs

1use std::{
2    borrow::Cow,
3    mem::take,
4    sync::{Arc, Weak},
5};
6
7use enum_as_inner::EnumAsInner;
8use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
9use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
10use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
11use rle::{HasLength, Mergable, RleVec, Sliceable};
12use rustc_hash::FxHashMap;
13use smallvec::{smallvec, SmallVec};
14
15use crate::{
16    change::{Change, Lamport, Timestamp},
17    container::{
18        idx::ContainerIdx,
19        list::list_op::{DeleteSpan, InnerListOp},
20        richtext::Style,
21        IntoContainerId,
22    },
23    delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
24    encoding::export_fast_updates_in_range,
25    event::{Diff, ListDeltaMeta, TextDiff},
26    handler::{Handler, ValueOrHandler},
27    id::{Counter, PeerID, ID},
28    lock::{LoroMutex, LoroMutexGuard},
29    loro::CommitOptions,
30    op::{Op, RawOp, RawOpContent},
31    pre_commit::{ChangeModifier, PreCommitCallbackPayload},
32    span::HasIdSpan,
33    version::Frontiers,
34    ChangeMeta, InternalString, LoroDoc, LoroDocInner, LoroError, LoroValue,
35};
36
37use super::{
38    arena::SharedArena,
39    event::{InternalContainerDiff, InternalDocDiff},
40    handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
41    oplog::OpLog,
42    state::DocState,
43};
44
45impl crate::LoroDoc {
    /// Create a new transaction with an empty origin.
    ///
    /// Every op created inside one transaction will be packed into a single
    /// [Change].
    ///
    /// There can only be one active transaction at a time for a [LoroDoc].
    ///
    /// # Errors
    ///
    /// Forwards whatever error `txn_with_origin("")` returns.
    #[inline(always)]
    pub fn txn(&self) -> Result<Transaction, LoroError> {
        self.txn_with_origin("")
    }
55
56    /// Create a new transaction with specified origin.
57    ///
58    /// The origin will be propagated to the events.
59    /// There can only be one active transaction at a time for a [LoroDoc].
60    pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
61        if !self.can_edit() {
62            return Err(LoroError::TransactionError(
63                String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
64            ));
65        }
66
67        let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into())?;
68
69        let obs = self.observer.clone();
70        let local_update_subs_weak = self.local_update_subs.downgrade();
71        txn.set_on_commit(Box::new(move |state, oplog, id_span| {
72            let mut state = state.lock();
73            let events = state.take_events();
74            drop(state);
75            for event in events {
76                obs.emit(event);
77            }
78
79            if id_span.atom_len() == 0 {
80                return;
81            }
82
83            if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
84                if !local_update_subs.inner().is_empty() {
85                    let bytes = { export_fast_updates_in_range(&oplog.lock(), &[id_span]) };
86                    local_update_subs.emit(&(), bytes);
87                }
88            }
89        }));
90
91        Ok(txn)
92    }
93
94    pub fn start_auto_commit(&self) {
95        self.auto_commit
96            .store(true, std::sync::atomic::Ordering::Release);
97        let mut self_txn = self.txn.lock();
98        if self_txn.is_some() || !self.can_edit() {
99            return;
100        }
101
102        let txn = self
103            .txn()
104            .expect("auto-commit should be able to create a transaction");
105        self_txn.replace(txn);
106    }
107
108    #[inline]
109    pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
110        if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
111            let mut self_txn = self.txn.lock();
112            if self_txn.is_some() {
113                return;
114            }
115
116            let mut txn = self
117                .txn()
118                .expect("auto-commit should be able to renew a transaction");
119            if let Some(options) = options {
120                txn.set_options(options);
121            }
122            self_txn.replace(txn);
123        }
124    }
125
126    #[inline]
127    pub(crate) fn _renew_txn_if_auto_commit_with_guard(
128        &self,
129        options: Option<CommitOptions>,
130        mut guard: LoroMutexGuard<Option<Transaction>>,
131    ) {
132        if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
133            if guard.is_some() {
134                return;
135            }
136
137            let mut txn = self
138                .txn()
139                .expect("auto-commit should be able to renew a transaction");
140            if let Some(options) = options {
141                txn.set_options(options);
142            }
143            guard.replace(txn);
144        }
145    }
146}
147
/// Callback stored on a [`Transaction`] through `set_on_commit`.
///
/// `Transaction::_commit` invokes it at most once, after a non-empty change
/// has been imported and the state and oplog guards have been dropped. The
/// arguments are the doc state, the oplog and the span returned by
/// `Transaction::id_span`.
pub(crate) type OnCommitFn =
    Box<dyn FnOnce(&Arc<LoroMutex<DocState>>, &Arc<LoroMutex<OpLog>>, IdSpan) + Sync + Send>;
150
/// A batch of local ops that is turned into a single `Change` when it is
/// committed. Dropping an unfinished transaction commits it (see the `Drop`
/// impl).
pub struct Transaction {
    /// Peer id read from the doc state when the transaction was created.
    peer: PeerID,
    /// Origin attached to the pre-commit payload and to the diff emitted on commit.
    origin: InternalString,
    /// Counter of the first op of this transaction; used as the change id's counter.
    start_counter: Counter,
    /// Counter the next op will get; advanced by the op length in `apply_local_op`.
    next_counter: Counter,
    /// Lamport of the first op of this transaction; used as the change's lamport.
    start_lamport: Lamport,
    /// Lamport the next op will get; advanced by the op length in `apply_local_op`.
    next_lamport: Lamport,
    /// Weak handle to the owning doc; a commit is a no-op once it cannot be upgraded.
    doc: Weak<LoroDocInner>,
    /// State frontiers captured at creation; moved into the change's `deps` on commit.
    frontiers: Frontiers,
    /// Ops recorded so far, in application order.
    local_ops: RleVec<[Op; 1]>, // TODO: use a more efficient data structure
    /// Event hints recorded per container, merged with the previous hint when possible.
    event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
    /// Arena cloned from the doc state at creation.
    pub(super) arena: SharedArena,
    /// Set once `_commit` has started doing its work; later commits return early.
    finished: bool,
    /// Callback run at the end of a successful, non-empty commit.
    on_commit: Option<OnCommitFn>,
    /// Explicit timestamp for the change, if any.
    timestamp: Option<Timestamp>,
    /// Commit message for the change, if any.
    msg: Option<Arc<str>>,
    /// Greatest timestamp reported by the oplog for `frontiers` at creation;
    /// the committed change's timestamp is never smaller than this.
    latest_timestamp: Timestamp,
    /// Set by `apply_local_op` when the oplog DAG's latest version vector did
    /// not contain `peer` at the time an op was applied.
    pub(super) is_peer_first_appearance: bool,
}
170
171impl std::fmt::Debug for Transaction {
172    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173        f.debug_struct("Transaction")
174            .field("peer", &self.peer)
175            .field("origin", &self.origin)
176            .field("start_counter", &self.start_counter)
177            .field("next_counter", &self.next_counter)
178            .field("start_lamport", &self.start_lamport)
179            .field("next_lamport", &self.next_lamport)
180            .field("frontiers", &self.frontiers)
181            .field("local_ops", &self.local_ops)
182            .field("event_hints", &self.event_hints)
183            .field("arena", &self.arena)
184            .field("finished", &self.finished)
185            .field("on_commit", &self.on_commit.is_some())
186            .field("timestamp", &self.timestamp)
187            .finish()
188    }
189}
190
/// We can infer local events directly from the local behavior. This enum is used to
/// record them, so that we can avoid recalculating them when we commit the transaction.
///
/// For example, when we insert a text in wasm, users use the utf16 index to send the
/// command. However, internally loro will convert it to unicode index. But the users
/// still need events that are in utf16 index. To avoid the round trip, we record the
/// events here.
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EventHint {
    /// A style applied to a text range. On commit it becomes a text diff that
    /// retains `start` items and then retains `end - start` items carrying `style`.
    Mark {
        start: u32,
        end: u32,
        style: Style,
    },
    /// A text insertion. Counts for `unicode_len` atoms.
    InsertText {
        /// pos is a Unicode index. If wasm, it's a UTF-16 index.
        pos: u32,
        /// Length in event units; `pos + event_len` is compared with the next
        /// hint's `pos` to decide whether two insertions are contiguous.
        event_len: u32,
        /// Length reported by `rle_len` for this hint.
        unicode_len: u32,
        /// Styles attached to the inserted text in the emitted diff.
        styles: StyleMeta,
    },
    /// pos is a Unicode index. If wasm, it's a UTF-16 index.
    ///
    /// A text deletion. The emitted diff retains `span.start()` and deletes
    /// `span.len()`; `unicode_len` is the length reported by `rle_len`.
    DeleteText {
        span: DeleteSpan,
        unicode_len: usize,
    },
    /// A list insertion of `len` items; `pos` is the index the emitted diff starts at.
    InsertList {
        len: u32,
        pos: usize,
    },
    /// Counts as a single atom.
    // NOTE(review): presumably "set the item at `index` to `value`" — confirm
    // against the diff generation, which is outside this view.
    SetList {
        index: usize,
        value: LoroValue,
    },
    /// Counts as a single atom.
    // NOTE(review): presumably "move `value` from index `from` to index `to`" — confirm.
    Move {
        value: LoroValue,
        from: u32,
        to: u32,
    },
    /// A list deletion. The emitted diff retains `start()` and deletes `len()` of the span.
    DeleteList(DeleteSpan),
    /// A map entry update for `key`.
    // NOTE(review): `value: None` presumably means the key was deleted — confirm.
    Map {
        key: InternalString,
        value: Option<LoroValue>,
    },
    // use vec because we could bring back some node that has children
    /// Tree diff items that are forwarded into the emitted `TreeDiff`.
    Tree(SmallVec<[TreeDiffItem; 1]>),
    /// Counts as a single atom.
    MarkEnd,
    /// Counts as a single atom. Only available with the `counter` feature.
    #[cfg(feature = "counter")]
    Counter(f64),
}
241
242impl generic_btree::rle::HasLength for EventHint {
243    fn rle_len(&self) -> usize {
244        match self {
245            EventHint::Mark { .. } => 1,
246            EventHint::InsertText {
247                unicode_len: len, ..
248            } => *len as usize,
249            EventHint::DeleteText { unicode_len, .. } => *unicode_len,
250            EventHint::InsertList { len, .. } => *len as usize,
251            EventHint::DeleteList(d) => d.len(),
252            EventHint::Map { .. } => 1,
253            EventHint::Tree(_) => 1,
254            EventHint::MarkEnd => 1,
255            EventHint::Move { .. } => 1,
256            EventHint::SetList { .. } => 1,
257            #[cfg(feature = "counter")]
258            EventHint::Counter(_) => 1,
259        }
260    }
261}
262
263impl generic_btree::rle::Mergeable for EventHint {
264    fn can_merge(&self, rhs: &Self) -> bool {
265        match (self, rhs) {
266            (
267                EventHint::InsertText {
268                    pos,
269                    unicode_len: _,
270                    event_len,
271                    styles,
272                },
273                EventHint::InsertText {
274                    pos: r_pos,
275                    styles: r_styles,
276                    ..
277                },
278            ) => *pos + *event_len == *r_pos && styles == r_styles,
279            (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
280                pos + *len as usize == *pos_right
281            }
282            // We don't merge delete text because it's hard to infer the correct pos to split:
283            // `range` param is in unicode range, but the delete text event is in UTF-16 range.
284            // Without the original text, it's impossible to convert the range.
285            (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
286                span.is_mergable(r, &())
287            }
288            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
289            _ => false,
290        }
291    }
292
293    fn merge_right(&mut self, rhs: &Self) {
294        match (self, rhs) {
295            (
296                EventHint::InsertText {
297                    event_len,
298                    unicode_len: len,
299                    ..
300                },
301                EventHint::InsertText {
302                    event_len: r_event_len,
303                    unicode_len: r_len,
304                    ..
305                },
306            ) => {
307                *len += *r_len;
308                *event_len += *r_event_len;
309            }
310            (
311                EventHint::InsertList { len, pos: _ },
312                EventHint::InsertList { len: r_len, pos: _ },
313            ) => *len += *r_len,
314            (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
315            (
316                EventHint::DeleteText { span, unicode_len },
317                EventHint::DeleteText {
318                    span: r_span,
319                    unicode_len: r_len,
320                },
321            ) => {
322                *unicode_len += *r_len;
323                span.merge(r_span, &());
324            }
325            _ => unreachable!(),
326        }
327    }
328
329    fn merge_left(&mut self, _: &Self) {
330        unreachable!()
331    }
332}
333
334impl Transaction {
335    #[inline]
336    pub fn new(doc: Arc<LoroDocInner>) -> LoroResult<Self> {
337        Self::new_with_origin(doc.clone(), "".into())
338    }
339
340    pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> LoroResult<Self> {
341        let oplog_lock = doc.oplog.lock();
342        let mut state_lock = doc.state.lock();
343        if state_lock.is_in_txn() {
344            return Err(LoroError::DuplicatedTransactionError);
345        }
346
347        state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
348        let arena = state_lock.arena.clone();
349        let frontiers = state_lock.frontiers.clone();
350        let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
351        let next_counter = oplog_lock.next_id(peer).counter;
352        let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
353        let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
354        if let Err(err) =
355            oplog_lock.check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
356        {
357            state_lock.abort_txn();
358            return Err(err);
359        }
360        drop(state_lock);
361        drop(oplog_lock);
362        Ok(Self {
363            peer,
364            doc: Arc::downgrade(&doc),
365            arena,
366            frontiers,
367            timestamp: None,
368            next_counter,
369            next_lamport,
370            origin: Default::default(),
371            start_counter: next_counter,
372            start_lamport: next_lamport,
373            event_hints: Default::default(),
374            local_ops: RleVec::new(),
375            finished: false,
376            on_commit: None,
377            msg: None,
378            latest_timestamp,
379            is_peer_first_appearance: false,
380        })
381    }
382
    /// Replace the origin that `_commit` attaches to the pre-commit payload
    /// and to the emitted diff.
    pub fn set_origin(&mut self, origin: InternalString) {
        self.origin = origin;
    }

    /// Set an explicit timestamp for the change. On commit the change uses
    /// the larger of this value and the greatest timestamp captured when the
    /// transaction was created.
    pub fn set_timestamp(&mut self, time: Timestamp) {
        self.timestamp = Some(time);
    }

    /// Set or clear the commit message stored with the change.
    pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
        self.msg = msg;
    }

    /// Ops recorded in this transaction so far.
    pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
        &self.local_ops
    }

    /// Peer id this transaction creates ops for.
    pub fn peer(&self) -> &PeerID {
        &self.peer
    }

    /// The explicit timestamp, if one has been set.
    pub fn timestamp(&self) -> &Option<Timestamp> {
        &self.timestamp
    }

    /// Frontiers captured when the transaction was created. `_commit` moves
    /// them into the change's deps, leaving the default value behind.
    pub fn frontiers(&self) -> &Frontiers {
        &self.frontiers
    }
    /// The commit message, if one has been set.
    pub fn msg(&self) -> &Option<Arc<str>> {
        &self.msg
    }

    /// Lamport of the first op of this transaction (`start_lamport`), not the
    /// lamport the next op will receive.
    pub fn lamport(&self) -> &Lamport {
        &self.start_lamport
    }

    /// Install the callback run at the end of a successful, non-empty commit,
    /// replacing any previously installed one.
    pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
        self.on_commit = Some(f);
    }

    /// Remove and return the installed on-commit callback, if any.
    pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
        self.on_commit.take()
    }
425
    /// Commit the transaction, consuming it.
    ///
    /// Returns `Ok(Some(options))` when the transaction recorded no ops (the
    /// options are rebuilt from its origin, message and timestamp), and
    /// `Ok(None)` otherwise.
    ///
    /// # Errors
    ///
    /// Forwards any error from `_commit`.
    pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
        self._commit()
    }
429
    /// Turn the recorded ops into a `Change`, import it into the oplog,
    /// commit the state transaction and run the on-commit callback.
    ///
    /// Returns:
    /// - `Ok(None)` if the transaction already finished, the doc has been
    ///   dropped, or a non-empty change was committed;
    /// - `Ok(Some(options))` if no ops were recorded; the state transaction
    ///   is aborted and the options rebuilt by `take_options` are returned.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the uncommitted change is missing from
    /// the oplog after the pre-commit subscribers ran, or the error from
    /// `import_local_change`. In both cases the state transaction is aborted
    /// first.
    #[tracing::instrument(level = "debug", skip(self))]
    fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
        // Committing twice is a no-op.
        if self.finished {
            return Ok(None);
        }

        // Nothing can be committed once the doc is gone. Note that `finished`
        // stays false on this path.
        let Some(doc) = self.doc.upgrade() else {
            return Ok(None);
        };
        self.finished = true;
        if self.local_ops.is_empty() {
            // Empty transaction: leave transaction mode and hand the options
            // back to the caller.
            let mut state = doc.state.lock();
            state.abort_txn();
            return Ok(Some(self.take_options()));
        }

        let ops = std::mem::take(&mut self.local_ops);
        let deps = take(&mut self.frontiers);
        let change = Change {
            lamport: self.start_lamport,
            ops,
            deps,
            id: ID::new(self.peer, self.start_counter),
            // Never go below the greatest timestamp captured at creation. The
            // oplog is only locked (briefly) when no explicit timestamp was set.
            timestamp: self.latest_timestamp.max(
                self.timestamp
                    .unwrap_or_else(|| doc.oplog.lock().get_timestamp_for_next_txn()),
            ),
            commit_msg: take(&mut self.msg),
        };

        let change_meta = ChangeMeta::from_change(&change);
        {
            // add change to uncommit field of oplog
            // The guard is scoped so the oplog is unlocked again before the
            // pre-commit subscribers are called.
            let mut oplog = doc.oplog.lock();
            oplog.set_uncommitted_change(change);
        }

        // A clone of `modifier` travels with the payload; the original is
        // applied to the change below.
        // NOTE(review): presumably the clone shares state with the original so
        // subscribers can adjust the change — confirm in `pre_commit`.
        let modifier = ChangeModifier::default();
        doc.pre_commit_subs.emit(
            &(),
            PreCommitCallbackPayload {
                change_meta,
                origin: self.origin.to_string(),
                modifier: modifier.clone(),
            },
        );

        // Lock order: oplog first, then state.
        let mut oplog = doc.oplog.lock();
        let mut state = doc.state.lock();

        let Some(mut change) = oplog.uncommitted_change.take() else {
            state.abort_txn();
            drop(state);
            drop(oplog);
            return Err(LoroError::internal(
                "missing uncommitted change while committing transaction",
            ));
        };
        modifier.modify_change(&mut change);
        // The diff is only computed when the state is recording events.
        let diff = if state.is_recording() {
            Some(change_to_diff(
                &change,
                doc.clone(),
                std::mem::take(&mut self.event_hints),
            ))
        } else {
            None
        };

        // Remember the last id before the change is moved into the oplog.
        let last_id = change.id_last();
        if let Err(err) = oplog.import_local_change(change) {
            state.abort_txn();
            drop(state);
            drop(oplog);
            return Err(err);
        }

        // The state's new frontiers are the last op id of the change.
        state.commit_txn(
            Frontiers::from_id(last_id),
            diff.map(|arr| InternalDocDiff {
                by: crate::event::EventTriggerKind::Local,
                origin: self.origin.clone(),
                diff: Cow::Owned(
                    arr.into_iter()
                        .map(|x| InternalContainerDiff {
                            idx: x.idx,
                            bring_back: false,
                            diff: (x.diff.into()),
                            diff_mode: crate::diff_calc::DiffMode::Linear,
                        })
                        .collect(),
                ),
                new_version: Cow::Borrowed(oplog.frontiers()),
            }),
        );
        // Release both guards before running the callback, which locks the
        // state and the oplog itself.
        drop(state);
        drop(oplog);
        if let Some(on_commit) = self.on_commit.take() {
            assert!(!doc.txn.is_locked());
            on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
        }
        Ok(None)
    }
533
534    fn take_options(&self) -> CommitOptions {
535        let mut options = CommitOptions::new();
536        if !self.origin.is_empty() {
537            options = options.origin(self.origin.as_str());
538        }
539        if let Some(msg) = self.msg.as_ref() {
540            options = options.commit_msg(msg);
541        }
542        if let Some(timestamp) = self.timestamp {
543            options = options.timestamp(timestamp);
544        }
545        options
546    }
547
    /// Apply one local op to the doc state and record it in this transaction.
    ///
    /// The op gets the transaction's next id and lamport, is applied to the
    /// state, the oplog DAG version info and the state frontiers are updated,
    /// and `event` is stored (merged into the previous hint of the same
    /// container when possible) for the diff built at commit time.
    ///
    /// # Errors
    ///
    /// - [`LoroError::UnmatchedContext`] if `doc` does not share its state
    ///   with the doc this transaction was created for.
    /// - [`LoroError::ContainerDeleted`] if the state reports `container` as deleted.
    /// - Any error returned by the state's `apply_local_op`.
    ///
    /// # Panics
    ///
    /// Panics if the owning doc has been dropped, if `content` has zero
    /// length, or if the deleted container's id cannot be resolved. In debug
    /// builds it also panics when the hint length differs from the op's
    /// atom length.
    pub(super) fn apply_local_op(
        &mut self,
        container: ContainerIdx,
        content: RawOpContent,
        event: EventHint,
        // check whether context and txn are referring to the same state context
        doc: &LoroDoc,
    ) -> LoroResult<()> {
        // TODO: need to check if the doc is the same
        // NOTE(review): the pointer comparison below already checks that both
        // docs share the same state; confirm whether this TODO is still needed.
        let this_doc = self.doc.upgrade().unwrap();
        if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
            return Err(LoroError::UnmatchedContext {
                expected: this_doc
                    .state
                    .lock()
                    .peer
                    .load(std::sync::atomic::Ordering::Relaxed),
                found: doc
                    .state
                    .lock()
                    .peer
                    .load(std::sync::atomic::Ordering::Relaxed),
            });
        }

        let len = content.content_len();
        assert!(len > 0);
        let raw_op = RawOp {
            id: ID {
                peer: self.peer,
                counter: self.next_counter,
            },
            lamport: self.next_lamport,
            container,
            content,
        };

        // Lock order: oplog first, then state.
        let mut oplog = doc.oplog.lock();
        let mut state = doc.state.lock();
        if state.is_deleted(container) {
            return Err(LoroError::ContainerDeleted {
                container: Box::new(state.arena.idx_to_id(container).unwrap()),
            });
        }

        let op = self.arena.convert_raw_op(&raw_op);
        state.apply_local_op(&raw_op, &op)?;
        {
            // Remember whether this peer was unknown to the oplog DAG before
            // this transaction touched it.
            if !self.is_peer_first_appearance && !oplog.dag.latest_vv_contains_peer(self.peer) {
                self.is_peer_first_appearance = true;
            }
            // update version info
            // The first op of the transaction depends on the frontiers
            // captured at creation; every later op depends on the previous op
            // of this transaction.
            let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
            let start_id = ID::new(self.peer, self.next_counter);
            self.next_counter += len as Counter;
            oplog.dag.update_version_on_new_local_op(
                if self.local_ops.is_empty() {
                    &self.frontiers
                } else {
                    &dep_id
                },
                start_id,
                self.next_lamport,
                len,
            );
            oplog.refresh_visible_op_count();
            self.next_lamport += len as Lamport;
            // set frontiers to the last op id
            let last_id = start_id.inc(len as Counter - 1);
            state.frontiers = Frontiers::from_id(last_id);
        };
        drop(state);
        drop(oplog);
        // The hint must describe exactly as many atoms as the op it belongs to.
        debug_assert_eq!(
            event.rle_len(),
            op.atom_len(),
            "event:{:#?} \nop:{:#?}",
            &event,
            &op
        );

        let container_hints = self.event_hints.entry(container).or_default();

        // Extend the previous hint of this container when the new one is
        // mergeable; otherwise store it as a new hint.
        match container_hints.last_mut() {
            Some(last) if last.can_merge(&event) => {
                last.merge_right(&event);
            }
            _ => {
                container_hints.push(event);
            }
        }
        self.local_ops.push(op);
        Ok(())
    }
642
643    /// id can be a str, ContainerID, or ContainerIdRaw.
644    /// if it's str it will use Root container, which will not be None
645    pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
646        let id = id.into_container_id(&self.arena, ContainerType::Text);
647        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
648            .into_text()
649            .unwrap()
650    }
651
652    /// id can be a str, ContainerID, or ContainerIdRaw.
653    /// if it's str it will use Root container, which will not be None
654    pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
655        let id = id.into_container_id(&self.arena, ContainerType::List);
656        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
657            .into_list()
658            .unwrap()
659    }
660
661    /// id can be a str, ContainerID, or ContainerIdRaw.
662    /// if it's str it will use Root container, which will not be None
663    pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
664        let id = id.into_container_id(&self.arena, ContainerType::Map);
665        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
666            .into_map()
667            .unwrap()
668    }
669
670    /// id can be a str, ContainerID, or ContainerIdRaw.
671    /// if it's str it will use Root container, which will not be None
672    pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
673        let id = id.into_container_id(&self.arena, ContainerType::Tree);
674        Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
675            .into_tree()
676            .unwrap()
677    }
    /// Id (peer + counter) that the next op applied in this transaction will get.
    pub fn next_id(&self) -> ID {
        ID {
            peer: self.peer,
            counter: self.next_counter,
        }
    }

    /// Span built from this transaction's peer, its start counter and its
    /// next counter, i.e. the counters handed out so far.
    #[inline]
    pub fn id_span(&self) -> IdSpan {
        IdSpan::new(self.peer, self.start_counter, self.next_counter)
    }

    /// Peer + lamport pair that the next op applied in this transaction will get.
    pub fn next_idlp(&self) -> IdLp {
        IdLp {
            peer: self.peer,
            lamport: self.next_lamport,
        }
    }

    /// Whether no op has been recorded in this transaction.
    pub fn is_empty(&self) -> bool {
        self.local_ops.is_empty()
    }

    /// Number of counters handed out so far (`next_counter - start_counter`).
    pub(crate) fn len(&self) -> usize {
        (self.next_counter - self.start_counter) as usize
    }
704
705    pub(crate) fn set_options(&mut self, options: CommitOptions) {
706        self.origin = options.origin.unwrap_or_default();
707        self.msg = options.commit_msg;
708        self.timestamp = options.timestamp;
709    }
710
711    pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
712        if self.origin.is_empty() {
713            self.origin = default_options.origin.unwrap_or_default();
714        }
715        if self.msg.is_none() {
716            self.msg = default_options.commit_msg;
717        }
718        if self.timestamp.is_none() {
719            self.timestamp = default_options.timestamp;
720        }
721    }
722}
723
724impl Drop for Transaction {
725    #[tracing::instrument(level = "debug", skip(self))]
726    fn drop(&mut self) {
727        if !self.finished {
728            // TODO: should we abort here or commit here?
729            // what if commit fails?
730            self._commit().unwrap();
731        }
732    }
733}
734
/// One diff produced for a single container while converting a committed
/// change into events (see `change_to_diff`).
#[derive(Debug, Clone)]
pub(crate) struct TxnContainerDiff {
    /// Container the diff applies to.
    pub(crate) idx: ContainerIdx,
    /// The diff itself.
    pub(crate) diff: Diff,
}
740
741// PERF: could be compacter
742fn change_to_diff(
743    change: &Change,
744    doc: Arc<LoroDocInner>,
745    event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
746) -> Vec<TxnContainerDiff> {
747    let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
748    let peer = change.id.peer;
749    let mut lamport = change.lamport;
750
751    // Group ops by container first to match our new structure
752    let mut ops_by_container: FxHashMap<ContainerIdx, Vec<Op>> = FxHashMap::default();
753    for op in change.ops.iter() {
754        ops_by_container
755            .entry(op.container)
756            .or_default()
757            .push(op.clone());
758    }
759
760    // Process each container's hints and ops together
761    for (container_idx, hints) in event_hints {
762        let Some(container_ops) = ops_by_container.get_mut(&container_idx) else {
763            continue;
764        };
765
766        let mut op_index = 0;
767        let mut hint_iter = hints.into_iter();
768        let mut current_hint = hint_iter.next();
769
770        while op_index < container_ops.len() {
771            let Some(hint) = current_hint.take() else {
772                unreachable!("Missing hint for op");
773            };
774
775            // Collect ops that belong to this hint
776            let mut ops_for_hint: SmallVec<[Op; 1]> = smallvec![container_ops[op_index].clone()];
777            let mut total_len = container_ops[op_index].atom_len();
778
779            // If hint spans multiple ops, collect them
780            while total_len < hint.rle_len() {
781                op_index += 1;
782                let next_op_len = container_ops[op_index].atom_len();
783                let op = if next_op_len + total_len > hint.rle_len() {
784                    let new_len = hint.rle_len() - total_len;
785                    let left = container_ops[op_index].slice(0, new_len);
786                    let right = container_ops[op_index].slice(new_len, next_op_len);
787                    container_ops[op_index] = right;
788                    op_index -= 1;
789                    left
790                } else {
791                    container_ops[op_index].clone()
792                };
793
794                total_len += op.atom_len();
795                ops_for_hint.push(op);
796            }
797
798            op_index += 1;
799            assert_eq!(total_len, hint.rle_len(), "Op/hint length mismatch");
800
801            // Move to next hint
802            current_hint = hint_iter.next();
803
804            // Generate diff based on hint type
805            match hint {
806                EventHint::Mark { start, end, style } => {
807                    let mut meta = StyleMeta::default();
808                    meta.insert(
809                        style.key.clone(),
810                        StyleMetaItem {
811                            lamport,
812                            peer: change.id.peer,
813                            value: style.data,
814                        },
815                    );
816                    let diff = DeltaRopeBuilder::new()
817                        .retain(start as usize, Default::default())
818                        .retain(
819                            (end - start) as usize,
820                            meta.to_option_map().unwrap_or_default().into(),
821                        )
822                        .build();
823                    ans.push(TxnContainerDiff {
824                        idx: container_idx,
825                        diff: Diff::Text(diff),
826                    });
827                }
828                EventHint::InsertText { styles, pos, .. } => {
829                    let mut delta: TextDiff = DeltaRopeBuilder::new()
830                        .retain(pos as usize, Default::default())
831                        .build();
832                    for op in ops_for_hint.iter() {
833                        let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
834                        else {
835                            unreachable!()
836                        };
837
838                        delta.push_insert(
839                            slice.clone().into(),
840                            styles.to_option_map().unwrap_or_default().into(),
841                        );
842                    }
843                    ans.push(TxnContainerDiff {
844                        idx: container_idx,
845                        diff: Diff::Text(delta),
846                    })
847                }
848                EventHint::DeleteText {
849                    span,
850                    unicode_len: _,
851                    // we don't need to iter over ops here, because we already
852                    // know what the events should be
853                } => ans.push(TxnContainerDiff {
854                    idx: container_idx,
855                    diff: Diff::Text(
856                        DeltaRopeBuilder::new()
857                            .retain(span.start() as usize, Default::default())
858                            .delete(span.len())
859                            .build(),
860                    ),
861                }),
862                EventHint::InsertList { pos, .. } => {
863                    // We should use pos from event hint because index in op may
864                    // be using op index for the MovableList
865                    let mut index = pos;
866                    for op in ops_for_hint.iter() {
867                        let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
868                        let values = doc
869                            .arena
870                            .get_values(range.to_range())
871                            .into_iter()
872                            .map(|v| ValueOrHandler::from_value(v, &doc));
873                        let len = values.len();
874                        ans.push(TxnContainerDiff {
875                            idx: container_idx,
876                            diff: Diff::List(
877                                DeltaRopeBuilder::new()
878                                    .retain(index, Default::default())
879                                    .insert_many(values, Default::default())
880                                    .build(),
881                            ),
882                        });
883                        index += len;
884                    }
885                }
886                EventHint::DeleteList(s) => {
887                    ans.push(TxnContainerDiff {
888                        idx: container_idx,
889                        diff: Diff::List(
890                            DeltaRopeBuilder::new()
891                                .retain(s.start() as usize, Default::default())
892                                .delete(s.len())
893                                .build(),
894                        ),
895                    });
896                }
897                EventHint::Map { key, value } => ans.push(TxnContainerDiff {
898                    idx: container_idx,
899                    diff: Diff::Map(ResolvedMapDelta::new().with_entry(
900                        key,
901                        ResolvedMapValue {
902                            value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
903                            idlp: IdLp::new(peer, lamport),
904                        },
905                    )),
906                }),
907                EventHint::Tree(tree_diff) => {
908                    let mut diff = TreeDiff::default();
909                    diff.diff.extend(tree_diff.into_iter());
910                    ans.push(TxnContainerDiff {
911                        idx: container_idx,
912                        diff: Diff::Tree(diff),
913                    });
914                }
915                EventHint::Move { from, to, value } => {
916                    let mut a = DeltaRopeBuilder::new()
917                        .retain(from as usize, Default::default())
918                        .delete(1)
919                        .build();
920                    a.compose(
921                        &DeltaRopeBuilder::new()
922                            .retain(to as usize, Default::default())
923                            .insert(
924                                ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
925                                ListDeltaMeta { from_move: true },
926                            )
927                            .build(),
928                    );
929                    ans.push(TxnContainerDiff {
930                        idx: container_idx,
931                        diff: Diff::List(a),
932                    });
933                }
934                EventHint::SetList { index, value } => {
935                    ans.push(TxnContainerDiff {
936                        idx: container_idx,
937                        diff: Diff::List(
938                            DeltaRopeBuilder::new()
939                                .retain(index, Default::default())
940                                .delete(1)
941                                .insert(
942                                    ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
943                                    Default::default(),
944                                )
945                                .build(),
946                        ),
947                    });
948                }
949                EventHint::MarkEnd => {
950                    // do nothing
951                }
952                #[cfg(feature = "counter")]
953                EventHint::Counter(diff) => {
954                    ans.push(TxnContainerDiff {
955                        idx: container_idx,
956                        diff: Diff::Counter(diff),
957                    });
958                }
959            }
960
961            // Update lamport for this hint's operations
962            lamport += ops_for_hint
963                .iter()
964                .map(|x| x.content_len() as Lamport)
965                .sum::<Lamport>();
966        }
967    }
968
969    ans
970}
971
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cursor::PosType;
    use crate::version::Frontiers;

    /// Opening a transaction must fail, and must not leave the document
    /// state flagged as "in txn", when the peer id is reused after checking
    /// out to an older version in detached-editing mode.
    ///
    /// Steps:
    /// 1. Enable detached editing and commit one text insertion as peer 7.
    /// 2. Check out `Frontiers::default()` and set the peer id to 7 again.
    /// 3. `txn()` is expected to return
    ///    `LoroError::ConcurrentOpsWithSamePeerID { peer: 7, .. }`.
    /// 4. The app state must report that no transaction is active.
    #[test]
    fn txn_creation_rolls_back_in_txn_after_peer_conflict() {
        const PEER: PeerID = 7;

        let document = LoroDoc::new();
        document.set_detached_editing(true);
        document.set_peer_id(PEER).unwrap();

        // Produce one committed change authored by `PEER`.
        let text_handler = document.get_text("text");
        let mut first_txn = document.txn().unwrap();
        let inserted = text_handler.insert_with_txn(&mut first_txn, 0, "a", PosType::Unicode);
        inserted.unwrap();
        first_txn.commit().unwrap();

        // Go back to the default frontiers and claim the same peer id again.
        let initial_version = Frontiers::default();
        document.checkout(&initial_version).unwrap();
        document.set_peer_id(PEER).unwrap();

        // Starting a new transaction is expected to be rejected here.
        let second_attempt = document.txn();
        let conflict = second_attempt
            .expect_err("stale detached frontiers should reject reusing the same peer");
        assert!(matches!(
            conflict,
            LoroError::ConcurrentOpsWithSamePeerID { peer: 7, .. }
        ));

        // The failed attempt must not leave the state marked as inside a txn.
        let still_in_txn = document.app_state().lock().is_in_txn();
        assert!(!still_in_txn);
    }
}