use super::{
ingress::{Mailbox, Message},
Config,
};
use crate::{
p2p::{Handler, Monitor},
Error,
};
use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digestible, PublicKey};
use commonware_macros::select_loop;
use commonware_p2p::{utils::codec::wrap, Blocker, Receiver, Recipients, Sender};
use commonware_runtime::{
spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, ContextCell, Handle,
Metrics, Spawner,
};
use commonware_utils::{
channel::{fallible::OneshotExt, mpsc, oneshot},
futures::Pool,
};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{HashMap, HashSet};
use tracing::{debug, error};
/// Engine that sends requests to peers, tracks which peers were contacted for
/// each commitment, and reports accepted responses to a [`Monitor`]. It also
/// hands incoming requests to a [`Handler`] and sends the handler's replies
/// back to the requesting peer.
pub struct Engine<E, B, Rq, Rs, P, M, H>
where
E: BufferPooler + Clock + Spawner,
P: PublicKey,
B: Blocker<PublicKey = P>,
Rq: Committable + Digestible + Codec,
Rs: Committable<Commitment = Rq::Commitment> + Digestible<Digest = Rq::Digest> + Codec,
M: Monitor<Response = Rs, PublicKey = P>,
H: Handler<Request = Rq, Response = Rs, PublicKey = P>,
{
// Runtime context; held in a `ContextCell` so `start` can pass it to `spawn_cell!`.
context: ContextCell<E>,
// Used to block peers whose request or response fails to decode.
blocker: B,
// Priority flag passed along with every outgoing request.
priority_request: bool,
// Codec configuration used when wrapping the request channel.
request_codec: Rq::Cfg,
// Priority flag passed along with every outgoing response.
priority_response: bool,
// Codec configuration used when wrapping the response channel.
response_codec: Rs::Cfg,
// Notified (via `collected`) for each accepted response.
monitor: M,
// Asked (via `process`) to produce a reply for each incoming request.
handler: H,
// Receiving half of the command channel; the sending half is wrapped in the `Mailbox`.
mailbox: mpsc::Receiver<Message<P, Rq>>,
// Per tracked commitment: (.0) peers the request was sent to and
// (.1) peers whose response has already been accepted.
tracked: HashMap<Rq::Commitment, (HashSet<P>, HashSet<P>)>,
// Gauge registered as "outstanding commitments".
outstanding: Gauge,
// Counter registered as "processed requests".
requests: Counter,
// Counter registered as "sent responses".
responses: Counter,
}
impl<E, B, Rq, Rs, P, M, H> Engine<E, B, Rq, Rs, P, M, H>
where
E: BufferPooler + Clock + Spawner + Metrics,
P: PublicKey,
B: Blocker<PublicKey = P>,
Rq: Committable + Digestible + Codec,
Rs: Committable<Commitment = Rq::Commitment> + Digestible<Digest = Rq::Digest> + Codec,
M: Monitor<Response = Rs, PublicKey = P>,
H: Handler<Request = Rq, Response = Rs, PublicKey = P>,
{
pub fn new(context: E, cfg: Config<B, M, H, Rq::Cfg, Rs::Cfg>) -> (Self, Mailbox<P, Rq>) {
let (tx, rx) = mpsc::channel(cfg.mailbox_size);
let mailbox: Mailbox<P, Rq> = Mailbox::new(tx);
let outstanding = Gauge::default();
let requests = Counter::default();
let responses = Counter::default();
context.register(
"outstanding",
"outstanding commitments",
outstanding.clone(),
);
context.register("requests", "processed requests", requests.clone());
context.register("responses", "sent responses", responses.clone());
(
Self {
context: ContextCell::new(context),
blocker: cfg.blocker,
priority_request: cfg.priority_request,
request_codec: cfg.request_codec,
priority_response: cfg.priority_response,
response_codec: cfg.response_codec,
monitor: cfg.monitor,
handler: cfg.handler,
mailbox: rx,
tracked: HashMap::new(),
outstanding,
requests,
responses,
},
mailbox,
)
}
pub fn start(
mut self,
requests: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
responses: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
) -> Handle<()> {
spawn_cell!(self.context, self.run(requests, responses).await)
}
async fn run(
mut self,
requests: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
responses: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
) {
let (mut req_tx, mut req_rx) = wrap(
self.request_codec,
self.context.network_buffer_pool().clone(),
requests.0,
requests.1,
);
let (mut res_tx, mut res_rx) = wrap(
self.response_codec,
self.context.network_buffer_pool().clone(),
responses.0,
responses.1,
);
let mut processed: Pool<Result<(P, Rs), oneshot::error::RecvError>> = Pool::default();
select_loop! {
self.context,
on_stopped => {
debug!("context shutdown, stopping engine");
},
Some(command) = self.mailbox.recv() else continue => {
match command {
Message::Send {
request,
recipients,
responder,
} => {
let commitment = request.commitment();
let entry = self.tracked.entry(commitment).or_insert_with(|| {
self.outstanding.inc();
(HashSet::new(), HashSet::new())
});
match req_tx
.send(recipients, request, self.priority_request)
.await
{
Ok(recipients) => {
entry.0.extend(recipients.iter().cloned());
responder.send_lossy(Ok(recipients));
}
Err(err) => {
error!(?err, ?commitment, "failed to send message");
responder.send_lossy(Err(Error::SendFailed(err.into())));
}
}
}
Message::Cancel { commitment } => {
if self.tracked.remove(&commitment).is_none() {
debug!(?commitment, "ignoring removal of unknown commitment");
}
let _ = self.outstanding.try_set(self.tracked.len());
}
}
},
Ok((peer, reply)) = processed.next_completed() else continue => {
self.responses.inc();
let _ = res_tx
.send(Recipients::One(peer), reply, self.priority_response)
.await;
},
message = req_rx.recv() => {
self.requests.inc();
let (peer, msg) = match message {
Ok(r) => r,
Err(err) => {
error!(?err, "request receiver failed");
break;
}
};
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
commonware_p2p::block!(self.blocker, peer, ?err, "invalid request");
continue;
}
};
let (tx, rx) = oneshot::channel();
self.handler.process(peer.clone(), msg, tx).await;
processed.push(async move { Ok((peer, rx.await?)) });
},
response = res_rx.recv() => {
let (peer, msg) = match response {
Ok(r) => r,
Err(err) => {
error!(?err, "response receiver failed");
break;
}
};
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
commonware_p2p::block!(self.blocker, peer, ?err, "invalid response");
continue;
}
};
let commitment = msg.commitment();
let Some(responses) = self.tracked.get_mut(&commitment) else {
debug!(?commitment, ?peer, "response for unknown commitment");
continue;
};
if !responses.0.contains(&peer) {
debug!(?commitment, ?peer, "never sent request");
continue;
}
if !responses.1.insert(peer.clone()) {
debug!(?commitment, ?peer, "duplicate response");
continue;
}
self.monitor.collected(peer, msg, responses.1.len()).await;
},
}
}
}