use std::collections::BTreeMap;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::sync::{Arc, PoisonError, RwLock};
use talea_core::types::Seq;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use crate::frame::{HEADER_LEN, WireEvent, decode_frame};
/// Default size threshold for a segment, in bytes (128 MiB).
///
/// `SegmentSet::open` passes this as `segment_max`; `SegmentSet::maybe_rotate` starts a
/// new segment once the active segment's length has reached that value.
pub const DEFAULT_SEGMENT_MAX: u64 = 128 * 1024 * 1024;
/// Cloneable handle to the shared segment catalog: an ordered map from each segment's
/// base sequence number to the path of its file.
///
/// Clones share one underlying map through the `Arc`, so a segment registered by
/// `SegmentSet::maybe_rotate` becomes visible to every handle.
#[derive(Clone)]
pub struct SegmentCatalog(pub(crate) Arc<RwLock<BTreeMap<Seq, PathBuf>>>);
/// Takes the catalog's read lock, ignoring poisoning.
///
/// If another thread panicked while holding the lock, the guard is recovered from the
/// `PoisonError` instead of propagating the panic to the caller.
fn read_catalog(
    lock: &RwLock<BTreeMap<Seq, PathBuf>>,
) -> std::sync::RwLockReadGuard<'_, BTreeMap<Seq, PathBuf>> {
    match lock.read() {
        Ok(guard) => guard,
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
/// Takes the catalog's write lock, ignoring poisoning.
///
/// If another thread panicked while holding the lock, the guard is recovered from the
/// `PoisonError` instead of propagating the panic to the caller.
fn write_catalog(
    lock: &RwLock<BTreeMap<Seq, PathBuf>>,
) -> std::sync::RwLockWriteGuard<'_, BTreeMap<Seq, PathBuf>> {
    match lock.write() {
        Ok(guard) => guard,
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
impl SegmentCatalog {
pub async fn scan_from(&self, from: Seq, limit: usize) -> std::io::Result<Vec<WireEvent>> {
if limit == 0 {
return Ok(vec![]);
}
let snapshot = read_catalog(&self.0).clone();
let (Some((&first_base, _)), Some((&last_base, _))) =
(snapshot.first_key_value(), snapshot.last_key_value())
else {
return Ok(vec![]);
};
let start_base = snapshot
.range(..=from)
.next_back()
.map(|(k, _)| *k)
.unwrap_or(first_base);
let mut results = Vec::new();
'segments: for (&seg_base, path) in snapshot.range(start_base..) {
if results.len() >= limit {
break;
}
let is_last = seg_base == last_base;
let bytes = tokio::fs::read(path).await?;
let mut pos = 0usize;
loop {
if results.len() >= limit {
break;
}
match decode_frame(&bytes[pos..]) {
Ok(None) => break,
Ok(Some((ev, consumed))) => {
pos += consumed;
if ev.seq >= from {
results.push(ev);
}
}
Err(_) if is_last => {
break 'segments;
}
Err(e) => {
return Err(std::io::Error::other(format!(
"decode error in {path:?} at offset {pos}: {e}"
)));
}
}
}
}
Ok(results)
}
/// Returns up to `limit` events with sequence number `>= from`, each paired with the
/// position of its frame as `(segment base, byte offset of the frame start)`.
///
/// Scanning starts in the segment with the greatest base `<= from` (or the first segment
/// when there is none) and walks forward segment by segment. A decode error in the
/// newest segment of the snapshot ends the scan with what has been collected so far.
///
/// # Errors
///
/// Returns an error if a segment file cannot be read, or if a frame fails to decode in
/// any segment other than the newest one.
pub async fn scan_with_pos(
    &self,
    from: Seq,
    limit: usize,
) -> std::io::Result<Vec<(WireEvent, crate::state::FramePos)>> {
    let mut out = Vec::new();
    if limit == 0 {
        return Ok(out);
    }
    // Clone the map out of the guard so the lock is released before the awaits below.
    let segments = read_catalog(&self.0).clone();
    let (first_base, last_base) = match (segments.keys().next(), segments.keys().next_back()) {
        (Some(&first), Some(&last)) => (first, last),
        _ => return Ok(out),
    };
    let start_base = segments
        .range(..=from)
        .next_back()
        .map_or(first_base, |(&base, _)| base);

    for (&seg_base, seg_path) in segments.range(start_base..) {
        if out.len() >= limit {
            break;
        }
        let tolerate_bad_tail = seg_base == last_base;
        let data = tokio::fs::read(seg_path).await?;
        let mut cursor: usize = 0;
        while out.len() < limit {
            let (event, consumed) = match decode_frame(&data[cursor..]) {
                Ok(Some(decoded)) => decoded,
                // No further frame in this segment; continue with the next one.
                Ok(None) => break,
                // Undecodable bytes in the newest segment end the whole scan quietly.
                Err(_) if tolerate_bad_tail => return Ok(out),
                Err(err) => {
                    return Err(std::io::Error::other(format!(
                        "decode error in {path:?} at offset {byte_offset}: {e}",
                        path = seg_path,
                        byte_offset = cursor,
                        e = err,
                    )));
                }
            };
            let frame_start = cursor as u64;
            cursor += consumed;
            // Frames before `from` in the starting segment are decoded but skipped.
            if event.seq >= from {
                out.push((event, (seg_base, frame_start)));
            }
        }
    }
    Ok(out)
}
/// Reads and decodes the single frame that starts at `offset` in the segment whose base
/// is `segment_base`, and checks that it carries `expected_seq`.
///
/// The first four header bytes are read as a little-endian payload length. The claimed
/// frame extent is validated against the file length before the frame buffer is
/// allocated, so a bogus length cannot trigger an oversized allocation.
///
/// # Errors
///
/// Returns an error when the segment base is not in the catalog, on any I/O failure
/// (including a header that runs past end of file), when the header claims bytes beyond
/// the end of the segment, when the frame fails to decode, or when the decoded event's
/// sequence number differs from `expected_seq`.
pub async fn read_at(
    &self,
    segment_base: Seq,
    offset: u64,
    expected_seq: Seq,
) -> std::io::Result<WireEvent> {
    // Scope the guard so the catalog lock is released before any await.
    let path = {
        let guard = read_catalog(&self.0);
        guard.get(&segment_base).cloned().ok_or_else(|| {
            std::io::Error::other(format!("unknown segment base {segment_base}"))
        })?
    };
    let mut file = File::open(&path).await?;
    let file_len = file.metadata().await?.len();
    file.seek(SeekFrom::Start(offset)).await?;
    let mut len_bytes = [0u8; 4];
    let mut crc_bytes = [0u8; HEADER_LEN - 4];
    file.read_exact(&mut len_bytes).await?;
    file.read_exact(&mut crc_bytes).await?;
    let payload_len = u32::from_le_bytes(len_bytes);
    // Sum in u64: a small header plus a u32 payload length always fits, whereas the same
    // sum in `usize` can wrap on 32-bit targets and defeat the bounds check below.
    let frame_len = HEADER_LEN as u64 + u64::from(payload_len);
    let frame_end = offset
        .checked_add(frame_len)
        .ok_or_else(|| std::io::Error::other("frame length arithmetic overflow"))?;
    if frame_end > file_len {
        return Err(std::io::Error::other(format!(
            "frame header at {offset} claims {payload_len} bytes past end of segment"
        )));
    }
    let frame_len = usize::try_from(frame_len)
        .map_err(|_| std::io::Error::other("frame length exceeds addressable memory"))?;
    // Reassemble header + payload so `decode_frame` sees the complete frame.
    let mut frame = vec![0u8; frame_len];
    frame[..4].copy_from_slice(&len_bytes);
    frame[4..HEADER_LEN].copy_from_slice(&crc_bytes);
    file.read_exact(&mut frame[HEADER_LEN..]).await?;
    let ev = match decode_frame(&frame) {
        Ok(Some((ev, _))) => ev,
        Ok(None) => return Err(std::io::Error::other("empty frame at read_at")),
        Err(e) => {
            return Err(std::io::Error::other(format!(
                "decode error at offset {offset}: {e}"
            )));
        }
    };
    // A valid frame with a different seq means the caller's position is stale or wrong.
    if ev.seq != expected_seq {
        return Err(std::io::Error::other(format!(
            "frame at {segment_base}/{offset} has seq {got}, expected {expected_seq} — stale or wrong position",
            got = ev.seq,
        )));
    }
    Ok(ev)
}
}
/// Result of checking one segment file frame by frame (produced by `validate`).
#[derive(Debug)]
enum Validation {
    /// Every frame decoded, or the file does not exist.
    Clean,
    /// A frame in the final segment failed to decode; the payload is the byte length of
    /// the valid prefix the file should be cut back to.
    Truncate(u64),
    /// A frame in a non-final segment failed to decode; the payload is the byte offset
    /// at which decoding failed.
    Corrupt(u64),
}
/// Builds the file name for the segment whose first sequence number is `base`:
/// `segment-` + the base zero-padded to 20 characters + `.log`.
fn segment_name(base: Seq) -> String {
    format!("segment-{base:020}.log")
}
fn parse_base(name: &str) -> Option<Seq> {
let s = name.strip_prefix("segment-")?.strip_suffix(".log")?;
s.parse().ok()
}
/// Opens `dir` and calls `sync_all` on the resulting handle.
///
/// This is a blocking call; it returns any error from opening or syncing the directory.
fn fsync_dir(dir: &Path) -> std::io::Result<()> {
    let handle = std::fs::File::open(dir)?;
    handle.sync_all()
}
/// Decodes the segment at `path` frame by frame and reports how far it is valid.
///
/// * A missing file, or a file whose frames all decode, is `Validation::Clean`.
/// * On the first frame that fails to decode, the result is `Validation::Truncate` for
///   the final segment and `Validation::Corrupt` otherwise; both carry the byte offset
///   just past the last frame that decoded.
///
/// This reads the whole file with blocking I/O.
///
/// # Errors
///
/// Returns any read error other than `NotFound`.
fn validate(path: &Path, is_final: bool) -> std::io::Result<Validation> {
    let bytes = match std::fs::read(path) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Validation::Clean),
        other => other?,
    };
    // End of the last frame that decoded; also where the next decode attempt starts.
    let mut good_end: usize = 0;
    loop {
        match decode_frame(&bytes[good_end..]) {
            Ok(Some((_, consumed))) => good_end += consumed,
            Ok(None) => return Ok(Validation::Clean),
            Err(_) => {
                let at = good_end as u64;
                return Ok(if is_final {
                    Validation::Truncate(at)
                } else {
                    Validation::Corrupt(at)
                });
            }
        }
    }
}
/// Writer-side handle over a directory of segment files: owns the open active segment
/// and the shared catalog of all segments.
pub struct SegmentSet {
/// Directory that holds the segment files.
dir: PathBuf,
/// Shared map of segment base -> file path; handed out to readers via `catalog()`.
catalog: SegmentCatalog,
/// The segment currently being appended to (opened in append mode).
active: File,
/// Byte length of the active segment: taken from file metadata on open, advanced by
/// `append`, and reset when `maybe_rotate` switches to a new segment.
active_len: u64,
/// Length threshold at which `maybe_rotate` starts a new segment.
segment_max: u64,
/// Test-only hook invoked at the start of `sync`; an `Err` from it is returned from
/// `sync` before the file is synced.
#[cfg(test)]
pub sync_hook: Option<std::sync::Arc<dyn Fn() -> std::io::Result<()> + Send + Sync>>,
}
impl SegmentSet {
/// Opens the segment directory `dir` with [`DEFAULT_SEGMENT_MAX`] as the rotation
/// threshold. See [`Self::open_with_max`] for the details and error conditions.
pub async fn open(dir: &Path) -> std::io::Result<Self> {
    let segment_max = DEFAULT_SEGMENT_MAX;
    Self::open_with_max(dir, segment_max).await
}
/// Opens (creating if needed) the segment directory `dir` and prepares it for appends.
///
/// Steps, in order:
/// 1. create the directory and index every `segment-<base>.log` file found in it;
/// 2. if there are none, register (but do not yet create) a first segment with base 1;
/// 3. validate every segment: a bad frame in the final segment is truncated away with a
///    warning, a bad frame in any earlier segment aborts the open;
/// 4. open the final segment for appending and fsync the directory.
///
/// # Errors
///
/// Returns an error on I/O failure, on a non-UTF-8 file name in the directory, or when a
/// non-final segment contains a frame that fails to decode.
pub async fn open_with_max(dir: &Path, segment_max: u64) -> std::io::Result<Self> {
    tokio::fs::create_dir_all(dir).await?;

    let mut segments: BTreeMap<Seq, PathBuf> = BTreeMap::new();
    let mut entries = tokio::fs::read_dir(dir).await?;
    while let Some(entry) = entries.next_entry().await? {
        let Ok(name) = entry.file_name().into_string() else {
            return Err(std::io::Error::other("non-UTF-8 filename in segment dir"));
        };
        // Files that do not match the segment naming scheme are ignored.
        if let Some(base) = parse_base(&name) {
            segments.insert(base, entry.path());
        }
    }
    if segments.is_empty() {
        segments.insert(1, dir.join(segment_name(1)));
    }
    let final_base = segments
        .keys()
        .next_back()
        .copied()
        .ok_or_else(|| std::io::Error::other("segment map empty after seeding"))?;

    for (&base, path) in &segments {
        match validate(path, base == final_base)? {
            Validation::Clean => {}
            Validation::Corrupt(off) => {
                return Err(std::io::Error::other(format!(
                    "corrupt frame in sealed segment {path:?} at offset {off}"
                )));
            }
            Validation::Truncate(good_len) => {
                let discarded = std::fs::metadata(path)?.len().saturating_sub(good_len);
                tracing::warn!(
                    ?path,
                    discarded_bytes = discarded,
                    "torn tail in final segment; truncating to last good frame"
                );
                // Cut the file back to the valid prefix and make the new length durable.
                let f = std::fs::OpenOptions::new().write(true).open(path)?;
                f.set_len(good_len)?;
                f.sync_all()?;
            }
        }
    }

    let active = tokio::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&segments[&final_base])
        .await?;
    let active_len = active.metadata().await?.len();
    fsync_dir(dir)?;

    Ok(Self {
        dir: dir.to_path_buf(),
        catalog: SegmentCatalog(Arc::new(RwLock::new(segments))),
        active,
        active_len,
        segment_max,
        #[cfg(test)]
        sync_hook: None,
    })
}
pub fn catalog(&self) -> SegmentCatalog {
self.catalog.clone()
}
/// Writes `frame` to the active segment and, on success, advances the tracked length by
/// `frame.len()`. Does not sync; call [`Self::sync`] for that.
///
/// # Errors
///
/// Returns the write error; the tracked length is left unchanged in that case.
pub async fn append(&mut self, frame: &[u8]) -> std::io::Result<()> {
    let written = frame.len() as u64;
    self.active.write_all(frame).await?;
    self.active_len += written;
    Ok(())
}
/// Calls `sync_all` on the active segment file.
///
/// In test builds the optional `sync_hook` runs first, and an error from it is returned
/// without syncing.
pub async fn sync(&mut self) -> std::io::Result<()> {
    #[cfg(test)]
    if let Some(hook) = self.sync_hook.as_ref() {
        hook()?;
    }
    self.active.sync_all().await
}
/// Test-only: installs (or, with `None`, removes) the hook that `sync` runs before
/// syncing the active file.
#[cfg(test)]
pub fn set_sync_hook(
    &mut self,
    hook: Option<std::sync::Arc<dyn Fn() -> std::io::Result<()> + Send + Sync>>,
) {
    let _previous = std::mem::replace(&mut self.sync_hook, hook);
}
/// Starts a new active segment named after `next_seq` once the current one has reached
/// `segment_max` bytes; otherwise does nothing.
///
/// On rotation the outgoing segment is synced, the new file is created (or reopened if
/// it already exists), the directory is fsynced, and only then is the new segment
/// published in the catalog and made active.
///
/// # Errors
///
/// Returns any I/O error from syncing, opening, reading metadata, or fsyncing the
/// directory. The active segment and the catalog are left unchanged in that case.
pub async fn maybe_rotate(&mut self, next_seq: Seq) -> std::io::Result<()> {
    if self.active_len < self.segment_max {
        return Ok(());
    }
    self.active.sync_all().await?;
    let new_path = self.dir.join(segment_name(next_seq));
    let new_file = tokio::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&new_path)
        .await?;
    // `create(true).append(true)` reuses a file that already exists, so take the real
    // length instead of assuming 0; this keeps `next_pos` offsets correct in that case
    // and mirrors how `open_with_max` initialises `active_len`.
    let new_len = new_file.metadata().await?.len();
    fsync_dir(&self.dir)?;
    write_catalog(&self.catalog.0).insert(next_seq, new_path);
    self.active = new_file;
    self.active_len = new_len;
    Ok(())
}
/// Convenience wrapper around [`SegmentCatalog::scan_from`] on this set's catalog.
pub async fn scan_from(&self, from: Seq, limit: usize) -> std::io::Result<Vec<WireEvent>> {
    let events = self.catalog.scan_from(from, limit).await?;
    Ok(events)
}
/// Convenience wrapper around [`SegmentCatalog::scan_with_pos`] on this set's catalog.
pub async fn scan_with_pos(
    &self,
    from: Seq,
    limit: usize,
) -> std::io::Result<Vec<(WireEvent, crate::state::FramePos)>> {
    let located = self.catalog.scan_with_pos(from, limit).await?;
    Ok(located)
}
/// Returns where the next appended frame will start: the greatest base in the catalog
/// (or 1 if the catalog is empty) together with the tracked length of the active segment.
pub fn next_pos(&self) -> (Seq, u64) {
    let catalog = read_catalog(&self.catalog.0);
    let base = match catalog.last_key_value() {
        Some((&newest, _)) => newest,
        None => 1,
    };
    (base, self.active_len)
}
/// Convenience wrapper around [`SegmentCatalog::read_at`] on this set's catalog.
pub async fn read_at(
    &self,
    segment_base: Seq,
    offset: u64,
    expected_seq: Seq,
) -> std::io::Result<WireEvent> {
    let event = self
        .catalog
        .read_at(segment_base, offset, expected_seq)
        .await?;
    Ok(event)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::frame::{WireEvent, encode_frame};
use talea_core::events::LedgerEvent;
use talea_core::store::AccountCfg;
use talea_core::types::*;
/// Test fixture: builds a `WireEvent` with the given `seq`, the current ledger time, and
/// an `AccountOpened` event whose account path is `a<seq>` in book `b`.
fn ev(seq: Seq) -> WireEvent {
WireEvent {
seq,
at: talea_core::store::ledger_now(),
event: LedgerEvent::AccountOpened {
def: AccountDef {
id: AccountId {
book: Book("b".into()),
path: format!("a{seq}"),
},
asset: AssetId::new("USD"),
kind: AccountKind::Asset,
},
cfg: AccountCfg {
normal_side: None,
min_balance: None,
},
},
}
}
/// Appended frames come back from `scan_from` in order, and `from`/`limit` page the
/// result.
#[tokio::test]
async fn append_then_scan_round_trips() {
let dir = tempfile::tempdir().unwrap();
let mut seg = SegmentSet::open(dir.path()).await.unwrap();
for s in 1..=5 {
seg.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
}
seg.sync().await.unwrap();
// Full scan returns every event.
let got = seg.scan_from(1, 100).await.unwrap();
assert_eq!(
got.iter().map(|e| e.seq).collect::<Vec<_>>(),
vec![1, 2, 3, 4, 5]
);
// Starting at seq 3 with a limit of 2 yields exactly seqs 3 and 4.
let page = seg.scan_from(3, 2).await.unwrap();
assert_eq!(page.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![3, 4]);
}
/// With a 64-byte threshold, ten appends must spread over at least two segment files
/// while a scan still sees all ten events.
#[tokio::test]
async fn rotation_starts_a_new_segment_named_by_base_seq() {
let dir = tempfile::tempdir().unwrap();
let mut seg = SegmentSet::open_with_max(dir.path(), 64).await.unwrap(); for s in 1..=10 {
// Give the set a chance to rotate before each append, passing the seq about to be written.
seg.maybe_rotate(s).await.unwrap();
seg.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
}
seg.sync().await.unwrap();
let names: Vec<String> = std::fs::read_dir(dir.path())
.unwrap()
.map(|e| e.unwrap().file_name().into_string().unwrap())
.collect();
assert!(names.len() >= 2, "expected rotation, got {names:?}");
// The scan must cross segment boundaries.
assert_eq!(seg.scan_from(1, 100).await.unwrap().len(), 10);
}
/// Cutting bytes off the end of the only (final) segment must make reopen drop the
/// incomplete frame, after which appending continues normally.
#[tokio::test]
async fn torn_tail_on_final_segment_truncates() {
let dir = tempfile::tempdir().unwrap();
{
let mut seg = SegmentSet::open(dir.path()).await.unwrap();
for s in 1..=3 {
seg.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
}
seg.sync().await.unwrap();
}
// Only one segment file exists at this point.
let path = std::fs::read_dir(dir.path())
.unwrap()
.next()
.unwrap()
.unwrap()
.path();
// Chop 5 bytes off the end so the third frame is incomplete.
let len = std::fs::metadata(&path).unwrap().len();
let f = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
f.set_len(len - 5).unwrap();
let mut seg = SegmentSet::open(dir.path()).await.unwrap();
let got = seg.scan_from(1, 100).await.unwrap();
assert_eq!(got.len(), 2, "torn third frame must be truncated away");
// Re-appending seq 3 after recovery must yield three readable frames again.
seg.append(&encode_frame(&ev(3)).unwrap()).await.unwrap();
seg.sync().await.unwrap();
assert_eq!(seg.scan_from(1, 100).await.unwrap().len(), 3);
}
/// Flipping a byte in the middle of the oldest segment (after rotations have produced
/// later ones) must make `SegmentSet::open` fail.
#[tokio::test]
async fn corruption_in_sealed_segment_refuses_open() {
let dir = tempfile::tempdir().unwrap();
{
let mut seg = SegmentSet::open_with_max(dir.path(), 64).await.unwrap();
for s in 1..=10 {
seg.maybe_rotate(s).await.unwrap();
seg.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
}
seg.sync().await.unwrap();
}
// Sorted paths put the lowest zero-padded base first, i.e. the oldest segment.
let mut paths: Vec<_> = std::fs::read_dir(dir.path())
.unwrap()
.map(|e| e.unwrap().path())
.collect();
paths.sort();
// Invert one byte in the middle of that file.
let mut bytes = std::fs::read(&paths[0]).unwrap();
let mid = bytes.len() / 2;
bytes[mid] ^= 0xff;
std::fs::write(&paths[0], bytes).unwrap();
assert!(SegmentSet::open(dir.path()).await.is_err());
}
/// A trailing fragment with a complete header but only half of its payload must be
/// truncated on reopen, restoring the file to its length after the last good frame.
#[tokio::test]
async fn torn_header_with_partial_payload_truncates() {
let dir = tempfile::tempdir().unwrap();
let good_len: u64;
{
let mut seg = SegmentSet::open(dir.path()).await.unwrap();
for s in 1..=3i64 {
seg.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
}
seg.sync().await.unwrap();
// Length of the segment after three complete frames.
good_len = seg.next_pos().1;
}
// Build the fragment: 4-byte little-endian length (20), 4-byte CRC32 of the full dummy
// payload, then only the first 10 of the 20 payload bytes.
let payload_len: u32 = 20;
let dummy_payload = vec![0xABu8; payload_len as usize];
let crc = crc32fast::hash(&dummy_payload);
let mut torn_fragment = Vec::with_capacity(HEADER_LEN + 10);
torn_fragment.extend_from_slice(&payload_len.to_le_bytes());
torn_fragment.extend_from_slice(&crc.to_le_bytes());
torn_fragment.extend_from_slice(&dummy_payload[..10]);
let path = std::fs::read_dir(dir.path())
.unwrap()
.next()
.unwrap()
.unwrap()
.path();
{
use std::io::Write;
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&path)
.unwrap();
f.write_all(&torn_fragment).unwrap();
}
let seg = SegmentSet::open(dir.path()).await.unwrap();
let got = seg.scan_from(1, 100).await.unwrap();
assert_eq!(
got.len(),
3,
"torn 4th frame must be truncated; got {} frames",
got.len()
);
// The file on disk must be back to exactly the pre-fragment length.
let actual_len = std::fs::metadata(&path).unwrap().len();
assert_eq!(
actual_len, good_len,
"file must be truncated back to {good_len}, found {actual_len}"
);
}
/// Every position reported by `scan_with_pos` must be readable through `read_at` and
/// yield an event with the same seq and the same event variant.
#[tokio::test]
async fn scan_with_pos_round_trips_via_read_at() {
let dir = tempfile::tempdir().unwrap();
let mut seg = SegmentSet::open(dir.path()).await.unwrap();
for s in 1..=3i64 {
seg.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
}
seg.sync().await.unwrap();
let pairs = seg.scan_with_pos(1, 100).await.unwrap();
assert_eq!(pairs.len(), 3);
for (wire, (seg_base, offset)) in &pairs {
let from_disk = seg.read_at(*seg_base, *offset, wire.seq).await.unwrap();
assert_eq!(
from_disk.seq, wire.seq,
"read_at must return the same seq at the recorded pos"
);
// Only the variant is compared here, not the event's fields.
let ok = matches!(
(&wire.event, &from_disk.event),
(
talea_core::events::LedgerEvent::AccountOpened { .. },
talea_core::events::LedgerEvent::AccountOpened { .. },
)
);
assert!(ok, "event variant must match after read_at");
}
}
/// Three scenarios around bad bytes at the end of a log:
/// 1. a torn frame appended to a closed log, then reopened and scanned;
/// 2. garbage appended behind the back of a still-open `SegmentSet`, scanned live via
///    its catalog;
/// 3. a flipped byte in the oldest of several segments, which must make open fail.
#[tokio::test]
async fn scan_stops_cleanly_at_inflight_torn_tail() {
// --- Scenario 1: torn frame on disk, then reopen + scan. ---
let dir = tempfile::tempdir().unwrap();
{
let mut seg = SegmentSet::open(dir.path()).await.unwrap();
for s in 1..=3i64 {
seg.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
}
seg.sync().await.unwrap();
}
let seg_path = std::fs::read_dir(dir.path())
.unwrap()
.next()
.unwrap()
.unwrap()
.path();
{
use std::io::Write;
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&seg_path)
.unwrap();
// Header claiming a 20-byte payload, an arbitrary checksum value, and only 10 payload bytes.
let payload_len: u32 = 20;
f.write_all(&payload_len.to_le_bytes()).unwrap();
f.write_all(&0x12345678u32.to_le_bytes()).unwrap(); f.write_all(&[0xAB; 10]).unwrap(); }
// NOTE(review): `SegmentSet::open` validates the final segment first, so the torn tail
// has presumably been truncated already by the time `scan_from` runs here.
let seg = SegmentSet::open(dir.path()).await.unwrap();
let got = seg.scan_from(1, 100).await.unwrap();
assert_eq!(
got.iter().map(|e| e.seq).collect::<Vec<_>>(),
vec![1, 2, 3],
"scan_from must return exactly 3 complete frames; torn tail must be a clean stop"
);
// --- Scenario 2: garbage appended while the set stays open (no reopen, no truncation). ---
let dir2 = tempfile::tempdir().unwrap();
let seg2 = {
let mut s = SegmentSet::open(dir2.path()).await.unwrap();
for i in 1..=3i64 {
s.append(&encode_frame(&ev(i)).unwrap()).await.unwrap();
}
s.sync().await.unwrap();
s
};
let seg_path2 = std::fs::read_dir(dir2.path())
.unwrap()
.next()
.unwrap()
.unwrap()
.path();
{
use std::io::Write;
let mut f = std::fs::OpenOptions::new()
.append(true)
.open(&seg_path2)
.unwrap();
f.write_all(&[0xFF; 16]).unwrap(); }
// Scan through a catalog handle; the garbage sits in the newest segment.
let got2 = seg2.catalog().scan_from(1, 100).await.unwrap();
assert_eq!(
got2.iter().map(|e| e.seq).collect::<Vec<_>>(),
vec![1, 2, 3],
"live scan_from must stop cleanly at in-flight garbage in the active segment"
);
// --- Scenario 3: corruption in the oldest of several segments must fail the open. ---
let dir3 = tempfile::tempdir().unwrap();
{
let mut seg3 = SegmentSet::open_with_max(dir3.path(), 64).await.unwrap();
for s in 1..=10i64 {
seg3.maybe_rotate(s).await.unwrap();
seg3.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
}
seg3.sync().await.unwrap();
}
let mut paths3: Vec<_> = std::fs::read_dir(dir3.path())
.unwrap()
.map(|e| e.unwrap().path())
.collect();
paths3.sort();
let mut bytes3 = std::fs::read(&paths3[0]).unwrap();
let mid3 = bytes3.len() / 2;
bytes3[mid3] ^= 0xff;
std::fs::write(&paths3[0], &bytes3).unwrap();
let open_result = SegmentSet::open(dir3.path()).await;
assert!(
open_result.is_err(),
"opening with a corrupt sealed segment must fail"
);
}
/// A rotation that leaves the newest segment empty must survive a reopen: the empty
/// segment stays the active one at offset 0, earlier frames remain readable, and the
/// next append lands in it.
#[tokio::test]
async fn empty_final_segment_after_rotation_is_clean() {
    let dir = tempfile::tempdir().unwrap();
    let rotated_base: Seq = 4;
    // Active base as reported just before the set is dropped, i.e. after the last rotation.
    let base_before_reopen: Seq;
    {
        // A 1-byte threshold makes every `maybe_rotate` after the first append rotate.
        let mut seg = SegmentSet::open_with_max(dir.path(), 1).await.unwrap();
        for s in 1..=3i64 {
            seg.maybe_rotate(s).await.unwrap();
            seg.append(&encode_frame(&ev(s)).unwrap()).await.unwrap();
        }
        // Rotate once more without appending, leaving an empty final segment behind.
        seg.maybe_rotate(rotated_base).await.unwrap();
        seg.sync().await.unwrap();
        base_before_reopen = seg.next_pos().0;
    }
    assert_eq!(
        base_before_reopen, rotated_base,
        "rotation must make segment {rotated_base} the active one"
    );
    let mut seg = SegmentSet::open(dir.path()).await.unwrap();
    let got = seg.scan_from(1, 100).await.unwrap();
    assert_eq!(
        got.len(),
        3,
        "expected 3 pre-rotation frames, got {}",
        got.len()
    );
    let (base, off) = seg.next_pos();
    assert_eq!(
        base, rotated_base,
        "expected rotated base {rotated_base}, got {base}"
    );
    assert_eq!(off, 0, "new segment must start at offset 0");
    seg.append(&encode_frame(&ev(rotated_base)).unwrap())
        .await
        .unwrap();
    seg.sync().await.unwrap();
    let all = seg.scan_from(1, 100).await.unwrap();
    assert_eq!(
        all.len(),
        4,
        "expected 4 frames after append, got {}",
        all.len()
    );
}
}