1use core::panic;
2use std::{
3 borrow::Cow,
4 mem::take,
5 sync::{Arc, Weak},
6};
7
8use enum_as_inner::EnumAsInner;
9use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
10use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
11use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
12use rle::{HasLength, Mergable, RleVec};
13use smallvec::{smallvec, SmallVec};
14
15use crate::{
16 change::{Change, Lamport, Timestamp},
17 container::{
18 idx::ContainerIdx,
19 list::list_op::{DeleteSpan, InnerListOp},
20 richtext::Style,
21 IntoContainerId,
22 },
23 delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
24 encoding::export_fast_updates_in_range,
25 event::{Diff, ListDeltaMeta, TextDiff},
26 handler::{Handler, ValueOrHandler},
27 id::{Counter, PeerID, ID},
28 lock::LoroMutex,
29 loro::CommitOptions,
30 op::{Op, RawOp, RawOpContent},
31 pre_commit::{ChangeModifier, PreCommitCallbackPayload},
32 span::HasIdSpan,
33 version::Frontiers,
34 ChangeMeta, InternalString, LoroDoc, LoroDocInner, LoroError, LoroValue,
35};
36
37use super::{
38 arena::SharedArena,
39 event::{InternalContainerDiff, InternalDocDiff},
40 handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
41 oplog::OpLog,
42 state::DocState,
43};
44
45impl crate::LoroDoc {
46 #[inline(always)]
52 pub fn txn(&self) -> Result<Transaction, LoroError> {
53 self.txn_with_origin("")
54 }
55
56 pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
61 if !self.can_edit() {
62 return Err(LoroError::TransactionError(
63 String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
64 ));
65 }
66
67 let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into());
68
69 let obs = self.observer.clone();
70 let local_update_subs_weak = self.local_update_subs.downgrade();
71 txn.set_on_commit(Box::new(move |state, oplog, id_span| {
72 let mut state = state.lock().unwrap();
73 let events = state.take_events();
74 drop(state);
75 for event in events {
76 obs.emit(event);
77 }
78
79 if id_span.atom_len() == 0 {
80 return;
81 }
82
83 if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
84 if !local_update_subs.inner().is_empty() {
85 let bytes =
86 { export_fast_updates_in_range(&oplog.lock().unwrap(), &[id_span]) };
87 local_update_subs.emit(&(), bytes);
88 }
89 }
90 }));
91
92 Ok(txn)
93 }
94
95 pub fn start_auto_commit(&self) {
96 self.auto_commit
97 .store(true, std::sync::atomic::Ordering::Release);
98 let mut self_txn = self.txn.lock().unwrap();
99 if self_txn.is_some() || !self.can_edit() {
100 return;
101 }
102
103 let txn = self.txn().unwrap();
104 self_txn.replace(txn);
105 }
106
107 #[inline]
108 pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
109 if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
110 let mut self_txn = self.txn.lock().unwrap();
111 if self_txn.is_some() {
112 return;
113 }
114
115 let mut txn = self.txn().unwrap();
116 if let Some(options) = options {
117 txn.set_options(options);
118 }
119 self_txn.replace(txn);
120 }
121 }
122}
123
124pub(crate) type OnCommitFn =
125 Box<dyn FnOnce(&Arc<LoroMutex<DocState>>, &Arc<LoroMutex<OpLog>>, IdSpan) + Sync + Send>;
126
127pub struct Transaction {
128 peer: PeerID,
129 origin: InternalString,
130 start_counter: Counter,
131 next_counter: Counter,
132 start_lamport: Lamport,
133 next_lamport: Lamport,
134 doc: Weak<LoroDocInner>,
135 frontiers: Frontiers,
136 local_ops: RleVec<[Op; 1]>, event_hints: Vec<EventHint>,
138 pub(super) arena: SharedArena,
139 finished: bool,
140 on_commit: Option<OnCommitFn>,
141 timestamp: Option<Timestamp>,
142 msg: Option<Arc<str>>,
143 latest_timestamp: Timestamp,
144 pub(super) is_peer_first_appearance: bool,
145}
146
147impl std::fmt::Debug for Transaction {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct("Transaction")
150 .field("peer", &self.peer)
151 .field("origin", &self.origin)
152 .field("start_counter", &self.start_counter)
153 .field("next_counter", &self.next_counter)
154 .field("start_lamport", &self.start_lamport)
155 .field("next_lamport", &self.next_lamport)
156 .field("frontiers", &self.frontiers)
157 .field("local_ops", &self.local_ops)
158 .field("event_hints", &self.event_hints)
159 .field("arena", &self.arena)
160 .field("finished", &self.finished)
161 .field("on_commit", &self.on_commit.is_some())
162 .field("timestamp", &self.timestamp)
163 .finish()
164 }
165}
166
167#[derive(Debug, Clone, EnumAsInner)]
175pub(super) enum EventHint {
176 Mark {
177 start: u32,
178 end: u32,
179 style: Style,
180 },
181 InsertText {
182 pos: u32,
184 event_len: u32,
185 unicode_len: u32,
186 styles: StyleMeta,
187 },
188 DeleteText {
190 span: DeleteSpan,
191 unicode_len: usize,
192 },
193 InsertList {
194 len: u32,
195 pos: usize,
196 },
197 SetList {
198 index: usize,
199 value: LoroValue,
200 },
201 Move {
202 value: LoroValue,
203 from: u32,
204 to: u32,
205 },
206 DeleteList(DeleteSpan),
207 Map {
208 key: InternalString,
209 value: Option<LoroValue>,
210 },
211 Tree(SmallVec<[TreeDiffItem; 1]>),
213 MarkEnd,
214 #[cfg(feature = "counter")]
215 Counter(f64),
216}
217
218impl generic_btree::rle::HasLength for EventHint {
219 fn rle_len(&self) -> usize {
220 match self {
221 EventHint::Mark { .. } => 1,
222 EventHint::InsertText {
223 unicode_len: len, ..
224 } => *len as usize,
225 EventHint::DeleteText { unicode_len, .. } => *unicode_len,
226 EventHint::InsertList { len, .. } => *len as usize,
227 EventHint::DeleteList(d) => d.len(),
228 EventHint::Map { .. } => 1,
229 EventHint::Tree(_) => 1,
230 EventHint::MarkEnd => 1,
231 EventHint::Move { .. } => 1,
232 EventHint::SetList { .. } => 1,
233 #[cfg(feature = "counter")]
234 EventHint::Counter(_) => 1,
235 }
236 }
237}
238
239impl generic_btree::rle::Mergeable for EventHint {
240 fn can_merge(&self, rhs: &Self) -> bool {
241 match (self, rhs) {
242 (
243 EventHint::InsertText {
244 pos,
245 unicode_len: _,
246 event_len,
247 styles,
248 },
249 EventHint::InsertText {
250 pos: r_pos,
251 styles: r_styles,
252 ..
253 },
254 ) => *pos + *event_len == *r_pos && styles == r_styles,
255 (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
256 pos + *len as usize == *pos_right
257 }
258 (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
262 span.is_mergable(r, &())
263 }
264 (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
265 _ => false,
266 }
267 }
268
269 fn merge_right(&mut self, rhs: &Self) {
270 match (self, rhs) {
271 (
272 EventHint::InsertText {
273 event_len,
274 unicode_len: len,
275 ..
276 },
277 EventHint::InsertText {
278 event_len: r_event_len,
279 unicode_len: r_len,
280 ..
281 },
282 ) => {
283 *len += *r_len;
284 *event_len += *r_event_len;
285 }
286 (
287 EventHint::InsertList { len, pos: _ },
288 EventHint::InsertList { len: r_len, pos: _ },
289 ) => *len += *r_len,
290 (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
291 (
292 EventHint::DeleteText { span, unicode_len },
293 EventHint::DeleteText {
294 span: r_span,
295 unicode_len: r_len,
296 },
297 ) => {
298 *unicode_len += *r_len;
299 span.merge(r_span, &());
300 }
301 _ => unreachable!(),
302 }
303 }
304
305 fn merge_left(&mut self, _: &Self) {
306 unreachable!()
307 }
308}
309
310impl Transaction {
311 #[inline]
312 pub fn new(doc: Arc<LoroDocInner>) -> Self {
313 Self::new_with_origin(doc.clone(), "".into())
314 }
315
316 pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> Self {
317 let oplog_lock = doc.oplog.lock().unwrap();
318 let mut state_lock = doc.state.lock().unwrap();
319 if state_lock.is_in_txn() {
320 panic!("Cannot start a transaction while another one is in progress");
321 }
322
323 state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
324 let arena = state_lock.arena.clone();
325 let frontiers = state_lock.frontiers.clone();
326 let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
327 let next_counter = oplog_lock.next_id(peer).counter;
328 let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
329 let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
330 oplog_lock
331 .check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
332 .unwrap();
333 drop(state_lock);
334 drop(oplog_lock);
335 Self {
336 peer,
337 doc: Arc::downgrade(&doc),
338 arena,
339 frontiers,
340 timestamp: None,
341 next_counter,
342 next_lamport,
343 origin: Default::default(),
344 start_counter: next_counter,
345 start_lamport: next_lamport,
346 event_hints: Default::default(),
347 local_ops: RleVec::new(),
348 finished: false,
349 on_commit: None,
350 msg: None,
351 latest_timestamp,
352 is_peer_first_appearance: false,
353 }
354 }
355
356 pub fn set_origin(&mut self, origin: InternalString) {
357 self.origin = origin;
358 }
359
360 pub fn set_timestamp(&mut self, time: Timestamp) {
361 self.timestamp = Some(time);
362 }
363
364 pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
365 self.msg = msg;
366 }
367
368 pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
369 &self.local_ops
370 }
371
372 pub fn peer(&self) -> &PeerID {
373 &self.peer
374 }
375
376 pub fn timestamp(&self) -> &Option<Timestamp> {
377 &self.timestamp
378 }
379
380 pub fn frontiers(&self) -> &Frontiers {
381 &self.frontiers
382 }
383 pub fn msg(&self) -> &Option<Arc<str>> {
384 &self.msg
385 }
386
387 pub fn lamport(&self) -> &Lamport {
388 &self.start_lamport
389 }
390
391 pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
392 self.on_commit = Some(f);
393 }
394
395 pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
396 self.on_commit.take()
397 }
398
399 pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
400 self._commit()
401 }
402
403 #[tracing::instrument(level = "debug", skip(self))]
404 fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
405 if self.finished {
406 return Ok(None);
407 }
408
409 let Some(doc) = self.doc.upgrade() else {
410 return Ok(None);
411 };
412 self.finished = true;
413 if self.local_ops.is_empty() {
414 let mut state = doc.state.lock().unwrap();
415 state.abort_txn();
416 return Ok(Some(self.take_options()));
417 }
418
419 let ops = std::mem::take(&mut self.local_ops);
420 let deps = take(&mut self.frontiers);
421 let change = Change {
422 lamport: self.start_lamport,
423 ops,
424 deps,
425 id: ID::new(self.peer, self.start_counter),
426 timestamp: self.latest_timestamp.max(
427 self.timestamp
428 .unwrap_or_else(|| doc.oplog.lock().unwrap().get_timestamp_for_next_txn()),
429 ),
430 commit_msg: take(&mut self.msg),
431 };
432
433 let change_meta = ChangeMeta::from_change(&change);
434 {
435 let mut oplog = doc.oplog.lock().unwrap();
437 oplog.set_uncommitted_change(change);
438 }
439
440 let modifier = ChangeModifier::default();
441 doc.pre_commit_subs.emit(
442 &(),
443 PreCommitCallbackPayload {
444 change_meta,
445 origin: self.origin.to_string(),
446 modifier: modifier.clone(),
447 },
448 );
449
450 let mut oplog = doc.oplog.lock().unwrap();
451 let mut state = doc.state.lock().unwrap();
452
453 let mut change = oplog.uncommitted_change.take().unwrap();
454 modifier.modify_change(&mut change);
455 let diff = if state.is_recording() {
456 Some(change_to_diff(
457 &change,
458 doc.clone(),
459 std::mem::take(&mut self.event_hints),
460 ))
461 } else {
462 None
463 };
464
465 let last_id = change.id_last();
466 if let Err(err) = oplog.import_local_change(change) {
467 state.abort_txn();
468 drop(state);
469 drop(oplog);
470 return Err(err);
471 }
472
473 state.commit_txn(
474 Frontiers::from_id(last_id),
475 diff.map(|arr| InternalDocDiff {
476 by: crate::event::EventTriggerKind::Local,
477 origin: self.origin.clone(),
478 diff: Cow::Owned(
479 arr.into_iter()
480 .map(|x| InternalContainerDiff {
481 idx: x.idx,
482 bring_back: false,
483 is_container_deleted: false,
484 diff: (x.diff.into()),
485 diff_mode: crate::diff_calc::DiffMode::Linear,
486 })
487 .collect(),
488 ),
489 new_version: Cow::Borrowed(oplog.frontiers()),
490 }),
491 );
492 drop(state);
493 drop(oplog);
494 if let Some(on_commit) = self.on_commit.take() {
495 assert!(!doc.txn.is_locked());
496 on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
497 }
498 Ok(None)
499 }
500
501 fn take_options(&self) -> CommitOptions {
502 let mut options = CommitOptions::new();
503 if !self.origin.is_empty() {
504 options = options.origin(self.origin.as_str());
505 }
506 if let Some(msg) = self.msg.as_ref() {
507 options = options.commit_msg(msg);
508 }
509 if let Some(timestamp) = self.timestamp {
510 options = options.timestamp(timestamp);
511 }
512 options
513 }
514
515 pub(super) fn apply_local_op(
516 &mut self,
517 container: ContainerIdx,
518 content: RawOpContent,
519 event: EventHint,
520 doc: &LoroDoc,
522 ) -> LoroResult<()> {
523 let this_doc = self.doc.upgrade().unwrap();
525 if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
526 return Err(LoroError::UnmatchedContext {
527 expected: this_doc
528 .state
529 .lock()
530 .unwrap()
531 .peer
532 .load(std::sync::atomic::Ordering::Relaxed),
533 found: doc
534 .state
535 .lock()
536 .unwrap()
537 .peer
538 .load(std::sync::atomic::Ordering::Relaxed),
539 });
540 }
541
542 let len = content.content_len();
543 assert!(len > 0);
544 let raw_op = RawOp {
545 id: ID {
546 peer: self.peer,
547 counter: self.next_counter,
548 },
549 lamport: self.next_lamport,
550 container,
551 content,
552 };
553
554 let mut oplog = doc.oplog.lock().unwrap();
555 let mut state = doc.state.lock().unwrap();
556 if state.is_deleted(container) {
557 return Err(LoroError::ContainerDeleted {
558 container: Box::new(state.arena.idx_to_id(container).unwrap()),
559 });
560 }
561
562 let op = self.arena.convert_raw_op(&raw_op);
563 state.apply_local_op(&raw_op, &op)?;
564 {
565 if !self.is_peer_first_appearance && !oplog.dag.latest_vv_contains_peer(self.peer) {
566 self.is_peer_first_appearance = true;
567 }
568 let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
570 let start_id = ID::new(self.peer, self.next_counter);
571 self.next_counter += len as Counter;
572 oplog.dag.update_version_on_new_local_op(
573 if self.local_ops.is_empty() {
574 &self.frontiers
575 } else {
576 &dep_id
577 },
578 start_id,
579 self.next_lamport,
580 len,
581 );
582 self.next_lamport += len as Lamport;
583 let last_id = start_id.inc(len as Counter - 1);
585 state.frontiers = Frontiers::from_id(last_id);
586 };
587 drop(state);
588 drop(oplog);
589 debug_assert_eq!(
590 event.rle_len(),
591 op.atom_len(),
592 "event:{:#?} \nop:{:#?}",
593 &event,
594 &op
595 );
596
597 match self.event_hints.last_mut() {
598 Some(last) if last.can_merge(&event) => {
599 last.merge_right(&event);
600 }
601 _ => {
602 self.event_hints.push(event);
603 }
604 }
605 self.local_ops.push(op);
606 Ok(())
607 }
608
609 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
612 let id = id.into_container_id(&self.arena, ContainerType::Text);
613 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
614 .into_text()
615 .unwrap()
616 }
617
618 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
621 let id = id.into_container_id(&self.arena, ContainerType::List);
622 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
623 .into_list()
624 .unwrap()
625 }
626
627 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
630 let id = id.into_container_id(&self.arena, ContainerType::Map);
631 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
632 .into_map()
633 .unwrap()
634 }
635
636 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
639 let id = id.into_container_id(&self.arena, ContainerType::Tree);
640 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
641 .into_tree()
642 .unwrap()
643 }
644 pub fn next_id(&self) -> ID {
645 ID {
646 peer: self.peer,
647 counter: self.next_counter,
648 }
649 }
650
651 #[inline]
652 pub fn id_span(&self) -> IdSpan {
653 IdSpan::new(self.peer, self.start_counter, self.next_counter)
654 }
655
656 pub fn next_idlp(&self) -> IdLp {
657 IdLp {
658 peer: self.peer,
659 lamport: self.next_lamport,
660 }
661 }
662
663 pub fn is_empty(&self) -> bool {
664 self.local_ops.is_empty()
665 }
666
667 pub(crate) fn len(&self) -> usize {
668 (self.next_counter - self.start_counter) as usize
669 }
670
671 pub(crate) fn set_options(&mut self, options: CommitOptions) {
672 self.origin = options.origin.unwrap_or_default();
673 self.msg = options.commit_msg;
674 self.timestamp = options.timestamp;
675 }
676
677 pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
678 if self.origin.is_empty() {
679 self.origin = default_options.origin.unwrap_or_default();
680 }
681 if self.msg.is_none() {
682 self.msg = default_options.commit_msg;
683 }
684 if self.timestamp.is_none() {
685 self.timestamp = default_options.timestamp;
686 }
687 }
688}
689
690impl Drop for Transaction {
691 #[tracing::instrument(level = "debug", skip(self))]
692 fn drop(&mut self) {
693 if !self.finished {
694 self._commit().unwrap();
697 }
698 }
699}
700
701#[derive(Debug, Clone)]
702pub(crate) struct TxnContainerDiff {
703 pub(crate) idx: ContainerIdx,
704 pub(crate) diff: Diff,
705}
706
707fn change_to_diff(
709 change: &Change,
710 doc: Arc<LoroDocInner>,
711 event_hints: Vec<EventHint>,
712) -> Vec<TxnContainerDiff> {
713 let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
714 let peer = change.id.peer;
715 let mut lamport = change.lamport;
716 let mut event_hint_iter = event_hints.into_iter();
717 let mut o_hint = event_hint_iter.next();
718 let mut op_iter = change.ops.iter();
719 while let Some(op) = op_iter.next() {
720 let Some(hint) = o_hint.as_mut() else {
721 unreachable!()
722 };
723
724 let mut ops: SmallVec<[&Op; 1]> = smallvec![op];
725 let hint = match op.atom_len().cmp(&hint.rle_len()) {
726 std::cmp::Ordering::Less => {
727 let mut len = op.atom_len();
728 while len < hint.rle_len() {
729 let next = op_iter.next().unwrap();
730 len += next.atom_len();
731 ops.push(next);
732 }
733 assert!(len == hint.rle_len());
734 match event_hint_iter.next() {
735 Some(n) => o_hint.replace(n).unwrap(),
736 None => o_hint.take().unwrap(),
737 }
738 }
739 std::cmp::Ordering::Equal => match event_hint_iter.next() {
740 Some(n) => o_hint.replace(n).unwrap(),
741 None => o_hint.take().unwrap(),
742 },
743 std::cmp::Ordering::Greater => {
744 unreachable!("{:#?}", &op)
745 }
746 };
747
748 match &hint {
749 EventHint::InsertText { .. }
750 | EventHint::InsertList { .. }
751 | EventHint::DeleteText { .. }
752 | EventHint::DeleteList(_) => {}
753 _ => {
754 assert_eq!(ops.len(), 1);
755 }
756 }
757 match hint {
758 EventHint::Mark { start, end, style } => {
759 let mut meta = StyleMeta::default();
760 meta.insert(
761 style.key.clone(),
762 StyleMetaItem {
763 lamport,
764 peer: change.id.peer,
765 value: style.data,
766 },
767 );
768 let diff = DeltaRopeBuilder::new()
769 .retain(start as usize, Default::default())
770 .retain(
771 (end - start) as usize,
772 meta.to_option_map().unwrap_or_default().into(),
773 )
774 .build();
775 ans.push(TxnContainerDiff {
776 idx: op.container,
777 diff: Diff::Text(diff),
778 });
779 }
780 EventHint::InsertText { styles, pos, .. } => {
781 let mut delta: TextDiff = DeltaRopeBuilder::new()
782 .retain(pos as usize, Default::default())
783 .build();
784 for op in ops.iter() {
785 let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
786 else {
787 unreachable!()
788 };
789
790 delta.push_insert(
791 slice.clone().into(),
792 styles.to_option_map().unwrap_or_default().into(),
793 );
794 }
795 ans.push(TxnContainerDiff {
796 idx: op.container,
797 diff: Diff::Text(delta),
798 })
799 }
800 EventHint::DeleteText {
801 span,
802 unicode_len: _,
803 } => ans.push(TxnContainerDiff {
806 idx: op.container,
807 diff: Diff::Text(
808 DeltaRopeBuilder::new()
809 .retain(span.start() as usize, Default::default())
810 .delete(span.len())
811 .build(),
812 ),
813 }),
814 EventHint::InsertList { pos, .. } => {
815 for op in ops.iter() {
818 let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
819 let values = doc
820 .arena
821 .get_values(range.to_range())
822 .into_iter()
823 .map(|v| ValueOrHandler::from_value(v, &doc));
824 ans.push(TxnContainerDiff {
825 idx: op.container,
826 diff: Diff::List(
827 DeltaRopeBuilder::new()
828 .retain(pos, Default::default())
829 .insert_many(values, Default::default())
830 .build(),
831 ),
832 })
833 }
834 }
835 EventHint::DeleteList(s) => {
836 ans.push(TxnContainerDiff {
837 idx: op.container,
838 diff: Diff::List(
839 DeltaRopeBuilder::new()
840 .retain(s.start() as usize, Default::default())
841 .delete(s.len())
842 .build(),
843 ),
844 });
845 }
846 EventHint::Map { key, value } => ans.push(TxnContainerDiff {
847 idx: op.container,
848 diff: Diff::Map(ResolvedMapDelta::new().with_entry(
849 key,
850 ResolvedMapValue {
851 value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
852 idlp: IdLp::new(peer, lamport),
853 },
854 )),
855 }),
856 EventHint::Tree(tree_diff) => {
857 let mut diff = TreeDiff::default();
858 diff.diff.extend(tree_diff.into_iter());
859 ans.push(TxnContainerDiff {
860 idx: op.container,
861 diff: Diff::Tree(diff),
862 });
863 }
864 EventHint::Move { from, to, value } => {
865 let mut a = DeltaRopeBuilder::new()
866 .retain(from as usize, Default::default())
867 .delete(1)
868 .build();
869 a.compose(
870 &DeltaRopeBuilder::new()
871 .retain(to as usize, Default::default())
872 .insert(
873 ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
874 ListDeltaMeta { from_move: true },
875 )
876 .build(),
877 );
878 ans.push(TxnContainerDiff {
879 idx: op.container,
880 diff: Diff::List(a),
881 });
882 }
883 EventHint::SetList { index, value } => {
884 ans.push(TxnContainerDiff {
885 idx: op.container,
886 diff: Diff::List(
887 DeltaRopeBuilder::new()
888 .retain(index, Default::default())
889 .delete(1)
890 .insert(
891 ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
892 Default::default(),
893 )
894 .build(),
895 ),
896 });
897 }
898 EventHint::MarkEnd => {
899 }
901 #[cfg(feature = "counter")]
902 EventHint::Counter(diff) => {
903 ans.push(TxnContainerDiff {
904 idx: op.container,
905 diff: Diff::Counter(diff),
906 });
907 }
908 }
909
910 lamport += ops
911 .iter()
912 .map(|x| x.content_len() as Lamport)
913 .sum::<Lamport>();
914 }
915 ans
916}