use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::Notify;
use super::super::channel::ChannelName;
use super::entry::{RedexEntry, REDEX_ENTRY_SIZE};
use super::error::RedexError;
/// Everything `DiskSegment::open` hands back: the live on-disk segment plus
/// the in-memory state recovered from its files.
pub(super) struct RecoveredSegment {
/// Open handles on the live generation, ready for further appends.
pub disk: DiskSegment,
/// Index entries that survived torn-tail trimming and checksum verification.
pub index: Vec<RedexEntry>,
/// Contents of the live `dat` file after unreferenced tail bytes were trimmed.
pub payload_bytes: Vec<u8>,
/// One timestamp per entry of `index`; `None` when the `ts` file is missing or
/// holds fewer records than the on-disk index.
pub timestamps: Option<Vec<u64>>,
}
/// Append-oriented handle on one channel's on-disk storage.
///
/// Data lives in a generation directory (`v0000000001`, ...) under `dir`, which
/// holds three files: `idx` (fixed-size entry records), `dat` (heap payloads)
/// and `ts` (one little-endian u64 timestamp per idx record). A `manifest` file
/// in `dir` names the live generation.
pub(super) struct DiskSegment {
/// Channel directory holding the manifest and the generation subdirectories.
dir: PathBuf,
/// Generation number currently receiving appends; bumped by `compact_to`.
live_gen: AtomicU32,
/// Append-mode handles used by the append paths.
idx_file: Mutex<File>,
dat_file: Mutex<File>,
ts_file: Mutex<File>,
/// `try_clone`s of the same three files. `sync` locks only these, so it never
/// waits on the append-path locks above.
worker_idx_file: Mutex<File>,
worker_dat_file: Mutex<File>,
worker_ts_file: Mutex<File>,
/// Signal an fsync after this many appended entries (0 disables the check).
fsync_every_n: u64,
/// Signal an fsync after this many appended bytes (0 disables the check).
fsync_max_bytes: u64,
/// Running counters compared against the two thresholds above.
appends_since_sync: AtomicU64,
bytes_since_sync: AtomicU64,
/// Notified (`notify_one`) when a threshold is crossed.
/// NOTE(review): presumably awaited by a background task that calls `sync` — confirm.
pub(super) fsync_signal: Arc<Notify>,
/// Set when a rollback could not restore on-disk lengths; appends and
/// `compact_to` refuse to run once it is set.
poisoned: std::sync::atomic::AtomicBool,
// Test-only fault-injection switches, each consumed by the next matching call.
#[cfg(test)]
fail_next_append: AtomicBool,
#[cfg(test)]
fail_after_dat_write: AtomicBool,
#[cfg(test)]
fail_next_sync: AtomicBool,
#[cfg(test)]
fail_next_compact: AtomicBool,
#[cfg(test)]
fail_next_idx_metadata: AtomicBool,
#[cfg(test)]
fail_next_ts_metadata: AtomicBool,
/// Test-only: number of successful `sync` calls.
#[cfg(test)]
sync_count: AtomicU64,
}
impl DiskSegment {
pub(super) fn open(
base_dir: &Path,
name: &ChannelName,
fsync_every_n: u64,
fsync_max_bytes: u64,
) -> Result<RecoveredSegment, RedexError> {
let dir = channel_dir(base_dir, name);
std::fs::create_dir_all(&dir).map_err(RedexError::io)?;
let live_gen = resolve_live_generation(&dir).map_err(RedexError::io)?;
let live_dir = gen_dir(&dir, live_gen);
std::fs::create_dir_all(&live_dir).map_err(RedexError::io)?;
let idx_path = live_dir.join("idx");
let dat_path = live_dir.join("dat");
let (mut index, idx_len_truncated) = read_index(&idx_path)?;
let mut payload_bytes = read_payload(&dat_path)?;
if idx_len_truncated {
let file = OpenOptions::new()
.write(true)
.open(&idx_path)
.map_err(RedexError::io)?;
file.set_len((index.len() * REDEX_ENTRY_SIZE) as u64)
.map_err(RedexError::io)?;
file.sync_all().map_err(RedexError::io)?;
}
let dat_len = payload_bytes.len() as u64;
let mut truncate_at: Option<usize> = None;
for (i, e) in index.iter().enumerate().rev() {
if e.is_inline() {
continue;
}
let end = (e.payload_offset as u64).saturating_add(e.payload_len as u64);
if end > dat_len {
truncate_at = Some(i);
} else {
break;
}
}
let idx_trimmed = truncate_at.is_some();
if let Some(cut) = truncate_at {
index.truncate(cut);
}
if idx_trimmed {
let file = OpenOptions::new()
.write(true)
.open(&idx_path)
.map_err(RedexError::io)?;
file.set_len((index.len() * REDEX_ENTRY_SIZE) as u64)
.map_err(RedexError::io)?;
file.sync_all().map_err(RedexError::io)?;
}
let retained_dat_end = index
.iter()
.filter(|e| !e.is_inline())
.map(|e| (e.payload_offset as u64).saturating_add(e.payload_len as u64))
.max()
.unwrap_or(0);
if retained_dat_end < dat_len {
let file = OpenOptions::new()
.write(true)
.open(&dat_path)
.map_err(RedexError::io)?;
file.set_len(retained_dat_end).map_err(RedexError::io)?;
file.sync_all().map_err(RedexError::io)?;
payload_bytes.truncate(retained_dat_end as usize);
}
let ts_path = live_dir.join("ts");
let original_timestamps = read_timestamps(&ts_path, index.len())?;
let mut survivors: Vec<usize> = Vec::with_capacity(index.len());
for (i, e) in index.iter().enumerate() {
let payload: &[u8] = if e.is_inline() {
let Some(inline) = e.inline_payload() else {
continue;
};
let computed = super::entry::payload_checksum(&inline);
if e.checksum() != computed {
continue;
}
survivors.push(i);
continue;
} else {
let off = e.payload_offset as usize;
let len = e.payload_len as usize;
let end = off.saturating_add(len);
if end > payload_bytes.len() {
continue;
}
&payload_bytes[off..end]
};
let computed = super::entry::payload_checksum(payload);
if e.checksum() == computed {
survivors.push(i);
}
}
let bad_entries = index.len() - survivors.len();
if bad_entries > 0 {
let mut compacted = Vec::with_capacity(survivors.len());
for &i in &survivors {
compacted.push(index[i]);
}
index = compacted;
tracing::error!(
bad_entries,
surviving = index.len(),
"DiskSegment::open: dropped {} entries with bad checksums during recovery; \
on-disk dat may have torn writes or be corrupt",
bad_entries
);
}
let timestamps = original_timestamps.as_ref().map(|all_ts| {
survivors
.iter()
.map(|&i| all_ts.get(i).copied().unwrap_or(0))
.collect::<Vec<u64>>()
});
if let Some(ts) = timestamps.as_ref() {
if bad_entries > 0 {
if let Ok(mut file) = OpenOptions::new().write(true).truncate(true).open(&ts_path) {
use std::io::Write as _;
let mut buf = Vec::with_capacity(ts.len() * 8);
for &t in ts {
buf.extend_from_slice(&t.to_le_bytes());
}
let _ = file.write_all(&buf);
}
} else if !index.is_empty() {
if let Ok(file) = OpenOptions::new().write(true).open(&ts_path) {
let _ = file.set_len((index.len() * 8) as u64);
}
}
}
let idx_file = OpenOptions::new()
.create(true)
.read(true)
.append(true)
.open(&idx_path)
.map_err(RedexError::io)?;
let dat_file = OpenOptions::new()
.create(true)
.read(true)
.append(true)
.open(&dat_path)
.map_err(RedexError::io)?;
let ts_file = OpenOptions::new()
.create(true)
.read(true)
.append(true)
.open(&ts_path)
.map_err(RedexError::io)?;
let worker_idx_file = idx_file.try_clone().map_err(RedexError::io)?;
let worker_dat_file = dat_file.try_clone().map_err(RedexError::io)?;
let worker_ts_file = ts_file.try_clone().map_err(RedexError::io)?;
sweep_orphan_generations(&dir, live_gen);
Ok(RecoveredSegment {
disk: DiskSegment {
dir,
live_gen: AtomicU32::new(live_gen),
idx_file: Mutex::new(idx_file),
dat_file: Mutex::new(dat_file),
ts_file: Mutex::new(ts_file),
worker_idx_file: Mutex::new(worker_idx_file),
worker_dat_file: Mutex::new(worker_dat_file),
worker_ts_file: Mutex::new(worker_ts_file),
fsync_every_n,
fsync_max_bytes,
appends_since_sync: AtomicU64::new(0),
bytes_since_sync: AtomicU64::new(0),
fsync_signal: Arc::new(Notify::new()),
poisoned: std::sync::atomic::AtomicBool::new(false),
#[cfg(test)]
fail_next_append: AtomicBool::new(false),
#[cfg(test)]
fail_after_dat_write: AtomicBool::new(false),
#[cfg(test)]
fail_next_sync: AtomicBool::new(false),
#[cfg(test)]
fail_next_compact: AtomicBool::new(false),
#[cfg(test)]
fail_next_idx_metadata: AtomicBool::new(false),
#[cfg(test)]
fail_next_ts_metadata: AtomicBool::new(false),
#[cfg(test)]
sync_count: AtomicU64::new(0),
},
index,
payload_bytes,
timestamps,
})
}
/// Test hook: how many times `sync` has completed successfully.
#[cfg(test)]
pub(super) fn sync_count(&self) -> u64 {
    let counter = &self.sync_count;
    counter.load(Ordering::Acquire)
}
/// Test hook: current `(dat, idx, ts)` lengths as seen through the worker
/// handles. A length whose metadata lookup fails is reported as 0.
#[cfg(test)]
pub(super) fn worker_file_lens(&self) -> (u64, u64, u64) {
    let len_of = |slot: &Mutex<File>| slot.lock().metadata().map_or(0, |meta| meta.len());
    (
        len_of(&self.worker_dat_file),
        len_of(&self.worker_idx_file),
        len_of(&self.worker_ts_file),
    )
}
fn maybe_sync_after_append(&self, applied: u64, bytes_written: u64) {
let mut should_signal = false;
if self.fsync_every_n > 0 && applied > 0 {
let prev = self.appends_since_sync.fetch_add(applied, Ordering::AcqRel);
let now = prev.saturating_add(applied);
if now >= self.fsync_every_n {
self.appends_since_sync.store(0, Ordering::Release);
should_signal = true;
}
}
if self.fsync_max_bytes > 0 && bytes_written > 0 {
let prev = self
.bytes_since_sync
.fetch_add(bytes_written, Ordering::AcqRel);
let now = prev.saturating_add(bytes_written);
if now >= self.fsync_max_bytes {
self.bytes_since_sync.store(0, Ordering::Release);
should_signal = true;
}
}
if should_signal {
self.fsync_signal.notify_one();
}
}
/// Path of `file_name` inside the generation directory that is live right now.
fn live_gen_path(&self, file_name: &str) -> PathBuf {
    let live = self.live_gen.load(Ordering::Acquire);
    let mut path = gen_dir(&self.dir, live);
    path.push(file_name);
    path
}
/// Test hook: the generation number currently receiving appends.
#[cfg(test)]
pub(super) fn live_gen(&self) -> u32 {
    self.live_gen.load(Ordering::Acquire)
}
/// Test hook: directory of the live generation.
#[cfg(test)]
pub(super) fn live_dir(&self) -> PathBuf {
    gen_dir(&self.dir, self.live_gen())
}
/// Best-effort shrink of live-generation file `file_name` back to `target_len`.
///
/// If the file cannot be opened or resized, the failure is logged and the
/// segment is poisoned. On-disk state may then disagree with the caller's
/// in-memory view.
fn rollback_truncate(&self, file_name: &str, target_len: u64) {
    let path = self.live_gen_path(file_name);
    let file = match OpenOptions::new().write(true).open(&path) {
        Ok(handle) => handle,
        Err(e) => {
            tracing::error!(
                error = %e,
                path = %path.display(),
                "redex disk rollback: {file_name} open failed; poisoning segment",
            );
            self.poisoned.store(true, Ordering::Release);
            return;
        }
    };
    if let Err(e) = file.set_len(target_len) {
        tracing::error!(
            error = %e,
            path = %path.display(),
            "redex disk rollback: {file_name} set_len failed; poisoning segment",
        );
        self.poisoned.store(true, Ordering::Release);
    }
}
/// Undo a single-entry append after its idx write (or a later step) failed.
///
/// Truncates idx back to `pre_idx_len`. When `dat_rollback` is `Some(n)`, also
/// removes the last `n` bytes of dat. Every failure along the way is logged
/// and poisons the segment.
fn rollback_after_idx_failure(&self, pre_idx_len: u64, dat_rollback: Option<u64>) {
    self.rollback_truncate("idx", pre_idx_len);
    let Some(payload_len) = dat_rollback else {
        return;
    };
    let dat_path = self.live_gen_path("dat");
    let current_len = match OpenOptions::new().read(true).open(&dat_path) {
        Err(e) => {
            tracing::error!(
                error = %e,
                path = %dat_path.display(),
                "redex disk rollback: dat open(read) failed; poisoning segment",
            );
            self.poisoned.store(true, Ordering::Release);
            return;
        }
        Ok(handle) => match handle.metadata() {
            Ok(meta) => meta.len(),
            Err(e) => {
                tracing::error!(
                    error = %e,
                    path = %dat_path.display(),
                    "redex disk rollback: dat metadata failed; poisoning segment",
                );
                self.poisoned.store(true, Ordering::Release);
                return;
            }
        },
    };
    self.rollback_truncate("dat", current_len.saturating_sub(payload_len));
}
/// Undo a single-entry append whose ts write failed.
///
/// ts is restored to `pre_ts_len` first, because it was written last. The
/// idx/dat rollback is then delegated to `rollback_after_idx_failure`.
fn rollback_after_ts_failure(
    &self,
    pre_idx_len: u64,
    pre_ts_len: u64,
    dat_rollback: Option<u64>,
) {
    let unwind_ts = || self.rollback_truncate("ts", pre_ts_len);
    unwind_ts();
    self.rollback_after_idx_failure(pre_idx_len, dat_rollback)
}
/// Test hook: make the next single or batch append fail before touching disk.
#[cfg(test)]
pub(super) fn arm_next_append_failure(&self) {
    let flag = &self.fail_next_append;
    flag.store(true, Ordering::Release);
}
/// Append one entry stamped with the caller-supplied `timestamp_ns`.
// allow(dead_code): NOTE(review) presumably not every build configuration
// calls this entry point — confirm before removing the allow.
#[allow(dead_code)]
pub(super) fn append_entry_at(
    &self,
    entry: &RedexEntry,
    payload: &[u8],
    timestamp_ns: u64,
) -> Result<(), RedexError> {
    let stamp = timestamp_ns;
    self.append_entry_inner(entry, payload, stamp)
}
/// Append one entry stamped with the current wall-clock time (`now_ns_disk`).
// allow(dead_code): NOTE(review) presumably not every build configuration
// calls this entry point — confirm before removing the allow.
#[allow(dead_code)]
pub(super) fn append_entry(
    &self,
    entry: &RedexEntry,
    payload: &[u8],
) -> Result<(), RedexError> {
    let stamp = now_ns_disk();
    self.append_entry_inner(entry, payload, stamp)
}
fn append_entry_inner(
&self,
entry: &RedexEntry,
payload: &[u8],
timestamp_ns: u64,
) -> Result<(), RedexError> {
if self.poisoned.load(std::sync::atomic::Ordering::Acquire) {
return Err(RedexError::Io(
"redex segment is poisoned (partial-write rollback could not restore \
on-disk state to match in-memory); close and re-open the channel to recover"
.into(),
));
}
#[cfg(test)]
if self.fail_next_append.swap(false, Ordering::AcqRel) {
return Err(RedexError::Io("test-injected append failure".into()));
}
if !entry.is_inline() {
let mut dat = self.dat_file.lock();
let pre_len = dat.metadata().map_err(RedexError::io)?.len();
if let Err(e) = dat.write_all(payload) {
drop(dat);
let dat_path = self.live_gen_path("dat");
if let Ok(f) = OpenOptions::new().write(true).open(&dat_path) {
let _ = f.set_len(pre_len);
}
return Err(RedexError::io(e));
}
}
#[cfg(test)]
if self.fail_after_dat_write.swap(false, Ordering::AcqRel) {
if !entry.is_inline() {
let dat_path = self.live_gen_path("dat");
if let Ok(f) = OpenOptions::new().write(true).open(&dat_path) {
let cur = f.metadata().map(|m| m.len()).unwrap_or(0);
let _ = f.set_len(cur.saturating_sub(payload.len() as u64));
}
}
return Err(RedexError::Io(
"test-injected post-dat-pre-idx failure".into(),
));
}
let mut idx = self.idx_file.lock();
#[cfg(test)]
let idx_metadata = if self.fail_next_idx_metadata.swap(false, Ordering::AcqRel) {
Err(std::io::Error::other("test-injected idx.metadata failure"))
} else {
idx.metadata()
};
#[cfg(not(test))]
let idx_metadata = idx.metadata();
let pre_idx_len = match idx_metadata {
Ok(m) => m.len(),
Err(e) => {
drop(idx);
if !entry.is_inline() {
let dat_path = self.live_gen_path("dat");
if let Ok(f) = OpenOptions::new().write(true).open(&dat_path) {
let cur = f.metadata().map(|m| m.len()).unwrap_or(0);
let _ = f.set_len(cur.saturating_sub(payload.len() as u64));
}
}
return Err(RedexError::io(e));
}
};
if let Err(e) = idx.write_all(&entry.to_bytes()) {
drop(idx);
let dat_rollback = if entry.is_inline() {
None
} else {
Some(payload.len() as u64)
};
self.rollback_after_idx_failure(pre_idx_len, dat_rollback);
return Err(RedexError::io(e));
}
drop(idx);
let mut ts = self.ts_file.lock();
#[cfg(test)]
let ts_metadata = if self.fail_next_ts_metadata.swap(false, Ordering::AcqRel) {
Err(std::io::Error::other("test-injected ts.metadata failure"))
} else {
ts.metadata()
};
#[cfg(not(test))]
let ts_metadata = ts.metadata();
let pre_ts_len = match ts_metadata {
Ok(m) => m.len(),
Err(e) => {
drop(ts);
let dat_rollback = if entry.is_inline() {
None
} else {
Some(payload.len() as u64)
};
self.rollback_after_idx_failure(pre_idx_len, dat_rollback);
return Err(RedexError::io(e));
}
};
if let Err(e) = ts.write_all(×tamp_ns.to_le_bytes()) {
drop(ts);
let dat_rollback = if entry.is_inline() {
None
} else {
Some(payload.len() as u64)
};
self.rollback_after_ts_failure(pre_idx_len, pre_ts_len, dat_rollback);
return Err(RedexError::io(e));
}
drop(ts);
let dat_bytes = if entry.is_inline() {
0
} else {
payload.len() as u64
};
let total_bytes = dat_bytes + REDEX_ENTRY_SIZE as u64 + 8;
self.maybe_sync_after_append(1, total_bytes);
Ok(())
}
/// Test hook: fail the next single-entry append right after its dat write,
/// before idx is touched.
#[cfg(test)]
pub(super) fn arm_next_post_dat_failure(&self) {
    let flag = &self.fail_after_dat_write;
    flag.store(true, Ordering::Release);
}
/// Append a batch of entries, all stamped with one wall-clock timestamp
/// taken from `now_ns_disk` when the call starts.
///
/// # Errors
/// Propagates any error from `append_entries_inner`.
// allow(dead_code): NOTE(review) presumably not every build configuration
// calls this entry point — confirm before removing the allow.
#[allow(dead_code)]
pub(super) fn append_entries(
    &self,
    entries_and_payloads: &[(RedexEntry, &[u8])],
) -> Result<(), RedexError> {
    let timestamps: Vec<u64> = vec![now_ns_disk(); entries_and_payloads.len()];
    self.append_entries_inner(entries_and_payloads, &timestamps)
}
/// Append a batch of entries with one caller-supplied timestamp per entry.
///
/// # Errors
/// Returns `RedexError::Io` when `timestamps` and `entries_and_payloads`
/// differ in length. Otherwise propagates any error from
/// `append_entries_inner`.
// allow(dead_code): NOTE(review) presumably not every build configuration
// calls this entry point — confirm before removing the allow.
#[allow(dead_code)]
pub(super) fn append_entries_at(
    &self,
    entries_and_payloads: &[(RedexEntry, &[u8])],
    timestamps: &[u64],
) -> Result<(), RedexError> {
    let (ts_count, entry_count) = (timestamps.len(), entries_and_payloads.len());
    if ts_count == entry_count {
        return self.append_entries_inner(entries_and_payloads, timestamps);
    }
    Err(RedexError::Io(format!(
        "append_entries_at: timestamps len ({}) != entries len ({})",
        ts_count, entry_count
    )))
}
/// Append a batch of entries: all heap payloads go to dat, then all idx
/// records, then all ts records. Each file gets one `write_all`.
///
/// `timestamps` is zipped with `entries_and_payloads`; both public callers
/// (`append_entries`, `append_entries_at`) pass slices of equal length. If a
/// step fails, every file already written is truncated back to its pre-batch
/// length via `rollback_truncate`, which poisons the segment if it cannot.
///
/// # Errors
/// Returns `RedexError::Io` if the segment is poisoned, or if any metadata
/// lookup or write fails.
fn append_entries_inner(
&self,
entries_and_payloads: &[(RedexEntry, &[u8])],
timestamps: &[u64],
) -> Result<(), RedexError> {
if self.poisoned.load(std::sync::atomic::Ordering::Acquire) {
return Err(RedexError::Io(
"redex segment is poisoned (partial-write rollback could not restore \
on-disk state to match in-memory); close and re-open the channel to recover"
.into(),
));
}
#[cfg(test)]
if self.fail_next_append.swap(false, Ordering::AcqRel) {
return Err(RedexError::Io("test-injected append failure".into()));
}
// Serialize the whole batch up front so each file is touched by a single write.
let total_dat: usize = entries_and_payloads
.iter()
.filter(|(e, _)| !e.is_inline())
.map(|(_, p)| p.len())
.sum();
let mut dat_buf: Vec<u8> = Vec::with_capacity(total_dat);
let mut idx_buf: Vec<u8> =
Vec::with_capacity(entries_and_payloads.len() * REDEX_ENTRY_SIZE);
let mut ts_buf: Vec<u8> = Vec::with_capacity(timestamps.len() * 8);
for ((entry, payload), &t) in entries_and_payloads.iter().zip(timestamps) {
// Inline entries carry their payload in the idx record, not in dat.
if !entry.is_inline() {
dat_buf.extend_from_slice(payload);
}
idx_buf.extend_from_slice(&entry.to_bytes());
ts_buf.extend_from_slice(&t.to_le_bytes());
}
// Phase 1: dat (skipped entirely when the batch has no heap bytes). The dat
// lock is released before idx is locked.
let dat_pre_len: Option<u64> = if !dat_buf.is_empty() {
let mut dat = self.dat_file.lock();
let pre_len = dat.metadata().map_err(RedexError::io)?.len();
if let Err(e) = dat.write_all(&dat_buf) {
drop(dat);
self.rollback_truncate("dat", pre_len);
return Err(RedexError::io(e));
}
drop(dat);
Some(pre_len)
} else {
None
};
// Phase 2: idx. Any failure from here on also undoes the dat write.
let mut idx = self.idx_file.lock();
#[cfg(test)]
let idx_metadata = if self.fail_next_idx_metadata.swap(false, Ordering::AcqRel) {
Err(std::io::Error::other("test-injected idx.metadata failure"))
} else {
idx.metadata()
};
#[cfg(not(test))]
let idx_metadata = idx.metadata();
let idx_pre_len = match idx_metadata {
Ok(m) => m.len(),
Err(e) => {
drop(idx);
if let Some(pre_len) = dat_pre_len {
self.rollback_truncate("dat", pre_len);
}
return Err(RedexError::io(e));
}
};
if let Err(e) = idx.write_all(&idx_buf) {
drop(idx);
self.rollback_truncate("idx", idx_pre_len);
if let Some(pre_len) = dat_pre_len {
self.rollback_truncate("dat", pre_len);
}
return Err(RedexError::io(e));
}
drop(idx);
// Phase 3: ts. Failures unwind in reverse write order: ts, idx, dat.
let mut ts = self.ts_file.lock();
#[cfg(test)]
let ts_metadata = if self.fail_next_ts_metadata.swap(false, Ordering::AcqRel) {
Err(std::io::Error::other("test-injected ts.metadata failure"))
} else {
ts.metadata()
};
#[cfg(not(test))]
let ts_metadata = ts.metadata();
let ts_pre_len = match ts_metadata {
Ok(m) => m.len(),
Err(e) => {
drop(ts);
self.rollback_truncate("idx", idx_pre_len);
if let Some(pre_len) = dat_pre_len {
self.rollback_truncate("dat", pre_len);
}
return Err(RedexError::io(e));
}
};
if let Err(e) = ts.write_all(&ts_buf) {
drop(ts);
self.rollback_truncate("ts", ts_pre_len);
self.rollback_truncate("idx", idx_pre_len);
if let Some(pre_len) = dat_pre_len {
self.rollback_truncate("dat", pre_len);
}
return Err(RedexError::io(e));
}
drop(ts);
// Feed the fsync thresholds with the entry count and the bytes across all three files.
let total_bytes = (dat_buf.len() + idx_buf.len() + ts_buf.len()) as u64;
self.maybe_sync_after_append(entries_and_payloads.len() as u64, total_bytes);
Ok(())
}
/// fsync the live dat, idx and ts files (in that order) through the worker
/// handles. On success, reset both fsync-threshold counters.
///
/// Both counters are reset. Previously only `bytes_since_sync` was cleared, so
/// after a byte-triggered sync the stale append count fired an extra, early
/// fsync signal. Counters are reset only after all three files synced, so a
/// failed sync leaves the thresholds armed.
///
/// # Errors
/// Returns `RedexError::Io` if any `sync_all` fails.
pub(super) fn sync(&self) -> Result<(), RedexError> {
    #[cfg(test)]
    if self.fail_next_sync.swap(false, Ordering::AcqRel) {
        return Err(RedexError::Io("test-injected sync failure".into()));
    }
    for handle in [
        &self.worker_dat_file,
        &self.worker_idx_file,
        &self.worker_ts_file,
    ] {
        handle.lock().sync_all().map_err(RedexError::io)?;
    }
    self.appends_since_sync.store(0, Ordering::Release);
    self.bytes_since_sync.store(0, Ordering::Release);
    #[cfg(test)]
    self.sync_count.fetch_add(1, Ordering::AcqRel);
    Ok(())
}
/// Test hook: the next `sync` call fails before syncing anything.
#[cfg(test)]
pub(super) fn arm_next_sync_failure(&self) {
    let flag = &self.fail_next_sync;
    flag.store(true, Ordering::Release);
}
/// Test hook: the next `compact_to` call fails before touching disk.
#[cfg(test)]
pub(super) fn arm_next_compact_failure(&self) {
    let flag = &self.fail_next_compact;
    flag.store(true, Ordering::Release);
}
/// Test hook: the next idx metadata lookup in an append path fails.
#[cfg(test)]
pub(super) fn arm_next_idx_metadata_failure(&self) {
    let flag = &self.fail_next_idx_metadata;
    flag.store(true, Ordering::Release);
}
/// Test hook: the next ts metadata lookup in an append path fails.
#[cfg(test)]
pub(super) fn arm_next_ts_metadata_failure(&self) {
    let flag = &self.fail_next_ts_metadata;
    flag.store(true, Ordering::Release);
}
/// Rewrite the segment into a fresh generation directory, then atomically
/// switch appends over to it.
///
/// The new generation contains only `surviving_index` / `surviving_timestamps`
/// plus the tail of the current `dat` starting at byte `dat_base`. Heap entry
/// offsets are rebased by subtracting `dat_base`; inline entries are copied
/// unchanged.
///
/// Sequence:
/// 1. Build the new idx/ts/dat contents in memory.
/// 2. Write and fsync each new file, then fsync the new directory.
/// 3. Open append handles plus worker clones.
/// 4. Holding every file lock, commit by writing the manifest, swap all six
///    handles and publish `live_gen`.
/// 5. Delete the old generation directory (best-effort).
///
/// If anything fails before the manifest commit, the old generation stays
/// live. The partially built next generation is left on disk; a retry
/// truncates and rewrites it, and `open` removes it via
/// `sweep_orphan_generations`.
///
/// NOTE(review): the old dat is read by path without holding `dat_file`'s
/// lock. Appends racing with compaction would not be copied, so callers
/// presumably serialize appends with compaction — confirm.
///
/// # Errors
/// - `RedexError::Io` if the segment is poisoned, the two slices differ in
///   length, `live_gen` is already `u32::MAX`, or any file operation fails.
/// - `RedexError::Encode` if a heap entry's offset lies below `dat_base`.
pub(super) fn compact_to(
&self,
surviving_index: &[RedexEntry],
surviving_timestamps: &[u64],
dat_base: u64,
) -> Result<(), RedexError> {
if self.poisoned.load(std::sync::atomic::Ordering::Acquire) {
return Err(RedexError::Io(
"redex segment is poisoned; refusing compact_to".into(),
));
}
#[cfg(test)]
if self.fail_next_compact.swap(false, Ordering::AcqRel) {
return Err(RedexError::Io("test-injected compact_to failure".into()));
}
if surviving_index.len() != surviving_timestamps.len() {
return Err(RedexError::Io(format!(
"compact_to: index/timestamp length mismatch ({} vs {})",
surviving_index.len(),
surviving_timestamps.len()
)));
}
let cur_gen = self.live_gen.load(Ordering::Acquire);
let next_gen = cur_gen.checked_add(1).ok_or_else(|| {
RedexError::Io(
"compact_to: live_gen at u32::MAX; refusing further compactions \
(re-create the channel to reset)"
.into(),
)
})?;
let cur_dir = gen_dir(&self.dir, cur_gen);
let next_dir = gen_dir(&self.dir, next_gen);
// New idx: heap offsets become relative to the start of the new dat.
let mut new_idx_bytes = Vec::with_capacity(surviving_index.len() * REDEX_ENTRY_SIZE);
for entry in surviving_index {
if entry.is_inline() {
new_idx_bytes.extend_from_slice(&entry.to_bytes());
} else {
let abs = entry.payload_offset as u64;
let rebased = abs.checked_sub(dat_base).ok_or_else(|| {
RedexError::Encode(format!(
"compact_to: heap entry with payload_offset={} \
below dat_base={} would corrupt the new dat \
layout",
abs, dat_base
))
})?;
let mut e = *entry;
// rebased <= abs, and abs came from a u32, so this cast cannot truncate.
e.payload_offset = rebased as u32;
new_idx_bytes.extend_from_slice(&e.to_bytes());
}
}
let mut new_ts_bytes = Vec::with_capacity(surviving_timestamps.len() * 8);
for &t in surviving_timestamps {
new_ts_bytes.extend_from_slice(&t.to_le_bytes());
}
// New dat: everything from dat_base onward (empty if dat_base is past the end).
let cur_dat_path = cur_dir.join("dat");
let old_dat = read_payload(&cur_dat_path)?;
let new_dat = if dat_base as usize >= old_dat.len() {
Vec::new()
} else {
old_dat[dat_base as usize..].to_vec()
};
std::fs::create_dir_all(&next_dir).map_err(RedexError::io)?;
let next_idx_path = next_dir.join("idx");
let next_dat_path = next_dir.join("dat");
let next_ts_path = next_dir.join("ts");
// Each file is fully written and fsynced (and closed) before the manifest can point at it.
{
let mut f = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&next_idx_path)
.map_err(RedexError::io)?;
f.write_all(&new_idx_bytes).map_err(RedexError::io)?;
f.sync_all().map_err(RedexError::io)?;
}
{
let mut f = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&next_dat_path)
.map_err(RedexError::io)?;
f.write_all(&new_dat).map_err(RedexError::io)?;
f.sync_all().map_err(RedexError::io)?;
}
{
let mut f = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&next_ts_path)
.map_err(RedexError::io)?;
f.write_all(&new_ts_bytes).map_err(RedexError::io)?;
f.sync_all().map_err(RedexError::io)?;
}
fsync_dir(&next_dir).map_err(RedexError::io)?;
// Append handles (and worker clones) for the new generation, opened before any lock is taken.
let new_idx = OpenOptions::new()
.create(true)
.read(true)
.append(true)
.open(&next_idx_path)
.map_err(RedexError::io)?;
let new_dat = OpenOptions::new()
.create(true)
.read(true)
.append(true)
.open(&next_dat_path)
.map_err(RedexError::io)?;
let new_ts = OpenOptions::new()
.create(true)
.read(true)
.append(true)
.open(&next_ts_path)
.map_err(RedexError::io)?;
let new_idx_worker = new_idx.try_clone().map_err(RedexError::io)?;
let new_dat_worker = new_dat.try_clone().map_err(RedexError::io)?;
let new_ts_worker = new_ts.try_clone().map_err(RedexError::io)?;
// Hold every handle lock across the commit, so no append or sync observes a half-swapped segment.
let mut dat_guard = self.dat_file.lock();
let mut idx_guard = self.idx_file.lock();
let mut ts_guard = self.ts_file.lock();
let mut worker_dat_guard = self.worker_dat_file.lock();
let mut worker_idx_guard = self.worker_idx_file.lock();
let mut worker_ts_guard = self.worker_ts_file.lock();
// Commit point: once the manifest names next_gen, it is the live generation on disk.
if let Err(e) = write_manifest_atomic(&self.dir, next_gen) {
return Err(RedexError::io(e));
}
// Replacing the handles drops (closes) the old generation's files.
*idx_guard = new_idx;
*dat_guard = new_dat;
*ts_guard = new_ts;
*worker_idx_guard = new_idx_worker;
*worker_dat_guard = new_dat_worker;
*worker_ts_guard = new_ts_worker;
self.live_gen.store(next_gen, Ordering::Release);
drop(worker_ts_guard);
drop(worker_idx_guard);
drop(worker_dat_guard);
drop(ts_guard);
drop(idx_guard);
drop(dat_guard);
// Best-effort: a leftover old generation is swept on the next open.
delete_generation(&self.dir, cur_gen);
Ok(())
}
}
/// Nanoseconds since the Unix epoch, or 0 if the system clock reads earlier
/// than the epoch. The u128 nanosecond count is narrowed to u64 with `as`.
#[inline]
// allow(dead_code): its callers (`append_entry`, `append_entries`) are
// themselves allow(dead_code) and may be compiled out of use.
#[allow(dead_code)]
fn now_ns_disk() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
/// Directory holding channel `name` under `base_dir`. Each `/`-separated
/// segment of the channel name becomes one nested path component.
pub(super) fn channel_dir(base_dir: &Path, name: &ChannelName) -> PathBuf {
    let mut dir = base_dir.to_path_buf();
    dir.extend(name.as_str().split('/'));
    dir
}
/// Magic bytes at offset 0 of the manifest file.
const MANIFEST_MAGIC: [u8; 4] = *b"REDM";
/// Manifest format version stored at byte 4; `decode_manifest` rejects any other value.
const MANIFEST_VERSION: u8 = 1;
/// Manifest layout: magic (bytes 0..4), version (4), generation as LE u32
/// (5..9), three zero bytes (9..12), then the low 32 bits of xxh3_64 over
/// bytes 0..12, stored LE (12..16).
const MANIFEST_SIZE: usize = 16;
/// Lowest valid generation number; lower values are rejected when decoding or enumerating.
const FIRST_GENERATION: u32 = 1;
/// Directory name for a generation: `v` plus the number zero-padded to ten
/// digits (enough for any u32), e.g. `v0000000007`.
fn gen_dir_name(generation: u32) -> String {
    format!("v{generation:010}")
}
/// Location of the committed manifest inside a channel directory.
fn manifest_path(channel_dir: &Path) -> PathBuf {
    let mut path = channel_dir.to_path_buf();
    path.push("manifest");
    path
}
/// Staging location used while a new manifest is written and fsynced.
fn manifest_tmp_path(channel_dir: &Path) -> PathBuf {
    let mut path = channel_dir.to_path_buf();
    path.push("manifest.tmp");
    path
}
/// Full path of generation `generation`'s directory inside a channel directory.
fn gen_dir(channel_dir: &Path, generation: u32) -> PathBuf {
    let mut path = channel_dir.to_path_buf();
    path.push(gen_dir_name(generation));
    path
}
/// Serialize `generation` into the fixed 16-byte manifest record: magic,
/// version, LE generation, three zero bytes, then the truncated xxh3_64
/// checksum of the first 12 bytes.
fn encode_manifest(generation: u32) -> [u8; MANIFEST_SIZE] {
    let mut record = [0u8; MANIFEST_SIZE];
    let (body, trailer) = record.split_at_mut(12);
    body[..4].copy_from_slice(&MANIFEST_MAGIC);
    body[4] = MANIFEST_VERSION;
    body[5..9].copy_from_slice(&generation.to_le_bytes());
    // body[9..12] stays zero (reserved).
    let checksum = xxhash_rust::xxh3::xxh3_64(body) as u32;
    trailer.copy_from_slice(&checksum.to_le_bytes());
    record
}
/// Parse a manifest record produced by `encode_manifest`.
///
/// Returns `None` if any of these hold: wrong length, magic or version;
/// non-zero reserved bytes; checksum mismatch; or a generation below
/// `FIRST_GENERATION`.
fn decode_manifest(bytes: &[u8]) -> Option<u32> {
    let malformed = bytes.len() != MANIFEST_SIZE
        || bytes[0..4] != MANIFEST_MAGIC
        || bytes[4] != MANIFEST_VERSION
        || bytes[9..12] != [0, 0, 0];
    if malformed {
        return None;
    }
    let (body, trailer) = bytes.split_at(12);
    let stored = u32::from_le_bytes(trailer.try_into().ok()?);
    if stored != xxhash_rust::xxh3::xxh3_64(body) as u32 {
        return None;
    }
    let generation = u32::from_le_bytes(body[5..9].try_into().ok()?);
    (generation >= FIRST_GENERATION).then_some(generation)
}
/// Committed live generation recorded in `channel_dir`'s manifest. Returns
/// `None` if the file is missing, too short to read, or fails validation.
fn read_manifest(channel_dir: &Path) -> Option<u32> {
    let mut record = [0u8; MANIFEST_SIZE];
    File::open(manifest_path(channel_dir))
        .and_then(|mut file| file.read_exact(&mut record))
        .ok()?;
    decode_manifest(&record)
}
/// Atomically replace `channel_dir`'s manifest with one naming `generation`.
///
/// The record is staged in `manifest.tmp` and fsynced, then renamed over
/// `manifest`. The channel directory is fsynced afterwards; if only that
/// final fsync fails, a warning is logged and `Ok(())` is still returned,
/// because the rename is already visible.
///
/// # Errors
/// Any I/O error from staging the temp file or from the rename.
fn write_manifest_atomic(channel_dir: &Path, generation: u32) -> std::io::Result<()> {
    let tmp = manifest_tmp_path(channel_dir);
    let mut staged = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&tmp)?;
    staged.write_all(&encode_manifest(generation))?;
    staged.sync_all()?;
    // Close the staged file before renaming it.
    drop(staged);
    durable_rename(&tmp, &manifest_path(channel_dir))?;
    if let Err(e) = fsync_dir(channel_dir) {
        tracing::warn!(
            error = %e,
            path = %channel_dir.display(),
            "redex manifest write: rename committed but fsync_dir failed; \
             manifest is visible to subsequent reads but dirent is not yet \
             durable on disk. Treating as success so the caller's cached-\
             handle swap proceeds (on-disk and in-memory must stay aligned). \
             Recovery's orphan-generation sweep handles a post-power-loss \
             revert.",
        );
    }
    Ok(())
}
/// List the generation directories directly under `channel_dir`, newest first.
///
/// A generation directory is named `v` plus ten ASCII digits, with a value of
/// at least `FIRST_GENERATION`. A missing `channel_dir` yields an empty list.
/// Unreadable entries, non-UTF-8 names and non-directories are skipped.
///
/// # Errors
/// Any `read_dir` error other than `NotFound`.
fn enumerate_generations(channel_dir: &Path) -> std::io::Result<Vec<u32>> {
    let entries = match std::fs::read_dir(channel_dir) {
        Ok(it) => it,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };
    let mut generations: Vec<u32> = entries
        .flatten()
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let name = file_name.to_str()?;
            if name.len() != 11 {
                return None;
            }
            let digits = name.strip_prefix('v')?;
            if !digits.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let generation = digits
                .parse::<u32>()
                .ok()
                .filter(|&n| n >= FIRST_GENERATION)?;
            let is_dir = entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false);
            is_dir.then_some(generation)
        })
        .collect();
    generations.sort_unstable_by_key(|&g| std::cmp::Reverse(g));
    Ok(generations)
}
/// Whether generation `generation` has all three data files (`idx`, `dat`,
/// `ts`) present as regular files.
fn generation_is_complete(channel_dir: &Path, generation: u32) -> bool {
    let dir = gen_dir(channel_dir, generation);
    ["idx", "dat", "ts"]
        .iter()
        .all(|file| dir.join(file).is_file())
}
/// Best-effort recursive removal of generation `generation`'s directory.
/// An already-missing directory is silent; any other failure is logged as a
/// warning.
fn delete_generation(channel_dir: &Path, generation: u32) {
    let dir = gen_dir(channel_dir, generation);
    match std::fs::remove_dir_all(&dir) {
        Err(e) if e.kind() != std::io::ErrorKind::NotFound => {
            tracing::warn!(
                error = %e,
                path = %dir.display(),
                "redex sweep: failed to delete orphan generation directory (non-fatal)",
            );
        }
        _ => {}
    }
}
/// Delete every generation directory except `keep`, then remove any leftover
/// `manifest.tmp`.
///
/// If the generation list cannot be read, a warning is logged and nothing is
/// removed (including the temp manifest).
fn sweep_orphan_generations(channel_dir: &Path, keep: u32) {
    let generations = match enumerate_generations(channel_dir) {
        Ok(found) => found,
        Err(e) => {
            tracing::warn!(
                error = %e,
                path = %channel_dir.display(),
                "redex sweep: failed to enumerate generation directories (non-fatal)",
            );
            return;
        }
    };
    generations
        .into_iter()
        .filter(|&g| g != keep)
        .for_each(|g| delete_generation(channel_dir, g));
    // Leftover from an interrupted manifest write; its absence is fine.
    let _ = std::fs::remove_file(manifest_tmp_path(channel_dir));
}
/// Move a legacy flat layout (`idx`/`dat`/`ts` directly in `channel_dir`) into
/// the first generation directory and commit a manifest for it.
///
/// Returns `Ok(false)` without touching anything when no flat file exists.
/// Files already moved by an interrupted earlier run are simply skipped.
///
/// # Errors
/// Any I/O error from creating the directory, renaming, fsyncing or writing
/// the manifest.
fn migrate_flat_layout_if_needed(channel_dir: &Path) -> std::io::Result<bool> {
    const DATA_FILES: [&str; 3] = ["idx", "dat", "ts"];
    if !DATA_FILES.iter().any(|f| channel_dir.join(f).is_file()) {
        return Ok(false);
    }
    let first_gen = gen_dir(channel_dir, FIRST_GENERATION);
    std::fs::create_dir_all(&first_gen)?;
    for file in DATA_FILES {
        let legacy = channel_dir.join(file);
        if legacy.is_file() {
            durable_rename(&legacy, &first_gen.join(file))?;
        }
    }
    fsync_dir(&first_gen)?;
    fsync_dir(channel_dir)?;
    write_manifest_atomic(channel_dir, FIRST_GENERATION)?;
    Ok(true)
}
/// Decide which generation is live for `channel_dir`.
///
/// Preference order:
/// 1. The manifest's generation, if that generation is complete.
/// 2. The newest complete generation on disk; the manifest is refreshed to
///    point at it, and a refresh failure is only logged.
/// 3. A migrated legacy flat layout.
/// 4. A freshly created, empty first generation.
///
/// # Errors
/// I/O errors from enumeration, migration, or creating the first generation.
fn resolve_live_generation(channel_dir: &Path) -> std::io::Result<u32> {
    let is_complete = |g: u32| generation_is_complete(channel_dir, g);
    if let Some(committed) = read_manifest(channel_dir).filter(|&g| is_complete(g)) {
        return Ok(committed);
    }
    let newest_complete = enumerate_generations(channel_dir)?
        .into_iter()
        .find(|&g| is_complete(g));
    if let Some(candidate) = newest_complete {
        if let Err(e) = write_manifest_atomic(channel_dir, candidate) {
            tracing::warn!(
                error = %e,
                path = %channel_dir.display(),
                gen = candidate,
                "redex resolve: failed to refresh manifest after fallback (non-fatal)",
            );
        }
        return Ok(candidate);
    }
    if !migrate_flat_layout_if_needed(channel_dir)? {
        std::fs::create_dir_all(gen_dir(channel_dir, FIRST_GENERATION))?;
        fsync_dir(channel_dir)?;
        write_manifest_atomic(channel_dir, FIRST_GENERATION)?;
    }
    Ok(FIRST_GENERATION)
}
/// fsync a directory so recent entry changes (creates, renames) inside it
/// become durable.
///
/// In test builds, a failure armed via `arm_fsync_dir_failure_at` is
/// returned instead.
#[cfg(unix)]
fn fsync_dir(dir: &Path) -> std::io::Result<()> {
    #[cfg(test)]
    test_fsync_dir_consume_injected_failure().map_or(Ok(()), Err)?;
    let handle = File::open(dir)?;
    handle.sync_all()
}
/// Non-Unix fallback: directories are not fsynced here, so this is a no-op
/// (apart from the test-injected failure hook).
#[cfg(not(unix))]
fn fsync_dir(_dir: &Path) -> std::io::Result<()> {
    #[cfg(test)]
    test_fsync_dir_consume_injected_failure().map_or(Ok(()), Err)?;
    Ok(())
}
// Per-thread countdown for injected `fsync_dir` failures. 0 means disarmed;
// n > 0 means the n-th subsequent call on this thread fails.
#[cfg(test)]
thread_local! {
    static FSYNC_DIR_FAIL_COUNTDOWN: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
}
/// Test hook: make the `n`-th next `fsync_dir` call on this thread fail
/// (`n == 0` disarms).
#[cfg(test)]
pub(super) fn arm_fsync_dir_failure_at(n: u32) {
    FSYNC_DIR_FAIL_COUNTDOWN.set(n);
}
/// Advance the countdown by one call. Returns the injected error exactly when
/// this call is the armed one.
#[cfg(test)]
fn test_fsync_dir_consume_injected_failure() -> Option<std::io::Error> {
    FSYNC_DIR_FAIL_COUNTDOWN.with(|slot| match slot.get() {
        0 => None,
        1 => {
            slot.set(0);
            Some(std::io::Error::other("test-injected fsync_dir failure"))
        }
        remaining => {
            slot.set(remaining - 1);
            None
        }
    })
}
/// Rename `src` to `dst`, replacing `dst` if it exists.
///
/// On Unix this is a plain `rename`, which by itself does not make the
/// directory entry durable. Callers fsync the parent directory afterwards, as
/// `write_manifest_atomic` and `migrate_flat_layout_if_needed` do.
#[cfg(unix)]
fn durable_rename(src: &Path, dst: &Path) -> std::io::Result<()> {
std::fs::rename(src, dst)
}
/// Windows: `MoveFileExW` with `MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH`,
/// so the call does not return until the move has been flushed to disk.
#[cfg(windows)]
fn durable_rename(src: &Path, dst: &Path) -> std::io::Result<()> {
use std::os::windows::ffi::OsStrExt;
// UTF-16 encoding of the path with the trailing NUL the Win32 API requires.
fn to_wide_null(p: &Path) -> Vec<u16> {
p.as_os_str()
.encode_wide()
.chain(std::iter::once(0))
.collect()
}
let src_w = to_wide_null(src);
let dst_w = to_wide_null(dst);
const MOVEFILE_REPLACE_EXISTING: u32 = 0x1;
const MOVEFILE_WRITE_THROUGH: u32 = 0x8;
extern "system" {
fn MoveFileExW(
existing_file_name: *const u16,
new_file_name: *const u16,
flags: u32,
) -> i32;
}
// SAFETY: `src_w` and `dst_w` are NUL-terminated UTF-16 buffers (built by
// `to_wide_null`) that stay alive for the whole call. MoveFileExW only reads
// them and does not keep the pointers after returning.
let ok = unsafe {
MoveFileExW(
src_w.as_ptr(),
dst_w.as_ptr(),
MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH,
)
};
// MoveFileExW returns zero on failure; the cause is in GetLastError.
if ok == 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
/// Other platforms: plain `std::fs::rename`, with no extra durability guarantee.
#[cfg(not(any(unix, windows)))]
fn durable_rename(src: &Path, dst: &Path) -> std::io::Result<()> {
std::fs::rename(src, dst)
}
/// Read every complete entry record from the idx file at `path`.
///
/// Returns `(entries, torn_tail)`. `torn_tail` is `true` when trailing bytes
/// do not form a full `REDEX_ENTRY_SIZE` record, which indicates a partial
/// final write. A missing file yields no entries and no torn tail.
///
/// # Errors
/// `RedexError::Io` if the file exists but cannot be read.
#[expect(
    clippy::expect_used,
    reason = "chunks_exact yields slices of exactly REDEX_ENTRY_SIZE bytes; each converts infallibly"
)]
fn read_index(path: &Path) -> Result<(Vec<RedexEntry>, bool), RedexError> {
    if !path.exists() {
        return Ok((Vec::new(), false));
    }
    let bytes = std::fs::read(path).map_err(RedexError::io)?;
    let records = bytes.chunks_exact(REDEX_ENTRY_SIZE);
    let torn_tail = !records.remainder().is_empty();
    let entries: Vec<RedexEntry> = records
        .map(|chunk| {
            let raw: [u8; REDEX_ENTRY_SIZE] = chunk.try_into().expect("20-byte chunk");
            RedexEntry::from_bytes(&raw)
        })
        .collect();
    Ok((entries, torn_tail))
}
/// Read the first `expected_entries` little-endian u64 timestamps from the ts
/// file at `path`.
///
/// Returns `Ok(None)` if the file is missing or holds fewer than
/// `expected_entries` records. Bytes beyond that count are ignored.
///
/// # Errors
/// `RedexError::Io` if the file exists but cannot be read.
#[expect(
    clippy::expect_used,
    reason = "chunks_exact(8) yields slices of exactly 8 bytes; each converts infallibly"
)]
fn read_timestamps(path: &Path, expected_entries: usize) -> Result<Option<Vec<u64>>, RedexError> {
    if !path.exists() {
        return Ok(None);
    }
    let bytes = std::fs::read(path).map_err(RedexError::io)?;
    let needed = expected_entries * 8;
    if bytes.len() < needed {
        return Ok(None);
    }
    let stamps: Vec<u64> = bytes[..needed]
        .chunks_exact(8)
        .map(|chunk| u64::from_le_bytes(chunk.try_into().expect("8 bytes")))
        .collect();
    Ok(Some(stamps))
}
/// Entire contents of the dat file at `path`, or an empty buffer if the file
/// does not exist.
///
/// # Errors
/// `RedexError::Io` if the file exists but cannot be read.
fn read_payload(path: &Path) -> Result<Vec<u8>, RedexError> {
    if path.exists() {
        std::fs::read(path).map_err(RedexError::io)
    } else {
        Ok(Vec::new())
    }
}
#[cfg(test)]
mod tests {
use super::super::entry::payload_checksum;
use super::*;
fn tmpdir() -> PathBuf {
let mut p = std::env::temp_dir();
p.push(format!(
"redex_disk_{}_{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
std::fs::create_dir_all(&p).unwrap();
p
}
/// Removes a scratch directory tree on a best-effort basis.
///
/// Failures, including the directory already being gone, are deliberately
/// ignored. A leftover temp directory is harmless to the test outcome.
fn cleanup(p: &Path) {
    std::fs::remove_dir_all(p).ok();
}
#[test]
fn test_disk_append_and_recover() {
    // Two heap appends, synced and then dropped, must come back on reopen in
    // order, with their payload bytes laid out back to back in `dat`.
    let base = tmpdir();
    let name = ChannelName::new("t/disk1").unwrap();
    {
        let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
        assert!(seg.index.is_empty());
        assert!(seg.payload_bytes.is_empty());
        let writes: [(u64, u32, &[u8]); 2] = [(0, 0, &b"alpha"[..]), (1, 5, &b"beta"[..])];
        for (seq, offset, body) in writes {
            let entry =
                RedexEntry::new_heap(seq, offset, body.len() as u32, 0, payload_checksum(body));
            seg.disk.append_entry(&entry, body).unwrap();
        }
        seg.disk.sync().unwrap();
    }
    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(reopened.index.len(), 2);
    assert_eq!(reopened.index[0].seq, 0);
    assert_eq!(reopened.index[1].seq, 1);
    assert_eq!(&reopened.payload_bytes[..5], b"alpha");
    assert_eq!(&reopened.payload_bytes[5..9], b"beta");
    cleanup(&base);
}
#[test]
fn test_disk_inline_entries_skip_dat_file() {
    // An inline entry must not contribute any bytes to the heap payload file.
    // After a reopen, the recovered payload buffer is still empty.
    let base = tmpdir();
    let name = ChannelName::new("t/inline").unwrap();
    let writer = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let body = *b"abcdefgh";
    let inline_entry = RedexEntry::new_inline(0, &body, payload_checksum(&body));
    writer.disk.append_entry(&inline_entry, &body).unwrap();
    writer.disk.sync().unwrap();
    drop(writer);

    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(reopened.index.len(), 1);
    assert!(reopened.index[0].is_inline());
    assert!(reopened.payload_bytes.is_empty());
    cleanup(&base);
}
#[test]
fn test_torn_idx_tail_is_truncated_on_reopen() {
    // Write one durable record, then simulate a torn index write by appending
    // a partial record's worth of garbage. Reopen must keep only the complete
    // record and shrink idx back to exactly one record's length.
    let base = tmpdir();
    let name = ChannelName::new("t/torn").unwrap();
    {
        let recovered = DiskSegment::open(&base, &name, 0, 0).unwrap();
        let p = b"ok";
        let e = RedexEntry::new_heap(0, 0, p.len() as u32, 0, payload_checksum(p));
        recovered.disk.append_entry(&e, p).unwrap();
        recovered.disk.sync().unwrap();
    }
    let idx_path = gen_dir(&channel_dir(&base, &name), FIRST_GENERATION).join("idx");
    // One byte short of a full record, derived from the record size, so the
    // tail can never be decoded as a complete entry.
    let torn_tail = vec![0xFFu8; REDEX_ENTRY_SIZE - 1];
    let mut f = OpenOptions::new().append(true).open(&idx_path).unwrap();
    f.write_all(&torn_tail).unwrap();
    f.sync_all().unwrap();
    drop(f);
    let recovered = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(recovered.index.len(), 1);
    assert_eq!(recovered.index[0].seq, 0);
    let after_len = std::fs::metadata(&idx_path).unwrap().len();
    assert_eq!(
        after_len,
        REDEX_ENTRY_SIZE as u64,
        "idx must be truncated back to exactly one record"
    );
    cleanup(&base);
}
#[test]
fn test_channel_dir_handles_nested_names() {
    // Each `/`-separated segment of the channel name maps to a nested directory.
    let dir = channel_dir(
        &PathBuf::from("/tmp/base"),
        &ChannelName::new("sensors/lidar/front").unwrap(),
    );
    let expected = PathBuf::from("/tmp/base/sensors/lidar/front");
    assert_eq!(dir, expected);
}
#[test]
fn test_external_dat_truncation_drops_torn_heap_after_inlines() {
    // Layout: heap1, inline_a, heap2, inline_b, heap3. Cutting `dat` right
    // after heap2 tears only heap3. Recovery must keep seqs 0..=3, leave `dat`
    // at heap1 + heap2, and shrink idx to four records.
    let base = tmpdir();
    let name = ChannelName::new("t/external-trunc").unwrap();
    let inline_a = *b"in_a____";
    let inline_b = *b"in_b____";
    let heap1: &[u8] = b"heap1";
    let heap2: &[u8] = b"heap2_longer";
    let heap3: &[u8] = b"heap3_data";
    let heap1_at = 0u32;
    let heap2_at = heap1_at + heap1.len() as u32;
    let heap3_at = heap2_at + heap2.len() as u32;
    // heap3 starts exactly where heap2 ends, so keeping `heap3_at` bytes
    // retains heap1 and heap2 in full.
    let dat_keep_len = u64::from(heap3_at);
    let heap = |seq: u64, offset: u32, bytes: &[u8]| {
        RedexEntry::new_heap(seq, offset, bytes.len() as u32, 0, payload_checksum(bytes))
    };
    {
        let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
        let appends: [(RedexEntry, &[u8]); 5] = [
            (heap(0, heap1_at, heap1), heap1),
            (
                RedexEntry::new_inline(1, &inline_a, payload_checksum(&inline_a)),
                &inline_a[..],
            ),
            (heap(2, heap2_at, heap2), heap2),
            (
                RedexEntry::new_inline(3, &inline_b, payload_checksum(&inline_b)),
                &inline_b[..],
            ),
            (heap(4, heap3_at, heap3), heap3),
        ];
        for (entry, bytes) in &appends {
            seg.disk.append_entry(entry, bytes).unwrap();
        }
        seg.disk.sync().unwrap();
    }
    let live = gen_dir(&channel_dir(&base, &name), FIRST_GENERATION);
    let dat_path = live.join("dat");
    OpenOptions::new()
        .write(true)
        .open(&dat_path)
        .unwrap()
        .set_len(dat_keep_len)
        .unwrap();

    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let seqs: Vec<u64> = reopened.index.iter().map(|e| e.seq).collect();
    assert_eq!(
        seqs,
        vec![0, 1, 2, 3],
        "expected heap1,inline1,heap2,inline2 to survive (got seqs {:?})",
        seqs
    );
    assert!(!reopened.index[0].is_inline(), "seq 0 should be heap");
    assert!(reopened.index[1].is_inline(), "seq 1 should be inline");
    assert!(!reopened.index[2].is_inline(), "seq 2 should be heap");
    assert!(reopened.index[3].is_inline(), "seq 3 should be inline");
    assert_eq!(
        reopened.payload_bytes.len() as u64,
        dat_keep_len,
        "dat should be exactly heap1+heap2 bytes after recovery"
    );
    assert_eq!(
        std::fs::metadata(&dat_path).unwrap().len(),
        dat_keep_len,
        "dat file size mismatch after recovery"
    );
    assert_eq!(
        std::fs::metadata(live.join("idx")).unwrap().len(),
        (4 * REDEX_ENTRY_SIZE) as u64,
        "idx file should have exactly 4 records after recovery"
    );
    cleanup(&base);
}
#[test]
fn test_external_dat_truncation_to_first_heap_drops_everything_after() {
    // Same five-entry layout (5-byte heaps at offsets 0, 5, 10), but `dat` is
    // cut to heap1 only. heap2 becomes the earliest torn heap, so recovery
    // keeps seqs 0 and 1 and nothing after them.
    let base = tmpdir();
    let name = ChannelName::new("t/external-trunc-deep").unwrap();
    let inline_a = *b"in_a____";
    let inline_b = *b"in_b____";
    let heap1: &[u8] = b"heap1";
    let heap2: &[u8] = b"heap2";
    let heap3: &[u8] = b"heap3";
    {
        let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
        let appends: [(RedexEntry, &[u8]); 5] = [
            (RedexEntry::new_heap(0, 0, 5, 0, payload_checksum(heap1)), heap1),
            (
                RedexEntry::new_inline(1, &inline_a, payload_checksum(&inline_a)),
                &inline_a[..],
            ),
            (RedexEntry::new_heap(2, 5, 5, 0, payload_checksum(heap2)), heap2),
            (
                RedexEntry::new_inline(3, &inline_b, payload_checksum(&inline_b)),
                &inline_b[..],
            ),
            (RedexEntry::new_heap(4, 10, 5, 0, payload_checksum(heap3)), heap3),
        ];
        for (entry, bytes) in &appends {
            seg.disk.append_entry(entry, bytes).unwrap();
        }
        seg.disk.sync().unwrap();
    }
    let dat_path = gen_dir(&channel_dir(&base, &name), FIRST_GENERATION).join("dat");
    let dat_file = OpenOptions::new().write(true).open(&dat_path).unwrap();
    dat_file.set_len(5).unwrap();
    drop(dat_file);

    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let seqs: Vec<u64> = reopened.index.iter().map(|e| e.seq).collect();
    assert_eq!(
        seqs,
        vec![0, 1],
        "deep dat truncation must keep only entries up to (but not past) the earliest torn heap (got seqs {:?})",
        seqs
    );
    assert!(!reopened.index[0].is_inline());
    assert!(reopened.index[1].is_inline());
    assert_eq!(
        std::fs::metadata(&dat_path).unwrap().len(),
        5,
        "dat should remain exactly heap1's bytes after recovery"
    );
    cleanup(&base);
}
#[test]
fn append_failure_after_dat_write_rolls_back_dat() {
    // An append that fails after its payload reached `dat` must shrink `dat`
    // back to its previous length. A retry of the same append must then land
    // cleanly and survive a reopen.
    let base = tmpdir();
    let name = ChannelName::new("t/rollback").unwrap();
    let dat_path = gen_dir(&channel_dir(&base, &name), FIRST_GENERATION).join("dat");
    let dat_len = || std::fs::metadata(&dat_path).unwrap().len();
    let first: &[u8] = b"good-payload-A";
    let second: &[u8] = b"would-strand-these-bytes";

    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let first_entry = RedexEntry::new_heap(0, 0, first.len() as u32, 0, payload_checksum(first));
    seg.disk.append_entry(&first_entry, first).unwrap();
    seg.disk.sync().unwrap();
    let baseline = dat_len();
    assert_eq!(baseline as usize, first.len());

    // Arm the test hook so the next append fails after writing its payload.
    seg.disk.arm_next_post_dat_failure();
    let second_entry = RedexEntry::new_heap(
        1,
        first.len() as u32,
        second.len() as u32,
        0,
        payload_checksum(second),
    );
    let outcome = seg.disk.append_entry(&second_entry, second);
    assert!(outcome.is_err(), "injected failure must surface as Err");
    assert_eq!(
        dat_len(),
        baseline,
        "dat must be rolled back to its pre-failure length; \
         stranded bytes here would corrupt later reads"
    );

    seg.disk.append_entry(&second_entry, second).unwrap();
    seg.disk.sync().unwrap();
    assert_eq!(
        dat_len() as usize,
        first.len() + second.len(),
        "after successful retry, dat must contain exactly p1 + p2"
    );
    drop(seg);

    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(reopened.index.len(), 2);
    let split = first.len();
    assert_eq!(&reopened.payload_bytes[..split], first);
    assert_eq!(&reopened.payload_bytes[split..split + second.len()], second);
    cleanup(&base);
}
#[test]
fn checksum_filter_preserves_ts_pairing_on_mid_file_drop() {
    // Corrupt the second of four heap payloads. Recovery must drop only that
    // entry, and the surviving timestamps must still line up with the
    // surviving entries rather than shifting up by one.
    let base = tmpdir();
    let name = ChannelName::new("t/ts_pair").unwrap();
    let dat_path = gen_dir(&channel_dir(&base, &name), FIRST_GENERATION).join("dat");
    let payloads: [&[u8]; 4] = [b"AAAAAAAAA", b"BBBBBBBBB", b"CCCCCCCCC", b"DDDDDDDDD"];
    let timestamps: [u64; 4] = [1000, 2000, 3000, 4000];

    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let mut cursor = 0u32;
    for ((seq, body), ts) in (0u64..).zip(payloads).zip(timestamps) {
        let entry = RedexEntry::new_heap(seq, cursor, body.len() as u32, 0, payload_checksum(body));
        seg.disk.append_entry_at(&entry, body, ts).unwrap();
        cursor += body.len() as u32;
    }
    seg.disk.sync().unwrap();
    drop(seg);

    // Payload B starts right after A. Flipping its first byte breaks only B's checksum.
    let mut dat = std::fs::read(&dat_path).unwrap();
    dat[payloads[0].len()] ^= 0xFF;
    std::fs::write(&dat_path, &dat).unwrap();

    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(reopened.index.len(), 3, "one corrupt entry must drop");
    let survivors: Vec<u64> = reopened.index.iter().map(|e| e.seq).collect();
    assert_eq!(survivors, vec![0, 2, 3], "B should be dropped");
    let ts = reopened.timestamps.expect("ts sidecar present");
    assert_eq!(
        ts,
        vec![1000, 3000, 4000],
        "surviving timestamps must come from the original index \
         positions of the surviving entries; pre-fix this would \
         have been [1000, 2000, 3000]"
    );
    cleanup(&base);
}
#[test]
fn test_disk_batch_mixed_heap_and_inline_roundtrip() {
    // One batched append that mixes heap and inline entries must write only
    // the heap payloads to `dat`, back to back in batch order. Every entry's
    // seq, kind, and timestamp must survive a reopen.
    let base = tmpdir();
    let name = ChannelName::new("t/batch_mixed").unwrap();
    let h1 = b"heap-one";
    let inline_a = *b"inline_A";
    let h2 = b"heap-two-longer";
    let inline_b = *b"inline_B";
    let h1_off = 0u32;
    let h2_off = h1_off + h1.len() as u32;
    let entries = [
        (
            RedexEntry::new_heap(10, h1_off, h1.len() as u32, 0, payload_checksum(h1)),
            h1.as_slice(),
        ),
        (
            RedexEntry::new_inline(11, &inline_a, payload_checksum(&inline_a)),
            inline_a.as_slice(),
        ),
        (
            RedexEntry::new_heap(12, h2_off, h2.len() as u32, 0, payload_checksum(h2)),
            h2.as_slice(),
        ),
        (
            RedexEntry::new_inline(13, &inline_b, payload_checksum(&inline_b)),
            inline_b.as_slice(),
        ),
    ];
    let timestamps = [11_000u64, 22_000, 33_000, 44_000];
    {
        let recovered = DiskSegment::open(&base, &name, 0, 0).unwrap();
        recovered
            .disk
            .append_entries_at(&entries, &timestamps)
            .unwrap();
        recovered.disk.sync().unwrap();
    }
    let dat_path = gen_dir(&channel_dir(&base, &name), FIRST_GENERATION).join("dat");
    let dat_bytes = std::fs::read(&dat_path).unwrap();
    assert_eq!(dat_bytes.len(), h1.len() + h2.len());
    assert_eq!(&dat_bytes[..h1.len()], h1);
    assert_eq!(&dat_bytes[h1.len()..], h2);
    let recovered = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let seqs: Vec<u64> = recovered.index.iter().map(|e| e.seq).collect();
    assert_eq!(seqs, vec![10, 11, 12, 13]);
    assert!(!recovered.index[0].is_inline());
    assert!(recovered.index[1].is_inline());
    assert!(!recovered.index[2].is_inline());
    assert!(recovered.index[3].is_inline());
    let ts = recovered.timestamps.expect("ts sidecar present");
    assert_eq!(ts, vec![11_000, 22_000, 33_000, 44_000]);
    cleanup(&base);
}
#[test]
fn test_disk_batch_all_inline_skips_dat() {
    // A batch made only of inline entries must leave `dat` empty, while
    // still persisting every index record and its timestamp.
    let base = tmpdir();
    let name = ChannelName::new("t/batch_all_inline").unwrap();
    let p1 = *b"alpha___";
    let p2 = *b"beta____";
    let p3 = *b"gamma___";
    let entries = [
        (
            RedexEntry::new_inline(0, &p1, payload_checksum(&p1)),
            p1.as_slice(),
        ),
        (
            RedexEntry::new_inline(1, &p2, payload_checksum(&p2)),
            p2.as_slice(),
        ),
        (
            RedexEntry::new_inline(2, &p3, payload_checksum(&p3)),
            p3.as_slice(),
        ),
    ];
    let timestamps = [100u64, 200, 300];
    {
        let recovered = DiskSegment::open(&base, &name, 0, 0).unwrap();
        recovered
            .disk
            .append_entries_at(&entries, &timestamps)
            .unwrap();
        recovered.disk.sync().unwrap();
    }
    let dat_path = gen_dir(&channel_dir(&base, &name), FIRST_GENERATION).join("dat");
    assert_eq!(
        std::fs::metadata(&dat_path).unwrap().len(),
        0,
        "all-inline batch must not write to dat"
    );
    let recovered = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(recovered.index.len(), 3);
    for e in &recovered.index {
        assert!(e.is_inline());
    }
    let ts = recovered.timestamps.expect("ts sidecar present");
    assert_eq!(ts, vec![100, 200, 300]);
    cleanup(&base);
}
#[test]
fn test_worker_handles_share_os_file_with_appender() {
    // Without any sync, the worker-side handles must already report the file
    // lengths produced by the appender, and those lengths must match on-disk
    // metadata.
    let base = tmpdir();
    let name = ChannelName::new("t/worker_share").unwrap();
    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let first: &[u8] = b"alpha";
    let second: &[u8] = b"beta_payload";
    let first_entry = RedexEntry::new_heap(0, 0, first.len() as u32, 0, payload_checksum(first));
    seg.disk.append_entry(&first_entry, first).unwrap();
    let second_entry = RedexEntry::new_heap(
        1,
        first.len() as u32,
        second.len() as u32,
        0,
        payload_checksum(second),
    );
    seg.disk.append_entry(&second_entry, second).unwrap();

    let (dat_w, idx_w, ts_w) = seg.disk.worker_file_lens();
    assert_eq!(
        dat_w,
        (first.len() + second.len()) as u64,
        "worker dat handle must reflect the appender's heap writes"
    );
    assert_eq!(idx_w, 2 * REDEX_ENTRY_SIZE as u64);
    assert_eq!(ts_w, 2 * 8);

    let live = gen_dir(&channel_dir(&base, &name), FIRST_GENERATION);
    for (observed, file) in [(dat_w, "dat"), (idx_w, "idx"), (ts_w, "ts")] {
        assert_eq!(observed, std::fs::metadata(live.join(file)).unwrap().len());
    }
    seg.disk.sync().unwrap();
    cleanup(&base);
}
#[test]
fn test_compact_to_re_clones_worker_handles() {
    // After compaction swaps in a new generation, the worker handles must see
    // the compacted files. A later append must persist alongside the
    // surviving entry, with timestamps still paired correctly.
    let base = tmpdir();
    let name = ChannelName::new("t/compact_workers").unwrap();
    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let payloads: [&[u8]; 3] = [b"first_one", b"second_one", b"third_one_"];
    let mut next_offset = 0u32;
    for (seq, body) in (0u64..).zip(payloads.iter()) {
        let entry =
            RedexEntry::new_heap(seq, next_offset, body.len() as u32, 0, payload_checksum(body));
        seg.disk
            .append_entry_at(&entry, body, 1_000 * (seq + 1))
            .unwrap();
        next_offset += body.len() as u32;
    }
    seg.disk.sync().unwrap();

    // Keep only the third entry. Everything before its dat offset is discarded.
    let keep_from = (payloads[0].len() + payloads[1].len()) as u64;
    let survivor = RedexEntry::new_heap(
        2,
        keep_from as u32,
        payloads[2].len() as u32,
        0,
        payload_checksum(payloads[2]),
    );
    seg.disk
        .compact_to(&[survivor], &[3_000u64], keep_from)
        .unwrap();

    let live = seg.disk.live_dir();
    let len_of = |file: &str| std::fs::metadata(live.join(file)).unwrap().len();
    let on_disk_dat = len_of("dat");
    let on_disk_idx = len_of("idx");
    let on_disk_ts = len_of("ts");
    assert_eq!(on_disk_dat, payloads[2].len() as u64, "sanity");
    assert_eq!(on_disk_idx, REDEX_ENTRY_SIZE as u64, "sanity");
    assert_eq!(on_disk_ts, 8, "sanity");
    assert_eq!(seg.disk.live_gen(), FIRST_GENERATION + 1);

    let (dat_w, idx_w, ts_w) = seg.disk.worker_file_lens();
    assert_eq!(
        dat_w, on_disk_dat,
        "worker dat handle must point at the compacted dat file, \
         not the placeholder"
    );
    assert_eq!(
        idx_w, on_disk_idx,
        "worker idx handle must point at the compacted idx file"
    );
    assert_eq!(
        ts_w, on_disk_ts,
        "worker ts handle must point at the compacted ts file"
    );

    let tail: &[u8] = b"after_compact";
    let tail_entry = RedexEntry::new_heap(
        99,
        on_disk_dat as u32,
        tail.len() as u32,
        0,
        payload_checksum(tail),
    );
    seg.disk.append_entry_at(&tail_entry, tail, 4_000).unwrap();
    seg.disk.sync().unwrap();
    drop(seg);

    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let seqs: Vec<u64> = reopened.index.iter().map(|e| e.seq).collect();
    assert_eq!(
        seqs,
        vec![2, 99],
        "post-compact reopen must show the surviving entry and \
         the post-compaction append"
    );
    let ts = reopened.timestamps.expect("ts sidecar present");
    assert_eq!(
        ts,
        vec![3_000, 4_000],
        "timestamps must pair with the right index records after \
         compaction + post-compact append"
    );
    cleanup(&base);
}
#[test]
fn fsync_dir_helper_succeeds_on_a_normal_directory() {
    // fsync_dir on an ordinary, freshly created directory must succeed.
    let stamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| d.as_nanos());
    let dir_name = format!("redex-fsync-dir-test-{}-{}", std::process::id(), stamp);
    let tmp = std::env::temp_dir().join(dir_name);
    std::fs::create_dir_all(&tmp).expect("create tempdir");
    super::fsync_dir(&tmp).expect("fsync_dir must succeed on a normal dir");
    let _ = std::fs::remove_dir(&tmp);
}
#[cfg(not(unix))]
#[test]
fn fsync_dir_no_op_on_non_unix_returns_ok_even_for_nonexistent_paths() {
    // On non-Unix targets fsync_dir must return Ok without touching the
    // filesystem, even for a path that does not exist. The path is built
    // with `Path::join` so the platform separator is used, rather than a
    // hard-coded `/` in a format string.
    let stamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let bogus = std::env::temp_dir().join(format!("redex-no-such-dir-{}", stamp));
    assert!(!bogus.exists(), "test fixture: bogus path must not exist");
    super::fsync_dir(&bogus).expect("non-Unix fsync_dir must be a no-op Ok");
}
#[test]
fn durable_rename_moves_file_and_preserves_contents() {
    // durable_rename must move src to dst byte-for-byte and unlink src.
    let dir = tmpdir();
    let src = dir.join("durable_rename_src");
    let dst = dir.join("durable_rename_dst");
    std::fs::write(&src, b"durable_rename payload").expect("seed src");
    assert!(src.exists());
    assert!(!dst.exists());
    super::durable_rename(&src, &dst).expect("durable_rename must succeed");
    assert!(!src.exists(), "src must be unlinked after rename");
    assert!(dst.exists(), "dst must exist after rename");
    let bytes = std::fs::read(&dst).expect("read dst");
    assert_eq!(bytes, b"durable_rename payload");
    // The scratch dir was previously leaked.
    cleanup(&dir);
}
#[test]
fn durable_rename_replaces_existing_destination() {
    // Renaming onto an existing file must replace its contents with src's.
    let dir = tmpdir();
    let src = dir.join("durable_rename_replace_src");
    let dst = dir.join("durable_rename_replace_dst");
    std::fs::write(&src, b"new contents").expect("seed src");
    std::fs::write(&dst, b"old contents to be replaced").expect("seed dst");
    super::durable_rename(&src, &dst).expect("rename over existing dst must succeed");
    assert!(!src.exists(), "src must be unlinked after rename");
    let bytes = std::fs::read(&dst).expect("read dst");
    assert_eq!(
        bytes, b"new contents",
        "dst must hold src's content, not the old payload"
    );
    // The scratch dir was previously leaked.
    cleanup(&dir);
}
#[test]
fn durable_rename_surfaces_missing_source_error() {
    // A missing source must surface as NotFound and must not create dst.
    let dir = tmpdir();
    let bogus_src = dir.join("definitely-not-here");
    let dst = dir.join("durable_rename_missing_dst");
    assert!(!bogus_src.exists());
    let err = super::durable_rename(&bogus_src, &dst)
        .expect_err("rename of nonexistent src must fail");
    assert!(!dst.exists(), "no dst should be created on failed rename");
    assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
    // The scratch dir was previously leaked.
    cleanup(&dir);
}
#[test]
fn compact_to_post_rename_swap_is_atomic_on_success_path() {
    // On the happy path, compaction must leave the worker handles on the new
    // generation. An append made afterwards must persist there, not in some
    // temporary placeholder.
    let base = tmpdir();
    let name = ChannelName::new("t/cr4_atomic").unwrap();
    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let payloads: [&[u8]; 3] = [b"alpha_aaaa", b"beta_bbbbb", b"gamma_cccc"];
    let mut next_offset = 0u32;
    for (seq, body) in (0u64..).zip(payloads.iter()) {
        let entry =
            RedexEntry::new_heap(seq, next_offset, body.len() as u32, 0, payload_checksum(body));
        seg.disk
            .append_entry_at(&entry, body, 1_000 * (seq + 1))
            .unwrap();
        next_offset += body.len() as u32;
    }
    seg.disk.sync().unwrap();

    let keep_from = (payloads[0].len() + payloads[1].len()) as u64;
    let survivor = RedexEntry::new_heap(
        2,
        keep_from as u32,
        payloads[2].len() as u32,
        0,
        payload_checksum(payloads[2]),
    );
    seg.disk
        .compact_to(&[survivor], &[3_000u64], keep_from)
        .expect("happy path compact must succeed");

    let live = seg.disk.live_dir();
    let len_of = |file: &str| std::fs::metadata(live.join(file)).unwrap().len();
    let on_disk_dat = len_of("dat");
    let on_disk_idx = len_of("idx");
    let on_disk_ts = len_of("ts");
    let (dat_w, idx_w, ts_w) = seg.disk.worker_file_lens();
    assert_eq!(
        dat_w, on_disk_dat,
        "worker dat must point at the new generation"
    );
    assert_eq!(
        idx_w, on_disk_idx,
        "worker idx must point at the new generation"
    );
    assert_eq!(
        ts_w, on_disk_ts,
        "worker ts must point at the new generation"
    );

    let tail: &[u8] = b"after_compact";
    let tail_entry = RedexEntry::new_heap(
        99,
        on_disk_dat as u32,
        tail.len() as u32,
        0,
        payload_checksum(tail),
    );
    seg.disk.append_entry_at(&tail_entry, tail, 4_000).unwrap();
    seg.disk.sync().unwrap();
    drop(seg);

    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let seqs: Vec<u64> = reopened.index.iter().map(|e| e.seq).collect();
    assert_eq!(
        seqs,
        vec![2, 99],
        "post-compact append must be on the channel dat, not a tmp placeholder"
    );
    cleanup(&base);
}
#[test]
fn cubic_p1_poisoned_segment_refuses_writes() {
    // Once the poison flag is set, every mutating entry point (single
    // append, batched append, compaction) must fail with an error whose
    // Display text mentions poisoning.
    let base = tmpdir();
    let name = ChannelName::new("t/poison").unwrap();
    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let payload: &[u8] = b"first";
    let accepted = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, payload_checksum(payload));
    seg.disk
        .append_entry_at(&accepted, payload, 1_000)
        .expect("fresh segment must accept appends");

    seg.disk
        .poisoned
        .store(true, std::sync::atomic::Ordering::Release);

    let blocked = RedexEntry::new_heap(1, 5, payload.len() as u32, 0, payload_checksum(payload));
    let single_err = seg
        .disk
        .append_entry_at(&blocked, payload, 2_000)
        .expect_err("poisoned segment must refuse append_entry_at");
    let msg = format!("{}", single_err);
    assert!(
        msg.contains("poisoned"),
        "error message must reference poisoning; got: {msg}"
    );

    let batch_err = seg
        .disk
        .append_entries_at(&[(blocked, payload)], &[3_000])
        .expect_err("poisoned segment must refuse append_entries_at");
    assert!(
        format!("{}", batch_err).contains("poisoned"),
        "append_entries_at error must reference poisoning"
    );

    let compact_err = seg
        .disk
        .compact_to(&[], &[], 0)
        .expect_err("poisoned segment must refuse compact_to");
    assert!(
        format!("{}", compact_err).contains("poisoned"),
        "compact_to error must reference poisoning"
    );
    cleanup(&base);
}
// Source-level regression guard. It reads this file's own text and checks
// that each recovery truncation marker found in the body of
// `DiskSegment::open` is followed within `window` lines by a `sync_all()`
// call. It also checks that both markers are still present at all.
#[test]
fn recovery_idx_dat_truncations_must_be_paired_with_sync_all() {
// The full text of this source file, embedded at compile time.
let src = include_str!("disk.rs");
let header = "pub(super) fn open(";
let start = src.find(header).expect("DiskSegment::open must exist");
let body_start = start + header.len();
// Approximate the end of `open` as the nearest following fn header of any
// of the listed visibilities.
let next_fn_offsets: Vec<usize> = ["\n fn ", "\n pub fn ", "\n pub(super) fn "]
.iter()
.filter_map(|p| src[body_start..].find(p).map(|i| i + body_start))
.collect();
let next_fn = *next_fn_offsets
.iter()
.min()
.expect("a following fn must exist after open()");
let body = &src[start..next_fn];
// Drop everything after `//` on each line so markers mentioned only in
// comments don't count. NOTE(review): this also cuts a line at a `//`
// inside a string literal.
let lines: Vec<&str> = body
.lines()
.map(|l| match l.find("//") {
Some(idx) => &l[..idx],
None => l,
})
.collect();
let in_scope_markers = [
"set_len((index.len() * REDEX_ENTRY_SIZE)",
"set_len(retained_dat_end)",
];
let mut found_any = [false; 2];
// How many following lines may separate a truncation from its sync_all().
let window = 5;
for (i, line) in lines.iter().enumerate() {
for (mi, marker) in in_scope_markers.iter().enumerate() {
if !line.contains(marker) {
continue;
}
found_any[mi] = true;
let mut paired = false;
for off in 1..=window {
if i + off >= lines.len() {
break;
}
if lines[i + off].contains("sync_all()") {
paired = true;
break;
}
}
assert!(
paired,
"regression: `{}` in DiskSegment::open recovery at \
(relative) line {} is not followed within {} lines \
by `sync_all()`. A crash between truncation and \
the next durable write reincarnates the torn tail.",
marker, i, window
);
}
}
// Each marker must appear at least once. Otherwise the check above would
// pass vacuously after a refactor.
for (mi, marker) in in_scope_markers.iter().enumerate() {
assert!(
found_any[mi],
"expected to find `{}` in DiskSegment::open — the \
recovery walk's truncation step appears to have \
been removed or refactored. Audit the new shape \
to confirm the fsync pairing is preserved.",
marker
);
}
}
// Source-level regression guard. It reads this file's own text, isolates the
// body of `rollback_truncate`, and checks two things. The body must contain
// at least two `poisoned.store(true` calls. It must not use the
// `if let Ok(f) = OpenOptions::` shape, which discards the open error.
#[test]
fn rollback_truncate_must_poison_on_failure() {
// The full text of this source file, embedded at compile time.
let src = include_str!("disk.rs");
let header = "fn rollback_truncate(";
let start = src.find(header).expect("rollback_truncate must exist");
let body_start = start + header.len();
// Approximate the end of `rollback_truncate` as the nearest following fn
// header of any of the listed visibilities.
let next_fn_offsets: Vec<usize> = ["\n fn ", "\n pub fn ", "\n pub(super) fn "]
.iter()
.filter_map(|p| src[body_start..].find(p).map(|i| i + body_start))
.collect();
let next_fn = *next_fn_offsets
.iter()
.min()
.expect("a following fn must exist after rollback_truncate");
let body = &src[start..next_fn];
// Plain substring count. Unlike the recovery guard, comments are not
// stripped here, so occurrences inside comments also count.
let poison_count = body.matches("poisoned.store(true").count();
assert!(
poison_count >= 2,
"regression: rollback_truncate must call \
`poisoned.store(true, ...)` in BOTH the open-failure \
and set_len-failure arms (saw {} occurrences). \
Pre-fix the open-failure arm silently dropped the \
error and left the segment divergent.",
poison_count
);
assert!(
!body.contains("if let Ok(f) = OpenOptions::"),
"regression: rollback_truncate must not use the \
`if let Ok(f) = OpenOptions::...` shape — that \
discards the open error, the exact pre-fix bug. \
Use `match` and poison on the Err arm."
);
}
#[test]
fn manifest_codec_roundtrip() {
    // Encoding then decoding must be lossless across small, large, and
    // boundary generation numbers, and every encoding is MANIFEST_SIZE bytes.
    let generations = [FIRST_GENERATION, 2, 100, 1_000_000, u32::MAX];
    generations.iter().copied().for_each(|gen| {
        let bytes = encode_manifest(gen);
        assert_eq!(
            bytes.len(),
            MANIFEST_SIZE,
            "wire size must be {MANIFEST_SIZE}"
        );
        let decoded = decode_manifest(&bytes);
        assert_eq!(
            decoded,
            Some(gen),
            "round-trip must preserve generation {gen}",
        );
    });
}
#[test]
fn manifest_codec_rejects_garbage() {
    // Recomputes the trailing checksum (xxh3 over bytes 0..12, truncated to
    // u32, stored little-endian at 12..16). A mutated manifest is then
    // rejected by field validation, not by the checksum.
    fn reseal(buf: &mut [u8]) {
        let cs = xxhash_rust::xxh3::xxh3_64(&buf[0..12]) as u32;
        buf[12..16].copy_from_slice(&cs.to_le_bytes());
    }

    let mut bad_magic = encode_manifest(1);
    bad_magic[0] = b'X';
    assert_eq!(decode_manifest(&bad_magic), None, "bad magic must be rejected");

    let mut bad_version = encode_manifest(1);
    bad_version[4] = 99;
    assert_eq!(
        decode_manifest(&bad_version),
        None,
        "unknown version must be rejected"
    );

    let mut zero_gen = encode_manifest(1);
    zero_gen[5..9].copy_from_slice(&0u32.to_le_bytes());
    reseal(&mut zero_gen[..]);
    assert_eq!(decode_manifest(&zero_gen), None, "gen 0 must be rejected");

    let mut dirty_reserved = encode_manifest(1);
    dirty_reserved[10] = 0xAA;
    reseal(&mut dirty_reserved[..]);
    assert_eq!(
        decode_manifest(&dirty_reserved),
        None,
        "non-zero reserved must be rejected (defense in depth against \
         a future producer that stuffs bits we may want to repurpose)",
    );

    // No reseal here: the stale checksum itself must catch the flip.
    let mut flipped = encode_manifest(7);
    flipped[5] ^= 0x01;
    assert_eq!(
        decode_manifest(&flipped),
        None,
        "bit-flip in generation must trip the checksum",
    );

    assert_eq!(
        decode_manifest(&[0u8; 15]),
        None,
        "short slice must be rejected"
    );
    assert_eq!(decode_manifest(&[]), None, "empty slice must be rejected");
}
#[test]
fn open_brand_new_channel_creates_v1_and_manifest() {
    // Opening a channel that has never existed must create the first
    // generation directory and a manifest that decodes to it.
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_init").unwrap();
    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(seg.disk.live_gen(), FIRST_GENERATION);

    let chan = channel_dir(&base, &name);
    let manifest = manifest_path(&chan);
    assert!(
        manifest.is_file(),
        "manifest must exist on brand-new channel"
    );
    let mut raw = [0u8; MANIFEST_SIZE];
    std::fs::File::open(&manifest)
        .and_then(|mut f| f.read_exact(&mut raw))
        .unwrap();
    assert_eq!(decode_manifest(&raw), Some(FIRST_GENERATION));
    assert!(
        gen_dir(&chan, FIRST_GENERATION).is_dir(),
        "v0000000001/ must exist"
    );
    cleanup(&base);
}
#[test]
fn open_migrates_flat_layout_to_v1_generation() {
    // A legacy channel keeps idx/dat/ts directly in the channel directory.
    // Opening it must move all three into the first generation, write the
    // manifest, and keep the data readable across a second open.
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_migrate").unwrap();
    let chan = channel_dir(&base, &name);
    std::fs::create_dir_all(&chan).unwrap();
    let payload = b"legacy-payload-bytes";
    let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, payload_checksum(payload));
    std::fs::write(chan.join("idx"), entry.to_bytes()).unwrap();
    std::fs::write(chan.join("dat"), payload).unwrap();
    std::fs::write(chan.join("ts"), 1234u64.to_le_bytes()).unwrap();

    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(seg.disk.live_gen(), FIRST_GENERATION);
    assert_eq!(seg.index.len(), 1, "migrated entry must survive");
    assert_eq!(seg.payload_bytes, payload);
    assert_eq!(seg.timestamps.as_deref(), Some(&[1234u64][..]));

    let v1 = gen_dir(&chan, FIRST_GENERATION);
    for file in ["idx", "dat", "ts"] {
        assert!(!chan.join(file).exists(), "flat {file} must be migrated");
    }
    for file in ["idx", "dat", "ts"] {
        assert!(v1.join(file).is_file());
    }
    assert_eq!(read_manifest(&chan), Some(FIRST_GENERATION));
    drop(seg);

    let again = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(again.disk.live_gen(), FIRST_GENERATION);
    assert_eq!(again.index.len(), 1);
    cleanup(&base);
}
#[test]
fn open_resumes_migration_after_partial_flat_rename_crash() {
    // Simulate a crash mid-migration: idx already moved into v1, dat and ts
    // still flat, and no manifest. Open must finish the move and write the
    // manifest without losing the record.
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_partial_migration").unwrap();
    let chan = channel_dir(&base, &name);
    std::fs::create_dir_all(&chan).unwrap();
    let payload = b"resume-migration-payload";
    let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, payload_checksum(payload));
    let v1 = gen_dir(&chan, FIRST_GENERATION);
    std::fs::create_dir_all(&v1).unwrap();
    std::fs::write(v1.join("idx"), entry.to_bytes()).unwrap();
    std::fs::write(chan.join("dat"), payload).unwrap();
    std::fs::write(chan.join("ts"), 4567u64.to_le_bytes()).unwrap();
    assert!(!manifest_path(&chan).exists());

    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(seg.disk.live_gen(), FIRST_GENERATION);
    assert_eq!(read_manifest(&chan), Some(FIRST_GENERATION));
    for file in ["dat", "ts"] {
        assert!(
            !chan.join(file).exists(),
            "remaining flat {file} must have completed its migration",
        );
    }
    for file in ["idx", "dat", "ts"] {
        assert!(v1.join(file).is_file());
    }
    assert_eq!(
        seg.index.len(),
        1,
        "the pre-crash idx entry must survive"
    );
    assert_eq!(seg.payload_bytes, payload);
    assert_eq!(seg.timestamps.as_deref(), Some(&[4567u64][..]));
    drop(seg);

    let again = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(again.disk.live_gen(), FIRST_GENERATION);
    assert_eq!(again.index.len(), 1);
    cleanup(&base);
}
#[test]
fn open_resumes_migration_when_only_dat_remains_flat() {
    // Crash after idx and ts were migrated but before dat was. Open must move
    // the flat dat into v1 and write the manifest.
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_partial_migration_dat_only").unwrap();
    let chan = channel_dir(&base, &name);
    std::fs::create_dir_all(&chan).unwrap();
    let payload = b"dat-only-resume";
    let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, payload_checksum(payload));
    let v1 = gen_dir(&chan, FIRST_GENERATION);
    std::fs::create_dir_all(&v1).unwrap();
    std::fs::write(v1.join("idx"), entry.to_bytes()).unwrap();
    std::fs::write(v1.join("ts"), 99u64.to_le_bytes()).unwrap();
    let flat_dat = chan.join("dat");
    std::fs::write(&flat_dat, payload).unwrap();
    assert!(!manifest_path(&chan).exists());

    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(seg.disk.live_gen(), FIRST_GENERATION);
    assert_eq!(read_manifest(&chan), Some(FIRST_GENERATION));
    assert!(!flat_dat.exists());
    assert!(v1.join("dat").is_file());
    assert_eq!(seg.index.len(), 1);
    assert_eq!(seg.payload_bytes, payload);
    assert_eq!(seg.timestamps.as_deref(), Some(&[99u64][..]));
    cleanup(&base);
}
#[test]
fn open_falls_back_to_highest_complete_generation_when_manifest_missing() {
    // Compact once so a second generation exists, then delete the manifest.
    // Reopen must pick the highest valid generation and rewrite the manifest.
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_fallback").unwrap();
    let recovered = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let payload = b"x";
    let e = RedexEntry::new_heap(0, 0, 1, 0, payload_checksum(payload));
    recovered.disk.append_entry_at(&e, payload, 1).unwrap();
    recovered.disk.sync().unwrap();
    // The channel was brand new, so `e` is its only record. Keep it through
    // compaction. The result must not be discarded: if compaction failed, the
    // live generation would stay at v1 and the fallback assertion below would
    // pass without exercising "highest generation" selection.
    recovered
        .disk
        .compact_to(&[e], &[1], 0)
        .expect("compaction must succeed for the fallback scenario to be meaningful");
    let live_after_compact = recovered.disk.live_gen();
    assert_eq!(
        live_after_compact,
        FIRST_GENERATION + 1,
        "compaction must advance to a newer generation"
    );
    drop(recovered);
    let chan = channel_dir(&base, &name);
    std::fs::remove_file(manifest_path(&chan)).unwrap();
    let recovered2 = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(
        recovered2.disk.live_gen(),
        live_after_compact,
        "fallback must pick the highest validated generation",
    );
    assert_eq!(read_manifest(&chan), Some(live_after_compact));
    cleanup(&base);
}
#[test]
fn open_falls_back_when_manifest_checksum_bad() {
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_torn").unwrap();
    {
        // Seed v1 with one durable entry; the segment drops at block end.
        let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
        let payload = b"y";
        let entry = RedexEntry::new_heap(0, 0, 1, 0, payload_checksum(payload));
        seg.disk.append_entry_at(&entry, payload, 2).unwrap();
        seg.disk.sync().unwrap();
    }
    let chan = channel_dir(&base, &name);
    let manifest = manifest_path(&chan);
    // Read exactly MANIFEST_SIZE bytes, flip one bit at byte offset 12,
    // and write the damaged bytes back.
    let mut raw = [0u8; MANIFEST_SIZE];
    let mut file = std::fs::File::open(&manifest).unwrap();
    file.read_exact(&mut raw).unwrap();
    drop(file);
    raw[12] ^= 0x01;
    std::fs::write(&manifest, raw).unwrap();
    assert_eq!(read_manifest(&chan), None, "checksum guard must fire");
    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(
        reopened.disk.live_gen(),
        FIRST_GENERATION,
        "fallback must still find v1 after the manifest is torn",
    );
    // Recovery rewrites a valid manifest for the generation it picked.
    assert_eq!(read_manifest(&chan), Some(FIRST_GENERATION));
    cleanup(&base);
}
#[test]
fn open_sweeps_orphan_newer_generation_left_by_crashed_compact() {
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_orphan_newer").unwrap();
    let payload = b"live";
    {
        // Seed the first generation with one durable entry.
        let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
        let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, payload_checksum(payload));
        seg.disk.append_entry_at(&entry, payload, 7).unwrap();
        seg.disk.sync().unwrap();
    }
    // Plant a next-generation directory the manifest was never flipped to,
    // holding only a truncated two-byte `idx`.
    let chan = channel_dir(&base, &name);
    let orphan = gen_dir(&chan, FIRST_GENERATION + 1);
    std::fs::create_dir_all(&orphan).unwrap();
    std::fs::write(orphan.join("idx"), b"\x00\x00").unwrap();
    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    // v1 stays live, the orphan is removed, and v1's data is intact.
    assert_eq!(reopened.disk.live_gen(), FIRST_GENERATION);
    assert!(
        !orphan.exists(),
        "orphan generation directory must be swept on next open",
    );
    assert_eq!(reopened.index.len(), 1);
    assert_eq!(reopened.payload_bytes, payload);
    cleanup(&base);
}
#[test]
fn open_sweeps_orphan_older_generation_left_by_crashed_post_flip_cleanup() {
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_orphan_older").unwrap();
    {
        // Seed v1 with one durable entry.
        let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
        let old_payload = b"v1-payload";
        let old_entry = RedexEntry::new_heap(
            0,
            0,
            old_payload.len() as u32,
            0,
            payload_checksum(old_payload),
        );
        seg.disk.append_entry_at(&old_entry, old_payload, 8).unwrap();
        seg.disk.sync().unwrap();
    }
    let chan = channel_dir(&base, &name);
    let v1 = gen_dir(&chan, FIRST_GENERATION);
    let v2 = gen_dir(&chan, FIRST_GENERATION + 1);
    // Hand-build a complete v2 (idx + dat + ts), flip the manifest to it,
    // but leave v1 on disk, as if post-flip cleanup never ran.
    let new_payload = b"v2-payload";
    let new_entry = RedexEntry::new_heap(
        5,
        0,
        new_payload.len() as u32,
        0,
        payload_checksum(new_payload),
    );
    std::fs::create_dir_all(&v2).unwrap();
    std::fs::write(v2.join("idx"), new_entry.to_bytes()).unwrap();
    std::fs::write(v2.join("dat"), new_payload).unwrap();
    std::fs::write(v2.join("ts"), 9u64.to_le_bytes()).unwrap();
    write_manifest_atomic(&chan, FIRST_GENERATION + 1).unwrap();
    assert!(v1.exists(), "setup: v1 must still be present pre-recovery");
    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    // The manifest's generation wins and the superseded v1 is removed.
    assert_eq!(reopened.disk.live_gen(), FIRST_GENERATION + 1);
    assert!(
        !v1.exists(),
        "superseded prior generation must be swept on next open",
    );
    assert_eq!(reopened.index.len(), 1);
    assert_eq!(reopened.payload_bytes, new_payload);
    cleanup(&base);
}
#[test]
fn compact_to_advances_generation_and_swaps_manifest_atomically() {
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_compact_flow").unwrap();
    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let payload = b"to-be-compacted";
    let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, payload_checksum(payload));
    seg.disk.append_entry_at(&entry, payload, 11).unwrap();
    seg.disk.sync().unwrap();
    let chan = channel_dir(&base, &name);
    let v1 = gen_dir(&chan, FIRST_GENERATION);
    let v2 = gen_dir(&chan, FIRST_GENERATION + 1);
    // Before compaction, v1 is live and recorded in the manifest.
    assert_eq!(seg.disk.live_gen(), FIRST_GENERATION);
    assert_eq!(read_manifest(&chan), Some(FIRST_GENERATION));
    seg.disk.compact_to(&[entry], &[11], 0).unwrap();
    // Afterwards both the in-memory generation and the manifest name v2 ...
    assert_eq!(seg.disk.live_gen(), FIRST_GENERATION + 1);
    assert_eq!(read_manifest(&chan), Some(FIRST_GENERATION + 1));
    // ... v2 holds all three data files ...
    for file_name in ["idx", "dat", "ts"] {
        assert!(v2.join(file_name).is_file());
    }
    // ... and v1 has already been removed.
    assert!(!v1.exists(), "prior generation must be swept post-compact");
    drop(seg);
    // A fresh open lands on v2 with the compacted entry.
    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(reopened.disk.live_gen(), FIRST_GENERATION + 1);
    assert_eq!(reopened.index.len(), 1);
    cleanup(&base);
}
#[test]
fn recovery_converges_to_single_live_generation() {
    let base = tmpdir();
    let name = ChannelName::new("t/manifest_converge").unwrap();
    let first = b"first-gen";
    let second = b"second-gen";
    {
        let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
        // One entry, then compact it forward into the second generation.
        let first_entry =
            RedexEntry::new_heap(0, 0, first.len() as u32, 0, payload_checksum(first));
        seg.disk.append_entry_at(&first_entry, first, 100).unwrap();
        seg.disk.sync().unwrap();
        seg.disk.compact_to(&[first_entry], &[100], 0).unwrap();
        assert_eq!(seg.disk.live_gen(), FIRST_GENERATION + 1);
        // Append a second entry into the post-compaction generation.
        let second_entry = RedexEntry::new_heap(
            1,
            first.len() as u32,
            second.len() as u32,
            0,
            payload_checksum(second),
        );
        seg.disk.append_entry_at(&second_entry, second, 200).unwrap();
        seg.disk.sync().unwrap();
    }
    // Plant a garbage third generation that the manifest was never flipped to.
    let chan = channel_dir(&base, &name);
    let v3 = gen_dir(&chan, FIRST_GENERATION + 2);
    std::fs::create_dir_all(&v3).unwrap();
    std::fs::write(v3.join("idx"), b"\xAB\xCD").unwrap();
    let reopened = DiskSegment::open(&base, &name, 0, 0).unwrap();
    assert_eq!(reopened.disk.live_gen(), FIRST_GENERATION + 1);
    assert!(!v3.exists(), "orphan v3 must be swept");
    assert_eq!(reopened.index.len(), 2);
    cleanup(&base);
}
#[test]
fn sweep_orphan_generations_keeps_only_designated_generation() {
    let base = tmpdir();
    let chan = base.join("sweep_test");
    std::fs::create_dir_all(&chan).unwrap();
    // `generation` rather than `gen`: the latter is reserved in Rust 2024.
    for generation in &[1u32, 2, 5, 7] {
        std::fs::create_dir_all(gen_dir(&chan, *generation)).unwrap();
    }
    // A directory whose name is not a generation, plus a stale temp manifest.
    std::fs::create_dir_all(chan.join("not-a-gen-dir")).unwrap();
    std::fs::write(chan.join("manifest.tmp"), b"stale").unwrap();
    sweep_orphan_generations(&chan, 5);
    assert!(gen_dir(&chan, 5).exists(), "kept generation must survive");
    assert!(!gen_dir(&chan, 1).exists(), "v1 must be swept");
    assert!(!gen_dir(&chan, 2).exists(), "v2 must be swept");
    assert!(!gen_dir(&chan, 7).exists(), "v7 must be swept");
    assert!(
        chan.join("not-a-gen-dir").exists(),
        "non-matching directories must NOT be swept",
    );
    assert!(
        !chan.join("manifest.tmp").exists(),
        "stale manifest.tmp must be cleaned up",
    );
    // Remove the whole scratch directory from tmpdir(), not just the
    // `sweep_test` child, so nothing is left behind.
    cleanup(&base);
}
#[test]
fn enumerate_generations_filters_and_sorts() {
    let base = tmpdir();
    let chan = base.join("enumerate_test");
    std::fs::create_dir_all(&chan).unwrap();
    // Well-formed generation directories, created out of numeric order.
    for d in &["v0000000001", "v0000000002", "v0000000010", "v0000000003"] {
        std::fs::create_dir_all(chan.join(d)).unwrap();
    }
    // Decoys:
    // - a regular file with a generation-like prefix,
    // - a non-numeric suffix,
    // - too few digits,
    // - the reserved generation 0.
    std::fs::write(chan.join("v0000000001.txt"), b"file").unwrap();
    std::fs::create_dir_all(chan.join("vXXXXXXXXXX")).unwrap();
    std::fs::create_dir_all(chan.join("v00000001")).unwrap();
    std::fs::create_dir_all(chan.join("v0000000000")).unwrap();
    let gens = enumerate_generations(&chan).unwrap();
    assert_eq!(
        gens,
        vec![10u32, 3, 2, 1],
        "must return matching generations in descending order, \
        skipping decoys and the reserved gen 0",
    );
    // Remove the whole scratch directory from tmpdir(), not just the
    // `enumerate_test` child, so nothing is left behind.
    cleanup(&base);
}
#[test]
fn poisoning_docs_and_errors_describe_actual_setters() {
    // Source-scan regression guard over this very file. Each marker is
    // assembled from fragments so this test's own text never contains the
    // exact phrase it searches for.
    let src = include_str!("disk.rs");
    let stale_marker = concat!("compact_to post", "-", "rename reopen failure");
    let new_marker = concat!("partial-write rollback", " ", "could not restore");
    assert!(
        !src.contains(stale_marker),
        "regression: source must not contain '{stale_marker}'. \
        That parenthetical described a `compact_to` failure \
        path that was deleted in the manifest-pointer rework \
        (the rework opens the new generation's handles BEFORE \
        the atomic flip, so a failed open aborts the compact \
        with live state still intact). The `poisoned` flag's \
        only setters now are the partial-write rollback paths \
        (rollback_truncate, rollback_after_idx_failure); both \
        the field rustdoc and the runtime error strings must \
        describe that reality, not the deleted setter."
    );
    let occurrences = src.matches(new_marker).count();
    assert!(
        occurrences >= 2,
        "regression: at least two error messages (one per \
        append path) must use the wording '{new_marker}' to \
        point operators at the real cause; saw {occurrences} \
        occurrences in the source.",
    );
}
#[test]
fn write_manifest_atomic_swallows_fsync_dir_failure_after_rename() {
    let chan = tmpdir();
    // Baseline: a manifest write with no fault armed round-trips.
    super::write_manifest_atomic(&chan, 1).expect("baseline manifest write must succeed");
    assert_eq!(super::read_manifest(&chan), Some(1));
    // Arm the injected fsync_dir failure (argument 1), then flip again.
    // The call must still report Ok and the new value must be readable.
    super::arm_fsync_dir_failure_at(1);
    let second_flip = super::write_manifest_atomic(&chan, 2);
    second_flip.expect(
        "write_manifest_atomic must return Ok when fsync_dir fails \
        AFTER durable_rename succeeded; surfacing Err would lie \
        about whether the flip happened",
    );
    let observed = super::read_manifest(&chan);
    assert_eq!(
        observed,
        Some(2),
        "manifest must reflect the post-rename value even though \
        the dirent fsync was injected to fail",
    );
    // Reset the injector (argument 0) before cleaning up.
    super::arm_fsync_dir_failure_at(0);
    cleanup(&chan);
}
#[test]
fn write_manifest_atomic_propagates_pre_rename_failures() {
    // Point the manifest writer at a child directory that does not exist,
    // so it fails before any rename can happen. Keep the parent handle so
    // the scratch directory from tmpdir() can be removed afterwards.
    let base = tmpdir();
    let nonexistent = base.join("does_not_exist_subdir");
    let err = super::write_manifest_atomic(&nonexistent, 1)
        .expect_err("write into a nonexistent dir must fail");
    assert_eq!(
        err.kind(),
        std::io::ErrorKind::NotFound,
        "pre-rename failures (here: tmp open against missing dir) \
        must surface as Err so the caller knows the flip didn't \
        happen; got {:?}",
        err,
    );
    cleanup(&base);
}
#[test]
fn compact_to_succeeds_when_post_rename_fsync_dir_fails() {
    let base = tmpdir();
    let name = ChannelName::new("t/compact_post_fsync_fail").unwrap();
    let payload = b"survives-fsync_dir-failure";
    let seg = DiskSegment::open(&base, &name, 0, 0).unwrap();
    let entry = RedexEntry::new_heap(0, 0, payload.len() as u32, 0, payload_checksum(payload));
    seg.disk.append_entry_at(&entry, payload, 5).unwrap();
    seg.disk.sync().unwrap();
    // Arm the injected fsync_dir failure (argument 2) before compacting.
    // The assertions below require the flip to have committed anyway.
    super::arm_fsync_dir_failure_at(2);
    seg.disk.compact_to(&[entry], &[5], 0).expect(
        "compact_to must report success when fsync_dir fails \
        AFTER the manifest rename succeeded",
    );
    assert_eq!(
        seg.disk.live_gen(),
        FIRST_GENERATION + 1,
        "live_gen must advance after a successful compact even \
        when post-rename fsync_dir failed",
    );
    let on_disk = super::read_manifest(&channel_dir(&base, &name));
    assert_eq!(
        on_disk,
        Some(FIRST_GENERATION + 1),
        "on-disk manifest must reflect next_gen — the rename \
        committed before the injected fsync_dir failure",
    );
    // Reset the injector (argument 0) before cleaning up.
    super::arm_fsync_dir_failure_at(0);
    cleanup(&base);
}
#[test]
fn write_manifest_atomic_must_not_propagate_post_rename_fsync_dir() {
    // Source-scan regression guard. Find the manifest writer's definition
    // and scan up to the next line that starts with `fn `. Only if there is
    // none, fall back to the next line starting with `#[cfg(test)]`.
    let src = include_str!("disk.rs");
    let def_at = src
        .find("fn write_manifest_atomic(")
        .expect("write_manifest_atomic must exist");
    let tail = &src[def_at..];
    let span_end = match tail.find("\nfn ") {
        Some(end) => end,
        None => tail
            .find("\n#[cfg(test)]")
            .expect("a following item must exist"),
    };
    let manifest_writer = &tail[..span_end];
    assert!(
        !manifest_writer.contains("fsync_dir(channel_dir)?;"),
        "regression: write_manifest_atomic must NOT propagate \
        fsync_dir(channel_dir) errors via `?` after a successful \
        durable_rename. The rename is the linearizing event — \
        once it commits, returning Err lies to the caller about \
        whether the flip happened, and any in-process appends \
        between the failed write_manifest_atomic and process \
        exit would land in the (now-dead) cur_gen. Wrap the \
        call in `if let Err(e) = ... {{ tracing::warn!(...) }}` \
        that logs and continues."
    );
}
#[test]
fn migrate_flat_layout_has_exactly_one_caller() {
    // Source-scan regression guard. Count call-shaped occurrences of the
    // flat-layout migration helper in the text of this file that precedes
    // this test.
    let src = include_str!("disk.rs");
    let needle_call = "migrate_flat_layout_if_needed(";
    let total = src.matches(needle_call).count();
    let test_fn_header = "fn migrate_flat_layout_has_exactly_one_caller(";
    let test_fn_start = src.find(test_fn_header).expect("this test must exist");
    // Cut exactly at this test's own `fn` line, so the `needle_call`
    // literal above is excluded. This does not rely on a blank line
    // separating this test from the previous item: with no such line, a
    // blank-line search would cut at offset 0 and report zero occurrences.
    let src_minus_this_test = &src[..test_fn_start];
    let outside = src_minus_this_test.matches(needle_call).count();
    assert_eq!(
        outside, 2,
        "regression: `migrate_flat_layout_if_needed` must have \
        exactly two source occurrences with the `(` call shape \
        outside this test (one definition + one caller in \
        `resolve_live_generation`); saw {outside}. The function \
        clobbers any pre-existing v1 with flat-layout content, \
        which is safe only under the precedence gate that \
        `resolve_live_generation` enforces. A second caller \
        would silently lose post-compact data on channels \
        where flat files lingered alongside a real generation \
        directory. See the function's `# Precedence assumption` \
        rustdoc.",
    );
    // `src_minus_this_test` is a prefix of `src`, so its count can never
    // exceed the whole-file count.
    assert!(
        total >= outside,
        "sanity: occurrences outside this test ({outside}) cannot \
        exceed the whole-file total ({total}); test self-reference \
        accounting is broken.",
    );
}
}