use super::Reservation;
use crate::{
authenticated::{
dialing::Dialable,
lookup::actors::{peer, tracker::Metadata},
},
types::Address,
AddressableTrackedPeers, Ingress, PeerSetSubscription, TrackedPeers,
};
use commonware_actor::{
mailbox::{self, Policy},
Feedback,
};
use commonware_cryptography::PublicKey;
use commonware_utils::{
channel::{mpsc, oneshot},
ordered::Map,
};
use std::{collections::VecDeque, net::IpAddr};
/// Messages carried by the `mailbox::Sender` that [`Mailbox`], [`Releaser`], and [`Oracle`] wrap.
///
/// Variants that hold a `oneshot::Sender` are requests: the method that enqueues them awaits the
/// matching receiver for the reply. The code that drains the mailbox and answers these requests
/// is not part of this file view.
#[derive(Debug)]
pub enum Message<C: PublicKey> {
/// Enqueued by `Oracle::track` with the caller-supplied `index` and the peers converted into
/// `AddressableTrackedPeers`.
Register {
index: u64,
peers: AddressableTrackedPeers<C>,
},
/// Enqueued by `Oracle::overwrite` with a map from public key to `Address`.
Overwrite { peers: Map<C, Address> },
/// Enqueued by `Oracle::peer_set`; the reply for `index` (possibly `None`) is sent on
/// `responder`.
PeerSet {
index: u64,
responder: oneshot::Sender<Option<TrackedPeers<C>>>,
},
/// Enqueued by `Oracle::subscribe`; a `PeerSetSubscription` is sent back on `responder`.
Subscribe {
responder: oneshot::Sender<PeerSetSubscription<C>>,
},
/// Enqueued by `Oracle::block` for the given `public_key`.
Block { public_key: C },
/// Enqueued by `Mailbox::connect`, pairing `public_key` with a `peer::Mailbox`.
Connect {
public_key: C,
peer: peer::Mailbox,
},
/// Enqueued by `Mailbox::dialable`; a `Dialable` value is sent back on `responder`.
Dialable {
responder: oneshot::Sender<Dialable<C>>,
},
/// Enqueued by `Mailbox::dial`; the reply is an optional `(Reservation, Ingress)` pair for
/// `public_key`.
Dial {
public_key: C,
reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
},
/// Enqueued by `Mailbox::acceptable`; the reply is a yes/no answer for `public_key` as seen
/// from `source_ip`.
Acceptable {
public_key: C,
source_ip: IpAddr,
responder: oneshot::Sender<bool>,
},
/// Enqueued by `Mailbox::listen`; the reply is an optional `Reservation` for `public_key` as
/// seen from `source_ip`.
Listen {
public_key: C,
source_ip: IpAddr,
reservation: oneshot::Sender<Option<Reservation<C>>>,
},
/// Enqueued by `Releaser::release` with the `Metadata` being released.
Release {
metadata: Metadata<C>,
},
}
/// Overflow policy for [`Message`]: every message handed to `handle` is retained.
impl<C: PublicKey> Policy for Message<C> {
    /// Buffer that stores the messages passed to `handle`, oldest first.
    type Overflow = VecDeque<Self>;

    /// Appends `message` to the tail of `overflow`.
    ///
    /// Nothing is dropped, merged, or reordered here, so the buffer grows by exactly one entry
    /// per call.
    fn handle(overflow: &mut Self::Overflow, message: Self) {
        VecDeque::push_back(overflow, message);
    }
}
/// Crate-internal handle around a `mailbox::Sender<Message<C>>`.
///
/// Its methods enqueue the `Connect`, `Dialable`, `Dial`, `Acceptable`, and `Listen` messages.
#[derive(Clone, Debug)]
pub struct Mailbox<C: PublicKey>(mailbox::Sender<Message<C>>);
/// Request helpers built on the wrapped sender.
///
/// Every `async` method follows the same shape: create a oneshot channel, enqueue a [`Message`]
/// carrying the sending half, then await the receiving half. The value returned by `enqueue` is
/// intentionally ignored in those methods; when the receive fails, a fallback value is returned.
impl<C: PublicKey> Mailbox<C> {
    /// Wraps `sender` in a [`Mailbox`].
    pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
        Self(sender)
    }

    /// Enqueues [`Message::Connect`] for `public_key` and `peer`, returning the [`Feedback`]
    /// reported by the sender.
    pub(crate) fn connect(&self, public_key: C, peer: peer::Mailbox) -> Feedback {
        let request = Message::Connect { public_key, peer };
        self.0.enqueue(request)
    }

    /// Requests a [`Dialable`] value.
    ///
    /// Falls back to `Dialable::default()` when no reply is received.
    pub(crate) async fn dialable(&self) -> Dialable<C> {
        let (tx, rx) = oneshot::channel();
        let request = Message::Dialable { responder: tx };
        let _ = self.0.enqueue(request);
        match rx.await {
            Ok(dialable) => dialable,
            Err(_) => Default::default(),
        }
    }

    /// Requests a reservation and ingress for dialing `public_key`.
    ///
    /// Returns `None` when the reply is `None` or when no reply is received.
    pub(crate) async fn dial(&self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
        let (tx, rx) = oneshot::channel();
        let request = Message::Dial {
            public_key,
            reservation: tx,
        };
        let _ = self.0.enqueue(request);
        // A failed receive short-circuits to `None`; otherwise the inner option is returned.
        rx.await.ok()?
    }

    /// Asks whether `public_key` connecting from `source_ip` is acceptable.
    ///
    /// Returns `false` when the reply is `false` or when no reply is received.
    pub(crate) async fn acceptable(&self, public_key: C, source_ip: IpAddr) -> bool {
        let (tx, rx) = oneshot::channel();
        let request = Message::Acceptable {
            public_key,
            source_ip,
            responder: tx,
        };
        let _ = self.0.enqueue(request);
        matches!(rx.await, Ok(true))
    }

    /// Requests a reservation for `public_key` connecting from `source_ip`.
    ///
    /// Returns `None` when the reply is `None` or when no reply is received.
    pub(crate) async fn listen(&self, public_key: C, source_ip: IpAddr) -> Option<Reservation<C>> {
        let (tx, rx) = oneshot::channel();
        let request = Message::Listen {
            public_key,
            source_ip,
            reservation: tx,
        };
        let _ = self.0.enqueue(request);
        // A failed receive short-circuits to `None`; otherwise the inner option is returned.
        rx.await.ok()?
    }
}
/// Handle around a `mailbox::Sender<Message<C>>` whose only operation is enqueueing
/// `Message::Release`.
#[derive(Clone, Debug)]
pub struct Releaser<C: PublicKey> {
// Sender used to enqueue `Message::Release`.
sender: mailbox::Sender<Message<C>>,
}
/// Construction and the single release operation for [`Releaser`].
impl<C: PublicKey> Releaser<C> {
    /// Builds a [`Releaser`] that enqueues onto `sender`.
    pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
        Self { sender }
    }

    /// Enqueues [`Message::Release`] carrying `metadata` and returns the [`Feedback`] reported
    /// by the sender.
    pub fn release(&mut self, metadata: Metadata<C>) -> Feedback {
        let request = Message::Release { metadata };
        self.sender.enqueue(request)
    }
}
/// Handle around a `mailbox::Sender<Message<C>>` that implements `crate::Provider`,
/// `crate::AddressableManager`, and `crate::Blocker` by enqueueing the corresponding messages.
#[derive(Debug, Clone)]
pub struct Oracle<C: PublicKey> {
// Sender used by every trait method implemented for this type.
sender: mailbox::Sender<Message<C>>,
}
/// Construction of [`Oracle`].
impl<C: PublicKey> Oracle<C> {
    /// Builds an [`Oracle`] that enqueues onto `sender`.
    ///
    /// Only visible to the parent module.
    pub(super) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
        Self { sender: sender }
    }
}
/// Peer-set queries answered through the mailbox.
///
/// Both methods enqueue a request carrying a oneshot sender and await the reply; the value
/// returned by `enqueue` is intentionally ignored.
impl<C: PublicKey> crate::Provider for Oracle<C> {
    type PublicKey = C;

    /// Requests the peer set stored under `id`.
    ///
    /// Returns `None` when the reply is `None` or when no reply is received.
    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
        let (tx, rx) = oneshot::channel();
        let request = Message::PeerSet {
            index: id,
            responder: tx,
        };
        let _ = self.sender.enqueue(request);
        // A failed receive short-circuits to `None`; otherwise the inner option is returned.
        rx.await.ok()?
    }

    /// Requests a peer-set subscription.
    ///
    /// When no reply is received, returns the receiving half of a fresh unbounded channel whose
    /// sending half has already been dropped.
    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
        let (tx, rx) = oneshot::channel();
        let request = Message::Subscribe { responder: tx };
        let _ = self.sender.enqueue(request);
        match rx.await {
            Ok(subscription) => subscription,
            Err(_) => {
                // The sender half is discarded right away, leaving a receiver with no producer.
                let (_, orphaned) = mpsc::unbounded_channel();
                orphaned
            }
        }
    }
}
/// Peer-set updates forwarded to the mailbox.
impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
    /// Converts `peers` into `AddressableTrackedPeers` and enqueues [`Message::Register`] for
    /// `index`, returning the [`Feedback`] reported by the sender.
    fn track<R>(&mut self, index: u64, peers: R) -> Feedback
    where
        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
    {
        let peers: AddressableTrackedPeers<C> = peers.into();
        let request = Message::Register { index, peers };
        self.sender.enqueue(request)
    }

    /// Enqueues [`Message::Overwrite`] carrying `peers` and returns the [`Feedback`] reported
    /// by the sender.
    fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) -> Feedback {
        let request = Message::Overwrite { peers };
        self.sender.enqueue(request)
    }
}
/// Block requests forwarded to the mailbox.
impl<C: PublicKey> crate::Blocker for Oracle<C> {
    type PublicKey = C;

    /// Enqueues [`Message::Block`] for `public_key` and returns the [`Feedback`] reported by
    /// the sender.
    fn block(&mut self, public_key: Self::PublicKey) -> Feedback {
        let request = Message::Block { public_key };
        self.sender.enqueue(request)
    }
}