use crate::error::{Error, Result};
use crate::merkle::Hash;
use crate::tile::{Tile, recompute_root, tiles_for_size};
use std::collections::BTreeMap;
use std::ops::Range;
pub use crate::coniks::Namespace;
// Context strings handed to `sha3_512_with_context` by `DedupKey::derive`.
// They are part of the hash input, so editing either literal changes every
// key derived under it.

/// Context used by `DedupKey::from_record` (keys derived from record payloads).
const DEDUP_CONTENT_CONTEXT: &str = "metamorphic-log/ingest-dedup-content/v1";
/// Context used by `DedupKey::from_token` (keys derived from caller-supplied tokens).
const DEDUP_TOKEN_CONTEXT: &str = "metamorphic-log/ingest-dedup-token/v1";
/// Appends `bytes` to `out` as a length-prefixed field.
///
/// The field is encoded as the byte length in 4 big-endian bytes, followed by
/// the bytes themselves. Existing contents of `out` are left untouched.
fn push_lp(out: &mut Vec<u8>, bytes: &[u8]) {
    // NOTE(review): `as u32` wraps for inputs of 4 GiB or more; kept as-is
    // because changing the prefix would change every digest built on it.
    let len_prefix: [u8; 4] = (bytes.len() as u32).to_be_bytes();
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(bytes);
}
#[derive(Debug, Clone, Default)]
pub struct Sequencer {
next: BTreeMap<String, u64>,
}
impl Sequencer {
#[must_use]
pub fn new() -> Self {
Self {
next: BTreeMap::new(),
}
}
#[must_use]
pub fn peek(&self, namespace: &Namespace) -> u64 {
self.next.get(namespace.as_str()).copied().unwrap_or(0)
}
pub fn next(&mut self, namespace: &Namespace) -> u64 {
let slot = self.next.entry(namespace.as_str().to_string()).or_insert(0);
let assigned = *slot;
*slot = assigned
.checked_add(1)
.expect("sequence position overflowed u64");
assigned
}
pub fn reserve(&mut self, namespace: &Namespace, count: u64) -> Result<Range<u64>> {
let slot = self.next.entry(namespace.as_str().to_string()).or_insert(0);
let start = *slot;
let end = start
.checked_add(count)
.ok_or_else(|| Error::SequenceOverflow {
namespace: namespace.as_str().to_string(),
})?;
*slot = end;
Ok(start..end)
}
pub fn resume_from(&mut self, namespace: &Namespace, next: u64) -> Result<()> {
let current = self.peek(namespace);
if next < current {
return Err(Error::SequenceRegression {
namespace: namespace.as_str().to_string(),
current,
requested: next,
});
}
self.next.insert(namespace.as_str().to_string(), next);
Ok(())
}
}
/// A 64-byte deduplication key scoped to a namespace.
///
/// Keys are the output of `sha3_512_with_context` over the length-prefixed
/// namespace followed by the length-prefixed input bytes. The context string
/// differs between record-derived and token-derived keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DedupKey([u8; 64]);

impl DedupKey {
    /// Derives the key for a record `payload` in `namespace`, using the
    /// content context.
    #[must_use]
    pub fn from_record(namespace: &Namespace, payload: &[u8]) -> Self {
        Self::derive(DEDUP_CONTENT_CONTEXT, namespace, payload)
    }

    /// Derives the key for a caller-supplied `token` in `namespace`, using the
    /// token context.
    #[must_use]
    pub fn from_token(namespace: &Namespace, token: &[u8]) -> Self {
        Self::derive(DEDUP_TOKEN_CONTEXT, namespace, token)
    }

    /// Hashes `namespace` and `payload` under `context`.
    ///
    /// Both fields are length-prefixed before hashing, so the boundary between
    /// them is part of the hashed input.
    fn derive(context: &str, namespace: &Namespace, payload: &[u8]) -> Self {
        let scope = namespace.as_str().as_bytes();
        // Two 4-byte length prefixes plus both fields.
        let mut preimage = Vec::with_capacity(8 + scope.len() + payload.len());
        push_lp(&mut preimage, scope);
        push_lp(&mut preimage, payload);
        let digest = metamorphic_crypto::hash::sha3_512_with_context(context, &preimage);
        Self(digest)
    }

    /// Borrows the raw 64 key bytes.
    #[must_use]
    pub fn as_bytes(&self) -> &[u8; 64] {
        let Self(bytes) = self;
        bytes
    }

    /// Returns the key bytes encoded by the crate's `hex_encode` helper.
    #[must_use]
    pub fn to_hex(&self) -> String {
        crate::encoding::hex_encode(self.as_bytes())
    }
}
/// The tiles that must be written when a log grows from one size to another,
/// as computed by `plan_flush`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlushPlan {
/// Every tile selected by `tiles_to_flush` for the size change.
pub tiles: Vec<Tile>,
/// The level-0 subset of `tiles`.
pub entry_bundles: Vec<Tile>,
}
/// Returns the exclusive end, expressed as a leaf count, of the range `tile`
/// reaches.
///
/// The value is `(index * 256 + width) * 256^level`, computed in `u128`. The
/// fan-out of 256 is hard-coded here.
fn tile_leaf_end(tile: &Tile) -> u128 {
    // Scale factor for one hash position at this tile's level.
    let leaves_per_hash = 256u128.pow(u32::from(tile.level()));
    // Hash positions at this level up to the end of the tile.
    let hashes_through_tile = u128::from(tile.index()) * 256 + u128::from(tile.width());
    hashes_through_tile * leaves_per_hash
}
/// Reports whether `tile` reaches past `old_size` leaves, i.e. whether its
/// leaf end is strictly greater than the previous size.
fn tile_is_dirty(tile: &Tile, old_size: u64) -> bool {
    let previously_covered = u128::from(old_size);
    previously_covered < tile_leaf_end(tile)
}
/// Lists the tiles of a log of `new_size` leaves that reach past `old_size`.
///
/// Tiles are taken from `tiles_for_size(new_size)` in the order it yields
/// them; only those for which `tile_is_dirty` holds are kept. Equal sizes
/// produce an empty list.
///
/// # Errors
///
/// Returns [`Error::SizeRegression`] when `new_size` is smaller than
/// `old_size`.
pub fn tiles_to_flush(old_size: u64, new_size: u64) -> Result<Vec<Tile>> {
    if old_size > new_size {
        return Err(Error::SizeRegression {
            size1: old_size,
            size2: new_size,
        });
    }

    let mut dirty = Vec::new();
    for tile in tiles_for_size(new_size) {
        if tile_is_dirty(&tile, old_size) {
            dirty.push(tile);
        }
    }
    Ok(dirty)
}
/// Lists the level-0 tiles among those `tiles_to_flush` selects for the same
/// size change.
///
/// # Errors
///
/// Propagates the error from `tiles_to_flush` (a size regression).
pub fn entry_bundles_to_flush(old_size: u64, new_size: u64) -> Result<Vec<Tile>> {
    let mut bundles = tiles_to_flush(old_size, new_size)?;
    // Keep only the level-0 tiles, preserving their order.
    bundles.retain(|tile| tile.level() == 0);
    Ok(bundles)
}
/// Builds the full [`FlushPlan`] for growing a log from `old_size` to
/// `new_size`: every tile selected by `tiles_to_flush`, plus the level-0
/// subset of those tiles as `entry_bundles`.
///
/// # Errors
///
/// Propagates the error from `tiles_to_flush` (a size regression).
pub fn plan_flush(old_size: u64, new_size: u64) -> Result<FlushPlan> {
    tiles_to_flush(old_size, new_size).map(|tiles| {
        // Collected before `tiles` is moved into the plan.
        let entry_bundles = tiles
            .iter()
            .copied()
            .filter(|tile| tile.level() == 0)
            .collect();
        FlushPlan {
            tiles,
            entry_bundles,
        }
    })
}
/// Read-side storage abstraction: fetches stored bytes addressed by [`Tile`].
pub trait TileReader {
/// Error the backing store reports when a read fails.
type Error;
/// Returns the raw bytes stored for `tile`.
///
/// `recompute_root_via` hands these bytes to `Tile::hashes` for decoding.
fn read_tile(&self, tile: &Tile) -> core::result::Result<Vec<u8>, Self::Error>;
/// Returns the raw bytes of the entry bundle addressed by `tile`.
///
/// NOTE(review): nothing in the visible part of this file calls this method
/// outside the test stub, so the expected bundle format is not shown here.
fn read_entry_bundle(&self, tile: &Tile) -> core::result::Result<Vec<u8>, Self::Error>;
}
/// Failure modes of the tile read path, as returned by `recompute_root_via`.
///
/// `E` is the error type of the [`TileReader`] in use.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadPathError<E> {
/// The reader failed to fetch a tile; wraps the reader's own error.
Backend(E),
/// A fetched tile could not be decoded by `Tile::hashes`; wraps that error.
Tile(Error),
}
/// Recomputes the root for a log of `size` leaves from stored level-0 tiles.
///
/// Every level-0 tile listed by `tiles_for_size(size)` is fetched through
/// `reader`, decoded with `Tile::hashes`, and its hashes are appended in
/// order; the combined list is then passed to `recompute_root`. When there
/// are no level-0 tiles, nothing is read and `recompute_root` receives an
/// empty slice.
///
/// # Errors
///
/// * [`ReadPathError::Backend`] if the reader fails to return a tile.
/// * [`ReadPathError::Tile`] if a tile's bytes cannot be decoded.
///
/// The first failure stops the walk.
pub fn recompute_root_via<R: TileReader>(
    reader: &R,
    size: u64,
) -> core::result::Result<Hash, ReadPathError<R::Error>> {
    let mut leaf_hashes = Vec::new();
    for tile in tiles_for_size(size) {
        // Only level-0 tiles contribute hashes here.
        if tile.level() != 0 {
            continue;
        }
        let raw = match reader.read_tile(&tile) {
            Ok(bytes) => bytes,
            Err(backend) => return Err(ReadPathError::Backend(backend)),
        };
        let decoded = match tile.hashes(&raw) {
            Ok(hashes) => hashes,
            Err(malformed) => return Err(ReadPathError::Tile(malformed)),
        };
        leaf_hashes.extend_from_slice(&decoded);
    }
    Ok(recompute_root(&leaf_hashes))
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use crate::merkle::MerkleTree;
    use std::collections::HashMap;

    /// Parses a namespace literal, panicking if it is rejected.
    fn ns(s: &str) -> Namespace {
        Namespace::parse(s).unwrap()
    }

    #[test]
    fn sequencer_is_monotonic_and_per_namespace() {
        let mut sequencer = Sequencer::new();
        let alpha = ns("alpha");
        let beta = ns("beta");

        for expected in 0..2 {
            assert_eq!(sequencer.next(&alpha), expected);
        }
        // A second namespace starts from zero and does not disturb the first.
        assert_eq!(sequencer.next(&beta), 0);
        assert_eq!(sequencer.next(&alpha), 2);

        assert_eq!(sequencer.peek(&alpha), 3);
        assert_eq!(sequencer.peek(&beta), 1);
    }

    #[test]
    fn reserve_assigns_contiguous_block() {
        let mut sequencer = Sequencer::new();
        let alpha = ns("alpha");

        assert_eq!(sequencer.next(&alpha), 0);

        let block = sequencer.reserve(&alpha, 5).unwrap();
        assert_eq!(block, 1..6);
        assert_eq!(sequencer.next(&alpha), 6);

        // Reserving nothing yields an empty range at the current position.
        let empty = sequencer.reserve(&alpha, 0).unwrap();
        assert_eq!(empty, 7..7);
        assert_eq!(sequencer.peek(&alpha), 7);
    }

    #[test]
    fn resume_from_is_monotonic_safe() {
        let mut sequencer = Sequencer::new();
        let alpha = ns("alpha");

        for _ in 0..2 {
            sequencer.next(&alpha);
        }

        // Jumping forward is allowed.
        sequencer.resume_from(&alpha, 10).unwrap();
        assert_eq!(sequencer.next(&alpha), 10);

        // Resuming at the current position is allowed too.
        sequencer.resume_from(&alpha, 11).unwrap();

        // Moving backwards is rejected.
        let rejected = sequencer.resume_from(&alpha, 5);
        assert!(matches!(
            rejected,
            Err(Error::SequenceRegression {
                current: 11,
                requested: 5,
                ..
            })
        ));
    }

    #[test]
    fn dedup_key_is_deterministic_and_scoped() {
        let alpha = ns("alpha");
        let beta = ns("beta");

        let first = DedupKey::from_record(&alpha, b"hello");
        let repeat = DedupKey::from_record(&alpha, b"hello");
        assert_eq!(first, repeat);

        // A different payload or a different namespace gives a different key.
        assert_ne!(first, DedupKey::from_record(&alpha, b"world"));
        assert_ne!(first, DedupKey::from_record(&beta, b"hello"));

        // Record keys and token keys differ for identical bytes.
        let as_record = DedupKey::from_record(&alpha, b"x");
        let as_token = DedupKey::from_token(&alpha, b"x");
        assert_ne!(as_record, as_token);

        assert_eq!(first.to_hex().len(), 128);
    }

    #[test]
    fn flush_geometry_matches_tile_substrate() {
        let expected_tiles = tiles_for_size(70_000);
        let plan = plan_flush(0, 70_000).unwrap();

        assert_eq!(plan.tiles, expected_tiles);
        assert!(plan.entry_bundles.iter().all(|tile| tile.level() == 0));

        let expected_bundles = expected_tiles
            .iter()
            .filter(|tile| tile.level() == 0)
            .count();
        assert_eq!(plan.entry_bundles.len(), expected_bundles);
    }

    #[test]
    fn flush_only_touches_changed_tiles() {
        let dirty = tiles_to_flush(256, 512).unwrap();

        assert!(
            dirty
                .iter()
                .all(|t| t.level() != 0 || t.index() != 0 || t.is_partial()),
            "finalized level-0 tile 0 must not be reflushed"
        );
        assert!(
            dirty.iter().any(|t| t.level() == 0 && t.index() == 1),
            "new level-0 tile 1 must be flushed"
        );
        assert!(
            dirty.iter().any(|t| t.level() == 1),
            "grown level-1 partial tile must be flushed"
        );
    }

    #[test]
    fn flush_noop_when_size_unchanged() {
        let dirty = tiles_to_flush(1000, 1000).unwrap();
        assert!(dirty.is_empty());

        let empty_plan = FlushPlan {
            tiles: Vec::new(),
            entry_bundles: Vec::new(),
        };
        assert_eq!(plan_flush(1000, 1000).unwrap(), empty_plan);
    }

    #[test]
    fn flush_rejects_size_regression() {
        let shrunk = tiles_to_flush(500, 499);
        assert!(matches!(
            shrunk,
            Err(Error::SizeRegression {
                size1: 500,
                size2: 499
            })
        ));
        assert!(plan_flush(500, 499).is_err());
        assert!(entry_bundles_to_flush(500, 499).is_err());
    }

    /// In-memory [`TileReader`] keyed by tile path.
    struct MemReader {
        tiles: HashMap<String, Vec<u8>>,
    }

    impl MemReader {
        /// Stores, for each level-0 tile of a log of `size` leaves, the
        /// concatenated leaf hashes of `tree` that fall inside that tile.
        fn from_tree(tree: &MerkleTree, size: u64) -> Self {
            let tiles: HashMap<String, Vec<u8>> = tiles_for_size(size)
                .into_iter()
                .filter(|tile| tile.level() == 0)
                .map(|tile| {
                    let first_leaf = tile.index() * u64::from(crate::tile::TILE_WIDTH);
                    let mut bytes: Vec<u8> = Vec::new();
                    for offset in 0..u64::from(tile.width()) {
                        bytes.extend_from_slice(&tree.leaf_hash(first_leaf + offset).unwrap());
                    }
                    (tile.path(), bytes)
                })
                .collect();
            Self { tiles }
        }
    }

    impl TileReader for MemReader {
        type Error = String;

        fn read_tile(&self, tile: &Tile) -> core::result::Result<Vec<u8>, String> {
            match self.tiles.get(&tile.path()) {
                Some(bytes) => Ok(bytes.clone()),
                None => Err(format!("missing tile {}", tile.path())),
            }
        }

        fn read_entry_bundle(&self, tile: &Tile) -> core::result::Result<Vec<u8>, String> {
            // Looked up under the tile path; the error names the entries path.
            match self.tiles.get(&tile.path()) {
                Some(bytes) => Ok(bytes.clone()),
                None => Err(format!("missing entries {}", tile.entries_path())),
            }
        }
    }

    #[test]
    fn read_path_bridge_recomputes_checkpoint_root() {
        let mut tree = MerkleTree::new();
        for leaf in 0u32..1000 {
            tree.push(&leaf.to_be_bytes());
        }

        let reader = MemReader::from_tree(&tree, 1000);
        let recomputed = recompute_root_via(&reader, 1000).unwrap();
        assert_eq!(recomputed, tree.root());
    }

    #[test]
    fn read_path_bridge_empty_tree() {
        let reader = MemReader {
            tiles: HashMap::new(),
        };
        let recomputed = recompute_root_via(&reader, 0).unwrap();
        assert_eq!(recomputed, crate::merkle::empty_root());
    }

    #[test]
    fn read_path_bridge_surfaces_backend_error() {
        // No tiles are stored, so the first read fails in the backend.
        let reader = MemReader {
            tiles: HashMap::new(),
        };
        let outcome = recompute_root_via(&reader, 300);
        assert!(matches!(outcome, Err(ReadPathError::Backend(_))));
    }

    #[test]
    fn read_path_bridge_surfaces_malformed_tile() {
        let only_tile = tiles_for_size(1)[0];
        let mut tiles = HashMap::new();
        // 31 bytes is the deliberately malformed payload for this tile.
        tiles.insert(only_tile.path(), vec![0u8; 31]);
        let reader = MemReader { tiles };

        let outcome = recompute_root_via(&reader, 1);
        assert!(matches!(outcome, Err(ReadPathError::Tile(_))));
    }
}