loro_internal/
op.rs

1use crate::{
2    change::{Change, Lamport, Timestamp},
3    container::{idx::ContainerIdx, ContainerID},
4    estimated_size::EstimatedSize,
5    id::{Counter, PeerID, ID},
6    oplog::BlockChangeRef,
7    span::{HasCounter, HasId, HasLamport},
8};
9use crate::{delta::DeltaValue, LoroValue};
10use either::Either;
11use enum_as_inner::EnumAsInner;
12use loro_common::{CompactIdLp, ContainerType, CounterSpan, IdFull, IdLp, IdSpan};
13use rle::{HasIndex, HasLength, Mergable, Sliceable};
14use serde::{Deserialize, Serialize};
15use std::{borrow::Cow, ops::Range};
16
17mod content;
18pub use content::*;
19
/// Operation is a unit of change.
///
/// An Op may have multiple atomic operations, since Op can be merged.
#[derive(Debug, Clone)]
pub struct Op {
    /// Counter of the first atomic operation covered by this op.
    pub(crate) counter: Counter,
    /// Index of the container associated with this op.
    pub(crate) container: ContainerIdx,
    /// Payload of the op; its length determines how many atoms the op spans.
    pub(crate) content: InnerContent,
}
29
30impl EstimatedSize for Op {
31    fn estimate_storage_size(&self) -> usize {
32        // TODO: use benchmark to get the optimal estimated size for each container type
33        self.content
34            .estimate_storage_size(self.container.get_type())
35    }
36}
37
/// An [`Op`] paired with the peer that created it and, optionally, its lamport.
#[derive(Debug, Clone)]
pub(crate) struct OpWithId {
    /// Peer the op belongs to.
    pub peer: PeerID,
    /// The wrapped op.
    pub op: Op,
    /// Lamport of the op, if known. `id_full` panics when this is `None`.
    pub lamport: Option<Lamport>,
}
44
45impl OpWithId {
46    pub fn id(&self) -> ID {
47        ID {
48            peer: self.peer,
49            counter: self.op.counter,
50        }
51    }
52
53    pub fn id_full(&self) -> IdFull {
54        IdFull::new(
55            self.peer,
56            self.op.counter,
57            self.lamport.expect("op should already be imported"),
58        )
59    }
60
61    #[allow(unused)]
62    pub fn id_span(&self) -> IdSpan {
63        IdSpan::new(
64            self.peer,
65            self.op.counter,
66            self.op.counter + self.op.atom_len() as Counter,
67        )
68    }
69}
70
/// An op that refers to its container by [`ContainerID`] and carries
/// [`RawOpContent`] as its payload.
///
/// `Serialize`/`Deserialize` are only derived when the `wasm` feature is enabled.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "wasm", derive(Serialize, Deserialize))]
pub struct RemoteOp<'a> {
    /// Counter of the first atom of the op.
    pub(crate) counter: Counter,
    /// Id of the container associated with this op.
    pub(crate) container: ContainerID,
    /// Payload of the op.
    pub(crate) content: RawOpContent<'a>,
}
78
/// This is used to propagate messages between inner modules.
/// It's a temporary struct, and will be converted to Op when it's persisted.
#[derive(Debug, Clone)]
pub struct RawOp<'a> {
    /// Id (peer + counter) of the op.
    pub id: ID,
    /// Lamport of the op.
    pub lamport: Lamport,
    /// Index of the container associated with this op.
    pub container: ContainerIdx,
    /// Payload of the op.
    pub content: RawOpContent<'a>,
}
88
89impl RawOp<'_> {
90    pub(crate) fn id_full(&self) -> loro_common::IdFull {
91        IdFull::new(self.id.peer, self.id.counter, self.lamport)
92    }
93
94    pub(crate) fn idlp(&self) -> loro_common::IdLp {
95        IdLp::new(self.id.peer, self.lamport)
96    }
97}
98
/// RichOp includes lamport and timestamp info, which is used for conflict resolution.
///
/// A `RichOp` may cover only part of the underlying op: `start..end` selects
/// the covered atoms, and the id/lamport accessors are offset by `start`.
#[derive(Debug, Clone)]
pub struct RichOp<'a> {
    /// The underlying (unsliced) op, either borrowed or owned.
    op: Cow<'a, Op>,
    /// Peer that the op belongs to.
    pub peer: PeerID,
    /// Lamport of the first atom of the underlying op (before applying `start`).
    lamport: Lamport,
    /// Timestamp copied from the change that contains the op.
    pub timestamp: Timestamp,
    /// Start offset (inclusive) of the covered range inside the underlying op.
    pub start: usize,
    /// End offset (exclusive) of the covered range inside the underlying op.
    pub end: usize,
}
109
110impl Op {
111    #[inline]
112    #[allow(unused)]
113    pub(crate) fn new(id: ID, content: InnerContent, container: ContainerIdx) -> Self {
114        Op {
115            counter: id.counter,
116            content,
117            container,
118        }
119    }
120
121    /// If the estimated storage size of the content is greater than the given size,
122    /// return the length of the content that makes the estimated storage size equal to the given size.
123    /// Otherwise, return None.
124    pub(crate) fn check_whether_slice_content_to_fit_in_size(&self, size: usize) -> Option<usize> {
125        if self.estimate_storage_size() <= size {
126            return None;
127        }
128
129        match &self.content {
130            InnerContent::List(l) => match l {
131                crate::container::list::list_op::InnerListOp::Insert { .. } => {
132                    if matches!(self.container.get_type(), ContainerType::Text) {
133                        Some(size.min(self.atom_len()))
134                    } else {
135                        Some((size / 4).min(self.atom_len()))
136                    }
137                }
138                crate::container::list::list_op::InnerListOp::InsertText { .. } => {
139                    Some(size.min(self.atom_len()))
140                }
141                _ => unreachable!(),
142            },
143            _ => unreachable!(),
144        }
145    }
146}
147
148impl RemoteOp<'_> {
149    #[allow(unused)]
150    pub(crate) fn into_static(self) -> RemoteOp<'static> {
151        RemoteOp {
152            counter: self.counter,
153            container: self.container,
154            content: self.content.to_static(),
155        }
156    }
157}
158
159impl Mergable for Op {
160    fn is_mergable(&self, other: &Self, cfg: &()) -> bool {
161        self.counter + self.content_len() as Counter == other.counter
162            && self.container == other.container
163            && self.content.is_mergable(&other.content, cfg)
164    }
165
166    fn merge(&mut self, other: &Self, cfg: &()) {
167        self.content.merge(&other.content, cfg)
168    }
169}
170
171impl HasLength for Op {
172    fn content_len(&self) -> usize {
173        self.content.content_len()
174    }
175}
176
177impl Sliceable for Op {
178    fn slice(&self, from: usize, to: usize) -> Self {
179        assert!(to > from, "{to} should be greater than {from}");
180        assert!(to <= self.atom_len());
181        let content: InnerContent = self.content.slice(from, to);
182        Op {
183            counter: (self.counter + from as Counter),
184            content,
185            container: self.container,
186        }
187    }
188}
189
190impl Mergable for RemoteOp<'_> {
191    fn is_mergable(&self, _other: &Self, _cfg: &()) -> bool {
192        // don't merge remote op, because it's already merged.
193        false
194    }
195
196    fn merge(&mut self, _other: &Self, _: &()) {
197        unreachable!()
198    }
199}
200
201impl HasLength for RemoteOp<'_> {
202    fn content_len(&self) -> usize {
203        self.content.atom_len()
204    }
205}
206
207impl HasIndex for Op {
208    type Int = Counter;
209
210    fn get_start_index(&self) -> Self::Int {
211        self.counter
212    }
213}
214
215impl HasIndex for RemoteOp<'_> {
216    type Int = Counter;
217
218    fn get_start_index(&self) -> Self::Int {
219        self.counter
220    }
221}
222
223impl HasCounter for Op {
224    fn ctr_start(&self) -> Counter {
225        self.counter
226    }
227}
228
229impl HasCounter for RemoteOp<'_> {
230    fn ctr_start(&self) -> Counter {
231        self.counter
232    }
233}
234
235impl HasId for RichOp<'_> {
236    fn id_start(&self) -> ID {
237        ID {
238            peer: self.peer,
239            counter: self.op.counter + self.start as Counter,
240        }
241    }
242}
243
244impl HasLength for RichOp<'_> {
245    fn content_len(&self) -> usize {
246        self.end - self.start
247    }
248}
249
250impl HasLamport for RichOp<'_> {
251    fn lamport(&self) -> Lamport {
252        self.lamport + self.start as Lamport
253    }
254}
255
256impl<'a> RichOp<'a> {
257    pub fn new_by_change(change: &Change<Op>, op: &'a Op) -> Self {
258        let diff = op.counter - change.id.counter;
259        RichOp {
260            op: Cow::Borrowed(op),
261            peer: change.id.peer,
262            lamport: change.lamport + diff as Lamport,
263            timestamp: change.timestamp,
264            start: 0,
265            end: op.atom_len(),
266        }
267    }
268
269    /// we want the overlap part of the op and change[start..end]
270    ///
271    /// op is contained in the change, but it's not necessary overlap with change[start..end]
272    pub fn new_by_slice_on_change(change: &Change<Op>, start: i32, end: i32, op: &'a Op) -> Self {
273        debug_assert!(end > start);
274        let op_index_in_change = op.counter - change.id.counter;
275        let op_slice_start = (start - op_index_in_change).clamp(0, op.atom_len() as i32);
276        let op_slice_end = (end - op_index_in_change).clamp(0, op.atom_len() as i32);
277        RichOp {
278            op: Cow::Borrowed(op),
279            peer: change.id.peer,
280            lamport: change.lamport + op_index_in_change as Lamport,
281            timestamp: change.timestamp,
282            start: op_slice_start as usize,
283            end: op_slice_end as usize,
284        }
285    }
286
287    pub fn new_by_cnt_range(change: &Change<Op>, span: CounterSpan, op: &'a Op) -> Option<Self> {
288        let op_index_in_change = op.counter - change.id.counter;
289        let op_slice_start = (span.start - op.counter).clamp(0, op.atom_len() as i32);
290        let op_slice_end = (span.end - op.counter).clamp(0, op.atom_len() as i32);
291        if op_slice_start == op_slice_end {
292            return None;
293        }
294        Some(RichOp {
295            op: Cow::Borrowed(op),
296            peer: change.id.peer,
297            lamport: change.lamport + op_index_in_change as Lamport,
298            timestamp: change.timestamp,
299            start: op_slice_start as usize,
300            end: op_slice_end as usize,
301        })
302    }
303
304    pub(crate) fn new_iter_by_cnt_range(
305        change: BlockChangeRef,
306        span: CounterSpan,
307    ) -> RichOpBlockIter {
308        RichOpBlockIter {
309            change,
310            span,
311            op_index: 0,
312        }
313    }
314
315    pub fn op(&self) -> Cow<'_, Op> {
316        if self.start == 0 && self.end == self.op.content_len() {
317            self.op.clone()
318        } else {
319            Cow::Owned(self.op.slice(self.start, self.end))
320        }
321    }
322
323    pub fn raw_op(&self) -> &Op {
324        &self.op
325    }
326
327    pub fn client_id(&self) -> u64 {
328        self.peer
329    }
330
331    pub fn container(&self) -> ContainerIdx {
332        self.op.container
333    }
334
335    pub fn timestamp(&self) -> i64 {
336        self.timestamp
337    }
338
339    pub fn start(&self) -> usize {
340        self.start
341    }
342
343    pub fn end(&self) -> usize {
344        self.end
345    }
346
347    pub fn counter(&self) -> Counter {
348        self.op.counter + self.start as Counter
349    }
350
351    #[allow(unused)]
352    pub(crate) fn id(&self) -> ID {
353        ID {
354            peer: self.peer,
355            counter: self.op.counter + self.start as Counter,
356        }
357    }
358
359    pub(crate) fn id_full(&self) -> IdFull {
360        IdFull::new(
361            self.peer,
362            self.op.counter + self.start as Counter,
363            self.lamport + self.start as Lamport,
364        )
365    }
366
367    pub fn idlp(&self) -> IdLp {
368        IdLp {
369            lamport: self.lamport + self.start as Lamport,
370            peer: self.peer,
371        }
372    }
373}
374
/// Iterator over the ops of a change, yielding the part of each op that
/// overlaps a counter span.
pub(crate) struct RichOpBlockIter {
    /// The change whose ops are iterated.
    change: BlockChangeRef,
    /// Counter span used to clip each op.
    span: CounterSpan,
    /// Index of the next op in `change.ops` to examine.
    op_index: usize,
}
380
381impl Iterator for RichOpBlockIter {
382    type Item = RichOp<'static>;
383
384    fn next(&mut self) -> Option<Self::Item> {
385        let op = self.change.ops.get(self.op_index)?.clone();
386        let op_offset_in_change = op.counter - self.change.id.counter;
387        let op_slice_start = (self.span.start - op.counter).clamp(0, op.atom_len() as i32);
388        let op_slice_end = (self.span.end - op.counter).clamp(0, op.atom_len() as i32);
389        self.op_index += 1;
390        if op_slice_start == op_slice_end {
391            return self.next();
392        }
393
394        Some(RichOp {
395            op: Cow::Owned(op),
396            peer: self.change.id.peer,
397            lamport: self.change.lamport + op_offset_in_change as Lamport,
398            timestamp: self.change.timestamp,
399            start: op_slice_start as usize,
400            end: op_slice_end as usize,
401        })
402    }
403}
404
// Note: It will be encoded into binary format, so the order of its fields should not be changed.
/// A slice of list content: either a run of values or a piece of text.
#[derive(PartialEq, Debug, EnumAsInner, Clone, Serialize, Deserialize)]
pub enum ListSlice<'a> {
    /// A borrowed or owned run of values.
    RawData(Cow<'a, [LoroValue]>),
    /// A borrowed or owned string together with its length.
    RawStr {
        /// The text.
        str: Cow<'a, str>,
        /// Length of `str` in `char`s (this is what `from_borrowed_str` stores).
        unicode_len: usize,
    },
}
414
415impl<'a> ListSlice<'a> {
416    pub fn from_borrowed_str(str: &'a str) -> Self {
417        Self::RawStr {
418            str: Cow::Borrowed(str),
419            unicode_len: str.chars().count(),
420        }
421    }
422}
423
/// A thin wrapper around `Range<u32>`.
///
/// A range whose start equals `UNKNOWN_START` is treated as "unknown"
/// (see `SliceRange::is_unknown`).
#[repr(transparent)]
#[derive(PartialEq, Eq, Debug, Clone, Serialize)]
pub struct SliceRange(pub Range<u32>);
427
428const UNKNOWN_START: u32 = u32::MAX / 2;
429impl SliceRange {
430    #[inline(always)]
431    pub fn is_unknown(&self) -> bool {
432        self.0.start == UNKNOWN_START
433    }
434
435    #[inline(always)]
436    pub fn new_unknown(size: u32) -> Self {
437        Self(UNKNOWN_START..UNKNOWN_START + size)
438    }
439
440    #[inline(always)]
441    pub fn new(range: Range<u32>) -> Self {
442        Self(range)
443    }
444
445    #[inline(always)]
446    pub fn to_range(&self) -> Range<usize> {
447        self.0.start as usize..self.0.end as usize
448    }
449}
450
451impl From<Range<u32>> for SliceRange {
452    fn from(a: Range<u32>) -> Self {
453        SliceRange(a)
454    }
455}
456
457impl HasLength for SliceRange {
458    fn content_len(&self) -> usize {
459        self.0.len()
460    }
461}
462
463impl Sliceable for SliceRange {
464    fn slice(&self, from: usize, to: usize) -> Self {
465        if self.is_unknown() {
466            Self::new_unknown((to - from) as u32)
467        } else {
468            SliceRange(self.0.start + from as u32..self.0.start + to as u32)
469        }
470    }
471}
472
473impl Mergable for SliceRange {
474    fn merge(&mut self, other: &Self, _: &()) {
475        if self.is_unknown() {
476            self.0.end += other.0.end - other.0.start;
477        } else {
478            self.0.end = other.0.end;
479        }
480    }
481
482    fn is_mergable(&self, other: &Self, _conf: &()) -> bool
483    where
484        Self: Sized,
485    {
486        (self.is_unknown() && other.is_unknown()) || self.0.end == other.0.start
487    }
488}
489
490impl ListSlice<'_> {
491    #[inline(always)]
492    pub fn unknown_range(len: usize) -> SliceRange {
493        let start = UNKNOWN_START;
494        let end = len as u32 + UNKNOWN_START;
495        SliceRange(start..end)
496    }
497
498    #[inline(always)]
499    pub fn is_unknown(range: &SliceRange) -> bool {
500        range.is_unknown()
501    }
502
503    pub fn to_static(&self) -> ListSlice<'static> {
504        match self {
505            ListSlice::RawData(x) => ListSlice::RawData(Cow::Owned(x.to_vec())),
506            ListSlice::RawStr { str, unicode_len } => ListSlice::RawStr {
507                str: Cow::Owned(str.to_string()),
508                unicode_len: *unicode_len,
509            },
510        }
511    }
512}
513
514impl HasLength for ListSlice<'_> {
515    fn content_len(&self) -> usize {
516        match self {
517            ListSlice::RawStr { unicode_len, .. } => *unicode_len,
518            ListSlice::RawData(x) => x.len(),
519        }
520    }
521}
522
523impl Sliceable for ListSlice<'_> {
524    fn slice(&self, from: usize, to: usize) -> Self {
525        match self {
526            ListSlice::RawStr {
527                str,
528                unicode_len: _,
529            } => {
530                let ans = str.chars().skip(from).take(to - from).collect::<String>();
531                ListSlice::RawStr {
532                    str: Cow::Owned(ans),
533                    unicode_len: to - from,
534                }
535            }
536            ListSlice::RawData(x) => match x {
537                Cow::Borrowed(x) => ListSlice::RawData(Cow::Borrowed(&x[from..to])),
538                Cow::Owned(x) => ListSlice::RawData(Cow::Owned(x[from..to].into())),
539            },
540        }
541    }
542}
543
544impl Mergable for ListSlice<'_> {
545    fn is_mergable(&self, _other: &Self, _: &()) -> bool {
546        false
547    }
548}
549
/// A slice of values (either a `SliceRange` or a single `LoroValue`) tagged
/// with the full id of its first element.
#[derive(Debug, Clone)]
pub struct SliceWithId {
    /// Either a range of values or a single value.
    pub values: Either<SliceRange, LoroValue>,
    /// This field is non-`None` when calculating the diff of a movable list
    /// that needs to `handle_unknown`.
    pub elem_id: Option<CompactIdLp>,
    /// Full id (peer, counter, lamport) of the first element of the slice.
    pub id: IdFull,
}
557
558impl DeltaValue for SliceWithId {
559    fn value_extend(&mut self, other: Self) -> Result<(), Self> {
560        if self.id.peer != other.id.peer {
561            return Err(other);
562        }
563
564        if self.id.counter + self.length() as Counter != other.id.counter {
565            return Err(other);
566        }
567
568        if self.id.lamport + self.length() as Lamport != other.id.lamport {
569            return Err(other);
570        }
571
572        match (&mut self.values, &other.values) {
573            (Either::Left(left_range), Either::Left(right_range))
574                if left_range.0.end == right_range.0.start =>
575            {
576                left_range.0.end = right_range.0.end;
577                Ok(())
578            }
579            _ => {
580                // If one is SliceRange and the other is LoroValue, we can't merge
581                Err(other)
582            }
583        }
584    }
585
586    fn take(&mut self, target_len: usize) -> Self {
587        match &mut self.values {
588            Either::Left(range) => {
589                let ans = range.slice(0, target_len);
590                let this = range.slice(target_len, range.atom_len());
591                *range = this;
592                let old_id = self.id;
593                self.id = self.id.inc(target_len as i32);
594                Self {
595                    id: old_id,
596                    values: Either::Left(ans),
597                    elem_id: None,
598                }
599            }
600            Either::Right(_) => unimplemented!(),
601        }
602    }
603
604    fn length(&self) -> usize {
605        match &self.values {
606            Either::Left(r) => r.atom_len(),
607            Either::Right(_) => 1,
608        }
609    }
610}
611
#[cfg(test)]
mod test {
    use crate::LoroValue;

    use super::ListSlice;

    /// Pins the postcard encoding of `ListSlice` so that reordering its
    /// variants or fields is caught by a failing test.
    #[test]
    fn fix_fields_order() {
        // One value per variant, in declaration order.
        let list_slice = vec![
            ListSlice::RawData(vec![LoroValue::Bool(true)].into()),
            ListSlice::RawStr {
                str: "".into(),
                unicode_len: 0,
            },
        ];
        // Expected serialized bytes for the vector above.
        let list_slice_buf = vec![2, 0, 1, 1, 1, 1, 0, 0];
        // Serialization must produce exactly the pinned bytes...
        assert_eq!(
            &postcard::to_allocvec(&list_slice).unwrap(),
            &list_slice_buf
        );
        // ...and the pinned bytes must deserialize back to the same values.
        assert_eq!(
            postcard::from_bytes::<Vec<ListSlice>>(&list_slice_buf).unwrap(),
            list_slice
        );
    }
}