use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::io::Write;
use chrono::{SecondsFormat, Utc};
use crate::error::{Result, SclsError};
use crate::hash::{Blake2b, HASH_SIZE, MerkleTree};
use crate::records::{ChunkFormat, Header, Manifest, NamespaceInfo, RecordType, Summary};
/// Tool name recorded in the manifest summary when the builder is not given one.
pub const DEFAULT_TOOL: &str = "cardano-scrawls";
/// Default payload size (16 MiB) at which a buffered chunk is flushed.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 16 << 20;
/// Sequence number assigned to the first chunk of every namespace.
pub const INITIAL_CHUNK_SEQNO: u64 = 0;
/// Builder for [`SclsWriter`].
///
/// `output` and `slot_no` must be supplied before building; the remaining
/// settings start from the values chosen by the `Default` implementation.
#[derive(Debug)]
pub struct SclsWriterBuilder<W>
where
    W: Write,
{
    /// Sink that will receive the encoded stream.
    output: Option<W>,
    /// Slot number to record in the manifest.
    slot_no: Option<u64>,
    /// Tool name to record in the manifest summary.
    tool: String,
    /// Optional free-form comment for the manifest summary.
    comment: Option<String>,
    /// Payload size in bytes at which the buffered chunk gets flushed.
    max_chunk_size: usize,
}
impl<W: Write> SclsWriterBuilder<W> {
    /// Starts a builder holding the `Default` settings.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the writer that receives the encoded stream (required).
    pub fn output(self, writer: W) -> Self {
        Self {
            output: Some(writer),
            ..self
        }
    }

    /// Sets the slot number recorded in the manifest (required).
    pub fn slot_no(self, slot_no: u64) -> Self {
        Self {
            slot_no: Some(slot_no),
            ..self
        }
    }

    /// Overrides the tool name recorded in the manifest summary.
    pub fn tool<S: AsRef<str>>(self, tool: S) -> Self {
        Self {
            tool: tool.as_ref().to_owned(),
            ..self
        }
    }

    /// Attaches a free-form comment to the manifest summary.
    pub fn comment<S: AsRef<str>>(self, comment: S) -> Self {
        Self {
            comment: Some(comment.as_ref().to_owned()),
            ..self
        }
    }

    /// Overrides the payload size at which buffered chunks are flushed.
    pub fn max_chunk_size(self, max_chunk_size: usize) -> Self {
        Self {
            max_chunk_size,
            ..self
        }
    }

    /// Checks that the required settings are present, writes the header to
    /// the output and returns a writer ready to accept entries.
    ///
    /// # Errors
    ///
    /// Returns [`SclsError::WriterBuilderMissingOutput`] when no output was
    /// set and [`SclsError::WriterBuilderMissingSlotNo`] when no slot number
    /// was set, checked in that order. Errors from writing the header are
    /// propagated.
    pub fn build(self) -> Result<SclsWriter<W>> {
        let Self {
            output,
            slot_no,
            tool,
            comment,
            max_chunk_size,
        } = self;
        let Some(output) = output else {
            return Err(SclsError::WriterBuilderMissingOutput);
        };
        let Some(slot_no) = slot_no else {
            return Err(SclsError::WriterBuilderMissingSlotNo);
        };
        let mut writer = SclsWriter {
            output,
            slot_no,
            tool,
            comment,
            max_chunk_size,
            prev_namespace: None,
            prev_ns_entry_key: None,
            chunk_seqno: INITIAL_CHUNK_SEQNO,
            current_chunk: None,
            ns_state: BTreeMap::new(),
        };
        Header::current().write(&mut writer.output)?;
        Ok(writer)
    }
}
impl<W: Write> Default for SclsWriterBuilder<W> {
    /// No output and no slot number yet, the default tool name, no comment
    /// and [`DEFAULT_MAX_CHUNK_SIZE`] as the chunk flush threshold.
    fn default() -> Self {
        let tool = String::from(DEFAULT_TOOL);
        Self {
            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
            comment: None,
            tool,
            slot_no: None,
            output: None,
        }
    }
}
/// Running totals for one namespace, accumulated while its entries are written.
#[derive(Debug)]
struct NamespaceState {
    /// Length shared by every key of the namespace; fixed by its first entry.
    key_len: Option<u32>,
    /// Number of chunk records emitted for the namespace.
    chunks: u64,
    /// Number of entries contained in those chunk records.
    entries: u64,
    /// Merkle tree over the leaf digests of the namespace's entries.
    merkle: MerkleTree,
}

impl NamespaceState {
    /// Fresh state: key length unknown, no chunks, no entries, empty tree.
    fn new() -> Self {
        let merkle = MerkleTree::new();
        Self {
            key_len: None,
            chunks: 0,
            entries: 0,
            merkle,
        }
    }
}
/// Entries buffered for the chunk currently being assembled.
#[derive(Debug)]
struct ChunkState {
    /// Encoded entries: per entry a big-endian `u32` length, then key, then value.
    payload: Vec<u8>,
    /// Running hash fed with the digest of every entry added to the chunk.
    hasher: Blake2b,
    /// Number of entries encoded in `payload`.
    entries: u32,
}

impl ChunkState {
    /// An empty chunk with a freshly created raw hasher.
    fn new() -> Self {
        let hasher = Blake2b::new_raw();
        Self {
            payload: Vec::new(),
            hasher,
            entries: 0,
        }
    }
}
/// Streaming writer for SCLS output.
///
/// Obtained through [`SclsWriter::builder`]; a successful `build` has already
/// written the header. Entries are appended with `write_entry`, and
/// `finalise` flushes the last chunk and writes the manifest.
#[derive(Debug)]
pub struct SclsWriter<W> {
/// Sink receiving the header, chunk records and manifest.
output: W,
/// Slot number written into the manifest.
slot_no: u64,
/// Tool name written into the manifest summary.
tool: String,
/// Optional comment written into the manifest summary.
comment: Option<String>,
/// Payload size in bytes at which the buffered chunk is flushed before the
/// next entry is appended.
max_chunk_size: usize,
/// Namespace currently being written; `None` until the first `write_entry`.
prev_namespace: Option<String>,
/// Last key successfully written in the current namespace; reset when the
/// namespace changes.
prev_ns_entry_key: Option<Vec<u8>>,
/// Sequence number for the next chunk record; restarts at
/// `INITIAL_CHUNK_SEQNO` for each namespace.
chunk_seqno: u64,
/// Chunk being assembled, if any.
current_chunk: Option<ChunkState>,
/// Per-namespace counters and Merkle trees, iterated in name order when the
/// manifest is built.
ns_state: BTreeMap<String, NamespaceState>,
}
impl<W: Write> SclsWriter<W> {
    /// Returns a new [`SclsWriterBuilder`] for configuring a writer.
    pub fn builder() -> SclsWriterBuilder<W> {
        SclsWriterBuilder::new()
    }

    /// Appends one key/value entry under `namespace`.
    ///
    /// Ordering rules:
    /// - Namespaces must arrive in non-decreasing byte-wise order.
    /// - Keys must be strictly increasing within a namespace.
    /// - Every key of a namespace must have the length of its first key.
    ///
    /// Switching to a new namespace flushes the buffered chunk and restarts
    /// chunk sequence numbers at [`INITIAL_CHUNK_SEQNO`].
    ///
    /// The buffered chunk is flushed *before* appending once its payload has
    /// reached `max_chunk_size` bytes. The limit is therefore soft: a chunk
    /// may exceed it by at most one entry.
    ///
    /// # Errors
    ///
    /// - [`SclsError::NonMonotonicNamespace`] if `namespace` sorts before the
    ///   previous namespace.
    /// - [`SclsError::NonStrictlyMonotonicKeys`] if `key` is not greater than
    ///   the previous key of the same namespace.
    /// - [`SclsError::InconsistentKeyLength`] if `key` differs in length from
    ///   earlier keys of the namespace.
    /// - [`SclsError::WireLengthOverflow`] if a length does not fit in `u32`.
    /// - Any I/O error raised while writing a flushed chunk.
    pub fn write_entry(&mut self, namespace: &str, key: &[u8], value: &[u8]) -> Result<()> {
        match &self.prev_namespace {
            None => {
                self.prev_namespace = Some(namespace.to_string());
                self.ns_state
                    .insert(namespace.to_string(), NamespaceState::new());
            }
            Some(prev_namespace) => {
                let order = prev_namespace.as_str().cmp(namespace);
                match order {
                    Ordering::Equal => {}
                    Ordering::Less => {
                        // Chunks never span namespaces, so emit whatever is
                        // buffered for the previous one before switching.
                        self.flush_chunk()?;
                        self.prev_namespace = Some(namespace.to_string());
                        self.prev_ns_entry_key = None;
                        self.chunk_seqno = INITIAL_CHUNK_SEQNO;
                        self.ns_state
                            .insert(namespace.to_string(), NamespaceState::new());
                    }
                    Ordering::Greater => {
                        return Err(SclsError::NonMonotonicNamespace {
                            previous: prev_namespace.clone(),
                            found: namespace.to_string(),
                        });
                    }
                }
            }
        }

        if let Some(prev_key) = &self.prev_ns_entry_key
            && prev_key.as_slice() >= key
        {
            return Err(SclsError::NonStrictlyMonotonicKeys {
                namespace: namespace.to_string(),
            });
        }

        // Size check happens before appending, so the limit is soft.
        if let Some(chunk) = &self.current_chunk
            && chunk.payload.len() >= self.max_chunk_size
        {
            self.flush_chunk()?;
        }
        if self.current_chunk.is_none() {
            self.current_chunk = Some(ChunkState::new());
        }
        self.update_chunk(key, value)?;
        // Record the key only once the entry has actually been accepted.
        self.prev_ns_entry_key = Some(key.to_vec());
        Ok(())
    }

    /// Validates `key` against the namespace's key length and appends the
    /// entry to the current chunk.
    ///
    /// The entry is encoded as a big-endian `u32` holding
    /// `key.len() + value.len()`, followed by the key and value bytes. A leaf
    /// digest over namespace, key and value is fed to both the chunk hasher
    /// and the namespace Merkle tree.
    ///
    /// All fallible checks run before the chunk is touched. The one exception
    /// is that the first key of a namespace fixes the namespace key length.
    ///
    /// # Panics
    ///
    /// Panics if no namespace, namespace state or current chunk is set up.
    /// `write_entry` establishes all three before calling this.
    fn update_chunk(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        let Some(namespace) = &self.prev_namespace else {
            unreachable!("must only be called when the namespace is set");
        };
        let Some(chunk) = &mut self.current_chunk else {
            unreachable!("must only be called when the current chunk is set");
        };
        let Some(ns_state) = self.ns_state.get_mut(namespace) else {
            unreachable!("must only be called when the namespace state is set")
        };
        match ns_state.key_len {
            Some(key_len) => {
                if key.len() != key_len as usize {
                    return Err(SclsError::InconsistentKeyLength {
                        namespace: namespace.clone(),
                        expected: key_len as usize,
                        found: key.len(),
                    });
                }
            }
            None => {
                // The first key of a namespace fixes the key length for the
                // rest of that namespace.
                let key_len = u32::try_from(key.len())
                    .map_err(|_| SclsError::WireLengthOverflow("entry key".into()))?;
                ns_state.key_len = Some(key_len);
            }
        }
        // Build the error lazily: this runs once per entry.
        let entry_len = u32::try_from(
            key.len()
                .checked_add(value.len())
                .ok_or_else(|| SclsError::WireLengthOverflow("entry".into()))?,
        )
        .map_err(|_| SclsError::WireLengthOverflow("entry".into()))?;
        let entry_digest = Blake2b::new_leaf()
            .update(namespace.as_bytes())
            .update(key)
            .update(value)
            .as_digest();
        chunk.hasher.update(entry_digest.as_bytes());
        ns_state.merkle.add_leaf(entry_digest);
        chunk.entries += 1;
        chunk.payload.extend_from_slice(&entry_len.to_be_bytes());
        chunk.payload.extend_from_slice(key);
        chunk.payload.extend_from_slice(value);
        Ok(())
    }

    /// Writes the buffered chunk as a chunk record and updates the
    /// per-namespace counters.
    ///
    /// This is a no-op when there is no buffered chunk or the chunk holds no
    /// entries. An empty chunk is left behind when an entry is rejected right
    /// after a size-triggered flush. It must not be emitted as a zero-entry
    /// record, nor counted in the namespace's chunk total.
    ///
    /// Record layout (all integers big-endian):
    /// 1. `u32` length of everything after this field.
    /// 2. Record-type byte.
    /// 3. `u64` sequence number.
    /// 4. Chunk-format byte.
    /// 5. `u32` namespace length, then the namespace bytes.
    /// 6. `u32` key length.
    /// 7. Entry payload.
    /// 8. `u32` entry count.
    /// 9. Chunk digest.
    fn flush_chunk(&mut self) -> Result<()> {
        let Some(chunk) = self.current_chunk.take() else {
            return Ok(());
        };
        if chunk.entries == 0 {
            return Ok(());
        }
        let Some(namespace) = &self.prev_namespace else {
            unreachable!("a non-empty chunk implies the namespace is set");
        };
        let Some(ns_state) = self.ns_state.get_mut(namespace) else {
            unreachable!("a non-empty chunk implies the namespace state is set")
        };
        let len_ns = u32::try_from(namespace.len())
            .map_err(|_| SclsError::WireLengthOverflow("namespace".into()))?;
        let len_key = ns_state
            .key_len
            .expect("a non-empty chunk implies the namespace key length is set");
        let len_entries = u32::try_from(chunk.payload.len())
            .map_err(|_| SclsError::WireLengthOverflow("entries".into()))?;
        // Field sizes in record order (see the layout above).
        let len_record = [
            1,
            8,
            1,
            4,
            len_ns,
            4,
            len_entries,
            4,
            HASH_SIZE as u32,
        ]
        .iter()
        .try_fold(0u32, |acc, &x| acc.checked_add(x))
        .ok_or_else(|| SclsError::WireLengthOverflow("record".into()))?;
        self.output.write_all(&len_record.to_be_bytes())?;
        self.output.write_all(&[RecordType::Chunk.to_byte()])?;
        self.output.write_all(&self.chunk_seqno.to_be_bytes())?;
        self.output.write_all(&[ChunkFormat::Raw.to_byte()])?;
        self.output.write_all(&len_ns.to_be_bytes())?;
        self.output.write_all(namespace.as_bytes())?;
        self.output.write_all(&len_key.to_be_bytes())?;
        self.output.write_all(&chunk.payload)?;
        self.output.write_all(&chunk.entries.to_be_bytes())?;
        self.output.write_all(chunk.hasher.as_digest().as_bytes())?;
        self.chunk_seqno += 1;
        ns_state.chunks += 1;
        ns_state.entries += u64::from(chunk.entries);
        Ok(())
    }

    /// Flushes any buffered entries, writes the manifest and flushes the
    /// output. Consumes the writer.
    ///
    /// The manifest root hash is the Merkle root over one leaf per namespace,
    /// in namespace order. Each leaf hashes that namespace's own Merkle root.
    /// `created_at` is the current UTC time in RFC 3339 form with second
    /// precision and a `Z` suffix. `prev_manifest` is written as `0`.
    ///
    /// # Errors
    ///
    /// Propagates errors from writing the last chunk or the manifest. Also
    /// propagates errors from the final flush of the output; a buffered
    /// writer would otherwise discard those when dropped.
    pub fn finalise(mut self) -> Result<()> {
        self.flush_chunk()?;
        let mut global_digest = MerkleTree::new();
        let mut namespace_info = Vec::new();
        let mut total_entries = 0u64;
        let mut total_chunks = 0u64;
        for (namespace, info) in self.ns_state {
            let ns_digest = info.merkle.root();
            let ns_leaf = Blake2b::new_leaf().update(ns_digest.as_bytes()).as_digest();
            global_digest.add_leaf(ns_leaf);
            total_entries += info.entries;
            total_chunks += info.chunks;
            namespace_info.push(NamespaceInfo {
                entries_count: info.entries,
                chunks_count: info.chunks,
                name: namespace,
                digest: ns_digest,
            });
        }
        let created_at = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
        let manifest = Manifest {
            slot_no: self.slot_no,
            total_entries,
            total_chunks,
            root_hash: global_digest.root(),
            namespace_info,
            prev_manifest: 0,
            summary: Summary {
                created_at,
                tool: self.tool,
                comment: self.comment,
            },
        };
        manifest.write(&mut self.output)?;
        self.output.flush()?;
        Ok(())
    }
}
// Tests for the writer. Most of them write into an in-memory buffer and, where
// output matters, read it back through the reader API.
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
use crate::reader::{Record, SclsReader, VerifyOptions};
use crate::records::Entry;
use proptest::prelude::*;
// Two entries with increasing keys in one namespace are accepted.
#[test]
fn accept_same_namespace() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("test", b"a", b"a").is_ok());
assert!(writer.write_entry("test", b"b", b"a").is_ok());
Ok(())
}
// Moving on to a namespace that sorts later is accepted.
#[test]
fn accept_monotonic_namespace() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("a", b"a", b"a").is_ok());
assert!(writer.write_entry("b", b"a", b"a").is_ok());
Ok(())
}
// Moving to a namespace that sorts earlier is rejected.
#[test]
fn forbid_decreasing_namespace() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("b", b"a", b"a").is_ok());
let Err(err) = writer.write_entry("a", b"a", b"a") else {
panic!("descending namespace should be forbidden")
};
assert!(matches!(err, SclsError::NonMonotonicNamespace { .. }));
Ok(())
}
// Returning to a namespace that has already been left is rejected, even when
// the key would be valid for that namespace.
#[test]
fn forbid_previous_namespace() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("a", b"a", b"a").is_ok());
assert!(writer.write_entry("b", b"a", b"a").is_ok());
let Err(err) = writer.write_entry("a", b"b", b"a") else {
panic!("descending namespace should be forbidden");
};
assert!(matches!(err, SclsError::NonMonotonicNamespace { .. }));
Ok(())
}
// Strictly increasing keys are accepted within each namespace.
#[test]
fn accept_strictly_increasing_keys() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("a", b"a", b"a").is_ok());
assert!(writer.write_entry("a", b"b", b"a").is_ok());
assert!(writer.write_entry("b", b"a", b"a").is_ok());
assert!(writer.write_entry("b", b"b", b"a").is_ok());
Ok(())
}
// Repeating a key within a namespace is rejected.
#[test]
fn forbid_equal_keys() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("test", b"a", b"a").is_ok());
assert!(matches!(
writer.write_entry("test", b"a", b"a"),
Err(SclsError::NonStrictlyMonotonicKeys { .. })
));
Ok(())
}
// A key smaller than its predecessor in the same namespace is rejected.
#[test]
fn forbid_decreasing_keys() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("test", b"b", b"a").is_ok());
assert!(matches!(
writer.write_entry("test", b"a", b"a"),
Err(SclsError::NonStrictlyMonotonicKeys { .. })
));
Ok(())
}
// Key ordering is tracked per namespace: a new namespace may start with a key
// smaller than the last key of the previous namespace.
#[test]
fn key_order_resets_with_namespace() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("a", b"b", b"a").is_ok());
assert!(writer.write_entry("b", b"a", b"a").is_ok());
Ok(())
}
// All keys in a namespace must share the length of its first key.
#[test]
fn forbid_inconsistent_key_length_within_namespace() -> Result<()> {
let buf = Vec::new();
let mut writer = SclsWriter::builder().output(buf).slot_no(0).build()?;
assert!(writer.write_entry("test", b"a", b"a").is_ok());
assert!(matches!(
writer.write_entry("test", b"foo", b"a"),
Err(SclsError::InconsistentKeyLength { .. })
));
Ok(())
}
// Each entry encodes to 6 bytes (4-byte length + 1-byte key + 1-byte value).
// After the first entry the payload is 6 < 12, so the second entry joins the
// same chunk. The writer is never finalised, so only the header reaches `buf`.
#[test]
fn chunk_size_below_threshold() -> Result<()> {
let mut buf = Vec::new();
let mut writer = SclsWriter::builder()
.output(&mut buf)
.slot_no(0)
.max_chunk_size(12)
.build()?;
writer.write_entry("test", b"a", b"a")?; writer.write_entry("test", b"b", b"a")?;
let mut cursor = Cursor::new(buf.clone());
assert!(matches!(
Record::read_next(&mut cursor)?,
Some(Record::Header(_))
));
assert!(Record::read_next(&mut cursor)?.is_none());
Ok(())
}
// The first entry encodes to 31 bytes (4 + 1 + 26), already at or above the
// 12-byte limit, so writing the second entry flushes a chunk holding only the
// first. The second entry stays buffered (no finalise), so exactly one chunk
// follows the header.
#[test]
fn chunk_size_above_threshold() -> Result<()> {
let key = b"a";
let value = b"this is an oversized entry";
let mut buf = Vec::new();
let mut writer = SclsWriter::builder()
.output(&mut buf)
.slot_no(0)
.max_chunk_size(12)
.build()?;
writer.write_entry("test", key, value)?; writer.write_entry("test", b"b", b"a")?;
let mut cursor = Cursor::new(buf.clone());
assert!(matches!(
Record::read_next(&mut cursor)?,
Some(Record::Header(_))
));
let Some(Record::Chunk(chunk)) = Record::read_next(&mut cursor)? else {
panic!("expected chunk");
};
assert!(Record::read_next(&mut cursor)?.is_none());
chunk.for_each_entry(&mut cursor, |reader, key_len, val_len| {
let entry = Entry::materialise(reader, key_len, val_len)?;
assert_eq!(entry.key, key);
assert_eq!(entry.value, value);
Ok(())
})?;
Ok(())
}
// Strategy: 1..=20 distinct (namespace, single-byte key) pairs collected in a
// BTreeSet, so they come out sorted by namespace then key, which is a valid
// write order. Each pair gets a random value of 0..=16 bytes.
prop_compose! {
fn valid_entry_writes()
(
pairs in proptest::collection::btree_set(("[a-z]+", any::<u8>()), 1..=20),
)
(
values in proptest::collection::vec(
proptest::collection::vec(any::<u8>(), 0..=16usize),
pairs.len()..=pairs.len(),
),
pairs in Just(pairs),
)
-> Vec<(String, Vec<u8>, Vec<u8>)> {
pairs.into_iter()
.zip(values)
.map(|((ns, key), value)| (ns, vec![key], value))
.collect()
}
}
proptest! {
// Writes the generated entries, then one entry in namespace "最终". Its UTF-8
// lead byte is above any lowercase ASCII letter, so it sorts last and forces
// the final generated namespace's chunk out. The "最终" chunk itself stays
// buffered (no finalise), which is why it is absent from `written_by_ns`.
// The test then checks that:
// - chunk sequence numbers restart per namespace;
// - every chunk verifies;
// - the entries read back match those written.
#[test]
fn chunk_roundtrip(entries in valid_entry_writes()) {
let mut buf = Vec::new();
let mut writer = SclsWriter::builder().output(&mut buf).slot_no(0).build()?;
let mut written_by_ns: BTreeMap<String, Vec<Entry>> = BTreeMap::new();
let mut read_by_ns: BTreeMap<String, Vec<Entry>> = BTreeMap::new();
for (namespace, key, value) in &entries {
written_by_ns.entry(namespace.clone())
.or_default()
.push(Entry { key: key.clone(), value: value.clone() });
writer.write_entry(namespace.as_str(), key, value)?;
}
writer.write_entry("最终", b"a", b"a")?;
let mut cursor = Cursor::new(buf.clone());
let header_first = matches!(Record::read_next(&mut cursor)?, Some(Record::Header(_)));
prop_assert!(header_first);
let mut seqno = INITIAL_CHUNK_SEQNO;
let mut prev_namespace = None;
while let Some(Record::Chunk(chunk)) = Record::read_next(&mut cursor)? {
// Fresh cursor over the whole buffer for verification and entry iteration;
// the outer cursor keeps the record position for the loop condition.
// Presumably `chunk` carries the offsets needed to locate its payload.
// TODO confirm against the reader.
let mut cursor = Cursor::new(buf.clone());
if let Some(previous) = prev_namespace && previous != chunk.namespace {
seqno = INITIAL_CHUNK_SEQNO;
}
prop_assert_eq!(chunk.seqno, seqno);
seqno += 1;
prop_assert!(chunk.verify(&mut cursor).is_ok());
chunk.for_each_entry(&mut cursor, |reader, key_len, val_len| {
let namespace = chunk.namespace.clone();
let entry = Entry::materialise(reader, key_len, val_len)?;
read_by_ns.entry(namespace)
.or_default()
.push(entry);
Ok(())
})?;
prev_namespace = Some(chunk.namespace);
}
prop_assert_eq!(read_by_ns, written_by_ns);
}
// Writes and finalises the generated entries, then runs the reader's full
// verification over the complete stream.
#[test]
fn roundtrip_verification(entries in valid_entry_writes()) {
let mut buf = Vec::new();
let mut writer = SclsWriter::builder().output(&mut buf).slot_no(0).build()?;
for (namespace, key, value) in &entries {
writer.write_entry(namespace.as_str(), key, value)?;
}
writer.finalise()?;
let cursor = Cursor::new(buf);
let mut reader = SclsReader::new(cursor);
prop_assert!(reader.verify(VerifyOptions::full()).is_ok());
}
}
}