kimberlite_storage/
checkpoint.rs1use bytes::Bytes;
16use kimberlite_crypto::ChainHash;
17use kimberlite_types::{Checkpoint, CheckpointPolicy, Offset, Timestamp};
18
19use crate::StorageError;
20
21pub const CHECKPOINT_PAYLOAD_SIZE: usize = 40; pub fn serialize_checkpoint_payload(chain_hash: &ChainHash, record_count: u64) -> Bytes {
28 let mut buf = Vec::with_capacity(CHECKPOINT_PAYLOAD_SIZE);
29 buf.extend_from_slice(chain_hash.as_bytes());
30 buf.extend_from_slice(&record_count.to_le_bytes());
31
32 debug_assert_eq!(buf.len(), CHECKPOINT_PAYLOAD_SIZE);
33 Bytes::from(buf)
34}
35
36pub fn deserialize_checkpoint_payload(
42 payload: &Bytes,
43 offset: Offset,
44) -> Result<(ChainHash, u64), StorageError> {
45 if payload.len() < CHECKPOINT_PAYLOAD_SIZE {
46 return Err(StorageError::InvalidCheckpointPayload {
47 offset,
48 reason: format!(
49 "payload too small: {} < {}",
50 payload.len(),
51 CHECKPOINT_PAYLOAD_SIZE
52 ),
53 });
54 }
55
56 let chain_hash_bytes: [u8; 32] = payload[0..32]
57 .try_into()
58 .expect("slice is exactly 32 bytes after bounds check");
59 let chain_hash = ChainHash::from_bytes(&chain_hash_bytes);
60
61 let record_count = u64::from_le_bytes(
62 payload[32..40]
63 .try_into()
64 .expect("slice is exactly 8 bytes after bounds check"),
65 );
66
67 Ok((chain_hash, record_count))
68}
69
70pub fn create_checkpoint(
72 offset: Offset,
73 chain_hash: ChainHash,
74 record_count: u64,
75 timestamp: Timestamp,
76) -> Checkpoint {
77 let hash = kimberlite_types::Hash::from_bytes(*chain_hash.as_bytes());
79 Checkpoint::new(offset, hash, record_count, timestamp)
80}
81
82#[derive(Debug, Clone, Default)]
92pub struct CheckpointIndex {
93 offsets: Vec<Offset>,
95}
96
97impl CheckpointIndex {
98 pub fn new() -> Self {
100 Self {
101 offsets: Vec::new(),
102 }
103 }
104
105 pub fn add(&mut self, offset: Offset) {
112 if let Some(&last) = self.offsets.last() {
113 assert!(
114 offset > last,
115 "checkpoint offset must be greater than previous: {offset} <= {last}"
116 );
117 }
118 self.offsets.push(offset);
119 }
120
121 pub fn find_nearest(&self, offset: Offset) -> Option<Offset> {
125 match self.offsets.binary_search(&offset) {
127 Ok(idx) => Some(self.offsets[idx]),
128 Err(idx) => {
129 if idx == 0 {
130 None
131 } else {
132 Some(self.offsets[idx - 1])
133 }
134 }
135 }
136 }
137
138 pub fn len(&self) -> usize {
140 self.offsets.len()
141 }
142
143 pub fn is_empty(&self) -> bool {
145 self.offsets.is_empty()
146 }
147
148 pub fn last(&self) -> Option<Offset> {
150 self.offsets.last().copied()
151 }
152
153 pub fn iter(&self) -> impl Iterator<Item = &Offset> {
155 self.offsets.iter()
156 }
157}
158
159pub fn should_create_checkpoint(policy: &CheckpointPolicy, offset: Offset) -> bool {
163 policy.should_checkpoint(offset)
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoding then decoding yields the original hash and count, and the
    /// encoded form has the advertised fixed size.
    #[test]
    fn checkpoint_payload_roundtrip() {
        let hash = ChainHash::from_bytes(&[0xab; 32]);
        let count = 12345u64;

        let payload = serialize_checkpoint_payload(&hash, count);
        assert_eq!(payload.len(), CHECKPOINT_PAYLOAD_SIZE);

        let (recovered_hash, recovered_count) =
            deserialize_checkpoint_payload(&payload, Offset::new(0)).unwrap();

        assert_eq!(recovered_hash, hash);
        assert_eq!(recovered_count, count);
    }

    /// A payload one byte short of the fixed size is rejected, and the error
    /// carries the offset it was read at.
    #[test]
    fn checkpoint_payload_too_small_is_rejected() {
        let truncated = Bytes::from(vec![0u8; CHECKPOINT_PAYLOAD_SIZE - 1]);

        match deserialize_checkpoint_payload(&truncated, Offset::new(7)) {
            Err(StorageError::InvalidCheckpointPayload { offset, reason }) => {
                assert_eq!(offset, Offset::new(7));
                assert!(reason.contains("payload too small"));
            }
            other => panic!("expected InvalidCheckpointPayload, got {other:?}"),
        }
    }

    /// An empty payload takes the same error path as any other short payload.
    #[test]
    fn checkpoint_payload_empty_is_rejected() {
        let empty = Bytes::new();
        assert!(deserialize_checkpoint_payload(&empty, Offset::new(0)).is_err());
    }

    #[test]
    fn checkpoint_index_find_nearest() {
        let mut index = CheckpointIndex::new();
        index.add(Offset::new(99));
        index.add(Offset::new(199));
        index.add(Offset::new(299));

        // Exact match returns the checkpoint itself.
        assert_eq!(index.find_nearest(Offset::new(199)), Some(Offset::new(199)));

        // Between checkpoints: the nearest one at or below the query.
        assert_eq!(index.find_nearest(Offset::new(150)), Some(Offset::new(99)));
        assert_eq!(index.find_nearest(Offset::new(250)), Some(Offset::new(199)));

        // Before the first checkpoint there is nothing to return.
        assert_eq!(index.find_nearest(Offset::new(50)), None);

        // Past the last checkpoint: the last one.
        assert_eq!(index.find_nearest(Offset::new(500)), Some(Offset::new(299)));
    }

    #[test]
    fn checkpoint_index_empty() {
        let index = CheckpointIndex::new();
        assert!(index.is_empty());
        assert_eq!(index.len(), 0);
        assert_eq!(index.last(), None);
        assert_eq!(index.find_nearest(Offset::new(100)), None);
    }

    /// `len`, `last` and `iter` reflect what was added, in insertion order.
    #[test]
    fn checkpoint_index_accessors() {
        let mut index = CheckpointIndex::new();
        index.add(Offset::new(99));
        index.add(Offset::new(199));

        assert!(!index.is_empty());
        assert_eq!(index.len(), 2);
        assert_eq!(index.last(), Some(Offset::new(199)));

        let collected: Vec<Offset> = index.iter().copied().collect();
        assert_eq!(collected, vec![Offset::new(99), Offset::new(199)]);
    }

    /// Adding the same offset twice violates the strictly-increasing rule.
    #[test]
    #[should_panic(expected = "checkpoint offset must be greater than previous")]
    fn checkpoint_index_rejects_duplicate_offset() {
        let mut index = CheckpointIndex::new();
        index.add(Offset::new(10));
        index.add(Offset::new(10));
    }

    /// Adding a smaller offset after a larger one is rejected as well.
    #[test]
    #[should_panic(expected = "checkpoint offset must be greater than previous")]
    fn checkpoint_index_rejects_decreasing_offset() {
        let mut index = CheckpointIndex::new();
        index.add(Offset::new(10));
        index.add(Offset::new(5));
    }
}