1use crate::{
8 dag::DagNode,
9 estimated_size::EstimatedSize,
10 id::{Counter, ID},
11 op::Op,
12 span::{HasId, HasLamport},
13 version::Frontiers,
14};
15use loro_common::{HasCounter, HasCounterSpan, PeerID};
16use num::traits::AsPrimitive;
17use rle::{HasIndex, HasLength, Mergable, RleVec, Sliceable};
18use smallvec::SmallVec;
19
20pub type Timestamp = i64;
21pub type Lamport = u32;
22
23#[derive(Debug, Clone, PartialEq)]
28pub struct Change<O = Op> {
29 pub(crate) id: ID,
31 pub(crate) lamport: Lamport,
33 pub(crate) deps: Frontiers,
34 pub(crate) timestamp: Timestamp,
37 pub(crate) commit_msg: Option<Arc<str>>,
38 pub(crate) ops: RleVec<[O; 1]>,
39}
40
41pub(crate) struct ChangeRef<'a, O = Op> {
42 pub(crate) id: &'a ID,
43 pub(crate) lamport: &'a Lamport,
44 pub(crate) deps: &'a Frontiers,
45 pub(crate) timestamp: &'a Timestamp,
46 pub(crate) commit_msg: &'a Option<Arc<str>>,
47 pub(crate) ops: &'a RleVec<[O; 1]>,
48}
49
50impl<'a, O> ChangeRef<'a, O> {
51 pub fn from_change(change: &'a Change<O>) -> Self {
52 Self {
53 id: &change.id,
54 lamport: &change.lamport,
55 deps: &change.deps,
56 timestamp: &change.timestamp,
57 commit_msg: &change.commit_msg,
58 ops: &change.ops,
59 }
60 }
61}
62
63impl<O> Change<O> {
64 pub fn new(
65 ops: RleVec<[O; 1]>,
66 deps: Frontiers,
67 id: ID,
68 lamport: Lamport,
69 timestamp: Timestamp,
70 ) -> Self {
71 Change {
72 ops,
73 deps,
74 id,
75 lamport,
76 timestamp,
77 commit_msg: None,
78 }
79 }
80
81 #[inline]
82 pub fn ops(&self) -> &RleVec<[O; 1]> {
83 &self.ops
84 }
85
86 #[inline]
87 pub fn deps(&self) -> &Frontiers {
88 &self.deps
89 }
90
91 #[inline]
92 pub fn peer(&self) -> PeerID {
93 self.id.peer
94 }
95
96 #[inline]
97 pub fn lamport(&self) -> Lamport {
98 self.lamport
99 }
100
101 #[inline]
102 pub fn timestamp(&self) -> Timestamp {
103 self.timestamp
104 }
105
106 #[inline]
107 pub fn id(&self) -> ID {
108 self.id
109 }
110
111 #[inline]
112 pub fn deps_on_self(&self) -> bool {
113 if let Some(id) = self.deps.as_single() {
114 id.peer == self.id.peer
115 } else {
116 false
117 }
118 }
119
120 pub fn message(&self) -> Option<&Arc<str>> {
121 self.commit_msg.as_ref()
122 }
123}
124
125impl<O: EstimatedSize> EstimatedSize for Change<O> {
126 #[inline]
128 fn estimate_storage_size(&self) -> usize {
129 let id_size = 2;
130 let lamport_size = 1;
131 let timestamp_size = 1;
132 let deps_size = (self.deps.len().max(1) - 1) * 4;
133 let ops_size = self
134 .ops
135 .iter()
136 .map(|op| op.estimate_storage_size())
137 .sum::<usize>();
138 id_size + lamport_size + timestamp_size + ops_size + deps_size
139 }
140}
141
142impl<O: Mergable + HasLength + HasIndex + Debug> HasIndex for Change<O> {
143 type Int = Counter;
144
145 fn get_start_index(&self) -> Self::Int {
146 self.id.counter
147 }
148}
149
150impl<O> HasId for Change<O> {
151 fn id_start(&self) -> ID {
152 self.id
153 }
154}
155
156impl<O> HasCounter for Change<O> {
157 fn ctr_start(&self) -> Counter {
158 self.id.counter
159 }
160}
161
162impl<O> HasLamport for Change<O> {
163 fn lamport(&self) -> Lamport {
164 self.lamport
165 }
166}
167
168impl<O> Mergable for Change<O> {
169 fn is_mergable(&self, _other: &Self, _conf: &()) -> bool
170 where
171 Self: Sized,
172 {
173 false
174 }
175
176 fn merge(&mut self, _other: &Self, _conf: &())
177 where
178 Self: Sized,
179 {
180 unreachable!()
181 }
182}
183
184impl<O: Mergable + HasLength + HasIndex + Debug> Change<O> {
185 pub fn len(&self) -> usize {
186 self.ops.span().as_()
187 }
188
189 pub fn is_empty(&self) -> bool {
190 self.ops.is_empty()
191 }
192}
193
194use std::{fmt::Debug, sync::Arc};
195impl<O: Mergable + HasLength + HasIndex + Debug> HasLength for Change<O> {
196 fn content_len(&self) -> usize {
197 self.ops.span().as_()
198 }
199}
200
201impl<O: Mergable + HasLength + HasIndex + Sliceable + HasCounter + Debug> Sliceable for Change<O> {
202 fn slice(&self, from: usize, to: usize) -> Self {
204 assert!(from < to);
205 assert!(to <= self.atom_len());
206 let from_counter = self.id.counter + from as Counter;
207 let to_counter = self.id.counter + to as Counter;
208 let ops = {
209 if from >= to {
210 RleVec::new()
211 } else {
212 let mut ans: SmallVec<[_; 1]> = SmallVec::new();
213 let mut start_index = 0;
214 if self.ops.len() >= 8 {
215 let result = self
216 .ops
217 .binary_search_by(|op| op.ctr_end().cmp(&from_counter));
218 start_index = match result {
219 Ok(i) => i,
220 Err(i) => i,
221 };
222 }
223
224 for i in start_index..self.ops.len() {
225 let op = &self.ops[i];
226 if op.ctr_start() >= to_counter {
227 break;
228 }
229 if op.ctr_end() <= from_counter {
230 continue;
231 }
232
233 let start_offset =
234 ((from_counter - op.ctr_start()).max(0) as usize).min(op.atom_len());
235 let end_offset =
236 ((to_counter - op.ctr_start()).max(0) as usize).min(op.atom_len());
237 assert_ne!(start_offset, end_offset);
238 ans.push(op.slice(start_offset, end_offset))
239 }
240
241 RleVec::from(ans)
242 }
243 };
244 assert_eq!(ops.first().unwrap().ctr_start(), from_counter);
245 assert_eq!(ops.last().unwrap().ctr_end(), to_counter);
246 Self {
247 ops,
248 deps: if from > 0 {
249 Frontiers::from_id(self.id.inc(from as Counter - 1))
250 } else {
251 self.deps.clone()
252 },
253 id: self.id.inc(from as Counter),
254 lamport: self.lamport + from as Lamport,
255 timestamp: self.timestamp,
256 commit_msg: self.commit_msg.clone(),
257 }
258 }
259}
260
261impl DagNode for Change {
262 fn deps(&self) -> &Frontiers {
263 &self.deps
264 }
265}
266
267impl Change {
268 pub fn can_merge_right(&self, other: &Self, merge_interval: i64) -> bool {
269 if other.id.peer == self.id.peer
270 && other.id.counter == self.id.counter + self.content_len() as Counter
271 && other.deps.len() == 1
272 && other.deps.as_single().unwrap().peer == self.id.peer
273 && other.timestamp - self.timestamp <= merge_interval
274 && self.commit_msg == other.commit_msg
275 {
276 debug_assert!(other.timestamp >= self.timestamp);
277 debug_assert!(other.lamport == self.lamport + self.len() as Lamport);
278 true
279 } else {
280 false
281 }
282 }
283}
284
285#[cfg(not(target_arch = "wasm32"))]
288pub(crate) fn get_sys_timestamp() -> f64 {
289 use std::time::{SystemTime, UNIX_EPOCH};
290 SystemTime::now()
291 .duration_since(UNIX_EPOCH)
292 .unwrap()
293 .as_millis()
294 .as_()
295}
296
297#[cfg(target_arch = "wasm32")]
300pub fn get_sys_timestamp() -> f64 {
301 use wasm_bindgen::prelude::wasm_bindgen;
302 #[wasm_bindgen]
303 extern "C" {
304 #[wasm_bindgen(js_namespace = Date)]
307 pub fn now() -> f64;
308 }
309
310 now()
311}
312
313#[cfg(test)]
314mod test {
315 use super::*;
316 #[test]
317 fn size_of_change() {
318 let size = std::mem::size_of::<Change>();
319 println!("{}", size);
320 }
321}