1use std::{
36 fmt::Debug,
37 sync::{
38 Arc,
39 atomic::{AtomicBool, AtomicU64, Ordering},
40 },
41};
42
43use nautilus_core::UnixNanos;
44
45use crate::{
46 capture::{encoder::EncodeError, registry::EncoderRegistry},
47 entry::Topic,
48 headers::Headers,
49 writer::{EntryDraft, EventStoreWriter, HaltCallback, HaltReason, SubmitError},
50};
51
/// Errors returned by [`BusCaptureAdapter::capture`] and
/// [`BusCaptureAdapter::capture_any`].
#[derive(Debug, thiserror::Error)]
pub enum CaptureError {
    /// The encoder registry returned an error while encoding the message.
    #[error("encode failure: {0}")]
    Encode(#[from] EncodeError),
    /// The writer rejected the submitted entry draft.
    #[error("writer submit failed: {0}")]
    Submit(#[from] SubmitError),
    /// The adapter's halted flag was already set when capture was attempted.
    #[error("capture adapter halted")]
    Halted,
}
76
/// Encodes messages through an [`EncoderRegistry`] and submits the resulting
/// entry drafts to an [`EventStoreWriter`].
///
/// After the first failed submit the adapter sets its halted flag, invokes the
/// halt callback once, and rejects later captures with
/// [`CaptureError::Halted`].
pub struct BusCaptureAdapter {
    /// Writer that receives every entry draft built by the adapter.
    writer: Arc<EventStoreWriter>,
    /// Encoders used to turn a message into a payload type and encoded payload.
    registry: Arc<EncoderRegistry>,
    /// Callback invoked by the first failed submit.
    halt: HaltCallback,
    /// Set on the first failed submit; checked at the start of every capture.
    halted: AtomicBool,
    /// Optional shared counter incremented after each successful submit.
    submit_counter: Option<Arc<AtomicU64>>,
}
90
91impl Debug for BusCaptureAdapter {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct(stringify!(BusCaptureAdapter))
94 .field("registered_encoders", &self.registry.len())
95 .field("halted", &self.halted.load(Ordering::Acquire))
96 .finish_non_exhaustive()
97 }
98}
99
100impl BusCaptureAdapter {
101 #[must_use]
108 pub fn new(
109 writer: Arc<EventStoreWriter>,
110 registry: Arc<EncoderRegistry>,
111 halt: HaltCallback,
112 ) -> Self {
113 Self {
114 writer,
115 registry,
116 halt,
117 halted: AtomicBool::new(false),
118 submit_counter: None,
119 }
120 }
121
122 #[must_use]
124 pub fn with_submit_counter(mut self, submit_counter: Arc<AtomicU64>) -> Self {
125 self.submit_counter = Some(submit_counter);
126 self
127 }
128
129 #[must_use]
131 pub fn is_halted(&self) -> bool {
132 self.halted.load(Ordering::Acquire)
133 }
134
135 #[must_use]
137 pub fn registry(&self) -> &EncoderRegistry {
138 &self.registry
139 }
140
141 #[must_use]
143 pub fn high_watermark(&self) -> u64 {
144 self.writer.high_watermark()
145 }
146
147 pub fn capture<T: 'static>(
169 &self,
170 topic: Topic,
171 message: &T,
172 headers: Headers,
173 ts_init: UnixNanos,
174 ) -> Result<bool, CaptureError> {
175 self.capture_any(topic, message as &dyn std::any::Any, headers, ts_init)
176 }
177
178 pub fn capture_any(
189 &self,
190 topic: Topic,
191 message: &dyn std::any::Any,
192 headers: Headers,
193 ts_init: UnixNanos,
194 ) -> Result<bool, CaptureError> {
195 if self.halted.load(Ordering::Acquire) {
196 return Err(CaptureError::Halted);
197 }
198
199 let Some((payload_type, encoded)) = self.registry.encode_any(message)? else {
200 return Ok(false);
201 };
202
203 let draft = EntryDraft {
204 headers,
205 topic,
206 payload_type,
207 payload: encoded.payload,
208 ts_init,
209 index_keys: encoded.index_keys,
210 };
211
212 match self.writer.submit(draft) {
213 Ok(()) => {
214 if let Some(submit_counter) = self.submit_counter.as_ref() {
215 submit_counter.fetch_add(1, Ordering::AcqRel);
216 }
217 Ok(true)
218 }
219 Err(e) => {
220 self.fail_stop(&e);
221 Err(CaptureError::Submit(e))
222 }
223 }
224 }
225
226 fn fail_stop(&self, err: &SubmitError) {
227 if self
228 .halted
229 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
230 .is_ok()
231 {
232 (self.halt)(halt_reason_from_submit(err));
233 }
234 }
235}
236
237fn halt_reason_from_submit(err: &SubmitError) -> HaltReason {
245 match err {
246 SubmitError::HaltSignaled {
247 stalled_for,
248 threshold,
249 } => HaltReason::BackpressureStall {
250 stalled_for: *stalled_for,
251 threshold: *threshold,
252 },
253 SubmitError::Closed => HaltReason::BackendError("event store writer closed".to_string()),
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use std::{
260 sync::{
261 Arc, Mutex,
262 atomic::{AtomicU64, Ordering},
263 },
264 time::Duration,
265 };
266
267 use bytes::Bytes;
268 use indexmap::IndexMap;
269 use nautilus_core::{UnixNanos, time::get_atomic_clock_static};
270 use rstest::{fixture, rstest};
271 use ustr::Ustr;
272
273 use super::*;
274 use crate::{
275 backend::{AppendEntry, EventStore, IndexKey, IndexKind, MemoryBackend, ScanDirection},
276 capture::encoder::EncodedPayload,
277 entry::EventStoreEntry,
278 error::EventStoreError,
279 manifest::{RegisteredComponents, RunManifest, RunStatus},
280 writer::WriterConfig,
281 };
282
/// Test message with an encoder registered in `stub_registry`; it is encoded
/// as the bytes of its client order id and indexed by that id.
#[derive(Debug)]
struct StubCommand {
    client_order_id: String,
}

/// Test message with an encoder registered in `stub_registry`; it is indexed
/// by both its client order id and its venue order id.
#[derive(Debug)]
struct StubEvent {
    client_order_id: String,
    venue_order_id: String,
}

/// Test message for which `stub_registry` registers no encoder.
#[derive(Debug)]
struct UnknownMessage;

/// Test message whose encoder in `stub_registry` always returns
/// `EncodeError::Serialize`.
#[derive(Debug)]
struct FailingMessage;
299
300 fn manifest(run_id: &str) -> RunManifest {
301 RunManifest {
302 run_id: run_id.to_string(),
303 parent_run_id: None,
304 instance_id: "trader-001".to_string(),
305 binary_hash: "deadbeef".to_string(),
306 schema_version: 1,
307 crate_versions: "feedface".to_string(),
308 feature_flags: Vec::new(),
309 adapter_versions: IndexMap::new(),
310 config_hash: "cafebabe".to_string(),
311 registered_components: RegisteredComponents::default(),
312 seed: None,
313 start_ts_init: UnixNanos::from(0),
314 end_ts_init: None,
315 high_watermark: 0,
316 status: RunStatus::Running,
317 }
318 }
319
320 fn stub_registry() -> Arc<EncoderRegistry> {
321 let mut registry = EncoderRegistry::new();
322 registry.register::<StubCommand, _>(Ustr::from("StubCommand"), |c| {
323 Ok(EncodedPayload::new(
324 Bytes::copy_from_slice(c.client_order_id.as_bytes()),
325 vec![IndexKey::new(
326 IndexKind::ClientOrderId,
327 c.client_order_id.clone(),
328 )],
329 ))
330 });
331 registry.register::<StubEvent, _>(Ustr::from("StubEvent"), |e| {
332 Ok(EncodedPayload::new(
333 Bytes::copy_from_slice(e.client_order_id.as_bytes()),
334 vec![
335 IndexKey::new(IndexKind::ClientOrderId, e.client_order_id.clone()),
336 IndexKey::new(IndexKind::VenueOrderId, e.venue_order_id.clone()),
337 ],
338 ))
339 });
340 registry.register::<FailingMessage, _>(Ustr::from("FailingMessage"), |_| {
341 Err(EncodeError::Serialize(
342 "encoder rejected message".to_string(),
343 ))
344 });
345 Arc::new(registry)
346 }
347
348 #[fixture]
349 fn captured_halt() -> (HaltCallback, Arc<Mutex<Vec<HaltReason>>>) {
350 let captured: Arc<Mutex<Vec<HaltReason>>> = Arc::new(Mutex::new(Vec::new()));
351 let captured_for_cb = Arc::clone(&captured);
352 let halt: HaltCallback = Arc::new(move |reason| {
353 captured_for_cb
354 .lock()
355 .expect("captured halt poisoned")
356 .push(reason);
357 });
358 (halt, captured)
359 }
360
361 fn writer_with_open_run(
362 run_id: &str,
363 halt: HaltCallback,
364 ) -> (Arc<EventStoreWriter>, Arc<Mutex<MemoryBackend>>) {
365 let backend_arc: Arc<Mutex<MemoryBackend>> = Arc::new(Mutex::new(MemoryBackend::new()));
366 backend_arc
367 .lock()
368 .expect("inner")
369 .open_run(manifest(run_id))
370 .expect("open run");
371
372 let wrapper = SharedMemory(Arc::clone(&backend_arc));
373 let writer = EventStoreWriter::spawn(
374 Box::new(wrapper),
375 get_atomic_clock_static(),
376 halt,
377 WriterConfig::default(),
378 )
379 .expect("spawn");
380 (Arc::new(writer), backend_arc)
381 }
382
/// `EventStore` wrapper around a mutex-guarded `MemoryBackend`, letting a test
/// keep its own handle to the backend that is handed to the writer.
#[derive(Debug)]
struct SharedMemory(Arc<Mutex<MemoryBackend>>);
387
388 impl EventStore for SharedMemory {
389 fn open_run(&mut self, _: RunManifest) -> Result<(), EventStoreError> {
390 unreachable!("test wrapper does not forward open_run")
391 }
392
393 fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
394 self.0.lock().expect("shared").append_batch(entries)
395 }
396
397 fn scan_range(
398 &self,
399 from: u64,
400 to: u64,
401 direction: ScanDirection,
402 ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
403 self.0
404 .lock()
405 .expect("shared")
406 .scan_range(from, to, direction)
407 }
408
409 fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
410 self.0.lock().expect("shared").scan_seq(seq)
411 }
412
413 fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
414 self.0.lock().expect("shared").lookup(kind, key)
415 }
416
417 fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
418 self.0.lock().expect("shared").iter_index_keys(kind)
419 }
420
421 fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
422 self.0.lock().expect("shared").seal(status)
423 }
424
425 fn manifest(&self) -> Result<RunManifest, EventStoreError> {
426 self.0.lock().expect("shared").manifest()
427 }
428
429 fn high_watermark(&self) -> Result<u64, EventStoreError> {
430 self.0.lock().expect("shared").high_watermark()
431 }
432 }
433
434 fn drain(writer: &Arc<EventStoreWriter>, target_hwm: u64) {
435 let mut waited = Duration::ZERO;
436 let deadline = Duration::from_secs(2);
437 while writer.high_watermark() < target_hwm && waited < deadline {
438 std::thread::sleep(Duration::from_millis(5));
439 waited += Duration::from_millis(5);
440 }
441 assert!(
442 writer.high_watermark() >= target_hwm,
443 "writer high_watermark {} did not reach {target_hwm} within {:?}",
444 writer.high_watermark(),
445 deadline,
446 );
447 }
448
449 #[rstest]
450 fn capture_records_registered_command_and_returns_true(
451 captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
452 ) {
453 let (halt, captured) = captured_halt;
454 let (writer, backend) = writer_with_open_run("run-cmd", Arc::clone(&halt));
455 let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
456
457 let cmd = StubCommand {
458 client_order_id: "O-1".to_string(),
459 };
460 let captured_flag = adapter
461 .capture::<StubCommand>(
462 Topic::from("exec.command.SubmitOrder"),
463 &cmd,
464 Headers::empty(),
465 UnixNanos::from(100),
466 )
467 .expect("capture");
468
469 assert!(captured_flag);
470 drain(&writer, 1);
471
472 let backend = backend.lock().expect("backend");
473 let entry = backend.scan_seq(1).expect("scan").expect("present");
474 assert_eq!(entry.payload_type.as_str(), "StubCommand");
475 assert_eq!(entry.topic.as_ref(), "exec.command.SubmitOrder");
476 assert_eq!(entry.payload.as_ref(), b"O-1");
477
478 let seq = backend
479 .lookup(IndexKind::ClientOrderId, "O-1")
480 .expect("lookup")
481 .expect("indexed");
482 assert_eq!(seq, 1);
483
484 assert!(captured.lock().expect("captured").is_empty());
485 assert!(!adapter.is_halted());
486 }
487
488 #[rstest]
489 fn capture_returns_false_for_unknown_type(
490 captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
491 ) {
492 let (halt, _captured) = captured_halt;
493 let (writer, _backend) = writer_with_open_run("run-unknown", Arc::clone(&halt));
494 let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
495
496 let captured_flag = adapter
497 .capture::<UnknownMessage>(
498 Topic::from("data.market.unknown"),
499 &UnknownMessage,
500 Headers::empty(),
501 UnixNanos::from(50),
502 )
503 .expect("capture");
504
505 assert!(!captured_flag);
506 assert_eq!(writer.high_watermark(), 0);
507 assert!(!adapter.is_halted());
508 }
509
510 #[rstest]
511 fn submit_counter_increments_on_each_captured_entry(
512 captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
513 ) {
514 let (halt, _captured) = captured_halt;
515 let (writer, _backend) = writer_with_open_run("run-submit-counter", Arc::clone(&halt));
516 let submit_counter = Arc::new(AtomicU64::new(1));
517 let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt)
518 .with_submit_counter(Arc::clone(&submit_counter));
519
520 adapter
521 .capture::<StubCommand>(
522 Topic::from("exec.command.SubmitOrder"),
523 &StubCommand {
524 client_order_id: "O-counter-1".to_string(),
525 },
526 Headers::empty(),
527 UnixNanos::from(100),
528 )
529 .expect("first capture");
530 adapter
531 .capture::<UnknownMessage>(
532 Topic::from("data.market.unknown"),
533 &UnknownMessage,
534 Headers::empty(),
535 UnixNanos::from(101),
536 )
537 .expect("unknown type");
538 adapter
539 .capture::<StubEvent>(
540 Topic::from("exec.event.OrderFilled"),
541 &StubEvent {
542 client_order_id: "O-counter-1".to_string(),
543 venue_order_id: "V-counter-1".to_string(),
544 },
545 Headers::empty(),
546 UnixNanos::from(102),
547 )
548 .expect("second capture");
549
550 assert_eq!(submit_counter.load(Ordering::Acquire), 3);
551 }
552
553 #[rstest]
554 fn capture_records_event_indices_atomically(
555 captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
556 ) {
557 let (halt, _captured) = captured_halt;
558 let (writer, backend) = writer_with_open_run("run-event", Arc::clone(&halt));
559 let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
560
561 let event = StubEvent {
562 client_order_id: "O-2".to_string(),
563 venue_order_id: "V-9".to_string(),
564 };
565 adapter
566 .capture::<StubEvent>(
567 Topic::from("exec.event.OrderFilled"),
568 &event,
569 Headers::empty(),
570 UnixNanos::from(200),
571 )
572 .expect("capture");
573 drain(&writer, 1);
574
575 let backend = backend.lock().expect("backend");
576 let by_client = backend
577 .lookup(IndexKind::ClientOrderId, "O-2")
578 .expect("lookup")
579 .expect("indexed");
580 let by_venue = backend
581 .lookup(IndexKind::VenueOrderId, "V-9")
582 .expect("lookup")
583 .expect("indexed");
584 assert_eq!(by_client, 1);
585 assert_eq!(by_venue, 1);
586 }
587
588 #[rstest]
589 fn capture_propagates_encoder_error_without_halting(
590 captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
591 ) {
592 let (halt, captured) = captured_halt;
596 let (writer, backend) = writer_with_open_run("run-encode-err", Arc::clone(&halt));
597 let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
598
599 let err = adapter
600 .capture::<FailingMessage>(
601 Topic::from("exec.command.Failing"),
602 &FailingMessage,
603 Headers::empty(),
604 UnixNanos::from(500),
605 )
606 .expect_err("encoder must reject");
607
608 match err {
609 CaptureError::Encode(EncodeError::Serialize(msg)) => {
610 assert!(msg.contains("rejected"), "msg was: {msg}");
611 }
612 other => panic!("expected Encode(Serialize), was {other:?}"),
613 }
614 assert!(
615 !adapter.is_halted(),
616 "encoder failure must not fail-stop the adapter",
617 );
618 assert!(captured.lock().expect("captured").is_empty());
619
620 adapter
622 .capture::<StubCommand>(
623 Topic::from("exec.command.SubmitOrder"),
624 &StubCommand {
625 client_order_id: "O-after-encode-err".to_string(),
626 },
627 Headers::empty(),
628 UnixNanos::from(501),
629 )
630 .expect("capture after encoder error");
631 drain(&writer, 1);
632 let backend = backend.lock().expect("backend");
633 assert_eq!(backend.high_watermark().expect("hwm"), 1);
634 }
635
636 #[rstest]
637 #[case::backpressure(
638 SubmitError::HaltSignaled {
639 stalled_for: Duration::from_millis(750),
640 threshold: Duration::from_millis(250),
641 },
642 HaltReason::BackpressureStall {
643 stalled_for: Duration::from_millis(750),
644 threshold: Duration::from_millis(250),
645 },
646 )]
647 #[case::closed(
648 SubmitError::Closed,
649 HaltReason::BackendError("event store writer closed".to_string()),
650 )]
651 fn halt_reason_from_submit_preserves_failure_context(
652 #[case] err: SubmitError,
653 #[case] expected: HaltReason,
654 ) {
655 let actual = halt_reason_from_submit(&err);
656
657 match (actual, expected) {
658 (
659 HaltReason::BackpressureStall {
660 stalled_for: a_s,
661 threshold: a_t,
662 },
663 HaltReason::BackpressureStall {
664 stalled_for: e_s,
665 threshold: e_t,
666 },
667 ) => {
668 assert_eq!(a_s, e_s);
669 assert_eq!(a_t, e_t);
670 }
671 (HaltReason::BackendError(a), HaltReason::BackendError(e)) => {
672 assert_eq!(a, e);
673 }
674 (actual, expected) => {
675 panic!("variant mismatch: actual={actual:?} expected={expected:?}")
676 }
677 }
678 }
679
680 #[rstest]
681 fn submit_failure_halts_adapter_and_fires_callback_once(
682 captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
683 ) {
684 let (halt, captured) = captured_halt;
688 let (writer, _backend) = writer_with_open_run("run-halt", Arc::clone(&halt));
689
690 let writer_clone = Arc::clone(&writer);
692 let adapter = BusCaptureAdapter::new(writer_clone, stub_registry(), halt);
695
696 drop(writer);
701
702 let halt_for_stub: HaltCallback = adapter_halt_for(&captured);
705 let stub_adapter = StubFailAdapter::new(halt_for_stub);
706
707 let err = stub_adapter
708 .capture::<StubCommand>(
709 Topic::from("exec.command.SubmitOrder"),
710 &StubCommand {
711 client_order_id: "O-fail".to_string(),
712 },
713 Headers::empty(),
714 UnixNanos::from(1),
715 )
716 .expect_err("first submit fails");
717 assert!(matches!(err, CaptureError::Submit(SubmitError::Closed)));
718 assert!(stub_adapter.is_halted());
719 assert_eq!(captured.lock().expect("captured").len(), 1);
720
721 let err2 = stub_adapter
722 .capture::<StubCommand>(
723 Topic::from("exec.command.SubmitOrder"),
724 &StubCommand {
725 client_order_id: "O-fail-2".to_string(),
726 },
727 Headers::empty(),
728 UnixNanos::from(2),
729 )
730 .expect_err("second submit short-circuits");
731 assert!(matches!(err2, CaptureError::Halted));
732 assert_eq!(
733 captured.lock().expect("captured").len(),
734 1,
735 "halt callback must not refire after the first failure",
736 );
737
738 drop(adapter);
740 }
741
742 fn adapter_halt_for(captured: &Arc<Mutex<Vec<HaltReason>>>) -> HaltCallback {
743 let captured_for_cb = Arc::clone(captured);
744 Arc::new(move |reason| {
745 captured_for_cb
746 .lock()
747 .expect("captured halt poisoned")
748 .push(reason);
749 })
750 }
751
/// Test double with no writer: its `capture` treats every encoded message as
/// a submit that failed with `SubmitError::Closed`, then applies the same
/// halt-flag and callback steps as `BusCaptureAdapter`.
struct StubFailAdapter {
    /// Encoders from `stub_registry`.
    registry: Arc<EncoderRegistry>,
    /// Callback invoked by the first simulated submit failure.
    halt: HaltCallback,
    /// Set by the first simulated submit failure.
    halted: AtomicBool,
}
760
761 impl StubFailAdapter {
762 fn new(halt: HaltCallback) -> Self {
763 Self {
764 registry: stub_registry(),
765 halt,
766 halted: AtomicBool::new(false),
767 }
768 }
769
770 fn is_halted(&self) -> bool {
771 self.halted.load(Ordering::Acquire)
772 }
773
774 fn capture<T: 'static>(
775 &self,
776 _topic: Topic,
777 message: &T,
778 _headers: Headers,
779 _ts_init: UnixNanos,
780 ) -> Result<bool, CaptureError> {
781 if self.halted.load(Ordering::Acquire) {
782 return Err(CaptureError::Halted);
783 }
784 let Some((_pt, _encoded)) = self.registry.encode(message)? else {
785 return Ok(false);
786 };
787 let err = SubmitError::Closed;
788
789 if self
790 .halted
791 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
792 .is_ok()
793 {
794 (self.halt)(super::halt_reason_from_submit(&err));
795 }
796 Err(CaptureError::Submit(err))
797 }
798 }
799}