use std::fs::{self, OpenOptions};
use std::io::{self, Write as IoWrite};
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use chrono::Datelike;
use crate::event::writer::shard_header;
use crate::lock::ShardLock;
/// Errors produced by shard management operations.
#[derive(Debug, thiserror::Error)]
pub enum ShardError {
    /// An underlying filesystem operation failed.
    #[error("shard I/O error: {0}")]
    Io(#[from] io::Error),

    /// Acquiring the shard lock failed (see `crate::lock::LockError`).
    #[error("lock error: {0}")]
    Lock(#[from] crate::lock::LockError),

    /// Creating the `.bones` directory layout (`events/`, `cache/`) failed.
    #[error("failed to initialize .bones directory: {0}")]
    InitFailed(io::Error),

    /// A filename did not match the shard naming scheme.
    #[error("invalid shard filename: {0}")]
    InvalidShardName(String),

    /// A shard file's contents failed validation.
    #[error("corrupted shard {path}: {reason}")]
    CorruptedShard {
        /// Path of the offending shard file.
        path: PathBuf,
        /// Human-readable description of what was wrong.
        reason: String,
    },
}
/// One problem found while checking a sealed shard against its manifest.
#[derive(Debug, Clone)]
pub struct ShardIntegrityIssue {
    /// File name of the affected shard, e.g. `2026-01.events`.
    pub shard_name: String,
    /// Human-readable description of the problem.
    pub problem: String,
}
/// Summary of a sealed shard, stored beside it as `YYYY-MM.manifest`.
///
/// Serialized as one `key: value` line per field (see [`ShardManifest::to_string_repr`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardManifest {
    /// File name of the shard this manifest describes, e.g. `2026-01.events`.
    pub shard_name: String,
    /// Number of event lines, as counted by `ShardManager::write_manifest`.
    pub event_count: u64,
    /// Size of the shard file in bytes when the manifest was written.
    pub byte_len: u64,
    /// Content hash of the shard, prefixed with the algorithm name (e.g. `blake3:<hex>`).
    pub file_hash: String,
}

impl ShardManifest {
    /// Serializes the manifest as newline-terminated `key: value` lines.
    #[must_use]
    pub fn to_string_repr(&self) -> String {
        let Self {
            shard_name,
            event_count,
            byte_len,
            file_hash,
        } = self;
        format!(
            "shard: {shard_name}\nevent_count: {event_count}\nbyte_len: {byte_len}\nfile_hash: {file_hash}\n"
        )
    }

    /// Parses the format produced by [`Self::to_string_repr`].
    ///
    /// Lines with unknown keys are ignored, and the last occurrence of a key wins. A later
    /// unparsable numeric value therefore clears an earlier valid one. Returns `None` if any
    /// field ends up missing.
    #[must_use]
    pub fn from_string_repr(s: &str) -> Option<Self> {
        let (mut shard_name, mut event_count, mut byte_len, mut file_hash) =
            (None, None, None, None);
        // Each line is split at its first ": " into key and value.
        for (key, value) in s.lines().filter_map(|line| line.split_once(": ")) {
            match key {
                "shard" => shard_name = Some(value.to_owned()),
                "event_count" => event_count = value.parse::<u64>().ok(),
                "byte_len" => byte_len = value.parse::<u64>().ok(),
                "file_hash" => file_hash = Some(value.to_owned()),
                _ => {}
            }
        }
        Some(Self {
            shard_name: shard_name?,
            event_count: event_count?,
            byte_len: byte_len?,
            file_hash: file_hash?,
        })
    }
}
/// Manages the monthly event shards stored under a `.bones` directory.
///
/// Holds only the root path, so it is cheap to clone; every operation goes to the filesystem.
#[derive(Debug, Clone)]
pub struct ShardManager {
    /// Root `.bones` directory; `events/`, `cache/` and the `lock` file live beneath it.
    bones_dir: PathBuf,
}
impl ShardManager {
/// Creates a manager rooted at `bones_dir`. Nothing is touched on disk.
#[must_use]
pub fn new(bones_dir: impl Into<PathBuf>) -> Self {
    let bones_dir: PathBuf = bones_dir.into();
    Self { bones_dir }
}
/// Directory holding the `YYYY-MM.events` shards and their manifests.
#[must_use]
pub fn events_dir(&self) -> PathBuf {
    let mut dir = self.bones_dir.clone();
    dir.push("events");
    dir
}
/// Path of the lock file that `append` acquires to serialize writers.
#[must_use]
pub fn lock_path(&self) -> PathBuf {
    self.bones_dir.as_path().join("lock")
}
/// Path of the file persisting the last issued timestamp (`cache/clock`).
#[must_use]
pub fn clock_path(&self) -> PathBuf {
    let mut path = self.bones_dir.join("cache");
    path.push("clock");
    path
}
/// Canonical shard file name: zero-padded `YYYY-MM.events`.
#[must_use]
pub fn shard_filename(year: i32, month: u32) -> String {
    format!("{:04}-{:02}.events", year, month)
}
/// Full path of the shard for `year`/`month` inside the events directory.
#[must_use]
pub fn shard_path(&self, year: i32, month: u32) -> PathBuf {
    let file_name = Self::shard_filename(year, month);
    self.events_dir().join(file_name)
}
/// Path of the manifest sealing the shard for `year`/`month` (`YYYY-MM.manifest`).
#[must_use]
pub fn manifest_path(&self, year: i32, month: u32) -> PathBuf {
    // Same directory and stem as the shard; only the extension differs.
    self.shard_path(year, month).with_extension("manifest")
}
/// Creates `events/` and `cache/` under the `.bones` directory if missing.
///
/// # Errors
///
/// Returns [`ShardError::InitFailed`] if either directory cannot be created.
pub fn ensure_dirs(&self) -> Result<(), ShardError> {
    for dir in [self.events_dir(), self.bones_dir.join("cache")] {
        fs::create_dir_all(&dir).map_err(ShardError::InitFailed)?;
    }
    Ok(())
}
/// Prepares the directory layout and returns the active `(year, month)` shard.
///
/// If no shard exists yet, one is created for the current UTC month.
///
/// # Errors
///
/// Propagates directory-creation, listing, and shard-creation failures.
pub fn init(&self) -> Result<(i32, u32), ShardError> {
    self.ensure_dirs()?;
    if let Some(latest) = self.list_shards()?.last().copied() {
        return Ok(latest);
    }
    let (year, month) = current_year_month();
    self.create_shard(year, month)?;
    Ok((year, month))
}
/// Lists all shards in the events directory as `(year, month)`, oldest first.
///
/// Files whose names do not parse as shard names are ignored. A missing events directory
/// yields an empty list.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if the directory or one of its entries cannot be read.
pub fn list_shards(&self) -> Result<Vec<(i32, u32)>, ShardError> {
    let dir = self.events_dir();
    if !dir.exists() {
        return Ok(Vec::new());
    }
    let mut found: Vec<(i32, u32)> = Vec::new();
    for item in fs::read_dir(&dir)? {
        let file_name = item?.file_name();
        // `Option` is iterable, so non-shard names simply contribute nothing.
        found.extend(parse_shard_filename(&file_name.to_string_lossy()));
    }
    found.sort_unstable();
    Ok(found)
}
/// Returns the newest shard (the one being appended to), or `None` if there are none.
///
/// # Errors
///
/// Propagates failures from [`Self::list_shards`].
pub fn active_shard(&self) -> Result<Option<(i32, u32)>, ShardError> {
    self.list_shards().map(|shards| shards.into_iter().next_back())
}
pub fn create_shard(&self, year: i32, month: u32) -> Result<PathBuf, ShardError> {
let path = self.shard_path(year, month);
if path.exists() {
return Ok(path);
}
let header = shard_header();
fs::write(&path, header)?;
Ok(path)
}
/// Makes sure there is a shard to append to and returns its `(year, month)`.
///
/// - No shards: create one for the current UTC month.
/// - Newest shard is older than the current month: write its manifest (sealing it), then
///   create the current month's shard.
/// - Newest shard is for the current month, or for a *later* month (wall clock behind the
///   log): keep using the newest shard.
///
/// The last case used to rotate "backwards", with two effects:
/// - the newest shard was re-sealed on every call, rehashing the whole file;
/// - new events went into an older shard, which [`Self::replay`] orders *before* events
///   already recorded.
///
/// # Errors
///
/// Propagates listing, manifest, and shard-creation failures.
pub fn rotate_if_needed(&self) -> Result<(i32, u32), ShardError> {
    let current = current_year_month();
    match self.active_shard()? {
        // Tuple comparison is lexicographic, i.e. chronological for (year, month).
        Some(active) if active >= current => Ok(active),
        Some((year, month)) => {
            self.write_manifest(year, month)?;
            self.create_shard(current.0, current.1)?;
            Ok(current)
        }
        None => {
            self.create_shard(current.0, current.1)?;
            Ok(current)
        }
    }
}
/// Computes and writes the manifest for the shard `year`/`month`, returning it.
///
/// The event count is the number of lines that are neither blank nor start with `#`. Shard
/// bytes are decoded lossily for counting only; the length and hash use the raw bytes.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if the shard cannot be read or the manifest cannot be written.
pub fn write_manifest(&self, year: i32, month: u32) -> Result<ShardManifest, ShardError> {
    let content = fs::read(self.shard_path(year, month))?;
    let event_count = String::from_utf8_lossy(&content)
        .lines()
        .filter(|line| !line.trim().is_empty() && !line.starts_with('#'))
        .count() as u64;
    let manifest = ShardManifest {
        shard_name: Self::shard_filename(year, month),
        event_count,
        byte_len: content.len() as u64,
        file_hash: format!("blake3:{}", blake3::hash(&content).to_hex()),
    };
    // Write a sibling temp file and rename it into place. A crash mid-write then cannot leave
    // a truncated manifest, which would read back as "no manifest" and be reported as an
    // integrity issue.
    let manifest_path = self.manifest_path(year, month);
    let tmp_path = manifest_path.with_extension("manifest.tmp");
    fs::write(&tmp_path, manifest.to_string_repr())?;
    fs::rename(&tmp_path, &manifest_path)?;
    Ok(manifest)
}
/// Reads the manifest for `year`/`month`.
///
/// Returns `Ok(None)` when the manifest file is absent or cannot be parsed.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if an existing manifest cannot be read.
pub fn read_manifest(
    &self,
    year: i32,
    month: u32,
) -> Result<Option<ShardManifest>, ShardError> {
    let path = self.manifest_path(year, month);
    if path.exists() {
        let text = fs::read_to_string(&path)?;
        Ok(ShardManifest::from_string_repr(&text))
    } else {
        Ok(None)
    }
}
/// Checks every sealed shard (all but the newest) against its manifest.
///
/// Reports a sealed shard with no manifest, or whose current byte length differs from the
/// manifest's `byte_len`. The hash and event count are not checked here.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if a manifest or shard metadata cannot be read.
pub fn validate_sealed_shards(&self) -> Result<Vec<ShardIntegrityIssue>, ShardError> {
    let shards = self.list_shards()?;
    let mut issues = Vec::new();
    // The newest shard is still being appended to; only the ones before it are sealed.
    let Some((_active, sealed)) = shards.split_last() else {
        return Ok(issues);
    };
    for &(year, month) in sealed {
        let shard_name = Self::shard_filename(year, month);
        let manifest = match self.read_manifest(year, month)? {
            Some(manifest) => manifest,
            None => {
                tracing::warn!(shard = %shard_name, "sealed shard has no manifest");
                issues.push(ShardIntegrityIssue {
                    shard_name,
                    problem: "sealed shard has no manifest file".into(),
                });
                continue;
            }
        };
        let actual_len = fs::metadata(self.shard_path(year, month))?.len();
        if actual_len == manifest.byte_len {
            continue;
        }
        tracing::error!(
            shard = %shard_name,
            expected = manifest.byte_len,
            actual = actual_len,
            "sealed shard byte length mismatch"
        );
        issues.push(ShardIntegrityIssue {
            problem: format!(
                "byte length mismatch: manifest says {} bytes, file is {} bytes",
                manifest.byte_len, actual_len
            ),
            shard_name,
        });
    }
    Ok(issues)
}
/// Appends `line` to the current month's shard under the repository lock.
///
/// `line` is written verbatim; no newline is added. Before writing, this method:
/// 1. rotates shards if needed;
/// 2. validates the target shard's header;
/// 3. advances the persisted clock.
///
/// With `durable`, the data is `sync_data`-ed before returning.
///
/// Returns the timestamp from [`Self::next_timestamp`]. It is not inserted into `line`.
///
/// # Errors
///
/// Fails on lock timeout/failure, header corruption, or any I/O error.
pub fn append(
    &self,
    line: &str,
    durable: bool,
    lock_timeout: Duration,
) -> Result<i64, ShardError> {
    self.ensure_dirs()?;
    // Kept alive until the end of the function; presumably releases the lock on drop.
    let _guard = ShardLock::acquire(&self.lock_path(), lock_timeout)?;
    let (year, month) = self.rotate_if_needed()?;
    let target = self.shard_path(year, month);
    if target.exists() {
        validate_shard_header(&target)?;
    }
    let timestamp = self.next_timestamp()?;
    let mut shard = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&target)?;
    shard.write_all(line.as_bytes())?;
    shard.flush()?;
    if durable {
        shard.sync_data()?;
    }
    Ok(timestamp)
}
/// Appends `line` verbatim to the shard for `year`/`month`, creating the file if needed.
///
/// Unlike [`Self::append`], this takes no lock, writes no header, and does no rotation.
///
/// # Errors
///
/// Returns [`ShardError::Io`] on any open or write failure.
pub fn append_raw(&self, year: i32, month: u32, line: &str) -> Result<(), ShardError> {
    let mut shard = OpenOptions::new()
        .append(true)
        .create(true)
        .open(self.shard_path(year, month))?;
    shard.write_all(line.as_bytes())?;
    shard.flush()?;
    Ok(())
}
/// Reads the last persisted timestamp (microseconds).
///
/// Returns 0 if the clock file is absent or its contents do not parse.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if an existing clock file cannot be read.
pub fn read_clock(&self) -> Result<i64, ShardError> {
    let path = self.clock_path();
    if !path.exists() {
        return Ok(0);
    }
    let raw = fs::read_to_string(&path)?;
    // An unparsable clock file is treated the same as a missing one.
    Ok(raw.trim().parse::<i64>().unwrap_or_default())
}
/// Issues the next timestamp: the later of "now" (µs since epoch) and last + 1.
///
/// The result is persisted as the new clock value. Timestamps therefore never go backwards,
/// even if the wall clock does.
///
/// # Errors
///
/// Propagates clock-file read/write failures.
pub fn next_timestamp(&self) -> Result<i64, ShardError> {
    let last = self.read_clock()?;
    // `saturating_add`: a corrupted clock file holding `i64::MAX` must not overflow. Plain
    // `+ 1` panics in debug builds and wraps to `i64::MIN` in release, which would let the
    // clock jump backwards.
    let next = system_time_us().max(last.saturating_add(1));
    self.write_clock(next)?;
    Ok(next)
}
/// Persists `value` as the clock, creating the `cache/` directory if needed.
fn write_clock(&self, value: i64) -> Result<(), ShardError> {
    let clock_file = self.clock_path();
    clock_file.parent().map(fs::create_dir_all).transpose()?;
    fs::write(&clock_file, value.to_string())?;
    Ok(())
}
/// Truncates a trailing partial line from the newest shard.
///
/// Returns the number of bytes removed, or `None` if nothing needed truncating (or there are
/// no shards). No lock is taken here. NOTE(review): callers presumably run this before any
/// concurrent appends — confirm.
///
/// # Errors
///
/// Propagates listing and file I/O failures.
pub fn recover_torn_writes(&self) -> Result<Option<u64>, ShardError> {
    match self.active_shard()? {
        Some((year, month)) => recover_shard_torn_write(&self.shard_path(year, month)),
        None => Ok(None),
    }
}
/// Concatenates the full text of every shard (headers included), oldest first.
///
/// Unlike [`Self::replay_lines`], forwarding-pointer shards are not skipped.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if a shard cannot be read as UTF-8 text.
pub fn replay(&self) -> Result<String, ShardError> {
    self.list_shards()?
        .into_iter()
        .try_fold(String::new(), |mut acc, (year, month)| -> Result<String, ShardError> {
            acc.push_str(&fs::read_to_string(self.shard_path(year, month))?);
            Ok(acc)
        })
}
pub fn read_shard(&self, year: i32, month: u32) -> Result<String, ShardError> {
let path = self.shard_path(year, month);
Ok(fs::read_to_string(&path)?)
}
/// Sum of all shard file sizes in bytes, saturating at `usize::MAX`.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if any shard's metadata cannot be read.
pub fn total_content_len(&self) -> Result<usize, ShardError> {
    self.list_shards()?
        .into_iter()
        .try_fold(0usize, |total, (year, month)| -> Result<usize, ShardError> {
            let bytes = fs::metadata(self.shard_path(year, month))?.len();
            Ok(total.saturating_add(usize::try_from(bytes).unwrap_or(usize::MAX)))
        })
}
/// Returns all shard content from byte `offset` onward, plus the end offset it reaches.
///
/// Offsets refer to the logical concatenation of all shards. A start offset inside a
/// multi-byte character is moved forward to the next character boundary.
///
/// The returned end offset counts the bytes actually read from every shard that was read.
/// It can be passed back as the next `offset` without re-delivering content. The previous
/// version summed metadata lengths instead: a shard that grew between `metadata` and
/// `read_to_string` then contributed bytes to the result that the returned offset did not
/// cover, so they were delivered again on the next call.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if a shard's metadata or contents cannot be read.
pub fn replay_from_offset(&self, offset: usize) -> Result<(String, usize), ShardError> {
    let mut cumulative: usize = 0;
    let mut result = String::new();
    for (year, month) in self.list_shards()? {
        let path = self.shard_path(year, month);
        let meta_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
        let meta_end = cumulative.saturating_add(meta_len);
        if meta_end <= offset {
            // Entirely before the requested offset: count its bytes without reading it.
            cumulative = meta_end;
            continue;
        }
        let shard_content = fs::read_to_string(&path)?;
        // Non-zero only for the shard containing `offset`; later shards start at 0.
        let within = offset.saturating_sub(cumulative).min(shard_content.len());
        let within = snap_to_char_boundary(&shard_content, within);
        result.push_str(&shard_content[within..]);
        cumulative = cumulative.saturating_add(shard_content.len());
    }
    Ok((result, cumulative))
}
/// Returns the bytes in `[start_offset, end_offset)` of the logical concatenation of shards.
///
/// Both bounds are moved forward to character boundaries, so the result may end up to three
/// bytes past `end_offset` when that offset falls inside a multi-byte character. An empty or
/// inverted range yields an empty string.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if a shard's metadata or contents cannot be read.
pub fn read_content_range(
    &self,
    start_offset: usize,
    end_offset: usize,
) -> Result<String, ShardError> {
    if start_offset >= end_offset {
        return Ok(String::new());
    }
    let mut cumulative: usize = 0;
    let mut result = String::new();
    for (year, month) in self.list_shards()? {
        let path = self.shard_path(year, month);
        let shard_len = usize::try_from(fs::metadata(&path)?.len()).unwrap_or(usize::MAX);
        let shard_end = cumulative.saturating_add(shard_len);
        if shard_end <= start_offset {
            cumulative = shard_end;
            continue;
        }
        if cumulative >= end_offset {
            break;
        }
        let shard_content = fs::read_to_string(&path)?;
        // Clamp both bounds against the requested range and the content actually read. The
        // end bound is always derived from `end_offset`, never from the earlier metadata
        // length. The old code returned bytes past `end_offset` whenever the shard grew
        // between the `metadata` and `read_to_string` calls.
        let within_start = start_offset.saturating_sub(cumulative).min(shard_content.len());
        let within_end = end_offset.saturating_sub(cumulative).min(shard_content.len());
        let within_start = snap_to_char_boundary(&shard_content, within_start);
        let within_end = snap_to_char_boundary(&shard_content, within_end);
        if within_start < within_end {
            result.push_str(&shard_content[within_start..within_end]);
        }
        cumulative = shard_end;
    }
    Ok(result)
}
/// Counts event lines (neither blank nor starting with `#`) across all shards.
///
/// Shards are counted one at a time rather than concatenating the whole history into one
/// `String`. This keeps peak memory at one shard, and the count equals the sum of what
/// [`Self::write_manifest`] records per shard.
///
/// # Errors
///
/// Returns [`ShardError::Io`] if a shard cannot be read as UTF-8 text.
pub fn event_count(&self) -> Result<u64, ShardError> {
    let mut total: u64 = 0;
    for (year, month) in self.list_shards()? {
        let content = self.read_shard(year, month)?;
        let events = content
            .lines()
            .filter(|line| !line.trim().is_empty() && !line.starts_with('#'))
            .count();
        total += events as u64;
    }
    Ok(total)
}
/// Streams every line of every shard with its global byte offset, starting at offset 0.
///
/// # Errors
///
/// Propagates failures from listing shards; per-line errors are yielded by the iterator.
pub fn replay_lines(
    &self,
) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
    let start_offset = 0;
    self.replay_lines_from_offset(start_offset)
}
/// Streams shard lines starting at global byte `offset`, each paired with its offset.
///
/// Whole shards before `offset` are skipped using their metadata sizes; the reader then
/// seeks within the shard that contains `offset`.
///
/// # Errors
///
/// Propagates failures from listing shards; per-line errors are yielded by the iterator.
pub fn replay_lines_from_offset(
    &self,
    offset: usize,
) -> Result<impl Iterator<Item = io::Result<(usize, String)>>, ShardError> {
    let lines = ShardLineIterator {
        shards: self.list_shards()?,
        current_shard_idx: 0,
        current_reader: None,
        cumulative_offset: 0,
        bones_dir: self.bones_dir.clone(),
    };
    Ok(lines.skip_to_offset(offset))
}
/// Whether no shard files exist yet.
///
/// # Errors
///
/// Propagates failures from [`Self::list_shards`].
pub fn is_empty(&self) -> Result<bool, ShardError> {
    self.list_shards().map(|shards| shards.is_empty())
}
}
/// Streaming, line-by-line reader over all shards in listing order.
///
/// Yields `(offset, line)` pairs, where `offset` is the line's byte position in the logical
/// concatenation of all shard files.
struct ShardLineIterator {
    /// Shards to visit, as `(year, month)`, in the order they are read.
    shards: Vec<(i32, u32)>,
    /// Index into `shards` of the shard currently (or next) being read.
    current_shard_idx: usize,
    /// Open reader for the current shard; `None` between shards.
    current_reader: Option<io::BufReader<fs::File>>,
    /// Global byte offset of the next line to be yielded.
    cumulative_offset: usize,
    /// Root `.bones` directory; shard files are resolved under its `events/` subdirectory.
    bones_dir: PathBuf,
}
impl ShardLineIterator {
    /// Skips whole shards that end at or before `offset`, then records `offset` as the
    /// position of the next line.
    ///
    /// Shard sizes come from filesystem metadata. If a shard's metadata cannot be read, the
    /// skipping stops at that shard. `next()` later seeks within the current shard when
    /// `offset` lies past its start.
    fn skip_to_offset(mut self, offset: usize) -> Self {
        let events_dir = self.bones_dir.join("events");
        while let Some(&(year, month)) = self.shards.get(self.current_shard_idx) {
            let path = events_dir.join(ShardManager::shard_filename(year, month));
            let Ok(meta) = fs::metadata(path) else {
                break;
            };
            let len = usize::try_from(meta.len()).unwrap_or(usize::MAX);
            if self.cumulative_offset + len > offset {
                break;
            }
            self.cumulative_offset += len;
            self.current_shard_idx += 1;
        }
        self.cumulative_offset = offset;
        self
    }
}
impl Iterator for ShardLineIterator {
type Item = io::Result<(usize, String)>;
fn next(&mut self) -> Option<Self::Item> {
use std::io::{BufRead, Seek, SeekFrom};
loop {
if self.current_reader.is_none() {
if self.current_shard_idx >= self.shards.len() {
return None;
}
let (year, month) = self.shards[self.current_shard_idx];
let shard_path = self
.bones_dir
.join("events")
.join(ShardManager::shard_filename(year, month));
if is_forwarding_pointer(&shard_path) {
tracing::warn!(
shard = %shard_path.display(),
"skipping legacy forwarding-pointer shard during replay"
);
if let Ok(meta) = fs::metadata(&shard_path) {
self.cumulative_offset += usize::try_from(meta.len()).unwrap_or(0);
}
self.current_shard_idx += 1;
continue;
}
if let Err(e) = validate_shard_header(&shard_path) {
tracing::error!(
shard = %shard_path.display(),
error = %e,
"shard header validation failed"
);
return Some(Err(io::Error::new(
io::ErrorKind::InvalidData,
e.to_string(),
)));
}
let mut file = match fs::File::open(shard_path) {
Ok(f) => f,
Err(e) => return Some(Err(e)),
};
let mut cumulative_before = 0;
for i in 0..self.current_shard_idx {
let (y, m) = self.shards[i];
let p = self
.bones_dir
.join("events")
.join(ShardManager::shard_filename(y, m));
if let Ok(meta) = fs::metadata(p) {
cumulative_before += usize::try_from(meta.len()).unwrap_or(usize::MAX);
}
}
if self.cumulative_offset > cumulative_before {
let within = self.cumulative_offset - cumulative_before;
if let Err(e) = file.seek(SeekFrom::Start(within as u64)) {
return Some(Err(e));
}
}
self.current_reader = Some(io::BufReader::new(file));
}
let reader = self
.current_reader
.as_mut()
.expect("reader was just set above");
let mut line = String::new();
let offset = self.cumulative_offset;
match reader.read_line(&mut line) {
Ok(0) => {
self.current_reader = None;
self.current_shard_idx += 1;
}
Ok(n) => {
self.cumulative_offset += n;
return Some(Ok((offset, line)));
}
Err(e) => return Some(Err(e)),
}
}
}
}
/// Moves `offset` forward to the nearest UTF-8 character boundary in `s`.
///
/// Offsets at or past the end clamp to `s.len()`.
const fn snap_to_char_boundary(s: &str, offset: usize) -> usize {
    let end = s.len();
    let mut cursor = if offset < end { offset } else { end };
    // Step over continuation bytes; `end` itself is always a boundary.
    while cursor < end && !s.is_char_boundary(cursor) {
        cursor += 1;
    }
    cursor
}
/// Current `(year, month)` in UTC, taken from a single clock reading.
#[must_use]
fn current_year_month() -> (i32, u32) {
    let utc_now = chrono::Utc::now();
    let year = utc_now.year();
    let month = utc_now.month();
    (year, month)
}
/// Microseconds since the Unix epoch, or 0 if the system clock is before the epoch.
///
/// Saturates at `i64::MAX` instead of silently wrapping via an `as` cast from `u128`.
#[must_use]
fn system_time_us() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_micros()).unwrap_or(i64::MAX)
        })
}
/// Detects a legacy "forwarding pointer" shard: a file of at most 30 bytes whose trimmed
/// content parses as a shard filename.
///
/// Any metadata or read error counts as "not a pointer".
fn is_forwarding_pointer(path: &Path) -> bool {
    // Pointers are tiny; anything larger cannot be one, so skip reading it.
    let small_enough = matches!(fs::metadata(path), Ok(meta) if meta.len() <= 30);
    if !small_enough {
        return false;
    }
    fs::read_to_string(path)
        .ok()
        .and_then(|text| parse_shard_filename(text.trim()))
        .is_some()
}
/// Verifies that the first line of the shard at `path` is the expected shard header.
///
/// Empty files and legacy forwarding-pointer shards are accepted as-is. Trailing whitespace
/// on the first line is ignored.
///
/// # Errors
///
/// - [`ShardError::CorruptedShard`] if the first line differs from the header, including
///   when it is not valid UTF-8. The found text is truncated to 80 characters.
/// - [`ShardError::Io`] if the file cannot be opened or read.
pub fn validate_shard_header(path: &Path) -> Result<(), ShardError> {
    use crate::event::writer::SHARD_HEADER;
    use std::io::{BufRead, BufReader};
    if is_forwarding_pointer(path) {
        return Ok(());
    }
    let mut reader = BufReader::new(fs::File::open(path)?);
    // Read raw bytes rather than using `read_line`. A first line that is not valid UTF-8 is
    // a corrupted header and should be reported as `CorruptedShard` (with the path), not as
    // a generic I/O "invalid UTF-8" error.
    let mut first_line = Vec::new();
    if reader.read_until(b'\n', &mut first_line)? == 0 {
        // An empty file has no header to check yet.
        return Ok(());
    }
    let first_line = String::from_utf8_lossy(&first_line);
    let found = first_line.trim_end();
    if found == SHARD_HEADER {
        return Ok(());
    }
    Err(ShardError::CorruptedShard {
        path: path.to_path_buf(),
        reason: format!(
            "expected header '{}', found '{}'",
            SHARD_HEADER,
            found.chars().take(80).collect::<String>()
        ),
    })
}
/// Parses a canonical shard file name (`YYYY-MM.events`) into `(year, month)`.
///
/// Returns `None` in these cases:
/// - the name is `current.events`;
/// - the month is outside 1..=12;
/// - the name is not spelled exactly as `ShardManager::shard_filename` would produce it.
///
/// The last check matters because lenient integer parsing would otherwise map names such as
/// `2026-1.events`, `2026-001.events` or `+2026-01.events` to `(2026, 1)`. The canonical path
/// for `(2026, 1)` is a *different* file, so callers would read the wrong (or a missing) file
/// or see the same month twice.
fn parse_shard_filename(name: &str) -> Option<(i32, u32)> {
    let stem = name.strip_suffix(".events")?;
    if stem == "current" {
        return None;
    }
    let (year_str, month_str) = stem.split_once('-')?;
    let year: i32 = year_str.parse().ok()?;
    let month: u32 = month_str.parse().ok()?;
    if !(1..=12).contains(&month) {
        return None;
    }
    // Round-trip through the canonical format to reject non-canonical spellings.
    (format!("{year:04}-{month:02}.events") == name).then_some((year, month))
}
/// Truncates `path` to just after its last `\n`, dropping a torn trailing line.
///
/// A non-empty file with no newline at all is truncated to zero bytes. Returns the number of
/// bytes removed, or `None` when the file is empty or already ends with `\n`.
fn recover_shard_torn_write(path: &Path) -> Result<Option<u64>, ShardError> {
    let original_len = fs::metadata(path)?.len();
    if original_len == 0 {
        return Ok(None);
    }
    let bytes = fs::read(path)?;
    // Keep everything up to and including the last newline; with none, nothing survives.
    let keep_len = match bytes.iter().rposition(|&b| b == b'\n') {
        Some(idx) => (idx + 1) as u64,
        None => 0,
    };
    if keep_len >= original_len {
        return Ok(None);
    }
    OpenOptions::new().write(true).open(path)?.set_len(keep_len)?;
    Ok(Some(original_len - keep_len))
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn snap_to_char_boundary_ascii() {
let s = "hello world";
assert_eq!(snap_to_char_boundary(s, 0), 0);
assert_eq!(snap_to_char_boundary(s, 5), 5);
assert_eq!(snap_to_char_boundary(s, 11), 11); assert_eq!(snap_to_char_boundary(s, 100), 11); }
#[test]
fn snap_to_char_boundary_emoji() {
let s = "ab✅cd🎉ef";
assert_eq!(snap_to_char_boundary(s, 2), 2); assert_eq!(snap_to_char_boundary(s, 3), 5); assert_eq!(snap_to_char_boundary(s, 4), 5); assert_eq!(snap_to_char_boundary(s, 5), 5); assert_eq!(snap_to_char_boundary(s, 8), 11); assert_eq!(snap_to_char_boundary(s, 9), 11);
assert_eq!(snap_to_char_boundary(s, 10), 11);
}
fn setup() -> (TempDir, ShardManager) {
let tmp = TempDir::new().expect("tempdir");
let bones_dir = tmp.path().join(".bones");
let mgr = ShardManager::new(&bones_dir);
(tmp, mgr)
}
#[test]
fn parse_valid_shard_filenames() {
assert_eq!(parse_shard_filename("2026-01.events"), Some((2026, 1)));
assert_eq!(parse_shard_filename("2026-12.events"), Some((2026, 12)));
assert_eq!(parse_shard_filename("1999-06.events"), Some((1999, 6)));
}
#[test]
fn parse_invalid_shard_filenames() {
assert_eq!(parse_shard_filename("current.events"), None);
assert_eq!(parse_shard_filename("2026-13.events"), None); assert_eq!(parse_shard_filename("2026-00.events"), None); assert_eq!(parse_shard_filename("not-a-shard.txt"), None);
assert_eq!(parse_shard_filename("2026-01.manifest"), None);
assert_eq!(parse_shard_filename(""), None);
}
#[test]
fn shard_manager_paths() {
let mgr = ShardManager::new("/repo/.bones");
assert_eq!(mgr.events_dir(), PathBuf::from("/repo/.bones/events"));
assert_eq!(mgr.lock_path(), PathBuf::from("/repo/.bones/lock"));
assert_eq!(mgr.clock_path(), PathBuf::from("/repo/.bones/cache/clock"));
assert_eq!(
mgr.shard_path(2026, 2),
PathBuf::from("/repo/.bones/events/2026-02.events")
);
assert_eq!(
mgr.manifest_path(2026, 1),
PathBuf::from("/repo/.bones/events/2026-01.manifest")
);
}
#[test]
fn shard_filename_format() {
assert_eq!(ShardManager::shard_filename(2026, 1), "2026-01.events");
assert_eq!(ShardManager::shard_filename(2026, 12), "2026-12.events");
assert_eq!(ShardManager::shard_filename(1999, 6), "1999-06.events");
}
#[test]
fn ensure_dirs_creates_directories() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("should create dirs");
assert!(mgr.events_dir().exists());
assert!(mgr.bones_dir.join("cache").exists());
}
#[test]
fn ensure_dirs_is_idempotent() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("first");
mgr.ensure_dirs().expect("second");
assert!(mgr.events_dir().exists());
}
#[test]
fn init_creates_first_shard() {
let (_tmp, mgr) = setup();
let (year, month) = mgr.init().expect("init");
let (expected_year, expected_month) = current_year_month();
assert_eq!(year, expected_year);
assert_eq!(month, expected_month);
let shard_path = mgr.shard_path(year, month);
assert!(shard_path.exists());
let content = fs::read_to_string(&shard_path).expect("read");
assert!(content.starts_with("# bones event log v1"));
}
#[test]
fn init_is_idempotent() {
let (_tmp, mgr) = setup();
let first = mgr.init().expect("first");
let second = mgr.init().expect("second");
assert_eq!(first, second);
}
#[test]
fn validate_shard_header_reports_unicode_without_panicking() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let path = mgr.shard_path(2026, 4);
fs::write(&path, format!("{}\n", "é".repeat(120))).expect("write shard");
let err = validate_shard_header(&path).expect_err("invalid header");
assert!(err.to_string().contains("expected header"));
}
#[test]
fn list_shards_empty() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let shards = mgr.list_shards().expect("list");
assert!(shards.is_empty());
}
#[test]
fn list_shards_returns_sorted() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 3).expect("create");
mgr.create_shard(2026, 1).expect("create");
mgr.create_shard(2026, 2).expect("create");
let shards = mgr.list_shards().expect("list");
assert_eq!(shards, vec![(2026, 1), (2026, 2), (2026, 3)]);
}
#[test]
fn list_shards_skips_non_shard_files() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 1).expect("create");
fs::write(mgr.events_dir().join("readme.txt"), "hi").expect("write");
fs::write(mgr.events_dir().join("2026-01.manifest"), "manifest").expect("write");
let shards = mgr.list_shards().expect("list");
assert_eq!(shards, vec![(2026, 1)]);
}
#[test]
fn list_shards_no_events_dir() {
let (_tmp, mgr) = setup();
let shards = mgr.list_shards().expect("list");
assert!(shards.is_empty());
}
#[test]
fn create_shard_writes_header() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let path = mgr.create_shard(2026, 2).expect("create");
let content = fs::read_to_string(&path).expect("read");
assert!(content.starts_with("# bones event log v1"));
assert!(content.contains("# fields:"));
assert_eq!(content.lines().count(), 2);
}
#[test]
fn create_shard_idempotent() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let p1 = mgr.create_shard(2026, 2).expect("first");
fs::write(&p1, "modified").expect("write");
let p2 = mgr.create_shard(2026, 2).expect("second");
assert_eq!(p1, p2);
let content = fs::read_to_string(&p2).expect("read");
assert_eq!(content, "modified");
}
#[test]
fn clock_starts_at_zero() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let ts = mgr.read_clock().expect("read");
assert_eq!(ts, 0);
}
#[test]
fn clock_is_monotonic() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let t1 = mgr.next_timestamp().expect("t1");
let t2 = mgr.next_timestamp().expect("t2");
let t3 = mgr.next_timestamp().expect("t3");
assert!(t2 > t1);
assert!(t3 > t2);
}
#[test]
fn clock_reads_back_written_value() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.write_clock(42_000_000).expect("write");
let ts = mgr.read_clock().expect("read");
assert_eq!(ts, 42_000_000);
}
#[test]
fn clock_never_goes_backward() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let future = system_time_us() + 10_000_000;
mgr.write_clock(future).expect("write");
let next = mgr.next_timestamp().expect("next");
assert!(next > future, "clock should advance past future value");
}
#[test]
fn append_raw_adds_line() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 2).expect("create");
mgr.append_raw(2026, 2, "event line 1\n").expect("append");
mgr.append_raw(2026, 2, "event line 2\n").expect("append");
let content = mgr.read_shard(2026, 2).expect("read");
assert!(content.contains("event line 1"));
assert!(content.contains("event line 2"));
}
#[test]
fn append_with_lock() {
let (_tmp, mgr) = setup();
mgr.init().expect("init");
let _ts = mgr
.append("test event line\n", false, Duration::from_secs(1))
.expect("append");
let content = mgr.replay().expect("replay");
assert!(content.contains("test event line"));
}
#[test]
fn append_returns_monotonic_timestamps() {
let (_tmp, mgr) = setup();
mgr.init().expect("init");
let t1 = mgr
.append("line1\n", false, Duration::from_secs(1))
.expect("t1");
let t2 = mgr
.append("line2\n", false, Duration::from_secs(1))
.expect("t2");
assert!(t2 > t1);
}
#[test]
fn recover_clean_file() {
let (_tmp, mgr) = setup();
mgr.init().expect("init");
let (y, m) = current_year_month();
mgr.append_raw(y, m, "complete line\n").expect("append");
let recovered = mgr.recover_torn_writes().expect("recover");
assert_eq!(recovered, None);
}
#[test]
fn recover_torn_write_truncates() {
let (_tmp, mgr) = setup();
let (y, m) = mgr.init().expect("init");
let shard_path = mgr.shard_path(y, m);
{
let mut f = OpenOptions::new()
.append(true)
.open(&shard_path)
.expect("open");
f.write_all(b"complete line\npartial line without newline")
.expect("write");
f.flush().expect("flush");
}
let recovered = mgr.recover_torn_writes().expect("recover");
assert!(recovered.is_some());
let truncated = recovered.expect("checked is_some");
assert_eq!(truncated, "partial line without newline".len() as u64);
let content = fs::read_to_string(&shard_path).expect("read");
assert!(content.ends_with('\n'));
assert!(content.contains("complete line"));
assert!(!content.contains("partial line without newline"));
}
#[test]
fn recover_no_newline_at_all() {
let (_tmp, mgr) = setup();
let (y, m) = mgr.init().expect("init");
let shard_path = mgr.shard_path(y, m);
fs::write(&shard_path, "no newlines here").expect("write");
let recovered = mgr.recover_torn_writes().expect("recover");
assert_eq!(recovered, Some("no newlines here".len() as u64));
let content = fs::read_to_string(&shard_path).expect("read");
assert!(content.is_empty());
}
#[test]
fn recover_empty_file() {
let (_tmp, mgr) = setup();
let (y, m) = mgr.init().expect("init");
let shard_path = mgr.shard_path(y, m);
fs::write(&shard_path, "").expect("write");
let recovered = mgr.recover_torn_writes().expect("recover");
assert_eq!(recovered, None);
}
#[test]
fn recover_no_active_shard() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let recovered = mgr.recover_torn_writes().expect("recover");
assert_eq!(recovered, None);
}
#[test]
fn replay_empty_repo() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let content = mgr.replay().expect("replay");
assert!(content.is_empty());
}
#[test]
fn replay_single_shard() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 1).expect("create");
mgr.append_raw(2026, 1, "event-a\n").expect("append");
let content = mgr.replay().expect("replay");
assert!(content.contains("event-a"));
}
#[test]
fn replay_multiple_shards_in_order() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 1).expect("create");
mgr.create_shard(2026, 2).expect("create");
mgr.create_shard(2026, 3).expect("create");
mgr.append_raw(2026, 1, "event-jan\n").expect("append");
mgr.append_raw(2026, 2, "event-feb\n").expect("append");
mgr.append_raw(2026, 3, "event-mar\n").expect("append");
let content = mgr.replay().expect("replay");
let jan_pos = content.find("event-jan").expect("jan");
let feb_pos = content.find("event-feb").expect("feb");
let mar_pos = content.find("event-mar").expect("mar");
assert!(jan_pos < feb_pos);
assert!(feb_pos < mar_pos);
}
#[test]
fn event_count_empty() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
assert_eq!(mgr.event_count().expect("count"), 0);
}
#[test]
fn event_count_excludes_comments_and_blanks() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 1).expect("create");
mgr.append_raw(2026, 1, "event1\n").expect("append");
mgr.append_raw(2026, 1, "event2\n").expect("append");
mgr.append_raw(2026, 1, "\n").expect("blank");
assert_eq!(mgr.event_count().expect("count"), 2);
}
#[test]
fn is_empty_no_shards() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
assert!(mgr.is_empty().expect("empty"));
}
#[test]
fn is_empty_with_shards() {
let (_tmp, mgr) = setup();
mgr.init().expect("init");
assert!(!mgr.is_empty().expect("empty"));
}
#[test]
fn write_and_read_manifest() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 1).expect("create");
mgr.append_raw(2026, 1, "event-line-1\n").expect("append");
mgr.append_raw(2026, 1, "event-line-2\n").expect("append");
let written = mgr.write_manifest(2026, 1).expect("write manifest");
assert_eq!(written.shard_name, "2026-01.events");
assert_eq!(written.event_count, 2);
assert!(written.byte_len > 0);
assert!(written.file_hash.starts_with("blake3:"));
let read = mgr
.read_manifest(2026, 1)
.expect("read")
.expect("should exist");
assert_eq!(read, written);
}
#[test]
fn manifest_roundtrip() {
let manifest = ShardManifest {
shard_name: "2026-01.events".into(),
event_count: 42,
byte_len: 12345,
file_hash: "blake3:abcdef0123456789".into(),
};
let repr = manifest.to_string_repr();
let parsed = ShardManifest::from_string_repr(&repr).expect("parse");
assert_eq!(parsed, manifest);
}
#[test]
fn read_manifest_missing() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let result = mgr.read_manifest(2026, 1).expect("read");
assert!(result.is_none());
}
#[test]
fn manifest_event_count_excludes_comments() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 1).expect("create");
mgr.append_raw(2026, 1, "event1\n").expect("append");
let manifest = mgr.write_manifest(2026, 1).expect("manifest");
assert_eq!(manifest.event_count, 1);
}
#[test]
fn rotate_creates_shard_if_none_exist() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let (y, m) = mgr.rotate_if_needed().expect("rotate");
let (ey, em) = current_year_month();
assert_eq!((y, m), (ey, em));
assert!(mgr.shard_path(y, m).exists());
}
#[test]
fn rotate_no_op_same_month() {
let (_tmp, mgr) = setup();
let (y, m) = mgr.init().expect("init");
let (y2, m2) = mgr.rotate_if_needed().expect("rotate");
assert_eq!((y, m), (y2, m2));
}
#[test]
fn rotate_different_month_seals_and_creates() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2025, 11).expect("create");
mgr.append_raw(2025, 11, "old event\n").expect("append");
let (y, m) = mgr.rotate_if_needed().expect("rotate");
let (ey, em) = current_year_month();
assert_eq!((y, m), (ey, em));
assert!(mgr.manifest_path(2025, 11).exists());
assert!(mgr.shard_path(ey, em).exists());
}
#[test]
fn frozen_shard_not_modified_by_append() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2025, 6).expect("create");
mgr.append_raw(2025, 6, "old event\n").expect("append");
let old_content = mgr.read_shard(2025, 6).expect("read");
mgr.init().expect("init");
mgr.append("new event\n", false, Duration::from_secs(1))
.expect("append");
let after_content = mgr.read_shard(2025, 6).expect("read");
assert_eq!(old_content, after_content);
}
#[test]
fn system_time_us_is_positive() {
let ts = system_time_us();
assert!(ts > 0, "system time should be positive: {ts}");
}
#[test]
fn system_time_us_is_reasonable() {
let ts = system_time_us();
let jan_2020_us: i64 = 1_577_836_800_000_000;
assert!(ts > jan_2020_us, "system time too small: {ts}");
}
#[test]
fn total_content_len_empty_repo() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
let len = mgr.total_content_len().expect("len");
assert_eq!(len, 0);
}
#[test]
fn total_content_len_single_shard() {
let (_tmp, mgr) = setup();
mgr.ensure_dirs().expect("dirs");
mgr.create_shard(2026, 1).expect("create");
mgr.append_raw(2026, 1, "line1\n").expect("append");
mgr.append_raw(2026, 1, "line2\n").expect("append");
let full = mgr.replay().expect("replay");
let len = mgr.total_content_len().expect("len");
assert_eq!(len, full.len());
}
/// With content split across a January and a February shard, `total_content_len`
/// still matches the length of the combined `replay` output.
#[test]
fn total_content_len_multiple_shards() {
    let (_guard, shards) = setup();
    shards.ensure_dirs().expect("dirs");

    shards.create_shard(2026, 1).expect("shard 1");
    shards.create_shard(2026, 2).expect("shard 2");
    shards.append_raw(2026, 1, "jan-event\n").expect("append jan");
    shards.append_raw(2026, 2, "feb-event\n").expect("append feb");

    let replay_len = shards.replay().expect("replay").len();
    let reported = shards.total_content_len().expect("len");
    assert_eq!(reported, replay_len, "total_content_len must match replay len");
}
/// A zero-width range (`start == end`) returns an empty string.
#[test]
fn read_content_range_empty_range() {
    let (_guard, shards) = setup();
    shards.ensure_dirs().expect("dirs");
    shards.create_shard(2026, 1).expect("create");
    shards.append_raw(2026, 1, "event\n").expect("append");

    let offset = 5;
    let slice = shards.read_content_range(offset, offset).expect("range");
    assert!(slice.is_empty());
}
/// A range that covers exactly one event line inside a single shard returns
/// precisely that line.
#[test]
fn read_content_range_within_single_shard() {
    const EVENT: &str = "ABCDEF\n";
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create");
    mgr.append_raw(2026, 1, EVENT).expect("append");
    let full = mgr.replay().expect("replay");
    // Find the event's offset in replay space (the shard may begin with other
    // content such as a header), then derive the end offset from the literal
    // itself instead of a hard-coded length that silently drifts if it changes.
    let pos = full.find(EVENT).expect("ABCDEF must be in shard");
    let range = mgr
        .read_content_range(pos, pos + EVENT.len())
        .expect("range");
    assert_eq!(range, EVENT);
}
/// Ranges that span the January→February shard boundary must be stitched
/// together exactly as they appear in a full `replay`.
///
/// Two checks:
/// - the whole content round-trips through `read_content_range`;
/// - a strict sub-range that starts inside the January event and ends inside the
///   February event, so the read genuinely straddles both shards.
#[test]
fn read_content_range_across_shard_boundary() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("shard 1");
    mgr.create_shard(2026, 2).expect("shard 2");
    mgr.append_raw(2026, 1, "jan-last-line\n").expect("jan");
    mgr.append_raw(2026, 2, "feb-first-line\n").expect("feb");
    let full = mgr.replay().expect("replay");

    let range = mgr.read_content_range(0, full.len()).expect("full range");
    assert_eq!(range, full);

    // Start mid-way through the Jan event and stop mid-way through the Feb event.
    let start = full.find("last-line").expect("jan event in replay");
    let feb_pos = full.find("feb-first").expect("feb event in replay");
    let end = feb_pos + "feb-first".len();
    let expected = full
        .get(start..end)
        .expect("jan event must precede feb event in replay");
    let straddle = mgr
        .read_content_range(start, end)
        .expect("straddling range");
    assert_eq!(
        straddle, expected,
        "range crossing the shard boundary must match replay"
    );
}
/// A range that starts at the end of the content and extends past it returns an
/// empty string instead of an error.
#[test]
fn read_content_range_beyond_end() {
    let (_guard, shards) = setup();
    shards.ensure_dirs().expect("dirs");
    shards.create_shard(2026, 1).expect("create");
    shards.append_raw(2026, 1, "event\n").expect("append");

    let end = shards.replay().expect("replay").len();
    let past_end = shards
        .read_content_range(end, end + 100)
        .expect("beyond end");
    assert!(past_end.is_empty());
}
/// Replaying from offset 0 yields the same string as a full `replay`, and the
/// reported total equals that string's length.
#[test]
fn replay_from_offset_zero_returns_full_content() {
    let (_guard, shards) = setup();
    shards.ensure_dirs().expect("dirs");
    shards.create_shard(2026, 1).expect("create");
    shards.append_raw(2026, 1, "event1\n").expect("e1");
    shards.append_raw(2026, 1, "event2\n").expect("e2");

    let expected = shards.replay().expect("full replay");
    let (content, total) = shards.replay_from_offset(0).expect("from 0");
    assert_eq!(content, expected);
    assert_eq!(total, expected.len());
}
/// With the cursor placed at `event3`, only that final line is replayed, while
/// the reported total still covers all content.
#[test]
fn replay_from_offset_skips_content_before_cursor() {
    let (_guard, shards) = setup();
    shards.ensure_dirs().expect("dirs");
    shards.create_shard(2026, 1).expect("create");
    for (line, label) in [("event1\n", "e1"), ("event2\n", "e2"), ("event3\n", "e3")] {
        shards.append_raw(2026, 1, line).expect(label);
    }

    let whole = shards.replay().expect("full replay");
    let cursor = whole.find("event3").expect("event3 in content");
    let (rest, total) = shards.replay_from_offset(cursor).expect("from cursor");
    assert_eq!(rest, "event3\n");
    assert_eq!(total, whole.len());
}
/// A cursor equal to the content length leaves nothing to replay, but the
/// reported total length is unchanged.
#[test]
fn replay_from_offset_at_end_returns_empty() {
    let (_guard, shards) = setup();
    shards.ensure_dirs().expect("dirs");
    shards.create_shard(2026, 1).expect("create");
    shards.append_raw(2026, 1, "event1\n").expect("e1");

    let content_len = shards.replay().expect("full replay").len();
    let (remaining, reported_total) = shards
        .replay_from_offset(content_len)
        .expect("at end");
    assert!(remaining.is_empty(), "tail should be empty at end of content");
    assert_eq!(reported_total, content_len);
}
/// With the cursor at the January shard's byte length, replay resumes in the
/// February content. No January event may appear, and the result must be exactly
/// the suffix of the full replay starting at that cursor.
#[test]
fn replay_from_offset_skips_sealed_shards_before_cursor() {
    let (_tmp, mgr) = setup();
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("jan");
    mgr.create_shard(2026, 2).expect("feb");
    mgr.append_raw(2026, 1, "jan-event1\n").expect("jan e1");
    mgr.append_raw(2026, 1, "jan-event2\n").expect("jan e2");
    mgr.append_raw(2026, 2, "feb-event1\n").expect("feb e1");
    mgr.append_raw(2026, 2, "feb-event2\n").expect("feb e2");
    let full = mgr.replay().expect("full replay");
    // Used as the offset where February content begins in replay space.
    let jan_shard_len = mgr.read_shard(2026, 1).expect("read jan").len();
    let (tail, total_len) = mgr
        .replay_from_offset(jan_shard_len)
        .expect("from feb start");
    assert!(
        !tail.contains("jan-event"),
        "jan events should not appear in tail"
    );
    assert!(tail.contains("feb-event1"), "feb events must be in tail");
    assert!(tail.contains("feb-event2"), "feb events must be in tail");
    // Membership checks alone would accept duplicated or reordered output; pin
    // the exact suffix as well.
    let expected_tail = full
        .get(jan_shard_len..)
        .expect("jan shard length must lie within replay content");
    assert_eq!(
        tail, expected_tail,
        "tail must equal the replay suffix from the cursor"
    );
    assert_eq!(total_len, full.len());
}
/// `total_content_len` and the total reported by `replay_from_offset(0)` must be
/// the same number when content spans two shards.
#[test]
fn replay_from_offset_total_len_equals_total_content_len() {
    let (_guard, shards) = setup();
    shards.ensure_dirs().expect("dirs");
    for (month, label) in [(1, "shard 1"), (2, "shard 2")] {
        shards.create_shard(2026, month).expect(label);
    }
    shards.append_raw(2026, 1, "event-a\n").expect("ea");
    shards.append_raw(2026, 2, "event-b\n").expect("eb");

    let via_len_api = shards.total_content_len().expect("total_content_len");
    let via_replay = shards
        .replay_from_offset(0)
        .expect("replay_from_offset")
        .1;
    assert_eq!(
        via_len_api, via_replay,
        "total_content_len and replay_from_offset total must agree"
    );
}
/// A shard file whose entire content is another shard's file name (a forwarding
/// pointer) must be skipped by `replay_lines`. Real event lines from the shards
/// around it must still be yielded.
#[test]
fn replay_lines_skips_forwarding_pointer_shard() {
    let dir = tempfile::tempdir().expect("tmpdir");
    let mgr = ShardManager::new(dir.path());
    mgr.ensure_dirs().expect("dirs");
    mgr.create_shard(2026, 1).expect("create jan");
    mgr.append_raw(2026, 1, "# bones event log v1\n")
        .expect("header");
    mgr.append_raw(2026, 1, "real-event-line\n").expect("event");
    // Ask the manager where the February shard lives instead of hard-coding the
    // `events/` directory layout, so the pointer lands where replay looks for it.
    let feb_path = mgr.shard_path(2026, 2);
    fs::write(&feb_path, "2026-03.events").expect("write forwarding pointer");
    mgr.create_shard(2026, 3).expect("create mar");
    mgr.append_raw(2026, 3, "# bones event log v1\n")
        .expect("mar header");
    mgr.append_raw(2026, 3, "another-event-line\n")
        .expect("mar event");
    let lines: Vec<String> = mgr
        .replay_lines()
        .expect("replay_lines")
        .map(|r| r.expect("line").1)
        .collect();
    assert!(
        lines.iter().any(|l| l.contains("real-event-line")),
        "jan event missing"
    );
    assert!(
        lines.iter().any(|l| l.contains("another-event-line")),
        "mar event missing"
    );
    assert!(
        !lines.iter().any(|l| l.contains("2026-03.events")),
        "forwarding pointer content must not appear in replay"
    );
}
}