use std::{future::Future, time::Duration};
pub use bytes::{Bytes, BytesMut};
use crate::message::*;
mod buffered;
pub mod adversary;
pub mod simple;
pub mod stats;
pub mod trace;
#[cfg(feature = "mux")]
pub mod mux;
#[cfg(feature = "setup")]
pub use buffered::BufferedError;
pub use buffered::BufferedMsgRelay;
pub use simple::SimpleMessageRelay;
#[derive(Debug, Copy, Clone)]
pub struct MessageSendError;
pub trait Sender: Send + Sync {
fn feed(
&self,
message: Bytes,
) -> impl Future<Output = Result<(), MessageSendError>> + Send;
fn send(
&self,
message: Bytes,
) -> impl Future<Output = Result<(), MessageSendError>> + Send {
async move {
self.feed(message).await?;
self.flush().await
}
}
fn flush(
&self,
) -> impl Future<Output = Result<(), MessageSendError>> + Send {
async { Ok(()) }
}
}
pub trait SplitSender {
fn split_sender(&self) -> impl Sender + 'static;
}
pub trait Relay: Send + Sync {
fn send(
&self,
message: Bytes,
) -> impl Future<Output = Result<(), MessageSendError>> + Send {
async move {
self.feed(message).await?;
self.flush().await
}
}
fn feed(
&self,
message: Bytes,
) -> impl Future<Output = Result<(), MessageSendError>> + Send;
fn flush(
&self,
) -> impl Future<Output = Result<(), MessageSendError>> + Send {
async { Ok(()) }
}
fn ask(
&self,
id: &MsgId,
ttl: Duration,
) -> impl Future<Output = Result<(), MessageSendError>> + Send {
self.feed(allocate_message(id, ttl, 0, &[]))
}
fn next(&mut self) -> impl Future<Output = Option<BytesMut>> + Send;
}
pub trait InjectMessage {
fn inject_message(&self, msg: Bytes);
}
impl<R: Relay> Relay for &mut R {
fn ask(
&self,
id: &MsgId,
ttl: Duration,
) -> impl Future<Output = Result<(), MessageSendError>> {
(**self).ask(id, ttl)
}
fn feed(
&self,
message: Bytes,
) -> impl Future<Output = Result<(), MessageSendError>> {
(**self).feed(message)
}
fn flush(&self) -> impl Future<Output = Result<(), MessageSendError>> {
(**self).flush()
}
fn send(
&self,
message: Bytes,
) -> impl Future<Output = Result<(), MessageSendError>> {
(**self).send(message)
}
fn next(&mut self) -> impl Future<Output = Option<BytesMut>> {
(**self).next()
}
}
pub struct SkipAsk<R> {
relay: R,
skip: bool,
}
impl<R: Relay> SkipAsk<R> {
pub fn new(relay: R) -> Self {
Self { relay, skip: true }
}
pub fn with_ask(self) -> Self {
let Self { relay, .. } = self;
Self { relay, skip: false }
}
pub fn with_ask_if(self, ask: bool) -> Self {
let Self { relay, .. } = self;
Self { relay, skip: !ask }
}
}
impl<R: Relay> Relay for SkipAsk<R> {
async fn ask(
&self,
id: &MsgId,
ttl: Duration,
) -> Result<(), MessageSendError> {
if self.skip {
return Ok(());
}
self.feed(allocate_message(id, ttl, 0, &[])).await
}
async fn feed(&self, message: Bytes) -> Result<(), MessageSendError> {
if self.skip && message.len() == MESSAGE_HEADER_SIZE {
return Ok(());
}
self.relay.feed(message).await
}
fn flush(&self) -> impl Future<Output = Result<(), MessageSendError>> {
self.relay.flush()
}
fn next(&mut self) -> impl Future<Output = Option<BytesMut>> {
self.relay.next()
}
}
pub trait MessageRelayService {
type MessageRelay: Relay;
fn connect(&self) -> impl Future<Output = Option<Self::MessageRelay>>;
}