use std::{
fmt::Debug,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
};
use nautilus_core::UnixNanos;
use crate::{
capture::{encoder::EncodeError, registry::EncoderRegistry},
entry::Topic,
headers::Headers,
writer::{EntryDraft, EventStoreWriter, HaltCallback, HaltReason, SubmitError},
};
/// Errors returned by `BusCaptureAdapter::capture` and
/// `BusCaptureAdapter::capture_any`.
#[derive(Debug, thiserror::Error)]
pub enum CaptureError {
/// The encoder registry returned an error while encoding the message.
#[error("encode failure: {0}")]
Encode(#[from] EncodeError),
/// The event store writer rejected the submitted entry draft.
#[error("writer submit failed: {0}")]
Submit(#[from] SubmitError),
/// The adapter's halted flag was already set when capture was attempted.
#[error("capture adapter halted")]
Halted,
}
/// Encodes messages through an `EncoderRegistry` and submits the resulting
/// entry drafts to an `EventStoreWriter`, latching a halted flag on the first
/// submit failure.
pub struct BusCaptureAdapter {
/// Writer that receives every encoded entry draft.
writer: Arc<EventStoreWriter>,
/// Registry used to encode type-erased messages.
registry: Arc<EncoderRegistry>,
/// Callback invoked once, by the call that first flips `halted` to `true`.
halt: HaltCallback,
/// Set after the first submit failure; later captures return `CaptureError::Halted`.
halted: AtomicBool,
/// Optional shared counter incremented by one after each successful submit.
submit_counter: Option<Arc<AtomicU64>>,
}
impl Debug for BusCaptureAdapter {
    /// Formats the adapter as a non-exhaustive struct showing only the number
    /// of registered encoders and the current halted flag.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let encoder_count = self.registry.len();
        let halted_now = self.halted.load(Ordering::Acquire);

        let mut out = f.debug_struct(stringify!(BusCaptureAdapter));
        out.field("registered_encoders", &encoder_count);
        out.field("halted", &halted_now);
        out.finish_non_exhaustive()
    }
}
impl BusCaptureAdapter {
    /// Creates an adapter over `writer` and `registry`.
    ///
    /// The adapter starts with its halted flag cleared and no submit counter
    /// attached. `halt` is the callback invoked on the first submit failure.
    #[must_use]
    pub fn new(
        writer: Arc<EventStoreWriter>,
        registry: Arc<EncoderRegistry>,
        halt: HaltCallback,
    ) -> Self {
        let halted = AtomicBool::new(false);
        Self {
            writer,
            registry,
            halt,
            halted,
            submit_counter: None,
        }
    }

    /// Attaches a shared counter that is incremented by one after every
    /// successful writer submit, then returns the adapter.
    #[must_use]
    pub fn with_submit_counter(mut self, submit_counter: Arc<AtomicU64>) -> Self {
        self.submit_counter = Some(submit_counter);
        self
    }

    /// Returns the current value of the halted flag.
    #[must_use]
    pub fn is_halted(&self) -> bool {
        self.halted.load(Ordering::Acquire)
    }

    /// Borrows the encoder registry used by this adapter.
    #[must_use]
    pub fn registry(&self) -> &EncoderRegistry {
        &self.registry
    }

    /// Returns the high watermark reported by the underlying writer.
    #[must_use]
    pub fn high_watermark(&self) -> u64 {
        self.writer.high_watermark()
    }

    /// Typed convenience wrapper: erases `message` to `&dyn Any` and forwards
    /// to [`Self::capture_any`].
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::capture_any`] returns.
    pub fn capture<T: 'static>(
        &self,
        topic: Topic,
        message: &T,
        headers: Headers,
        ts_init: UnixNanos,
    ) -> Result<bool, CaptureError> {
        let erased: &dyn std::any::Any = message;
        self.capture_any(topic, erased, headers, ts_init)
    }

    /// Encodes `message` through the registry and submits the resulting draft
    /// to the writer.
    ///
    /// Returns `Ok(true)` when a draft was submitted and `Ok(false)` when the
    /// registry produced no encoding for the message.
    ///
    /// # Errors
    ///
    /// - [`CaptureError::Halted`] if the halted flag is already set.
    /// - [`CaptureError::Encode`] if the registry's encoder fails; the adapter
    ///   is not halted in this case.
    /// - [`CaptureError::Submit`] if the writer rejects the draft; the adapter
    ///   is halted before the error is returned.
    pub fn capture_any(
        &self,
        topic: Topic,
        message: &dyn std::any::Any,
        headers: Headers,
        ts_init: UnixNanos,
    ) -> Result<bool, CaptureError> {
        if self.halted.load(Ordering::Acquire) {
            return Err(CaptureError::Halted);
        }

        let (payload_type, encoded) = match self.registry.encode_any(message)? {
            Some(pair) => pair,
            None => return Ok(false),
        };

        let draft = EntryDraft {
            headers,
            topic,
            payload_type,
            payload: encoded.payload,
            ts_init,
            index_keys: encoded.index_keys,
        };

        if let Err(submit_err) = self.writer.submit(draft) {
            self.fail_stop(&submit_err);
            return Err(CaptureError::Submit(submit_err));
        }

        // Only successful submits are counted.
        if let Some(counter) = self.submit_counter.as_ref() {
            counter.fetch_add(1, Ordering::AcqRel);
        }
        Ok(true)
    }

    /// Latches the halted flag and, only for the call that actually flipped it
    /// from `false` to `true`, invokes the halt callback with a reason derived
    /// from `err`.
    fn fail_stop(&self, err: &SubmitError) {
        let already_halted = self
            .halted
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err();
        if already_halted {
            return;
        }
        (self.halt)(halt_reason_from_submit(err));
    }
}
/// Translates a writer submit error into the halt reason passed to the halt
/// callback.
///
/// `HaltSignaled` maps to `BackpressureStall` with the same durations copied
/// over; `Closed` maps to a `BackendError` carrying a fixed message.
fn halt_reason_from_submit(err: &SubmitError) -> HaltReason {
    match err {
        SubmitError::Closed => HaltReason::BackendError(String::from("event store writer closed")),
        SubmitError::HaltSignaled {
            stalled_for: stall,
            threshold: limit,
        } => HaltReason::BackpressureStall {
            stalled_for: *stall,
            threshold: *limit,
        },
    }
}
#[cfg(test)]
mod tests {
use std::{
sync::{
Arc, Mutex,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use bytes::Bytes;
use indexmap::IndexMap;
use nautilus_core::{UnixNanos, time::get_atomic_clock_static};
use rstest::{fixture, rstest};
use ustr::Ustr;
use super::*;
use crate::{
backend::{AppendEntry, EventStore, IndexKey, IndexKind, MemoryBackend, ScanDirection},
capture::encoder::EncodedPayload,
entry::EventStoreEntry,
error::EventStoreError,
manifest::{RegisteredComponents, RunManifest, RunStatus},
writer::WriterConfig,
};
/// Test message type; `stub_registry` registers an encoder for it under the
/// name "StubCommand".
#[derive(Debug)]
struct StubCommand {
// Used by the stub encoder as both the payload bytes and the ClientOrderId index key.
client_order_id: String,
}
/// Test message type; `stub_registry` registers an encoder for it under the
/// name "StubEvent" that emits two index keys.
#[derive(Debug)]
struct StubEvent {
// Used by the stub encoder as the payload bytes and the ClientOrderId index key.
client_order_id: String,
// Used by the stub encoder as the VenueOrderId index key.
venue_order_id: String,
}
/// Test message type that `stub_registry` does not register an encoder for.
#[derive(Debug)]
struct UnknownMessage;
/// Test message type whose encoder in `stub_registry` always returns
/// `EncodeError::Serialize`.
#[derive(Debug)]
struct FailingMessage;
/// Builds a fixed test manifest in `Running` status whose only varying field
/// is `run_id`.
fn manifest(run_id: &str) -> RunManifest {
    let start_ts_init = UnixNanos::from(0);
    RunManifest {
        run_id: String::from(run_id),
        parent_run_id: None,
        instance_id: String::from("trader-001"),
        binary_hash: String::from("deadbeef"),
        schema_version: 1,
        crate_versions: String::from("feedface"),
        feature_flags: Vec::new(),
        adapter_versions: IndexMap::new(),
        config_hash: String::from("cafebabe"),
        registered_components: RegisteredComponents::default(),
        seed: None,
        start_ts_init,
        end_ts_init: None,
        high_watermark: 0,
        status: RunStatus::Running,
    }
}
/// Builds the encoder registry shared by the tests.
///
/// Registers three encoders: `StubCommand` (one index key), `StubEvent` (two
/// index keys) and `FailingMessage` (always errors). `UnknownMessage` is
/// deliberately left unregistered.
fn stub_registry() -> Arc<EncoderRegistry> {
    let mut encoders = EncoderRegistry::new();

    let command_name = Ustr::from("StubCommand");
    encoders.register::<StubCommand, _>(command_name, |cmd| {
        let payload = Bytes::copy_from_slice(cmd.client_order_id.as_bytes());
        let keys = vec![IndexKey::new(
            IndexKind::ClientOrderId,
            cmd.client_order_id.clone(),
        )];
        Ok(EncodedPayload::new(payload, keys))
    });

    let event_name = Ustr::from("StubEvent");
    encoders.register::<StubEvent, _>(event_name, |evt| {
        let payload = Bytes::copy_from_slice(evt.client_order_id.as_bytes());
        let keys = vec![
            IndexKey::new(IndexKind::ClientOrderId, evt.client_order_id.clone()),
            IndexKey::new(IndexKind::VenueOrderId, evt.venue_order_id.clone()),
        ];
        Ok(EncodedPayload::new(payload, keys))
    });

    let failing_name = Ustr::from("FailingMessage");
    encoders.register::<FailingMessage, _>(failing_name, |_| {
        Err(EncodeError::Serialize(
            "encoder rejected message".to_string(),
        ))
    });

    Arc::new(encoders)
}
/// Fixture returning a halt callback together with the shared vector it
/// appends every received `HaltReason` to.
#[fixture]
fn captured_halt() -> (HaltCallback, Arc<Mutex<Vec<HaltReason>>>) {
    let sink: Arc<Mutex<Vec<HaltReason>>> = Arc::new(Mutex::new(Vec::new()));
    let sink_for_cb = Arc::clone(&sink);
    let halt: HaltCallback = Arc::new(move |reason| {
        let mut reasons = sink_for_cb.lock().expect("captured halt poisoned");
        reasons.push(reason);
    });
    (halt, sink)
}
/// Opens a run named `run_id` on a fresh in-memory backend, spawns a writer
/// over a `SharedMemory` wrapper of that backend, and returns both so tests
/// can inspect what the writer persisted.
fn writer_with_open_run(
    run_id: &str,
    halt: HaltCallback,
) -> (Arc<EventStoreWriter>, Arc<Mutex<MemoryBackend>>) {
    let backend_arc: Arc<Mutex<MemoryBackend>> = Arc::new(Mutex::new(MemoryBackend::new()));

    // The run is opened directly on the backend because the wrapper handed to
    // the writer does not forward `open_run`.
    {
        let mut inner = backend_arc.lock().expect("inner");
        inner.open_run(manifest(run_id)).expect("open run");
    }

    let shared = SharedMemory(Arc::clone(&backend_arc));
    let spawned = EventStoreWriter::spawn(
        Box::new(shared),
        get_atomic_clock_static(),
        halt,
        WriterConfig::default(),
    );
    let writer = spawned.expect("spawn");
    (Arc::new(writer), backend_arc)
}
/// Test wrapper around a shared, mutex-guarded `MemoryBackend` so the backend
/// handed to the writer can also be inspected by the test.
#[derive(Debug)]
struct SharedMemory(Arc<Mutex<MemoryBackend>>);
impl EventStore for SharedMemory {
fn open_run(&mut self, _: RunManifest) -> Result<(), EventStoreError> {
unreachable!("test wrapper does not forward open_run")
}
fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
self.0.lock().expect("shared").append_batch(entries)
}
fn scan_range(
&self,
from: u64,
to: u64,
direction: ScanDirection,
) -> Result<Vec<EventStoreEntry>, EventStoreError> {
self.0
.lock()
.expect("shared")
.scan_range(from, to, direction)
}
fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
self.0.lock().expect("shared").scan_seq(seq)
}
fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
self.0.lock().expect("shared").lookup(kind, key)
}
fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
self.0.lock().expect("shared").iter_index_keys(kind)
}
fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
self.0.lock().expect("shared").seal(status)
}
fn manifest(&self) -> Result<RunManifest, EventStoreError> {
self.0.lock().expect("shared").manifest()
}
fn high_watermark(&self) -> Result<u64, EventStoreError> {
self.0.lock().expect("shared").high_watermark()
}
}
/// Polls the writer every 5 ms until its high watermark reaches `target_hwm`
/// or roughly two seconds of sleeping have accumulated, then asserts that the
/// target was reached.
fn drain(writer: &Arc<EventStoreWriter>, target_hwm: u64) {
    let poll_interval = Duration::from_millis(5);
    let deadline = Duration::from_secs(2);
    let mut waited = Duration::ZERO;
    loop {
        let reached = writer.high_watermark() >= target_hwm;
        if reached || waited >= deadline {
            break;
        }
        std::thread::sleep(poll_interval);
        waited += poll_interval;
    }
    assert!(
        writer.high_watermark() >= target_hwm,
        "writer high_watermark {} did not reach {target_hwm} within {:?}",
        writer.high_watermark(),
        deadline,
    );
}
/// A registered command is captured, persisted at sequence 1 with its topic,
/// payload type, payload and index key, and does not trigger a halt.
#[rstest]
fn capture_records_registered_command_and_returns_true(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, halt_log) = captured_halt;
    let (writer, backend) = writer_with_open_run("run-cmd", Arc::clone(&halt));
    let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);

    let command = StubCommand {
        client_order_id: "O-1".to_string(),
    };
    let topic = Topic::from("exec.command.SubmitOrder");
    let was_captured = adapter
        .capture::<StubCommand>(topic, &command, Headers::empty(), UnixNanos::from(100))
        .expect("capture");
    assert!(was_captured);

    drain(&writer, 1);

    let store = backend.lock().expect("backend");
    let stored = store.scan_seq(1).expect("scan").expect("present");
    assert_eq!(stored.payload_type.as_str(), "StubCommand");
    assert_eq!(stored.topic.as_ref(), "exec.command.SubmitOrder");
    assert_eq!(stored.payload.as_ref(), b"O-1");

    let indexed_seq = store
        .lookup(IndexKind::ClientOrderId, "O-1")
        .expect("lookup")
        .expect("indexed");
    assert_eq!(indexed_seq, 1);

    assert!(halt_log.lock().expect("captured").is_empty());
    assert!(!adapter.is_halted());
}
/// A message type with no registered encoder yields `Ok(false)`, writes
/// nothing and leaves the adapter running.
#[rstest]
fn capture_returns_false_for_unknown_type(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, _halt_log) = captured_halt;
    let (writer, _backend) = writer_with_open_run("run-unknown", Arc::clone(&halt));
    let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);

    let topic = Topic::from("data.market.unknown");
    let outcome = adapter.capture::<UnknownMessage>(
        topic,
        &UnknownMessage,
        Headers::empty(),
        UnixNanos::from(50),
    );
    let was_captured = outcome.expect("capture");

    assert!(!was_captured);
    assert_eq!(writer.high_watermark(), 0);
    assert!(!adapter.is_halted());
}
/// The attached submit counter advances once per submitted entry and is left
/// untouched by a message that has no registered encoder.
#[rstest]
fn submit_counter_increments_on_each_captured_entry(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, _halt_log) = captured_halt;
    let (writer, _backend) = writer_with_open_run("run-submit-counter", Arc::clone(&halt));
    // Seeded at 1, so the two successful submits below must leave it at 3.
    let submit_counter = Arc::new(AtomicU64::new(1));
    let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt)
        .with_submit_counter(Arc::clone(&submit_counter));

    let command = StubCommand {
        client_order_id: "O-counter-1".to_string(),
    };
    adapter
        .capture::<StubCommand>(
            Topic::from("exec.command.SubmitOrder"),
            &command,
            Headers::empty(),
            UnixNanos::from(100),
        )
        .expect("first capture");

    // Unregistered type: no submit happens, so no increment.
    adapter
        .capture::<UnknownMessage>(
            Topic::from("data.market.unknown"),
            &UnknownMessage,
            Headers::empty(),
            UnixNanos::from(101),
        )
        .expect("unknown type");

    let fill = StubEvent {
        client_order_id: "O-counter-1".to_string(),
        venue_order_id: "V-counter-1".to_string(),
    };
    adapter
        .capture::<StubEvent>(
            Topic::from("exec.event.OrderFilled"),
            &fill,
            Headers::empty(),
            UnixNanos::from(102),
        )
        .expect("second capture");

    let observed = submit_counter.load(Ordering::Acquire);
    assert_eq!(observed, 3);
}
/// Both index keys emitted for a single event resolve to the same sequence
/// number in the backend.
#[rstest]
fn capture_records_event_indices_atomically(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, _halt_log) = captured_halt;
    let (writer, backend) = writer_with_open_run("run-event", Arc::clone(&halt));
    let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);

    let fill = StubEvent {
        client_order_id: "O-2".to_string(),
        venue_order_id: "V-9".to_string(),
    };
    let topic = Topic::from("exec.event.OrderFilled");
    adapter
        .capture::<StubEvent>(topic, &fill, Headers::empty(), UnixNanos::from(200))
        .expect("capture");

    drain(&writer, 1);

    let store = backend.lock().expect("backend");
    let client_seq = store
        .lookup(IndexKind::ClientOrderId, "O-2")
        .expect("lookup")
        .expect("indexed");
    let venue_seq = store
        .lookup(IndexKind::VenueOrderId, "V-9")
        .expect("lookup")
        .expect("indexed");
    assert_eq!(client_seq, 1);
    assert_eq!(venue_seq, 1);
}
/// An encoder failure surfaces as `CaptureError::Encode`, does not halt the
/// adapter or fire the callback, and a later capture still succeeds.
#[rstest]
fn capture_propagates_encoder_error_without_halting(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, halt_log) = captured_halt;
    let (writer, backend) = writer_with_open_run("run-encode-err", Arc::clone(&halt));
    let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);

    let failure = adapter
        .capture::<FailingMessage>(
            Topic::from("exec.command.Failing"),
            &FailingMessage,
            Headers::empty(),
            UnixNanos::from(500),
        )
        .expect_err("encoder must reject");
    match &failure {
        CaptureError::Encode(EncodeError::Serialize(msg)) => {
            assert!(msg.contains("rejected"), "msg was: {msg}");
        }
        other => panic!("expected Encode(Serialize), was {other:?}"),
    }

    assert!(
        !adapter.is_halted(),
        "encoder failure must not fail-stop the adapter",
    );
    assert!(halt_log.lock().expect("captured").is_empty());

    // The adapter must still accept well-formed messages afterwards.
    let follow_up = StubCommand {
        client_order_id: "O-after-encode-err".to_string(),
    };
    adapter
        .capture::<StubCommand>(
            Topic::from("exec.command.SubmitOrder"),
            &follow_up,
            Headers::empty(),
            UnixNanos::from(501),
        )
        .expect("capture after encoder error");

    drain(&writer, 1);

    let store = backend.lock().expect("backend");
    assert_eq!(store.high_watermark().expect("hwm"), 1);
}
/// Each `SubmitError` variant maps to the matching `HaltReason` variant with
/// its details carried over unchanged.
#[rstest]
#[case::backpressure(
    SubmitError::HaltSignaled {
        stalled_for: Duration::from_millis(750),
        threshold: Duration::from_millis(250),
    },
    HaltReason::BackpressureStall {
        stalled_for: Duration::from_millis(750),
        threshold: Duration::from_millis(250),
    },
)]
#[case::closed(
    SubmitError::Closed,
    HaltReason::BackendError("event store writer closed".to_string()),
)]
fn halt_reason_from_submit_preserves_failure_context(
    #[case] err: SubmitError,
    #[case] expected: HaltReason,
) {
    let actual = halt_reason_from_submit(&err);
    // Compared variant by variant rather than with a single equality check.
    match (actual, expected) {
        (HaltReason::BackendError(got_msg), HaltReason::BackendError(want_msg)) => {
            assert_eq!(got_msg, want_msg);
        }
        (
            HaltReason::BackpressureStall {
                stalled_for: got_stall,
                threshold: got_limit,
            },
            HaltReason::BackpressureStall {
                stalled_for: want_stall,
                threshold: want_limit,
            },
        ) => {
            assert_eq!(got_stall, want_stall);
            assert_eq!(got_limit, want_limit);
        }
        (actual, expected) => {
            panic!("variant mismatch: actual={actual:?} expected={expected:?}")
        }
    }
}
/// Using `StubFailAdapter` to force a submit failure: the first capture
/// reports `Submit(Closed)` and fires the halt callback once; the second
/// short-circuits with `Halted` and does not fire it again.
#[rstest]
fn submit_failure_halts_adapter_and_fires_callback_once(
    captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
) {
    let (halt, halt_log) = captured_halt;
    let (writer, _backend) = writer_with_open_run("run-halt", Arc::clone(&halt));
    let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
    drop(writer);

    let stub_adapter = StubFailAdapter::new(adapter_halt_for(&halt_log));

    let first_cmd = StubCommand {
        client_order_id: "O-fail".to_string(),
    };
    let first_err = stub_adapter
        .capture::<StubCommand>(
            Topic::from("exec.command.SubmitOrder"),
            &first_cmd,
            Headers::empty(),
            UnixNanos::from(1),
        )
        .expect_err("first submit fails");
    assert!(matches!(
        first_err,
        CaptureError::Submit(SubmitError::Closed)
    ));
    assert!(stub_adapter.is_halted());
    assert_eq!(halt_log.lock().expect("captured").len(), 1);

    let second_cmd = StubCommand {
        client_order_id: "O-fail-2".to_string(),
    };
    let second_err = stub_adapter
        .capture::<StubCommand>(
            Topic::from("exec.command.SubmitOrder"),
            &second_cmd,
            Headers::empty(),
            UnixNanos::from(2),
        )
        .expect_err("second submit short-circuits");
    assert!(matches!(second_err, CaptureError::Halted));
    assert_eq!(
        halt_log.lock().expect("captured").len(),
        1,
        "halt callback must not refire after the first failure",
    );

    drop(adapter);
}
/// Builds a halt callback that appends every received reason to `captured`.
fn adapter_halt_for(captured: &Arc<Mutex<Vec<HaltReason>>>) -> HaltCallback {
    let sink = Arc::clone(captured);
    let callback: HaltCallback = Arc::new(move |reason| {
        let mut reasons = sink.lock().expect("captured halt poisoned");
        reasons.push(reason);
    });
    callback
}
/// Test double with no writer: its `capture` encodes through the registry and
/// then always behaves as if the submit failed with `SubmitError::Closed`.
struct StubFailAdapter {
// Registry used to encode the incoming message before the simulated failure.
registry: Arc<EncoderRegistry>,
// Callback invoked by the call that first flips `halted` to true.
halt: HaltCallback,
// Latched after the first simulated submit failure.
halted: AtomicBool,
}
impl StubFailAdapter {
    /// Creates a stub over the shared test registry with the halted flag
    /// cleared.
    fn new(halt: HaltCallback) -> Self {
        let halted = AtomicBool::new(false);
        Self {
            registry: stub_registry(),
            halt,
            halted,
        }
    }

    /// Returns the current value of the halted flag.
    fn is_halted(&self) -> bool {
        self.halted.load(Ordering::Acquire)
    }

    /// Encodes `message`, then simulates a closed writer.
    ///
    /// Returns `Err(Halted)` if already halted, `Ok(false)` if the registry
    /// has no encoding for the message, and otherwise `Err(Submit(Closed))`
    /// after latching the halted flag and firing the callback at most once.
    fn capture<T: 'static>(
        &self,
        _topic: Topic,
        message: &T,
        _headers: Headers,
        _ts_init: UnixNanos,
    ) -> Result<bool, CaptureError> {
        if self.halted.load(Ordering::Acquire) {
            return Err(CaptureError::Halted);
        }

        let encoded = self.registry.encode(message)?;
        if encoded.is_none() {
            return Ok(false);
        }

        let err = SubmitError::Closed;
        let flipped_here = self
            .halted
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if flipped_here {
            (self.halt)(super::halt_reason_from_submit(&err));
        }
        Err(CaptureError::Submit(err))
    }
}
}