1use core::panic;
2use std::{
3 borrow::Cow,
4 mem::take,
5 sync::{Arc, Mutex, Weak},
6};
7
8use enum_as_inner::EnumAsInner;
9use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
10use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
11use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
12use rle::{HasLength, Mergable, RleVec};
13use smallvec::{smallvec, SmallVec};
14
15use crate::{
16 change::{Change, Lamport, Timestamp},
17 container::{
18 idx::ContainerIdx,
19 list::list_op::{DeleteSpan, InnerListOp},
20 richtext::Style,
21 IntoContainerId,
22 },
23 delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
24 encoding::export_fast_updates_in_range,
25 event::{Diff, ListDeltaMeta, TextDiff},
26 handler::{Handler, ValueOrHandler},
27 id::{Counter, PeerID, ID},
28 loro::CommitOptions,
29 op::{Op, RawOp, RawOpContent},
30 span::HasIdSpan,
31 version::Frontiers,
32 InternalString, LoroDocInner, LoroError, LoroValue,
33};
34
35use super::{
36 arena::SharedArena,
37 event::{InternalContainerDiff, InternalDocDiff},
38 handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
39 oplog::OpLog,
40 state::DocState,
41};
42
43impl crate::LoroDoc {
44 #[inline(always)]
50 pub fn txn(&self) -> Result<Transaction, LoroError> {
51 self.txn_with_origin("")
52 }
53
54 pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
59 if !self.can_edit() {
60 return Err(LoroError::TransactionError(
61 String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
62 ));
63 }
64
65 let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into());
66
67 let obs = self.observer.clone();
68 let local_update_subs_weak = self.local_update_subs.downgrade();
69 txn.set_on_commit(Box::new(move |state, oplog, id_span| {
70 let mut state = state.try_lock().unwrap();
71 let events = state.take_events();
72 drop(state);
73 for event in events {
74 obs.emit(event);
75 }
76
77 if id_span.atom_len() == 0 {
78 return;
79 }
80
81 if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
82 if !local_update_subs.inner().is_empty() {
83 let bytes =
84 { export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
85 local_update_subs.emit(&(), bytes);
86 }
87 }
88 }));
89
90 Ok(txn)
91 }
92
93 #[inline(always)]
94 pub fn with_txn<F, R>(&self, f: F) -> LoroResult<R>
95 where
96 F: FnOnce(&mut Transaction) -> LoroResult<R>,
97 {
98 let mut txn = self.txn().unwrap();
99 let v = f(&mut txn)?;
100 txn.commit()?;
101 Ok(v)
102 }
103
104 pub fn start_auto_commit(&self) {
105 self.auto_commit
106 .store(true, std::sync::atomic::Ordering::Release);
107 let mut self_txn = self.txn.try_lock().unwrap();
108 if self_txn.is_some() || !self.can_edit() {
109 return;
110 }
111
112 let txn = self.txn().unwrap();
113 self_txn.replace(txn);
114 }
115
116 #[inline]
117 pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
118 if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
119 let mut self_txn = self.txn.try_lock().unwrap();
120 if self_txn.is_some() {
121 return;
122 }
123
124 let mut txn = self.txn().unwrap();
125 if let Some(options) = options {
126 txn.set_options(options);
127 }
128 self_txn.replace(txn);
129 }
130 }
131}
132
133pub(crate) type OnCommitFn =
134 Box<dyn FnOnce(&Arc<Mutex<DocState>>, &Arc<Mutex<OpLog>>, IdSpan) + Sync + Send>;
135
136pub struct Transaction {
137 peer: PeerID,
138 origin: InternalString,
139 start_counter: Counter,
140 next_counter: Counter,
141 start_lamport: Lamport,
142 next_lamport: Lamport,
143 doc: Weak<LoroDocInner>,
144 frontiers: Frontiers,
145 local_ops: RleVec<[Op; 1]>, event_hints: Vec<EventHint>,
147 pub(super) arena: SharedArena,
148 finished: bool,
149 on_commit: Option<OnCommitFn>,
150 timestamp: Option<Timestamp>,
151 msg: Option<Arc<str>>,
152 latest_timestamp: Timestamp,
153}
154
155impl std::fmt::Debug for Transaction {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("Transaction")
158 .field("peer", &self.peer)
159 .field("origin", &self.origin)
160 .field("start_counter", &self.start_counter)
161 .field("next_counter", &self.next_counter)
162 .field("start_lamport", &self.start_lamport)
163 .field("next_lamport", &self.next_lamport)
164 .field("frontiers", &self.frontiers)
165 .field("local_ops", &self.local_ops)
166 .field("event_hints", &self.event_hints)
167 .field("arena", &self.arena)
168 .field("finished", &self.finished)
169 .field("on_commit", &self.on_commit.is_some())
170 .field("timestamp", &self.timestamp)
171 .finish()
172 }
173}
174
175#[derive(Debug, Clone, EnumAsInner)]
183pub(super) enum EventHint {
184 Mark {
185 start: u32,
186 end: u32,
187 style: Style,
188 },
189 InsertText {
190 pos: u32,
192 event_len: u32,
193 unicode_len: u32,
194 styles: StyleMeta,
195 },
196 DeleteText {
198 span: DeleteSpan,
199 unicode_len: usize,
200 },
201 InsertList {
202 len: u32,
203 pos: usize,
204 },
205 SetList {
206 index: usize,
207 value: LoroValue,
208 },
209 Move {
210 value: LoroValue,
211 from: u32,
212 to: u32,
213 },
214 DeleteList(DeleteSpan),
215 Map {
216 key: InternalString,
217 value: Option<LoroValue>,
218 },
219 Tree(SmallVec<[TreeDiffItem; 1]>),
221 MarkEnd,
222 #[cfg(feature = "counter")]
223 Counter(f64),
224}
225
226impl generic_btree::rle::HasLength for EventHint {
227 fn rle_len(&self) -> usize {
228 match self {
229 EventHint::Mark { .. } => 1,
230 EventHint::InsertText {
231 unicode_len: len, ..
232 } => *len as usize,
233 EventHint::DeleteText { unicode_len, .. } => *unicode_len,
234 EventHint::InsertList { len, .. } => *len as usize,
235 EventHint::DeleteList(d) => d.len(),
236 EventHint::Map { .. } => 1,
237 EventHint::Tree(_) => 1,
238 EventHint::MarkEnd => 1,
239 EventHint::Move { .. } => 1,
240 EventHint::SetList { .. } => 1,
241 #[cfg(feature = "counter")]
242 EventHint::Counter(_) => 1,
243 }
244 }
245}
246
247impl generic_btree::rle::Mergeable for EventHint {
248 fn can_merge(&self, rhs: &Self) -> bool {
249 match (self, rhs) {
250 (
251 EventHint::InsertText {
252 pos,
253 unicode_len: _,
254 event_len,
255 styles,
256 },
257 EventHint::InsertText {
258 pos: r_pos,
259 styles: r_styles,
260 ..
261 },
262 ) => *pos + *event_len == *r_pos && styles == r_styles,
263 (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
264 pos + *len as usize == *pos_right
265 }
266 (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
270 span.is_mergable(r, &())
271 }
272 (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
273 _ => false,
274 }
275 }
276
277 fn merge_right(&mut self, rhs: &Self) {
278 match (self, rhs) {
279 (
280 EventHint::InsertText {
281 event_len,
282 unicode_len: len,
283 ..
284 },
285 EventHint::InsertText {
286 event_len: r_event_len,
287 unicode_len: r_len,
288 ..
289 },
290 ) => {
291 *len += *r_len;
292 *event_len += *r_event_len;
293 }
294 (
295 EventHint::InsertList { len, pos: _ },
296 EventHint::InsertList { len: r_len, pos: _ },
297 ) => *len += *r_len,
298 (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
299 (
300 EventHint::DeleteText { span, unicode_len },
301 EventHint::DeleteText {
302 span: r_span,
303 unicode_len: r_len,
304 },
305 ) => {
306 *unicode_len += *r_len;
307 span.merge(r_span, &());
308 }
309 _ => unreachable!(),
310 }
311 }
312
313 fn merge_left(&mut self, _: &Self) {
314 unreachable!()
315 }
316}
317
318impl Transaction {
319 #[inline]
320 pub fn new(doc: Arc<LoroDocInner>) -> Self {
321 Self::new_with_origin(doc.clone(), "".into())
322 }
323
324 pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> Self {
325 let mut state_lock = doc.state.try_lock().unwrap();
326 if state_lock.is_in_txn() {
327 panic!("Cannot start a transaction while another one is in progress");
328 }
329
330 let oplog_lock = doc.oplog.try_lock().unwrap();
331 state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
332 let arena = state_lock.arena.clone();
333 let frontiers = state_lock.frontiers.clone();
334 let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
335 let next_counter = oplog_lock.next_id(peer).counter;
336 let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
337 let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
338 oplog_lock
339 .check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
340 .unwrap();
341 drop(state_lock);
342 drop(oplog_lock);
343 Self {
344 peer,
345 doc: Arc::downgrade(&doc),
346 arena,
347 frontiers,
348 timestamp: None,
349 next_counter,
350 next_lamport,
351 origin: Default::default(),
352 start_counter: next_counter,
353 start_lamport: next_lamport,
354 event_hints: Default::default(),
355 local_ops: RleVec::new(),
356 finished: false,
357 on_commit: None,
358 msg: None,
359 latest_timestamp,
360 }
361 }
362
363 pub fn set_origin(&mut self, origin: InternalString) {
364 self.origin = origin;
365 }
366
367 pub fn set_timestamp(&mut self, time: Timestamp) {
368 self.timestamp = Some(time);
369 }
370
371 pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
372 self.msg = msg;
373 }
374
375 pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
376 &self.local_ops
377 }
378
379 pub fn peer(&self) -> &PeerID {
380 &self.peer
381 }
382
383 pub fn timestamp(&self) -> &Option<Timestamp> {
384 &self.timestamp
385 }
386
387 pub fn frontiers(&self) -> &Frontiers {
388 &self.frontiers
389 }
390 pub fn msg(&self) -> &Option<Arc<str>> {
391 &self.msg
392 }
393
394 pub fn lamport(&self) -> &Lamport {
395 &self.start_lamport
396 }
397
398 pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
399 self.on_commit = Some(f);
400 }
401
402 pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
403 self.on_commit.take()
404 }
405
406 pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
407 self._commit()
408 }
409
410 #[tracing::instrument(level = "debug", skip(self))]
411 fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
412 if self.finished {
413 return Ok(None);
414 }
415
416 let Some(doc) = self.doc.upgrade() else {
417 return Ok(None);
418 };
419 self.finished = true;
420 let mut state = doc.state.try_lock().unwrap();
421 if self.local_ops.is_empty() {
422 state.abort_txn();
423 return Ok(Some(self.take_options()));
424 }
425
426 let ops = std::mem::take(&mut self.local_ops);
427 let mut oplog = doc.oplog.try_lock().unwrap();
428 let deps = take(&mut self.frontiers);
429 let change = Change {
430 lamport: self.start_lamport,
431 ops,
432 deps,
433 id: ID::new(self.peer, self.start_counter),
434 timestamp: self.latest_timestamp.max(
435 self.timestamp
436 .unwrap_or_else(|| oplog.get_timestamp_for_next_txn()),
437 ),
438 commit_msg: take(&mut self.msg),
439 };
440
441 let diff = if state.is_recording() {
442 Some(change_to_diff(
443 &change,
444 doc.clone(),
445 std::mem::take(&mut self.event_hints),
446 ))
447 } else {
448 None
449 };
450
451 let last_id = change.id_last();
452 if let Err(err) = oplog.import_local_change(change) {
453 state.abort_txn();
454 drop(state);
455 drop(oplog);
456 return Err(err);
457 }
458
459 state.commit_txn(
460 Frontiers::from_id(last_id),
461 diff.map(|arr| InternalDocDiff {
462 by: crate::event::EventTriggerKind::Local,
463 origin: self.origin.clone(),
464 diff: Cow::Owned(
465 arr.into_iter()
466 .map(|x| InternalContainerDiff {
467 idx: x.idx,
468 bring_back: false,
469 is_container_deleted: false,
470 diff: (x.diff.into()),
471 diff_mode: crate::diff_calc::DiffMode::Linear,
472 })
473 .collect(),
474 ),
475 new_version: Cow::Borrowed(oplog.frontiers()),
476 }),
477 );
478 drop(state);
479 drop(oplog);
480 if let Some(on_commit) = self.on_commit.take() {
481 on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
482 }
483 Ok(None)
484 }
485
486 fn take_options(&self) -> CommitOptions {
487 let mut options = CommitOptions::new();
488 if !self.origin.is_empty() {
489 options = options.origin(self.origin.as_str());
490 }
491 if let Some(msg) = self.msg.as_ref() {
492 options = options.commit_msg(msg);
493 }
494 if let Some(timestamp) = self.timestamp {
495 options = options.timestamp(timestamp);
496 }
497 options
498 }
499
500 pub(super) fn apply_local_op(
501 &mut self,
502 container: ContainerIdx,
503 content: RawOpContent,
504 event: EventHint,
505 doc: &Arc<LoroDocInner>,
507 ) -> LoroResult<()> {
508 let this_doc = self.doc.upgrade().unwrap();
510 if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
511 return Err(LoroError::UnmatchedContext {
512 expected: this_doc
513 .state
514 .try_lock()
515 .unwrap()
516 .peer
517 .load(std::sync::atomic::Ordering::Relaxed),
518 found: doc
519 .state
520 .try_lock()
521 .unwrap()
522 .peer
523 .load(std::sync::atomic::Ordering::Relaxed),
524 });
525 }
526
527 let len = content.content_len();
528 assert!(len > 0);
529 let raw_op = RawOp {
530 id: ID {
531 peer: self.peer,
532 counter: self.next_counter,
533 },
534 lamport: self.next_lamport,
535 container,
536 content,
537 };
538
539 let mut state = doc.state.try_lock().unwrap();
540 if state.is_deleted(container) {
541 return Err(LoroError::ContainerDeleted {
542 container: Box::new(state.arena.idx_to_id(container).unwrap()),
543 });
544 }
545
546 let op = self.arena.convert_raw_op(&raw_op);
547 state.apply_local_op(&raw_op, &op)?;
548 {
549 let mut oplog = doc.oplog.try_lock().unwrap();
551 let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
552 let start_id = ID::new(self.peer, self.next_counter);
553 self.next_counter += len as Counter;
554 oplog.dag.update_version_on_new_local_op(
555 if self.local_ops.is_empty() {
556 &self.frontiers
557 } else {
558 &dep_id
559 },
560 start_id,
561 self.next_lamport,
562 len,
563 );
564 self.next_lamport += len as Lamport;
565 let last_id = start_id.inc(len as Counter - 1);
567 state.frontiers = Frontiers::from_id(last_id);
568 };
569 drop(state);
570 debug_assert_eq!(
571 event.rle_len(),
572 op.atom_len(),
573 "event:{:#?} \nop:{:#?}",
574 &event,
575 &op
576 );
577
578 match self.event_hints.last_mut() {
579 Some(last) if last.can_merge(&event) => {
580 last.merge_right(&event);
581 }
582 _ => {
583 self.event_hints.push(event);
584 }
585 }
586 self.local_ops.push(op);
587 Ok(())
588 }
589
590 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
593 let id = id.into_container_id(&self.arena, ContainerType::Text);
594 Handler::new_attached(id, self.doc.upgrade().unwrap())
595 .into_text()
596 .unwrap()
597 }
598
599 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
602 let id = id.into_container_id(&self.arena, ContainerType::List);
603 Handler::new_attached(id, self.doc.upgrade().unwrap())
604 .into_list()
605 .unwrap()
606 }
607
608 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
611 let id = id.into_container_id(&self.arena, ContainerType::Map);
612 Handler::new_attached(id, self.doc.upgrade().unwrap())
613 .into_map()
614 .unwrap()
615 }
616
617 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
620 let id = id.into_container_id(&self.arena, ContainerType::Tree);
621 Handler::new_attached(id, self.doc.upgrade().unwrap())
622 .into_tree()
623 .unwrap()
624 }
625 pub fn next_id(&self) -> ID {
626 ID {
627 peer: self.peer,
628 counter: self.next_counter,
629 }
630 }
631
632 #[inline]
633 pub fn id_span(&self) -> IdSpan {
634 IdSpan::new(self.peer, self.start_counter, self.next_counter)
635 }
636
637 pub fn next_idlp(&self) -> IdLp {
638 IdLp {
639 peer: self.peer,
640 lamport: self.next_lamport,
641 }
642 }
643
644 pub fn is_empty(&self) -> bool {
645 self.local_ops.is_empty()
646 }
647
648 pub(crate) fn len(&self) -> usize {
649 (self.next_counter - self.start_counter) as usize
650 }
651
652 pub(crate) fn set_options(&mut self, options: CommitOptions) {
653 self.origin = options.origin.unwrap_or_default();
654 self.msg = options.commit_msg;
655 self.timestamp = options.timestamp;
656 }
657
658 pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
659 if self.origin.is_empty() {
660 self.origin = default_options.origin.unwrap_or_default();
661 }
662 if self.msg.is_none() {
663 self.msg = default_options.commit_msg;
664 }
665 if self.timestamp.is_none() {
666 self.timestamp = default_options.timestamp;
667 }
668 }
669}
670
671impl Drop for Transaction {
672 #[tracing::instrument(level = "debug", skip(self))]
673 fn drop(&mut self) {
674 if !self.finished {
675 self._commit().unwrap();
678 }
679 }
680}
681
682#[derive(Debug, Clone)]
683pub(crate) struct TxnContainerDiff {
684 pub(crate) idx: ContainerIdx,
685 pub(crate) diff: Diff,
686}
687
688fn change_to_diff(
690 change: &Change,
691 doc: Arc<LoroDocInner>,
692 event_hints: Vec<EventHint>,
693) -> Vec<TxnContainerDiff> {
694 let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
695 let peer = change.id.peer;
696 let mut lamport = change.lamport;
697 let mut event_hint_iter = event_hints.into_iter();
698 let mut o_hint = event_hint_iter.next();
699 let mut op_iter = change.ops.iter();
700 while let Some(op) = op_iter.next() {
701 let Some(hint) = o_hint.as_mut() else {
702 unreachable!()
703 };
704
705 let mut ops: SmallVec<[&Op; 1]> = smallvec![op];
706 let hint = match op.atom_len().cmp(&hint.rle_len()) {
707 std::cmp::Ordering::Less => {
708 let mut len = op.atom_len();
709 while len < hint.rle_len() {
710 let next = op_iter.next().unwrap();
711 len += next.atom_len();
712 ops.push(next);
713 }
714 assert!(len == hint.rle_len());
715 match event_hint_iter.next() {
716 Some(n) => o_hint.replace(n).unwrap(),
717 None => o_hint.take().unwrap(),
718 }
719 }
720 std::cmp::Ordering::Equal => match event_hint_iter.next() {
721 Some(n) => o_hint.replace(n).unwrap(),
722 None => o_hint.take().unwrap(),
723 },
724 std::cmp::Ordering::Greater => {
725 unreachable!("{:#?}", &op)
726 }
727 };
728
729 match &hint {
730 EventHint::InsertText { .. }
731 | EventHint::InsertList { .. }
732 | EventHint::DeleteText { .. }
733 | EventHint::DeleteList(_) => {}
734 _ => {
735 assert_eq!(ops.len(), 1);
736 }
737 }
738 match hint {
739 EventHint::Mark { start, end, style } => {
740 let mut meta = StyleMeta::default();
741 meta.insert(
742 style.key.clone(),
743 StyleMetaItem {
744 lamport,
745 peer: change.id.peer,
746 value: style.data,
747 },
748 );
749 let diff = DeltaRopeBuilder::new()
750 .retain(start as usize, Default::default())
751 .retain(
752 (end - start) as usize,
753 meta.to_option_map().unwrap_or_default().into(),
754 )
755 .build();
756 ans.push(TxnContainerDiff {
757 idx: op.container,
758 diff: Diff::Text(diff),
759 });
760 }
761 EventHint::InsertText { styles, pos, .. } => {
762 let mut delta: TextDiff = DeltaRopeBuilder::new()
763 .retain(pos as usize, Default::default())
764 .build();
765 for op in ops.iter() {
766 let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
767 else {
768 unreachable!()
769 };
770
771 delta.push_insert(
772 slice.clone().into(),
773 styles.to_option_map().unwrap_or_default().into(),
774 );
775 }
776 ans.push(TxnContainerDiff {
777 idx: op.container,
778 diff: Diff::Text(delta),
779 })
780 }
781 EventHint::DeleteText {
782 span,
783 unicode_len: _,
784 } => ans.push(TxnContainerDiff {
787 idx: op.container,
788 diff: Diff::Text(
789 DeltaRopeBuilder::new()
790 .retain(span.start() as usize, Default::default())
791 .delete(span.len())
792 .build(),
793 ),
794 }),
795 EventHint::InsertList { pos, .. } => {
796 for op in ops.iter() {
799 let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
800 let values = doc
801 .arena
802 .get_values(range.to_range())
803 .into_iter()
804 .map(|v| ValueOrHandler::from_value(v, &doc));
805 ans.push(TxnContainerDiff {
806 idx: op.container,
807 diff: Diff::List(
808 DeltaRopeBuilder::new()
809 .retain(pos, Default::default())
810 .insert_many(values, Default::default())
811 .build(),
812 ),
813 })
814 }
815 }
816 EventHint::DeleteList(s) => {
817 ans.push(TxnContainerDiff {
818 idx: op.container,
819 diff: Diff::List(
820 DeltaRopeBuilder::new()
821 .retain(s.start() as usize, Default::default())
822 .delete(s.len())
823 .build(),
824 ),
825 });
826 }
827 EventHint::Map { key, value } => ans.push(TxnContainerDiff {
828 idx: op.container,
829 diff: Diff::Map(ResolvedMapDelta::new().with_entry(
830 key,
831 ResolvedMapValue {
832 value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
833 idlp: IdLp::new(peer, lamport),
834 },
835 )),
836 }),
837 EventHint::Tree(tree_diff) => {
838 let mut diff = TreeDiff::default();
839 diff.diff.extend(tree_diff.into_iter());
840 ans.push(TxnContainerDiff {
841 idx: op.container,
842 diff: Diff::Tree(diff),
843 });
844 }
845 EventHint::Move { from, to, value } => {
846 let mut a = DeltaRopeBuilder::new()
847 .retain(from as usize, Default::default())
848 .delete(1)
849 .build();
850 a.compose(
851 &DeltaRopeBuilder::new()
852 .retain(to as usize, Default::default())
853 .insert(
854 ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
855 ListDeltaMeta { from_move: true },
856 )
857 .build(),
858 );
859 ans.push(TxnContainerDiff {
860 idx: op.container,
861 diff: Diff::List(a),
862 });
863 }
864 EventHint::SetList { index, value } => {
865 ans.push(TxnContainerDiff {
866 idx: op.container,
867 diff: Diff::List(
868 DeltaRopeBuilder::new()
869 .retain(index, Default::default())
870 .delete(1)
871 .insert(
872 ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
873 Default::default(),
874 )
875 .build(),
876 ),
877 });
878 }
879 EventHint::MarkEnd => {
880 }
882 #[cfg(feature = "counter")]
883 EventHint::Counter(diff) => {
884 ans.push(TxnContainerDiff {
885 idx: op.container,
886 diff: Diff::Counter(diff),
887 });
888 }
889 }
890
891 lamport += ops
892 .iter()
893 .map(|x| x.content_len() as Lamport)
894 .sum::<Lamport>();
895 }
896 ans
897}