1use std::{borrow::Cow, io::Write, marker::PhantomData, num::NonZeroU64, ops::Range};
2
3use crate::{convert, ActorId, ChangeHash, ScalarValue};
4
5use super::{parse, shift_range, CheckSum, ChunkType, Columns, Header, RawColumns};
6
7mod change_op_columns;
8pub(crate) use change_op_columns::ChangeOpsColumns;
9pub(crate) use change_op_columns::{ChangeOp, ReadChangeOpError};
10
11mod change_actors;
12pub(crate) use change_actors::PredOutOfOrder;
13mod compressed;
14mod op_with_change_actors;
15pub(crate) use compressed::Compressed;
16
/// Size threshold, in bytes, consulted by `Change::compress`: a change is
/// only compressed when its encoded length is strictly greater than this.
pub(crate) const DEFLATE_MIN_SIZE: usize = 256;
18
/// Marker trait for the type-level state attached to a `Change`.
///
/// It has no methods; it only restricts the state parameter to the marker
/// types defined below.
pub(crate) trait OpReadState {}

/// State marker for a change whose ops have been read successfully
/// (produced by `verify_ops` and by `ChangeBuilder::build`).
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Verified;
impl OpReadState for Verified {}

/// State marker for a freshly parsed change whose ops have not been read yet.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Unverified;
impl OpReadState for Unverified {}
31
32#[derive(Clone, Debug)]
44pub(crate) struct Change<'a, O: OpReadState> {
45 pub(crate) bytes: Cow<'a, [u8]>,
47 pub(crate) header: Header,
48 pub(crate) dependencies: Vec<ChangeHash>,
49 pub(crate) actor: ActorId,
50 pub(crate) other_actors: Vec<ActorId>,
51 pub(crate) seq: u64,
52 pub(crate) start_op: NonZeroU64,
53 pub(crate) timestamp: i64,
54 pub(crate) message: Option<String>,
55 pub(crate) ops_meta: ChangeOpsColumns,
56 pub(crate) ops_data: Range<usize>,
58 pub(crate) extra_bytes: Range<usize>,
59 pub(crate) num_ops: usize,
60 pub(crate) _phantom: PhantomData<O>,
61}
62
63impl<O: OpReadState> PartialEq for Change<'_, O> {
64 fn eq(&self, other: &Self) -> bool {
65 self.bytes == other.bytes
66 }
67}
68
69#[derive(thiserror::Error, Debug)]
70pub(crate) enum ParseError {
71 #[error(transparent)]
72 Leb128(#[from] parse::leb128::Error),
73 #[error(transparent)]
74 InvalidUtf8(#[from] parse::InvalidUtf8),
75 #[error("failed to parse change columns: {0}")]
76 RawColumns(#[from] crate::storage::columns::raw_column::ParseError),
77 #[error("failed to parse header: {0}")]
78 Header(#[from] super::chunk::error::Header),
79 #[error("change contained compressed columns")]
80 CompressedChangeCols,
81 #[error("invalid change cols: {0}")]
82 InvalidColumns(Box<dyn std::error::Error + Send + Sync + 'static>),
83}
84
85impl<'a> Change<'a, Unverified> {
86 pub(crate) fn parse(
87 input: parse::Input<'a>,
88 ) -> parse::ParseResult<'a, Change<'a, Unverified>, ParseError> {
89 let (i, header) = Header::parse(input)?;
91 let parse::Split {
92 first: chunk_input,
93 remaining,
94 } = i.split(header.data_bytes().len());
95 let (_, change) = Self::parse_following_header(chunk_input, header)?;
96 Ok((remaining, change))
97 }
98
99 pub(crate) fn parse_following_header(
101 input: parse::Input<'a>,
102 header: Header,
103 ) -> parse::ParseResult<'a, Change<'a, Unverified>, ParseError> {
104 let (i, deps) = parse::length_prefixed(parse::change_hash)(input)?;
105 let (i, actor) = parse::actor_id(i)?;
106 let (i, seq) = parse::leb128_u64(i)?;
107 let (i, start_op) = parse::nonzero_leb128_u64(i)?;
108 let (i, timestamp) = parse::leb128_i64(i)?;
109 let (i, message_len) = parse::leb128_u64(i)?;
110 let (i, message) = parse::utf_8(message_len as usize, i)?;
111 let (i, other_actors) = parse::length_prefixed(parse::actor_id)(i)?;
112 let (i, ops_meta) = RawColumns::parse(i)?;
113 let (
114 i,
115 parse::RangeOf {
116 range: ops_data, ..
117 },
118 ) = parse::range_of(|i| parse::take_n(ops_meta.total_column_len(), i), i)?;
119
120 let (
121 _i,
122 parse::RangeOf {
123 range: extra_bytes, ..
124 },
125 ) = parse::range_of(parse::take_rest, i)?;
126
127 let ops_meta = ops_meta
128 .uncompressed()
129 .ok_or(parse::ParseError::Error(ParseError::CompressedChangeCols))?;
130
131 let col_layout = Columns::parse2(ops_data.len(), ops_meta.iter())
132 .map_err(|e| parse::ParseError::Error(ParseError::InvalidColumns(Box::new(e))))?;
133 let ops_meta = ChangeOpsColumns::try_from(col_layout)
134 .map_err(|e| parse::ParseError::Error(ParseError::InvalidColumns(Box::new(e))))?;
135
136 Ok((
137 parse::Input::empty(),
138 Change {
139 bytes: input.bytes().into(),
140 header,
141 dependencies: deps,
142 actor,
143 other_actors,
144 seq,
145 start_op,
146 timestamp,
147 message: if message.is_empty() {
148 None
149 } else {
150 Some(message)
151 },
152 ops_meta,
153 ops_data,
154 extra_bytes,
155 num_ops: 0,
156 _phantom: PhantomData,
157 },
158 ))
159 }
160
161 pub(crate) fn iter_ops(
164 &'a self,
165 ) -> impl Iterator<Item = Result<ChangeOp, ReadChangeOpError>> + Clone + 'a {
166 self.ops_meta.iter(self.ops_data())
167 }
168
169 pub(crate) fn verify_ops<F: FnMut(ChangeOp)>(
177 self,
178 mut f: F,
179 ) -> Result<Change<'a, Verified>, ReadChangeOpError> {
180 let mut num_ops = 0;
181 for op in self.iter_ops() {
182 f(op?);
183 num_ops += 1;
184 }
185 if u32::try_from(u64::from(self.start_op)).is_err() {
186 return Err(ReadChangeOpError::CounterTooLarge);
187 }
188 Ok(Change {
189 bytes: self.bytes,
190 header: self.header,
191 dependencies: self.dependencies,
192 actor: self.actor,
193 other_actors: self.other_actors,
194 seq: self.seq,
195 start_op: self.start_op,
196 timestamp: self.timestamp,
197 message: self.message,
198 ops_meta: self.ops_meta,
199 ops_data: self.ops_data,
200 extra_bytes: self.extra_bytes,
201 num_ops,
202 _phantom: PhantomData,
203 })
204 }
205}
206
207impl<'a> Change<'a, Verified> {
208 pub(crate) fn len(&self) -> usize {
209 self.num_ops
210 }
211
212 pub(crate) fn builder() -> ChangeBuilder<Unset, Unset, Unset, Unset> {
213 ChangeBuilder::new()
214 }
215
216 pub(crate) fn iter_ops(&'a self) -> impl Iterator<Item = ChangeOp> + Clone + 'a {
217 self.ops_meta.iter(self.ops_data()).map(|o| o.unwrap())
220 }
221}
222
223impl<O: OpReadState> Change<'_, O> {
224 pub(crate) fn checksum(&self) -> CheckSum {
225 self.header.checksum()
226 }
227
228 pub(crate) fn actor(&self) -> &ActorId {
229 &self.actor
230 }
231 pub(crate) fn other_actors(&self) -> &[ActorId] {
232 &self.other_actors
233 }
234
235 pub(crate) fn start_op(&self) -> NonZeroU64 {
236 self.start_op
237 }
238
239 pub(crate) fn message(&self) -> &Option<String> {
240 &self.message
241 }
242
243 pub(crate) fn dependencies(&self) -> &[ChangeHash] {
244 &self.dependencies
245 }
246
247 pub(crate) fn seq(&self) -> u64 {
248 self.seq
249 }
250
251 pub(crate) fn timestamp(&self) -> i64 {
252 self.timestamp
253 }
254
255 pub(crate) fn extra_bytes(&self) -> &[u8] {
256 &self.bytes[self.extra_bytes.clone()]
257 }
258
259 pub(crate) fn checksum_valid(&self) -> bool {
260 self.header.checksum_valid()
261 }
262
263 pub(crate) fn body_bytes(&self) -> &[u8] {
264 &self.bytes[self.header.len()..]
265 }
266
267 pub(crate) fn bytes(&self) -> &[u8] {
268 &self.bytes
269 }
270
271 pub(crate) fn hash(&self) -> ChangeHash {
272 self.header.hash()
273 }
274
275 pub(crate) fn ops_data(&self) -> &[u8] {
276 &self.bytes[self.ops_data.clone()]
277 }
278
279 pub(crate) fn into_owned(self) -> Change<'static, O> {
280 Change {
281 dependencies: self.dependencies,
282 bytes: Cow::Owned(self.bytes.into_owned()),
283 header: self.header,
284 actor: self.actor,
285 other_actors: self.other_actors,
286 seq: self.seq,
287 start_op: self.start_op,
288 timestamp: self.timestamp,
289 message: self.message,
290 ops_meta: self.ops_meta,
291 ops_data: self.ops_data,
292 num_ops: self.num_ops,
293 extra_bytes: self.extra_bytes,
294 _phantom: PhantomData,
295 }
296 }
297
298 pub(crate) fn compress(&self) -> Option<Compressed<'static>> {
299 if self.bytes.len() > DEFLATE_MIN_SIZE {
300 Some(Compressed::compress(self))
301 } else {
302 None
303 }
304 }
305}
306
307fn length_prefixed_bytes<B: AsRef<[u8]>>(b: B, out: &mut Vec<u8>) -> usize {
308 let prefix_len = leb128::write::unsigned(out, b.as_ref().len() as u64).unwrap();
309 out.write_all(b.as_ref()).unwrap();
310 prefix_len + b.as_ref().len()
311}
312
/// Type-level marker for a `ChangeBuilder` field that has not been provided.
pub(crate) struct Unset;

/// Type-level marker for a `ChangeBuilder` field that has been provided,
/// wrapping the supplied value.
pub(crate) struct Set<T> {
    /// The value that was supplied for the field.
    value: T,
}
318
319#[allow(non_camel_case_types)]
320pub(crate) struct ChangeBuilder<START_OP, ACTOR, SEQ, TIME> {
321 dependencies: Vec<ChangeHash>,
322 actor: ACTOR,
323 seq: SEQ,
324 start_op: START_OP,
325 timestamp: TIME,
326 message: Option<String>,
327 extra_bytes: Option<Vec<u8>>,
328}
329
330impl ChangeBuilder<Unset, Unset, Unset, Unset> {
331 pub(crate) fn new() -> Self {
332 Self {
333 dependencies: vec![],
334 actor: Unset,
335 seq: Unset,
336 start_op: Unset,
337 timestamp: Unset,
338 message: None,
339 extra_bytes: None,
340 }
341 }
342}
343
344#[allow(non_camel_case_types)]
345impl<START_OP, ACTOR, SEQ, TIME> ChangeBuilder<START_OP, ACTOR, SEQ, TIME> {
346 pub(crate) fn with_dependencies(self, mut dependencies: Vec<ChangeHash>) -> Self {
347 dependencies.sort_unstable();
348 Self {
349 dependencies,
350 ..self
351 }
352 }
353
354 pub(crate) fn with_message(self, message: Option<String>) -> Self {
355 Self { message, ..self }
356 }
357
358 pub(crate) fn with_extra_bytes(self, extra_bytes: Vec<u8>) -> Self {
359 Self {
360 extra_bytes: Some(extra_bytes),
361 ..self
362 }
363 }
364}
365
366#[allow(non_camel_case_types)]
367impl<START_OP, ACTOR, TIME> ChangeBuilder<START_OP, ACTOR, Unset, TIME> {
368 pub(crate) fn with_seq(self, seq: u64) -> ChangeBuilder<START_OP, ACTOR, Set<u64>, TIME> {
369 ChangeBuilder {
370 dependencies: self.dependencies,
371 actor: self.actor,
372 seq: Set { value: seq },
373 start_op: self.start_op,
374 timestamp: self.timestamp,
375 message: self.message,
376 extra_bytes: self.extra_bytes,
377 }
378 }
379}
380
381#[allow(non_camel_case_types)]
382impl<START_OP, SEQ, TIME> ChangeBuilder<START_OP, Unset, SEQ, TIME> {
383 pub(crate) fn with_actor(
384 self,
385 actor: ActorId,
386 ) -> ChangeBuilder<START_OP, Set<ActorId>, SEQ, TIME> {
387 ChangeBuilder {
388 dependencies: self.dependencies,
389 actor: Set { value: actor },
390 seq: self.seq,
391 start_op: self.start_op,
392 timestamp: self.timestamp,
393 message: self.message,
394 extra_bytes: self.extra_bytes,
395 }
396 }
397}
398
399impl<ACTOR, SEQ, TIME> ChangeBuilder<Unset, ACTOR, SEQ, TIME> {
400 pub(crate) fn with_start_op(
401 self,
402 start_op: NonZeroU64,
403 ) -> ChangeBuilder<Set<NonZeroU64>, ACTOR, SEQ, TIME> {
404 ChangeBuilder {
405 dependencies: self.dependencies,
406 actor: self.actor,
407 seq: self.seq,
408 start_op: Set { value: start_op },
409 timestamp: self.timestamp,
410 message: self.message,
411 extra_bytes: self.extra_bytes,
412 }
413 }
414}
415
416#[allow(non_camel_case_types)]
417impl<START_OP, ACTOR, SEQ> ChangeBuilder<START_OP, ACTOR, SEQ, Unset> {
418 pub(crate) fn with_timestamp(self, time: i64) -> ChangeBuilder<START_OP, ACTOR, SEQ, Set<i64>> {
419 ChangeBuilder {
420 dependencies: self.dependencies,
421 actor: self.actor,
422 seq: self.seq,
423 start_op: self.start_op,
424 timestamp: Set { value: time },
425 message: self.message,
426 extra_bytes: self.extra_bytes,
427 }
428 }
429}
430
431pub(crate) trait AsChangeOp<'a> {
437 type ActorId;
440 type OpId: convert::OpId<Self::ActorId>;
442 type PredIter: Iterator<Item = Self::OpId> + ExactSizeIterator;
444
445 fn obj(&self) -> convert::ObjId<Self::OpId>;
446 fn key(&self) -> convert::Key<'a, Self::OpId>;
447 fn insert(&self) -> bool;
448 fn action(&self) -> u64;
449 fn val(&self) -> Cow<'a, ScalarValue>;
450 fn pred(&self) -> Self::PredIter;
451 fn expand(&self) -> bool;
452 fn mark_name(&self) -> Option<Cow<'a, smol_str::SmolStr>>;
453}
454
455impl ChangeBuilder<Set<NonZeroU64>, Set<ActorId>, Set<u64>, Set<i64>> {
456 pub(crate) fn build<'a, 'b, A, I, O>(
457 self,
458 ops: I,
459 ) -> Result<Change<'static, Verified>, PredOutOfOrder>
460 where
461 A: AsChangeOp<'a, OpId = O> + 'a + std::fmt::Debug,
462 O: convert::OpId<&'a ActorId> + 'a,
463 I: Iterator<Item = A> + Clone + 'a + ExactSizeIterator,
464 {
465 let num_ops = ops.len();
466 let mut col_data = Vec::new();
467 let actors = change_actors::ChangeActors::new(self.actor.value, ops)?;
468 let cols = ChangeOpsColumns::encode(actors.iter(), &mut col_data);
469
470 let (actor, other_actors) = actors.done();
471
472 let mut data = Vec::with_capacity(col_data.len());
473 leb128::write::unsigned(&mut data, self.dependencies.len() as u64).unwrap();
474 for dep in &self.dependencies {
475 data.write_all(dep.as_bytes()).unwrap();
476 }
477 length_prefixed_bytes(&actor, &mut data);
478 leb128::write::unsigned(&mut data, self.seq.value).unwrap();
479 leb128::write::unsigned(&mut data, self.start_op.value.into()).unwrap();
480 leb128::write::signed(&mut data, self.timestamp.value).unwrap();
481 length_prefixed_bytes(
482 self.message.as_ref().map(|m| m.as_bytes()).unwrap_or(&[]),
483 &mut data,
484 );
485 leb128::write::unsigned(&mut data, other_actors.len() as u64).unwrap();
486 for actor in other_actors.iter() {
487 length_prefixed_bytes(actor, &mut data);
488 }
489 cols.raw_columns().write(&mut data);
490 let ops_data_start = data.len();
491 let ops_data = ops_data_start..(ops_data_start + col_data.len());
492
493 data.extend(col_data);
494 let extra_bytes =
495 data.len()..(data.len() + self.extra_bytes.as_ref().map(|e| e.len()).unwrap_or(0));
496 if let Some(extra) = self.extra_bytes {
497 data.extend(extra);
498 }
499
500 let header = Header::new(ChunkType::Change, &data);
501
502 let mut bytes = Vec::with_capacity(header.len() + data.len());
503 header.write(&mut bytes);
504 bytes.extend(data);
505
506 let ops_data = shift_range(ops_data, header.len());
507 let extra_bytes = shift_range(extra_bytes, header.len());
508
509 Ok(Change {
510 bytes: Cow::Owned(bytes),
511 header,
512 dependencies: self.dependencies,
513 actor,
514 other_actors,
515 seq: self.seq.value,
516 start_op: self.start_op.value,
517 timestamp: self.timestamp.value,
518 message: self.message,
519 ops_meta: cols,
520 ops_data,
521 extra_bytes,
522 num_ops,
523 _phantom: PhantomData,
524 })
525 }
526}