1use std::{
2 io::{self, Read, Write},
3 ops::Range,
4};
5
6use crc32c::{Crc32cReader, Crc32cWriter};
7use spacetimedb_sats::buffer::{BufReader, Cursor, DecodeError};
8
9use crate::{
10 error::ChecksumMismatch,
11 payload::Decoder,
12 segment::{CHECKSUM_ALGORITHM_CRC32C, CHECKSUM_CRC32C_LEN},
13 Transaction, DEFAULT_LOG_FORMAT_VERSION,
14};
15
/// Commit header layouts the decoder can read.
///
/// The two layouts differ only in whether the header carries an `epoch`
/// field; see `Header::decode_v0` and `Header::decode_v1`.
#[derive(Default)]
enum Version {
    /// Header layout without the `epoch` field.
    V0,
    /// Header layout including the `epoch` field. This is the default.
    #[default]
    V1,
}
22
/// The fixed-size framing that precedes the records of a commit.
///
/// Fields are listed in the order in which they are decoded. The older (v0)
/// layout has no `epoch`; headers decoded from it get
/// `Commit::DEFAULT_EPOCH`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Header {
    /// Offset of the first transaction contained in the commit.
    pub min_tx_offset: u64,
    /// Epoch recorded with the commit.
    pub epoch: u64,
    /// Number of transaction records in the commit.
    pub n: u16,
    /// Length in bytes of the records buffer that follows the header.
    pub len: u32,
}
29
30impl Header {
31 pub const LEN: usize = 8 + 8 + 2 + 4;
32
33 pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
47 Self::decode_v1(reader)
48 }
49
50 fn decode_internal<R: Read>(reader: R, v: Version) -> io::Result<Option<Self>> {
51 use Version::*;
52 match v {
53 V0 => Self::decode_v0(reader),
54 V1 => Self::decode_v1(reader),
55 }
56 }
57
58 fn decode_v0<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
59 let mut hdr = [0; Self::LEN - 8];
60 if let Err(e) = reader.read_exact(&mut hdr) {
61 if e.kind() == io::ErrorKind::UnexpectedEof {
62 return Ok(None);
63 }
64
65 return Err(e);
66 }
67 match &mut hdr.as_slice() {
68 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
69 buf => {
70 let min_tx_offset = buf.get_u64().map_err(decode_error)?;
71 let n = buf.get_u16().map_err(decode_error)?;
72 let len = buf.get_u32().map_err(decode_error)?;
73
74 Ok(Some(Self {
75 min_tx_offset,
76 epoch: Commit::DEFAULT_EPOCH,
77 n,
78 len,
79 }))
80 }
81 }
82 }
83
84 fn decode_v1<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
85 let mut hdr = [0; Self::LEN];
86 if let Err(e) = reader.read_exact(&mut hdr) {
87 if e.kind() == io::ErrorKind::UnexpectedEof {
88 return Ok(None);
89 }
90
91 return Err(e);
92 }
93 match &mut hdr.as_slice() {
94 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
95 buf => {
96 let min_tx_offset = buf.get_u64().map_err(decode_error)?;
97 let epoch = buf.get_u64().map_err(decode_error)?;
98 let n = buf.get_u16().map_err(decode_error)?;
99 let len = buf.get_u32().map_err(decode_error)?;
100
101 Ok(Some(Self {
102 min_tx_offset,
103 epoch,
104 n,
105 len,
106 }))
107 }
108 }
109 }
110}
111
/// A batch of consecutive transaction records, the unit written to the log
/// by [`Commit::write`].
#[derive(Clone, Debug, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Commit {
    /// Offset of the first transaction in `records`.
    pub min_tx_offset: u64,
    /// Epoch recorded with the commit.
    pub epoch: u64,
    /// Number of transaction records contained in `records`.
    pub n: u16,
    /// The encoded transaction records, back to back.
    pub records: Vec<u8>,
}
141
142impl Commit {
143 pub const DEFAULT_EPOCH: u64 = 0;
144
145 pub const FRAMING_LEN: usize = Header::LEN + Self::CHECKSUM_LEN;
146 pub const CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;
147 pub const CHECKSUM_LEN: usize = CHECKSUM_CRC32C_LEN;
148
149 pub fn tx_range(&self) -> Range<u64> {
151 self.min_tx_offset..self.min_tx_offset + self.n as u64
152 }
153
154 pub fn encoded_len(&self) -> usize {
156 Self::FRAMING_LEN + self.records.len()
157 }
158
159 pub fn write<W: Write>(&self, out: W) -> io::Result<u32> {
163 let mut out = Crc32cWriter::new(out);
164
165 let min_tx_offset = self.min_tx_offset.to_le_bytes();
166 let epoch = self.epoch.to_le_bytes();
167 let n = self.n.to_le_bytes();
168 let len = (self.records.len() as u32).to_le_bytes();
169
170 out.write_all(&min_tx_offset)?;
171 out.write_all(&epoch)?;
172 out.write_all(&n)?;
173 out.write_all(&len)?;
174 out.write_all(&self.records)?;
175
176 let crc = out.crc32c();
177 let mut out = out.into_inner();
178 out.write_all(&crc.to_le_bytes())?;
179
180 Ok(crc)
181 }
182
183 pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
193 let commit = StoredCommit::decode(reader)?;
194 Ok(commit.map(Into::into))
195 }
196
197 pub fn into_transactions<D: Decoder>(
223 self,
224 version: u8,
225 from_offset: u64,
226 de: &D,
227 ) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
228 let records = Cursor::new(self.records);
229 (self.min_tx_offset..(self.min_tx_offset + self.n as u64))
230 .scan(records, move |recs, offset| {
231 let mut cursor = &*recs;
232 let ret = if offset < from_offset {
233 de.skip_record(version, offset, &mut cursor).err().map(Err)
234 } else {
235 let tx = de
236 .decode_record(version, offset, &mut cursor)
237 .map(|txdata| Transaction { offset, txdata });
238 Some(tx)
239 };
240
241 Some(ret)
242 })
243 .flatten()
244 }
245}
246
247impl From<StoredCommit> for Commit {
248 fn from(
249 StoredCommit {
250 min_tx_offset,
251 epoch,
252 n,
253 records,
254 checksum: _,
255 }: StoredCommit,
256 ) -> Self {
257 Self {
258 min_tx_offset,
259 epoch,
260 n,
261 records,
262 }
263 }
264}
265
/// A commit as read back from storage: the same data as [`Commit`], plus the
/// checksum that was stored alongside it.
#[derive(Debug, PartialEq)]
pub struct StoredCommit {
    /// Offset of the first transaction in `records`.
    pub min_tx_offset: u64,
    /// Epoch recorded with the commit.
    pub epoch: u64,
    /// Number of transaction records contained in `records`.
    pub n: u16,
    /// The encoded transaction records, back to back.
    pub records: Vec<u8>,
    /// The checksum read from storage.
    pub checksum: u32,
}
283
284impl StoredCommit {
285 pub fn tx_range(&self) -> Range<u64> {
287 self.min_tx_offset..self.min_tx_offset + self.n as u64
288 }
289
290 pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
298 Self::decode_internal(reader, DEFAULT_LOG_FORMAT_VERSION)
299 }
300
301 pub(crate) fn decode_internal<R: Read>(reader: R, log_format_version: u8) -> io::Result<Option<Self>> {
302 let mut reader = Crc32cReader::new(reader);
303
304 let v = if log_format_version == 0 {
305 Version::V0
306 } else {
307 Version::V1
308 };
309 let Some(hdr) = Header::decode_internal(&mut reader, v)? else {
310 return Ok(None);
311 };
312 let mut records = vec![0; hdr.len as usize];
313 reader.read_exact(&mut records)?;
314
315 let chk = reader.crc32c();
316 let crc = decode_u32(reader.into_inner())?;
317
318 if chk != crc {
319 return Err(invalid_data(ChecksumMismatch));
320 }
321
322 Ok(Some(Self {
323 min_tx_offset: hdr.min_tx_offset,
324 epoch: hdr.epoch,
325 n: hdr.n,
326 records,
327 checksum: crc,
328 }))
329 }
330
331 pub fn into_transactions<D: Decoder>(
336 self,
337 version: u8,
338 from_offset: u64,
339 de: &D,
340 ) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
341 Commit::from(self).into_transactions(version, from_offset, de)
342 }
343}
344
/// Summary information about a commit, without its records.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
    /// The half-open range of transaction offsets in the commit.
    pub tx_range: Range<u64>,
    /// Encoded size of the commit in bytes, including framing.
    pub size_in_bytes: u64,
    /// Epoch recorded with the commit.
    pub epoch: u64,
}
352
353impl Metadata {
354 pub fn extract<R: io::Read>(reader: R) -> io::Result<Option<Self>> {
359 Commit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from))
360 }
361}
362
363impl From<Commit> for Metadata {
364 fn from(commit: Commit) -> Self {
365 Self {
366 tx_range: commit.tx_range(),
367 size_in_bytes: commit.encoded_len() as u64,
368 epoch: commit.epoch,
369 }
370 }
371}
372
/// Read exactly four bytes from `read` and interpret them as a little-endian
/// `u32`.
///
/// Fails with the reader's error, e.g. `UnexpectedEof` if fewer than four
/// bytes are available.
fn decode_u32<R: Read>(mut read: R) -> io::Result<u32> {
    let mut le_bytes = [0u8; 4];
    read.read_exact(&mut le_bytes)
        .map(|()| u32::from_le_bytes(le_bytes))
}
378
379fn decode_error(e: DecodeError) -> io::Error {
380 invalid_data(e)
381}
382
/// Build an `io::Error` of kind `InvalidData` with `e` as its payload.
fn invalid_data<E>(e: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let payload: Box<dyn std::error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::InvalidData, payload)
}
389
#[cfg(test)]
mod tests {
    use std::num::NonZeroU8;

    use proptest::prelude::*;

    use super::*;
    use crate::{payload::ArrayDecoder, tests::helpers::enable_logging, DEFAULT_LOG_FORMAT_VERSION};

    /// Writing a commit and decoding the bytes again yields an equal commit.
    #[test]
    fn commit_roundtrip() {
        let original = Commit {
            min_tx_offset: 0,
            epoch: Commit::DEFAULT_EPOCH,
            n: 3,
            records: vec![0; 128],
        };

        let mut encoded = Vec::with_capacity(original.encoded_len());
        original.write(&mut encoded).unwrap();

        let decoded = Commit::decode(&mut encoded.as_slice()).unwrap().unwrap();
        assert_eq!(original, decoded);
    }

    /// Transactions below `from_offset` are skipped, the rest are decoded.
    #[test]
    fn into_transactions_can_skip_txs() {
        enable_logging();

        // Four records of 32 bytes each; ask for offsets 2 and up.
        let commit = Commit {
            min_tx_offset: 0,
            epoch: Commit::DEFAULT_EPOCH,
            n: 4,
            records: vec![0; 128],
        };
        let decoded = commit.into_transactions(DEFAULT_LOG_FORMAT_VERSION, 2, &ArrayDecoder::<32>);
        let txs = decoded.collect::<Result<Vec<_>, _>>().unwrap();

        let expected: Vec<_> = (2..4)
            .map(|offset| Transaction {
                offset,
                txdata: [0u8; 32],
            })
            .collect();
        assert_eq!(txs, expected)
    }

    proptest! {
        /// Flipping any bits of a byte in the records region must be detected
        /// as a checksum mismatch.
        #[test]
        fn bitflip(pos in Header::LEN..512, mask in any::<NonZeroU8>()) {
            let commit = Commit {
                min_tx_offset: 42,
                epoch: Commit::DEFAULT_EPOCH,
                n: 10,
                records: vec![1; 512],
            };

            let mut encoded = Vec::with_capacity(commit.encoded_len());
            commit.write(&mut encoded).unwrap();

            // `mask` is non-zero, so at least one bit changes.
            encoded[pos] ^= mask.get();

            match Commit::decode(&mut encoded.as_slice()) {
                Ok(commit) => panic!("expected checksum mismatch, got valid commit: {commit:?}"),
                Err(e) => {
                    assert_eq!(e.kind(), io::ErrorKind::InvalidData);
                    e.into_inner()
                        .unwrap()
                        .downcast::<ChecksumMismatch>()
                        .expect("IO inner should be checksum mismatch");
                }
            }
        }
    }
}