1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::sync::Arc;

use spacetimedb_sats::{
    bsatn,
    buffer::{BufReader, BufWriter, DecodeError},
    ser::Serialize,
    ProductValue,
};
use thiserror::Error;

pub mod txdata;
pub use txdata::Txdata;

/// A **datatype** which can be encoded.
///
/// The transaction payload of the commitlog (i.e. individual records in the log)
/// must satisfy this trait.
pub trait Encode {
    /// Encode `self` to the given buffer.
    fn encode_record<W: BufWriter>(&self, writer: &mut W);
}

impl<T: Encode> Encode for Arc<T> {
    fn encode_record<W: BufWriter>(&self, writer: &mut W) {
        (**self).encode_record(writer)
    }
}

impl Encode for ProductValue {
    fn encode_record<W: BufWriter>(&self, writer: &mut W) {
        self.serialize(bsatn::Serializer::new(writer))
            .expect("bsatn serialize should never fail");
    }
}

impl Encode for () {
    fn encode_record<W: BufWriter>(&self, _writer: &mut W) {}
}

/// A decoder which can decode the transaction (aka record) format of the log.
///
/// Unlike [`Encode`], this is not a datatype: the canonical commitlog format
/// requires to look up row types during log traversal in order to be able to
/// decode (see also [`RowDecoder`]).
pub trait Decoder {
    /// The type of records this decoder can decode.
    /// This is also the type which can be appended to a commitlog, and so must
    /// satisfy [`Encode`].
    type Record: Encode;
    /// The type of decode errors, which must subsume [`DecodeError`].
    type Error: From<DecodeError>;

    /// Decode one [`Self::Record`] from the given buffer.
    ///
    /// The `version` argument corresponds to the log format version of the
    /// current segment (see [`segment::Header::log_format_version`]).
    ///
    /// The `tx_argument` is the transaction offset of the current record
    /// relative to the start of the log.
    fn decode_record<'a, R: BufReader<'a>>(
        &self,
        version: u8,
        tx_offset: u64,
        reader: &mut R,
    ) -> Result<Self::Record, Self::Error>;
}

impl<const N: usize> Encode for [u8; N] {
    fn encode_record<W: BufWriter>(&self, writer: &mut W) {
        writer.put_slice(&self[..])
    }
}

#[derive(Debug, Error)]
pub enum ArrayDecodeError {
    #[error(transparent)]
    Decode(#[from] DecodeError),
    #[error(transparent)]
    Traversal(#[from] crate::error::Traversal),
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

pub struct ArrayDecoder<const N: usize>;

impl<const N: usize> Decoder for ArrayDecoder<N> {
    type Record = [u8; N];
    type Error = ArrayDecodeError;

    fn decode_record<'a, R: BufReader<'a>>(
        &self,
        _version: u8,
        _tx_offset: u64,
        reader: &mut R,
    ) -> Result<Self::Record, Self::Error> {
        Ok(reader.get_array()?)
    }
}