Skip to main content

nodedb_wal/record/
calvin.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Calvin scheduler WAL payload types.
4//!
5//! Uses fixed little-endian encoding (no msgpack framing) for zero-allocation
6//! replay, consistent with other WAL payload types in this crate.
7
8use crate::error::{Result, WalError};
9
/// Number of bytes a serialized `CalvinApplied` payload occupies on disk.
///
/// The payload is three little-endian integers stored back to back:
/// `epoch` (`u64`, 8 bytes), `position` (`u32`, 4 bytes) and `vshard_id`
/// (`u32`, 4 bytes), for 16 bytes in total.
pub const CALVIN_APPLIED_PAYLOAD_SIZE: usize =
    std::mem::size_of::<u64>() + 2 * std::mem::size_of::<u32>();
14
/// WAL payload stored under [`super::types::RecordType::CalvinApplied`].
///
/// The Calvin executor appends one of these once a `MetaOp::CalvinExecute`
/// batch has committed successfully. It records the
/// `{ epoch, position, vshard_id }` triple, which lets the scheduler's
/// restart path recover `last_applied_epoch` for a vshard by scanning the
/// WAL rather than reading the full Raft log.
///
/// The on-disk form is fixed-width little-endian with no msgpack framing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CalvinAppliedPayload {
    /// Sequencer epoch the applied transaction belongs to.
    pub epoch: u64,
    /// Zero-based index of the transaction inside its epoch batch.
    pub position: u32,
    /// Identifier of the vshard that applied the transaction.
    pub vshard_id: u32,
}
32
33impl CalvinAppliedPayload {
34    /// Construct a new payload.
35    pub fn new(epoch: u64, position: u32, vshard_id: u32) -> Self {
36        Self {
37            epoch,
38            position,
39            vshard_id,
40        }
41    }
42
43    /// Serialize to fixed-size bytes for WAL append.
44    ///
45    /// Layout: `epoch (8 LE) | position (4 LE) | vshard_id (4 LE)`.
46    pub fn to_bytes(&self) -> [u8; CALVIN_APPLIED_PAYLOAD_SIZE] {
47        let mut buf = [0u8; CALVIN_APPLIED_PAYLOAD_SIZE];
48        buf[..8].copy_from_slice(&self.epoch.to_le_bytes());
49        buf[8..12].copy_from_slice(&self.position.to_le_bytes());
50        buf[12..16].copy_from_slice(&self.vshard_id.to_le_bytes());
51        buf
52    }
53
54    /// Deserialize from bytes read from the WAL.
55    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
56        if bytes.len() < CALVIN_APPLIED_PAYLOAD_SIZE {
57            return Err(WalError::InvalidPayload {
58                detail: format!(
59                    "CalvinApplied payload must be {CALVIN_APPLIED_PAYLOAD_SIZE} bytes, got {}",
60                    bytes.len()
61                ),
62            });
63        }
64        let epoch = u64::from_le_bytes(bytes[..8].try_into().expect(
65            "invariant: bounds-checked above (bytes.len() >= CALVIN_APPLIED_PAYLOAD_SIZE >= 8)",
66        ));
67        let position = u32::from_le_bytes(bytes[8..12].try_into().expect(
68            "invariant: bounds-checked above (bytes.len() >= CALVIN_APPLIED_PAYLOAD_SIZE >= 12)",
69        ));
70        let vshard_id = u32::from_le_bytes(bytes[12..16].try_into().expect(
71            "invariant: bounds-checked above (bytes.len() >= CALVIN_APPLIED_PAYLOAD_SIZE >= 16)",
72        ));
73        Ok(Self {
74            epoch,
75            position,
76            vshard_id,
77        })
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoding then decoding must give back every field unchanged.
    #[test]
    fn roundtrip() {
        let p = CalvinAppliedPayload::new(42, 7, 3);
        let bytes = p.to_bytes();
        let decoded = CalvinAppliedPayload::from_bytes(&bytes).unwrap();
        assert_eq!(decoded.epoch, 42);
        assert_eq!(decoded.position, 7);
        assert_eq!(decoded.vshard_id, 3);
        assert_eq!(decoded, p);
    }

    /// The all-zero and all-ones payloads must survive a round trip too.
    #[test]
    fn roundtrip_extreme_values() {
        for p in [
            CalvinAppliedPayload::new(0, 0, 0),
            CalvinAppliedPayload::new(u64::MAX, u32::MAX, u32::MAX),
        ] {
            let decoded = CalvinAppliedPayload::from_bytes(&p.to_bytes()).unwrap();
            assert_eq!(decoded, p);
        }
    }

    /// Pins the exact on-disk bytes.
    ///
    /// A round-trip test alone would still pass if encode and decode were
    /// both switched to another byte order or field order, even though such a
    /// change would make previously written records decode incorrectly.
    #[test]
    fn encoding_matches_documented_layout() {
        let p = CalvinAppliedPayload::new(0x0102_0304_0506_0708, 0x0A0B_0C0D, 0x1122_3344);
        let expected: [u8; CALVIN_APPLIED_PAYLOAD_SIZE] = [
            0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, // epoch, little-endian
            0x0D, 0x0C, 0x0B, 0x0A, // position, little-endian
            0x44, 0x33, 0x22, 0x11, // vshard_id, little-endian
        ];
        assert_eq!(p.to_bytes(), expected);
        assert_eq!(CalvinAppliedPayload::from_bytes(&expected).unwrap(), p);
    }

    /// Every length below the fixed size is rejected, including the
    /// one-byte-short boundary.
    #[test]
    fn too_short_payload_returns_error() {
        let buf = [0u8; CALVIN_APPLIED_PAYLOAD_SIZE];
        for len in 0..CALVIN_APPLIED_PAYLOAD_SIZE {
            let result = CalvinAppliedPayload::from_bytes(&buf[..len]);
            assert!(
                matches!(result, Err(WalError::InvalidPayload { .. })),
                "a {len}-byte payload must be rejected as InvalidPayload"
            );
        }
    }

    /// The encoded form always has the advertised fixed size.
    #[test]
    fn payload_size_is_correct() {
        let p = CalvinAppliedPayload::new(u64::MAX, u32::MAX, u32::MAX);
        let bytes = p.to_bytes();
        assert_eq!(bytes.len(), CALVIN_APPLIED_PAYLOAD_SIZE);
        assert_eq!(CALVIN_APPLIED_PAYLOAD_SIZE, 16);
    }
}