// kimberlite_storage/checkpoint.rs

//! Checkpoint support for verified reads.
//!
//! Checkpoints are periodic verification anchors stored IN the log as
//! [`RecordKind::Checkpoint`] records. They enable efficient verified reads
//! by providing trusted anchor points, reducing verification from O(n) to O(k)
//! where k is the distance to the nearest checkpoint.
//!
//! # Checkpoint Payload Format
//!
//! The record count is encoded as a little-endian `u64`.
//!
//! ```text
//! [chain_hash:32B][record_count:u64]
//!       32B             8B
//! ```

use bytes::Bytes;
use kimberlite_crypto::ChainHash;
use kimberlite_types::{Checkpoint, CheckpointPolicy, Offset, Timestamp};

use crate::StorageError;

/// Size of the checkpoint payload in bytes: a 32-byte chain hash followed by
/// an 8-byte (`u64`) record count.
pub const CHECKPOINT_PAYLOAD_SIZE: usize = 32 + 8;

24/// Serializes checkpoint data to bytes for storage as a record payload.
25///
26/// Format: `[chain_hash:32B][record_count:u64]`
27pub fn serialize_checkpoint_payload(chain_hash: &ChainHash, record_count: u64) -> Bytes {
28    let mut buf = Vec::with_capacity(CHECKPOINT_PAYLOAD_SIZE);
29    buf.extend_from_slice(chain_hash.as_bytes());
30    buf.extend_from_slice(&record_count.to_le_bytes());
31
32    debug_assert_eq!(buf.len(), CHECKPOINT_PAYLOAD_SIZE);
33    Bytes::from(buf)
34}
35
36/// Deserializes checkpoint data from a record payload.
37///
38/// # Errors
39///
40/// Returns an error if the payload is too small.
41pub fn deserialize_checkpoint_payload(
42    payload: &Bytes,
43    offset: Offset,
44) -> Result<(ChainHash, u64), StorageError> {
45    if payload.len() < CHECKPOINT_PAYLOAD_SIZE {
46        return Err(StorageError::InvalidCheckpointPayload {
47            offset,
48            reason: format!(
49                "payload too small: {} < {}",
50                payload.len(),
51                CHECKPOINT_PAYLOAD_SIZE
52            ),
53        });
54    }
55
56    let chain_hash_bytes: [u8; 32] = payload[0..32]
57        .try_into()
58        .expect("slice is exactly 32 bytes after bounds check");
59    let chain_hash = ChainHash::from_bytes(&chain_hash_bytes);
60
61    let record_count = u64::from_le_bytes(
62        payload[32..40]
63            .try_into()
64            .expect("slice is exactly 8 bytes after bounds check"),
65    );
66
67    Ok((chain_hash, record_count))
68}
69
70/// Creates a [`Checkpoint`] from deserialized checkpoint data.
71pub fn create_checkpoint(
72    offset: Offset,
73    chain_hash: ChainHash,
74    record_count: u64,
75    timestamp: Timestamp,
76) -> Checkpoint {
77    // Convert ChainHash to kimberlite_types::Hash
78    let hash = kimberlite_types::Hash::from_bytes(*chain_hash.as_bytes());
79    Checkpoint::new(offset, hash, record_count, timestamp)
80}
81
82/// In-memory sparse index of checkpoint offsets.
83///
84/// This index is rebuilt on startup by scanning for checkpoint
85/// records. It provides O(log n) lookup for the nearest checkpoint.
86///
87/// # Invariants
88///
89/// - Offsets are stored in ascending order
90/// - Each offset corresponds to a valid checkpoint record in the log
91#[derive(Debug, Clone, Default)]
92pub struct CheckpointIndex {
93    /// Checkpoint offsets in ascending order.
94    offsets: Vec<Offset>,
95}
96
97impl CheckpointIndex {
98    /// Creates an empty checkpoint index.
99    pub fn new() -> Self {
100        Self {
101            offsets: Vec::new(),
102        }
103    }
104
105    /// Adds a checkpoint offset to the index.
106    ///
107    /// # Panics
108    ///
109    /// Panics if the offset is not greater than the last offset (checkpoints
110    /// must be added in order).
111    pub fn add(&mut self, offset: Offset) {
112        if let Some(&last) = self.offsets.last() {
113            assert!(
114                offset > last,
115                "checkpoint offset must be greater than previous: {offset} <= {last}"
116            );
117        }
118        self.offsets.push(offset);
119    }
120
121    /// Finds the nearest checkpoint at or before the given offset.
122    ///
123    /// Returns `None` if there are no checkpoints before the offset.
124    pub fn find_nearest(&self, offset: Offset) -> Option<Offset> {
125        // Binary search for the largest offset <= target
126        match self.offsets.binary_search(&offset) {
127            Ok(idx) => Some(self.offsets[idx]),
128            Err(idx) => {
129                if idx == 0 {
130                    None
131                } else {
132                    Some(self.offsets[idx - 1])
133                }
134            }
135        }
136    }
137
138    /// Returns the number of checkpoints in the index.
139    pub fn len(&self) -> usize {
140        self.offsets.len()
141    }
142
143    /// Returns true if the index is empty.
144    pub fn is_empty(&self) -> bool {
145        self.offsets.is_empty()
146    }
147
148    /// Returns the last checkpoint offset, if any.
149    pub fn last(&self) -> Option<Offset> {
150        self.offsets.last().copied()
151    }
152
153    /// Returns an iterator over checkpoint offsets.
154    pub fn iter(&self) -> impl Iterator<Item = &Offset> {
155        self.offsets.iter()
156    }
157}
158
159/// Determines if a checkpoint should be created at the given offset.
160///
161/// This is a helper that applies the checkpoint policy.
162pub fn should_create_checkpoint(policy: &CheckpointPolicy, offset: Offset) -> bool {
163    policy.should_checkpoint(offset)
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializing and then deserializing must return the original values.
    #[test]
    fn checkpoint_payload_roundtrip() {
        let hash = ChainHash::from_bytes(&[0xab; 32]);
        let count = 12345u64;

        let payload = serialize_checkpoint_payload(&hash, count);
        let (recovered_hash, recovered_count) =
            deserialize_checkpoint_payload(&payload, Offset::new(0)).unwrap();

        assert_eq!(recovered_hash, hash);
        assert_eq!(recovered_count, count);
    }

    /// A payload one byte short of the fixed size must be rejected.
    #[test]
    fn checkpoint_payload_too_small() {
        let payload = Bytes::from(vec![0u8; CHECKPOINT_PAYLOAD_SIZE - 1]);
        let result = deserialize_checkpoint_payload(&payload, Offset::new(0));

        assert!(matches!(
            result,
            Err(StorageError::InvalidCheckpointPayload { .. })
        ));
    }

    #[test]
    fn checkpoint_index_find_nearest() {
        let mut index = CheckpointIndex::new();
        index.add(Offset::new(99));
        index.add(Offset::new(199));
        index.add(Offset::new(299));

        // Exact match
        assert_eq!(index.find_nearest(Offset::new(199)), Some(Offset::new(199)));

        // Between checkpoints
        assert_eq!(index.find_nearest(Offset::new(150)), Some(Offset::new(99)));
        assert_eq!(index.find_nearest(Offset::new(250)), Some(Offset::new(199)));

        // Before first checkpoint
        assert_eq!(index.find_nearest(Offset::new(50)), None);

        // After last checkpoint
        assert_eq!(index.find_nearest(Offset::new(500)), Some(Offset::new(299)));
    }

    #[test]
    fn checkpoint_index_empty() {
        let index = CheckpointIndex::new();
        assert!(index.is_empty());
        assert_eq!(index.find_nearest(Offset::new(100)), None);
    }

    /// Adding an offset that does not exceed the previous one must panic.
    #[test]
    #[should_panic(expected = "checkpoint offset must be greater than previous")]
    fn checkpoint_index_rejects_out_of_order() {
        let mut index = CheckpointIndex::new();
        index.add(Offset::new(10));
        index.add(Offset::new(10));
    }
}