1use std::{
2 borrow::Cow,
3 mem::take,
4 sync::{Arc, Weak},
5};
6
7use enum_as_inner::EnumAsInner;
8use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
9use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
10use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
11use rle::{HasLength, Mergable, RleVec, Sliceable};
12use rustc_hash::FxHashMap;
13use smallvec::{smallvec, SmallVec};
14
15use crate::{
16 change::{Change, Lamport, Timestamp},
17 container::{
18 idx::ContainerIdx,
19 list::list_op::{DeleteSpan, InnerListOp},
20 richtext::Style,
21 IntoContainerId,
22 },
23 delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
24 encoding::export_fast_updates_in_range,
25 event::{Diff, ListDeltaMeta, TextDiff},
26 handler::{Handler, ValueOrHandler},
27 id::{Counter, PeerID, ID},
28 lock::{LoroMutex, LoroMutexGuard},
29 loro::CommitOptions,
30 op::{Op, RawOp, RawOpContent},
31 pre_commit::{ChangeModifier, PreCommitCallbackPayload},
32 span::HasIdSpan,
33 version::Frontiers,
34 ChangeMeta, InternalString, LoroDoc, LoroDocInner, LoroError, LoroValue,
35};
36
37use super::{
38 arena::SharedArena,
39 event::{InternalContainerDiff, InternalDocDiff},
40 handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
41 oplog::OpLog,
42 state::DocState,
43};
44
45impl crate::LoroDoc {
46 #[inline(always)]
52 pub fn txn(&self) -> Result<Transaction, LoroError> {
53 self.txn_with_origin("")
54 }
55
56 pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
61 if !self.can_edit() {
62 return Err(LoroError::TransactionError(
63 String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
64 ));
65 }
66
67 let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into())?;
68
69 let obs = self.observer.clone();
70 let local_update_subs_weak = self.local_update_subs.downgrade();
71 txn.set_on_commit(Box::new(move |state, oplog, id_span| {
72 let mut state = state.lock();
73 let events = state.take_events();
74 drop(state);
75 for event in events {
76 obs.emit(event);
77 }
78
79 if id_span.atom_len() == 0 {
80 return;
81 }
82
83 if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
84 if !local_update_subs.inner().is_empty() {
85 let bytes = { export_fast_updates_in_range(&oplog.lock(), &[id_span]) };
86 local_update_subs.emit(&(), bytes);
87 }
88 }
89 }));
90
91 Ok(txn)
92 }
93
94 pub fn start_auto_commit(&self) {
95 self.auto_commit
96 .store(true, std::sync::atomic::Ordering::Release);
97 let mut self_txn = self.txn.lock();
98 if self_txn.is_some() || !self.can_edit() {
99 return;
100 }
101
102 let txn = self
103 .txn()
104 .expect("auto-commit should be able to create a transaction");
105 self_txn.replace(txn);
106 }
107
108 #[inline]
109 pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
110 if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
111 let mut self_txn = self.txn.lock();
112 if self_txn.is_some() {
113 return;
114 }
115
116 let mut txn = self
117 .txn()
118 .expect("auto-commit should be able to renew a transaction");
119 if let Some(options) = options {
120 txn.set_options(options);
121 }
122 self_txn.replace(txn);
123 }
124 }
125
126 #[inline]
127 pub(crate) fn _renew_txn_if_auto_commit_with_guard(
128 &self,
129 options: Option<CommitOptions>,
130 mut guard: LoroMutexGuard<Option<Transaction>>,
131 ) {
132 if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
133 if guard.is_some() {
134 return;
135 }
136
137 let mut txn = self
138 .txn()
139 .expect("auto-commit should be able to renew a transaction");
140 if let Some(options) = options {
141 txn.set_options(options);
142 }
143 guard.replace(txn);
144 }
145 }
146}
147
/// Hook invoked at most once after a transaction has been committed.
///
/// It receives the document state, the oplog and the id span covered by the
/// transaction. `_commit` calls it only after it has released its own locks
/// on the state and the oplog.
pub(crate) type OnCommitFn =
    Box<dyn FnOnce(&Arc<LoroMutex<DocState>>, &Arc<LoroMutex<OpLog>>, IdSpan) + Sync + Send>;
150
/// A batch of local operations that is packed into a single `Change` when it
/// is committed.
///
/// Dropping a transaction that has not been finished commits it.
pub struct Transaction {
    /// Peer that authors every op of this transaction.
    peer: PeerID,
    /// Origin attached to the pre-commit payload and to the emitted diff.
    origin: InternalString,
    /// Counter of the first op of this transaction.
    start_counter: Counter,
    /// Counter the next op will get; advanced by the op length in `apply_local_op`.
    next_counter: Counter,
    /// Lamport of the first op of this transaction.
    start_lamport: Lamport,
    /// Lamport the next op will get; advanced by the op length in `apply_local_op`.
    next_lamport: Lamport,
    /// Weak handle to the owning document; upgraded whenever the doc is needed.
    doc: Weak<LoroDocInner>,
    /// State frontiers captured at creation; they become the deps of the change.
    frontiers: Frontiers,
    /// Ops applied so far, in application order.
    local_ops: RleVec<[Op; 1]>,
    /// Event hints recorded per container, consumed by `change_to_diff` on commit.
    event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
    /// Arena cloned from the document state when the transaction was created.
    pub(super) arena: SharedArena,
    /// Set once `_commit` has started processing; makes later commits no-ops.
    finished: bool,
    /// Hook run after a non-empty commit succeeded.
    on_commit: Option<OnCommitFn>,
    /// Explicit timestamp for the change; the oplog supplies one when `None`.
    timestamp: Option<Timestamp>,
    /// Optional commit message stored on the change.
    msg: Option<Arc<str>>,
    /// Value of `oplog.get_greatest_timestamp(&frontiers)` at creation; used as
    /// the lower bound of the change timestamp.
    latest_timestamp: Timestamp,
    /// Set by `apply_local_op` when the oplog's latest version vector did not
    /// contain `peer` at the time an op was applied.
    pub(super) is_peer_first_appearance: bool,
}
170
171impl std::fmt::Debug for Transaction {
172 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173 f.debug_struct("Transaction")
174 .field("peer", &self.peer)
175 .field("origin", &self.origin)
176 .field("start_counter", &self.start_counter)
177 .field("next_counter", &self.next_counter)
178 .field("start_lamport", &self.start_lamport)
179 .field("next_lamport", &self.next_lamport)
180 .field("frontiers", &self.frontiers)
181 .field("local_ops", &self.local_ops)
182 .field("event_hints", &self.event_hints)
183 .field("arena", &self.arena)
184 .field("finished", &self.finished)
185 .field("on_commit", &self.on_commit.is_some())
186 .field("timestamp", &self.timestamp)
187 .finish()
188 }
189}
190
/// Information recorded next to each local op so that `change_to_diff` can
/// build the event diff without recomputing it from the op.
///
/// Hints are stored per container; adjacent hints are merged when
/// `can_merge` allows it.
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EventHint {
    /// A style applied to a text range; rendered as a retain of `start`
    /// followed by a styled retain of `end - start`.
    Mark {
        start: u32,
        end: u32,
        style: Style,
    },
    /// A text insertion.
    InsertText {
        /// Number of units retained before the inserted text in the diff.
        /// NOTE(review): presumably an event-index position — confirm with callers.
        pos: u32,
        /// Length added to `pos` when checking whether the next insertion is
        /// adjacent (see `can_merge`).
        event_len: u32,
        /// Length reported by `rle_len` for this hint.
        unicode_len: u32,
        /// Styles attached to the inserted text in the diff.
        styles: StyleMeta,
    },
    /// A text deletion.
    DeleteText {
        /// Span whose start and length are used for the diff.
        span: DeleteSpan,
        /// Length reported by `rle_len` for this hint.
        unicode_len: usize,
    },
    /// Insertion of `len` list items starting at index `pos`.
    InsertList {
        len: u32,
        pos: usize,
    },
    /// Replacement of the list item at `index` by `value`.
    SetList {
        index: usize,
        value: LoroValue,
    },
    /// Move of one list item from index `from` to index `to`; `value` is the
    /// item reported as inserted at the destination.
    Move {
        value: LoroValue,
        from: u32,
        to: u32,
    },
    /// Deletion of a span of list items.
    DeleteList(DeleteSpan),
    /// A map entry update; `value` is `None` when the entry carries no value.
    Map {
        key: InternalString,
        value: Option<LoroValue>,
    },
    /// Tree changes, forwarded as-is into a `TreeDiff`.
    Tree(SmallVec<[TreeDiffItem; 1]>),
    /// Hint for the op that closes a mark; it produces no diff of its own.
    MarkEnd,
    /// A counter change; the payload is forwarded as the counter diff.
    #[cfg(feature = "counter")]
    Counter(f64),
}
241
242impl generic_btree::rle::HasLength for EventHint {
243 fn rle_len(&self) -> usize {
244 match self {
245 EventHint::Mark { .. } => 1,
246 EventHint::InsertText {
247 unicode_len: len, ..
248 } => *len as usize,
249 EventHint::DeleteText { unicode_len, .. } => *unicode_len,
250 EventHint::InsertList { len, .. } => *len as usize,
251 EventHint::DeleteList(d) => d.len(),
252 EventHint::Map { .. } => 1,
253 EventHint::Tree(_) => 1,
254 EventHint::MarkEnd => 1,
255 EventHint::Move { .. } => 1,
256 EventHint::SetList { .. } => 1,
257 #[cfg(feature = "counter")]
258 EventHint::Counter(_) => 1,
259 }
260 }
261}
262
263impl generic_btree::rle::Mergeable for EventHint {
264 fn can_merge(&self, rhs: &Self) -> bool {
265 match (self, rhs) {
266 (
267 EventHint::InsertText {
268 pos,
269 unicode_len: _,
270 event_len,
271 styles,
272 },
273 EventHint::InsertText {
274 pos: r_pos,
275 styles: r_styles,
276 ..
277 },
278 ) => *pos + *event_len == *r_pos && styles == r_styles,
279 (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
280 pos + *len as usize == *pos_right
281 }
282 (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
286 span.is_mergable(r, &())
287 }
288 (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
289 _ => false,
290 }
291 }
292
293 fn merge_right(&mut self, rhs: &Self) {
294 match (self, rhs) {
295 (
296 EventHint::InsertText {
297 event_len,
298 unicode_len: len,
299 ..
300 },
301 EventHint::InsertText {
302 event_len: r_event_len,
303 unicode_len: r_len,
304 ..
305 },
306 ) => {
307 *len += *r_len;
308 *event_len += *r_event_len;
309 }
310 (
311 EventHint::InsertList { len, pos: _ },
312 EventHint::InsertList { len: r_len, pos: _ },
313 ) => *len += *r_len,
314 (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
315 (
316 EventHint::DeleteText { span, unicode_len },
317 EventHint::DeleteText {
318 span: r_span,
319 unicode_len: r_len,
320 },
321 ) => {
322 *unicode_len += *r_len;
323 span.merge(r_span, &());
324 }
325 _ => unreachable!(),
326 }
327 }
328
329 fn merge_left(&mut self, _: &Self) {
330 unreachable!()
331 }
332}
333
334impl Transaction {
335 #[inline]
336 pub fn new(doc: Arc<LoroDocInner>) -> LoroResult<Self> {
337 Self::new_with_origin(doc.clone(), "".into())
338 }
339
340 pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> LoroResult<Self> {
341 let oplog_lock = doc.oplog.lock();
342 let mut state_lock = doc.state.lock();
343 if state_lock.is_in_txn() {
344 return Err(LoroError::DuplicatedTransactionError);
345 }
346
347 state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
348 let arena = state_lock.arena.clone();
349 let frontiers = state_lock.frontiers.clone();
350 let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
351 let next_counter = oplog_lock.next_id(peer).counter;
352 let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
353 let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
354 if let Err(err) =
355 oplog_lock.check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
356 {
357 state_lock.abort_txn();
358 return Err(err);
359 }
360 drop(state_lock);
361 drop(oplog_lock);
362 Ok(Self {
363 peer,
364 doc: Arc::downgrade(&doc),
365 arena,
366 frontiers,
367 timestamp: None,
368 next_counter,
369 next_lamport,
370 origin: Default::default(),
371 start_counter: next_counter,
372 start_lamport: next_lamport,
373 event_hints: Default::default(),
374 local_ops: RleVec::new(),
375 finished: false,
376 on_commit: None,
377 msg: None,
378 latest_timestamp,
379 is_peer_first_appearance: false,
380 })
381 }
382
383 pub fn set_origin(&mut self, origin: InternalString) {
384 self.origin = origin;
385 }
386
387 pub fn set_timestamp(&mut self, time: Timestamp) {
388 self.timestamp = Some(time);
389 }
390
391 pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
392 self.msg = msg;
393 }
394
395 pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
396 &self.local_ops
397 }
398
399 pub fn peer(&self) -> &PeerID {
400 &self.peer
401 }
402
403 pub fn timestamp(&self) -> &Option<Timestamp> {
404 &self.timestamp
405 }
406
407 pub fn frontiers(&self) -> &Frontiers {
408 &self.frontiers
409 }
410 pub fn msg(&self) -> &Option<Arc<str>> {
411 &self.msg
412 }
413
414 pub fn lamport(&self) -> &Lamport {
415 &self.start_lamport
416 }
417
418 pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
419 self.on_commit = Some(f);
420 }
421
422 pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
423 self.on_commit.take()
424 }
425
426 pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
427 self._commit()
428 }
429
430 #[tracing::instrument(level = "debug", skip(self))]
431 fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
432 if self.finished {
433 return Ok(None);
434 }
435
436 let Some(doc) = self.doc.upgrade() else {
437 return Ok(None);
438 };
439 self.finished = true;
440 if self.local_ops.is_empty() {
441 let mut state = doc.state.lock();
442 state.abort_txn();
443 return Ok(Some(self.take_options()));
444 }
445
446 let ops = std::mem::take(&mut self.local_ops);
447 let deps = take(&mut self.frontiers);
448 let change = Change {
449 lamport: self.start_lamport,
450 ops,
451 deps,
452 id: ID::new(self.peer, self.start_counter),
453 timestamp: self.latest_timestamp.max(
454 self.timestamp
455 .unwrap_or_else(|| doc.oplog.lock().get_timestamp_for_next_txn()),
456 ),
457 commit_msg: take(&mut self.msg),
458 };
459
460 let change_meta = ChangeMeta::from_change(&change);
461 {
462 let mut oplog = doc.oplog.lock();
464 oplog.set_uncommitted_change(change);
465 }
466
467 let modifier = ChangeModifier::default();
468 doc.pre_commit_subs.emit(
469 &(),
470 PreCommitCallbackPayload {
471 change_meta,
472 origin: self.origin.to_string(),
473 modifier: modifier.clone(),
474 },
475 );
476
477 let mut oplog = doc.oplog.lock();
478 let mut state = doc.state.lock();
479
480 let Some(mut change) = oplog.uncommitted_change.take() else {
481 state.abort_txn();
482 drop(state);
483 drop(oplog);
484 return Err(LoroError::internal(
485 "missing uncommitted change while committing transaction",
486 ));
487 };
488 modifier.modify_change(&mut change);
489 let diff = if state.is_recording() {
490 Some(change_to_diff(
491 &change,
492 doc.clone(),
493 std::mem::take(&mut self.event_hints),
494 ))
495 } else {
496 None
497 };
498
499 let last_id = change.id_last();
500 if let Err(err) = oplog.import_local_change(change) {
501 state.abort_txn();
502 drop(state);
503 drop(oplog);
504 return Err(err);
505 }
506
507 state.commit_txn(
508 Frontiers::from_id(last_id),
509 diff.map(|arr| InternalDocDiff {
510 by: crate::event::EventTriggerKind::Local,
511 origin: self.origin.clone(),
512 diff: Cow::Owned(
513 arr.into_iter()
514 .map(|x| InternalContainerDiff {
515 idx: x.idx,
516 bring_back: false,
517 diff: (x.diff.into()),
518 diff_mode: crate::diff_calc::DiffMode::Linear,
519 })
520 .collect(),
521 ),
522 new_version: Cow::Borrowed(oplog.frontiers()),
523 }),
524 );
525 drop(state);
526 drop(oplog);
527 if let Some(on_commit) = self.on_commit.take() {
528 assert!(!doc.txn.is_locked());
529 on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
530 }
531 Ok(None)
532 }
533
534 fn take_options(&self) -> CommitOptions {
535 let mut options = CommitOptions::new();
536 if !self.origin.is_empty() {
537 options = options.origin(self.origin.as_str());
538 }
539 if let Some(msg) = self.msg.as_ref() {
540 options = options.commit_msg(msg);
541 }
542 if let Some(timestamp) = self.timestamp {
543 options = options.timestamp(timestamp);
544 }
545 options
546 }
547
548 pub(super) fn apply_local_op(
549 &mut self,
550 container: ContainerIdx,
551 content: RawOpContent,
552 event: EventHint,
553 doc: &LoroDoc,
555 ) -> LoroResult<()> {
556 let this_doc = self.doc.upgrade().unwrap();
558 if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
559 return Err(LoroError::UnmatchedContext {
560 expected: this_doc
561 .state
562 .lock()
563 .peer
564 .load(std::sync::atomic::Ordering::Relaxed),
565 found: doc
566 .state
567 .lock()
568 .peer
569 .load(std::sync::atomic::Ordering::Relaxed),
570 });
571 }
572
573 let len = content.content_len();
574 assert!(len > 0);
575 let raw_op = RawOp {
576 id: ID {
577 peer: self.peer,
578 counter: self.next_counter,
579 },
580 lamport: self.next_lamport,
581 container,
582 content,
583 };
584
585 let mut oplog = doc.oplog.lock();
586 let mut state = doc.state.lock();
587 if state.is_deleted(container) {
588 return Err(LoroError::ContainerDeleted {
589 container: Box::new(state.arena.idx_to_id(container).unwrap()),
590 });
591 }
592
593 let op = self.arena.convert_raw_op(&raw_op);
594 state.apply_local_op(&raw_op, &op)?;
595 {
596 if !self.is_peer_first_appearance && !oplog.dag.latest_vv_contains_peer(self.peer) {
597 self.is_peer_first_appearance = true;
598 }
599 let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
601 let start_id = ID::new(self.peer, self.next_counter);
602 self.next_counter += len as Counter;
603 oplog.dag.update_version_on_new_local_op(
604 if self.local_ops.is_empty() {
605 &self.frontiers
606 } else {
607 &dep_id
608 },
609 start_id,
610 self.next_lamport,
611 len,
612 );
613 oplog.refresh_visible_op_count();
614 self.next_lamport += len as Lamport;
615 let last_id = start_id.inc(len as Counter - 1);
617 state.frontiers = Frontiers::from_id(last_id);
618 };
619 drop(state);
620 drop(oplog);
621 debug_assert_eq!(
622 event.rle_len(),
623 op.atom_len(),
624 "event:{:#?} \nop:{:#?}",
625 &event,
626 &op
627 );
628
629 let container_hints = self.event_hints.entry(container).or_default();
630
631 match container_hints.last_mut() {
632 Some(last) if last.can_merge(&event) => {
633 last.merge_right(&event);
634 }
635 _ => {
636 container_hints.push(event);
637 }
638 }
639 self.local_ops.push(op);
640 Ok(())
641 }
642
643 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
646 let id = id.into_container_id(&self.arena, ContainerType::Text);
647 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
648 .into_text()
649 .unwrap()
650 }
651
652 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
655 let id = id.into_container_id(&self.arena, ContainerType::List);
656 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
657 .into_list()
658 .unwrap()
659 }
660
661 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
664 let id = id.into_container_id(&self.arena, ContainerType::Map);
665 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
666 .into_map()
667 .unwrap()
668 }
669
670 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
673 let id = id.into_container_id(&self.arena, ContainerType::Tree);
674 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
675 .into_tree()
676 .unwrap()
677 }
678 pub fn next_id(&self) -> ID {
679 ID {
680 peer: self.peer,
681 counter: self.next_counter,
682 }
683 }
684
685 #[inline]
686 pub fn id_span(&self) -> IdSpan {
687 IdSpan::new(self.peer, self.start_counter, self.next_counter)
688 }
689
690 pub fn next_idlp(&self) -> IdLp {
691 IdLp {
692 peer: self.peer,
693 lamport: self.next_lamport,
694 }
695 }
696
697 pub fn is_empty(&self) -> bool {
698 self.local_ops.is_empty()
699 }
700
701 pub(crate) fn len(&self) -> usize {
702 (self.next_counter - self.start_counter) as usize
703 }
704
705 pub(crate) fn set_options(&mut self, options: CommitOptions) {
706 self.origin = options.origin.unwrap_or_default();
707 self.msg = options.commit_msg;
708 self.timestamp = options.timestamp;
709 }
710
711 pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
712 if self.origin.is_empty() {
713 self.origin = default_options.origin.unwrap_or_default();
714 }
715 if self.msg.is_none() {
716 self.msg = default_options.commit_msg;
717 }
718 if self.timestamp.is_none() {
719 self.timestamp = default_options.timestamp;
720 }
721 }
722}
723
724impl Drop for Transaction {
725 #[tracing::instrument(level = "debug", skip(self))]
726 fn drop(&mut self) {
727 if !self.finished {
728 self._commit().unwrap();
731 }
732 }
733}
734
/// One event diff produced by a committed transaction for a single container.
#[derive(Debug, Clone)]
pub(crate) struct TxnContainerDiff {
    /// Container the diff applies to.
    pub(crate) idx: ContainerIdx,
    /// The change to report for that container.
    pub(crate) diff: Diff,
}
740
741fn change_to_diff(
743 change: &Change,
744 doc: Arc<LoroDocInner>,
745 event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
746) -> Vec<TxnContainerDiff> {
747 let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
748 let peer = change.id.peer;
749 let mut lamport = change.lamport;
750
751 let mut ops_by_container: FxHashMap<ContainerIdx, Vec<Op>> = FxHashMap::default();
753 for op in change.ops.iter() {
754 ops_by_container
755 .entry(op.container)
756 .or_default()
757 .push(op.clone());
758 }
759
760 for (container_idx, hints) in event_hints {
762 let Some(container_ops) = ops_by_container.get_mut(&container_idx) else {
763 continue;
764 };
765
766 let mut op_index = 0;
767 let mut hint_iter = hints.into_iter();
768 let mut current_hint = hint_iter.next();
769
770 while op_index < container_ops.len() {
771 let Some(hint) = current_hint.take() else {
772 unreachable!("Missing hint for op");
773 };
774
775 let mut ops_for_hint: SmallVec<[Op; 1]> = smallvec![container_ops[op_index].clone()];
777 let mut total_len = container_ops[op_index].atom_len();
778
779 while total_len < hint.rle_len() {
781 op_index += 1;
782 let next_op_len = container_ops[op_index].atom_len();
783 let op = if next_op_len + total_len > hint.rle_len() {
784 let new_len = hint.rle_len() - total_len;
785 let left = container_ops[op_index].slice(0, new_len);
786 let right = container_ops[op_index].slice(new_len, next_op_len);
787 container_ops[op_index] = right;
788 op_index -= 1;
789 left
790 } else {
791 container_ops[op_index].clone()
792 };
793
794 total_len += op.atom_len();
795 ops_for_hint.push(op);
796 }
797
798 op_index += 1;
799 assert_eq!(total_len, hint.rle_len(), "Op/hint length mismatch");
800
801 current_hint = hint_iter.next();
803
804 match hint {
806 EventHint::Mark { start, end, style } => {
807 let mut meta = StyleMeta::default();
808 meta.insert(
809 style.key.clone(),
810 StyleMetaItem {
811 lamport,
812 peer: change.id.peer,
813 value: style.data,
814 },
815 );
816 let diff = DeltaRopeBuilder::new()
817 .retain(start as usize, Default::default())
818 .retain(
819 (end - start) as usize,
820 meta.to_option_map().unwrap_or_default().into(),
821 )
822 .build();
823 ans.push(TxnContainerDiff {
824 idx: container_idx,
825 diff: Diff::Text(diff),
826 });
827 }
828 EventHint::InsertText { styles, pos, .. } => {
829 let mut delta: TextDiff = DeltaRopeBuilder::new()
830 .retain(pos as usize, Default::default())
831 .build();
832 for op in ops_for_hint.iter() {
833 let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
834 else {
835 unreachable!()
836 };
837
838 delta.push_insert(
839 slice.clone().into(),
840 styles.to_option_map().unwrap_or_default().into(),
841 );
842 }
843 ans.push(TxnContainerDiff {
844 idx: container_idx,
845 diff: Diff::Text(delta),
846 })
847 }
848 EventHint::DeleteText {
849 span,
850 unicode_len: _,
851 } => ans.push(TxnContainerDiff {
854 idx: container_idx,
855 diff: Diff::Text(
856 DeltaRopeBuilder::new()
857 .retain(span.start() as usize, Default::default())
858 .delete(span.len())
859 .build(),
860 ),
861 }),
862 EventHint::InsertList { pos, .. } => {
863 let mut index = pos;
866 for op in ops_for_hint.iter() {
867 let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
868 let values = doc
869 .arena
870 .get_values(range.to_range())
871 .into_iter()
872 .map(|v| ValueOrHandler::from_value(v, &doc));
873 let len = values.len();
874 ans.push(TxnContainerDiff {
875 idx: container_idx,
876 diff: Diff::List(
877 DeltaRopeBuilder::new()
878 .retain(index, Default::default())
879 .insert_many(values, Default::default())
880 .build(),
881 ),
882 });
883 index += len;
884 }
885 }
886 EventHint::DeleteList(s) => {
887 ans.push(TxnContainerDiff {
888 idx: container_idx,
889 diff: Diff::List(
890 DeltaRopeBuilder::new()
891 .retain(s.start() as usize, Default::default())
892 .delete(s.len())
893 .build(),
894 ),
895 });
896 }
897 EventHint::Map { key, value } => ans.push(TxnContainerDiff {
898 idx: container_idx,
899 diff: Diff::Map(ResolvedMapDelta::new().with_entry(
900 key,
901 ResolvedMapValue {
902 value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
903 idlp: IdLp::new(peer, lamport),
904 },
905 )),
906 }),
907 EventHint::Tree(tree_diff) => {
908 let mut diff = TreeDiff::default();
909 diff.diff.extend(tree_diff.into_iter());
910 ans.push(TxnContainerDiff {
911 idx: container_idx,
912 diff: Diff::Tree(diff),
913 });
914 }
915 EventHint::Move { from, to, value } => {
916 let mut a = DeltaRopeBuilder::new()
917 .retain(from as usize, Default::default())
918 .delete(1)
919 .build();
920 a.compose(
921 &DeltaRopeBuilder::new()
922 .retain(to as usize, Default::default())
923 .insert(
924 ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
925 ListDeltaMeta { from_move: true },
926 )
927 .build(),
928 );
929 ans.push(TxnContainerDiff {
930 idx: container_idx,
931 diff: Diff::List(a),
932 });
933 }
934 EventHint::SetList { index, value } => {
935 ans.push(TxnContainerDiff {
936 idx: container_idx,
937 diff: Diff::List(
938 DeltaRopeBuilder::new()
939 .retain(index, Default::default())
940 .delete(1)
941 .insert(
942 ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
943 Default::default(),
944 )
945 .build(),
946 ),
947 });
948 }
949 EventHint::MarkEnd => {
950 }
952 #[cfg(feature = "counter")]
953 EventHint::Counter(diff) => {
954 ans.push(TxnContainerDiff {
955 idx: container_idx,
956 diff: Diff::Counter(diff),
957 });
958 }
959 }
960
961 lamport += ops_for_hint
963 .iter()
964 .map(|x| x.content_len() as Lamport)
965 .sum::<Lamport>();
966 }
967 }
968
969 ans
970}
971
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{cursor::PosType, version::Frontiers};

    /// A transaction that is rejected because of a peer-id conflict must not
    /// leave the document state flagged as being inside a transaction.
    #[test]
    fn txn_creation_rolls_back_in_txn_after_peer_conflict() {
        let doc = LoroDoc::new();
        doc.set_detached_editing(true);
        doc.set_peer_id(7).unwrap();

        // Commit one op authored by peer 7.
        let text = doc.get_text("text");
        let mut txn = doc.txn().unwrap();
        text.insert_with_txn(&mut txn, 0, "a", PosType::Unicode)
            .unwrap();
        txn.commit().unwrap();

        // Go back to the empty version and claim peer 7 again.
        doc.checkout(&Frontiers::default()).unwrap();
        doc.set_peer_id(7).unwrap();

        let err = doc
            .txn()
            .expect_err("stale detached frontiers should reject reusing the same peer");
        assert!(matches!(
            err,
            LoroError::ConcurrentOpsWithSamePeerID { peer: 7, .. }
        ));
        // The failed creation must have undone `start_txn`.
        assert!(!doc.app_state().lock().is_in_txn());
    }
}