loro_internal/
change.rs

//! A [Change] is a list of ops merged into a single unit.
//!
//! Every [Change] depends on other [Change]s, so all the [Change]s in a document form a DAG.
//!
//! Note: a dep can only point to the end of another [Change]. This is an invariant of [Change]s.
6
7use crate::{
8    dag::DagNode,
9    estimated_size::EstimatedSize,
10    id::{Counter, ID},
11    op::Op,
12    span::{HasId, HasLamport},
13    version::Frontiers,
14};
15use loro_common::{HasCounter, HasCounterSpan, PeerID};
16use num::traits::AsPrimitive;
17use rle::{HasIndex, HasLength, Mergable, RleVec, Sliceable};
18use smallvec::SmallVec;
19
/// Wall-clock timestamp attached to a [`Change`]; see the `timestamp` field of [`Change`].
pub type Timestamp = i64;
/// Lamport timestamp value, as stored in the `lamport` field of [`Change`].
pub type Lamport = u32;
22
23/// A `Change` contains a list of [Op]s.
24///
25/// When undo/redo we should always undo/redo a whole [Change].
26// PERF change slice and getting length is kinda slow I guess
27#[derive(Debug, Clone, PartialEq)]
28pub struct Change<O = Op> {
29    /// id of the first op in the change
30    pub(crate) id: ID,
31    /// Lamport timestamp of the change. It can be calculated from deps
32    pub(crate) lamport: Lamport,
33    pub(crate) deps: Frontiers,
34    /// [Unix time](https://en.wikipedia.org/wiki/Unix_time)
35    /// It is the number of seconds that have elapsed since 00:00:00 UTC on 1 January 1970.
36    pub(crate) timestamp: Timestamp,
37    pub(crate) commit_msg: Option<Arc<str>>,
38    pub(crate) ops: RleVec<[O; 1]>,
39}
40
41pub(crate) struct ChangeRef<'a, O = Op> {
42    pub(crate) id: &'a ID,
43    pub(crate) lamport: &'a Lamport,
44    pub(crate) deps: &'a Frontiers,
45    pub(crate) timestamp: &'a Timestamp,
46    pub(crate) commit_msg: &'a Option<Arc<str>>,
47    pub(crate) ops: &'a RleVec<[O; 1]>,
48}
49
50impl<'a, O> ChangeRef<'a, O> {
51    pub fn from_change(change: &'a Change<O>) -> Self {
52        Self {
53            id: &change.id,
54            lamport: &change.lamport,
55            deps: &change.deps,
56            timestamp: &change.timestamp,
57            commit_msg: &change.commit_msg,
58            ops: &change.ops,
59        }
60    }
61}
62
63impl<O> Change<O> {
64    pub fn new(
65        ops: RleVec<[O; 1]>,
66        deps: Frontiers,
67        id: ID,
68        lamport: Lamport,
69        timestamp: Timestamp,
70    ) -> Self {
71        Change {
72            ops,
73            deps,
74            id,
75            lamport,
76            timestamp,
77            commit_msg: None,
78        }
79    }
80
81    #[inline]
82    pub fn ops(&self) -> &RleVec<[O; 1]> {
83        &self.ops
84    }
85
86    #[inline]
87    pub fn deps(&self) -> &Frontiers {
88        &self.deps
89    }
90
91    #[inline]
92    pub fn peer(&self) -> PeerID {
93        self.id.peer
94    }
95
96    #[inline]
97    pub fn lamport(&self) -> Lamport {
98        self.lamport
99    }
100
101    #[inline]
102    pub fn timestamp(&self) -> Timestamp {
103        self.timestamp
104    }
105
106    #[inline]
107    pub fn id(&self) -> ID {
108        self.id
109    }
110
111    #[inline]
112    pub fn deps_on_self(&self) -> bool {
113        if let Some(id) = self.deps.as_single() {
114            id.peer == self.id.peer
115        } else {
116            false
117        }
118    }
119
120    pub fn message(&self) -> Option<&Arc<str>> {
121        self.commit_msg.as_ref()
122    }
123}
124
125impl<O: EstimatedSize> EstimatedSize for Change<O> {
126    /// Estimate the storage size of the change in bytes
127    #[inline]
128    fn estimate_storage_size(&self) -> usize {
129        let id_size = 2;
130        let lamport_size = 1;
131        let timestamp_size = 1;
132        let deps_size = (self.deps.len().max(1) - 1) * 4;
133        let ops_size = self
134            .ops
135            .iter()
136            .map(|op| op.estimate_storage_size())
137            .sum::<usize>();
138        id_size + lamport_size + timestamp_size + ops_size + deps_size
139    }
140}
141
142impl<O: Mergable + HasLength + HasIndex + Debug> HasIndex for Change<O> {
143    type Int = Counter;
144
145    fn get_start_index(&self) -> Self::Int {
146        self.id.counter
147    }
148}
149
150impl<O> HasId for Change<O> {
151    fn id_start(&self) -> ID {
152        self.id
153    }
154}
155
156impl<O> HasCounter for Change<O> {
157    fn ctr_start(&self) -> Counter {
158        self.id.counter
159    }
160}
161
162impl<O> HasLamport for Change<O> {
163    fn lamport(&self) -> Lamport {
164        self.lamport
165    }
166}
167
168impl<O> Mergable for Change<O> {
169    fn is_mergable(&self, _other: &Self, _conf: &()) -> bool
170    where
171        Self: Sized,
172    {
173        false
174    }
175
176    fn merge(&mut self, _other: &Self, _conf: &())
177    where
178        Self: Sized,
179    {
180        unreachable!()
181    }
182}
183
184impl<O: Mergable + HasLength + HasIndex + Debug> Change<O> {
185    pub fn len(&self) -> usize {
186        self.ops.span().as_()
187    }
188
189    pub fn is_empty(&self) -> bool {
190        self.ops.is_empty()
191    }
192}
193
194use std::{fmt::Debug, sync::Arc};
195impl<O: Mergable + HasLength + HasIndex + Debug> HasLength for Change<O> {
196    fn content_len(&self) -> usize {
197        self.ops.span().as_()
198    }
199}
200
201impl<O: Mergable + HasLength + HasIndex + Sliceable + HasCounter + Debug> Sliceable for Change<O> {
202    // TODO: feels slow, need to confirm whether this affects performance
203    fn slice(&self, from: usize, to: usize) -> Self {
204        assert!(from < to);
205        assert!(to <= self.atom_len());
206        let from_counter = self.id.counter + from as Counter;
207        let to_counter = self.id.counter + to as Counter;
208        let ops = {
209            if from >= to {
210                RleVec::new()
211            } else {
212                let mut ans: SmallVec<[_; 1]> = SmallVec::new();
213                let mut start_index = 0;
214                if self.ops.len() >= 8 {
215                    let result = self
216                        .ops
217                        .binary_search_by(|op| op.ctr_end().cmp(&from_counter));
218                    start_index = match result {
219                        Ok(i) => i,
220                        Err(i) => i,
221                    };
222                }
223
224                for i in start_index..self.ops.len() {
225                    let op = &self.ops[i];
226                    if op.ctr_start() >= to_counter {
227                        break;
228                    }
229                    if op.ctr_end() <= from_counter {
230                        continue;
231                    }
232
233                    let start_offset =
234                        ((from_counter - op.ctr_start()).max(0) as usize).min(op.atom_len());
235                    let end_offset =
236                        ((to_counter - op.ctr_start()).max(0) as usize).min(op.atom_len());
237                    assert_ne!(start_offset, end_offset);
238                    ans.push(op.slice(start_offset, end_offset))
239                }
240
241                RleVec::from(ans)
242            }
243        };
244        assert_eq!(ops.first().unwrap().ctr_start(), from_counter);
245        assert_eq!(ops.last().unwrap().ctr_end(), to_counter);
246        Self {
247            ops,
248            deps: if from > 0 {
249                Frontiers::from_id(self.id.inc(from as Counter - 1))
250            } else {
251                self.deps.clone()
252            },
253            id: self.id.inc(from as Counter),
254            lamport: self.lamport + from as Lamport,
255            timestamp: self.timestamp,
256            commit_msg: self.commit_msg.clone(),
257        }
258    }
259}
260
261impl DagNode for Change {
262    fn deps(&self) -> &Frontiers {
263        &self.deps
264    }
265}
266
267impl Change {
268    pub fn can_merge_right(&self, other: &Self, merge_interval: i64) -> bool {
269        if other.id.peer == self.id.peer
270            && other.id.counter == self.id.counter + self.content_len() as Counter
271            && other.deps.len() == 1
272            && other.deps.as_single().unwrap().peer == self.id.peer
273            && other.timestamp - self.timestamp <= merge_interval
274            && self.commit_msg == other.commit_msg
275        {
276            debug_assert!(other.timestamp >= self.timestamp);
277            debug_assert!(other.lamport == self.lamport + self.len() as Lamport);
278            true
279        } else {
280            false
281        }
282    }
283}
284
/// Returns the current [Unix time](https://en.wikipedia.org/wiki/Unix_time) in milliseconds:
/// the number of whole milliseconds that have elapsed since 00:00:00 UTC on 1 January 1970.
///
/// If the system clock is set to a moment before the Unix epoch, the result is the
/// negated distance to the epoch rather than a panic.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn get_sys_timestamp() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; converting it to `f64` is the intended
        // (possibly lossy for astronomically large values) representation.
        Ok(elapsed) => elapsed.as_millis() as f64,
        // The clock is before the epoch: report how far, as a negative number.
        Err(before_epoch) => -(before_epoch.duration().as_millis() as f64),
    }
}
296
/// Returns the current [Unix time](https://en.wikipedia.org/wiki/Unix_time) in milliseconds,
/// as reported by JavaScript's `Date.now()`: the number of milliseconds that have elapsed
/// since 00:00:00 UTC on 1 January 1970.
#[cfg(target_arch = "wasm32")]
pub fn get_sys_timestamp() -> f64 {
    use wasm_bindgen::prelude::wasm_bindgen;
    #[wasm_bindgen]
    extern "C" {
        // Use `js_namespace` here to bind `Date.now()` instead of a bare
        // global `now()`.
        #[wasm_bindgen(js_namespace = Date)]
        pub fn now() -> f64;
    }

    now()
}
312
#[cfg(test)]
mod test {
    use super::*;

    /// Prints the in-memory size of [`Change`]; it makes no assertion.
    #[test]
    fn size_of_change() {
        println!("{}", std::mem::size_of::<Change>());
    }
}