use std::{
fmt::Debug,
fs,
io::ErrorKind,
path::{Path, PathBuf},
};
use bincode::config::{Configuration, standard};
use nautilus_core::UnixNanos;
use redb::{
CommitError, Database, DatabaseError, Durability, ReadOnlyDatabase, ReadTransaction,
ReadableDatabase, ReadableTable, StorageError, TableDefinition, TableError, TransactionError,
WriteTransaction,
};
use crate::{
backend::{AppendEntry, EventStore, IndexKey, IndexKind, ScanDirection},
entry::EventStoreEntry,
error::EventStoreError,
manifest::{RunManifest, RunStatus},
snapshot::{SnapshotAnchor, validate_new_anchor},
};
/// Event entries: the key is the entry sequence number, the value is the bincode-encoded entry.
const ENTRIES_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("entries");
/// Run manifest storage; the manifest is stored under [`MANIFEST_KEY`].
const MANIFEST_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("manifest");
/// Index used for `IndexKind::ClientOrderId` keys: index key string -> entry sequence number.
const CLIENT_ORDER_INDEX: TableDefinition<&str, u64> = TableDefinition::new("client_order_id_idx");
/// Index used for `IndexKind::VenueOrderId` keys: index key string -> entry sequence number.
const VENUE_ORDER_INDEX: TableDefinition<&str, u64> = TableDefinition::new("venue_order_id_idx");
/// Snapshot anchor storage; the anchor is stored under [`SNAPSHOT_ANCHOR_KEY`].
const SNAPSHOT_ANCHOR_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("snapshot_anchor");
/// Key under which the bincode-encoded `RunManifest` is written in [`MANIFEST_TABLE`].
const MANIFEST_KEY: &str = "current";
/// Key under which the bincode-encoded `SnapshotAnchor` is written in [`SNAPSHOT_ANCHOR_TABLE`].
const SNAPSHOT_ANCHOR_KEY: &str = "latest";
/// bincode configuration shared by the encode and decode calls in this module.
const BINCODE_CONFIG: Configuration = standard();
/// Event store backend that keeps each run in its own redb file.
///
/// Run files are located at `<base_dir>/<instance_id>/<run_id>.redb`. The backend holds at
/// most one run at a time in `state`.
#[derive(Debug)]
pub struct RedbBackend {
/// Root directory under which the per-instance run directories live.
base_dir: PathBuf,
/// The run currently held by this backend, or `None` when no run has been opened.
state: Option<RunState>,
}
/// In-memory state for the run currently held by a [`RedbBackend`].
#[derive(Debug)]
struct RunState {
/// Database handle; read-only when the run was opened through the sealed-run constructors.
db: RunDatabase,
/// In-memory copy of the run manifest, updated on append and on seal.
manifest: RunManifest,
/// Sequence number of the last stored entry, or 0 when the run holds no entries.
high_watermark: u64,
/// Largest `ts_init` seen among the stored entries.
max_ts_init: UnixNanos,
/// Path of the redb file backing this run.
file_path: PathBuf,
}
/// Database handle for a run, distinguishing writable handles from read-only ones.
enum RunDatabase {
/// Handle that supports both read and write transactions.
ReadWrite(Database),
/// Handle that supports read transactions only.
ReadOnly(ReadOnlyDatabase),
}
/// Debug output prints only the variant name; the wrapped database handle is not formatted.
impl Debug for RunDatabase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::ReadWrite(_) => "RunDatabase::ReadWrite",
            Self::ReadOnly(_) => "RunDatabase::ReadOnly",
        };
        f.write_str(label)
    }
}
impl RunDatabase {
    /// Returns the handle as a readable database, whichever variant is held.
    fn readable(&self) -> &dyn ReadableDatabase {
        match self {
            Self::ReadOnly(handle) => handle,
            Self::ReadWrite(handle) => handle,
        }
    }

    /// Returns the writable handle.
    ///
    /// # Errors
    ///
    /// Returns `EventStoreError::Closed` when the handle is read-only.
    fn read_write(&self) -> Result<&Database, EventStoreError> {
        let Self::ReadWrite(handle) = self else {
            return Err(EventStoreError::Closed);
        };
        Ok(handle)
    }

    /// Starts a read transaction, mapping any failure through `map_transaction_err`.
    fn begin_read(&self) -> Result<ReadTransaction, EventStoreError> {
        let started = self.readable().begin_read();
        started.map_err(map_transaction_err)
    }
}
impl RedbBackend {
/// Creates a backend rooted at `base_dir` with no run open.
///
/// Only the path is stored; nothing is read from or written to disk here.
#[must_use]
pub fn new(base_dir: impl Into<PathBuf>) -> Self {
    let base_dir = base_dir.into();
    Self {
        base_dir,
        state: None,
    }
}
/// Returns the directory that holds the run files of `instance_id`: `<base_dir>/<instance_id>`.
#[must_use]
pub fn run_dir(&self, instance_id: &str) -> PathBuf {
    let mut dir = self.base_dir.clone();
    dir.push(instance_id);
    dir
}
/// Returns the run file path `<base_dir>/<instance_id>/<run_id>.redb`.
#[must_use]
pub fn run_path(&self, instance_id: &str, run_id: &str) -> PathBuf {
    let file_name = format!("{run_id}.redb");
    let mut path = self.run_dir(instance_id);
    path.push(file_name);
    path
}
/// Returns the path of the file backing the currently held run.
///
/// # Errors
///
/// Returns `EventStoreError::Backend` ("no run open") when no run is held.
pub fn current_path(&self) -> Result<&Path, EventStoreError> {
    self.state().map(|run| run.file_path.as_path())
}
/// Opens the sealed run `<base_dir>/<instance_id>/<run_id>.redb` read-only.
///
/// # Errors
///
/// Propagates every error of `open_sealed_path` (missing file, missing manifest, run not
/// sealed, or a database failure).
pub fn open_sealed(
    base_dir: impl Into<PathBuf>,
    instance_id: &str,
    run_id: &str,
) -> Result<Self, EventStoreError> {
    let base: PathBuf = base_dir.into();
    let mut path = base.join(instance_id);
    path.push(format!("{run_id}.redb"));
    Self::open_sealed_path(base, path)
}
/// Opens a sealed run read-only from an explicit file path.
///
/// The backend's base directory is taken to be the grandparent of `path` (matching the
/// `<base_dir>/<instance_id>/<run_id>.redb` layout). When `path` has no grandparent an empty
/// path is used instead.
///
/// # Errors
///
/// Propagates every error of `open_sealed_path`.
pub fn open_sealed_file(path: impl Into<PathBuf>) -> Result<Self, EventStoreError> {
    let path: PathBuf = path.into();
    let base = match path.parent().and_then(Path::parent) {
        Some(root) => root.to_path_buf(),
        None => PathBuf::new(),
    };
    Self::open_sealed_path(base, path)
}
/// Opens the run file at `path` read-only and builds a backend around it.
///
/// The file must exist, contain a manifest, and that manifest must report a sealed run. The
/// high watermark and maximum `ts_init` are recomputed from the stored entries.
///
/// # Errors
///
/// - `EventStoreError::Backend` when the file does not exist or the run is not sealed.
/// - `EventStoreError::Corrupted` when the file holds no manifest.
/// - Whatever the database open, manifest read, or progress scan report.
fn open_sealed_path(base: PathBuf, path: PathBuf) -> Result<Self, EventStoreError> {
    if !path.exists() {
        let msg = format!("no run file at {}", path.display());
        return Err(EventStoreError::Backend(msg));
    }

    let db = ReadOnlyDatabase::open(&path).map_err(map_read_only_database_err)?;

    let Some(manifest) = Self::read_manifest(&db)? else {
        return Err(EventStoreError::Corrupted(format!(
            "missing manifest in run file at {}",
            path.display()
        )));
    };
    if !manifest.is_sealed() {
        return Err(EventStoreError::Backend(format!(
            "run file at {} is not sealed, status was {:?}",
            path.display(),
            manifest.status,
        )));
    }

    let (high_watermark, max_ts_init) = Self::compute_progress(&db)?;
    let state = RunState {
        db: RunDatabase::ReadOnly(db),
        manifest,
        high_watermark,
        max_ts_init,
        file_path: path,
    };
    Ok(Self {
        base_dir: base,
        state: Some(state),
    })
}
/// Lists the manifests of every run file found in `<base_dir>/<instance_id>`.
///
/// Files are selected with `is_run_file`; each one is opened read-only and its manifest is
/// decoded. The result is ordered by `start_ts_init`, with `run_id` as a tie-breaker so that
/// the output does not depend on the platform-specific order in which `fs::read_dir` yields
/// directory entries.
///
/// # Errors
///
/// A missing directory is not an error and yields an empty vector. Any other directory read
/// failure is reported as `EventStoreError::Backend`. A run file that cannot be opened, or
/// that holds no manifest (`EventStoreError::Corrupted`), fails the whole listing.
pub fn list_runs(
    base_dir: &Path,
    instance_id: &str,
) -> Result<Vec<RunManifest>, EventStoreError> {
    let dir = base_dir.join(instance_id);
    let entries = match fs::read_dir(&dir) {
        Ok(it) => it,
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => {
            return Err(EventStoreError::Backend(format!(
                "read_dir {}: {e}",
                dir.display()
            )));
        }
    };

    let mut manifests = Vec::new();
    for entry in entries {
        let entry = entry.map_err(|e| {
            EventStoreError::Backend(format!("read_dir entry in {}: {e}", dir.display()))
        })?;
        let path = entry.path();
        if !is_run_file(&path) {
            continue;
        }
        let db = ReadOnlyDatabase::open(&path).map_err(map_read_only_database_err)?;
        let manifest = Self::read_manifest(&db)?.ok_or_else(|| {
            EventStoreError::Corrupted(format!(
                "missing manifest in run file at {}",
                path.display()
            ))
        })?;
        manifests.push(manifest);
    }

    // Directory iteration order is unspecified, so a sort on the timestamp alone would leave
    // runs with equal start times in an arbitrary order. Break ties on the run id.
    manifests.sort_by(|a, b| {
        let a_id: &str = &a.run_id;
        let b_id: &str = &b.run_id;
        a.start_ts_init
            .cmp(&b.start_ts_init)
            .then_with(|| a_id.cmp(b_id))
    });
    Ok(manifests)
}
/// Borrows the currently held run state.
///
/// # Errors
///
/// Returns `EventStoreError::Backend` ("no run open") when no run is held.
fn state(&self) -> Result<&RunState, EventStoreError> {
    match &self.state {
        Some(run) => Ok(run),
        None => Err(EventStoreError::Backend("no run open".to_string())),
    }
}
/// Mutably borrows the currently held run state.
///
/// # Errors
///
/// Returns `EventStoreError::Backend` ("no run open") when no run is held.
fn state_mut(&mut self) -> Result<&mut RunState, EventStoreError> {
    match &mut self.state {
        Some(run) => Ok(run),
        None => Err(EventStoreError::Backend("no run open".to_string())),
    }
}
/// Prepares a new run file: opens every table once, stores `manifest`, and commits.
///
/// Everything happens in a single write transaction with immediate durability.
///
/// # Errors
///
/// Returns the mapped error of whichever step fails (begin, table open, manifest insert,
/// or commit).
fn initialize_fresh(db: &Database, manifest: &RunManifest) -> Result<(), EventStoreError> {
    let txn = begin_immediate_write(db)?;

    // NOTE(review): each table is opened and immediately released, presumably so that it
    // exists once the transaction commits (redb creates a table on first open in a write
    // transaction) — confirm against the redb documentation.
    drop(txn.open_table(ENTRIES_TABLE).map_err(map_table_err)?);
    drop(txn.open_table(CLIENT_ORDER_INDEX).map_err(map_table_err)?);
    drop(txn.open_table(VENUE_ORDER_INDEX).map_err(map_table_err)?);
    drop(txn.open_table(SNAPSHOT_ANCHOR_TABLE).map_err(map_table_err)?);

    insert_run_manifest(&txn, manifest)?;
    txn.commit().map_err(map_commit_err)
}
/// Overwrites the stored manifest with `manifest` in its own immediately-durable transaction.
///
/// # Errors
///
/// Returns the mapped error of the begin, insert, or commit step.
fn write_manifest(db: &Database, manifest: &RunManifest) -> Result<(), EventStoreError> {
    begin_immediate_write(db).and_then(|txn| {
        insert_run_manifest(&txn, manifest)?;
        txn.commit().map_err(map_commit_err)
    })
}
/// Reads and decodes the manifest stored under `MANIFEST_KEY`.
///
/// Returns `Ok(None)` when the manifest table exists but holds no value for the key.
///
/// # Errors
///
/// - `EventStoreError::Corrupted` when the stored bytes do not decode as a `RunManifest`.
/// - The mapped transaction, table, or storage error otherwise. Unlike the snapshot anchor
///   read, a missing manifest table is passed to `map_table_err` rather than treated as `None`.
fn read_manifest<D: ReadableDatabase + ?Sized>(
    db: &D,
) -> Result<Option<RunManifest>, EventStoreError> {
    let txn = db.begin_read().map_err(map_transaction_err)?;
    let table = txn.open_table(MANIFEST_TABLE).map_err(map_table_err)?;

    let decoded = match table.get(MANIFEST_KEY).map_err(map_storage_err)? {
        Some(guard) => {
            bincode::serde::decode_from_slice::<RunManifest, _>(guard.value(), BINCODE_CONFIG)
        }
        None => return Ok(None),
    };
    let (manifest, _) =
        decoded.map_err(|e| EventStoreError::Corrupted(format!("decode manifest: {e}")))?;
    Ok(Some(manifest))
}
/// Reads and decodes the snapshot anchor stored under `SNAPSHOT_ANCHOR_KEY`.
///
/// Returns `Ok(None)` both when the anchor table does not exist and when it holds no value
/// for the key.
///
/// # Errors
///
/// - `EventStoreError::Corrupted` when the stored bytes do not decode as a `SnapshotAnchor`.
/// - The mapped transaction, table, or storage error otherwise.
fn read_snapshot_anchor<D: ReadableDatabase + ?Sized>(
    db: &D,
) -> Result<Option<SnapshotAnchor>, EventStoreError> {
    let txn = db.begin_read().map_err(map_transaction_err)?;

    // A missing table is an expected state here, not a failure.
    let table = match txn.open_table(SNAPSHOT_ANCHOR_TABLE) {
        Err(TableError::TableDoesNotExist(_)) => return Ok(None),
        Err(other) => return Err(map_table_err(other)),
        Ok(opened) => opened,
    };

    let decoded = match table.get(SNAPSHOT_ANCHOR_KEY).map_err(map_storage_err)? {
        Some(guard) => {
            bincode::serde::decode_from_slice::<SnapshotAnchor, _>(guard.value(), BINCODE_CONFIG)
        }
        None => return Ok(None),
    };
    let (anchor, _) = decoded
        .map_err(|e| EventStoreError::Corrupted(format!("decode snapshot anchor: {e}")))?;
    Ok(Some(anchor))
}
/// Recomputes `(high_watermark, max_ts_init)` from the stored entries.
///
/// The high watermark is the last key of the entries table. The maximum `ts_init` is found
/// by decoding every stored entry, so the cost grows with the number of entries. An empty
/// table yields `(0, UnixNanos::default())`.
///
/// # Errors
///
/// - `EventStoreError::Corrupted` when an entry fails to decode.
/// - The mapped transaction, table, or storage error otherwise.
fn compute_progress<D: ReadableDatabase + ?Sized>(
    db: &D,
) -> Result<(u64, UnixNanos), EventStoreError> {
    let txn = db.begin_read().map_err(map_transaction_err)?;
    let table = txn.open_table(ENTRIES_TABLE).map_err(map_table_err)?;

    let high_watermark = match table.last().map_err(map_storage_err)? {
        Some((last_key, _)) => last_key.value(),
        None => return Ok((0, UnixNanos::default())),
    };

    let mut max_ts = UnixNanos::default();
    for row in table.iter().map_err(map_storage_err)? {
        let (_, raw) = row.map_err(map_storage_err)?;
        let decoded =
            bincode::serde::decode_from_slice::<EventStoreEntry, _>(raw.value(), BINCODE_CONFIG);
        let (entry, _) = decoded
            .map_err(|e| EventStoreError::Corrupted(format!("decode entry on load: {e}")))?;
        if entry.ts_init > max_ts {
            max_ts = entry.ts_init;
        }
    }

    Ok((high_watermark, max_ts))
}
}
impl EventStore for RedbBackend {
/// Opens the run described by `manifest` at `<base_dir>/<instance_id>/<run_id>.redb`.
///
/// When the file does not exist yet it is created, the manifest is reset to `Running` with
/// no end timestamp and a zero high watermark, and the backend starts holding the new run.
///
/// When the file already exists and its stored manifest is still `Running`, the backend
/// adopts that run (with progress recomputed from the stored entries) and then returns
/// `EventStoreError::CrashedPredecessor`. The state is kept even though an error is returned.
///
/// # Errors
///
/// - `EventStoreError::Closed` when the backend currently holds a read-only run.
/// - `EventStoreError::CrashedPredecessor` when the held run is not sealed, or when an
///   existing `Running` file was adopted as described above.
/// - `EventStoreError::Disk` or `EventStoreError::Backend` when the run directory cannot be
///   created.
/// - `EventStoreError::Corrupted` when an existing file holds no manifest.
/// - `EventStoreError::Backend` when an existing file is already sealed.
/// - Whatever the database create, initialisation, or progress scan report.
fn open_run(&mut self, mut manifest: RunManifest) -> Result<(), EventStoreError> {
    match &self.state {
        Some(RunState {
            db: RunDatabase::ReadOnly(_),
            ..
        }) => return Err(EventStoreError::Closed),
        Some(open) if !open.manifest.is_sealed() => {
            return Err(EventStoreError::CrashedPredecessor);
        }
        _ => {}
    }

    let dir = self.run_dir(&manifest.instance_id);
    if let Err(e) = fs::create_dir_all(&dir) {
        let msg = format!("create dir {}: {e}", dir.display());
        return Err(if is_disk_pressure(e.kind()) {
            EventStoreError::Disk(msg)
        } else {
            EventStoreError::Backend(msg)
        });
    }

    let path = self.run_path(&manifest.instance_id, &manifest.run_id);
    // Must be sampled before `Database::create`, which brings the file into existence.
    let path_existed = path.exists();
    let db = Database::create(&path).map_err(map_database_err)?;

    if !path_existed {
        manifest.status = RunStatus::Running;
        manifest.end_ts_init = None;
        manifest.high_watermark = 0;
        Self::initialize_fresh(&db, &manifest)?;
        self.state = Some(RunState {
            db: RunDatabase::ReadWrite(db),
            manifest,
            high_watermark: 0,
            max_ts_init: UnixNanos::default(),
            file_path: path,
        });
        return Ok(());
    }

    let Some(mut recovered) = Self::read_manifest(&db)? else {
        return Err(EventStoreError::Corrupted(format!(
            "missing manifest in existing run file at {}",
            path.display()
        )));
    };
    if !matches!(recovered.status, RunStatus::Running) {
        return Err(EventStoreError::Backend(format!(
            "run file at {} already sealed, status was {:?}",
            path.display(),
            recovered.status
        )));
    }

    let (high_watermark, max_ts_init) = Self::compute_progress(&db)?;
    recovered.high_watermark = high_watermark;
    self.state = Some(RunState {
        db: RunDatabase::ReadWrite(db),
        manifest: recovered,
        high_watermark,
        max_ts_init,
        file_path: path,
    });
    Err(EventStoreError::CrashedPredecessor)
}
/// Appends `entries` to the held run in one immediately-durable transaction.
///
/// The batch must continue the sequence exactly: the first entry must carry
/// `high_watermark + 1` and each following entry the next number. Index keys are written
/// only when the key is not yet present, so the first sequence number recorded for a key is
/// kept. The in-memory high watermark, maximum `ts_init`, and manifest are updated only after
/// the commit succeeds.
///
/// Returns the new high watermark. An empty batch returns the current one unchanged.
///
/// # Errors
///
/// - `EventStoreError::Closed` when the run is sealed or the handle is read-only.
/// - `EventStoreError::OutOfOrder` when a sequence number is not the expected one.
/// - `EventStoreError::Backend` when an entry cannot be encoded.
/// - The mapped transaction, table, storage, or commit error otherwise.
fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
    let state = self.state_mut()?;
    if state.manifest.is_sealed() {
        return Err(EventStoreError::Closed);
    }
    if entries.is_empty() {
        return Ok(state.high_watermark);
    }

    // Validate the whole batch before anything is encoded or written.
    let misplaced = (state.high_watermark + 1..)
        .zip(entries)
        .find(|(expected, append)| append.entry.seq != *expected);
    if let Some((_, append)) = misplaced {
        return Err(EventStoreError::OutOfOrder {
            high_watermark: state.high_watermark,
            seq: append.entry.seq,
        });
    }

    // Encode up front so an encoding failure never leaves a transaction open.
    let mut encoded: Vec<Vec<u8>> = Vec::with_capacity(entries.len());
    for append in entries {
        let bytes = bincode::serde::encode_to_vec(&append.entry, BINCODE_CONFIG).map_err(|e| {
            EventStoreError::Backend(format!("encode entry seq={}: {e}", append.entry.seq))
        })?;
        encoded.push(bytes);
    }

    let db = state.db.read_write()?;
    let txn = begin_immediate_write(db)?;
    {
        let mut entries_table = txn.open_table(ENTRIES_TABLE).map_err(map_table_err)?;
        let mut client_table = txn.open_table(CLIENT_ORDER_INDEX).map_err(map_table_err)?;
        let mut venue_table = txn.open_table(VENUE_ORDER_INDEX).map_err(map_table_err)?;

        for (append, bytes) in entries.iter().zip(&encoded) {
            entries_table
                .insert(append.entry.seq, bytes.as_slice())
                .map_err(map_storage_err)?;

            for index_key in &append.index_keys {
                let index = match &index_key.kind {
                    IndexKind::ClientOrderId => &mut client_table,
                    IndexKind::VenueOrderId => &mut venue_table,
                };
                let name = index_key.key.as_str();
                // Keep the first sequence number ever recorded for a key.
                if index.get(name).map_err(map_storage_err)?.is_none() {
                    index
                        .insert(name, append.entry.seq)
                        .map_err(map_storage_err)?;
                }
            }
        }
    }
    txn.commit().map_err(map_commit_err)?;

    for append in entries {
        if append.entry.ts_init > state.max_ts_init {
            state.max_ts_init = append.entry.ts_init;
        }
    }
    let new_hwm = entries
        .last()
        .map_or(state.high_watermark, |last| last.entry.seq);
    state.high_watermark = new_hwm;
    state.manifest.high_watermark = new_hwm;
    Ok(new_hwm)
}
/// Returns the entries with sequence numbers in `from..=to`, clamped to the high watermark.
///
/// Entries are read in ascending order and each one is checked for contiguity and for a
/// matching hash. With `ScanDirection::Reverse` the collected entries are reversed before
/// being returned. An empty vector is returned when `from` is 0, when `from > to`, or when
/// `from` lies beyond the high watermark.
///
/// # Errors
///
/// - `EventStoreError::Gap` when a sequence number inside the range is missing.
/// - `EventStoreError::Corrupted` when an entry fails to decode.
/// - `EventStoreError::HashMismatch` when the recomputed hash differs from the stored one.
/// - The mapped transaction, table, or storage error otherwise.
fn scan_range(
    &self,
    from: u64,
    to: u64,
    direction: ScanDirection,
) -> Result<Vec<EventStoreEntry>, EventStoreError> {
    let state = self.state()?;

    // `from > upper` also covers `from > to` and an empty store (upper == 0).
    let upper = to.min(state.high_watermark);
    if from == 0 || from > upper {
        return Ok(Vec::new());
    }

    let txn = state.db.begin_read()?;
    let table = txn.open_table(ENTRIES_TABLE).map_err(map_table_err)?;

    let mut collected = Vec::new();
    let mut next_seq = from;
    for row in table.range(from..=upper).map_err(map_storage_err)? {
        let (key, raw) = row.map_err(map_storage_err)?;
        let seq = key.value();
        if seq != next_seq {
            return Err(EventStoreError::Gap {
                prev: next_seq.saturating_sub(1),
                next: seq,
                missing: next_seq,
            });
        }

        let decoded =
            bincode::serde::decode_from_slice::<EventStoreEntry, _>(raw.value(), BINCODE_CONFIG);
        let (entry, _) = decoded
            .map_err(|e| EventStoreError::Corrupted(format!("decode entry seq={seq}: {e}")))?;
        if entry.recompute_hash() != entry.entry_hash {
            return Err(EventStoreError::HashMismatch { seq });
        }

        collected.push(entry);
        next_seq = seq + 1;
    }

    // The iterator ran dry before reaching the upper bound: the tail is missing.
    if next_seq <= upper {
        return Err(EventStoreError::Gap {
            prev: next_seq.saturating_sub(1),
            next: upper + 1,
            missing: next_seq,
        });
    }

    if let ScanDirection::Reverse = direction {
        collected.reverse();
    }
    Ok(collected)
}
/// Returns the entry stored at `seq`, verifying its hash.
///
/// Returns `Ok(None)` when `seq` is 0 or lies beyond the high watermark.
///
/// # Errors
///
/// - `EventStoreError::Gap` when `seq` is within the high watermark but no entry is stored.
/// - `EventStoreError::Corrupted` when the entry fails to decode.
/// - `EventStoreError::HashMismatch` when the recomputed hash differs from the stored one.
/// - The mapped transaction, table, or storage error otherwise.
fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
    let state = self.state()?;
    if !(1..=state.high_watermark).contains(&seq) {
        return Ok(None);
    }

    let txn = state.db.begin_read()?;
    let table = txn.open_table(ENTRIES_TABLE).map_err(map_table_err)?;

    let raw = match table.get(seq).map_err(map_storage_err)? {
        Some(guard) => guard,
        None => {
            return Err(EventStoreError::Gap {
                prev: seq.saturating_sub(1),
                next: seq + 1,
                missing: seq,
            });
        }
    };

    let decoded =
        bincode::serde::decode_from_slice::<EventStoreEntry, _>(raw.value(), BINCODE_CONFIG);
    let (entry, _) =
        decoded.map_err(|e| EventStoreError::Corrupted(format!("decode entry seq={seq}: {e}")))?;
    if entry.recompute_hash() != entry.entry_hash {
        return Err(EventStoreError::HashMismatch { seq });
    }
    Ok(Some(entry))
}
/// Looks up `key` in the index selected by `kind`.
///
/// Returns the sequence number recorded for the key, or `None` when the key is absent.
///
/// # Errors
///
/// Returns the mapped transaction, table, or storage error.
fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
    let state = self.state()?;
    let txn = state.db.begin_read()?;

    let table = match kind {
        IndexKind::ClientOrderId => txn.open_table(CLIENT_ORDER_INDEX),
        IndexKind::VenueOrderId => txn.open_table(VENUE_ORDER_INDEX),
    }
    .map_err(map_table_err)?;

    let seq = table
        .get(key)
        .map_err(map_storage_err)?
        .map(|guard| guard.value());
    Ok(seq)
}
/// Returns every `(key, sequence number)` pair of the index selected by `kind`.
///
/// Pairs are produced in the order the table iterator yields them.
///
/// # Errors
///
/// Returns the mapped transaction, table, or storage error; the first failing row aborts
/// the whole listing.
fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
    let state = self.state()?;
    let txn = state.db.begin_read()?;

    let table = match kind {
        IndexKind::ClientOrderId => txn.open_table(CLIENT_ORDER_INDEX),
        IndexKind::VenueOrderId => txn.open_table(VENUE_ORDER_INDEX),
    }
    .map_err(map_table_err)?;

    let rows = table.iter().map_err(map_storage_err)?;
    let pairs = rows
        .map(|row| {
            row.map(|(key, seq)| (key.value().to_string(), seq.value()))
                .map_err(map_storage_err)
        })
        .collect::<Result<Vec<(String, u64)>, EventStoreError>>()?;
    Ok(pairs)
}
/// Validates `anchor` against the current state and stores it as the latest snapshot anchor.
///
/// The anchor is checked with `validate_new_anchor` against the high watermark and the
/// previously stored anchor (if any), then written under `SNAPSHOT_ANCHOR_KEY` in an
/// immediately-durable transaction, replacing the previous value.
///
/// # Errors
///
/// - `EventStoreError::Closed` when the run is sealed or the handle is read-only.
/// - Whatever `validate_new_anchor` rejects.
/// - `EventStoreError::Backend` when the anchor cannot be encoded.
/// - The mapped transaction, table, storage, or commit error otherwise.
fn record_snapshot_anchor(&mut self, anchor: SnapshotAnchor) -> Result<(), EventStoreError> {
    let state = self.state_mut()?;
    if state.manifest.is_sealed() {
        return Err(EventStoreError::Closed);
    }

    let previous = Self::read_snapshot_anchor(state.db.readable())?;
    validate_new_anchor(&anchor, state.high_watermark, previous.as_ref())?;

    let encoded = bincode::serde::encode_to_vec(&anchor, BINCODE_CONFIG)
        .map_err(|e| EventStoreError::Backend(format!("encode snapshot anchor: {e}")))?;

    let txn = begin_immediate_write(state.db.read_write()?)?;
    let mut anchor_table = txn
        .open_table(SNAPSHOT_ANCHOR_TABLE)
        .map_err(map_table_err)?;
    anchor_table
        .insert(SNAPSHOT_ANCHOR_KEY, encoded.as_slice())
        .map_err(map_storage_err)?;
    // The table borrows the transaction and must be released before the commit.
    drop(anchor_table);
    txn.commit().map_err(map_commit_err)
}
/// Returns the most recently stored snapshot anchor of the held run, if any.
///
/// # Errors
///
/// Returns `EventStoreError::Backend` when no run is held, or whatever reading the anchor
/// reports.
fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
    let state = self.state()?;
    Self::read_snapshot_anchor(state.db.readable())
}
/// Seals the held run with the terminal `status` and persists the updated manifest.
///
/// The stored manifest receives the current high watermark and, when at least one entry
/// exists, the maximum `ts_init` as its end timestamp. The in-memory manifest is replaced
/// only after the write succeeds.
///
/// # Errors
///
/// - `EventStoreError::Backend` when `status` is `RunStatus::Running`.
/// - `EventStoreError::Closed` when the run is already sealed or the handle is read-only.
/// - Whatever writing the manifest reports.
fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
    let state = self.state_mut()?;
    if let RunStatus::Running = status {
        return Err(EventStoreError::Backend(
            "seal status must be a terminal state, was Running".to_string(),
        ));
    }
    if state.manifest.is_sealed() {
        return Err(EventStoreError::Closed);
    }

    let mut sealed = state.manifest.clone();
    sealed.status = status;
    sealed.high_watermark = state.high_watermark;
    if state.high_watermark != 0 {
        sealed.end_ts_init = Some(state.max_ts_init);
    }

    let db = state.db.read_write()?;
    Self::write_manifest(db, &sealed)?;
    state.manifest = sealed;
    Ok(())
}
/// Returns a copy of the in-memory manifest of the held run.
///
/// # Errors
///
/// Returns `EventStoreError::Backend` ("no run open") when no run is held.
fn manifest(&self) -> Result<RunManifest, EventStoreError> {
    self.state().map(|run| run.manifest.clone())
}
/// Returns the in-memory high watermark of the held run.
///
/// # Errors
///
/// Returns `EventStoreError::Backend` ("no run open") when no run is held.
fn high_watermark(&self) -> Result<u64, EventStoreError> {
    self.state().map(|run| run.high_watermark)
}
}
fn begin_immediate_write(db: &Database) -> Result<WriteTransaction, EventStoreError> {
let mut txn = db.begin_write().map_err(map_transaction_err)?;
txn.set_durability(Durability::Immediate)
.map_err(|e| EventStoreError::Backend(format!("set durability: {e}")))?;
Ok(txn)
}
/// Encodes `manifest` and stores it under `MANIFEST_KEY` inside `txn`.
///
/// The transaction is not committed here; that is left to the caller.
///
/// # Errors
///
/// Returns `EventStoreError::Backend` when encoding fails, or the mapped table or storage
/// error when the write fails.
fn insert_run_manifest(
    txn: &WriteTransaction,
    manifest: &RunManifest,
) -> Result<(), EventStoreError> {
    let encoded = match bincode::serde::encode_to_vec(manifest, BINCODE_CONFIG) {
        Ok(bytes) => bytes,
        Err(e) => return Err(EventStoreError::Backend(format!("encode manifest: {e}"))),
    };

    let mut manifest_table = txn.open_table(MANIFEST_TABLE).map_err(map_table_err)?;
    manifest_table
        .insert(MANIFEST_KEY, encoded.as_slice())
        .map_err(map_storage_err)?;
    Ok(())
}
/// Converts a redb storage error into an `EventStoreError`.
///
/// I/O errors whose kind counts as disk pressure become `Disk`, corruption reports become
/// `Corrupted`, and everything else becomes `Backend` carrying the error's display text.
fn map_storage_err(err: StorageError) -> EventStoreError {
    match err {
        StorageError::Corrupted(detail) => EventStoreError::Corrupted(detail),
        StorageError::Io(io_err) if is_disk_pressure(io_err.kind()) => {
            let text = io_err.to_string();
            EventStoreError::Disk(text)
        }
        unclassified => EventStoreError::Backend(unclassified.to_string()),
    }
}
/// Reports whether `kind` is one of the I/O error kinds classified as disk pressure:
/// `FileTooLarge`, `StorageFull`, or `QuotaExceeded`.
fn is_disk_pressure(kind: ErrorKind) -> bool {
    [
        ErrorKind::FileTooLarge,
        ErrorKind::StorageFull,
        ErrorKind::QuotaExceeded,
    ]
    .contains(&kind)
}
/// Converts a redb database-open error into an `EventStoreError`.
///
/// Storage errors are delegated to `map_storage_err`. An aborted repair and a required
/// format upgrade are both reported as `Corrupted`. Anything else becomes `Backend` carrying
/// the error's display text.
fn map_database_err(err: DatabaseError) -> EventStoreError {
    match err {
        DatabaseError::Storage(storage) => map_storage_err(storage),
        DatabaseError::UpgradeRequired(version) => {
            let msg = format!("database file format version {version} requires manual upgrade",);
            EventStoreError::Corrupted(msg)
        }
        DatabaseError::RepairAborted => {
            let msg = "database requires repair and cannot be verified read-only".to_string();
            EventStoreError::Corrupted(msg)
        }
        unclassified => EventStoreError::Backend(unclassified.to_string()),
    }
}
/// Converts an error from a read-only database open into an `EventStoreError`.
///
/// An I/O error whose kind passes `is_corrupt_read` is reported as `Corrupted`. Every other
/// error is handled by `map_database_err`.
fn map_read_only_database_err(err: DatabaseError) -> EventStoreError {
    if let DatabaseError::Storage(StorageError::Io(io_err)) = &err {
        if is_corrupt_read(io_err.kind()) {
            return EventStoreError::Corrupted(format!("read-only open failed: {io_err}"));
        }
    }
    map_database_err(err)
}
/// Reports whether `kind` is `UnexpectedEof` or `InvalidData`, the two I/O error kinds that
/// the read-only open path reports as corruption.
fn is_corrupt_read(kind: ErrorKind) -> bool {
    kind == ErrorKind::UnexpectedEof || kind == ErrorKind::InvalidData
}
/// Converts a redb table-open error into an `EventStoreError`.
///
/// Storage errors are delegated to `map_storage_err`. A missing table, or one whose kind or
/// types do not match the definition, is reported as `Corrupted`. Anything else becomes
/// `Backend` carrying the error's display text.
fn map_table_err(err: TableError) -> EventStoreError {
    match err {
        TableError::Storage(storage) => map_storage_err(storage),
        schema @ (TableError::TableDoesNotExist(_)
        | TableError::TableTypeMismatch { .. }
        | TableError::TableIsMultimap(_)
        | TableError::TableIsNotMultimap(_)
        | TableError::TypeDefinitionChanged { .. }) => {
            EventStoreError::Corrupted(schema.to_string())
        }
        unclassified => EventStoreError::Backend(unclassified.to_string()),
    }
}
/// Converts a redb commit error into an `EventStoreError`.
///
/// Storage errors are delegated to `map_storage_err`; anything else becomes `Backend`
/// carrying the error's display text.
fn map_commit_err(err: CommitError) -> EventStoreError {
    if let CommitError::Storage(storage) = err {
        map_storage_err(storage)
    } else {
        EventStoreError::Backend(err.to_string())
    }
}
/// Reports whether `path` names a run file.
///
/// The extension must be exactly `redb`, and the file name must not end in `.markers.redb`.
/// A file name that is not valid UTF-8 is not excluded by the second rule.
fn is_run_file(path: &Path) -> bool {
    if path.extension().and_then(|ext| ext.to_str()) != Some("redb") {
        return false;
    }
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => !name.ends_with(".markers.redb"),
        None => true,
    }
}
/// Converts a redb transaction-start error into an `EventStoreError`.
///
/// Storage errors are delegated to `map_storage_err`; anything else becomes `Backend`
/// carrying the error's display text.
fn map_transaction_err(err: TransactionError) -> EventStoreError {
    if let TransactionError::Storage(storage) = err {
        map_storage_err(storage)
    } else {
        EventStoreError::Backend(err.to_string())
    }
}
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // Only the three disk-pressure kinds may be classified as such.
    #[rstest]
    #[case::file_too_large(ErrorKind::FileTooLarge, true)]
    #[case::storage_full(ErrorKind::StorageFull, true)]
    #[case::quota_exceeded(ErrorKind::QuotaExceeded, true)]
    #[case::other(ErrorKind::Other, false)]
    #[case::not_found(ErrorKind::NotFound, false)]
    #[case::permission_denied(ErrorKind::PermissionDenied, false)]
    #[case::interrupted(ErrorKind::Interrupted, false)]
    fn is_disk_pressure_matches_documented_kinds(#[case] kind: ErrorKind, #[case] expected: bool) {
        let actual = is_disk_pressure(kind);
        assert_eq!(actual, expected);
    }

    // A storage-full I/O error must surface as `Disk`.
    #[rstest]
    fn map_storage_err_classifies_disk_pressure_as_disk() {
        let io_err: std::io::Error = ErrorKind::StorageFull.into();
        match map_storage_err(StorageError::Io(io_err)) {
            EventStoreError::Disk(_) => {}
            other => panic!("expected Disk, was {other:?}"),
        }
    }

    // A corruption report must surface as `Corrupted` and keep its message.
    #[rstest]
    fn map_storage_err_classifies_corrupted_as_corrupted() {
        let source = StorageError::Corrupted(String::from("boom"));
        match map_storage_err(source) {
            EventStoreError::Corrupted(msg) => assert!(msg.contains("boom")),
            other => panic!("expected Corrupted, was {other:?}"),
        }
    }

    // An I/O error unrelated to disk pressure must fall back to `Backend`.
    #[rstest]
    fn map_storage_err_falls_back_to_backend_for_unrelated_io() {
        let io_err: std::io::Error = ErrorKind::PermissionDenied.into();
        match map_storage_err(StorageError::Io(io_err)) {
            EventStoreError::Backend(_) => {}
            other => panic!("expected Backend, was {other:?}"),
        }
    }
}