mod batcher;
pub mod halt;
use std::time::Duration;
use bytes::Bytes;
pub use halt::{HaltCallback, HaltReason, noop_halt};
use nautilus_core::UnixNanos;
use crate::{
backend::IndexKey,
entry::{PayloadType, Topic},
headers::Headers,
snapshot::SnapshotAnchor,
};
/// Default bound on the number of messages queued for the writer thread.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 10_000;
/// Default maximum number of entries committed in one backend batch.
pub const DEFAULT_MAX_BATCH_ENTRIES: usize = 100;
/// Default maximum time a partial batch waits before it is flushed.
pub const DEFAULT_MAX_BATCH_LATENCY: Duration = Duration::from_millis(5);
/// Default time a caller may be blocked by backpressure before a halt is signalled.
pub const DEFAULT_HALT_THRESHOLD: Duration = Duration::from_millis(250);

/// Tuning knobs for the event-store writer.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Capacity of the bounded channel feeding the writer thread.
    pub channel_capacity: usize,
    /// Maximum number of entries per backend batch.
    pub max_batch_entries: usize,
    /// Maximum time an entry may wait for its batch to fill before being flushed.
    pub max_batch_latency: Duration,
    /// How long a caller may be stalled by backpressure before the writer halts.
    pub halt_threshold: Duration,
}

impl Default for WriterConfig {
    /// Builds a config from the `DEFAULT_*` constants in this module.
    fn default() -> Self {
        Self {
            halt_threshold: DEFAULT_HALT_THRESHOLD,
            max_batch_latency: DEFAULT_MAX_BATCH_LATENCY,
            max_batch_entries: DEFAULT_MAX_BATCH_ENTRIES,
            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
        }
    }
}
/// An entry handed to the writer before it is sequenced.
///
/// The writer pairs each draft with a publish timestamp (`ts_publish`) taken at
/// submit time and a sequence number when it is appended to the backend.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EntryDraft {
    /// Entry headers.
    pub headers: Headers,
    /// Topic the entry was published on.
    pub topic: Topic,
    /// Type tag describing how `payload` is encoded.
    pub payload_type: PayloadType,
    /// Serialized payload bytes.
    pub payload: Bytes,
    /// Initialization timestamp supplied by the producer.
    pub ts_init: UnixNanos,
    /// Index keys to record for this entry (NOTE(review): presumably resolved
    /// later through `EventStore::lookup`; confirm in the backend).
    pub index_keys: Vec<IndexKey>,
}
impl EntryDraft {
    /// Creates a draft with the given fields and no index keys.
    #[must_use]
    pub const fn without_indices(
        headers: Headers,
        topic: Topic,
        payload_type: PayloadType,
        payload: Bytes,
        ts_init: UnixNanos,
    ) -> Self {
        let index_keys = Vec::new();
        Self {
            index_keys,
            ts_init,
            payload,
            payload_type,
            topic,
            headers,
        }
    }
}
/// Errors returned by `EventStoreWriter::submit`.
#[derive(Debug, thiserror::Error)]
pub enum SubmitError {
    /// The writer no longer accepts entries: it has halted, been closed, or its
    /// writer has stopped.
    #[error("writer is closed")]
    Closed,
    /// The submit stayed blocked by backpressure for at least `threshold`, and a
    /// halt was signalled.
    #[error("submit stalled for {stalled_for:?}, halt threshold {threshold:?}")]
    HaltSignaled {
        /// How long the submit waited before giving up.
        stalled_for: Duration,
        /// The configured halt threshold.
        threshold: Duration,
    },
}
#[cfg(not(madsim))]
mod imp {
use std::{
fmt::Debug,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc::{self, RecvTimeoutError, SyncSender, TrySendError},
},
thread::{self, JoinHandle},
time::{Duration, Instant},
};
use nautilus_core::time::AtomicTime;
use super::{
EntryDraft, SnapshotAnchor, SubmitError, WriterConfig,
batcher::{self, WriterMessage},
halt::{HaltCallback, HaltReason},
};
use crate::{backend::EventStore, error::EventStoreError};
/// OS thread name given to the background writer.
const WRITER_THREAD_NAME: &'static str = "event-store-writer";
/// Pause between attempts to enqueue onto a full writer channel.
const SUBMIT_RETRY_INTERVAL: Duration = Duration::from_nanos(100_000);
/// Handle to a background thread that writes submitted entries to an
/// `EventStore` backend.
///
/// Producers enqueue through a bounded channel. A producer that stays blocked on
/// a full channel for `halt_threshold` trips the halt callback, after which the
/// handle rejects new submissions.
pub struct EventStoreWriter {
    /// Sending half of the writer channel; `None` once `close` or `drop` has taken it.
    tx: Option<SyncSender<WriterMessage>>,
    /// Join handle of the writer thread; `None` once it has been joined.
    handle: Option<JoinHandle<()>>,
    /// Latest high watermark, shared with the writer thread (which presumably
    /// advances it after each committed batch).
    high_watermark: Arc<AtomicU64>,
    /// Callback invoked when the writer halts.
    halt: HaltCallback,
    /// Maximum time a caller may wait on backpressure before a halt is signalled.
    halt_threshold: Duration,
    /// Set once a backpressure stall has been signalled; later calls fail with `Closed`.
    halted: AtomicBool,
    /// Clock used to stamp `ts_publish` on submitted entries.
    clock: &'static AtomicTime,
}
impl Debug for EventStoreWriter {
    /// Shows the live watermark, halt configuration/state, and whether the
    /// sender is still attached; other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let watermark = self.high_watermark.load(Ordering::Acquire);
        let halted = self.halted.load(Ordering::Acquire);
        let mut out = f.debug_struct(stringify!(EventStoreWriter));
        out.field("high_watermark", &watermark);
        out.field("halt_threshold", &self.halt_threshold);
        out.field("halted", &halted);
        out.field("tx_attached", &self.tx.is_some());
        out.finish_non_exhaustive()
    }
}
impl EventStoreWriter {
#[allow(clippy::needless_pass_by_value)] pub fn spawn(
backend: Box<dyn EventStore + Send>,
clock: &'static AtomicTime,
halt: HaltCallback,
config: WriterConfig,
) -> Result<Self, EventStoreError> {
let initial_hwm = backend.high_watermark()?;
let high_watermark = Arc::new(AtomicU64::new(initial_hwm));
let (tx, rx) = mpsc::sync_channel::<WriterMessage>(config.channel_capacity);
let watermark_for_thread = Arc::clone(&high_watermark);
let halt_for_thread = Arc::clone(&halt);
let halt_threshold = config.halt_threshold;
let config_for_thread = config;
let handle = thread::Builder::new()
.name(WRITER_THREAD_NAME.to_string())
.spawn(move || {
batcher::run(
backend,
rx,
config_for_thread,
halt_for_thread,
watermark_for_thread,
clock,
);
})
.map_err(|e| EventStoreError::Backend(format!("spawn writer thread: {e}")))?;
Ok(Self {
tx: Some(tx),
handle: Some(handle),
high_watermark,
halt,
halt_threshold,
halted: AtomicBool::new(false),
clock,
})
}
pub fn submit(&self, draft: EntryDraft) -> Result<(), SubmitError> {
if self.halted.load(Ordering::Acquire) {
return Err(SubmitError::Closed);
}
let tx = self.tx.as_ref().ok_or(SubmitError::Closed)?;
let ts_publish = self.clock.get_time_ns();
let pending = WriterMessage::Entry { draft, ts_publish };
let start = Instant::now();
match self.enqueue_with_backpressure(tx, pending, start) {
Ok(()) => Ok(()),
Err(EnqueueFailure::Stalled(elapsed)) => Err(SubmitError::HaltSignaled {
stalled_for: elapsed,
threshold: self.halt_threshold,
}),
Err(EnqueueFailure::Closed) => Err(SubmitError::Closed),
}
}
#[must_use]
pub fn high_watermark(&self) -> u64 {
self.high_watermark.load(Ordering::Acquire)
}
pub fn record_snapshot_anchor(
&self,
blob_ref: impl Into<String>,
content_hash: impl Into<String>,
) -> Result<SnapshotAnchor, EventStoreError> {
if self.halted.load(Ordering::Acquire) {
return Err(EventStoreError::Closed);
}
let tx = self.tx.as_ref().ok_or(EventStoreError::Closed)?;
let (ack_tx, ack_rx) = mpsc::sync_channel::<Result<SnapshotAnchor, EventStoreError>>(1);
let pending = WriterMessage::RecordSnapshotAnchor {
blob_ref: blob_ref.into(),
content_hash: content_hash.into(),
ack: ack_tx,
};
let start = Instant::now();
if let Err(e) = self.enqueue_with_backpressure(tx, pending, start) {
match e {
EnqueueFailure::Stalled(elapsed) => {
return Err(EventStoreError::Backend(format!(
"snapshot anchor submit stalled for {elapsed:?}, halt threshold {:?}",
self.halt_threshold,
)));
}
EnqueueFailure::Closed => return Err(EventStoreError::Closed),
}
}
match ack_rx.recv_timeout(self.halt_threshold) {
Ok(result) => result,
Err(RecvTimeoutError::Timeout) => {
let elapsed = start.elapsed();
self.signal_backpressure_stall(elapsed);
Err(EventStoreError::Backend(format!(
"snapshot anchor ack stalled for {elapsed:?}, halt threshold {:?}",
self.halt_threshold,
)))
}
Err(RecvTimeoutError::Disconnected) => Err(EventStoreError::Backend(
"snapshot anchor ack channel disconnected".to_string(),
)),
}
}
fn enqueue_with_backpressure(
&self,
tx: &SyncSender<WriterMessage>,
mut pending: WriterMessage,
start: Instant,
) -> Result<(), EnqueueFailure> {
loop {
let elapsed = start.elapsed();
if elapsed >= self.halt_threshold {
self.signal_backpressure_stall(elapsed);
return Err(EnqueueFailure::Stalled(elapsed));
}
match tx.try_send(pending) {
Ok(()) => return Ok(()),
Err(TrySendError::Full(returned)) => {
pending = returned;
thread::sleep(SUBMIT_RETRY_INTERVAL);
}
Err(TrySendError::Disconnected(_)) => return Err(EnqueueFailure::Closed),
}
}
}
fn signal_backpressure_stall(&self, stalled_for: Duration) {
self.halted.store(true, Ordering::Release);
(self.halt)(HaltReason::BackpressureStall {
stalled_for,
threshold: self.halt_threshold,
});
}
pub fn close(mut self, run_ended: EntryDraft) -> Result<u64, EventStoreError> {
let tx = self
.tx
.take()
.ok_or_else(|| EventStoreError::Backend("writer already closed".to_string()))?;
let (ack_tx, ack_rx) = mpsc::sync_channel::<Result<u64, EventStoreError>>(1);
tx.send(WriterMessage::Close {
run_ended,
ack: ack_tx,
})
.map_err(|_| EventStoreError::Backend("writer thread disconnected".to_string()))?;
drop(tx);
let result = ack_rx.recv().map_err(|_| {
EventStoreError::Backend("writer ack channel disconnected".to_string())
})?;
if let Some(handle) = self.handle.take() {
handle
.join()
.map_err(|_| EventStoreError::Backend("writer thread panicked".to_string()))?;
}
result
}
}
/// Why `enqueue_with_backpressure` could not hand a message to the writer thread.
enum EnqueueFailure {
    /// The channel stayed full until the halt threshold; carries the time waited.
    Stalled(Duration),
    /// The writer thread's receiver has been dropped.
    Closed,
}
impl Drop for EventStoreWriter {
    /// Drops the sender, if still attached, and waits for the writer thread to exit.
    fn drop(&mut self) {
        // Releasing the sender disconnects the channel so the writer thread can finish.
        drop(self.tx.take());
        let Some(worker) = self.handle.take() else {
            return;
        };
        // `drop` cannot report errors, so a writer-thread panic is ignored here.
        let _ = worker.join();
    }
}
}
#[cfg(madsim)]
mod imp {
use std::{
fmt::Debug,
sync::{
Arc, Mutex,
atomic::{AtomicU64, Ordering},
},
};
use nautilus_core::time::AtomicTime;
use super::{
EntryDraft, SnapshotAnchor, SubmitError, WriterConfig, batcher,
halt::{HaltCallback, HaltReason},
};
use crate::{backend::EventStore, error::EventStoreError, manifest::RunStatus};
/// Simulation writer used under madsim.
///
/// Every operation appends synchronously on the caller's thread under a mutex
/// instead of batching on a background thread.
pub struct EventStoreWriter {
    /// Backend plus sequencing state, serialized by a mutex.
    inner: Mutex<Inner>,
    /// Latest watermark returned by the backend.
    high_watermark: Arc<AtomicU64>,
    /// Callback fired when a backend operation fails.
    halt: HaltCallback,
    /// Clock used to stamp `ts_publish` on appended entries.
    clock: &'static AtomicTime,
}
impl Debug for EventStoreWriter {
    /// Shows only the current high watermark; other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let watermark = self.high_watermark.load(Ordering::Acquire);
        let mut out = f.debug_struct(stringify!(EventStoreWriter));
        out.field("high_watermark", &watermark);
        out.finish_non_exhaustive()
    }
}
/// Mutable state of the madsim writer, guarded by `EventStoreWriter::inner`.
struct Inner {
    /// Backend that receives every append synchronously.
    backend: Box<dyn EventStore + Send>,
    /// Sequence number assigned to the next appended entry.
    next_seq: u64,
    /// Set after a backend failure or a successful close; later operations are rejected.
    closed: bool,
}
impl EventStoreWriter {
#[allow(clippy::needless_pass_by_value)] pub fn spawn(
backend: Box<dyn EventStore + Send>,
clock: &'static AtomicTime,
halt: HaltCallback,
_config: WriterConfig,
) -> Result<Self, EventStoreError> {
let initial_hwm = backend.high_watermark()?;
let high_watermark = Arc::new(AtomicU64::new(initial_hwm));
let inner = Inner {
backend,
next_seq: initial_hwm + 1,
closed: false,
};
Ok(Self {
inner: Mutex::new(inner),
high_watermark,
halt,
clock,
})
}
pub fn submit(&self, draft: EntryDraft) -> Result<(), SubmitError> {
let mut inner = self.inner.lock().expect("writer mutex poisoned");
if inner.closed {
return Err(SubmitError::Closed);
}
let ts_publish = self.clock.get_time_ns();
let seq = inner.next_seq;
let append = batcher::build_append_entry(draft, ts_publish, seq);
match inner.backend.append_batch(std::slice::from_ref(&append)) {
Ok(new_hwm) => {
inner.next_seq = seq + 1;
self.high_watermark.store(new_hwm, Ordering::Release);
Ok(())
}
Err(e) => {
(self.halt)(HaltReason::from_backend_error(&e));
inner.closed = true;
Err(SubmitError::Closed)
}
}
}
#[must_use]
pub fn high_watermark(&self) -> u64 {
self.high_watermark.load(Ordering::Acquire)
}
pub fn record_snapshot_anchor(
&self,
blob_ref: impl Into<String>,
content_hash: impl Into<String>,
) -> Result<SnapshotAnchor, EventStoreError> {
let mut inner = self.inner.lock().expect("writer mutex poisoned");
if inner.closed {
return Err(EventStoreError::Closed);
}
let anchor = SnapshotAnchor::new(
self.high_watermark.load(Ordering::Acquire),
blob_ref,
content_hash,
);
match inner.backend.record_snapshot_anchor(anchor.clone()) {
Ok(()) => Ok(anchor),
Err(e) => {
(self.halt)(HaltReason::from_backend_error(&e));
inner.closed = true;
Err(e)
}
}
}
pub fn close(self, run_ended: EntryDraft) -> Result<u64, EventStoreError> {
let mut inner = self.inner.lock().expect("writer mutex poisoned");
if inner.closed {
return Err(EventStoreError::Backend(
"writer already closed".to_string(),
));
}
let ts_publish = self.clock.get_time_ns();
let seq = inner.next_seq;
let append = batcher::build_append_entry(run_ended, ts_publish, seq);
match inner.backend.append_batch(std::slice::from_ref(&append)) {
Ok(new_hwm) => {
inner.next_seq = seq + 1;
self.high_watermark.store(new_hwm, Ordering::Release);
}
Err(e) => {
(self.halt)(HaltReason::from_backend_error(&e));
inner.closed = true;
return Err(e);
}
}
match inner.backend.seal(RunStatus::Ended) {
Ok(()) => {
inner.closed = true;
Ok(self.high_watermark.load(Ordering::Acquire))
}
Err(e) => {
(self.halt)(HaltReason::from_backend_error(&e));
inner.closed = true;
Err(e)
}
}
}
}
}
pub use imp::EventStoreWriter;
#[cfg(test)]
#[cfg(not(madsim))]
mod tests {
use std::sync::{
Arc, Mutex,
atomic::{AtomicUsize, Ordering},
};
use bytes::Bytes;
use indexmap::IndexMap;
use nautilus_core::{UnixNanos, time::get_atomic_clock_static};
use rstest::{fixture, rstest};
use ustr::Ustr;
use super::*;
use crate::{
backend::{AppendEntry, EventStore, IndexKind, MemoryBackend, ScanDirection},
entry::EventStoreEntry,
error::EventStoreError,
manifest::{RegisteredComponents, RunManifest, RunStatus},
};
fn manifest(run_id: &str) -> RunManifest {
RunManifest {
run_id: run_id.to_string(),
parent_run_id: None,
instance_id: "trader-001".to_string(),
binary_hash: "deadbeef".to_string(),
schema_version: 1,
crate_versions: "feedface".to_string(),
feature_flags: Vec::new(),
adapter_versions: IndexMap::new(),
config_hash: "cafebabe".to_string(),
registered_components: RegisteredComponents::default(),
seed: None,
start_ts_init: UnixNanos::from(0),
end_ts_init: None,
high_watermark: 0,
status: RunStatus::Running,
}
}
fn entry_draft(ts_init: u64) -> EntryDraft {
EntryDraft {
headers: Headers::empty(),
topic: "exec.command.SubmitOrder".into(),
payload_type: Ustr::from("SubmitOrder"),
payload: Bytes::from_static(b"\x01\x02\x03\x04"),
ts_init: UnixNanos::from(ts_init),
index_keys: Vec::new(),
}
}
fn run_ended_draft() -> EntryDraft {
EntryDraft {
headers: Headers::empty(),
topic: "run.lifecycle.RunEnded".into(),
payload_type: Ustr::from("RunEnded"),
payload: Bytes::new(),
ts_init: UnixNanos::from(9_999),
index_keys: Vec::new(),
}
}
/// Test backend that forwards to a `MemoryBackend` shared with the test body,
/// so the test can inspect what the writer persisted.
#[derive(Debug)]
struct SharedMemory(Arc<Mutex<MemoryBackend>>);

impl SharedMemory {
    /// Returns the wrapper together with a second handle to the same backend.
    fn new() -> (Self, Arc<Mutex<MemoryBackend>>) {
        let backend = Arc::new(Mutex::new(MemoryBackend::new()));
        let wrapper = Self(Arc::clone(&backend));
        (wrapper, backend)
    }
}
/// Forwards every call except `open_run` to the shared `MemoryBackend`; tests
/// open the run directly on the shared handle instead.
impl EventStore for SharedMemory {
    fn open_run(&mut self, _: RunManifest) -> Result<(), EventStoreError> {
        unreachable!("test wrapper does not forward open_run")
    }

    fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
        let mut backend = self.0.lock().expect("shared memory poisoned");
        backend.append_batch(entries)
    }

    fn scan_range(
        &self,
        from: u64,
        to: u64,
        direction: ScanDirection,
    ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.scan_range(from, to, direction)
    }

    fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.scan_seq(seq)
    }

    fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.lookup(kind, key)
    }

    fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.iter_index_keys(kind)
    }

    fn record_snapshot_anchor(
        &mut self,
        anchor: SnapshotAnchor,
    ) -> Result<(), EventStoreError> {
        let mut backend = self.0.lock().expect("shared memory poisoned");
        backend.record_snapshot_anchor(anchor)
    }

    fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.latest_snapshot_anchor()
    }

    fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
        let mut backend = self.0.lock().expect("shared memory poisoned");
        backend.seal(status)
    }

    fn manifest(&self) -> Result<RunManifest, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.manifest()
    }

    fn high_watermark(&self) -> Result<u64, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.high_watermark()
    }
}
/// Test backend whose `append_batch` counts each attempt and then blocks until
/// a shared gate flag is set; all other calls forward to a `MemoryBackend`.
#[derive(Debug)]
struct BlockingBackend {
    inner: Arc<Mutex<MemoryBackend>>,
    gate: Arc<(Mutex<bool>, std::sync::Condvar)>,
    appends_seen: Arc<AtomicUsize>,
}

impl BlockingBackend {
    /// Wraps `inner`, blocking appends on `gate` and counting them in `appends_seen`.
    fn new(
        inner: Arc<Mutex<MemoryBackend>>,
        gate: Arc<(Mutex<bool>, std::sync::Condvar)>,
        appends_seen: Arc<AtomicUsize>,
    ) -> Self {
        Self {
            appends_seen,
            gate,
            inner,
        }
    }
}
/// Forwards to the wrapped `MemoryBackend`, except that appends wait on the gate.
impl EventStore for BlockingBackend {
    fn open_run(&mut self, _: RunManifest) -> Result<(), EventStoreError> {
        unreachable!("test wrapper does not forward open_run")
    }

    fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
        // Count first so tests can observe that the writer reached the backend.
        self.appends_seen.fetch_add(1, Ordering::SeqCst);
        let (flag, signal) = &*self.gate;
        let closed_gate = flag.lock().expect("gate poisoned");
        // Keep the gate guard alive across the append, as before.
        let _open_gate = signal
            .wait_while(closed_gate, |released| !*released)
            .expect("gate wait");
        let mut store = self.inner.lock().expect("inner poisoned");
        store.append_batch(entries)
    }

    fn scan_range(
        &self,
        from: u64,
        to: u64,
        direction: ScanDirection,
    ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
        let store = self.inner.lock().expect("inner poisoned");
        store.scan_range(from, to, direction)
    }

    fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
        let store = self.inner.lock().expect("inner poisoned");
        store.scan_seq(seq)
    }

    fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
        let store = self.inner.lock().expect("inner poisoned");
        store.lookup(kind, key)
    }

    fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
        let store = self.inner.lock().expect("inner poisoned");
        store.iter_index_keys(kind)
    }

    fn record_snapshot_anchor(
        &mut self,
        anchor: SnapshotAnchor,
    ) -> Result<(), EventStoreError> {
        let mut store = self.inner.lock().expect("inner poisoned");
        store.record_snapshot_anchor(anchor)
    }

    fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
        let mut store = self.inner.lock().expect("inner poisoned");
        store.seal(status)
    }

    fn manifest(&self) -> Result<RunManifest, EventStoreError> {
        let store = self.inner.lock().expect("inner poisoned");
        store.manifest()
    }

    fn high_watermark(&self) -> Result<u64, EventStoreError> {
        let store = self.inner.lock().expect("inner poisoned");
        store.high_watermark()
    }
}
/// Test backend whose every append fails with `EventStoreError::Disk("ENOSPC")`
/// after counting the attempt; read calls return empty results.
#[derive(Debug, Default)]
struct DiskFailureBackend {
    appends_seen: Arc<AtomicUsize>,
}

impl EventStore for DiskFailureBackend {
    fn open_run(&mut self, _: RunManifest) -> Result<(), EventStoreError> {
        Ok(())
    }

    fn append_batch(&mut self, _: &[AppendEntry]) -> Result<u64, EventStoreError> {
        self.appends_seen.fetch_add(1, Ordering::SeqCst);
        Err(EventStoreError::Disk(String::from("ENOSPC")))
    }

    fn scan_range(
        &self,
        _: u64,
        _: u64,
        _: ScanDirection,
    ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
        Ok(vec![])
    }

    fn scan_seq(&self, _: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
        Ok(None)
    }

    fn lookup(&self, _: IndexKind, _: &str) -> Result<Option<u64>, EventStoreError> {
        Ok(None)
    }

    fn iter_index_keys(&self, _: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
        Ok(vec![])
    }

    fn seal(&mut self, _: RunStatus) -> Result<(), EventStoreError> {
        Ok(())
    }

    fn manifest(&self) -> Result<RunManifest, EventStoreError> {
        Err(EventStoreError::Backend(String::from("disk failure")))
    }

    fn high_watermark(&self) -> Result<u64, EventStoreError> {
        Ok(0)
    }
}
/// Fixture: a halt callback that appends every reason it receives to a shared
/// log, paired with that log so tests can inspect which halts fired.
#[fixture]
fn captured_halt() -> (HaltCallback, Arc<Mutex<Vec<HaltReason>>>) {
    let log = Arc::new(Mutex::new(Vec::<HaltReason>::new()));
    let sink = Arc::clone(&log);
    let halt: HaltCallback = Arc::new(move |reason| {
        sink.lock().expect("captured halt poisoned").push(reason);
    });
    (halt, log)
}
/// Submits five entries, then closes with `RunEnded`. Checks that the final
/// watermark is 6, the run is sealed `Ended`, seq 6 is the `RunEnded` entry,
/// and no halt fired.
#[rstest]
fn submit_then_close_records_entries_and_seals(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, captured) = captured_halt;
    let (wrapper, shared) = SharedMemory::new();
    // The wrapper does not forward `open_run`, so open the run on the shared handle.
    shared
        .lock()
        .expect("shared")
        .open_run(manifest("run-1"))
        .expect("open");
    let writer = EventStoreWriter::spawn(
        Box::new(wrapper),
        get_atomic_clock_static(),
        halt,
        WriterConfig::default(),
    )
    .expect("spawn");
    for ts in 10_u64..15_u64 {
        writer.submit(entry_draft(ts)).expect("submit");
    }
    let final_hwm = writer.close(run_ended_draft()).expect("close");
    // Five submits (seqs 1-5) plus `RunEnded` (seq 6).
    assert_eq!(final_hwm, 6);
    let backend = shared.lock().expect("shared");
    let m = backend.manifest().expect("manifest");
    assert_eq!(m.status, RunStatus::Ended);
    assert_eq!(m.high_watermark, 6);
    let last = backend.scan_seq(6).expect("scan").expect("present");
    assert_eq!(last.payload_type.as_str(), "RunEnded");
    assert!(captured.lock().expect("captured").is_empty());
}
/// Records a snapshot anchor between two pairs of submits. Checks that the
/// anchor captures watermark 2, is the backend's latest anchor, and that the
/// replay tail after it is exactly seqs 3..=5 (the later submits plus `RunEnded`).
#[rstest]
fn record_snapshot_anchor_flushes_pending_entries_and_replay_tail_starts_after_anchor(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, captured) = captured_halt;
    let (wrapper, shared) = SharedMemory::new();
    shared
        .lock()
        .expect("shared")
        .open_run(manifest("run-anchor"))
        .expect("open");
    let writer = EventStoreWriter::spawn(
        Box::new(wrapper),
        get_atomic_clock_static(),
        halt,
        WriterConfig::default(),
    )
    .expect("spawn");
    writer.submit(entry_draft(10)).expect("submit first");
    writer.submit(entry_draft(11)).expect("submit second");
    // A watermark of 2 shows both earlier submits were appended before the anchor.
    let anchor = writer
        .record_snapshot_anchor("cache://position-snapshots/P-1/0", "blake3:abc")
        .expect("record anchor");
    assert_eq!(anchor.high_watermark, 2);
    writer.submit(entry_draft(12)).expect("submit third");
    writer.submit(entry_draft(13)).expect("submit fourth");
    let final_hwm = writer.close(run_ended_draft()).expect("close");
    let backend = shared.lock().expect("shared");
    assert_eq!(
        backend.latest_snapshot_anchor().expect("latest anchor"),
        Some(anchor.clone()),
    );
    // Replay resumes right after the anchor's watermark.
    let tail_seqs: Vec<_> = backend
        .scan_range(anchor.high_watermark + 1, final_hwm, ScanDirection::Forward)
        .expect("scan tail")
        .into_iter()
        .map(|entry| entry.seq)
        .collect();
    assert_eq!(tail_seqs, vec![3, 4, 5]);
    assert!(captured.lock().expect("captured").is_empty());
}
/// With `max_batch_entries = 2` and a 30 s latency window, six submits should
/// reach the backend as three appends. One more append carries `RunEnded` on
/// close, giving four in total.
/// NOTE(review): relies on the batcher flushing only when a batch fills during
/// the test; confirm against `batcher` if this flakes.
#[rstest]
fn batches_respect_max_entries_threshold(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, _) = captured_halt;
    let inner = Arc::new(Mutex::new(MemoryBackend::new()));
    inner
        .lock()
        .expect("inner")
        .open_run(manifest("run-batch"))
        .expect("open");
    let appends_seen = Arc::new(AtomicUsize::new(0));
    // Gate starts open: `BlockingBackend` is used here only to count appends.
    let gate = Arc::new((Mutex::new(true), std::sync::Condvar::new()));
    let backend = BlockingBackend::new(
        Arc::clone(&inner),
        Arc::clone(&gate),
        Arc::clone(&appends_seen),
    );
    let config = WriterConfig {
        channel_capacity: 16,
        max_batch_entries: 2,
        max_batch_latency: Duration::from_secs(30),
        halt_threshold: Duration::from_secs(30),
    };
    let clock = get_atomic_clock_static();
    let boxed = Box::new(backend);
    let writer = EventStoreWriter::spawn(boxed, clock, halt, config).expect("spawn");
    for ts in 10_u64..16_u64 {
        writer.submit(entry_draft(ts)).expect("submit");
    }
    let final_hwm = writer.close(run_ended_draft()).expect("close");
    assert_eq!(final_hwm, 7);
    assert_eq!(appends_seen.load(Ordering::SeqCst), 4);
}
#[rstest]
fn submit_signals_halt_when_stalled_past_threshold(
captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
let (halt, captured) = captured_halt;
let inner = Arc::new(Mutex::new(MemoryBackend::new()));
inner
.lock()
.expect("inner")
.open_run(manifest("run-halt"))
.expect("open");
let gate = Arc::new((Mutex::new(false), std::sync::Condvar::new()));
let appends_seen = Arc::new(AtomicUsize::new(0));
let backend = BlockingBackend::new(
Arc::clone(&inner),
Arc::clone(&gate),
Arc::clone(&appends_seen),
);
let halt_threshold = Duration::from_millis(50);
let config = WriterConfig {
channel_capacity: 1,
max_batch_entries: 1,
max_batch_latency: Duration::from_millis(1),
halt_threshold,
};
let clock = get_atomic_clock_static();
let boxed = Box::new(backend);
let writer = EventStoreWriter::spawn(boxed, clock, halt, config).expect("spawn");
writer.submit(entry_draft(10)).expect("first submit fits");
std::thread::sleep(Duration::from_millis(20));
let _ = writer.submit(entry_draft(11));
let stalled = writer.submit(entry_draft(12)).expect_err("must stall");
match stalled {
SubmitError::HaltSignaled { .. } => {}
SubmitError::Closed => panic!("expected HaltSignaled, was Closed"),
}
let captured_reasons = captured.lock().expect("captured");
assert_eq!(
captured_reasons.len(),
1,
"halt callback must fire exactly once",
);
assert_backpressure_stall(captured_reasons.first(), halt_threshold);
drop(captured_reasons);
let post_halt = writer
.submit(entry_draft(13))
.expect_err("post-halt submit");
match post_halt {
SubmitError::Closed => {}
SubmitError::HaltSignaled { .. } => {
panic!("expected Closed after halt, was HaltSignaled")
}
}
assert_eq!(
captured.lock().expect("captured").len(),
1,
"halt callback must not refire after the first stall",
);
let (lock, cvar) = &*gate;
*lock.lock().expect("gate") = true;
cvar.notify_all();
}
/// With the backend blocked on the first append, the anchor request fits in the
/// channel but its acknowledgement never arrives. Checks that the call fails
/// with an "ack stalled" error, halts once with `BackpressureStall`, and that
/// later submits fail with `Closed`.
#[rstest]
fn record_snapshot_anchor_signals_halt_when_ack_stalls(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, captured) = captured_halt;
    let inner = Arc::new(Mutex::new(MemoryBackend::new()));
    inner
        .lock()
        .expect("inner")
        .open_run(manifest("run-anchor-halt"))
        .expect("open");
    let gate = Arc::new((Mutex::new(false), std::sync::Condvar::new()));
    let appends_seen = Arc::new(AtomicUsize::new(0));
    let backend = BlockingBackend::new(
        Arc::clone(&inner),
        Arc::clone(&gate),
        Arc::clone(&appends_seen),
    );
    let halt_threshold = Duration::from_millis(50);
    // Capacity 2 leaves room for the anchor message while the writer is blocked.
    let writer = EventStoreWriter::spawn(
        Box::new(backend),
        get_atomic_clock_static(),
        halt,
        WriterConfig {
            channel_capacity: 2,
            max_batch_entries: 1,
            max_batch_latency: Duration::from_millis(1),
            halt_threshold,
        },
    )
    .expect("spawn");
    writer.submit(entry_draft(10)).expect("first submit fits");
    // Let the writer take entry 10 and block inside the backend append.
    std::thread::sleep(Duration::from_millis(20));
    let err = writer
        .record_snapshot_anchor("cache://position-snapshots/P-1/0", "blake3:abc")
        .expect_err("snapshot anchor ack must time out");
    let post_halt = writer
        .submit(entry_draft(11))
        .expect_err("post-halt submit");
    // Open the gate before asserting, so a failed assertion cannot hang in the
    // writer's `Drop` (which joins the blocked thread).
    let (lock, cvar) = &*gate;
    *lock.lock().expect("gate") = true;
    cvar.notify_all();
    match err {
        EventStoreError::Backend(msg) => {
            assert!(
                msg.contains("snapshot anchor ack stalled"),
                "msg was: {msg}"
            );
        }
        other => panic!("expected Backend, was {other:?}"),
    }
    match post_halt {
        SubmitError::Closed => {}
        SubmitError::HaltSignaled { .. } => {
            panic!("expected Closed after anchor halt, was HaltSignaled")
        }
    }
    let captured_reasons = captured.lock().expect("captured");
    assert_eq!(
        captured_reasons.len(),
        1,
        "halt callback must fire exactly once",
    );
    assert_backpressure_stall(captured_reasons.first(), halt_threshold);
}
/// With the backend blocked on the first append and the 1-slot channel already
/// full, the anchor request itself cannot be enqueued. Checks that the call
/// fails with a "submit stalled" error, halts once with `BackpressureStall`,
/// and that later submits fail with `Closed`.
#[rstest]
fn record_snapshot_anchor_signals_halt_when_submit_stalls(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, captured) = captured_halt;
    let inner = Arc::new(Mutex::new(MemoryBackend::new()));
    inner
        .lock()
        .expect("inner")
        .open_run(manifest("run-anchor-submit-halt"))
        .expect("open");
    let gate = Arc::new((Mutex::new(false), std::sync::Condvar::new()));
    let appends_seen = Arc::new(AtomicUsize::new(0));
    let backend = BlockingBackend::new(
        Arc::clone(&inner),
        Arc::clone(&gate),
        Arc::clone(&appends_seen),
    );
    let halt_threshold = Duration::from_millis(50);
    let writer = EventStoreWriter::spawn(
        Box::new(backend),
        get_atomic_clock_static(),
        halt,
        WriterConfig {
            channel_capacity: 1,
            max_batch_entries: 1,
            max_batch_latency: Duration::from_secs(30),
            halt_threshold,
        },
    )
    .expect("spawn");
    writer.submit(entry_draft(10)).expect("first submit fits");
    // Poll (up to 1 s) until the writer is parked inside the first append, so
    // the channel slot is free again for the second submit.
    let mut waited = Duration::ZERO;
    while appends_seen.load(Ordering::SeqCst) == 0 && waited < Duration::from_secs(1) {
        std::thread::sleep(Duration::from_millis(2));
        waited += Duration::from_millis(2);
    }
    assert_eq!(
        appends_seen.load(Ordering::SeqCst),
        1,
        "writer must be blocked inside the first backend append",
    );
    // Occupies the only channel slot.
    writer.submit(entry_draft(11)).expect("second submit fits");
    let err = writer
        .record_snapshot_anchor("cache://position-snapshots/P-1/0", "blake3:abc")
        .expect_err("snapshot anchor submit must time out");
    let post_halt = writer
        .submit(entry_draft(12))
        .expect_err("post-halt submit");
    // Open the gate before asserting, so a failed assertion cannot hang in the
    // writer's `Drop`.
    let (lock, cvar) = &*gate;
    *lock.lock().expect("gate") = true;
    cvar.notify_all();
    match err {
        EventStoreError::Backend(msg) => {
            assert!(
                msg.contains("snapshot anchor submit stalled"),
                "msg was: {msg}"
            );
        }
        other => panic!("expected Backend, was {other:?}"),
    }
    match post_halt {
        SubmitError::Closed => {}
        SubmitError::HaltSignaled { .. } => {
            panic!("expected Closed after anchor halt, was HaltSignaled")
        }
    }
    let captured_reasons = captured.lock().expect("captured");
    assert_eq!(
        captured_reasons.len(),
        1,
        "halt callback must fire exactly once",
    );
    assert_backpressure_stall(captured_reasons.first(), halt_threshold);
}
/// A backend whose appends always fail with a disk error should produce a
/// `BackendDisk` halt. After that, submits surface `Closed` and `close` errors.
#[rstest]
fn backend_disk_error_fires_halt_and_closes_writer(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, captured) = captured_halt;
    let backend = DiskFailureBackend::default();
    let writer = EventStoreWriter::spawn(
        Box::new(backend),
        get_atomic_clock_static(),
        halt,
        WriterConfig {
            channel_capacity: 4,
            max_batch_entries: 1,
            max_batch_latency: Duration::from_millis(1),
            halt_threshold: Duration::from_millis(500),
        },
    )
    .expect("spawn");
    writer
        .submit(entry_draft(10))
        .expect("first submit fits in channel before writer fail-stops");
    // Poll (up to 500 ms) for the writer thread to report the failure.
    let mut waited = Duration::ZERO;
    let deadline = Duration::from_millis(500);
    while captured.lock().expect("captured").is_empty() && waited < deadline {
        std::thread::sleep(Duration::from_millis(10));
        waited += Duration::from_millis(10);
    }
    let captured_reasons = captured.lock().expect("captured");
    assert!(matches!(
        captured_reasons.first(),
        Some(HaltReason::BackendDisk(_))
    ));
    drop(captured_reasons);
    // Retry because the channel may briefly still accept sends after the halt
    // fires; `Closed` appears once the writer thread is gone.
    let mut closed_seen = false;
    for _ in 0..50 {
        match writer.submit(entry_draft(11)) {
            Err(SubmitError::Closed) => {
                closed_seen = true;
                break;
            }
            _ => std::thread::sleep(Duration::from_millis(10)),
        }
    }
    assert!(closed_seen, "submits must surface Closed after fail-stop");
    let close_result = writer.close(run_ended_draft());
    assert!(close_result.is_err());
}
/// A single entry, far below `max_batch_entries`, must still be committed once
/// the 20 ms latency window expires. The test sees this as the watermark
/// reaching 1 before `close` is called.
#[rstest]
fn time_driven_flush_advances_watermark_before_close(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, _) = captured_halt;
    let (wrapper, shared) = SharedMemory::new();
    shared
        .lock()
        .expect("shared")
        .open_run(manifest("run-time"))
        .expect("open");
    let writer = EventStoreWriter::spawn(
        Box::new(wrapper),
        get_atomic_clock_static(),
        halt,
        WriterConfig {
            channel_capacity: 32,
            max_batch_entries: 100,
            max_batch_latency: Duration::from_millis(20),
            halt_threshold: Duration::from_secs(30),
        },
    )
    .expect("spawn");
    writer.submit(entry_draft(10)).expect("submit");
    // Poll (up to 500 ms) for the time-driven flush to publish the watermark.
    let mut waited = Duration::ZERO;
    while writer.high_watermark() == 0 && waited < Duration::from_millis(500) {
        std::thread::sleep(Duration::from_millis(5));
        waited += Duration::from_millis(5);
    }
    assert_eq!(
        writer.high_watermark(),
        1,
        "latency window must commit a sub-batch entry before close",
    );
    let final_hwm = writer.close(run_ended_draft()).expect("close");
    assert_eq!(final_hwm, 2);
}
#[rstest]
fn entry_draft_without_indices_constructor() {
let topic: crate::entry::Topic = "exec.command.SubmitOrder".into();
let payload_type = Ustr::from("SubmitOrder");
let payload = Bytes::from_static(b"\x01\x02");
let ts_init = UnixNanos::from(42);
let draft = EntryDraft::without_indices(
Headers::empty(),
topic,
payload_type,
payload.clone(),
ts_init,
);
assert!(draft.headers.is_empty());
assert_eq!(draft.topic.as_ref(), "exec.command.SubmitOrder");
assert_eq!(draft.payload_type.as_str(), "SubmitOrder");
assert_eq!(draft.payload, payload);
assert_eq!(draft.ts_init, ts_init);
assert!(draft.index_keys.is_empty());
}
/// Asserts that `reason` is a `HaltReason::BackpressureStall` that reports
/// `expected_threshold` and a stall at least that long; panics otherwise.
fn assert_backpressure_stall(reason: Option<&HaltReason>, expected_threshold: Duration) {
    let Some(HaltReason::BackpressureStall {
        stalled_for,
        threshold,
    }) = reason
    else {
        panic!("expected BackpressureStall, was {reason:?}");
    };
    assert!(
        *stalled_for >= expected_threshold,
        "stalled_for {stalled_for:?} must be >= {expected_threshold:?}",
    );
    assert_eq!(*threshold, expected_threshold);
}
}
#[cfg(test)]
#[cfg(madsim)]
mod madsim_tests {
use std::sync::{Arc, Mutex};
use bytes::Bytes;
use indexmap::IndexMap;
use nautilus_core::{UnixNanos, time::get_atomic_clock_static};
use rstest::rstest;
use ustr::Ustr;
use super::*;
use crate::{
backend::{AppendEntry, EventStore, IndexKind, MemoryBackend, ScanDirection},
entry::EventStoreEntry,
error::EventStoreError,
manifest::{RegisteredComponents, RunManifest, RunStatus},
};
fn manifest(run_id: &str) -> RunManifest {
RunManifest {
run_id: run_id.to_string(),
parent_run_id: None,
instance_id: "trader-001".to_string(),
binary_hash: "deadbeef".to_string(),
schema_version: 1,
crate_versions: "feedface".to_string(),
feature_flags: Vec::new(),
adapter_versions: IndexMap::new(),
config_hash: "cafebabe".to_string(),
registered_components: RegisteredComponents::default(),
seed: None,
start_ts_init: UnixNanos::from(0),
end_ts_init: None,
high_watermark: 0,
status: RunStatus::Running,
}
}
fn entry_draft(ts_init: u64) -> EntryDraft {
EntryDraft {
headers: Headers::empty(),
topic: "exec.command.SubmitOrder".into(),
payload_type: Ustr::from("SubmitOrder"),
payload: Bytes::from_static(b"\x01\x02\x03\x04"),
ts_init: UnixNanos::from(ts_init),
index_keys: Vec::new(),
}
}
/// Test backend that forwards to a `MemoryBackend` shared with the test body.
#[derive(Debug)]
struct SharedMemory(Arc<Mutex<MemoryBackend>>);

impl SharedMemory {
    /// Returns the wrapper together with a second handle to the same backend.
    fn new() -> (Self, Arc<Mutex<MemoryBackend>>) {
        let backend = Arc::new(Mutex::new(MemoryBackend::new()));
        let wrapper = Self(Arc::clone(&backend));
        (wrapper, backend)
    }
}
/// Forwards every call except `open_run` to the shared `MemoryBackend`.
impl EventStore for SharedMemory {
    fn open_run(&mut self, _: RunManifest) -> Result<(), EventStoreError> {
        unreachable!("test wrapper does not forward open_run")
    }

    fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
        let mut backend = self.0.lock().expect("shared memory poisoned");
        backend.append_batch(entries)
    }

    fn scan_range(
        &self,
        from: u64,
        to: u64,
        direction: ScanDirection,
    ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.scan_range(from, to, direction)
    }

    fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.scan_seq(seq)
    }

    fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.lookup(kind, key)
    }

    fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.iter_index_keys(kind)
    }

    fn record_snapshot_anchor(
        &mut self,
        anchor: SnapshotAnchor,
    ) -> Result<(), EventStoreError> {
        let mut backend = self.0.lock().expect("shared memory poisoned");
        backend.record_snapshot_anchor(anchor)
    }

    fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.latest_snapshot_anchor()
    }

    fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
        let mut backend = self.0.lock().expect("shared memory poisoned");
        backend.seal(status)
    }

    fn manifest(&self) -> Result<RunManifest, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.manifest()
    }

    fn high_watermark(&self) -> Result<u64, EventStoreError> {
        let backend = self.0.lock().expect("shared memory poisoned");
        backend.high_watermark()
    }
}
/// Under madsim, submits append synchronously. An anchor recorded after two
/// submits therefore captures watermark 2 and becomes the backend's latest
/// anchor.
#[rstest]
fn record_snapshot_anchor_records_current_watermark_under_madsim() {
    let (wrapper, shared) = SharedMemory::new();
    shared
        .lock()
        .expect("shared")
        .open_run(manifest("run-anchor"))
        .expect("open");
    let writer = EventStoreWriter::spawn(
        Box::new(wrapper),
        get_atomic_clock_static(),
        noop_halt(),
        WriterConfig::default(),
    )
    .expect("spawn");
    writer.submit(entry_draft(10)).expect("submit first");
    writer.submit(entry_draft(11)).expect("submit second");
    let anchor = writer
        .record_snapshot_anchor("cache://position-snapshots/P-1/0", "blake3:abc")
        .expect("record anchor");
    let backend = shared.lock().expect("shared");
    assert_eq!(anchor.high_watermark, 2);
    assert_eq!(
        backend.latest_snapshot_anchor().expect("latest anchor"),
        Some(anchor),
    );
}
}