use super::Reservation;
use crate::{
authenticated::{
dialing::Dialable,
lookup::actors::{peer, tracker::Metadata},
mailbox::UnboundedMailbox,
Mailbox,
},
types::Address,
AddressableTrackedPeers, Ingress, PeerSetSubscription, TrackedPeers,
};
use commonware_cryptography::PublicKey;
use commonware_utils::{
channel::{fallible::FallibleExt, mpsc, oneshot},
ordered::Map,
};
use std::net::IpAddr;
/// Messages carried by the `UnboundedMailbox<Message<C>>` handles defined in this file.
///
/// Variants holding a `responder` or `reservation` field carry a oneshot sender; the
/// senders in this file wait on the matching receiver for the reply.
#[derive(Debug)]
pub enum Message<C: PublicKey> {
/// Sent by `Oracle::track`: a collection of addressable peers tagged with `index`.
Register {
/// Identifier supplied by the caller for this peer set.
index: u64,
/// The peers (with address information) to register.
peers: AddressableTrackedPeers<C>,
},
/// Sent by `Oracle::overwrite`: a map from public key to `Address`.
// NOTE(review): presumably replaces the stored addresses of the listed peers — confirm
// against the receiving actor.
Overwrite { peers: Map<C, Address> },
/// Sent by `Oracle::peer_set`: asks for the peer set stored under `index`.
PeerSet {
/// Identifier of the requested peer set.
index: u64,
/// Receives `Some(peers)` or `None`; the meaning of `None` is decided by the receiver.
responder: oneshot::Sender<Option<TrackedPeers<C>>>,
},
/// Sent by `Oracle::subscribe`: asks for a peer set subscription.
Subscribe {
/// Receives the subscription handle.
responder: oneshot::Sender<PeerSetSubscription<C>>,
},
/// Sent by `Oracle::block` with the public key to block.
Block { public_key: C },
/// Sent by `UnboundedMailbox::connect`: pairs a public key with that peer's mailbox.
Connect {
/// Public key of the peer.
public_key: C,
/// Mailbox used to send `peer::Message`s.
peer: Mailbox<peer::Message>,
},
/// Sent by `UnboundedMailbox::dialable`: asks for the current `Dialable` value.
Dialable {
/// Receives the `Dialable` reply.
responder: oneshot::Sender<Dialable<C>>,
},
/// Sent by `UnboundedMailbox::dial`: asks for a reservation for `public_key`.
Dial {
/// Public key of the peer the caller wants to dial.
public_key: C,
/// Receives `Some((reservation, ingress))` or `None`.
reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
},
/// Sent by `UnboundedMailbox::acceptable`: asks whether a peer at `source_ip` is acceptable.
Acceptable {
/// Public key of the peer being checked.
public_key: C,
/// IP address supplied by the caller alongside the public key.
source_ip: IpAddr,
/// Receives the verdict.
responder: oneshot::Sender<bool>,
},
/// Sent by `UnboundedMailbox::listen`: asks for a reservation for `public_key`.
Listen {
/// Public key of the peer.
public_key: C,
/// Receives `Some(reservation)` or `None`.
reservation: oneshot::Sender<Option<Reservation<C>>>,
},
/// Sent by `Releaser::release` with the `Metadata` being released.
Release {
/// Metadata handed over by the caller of `Releaser::release`.
metadata: Metadata<C>,
},
}
/// Helpers that wrap their arguments in the matching [`Message`] variant and hand it to
/// the sender stored in the mailbox (`self.0`).
impl<C: PublicKey> UnboundedMailbox<Message<C>> {
    /// Sends [`Message::Connect`] carrying `public_key` and the peer's mailbox.
    ///
    /// Nothing is awaited and no reply is requested; whatever `send_lossy` returns is
    /// discarded.
    pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
        let message = Message::Connect { public_key, peer };
        self.0.send_lossy(message);
    }

    /// Sends [`Message::Dialable`] and returns the reply.
    ///
    /// The request goes through `request_or_default`.
    // NOTE(review): presumably yields `Dialable::default()` when no reply arrives —
    // confirm against `FallibleExt`.
    pub async fn dialable(&mut self) -> Dialable<C> {
        let pending = self
            .0
            .request_or_default(|tx| Message::Dialable { responder: tx });
        pending.await
    }

    /// Sends [`Message::Dial`] for `public_key` and returns the reservation and ingress
    /// from the reply.
    ///
    /// Returns `None` when `request` itself yields `None` or when the reply is `None`.
    pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
        // `?` peels the outer `Option` produced by `request`; the inner one is returned as is.
        self.0
            .request(|tx| Message::Dial {
                public_key,
                reservation: tx,
            })
            .await?
    }

    /// Sends [`Message::Acceptable`] for `public_key` and `source_ip` and returns the
    /// verdict.
    ///
    /// `false` is handed to `request_or` as the fallback value.
    pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
        let fallback = false;
        let pending = self.0.request_or(
            |tx| Message::Acceptable {
                public_key,
                source_ip,
                responder: tx,
            },
            fallback,
        );
        pending.await
    }

    /// Sends [`Message::Listen`] for `public_key` and returns the reservation from the
    /// reply.
    ///
    /// Returns `None` when `request` itself yields `None` or when the reply is `None`.
    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
        // `?` peels the outer `Option` produced by `request`; the inner one is returned as is.
        self.0
            .request(|tx| Message::Listen {
                public_key,
                reservation: tx,
            })
            .await?
    }
}
/// Cloneable handle that sends [`Message::Release`] through an unbounded mailbox.
#[derive(Debug, Clone)]
pub struct Releaser<C: PublicKey> {
    /// Mailbox that release messages are sent through.
    sender: UnboundedMailbox<Message<C>>,
}

impl<C: PublicKey> Releaser<C> {
    /// Wraps `sender` in a new [`Releaser`].
    pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
        Releaser { sender }
    }

    /// Sends [`Message::Release`] carrying `metadata`.
    ///
    /// Nothing is awaited; whatever `send_lossy` returns is discarded.
    pub fn release(&mut self, metadata: Metadata<C>) {
        let message = Message::Release { metadata };
        self.sender.0.send_lossy(message);
    }
}
/// Cloneable handle that sends peer set, address and blocking messages through an
/// unbounded mailbox.
#[derive(Clone, Debug)]
pub struct Oracle<C: PublicKey> {
    /// Mailbox that every message from this handle is sent through.
    sender: UnboundedMailbox<Message<C>>,
}

impl<C: PublicKey> Oracle<C> {
    /// Wraps `sender` in a new [`Oracle`].
    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
        Oracle { sender }
    }
}
impl<C: PublicKey> crate::Provider for Oracle<C> {
    type PublicKey = C;

    /// Sends [`Message::PeerSet`] for `id` and returns the peers from the reply.
    ///
    /// Returns `None` when `request` itself yields `None` or when the reply is `None`.
    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
        // `?` peels the outer `Option` produced by `request`; the inner one is returned as is.
        self.sender
            .0
            .request(|tx| Message::PeerSet {
                index: id,
                responder: tx,
            })
            .await?
    }

    /// Sends [`Message::Subscribe`] and returns the subscription from the reply.
    ///
    /// When `request` yields `None`, the receiving half of a brand-new unbounded channel
    /// is returned instead; its sending half is dropped right away, so nothing can ever
    /// be sent on it.
    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
        let reply = self
            .sender
            .0
            .request(|tx| Message::Subscribe { responder: tx })
            .await;
        let Some(subscription) = reply else {
            // Taking field `.1` moves the receiver out; the sender is dropped with the
            // temporary tuple.
            return mpsc::unbounded_channel().1;
        };
        subscription
    }
}
impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
    /// Converts `peers` into [`AddressableTrackedPeers`] and sends it, tagged with
    /// `index`, as [`Message::Register`].
    ///
    /// Nothing is awaited; whatever `send_lossy` returns is discarded.
    async fn track<R>(&mut self, index: u64, peers: R)
    where
        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
    {
        let peers: AddressableTrackedPeers<C> = peers.into();
        let message = Message::Register { index, peers };
        self.sender.0.send_lossy(message);
    }

    /// Sends the `peers` address map as [`Message::Overwrite`].
    ///
    /// Nothing is awaited; whatever `send_lossy` returns is discarded.
    async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
        let message = Message::Overwrite { peers };
        self.sender.0.send_lossy(message);
    }
}
impl<C: PublicKey> crate::Blocker for Oracle<C> {
    type PublicKey = C;

    /// Sends [`Message::Block`] carrying `public_key`.
    ///
    /// Nothing is awaited; whatever `send_lossy` returns is discarded.
    async fn block(&mut self, public_key: Self::PublicKey) {
        let message = Message::Block { public_key };
        self.sender.0.send_lossy(message);
    }
}