batpak 0.8.2

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
pub(crate) mod id;
pub(crate) mod scan;
pub(crate) mod sidx;

pub(crate) use id::SegmentId;

use crate::event::Event;
use crate::store::{EncodedBytes, ExtensionKey, StoreError};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::io::{Read, Seek, SeekFrom, Write};
// serde(with) resolves via string path — no explicit wire import needed.

/// Four-byte magic prefix that opens every segment file.
///
/// On-disk layout: magic(4) + header_len(4 BE) + header(msgpack) + frames, where each
/// frame is \[len:u32 BE\]\[crc32:u32 BE\]\[msgpack\]. Files are named
/// `{segment_id:06}.fbat`; segment ids are sequential u64 values.
pub const SEGMENT_MAGIC: &[u8; 4] = &[b'F', b'B', b'A', b'T'];
/// Extension shared by every segment file name (no leading dot).
pub const SEGMENT_EXTENSION: &str = "fbat";

/// Upper bound, in bytes, on a single frame's payload (256 MiB).
///
/// A frame whose length prefix claims more than this is treated as corrupt before any
/// buffer is allocated for it. This keeps a damaged or hostile segment file from
/// driving unbounded memory use.
pub(crate) const MAX_FRAME_PAYLOAD: usize = 256 << 20;

/// Segment file header, serialized as MessagePack after the magic bytes.
///
/// Written once at the start of every segment by `Segment::create_with_created_ns`.
/// It comes after the 4-byte magic and a big-endian u32 length prefix.
/// NOTE(review): encoded with the crate's named-field MessagePack helper. Field names
/// and order presumably form part of the on-disk format, so avoid renaming or reordering.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SegmentHeader {
    /// Segment format version number (segments created by this module write `1`).
    pub version: u16,
    /// Reserved flags field; currently always 0.
    pub flags: u16,
    /// Nanoseconds since Unix epoch when this segment was created.
    pub created_ns: i64,
    /// Numeric identifier of this segment file; also determines the file name
    /// (via `segment_filename`) when the segment is created.
    pub segment_id: u64,
}

/// FramePayload: what gets serialized into each frame.
/// entity and scope are stored as strings (not Coordinate) because segments
/// are the persistence layer — they don't depend on the Coordinate type.
///
/// `FramePayloadRef` is the borrowed, serialize-only counterpart. Keep both structs'
/// field names and order in sync.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct FramePayload<P> {
    /// The event data stored in this frame.
    pub event: Event<P>,
    /// Entity name string (e.g. `"entity:42"`).
    pub entity: String,
    /// Scope name string (e.g. `"profile"`).
    pub scope: String,
    /// Opaque receipt extension bytes committed with this frame.
    /// `#[serde(default)]`: frames encoded without this field decode with an empty map.
    #[serde(default)]
    pub receipt_extensions: BTreeMap<ExtensionKey, EncodedBytes>,
}

/// Borrowed counterpart of `FramePayload`, used only for serialization.
///
/// Field names, types, and order mirror `FramePayload`, so bytes encoded from this
/// struct are meant to decode as a `FramePayload<P>`. Keep the two in sync when adding
/// fields. NOTE(review): presumably this exists so writers can encode a frame without
/// cloning the event or strings; confirm with callers.
#[derive(Serialize)]
pub(crate) struct FramePayloadRef<'a, P> {
    /// Event being persisted (same meaning as `FramePayload::event`).
    pub event: &'a Event<P>,
    /// Entity name string (same meaning as `FramePayload::entity`).
    pub entity: &'a str,
    /// Scope name string (same meaning as `FramePayload::scope`).
    pub scope: &'a str,
    /// Receipt extension bytes committed with this frame.
    pub receipt_extensions: &'a BTreeMap<ExtensionKey, EncodedBytes>,
}

/// Typestate marker for an active (writable) segment.
pub struct Active;
/// Typestate marker for a sealed (read-only) segment.
pub struct Sealed;
/// A segment file handle parameterized by its lifecycle state (`Active` or `Sealed`).
///
/// Only `Segment<Active>` exposes write methods; `seal` consumes it and yields a
/// `Segment<Sealed>` with no open file handle.
pub struct Segment<State> {
    /// Parsed header of this segment file.
    pub header: SegmentHeader,
    /// Filesystem path to the segment file.
    pub path: std::path::PathBuf,
    // Open handle while active; `seal` takes and drops it, so sealed segments hold `None`.
    file: Option<std::fs::File>,
    // Bytes written through this handle: the preamble (magic + length + header) plus
    // frames. Used as the offset of the next frame and for rotation checks. It is not
    // advanced by `write_sidx_footer`.
    written_bytes: u64,
    _state: std::marker::PhantomData<State>,
}

/// Outcome of a compaction run: whether work happened, was skipped because
/// the sealed-segment count was below the configured threshold, or failed
/// inside the swap-point protocol. A `Failed` result guarantees the live
/// index was NOT mutated. The F6 / FREEZE-4 contract routes rebuild
/// failures here so callers can distinguish "did nothing" from "tried and
/// failed" from "did compact" without clobbering the reader-visible state.
///
/// Marked `#[non_exhaustive]`: downstream matches need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum CompactionOutcome {
    /// Compaction merged and replaced sealed segments.
    Performed,
    /// Compaction was a no-op: sealed-segment count was below `min_segments`.
    Skipped,
    /// The compact-swap protocol aborted before the swap point; the live
    /// index is unchanged. `reason` describes which step failed (off-side
    /// rebuild error, disk-side scan error, etc.). See
    /// `src/store/lifecycle.rs::compact` and
    /// `StoreIndex::replace_contents_from_fresh` for the swap invariants.
    Failed {
        /// Human-readable description of which swap-point step aborted.
        reason: String,
    },
}

/// Result returned by a compaction run: the outcome plus what it cost/freed.
///
/// The counters are only meaningful for [`CompactionOutcome::Performed`].
#[derive(Debug)]
pub struct CompactionResult {
    /// Whether the compaction actually ran, was skipped, or failed.
    pub outcome: CompactionOutcome,
    /// Number of sealed segment files that were merged and removed. Always
    /// `0` for [`CompactionOutcome::Skipped`] and
    /// [`CompactionOutcome::Failed`].
    pub segments_removed: usize,
    /// Total bytes freed by removing the merged segment files. Always `0`
    /// for [`CompactionOutcome::Skipped`] and
    /// [`CompactionOutcome::Failed`].
    pub bytes_reclaimed: u64,
}

/// frame_encode: serialize data to msgpack, wrap in \[len:u32 BE\]\[crc32:u32 BE\]\[msgpack\]
/// Segment payloads are always encoded with the canonical named-field MessagePack helper.
/// \[DEP:crate::encoding::to_bytes\] -> `Result<Vec<u8>, encode::Error>`
/// \[DEP:crc32fast::hash\] → u32
///
/// # Errors
/// Returns `StoreError::Serialization` if the data cannot be serialized to MessagePack.
pub fn frame_encode<T: serde::Serialize>(data: &T) -> Result<Vec<u8>, StoreError> {
    let msgpack =
        crate::encoding::to_bytes(data).map_err(|e| StoreError::Serialization(Box::new(e)))?;
    let crc = crc32fast::hash(&msgpack);
    let len = u32::try_from(msgpack.len()).map_err(|_| StoreError::ser_msg("frame exceeds 4GB"))?;

    let mut frame = Vec::with_capacity(8 + msgpack.len());
    frame.extend_from_slice(&len.to_be_bytes());
    frame.extend_from_slice(&crc.to_be_bytes());
    frame.extend_from_slice(&msgpack);
    Ok(frame)
}

/// Reasons `frame_decode` can reject a buffer.
///
/// It deliberately carries no segment id. Callers wrap it with the context of the
/// segment they were reading.
#[derive(Debug)]
#[non_exhaustive]
pub enum FrameDecodeError {
    /// Fewer than the 8 bytes needed for the length + CRC prefix are present.
    TooShort,
    /// The length prefix announces more payload than the buffer holds.
    Truncated {
        /// Total bytes expected for the complete frame (header + payload).
        expected_len: usize,
        /// Bytes actually available in the buffer.
        available: usize,
    },
    /// The CRC32 stored in the prefix disagrees with the CRC32 of the payload.
    CrcMismatch {
        /// CRC value stored in the frame header.
        expected: u32,
        /// CRC value computed from the actual payload bytes.
        actual: u32,
    },
}

impl std::fmt::Display for FrameDecodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            FrameDecodeError::TooShort => f.write_str("frame too short for header"),
            FrameDecodeError::Truncated {
                expected_len,
                available,
            } => write!(
                f,
                "frame truncated: expected {expected_len} bytes, got {available}"
            ),
            FrameDecodeError::CrcMismatch { expected, actual } => write!(
                f,
                "CRC mismatch: expected {expected:#010x}, got {actual:#010x}"
            ),
        }
    }
}

/// No underlying cause is wrapped, so the default `source()` (returning `None`) applies.
impl std::error::Error for FrameDecodeError {}

/// frame_decode: parse one \[len\]\[crc\]\[msgpack\] frame from the front of `buf`,
/// verify its CRC32, and hand back the payload slice.
/// Returns (msgpack_bytes, total_frame_size_consumed); bytes after the frame are ignored.
///
/// # Errors
/// Returns `FrameDecodeError::TooShort` if the buffer is under 8 bytes.
/// Returns `FrameDecodeError::Truncated` if the buffer ends before the full frame payload.
/// Returns `FrameDecodeError::CrcMismatch` if the checksum does not match the payload.
pub fn frame_decode(buf: &[u8]) -> Result<(&[u8], usize), FrameDecodeError> {
    // Fixed prefix: big-endian u32 payload length followed by big-endian u32 CRC32.
    const PREFIX: usize = 8;

    if buf.len() < PREFIX {
        return Err(FrameDecodeError::TooShort);
    }
    let be_u32_at =
        |at: usize| u32::from_be_bytes([buf[at], buf[at + 1], buf[at + 2], buf[at + 3]]);
    let payload_len = be_u32_at(0) as usize;
    let stored_crc = be_u32_at(4);

    // A6: `PREFIX + payload_len` can overflow `usize` on 32-bit targets (a u32
    // length can approach usize::MAX there). Overflow is reported as a truncated
    // frame with an unreachable expected length, so slicing never goes out of bounds.
    let available = buf.len();
    let frame_len = match PREFIX.checked_add(payload_len) {
        None => {
            return Err(FrameDecodeError::Truncated {
                expected_len: usize::MAX,
                available,
            });
        }
        Some(needed) if needed > available => {
            return Err(FrameDecodeError::Truncated {
                expected_len: needed,
                available,
            });
        }
        Some(needed) => needed,
    };

    let payload = &buf[PREFIX..frame_len];
    let computed_crc = crc32fast::hash(payload);
    if computed_crc == stored_crc {
        Ok((payload, frame_len))
    } else {
        Err(FrameDecodeError::CrcMismatch {
            expected: stored_crc,
            actual: computed_crc,
        })
    }
}

/// Build the on-disk file name for a segment.
///
/// The id is zero-padded to at least six digits and followed by `.` and
/// `SEGMENT_EXTENSION`, e.g. `1` → `000001.fbat`. Ids wider than six digits are
/// printed in full.
pub fn segment_filename(segment_id: u64) -> String {
    format!("{segment_id:06}.{SEGMENT_EXTENSION}")
}

impl Segment<Active> {
    /// Create a new active segment file in `dir` and write its preamble.
    ///
    /// The file is named `segment_filename(segment_id)`. It starts with `SEGMENT_MAGIC`,
    /// then a big-endian u32 header length, then the MessagePack-encoded
    /// [`SegmentHeader`] (version 1, flags 0, the given `created_ns` and `segment_id`).
    /// `written_bytes` starts at the size of that preamble, so the first frame's offset
    /// is the first byte after the header.
    ///
    /// NOTE(review): if writing the preamble fails, the partially written file is left
    /// on disk. Nothing here removes it.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if the segment file cannot be created or the header cannot be written.
    /// Returns `StoreError::Serialization` if the segment header cannot be serialized.
    pub fn create_with_created_ns(
        dir: &std::path::Path,
        segment_id: u64,
        created_ns: i64,
    ) -> Result<Self, StoreError> {
        let path = dir.join(segment_filename(segment_id));
        let mut file = crate::store::platform::fs::create_new_file(&path)?;

        let header = SegmentHeader {
            version: 1,
            flags: 0,
            created_ns,
            segment_id,
        };

        // Write magic + header_len(u32 BE) + header(msgpack)
        file.write_all(SEGMENT_MAGIC).map_err(StoreError::Io)?;
        let header_bytes = crate::encoding::to_bytes(&header)
            .map_err(|e| StoreError::Serialization(Box::new(e)))?;
        let header_len = u32::try_from(header_bytes.len())
            .map_err(|_| StoreError::ser_msg("segment header length exceeds u32::MAX"))?
            .to_be_bytes();
        file.write_all(&header_len).map_err(StoreError::Io)?;
        file.write_all(&header_bytes).map_err(StoreError::Io)?;

        Ok(Self {
            header,
            path,
            file: Some(file),
            written_bytes: (4 + 4 + header_bytes.len()) as u64, // magic + len + header
            _state: std::marker::PhantomData,
        })
    }

    /// Append an already-encoded frame (see `frame_encode`) and return the byte offset
    /// at which it starts.
    ///
    /// Offsets come from the in-memory `written_bytes` counter, not from the OS file
    /// position. If the write fails, the counter is not advanced. NOTE(review):
    /// `write_all` may have written part of the frame before failing, which would leave
    /// the counter behind the real file length.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if writing to the segment file fails.
    pub fn write_frame(&mut self, frame: &[u8]) -> Result<u64, StoreError> {
        let offset = self.written_bytes;
        if let Some(ref mut f) = self.file {
            f.write_all(frame).map_err(StoreError::Io)?;
        }
        self.written_bytes += frame.len() as u64;
        Ok(offset)
    }

    /// Append the frame region of another segment file to this segment. Returns the
    /// offset (in this segment) at which the copied frames start.
    ///
    /// The source's magic and header are skipped. If the source ends with a SIDX footer
    /// (see `detect_sidx_boundary`), the footer is excluded; otherwise frames are copied
    /// up to EOF. Frame bytes are copied verbatim, without decoding or re-checksumming.
    /// `written_bytes` is advanced by the number of bytes actually copied.
    ///
    /// The frame region is validated before copying. A corrupt header length or SIDX
    /// boundary would otherwise silently copy nothing (dropping every event) or copy the
    /// SIDX footer as if it were frame data.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if the source file cannot be read or frames cannot be written.
    /// Returns `StoreError::Io` with `ErrorKind::InvalidData` if the header length or SIDX
    /// boundary points outside the source file.
    /// Returns `StoreError::Io` with `ErrorKind::UnexpectedEof` if fewer frame bytes than
    /// expected could be copied.
    /// Returns `StoreError::Corrupt` if the source file does not begin with the expected magic bytes.
    pub fn append_frames_from_segment(
        &mut self,
        path: &std::path::Path,
    ) -> Result<u64, StoreError> {
        let mut source = crate::store::platform::fs::open_file(path).map_err(StoreError::Io)?;
        let mut magic = [0u8; 4];
        source.read_exact(&mut magic).map_err(StoreError::Io)?;
        if &magic != SEGMENT_MAGIC {
            return Err(StoreError::corrupt_magic(0));
        }

        let mut header_len_buf = [0u8; 4];
        source
            .read_exact(&mut header_len_buf)
            .map_err(StoreError::Io)?;
        let header_len = u64::from(u32::from_be_bytes(header_len_buf));
        // magic(4) + header_len(4) precede the header itself.
        let frames_start = 8 + header_len;

        // Determine where frames end: if the segment has a SIDX footer,
        // the frames stop at string_table_offset. Otherwise, frames extend
        // to the end of the file.
        let file_len = source.seek(SeekFrom::End(0)).map_err(StoreError::Io)?;
        let frames_end = detect_sidx_boundary(&mut source, file_len)?.unwrap_or(file_len);

        // Reject a frame region that does not lie inside the file. The SIDX offset is
        // taken verbatim from the trailer, and the header length from the preamble.
        if frames_start > frames_end || frames_end > file_len {
            return Err(StoreError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "segment {} has invalid frame region {frames_start}..{frames_end} \
                     (file length {file_len})",
                    path.display()
                ),
            )));
        }

        source
            .seek(SeekFrom::Start(frames_start))
            .map_err(StoreError::Io)?;

        let offset = self.written_bytes;
        if let Some(ref mut destination) = self.file {
            // Cannot underflow: validated frames_start <= frames_end above.
            let bytes_to_copy = frames_end - frames_start;
            let copied = std::io::copy(&mut source.take(bytes_to_copy), destination)
                .map_err(StoreError::Io)?;
            // Record what actually reached the destination before reporting a short copy.
            self.written_bytes += copied;
            if copied != bytes_to_copy {
                return Err(StoreError::Io(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    format!(
                        "segment {}: copied {copied} of {bytes_to_copy} frame bytes",
                        path.display()
                    ),
                )));
            }
        }
        Ok(offset)
    }

    /// Returns `true` if the segment has reached or exceeded `max_bytes` and should be rotated.
    ///
    /// The count covers the preamble and all frames written through this handle.
    pub(crate) fn needs_rotation(&self, max_bytes: u64) -> bool {
        self.written_bytes >= max_bytes
    }

    /// Flush the segment file to durable storage using the specified sync mode.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if the OS-level sync call fails.
    pub fn sync_with_mode(&mut self, mode: &crate::store::SyncMode) -> Result<(), StoreError> {
        if let Some(ref f) = self.file {
            crate::store::platform::sync::sync_file_with_mode(f, mode)?;
        }
        Ok(())
    }

    /// Write a SIDX footer to the end of this segment before sealing.
    /// The footer enables fast cold-start index rebuild by storing compact
    /// binary entries instead of requiring full msgpack frame deserialization.
    ///
    /// Note: this does not advance `written_bytes`, so the counter excludes the footer.
    ///
    /// # Errors
    /// Returns `StoreError::Io`, `StoreError::Serialization`, or
    /// `StoreError::SegmentTooManyEntries` if writing fails.
    pub(crate) fn write_sidx_footer(
        &mut self,
        collector: &crate::store::segment::sidx::SidxEntryCollector,
    ) -> Result<(), StoreError> {
        if let Some(ref mut f) = self.file {
            collector.write_footer(f, self.header.segment_id)?;
        }
        Ok(())
    }

    /// Seal: close the file handle and transition to `Sealed`.
    ///
    /// No sync is performed here. Call `sync_with_mode` first if durability of the
    /// written bytes is required.
    pub fn seal(mut self) -> Segment<Sealed> {
        drop(self.file.take());
        Segment {
            header: self.header,
            path: self.path,
            file: None,
            written_bytes: self.written_bytes,
            _state: std::marker::PhantomData,
        }
    }
}

/// Look for a SIDX footer at the end of a segment file.
///
/// The footer ends in a 16-byte trailer laid out as
/// `[string_table_offset:u64 LE][entry_count:u32 LE][magic:4]`. If the trailing magic
/// matches `SIDX_MAGIC`, returns `Some(string_table_offset)`. That is the byte offset
/// where the string table starts, which is also where the frames end. Returns `None`
/// when `file_len` is shorter than the trailer or the magic does not match; in that case
/// frames extend to EOF.
///
/// The returned offset is taken verbatim from the trailer and is not range-checked here,
/// so callers should validate it. `file_len` only gates the length check; the trailer is
/// read by seeking relative to the end of `source`.
///
/// # Errors
/// Returns `StoreError::Io` if seeking to or reading the trailer fails.
pub(crate) fn detect_sidx_boundary<R: Read + Seek>(
    source: &mut R,
    file_len: u64,
) -> Result<Option<u64>, StoreError> {
    const TRAILER_LEN: usize = 16;
    const MAGIC_AT: usize = 12;

    if file_len < TRAILER_LEN as u64 {
        return Ok(None);
    }

    let mut trailer = [0u8; TRAILER_LEN];
    source
        .seek(SeekFrom::End(-(TRAILER_LEN as i64)))
        .and_then(|_| source.read_exact(&mut trailer))
        .map_err(StoreError::Io)?;

    let magic = &trailer[MAGIC_AT..];
    if magic != crate::store::segment::sidx::SIDX_MAGIC {
        return Ok(None);
    }

    let mut offset_le = [0u8; 8];
    offset_le.copy_from_slice(&trailer[..8]);
    Ok(Some(u64::from_le_bytes(offset_le)))
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Rotation is driven purely by the tracked byte count. A fresh segment (preamble
    /// only) stays under a 1 KiB limit. After one small frame it exceeds a 1-byte limit
    /// but is still under 1 KiB.
    #[test]
    fn needs_rotation_tracks_written_bytes_threshold() {
        let tmp = TempDir::new().expect("tmpdir");
        let mut active: Segment<Active> =
            Segment::create_with_created_ns(tmp.path(), 1, 0).expect("create segment");
        let payload = serde_json::json!({"payload": "rotation-threshold"});
        let encoded = frame_encode(&payload).expect("encode frame");
        let generous_limit = 1024;

        assert!(
            !active.needs_rotation(generous_limit),
            "PROPERTY: a fresh segment must not report rotation before any frames are written"
        );

        active.write_frame(&encoded).expect("write frame");

        assert!(
            active.needs_rotation(1),
            "PROPERTY: needs_rotation(max_bytes=1) must flip true after any real frame write"
        );
        assert!(
            !active.needs_rotation(generous_limit),
            "PROPERTY: needs_rotation must stay false below the threshold"
        );
    }
}