Skip to main content

nautilus_event_store/capture/
adapter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The bus capture adapter.
17//!
18//! [`BusCaptureAdapter`] is the seam between the message bus and the
19//! [`EventStoreWriter`]. The kernel calls [`BusCaptureAdapter::capture`] inside its bus
20//! dispatch wrappers, immediately before the message reaches downstream handlers, so
21//! every captured entry is durably submitted *before* a subscriber observes it. The
22//! adapter consults the [`EncoderRegistry`] allow-list to decide whether to capture, and
23//! converts the typed message into an [`EntryDraft`] for the writer.
24//!
25//! No-drop contract: any [`SubmitError`] from the writer fires the adapter's halt
26//! callback exactly once (so kernel fail-stop runs even when the writer's own halt path
27//! has not, such as when a caller closes the writer externally) and surfaces as
28//! [`CaptureError::Submit`]. Subsequent capture calls short-circuit with
29//! [`CaptureError::Halted`] without re-entering the writer.
30//!
31//! Under `cfg(madsim)` the writer's `submit` is a synchronous in-thread commit, so the
32//! adapter exposes the same surface and no thread-scheduling differences leak into
33//! tests.
34
35use std::{
36    fmt::Debug,
37    sync::{
38        Arc,
39        atomic::{AtomicBool, AtomicU64, Ordering},
40    },
41};
42
43use nautilus_core::UnixNanos;
44
45use crate::{
46    capture::{encoder::EncodeError, registry::EncoderRegistry},
47    entry::Topic,
48    headers::Headers,
49    writer::{EntryDraft, EventStoreWriter, HaltCallback, HaltReason, SubmitError},
50};
51
52/// Errors returned by [`BusCaptureAdapter::capture`].
53///
54/// Each variant maps to a SPEC-named failure mode at the dispatch boundary; the kernel's
55/// fail-stop callback is the system response to [`CaptureError::Submit`] and
56/// [`CaptureError::Halted`].
57#[derive(Debug, thiserror::Error)]
58pub enum CaptureError {
59    /// The encoder rejected the message.
60    #[error("encode failure: {0}")]
61    Encode(#[from] EncodeError),
62    /// The writer rejected the submit.
63    ///
64    /// The adapter halt callback has fired before this error returns, so the kernel
65    /// fail-stop path is already in motion when the caller observes it.
66    #[error("writer submit failed: {0}")]
67    Submit(#[from] SubmitError),
68    /// A prior capture observed a writer failure and the adapter has fail-stopped.
69    ///
70    /// The halt callback fired on the original failure; subsequent captures short-circuit
71    /// without re-entering the writer to keep the no-drop contract intact (a stuck or
72    /// closed writer must not silently swallow captures).
73    #[error("capture adapter halted")]
74    Halted,
75}
76
77/// Captures bus traffic and forwards encoded entries to the [`EventStoreWriter`].
78///
79/// One adapter instance owns one writer; the kernel constructs the adapter after spawning
80/// the writer, then registers it with the bus dispatch wrappers. The adapter is `Send +
81/// Sync` so it can be shared between bus subscribers, but in practice the message bus is
82/// single-threaded and the adapter lives on the engine thread.
83pub struct BusCaptureAdapter {
84    writer: Arc<EventStoreWriter>,
85    registry: Arc<EncoderRegistry>,
86    halt: HaltCallback,
87    halted: AtomicBool,
88    submit_counter: Option<Arc<AtomicU64>>,
89}
90
91impl Debug for BusCaptureAdapter {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct(stringify!(BusCaptureAdapter))
94            .field("registered_encoders", &self.registry.len())
95            .field("halted", &self.halted.load(Ordering::Acquire))
96            .finish_non_exhaustive()
97    }
98}
99
100impl BusCaptureAdapter {
101    /// Constructs a new adapter over `writer`, `registry`, and `halt`.
102    ///
103    /// `halt` is the adapter-level fail-stop callback. The writer carries its own halt
104    /// callback for backend and backpressure failures; the adapter callback fires on any
105    /// submit error so [`SubmitError::Closed`] (which can originate outside the writer's
106    /// own halt path, e.g. an external close) still reaches the kernel.
107    #[must_use]
108    pub fn new(
109        writer: Arc<EventStoreWriter>,
110        registry: Arc<EncoderRegistry>,
111        halt: HaltCallback,
112    ) -> Self {
113        Self {
114            writer,
115            registry,
116            halt,
117            halted: AtomicBool::new(false),
118            submit_counter: None,
119        }
120    }
121
122    /// Shares an entry-submit counter with the data-marker capture path.
123    #[must_use]
124    pub fn with_submit_counter(mut self, submit_counter: Arc<AtomicU64>) -> Self {
125        self.submit_counter = Some(submit_counter);
126        self
127    }
128
129    /// Returns whether the adapter has fail-stopped.
130    #[must_use]
131    pub fn is_halted(&self) -> bool {
132        self.halted.load(Ordering::Acquire)
133    }
134
135    /// Returns the encoder allow-list this adapter consults.
136    #[must_use]
137    pub fn registry(&self) -> &EncoderRegistry {
138        &self.registry
139    }
140
141    /// Returns the wrapped writer's current durable high-watermark.
142    #[must_use]
143    pub fn high_watermark(&self) -> u64 {
144        self.writer.high_watermark()
145    }
146
147    /// Captures a state-affecting bus message.
148    ///
149    /// Looks up the encoder for `T`, builds an [`EntryDraft`], and forwards it to the
150    /// writer. Returns `Ok(false)` when the type has no registered encoder so the adapter
151    /// can be wired into bus dispatch paths that carry a mix of state-affecting and
152    /// non-state-affecting messages without surfacing per-message errors.
153    ///
154    /// `topic` is the bus topic the message was dispatched on, `headers` are the
155    /// dispatch-time correlation headers (defaulting to [`Headers::empty`] until header
156    /// propagation lands across all message types), and `ts_init` is the domain
157    /// timestamp from `AtomicTime` (typically the message's own `ts_init` field).
158    ///
159    /// # Errors
160    ///
161    /// Returns:
162    ///
163    /// - [`CaptureError::Halted`] when a prior capture already observed a writer failure
164    ///   and the adapter has fail-stopped.
165    /// - [`CaptureError::Encode`] when the registered encoder rejects the message.
166    /// - [`CaptureError::Submit`] when the writer rejects the submit; the adapter halt
167    ///   callback fires before this error returns.
168    pub fn capture<T: 'static>(
169        &self,
170        topic: Topic,
171        message: &T,
172        headers: Headers,
173        ts_init: UnixNanos,
174    ) -> Result<bool, CaptureError> {
175        self.capture_any(topic, message as &dyn std::any::Any, headers, ts_init)
176    }
177
178    /// Type-erased counterpart to [`Self::capture`].
179    ///
180    /// Bus dispatch hands messages to the tap as `&dyn Any` because the static type is
181    /// not in scope at the registration site. This method dispatches on the concrete
182    /// type behind the trait object and follows the same fail-stop semantics as
183    /// [`Self::capture`].
184    ///
185    /// # Errors
186    ///
187    /// See [`Self::capture`].
188    pub fn capture_any(
189        &self,
190        topic: Topic,
191        message: &dyn std::any::Any,
192        headers: Headers,
193        ts_init: UnixNanos,
194    ) -> Result<bool, CaptureError> {
195        if self.halted.load(Ordering::Acquire) {
196            return Err(CaptureError::Halted);
197        }
198
199        let Some((payload_type, encoded)) = self.registry.encode_any(message)? else {
200            return Ok(false);
201        };
202
203        let draft = EntryDraft {
204            headers,
205            topic,
206            payload_type,
207            payload: encoded.payload,
208            ts_init,
209            index_keys: encoded.index_keys,
210        };
211
212        match self.writer.submit(draft) {
213            Ok(()) => {
214                if let Some(submit_counter) = self.submit_counter.as_ref() {
215                    submit_counter.fetch_add(1, Ordering::AcqRel);
216                }
217                Ok(true)
218            }
219            Err(e) => {
220                self.fail_stop(&e);
221                Err(CaptureError::Submit(e))
222            }
223        }
224    }
225
226    fn fail_stop(&self, err: &SubmitError) {
227        if self
228            .halted
229            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
230            .is_ok()
231        {
232            (self.halt)(halt_reason_from_submit(err));
233        }
234    }
235}
236
237/// Maps a [`SubmitError`] onto the [`HaltReason`] the adapter signals to its kernel.
238///
239/// [`SubmitError::HaltSignaled`] preserves the writer-side stall measurement so the
240/// kernel sees the same backpressure context the writer's own halt callback would carry.
241/// [`SubmitError::Closed`] surfaces as a backend error since the writer is no longer
242/// accepting work and the cause is opaque to the adapter (could be external close,
243/// crashed writer thread, or a terminal disk error already reported separately).
244fn halt_reason_from_submit(err: &SubmitError) -> HaltReason {
245    match err {
246        SubmitError::HaltSignaled {
247            stalled_for,
248            threshold,
249        } => HaltReason::BackpressureStall {
250            stalled_for: *stalled_for,
251            threshold: *threshold,
252        },
253        SubmitError::Closed => HaltReason::BackendError("event store writer closed".to_string()),
254    }
255}
256
257#[cfg(test)]
258mod tests {
259    use std::{
260        sync::{
261            Arc, Mutex,
262            atomic::{AtomicU64, Ordering},
263        },
264        time::Duration,
265    };
266
267    use bytes::Bytes;
268    use indexmap::IndexMap;
269    use nautilus_core::{UnixNanos, time::get_atomic_clock_static};
270    use rstest::{fixture, rstest};
271    use ustr::Ustr;
272
273    use super::*;
274    use crate::{
275        backend::{AppendEntry, EventStore, IndexKey, IndexKind, MemoryBackend, ScanDirection},
276        capture::encoder::EncodedPayload,
277        entry::EventStoreEntry,
278        error::EventStoreError,
279        manifest::{RegisteredComponents, RunManifest, RunStatus},
280        writer::WriterConfig,
281    };
282
283    #[derive(Debug)]
284    struct StubCommand {
285        client_order_id: String,
286    }
287
288    #[derive(Debug)]
289    struct StubEvent {
290        client_order_id: String,
291        venue_order_id: String,
292    }
293
294    #[derive(Debug)]
295    struct UnknownMessage;
296
297    #[derive(Debug)]
298    struct FailingMessage;
299
300    fn manifest(run_id: &str) -> RunManifest {
301        RunManifest {
302            run_id: run_id.to_string(),
303            parent_run_id: None,
304            instance_id: "trader-001".to_string(),
305            binary_hash: "deadbeef".to_string(),
306            schema_version: 1,
307            crate_versions: "feedface".to_string(),
308            feature_flags: Vec::new(),
309            adapter_versions: IndexMap::new(),
310            config_hash: "cafebabe".to_string(),
311            registered_components: RegisteredComponents::default(),
312            seed: None,
313            start_ts_init: UnixNanos::from(0),
314            end_ts_init: None,
315            high_watermark: 0,
316            status: RunStatus::Running,
317        }
318    }
319
320    fn stub_registry() -> Arc<EncoderRegistry> {
321        let mut registry = EncoderRegistry::new();
322        registry.register::<StubCommand, _>(Ustr::from("StubCommand"), |c| {
323            Ok(EncodedPayload::new(
324                Bytes::copy_from_slice(c.client_order_id.as_bytes()),
325                vec![IndexKey::new(
326                    IndexKind::ClientOrderId,
327                    c.client_order_id.clone(),
328                )],
329            ))
330        });
331        registry.register::<StubEvent, _>(Ustr::from("StubEvent"), |e| {
332            Ok(EncodedPayload::new(
333                Bytes::copy_from_slice(e.client_order_id.as_bytes()),
334                vec![
335                    IndexKey::new(IndexKind::ClientOrderId, e.client_order_id.clone()),
336                    IndexKey::new(IndexKind::VenueOrderId, e.venue_order_id.clone()),
337                ],
338            ))
339        });
340        registry.register::<FailingMessage, _>(Ustr::from("FailingMessage"), |_| {
341            Err(EncodeError::Serialize(
342                "encoder rejected message".to_string(),
343            ))
344        });
345        Arc::new(registry)
346    }
347
348    #[fixture]
349    fn captured_halt() -> (HaltCallback, Arc<Mutex<Vec<HaltReason>>>) {
350        let captured: Arc<Mutex<Vec<HaltReason>>> = Arc::new(Mutex::new(Vec::new()));
351        let captured_for_cb = Arc::clone(&captured);
352        let halt: HaltCallback = Arc::new(move |reason| {
353            captured_for_cb
354                .lock()
355                .expect("captured halt poisoned")
356                .push(reason);
357        });
358        (halt, captured)
359    }
360
361    fn writer_with_open_run(
362        run_id: &str,
363        halt: HaltCallback,
364    ) -> (Arc<EventStoreWriter>, Arc<Mutex<MemoryBackend>>) {
365        let backend_arc: Arc<Mutex<MemoryBackend>> = Arc::new(Mutex::new(MemoryBackend::new()));
366        backend_arc
367            .lock()
368            .expect("inner")
369            .open_run(manifest(run_id))
370            .expect("open run");
371
372        let wrapper = SharedMemory(Arc::clone(&backend_arc));
373        let writer = EventStoreWriter::spawn(
374            Box::new(wrapper),
375            get_atomic_clock_static(),
376            halt,
377            WriterConfig::default(),
378        )
379        .expect("spawn");
380        (Arc::new(writer), backend_arc)
381    }
382
383    /// Wraps a shared `MemoryBackend` so the writer thread can append while the test
384    /// reads the same instance from the engine thread.
385    #[derive(Debug)]
386    struct SharedMemory(Arc<Mutex<MemoryBackend>>);
387
388    impl EventStore for SharedMemory {
389        fn open_run(&mut self, _: RunManifest) -> Result<(), EventStoreError> {
390            unreachable!("test wrapper does not forward open_run")
391        }
392
393        fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
394            self.0.lock().expect("shared").append_batch(entries)
395        }
396
397        fn scan_range(
398            &self,
399            from: u64,
400            to: u64,
401            direction: ScanDirection,
402        ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
403            self.0
404                .lock()
405                .expect("shared")
406                .scan_range(from, to, direction)
407        }
408
409        fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
410            self.0.lock().expect("shared").scan_seq(seq)
411        }
412
413        fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
414            self.0.lock().expect("shared").lookup(kind, key)
415        }
416
417        fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
418            self.0.lock().expect("shared").iter_index_keys(kind)
419        }
420
421        fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
422            self.0.lock().expect("shared").seal(status)
423        }
424
425        fn manifest(&self) -> Result<RunManifest, EventStoreError> {
426            self.0.lock().expect("shared").manifest()
427        }
428
429        fn high_watermark(&self) -> Result<u64, EventStoreError> {
430            self.0.lock().expect("shared").high_watermark()
431        }
432    }
433
434    fn drain(writer: &Arc<EventStoreWriter>, target_hwm: u64) {
435        let mut waited = Duration::ZERO;
436        let deadline = Duration::from_secs(2);
437        while writer.high_watermark() < target_hwm && waited < deadline {
438            std::thread::sleep(Duration::from_millis(5));
439            waited += Duration::from_millis(5);
440        }
441        assert!(
442            writer.high_watermark() >= target_hwm,
443            "writer high_watermark {} did not reach {target_hwm} within {:?}",
444            writer.high_watermark(),
445            deadline,
446        );
447    }
448
449    #[rstest]
450    fn capture_records_registered_command_and_returns_true(
451        captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
452    ) {
453        let (halt, captured) = captured_halt;
454        let (writer, backend) = writer_with_open_run("run-cmd", Arc::clone(&halt));
455        let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
456
457        let cmd = StubCommand {
458            client_order_id: "O-1".to_string(),
459        };
460        let captured_flag = adapter
461            .capture::<StubCommand>(
462                Topic::from("exec.command.SubmitOrder"),
463                &cmd,
464                Headers::empty(),
465                UnixNanos::from(100),
466            )
467            .expect("capture");
468
469        assert!(captured_flag);
470        drain(&writer, 1);
471
472        let backend = backend.lock().expect("backend");
473        let entry = backend.scan_seq(1).expect("scan").expect("present");
474        assert_eq!(entry.payload_type.as_str(), "StubCommand");
475        assert_eq!(entry.topic.as_ref(), "exec.command.SubmitOrder");
476        assert_eq!(entry.payload.as_ref(), b"O-1");
477
478        let seq = backend
479            .lookup(IndexKind::ClientOrderId, "O-1")
480            .expect("lookup")
481            .expect("indexed");
482        assert_eq!(seq, 1);
483
484        assert!(captured.lock().expect("captured").is_empty());
485        assert!(!adapter.is_halted());
486    }
487
488    #[rstest]
489    fn capture_returns_false_for_unknown_type(
490        captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
491    ) {
492        let (halt, _captured) = captured_halt;
493        let (writer, _backend) = writer_with_open_run("run-unknown", Arc::clone(&halt));
494        let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
495
496        let captured_flag = adapter
497            .capture::<UnknownMessage>(
498                Topic::from("data.market.unknown"),
499                &UnknownMessage,
500                Headers::empty(),
501                UnixNanos::from(50),
502            )
503            .expect("capture");
504
505        assert!(!captured_flag);
506        assert_eq!(writer.high_watermark(), 0);
507        assert!(!adapter.is_halted());
508    }
509
510    #[rstest]
511    fn submit_counter_increments_on_each_captured_entry(
512        captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
513    ) {
514        let (halt, _captured) = captured_halt;
515        let (writer, _backend) = writer_with_open_run("run-submit-counter", Arc::clone(&halt));
516        let submit_counter = Arc::new(AtomicU64::new(1));
517        let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt)
518            .with_submit_counter(Arc::clone(&submit_counter));
519
520        adapter
521            .capture::<StubCommand>(
522                Topic::from("exec.command.SubmitOrder"),
523                &StubCommand {
524                    client_order_id: "O-counter-1".to_string(),
525                },
526                Headers::empty(),
527                UnixNanos::from(100),
528            )
529            .expect("first capture");
530        adapter
531            .capture::<UnknownMessage>(
532                Topic::from("data.market.unknown"),
533                &UnknownMessage,
534                Headers::empty(),
535                UnixNanos::from(101),
536            )
537            .expect("unknown type");
538        adapter
539            .capture::<StubEvent>(
540                Topic::from("exec.event.OrderFilled"),
541                &StubEvent {
542                    client_order_id: "O-counter-1".to_string(),
543                    venue_order_id: "V-counter-1".to_string(),
544                },
545                Headers::empty(),
546                UnixNanos::from(102),
547            )
548            .expect("second capture");
549
550        assert_eq!(submit_counter.load(Ordering::Acquire), 3);
551    }
552
553    #[rstest]
554    fn capture_records_event_indices_atomically(
555        captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
556    ) {
557        let (halt, _captured) = captured_halt;
558        let (writer, backend) = writer_with_open_run("run-event", Arc::clone(&halt));
559        let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
560
561        let event = StubEvent {
562            client_order_id: "O-2".to_string(),
563            venue_order_id: "V-9".to_string(),
564        };
565        adapter
566            .capture::<StubEvent>(
567                Topic::from("exec.event.OrderFilled"),
568                &event,
569                Headers::empty(),
570                UnixNanos::from(200),
571            )
572            .expect("capture");
573        drain(&writer, 1);
574
575        let backend = backend.lock().expect("backend");
576        let by_client = backend
577            .lookup(IndexKind::ClientOrderId, "O-2")
578            .expect("lookup")
579            .expect("indexed");
580        let by_venue = backend
581            .lookup(IndexKind::VenueOrderId, "V-9")
582            .expect("lookup")
583            .expect("indexed");
584        assert_eq!(by_client, 1);
585        assert_eq!(by_venue, 1);
586    }
587
588    #[rstest]
589    fn capture_propagates_encoder_error_without_halting(
590        captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
591    ) {
592        // An encoder failure is the encoder's contract violation, not a writer fail-stop:
593        // the caller should see CaptureError::Encode but the adapter must stay live so a
594        // subsequent capture for an allow-listed type still goes through.
595        let (halt, captured) = captured_halt;
596        let (writer, backend) = writer_with_open_run("run-encode-err", Arc::clone(&halt));
597        let adapter = BusCaptureAdapter::new(Arc::clone(&writer), stub_registry(), halt);
598
599        let err = adapter
600            .capture::<FailingMessage>(
601                Topic::from("exec.command.Failing"),
602                &FailingMessage,
603                Headers::empty(),
604                UnixNanos::from(500),
605            )
606            .expect_err("encoder must reject");
607
608        match err {
609            CaptureError::Encode(EncodeError::Serialize(msg)) => {
610                assert!(msg.contains("rejected"), "msg was: {msg}");
611            }
612            other => panic!("expected Encode(Serialize), was {other:?}"),
613        }
614        assert!(
615            !adapter.is_halted(),
616            "encoder failure must not fail-stop the adapter",
617        );
618        assert!(captured.lock().expect("captured").is_empty());
619
620        // Subsequent capture for a registered type still works.
621        adapter
622            .capture::<StubCommand>(
623                Topic::from("exec.command.SubmitOrder"),
624                &StubCommand {
625                    client_order_id: "O-after-encode-err".to_string(),
626                },
627                Headers::empty(),
628                UnixNanos::from(501),
629            )
630            .expect("capture after encoder error");
631        drain(&writer, 1);
632        let backend = backend.lock().expect("backend");
633        assert_eq!(backend.high_watermark().expect("hwm"), 1);
634    }
635
636    #[rstest]
637    #[case::backpressure(
638        SubmitError::HaltSignaled {
639            stalled_for: Duration::from_millis(750),
640            threshold: Duration::from_millis(250),
641        },
642        HaltReason::BackpressureStall {
643            stalled_for: Duration::from_millis(750),
644            threshold: Duration::from_millis(250),
645        },
646    )]
647    #[case::closed(
648        SubmitError::Closed,
649        HaltReason::BackendError("event store writer closed".to_string()),
650    )]
651    fn halt_reason_from_submit_preserves_failure_context(
652        #[case] err: SubmitError,
653        #[case] expected: HaltReason,
654    ) {
655        let actual = halt_reason_from_submit(&err);
656
657        match (actual, expected) {
658            (
659                HaltReason::BackpressureStall {
660                    stalled_for: a_s,
661                    threshold: a_t,
662                },
663                HaltReason::BackpressureStall {
664                    stalled_for: e_s,
665                    threshold: e_t,
666                },
667            ) => {
668                assert_eq!(a_s, e_s);
669                assert_eq!(a_t, e_t);
670            }
671            (HaltReason::BackendError(a), HaltReason::BackendError(e)) => {
672                assert_eq!(a, e);
673            }
674            (actual, expected) => {
675                panic!("variant mismatch: actual={actual:?} expected={expected:?}")
676            }
677        }
678    }
679
680    #[rstest]
681    fn submit_failure_halts_adapter_and_fires_callback_once(
682        captured_halt: (HaltCallback, Arc<Mutex<Vec<HaltReason>>>),
683    ) {
684        // A halted writer surfaces SubmitError::Closed; the adapter must mirror that
685        // into a single halt-callback firing and then short-circuit subsequent captures
686        // without forwarding further submits.
687        let (halt, captured) = captured_halt;
688        let (writer, _backend) = writer_with_open_run("run-halt", Arc::clone(&halt));
689
690        // Close the writer behind the adapter's back so the next submit returns Closed.
691        let writer_clone = Arc::clone(&writer);
692        // Build the adapter before the close so it owns a strong ref the close path
693        // doesn't see.
694        let adapter = BusCaptureAdapter::new(writer_clone, stub_registry(), halt);
695
696        // Drop one of the outer Arc clones, then force a graceful close on the writer
697        // by unwrapping. We can't unwrap because the adapter holds a clone, so emulate a
698        // closed writer with a separate test that simulates the failure path through
699        // a stub. We use a stub writer adapter instead to keep this test deterministic.
700        drop(writer);
701
702        // Build a fresh adapter wired to a stub that always returns SubmitError::Closed
703        // so we exercise the halt path without depending on writer-internal lifecycle.
704        let halt_for_stub: HaltCallback = adapter_halt_for(&captured);
705        let stub_adapter = StubFailAdapter::new(halt_for_stub);
706
707        let err = stub_adapter
708            .capture::<StubCommand>(
709                Topic::from("exec.command.SubmitOrder"),
710                &StubCommand {
711                    client_order_id: "O-fail".to_string(),
712                },
713                Headers::empty(),
714                UnixNanos::from(1),
715            )
716            .expect_err("first submit fails");
717        assert!(matches!(err, CaptureError::Submit(SubmitError::Closed)));
718        assert!(stub_adapter.is_halted());
719        assert_eq!(captured.lock().expect("captured").len(), 1);
720
721        let err2 = stub_adapter
722            .capture::<StubCommand>(
723                Topic::from("exec.command.SubmitOrder"),
724                &StubCommand {
725                    client_order_id: "O-fail-2".to_string(),
726                },
727                Headers::empty(),
728                UnixNanos::from(2),
729            )
730            .expect_err("second submit short-circuits");
731        assert!(matches!(err2, CaptureError::Halted));
732        assert_eq!(
733            captured.lock().expect("captured").len(),
734            1,
735            "halt callback must not refire after the first failure",
736        );
737
738        // Drop the adapter so its writer Arc is released.
739        drop(adapter);
740    }
741
742    fn adapter_halt_for(captured: &Arc<Mutex<Vec<HaltReason>>>) -> HaltCallback {
743        let captured_for_cb = Arc::clone(captured);
744        Arc::new(move |reason| {
745            captured_for_cb
746                .lock()
747                .expect("captured halt poisoned")
748                .push(reason);
749        })
750    }
751
752    /// Stand-in for [`BusCaptureAdapter`] that mirrors its halt-state machine but
753    /// always sees [`SubmitError::Closed`] from a synthetic writer. Lets the halt-path
754    /// test stay deterministic without racing against a real writer's shutdown sequence.
755    struct StubFailAdapter {
756        registry: Arc<EncoderRegistry>,
757        halt: HaltCallback,
758        halted: AtomicBool,
759    }
760
761    impl StubFailAdapter {
762        fn new(halt: HaltCallback) -> Self {
763            Self {
764                registry: stub_registry(),
765                halt,
766                halted: AtomicBool::new(false),
767            }
768        }
769
770        fn is_halted(&self) -> bool {
771            self.halted.load(Ordering::Acquire)
772        }
773
774        fn capture<T: 'static>(
775            &self,
776            _topic: Topic,
777            message: &T,
778            _headers: Headers,
779            _ts_init: UnixNanos,
780        ) -> Result<bool, CaptureError> {
781            if self.halted.load(Ordering::Acquire) {
782                return Err(CaptureError::Halted);
783            }
784            let Some((_pt, _encoded)) = self.registry.encode(message)? else {
785                return Ok(false);
786            };
787            let err = SubmitError::Closed;
788
789            if self
790                .halted
791                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
792                .is_ok()
793            {
794                (self.halt)(super::halt_reason_from_submit(&err));
795            }
796            Err(CaptureError::Submit(err))
797        }
798    }
799}