spacetimedb_commitlog/
payload.rs

1use std::sync::Arc;
2
3use spacetimedb_sats::{
4    bsatn,
5    buffer::{BufReader, BufWriter, DecodeError},
6    ser::Serialize,
7    ProductValue,
8};
9use thiserror::Error;
10
11pub mod txdata;
12pub use txdata::Txdata;
13
14/// A **datatype** which can be encoded.
15///
16/// The transaction payload of the commitlog (i.e. individual records in the log)
17/// must satisfy this trait.
18pub trait Encode {
19    /// Encode `self` to the given buffer.
20    fn encode_record<W: BufWriter>(&self, writer: &mut W);
21}
22
23impl<T: Encode> Encode for Arc<T> {
24    fn encode_record<W: BufWriter>(&self, writer: &mut W) {
25        (**self).encode_record(writer)
26    }
27}
28
29impl Encode for ProductValue {
30    fn encode_record<W: BufWriter>(&self, writer: &mut W) {
31        self.serialize(bsatn::Serializer::new(writer))
32            .expect("bsatn serialize should never fail");
33    }
34}
35
36impl Encode for () {
37    fn encode_record<W: BufWriter>(&self, _writer: &mut W) {}
38}
39
40/// A decoder which can decode the transaction (aka record) format of the log.
41///
42/// Unlike [`Encode`], this is not a datatype: the canonical commitlog format
43/// requires to look up row types during log traversal in order to be able to
44/// decode.
45pub trait Decoder {
46    /// The type of records this decoder can decode.
47    /// This is also the type which can be appended to a commitlog, and so must
48    /// satisfy [`Encode`].
49    type Record: Encode;
50    /// The type of decode errors, which must subsume [`DecodeError`].
51    type Error: From<DecodeError>;
52
53    /// Decode one [`Self::Record`] from the given buffer.
54    ///
55    /// The `version` argument corresponds to the log format version of the
56    /// current segment (see [`crate::segment::Header::log_format_version`]).
57    ///
58    /// The `tx_argument` is the transaction offset of the current record
59    /// relative to the start of the log.
60    fn decode_record<'a, R: BufReader<'a>>(
61        &self,
62        version: u8,
63        tx_offset: u64,
64        reader: &mut R,
65    ) -> Result<Self::Record, Self::Error>;
66
67    /// Variant of [`Self::decode_record`] which discards the decoded
68    /// [`Self::Record`].
69    ///
70    /// Useful for folds which don't need to yield or collect record values.
71    ///
72    /// The default implementation just drops the record returned from
73    /// [`Self::decode_record`]. Implementations may want to override this, such
74    /// that the record is not allocated in the first place.
75    fn consume_record<'a, R: BufReader<'a>>(
76        &self,
77        version: u8,
78        tx_offset: u64,
79        reader: &mut R,
80    ) -> Result<(), Self::Error> {
81        self.decode_record(version, tx_offset, reader).map(drop)
82    }
83
84    /// Advance `reader` past the next [`Self::Record`], without returning it
85    /// or including it in a fold.
86    fn skip_record<'a, R: BufReader<'a>>(&self, version: u8, tx_offset: u64, reader: &mut R)
87        -> Result<(), Self::Error>;
88}
89
90impl<const N: usize> Encode for [u8; N] {
91    fn encode_record<W: BufWriter>(&self, writer: &mut W) {
92        writer.put_slice(&self[..])
93    }
94}
95
96#[derive(Debug, Error)]
97pub enum ArrayDecodeError {
98    #[error(transparent)]
99    Decode(#[from] DecodeError),
100    #[error(transparent)]
101    Traversal(#[from] crate::error::Traversal),
102    #[error(transparent)]
103    Io(#[from] std::io::Error),
104}
105
/// A [`Decoder`] whose records are fixed-size byte arrays `[u8; N]`.
///
/// The decoder is a stateless marker type; `N` is the record size in bytes.
#[derive(Debug, Clone, Copy, Default)]
pub struct ArrayDecoder<const N: usize>;
107
108impl<const N: usize> Decoder for ArrayDecoder<N> {
109    type Record = [u8; N];
110    type Error = ArrayDecodeError;
111
112    fn decode_record<'a, R: BufReader<'a>>(
113        &self,
114        _version: u8,
115        _tx_offset: u64,
116        reader: &mut R,
117    ) -> Result<Self::Record, Self::Error> {
118        Ok(*reader.get_array()?)
119    }
120
121    fn skip_record<'a, R: BufReader<'a>>(
122        &self,
123        version: u8,
124        tx_offset: u64,
125        reader: &mut R,
126    ) -> Result<(), Self::Error> {
127        self.decode_record(version, tx_offset, reader).map(drop)
128    }
129}