spacetimedb_commitlog/payload.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
use std::sync::Arc;
use spacetimedb_sats::{
bsatn,
buffer::{BufReader, BufWriter, DecodeError},
ser::Serialize,
ProductValue,
};
use thiserror::Error;
pub mod txdata;
pub use txdata::Txdata;
/// A **datatype** which can be encoded.
///
/// The transaction payload of the commitlog (i.e. individual records in the log)
/// must satisfy this trait.
pub trait Encode {
/// Encode `self` to the given buffer.
fn encode_record<W: BufWriter>(&self, writer: &mut W);
}
impl<T: Encode> Encode for Arc<T> {
fn encode_record<W: BufWriter>(&self, writer: &mut W) {
(**self).encode_record(writer)
}
}
impl Encode for ProductValue {
fn encode_record<W: BufWriter>(&self, writer: &mut W) {
self.serialize(bsatn::Serializer::new(writer))
.expect("bsatn serialize should never fail");
}
}
impl Encode for () {
fn encode_record<W: BufWriter>(&self, _writer: &mut W) {}
}
/// A decoder which can decode the transaction (aka record) format of the log.
///
/// Unlike [`Encode`], this is not a datatype: the canonical commitlog format
/// requires to look up row types during log traversal in order to be able to
/// decode.
pub trait Decoder {
/// The type of records this decoder can decode.
/// This is also the type which can be appended to a commitlog, and so must
/// satisfy [`Encode`].
type Record: Encode;
/// The type of decode errors, which must subsume [`DecodeError`].
type Error: From<DecodeError>;
/// Decode one [`Self::Record`] from the given buffer.
///
/// The `version` argument corresponds to the log format version of the
/// current segment (see [`crate::segment::Header::log_format_version`]).
///
/// The `tx_argument` is the transaction offset of the current record
/// relative to the start of the log.
fn decode_record<'a, R: BufReader<'a>>(
&self,
version: u8,
tx_offset: u64,
reader: &mut R,
) -> Result<Self::Record, Self::Error>;
/// Variant of [`Self::decode_record`] which discards the decoded
/// [`Self::Record`].
///
/// Useful for folds which don't need to yield or collect record values.
///
/// The default implementation just drops the record returned from
/// [`Self::decode_record`]. Implementations may want to override this, such
/// that the record is not allocated in the first place.
fn consume_record<'a, R: BufReader<'a>>(
&self,
version: u8,
tx_offset: u64,
reader: &mut R,
) -> Result<(), Self::Error> {
self.decode_record(version, tx_offset, reader).map(drop)
}
/// Advance `reader` past the next [`Self::Record`], without returning it
/// or including it in a fold.
fn skip_record<'a, R: BufReader<'a>>(&self, version: u8, tx_offset: u64, reader: &mut R)
-> Result<(), Self::Error>;
}
impl<const N: usize> Encode for [u8; N] {
fn encode_record<W: BufWriter>(&self, writer: &mut W) {
writer.put_slice(&self[..])
}
}
#[derive(Debug, Error)]
pub enum ArrayDecodeError {
#[error(transparent)]
Decode(#[from] DecodeError),
#[error(transparent)]
Traversal(#[from] crate::error::Traversal),
#[error(transparent)]
Io(#[from] std::io::Error),
}
pub struct ArrayDecoder<const N: usize>;
impl<const N: usize> Decoder for ArrayDecoder<N> {
type Record = [u8; N];
type Error = ArrayDecodeError;
fn decode_record<'a, R: BufReader<'a>>(
&self,
_version: u8,
_tx_offset: u64,
reader: &mut R,
) -> Result<Self::Record, Self::Error> {
Ok(reader.get_array()?)
}
fn skip_record<'a, R: BufReader<'a>>(
&self,
version: u8,
tx_offset: u64,
reader: &mut R,
) -> Result<(), Self::Error> {
self.decode_record(version, tx_offset, reader).map(drop)
}
}