use super::error::EngineError;
use crate::{
book::{
BookEvent, BookEventEnvelope,
protocol::{
command::{Command, CommandKind},
reject::RejectReason,
response::Response,
},
},
config::{Config, ThreadPinning},
disruptor::{Envelope, ResponseCallback, RingSlot},
engine::{
journaler::JournalHandler,
root::EngineRoot,
state::{EngineState, Market, MarketConfig},
},
error::RuntimeError,
types::{MarketId, MarketKind},
};
use std::{
collections::VecDeque,
path::Path,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant},
};
use disrupt_rs::{
BusySpin, EventHandler, EventPoller, GatedSequence, MultiConsumerDependentsBarrier, Producer,
SingleProducerBarrier, build_single_producer,
builder::{MC, NC, SC, single::SPBuilder},
wait_strategies::WaitStrategy,
};
use tracing::{error, info, trace, warn};
pub type EngineEvent = RingSlot<BookEventEnvelope>;
type BuilderPollerResult<B, W> = (
EventPoller<EngineEvent, MultiConsumerDependentsBarrier, <W as WaitStrategy>::Notifier>,
W,
EngineBuilder<SPBuilder<B, EngineEvent, W, MultiConsumerDependentsBarrier>, W>,
);
pub struct EngineBuilder<B, W = BusySpin>
where
W: WaitStrategy,
{
config: Config,
root: EngineRoot,
recovered_events: Vec<(EngineEvent, bool)>, builder: B,
durability_gate: GatedSequence<W::Notifier>,
wal_poller: EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
journaler: JournalHandler,
shutdown: Arc<AtomicBool>,
metrics: crate::metrics::Metrics,
pinning: ThreadPinning,
wait_strategy: W,
}
impl EngineBuilder<SPBuilder<NC, EngineEvent, BusySpin, SingleProducerBarrier>> {
pub fn with_markets(
config: Config,
markets: impl IntoIterator<Item = MarketConfig>,
wal_path: impl AsRef<Path>,
) -> anyhow::Result<
EngineBuilder<SPBuilder<NC, EngineEvent, BusySpin, MultiConsumerDependentsBarrier>>,
> {
let mut state = EngineState::new(config.clone());
for market_config in markets {
let market_id = market_config.market_id();
state.markets.insert(
market_id,
Market::from_config_with_cfg(&config, market_config),
);
}
let root = EngineRoot::new_with(state, 0);
Self::build_inner(Some(root), config, wal_path)
}
pub fn new(
config: Config,
market_ids: impl IntoIterator<Item = MarketId>,
wal_path: impl AsRef<Path>,
) -> anyhow::Result<
EngineBuilder<SPBuilder<NC, EngineEvent, BusySpin, MultiConsumerDependentsBarrier>>,
> {
let markets = market_ids
.into_iter()
.map(|mid| MarketConfig::multi_runner_dynamic(mid, MarketKind::InPlayCapable));
Self::with_markets(config, markets, wal_path)
}
pub fn from_root(
initial_root: EngineRoot,
config: Config,
wal_path: impl AsRef<Path>,
) -> anyhow::Result<
EngineBuilder<SPBuilder<NC, EngineEvent, BusySpin, MultiConsumerDependentsBarrier>>,
> {
Self::build_inner(Some(initial_root), config, wal_path)
}
fn build_inner(
initial_root: Option<EngineRoot>,
config: Config,
wal_path: impl AsRef<Path>,
) -> anyhow::Result<
EngineBuilder<SPBuilder<NC, EngineEvent, BusySpin, MultiConsumerDependentsBarrier>>,
> {
let wal_path = wal_path.as_ref();
info!(wal_path = ?wal_path, "opening engine wal");
crate::btx_metrics_init!(
"engine",
"wal_poller",
"projection_apply",
"projection_live",
"projection_snapshot",
"projection_disk",
);
let metrics = crate::metrics::globals::metrics();
let (mut root, after_seq) = match initial_root {
Some(r) => {
let seq = r.version();
(r, Some(seq))
}
None => (EngineRoot::new(EngineState::new(config.clone())), None),
};
let journaler = JournalHandler::open(wal_path, "walw", config.journal.clone())?;
let mut recovered_events = Vec::new();
journaler.recover_from::<BookEventEnvelope, _>(after_seq, |env, eob| {
let seq: u64 = env.seq.into();
let tx_id: u64 = env.tx_id.into();
root.observe_tx_id(tx_id);
let payload: BookEventEnvelope = rkyv::api::high::deserialize::<
BookEventEnvelope,
crate::disruptor::traits::RkyvError,
>(&env.payload)?;
let envelope = Envelope {
seq,
payload,
response_cb: None,
tx_id,
tx_len: env.tx_len.into(),
tx_ix: env.tx_ix.into(),
};
let expected = root.version().saturating_add(1);
assert_eq!(
expected, seq,
"sequence gap: expected {}, got {}",
expected, seq
);
root.apply_single(&envelope.payload);
recovered_events.push((RingSlot::new(envelope), eob));
Ok(())
})?;
info!(
recovered_events = recovered_events.len(),
last_seq = root.version(),
"recovered wal events"
);
let pinning = config.pinning.clone().unwrap_or_default();
let wait_strategy = BusySpin;
let (wal_poller, builder, durability_gate) =
build_disruptor_pipeline(&config, wait_strategy);
Ok(EngineBuilder {
config,
root,
recovered_events,
builder,
durability_gate,
wal_poller,
journaler,
shutdown: Arc::new(AtomicBool::new(false)),
metrics,
pinning,
wait_strategy,
})
}
}
impl<W> EngineBuilder<SPBuilder<NC, EngineEvent, W, MultiConsumerDependentsBarrier>, W>
where
W: WaitStrategy + Clone + Send + 'static,
{
pub fn with_wait_strategy<W2>(
self,
wait_strategy: W2,
) -> EngineBuilder<SPBuilder<NC, EngineEvent, W2, MultiConsumerDependentsBarrier>, W2>
where
W2: WaitStrategy + Clone + Send + 'static,
{
let (wal_poller, builder, durability_gate) =
build_disruptor_pipeline(&self.config, wait_strategy.clone());
EngineBuilder {
config: self.config,
root: self.root,
recovered_events: self.recovered_events,
builder,
durability_gate,
wal_poller,
journaler: self.journaler,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy,
}
}
pub fn event_poller(self) -> BuilderPollerResult<SC, W> {
let (poller, builder) = self.builder.event_poller();
let wait_strategy = self.wait_strategy;
let builder = EngineBuilder {
config: self.config,
root: self.root,
recovered_events: self.recovered_events,
builder,
durability_gate: self.durability_gate,
wal_poller: self.wal_poller,
journaler: self.journaler,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy,
};
let wait_strategy = builder.wait_strategy.clone();
(poller, wait_strategy, builder)
}
pub fn register_handler<H>(
self,
handler: H,
core_settings: Option<(usize, &'static str)>,
) -> EngineBuilder<SPBuilder<SC, EngineEvent, W, MultiConsumerDependentsBarrier>, W>
where
H: EventHandler<EngineEvent> + Send + 'static,
{
EngineBuilder {
builder: configure_handler_builder(self.builder, core_settings)
.handle_events_with(handler),
config: self.config,
root: self.root,
recovered_events: self.recovered_events,
durability_gate: self.durability_gate,
wal_poller: self.wal_poller,
journaler: self.journaler,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy: self.wait_strategy,
}
}
pub fn register_handler_with_recovery<H>(
self,
mut handler: H,
core_settings: Option<(usize, &'static str)>,
) -> EngineBuilder<SPBuilder<SC, EngineEvent, W, MultiConsumerDependentsBarrier>, W>
where
H: EventHandler<EngineEvent> + Send + 'static,
{
replay(&mut handler, &self.recovered_events);
EngineBuilder {
builder: configure_handler_builder(self.builder, core_settings)
.handle_events_with(handler),
config: self.config,
root: self.root,
recovered_events: self.recovered_events,
durability_gate: self.durability_gate,
wal_poller: self.wal_poller,
journaler: self.journaler,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy: self.wait_strategy,
}
}
}
impl<W> EngineBuilder<SPBuilder<SC, EngineEvent, W, MultiConsumerDependentsBarrier>, W>
where
W: WaitStrategy + Clone + Send + 'static,
{
pub fn event_poller(self) -> BuilderPollerResult<MC, W> {
let (poller, builder) = self.builder.event_poller();
let wait_strategy = self.wait_strategy;
let builder = EngineBuilder {
config: self.config,
root: self.root,
recovered_events: self.recovered_events,
builder,
durability_gate: self.durability_gate,
wal_poller: self.wal_poller,
journaler: self.journaler,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy,
};
let wait_strategy = builder.wait_strategy.clone();
(poller, wait_strategy, builder)
}
pub fn register_handler<H>(
self,
handler: H,
core_settings: Option<(usize, &'static str)>,
) -> EngineBuilder<SPBuilder<MC, EngineEvent, W, MultiConsumerDependentsBarrier>, W>
where
H: EventHandler<EngineEvent> + Send + 'static,
{
EngineBuilder {
builder: configure_handler_builder(self.builder, core_settings)
.handle_events_with(handler),
config: self.config,
root: self.root,
recovered_events: self.recovered_events,
durability_gate: self.durability_gate,
wal_poller: self.wal_poller,
journaler: self.journaler,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy: self.wait_strategy,
}
}
pub fn register_handler_with_recovery<H>(
self,
mut handler: H,
core_settings: Option<(usize, &'static str)>,
) -> EngineBuilder<SPBuilder<MC, EngineEvent, W, MultiConsumerDependentsBarrier>, W>
where
H: EventHandler<EngineEvent> + Send + 'static,
{
replay(&mut handler, &self.recovered_events);
EngineBuilder {
builder: configure_handler_builder(self.builder, core_settings)
.handle_events_with(handler),
config: self.config,
root: self.root,
recovered_events: self.recovered_events,
durability_gate: self.durability_gate,
wal_poller: self.wal_poller,
journaler: self.journaler,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy: self.wait_strategy,
}
}
pub fn build(self) -> anyhow::Result<Engine> {
let producer = self.builder.build();
build_engine(
producer,
EngineBuildCtx {
root: self.root,
wal_poller: self.wal_poller,
journaler: self.journaler,
durability_gate: self.durability_gate,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy: self.wait_strategy,
},
)
}
}
impl<W> EngineBuilder<SPBuilder<MC, EngineEvent, W, MultiConsumerDependentsBarrier>, W>
where
W: WaitStrategy + Clone + Send + 'static,
{
pub fn event_poller(self) -> BuilderPollerResult<MC, W> {
let (poller, builder) = self.builder.event_poller();
let wait_strategy = self.wait_strategy;
let builder = EngineBuilder {
builder,
wait_strategy,
..self
};
let wait_strategy = builder.wait_strategy.clone();
(poller, wait_strategy, builder)
}
pub fn register_handler<H>(
self,
handler: H,
core_settings: Option<(usize, &'static str)>,
) -> Self
where
H: EventHandler<EngineEvent> + Send + 'static,
{
Self {
builder: configure_handler_builder(self.builder, core_settings)
.handle_events_with(handler),
..self
}
}
pub fn register_handler_with_recovery<H>(
self,
mut handler: H,
core_settings: Option<(usize, &'static str)>,
) -> Self
where
H: EventHandler<EngineEvent> + Send + 'static,
{
replay(&mut handler, &self.recovered_events);
Self {
builder: configure_handler_builder(self.builder, core_settings)
.handle_events_with(handler),
..self
}
}
pub fn build(self) -> anyhow::Result<Engine> {
let producer = self.builder.build();
build_engine(
producer,
EngineBuildCtx {
root: self.root,
wal_poller: self.wal_poller,
journaler: self.journaler,
durability_gate: self.durability_gate,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy: self.wait_strategy,
},
)
}
}
fn replay<H: EventHandler<EngineEvent>>(handler: &mut H, events: &[(EngineEvent, bool)]) {
for (e, eob) in events {
handler.on_event(e, 0, *eob);
}
}
#[allow(clippy::type_complexity)]
fn build_disruptor_pipeline<W>(
config: &Config,
wait_strategy: W,
) -> (
EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
SPBuilder<NC, EngineEvent, W, MultiConsumerDependentsBarrier>,
GatedSequence<W::Notifier>,
)
where
W: WaitStrategy,
{
let (wal_poller, builder) = build_single_producer(
config.ring_size_pow2,
|| {
RingSlot::new(Envelope {
seq: 0,
payload: BookEventEnvelope {
market_id: MarketId(0),
market_seq: 0,
timestamp: crate::types::unix_epoch(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
},
response_cb: None,
tx_id: 0,
tx_len: 1,
tx_ix: 0,
})
},
wait_strategy,
)
.event_poller();
let (builder, gates) = builder.new_gates::<1>();
let durability_gate = gates[0].clone();
let builder = builder.and_then_with_dependents(&gates);
(wal_poller, builder, durability_gate)
}
fn configure_handler_builder<B, W>(builder: B, core_settings: Option<(usize, &'static str)>) -> B
where
B: disrupt_rs::ProcessorSettings<EngineEvent, W>,
W: WaitStrategy,
{
match core_settings {
Some((id, name)) => builder.thread_name(name).pin_at_core(id),
None => builder,
}
}
struct EngineBuildCtx<W: WaitStrategy> {
root: EngineRoot,
wal_poller: EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
journaler: JournalHandler,
durability_gate: GatedSequence<W::Notifier>,
shutdown: Arc<AtomicBool>,
metrics: crate::metrics::Metrics,
pinning: ThreadPinning,
wait_strategy: W,
}
fn build_engine<B, W>(
producer: disrupt_rs::SingleProducer<EngineEvent, B, W>,
ctx: EngineBuildCtx<W>,
) -> anyhow::Result<Engine>
where
B: disrupt_rs::Barrier + 'static,
W: WaitStrategy + Clone + Send + 'static,
{
let EngineBuildCtx {
root,
wal_poller,
journaler,
durability_gate,
shutdown,
metrics,
pinning,
wait_strategy,
} = ctx;
let wal_thread = journaler
.into_poller(
wal_poller,
wait_strategy.clone(),
durability_gate,
Arc::clone(&shutdown),
)
.poll_pinned(pinning.wal_poller_core);
let (cmd_tx, cmd_rx) =
flume::bounded::<EngineMsg>(crate::config::DEFAULT_ENGINE_CHANNEL_CAPACITY);
let handler = EngineHandler { tx: cmd_tx };
let engine_thread = {
let shutdown = Arc::clone(&shutdown);
let core_id = pinning.engine_core;
thread::spawn(move || {
if let Some(id) = core_id
&& !core_affinity::set_for_current(core_affinity::CoreId { id })
{
warn!(
core_id = id,
"failed to pin engine thread to requested core"
);
}
crate::metrics_stage!("engine");
run_engine_loop(root, producer, cmd_rx, shutdown);
})
};
info!("engine threads started");
Ok(Engine {
handler,
metrics,
shutdown,
engine_thread: Some(engine_thread),
wal_thread: Some(wal_thread),
})
}
pub struct Engine {
handler: EngineHandler,
metrics: crate::metrics::Metrics,
shutdown: Arc<AtomicBool>,
engine_thread: Option<thread::JoinHandle<()>>,
wal_thread: Option<thread::JoinHandle<anyhow::Result<()>>>,
}
impl Engine {
pub fn handle(&self) -> EngineHandler {
self.handler.clone()
}
pub fn metrics_snapshot(&self) -> crate::metrics::MetricsSnapshot {
self.metrics.snapshot()
}
pub fn metrics(&self) -> crate::metrics::Metrics {
self.metrics.clone()
}
pub fn stop(&mut self) -> anyhow::Result<()> {
self.shutdown.store(true, Ordering::Release);
let Some(engine_thread) = self.engine_thread.take() else {
if let Some(wal_thread) = self.wal_thread.take()
&& wal_thread.join().is_err()
{
error!("wal thread panicked");
return Err(RuntimeError::WalThreadPanic.into());
}
return Ok(());
};
if engine_thread.join().is_err() {
error!("engine thread panicked");
}
if let Some(wal_thread) = self.wal_thread.take() {
match wal_thread.join() {
Ok(Ok(())) => {}
Ok(Err(e)) => {
error!(error = ?e, "wal thread returned error");
return Err(e);
}
Err(_) => {
error!("wal thread panicked");
return Err(RuntimeError::WalThreadPanic.into());
}
}
}
Ok(())
}
}
fn run_engine_loop<B, W>(
mut root: EngineRoot,
mut producer: disrupt_rs::SingleProducer<EngineEvent, B, W>,
cmd_rx: flume::Receiver<EngineMsg>,
shutdown: Arc<AtomicBool>,
) where
B: disrupt_rs::Barrier,
W: WaitStrategy,
{
let mut close_rr_cursor: Option<MarketId> = None;
let mut internal_queue: VecDeque<EngineMsg> = VecDeque::new();
while !shutdown.load(Ordering::Acquire) {
crate::metric!({ inflight: 0 });
crate::metric!({ queue_len: cmd_rx.len().saturating_add(internal_queue.len()) as i64 });
let msg = if let Some(m) = internal_queue.pop_front() {
m
} else {
match cmd_rx.recv_timeout(Duration::from_millis(
crate::config::DEFAULT_ENGINE_POLL_TIMEOUT_MS,
)) {
Ok(m) => m,
Err(flume::RecvTimeoutError::Timeout) => {
let Some(market_id) = pick_next_closing_market(&root, &mut close_rr_cursor)
else {
continue;
};
EngineMsg::Command {
cmd: continue_close_command(market_id),
resp_tx: None,
}
}
Err(flume::RecvTimeoutError::Disconnected) => break,
}
};
crate::metric!({ inflight: 1, in_total: 1 });
crate::metric!({ queue_len: cmd_rx.len().saturating_add(internal_queue.len()) as i64 });
let t0 = Instant::now();
let EngineMsg::Command { cmd, resp_tx } = msg;
let had_response_channel = resp_tx.is_some();
let was_internal_continue = matches!(&cmd.kind, CommandKind::ContinueCloseMarket);
trace!(
market_id = ?cmd.market_id,
correlation_id = ?cmd.correlation_id,
command = ?cmd.kind,
"engine command"
);
let start_seq = root.version().saturating_add(1);
if let Err(err) = root.handle(cmd) {
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: 0,
inflight: 0
});
if let Some(tx) = resp_tx
&& tx
.send(Ok(Response::Rejected {
reason: err.into_reason(),
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending rejection");
}
if !was_internal_continue
&& let Some(market_id) = pick_next_closing_market(&root, &mut close_rr_cursor)
{
internal_queue.push_back(EngineMsg::Command {
cmd: continue_close_command(market_id),
resp_tx: None,
});
}
continue;
}
let events = root.take_changes();
let events_len = events.len();
debug_assert!(
events_len > 0,
"accepted commands must emit at least one event"
);
let tx_id = root.take_tx_id();
let tx_len = match u16::try_from(events_len) {
Ok(len) => len,
Err(_) => {
error!(
events_len,
"tx_len overflow: too many events for single transaction"
);
if let Some(tx) = resp_tx.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
continue;
}
};
root.apply(&events);
let resp_tx_for_error = resp_tx.clone();
let callback = resp_tx.map(|tx| ResponseCallback { tx });
if events_len == 1 {
let Some(payload) = events.into_iter().next() else {
error!("event batch empty despite events_len == 1");
if let Some(tx) = resp_tx_for_error.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
shutdown.store(true, Ordering::Release);
break;
};
producer.publish(move |slot| {
slot.seq = start_seq;
slot.payload = payload;
slot.response_cb = callback;
slot.tx_id = tx_id;
slot.tx_len = 1;
slot.tx_ix = 0;
});
} else {
let mut payloads = events.into_iter();
let mut publish_error = false;
producer.batch_publish(events_len, move |iter| {
let publish_error = &mut publish_error;
let mut last_slot = None;
for (ix, slot) in iter.enumerate() {
if *publish_error {
break;
}
let Some(payload) = payloads.next() else {
*publish_error = true;
break;
};
slot.seq = start_seq.saturating_add(ix as u64);
slot.payload = payload;
slot.response_cb = None;
slot.tx_id = tx_id;
slot.tx_len = tx_len;
slot.tx_ix = ix as u16;
last_slot = Some(slot);
}
if let Some(slot) = last_slot {
slot.response_cb = callback;
}
});
if publish_error {
error!("batch_publish iterator longer than events");
if let Some(tx) = resp_tx_for_error.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
shutdown.store(true, Ordering::Release);
break;
}
}
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: events_len as u64,
inflight: 0
});
if (!was_internal_continue || had_response_channel)
&& let Some(market_id) = pick_next_closing_market(&root, &mut close_rr_cursor)
{
internal_queue.push_back(EngineMsg::Command {
cmd: continue_close_command(market_id),
resp_tx: None,
});
}
}
}
fn continue_close_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
market_id,
kind: CommandKind::ContinueCloseMarket,
}
}
impl Drop for Engine {
fn drop(&mut self) {
if let Err(err) = self.stop() {
error!(error = ?err, "failed to stop engine cleanly during drop");
}
}
}
fn pick_next_closing_market(root: &EngineRoot, cursor: &mut Option<MarketId>) -> Option<MarketId> {
let mut closing: Vec<MarketId> = root
.state()
.markets
.iter()
.filter_map(|(&mid, m)| {
(m.book.market_state() == crate::book::BookMarketState::Closing
&& m.book.close_process_state().is_some())
.then_some(mid)
})
.collect();
if closing.is_empty() {
*cursor = None;
return None;
}
closing.sort_unstable_by_key(|m| m.0);
let next = match *cursor {
Some(cur) => closing
.iter()
.copied()
.find(|mid| mid.0 > cur.0)
.unwrap_or(closing[0]),
None => closing[0],
};
*cursor = Some(next);
Some(next)
}
#[derive(Clone)]
pub struct EngineHandler {
pub(crate) tx: flume::Sender<EngineMsg>,
}
pub(crate) enum EngineMsg {
Command {
cmd: Command,
resp_tx:
Option<flume::Sender<Result<crate::book::protocol::response::Response, EngineError>>>,
},
}
impl EngineHandler {
pub fn submit(&self, cmd: Command) -> Result<(), EngineError> {
self.tx
.send(EngineMsg::Command { cmd, resp_tx: None })
.map_err(|_| EngineError::DisruptorStopped)?;
Ok(())
}
pub fn submit_with_response(
&self,
cmd: Command,
) -> Result<
flume::Receiver<Result<crate::book::protocol::response::Response, EngineError>>,
EngineError,
> {
let (resp_tx, resp_rx) = flume::bounded(1);
self.tx
.send(EngineMsg::Command {
cmd,
resp_tx: Some(resp_tx),
})
.map_err(|_| EngineError::DisruptorStopped)?;
Ok(resp_rx)
}
}