// automerge/storage/change.rs

1use std::{borrow::Cow, io::Write, marker::PhantomData, num::NonZeroU64, ops::Range};
2
3use crate::{convert, ActorId, ChangeHash, ScalarValue};
4
5use super::{parse, shift_range, CheckSum, ChunkType, Columns, Header, RawColumns};
6
7mod change_op_columns;
8pub(crate) use change_op_columns::ChangeOpsColumns;
9pub(crate) use change_op_columns::{ChangeOp, ReadChangeOpError};
10
11mod change_actors;
12pub(crate) use change_actors::PredOutOfOrder;
13mod compressed;
14mod op_with_change_actors;
15pub(crate) use compressed::Compressed;
16
/// Size threshold, in bytes, for compressing a change chunk.
///
/// `Change::compress` only DEFLATEs a change whose encoded bytes are strictly longer than this;
/// anything at or below it is left uncompressed.
pub(crate) const DEFLATE_MIN_SIZE: usize = 256;
18
/// Type-level marker recording whether the operations inside a change have been checked.
///
/// A change stores its operations in a columnar encoding which is only decoded when the ops are
/// iterated, so until every op has been read there is no guarantee the encoding is valid. An
/// unchecked change therefore yields `Result`s when iterating its ops. Once all ops have been
/// decoded successfully, the `Verified` marker acts as a witness that lets the change expose an
/// iterator over the ops themselves rather than over `Result`s.
pub(crate) trait OpReadState {}

/// Marker: every operation in the change has been decoded successfully.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Verified;

impl OpReadState for Verified {}

/// Marker: the operations in the change have not been checked yet.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Unverified;

impl OpReadState for Unverified {}
31
32/// A `Change` is the result of parsing a change chunk as specified in [1]
33///
34/// The type parameter to this type represents whether or not operation have been "verified".
35/// Operations in a change chunk are stored in a compressed column oriented storage format. In
36/// general there is no guarantee that this storage is valid. Therefore we use the `OpReadState`
37/// type parameter to distinguish between contexts where we know that the ops are valid and those
38/// where we don't. The `Change::verify_ops` method can be used to obtain a verified `Change` which
39/// can provide an iterator over `ChangeOp`s directly, rather than over `Result<ChangeOp,
40/// ReadChangeOpError>`.
41///
42/// [1]: https://alexjg.github.io/automerge-storage-docs/#change-chunks
43#[derive(Clone, Debug)]
44pub(crate) struct Change<'a, O: OpReadState> {
45    /// The raw bytes of the entire chunk containing this change, including the header.
46    pub(crate) bytes: Cow<'a, [u8]>,
47    pub(crate) header: Header,
48    pub(crate) dependencies: Vec<ChangeHash>,
49    pub(crate) actor: ActorId,
50    pub(crate) other_actors: Vec<ActorId>,
51    pub(crate) seq: u64,
52    pub(crate) start_op: NonZeroU64,
53    pub(crate) timestamp: i64,
54    pub(crate) message: Option<String>,
55    pub(crate) ops_meta: ChangeOpsColumns,
56    /// The range in `Self::bytes` where the ops column data is
57    pub(crate) ops_data: Range<usize>,
58    pub(crate) extra_bytes: Range<usize>,
59    pub(crate) num_ops: usize,
60    pub(crate) _phantom: PhantomData<O>,
61}
62
63impl<O: OpReadState> PartialEq for Change<'_, O> {
64    fn eq(&self, other: &Self) -> bool {
65        self.bytes == other.bytes
66    }
67}
68
69#[derive(thiserror::Error, Debug)]
70pub(crate) enum ParseError {
71    #[error(transparent)]
72    Leb128(#[from] parse::leb128::Error),
73    #[error(transparent)]
74    InvalidUtf8(#[from] parse::InvalidUtf8),
75    #[error("failed to parse change columns: {0}")]
76    RawColumns(#[from] crate::storage::columns::raw_column::ParseError),
77    #[error("failed to parse header: {0}")]
78    Header(#[from] super::chunk::error::Header),
79    #[error("change contained compressed columns")]
80    CompressedChangeCols,
81    #[error("invalid change cols: {0}")]
82    InvalidColumns(Box<dyn std::error::Error + Send + Sync + 'static>),
83}
84
85impl<'a> Change<'a, Unverified> {
86    pub(crate) fn parse(
87        input: parse::Input<'a>,
88    ) -> parse::ParseResult<'a, Change<'a, Unverified>, ParseError> {
89        // TODO(alex): check chunk type
90        let (i, header) = Header::parse(input)?;
91        let parse::Split {
92            first: chunk_input,
93            remaining,
94        } = i.split(header.data_bytes().len());
95        let (_, change) = Self::parse_following_header(chunk_input, header)?;
96        Ok((remaining, change))
97    }
98
99    /// Parse a change chunk. `input` should be the entire chunk, including the header bytes.
100    pub(crate) fn parse_following_header(
101        input: parse::Input<'a>,
102        header: Header,
103    ) -> parse::ParseResult<'a, Change<'a, Unverified>, ParseError> {
104        let (i, deps) = parse::length_prefixed(parse::change_hash)(input)?;
105        let (i, actor) = parse::actor_id(i)?;
106        let (i, seq) = parse::leb128_u64(i)?;
107        let (i, start_op) = parse::nonzero_leb128_u64(i)?;
108        let (i, timestamp) = parse::leb128_i64(i)?;
109        let (i, message_len) = parse::leb128_u64(i)?;
110        let (i, message) = parse::utf_8(message_len as usize, i)?;
111        let (i, other_actors) = parse::length_prefixed(parse::actor_id)(i)?;
112        let (i, ops_meta) = RawColumns::parse(i)?;
113        let (
114            i,
115            parse::RangeOf {
116                range: ops_data, ..
117            },
118        ) = parse::range_of(|i| parse::take_n(ops_meta.total_column_len(), i), i)?;
119
120        let (
121            _i,
122            parse::RangeOf {
123                range: extra_bytes, ..
124            },
125        ) = parse::range_of(parse::take_rest, i)?;
126
127        let ops_meta = ops_meta
128            .uncompressed()
129            .ok_or(parse::ParseError::Error(ParseError::CompressedChangeCols))?;
130
131        let col_layout = Columns::parse2(ops_data.len(), ops_meta.iter())
132            .map_err(|e| parse::ParseError::Error(ParseError::InvalidColumns(Box::new(e))))?;
133        let ops_meta = ChangeOpsColumns::try_from(col_layout)
134            .map_err(|e| parse::ParseError::Error(ParseError::InvalidColumns(Box::new(e))))?;
135
136        Ok((
137            parse::Input::empty(),
138            Change {
139                bytes: input.bytes().into(),
140                header,
141                dependencies: deps,
142                actor,
143                other_actors,
144                seq,
145                start_op,
146                timestamp,
147                message: if message.is_empty() {
148                    None
149                } else {
150                    Some(message)
151                },
152                ops_meta,
153                ops_data,
154                extra_bytes,
155                num_ops: 0,
156                _phantom: PhantomData,
157            },
158        ))
159    }
160
161    /// Iterate over the ops in this chunk. The iterator will return an error if any of the ops are
162    /// malformed.
163    pub(crate) fn iter_ops(
164        &'a self,
165    ) -> impl Iterator<Item = Result<ChangeOp, ReadChangeOpError>> + Clone + 'a {
166        self.ops_meta.iter(self.ops_data())
167    }
168
169    /// Verify all the ops in this change executing `f` for each one
170    ///
171    /// `f` will be called for each op in this change, allowing callers to collect additional
172    /// information about the ops (e.g. all the actor IDs in the change, or the number of ops)
173    ///
174    /// # Errors
175    /// * If there is an error reading an operation
176    pub(crate) fn verify_ops<F: FnMut(ChangeOp)>(
177        self,
178        mut f: F,
179    ) -> Result<Change<'a, Verified>, ReadChangeOpError> {
180        let mut num_ops = 0;
181        for op in self.iter_ops() {
182            f(op?);
183            num_ops += 1;
184        }
185        if u32::try_from(u64::from(self.start_op)).is_err() {
186            return Err(ReadChangeOpError::CounterTooLarge);
187        }
188        Ok(Change {
189            bytes: self.bytes,
190            header: self.header,
191            dependencies: self.dependencies,
192            actor: self.actor,
193            other_actors: self.other_actors,
194            seq: self.seq,
195            start_op: self.start_op,
196            timestamp: self.timestamp,
197            message: self.message,
198            ops_meta: self.ops_meta,
199            ops_data: self.ops_data,
200            extra_bytes: self.extra_bytes,
201            num_ops,
202            _phantom: PhantomData,
203        })
204    }
205}
206
207impl<'a> Change<'a, Verified> {
208    pub(crate) fn len(&self) -> usize {
209        self.num_ops
210    }
211
212    pub(crate) fn builder() -> ChangeBuilder<Unset, Unset, Unset, Unset> {
213        ChangeBuilder::new()
214    }
215
216    pub(crate) fn iter_ops(&'a self) -> impl Iterator<Item = ChangeOp> + Clone + 'a {
217        // SAFETY: This unwrap is okay because a `Change<'_, Verified>` can only be constructed
218        // using either `verify_ops` or `Builder::build`, so we know the ops columns are valid.
219        self.ops_meta.iter(self.ops_data()).map(|o| o.unwrap())
220    }
221}
222
223impl<O: OpReadState> Change<'_, O> {
224    pub(crate) fn checksum(&self) -> CheckSum {
225        self.header.checksum()
226    }
227
228    pub(crate) fn actor(&self) -> &ActorId {
229        &self.actor
230    }
231    pub(crate) fn other_actors(&self) -> &[ActorId] {
232        &self.other_actors
233    }
234
235    pub(crate) fn start_op(&self) -> NonZeroU64 {
236        self.start_op
237    }
238
239    pub(crate) fn message(&self) -> &Option<String> {
240        &self.message
241    }
242
243    pub(crate) fn dependencies(&self) -> &[ChangeHash] {
244        &self.dependencies
245    }
246
247    pub(crate) fn seq(&self) -> u64 {
248        self.seq
249    }
250
251    pub(crate) fn timestamp(&self) -> i64 {
252        self.timestamp
253    }
254
255    pub(crate) fn extra_bytes(&self) -> &[u8] {
256        &self.bytes[self.extra_bytes.clone()]
257    }
258
259    pub(crate) fn checksum_valid(&self) -> bool {
260        self.header.checksum_valid()
261    }
262
263    pub(crate) fn body_bytes(&self) -> &[u8] {
264        &self.bytes[self.header.len()..]
265    }
266
267    pub(crate) fn bytes(&self) -> &[u8] {
268        &self.bytes
269    }
270
271    pub(crate) fn hash(&self) -> ChangeHash {
272        self.header.hash()
273    }
274
275    pub(crate) fn ops_data(&self) -> &[u8] {
276        &self.bytes[self.ops_data.clone()]
277    }
278
279    pub(crate) fn into_owned(self) -> Change<'static, O> {
280        Change {
281            dependencies: self.dependencies,
282            bytes: Cow::Owned(self.bytes.into_owned()),
283            header: self.header,
284            actor: self.actor,
285            other_actors: self.other_actors,
286            seq: self.seq,
287            start_op: self.start_op,
288            timestamp: self.timestamp,
289            message: self.message,
290            ops_meta: self.ops_meta,
291            ops_data: self.ops_data,
292            num_ops: self.num_ops,
293            extra_bytes: self.extra_bytes,
294            _phantom: PhantomData,
295        }
296    }
297
298    pub(crate) fn compress(&self) -> Option<Compressed<'static>> {
299        if self.bytes.len() > DEFLATE_MIN_SIZE {
300            Some(Compressed::compress(self))
301        } else {
302            None
303        }
304    }
305}
306
307fn length_prefixed_bytes<B: AsRef<[u8]>>(b: B, out: &mut Vec<u8>) -> usize {
308    let prefix_len = leb128::write::unsigned(out, b.as_ref().len() as u64).unwrap();
309    out.write_all(b.as_ref()).unwrap();
310    prefix_len + b.as_ref().len()
311}
312
// Type-state markers for `ChangeBuilder`: each required field starts out as `Unset` and its
// `with_*` method replaces it with `Set<T>`; `build` is only implemented once all are `Set`.

/// Marker for a required builder field that has not been provided yet.
pub(crate) struct Unset;

/// A required builder field that has been provided.
pub(crate) struct Set<T> {
    value: T,
}
318
319#[allow(non_camel_case_types)]
320pub(crate) struct ChangeBuilder<START_OP, ACTOR, SEQ, TIME> {
321    dependencies: Vec<ChangeHash>,
322    actor: ACTOR,
323    seq: SEQ,
324    start_op: START_OP,
325    timestamp: TIME,
326    message: Option<String>,
327    extra_bytes: Option<Vec<u8>>,
328}
329
330impl ChangeBuilder<Unset, Unset, Unset, Unset> {
331    pub(crate) fn new() -> Self {
332        Self {
333            dependencies: vec![],
334            actor: Unset,
335            seq: Unset,
336            start_op: Unset,
337            timestamp: Unset,
338            message: None,
339            extra_bytes: None,
340        }
341    }
342}
343
344#[allow(non_camel_case_types)]
345impl<START_OP, ACTOR, SEQ, TIME> ChangeBuilder<START_OP, ACTOR, SEQ, TIME> {
346    pub(crate) fn with_dependencies(self, mut dependencies: Vec<ChangeHash>) -> Self {
347        dependencies.sort_unstable();
348        Self {
349            dependencies,
350            ..self
351        }
352    }
353
354    pub(crate) fn with_message(self, message: Option<String>) -> Self {
355        Self { message, ..self }
356    }
357
358    pub(crate) fn with_extra_bytes(self, extra_bytes: Vec<u8>) -> Self {
359        Self {
360            extra_bytes: Some(extra_bytes),
361            ..self
362        }
363    }
364}
365
366#[allow(non_camel_case_types)]
367impl<START_OP, ACTOR, TIME> ChangeBuilder<START_OP, ACTOR, Unset, TIME> {
368    pub(crate) fn with_seq(self, seq: u64) -> ChangeBuilder<START_OP, ACTOR, Set<u64>, TIME> {
369        ChangeBuilder {
370            dependencies: self.dependencies,
371            actor: self.actor,
372            seq: Set { value: seq },
373            start_op: self.start_op,
374            timestamp: self.timestamp,
375            message: self.message,
376            extra_bytes: self.extra_bytes,
377        }
378    }
379}
380
381#[allow(non_camel_case_types)]
382impl<START_OP, SEQ, TIME> ChangeBuilder<START_OP, Unset, SEQ, TIME> {
383    pub(crate) fn with_actor(
384        self,
385        actor: ActorId,
386    ) -> ChangeBuilder<START_OP, Set<ActorId>, SEQ, TIME> {
387        ChangeBuilder {
388            dependencies: self.dependencies,
389            actor: Set { value: actor },
390            seq: self.seq,
391            start_op: self.start_op,
392            timestamp: self.timestamp,
393            message: self.message,
394            extra_bytes: self.extra_bytes,
395        }
396    }
397}
398
399impl<ACTOR, SEQ, TIME> ChangeBuilder<Unset, ACTOR, SEQ, TIME> {
400    pub(crate) fn with_start_op(
401        self,
402        start_op: NonZeroU64,
403    ) -> ChangeBuilder<Set<NonZeroU64>, ACTOR, SEQ, TIME> {
404        ChangeBuilder {
405            dependencies: self.dependencies,
406            actor: self.actor,
407            seq: self.seq,
408            start_op: Set { value: start_op },
409            timestamp: self.timestamp,
410            message: self.message,
411            extra_bytes: self.extra_bytes,
412        }
413    }
414}
415
416#[allow(non_camel_case_types)]
417impl<START_OP, ACTOR, SEQ> ChangeBuilder<START_OP, ACTOR, SEQ, Unset> {
418    pub(crate) fn with_timestamp(self, time: i64) -> ChangeBuilder<START_OP, ACTOR, SEQ, Set<i64>> {
419        ChangeBuilder {
420            dependencies: self.dependencies,
421            actor: self.actor,
422            seq: self.seq,
423            start_op: self.start_op,
424            timestamp: Set { value: time },
425            message: self.message,
426            extra_bytes: self.extra_bytes,
427        }
428    }
429}
430
431/// A row to be encoded as a change op
432///
433/// The lifetime `'a` is the lifetime of the value and key data types. For types which cannot
434/// provide a reference (e.g. because they are decoding from some columnar storage on each
435/// iteration) this should be `'static`.
436pub(crate) trait AsChangeOp<'a> {
437    /// The type of the Actor ID component of the op IDs for this impl. This is typically either
438    /// `&'a ActorID` or `usize`
439    type ActorId;
440    /// The type of the op IDs this impl produces.
441    type OpId: convert::OpId<Self::ActorId>;
442    /// The type of the predecessor iterator returned by `Self::pred`. This can often be omitted
443    type PredIter: Iterator<Item = Self::OpId> + ExactSizeIterator;
444
445    fn obj(&self) -> convert::ObjId<Self::OpId>;
446    fn key(&self) -> convert::Key<'a, Self::OpId>;
447    fn insert(&self) -> bool;
448    fn action(&self) -> u64;
449    fn val(&self) -> Cow<'a, ScalarValue>;
450    fn pred(&self) -> Self::PredIter;
451    fn expand(&self) -> bool;
452    fn mark_name(&self) -> Option<Cow<'a, smol_str::SmolStr>>;
453}
454
455impl ChangeBuilder<Set<NonZeroU64>, Set<ActorId>, Set<u64>, Set<i64>> {
456    pub(crate) fn build<'a, 'b, A, I, O>(
457        self,
458        ops: I,
459    ) -> Result<Change<'static, Verified>, PredOutOfOrder>
460    where
461        A: AsChangeOp<'a, OpId = O> + 'a + std::fmt::Debug,
462        O: convert::OpId<&'a ActorId> + 'a,
463        I: Iterator<Item = A> + Clone + 'a + ExactSizeIterator,
464    {
465        let num_ops = ops.len();
466        let mut col_data = Vec::new();
467        let actors = change_actors::ChangeActors::new(self.actor.value, ops)?;
468        let cols = ChangeOpsColumns::encode(actors.iter(), &mut col_data);
469
470        let (actor, other_actors) = actors.done();
471
472        let mut data = Vec::with_capacity(col_data.len());
473        leb128::write::unsigned(&mut data, self.dependencies.len() as u64).unwrap();
474        for dep in &self.dependencies {
475            data.write_all(dep.as_bytes()).unwrap();
476        }
477        length_prefixed_bytes(&actor, &mut data);
478        leb128::write::unsigned(&mut data, self.seq.value).unwrap();
479        leb128::write::unsigned(&mut data, self.start_op.value.into()).unwrap();
480        leb128::write::signed(&mut data, self.timestamp.value).unwrap();
481        length_prefixed_bytes(
482            self.message.as_ref().map(|m| m.as_bytes()).unwrap_or(&[]),
483            &mut data,
484        );
485        leb128::write::unsigned(&mut data, other_actors.len() as u64).unwrap();
486        for actor in other_actors.iter() {
487            length_prefixed_bytes(actor, &mut data);
488        }
489        cols.raw_columns().write(&mut data);
490        let ops_data_start = data.len();
491        let ops_data = ops_data_start..(ops_data_start + col_data.len());
492
493        data.extend(col_data);
494        let extra_bytes =
495            data.len()..(data.len() + self.extra_bytes.as_ref().map(|e| e.len()).unwrap_or(0));
496        if let Some(extra) = self.extra_bytes {
497            data.extend(extra);
498        }
499
500        let header = Header::new(ChunkType::Change, &data);
501
502        let mut bytes = Vec::with_capacity(header.len() + data.len());
503        header.write(&mut bytes);
504        bytes.extend(data);
505
506        let ops_data = shift_range(ops_data, header.len());
507        let extra_bytes = shift_range(extra_bytes, header.len());
508
509        Ok(Change {
510            bytes: Cow::Owned(bytes),
511            header,
512            dependencies: self.dependencies,
513            actor,
514            other_actors,
515            seq: self.seq.value,
516            start_op: self.start_op.value,
517            timestamp: self.timestamp.value,
518            message: self.message,
519            ops_meta: cols,
520            ops_data,
521            extra_bytes,
522            num_ops,
523            _phantom: PhantomData,
524        })
525    }
526}