spacetimedb_commitlog/
commit.rs

1use std::{
2    io::{self, Read, Write},
3    ops::Range,
4};
5
6use crc32c::{Crc32cReader, Crc32cWriter};
7use spacetimedb_sats::buffer::{BufReader, Cursor, DecodeError};
8
9use crate::{
10    error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction,
11    DEFAULT_LOG_FORMAT_VERSION,
12};
13
/// Layout of a commit [`Header`] on disk.
///
/// The two layouts differ only in whether the header carries an `epoch`
/// field: `V0` headers do not (decoding substitutes
/// [`Commit::DEFAULT_EPOCH`]), `V1` headers do. `V1` is the [`Default`].
#[derive(Default)]
enum Version {
    /// Legacy layout without an `epoch` field.
    V0,
    /// Layout including the `epoch` field, as written by [`Commit::write`].
    #[default]
    V1,
}
20
/// The fixed-size prefix of a commit, as decoded from storage.
pub struct Header {
    /// Offset of the first transaction contained in the commit.
    pub min_tx_offset: u64,
    /// Epoch in which the commit was created.
    pub epoch: u64,
    /// Number of records contained in the commit.
    pub n: u16,
    /// Length in bytes of the records buffer following the header.
    pub len: u32,
}
27
28impl Header {
29    pub const LEN: usize = /* offset */ 8 + /* epoch */ 8 + /* n */ 2 + /* len */  4;
30
31    /// Read [`Self::LEN`] bytes from `reader` and interpret them as the
32    /// "header" of a [`Commit`].
33    ///
34    /// Returns `None` if:
35    ///
36    /// - The reader cannot provide exactly [`Self::LEN`] bytes
37    ///
38    ///   I.e. it is at EOF
39    ///
40    /// - Or, the read bytes are all zeroes
41    ///
42    ///   This is to allow preallocation of segments.
43    ///
44    pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
45        Self::decode_v1(reader)
46    }
47
48    fn decode_internal<R: Read>(reader: R, v: Version) -> io::Result<Option<Self>> {
49        use Version::*;
50        match v {
51            V0 => Self::decode_v0(reader),
52            V1 => Self::decode_v1(reader),
53        }
54    }
55
56    fn decode_v0<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
57        let mut hdr = [0; Self::LEN - 8];
58        if let Err(e) = reader.read_exact(&mut hdr) {
59            if e.kind() == io::ErrorKind::UnexpectedEof {
60                return Ok(None);
61            }
62
63            return Err(e);
64        }
65        match &mut hdr.as_slice() {
66            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
67            buf => {
68                let min_tx_offset = buf.get_u64().map_err(decode_error)?;
69                let n = buf.get_u16().map_err(decode_error)?;
70                let len = buf.get_u32().map_err(decode_error)?;
71
72                Ok(Some(Self {
73                    min_tx_offset,
74                    epoch: Commit::DEFAULT_EPOCH,
75                    n,
76                    len,
77                }))
78            }
79        }
80    }
81
82    fn decode_v1<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
83        let mut hdr = [0; Self::LEN];
84        if let Err(e) = reader.read_exact(&mut hdr) {
85            if e.kind() == io::ErrorKind::UnexpectedEof {
86                return Ok(None);
87            }
88
89            return Err(e);
90        }
91        match &mut hdr.as_slice() {
92            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
93            buf => {
94                let min_tx_offset = buf.get_u64().map_err(decode_error)?;
95                let epoch = buf.get_u64().map_err(decode_error)?;
96                let n = buf.get_u16().map_err(decode_error)?;
97                let len = buf.get_u32().map_err(decode_error)?;
98
99                Ok(Some(Self {
100                    min_tx_offset,
101                    epoch,
102                    n,
103                    len,
104                }))
105            }
106        }
107    }
108}
109
/// A single entry of a [`crate::Commitlog`].
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Commit {
    /// Offset of the first record contained in this commit.
    ///
    /// Offsets are zero-based and counted from the very beginning of the
    /// whole log, not from the beginning of a segment.
    pub min_tx_offset: u64,
    /// Epoch in which this commit was created.
    ///
    /// In a distributed deployment, this is the monotonically increasing
    /// term number of the leader writing to the commitlog.
    ///
    /// A log written to by a single process should use the default epoch,
    /// which is 0 (zero).
    ///
    /// Be aware that an existing log may nonetheless carry a non-zero epoch.
    /// How a commitlog moves between distributed and single-node deployments
    /// with respect to the epoch is currently unspecified.
    pub epoch: u64,
    /// How many records this commit contains.
    pub n: u16,
    /// All records of this commit, serialized back to back.
    ///
    /// Interpreting this buffer requires a [`crate::Decoder`] supplied by the
    /// reader; `n` tells how many records it holds.
    pub records: Vec<u8>,
}
138
139impl Commit {
140    pub const DEFAULT_EPOCH: u64 = 0;
141
142    pub const FRAMING_LEN: usize = Header::LEN + /* crc32 */ 4;
143    pub const CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;
144
145    /// The range of transaction offsets contained in this commit.
146    pub fn tx_range(&self) -> Range<u64> {
147        self.min_tx_offset..self.min_tx_offset + self.n as u64
148    }
149
150    /// Length in bytes of this commit when written to the log via [`Self::write`].
151    pub fn encoded_len(&self) -> usize {
152        Self::FRAMING_LEN + self.records.len()
153    }
154
155    /// Serialize and write `self` to `out`.
156    ///
157    /// Returns the crc32 checksum of the commit on success.
158    pub fn write<W: Write>(&self, out: W) -> io::Result<u32> {
159        let mut out = Crc32cWriter::new(out);
160
161        let min_tx_offset = self.min_tx_offset.to_le_bytes();
162        let epoch = self.epoch.to_le_bytes();
163        let n = self.n.to_le_bytes();
164        let len = (self.records.len() as u32).to_le_bytes();
165
166        out.write_all(&min_tx_offset)?;
167        out.write_all(&epoch)?;
168        out.write_all(&n)?;
169        out.write_all(&len)?;
170        out.write_all(&self.records)?;
171
172        let crc = out.crc32c();
173        let mut out = out.into_inner();
174        out.write_all(&crc.to_le_bytes())?;
175
176        Ok(crc)
177    }
178
179    /// Attempt to read one [`Commit`] from the given [`Read`]er.
180    ///
181    /// Returns `None` if the reader is already at EOF.
182    ///
183    /// Verifies the checksum of the commit. If it doesn't match, an error of
184    /// kind [`io::ErrorKind::InvalidData`] with an inner error downcastable to
185    /// [`ChecksumMismatch`] is returned.
186    ///
187    /// To retain access to the checksum, use [`StoredCommit::decode`].
188    pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
189        let commit = StoredCommit::decode(reader)?;
190        Ok(commit.map(Into::into))
191    }
192
193    /// Convert `self` into an iterator yielding [`Transaction`]s.
194    ///
195    /// The supplied [`Decoder`] is responsible for extracting individual
196    /// transactions from the `records` buffer.
197    ///
198    /// `version` is the log format version of the current segment, and gets
199    /// passed to [`Decoder::decode_record`].
200    ///
201    /// `from_offset` is the transaction offset within the current commit from
202    /// which to start decoding. That is:
203    ///
204    /// * if the tx offset within the commit is smaller than `from_offset`,
205    ///   [`Decoder::skip_record`] is called.
206    ///
207    ///   The iterator does not yield a value, unless `skip_record` returns an
208    ///   error.
209    ///
210    /// * if the tx offset within the commit is greater of equal to `from_offset`,
211    ///   [`Decoder::decode_record`] is called.
212    ///
213    ///   The iterator yields the result of this call.
214    ///
215    /// * if `from_offset` doesn't fall into the current commit, the iterator
216    ///   yields nothing.
217    ///
218    pub fn into_transactions<D: Decoder>(
219        self,
220        version: u8,
221        from_offset: u64,
222        de: &D,
223    ) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
224        let records = Cursor::new(self.records);
225        (self.min_tx_offset..(self.min_tx_offset + self.n as u64))
226            .scan(records, move |recs, offset| {
227                let mut cursor = &*recs;
228                let ret = if offset < from_offset {
229                    de.skip_record(version, offset, &mut cursor).err().map(Err)
230                } else {
231                    let tx = de
232                        .decode_record(version, offset, &mut cursor)
233                        .map(|txdata| Transaction { offset, txdata });
234                    Some(tx)
235                };
236
237                Some(ret)
238            })
239            .flatten()
240    }
241}
242
243impl From<StoredCommit> for Commit {
244    fn from(
245        StoredCommit {
246            min_tx_offset,
247            epoch,
248            n,
249            records,
250            checksum: _,
251        }: StoredCommit,
252    ) -> Self {
253        Self {
254            min_tx_offset,
255            epoch,
256            n,
257            records,
258        }
259    }
260}
261
/// The on-disk representation of a [`Commit`].
///
/// Identical to [`Commit`] except for the additional `checksum` field, which
/// is computed when a commit is encoded for storage.
#[derive(Debug, PartialEq)]
pub struct StoredCommit {
    /// Same as [`Commit::min_tx_offset`].
    pub min_tx_offset: u64,
    /// Same as [`Commit::epoch`].
    pub epoch: u64,
    /// Same as [`Commit::n`].
    pub n: u16,
    /// Same as [`Commit::records`].
    pub records: Vec<u8>,
    /// Checksum produced when the [`Commit`] was encoded for storage.
    pub checksum: u32,
}
279
280impl StoredCommit {
281    /// The range of transaction offsets contained in this commit.
282    pub fn tx_range(&self) -> Range<u64> {
283        self.min_tx_offset..self.min_tx_offset + self.n as u64
284    }
285
286    /// Attempt to read one [`StoredCommit`] from the given [`Read`]er.
287    ///
288    /// Returns `None` if the reader is already at EOF.
289    ///
290    /// Verifies the checksum of the commit. If it doesn't match, an error of
291    /// kind [`io::ErrorKind::InvalidData`] with an inner error downcastable to
292    /// [`ChecksumMismatch`] is returned.
293    pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
294        Self::decode_internal(reader, DEFAULT_LOG_FORMAT_VERSION)
295    }
296
297    pub(crate) fn decode_internal<R: Read>(reader: R, log_format_version: u8) -> io::Result<Option<Self>> {
298        let mut reader = Crc32cReader::new(reader);
299
300        let v = if log_format_version == 0 {
301            Version::V0
302        } else {
303            Version::V1
304        };
305        let Some(hdr) = Header::decode_internal(&mut reader, v)? else {
306            return Ok(None);
307        };
308        let mut records = vec![0; hdr.len as usize];
309        reader.read_exact(&mut records)?;
310
311        let chk = reader.crc32c();
312        let crc = decode_u32(reader.into_inner())?;
313
314        if chk != crc {
315            return Err(invalid_data(ChecksumMismatch));
316        }
317
318        Ok(Some(Self {
319            min_tx_offset: hdr.min_tx_offset,
320            epoch: hdr.epoch,
321            n: hdr.n,
322            records,
323            checksum: crc,
324        }))
325    }
326
327    /// Convert `self` into an iterator yielding [`Transaction`]s.
328    ///
329    /// The supplied [`Decoder`] is responsible for extracting individual
330    /// transactions from the `records` buffer.
331    pub fn into_transactions<D: Decoder>(
332        self,
333        version: u8,
334        from_offset: u64,
335        de: &D,
336    ) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
337        Commit::from(self).into_transactions(version, from_offset, de)
338    }
339}
340
/// Figures required to compute a [`crate::segment::Header`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
    /// Transaction offsets covered by the commit.
    pub tx_range: Range<u64>,
    /// Encoded size of the commit, including its framing.
    pub size_in_bytes: u64,
    /// Epoch of the commit.
    pub epoch: u64,
}
348
349impl Metadata {
350    /// Extract the [`Metadata`] of a single [`Commit`] from the given reader.
351    ///
352    /// Note that this decodes the commit due to checksum verification.
353    /// Like [`Commit::decode`], returns `None` if the reader is at EOF already.
354    pub fn extract<R: io::Read>(reader: R) -> io::Result<Option<Self>> {
355        Commit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from))
356    }
357}
358
359impl From<Commit> for Metadata {
360    fn from(commit: Commit) -> Self {
361        Self {
362            tx_range: commit.tx_range(),
363            size_in_bytes: commit.encoded_len() as u64,
364            epoch: commit.epoch,
365        }
366    }
367}
368
/// Read exactly four bytes from `read` and decode them as a little-endian `u32`.
///
/// Fails with [`io::ErrorKind::UnexpectedEof`] if fewer than four bytes are
/// available.
fn decode_u32<R: Read>(mut read: R) -> io::Result<u32> {
    let mut bytes = [0u8; 4];
    read.read_exact(&mut bytes)
        .map(|()| u32::from_le_bytes(bytes))
}
374
375fn decode_error(e: DecodeError) -> io::Error {
376    invalid_data(e)
377}
378
/// Wrap `e` in an [`io::Error`] of kind [`io::ErrorKind::InvalidData`].
fn invalid_data<E: Into<Box<dyn std::error::Error + Send + Sync>>>(e: E) -> io::Error {
    let inner: Box<dyn std::error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::InvalidData, inner)
}
385
#[cfg(test)]
mod tests {
    use std::num::NonZeroU8;

    use proptest::prelude::*;

    use super::*;
    use crate::{payload::ArrayDecoder, tests::helpers::enable_logging, DEFAULT_LOG_FORMAT_VERSION};

    /// Encode `commit` into a fresh buffer via [`Commit::write`].
    fn encode(commit: &Commit) -> Vec<u8> {
        let mut buf = Vec::with_capacity(commit.encoded_len());
        commit.write(&mut buf).unwrap();
        buf
    }

    #[test]
    fn commit_roundtrip() {
        let commit = Commit {
            min_tx_offset: 0,
            n: 3,
            records: vec![0; 128],
            epoch: Commit::DEFAULT_EPOCH,
        };

        let buf = encode(&commit);
        assert_eq!(buf.len(), commit.encoded_len());
        let commit2 = Commit::decode(&mut buf.as_slice()).unwrap().unwrap();

        assert_eq!(commit, commit2);
    }

    #[test]
    fn commit_roundtrip_preserves_epoch_and_offset() {
        let commit = Commit {
            min_tx_offset: 1024,
            n: 2,
            records: vec![7; 64],
            epoch: 42,
        };

        let buf = encode(&commit);
        let stored = StoredCommit::decode(&mut buf.as_slice()).unwrap().unwrap();

        assert_eq!(stored.tx_range(), 1024..1026);
        assert_eq!(Commit::from(stored), commit);
    }

    #[test]
    fn decode_at_eof_yields_none() {
        let empty: &[u8] = &[];
        assert!(Commit::decode(empty).unwrap().is_none());
    }

    #[test]
    fn into_transactions_can_skip_txs() {
        enable_logging();

        let commit = Commit {
            min_tx_offset: 0,
            n: 4,
            records: vec![0; 128],
            epoch: Commit::DEFAULT_EPOCH,
        };

        let txs = commit
            .into_transactions(DEFAULT_LOG_FORMAT_VERSION, 2, &ArrayDecoder::<32>)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(
            txs,
            vec![
                Transaction {
                    offset: 2,
                    txdata: [0u8; 32]
                },
                Transaction {
                    offset: 3,
                    txdata: [0; 32]
                }
            ]
        )
    }

    proptest! {
        #[test]
        fn bitflip(pos in Header::LEN..Header::LEN + 512, mask in any::<NonZeroU8>()) {
            let commit = Commit {
                min_tx_offset: 42,
                n: 10,
                records: vec![1; 512],
                epoch: Commit::DEFAULT_EPOCH,
            };

            let mut buf = encode(&commit);

            // Flip bits anywhere in the `records` section (which spans
            // `Header::LEN..Header::LEN + 512`), so we get
            // `ChecksumMismatch` and not any other error.
            buf[pos] ^= mask.get();

            match Commit::decode(&mut buf.as_slice()) {
                Err(e) => {
                    assert_eq!(e.kind(), io::ErrorKind::InvalidData);
                    e.into_inner()
                        .unwrap()
                        .downcast::<ChecksumMismatch>()
                        .expect("IO inner should be checksum mismatch");
                }
                Ok(commit) => panic!("expected checksum mismatch, got valid commit: {commit:?}"),
            }
        }
    }
}