use bytes::Bytes;
use kimberlite_crypto::ChainHash;
use kimberlite_types::{Checkpoint, CheckpointPolicy, Offset, Timestamp};
use crate::StorageError;
pub const CHECKPOINT_PAYLOAD_SIZE: usize = 40;
pub fn serialize_checkpoint_payload(chain_hash: &ChainHash, record_count: u64) -> Bytes {
let mut buf = Vec::with_capacity(CHECKPOINT_PAYLOAD_SIZE);
buf.extend_from_slice(chain_hash.as_bytes());
buf.extend_from_slice(&record_count.to_le_bytes());
debug_assert_eq!(buf.len(), CHECKPOINT_PAYLOAD_SIZE);
Bytes::from(buf)
}
pub fn deserialize_checkpoint_payload(
payload: &Bytes,
offset: Offset,
) -> Result<(ChainHash, u64), StorageError> {
if payload.len() < CHECKPOINT_PAYLOAD_SIZE {
return Err(StorageError::InvalidCheckpointPayload {
offset,
reason: format!(
"payload too small: {} < {}",
payload.len(),
CHECKPOINT_PAYLOAD_SIZE
),
});
}
let chain_hash_bytes: [u8; 32] = payload[0..32]
.try_into()
.expect("slice is exactly 32 bytes after bounds check");
let chain_hash = ChainHash::from_bytes(&chain_hash_bytes);
let record_count = u64::from_le_bytes(
payload[32..40]
.try_into()
.expect("slice is exactly 8 bytes after bounds check"),
);
Ok((chain_hash, record_count))
}
pub fn create_checkpoint(
offset: Offset,
chain_hash: ChainHash,
record_count: u64,
timestamp: Timestamp,
) -> Checkpoint {
let hash = kimberlite_types::Hash::from_bytes(*chain_hash.as_bytes());
Checkpoint::new(offset, hash, record_count, timestamp)
}
#[derive(Debug, Clone, Default)]
pub struct CheckpointIndex {
offsets: Vec<Offset>,
}
impl CheckpointIndex {
pub fn new() -> Self {
Self {
offsets: Vec::new(),
}
}
pub fn add(&mut self, offset: Offset) {
if let Some(&last) = self.offsets.last() {
assert!(
offset > last,
"checkpoint offset must be greater than previous: {offset} <= {last}"
);
}
self.offsets.push(offset);
}
pub fn find_nearest(&self, offset: Offset) -> Option<Offset> {
match self.offsets.binary_search(&offset) {
Ok(idx) => Some(self.offsets[idx]),
Err(idx) => {
if idx == 0 {
None
} else {
Some(self.offsets[idx - 1])
}
}
}
}
pub fn len(&self) -> usize {
self.offsets.len()
}
pub fn is_empty(&self) -> bool {
self.offsets.is_empty()
}
pub fn last(&self) -> Option<Offset> {
self.offsets.last().copied()
}
pub fn iter(&self) -> impl Iterator<Item = &Offset> {
self.offsets.iter()
}
}
pub fn should_create_checkpoint(policy: &CheckpointPolicy, offset: Offset) -> bool {
policy.should_checkpoint(offset)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn checkpoint_payload_roundtrip() {
let hash = ChainHash::from_bytes(&[0xab; 32]);
let count = 12345u64;
let payload = serialize_checkpoint_payload(&hash, count);
let (recovered_hash, recovered_count) =
deserialize_checkpoint_payload(&payload, Offset::new(0)).unwrap();
assert_eq!(recovered_hash, hash);
assert_eq!(recovered_count, count);
}
#[test]
fn checkpoint_index_find_nearest() {
let mut index = CheckpointIndex::new();
index.add(Offset::new(99));
index.add(Offset::new(199));
index.add(Offset::new(299));
assert_eq!(index.find_nearest(Offset::new(199)), Some(Offset::new(199)));
assert_eq!(index.find_nearest(Offset::new(150)), Some(Offset::new(99)));
assert_eq!(index.find_nearest(Offset::new(250)), Some(Offset::new(199)));
assert_eq!(index.find_nearest(Offset::new(50)), None);
assert_eq!(index.find_nearest(Offset::new(500)), Some(Offset::new(299)));
}
#[test]
fn checkpoint_index_empty() {
let index = CheckpointIndex::new();
assert!(index.is_empty());
assert_eq!(index.find_nearest(Offset::new(100)), None);
}
}