commonware-p2p 2026.4.0

Communicate with authenticated peers over encrypted connections.
Documentation
use super::Reservation;
use crate::{
    authenticated::{
        dialing::Dialable,
        lookup::actors::{peer, tracker::Metadata},
        mailbox::UnboundedMailbox,
        Mailbox,
    },
    types::Address,
    AddressableTrackedPeers, Ingress, PeerSetSubscription, TrackedPeers,
};
use commonware_cryptography::PublicKey;
use commonware_utils::{
    channel::{fallible::FallibleExt, mpsc, oneshot},
    ordered::Map,
};
use std::net::IpAddr;

/// Messages that can be sent to the tracker actor.
#[derive(Debug)]
pub enum Message<C: PublicKey> {
    // ---------- Used by oracle ----------
    /// Register a peer set at a given index.
    Register {
        index: u64,
        peers: AddressableTrackedPeers<C>,
    },

    /// Update addresses for multiple peers without creating a new peer set.
    Overwrite { peers: Map<C, Address> },

    // ---------- Used by peer set provider ----------
    /// Fetch primary and secondary peers for a given ID.
    PeerSet {
        /// The index of the peer set to fetch.
        index: u64,
        /// One-shot channel to send the tracked peers.
        responder: oneshot::Sender<Option<TrackedPeers<C>>>,
    },
    /// Subscribe to notifications when new peer sets are added.
    Subscribe {
        /// One-shot channel to send the subscription receiver.
        responder: oneshot::Sender<PeerSetSubscription<C>>,
    },

    // ---------- Used by blocker ----------
    /// Block a peer, disconnecting them if currently connected and preventing future connections
    /// for as long as the peer remains in at least one active peer set.
    Block { public_key: C },

    // ---------- Used by peer ----------
    /// Notify the tracker that a peer has been successfully connected.
    Connect {
        /// The public key of the peer.
        public_key: C,

        /// The mailbox of the peer actor.
        peer: Mailbox<peer::Message>,
    },

    // ---------- Used by dialer ----------
    /// Request a list of dialable peers.
    Dialable {
        /// One-shot channel to send the dialable peers and next query deadline.
        responder: oneshot::Sender<Dialable<C>>,
    },

    /// Request a reservation for a particular peer to dial.
    ///
    /// The tracker will respond with an [`Option<(Reservation<C>, Ingress)>`], which will be
    /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
    /// blocked or already has an active reservation).
    Dial {
        /// The public key of the peer to reserve.
        public_key: C,

        /// Sender to respond with the reservation and ingress address.
        reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
    },

    // ---------- Used by listener ----------
    /// Check if a peer is acceptable (can accept an incoming connection from them).
    Acceptable {
        /// The public key of the peer to check.
        public_key: C,

        /// The IP address the peer connected from.
        source_ip: IpAddr,

        /// The sender to respond with whether the peer is acceptable.
        responder: oneshot::Sender<bool>,
    },

    /// Request a reservation for a particular peer.
    ///
    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
    /// has an active reservation).
    Listen {
        /// The public key of the peer to reserve.
        public_key: C,

        /// The sender to respond with the reservation.
        reservation: oneshot::Sender<Option<Reservation<C>>>,
    },

    // ---------- Used by reservation ----------
    /// Release a reservation.
    Release {
        /// The metadata of the reservation to release.
        metadata: Metadata<C>,
    },
}

impl<C: PublicKey> UnboundedMailbox<Message<C>> {
    /// Send a `Connect` message to the tracker.
    pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
        self.0.send_lossy(Message::Connect { public_key, peer });
    }

    /// Request dialable peers from the tracker.
    ///
    /// Returns an empty response if the tracker is shut down.
    pub async fn dialable(&mut self) -> Dialable<C> {
        self.0
            .request_or_default(|responder| Message::Dialable { responder })
            .await
    }

    /// Send a `Dial` message to the tracker.
    ///
    /// Returns `None` if the tracker is shut down.
    pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
        self.0
            .request(|reservation| Message::Dial {
                public_key,
                reservation,
            })
            .await
            .flatten()
    }

    /// Send an `Acceptable` message to the tracker.
    ///
    /// Returns `false` if the tracker is shut down.
    pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
        self.0
            .request_or(
                |responder| Message::Acceptable {
                    public_key,
                    source_ip,
                    responder,
                },
                false,
            )
            .await
    }

    /// Send a `Listen` message to the tracker.
    ///
    /// Returns `None` if the tracker is shut down.
    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
        self.0
            .request(|reservation| Message::Listen {
                public_key,
                reservation,
            })
            .await
            .flatten()
    }
}

/// Allows releasing reservations
#[derive(Clone, Debug)]
pub struct Releaser<C: PublicKey> {
    sender: UnboundedMailbox<Message<C>>,
}

impl<C: PublicKey> Releaser<C> {
    /// Create a new releaser.
    pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
        Self { sender }
    }

    /// Release a reservation.
    pub fn release(&mut self, metadata: Metadata<C>) {
        self.sender.0.send_lossy(Message::Release { metadata });
    }
}

/// Mechanism to register authorized peers.
///
/// Peers that are not explicitly authorized
/// will be blocked by commonware-p2p.
#[derive(Debug, Clone)]
pub struct Oracle<C: PublicKey> {
    sender: UnboundedMailbox<Message<C>>,
}

impl<C: PublicKey> Oracle<C> {
    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
        Self { sender }
    }
}

impl<C: PublicKey> crate::Provider for Oracle<C> {
    type PublicKey = C;

    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
        self.sender
            .0
            .request(|responder| Message::PeerSet {
                index: id,
                responder,
            })
            .await
            .flatten()
    }

    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
        self.sender
            .0
            .request(|responder| Message::Subscribe { responder })
            .await
            .unwrap_or_else(|| {
                let (_, rx) = mpsc::unbounded_channel();
                rx
            })
    }
}

impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
    async fn track<R>(&mut self, index: u64, peers: R)
    where
        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
    {
        self.sender.0.send_lossy(Message::Register {
            index,
            peers: peers.into(),
        });
    }

    async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
        self.sender.0.send_lossy(Message::Overwrite { peers });
    }
}

impl<C: PublicKey> crate::Blocker for Oracle<C> {
    type PublicKey = C;

    async fn block(&mut self, public_key: Self::PublicKey) {
        self.sender.0.send_lossy(Message::Block { public_key });
    }
}