use crate::config;
use crate::error::Result;
use ant_protocol::evm::Amount;
use fs2::FileExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{self, DirEntry, File, OpenOptions};
use std::io::{BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, info, warn};
/// Age in seconds (24 hours) beyond which a receipt file, judged by the
/// timestamp prefix of its filename, is treated as expired.
const PAYMENT_EXPIRATION_SECS: u64 = 24 * 60 * 60;
/// Path, relative to the configured data directory, where receipt files live.
const PAYMENTS_SUBDIR: &str = "payments/single";
/// Cached record of the proofs and cumulative costs accumulated for one file.
///
/// Serialized to disk with MessagePack (`rmp_serde`) by the functions in this
/// module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleNodeReceipt {
/// Schema version of the on-disk record. Defaults to 0 when the field is
/// absent from the encoded data.
#[serde(default)]
pub version: u8,
/// Proof bytes keyed by a 32-byte identifier (called `addr` elsewhere in
/// this file).
pub proofs: HashMap<[u8; 32], Vec<u8>>,
/// UNIX timestamp (seconds) recorded when the receipt was first created.
pub first_pay_timestamp: u64,
/// Cumulative storage cost kept as a string; it is parsed as `Amount`
/// whenever a new cost is added.
pub storage_cost_atto: String,
/// Cumulative gas cost, accumulated with saturating addition.
pub gas_cost_wei: u128,
}
/// Schema version written into new receipts; receipts carrying a higher
/// version are rejected by `read_receipt`.
const SCHEMA_VERSION: u8 = 1;
impl SingleNodeReceipt {
    /// Creates an empty receipt stamped with `now_secs`: no proofs, a storage
    /// cost of `"0"`, zero gas cost and the current schema version.
    fn new(now_secs: u64) -> Self {
        let storage_cost_atto = String::from("0");
        Self {
            version: SCHEMA_VERSION,
            proofs: HashMap::default(),
            first_pay_timestamp: now_secs,
            storage_cost_atto,
            gas_cost_wei: 0u128,
        }
    }
}
/// Returns the receipts directory (`<data_dir>/payments/single`), creating it
/// and any missing parents first.
///
/// # Errors
/// Fails when the data directory cannot be resolved or the directory cannot
/// be created.
fn payments_dir() -> Result<PathBuf> {
    let base = config::data_dir()?;
    let receipts_dir = base.join(PAYMENTS_SUBDIR);
    fs::create_dir_all(&receipts_dir)?;
    Ok(receipts_dir)
}
/// Derives the per-file cache key: the first 16 bytes of the BLAKE3 hash of
/// `file_path`, rendered as 32 lowercase hex characters.
fn file_hash_key(file_path: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let digest = blake3::hash(file_path.as_bytes());
    let mut key = String::with_capacity(32);
    for &byte in digest.as_bytes().iter().take(16) {
        // High nibble first, then low nibble: same output as `{byte:02x}`.
        key.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        key.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    key
}
/// Builds the canonical receipt path `<dir>/<ts>_<key>`.
fn receipt_path(dir: &Path, ts: u64, key: &str) -> PathBuf {
    let file_name = format!("{ts}_{key}");
    dir.join(file_name)
}
pub fn append_wave(
file_path: &str,
new_proofs: HashMap<[u8; 32], Vec<u8>>,
wave_storage_cost_atto: &str,
wave_gas_cost_wei: u128,
) -> Result<PathBuf> {
let dir = payments_dir()?;
let key = file_hash_key(file_path);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let _guard = ReceiptLock::acquire(&dir, &key)?;
recover_orphaned_tmps(&dir, &key);
let (old_path, mut receipt) = match find_existing(&dir, &key)? {
Some((p, r)) => (Some(p), r),
None => (None, SingleNodeReceipt::new(now)),
};
let new_path = receipt_path(&dir, now, &key);
receipt.proofs.extend(new_proofs);
if let (Ok(prev), Ok(add)) = (
receipt.storage_cost_atto.parse::<Amount>(),
wave_storage_cost_atto.parse::<Amount>(),
) {
receipt.storage_cost_atto = prev.saturating_add(add).to_string();
}
receipt.gas_cost_wei = receipt.gas_cost_wei.saturating_add(wave_gas_cost_wei);
write_receipt_atomic(&new_path, &receipt)?;
if let Some(old) = old_path {
if old != new_path {
let _ = fs::remove_file(&old);
}
}
debug!(
"Appended {} proofs to single-node receipt for {file_path:?} ({})",
receipt.proofs.len(),
new_path.display()
);
Ok(new_path)
}
/// Removes the given proofs from the cached receipt for `file_path`, but only
/// where the cached bytes still equal the bytes the caller expects.
///
/// Entries whose cached bytes differ from `expected` are kept and counted in
/// an info log. If nothing was removed the receipt is left untouched. If the
/// receipt becomes empty its file is unlinked (falling back to writing an
/// empty receipt when the unlink fails); otherwise the reduced receipt is
/// rewritten atomically in place.
///
/// # Errors
/// Fails if the receipts directory, the lock, the receipt lookup or an atomic
/// rewrite fails.
pub fn drop_proofs_for_file(file_path: &str, expected: &[([u8; 32], Vec<u8>)]) -> Result<()> {
    if expected.is_empty() {
        return Ok(());
    }
    let dir = payments_dir()?;
    let key = file_hash_key(file_path);
    let _guard = ReceiptLock::acquire(&dir, &key)?;
    recover_orphaned_tmps(&dir, &key);
    let (path, mut receipt) = match find_existing(&dir, &key)? {
        Some(found) => found,
        None => return Ok(()),
    };

    let mut dropped = 0usize;
    let mut refreshed = 0usize;
    for (addr, expected_bytes) in expected {
        let Some(current) = receipt.proofs.get(addr) else {
            continue;
        };
        if current == expected_bytes {
            receipt.proofs.remove(addr);
            dropped += 1;
        } else {
            refreshed += 1;
        }
    }
    if refreshed > 0 {
        info!(
            "Skipped dropping {refreshed} stale proofs whose bytes changed since load \
             (concurrent re-pay refreshed them — keeping the fresh proof)"
        );
    }
    if dropped == 0 {
        return Ok(());
    }

    if !receipt.proofs.is_empty() {
        write_receipt_atomic(&path, &receipt)?;
        debug!(
            "Dropped {dropped} stale proofs from single-node receipt for {file_path:?} ({})",
            path.display()
        );
        return Ok(());
    }

    // Nothing left to resume from: prefer removing the file outright.
    match fs::remove_file(&path) {
        Ok(()) => debug!(
            "Dropped final {dropped} proofs from single-node receipt for {file_path:?}; \
             receipt removed"
        ),
        Err(e) => {
            warn!(
                "Could not remove emptied single-node receipt {} ({e}); \
                 writing empty receipt instead",
                path.display()
            );
            write_receipt_atomic(&path, &receipt)?;
        }
    }
    Ok(())
}
/// Best-effort wrapper around [`drop_proofs_for_file`]: a failure is logged as
/// a warning and otherwise ignored.
pub fn try_drop_proofs_for_file(file_path: &str, expected: &[([u8; 32], Vec<u8>)]) {
    let Err(e) = drop_proofs_for_file(file_path, expected) else {
        return;
    };
    warn!(
        "Failed to drop stale proofs from cached single-node receipt for \
         {file_path:?}: {e}. Stale entries may be retried next attempt."
    );
}
/// Best-effort wrapper around [`append_wave`]: the resulting path is
/// discarded and a failure is logged as a warning instead of being returned.
pub fn try_append_wave(
    file_path: &str,
    new_proofs: HashMap<[u8; 32], Vec<u8>>,
    wave_storage_cost_atto: &str,
    wave_gas_cost_wei: u128,
) {
    let outcome = append_wave(
        file_path,
        new_proofs,
        wave_storage_cost_atto,
        wave_gas_cost_wei,
    );
    let Err(e) = outcome else {
        return;
    };
    warn!(
        "Failed to cache single-node payment receipt for {file_path:?}: {e}. \
         Upload will proceed without resume support for this wave."
    );
}
/// Looks up the cached receipt for `file_path`.
///
/// Expired receipts across the whole directory are purged first; then, under
/// the per-key lock, orphaned `.tmp` files are recovered and the first usable
/// receipt is returned with its path.
///
/// # Errors
/// Fails if the receipts directory or the lock cannot be obtained.
pub fn load_for_file(file_path: &str) -> Result<Option<(PathBuf, SingleNodeReceipt)>> {
    cleanup_outdated();
    let receipts_dir = payments_dir()?;
    let cache_key = file_hash_key(file_path);
    // Held until the function returns, so recovery and lookup are atomic
    // with respect to other lock holders.
    let _lock = ReceiptLock::acquire(&receipts_dir, &cache_key)?;
    recover_orphaned_tmps(&receipts_dir, &cache_key);
    find_existing(&receipts_dir, &cache_key)
}
/// Best-effort wrapper around [`load_for_file`]: a lookup error is logged as a
/// warning and reported as "no receipt".
pub fn try_load_for_file(file_path: &str) -> Option<(PathBuf, SingleNodeReceipt)> {
    load_for_file(file_path).unwrap_or_else(|e| {
        warn!(
            "Failed to look up cached single-node receipt for {file_path:?}: {e}. \
             Starting a fresh upload."
        );
        None
    })
}
pub fn delete_for_file(file_path: &str) -> Result<()> {
let dir = payments_dir()?;
let key = file_hash_key(file_path);
let _guard = ReceiptLock::acquire(&dir, &key)?;
if let Ok(read_dir) = fs::read_dir(&dir) {
for entry in read_dir.flatten() {
let path = entry.path();
let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
continue;
};
if name.ends_with(".lock") {
continue;
}
if !name.contains(&key) {
continue;
}
let _ = fs::remove_file(&path);
debug!("Deleted cached single-node receipt {}", path.display());
}
}
Ok(())
}
/// Best-effort wrapper around [`delete_for_file`]: a failure is logged as a
/// warning and otherwise ignored.
pub fn try_delete_for_file(file_path: &str) {
    let Err(e) = delete_for_file(file_path) else {
        return;
    };
    warn!(
        "Failed to delete cached single-node receipt for {file_path:?}: {e}. \
         Will be cleaned up after expiry."
    );
}
/// Removes every expired receipt file from the receipts directory.
///
/// Purely best-effort: if the directory cannot be resolved or listed nothing
/// happens, and individual unlink failures are ignored. `.lock` and `.tmp`
/// files are never treated as expired (see `is_expired_entry`).
pub fn cleanup_outdated() {
    let Some(listing) = payments_dir().ok().and_then(|dir| fs::read_dir(&dir).ok()) else {
        return;
    };
    for expired in listing.flatten().filter(|entry| is_expired_entry(entry)) {
        let stale_path = expired.path();
        info!(
            "Removing expired cached single-node payment file: {}",
            stale_path.display()
        );
        let _ = fs::remove_file(stale_path);
    }
}
/// Resolves leftover `.tmp` files for `key`, then dedupes canonical receipts.
///
/// Every `*.tmp` entry whose name contains `key` is collected with its
/// filename timestamp (0 when unparseable) and a flag saying whether it
/// decodes as a receipt. Candidates are visited newest first: the first
/// decodable one that renames successfully is promoted to its canonical name
/// (the name without `.tmp`); undecodable ones, and any visited after a
/// successful promotion, are unlinked. `dedupe_canonical_receipts` always
/// runs afterwards, provided the directory could be listed.
fn recover_orphaned_tmps(dir: &Path, key: &str) {
    let Ok(listing) = fs::read_dir(dir) else {
        return;
    };
    let mut orphans: Vec<(u64, PathBuf, bool)> = listing
        .flatten()
        .filter_map(|entry| {
            let tmp = entry.path();
            let file_name = tmp.file_name()?.to_str()?;
            if !(file_name.ends_with(".tmp") && file_name.contains(key)) {
                return None;
            }
            let stamp = file_name
                .split_once('_')
                .and_then(|(prefix, _)| prefix.parse::<u64>().ok())
                .unwrap_or(0);
            let decodes = read_receipt(&tmp).is_ok();
            Some((stamp, tmp, decodes))
        })
        .collect();
    orphans.sort_by_key(|(stamp, _, _)| std::cmp::Reverse(*stamp));

    let mut promoted = false;
    for (_, tmp, decodes) in orphans {
        if promoted || !decodes {
            let _ = fs::remove_file(&tmp);
            continue;
        }
        let Some(target) = tmp
            .file_name()
            .and_then(|n| n.to_str())
            .and_then(|n| n.strip_suffix(".tmp"))
            .map(|stem| tmp.with_file_name(stem))
        else {
            continue;
        };
        match fs::rename(&tmp, &target) {
            Err(e) => warn!(
                "Could not recover orphaned receipt {} ({e})",
                tmp.display()
            ),
            Ok(()) => {
                info!(
                    "Recovered orphaned receipt {} -> {}",
                    tmp.display(),
                    target.display()
                );
                promoted = true;
            }
        }
    }
    dedupe_canonical_receipts(dir, key);
}
/// Collapses multiple canonical receipts for `key` into the newest one.
///
/// Canonical receipts are regular files whose name contains `key` and does
/// not end in `.tmp` or `.lock`. They are ordered newest first by filename
/// timestamp (0 when unparseable). The newest readable one becomes the
/// winner; unreadable newer ones are unlinked along the way, except that a
/// single remaining file is never touched. Each older readable receipt
/// contributes the proofs the winner lacks, its costs (saturating sum) and
/// its `first_pay_timestamp` (minimum). Unreadable older receipts are
/// unlinked.
///
/// The merged winner is persisted *before* any merged source is unlinked. If
/// that write fails, the sources stay on disk so no proofs are lost, and the
/// merge is attempted again on the next call.
///
/// Best-effort: all I/O errors are logged or ignored, never returned.
fn dedupe_canonical_receipts(dir: &Path, key: &str) {
    let Ok(read_dir) = fs::read_dir(dir) else {
        return;
    };
    let mut canonicals: Vec<(u64, PathBuf)> = Vec::new();
    for entry in read_dir.flatten() {
        let path = entry.path();
        if !path.is_file() {
            continue;
        }
        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        if name.ends_with(".tmp") || name.ends_with(".lock") || !name.contains(key) {
            continue;
        }
        let ts = name
            .split_once('_')
            .and_then(|(ts, _)| ts.parse::<u64>().ok())
            .unwrap_or(0);
        canonicals.push((ts, path));
    }
    canonicals.sort_by_key(|c| std::cmp::Reverse(c.0));

    // Walk newest -> oldest until a readable winner is found. Iterating
    // (rather than re-listing the directory and recursing) guarantees
    // termination even when an unreadable file cannot be unlinked.
    let mut pending: &[(u64, PathBuf)] = &canonicals;
    let (winner_path, mut winner) = loop {
        let Some(((_, candidate), older)) = pending.split_first() else {
            return;
        };
        if older.is_empty() {
            // A lone canonical needs no dedupe, readable or not.
            return;
        }
        pending = older;
        match read_receipt(candidate) {
            Ok(receipt) => break (candidate, receipt),
            Err(_) => {
                warn!(
                    "Newest canonical {} unreadable; unlinking and retrying dedupe",
                    candidate.display()
                );
                let _ = fs::remove_file(candidate);
            }
        }
    };

    // `pending` now holds only the receipts older than the winner.
    let mut merged_sources: Vec<&PathBuf> = Vec::new();
    for (_, stale) in pending {
        match read_receipt(stale) {
            Ok(other) => {
                let mut added = 0usize;
                for (addr, bytes) in other.proofs {
                    // The winner is newer, so its bytes take precedence.
                    winner.proofs.entry(addr).or_insert_with(|| {
                        added += 1;
                        bytes
                    });
                }
                if let (Ok(w), Ok(o)) = (
                    winner.storage_cost_atto.parse::<Amount>(),
                    other.storage_cost_atto.parse::<Amount>(),
                ) {
                    winner.storage_cost_atto = w.saturating_add(o).to_string();
                }
                winner.gas_cost_wei = winner.gas_cost_wei.saturating_add(other.gas_cost_wei);
                winner.first_pay_timestamp =
                    winner.first_pay_timestamp.min(other.first_pay_timestamp);
                merged_sources.push(stale);
                info!(
                    "Merged {added} proofs from older canonical {} into winner {}",
                    stale.display(),
                    winner_path.display()
                );
            }
            Err(_) => {
                warn!(
                    "Dropping unreadable duplicate canonical {} (no recoverable proofs)",
                    stale.display()
                );
                let _ = fs::remove_file(stale);
            }
        }
    }
    if merged_sources.is_empty() {
        return;
    }

    if let Err(e) = write_receipt_atomic(winner_path, &winner) {
        warn!(
            "Could not rewrite merged canonical receipt {} ({e}); \
             keeping {} older canonical receipt(s) on disk so their proofs are not lost",
            winner_path.display(),
            merged_sources.len()
        );
        return;
    }
    // The winner is durable now; the merged sources are redundant.
    for stale in merged_sources {
        let _ = fs::remove_file(stale);
    }
}
/// Returns the first usable canonical receipt for `key` found in `dir`.
///
/// Candidates are regular files whose name contains `key`, is not a `.lock`
/// or `.tmp` sidecar, and is not expired by its filename timestamp. They are
/// visited in directory-listing order. A candidate that cannot be decoded is
/// unlinked and the scan continues. An unreadable directory yields
/// `Ok(None)`.
fn find_existing(dir: &Path, key: &str) -> Result<Option<(PathBuf, SingleNodeReceipt)>> {
    let listing = match fs::read_dir(dir) {
        Err(e) => {
            debug!("Could not read payments dir {}: {e}", dir.display());
            return Ok(None);
        }
        Ok(listing) => listing,
    };
    for candidate in listing.flatten().map(|entry| entry.path()) {
        if !candidate.is_file() {
            continue;
        }
        let Some(file_name) = candidate.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        let is_sidecar = file_name.ends_with(".lock") || file_name.ends_with(".tmp");
        if is_sidecar || !file_name.contains(key) || is_expired_filename(file_name) {
            continue;
        }
        let receipt = match read_receipt(&candidate) {
            Ok(receipt) => receipt,
            Err(e) => {
                warn!(
                    "Cached single-node receipt at {} is unreadable ({e}). \
                     Unlinking and starting a fresh upload.",
                    candidate.display()
                );
                let _ = fs::remove_file(&candidate);
                continue;
            }
        };
        info!(
            "Found previous single-node upload attempt, resuming with \
             {} cached proofs from {}",
            receipt.proofs.len(),
            candidate.display()
        );
        return Ok(Some((candidate, receipt)));
    }
    Ok(None)
}
/// Reports whether a directory entry is an expired canonical receipt.
///
/// Only regular files with a UTF-8 name that is neither a `.lock` nor a
/// `.tmp` sidecar can be expired; expiry itself is decided by
/// `is_expired_filename`.
fn is_expired_entry(entry: &DirEntry) -> bool {
    let path = entry.path();
    if !path.is_file() {
        return false;
    }
    match path.file_name().and_then(|n| n.to_str()) {
        Some(name) if !(name.ends_with(".lock") || name.ends_with(".tmp")) => {
            is_expired_filename(name)
        }
        _ => false,
    }
}
/// Reports whether a `<ts>_<rest>` filename is older than the expiry window.
///
/// Returns `false` when the name has no `_` or the prefix is not a `u64`.
/// A system clock before the UNIX epoch is treated as time 0, so nothing is
/// considered expired in that case.
fn is_expired_filename(name: &str) -> bool {
    let Some(stamp) = name
        .split_once('_')
        .and_then(|(prefix, _)| prefix.parse::<u64>().ok())
    else {
        return false;
    };
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    stamp.saturating_add(PAYMENT_EXPIRATION_SECS) < now
}
fn read_receipt(path: &Path) -> Result<SingleNodeReceipt> {
let handle = File::open(path)?;
let receipt: SingleNodeReceipt = rmp_serde::decode::from_read(BufReader::new(handle))
.map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?;
if receipt.version > SCHEMA_VERSION {
return Err(crate::error::Error::Io(std::io::Error::other(format!(
"cached receipt has unknown schema version {} (this binary supports up to {SCHEMA_VERSION})",
receipt.version
))));
}
Ok(receipt)
}
/// Writes `receipt` to `path` via a sibling `.tmp` file and a rename.
///
/// The receipt is MessagePack-encoded into `<path>.tmp`, flushed and synced
/// with `sync_all`, and then renamed over `path`. If encoding, flushing,
/// syncing or the rename fails, the `.tmp` file is removed and the error is
/// returned. After a successful rename the parent directory is opened and
/// synced on a best-effort basis (failures there are ignored).
///
/// # Errors
/// Returns any error from opening, encoding, flushing, syncing or renaming.
fn write_receipt_atomic(path: &Path, receipt: &SingleNodeReceipt) -> Result<()> {
let tmp_path = tmp_path_for(path);
// Scope the file handle so it is closed before the rename.
{
let handle = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&tmp_path)?;
let mut writer = BufWriter::new(handle);
if let Err(e) = rmp_serde::encode::write(&mut writer, receipt) {
let _ = fs::remove_file(&tmp_path);
return Err(crate::error::Error::Io(std::io::Error::other(
e.to_string(),
)));
}
// `into_inner` flushes the buffer and hands back the underlying file.
let mut handle = writer.into_inner().map_err(|e| {
let _ = fs::remove_file(&tmp_path);
crate::error::Error::Io(std::io::Error::other(format!(
"BufWriter flush failed: {e}"
)))
})?;
if let Err(e) = handle.flush() {
let _ = fs::remove_file(&tmp_path);
return Err(e.into());
}
// Sync the temp file's contents before it takes the canonical name.
if let Err(e) = handle.sync_all() {
let _ = fs::remove_file(&tmp_path);
return Err(e.into());
}
}
if let Err(e) = fs::rename(&tmp_path, path) {
let _ = fs::remove_file(&tmp_path);
return Err(e.into());
}
// Best-effort sync of the parent directory; any failure here is ignored.
if let Some(parent) = path.parent() {
if let Ok(dir) = File::open(parent) {
let _ = dir.sync_all();
}
}
Ok(())
}
/// Returns the sibling temp path for `path`: the same location with `.tmp`
/// appended to the file name. Falls back to the name `receipt` when `path`
/// has no UTF-8 file name.
fn tmp_path_for(path: &Path) -> PathBuf {
    let base_name = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("receipt");
    path.with_file_name(format!("{base_name}.tmp"))
}
/// Guard for the per-key exclusive file lock; the lock is released when the
/// guard is dropped.
struct ReceiptLock {
    /// Open handle to `<key>.lock`, on which the lock is held.
    file: File,
}

impl ReceiptLock {
    /// Opens (creating if needed, never truncating) `<dir>/<key>.lock` and
    /// takes an exclusive lock on it via `lock_exclusive`.
    ///
    /// # Errors
    /// Fails if the lock file cannot be opened or the lock cannot be taken.
    fn acquire(dir: &Path, key: &str) -> Result<Self> {
        let lock_path = dir.join(format!("{key}.lock"));
        let mut options = OpenOptions::new();
        options.read(true).write(true).create(true).truncate(false);
        let file = options.open(&lock_path)?;
        file.lock_exclusive()?;
        Ok(Self { file })
    }
}

impl Drop for ReceiptLock {
    /// Releases the lock; an unlock failure is ignored.
    fn drop(&mut self) {
        // Fully qualified so the call resolves to the `FileExt` trait method.
        let _ = FileExt::unlock(&self.file);
    }
}
#[cfg(test)]
mod tests {
use super::*;
// The key is deterministic for one path and differs between distinct paths.
#[test]
fn file_hash_key_is_stable() {
assert_eq!(file_hash_key("/tmp/a"), file_hash_key("/tmp/a"));
assert_ne!(file_hash_key("/tmp/a"), file_hash_key("/tmp/b"));
}
// Pins the key for a fixed path to a hard-coded hex string, so a change in
// the hashing scheme is caught.
#[test]
fn file_hash_key_uses_stable_digest_across_invocations() {
let expected = "491a1a569cd6c544074a70504b2b5183";
assert_eq!(file_hash_key("/tmp/anselme-cache-stable-test"), expected);
}
// Two waves more than a second apart must produce differently named receipt
// files: the newer timestamp replaces the older file, and both waves' proofs
// are present in the loaded receipt.
#[test]
fn append_wave_rotates_filename_so_late_waves_dont_age_out() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-ttl-rotation-test-{nanos}");
let mut wave1: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
wave1.insert([1u8; 32], vec![1]);
let path_after_wave1 = append_wave(&file_path, wave1, "10", 20)?;
let ts_after_wave1 = path_after_wave1
.file_name()
.and_then(|n| n.to_str())
.and_then(|n| n.split_once('_'))
.and_then(|(ts, _)| ts.parse::<u64>().ok())
.expect("wave1 receipt name parses");
// Sleep past a one-second boundary so the filename timestamp must change.
std::thread::sleep(std::time::Duration::from_millis(1100));
let mut wave2: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
wave2.insert([2u8; 32], vec![2]);
let path_after_wave2 = append_wave(&file_path, wave2, "5", 10)?;
let ts_after_wave2 = path_after_wave2
.file_name()
.and_then(|n| n.to_str())
.and_then(|n| n.split_once('_'))
.and_then(|(ts, _)| ts.parse::<u64>().ok())
.expect("wave2 receipt name parses");
assert_ne!(
path_after_wave1, path_after_wave2,
"filename must rotate so TTL tracks LAST wave, not first"
);
assert!(
ts_after_wave2 > ts_after_wave1,
"rotated filename's timestamp must be strictly newer"
);
assert!(
!path_after_wave1.exists(),
"old canonical must be unlinked after the rewrite"
);
assert!(path_after_wave2.exists());
let (_, loaded) = load_for_file(&file_path)?.expect("receipt should load");
assert!(loaded.proofs.contains_key(&[1u8; 32]));
assert!(loaded.proofs.contains_key(&[2u8; 32]));
delete_for_file(&file_path)?;
Ok(())
}
// A timestamp 60 s past the expiry window is expired; one 60 s old is not.
#[test]
fn expired_filename_detected() {
let stale = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
.saturating_sub(PAYMENT_EXPIRATION_SECS + 60);
assert!(is_expired_filename(&format!("{stale}_abc")));
let fresh = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
.saturating_sub(60);
assert!(!is_expired_filename(&format!("{fresh}_abc")));
}
// Names without an underscore, or with a non-numeric prefix, never count as
// expired.
#[test]
fn malformed_filename_is_not_expired() {
assert!(!is_expired_filename("nonsense"));
assert!(!is_expired_filename("not_a_number_abc"));
}
// Dropping one of three proofs (with matching bytes) leaves the other two.
#[test]
fn drop_proofs_removes_only_specified_addresses() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-drop-proofs-test-{nanos}");
let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
proofs.insert([1u8; 32], vec![1]);
proofs.insert([2u8; 32], vec![2]);
proofs.insert([3u8; 32], vec![3]);
append_wave(&file_path, proofs, "30", 60)?;
drop_proofs_for_file(&file_path, &[([2u8; 32], vec![2])])?;
let (_, loaded) = load_for_file(&file_path)?.expect("receipt still present");
assert_eq!(loaded.proofs.len(), 2);
assert!(loaded.proofs.contains_key(&[1u8; 32]));
assert!(!loaded.proofs.contains_key(&[2u8; 32]));
assert!(loaded.proofs.contains_key(&[3u8; 32]));
delete_for_file(&file_path)?;
Ok(())
}
// A drop request carrying the old bytes must not remove a proof whose cached
// bytes were replaced by a later wave.
#[test]
fn drop_proofs_skips_drop_if_bytes_have_changed() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-drop-cas-test-{nanos}");
let mut old: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
old.insert([5u8; 32], vec![0xAA]);
append_wave(&file_path, old, "10", 20)?;
// Second wave overwrites the same key with different bytes.
let mut fresh: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
fresh.insert([5u8; 32], vec![0xBB]);
append_wave(&file_path, fresh, "0", 0)?;
drop_proofs_for_file(&file_path, &[([5u8; 32], vec![0xAA])])?;
let (_, loaded) = load_for_file(&file_path)?.expect("receipt still present");
assert_eq!(
loaded.proofs.get(&[5u8; 32]),
Some(&vec![0xBB]),
"fresh proof must NOT be clobbered by a CAS drop with stale bytes"
);
delete_for_file(&file_path)?;
Ok(())
}
// Dropping the only proof leaves no loadable receipt behind.
#[test]
fn drop_proofs_removes_receipt_file_when_emptied() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-drop-empty-test-{nanos}");
let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
proofs.insert([7u8; 32], vec![7]);
append_wave(&file_path, proofs, "10", 20)?;
drop_proofs_for_file(&file_path, &[([7u8; 32], vec![7])])?;
assert!(
load_for_file(&file_path)?.is_none(),
"empty receipt should be removed"
);
Ok(())
}
// Dropping a key that is not in the receipt leaves the receipt unchanged.
#[test]
fn drop_proofs_unknown_address_is_noop() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-drop-noop-test-{nanos}");
let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
proofs.insert([9u8; 32], vec![9]);
append_wave(&file_path, proofs, "10", 20)?;
drop_proofs_for_file(&file_path, &[([42u8; 32], vec![42])])?;
let (_, loaded) = load_for_file(&file_path)?.expect("receipt still present");
assert_eq!(loaded.proofs.len(), 1);
assert!(loaded.proofs.contains_key(&[9u8; 32]));
delete_for_file(&file_path)?;
Ok(())
}
// Dropping proofs for a file with no cached receipt succeeds and creates
// nothing loadable.
#[test]
fn drop_proofs_on_missing_receipt_is_noop() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-drop-missing-test-{nanos}");
drop_proofs_for_file(&file_path, &[([0u8; 32], vec![0])])?;
assert!(load_for_file(&file_path)?.is_none());
Ok(())
}
// After a successful append, the `.tmp` sibling of the receipt is gone and
// the receipt itself exists.
#[test]
fn write_receipt_atomic_leaves_no_tmp_file() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-atomic-tmp-test-{nanos}");
let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
proofs.insert([5u8; 32], vec![5]);
let receipt_path = append_wave(&file_path, proofs, "1", 2)?;
let tmp = tmp_path_for(&receipt_path);
assert!(!tmp.exists(), "tmp sibling must be cleaned up after rename");
assert!(receipt_path.exists());
delete_for_file(&file_path)?;
Ok(())
}
// A `.tmp` file belonging to a different key is neither loaded as a receipt
// nor deleted by `load_for_file` or `cleanup_outdated`.
#[test]
fn find_existing_ignores_lock_and_tmp_sidecars() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-sidecar-test-{nanos}");
let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
proofs.insert([6u8; 32], vec![6]);
let receipt_path = append_wave(&file_path, proofs, "1", 2)?;
let dir = receipt_path.parent().expect("receipt has parent dir");
// The stray file's name does not contain this file's key.
let stray_tmp = dir.join("123456_deadbeef.tmp");
fs::write(&stray_tmp, b"garbage")?;
let (loaded_path, loaded) = load_for_file(&file_path)?.expect("receipt still loaded");
assert_eq!(loaded_path, receipt_path);
assert_eq!(loaded.proofs.len(), 1);
assert!(stray_tmp.exists(), "stray tmp not auto-deleted by load");
cleanup_outdated();
assert!(stray_tmp.exists());
let _ = fs::remove_file(&stray_tmp);
delete_for_file(&file_path)?;
Ok(())
}
// Two threads append disjoint sets of 32 proofs to the same file; all 64
// must be present afterwards.
#[test]
fn concurrent_append_waves_do_not_lose_proofs() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-concurrent-test-{nanos}");
let fp1 = file_path.clone();
let fp2 = file_path.clone();
let t1 = std::thread::spawn(move || {
let mut wave: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
for i in 0u8..32 {
wave.insert([i; 32], vec![i]);
}
append_wave(&fp1, wave, "10", 20)
});
let t2 = std::thread::spawn(move || {
let mut wave: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
for i in 32u8..64 {
wave.insert([i; 32], vec![i]);
}
append_wave(&fp2, wave, "10", 20)
});
t1.join().expect("thread1 panicked")?;
t2.join().expect("thread2 panicked")?;
let (_, loaded) = load_for_file(&file_path)?.expect("receipt should load");
assert_eq!(
loaded.proofs.len(),
64,
"all 64 proofs must survive concurrent appends"
);
for i in 0u8..64 {
assert!(
loaded.proofs.contains_key(&[i; 32]),
"proof {i} lost in concurrent append"
);
}
delete_for_file(&file_path)?;
Ok(())
}
#[test]
fn roundtrip_save_load_delete() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-resumable-single-test-{nanos}");
let mut wave1: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
wave1.insert([2u8; 32], vec![10, 20]);
let path1 = append_wave(&file_path, wave1, "50", 100)?;
assert!(path1.exists());
let mut wave2: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
wave2.insert([3u8; 32], vec![30, 40]);
let path2 = append_wave(&file_path, wave2, "70", 50)?;
assert_eq!(path1, path2);
let (loaded_path, loaded) = load_for_file(&file_path)?.expect("receipt should load");
assert_eq!(loaded_path, path1);
assert_eq!(loaded.proofs.len(), 2);
assert!(loaded.proofs.contains_key(&[2u8; 32]));
assert!(loaded.proofs.contains_key(&[3u8; 32]));
assert_eq!(loaded.storage_cost_atto, "120");
assert_eq!(loaded.gas_cost_wei, 150);
delete_for_file(&file_path)?;
assert!(load_for_file(&file_path)?.is_none());
Ok(())
}
// With a seeded receipt already on disk, 16 threads each append 8 distinct
// proofs; the seed proofs and all 128 appended proofs must be present.
#[test]
fn concurrent_append_after_existing_receipt_keeps_all_proofs() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-concurrent-silent-test-{nanos}");
let mut seed: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
seed.insert([200u8; 32], vec![200]);
seed.insert([201u8; 32], vec![201]);
seed.insert([202u8; 32], vec![202]);
seed.insert([203u8; 32], vec![203]);
append_wave(&file_path, seed, "1", 1)?;
// 16 * 8 = 128 appended keys (0..=127), disjoint from the seed keys 200..=203.
const THREADS: u8 = 16;
const PER_THREAD: u8 = 8;
let handles: Vec<_> = (0..THREADS)
.map(|t| {
let fp = file_path.clone();
std::thread::spawn(move || {
let mut wave: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
let base = t.wrapping_mul(PER_THREAD);
for i in 0..PER_THREAD {
let addr = base.wrapping_add(i);
wave.insert([addr; 32], vec![addr]);
}
append_wave(&fp, wave, "1", 1)
})
})
.collect();
for h in handles {
h.join().expect("appender thread panicked")?;
}
let (_, loaded) = load_for_file(&file_path)?.expect("receipt should load");
for k in [200u8, 201, 202, 203] {
assert!(
loaded.proofs.contains_key(&[k; 32]),
"seed proof {k} disappeared (silent loss)"
);
}
for t in 0..THREADS {
for i in 0..PER_THREAD {
let addr = t.wrapping_mul(PER_THREAD).wrapping_add(i);
assert!(
loaded.proofs.contains_key(&[addr; 32]),
"appended proof {addr} disappeared (silent loss)"
);
}
}
delete_for_file(&file_path)?;
Ok(())
}
// `delete_for_file` removes the canonical receipt and any `.tmp` file for the
// same key, but leaves the `.lock` sidecar in place.
#[test]
fn delete_for_file_unlinks_tmp_residue_and_keeps_lock() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-delete-skip-{nanos}");
let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
proofs.insert([0x42; 32], vec![0x42]);
let receipt_path = append_wave(&file_path, proofs, "1", 1)?;
let dir = receipt_path.parent().expect("receipt has parent dir");
let key = file_hash_key(&file_path);
let stray_tmp = dir.join(format!("9999_{key}.tmp"));
fs::write(&stray_tmp, b"in-flight")?;
let lock_sidecar = dir.join(format!("{key}.lock"));
assert!(lock_sidecar.exists(), "append_wave should leave a .lock");
delete_for_file(&file_path)?;
assert!(!receipt_path.exists(), "canonical receipt deleted");
assert!(
!stray_tmp.exists(),
"delete_for_file must unlink .tmp residue (prevents zombie resurrection)"
);
assert!(
lock_sidecar.exists(),
"delete_for_file must not delete the .lock sidecar"
);
let _ = fs::remove_file(&lock_sidecar);
Ok(())
}
// A decodable `.tmp` file with no canonical counterpart is promoted to the
// canonical name by `load_for_file`, and its proofs are returned.
#[test]
fn orphaned_tmp_with_valid_receipt_is_recovered() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-orphan-recover-{nanos}");
let key = file_hash_key(&file_path);
let dir = payments_dir()?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let canonical = receipt_path(&dir, now, &key);
let tmp = tmp_path_for(&canonical);
let mut r = SingleNodeReceipt::new(now);
r.proofs.insert([0xEE; 32], vec![0xEE, 0xEF]);
r.storage_cost_atto = "13".into();
r.gas_cost_wei = 7;
// Write a valid receipt directly to the `.tmp` path, without the rename step.
let handle = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&tmp)?;
rmp_serde::encode::write(&mut BufWriter::new(handle), &r)
.map_err(|e| crate::error::Error::Io(std::io::Error::other(e.to_string())))?;
assert!(tmp.exists());
assert!(!canonical.exists());
let (loaded_path, loaded) = load_for_file(&file_path)?.expect("orphan recovered");
assert!(
loaded.proofs.contains_key(&[0xEE; 32]),
"recovered proof bytes lost"
);
assert!(
!loaded_path.to_string_lossy().ends_with(".tmp"),
"loaded path should be canonical, not .tmp"
);
assert!(loaded_path.exists());
assert!(!tmp.exists(), "orphan .tmp should have been renamed away");
delete_for_file(&file_path)?;
let _ = fs::remove_file(dir.join(format!("{key}.lock")));
Ok(())
}
// An undecodable `.tmp` file is unlinked by `load_for_file` and never renamed
// to the canonical name.
#[test]
fn orphaned_tmp_with_garbage_is_unlinked() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-orphan-unlink-{nanos}");
let key = file_hash_key(&file_path);
let dir = payments_dir()?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let canonical = receipt_path(&dir, now, &key);
let tmp = tmp_path_for(&canonical);
fs::write(&tmp, b"not valid msgpack")?;
assert!(tmp.exists());
let result = load_for_file(&file_path)?;
assert!(result.is_none(), "no usable receipt should be present");
assert!(
!tmp.exists(),
"garbage orphan .tmp should have been unlinked"
);
assert!(
!canonical.exists(),
"garbage must not be renamed to canonical"
);
let _ = fs::remove_file(dir.join(format!("{key}.lock")));
Ok(())
}
// An empty `.tmp` next to a valid canonical receipt is unlinked on load, and
// the canonical file's bytes are left unchanged.
#[test]
fn write_receipt_atomic_preserves_existing_on_torn_tmp() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-atomic-preserve-{nanos}");
let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
proofs.insert([0xAA; 32], vec![0xAA]);
let canonical = append_wave(&file_path, proofs, "10", 20)?;
let canonical_bytes_before = fs::read(&canonical)?;
let tmp = tmp_path_for(&canonical);
// A zero-length `.tmp` stands in for an interrupted write.
fs::write(&tmp, b"")?;
let (_, loaded) = load_for_file(&file_path)?.expect("canonical preserved");
assert_eq!(loaded.proofs.len(), 1);
assert!(loaded.proofs.contains_key(&[0xAA; 32]));
assert!(!tmp.exists(), "torn .tmp unlinked");
assert_eq!(
fs::read(&canonical)?,
canonical_bytes_before,
"canonical bytes unchanged by torn .tmp recovery"
);
delete_for_file(&file_path)?;
Ok(())
}
// Eight threads alternate between appending a new proof and dropping the
// seed proof; every appended proof must be present with its exact bytes.
#[test]
fn concurrent_drop_and_append_keep_consistent_state() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-drop-append-concurrent-{nanos}");
let mut seed: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
seed.insert([99u8; 32], vec![99]);
append_wave(&file_path, seed, "1", 1)?;
let mut handles = Vec::new();
for i in 0u8..8 {
let fp = file_path.clone();
handles.push(std::thread::spawn(move || -> Result<()> {
// Even threads append, odd threads drop the seed proof.
if i % 2 == 0 {
let mut wave: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
wave.insert([i; 32], vec![i, i, i]);
append_wave(&fp, wave, "1", 1)?;
} else {
drop_proofs_for_file(&fp, &[([99u8; 32], vec![99])])?;
}
Ok(())
}));
}
for h in handles {
h.join().expect("thread panicked")?;
}
if let Some((_, loaded)) = load_for_file(&file_path)? {
for i in (0u8..8).step_by(2) {
assert!(
loaded.proofs.contains_key(&[i; 32]),
"appended proof {i} must survive concurrent drop+append"
);
assert_eq!(loaded.proofs.get(&[i; 32]), Some(&vec![i, i, i]));
}
} else {
panic!("receipt should still exist with all appended proofs");
}
delete_for_file(&file_path)?;
Ok(())
}
// Two waves, each costing 2^127, must sum to exactly 2^128 in the stored
// cost string, i.e. the total is not capped at `u128::MAX`.
#[test]
fn wave_cost_above_u128_max_does_not_silently_drop_cumulative() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-cost-overflow-{nanos}");
// 170141183460469231731687303715884105728 == 2^127.
let near_half_max = "170141183460469231731687303715884105728"; let mut w1: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
w1.insert([1u8; 32], vec![1]);
append_wave(&file_path, w1, near_half_max, 0)?;
let mut w2: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
w2.insert([2u8; 32], vec![2]);
append_wave(&file_path, w2, near_half_max, 0)?;
let (_, loaded) = load_for_file(&file_path)?.expect("receipt should load");
// 340282366920938463463374607431768211456 == 2^128 == u128::MAX + 1.
let expected = "340282366920938463463374607431768211456";
assert_eq!(
loaded.storage_cost_atto, expected,
"cumulative cost must NOT silently saturate at u128::MAX"
);
delete_for_file(&file_path)?;
Ok(())
}
// `cleanup_outdated` must not delete a `.tmp` file, even one whose timestamp
// prefix (1) is far beyond the expiry window.
#[test]
fn cleanup_outdated_skips_tmp_even_with_ancient_prefix() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-cleanup-tmp-skip-{nanos}");
let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
proofs.insert([0xAA; 32], vec![0xAA]);
append_wave(&file_path, proofs, "1", 1)?;
let dir = payments_dir()?;
let key = file_hash_key(&file_path);
let ancient_tmp = dir.join(format!("1_{key}.tmp"));
fs::write(&ancient_tmp, b"in-flight")?;
cleanup_outdated();
assert!(
ancient_tmp.exists(),
"cleanup_outdated must not reap .tmp by ancient timestamp prefix"
);
let _ = fs::remove_file(&ancient_tmp);
delete_for_file(&file_path)?;
Ok(())
}
// Two canonical receipts for the same key are merged into the newer file:
// proofs are unioned, costs summed, `first_pay_timestamp` takes the minimum,
// and the older file is unlinked.
#[test]
fn duplicate_canonical_receipts_are_merged_then_older_unlinked() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-dedupe-canonical-{nanos}");
let dir = payments_dir()?;
let key = file_hash_key(&file_path);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let old_ts = now.saturating_sub(120);
let new_ts = now.saturating_sub(60);
let old_path = dir.join(format!("{old_ts}_{key}"));
let new_path = dir.join(format!("{new_ts}_{key}"));
let mut old = SingleNodeReceipt::new(old_ts);
old.proofs.insert([1u8; 32], vec![0xAA]);
old.storage_cost_atto = "10".to_string();
old.gas_cost_wei = 20;
let mut new = SingleNodeReceipt::new(new_ts);
new.proofs.insert([2u8; 32], vec![0xBB]);
new.storage_cost_atto = "30".to_string();
new.gas_cost_wei = 40;
write_receipt_atomic(&old_path, &old)?;
write_receipt_atomic(&new_path, &new)?;
assert!(old_path.exists() && new_path.exists());
// Call the dedupe step directly while holding the per-key lock.
let _guard = ReceiptLock::acquire(&dir, &key)?;
dedupe_canonical_receipts(&dir, &key);
drop(_guard);
assert!(
!old_path.exists(),
"older canonical receipt must be unlinked after merge"
);
assert!(new_path.exists(), "newer canonical receipt must survive");
let merged = read_receipt(&new_path)?;
assert!(
merged.proofs.contains_key(&[1u8; 32]),
"older sibling's proof must be merged into the winner"
);
assert!(merged.proofs.contains_key(&[2u8; 32]));
assert_eq!(merged.proofs.len(), 2);
assert_eq!(merged.storage_cost_atto, "40", "costs must be summed");
assert_eq!(merged.gas_cost_wei, 60);
assert_eq!(
merged.first_pay_timestamp, old_ts,
"first_pay_timestamp must be the MIN across merged siblings"
);
delete_for_file(&file_path)?;
Ok(())
}
// A canonical file that does not decode is reported as "no receipt" and is
// unlinked during the load.
#[test]
fn unreadable_canonical_receipt_is_unlinked_by_find_existing() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-unreadable-canonical-{nanos}");
let dir = payments_dir()?;
let key = file_hash_key(&file_path);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let canonical = dir.join(format!("{now}_{key}"));
fs::write(&canonical, b"this is not msgpack")?;
assert!(canonical.exists());
let result = load_for_file(&file_path)?;
assert!(result.is_none(), "no usable receipt");
assert!(
!canonical.exists(),
"corrupt canonical receipt should be unlinked, not left for 24 h"
);
Ok(())
}
// A receipt written with `SCHEMA_VERSION + 1` is rejected on load and its
// file is unlinked.
#[test]
fn future_schema_version_is_treated_as_unreadable() -> Result<()> {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let file_path = format!("/tmp/anselme-future-schema-{nanos}");
let dir = payments_dir()?;
let key = file_hash_key(&file_path);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let canonical = dir.join(format!("{now}_{key}"));
let receipt = SingleNodeReceipt {
version: SCHEMA_VERSION.saturating_add(1),
proofs: {
let mut m: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
m.insert([1u8; 32], vec![1]);
m
},
first_pay_timestamp: now,
storage_cost_atto: "10".to_string(),
gas_cost_wei: 20,
};
write_receipt_atomic(&canonical, &receipt)?;
assert!(canonical.exists());
let result = load_for_file(&file_path)?;
assert!(result.is_none(), "future schema must be rejected");
assert!(!canonical.exists(), "rejected receipt must be unlinked");
Ok(())
}
}