1use core::panic;
2use std::{
3 borrow::Cow,
4 mem::take,
5 sync::{Arc, Weak},
6};
7
8use enum_as_inner::EnumAsInner;
9use fxhash::FxHashMap;
10use generic_btree::rle::{HasLength as RleHasLength, Mergeable as GBSliceable};
11use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
12use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
13use rle::{HasLength, Mergable, RleVec, Sliceable};
14use smallvec::{smallvec, SmallVec};
15
16use crate::{
17 change::{Change, Lamport, Timestamp},
18 container::{
19 idx::ContainerIdx,
20 list::list_op::{DeleteSpan, InnerListOp},
21 richtext::Style,
22 IntoContainerId,
23 },
24 delta::{ResolvedMapDelta, ResolvedMapValue, StyleMeta, StyleMetaItem, TreeDiff, TreeDiffItem},
25 encoding::export_fast_updates_in_range,
26 event::{Diff, ListDeltaMeta, TextDiff},
27 handler::{Handler, ValueOrHandler},
28 id::{Counter, PeerID, ID},
29 lock::LoroMutex,
30 loro::CommitOptions,
31 op::{Op, RawOp, RawOpContent},
32 pre_commit::{ChangeModifier, PreCommitCallbackPayload},
33 span::HasIdSpan,
34 version::Frontiers,
35 ChangeMeta, InternalString, LoroDoc, LoroDocInner, LoroError, LoroValue,
36};
37
38use super::{
39 arena::SharedArena,
40 event::{InternalContainerDiff, InternalDocDiff},
41 handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
42 oplog::OpLog,
43 state::DocState,
44};
45
46impl crate::LoroDoc {
47 #[inline(always)]
53 pub fn txn(&self) -> Result<Transaction, LoroError> {
54 self.txn_with_origin("")
55 }
56
57 pub fn txn_with_origin(&self, origin: &str) -> Result<Transaction, LoroError> {
62 if !self.can_edit() {
63 return Err(LoroError::TransactionError(
64 String::from("LoroDoc is in readonly detached mode. To make it writable in detached mode, call `set_detached_editing(true)`.").into_boxed_str(),
65 ));
66 }
67
68 let mut txn = Transaction::new_with_origin(self.inner.clone(), origin.into());
69
70 let obs = self.observer.clone();
71 let local_update_subs_weak = self.local_update_subs.downgrade();
72 txn.set_on_commit(Box::new(move |state, oplog, id_span| {
73 let mut state = state.lock().unwrap();
74 let events = state.take_events();
75 drop(state);
76 for event in events {
77 obs.emit(event);
78 }
79
80 if id_span.atom_len() == 0 {
81 return;
82 }
83
84 if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
85 if !local_update_subs.inner().is_empty() {
86 let bytes =
87 { export_fast_updates_in_range(&oplog.lock().unwrap(), &[id_span]) };
88 local_update_subs.emit(&(), bytes);
89 }
90 }
91 }));
92
93 Ok(txn)
94 }
95
96 pub fn start_auto_commit(&self) {
97 self.auto_commit
98 .store(true, std::sync::atomic::Ordering::Release);
99 let mut self_txn = self.txn.lock().unwrap();
100 if self_txn.is_some() || !self.can_edit() {
101 return;
102 }
103
104 let txn = self.txn().unwrap();
105 self_txn.replace(txn);
106 }
107
108 #[inline]
109 pub fn renew_txn_if_auto_commit(&self, options: Option<CommitOptions>) {
110 if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
111 let mut self_txn = self.txn.lock().unwrap();
112 if self_txn.is_some() {
113 return;
114 }
115
116 let mut txn = self.txn().unwrap();
117 if let Some(options) = options {
118 txn.set_options(options);
119 }
120 self_txn.replace(txn);
121 }
122 }
123}
124
125pub(crate) type OnCommitFn =
126 Box<dyn FnOnce(&Arc<LoroMutex<DocState>>, &Arc<LoroMutex<OpLog>>, IdSpan) + Sync + Send>;
127
128pub struct Transaction {
129 peer: PeerID,
130 origin: InternalString,
131 start_counter: Counter,
132 next_counter: Counter,
133 start_lamport: Lamport,
134 next_lamport: Lamport,
135 doc: Weak<LoroDocInner>,
136 frontiers: Frontiers,
137 local_ops: RleVec<[Op; 1]>, event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
139 pub(super) arena: SharedArena,
140 finished: bool,
141 on_commit: Option<OnCommitFn>,
142 timestamp: Option<Timestamp>,
143 msg: Option<Arc<str>>,
144 latest_timestamp: Timestamp,
145 pub(super) is_peer_first_appearance: bool,
146}
147
148impl std::fmt::Debug for Transaction {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("Transaction")
151 .field("peer", &self.peer)
152 .field("origin", &self.origin)
153 .field("start_counter", &self.start_counter)
154 .field("next_counter", &self.next_counter)
155 .field("start_lamport", &self.start_lamport)
156 .field("next_lamport", &self.next_lamport)
157 .field("frontiers", &self.frontiers)
158 .field("local_ops", &self.local_ops)
159 .field("event_hints", &self.event_hints)
160 .field("arena", &self.arena)
161 .field("finished", &self.finished)
162 .field("on_commit", &self.on_commit.is_some())
163 .field("timestamp", &self.timestamp)
164 .finish()
165 }
166}
167
168#[derive(Debug, Clone, EnumAsInner)]
176pub(super) enum EventHint {
177 Mark {
178 start: u32,
179 end: u32,
180 style: Style,
181 },
182 InsertText {
183 pos: u32,
185 event_len: u32,
186 unicode_len: u32,
187 styles: StyleMeta,
188 },
189 DeleteText {
191 span: DeleteSpan,
192 unicode_len: usize,
193 },
194 InsertList {
195 len: u32,
196 pos: usize,
197 },
198 SetList {
199 index: usize,
200 value: LoroValue,
201 },
202 Move {
203 value: LoroValue,
204 from: u32,
205 to: u32,
206 },
207 DeleteList(DeleteSpan),
208 Map {
209 key: InternalString,
210 value: Option<LoroValue>,
211 },
212 Tree(SmallVec<[TreeDiffItem; 1]>),
214 MarkEnd,
215 #[cfg(feature = "counter")]
216 Counter(f64),
217}
218
219impl generic_btree::rle::HasLength for EventHint {
220 fn rle_len(&self) -> usize {
221 match self {
222 EventHint::Mark { .. } => 1,
223 EventHint::InsertText {
224 unicode_len: len, ..
225 } => *len as usize,
226 EventHint::DeleteText { unicode_len, .. } => *unicode_len,
227 EventHint::InsertList { len, .. } => *len as usize,
228 EventHint::DeleteList(d) => d.len(),
229 EventHint::Map { .. } => 1,
230 EventHint::Tree(_) => 1,
231 EventHint::MarkEnd => 1,
232 EventHint::Move { .. } => 1,
233 EventHint::SetList { .. } => 1,
234 #[cfg(feature = "counter")]
235 EventHint::Counter(_) => 1,
236 }
237 }
238}
239
240impl generic_btree::rle::Mergeable for EventHint {
241 fn can_merge(&self, rhs: &Self) -> bool {
242 match (self, rhs) {
243 (
244 EventHint::InsertText {
245 pos,
246 unicode_len: _,
247 event_len,
248 styles,
249 },
250 EventHint::InsertText {
251 pos: r_pos,
252 styles: r_styles,
253 ..
254 },
255 ) => *pos + *event_len == *r_pos && styles == r_styles,
256 (EventHint::InsertList { pos, len }, EventHint::InsertList { pos: pos_right, .. }) => {
257 pos + *len as usize == *pos_right
258 }
259 (EventHint::DeleteText { span, .. }, EventHint::DeleteText { span: r, .. }) => {
263 span.is_mergable(r, &())
264 }
265 (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.is_mergable(r, &()),
266 _ => false,
267 }
268 }
269
270 fn merge_right(&mut self, rhs: &Self) {
271 match (self, rhs) {
272 (
273 EventHint::InsertText {
274 event_len,
275 unicode_len: len,
276 ..
277 },
278 EventHint::InsertText {
279 event_len: r_event_len,
280 unicode_len: r_len,
281 ..
282 },
283 ) => {
284 *len += *r_len;
285 *event_len += *r_event_len;
286 }
287 (
288 EventHint::InsertList { len, pos: _ },
289 EventHint::InsertList { len: r_len, pos: _ },
290 ) => *len += *r_len,
291 (EventHint::DeleteList(l), EventHint::DeleteList(r)) => l.merge(r, &()),
292 (
293 EventHint::DeleteText { span, unicode_len },
294 EventHint::DeleteText {
295 span: r_span,
296 unicode_len: r_len,
297 },
298 ) => {
299 *unicode_len += *r_len;
300 span.merge(r_span, &());
301 }
302 _ => unreachable!(),
303 }
304 }
305
306 fn merge_left(&mut self, _: &Self) {
307 unreachable!()
308 }
309}
310
311impl Transaction {
312 #[inline]
313 pub fn new(doc: Arc<LoroDocInner>) -> Self {
314 Self::new_with_origin(doc.clone(), "".into())
315 }
316
317 pub fn new_with_origin(doc: Arc<LoroDocInner>, origin: InternalString) -> Self {
318 let oplog_lock = doc.oplog.lock().unwrap();
319 let mut state_lock = doc.state.lock().unwrap();
320 if state_lock.is_in_txn() {
321 panic!("Cannot start a transaction while another one is in progress");
322 }
323
324 state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
325 let arena = state_lock.arena.clone();
326 let frontiers = state_lock.frontiers.clone();
327 let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
328 let next_counter = oplog_lock.next_id(peer).counter;
329 let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
330 let latest_timestamp = oplog_lock.get_greatest_timestamp(&frontiers);
331 oplog_lock
332 .check_change_greater_than_last_peer_id(peer, next_counter, &frontiers)
333 .unwrap();
334 drop(state_lock);
335 drop(oplog_lock);
336 Self {
337 peer,
338 doc: Arc::downgrade(&doc),
339 arena,
340 frontiers,
341 timestamp: None,
342 next_counter,
343 next_lamport,
344 origin: Default::default(),
345 start_counter: next_counter,
346 start_lamport: next_lamport,
347 event_hints: Default::default(),
348 local_ops: RleVec::new(),
349 finished: false,
350 on_commit: None,
351 msg: None,
352 latest_timestamp,
353 is_peer_first_appearance: false,
354 }
355 }
356
357 pub fn set_origin(&mut self, origin: InternalString) {
358 self.origin = origin;
359 }
360
361 pub fn set_timestamp(&mut self, time: Timestamp) {
362 self.timestamp = Some(time);
363 }
364
365 pub fn set_msg(&mut self, msg: Option<Arc<str>>) {
366 self.msg = msg;
367 }
368
369 pub fn local_ops(&self) -> &RleVec<[Op; 1]> {
370 &self.local_ops
371 }
372
373 pub fn peer(&self) -> &PeerID {
374 &self.peer
375 }
376
377 pub fn timestamp(&self) -> &Option<Timestamp> {
378 &self.timestamp
379 }
380
381 pub fn frontiers(&self) -> &Frontiers {
382 &self.frontiers
383 }
384 pub fn msg(&self) -> &Option<Arc<str>> {
385 &self.msg
386 }
387
388 pub fn lamport(&self) -> &Lamport {
389 &self.start_lamport
390 }
391
392 pub(crate) fn set_on_commit(&mut self, f: OnCommitFn) {
393 self.on_commit = Some(f);
394 }
395
396 pub(crate) fn take_on_commit(&mut self) -> Option<OnCommitFn> {
397 self.on_commit.take()
398 }
399
400 pub fn commit(mut self) -> Result<Option<CommitOptions>, LoroError> {
401 self._commit()
402 }
403
404 #[tracing::instrument(level = "debug", skip(self))]
405 fn _commit(&mut self) -> Result<Option<CommitOptions>, LoroError> {
406 if self.finished {
407 return Ok(None);
408 }
409
410 let Some(doc) = self.doc.upgrade() else {
411 return Ok(None);
412 };
413 self.finished = true;
414 if self.local_ops.is_empty() {
415 let mut state = doc.state.lock().unwrap();
416 state.abort_txn();
417 return Ok(Some(self.take_options()));
418 }
419
420 let ops = std::mem::take(&mut self.local_ops);
421 let deps = take(&mut self.frontiers);
422 let change = Change {
423 lamport: self.start_lamport,
424 ops,
425 deps,
426 id: ID::new(self.peer, self.start_counter),
427 timestamp: self.latest_timestamp.max(
428 self.timestamp
429 .unwrap_or_else(|| doc.oplog.lock().unwrap().get_timestamp_for_next_txn()),
430 ),
431 commit_msg: take(&mut self.msg),
432 };
433
434 let change_meta = ChangeMeta::from_change(&change);
435 {
436 let mut oplog = doc.oplog.lock().unwrap();
438 oplog.set_uncommitted_change(change);
439 }
440
441 let modifier = ChangeModifier::default();
442 doc.pre_commit_subs.emit(
443 &(),
444 PreCommitCallbackPayload {
445 change_meta,
446 origin: self.origin.to_string(),
447 modifier: modifier.clone(),
448 },
449 );
450
451 let mut oplog = doc.oplog.lock().unwrap();
452 let mut state = doc.state.lock().unwrap();
453
454 let mut change = oplog.uncommitted_change.take().unwrap();
455 modifier.modify_change(&mut change);
456 let diff = if state.is_recording() {
457 Some(change_to_diff(
458 &change,
459 doc.clone(),
460 std::mem::take(&mut self.event_hints),
461 ))
462 } else {
463 None
464 };
465
466 let last_id = change.id_last();
467 if let Err(err) = oplog.import_local_change(change) {
468 state.abort_txn();
469 drop(state);
470 drop(oplog);
471 return Err(err);
472 }
473
474 state.commit_txn(
475 Frontiers::from_id(last_id),
476 diff.map(|arr| InternalDocDiff {
477 by: crate::event::EventTriggerKind::Local,
478 origin: self.origin.clone(),
479 diff: Cow::Owned(
480 arr.into_iter()
481 .map(|x| InternalContainerDiff {
482 idx: x.idx,
483 bring_back: false,
484 diff: (x.diff.into()),
485 diff_mode: crate::diff_calc::DiffMode::Linear,
486 })
487 .collect(),
488 ),
489 new_version: Cow::Borrowed(oplog.frontiers()),
490 }),
491 );
492 drop(state);
493 drop(oplog);
494 if let Some(on_commit) = self.on_commit.take() {
495 assert!(!doc.txn.is_locked());
496 on_commit(&doc.state.clone(), &doc.oplog.clone(), self.id_span());
497 }
498 Ok(None)
499 }
500
501 fn take_options(&self) -> CommitOptions {
502 let mut options = CommitOptions::new();
503 if !self.origin.is_empty() {
504 options = options.origin(self.origin.as_str());
505 }
506 if let Some(msg) = self.msg.as_ref() {
507 options = options.commit_msg(msg);
508 }
509 if let Some(timestamp) = self.timestamp {
510 options = options.timestamp(timestamp);
511 }
512 options
513 }
514
515 pub(super) fn apply_local_op(
516 &mut self,
517 container: ContainerIdx,
518 content: RawOpContent,
519 event: EventHint,
520 doc: &LoroDoc,
522 ) -> LoroResult<()> {
523 let this_doc = self.doc.upgrade().unwrap();
525 if Arc::as_ptr(&this_doc.state) != Arc::as_ptr(&doc.state) {
526 return Err(LoroError::UnmatchedContext {
527 expected: this_doc
528 .state
529 .lock()
530 .unwrap()
531 .peer
532 .load(std::sync::atomic::Ordering::Relaxed),
533 found: doc
534 .state
535 .lock()
536 .unwrap()
537 .peer
538 .load(std::sync::atomic::Ordering::Relaxed),
539 });
540 }
541
542 let len = content.content_len();
543 assert!(len > 0);
544 let raw_op = RawOp {
545 id: ID {
546 peer: self.peer,
547 counter: self.next_counter,
548 },
549 lamport: self.next_lamport,
550 container,
551 content,
552 };
553
554 let mut oplog = doc.oplog.lock().unwrap();
555 let mut state = doc.state.lock().unwrap();
556 if state.is_deleted(container) {
557 return Err(LoroError::ContainerDeleted {
558 container: Box::new(state.arena.idx_to_id(container).unwrap()),
559 });
560 }
561
562 let op = self.arena.convert_raw_op(&raw_op);
563 state.apply_local_op(&raw_op, &op)?;
564 {
565 if !self.is_peer_first_appearance && !oplog.dag.latest_vv_contains_peer(self.peer) {
566 self.is_peer_first_appearance = true;
567 }
568 let dep_id = Frontiers::from_id(ID::new(self.peer, self.next_counter - 1));
570 let start_id = ID::new(self.peer, self.next_counter);
571 self.next_counter += len as Counter;
572 oplog.dag.update_version_on_new_local_op(
573 if self.local_ops.is_empty() {
574 &self.frontiers
575 } else {
576 &dep_id
577 },
578 start_id,
579 self.next_lamport,
580 len,
581 );
582 self.next_lamport += len as Lamport;
583 let last_id = start_id.inc(len as Counter - 1);
585 state.frontiers = Frontiers::from_id(last_id);
586 };
587 drop(state);
588 drop(oplog);
589 debug_assert_eq!(
590 event.rle_len(),
591 op.atom_len(),
592 "event:{:#?} \nop:{:#?}",
593 &event,
594 &op
595 );
596
597 let container_hints = self.event_hints.entry(container).or_default();
598
599 match container_hints.last_mut() {
600 Some(last) if last.can_merge(&event) => {
601 last.merge_right(&event);
602 }
603 _ => {
604 container_hints.push(event);
605 }
606 }
607 self.local_ops.push(op);
608 Ok(())
609 }
610
611 pub fn get_text<I: IntoContainerId>(&self, id: I) -> TextHandler {
614 let id = id.into_container_id(&self.arena, ContainerType::Text);
615 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
616 .into_text()
617 .unwrap()
618 }
619
620 pub fn get_list<I: IntoContainerId>(&self, id: I) -> ListHandler {
623 let id = id.into_container_id(&self.arena, ContainerType::List);
624 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
625 .into_list()
626 .unwrap()
627 }
628
629 pub fn get_map<I: IntoContainerId>(&self, id: I) -> MapHandler {
632 let id = id.into_container_id(&self.arena, ContainerType::Map);
633 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
634 .into_map()
635 .unwrap()
636 }
637
638 pub fn get_tree<I: IntoContainerId>(&self, id: I) -> TreeHandler {
641 let id = id.into_container_id(&self.arena, ContainerType::Tree);
642 Handler::new_attached(id, LoroDoc::from_inner(self.doc.upgrade().unwrap()))
643 .into_tree()
644 .unwrap()
645 }
646 pub fn next_id(&self) -> ID {
647 ID {
648 peer: self.peer,
649 counter: self.next_counter,
650 }
651 }
652
653 #[inline]
654 pub fn id_span(&self) -> IdSpan {
655 IdSpan::new(self.peer, self.start_counter, self.next_counter)
656 }
657
658 pub fn next_idlp(&self) -> IdLp {
659 IdLp {
660 peer: self.peer,
661 lamport: self.next_lamport,
662 }
663 }
664
665 pub fn is_empty(&self) -> bool {
666 self.local_ops.is_empty()
667 }
668
669 pub(crate) fn len(&self) -> usize {
670 (self.next_counter - self.start_counter) as usize
671 }
672
673 pub(crate) fn set_options(&mut self, options: CommitOptions) {
674 self.origin = options.origin.unwrap_or_default();
675 self.msg = options.commit_msg;
676 self.timestamp = options.timestamp;
677 }
678
679 pub(crate) fn set_default_options(&mut self, default_options: crate::loro::CommitOptions) {
680 if self.origin.is_empty() {
681 self.origin = default_options.origin.unwrap_or_default();
682 }
683 if self.msg.is_none() {
684 self.msg = default_options.commit_msg;
685 }
686 if self.timestamp.is_none() {
687 self.timestamp = default_options.timestamp;
688 }
689 }
690}
691
692impl Drop for Transaction {
693 #[tracing::instrument(level = "debug", skip(self))]
694 fn drop(&mut self) {
695 if !self.finished {
696 self._commit().unwrap();
699 }
700 }
701}
702
703#[derive(Debug, Clone)]
704pub(crate) struct TxnContainerDiff {
705 pub(crate) idx: ContainerIdx,
706 pub(crate) diff: Diff,
707}
708
709fn change_to_diff(
711 change: &Change,
712 doc: Arc<LoroDocInner>,
713 event_hints: FxHashMap<ContainerIdx, Vec<EventHint>>,
714) -> Vec<TxnContainerDiff> {
715 let mut ans: Vec<TxnContainerDiff> = Vec::with_capacity(change.ops.len());
716 let peer = change.id.peer;
717 let mut lamport = change.lamport;
718
719 let mut ops_by_container: FxHashMap<ContainerIdx, Vec<Op>> = FxHashMap::default();
721 for op in change.ops.iter() {
722 ops_by_container
723 .entry(op.container)
724 .or_default()
725 .push(op.clone());
726 }
727
728 for (container_idx, hints) in event_hints {
730 let Some(container_ops) = ops_by_container.get_mut(&container_idx) else {
731 continue;
732 };
733
734 let mut op_index = 0;
735 let mut hint_iter = hints.into_iter();
736 let mut current_hint = hint_iter.next();
737
738 while op_index < container_ops.len() {
739 let Some(hint) = current_hint.take() else {
740 unreachable!("Missing hint for op");
741 };
742
743 let mut ops_for_hint: SmallVec<[Op; 1]> = smallvec![container_ops[op_index].clone()];
745 let mut total_len = container_ops[op_index].atom_len();
746
747 while total_len < hint.rle_len() {
749 op_index += 1;
750 let next_op_len = container_ops[op_index].atom_len();
751 let op = if next_op_len + total_len > hint.rle_len() {
752 let new_len = hint.rle_len() - total_len;
753 let left = container_ops[op_index].slice(0, new_len);
754 let right = container_ops[op_index].slice(new_len, next_op_len);
755 container_ops[op_index] = right;
756 op_index -= 1;
757 left
758 } else {
759 container_ops[op_index].clone()
760 };
761
762 total_len += op.atom_len();
763 ops_for_hint.push(op);
764 }
765
766 op_index += 1;
767 assert_eq!(total_len, hint.rle_len(), "Op/hint length mismatch");
768
769 current_hint = hint_iter.next();
771
772 match hint {
774 EventHint::Mark { start, end, style } => {
775 let mut meta = StyleMeta::default();
776 meta.insert(
777 style.key.clone(),
778 StyleMetaItem {
779 lamport,
780 peer: change.id.peer,
781 value: style.data,
782 },
783 );
784 let diff = DeltaRopeBuilder::new()
785 .retain(start as usize, Default::default())
786 .retain(
787 (end - start) as usize,
788 meta.to_option_map().unwrap_or_default().into(),
789 )
790 .build();
791 ans.push(TxnContainerDiff {
792 idx: container_idx,
793 diff: Diff::Text(diff),
794 });
795 }
796 EventHint::InsertText { styles, pos, .. } => {
797 let mut delta: TextDiff = DeltaRopeBuilder::new()
798 .retain(pos as usize, Default::default())
799 .build();
800 for op in ops_for_hint.iter() {
801 let InnerListOp::InsertText { slice, .. } = op.content.as_list().unwrap()
802 else {
803 unreachable!()
804 };
805
806 delta.push_insert(
807 slice.clone().into(),
808 styles.to_option_map().unwrap_or_default().into(),
809 );
810 }
811 ans.push(TxnContainerDiff {
812 idx: container_idx,
813 diff: Diff::Text(delta),
814 })
815 }
816 EventHint::DeleteText {
817 span,
818 unicode_len: _,
819 } => ans.push(TxnContainerDiff {
822 idx: container_idx,
823 diff: Diff::Text(
824 DeltaRopeBuilder::new()
825 .retain(span.start() as usize, Default::default())
826 .delete(span.len())
827 .build(),
828 ),
829 }),
830 EventHint::InsertList { pos, .. } => {
831 let mut index = pos;
834 for op in ops_for_hint.iter() {
835 let (range, _) = op.content.as_list().unwrap().as_insert().unwrap();
836 let values = doc
837 .arena
838 .get_values(range.to_range())
839 .into_iter()
840 .map(|v| ValueOrHandler::from_value(v, &doc));
841 let len = values.len();
842 ans.push(TxnContainerDiff {
843 idx: container_idx,
844 diff: Diff::List(
845 DeltaRopeBuilder::new()
846 .retain(index, Default::default())
847 .insert_many(values, Default::default())
848 .build(),
849 ),
850 });
851 index += len;
852 }
853 }
854 EventHint::DeleteList(s) => {
855 ans.push(TxnContainerDiff {
856 idx: container_idx,
857 diff: Diff::List(
858 DeltaRopeBuilder::new()
859 .retain(s.start() as usize, Default::default())
860 .delete(s.len())
861 .build(),
862 ),
863 });
864 }
865 EventHint::Map { key, value } => ans.push(TxnContainerDiff {
866 idx: container_idx,
867 diff: Diff::Map(ResolvedMapDelta::new().with_entry(
868 key,
869 ResolvedMapValue {
870 value: value.map(|v| ValueOrHandler::from_value(v, &doc)),
871 idlp: IdLp::new(peer, lamport),
872 },
873 )),
874 }),
875 EventHint::Tree(tree_diff) => {
876 let mut diff = TreeDiff::default();
877 diff.diff.extend(tree_diff.into_iter());
878 ans.push(TxnContainerDiff {
879 idx: container_idx,
880 diff: Diff::Tree(diff),
881 });
882 }
883 EventHint::Move { from, to, value } => {
884 let mut a = DeltaRopeBuilder::new()
885 .retain(from as usize, Default::default())
886 .delete(1)
887 .build();
888 a.compose(
889 &DeltaRopeBuilder::new()
890 .retain(to as usize, Default::default())
891 .insert(
892 ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
893 ListDeltaMeta { from_move: true },
894 )
895 .build(),
896 );
897 ans.push(TxnContainerDiff {
898 idx: container_idx,
899 diff: Diff::List(a),
900 });
901 }
902 EventHint::SetList { index, value } => {
903 ans.push(TxnContainerDiff {
904 idx: container_idx,
905 diff: Diff::List(
906 DeltaRopeBuilder::new()
907 .retain(index, Default::default())
908 .delete(1)
909 .insert(
910 ArrayVec::from([ValueOrHandler::from_value(value, &doc)]),
911 Default::default(),
912 )
913 .build(),
914 ),
915 });
916 }
917 EventHint::MarkEnd => {
918 }
920 #[cfg(feature = "counter")]
921 EventHint::Counter(diff) => {
922 ans.push(TxnContainerDiff {
923 idx: container_idx,
924 diff: Diff::Counter(diff),
925 });
926 }
927 }
928
929 lamport += ops_for_hint
931 .iter()
932 .map(|x| x.content_len() as Lamport)
933 .sum::<Lamport>();
934 }
935 }
936
937 ans
938}