#![doc = include_str!("../README.md")]
use std::fs::{self, create_dir_all, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use of_core::{BookUpdate, TradePrint};
#[derive(Debug)]
pub enum PersistError {
Io(std::io::Error),
}
impl From<std::io::Error> for PersistError {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
pub type PersistResult<T> = Result<T, PersistError>;
#[derive(Debug, Clone, Copy)]
pub struct RetentionPolicy {
pub max_total_bytes: u64,
pub max_age_secs: u64,
}
#[derive(Debug, Clone)]
pub struct RollingStore {
root: PathBuf,
retention: Option<RetentionPolicy>,
}
impl RollingStore {
pub fn new(root: impl AsRef<Path>) -> PersistResult<Self> {
create_dir_all(root.as_ref())?;
Ok(Self {
root: root.as_ref().to_path_buf(),
retention: None,
})
}
pub fn with_retention(mut self, retention: Option<RetentionPolicy>) -> Self {
self.retention = retention;
self
}
pub fn append_book(&self, event: &BookUpdate) -> PersistResult<()> {
self.append_line(
&event.symbol.venue,
&event.symbol.symbol,
"book",
&format!(
"{{\"seq\":{},\"side\":\"{:?}\",\"level\":{},\"price\":{},\"size\":{},\"action\":\"{:?}\"}}",
event.sequence, event.side, event.level, event.price, event.size, event.action
),
)
}
pub fn append_trade(&self, event: &TradePrint) -> PersistResult<()> {
self.append_line(
&event.symbol.venue,
&event.symbol.symbol,
"trades",
&format!(
"{{\"seq\":{},\"price\":{},\"size\":{},\"aggressor\":\"{:?}\"}}",
event.sequence, event.price, event.size, event.aggressor_side
),
)
}
fn append_line(
&self,
venue: &str,
symbol: &str,
stream: &str,
line: &str,
) -> PersistResult<()> {
let mut dir = self.root.clone();
dir.push(venue);
dir.push(symbol);
create_dir_all(&dir)?;
let mut path = dir;
path.push(format!("{stream}.jsonl"));
let mut f = OpenOptions::new().create(true).append(true).open(path)?;
f.write_all(line.as_bytes())?;
f.write_all(b"\n")?;
self.prune_if_needed()?;
Ok(())
}
fn prune_if_needed(&self) -> PersistResult<()> {
let Some(policy) = self.retention else {
return Ok(());
};
let mut files = Vec::new();
collect_files(&self.root, &mut files)?;
if policy.max_age_secs > 0 {
let now = SystemTime::now();
for f in &files {
let age = now
.duration_since(f.modified)
.map(|d| d.as_secs())
.unwrap_or(0);
if age > policy.max_age_secs {
let _ = fs::remove_file(&f.path);
}
}
files.clear();
collect_files(&self.root, &mut files)?;
}
if policy.max_total_bytes > 0 {
let mut total: u64 = files.iter().map(|f| f.len).sum();
if total > policy.max_total_bytes {
files.sort_by_key(|f| f.modified);
for f in files {
if total <= policy.max_total_bytes {
break;
}
if fs::remove_file(&f.path).is_ok() {
total = total.saturating_sub(f.len);
}
}
}
}
Ok(())
}
}
#[derive(Debug)]
struct FileMeta {
path: PathBuf,
len: u64,
modified: SystemTime,
}
fn collect_files(root: &Path, out: &mut Vec<FileMeta>) -> PersistResult<()> {
if !root.exists() {
return Ok(());
}
for entry in fs::read_dir(root)? {
let entry = entry?;
let path = entry.path();
let ty = entry.file_type()?;
if ty.is_dir() {
collect_files(&path, out)?;
} else if ty.is_file() {
let meta = entry.metadata()?;
out.push(FileMeta {
path,
len: meta.len(),
modified: meta.modified().unwrap_or(SystemTime::UNIX_EPOCH),
});
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::fs;
use of_core::{BookAction, BookUpdate, Side, SymbolId};
use super::*;
#[test]
fn prunes_by_total_size() {
let root = temp_dir("persist_prune_size");
let store = RollingStore::new(&root)
.expect("store")
.with_retention(Some(RetentionPolicy {
max_total_bytes: 150,
max_age_secs: 0,
}));
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
for seq in 0..20 {
store
.append_book(&BookUpdate {
symbol: symbol.clone(),
side: Side::Bid,
level: 0,
price: 100,
size: 1,
action: BookAction::Upsert,
sequence: seq,
ts_exchange_ns: 0,
ts_recv_ns: 0,
})
.expect("append");
}
let mut files = Vec::new();
collect_files(&root, &mut files).expect("collect");
let total: u64 = files.iter().map(|f| f.len).sum();
assert!(total <= 150);
}
#[test]
fn prunes_by_age() {
let root = temp_dir("persist_prune_age");
let old_path = root.join("old.jsonl");
fs::write(&old_path, b"old").expect("write old");
std::thread::sleep(std::time::Duration::from_millis(2200));
let store = RollingStore::new(&root)
.expect("store")
.with_retention(Some(RetentionPolicy {
max_total_bytes: 0,
max_age_secs: 1,
}));
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
store
.append_book(&BookUpdate {
symbol,
side: Side::Bid,
level: 0,
price: 100,
size: 1,
action: BookAction::Upsert,
sequence: 1,
ts_exchange_ns: 0,
ts_recv_ns: 0,
})
.expect("append");
assert!(!old_path.exists());
}
fn temp_dir(name: &str) -> PathBuf {
let mut path = std::env::temp_dir();
path.push(format!(
"{}_{}_{}",
std::process::id(),
name,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock ok")
.as_nanos()
));
fs::create_dir_all(&path).expect("temp dir");
path
}
}