// loro_internal/op.rs

1use crate::{
2    change::{Change, Lamport, Timestamp},
3    container::{idx::ContainerIdx, ContainerID},
4    estimated_size::EstimatedSize,
5    id::{Counter, PeerID, ID},
6    oplog::BlockChangeRef,
7    span::{HasCounter, HasId, HasLamport},
8};
9use crate::{delta::DeltaValue, LoroValue};
10use either::Either;
11use enum_as_inner::EnumAsInner;
12use loro_common::{CompactIdLp, ContainerType, CounterSpan, IdFull, IdLp, IdSpan};
13use rle::{HasIndex, HasLength, Mergable, Sliceable};
14use serde::{Deserialize, Serialize};
15use std::{borrow::Cow, ops::Range};
16
17mod content;
18pub use content::*;
19
/// Operation is a unit of change.
///
/// A Op may have multiple atomic operations, since Op can be merged.
#[derive(Debug, Clone)]
pub struct Op {
    // Counter of the first atomic operation in this (possibly merged) op.
    pub(crate) counter: Counter,
    // The container this op targets, in its compact in-memory index form.
    pub(crate) container: ContainerIdx,
    // The actual payload of the op.
    pub(crate) content: InnerContent,
}
29
impl EstimatedSize for Op {
    /// Estimate this op's storage size in bytes, delegating to the content
    /// with the container's type as a hint.
    fn estimate_storage_size(&self) -> usize {
        // TODO: use benchmark to get the optimal estimated size for each container type
        self.content
            .estimate_storage_size(self.container.get_type())
    }
}
37
/// An [`Op`] paired with the peer that created it, so the op's full id span
/// can be reconstructed.
#[derive(Debug, Clone)]
pub(crate) struct OpWithId {
    pub peer: PeerID,
    pub op: Op,
}
43
44impl OpWithId {
45    #[allow(unused)]
46    pub fn id_span(&self) -> IdSpan {
47        IdSpan::new(
48            self.peer,
49            self.op.counter,
50            self.op.counter + self.op.atom_len() as Counter,
51        )
52    }
53}
54
/// An op that refers to its container by full [`ContainerID`] instead of the
/// internal [`ContainerIdx`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "wasm", derive(Serialize, Deserialize))]
pub struct RemoteOp<'a> {
    pub(crate) counter: Counter,
    pub(crate) container: ContainerID,
    pub(crate) content: RawOpContent<'a>,
}
62
/// This is used to propagate messages between inner module.
/// It's a temporary struct, and will be converted to Op when it's persisted.
#[derive(Debug, Clone)]
pub struct RawOp<'a> {
    // Full id (peer + counter) of the op's first atom.
    pub id: ID,
    pub lamport: Lamport,
    pub container: ContainerIdx,
    pub content: RawOpContent<'a>,
}
72
73impl RawOp<'_> {
74    pub(crate) fn id_full(&self) -> loro_common::IdFull {
75        IdFull::new(self.id.peer, self.id.counter, self.lamport)
76    }
77
78    pub(crate) fn idlp(&self) -> loro_common::IdLp {
79        IdLp::new(self.id.peer, self.lamport)
80    }
81}
82
/// RichOp includes lamport and timestamp info, which is used for conflict resolution.
#[derive(Debug, Clone)]
pub struct RichOp<'a> {
    op: Cow<'a, Op>,
    pub peer: PeerID,
    // Lamport of the *start* of the underlying op (not of the `start` offset below).
    lamport: Lamport,
    pub timestamp: Timestamp,
    // `start..end` selects the sub-slice of `op` this RichOp represents.
    pub start: usize,
    pub end: usize,
}
93
impl Op {
    /// Create an op from an id; only the counter part of `id` is stored here,
    /// since peer/lamport live on the enclosing change.
    #[inline]
    #[allow(unused)]
    pub(crate) fn new(id: ID, content: InnerContent, container: ContainerIdx) -> Self {
        Op {
            counter: id.counter,
            content,
            container,
        }
    }

    /// If the estimated storage size of the content is greater than the given size,
    /// return the length of the content that makes the estimated storage size equal to the given size.
    /// Otherwise, return None.
    pub(crate) fn check_whether_slice_content_to_fit_in_size(&self, size: usize) -> Option<usize> {
        if self.estimate_storage_size() <= size {
            // Already fits; no slicing needed.
            return None;
        }

        match &self.content {
            InnerContent::List(l) => match l {
                crate::container::list::list_op::InnerListOp::Insert { .. } => {
                    if matches!(self.container.get_type(), ContainerType::Text) {
                        // Text insert: heuristic of ~1 byte per atom.
                        Some(size.min(self.atom_len()))
                    } else {
                        // Non-text list insert: heuristic of ~4 bytes per element — TODO confirm
                        Some((size / 4).min(self.atom_len()))
                    }
                }
                crate::container::list::list_op::InnerListOp::InsertText { .. } => {
                    Some(size.min(self.atom_len()))
                }
                // Only insert-style list content is expected to reach here oversized.
                _ => unreachable!(),
            },
            _ => unreachable!(),
        }
    }
}
131
132impl RemoteOp<'_> {
133    #[allow(unused)]
134    pub(crate) fn into_static(self) -> RemoteOp<'static> {
135        RemoteOp {
136            counter: self.counter,
137            container: self.container,
138            content: self.content.to_static(),
139        }
140    }
141}
142
143impl Mergable for Op {
144    fn is_mergable(&self, other: &Self, cfg: &()) -> bool {
145        self.counter + self.content_len() as Counter == other.counter
146            && self.container == other.container
147            && self.content.is_mergable(&other.content, cfg)
148    }
149
150    fn merge(&mut self, other: &Self, cfg: &()) {
151        self.content.merge(&other.content, cfg)
152    }
153}
154
impl HasLength for Op {
    /// Number of atomic operations contained in this (possibly merged) op.
    fn content_len(&self) -> usize {
        self.content.content_len()
    }
}
160
161impl Sliceable for Op {
162    fn slice(&self, from: usize, to: usize) -> Self {
163        assert!(to > from, "{to} should be greater than {from}");
164        assert!(to <= self.atom_len());
165        let content: InnerContent = self.content.slice(from, to);
166        Op {
167            counter: (self.counter + from as Counter),
168            content,
169            container: self.container,
170        }
171    }
172}
173
impl Mergable for RemoteOp<'_> {
    fn is_mergable(&self, _other: &Self, _cfg: &()) -> bool {
        // don't merge remote op, because it's already merged.
        false
    }

    fn merge(&mut self, _other: &Self, _: &()) {
        // `is_mergable` always returns false, so this can never be called.
        unreachable!()
    }
}
184
impl HasLength for RemoteOp<'_> {
    /// Atom length of the remote content.
    fn content_len(&self) -> usize {
        self.content.atom_len()
    }
}
190
impl HasIndex for Op {
    type Int = Counter;

    /// Ops are indexed by their starting counter.
    fn get_start_index(&self) -> Self::Int {
        self.counter
    }
}
198
impl HasIndex for RemoteOp<'_> {
    type Int = Counter;

    /// Remote ops are indexed by their starting counter.
    fn get_start_index(&self) -> Self::Int {
        self.counter
    }
}
206
impl HasCounter for Op {
    /// Starting counter of this op.
    fn ctr_start(&self) -> Counter {
        self.counter
    }
}
212
impl HasCounter for RemoteOp<'_> {
    /// Starting counter of this remote op.
    fn ctr_start(&self) -> Counter {
        self.counter
    }
}
218
impl HasId for RichOp<'_> {
    /// Id of the first atom covered by the `start..end` slice window.
    fn id_start(&self) -> ID {
        ID {
            peer: self.peer,
            counter: self.op.counter + self.start as Counter,
        }
    }
}
227
impl HasLength for RichOp<'_> {
    /// Length of the sliced window, not of the whole underlying op.
    fn content_len(&self) -> usize {
        self.end - self.start
    }
}
233
impl HasLamport for RichOp<'_> {
    /// Lamport of the first atom covered by the slice window.
    fn lamport(&self) -> Lamport {
        self.lamport + self.start as Lamport
    }
}
239
impl<'a> RichOp<'a> {
    /// Wrap `op` (which must belong to `change`) as a RichOp covering the
    /// whole op, deriving its lamport from its offset within the change.
    pub fn new_by_change(change: &Change<Op>, op: &'a Op) -> Self {
        let diff = op.counter - change.id.counter;
        RichOp {
            op: Cow::Borrowed(op),
            peer: change.id.peer,
            lamport: change.lamport + diff as Lamport,
            timestamp: change.timestamp,
            start: 0,
            end: op.atom_len(),
        }
    }

    /// we want the overlap part of the op and change[start..end]
    ///
    /// op is contained in the change, but it's not necessary overlap with change[start..end]
    pub fn new_by_slice_on_change(change: &Change<Op>, start: i32, end: i32, op: &'a Op) -> Self {
        debug_assert!(end > start);
        let op_index_in_change = op.counter - change.id.counter;
        // Clamp the requested window into this op's own [0, atom_len] range.
        // An empty window (start == end) is possible when there is no overlap.
        let op_slice_start = (start - op_index_in_change).clamp(0, op.atom_len() as i32);
        let op_slice_end = (end - op_index_in_change).clamp(0, op.atom_len() as i32);
        RichOp {
            op: Cow::Borrowed(op),
            peer: change.id.peer,
            lamport: change.lamport + op_index_in_change as Lamport,
            timestamp: change.timestamp,
            start: op_slice_start as usize,
            end: op_slice_end as usize,
        }
    }

    /// Like `new_by_slice_on_change`, but the window is given as a counter
    /// span; returns `None` when the op doesn't overlap `span`.
    pub fn new_by_cnt_range(change: &Change<Op>, span: CounterSpan, op: &'a Op) -> Option<Self> {
        let op_index_in_change = op.counter - change.id.counter;
        let op_slice_start = (span.start - op.counter).clamp(0, op.atom_len() as i32);
        let op_slice_end = (span.end - op.counter).clamp(0, op.atom_len() as i32);
        if op_slice_start == op_slice_end {
            // No overlap with the requested counter span.
            return None;
        }
        Some(RichOp {
            op: Cow::Borrowed(op),
            peer: change.id.peer,
            lamport: change.lamport + op_index_in_change as Lamport,
            timestamp: change.timestamp,
            start: op_slice_start as usize,
            end: op_slice_end as usize,
        })
    }

    /// Iterate the ops of `change` that overlap `span`, yielding owned RichOps.
    pub(crate) fn new_iter_by_cnt_range(
        change: BlockChangeRef,
        span: CounterSpan,
    ) -> RichOpBlockIter {
        RichOpBlockIter {
            change,
            span,
            op_index: 0,
        }
    }

    /// The effective op: borrowed as-is when the window covers the whole op,
    /// otherwise an owned sub-slice.
    pub fn op(&self) -> Cow<'_, Op> {
        if self.start == 0 && self.end == self.op.content_len() {
            self.op.clone()
        } else {
            Cow::Owned(self.op.slice(self.start, self.end))
        }
    }

    /// The underlying op, ignoring the `start..end` window.
    pub fn raw_op(&self) -> &Op {
        &self.op
    }

    pub fn client_id(&self) -> u64 {
        self.peer
    }

    pub fn container(&self) -> ContainerIdx {
        self.op.container
    }

    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Start of the slice window within the underlying op.
    pub fn start(&self) -> usize {
        self.start
    }

    /// End of the slice window within the underlying op.
    pub fn end(&self) -> usize {
        self.end
    }

    /// Counter of the first atom covered by the window.
    pub fn counter(&self) -> Counter {
        self.op.counter + self.start as Counter
    }

    /// Id of the first atom covered by the window.
    #[allow(unused)]
    pub(crate) fn id(&self) -> ID {
        ID {
            peer: self.peer,
            counter: self.op.counter + self.start as Counter,
        }
    }

    /// Full id (peer, counter, lamport) of the first atom covered by the window.
    pub(crate) fn id_full(&self) -> IdFull {
        IdFull::new(
            self.peer,
            self.op.counter + self.start as Counter,
            self.lamport + self.start as Lamport,
        )
    }

    /// Lamport id (peer, lamport) of the first atom covered by the window.
    pub fn idlp(&self) -> IdLp {
        IdLp {
            lamport: self.lamport + self.start as Lamport,
            peer: self.peer,
        }
    }
}
358
/// Iterator over the ops of a change block that overlap a counter span.
pub(crate) struct RichOpBlockIter {
    change: BlockChangeRef,
    span: CounterSpan,
    // Index of the next op in `change.ops` to inspect.
    op_index: usize,
}
364
365impl Iterator for RichOpBlockIter {
366    type Item = RichOp<'static>;
367
368    fn next(&mut self) -> Option<Self::Item> {
369        let op = self.change.ops.get(self.op_index)?.clone();
370        let op_offset_in_change = op.counter - self.change.id.counter;
371        let op_slice_start = (self.span.start - op.counter).clamp(0, op.atom_len() as i32);
372        let op_slice_end = (self.span.end - op.counter).clamp(0, op.atom_len() as i32);
373        self.op_index += 1;
374        if op_slice_start == op_slice_end {
375            return self.next();
376        }
377
378        Some(RichOp {
379            op: Cow::Owned(op),
380            peer: self.change.id.peer,
381            lamport: self.change.lamport + op_offset_in_change as Lamport,
382            timestamp: self.change.timestamp,
383            start: op_slice_start as usize,
384            end: op_slice_end as usize,
385        })
386    }
387}
388
// Note: It will be encoded into binary format, so the order of its fields should not be changed.
#[derive(PartialEq, Debug, EnumAsInner, Clone, Serialize, Deserialize)]
pub enum ListSlice<'a> {
    /// A sequence of raw values.
    RawData(Cow<'a, [LoroValue]>),
    /// A string together with its precomputed length in unicode chars.
    RawStr {
        str: Cow<'a, str>,
        unicode_len: usize,
    },
}
398
399impl<'a> ListSlice<'a> {
400    pub fn from_borrowed_str(str: &'a str) -> Self {
401        Self::RawStr {
402            str: Cow::Borrowed(str),
403            unicode_len: str.chars().count(),
404        }
405    }
406}
407
/// Newtype around `Range<u32>`. Ranges starting at `UNKNOWN_START` denote
/// "unknown" placeholder spans that only carry a length.
#[repr(transparent)]
#[derive(PartialEq, Eq, Debug, Clone, Serialize)]
pub struct SliceRange(pub Range<u32>);
411
412const UNKNOWN_START: u32 = u32::MAX / 2;
413impl SliceRange {
414    #[inline(always)]
415    pub fn is_unknown(&self) -> bool {
416        self.0.start == UNKNOWN_START
417    }
418
419    #[inline(always)]
420    pub fn new_unknown(size: u32) -> Self {
421        Self(UNKNOWN_START..UNKNOWN_START + size)
422    }
423
424    #[inline(always)]
425    pub fn new(range: Range<u32>) -> Self {
426        Self(range)
427    }
428
429    #[inline(always)]
430    pub fn to_range(&self) -> Range<usize> {
431        self.0.start as usize..self.0.end as usize
432    }
433}
434
435impl From<Range<u32>> for SliceRange {
436    fn from(a: Range<u32>) -> Self {
437        SliceRange(a)
438    }
439}
440
impl HasLength for SliceRange {
    /// Length of the underlying `u32` range.
    fn content_len(&self) -> usize {
        self.0.len()
    }
}
446
447impl Sliceable for SliceRange {
448    fn slice(&self, from: usize, to: usize) -> Self {
449        if self.is_unknown() {
450            Self::new_unknown((to - from) as u32)
451        } else {
452            SliceRange(self.0.start + from as u32..self.0.start + to as u32)
453        }
454    }
455}
456
impl Mergable for SliceRange {
    fn merge(&mut self, other: &Self, _: &()) {
        if self.is_unknown() {
            // Unknown spans only track a length, so extend by `other`'s length.
            self.0.end += other.0.end - other.0.start;
        } else {
            // Contiguous known ranges: absorb `other` by moving the end.
            self.0.end = other.0.end;
        }
    }

    fn is_mergable(&self, other: &Self, _conf: &()) -> bool
    where
        Self: Sized,
    {
        // Mergeable when both are unknown placeholders, or when `other` starts
        // exactly where `self` ends.
        // NOTE(review): a known range whose end happens to equal UNKNOWN_START
        // would test mergeable against an unknown one here — confirm callers
        // never mix known and unknown neighbors.
        (self.is_unknown() && other.is_unknown()) || self.0.end == other.0.start
    }
}
473
474impl ListSlice<'_> {
475    #[inline(always)]
476    pub fn unknown_range(len: usize) -> SliceRange {
477        let start = UNKNOWN_START;
478        let end = len as u32 + UNKNOWN_START;
479        SliceRange(start..end)
480    }
481
482    #[inline(always)]
483    pub fn is_unknown(range: &SliceRange) -> bool {
484        range.is_unknown()
485    }
486
487    pub fn to_static(&self) -> ListSlice<'static> {
488        match self {
489            ListSlice::RawData(x) => ListSlice::RawData(Cow::Owned(x.to_vec())),
490            ListSlice::RawStr { str, unicode_len } => ListSlice::RawStr {
491                str: Cow::Owned(str.to_string()),
492                unicode_len: *unicode_len,
493            },
494        }
495    }
496}
497
498impl HasLength for ListSlice<'_> {
499    fn content_len(&self) -> usize {
500        match self {
501            ListSlice::RawStr { unicode_len, .. } => *unicode_len,
502            ListSlice::RawData(x) => x.len(),
503        }
504    }
505}
506
507impl Sliceable for ListSlice<'_> {
508    fn slice(&self, from: usize, to: usize) -> Self {
509        match self {
510            ListSlice::RawStr {
511                str,
512                unicode_len: _,
513            } => {
514                let ans = str.chars().skip(from).take(to - from).collect::<String>();
515                ListSlice::RawStr {
516                    str: Cow::Owned(ans),
517                    unicode_len: to - from,
518                }
519            }
520            ListSlice::RawData(x) => match x {
521                Cow::Borrowed(x) => ListSlice::RawData(Cow::Borrowed(&x[from..to])),
522                Cow::Owned(x) => ListSlice::RawData(Cow::Owned(x[from..to].into())),
523            },
524        }
525    }
526}
527
impl Mergable for ListSlice<'_> {
    /// Raw list slices are never merged.
    fn is_mergable(&self, _other: &Self, _: &()) -> bool {
        false
    }
}
533
/// A run of values paired with the full id of its first element.
#[derive(Debug, Clone)]
pub struct SliceWithId {
    // Either a compact range or a single inline value.
    pub values: Either<SliceRange, LoroValue>,
    /// This field is non-None when diff calculating movable list that need to handle_unknown
    pub elem_id: Option<CompactIdLp>,
    pub id: IdFull,
}
541
impl DeltaValue for SliceWithId {
    /// Try to append `other` to `self`; returns `Err(other)` when the two runs
    /// are not contiguous and therefore cannot be merged.
    fn value_extend(&mut self, other: Self) -> Result<(), Self> {
        if self.id.peer != other.id.peer {
            return Err(other);
        }

        // `other` must start exactly where `self` ends, in counter order...
        if self.id.counter + self.length() as Counter != other.id.counter {
            return Err(other);
        }

        // ...and in lamport order.
        if self.id.lamport + self.length() as Lamport != other.id.lamport {
            return Err(other);
        }

        match (&mut self.values, &other.values) {
            (Either::Left(left_range), Either::Left(right_range))
                if left_range.0.end == right_range.0.start =>
            {
                left_range.0.end = right_range.0.end;
                Ok(())
            }
            _ => {
                // If one is SliceRange and the other is LoroValue, we can't merge
                Err(other)
            }
        }
    }

    /// Split off the first `target_len` elements, leaving the remainder (with
    /// its id advanced by `target_len`) in `self`.
    fn take(&mut self, target_len: usize) -> Self {
        match &mut self.values {
            Either::Left(range) => {
                let ans = range.slice(0, target_len);
                let this = range.slice(target_len, range.atom_len());
                *range = this;
                let old_id = self.id;
                self.id = self.id.inc(target_len as i32);
                Self {
                    id: old_id,
                    values: Either::Left(ans),
                    // NOTE(review): the taken part loses its elem_id here —
                    // presumably it only matters for whole entries; confirm.
                    elem_id: None,
                }
            }
            // Taking part of a single LoroValue is not supported.
            Either::Right(_) => unimplemented!(),
        }
    }

    /// Number of elements this delta value represents.
    fn length(&self) -> usize {
        match &self.values {
            Either::Left(r) => r.atom_len(),
            Either::Right(_) => 1,
        }
    }
}
595
#[cfg(test)]
mod test {
    use crate::LoroValue;

    use super::ListSlice;

    /// Guards the binary encoding: `ListSlice` is serialized positionally, so
    /// reordering its variants or fields would silently break compatibility.
    /// This pins the exact encoded bytes.
    #[test]
    fn fix_fields_order() {
        let list_slice = vec![
            ListSlice::RawData(vec![LoroValue::Bool(true)].into()),
            ListSlice::RawStr {
                str: "".into(),
                unicode_len: 0,
            },
        ];
        // Expected postcard encoding of the two slices above.
        let list_slice_buf = vec![2, 0, 1, 1, 1, 1, 0, 0];
        assert_eq!(
            &postcard::to_allocvec(&list_slice).unwrap(),
            &list_slice_buf
        );
        assert_eq!(
            postcard::from_bytes::<Vec<ListSlice>>(&list_slice_buf).unwrap(),
            list_slice
        );
    }
}
621}