commonware-collector 2026.5.0

Collect responses to committable requests.
Documentation
use super::{
    ingress::{Mailbox, Message},
    Config,
};
use crate::p2p::{Handler, Monitor};
use commonware_actor::mailbox;
use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digestible, PublicKey};
use commonware_macros::select_loop;
use commonware_p2p::{utils::codec::wrap, Blocker, Receiver, Recipients, Sender};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{Counter, Gauge, GaugeExt, MetricsExt as _},
    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{channel::oneshot, futures::Pool};
use std::collections::{HashMap, HashSet};
use tracing::{debug, error};

/// Engine that will disperse messages and collect responses.
pub struct Engine<E, B, Rq, Rs, P, M, H>
where
    E: BufferPooler + Clock + Spawner,
    P: PublicKey,
    B: Blocker<PublicKey = P>,
    Rq: Committable + Digestible + Codec,
    Rs: Committable<Commitment = Rq::Commitment> + Digestible<Digest = Rq::Digest> + Codec,
    M: Monitor<Response = Rs, PublicKey = P>,
    H: Handler<Request = Rq, Response = Rs, PublicKey = P>,
{
    // Configuration
    context: ContextCell<E>,
    blocker: B,
    priority_request: bool,
    request_codec: Rq::Cfg,
    priority_response: bool,
    response_codec: Rs::Cfg,

    // Message passing
    monitor: M,
    handler: H,
    mailbox: mailbox::Receiver<Message<P, Rq>>,

    // State
    tracked: HashMap<Rq::Commitment, (HashSet<P>, HashSet<P>)>,

    // Metrics
    outstanding: Gauge,
    requests: Counter,
    responses: Counter,
}

impl<E, B, Rq, Rs, P, M, H> Engine<E, B, Rq, Rs, P, M, H>
where
    E: BufferPooler + Clock + Spawner + Metrics,
    P: PublicKey,
    B: Blocker<PublicKey = P>,
    Rq: Committable + Digestible + Codec,
    Rs: Committable<Commitment = Rq::Commitment> + Digestible<Digest = Rq::Digest> + Codec,
    M: Monitor<Response = Rs, PublicKey = P>,
    H: Handler<Request = Rq, Response = Rs, PublicKey = P>,
{
    /// Creates a new engine with the given configuration.
    ///
    /// Returns a tuple of the engine and the mailbox for sending messages.
    pub fn new(context: E, cfg: Config<B, M, H, Rq::Cfg, Rs::Cfg>) -> (Self, Mailbox<P, Rq>) {
        // Create mailbox
        let (tx, rx) = mailbox::new(context.child("mailbox"), cfg.mailbox_size);
        let mailbox: Mailbox<P, Rq> = Mailbox::new(tx);

        // Create metrics
        let outstanding = context.gauge("outstanding", "outstanding commitments");
        let requests = context.counter("requests", "processed requests");
        let responses = context.counter("responses", "sent responses");

        (
            Self {
                context: ContextCell::new(context),
                blocker: cfg.blocker,
                priority_request: cfg.priority_request,
                request_codec: cfg.request_codec,
                priority_response: cfg.priority_response,
                response_codec: cfg.response_codec,
                monitor: cfg.monitor,
                handler: cfg.handler,
                mailbox: rx,
                tracked: HashMap::new(),
                outstanding,
                requests,
                responses,
            },
            mailbox,
        )
    }

    /// Starts the engine with the given network channels.
    ///
    /// Returns a handle that can be used to wait for the engine to complete.
    pub fn start(
        mut self,
        requests: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
        responses: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(requests, responses))
    }

    async fn run(
        mut self,
        requests: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
        responses: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) {
        // Wrap channels
        let (mut req_tx, mut req_rx) = wrap(
            self.request_codec,
            self.context.network_buffer_pool().clone(),
            requests.0,
            requests.1,
        );
        let (mut res_tx, mut res_rx) = wrap(
            self.response_codec,
            self.context.network_buffer_pool().clone(),
            responses.0,
            responses.1,
        );

        // Create futures pool
        let mut processed: Pool<Result<(P, Rs), oneshot::error::RecvError>> = Pool::default();
        select_loop! {
            self.context,
            on_stopped => {
                debug!("context shutdown, stopping engine");
            },
            // Command from the mailbox
            Some(command) = self.mailbox.recv() else continue => match command {
                Message::Send {
                    request,
                    recipients,
                } => {
                    let commitment = request.commitment();
                    let recipients = req_tx.send(recipients, request, self.priority_request);
                    if !recipients.is_empty() {
                        let entry = self.tracked.entry(commitment).or_insert_with(|| {
                            self.outstanding.inc();
                            (HashSet::new(), HashSet::new())
                        });
                        entry.0.extend(recipients);
                    }
                }
                Message::Cancel { commitment } => {
                    if self.tracked.remove(&commitment).is_none() {
                        debug!(?commitment, "ignoring removal of unknown commitment");
                    }
                    let _ = self.outstanding.try_set(self.tracked.len());
                }
            },

            // Response from a handler
            Ok((peer, reply)) = processed.next_completed() else continue => {
                self.responses.inc();

                // Send the response
                let _ = res_tx.send(Recipients::One(peer), reply, self.priority_response);
            },

            // Request from an originator
            message = req_rx.recv() => {
                self.requests.inc();

                // Error handling
                let (peer, msg) = match message {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "request receiver failed");
                        break;
                    }
                };
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        commonware_p2p::block!(self.blocker, peer, ?err, "invalid request");
                        continue;
                    }
                };

                // Handle the request
                let (tx, rx) = oneshot::channel();
                self.handler.process(peer.clone(), msg, tx);
                processed.push(async move { Ok((peer, rx.await?)) });
            },

            // Response from a handler
            response = res_rx.recv() => {
                // Error handling
                let (peer, msg) = match response {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "response receiver failed");
                        break;
                    }
                };
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        commonware_p2p::block!(self.blocker, peer, ?err, "invalid response");
                        continue;
                    }
                };

                // Handle the response
                let commitment = msg.commitment();
                let Some(responses) = self.tracked.get_mut(&commitment) else {
                    debug!(?commitment, ?peer, "response for unknown commitment");
                    continue;
                };
                if !responses.0.contains(&peer) {
                    debug!(?commitment, ?peer, "never sent request");
                    continue;
                }
                if !responses.1.insert(peer.clone()) {
                    debug!(?commitment, ?peer, "duplicate response");
                    continue;
                }

                // Send the response to the monitor
                self.monitor.collected(peer, msg, responses.1.len());
            },
        }
    }
}