use std::{
    collections::VecDeque,
    sync::{atomic::AtomicU64, Arc, Mutex},
};

use either::Either;
use fxhash::FxHashMap;
use loro_common::{
    ContainerID, Counter, CounterSpan, HasIdSpan, IdSpan, LoroResult, LoroValue, PeerID,
};
use tracing::{debug_span, info_span, instrument};

use crate::{
    change::{get_sys_timestamp, Timestamp},
    cursor::{AbsolutePosition, Cursor},
    delta::TreeExternalDiff,
    event::{Diff, EventTriggerKind},
    version::Frontiers,
    ContainerDiff, DiffEvent, DocDiff, LoroDoc, Subscription,
};

#[derive(Debug, Clone, Default)]
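/// A batch of container diffs keyed by [`ContainerID`], with `order`
/// remembering the sequence in which containers were first touched.
/// Composing and transforming these batches is how undo/redo items are
/// rebased over other changes.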
pub struct DiffBatch {
    pub cid_to_events: FxHashMap<ContainerID, Diff>,
    pub order: Vec<ContainerID>,
}

impl DiffBatch {
    pub fn new(diff: Vec<DocDiff>) -> Self {
        let mut map: FxHashMap<ContainerID, Diff> = Default::default();
        let mut order: Vec<ContainerID> = Vec::with_capacity(diff.len());
        for d in diff.into_iter() {
            for item in d.diff.into_iter() {
                // Each container is expected to appear at most once across
                // the given doc diffs.
                let old = map.insert(item.id.clone(), item.diff);
                assert!(old.is_none());
                order.push(item.id.clone());
            }
        }

        Self {
            cid_to_events: map,
            order,
        }
    }

    /// Composes `other` into `self`: diffs for containers present in both
    /// batches are merged, and containers new to `self` are appended in
    /// `other`'s order.
    pub fn compose(&mut self, other: &Self) {
        if other.cid_to_events.is_empty() {
            return;
        }

        for (id, diff) in other.iter() {
            if let Some(this_diff) = self.cid_to_events.get_mut(id) {
                this_diff.compose_ref(diff);
            } else {
                self.cid_to_events.insert(id.clone(), diff.clone());
                self.order.push(id.clone());
            }
        }
    }

    /// Transforms each diff in `self` against the matching diff in `other`,
    /// so that `self` still applies after `other`. `left_priority` resolves
    /// conflicts in favor of `self`.
    pub fn transform(&mut self, other: &Self, left_priority: bool) {
        if other.cid_to_events.is_empty() || self.cid_to_events.is_empty() {
            return;
        }

        for (idx, diff) in self.cid_to_events.iter_mut() {
            if let Some(b_diff) = other.cid_to_events.get(idx) {
                diff.transform(b_diff, left_priority);
            }
        }
    }

    pub fn clear(&mut self) {
        self.cid_to_events.clear();
        self.order.clear();
    }

    /// Iterates over the diffs in the order their containers were first seen.
    pub fn iter(&self) -> impl Iterator<Item = (&ContainerID, &Diff)> + '_ {
        self.order
            .iter()
            .map(|cid| (cid, self.cid_to_events.get(cid).unwrap()))
    }

    #[allow(clippy::should_implement_trait)]
    pub fn into_iter(self) -> impl Iterator<Item = (ContainerID, Diff)> {
        let mut cid_to_events = self.cid_to_events;
        self.order.into_iter().map(move |cid| {
            let d = cid_to_events.remove(&cid).unwrap();
            (cid, d)
        })
    }
}

fn transform_cursor(
    cursor_with_pos: &mut CursorWithPos,
    remote_diff: &DiffBatch,
    doc: &LoroDoc,
    container_remap: &FxHashMap<ContainerID, ContainerID>,
) {
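    // Follow the remap chain: if the cursor's container was re-created during
    // undo/redo, resolve it to the container that currently replaces it.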
    let mut cid = &cursor_with_pos.cursor.container;
    while let Some(new_cid) = container_remap.get(cid) {
        cid = new_cid;
    }

    if let Some(diff) = remote_diff.cid_to_events.get(cid) {
        let new_pos = diff.transform_cursor(cursor_with_pos.pos.pos, false);
        cursor_with_pos.pos.pos = new_pos;
    }

    let new_pos = cursor_with_pos.pos.pos;
    match doc.get_handler(cid.clone()).unwrap() {
        crate::handler::Handler::Text(h) => {
            let Some(new_cursor) = h.get_cursor_internal(new_pos, cursor_with_pos.pos.side, false)
            else {
                return;
            };

            cursor_with_pos.cursor = new_cursor;
        }
        crate::handler::Handler::List(h) => {
            let Some(new_cursor) = h.get_cursor(new_pos, cursor_with_pos.pos.side) else {
                return;
            };

            cursor_with_pos.cursor = new_cursor;
        }
        crate::handler::Handler::MovableList(h) => {
            let Some(new_cursor) = h.get_cursor(new_pos, cursor_with_pos.pos.side) else {
                return;
            };

            cursor_with_pos.cursor = new_cursor;
        }
        crate::handler::Handler::Map(_) => {}
        crate::handler::Handler::Tree(_) => {}
        crate::handler::Handler::Unknown(_) => {}
        #[cfg(feature = "counter")]
        crate::handler::Handler::Counter(_) => {}
    }
}

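/// `UndoManager` records the operations of a single peer and can undo/redo
/// them even after remote changes have been merged in between: its stacks are
/// continuously rebased (transformed) over incoming remote diffs, so undo
/// only ever reverts this peer's own edits.
///
/// A minimal usage sketch (not compiled here; assumes this crate's
/// auto-commit `LoroDoc` API):
///
/// ```ignore
/// let doc = LoroDoc::new_auto_commit();
/// let mut undo = UndoManager::new(&doc);
/// let text = doc.get_text("text");
/// text.insert(0, "hello").unwrap();
/// doc.commit_then_renew();
/// assert!(undo.undo().unwrap()); // reverts the local insert
/// ```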
pub struct UndoManager {
    peer: Arc<AtomicU64>,
    container_remap: Arc<Mutex<FxHashMap<ContainerID, ContainerID>>>,
    inner: Arc<Mutex<UndoManagerInner>>,
    _peer_id_change_sub: Subscription,
    _undo_sub: Subscription,
    doc: LoroDoc,
}

impl std::fmt::Debug for UndoManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UndoManager")
            .field("peer", &self.peer)
            .field("container_remap", &self.container_remap)
            .field("inner", &self.inner)
            .finish()
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UndoOrRedo {
    Undo,
    Redo,
}

impl UndoOrRedo {
    fn opposite(&self) -> UndoOrRedo {
        match self {
            Self::Undo => Self::Redo,
            Self::Redo => Self::Undo,
        }
    }
}

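/// Callback invoked when a new item is pushed onto the undo or redo stack.
/// It returns the [`UndoItemMeta`] (e.g. selection cursors) to store with the
/// item. A hypothetical wiring (not compiled here):
///
/// ```ignore
/// undo.set_on_push(Some(Box::new(|_kind, _span, _event| {
///     let mut meta = UndoItemMeta::new();
///     // capture the current selection as cursors here
///     meta
/// })));
/// ```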
pub type OnPush = Box<
    dyn for<'a> Fn(UndoOrRedo, CounterSpan, Option<DiffEvent<'a>>) -> UndoItemMeta + Send + Sync,
>;
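/// Callback invoked when an item is popped from the undo or redo stack; it
/// receives the [`UndoItemMeta`] that was captured by [`OnPush`].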
pub type OnPop = Box<dyn Fn(UndoOrRedo, CounterSpan, UndoItemMeta) + Send + Sync>;

struct UndoManagerInner {
    next_counter: Option<Counter>,
    undo_stack: Stack,
    redo_stack: Stack,
    processing_undo: bool,
    last_undo_time: i64,
    merge_interval_in_ms: i64,
    max_stack_size: usize,
    exclude_origin_prefixes: Vec<Box<str>>,
    last_popped_selection: Option<Vec<CursorWithPos>>,
    on_push: Option<OnPush>,
    on_pop: Option<OnPop>,
}

impl std::fmt::Debug for UndoManagerInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UndoManagerInner")
            .field("next_counter", &self.next_counter)
            .field("undo_stack", &self.undo_stack)
            .field("redo_stack", &self.redo_stack)
            .field("processing_undo", &self.processing_undo)
            .field("last_undo_time", &self.last_undo_time)
            .field("merge_interval", &self.merge_interval_in_ms)
            .field("max_stack_size", &self.max_stack_size)
            .field("exclude_origin_prefixes", &self.exclude_origin_prefixes)
            .finish()
    }
}

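/// An undo/redo stack. Each entry pairs a queue of [`StackItem`]s with the
/// remote [`DiffBatch`] that arrived after those items were recorded; when an
/// item is popped, its span is undone against that remote diff.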
#[derive(Debug)]
struct Stack {
    stack: VecDeque<(VecDeque<StackItem>, Arc<Mutex<DiffBatch>>)>,
    size: usize,
}

#[derive(Debug, Clone)]
struct StackItem {
    span: CounterSpan,
    meta: UndoItemMeta,
}

#[derive(Debug, Default, Clone)]
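/// Metadata stored with each undo/redo item: an arbitrary user value plus the
/// cursors (selection) to restore, both typically captured by the `on_push`
/// callback.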
pub struct UndoItemMeta {
    pub value: LoroValue,
    pub cursors: Vec<CursorWithPos>,
}

#[derive(Debug, Clone)]
pub struct CursorWithPos {
    pub cursor: Cursor,
    pub pos: AbsolutePosition,
}

impl UndoItemMeta {
    pub fn new() -> Self {
        Self {
            value: LoroValue::Null,
            cursors: Default::default(),
        }
    }

    pub fn add_cursor(&mut self, cursor: &Cursor) {
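        // Capture the cursor together with its current absolute position;
        // the position is what gets transformed against remote diffs when
        // the item is later popped.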
        self.cursors.push(CursorWithPos {
            cursor: cursor.clone(),
            pos: AbsolutePosition {
                pos: cursor.origin_pos,
                side: cursor.side,
            },
        });
    }

    pub fn set_value(&mut self, value: LoroValue) {
        self.value = value;
    }
}

impl Stack {
    pub fn new() -> Self {
        let mut stack = VecDeque::new();
        stack.push_back((VecDeque::new(), Arc::new(Mutex::new(Default::default()))));
        Stack { stack, size: 0 }
    }

    pub fn pop(&mut self) -> Option<(StackItem, Arc<Mutex<DiffBatch>>)> {
        // Drop exhausted entries, folding their remote diffs into the entry
        // below so that no remote change is lost.
        while self.stack.back().unwrap().0.is_empty() && self.stack.len() > 1 {
            let (_, diff) = self.stack.pop_back().unwrap();
            let diff = diff.try_lock().unwrap();
            if !diff.cid_to_events.is_empty() {
                self.stack
                    .back_mut()
                    .unwrap()
                    .1
                    .try_lock()
                    .unwrap()
                    .compose(&diff);
            }
        }

        if self.stack.len() == 1 && self.stack.back().unwrap().0.is_empty() {
            // The stack is empty; clear the stale remote diff as well.
            self.stack.back_mut().unwrap().1.try_lock().unwrap().clear();
            return None;
        }

        self.size -= 1;
        let last = self.stack.back_mut().unwrap();
        last.0.pop_back().map(|x| (x, last.1.clone()))
    }
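
    /// Push a new undo/redo item. Same as [`Stack::push_with_merge`] with
    /// merging disabled.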
    pub fn push(&mut self, span: CounterSpan, meta: UndoItemMeta) {
        self.push_with_merge(span, meta, false)
    }

    pub fn push_with_merge(&mut self, span: CounterSpan, meta: UndoItemMeta, can_merge: bool) {
        let last = self.stack.back_mut().unwrap();
        let last_remote_diff = last.1.try_lock().unwrap();
        if !last_remote_diff.cid_to_events.is_empty() {
            // A remote diff has arrived since the last item; start a new
            // entry so later pops transform against the right diff.
            drop(last_remote_diff);
            let mut v = VecDeque::new();
            v.push_back(StackItem { span, meta });
            self.stack
                .push_back((v, Arc::new(Mutex::new(DiffBatch::default()))));

            self.size += 1;
        } else {
            if can_merge {
                if let Some(last_span) = last.0.back_mut() {
                    if last_span.span.end == span.start {
                        // Merge the new span into the previous item.
                        last_span.span.end = span.end;
                        return;
                    }
                }
            }

            self.size += 1;
            last.0.push_back(StackItem { span, meta });
        }
    }

    pub fn compose_remote_event(&mut self, diff: &[&ContainerDiff]) {
        if self.is_empty() {
            return;
        }

        let remote_diff = &mut self.stack.back_mut().unwrap().1;
        let mut remote_diff = remote_diff.try_lock().unwrap();
        for e in diff {
            if let Some(d) = remote_diff.cid_to_events.get_mut(&e.id) {
                d.compose_ref(&e.diff);
            } else {
                remote_diff
                    .cid_to_events
                    .insert(e.id.clone(), e.diff.clone());
                remote_diff.order.push(e.id.clone());
            }
        }
    }

    pub fn transform_based_on_this_delta(&mut self, diff: &DiffBatch) {
        if self.is_empty() {
            return;
        }
        let remote_diff = &mut self.stack.back_mut().unwrap().1;
        remote_diff.try_lock().unwrap().transform(diff, false);
    }

    pub fn clear(&mut self) {
        self.stack = VecDeque::new();
        self.stack.push_back((VecDeque::new(), Default::default()));
        self.size = 0;
    }

    pub fn is_empty(&self) -> bool {
        self.size == 0
    }

    pub fn len(&self) -> usize {
        self.size
    }

    fn pop_front(&mut self) {
        if self.is_empty() {
            return;
        }

        self.size -= 1;
        let first = self.stack.front_mut().unwrap();
        let f = first.0.pop_front();
        assert!(f.is_some());
        if first.0.is_empty() {
            self.stack.pop_front();
        }
    }
}

impl Default for Stack {
    fn default() -> Self {
        Stack::new()
    }
}

impl UndoManagerInner {
    fn new(last_counter: Counter) -> Self {
        Self {
            next_counter: Some(last_counter),
            undo_stack: Default::default(),
            redo_stack: Default::default(),
            processing_undo: false,
            merge_interval_in_ms: 0,
            last_undo_time: 0,
            max_stack_size: usize::MAX,
            exclude_origin_prefixes: vec![],
            last_popped_selection: None,
            on_pop: None,
            on_push: None,
        }
    }

    fn record_checkpoint(&mut self, latest_counter: Counter, event: Option<DiffEvent>) {
        if Some(latest_counter) == self.next_counter {
            return;
        }

        if self.next_counter.is_none() {
            self.next_counter = Some(latest_counter);
            return;
        }

        assert!(self.next_counter.unwrap() < latest_counter);
        let now = get_sys_timestamp() as Timestamp;
        let span = CounterSpan::new(self.next_counter.unwrap(), latest_counter);
        let meta = self
            .on_push
            .as_ref()
            .map(|x| x(UndoOrRedo::Undo, span, event))
            .unwrap_or_default();

        // Merge with the previous item when the edits landed within the
        // configured merge interval; otherwise start a new undo item.
        if !self.undo_stack.is_empty() && now - self.last_undo_time < self.merge_interval_in_ms {
            self.undo_stack.push_with_merge(span, meta, true);
        } else {
            self.last_undo_time = now;
            self.undo_stack.push(span, meta);
        }

        self.next_counter = Some(latest_counter);
        self.redo_stack.clear();
        while self.undo_stack.len() > self.max_stack_size {
            self.undo_stack.pop_front();
        }
    }
}

fn get_counter_end(doc: &LoroDoc, peer: PeerID) -> Counter {
    doc.oplog()
        .try_lock()
        .unwrap()
        .vv()
        .get(&peer)
        .cloned()
        .unwrap_or(0)
}

impl UndoManager {
    pub fn new(doc: &LoroDoc) -> Self {
        let peer = Arc::new(AtomicU64::new(doc.peer_id()));
        let peer_clone = peer.clone();
        let peer_clone2 = peer.clone();
        let inner = Arc::new(Mutex::new(UndoManagerInner::new(get_counter_end(
            doc,
            doc.peer_id(),
        ))));
        let inner_clone = inner.clone();
        let inner_clone2 = inner.clone();
        let remap_containers = Arc::new(Mutex::new(FxHashMap::default()));
        let remap_containers_clone = remap_containers.clone();
        let undo_sub = doc.subscribe_root(Arc::new(move |event| match event.event_meta.by {
            EventTriggerKind::Local => {
                let Ok(mut inner) = inner_clone.try_lock() else {
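                    // If the lock is unavailable, an undo/redo driven by this
                    // manager is likely in flight; skip recording this event.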
                    return;
                };
                if inner.processing_undo {
                    return;
                }
                if let Some(id) = event
                    .event_meta
                    .to
                    .iter()
                    .find(|x| x.peer == peer_clone.load(std::sync::atomic::Ordering::Relaxed))
                {
                    if inner
                        .exclude_origin_prefixes
                        .iter()
                        .any(|x| event.event_meta.origin.starts_with(&**x))
                    {
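                        // Local commits from an excluded origin are not
                        // recorded as undoable; treat them like remote
                        // changes instead.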
                        inner.undo_stack.compose_remote_event(event.events);
                        inner.redo_stack.compose_remote_event(event.events);
                        inner.next_counter = Some(id.counter + 1);
                    } else {
                        inner.record_checkpoint(id.counter + 1, Some(event));
                    }
                }
            }
            EventTriggerKind::Import => {
                let mut inner = inner_clone.try_lock().unwrap();

                for e in event.events {
                    if let Diff::Tree(tree) = &e.diff {
                        for item in &tree.diff {
                            let target = item.target;
                            if let TreeExternalDiff::Create { .. } = &item.action {
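                                // A newly (re-)created tree node gets a fresh
                                // associated meta container, so drop any stale
                                // remap entry pointing at it.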
                                remap_containers_clone
                                    .try_lock()
                                    .unwrap()
                                    .remove(&target.associated_meta_container());
                            }
                        }
                    }
                }

                inner.undo_stack.compose_remote_event(event.events);
                inner.redo_stack.compose_remote_event(event.events);
            }
            EventTriggerKind::Checkout => {
                let mut inner = inner_clone.try_lock().unwrap();
                inner.undo_stack.clear();
                inner.redo_stack.clear();
                inner.next_counter = None;
            }
        }));

        let sub = doc.subscribe_peer_id_change(Box::new(move |id| {
            let mut inner = inner_clone2.try_lock().unwrap();
            inner.undo_stack.clear();
            inner.redo_stack.clear();
            inner.next_counter = Some(id.counter);
            peer_clone2.store(id.peer, std::sync::atomic::Ordering::Relaxed);
            true
        }));

        UndoManager {
            peer,
            container_remap: remap_containers,
            inner,
            _peer_id_change_sub: sub,
            _undo_sub: undo_sub,
            doc: doc.clone(),
        }
    }

    pub fn peer(&self) -> PeerID {
        self.peer.load(std::sync::atomic::Ordering::Relaxed)
    }

    pub fn set_merge_interval(&mut self, interval: i64) {
        self.inner.try_lock().unwrap().merge_interval_in_ms = interval;
    }

    pub fn set_max_undo_steps(&mut self, size: usize) {
        self.inner.try_lock().unwrap().max_stack_size = size;
    }

    pub fn add_exclude_origin_prefix(&mut self, prefix: &str) {
        self.inner
            .try_lock()
            .unwrap()
            .exclude_origin_prefixes
            .push(prefix.into());
    }

    pub fn record_new_checkpoint(&mut self) -> LoroResult<()> {
        self.doc.commit_then_renew();
        let counter = get_counter_end(&self.doc, self.peer());
        self.inner
            .try_lock()
            .unwrap()
            .record_checkpoint(counter, None);
        Ok(())
    }

    #[instrument(skip_all)]
    pub fn undo(&mut self) -> LoroResult<bool> {
        self.perform(
            |x| &mut x.undo_stack,
            |x| &mut x.redo_stack,
            UndoOrRedo::Undo,
        )
    }

    #[instrument(skip_all)]
    pub fn redo(&mut self) -> LoroResult<bool> {
        self.perform(
            |x| &mut x.redo_stack,
            |x| &mut x.undo_stack,
            UndoOrRedo::Redo,
        )
    }

    fn perform(
        &mut self,
        get_stack: impl Fn(&mut UndoManagerInner) -> &mut Stack,
        get_opposite: impl Fn(&mut UndoManagerInner) -> &mut Stack,
        kind: UndoOrRedo,
    ) -> LoroResult<bool> {
        let doc = &self.doc.clone();
        self.record_new_checkpoint()?;
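
        // Pop items from the chosen stack until one actually changes the
        // document. For each popped span:
        //   1. apply its inverse via `undo_internal`, rebased over the
        //      remote diff stored alongside it;
        //   2. transform the rest of this stack against the diff produced
        //      by that application (see the callback below);
        //   3. push the inverse onto the opposite stack so the action can
        //      be redone (or undone) later.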
        let end_counter = get_counter_end(doc, self.peer());
        let mut top = {
            let mut inner = self.inner.try_lock().unwrap();
            inner.processing_undo = true;
            get_stack(&mut inner).pop()
        };

        let mut executed = false;
        while let Some((mut span, remote_diff)) = top {
            let mut next_push_selection = None;
            {
                let inner = self.inner.clone();
                let remote_change_clone = remote_diff.try_lock().unwrap().clone();
                let commit = doc.undo_internal(
                    IdSpan {
                        peer: self.peer(),
                        counter: span.span,
                    },
                    &mut self.container_remap.try_lock().unwrap(),
                    Some(&remote_change_clone),
                    &mut |diff| {
                        info_span!("transform remote diff").in_scope(|| {
                            let mut inner = inner.try_lock().unwrap();
                            // Keep the stack consistent with the just-applied
                            // undo by transforming its remote diffs.
                            get_stack(&mut inner).transform_based_on_this_delta(diff);
                        });
                    },
                )?;
                drop(commit);
                let mut inner = self.inner.try_lock().unwrap();
                if let Some(x) = inner.on_pop.as_ref() {
                    for cursor in span.meta.cursors.iter_mut() {
                        // Transform the stored cursors so that `on_pop`
                        // observes up-to-date positions.
                        transform_cursor(
                            cursor,
                            &remote_diff.try_lock().unwrap(),
                            doc,
                            &self.container_remap.try_lock().unwrap(),
                        );
                    }

                    x(kind, span.span, span.meta.clone());
                    let take = inner.last_popped_selection.take();
                    next_push_selection = take;
                    inner.last_popped_selection = Some(span.meta.cursors);
                }
            }
            let new_counter = get_counter_end(doc, self.peer());
            if end_counter != new_counter {
                let mut inner = self.inner.try_lock().unwrap();
                let mut meta = inner
                    .on_push
                    .as_ref()
                    .map(|x| {
                        x(
                            kind.opposite(),
                            CounterSpan::new(end_counter, new_counter),
                            None,
                        )
                    })
                    .unwrap_or_default();
                if matches!(kind, UndoOrRedo::Undo) && get_opposite(&mut inner).is_empty() {
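                    // A fresh undo with an empty redo stack: keep the meta
                    // from `on_push` as-is instead of restoring an old
                    // selection.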
                } else if let Some(selection) = next_push_selection.take() {
                    // Restore the selection captured when the popped item was
                    // originally pushed.
                    meta.cursors = selection;
                }

                get_opposite(&mut inner).push(CounterSpan::new(end_counter, new_counter), meta);
                inner.next_counter = Some(new_counter);
                executed = true;
                break;
            } else {
                // The popped span didn't change the document; try the next one.
                top = get_stack(&mut self.inner.try_lock().unwrap()).pop();
                continue;
            }
        }

        self.inner.try_lock().unwrap().processing_undo = false;
        Ok(executed)
    }

    pub fn can_undo(&self) -> bool {
        !self.inner.try_lock().unwrap().undo_stack.is_empty()
    }

    pub fn can_redo(&self) -> bool {
        !self.inner.try_lock().unwrap().redo_stack.is_empty()
    }

    pub fn set_on_push(&self, on_push: Option<OnPush>) {
        self.inner.try_lock().unwrap().on_push = on_push;
    }

    pub fn set_on_pop(&self, on_pop: Option<OnPop>) {
        self.inner.try_lock().unwrap().on_pop = on_pop;
    }

    pub fn clear(&self) {
        self.inner.try_lock().unwrap().undo_stack.clear();
        self.inner.try_lock().unwrap().redo_stack.clear();
    }
}

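/// Calculates the [`DiffBatch`] that undoes the given `spans` of operations
/// at the current version. Expects `spans` to be non-empty.
///
/// - `spans`: the id spans to undo, each paired with the frontiers
///   (dependencies) at which that span was applied.
/// - `last_frontiers_or_last_bi`: either the frontiers to diff the last span
///   against, or a precomputed [`DiffBatch`] to use as the last `B_i`.
/// - `calc_diff`: computes the diff that transforms one version into another.
/// - `on_last_event_a`: invoked with the last span's rebased event just
///   before it is transformed against the final `B_i`.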
pub(crate) fn undo(
    spans: Vec<(IdSpan, Frontiers)>,
    last_frontiers_or_last_bi: Either<&Frontiers, &DiffBatch>,
    calc_diff: impl Fn(&Frontiers, &Frontiers) -> DiffBatch,
    on_last_event_a: &mut dyn FnMut(&DiffBatch),
) -> DiffBatch {
    let mut last_ci: Option<DiffBatch> = None;
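
    // For each span i we compute three events:
    //   A_i: the effect of undoing span i at the version where it was
    //        applied, i.e. calc_diff(last id of span i -> its deps);
    //   B_i: everything that happened after span i, up to span i + 1 (or up
    //        to the final frontiers/diff for the last span);
    //   C_i: A_i rebased to the head: transform the previous C_{i-1} against
    //        A_i, compose it onto A_i, then transform the result against B_i.
    // The final C_i is the batch that undoes all spans at the head version.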
    for i in 0..spans.len() {
        debug_span!("Undo", ?i, "Undo span {:?}", &spans[i]).in_scope(|| {
            let (this_id_span, this_deps) = &spans[i];
            // A_i: the event of undoing this span at the version where it
            // was applied.
            let mut event_a_i = debug_span!("1. Calc event A_i").in_scope(|| {
                calc_diff(&this_id_span.id_last().into(), this_deps)
            });

            let stack_diff_batch;
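            // B_i: everything that happened after this span, up to the next
            // span to undo (or to the given frontiers/diff for the last one).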
            let event_b_i = 'block: {
                let next = if i + 1 < spans.len() {
                    spans[i + 1].0.id_last().into()
                } else {
                    match last_frontiers_or_last_bi {
                        Either::Left(last_frontiers) => last_frontiers.clone(),
                        Either::Right(right) => break 'block right,
                    }
                };
                stack_diff_batch = Some(calc_diff(&this_id_span.id_last().into(), &next));
                stack_diff_batch.as_ref().unwrap()
            };

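            // Rebase A_i over the previously computed C_{i-1}: transform
            // C_{i-1} against A_i, then compose it onto A_i.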
            let mut event_a_prime = if let Some(mut last_ci) = last_ci.take() {
                last_ci.transform(&event_a_i, true);

                event_a_i.compose(&last_ci);
                event_a_i
            } else {
                event_a_i
            };
            if i == spans.len() - 1 {
                on_last_event_a(&event_a_prime);
            }
            event_a_prime.transform(event_b_i, true);

            let c_i = event_a_prime;
            last_ci = Some(c_i);
        });
    }

    last_ci.unwrap()
}