1use std::{
2 io::{self, Read, Write},
3 ops::Range,
4};
5
6use crc32c::{Crc32cReader, Crc32cWriter};
7use spacetimedb_sats::buffer::{BufReader, Cursor, DecodeError};
8
9use crate::{
10 error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction,
11 DEFAULT_LOG_FORMAT_VERSION,
12};
13
/// On-disk layout variants of a commit [`Header`].
///
/// [`Header::decode_internal`] uses this to pick the matching decoder.
#[derive(Default)]
enum Version {
    /// Layout without an epoch field: offset, `n`, `len`.
    V0,
    /// Layout with an epoch field: offset, epoch, `n`, `len`. This is the default.
    #[default]
    V1,
}
20
/// Fixed-size header that precedes the records of an encoded [`Commit`].
///
/// All fields are plain integers, so the type is cheap to copy and compare.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Header {
    /// Offset of the first transaction contained in the commit.
    pub min_tx_offset: u64,
    /// Epoch value stored with the commit.
    ///
    /// Headers in the V0 layout carry no epoch and decode to
    /// [`Commit::DEFAULT_EPOCH`].
    pub epoch: u64,
    /// Number of transactions in the commit.
    pub n: u16,
    /// Length in bytes of the records buffer that follows the header.
    pub len: u32,
}
27
28impl Header {
29 pub const LEN: usize = 8 + 8 + 2 + 4;
30
31 pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
45 Self::decode_v1(reader)
46 }
47
48 fn decode_internal<R: Read>(reader: R, v: Version) -> io::Result<Option<Self>> {
49 use Version::*;
50 match v {
51 V0 => Self::decode_v0(reader),
52 V1 => Self::decode_v1(reader),
53 }
54 }
55
56 fn decode_v0<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
57 let mut hdr = [0; Self::LEN - 8];
58 if let Err(e) = reader.read_exact(&mut hdr) {
59 if e.kind() == io::ErrorKind::UnexpectedEof {
60 return Ok(None);
61 }
62
63 return Err(e);
64 }
65 match &mut hdr.as_slice() {
66 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
67 buf => {
68 let min_tx_offset = buf.get_u64().map_err(decode_error)?;
69 let n = buf.get_u16().map_err(decode_error)?;
70 let len = buf.get_u32().map_err(decode_error)?;
71
72 Ok(Some(Self {
73 min_tx_offset,
74 epoch: Commit::DEFAULT_EPOCH,
75 n,
76 len,
77 }))
78 }
79 }
80 }
81
82 fn decode_v1<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
83 let mut hdr = [0; Self::LEN];
84 if let Err(e) = reader.read_exact(&mut hdr) {
85 if e.kind() == io::ErrorKind::UnexpectedEof {
86 return Ok(None);
87 }
88
89 return Err(e);
90 }
91 match &mut hdr.as_slice() {
92 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
93 buf => {
94 let min_tx_offset = buf.get_u64().map_err(decode_error)?;
95 let epoch = buf.get_u64().map_err(decode_error)?;
96 let n = buf.get_u16().map_err(decode_error)?;
97 let len = buf.get_u32().map_err(decode_error)?;
98
99 Ok(Some(Self {
100 min_tx_offset,
101 epoch,
102 n,
103 len,
104 }))
105 }
106 }
107 }
108}
109
/// A batch of transaction records in its in-memory form, i.e. without the
/// checksum that accompanies it when stored (see [`StoredCommit`]).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Commit {
    /// Offset of the first transaction in this commit.
    pub min_tx_offset: u64,
    /// Epoch value written to, and read from, the commit header.
    ///
    /// Commits decoded from the V0 header layout get [`Commit::DEFAULT_EPOCH`].
    pub epoch: u64,
    /// Number of transactions in this commit; together with `min_tx_offset`
    /// it determines [`Commit::tx_range`].
    pub n: u16,
    /// Raw bytes of the records, as written after the header.
    ///
    /// Individual records are only interpreted by a `Decoder` in
    /// [`Commit::into_transactions`].
    pub records: Vec<u8>,
}
138
139impl Commit {
140 pub const DEFAULT_EPOCH: u64 = 0;
141
142 pub const FRAMING_LEN: usize = Header::LEN + 4;
143 pub const CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;
144
145 pub fn tx_range(&self) -> Range<u64> {
147 self.min_tx_offset..self.min_tx_offset + self.n as u64
148 }
149
150 pub fn encoded_len(&self) -> usize {
152 Self::FRAMING_LEN + self.records.len()
153 }
154
155 pub fn write<W: Write>(&self, out: W) -> io::Result<u32> {
159 let mut out = Crc32cWriter::new(out);
160
161 let min_tx_offset = self.min_tx_offset.to_le_bytes();
162 let epoch = self.epoch.to_le_bytes();
163 let n = self.n.to_le_bytes();
164 let len = (self.records.len() as u32).to_le_bytes();
165
166 out.write_all(&min_tx_offset)?;
167 out.write_all(&epoch)?;
168 out.write_all(&n)?;
169 out.write_all(&len)?;
170 out.write_all(&self.records)?;
171
172 let crc = out.crc32c();
173 let mut out = out.into_inner();
174 out.write_all(&crc.to_le_bytes())?;
175
176 Ok(crc)
177 }
178
179 pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
189 let commit = StoredCommit::decode(reader)?;
190 Ok(commit.map(Into::into))
191 }
192
193 pub fn into_transactions<D: Decoder>(
219 self,
220 version: u8,
221 from_offset: u64,
222 de: &D,
223 ) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
224 let records = Cursor::new(self.records);
225 (self.min_tx_offset..(self.min_tx_offset + self.n as u64))
226 .scan(records, move |recs, offset| {
227 let mut cursor = &*recs;
228 let ret = if offset < from_offset {
229 de.skip_record(version, offset, &mut cursor).err().map(Err)
230 } else {
231 let tx = de
232 .decode_record(version, offset, &mut cursor)
233 .map(|txdata| Transaction { offset, txdata });
234 Some(tx)
235 };
236
237 Some(ret)
238 })
239 .flatten()
240 }
241}
242
243impl From<StoredCommit> for Commit {
244 fn from(
245 StoredCommit {
246 min_tx_offset,
247 epoch,
248 n,
249 records,
250 checksum: _,
251 }: StoredCommit,
252 ) -> Self {
253 Self {
254 min_tx_offset,
255 epoch,
256 n,
257 records,
258 }
259 }
260}
261
/// A [`Commit`] as read back from its encoded form, including the checksum
/// that was stored alongside it.
#[derive(Debug, PartialEq)]
pub struct StoredCommit {
    /// Offset of the first transaction in this commit.
    pub min_tx_offset: u64,
    /// Epoch value read from the commit header.
    pub epoch: u64,
    /// Number of transactions in this commit.
    pub n: u16,
    /// Raw bytes of the records.
    pub records: Vec<u8>,
    /// CRC32C checksum read from the encoded commit. Decoding only succeeds
    /// if it matches the checksum computed over the header and records.
    pub checksum: u32,
}
279
280impl StoredCommit {
281 pub fn tx_range(&self) -> Range<u64> {
283 self.min_tx_offset..self.min_tx_offset + self.n as u64
284 }
285
286 pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
294 Self::decode_internal(reader, DEFAULT_LOG_FORMAT_VERSION)
295 }
296
297 pub(crate) fn decode_internal<R: Read>(reader: R, log_format_version: u8) -> io::Result<Option<Self>> {
298 let mut reader = Crc32cReader::new(reader);
299
300 let v = if log_format_version == 0 {
301 Version::V0
302 } else {
303 Version::V1
304 };
305 let Some(hdr) = Header::decode_internal(&mut reader, v)? else {
306 return Ok(None);
307 };
308 let mut records = vec![0; hdr.len as usize];
309 reader.read_exact(&mut records)?;
310
311 let chk = reader.crc32c();
312 let crc = decode_u32(reader.into_inner())?;
313
314 if chk != crc {
315 return Err(invalid_data(ChecksumMismatch));
316 }
317
318 Ok(Some(Self {
319 min_tx_offset: hdr.min_tx_offset,
320 epoch: hdr.epoch,
321 n: hdr.n,
322 records,
323 checksum: crc,
324 }))
325 }
326
327 pub fn into_transactions<D: Decoder>(
332 self,
333 version: u8,
334 from_offset: u64,
335 de: &D,
336 ) -> impl Iterator<Item = Result<Transaction<D::Record>, D::Error>> + '_ {
337 Commit::from(self).into_transactions(version, from_offset, de)
338 }
339}
340
/// Summary information about a [`Commit`], without its records.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
    /// Range of transaction offsets contained in the commit
    /// (see [`Commit::tx_range`]).
    pub tx_range: Range<u64>,
    /// Encoded size of the commit in bytes (see [`Commit::encoded_len`]).
    pub size_in_bytes: u64,
    /// Epoch value of the commit.
    pub epoch: u64,
}
348
349impl Metadata {
350 pub fn extract<R: io::Read>(reader: R) -> io::Result<Option<Self>> {
355 Commit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from))
356 }
357}
358
359impl From<Commit> for Metadata {
360 fn from(commit: Commit) -> Self {
361 Self {
362 tx_range: commit.tx_range(),
363 size_in_bytes: commit.encoded_len() as u64,
364 epoch: commit.epoch,
365 }
366 }
367}
368
/// Read exactly four bytes from `read` and interpret them as a
/// little-endian `u32`.
///
/// Fails with the reader's error (e.g. `UnexpectedEof`) if fewer than four
/// bytes are available.
fn decode_u32<R: Read>(mut read: R) -> io::Result<u32> {
    let mut le_bytes = [0u8; 4];
    read.read_exact(&mut le_bytes)
        .map(|()| u32::from_le_bytes(le_bytes))
}
374
375fn decode_error(e: DecodeError) -> io::Error {
376 invalid_data(e)
377}
378
/// Build an [`io::Error`] of kind `InvalidData` carrying `e` as its payload.
fn invalid_data<E: Into<Box<dyn std::error::Error + Send + Sync>>>(e: E) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, e)
}
385
#[cfg(test)]
mod tests {
    use std::num::NonZeroU8;

    use proptest::prelude::*;

    use super::*;
    use crate::{payload::ArrayDecoder, tests::helpers::enable_logging, DEFAULT_LOG_FORMAT_VERSION};

    /// Writing a commit and decoding the resulting bytes yields an equal commit.
    #[test]
    fn commit_roundtrip() {
        let records = vec![0; 128];
        let commit = Commit {
            min_tx_offset: 0,
            n: 3,
            records,
            epoch: Commit::DEFAULT_EPOCH,
        };

        let mut buf = Vec::with_capacity(commit.encoded_len());
        commit.write(&mut buf).unwrap();
        let commit2 = Commit::decode(&mut buf.as_slice()).unwrap().unwrap();

        assert_eq!(commit, commit2);
    }

    /// With `from_offset = 2`, the transactions at offsets 0 and 1 are skipped
    /// and only those at offsets 2 and 3 are yielded.
    #[test]
    fn into_transactions_can_skip_txs() {
        enable_logging();

        // 4 transactions over 128 bytes of records; the expected `txdata`
        // below shows each decoded record is a 32-byte array.
        let commit = Commit {
            min_tx_offset: 0,
            n: 4,
            records: vec![0; 128],
            epoch: Commit::DEFAULT_EPOCH,
        };

        let txs = commit
            .into_transactions(DEFAULT_LOG_FORMAT_VERSION, 2, &ArrayDecoder::<32>)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(
            txs,
            vec![
                Transaction {
                    offset: 2,
                    txdata: [0u8; 32]
                },
                Transaction {
                    offset: 3,
                    txdata: [0; 32]
                }
            ]
        )
    }

    proptest! {
        /// Flipping at least one bit of a byte in the records makes decoding
        /// fail with a checksum mismatch.
        ///
        /// `pos` starts at `Header::LEN` and stays below 512, so it always
        /// falls inside the 512 bytes of records that follow the header; the
        /// header and the trailing checksum are never modified. `mask` is
        /// non-zero, so the XOR always changes the byte.
        #[test]
        fn bitflip(pos in Header::LEN..512, mask in any::<NonZeroU8>()) {
            let commit = Commit {
                min_tx_offset: 42,
                n: 10,
                records: vec![1; 512],
                epoch: Commit::DEFAULT_EPOCH,
            };

            let mut buf = Vec::with_capacity(commit.encoded_len());
            commit.write(&mut buf).unwrap();

            // Corrupt a single byte of the encoded commit.
            buf[pos] ^= mask.get();

            match Commit::decode(&mut buf.as_slice()) {
                Err(e) => {
                    assert_eq!(e.kind(), io::ErrorKind::InvalidData);
                    // The error payload must be the dedicated mismatch type,
                    // not some other `InvalidData` condition.
                    e.into_inner()
                        .unwrap()
                        .downcast::<ChecksumMismatch>()
                        .expect("IO inner should be checksum mismatch");
                }
                Ok(commit) => panic!("expected checksum mismatch, got valid commit: {commit:?}"),
            }
        }
    }
}