spacetimedb_commitlog/
commit.rs

1use std::{
2    io::{self, Read, Write},
3    ops::Range,
4};
5
6use crc32c::{Crc32cReader, Crc32cWriter};
7use spacetimedb_sats::buffer::{BufReader, Cursor, DecodeError};
8
9use crate::{
10    error::ChecksumMismatch,
11    payload::Decoder,
12    segment::{CHECKSUM_ALGORITHM_CRC32C, CHECKSUM_CRC32C_LEN},
13    Transaction, DEFAULT_LOG_FORMAT_VERSION,
14};
15
/// On-disk layout variants of a commit [`Header`].
///
/// Selected from the log format version of the segment being read: format
/// version `0` maps to [`Version::V0`], every other value to [`Version::V1`].
#[derive(Default)]
enum Version {
    /// Header layout without an `epoch` field.
    ///
    /// Commits decoded with this layout get [`Commit::DEFAULT_EPOCH`].
    V0,
    /// Header layout including the `epoch` field. This is the default.
    #[default]
    V1,
}
22
/// The fixed-size prefix of an encoded [`Commit`].
///
/// It carries everything needed to know how many bytes of record data follow,
/// without having to read (or checksum) the records themselves.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Header {
    /// Offset of the first record in the commit, see [`Commit::min_tx_offset`].
    pub min_tx_offset: u64,
    /// Epoch the commit was created in, see [`Commit::epoch`].
    pub epoch: u64,
    /// Number of records in the commit, see [`Commit::n`].
    pub n: u16,
    /// Length in bytes of the serialized records buffer following the header.
    pub len: u32,
}
29
30impl Header {
31    pub const LEN: usize = /* offset */ 8 + /* epoch */ 8 + /* n */ 2 + /* len */  4;
32
33    /// Read [`Self::LEN`] bytes from `reader` and interpret them as the
34    /// "header" of a [`Commit`].
35    ///
36    /// Returns `None` if:
37    ///
38    /// - The reader cannot provide exactly [`Self::LEN`] bytes
39    ///
40    ///   I.e. it is at EOF
41    ///
42    /// - Or, the read bytes are all zeroes
43    ///
44    ///   This is to allow preallocation of segments.
45    ///
46    pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
47        Self::decode_v1(reader)
48    }
49
50    fn decode_internal<R: Read>(reader: R, v: Version) -> io::Result<Option<Self>> {
51        use Version::*;
52        match v {
53            V0 => Self::decode_v0(reader),
54            V1 => Self::decode_v1(reader),
55        }
56    }
57
58    fn decode_v0<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
59        let mut hdr = [0; Self::LEN - 8];
60        if let Err(e) = reader.read_exact(&mut hdr) {
61            if e.kind() == io::ErrorKind::UnexpectedEof {
62                return Ok(None);
63            }
64
65            return Err(e);
66        }
67        match &mut hdr.as_slice() {
68            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
69            buf => {
70                let min_tx_offset = buf.get_u64().map_err(decode_error)?;
71                let n = buf.get_u16().map_err(decode_error)?;
72                let len = buf.get_u32().map_err(decode_error)?;
73
74                Ok(Some(Self {
75                    min_tx_offset,
76                    epoch: Commit::DEFAULT_EPOCH,
77                    n,
78                    len,
79                }))
80            }
81        }
82    }
83
84    fn decode_v1<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
85        let mut hdr = [0; Self::LEN];
86        if let Err(e) = reader.read_exact(&mut hdr) {
87            if e.kind() == io::ErrorKind::UnexpectedEof {
88                return Ok(None);
89            }
90
91            return Err(e);
92        }
93        match &mut hdr.as_slice() {
94            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
95            buf => {
96                let min_tx_offset = buf.get_u64().map_err(decode_error)?;
97                let epoch = buf.get_u64().map_err(decode_error)?;
98                let n = buf.get_u16().map_err(decode_error)?;
99                let len = buf.get_u32().map_err(decode_error)?;
100
101                Ok(Some(Self {
102                    min_tx_offset,
103                    epoch,
104                    n,
105                    len,
106                }))
107            }
108        }
109    }
110}
111
/// A single entry of a [`crate::Commitlog`]: a batch of serialized records
/// together with the position of the batch in the log.
#[derive(Clone, Debug, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Commit {
    /// Transaction offset of the first record contained in this commit.
    ///
    /// Offsets are zero-based and count from the start of the whole log, not
    /// from the start of the commit.
    pub min_tx_offset: u64,
    /// The epoch in which this commit was created.
    ///
    /// In a distributed deployment, this is the monotonically increasing term
    /// number of the leader writing to the commitlog.
    ///
    /// A log written by a single process should use the default epoch, which
    /// is 0 (zero).
    ///
    /// An existing log may nevertheless carry a non-zero epoch. How a
    /// commitlog moves between distributed and single-node deployment with
    /// respect to the epoch is currently unspecified.
    pub epoch: u64,
    /// How many records this commit holds.
    pub n: u16,
    /// All records of the commit, in serialized form.
    ///
    /// The buffer is opaque to this type: readers supply their own
    /// [`crate::Decoder`] to interpret it, and use `n` to know how many
    /// records it contains.
    pub records: Vec<u8>,
}
141
142impl Commit {
143    pub const DEFAULT_EPOCH: u64 = 0;
144
145    pub const FRAMING_LEN: usize = Header::LEN + Self::CHECKSUM_LEN;
146    pub const CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;
147    pub const CHECKSUM_LEN: usize = CHECKSUM_CRC32C_LEN;
148
149    /// The range of transaction offsets contained in this commit.
150    pub fn tx_range(&self) -> Range<u64> {
151        self.min_tx_offset..self.min_tx_offset + self.n as u64
152    }
153
154    /// Length in bytes of this commit when written to the log via [`Self::write`].
155    pub fn encoded_len(&self) -> usize {
156        Self::FRAMING_LEN + self.records.len()
157    }
158
159    /// Serialize and write `self` to `out`.
160    ///
161    /// Returns the crc32 checksum of the commit on success.
162    pub fn write<W: Write>(&self, out: W) -> io::Result<u32> {
163        let mut out = Crc32cWriter::new(out);
164
165        let min_tx_offset = self.min_tx_offset.to_le_bytes();
166        let epoch = self.epoch.to_le_bytes();
167        let n = self.n.to_le_bytes();
168        let len = (self.records.len() as u32).to_le_bytes();
169
170        out.write_all(&min_tx_offset)?;
171        out.write_all(&epoch)?;
172        out.write_all(&n)?;
173        out.write_all(&len)?;
174        out.write_all(&self.records)?;
175
176        let crc = out.crc32c();
177        let mut out = out.into_inner();
178        out.write_all(&crc.to_le_bytes())?;
179
180        Ok(crc)
181    }
182
183    /// Attempt to read one [`Commit`] from the given [`Read`]er.
184    ///
185    /// Returns `None` if the reader is already at EOF.
186    ///
187    /// Verifies the checksum of the commit. If it doesn't match, an error of
188    /// kind [`io::ErrorKind::InvalidData`] with an inner error downcastable to
189    /// [`ChecksumMismatch`] is returned.
190    ///
191    /// To retain access to the checksum, use [`StoredCommit::decode`].
192    pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
193        let commit = StoredCommit::decode(reader)?;
194        Ok(commit.map(Into::into))
195    }
196
197    /// Convert `self` into an iterator yielding [`Transaction`]s.
198    ///
199    /// The supplied [`Decoder`] is responsible for extracting individual
200    /// transactions from the `records` buffer.
201    ///
202    /// `version` is the log format version of the current segment, and gets
203    /// passed to [`Decoder::decode_record`].
204    ///
205    /// `from_offset` is the transaction offset within the current commit from
206    /// which to start decoding. That is:
207    ///
208    /// * if the tx offset within the commit is smaller than `from_offset`,
209    ///   [`Decoder::skip_record`] is called.
210    ///
211    ///   The iterator does not yield a value, unless `skip_record` returns an
212    ///   error.
213    ///
214    /// * if the tx offset within the commit is greater of equal to `from_offset`,
215    ///   [`Decoder::decode_record`] is called.
216    ///
217    ///   The iterator yields the result of this call.
218    ///
219    /// * if `from_offset` doesn't fall into the current commit, the iterator
220    ///   yields nothing.
221    ///
222    pub fn into_transactions<D: Decoder>(
223        self,
224        version: u8,
225        from_offset: u64,
226        de: &D,
227    ) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
228        let records = Cursor::new(self.records);
229        (self.min_tx_offset..(self.min_tx_offset + self.n as u64))
230            .scan(records, move |recs, offset| {
231                let mut cursor = &*recs;
232                let ret = if offset < from_offset {
233                    de.skip_record(version, offset, &mut cursor).err().map(Err)
234                } else {
235                    let tx = de
236                        .decode_record(version, offset, &mut cursor)
237                        .map(|txdata| Transaction { offset, txdata });
238                    Some(tx)
239                };
240
241                Some(ret)
242            })
243            .flatten()
244    }
245}
246
247impl From<StoredCommit> for Commit {
248    fn from(
249        StoredCommit {
250            min_tx_offset,
251            epoch,
252            n,
253            records,
254            checksum: _,
255        }: StoredCommit,
256    ) -> Self {
257        Self {
258            min_tx_offset,
259            epoch,
260            n,
261            records,
262        }
263    }
264}
265
/// The on-disk representation of a [`Commit`].
///
/// The only difference to [`Commit`] is the additional `checksum` field,
/// whose value is computed when a commit is encoded for storage.
#[derive(Debug, PartialEq)]
pub struct StoredCommit {
    /// Same as [`Commit::min_tx_offset`].
    pub min_tx_offset: u64,
    /// Same as [`Commit::epoch`].
    pub epoch: u64,
    /// Same as [`Commit::n`].
    pub n: u16,
    /// Same as [`Commit::records`].
    pub records: Vec<u8>,
    /// Checksum of the commit, as computed when the [`Commit`] was encoded
    /// for storage.
    pub checksum: u32,
}
283
284impl StoredCommit {
285    /// The range of transaction offsets contained in this commit.
286    pub fn tx_range(&self) -> Range<u64> {
287        self.min_tx_offset..self.min_tx_offset + self.n as u64
288    }
289
290    /// Attempt to read one [`StoredCommit`] from the given [`Read`]er.
291    ///
292    /// Returns `None` if the reader is already at EOF.
293    ///
294    /// Verifies the checksum of the commit. If it doesn't match, an error of
295    /// kind [`io::ErrorKind::InvalidData`] with an inner error downcastable to
296    /// [`ChecksumMismatch`] is returned.
297    pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
298        Self::decode_internal(reader, DEFAULT_LOG_FORMAT_VERSION)
299    }
300
301    pub(crate) fn decode_internal<R: Read>(reader: R, log_format_version: u8) -> io::Result<Option<Self>> {
302        let mut reader = Crc32cReader::new(reader);
303
304        let v = if log_format_version == 0 {
305            Version::V0
306        } else {
307            Version::V1
308        };
309        let Some(hdr) = Header::decode_internal(&mut reader, v)? else {
310            return Ok(None);
311        };
312        let mut records = vec![0; hdr.len as usize];
313        reader.read_exact(&mut records)?;
314
315        let chk = reader.crc32c();
316        let crc = decode_u32(reader.into_inner())?;
317
318        if chk != crc {
319            return Err(invalid_data(ChecksumMismatch));
320        }
321
322        Ok(Some(Self {
323            min_tx_offset: hdr.min_tx_offset,
324            epoch: hdr.epoch,
325            n: hdr.n,
326            records,
327            checksum: crc,
328        }))
329    }
330
331    /// Convert `self` into an iterator yielding [`Transaction`]s.
332    ///
333    /// The supplied [`Decoder`] is responsible for extracting individual
334    /// transactions from the `records` buffer.
335    pub fn into_transactions<D: Decoder>(
336        self,
337        version: u8,
338        from_offset: u64,
339        de: &D,
340    ) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
341        Commit::from(self).into_transactions(version, from_offset, de)
342    }
343}
344
/// Summary of a commit: the figures required to compute a
/// [`crate::segment::Header`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
    /// Transaction offsets covered by the commit.
    pub tx_range: Range<u64>,
    /// Size in bytes of the encoded commit.
    pub size_in_bytes: u64,
    /// Epoch of the commit.
    pub epoch: u64,
}
352
353impl Metadata {
354    /// Extract the [`Metadata`] of a single [`Commit`] from the given reader.
355    ///
356    /// Note that this decodes the commit due to checksum verification.
357    /// Like [`Commit::decode`], returns `None` if the reader is at EOF already.
358    pub fn extract<R: io::Read>(reader: R) -> io::Result<Option<Self>> {
359        Commit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from))
360    }
361}
362
363impl From<Commit> for Metadata {
364    fn from(commit: Commit) -> Self {
365        Self {
366            tx_range: commit.tx_range(),
367            size_in_bytes: commit.encoded_len() as u64,
368            epoch: commit.epoch,
369        }
370    }
371}
372
/// Read exactly four bytes from `read` and interpret them as a little-endian
/// `u32`.
///
/// Fails with the error of [`Read::read_exact`] if fewer than four bytes are
/// available.
fn decode_u32<R: Read>(mut read: R) -> io::Result<u32> {
    let mut le_bytes = [0u8; 4];
    read.read_exact(&mut le_bytes)
        .map(|()| u32::from_le_bytes(le_bytes))
}
378
379fn decode_error(e: DecodeError) -> io::Error {
380    invalid_data(e)
381}
382
/// Create an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] with `e` as
/// its inner error.
fn invalid_data<E: Into<Box<dyn std::error::Error + Send + Sync>>>(e: E) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, e)
}
389
#[cfg(test)]
mod tests {
    use std::num::NonZeroU8;

    use proptest::prelude::*;

    use super::*;
    use crate::{payload::ArrayDecoder, tests::helpers::enable_logging, DEFAULT_LOG_FORMAT_VERSION};

    /// Encode `commit` into a freshly allocated buffer.
    fn encode(commit: &Commit) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(commit.encoded_len());
        commit.write(&mut bytes).unwrap();
        bytes
    }

    /// Writing a commit and decoding the result yields the same commit.
    #[test]
    fn commit_roundtrip() {
        let original = Commit {
            min_tx_offset: 0,
            epoch: Commit::DEFAULT_EPOCH,
            n: 3,
            records: vec![0; 128],
        };

        let encoded = encode(&original);
        let decoded = Commit::decode(&mut encoded.as_slice()).unwrap().unwrap();

        assert_eq!(original, decoded);
    }

    /// Transactions below `from_offset` are skipped, the rest is yielded.
    #[test]
    fn into_transactions_can_skip_txs() {
        enable_logging();

        let commit = Commit {
            min_tx_offset: 0,
            epoch: Commit::DEFAULT_EPOCH,
            n: 4,
            records: vec![0; 128],
        };

        let decoded = commit
            .into_transactions(DEFAULT_LOG_FORMAT_VERSION, 2, &ArrayDecoder::<32>)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let expected = (2..4)
            .map(|offset| Transaction {
                offset,
                txdata: [0u8; 32],
            })
            .collect::<Vec<_>>();
        assert_eq!(decoded, expected);
    }

    proptest! {
        /// Corrupting any byte of the records is detected via the checksum.
        #[test]
        fn bitflip(pos in Header::LEN..512, mask in any::<NonZeroU8>()) {
            let commit = Commit {
                min_tx_offset: 42,
                epoch: Commit::DEFAULT_EPOCH,
                n: 10,
                records: vec![1; 512],
            };
            let mut encoded = encode(&commit);

            // `pos` starts past the header, i.e. the flipped bits are in the
            // `records` section. Thus the error must be a `ChecksumMismatch`,
            // not any other error.
            encoded[pos] ^= mask.get();

            match Commit::decode(&mut encoded.as_slice()) {
                Ok(commit) => panic!("expected checksum mismatch, got valid commit: {commit:?}"),
                Err(e) => {
                    assert_eq!(e.kind(), io::ErrorKind::InvalidData);
                    e.into_inner()
                        .unwrap()
                        .downcast::<ChecksumMismatch>()
                        .expect("IO inner should be checksum mismatch");
                }
            }
        }
    }
}
476}