use super::{Error, Receiver, Sender};
use crate::{
authenticated::UnboundedMailbox, Address, AddressableTrackedPeers, Channel,
PeerSetSubscription, TrackedPeers,
};
use commonware_cryptography::PublicKey;
use commonware_runtime::{Clock, Quota};
use commonware_utils::{
channel::{fallible::FallibleExt, mpsc, oneshot, ring},
ordered::Map,
};
use rand_distr::Normal;
use std::time::Duration;
/// Requests delivered through the mailbox shared by [`Oracle`] and [`Control`].
///
/// Variants that expect an answer carry a `oneshot::Sender` (`result` or
/// `response`) through which the recipient hands the answer back.
pub enum Message<P: PublicKey, E: Clock> {
/// Registers `channel` for `public_key`; built by [`Control::register`].
Register {
/// Channel to register.
channel: Channel,
/// Key the registration is made for.
public_key: P,
/// Quota attached to the registration.
// NOTE(review): presumably a rate limit for the channel — confirm with the recipient.
quota: Quota,
// The nested generic result type is what the lint below would flag.
#[allow(clippy::type_complexity)]
/// Receives the sender/receiver pair for the channel, or an error.
result: oneshot::Sender<Result<(Sender<P, E>, Receiver<P>), Error>>,
},
/// Records `peers` under the peer-set `id`; no answer is expected.
Track {
/// Identifier of the peer set.
id: u64,
/// Peers belonging to the set.
peers: TrackedPeers<P>,
},
/// Looks up the peer set stored under `id`.
PeerSet {
/// Identifier of the peer set to look up.
id: u64,
/// Receives the peer set, or `None`.
// NOTE(review): `None` presumably means the id is unknown — confirm.
response: oneshot::Sender<Option<TrackedPeers<P>>>,
},
/// Asks for a peer-set subscription.
Subscribe {
/// Receives the subscription.
response: oneshot::Sender<PeerSetSubscription<P>>,
},
/// Asks for a ring-channel receiver that yields lists of public keys.
// NOTE(review): presumably the currently connected peers — confirm.
SubscribeConnected {
/// Receives the ring-channel receiver.
response: oneshot::Sender<ring::Receiver<Vec<P>>>,
},
/// Sets the bandwidth caps of `public_key`; built by [`Oracle::limit_bandwidth`].
LimitBandwidth {
/// Peer whose caps are changed.
public_key: P,
/// Outbound cap.
// NOTE(review): unit and the meaning of `None` (presumably "unlimited") are not visible here.
egress_cap: Option<usize>,
/// Inbound cap (same caveats as `egress_cap`).
ingress_cap: Option<usize>,
/// Receives `()` as the acknowledgement.
result: oneshot::Sender<()>,
},
/// Adds a link from `sender` to `receiver`; built by [`Oracle::add_link`].
AddLink {
/// Sending side of the link.
sender: P,
/// Receiving side of the link.
receiver: P,
/// Normal distribution whose mean is the link latency and whose standard
/// deviation is the link jitter, both expressed in milliseconds.
sampler: Normal<f64>,
/// Success rate of the link; [`Oracle::add_link`] rejects values below `0.0`
/// or above `1.0` before sending this message.
success_rate: f64,
/// Receives the outcome of the operation.
result: oneshot::Sender<Result<(), Error>>,
},
/// Removes the link from `sender` to `receiver`; built by [`Oracle::remove_link`].
RemoveLink {
/// Sending side of the link.
sender: P,
/// Receiving side of the link.
receiver: P,
/// Receives the outcome of the operation.
result: oneshot::Sender<Result<(), Error>>,
},
/// Records that `from` blocks `to`; no answer is expected.
Block {
/// Peer issuing the block.
from: P,
/// Peer being blocked.
to: P,
},
/// Asks for the list of blocked pairs; built by [`Oracle::blocked`].
Blocked {
/// Receives the list of pairs, or an error.
// NOTE(review): pairs are presumably ordered `(from, to)` — confirm.
result: oneshot::Sender<Result<Vec<(P, P)>, Error>>,
},
}
/// Compact `Debug` output: only cheap identifying fields are printed (`id` for
/// `Track`/`PeerSet`, both keys for `Block`); every other payload is elided.
impl<P: PublicKey, E: Clock> std::fmt::Debug for Message<P, E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants that print at least one field are rendered on the spot. The
        // remaining ones only differ by their label, so they share the final
        // `Label { .. }` rendering below.
        let label = match self {
            Self::Track { id, .. } => {
                return f
                    .debug_struct("Track")
                    .field("id", id)
                    .finish_non_exhaustive();
            }
            Self::PeerSet { id, .. } => {
                return f
                    .debug_struct("PeerSet")
                    .field("id", id)
                    .finish_non_exhaustive();
            }
            Self::Block { from, to } => {
                // `Block` has no hidden fields, hence the exhaustive `finish`.
                return f
                    .debug_struct("Block")
                    .field("from", from)
                    .field("to", to)
                    .finish();
            }
            Self::Register { .. } => "Register",
            Self::Subscribe { .. } => "Subscribe",
            Self::SubscribeConnected { .. } => "SubscribeConnected",
            Self::LimitBandwidth { .. } => "LimitBandwidth",
            Self::AddLink { .. } => "AddLink",
            Self::RemoveLink { .. } => "RemoveLink",
            Self::Blocked { .. } => "Blocked",
        };
        f.debug_struct(label).finish_non_exhaustive()
    }
}
/// Configuration of a link, consumed by [`Oracle::add_link`].
#[derive(Clone)]
pub struct Link {
/// Mean of the normal distribution built for the link (converted to milliseconds).
pub latency: Duration,
/// Standard deviation of that distribution (converted to milliseconds).
pub jitter: Duration,
/// Success rate of the link; [`Oracle::add_link`] rejects values below `0.0`
/// or above `1.0`.
// NOTE(review): presumably the probability that a message is delivered — confirm.
pub success_rate: f64,
}
/// Handle that submits [`Message`]s through an unbounded mailbox.
#[derive(Debug)]
pub struct Oracle<P: PublicKey, E: Clock> {
/// Mailbox every request of this handle is written to.
sender: UnboundedMailbox<Message<P, E>>,
}
/// Cloning yields another handle that writes to the same mailbox.
impl<P: PublicKey, E: Clock> Clone for Oracle<P, E> {
    fn clone(&self) -> Self {
        let sender = self.sender.clone();
        Self { sender }
    }
}
impl<P: PublicKey, E: Clock> Oracle<P, E> {
    /// Creates an oracle that submits its requests through `sender`.
    pub(crate) const fn new(sender: UnboundedMailbox<Message<P, E>>) -> Self {
        Self { sender }
    }

    /// Returns a [`Control`] acting on behalf of `me` that shares this oracle's mailbox.
    pub fn control(&self, me: P) -> Control<P, E> {
        Control {
            me,
            sender: self.sender.clone(),
        }
    }

    /// Returns a [`Manager`] backed by a clone of this oracle.
    pub fn manager(&self) -> Manager<P, E> {
        Manager {
            oracle: self.clone(),
        }
    }

    /// Returns a [`SocketManager`] backed by a clone of this oracle.
    pub fn socket_manager(&self) -> SocketManager<P, E> {
        SocketManager {
            oracle: self.clone(),
        }
    }

    /// Requests the list of blocked pairs.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NetworkClosed`] when the request yields no response;
    /// otherwise forwards whatever error the responder reports.
    pub async fn blocked(&self) -> Result<Vec<(P, P)>, Error> {
        self.sender
            .0
            .request(|result| Message::Blocked { result })
            .await
            .ok_or(Error::NetworkClosed)?
    }

    /// Requests that the bandwidth caps of `public_key` be set to `egress_cap`
    /// and `ingress_cap`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NetworkClosed`] when the request yields no response.
    // NOTE(review): the unit of the caps and the meaning of `None` (presumably
    // "unlimited") are decided by the recipient of the message — confirm there.
    pub async fn limit_bandwidth(
        &self,
        public_key: P,
        egress_cap: Option<usize>,
        ingress_cap: Option<usize>,
    ) -> Result<(), Error> {
        self.sender
            .0
            .request(|result| Message::LimitBandwidth {
                public_key,
                egress_cap,
                ingress_cap,
                result,
            })
            .await
            .ok_or(Error::NetworkClosed)
    }

    /// Requests a link from `sender` to `receiver` described by `config`.
    ///
    /// The link delay is modelled as a normal distribution whose mean is
    /// `config.latency` and whose standard deviation is `config.jitter`, both
    /// converted to milliseconds.
    ///
    /// # Errors
    ///
    /// * [`Error::LinkingSelf`] if `sender == receiver`.
    /// * [`Error::InvalidSuccessRate`] if `config.success_rate` is not within
    ///   `0.0..=1.0` (this includes NaN).
    /// * [`Error::NetworkClosed`] when the request yields no response.
    /// * Whatever error the responder reports.
    pub async fn add_link(&self, sender: P, receiver: P, config: Link) -> Result<(), Error> {
        if sender == receiver {
            return Err(Error::LinkingSelf);
        }
        // A pair of `<` / `>` comparisons is false for NaN and would let it through;
        // the range check fails for NaN, so it is rejected like any other invalid rate.
        if !(0.0..=1.0).contains(&config.success_rate) {
            return Err(Error::InvalidSuccessRate(config.success_rate));
        }

        // The sampler works in milliseconds.
        let latency_ms = config.latency.as_secs_f64() * 1000.0;
        let jitter_ms = config.jitter.as_secs_f64() * 1000.0;
        let sampler = Normal::new(latency_ms, jitter_ms)
            .expect("milliseconds derived from a Duration are finite and non-negative");

        self.sender
            .0
            .request(|result| Message::AddLink {
                sender,
                receiver,
                sampler,
                success_rate: config.success_rate,
                result,
            })
            .await
            .ok_or(Error::NetworkClosed)?
    }

    /// Requests removal of the link from `sender` to `receiver`.
    ///
    /// # Errors
    ///
    /// * [`Error::LinkingSelf`] if `sender == receiver`.
    /// * [`Error::NetworkClosed`] when the request yields no response.
    /// * Whatever error the responder reports.
    pub async fn remove_link(&self, sender: P, receiver: P) -> Result<(), Error> {
        if sender == receiver {
            return Err(Error::LinkingSelf);
        }
        self.sender
            .0
            .request(|result| Message::RemoveLink {
                sender,
                receiver,
                result,
            })
            .await
            .ok_or(Error::NetworkClosed)?
    }

    /// Submits `peers` as the peer set `id` without waiting for an answer.
    // NOTE(review): `send_lossy` presumably drops the message when the mailbox
    // is closed — confirm against `FallibleExt`.
    fn track(&self, id: u64, peers: TrackedPeers<P>) {
        self.sender.0.send_lossy(Message::Track { id, peers });
    }

    /// Requests the peer set stored under `id`.
    ///
    /// Returns `None` both when the request yields no response and when the
    /// responder answers `None`.
    async fn peer_set(&self, id: u64) -> Option<TrackedPeers<P>> {
        self.sender
            .0
            .request(|response| Message::PeerSet { id, response })
            .await
            .flatten()
    }

    /// Requests a peer-set subscription.
    ///
    /// When the request yields no response, a receiver whose sending half has
    /// already been dropped is returned instead.
    async fn subscribe(&self) -> PeerSetSubscription<P> {
        self.sender
            .0
            .request(|response| Message::Subscribe { response })
            .await
            .unwrap_or_else(|| {
                // The sending half is discarded right away, leaving a closed receiver.
                let (_, rx) = mpsc::unbounded_channel();
                rx
            })
    }
}
/// Wrapper around an [`Oracle`] that implements [`crate::Provider`] and
/// [`crate::Manager`] by forwarding to it.
pub struct Manager<P: PublicKey, E: Clock> {
/// Oracle every call is forwarded to.
oracle: Oracle<P, E>,
}
/// Prints `Manager { .. }`; the wrapped oracle is not shown.
impl<P: PublicKey, E: Clock> std::fmt::Debug for Manager<P, E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Manager");
        repr.finish_non_exhaustive()
    }
}
/// Cloning a manager clones the oracle it wraps.
impl<P: PublicKey, E: Clock> Clone for Manager<P, E> {
    fn clone(&self) -> Self {
        let oracle = self.oracle.clone();
        Self { oracle }
    }
}
impl<P: PublicKey, E: Clock> crate::Provider for Manager<P, E> {
type PublicKey = P;
async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
self.oracle.peer_set(id).await
}
async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
self.oracle.subscribe().await
}
}
/// Peer-set updates are handed to the wrapped oracle.
impl<P: PublicKey, E: Clock> crate::Manager for Manager<P, E> {
    /// Converts `peers` into [`TrackedPeers`] and submits it as peer set `id`.
    async fn track<R>(&mut self, id: u64, peers: R)
    where
        R: Into<TrackedPeers<Self::PublicKey>> + Send,
    {
        let tracked: TrackedPeers<Self::PublicKey> = peers.into();
        self.oracle.track(id, tracked);
    }
}
/// Wrapper around an [`Oracle`] that implements [`crate::Provider`] and
/// [`crate::AddressableManager`] by forwarding to it.
pub struct SocketManager<P: PublicKey, E: Clock> {
/// Oracle every call is forwarded to.
oracle: Oracle<P, E>,
}
/// Prints `SocketManager { .. }`; the wrapped oracle is not shown.
impl<P: PublicKey, E: Clock> std::fmt::Debug for SocketManager<P, E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("SocketManager");
        repr.finish_non_exhaustive()
    }
}
/// Cloning a socket manager clones the oracle it wraps.
impl<P: PublicKey, E: Clock> Clone for SocketManager<P, E> {
    fn clone(&self) -> Self {
        let oracle = self.oracle.clone();
        Self { oracle }
    }
}
impl<P: PublicKey, E: Clock> crate::Provider for SocketManager<P, E> {
type PublicKey = P;
async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
self.oracle.peer_set(id).await
}
async fn subscribe(&mut self) -> PeerSetSubscription<P> {
self.oracle.subscribe().await
}
}
/// Address-aware peer-set updates: only the keys reach the wrapped oracle.
impl<P: PublicKey, E: Clock> crate::AddressableManager for SocketManager<P, E> {
    /// Keeps only the keys of the primary and secondary peers and submits them
    /// as peer set `id`.
    async fn track<R>(&mut self, id: u64, peers: R)
    where
        R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
    {
        let addressable = peers.into();
        let primary = addressable.primary.into_keys();
        let secondary = addressable.secondary.into_keys();
        self.oracle.track(id, TrackedPeers::new(primary, secondary));
    }

    /// Intentionally a no-op: the supplied addresses are ignored.
    async fn overwrite(&mut self, _peers: Map<Self::PublicKey, Address>) {}
}
/// Handle bound to a single public key (`me`), used to register channels and to
/// block other peers on that key's behalf.
#[derive(Debug)]
pub struct Control<P: PublicKey, E: Clock> {
/// Key this handle acts for.
me: P,
/// Mailbox the requests of this handle are written to.
sender: UnboundedMailbox<Message<P, E>>,
}
/// Cloning yields a handle for the same key that writes to the same mailbox.
impl<P: PublicKey, E: Clock> Clone for Control<P, E> {
    fn clone(&self) -> Self {
        let Self { me, sender } = self;
        Self {
            me: me.clone(),
            sender: sender.clone(),
        }
    }
}
impl<P: PublicKey, E: Clock> Control<P, E> {
pub async fn register(
&self,
channel: Channel,
quota: Quota,
) -> Result<(Sender<P, E>, Receiver<P>), Error> {
let public_key = self.me.clone();
self.sender
.0
.request(|result| Message::Register {
channel,
public_key,
quota,
result,
})
.await
.ok_or(Error::NetworkClosed)?
}
}
impl<P: PublicKey, E: Clock> crate::Blocker for Control<P, E> {
type PublicKey = P;
async fn block(&mut self, public_key: P) {
self.sender.0.send_lossy(Message::Block {
from: self.me.clone(),
to: public_key,
});
}
}