use async_trait::async_trait;
use parking_lot::Mutex;
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use telltale_types::FixedQ32;
use super::envelope::ProtocolEnvelope;
use crate::identifiers::RoleName;
/// Shared message store: one FIFO queue of envelopes per `(sender, receiver)`
/// role pair, wrapped in `Arc<Mutex<..>>` so several transports can operate on
/// the same set of queues.
type MessageQueues = Arc<Mutex<BTreeMap<(RoleName, RoleName), VecDeque<ProtocolEnvelope>>>>;
/// Errors reported by the transports defined in this module.
///
/// The `Display` text of each variant comes from its `#[error]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum TransportError {
/// The destination could not be reached; the string is interpolated into the message.
#[error("destination unreachable: {0}")]
Unreachable(String),
/// No message was available. `InMemoryTransport::recv` fills the string with
/// the role it was asked to receive from.
#[error("no message available from {0}")]
NoMessage(String),
/// The underlying channel has been closed.
#[error("channel closed")]
ChannelClosed,
/// An envelope could not be serialized or deserialized; the string carries the detail.
#[error("serialization error: {0}")]
Serialization(String),
/// Waiting for a message timed out.
#[error("timeout waiting for message")]
Timeout,
/// The transport was used before a role was assigned. `InMemoryTransport`
/// returns this from `send` and `recv` when `set_role` has not been called.
#[error("role not set on transport")]
RoleNotSet,
/// Any other transport failure, described by the string.
#[error("transport error: {0}")]
Other(String),
}
/// Result alias whose error type is fixed to [`TransportError`].
pub type TransportResult<T> = Result<T, TransportError>;
/// Synchronous transport for exchanging `ProtocolEnvelope`s between roles.
pub trait SimulatedTransport: Send {
/// Sends `envelope` to the role `to`.
///
/// # Errors
/// Returns a [`TransportError`] when the implementation cannot accept the message.
fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()>;
/// Receives an envelope that was sent by the role `from`.
///
/// # Errors
/// Returns a [`TransportError`] when no envelope can be produced.
fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope>;
/// Reports whether a message from `from` is available; takes `&self`, so it
/// cannot go through `recv`.
fn peek(&self, from: &RoleName) -> bool;
/// Returns borrowed views of the envelopes the transport considers pending.
///
/// NOTE(review): `InMemoryTransport` always returns an empty vector here —
/// confirm no caller relies on this for that implementation.
fn pending_messages(&self) -> Vec<&ProtocolEnvelope>;
}
/// Asynchronous counterpart of [`SimulatedTransport`], declared through
/// `async_trait` so the `async fn` methods can live in a trait.
#[async_trait]
pub trait AsyncSimulatedTransport: Send + Sync {
/// Sends `envelope` to the role `to`.
///
/// # Errors
/// Returns a [`TransportError`] when the implementation cannot accept the message.
async fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()>;
/// Receives an envelope that was sent by the role `from`.
///
/// # Errors
/// Returns a [`TransportError`] when no envelope can be produced.
async fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope>;
/// Non-async receive: `Ok(Some(_))` carries an envelope, `Ok(None)` signals
/// that nothing was available, and `Err(_)` reports a failure.
fn try_recv(&mut self, from: &RoleName) -> TransportResult<Option<ProtocolEnvelope>>;
/// Reports whether a message from `from` is available.
fn has_message(&self, from: &RoleName) -> bool;
}
/// Transport that keeps every in-flight envelope in process memory.
///
/// Messages live in per-`(sender, receiver)` FIFO queues that may be shared
/// with other `InMemoryTransport` values.
#[derive(Debug, Default)]
pub struct InMemoryTransport {
// Role this transport acts as; `None` until `set_role` is called.
role: Option<RoleName>,
// Queue map, possibly shared with other transports through the `Arc`.
queues: MessageQueues,
}
impl InMemoryTransport {
#[must_use]
pub fn new() -> Self {
Self {
role: None,
queues: Arc::new(Mutex::new(BTreeMap::new())),
}
}
#[must_use]
pub fn with_shared_queues(queues: MessageQueues) -> Self {
Self { role: None, queues }
}
pub fn set_role(&mut self, role: RoleName) {
self.role = Some(role);
}
#[must_use]
pub fn role(&self) -> Option<&RoleName> {
self.role.as_ref()
}
fn queue_key(from: &RoleName, to: &RoleName) -> (RoleName, RoleName) {
(from.clone(), to.clone())
}
#[must_use]
pub fn all_messages(&self) -> Vec<ProtocolEnvelope> {
let queues = self.queues.lock();
queues.values().flatten().cloned().collect()
}
pub fn clear(&mut self) {
let mut queues = self.queues.lock();
queues.clear();
}
#[must_use]
pub fn pending_count(&self) -> usize {
let queues = self.queues.lock();
queues.values().map(|q| q.len()).sum()
}
}
impl Clone for InMemoryTransport {
    /// Copies the role and bumps the reference count on the queue map, so the
    /// clone shares its queues with the original.
    fn clone(&self) -> Self {
        let Self { role, queues } = self;
        Self {
            queues: Arc::clone(queues),
            role: role.clone(),
        }
    }
}
impl SimulatedTransport for InMemoryTransport {
    /// Appends `envelope` to the back of the `(own role, to)` queue, creating
    /// the queue on first use.
    ///
    /// # Errors
    /// Returns [`TransportError::RoleNotSet`] when no role has been assigned.
    fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
        let Some(sender) = self.role.as_ref() else {
            return Err(TransportError::RoleNotSet);
        };
        let key = Self::queue_key(sender, to);
        self.queues.lock().entry(key).or_default().push_back(envelope);
        Ok(())
    }

    /// Pops the oldest envelope from the `(from, own role)` queue.
    ///
    /// # Errors
    /// Returns [`TransportError::RoleNotSet`] when no role has been assigned,
    /// and [`TransportError::NoMessage`] when the queue is missing or empty.
    fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
        let Some(receiver) = self.role.as_ref() else {
            return Err(TransportError::RoleNotSet);
        };
        let key = Self::queue_key(from, receiver);
        let mut guard = self.queues.lock();
        let popped = match guard.get_mut(&key) {
            Some(queue) => queue.pop_front(),
            None => None,
        };
        match popped {
            Some(envelope) => Ok(envelope),
            None => Err(TransportError::NoMessage(from.to_string())),
        }
    }

    /// Returns `true` when the `(from, own role)` queue holds at least one
    /// envelope; `false` when it is empty, missing, or no role is assigned.
    fn peek(&self, from: &RoleName) -> bool {
        match self.role.as_ref() {
            None => false,
            Some(receiver) => {
                let key = Self::queue_key(from, receiver);
                let guard = self.queues.lock();
                match guard.get(&key) {
                    Some(queue) => !queue.is_empty(),
                    None => false,
                }
            }
        }
    }

    /// Always returns an empty vector.
    ///
    /// The envelopes live behind a mutex, so references to them cannot
    /// outlive the lock guard; use `all_messages` for an owned snapshot.
    fn pending_messages(&self) -> Vec<&ProtocolEnvelope> {
        Vec::new()
    }
}
#[async_trait]
impl AsyncSimulatedTransport for InMemoryTransport {
async fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
SimulatedTransport::send(self, to, envelope)
}
async fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
SimulatedTransport::recv(self, from)
}
fn try_recv(&mut self, from: &RoleName) -> TransportResult<Option<ProtocolEnvelope>> {
match SimulatedTransport::recv(self, from) {
Ok(env) => Ok(Some(env)),
Err(TransportError::NoMessage(_)) => Ok(None),
Err(e) => Err(e),
}
}
fn has_message(&self, from: &RoleName) -> bool {
self.peek(from)
}
}
/// Wrapper around another transport that can drop outgoing messages according
/// to a deterministic pseudo-random sequence.
pub struct FaultyTransport<T> {
// The wrapped transport that performs the real work.
inner: T,
// Threshold compared against each random draw in `should_drop`; clamped to
// the range zero..=one by `with_drop_rate`.
drop_rate: FixedQ32,
// Set by `with_delays`; not read anywhere in the visible code.
delay: bool,
// Seed given to `with_seed` (12345 by default); not read in the visible code.
seed: u64,
// Current xorshift state, advanced on every random draw.
rng_state: u64,
}
impl<T> FaultyTransport<T> {
    /// State substituted when the caller seeds with `0`.
    ///
    /// The xorshift step in `random_float` maps an all-zero state to itself,
    /// so a zero seed would otherwise produce the same draw forever.
    const ZERO_SEED_FALLBACK: u64 = 0x9E37_79B9_7F4A_7C15;

    /// Wraps `inner` with fault injection disabled: zero drop rate, delays
    /// off, and the default seed `12345`.
    #[must_use]
    pub fn new(inner: T) -> Self {
        Self {
            inner,
            drop_rate: FixedQ32::zero(),
            delay: false,
            seed: 12345,
            rng_state: 12345,
        }
    }

    /// Sets the drop threshold, clamped to the range zero..=one.
    #[must_use]
    pub fn with_drop_rate(mut self, rate: FixedQ32) -> Self {
        self.drop_rate = rate.clamp(FixedQ32::zero(), FixedQ32::one());
        self
    }

    /// Turns the `delay` flag on.
    ///
    /// NOTE(review): nothing in the visible code reads the flag yet.
    #[must_use]
    pub fn with_delays(mut self) -> Self {
        self.delay = true;
        self
    }

    /// Re-seeds the generator so the drop sequence is reproducible.
    ///
    /// The given `seed` is stored unchanged. A seed of `0` starts the
    /// generator from a fixed non-zero state instead, because xorshift can
    /// never leave the all-zero state.
    #[must_use]
    pub fn with_seed(mut self, seed: u64) -> Self {
        self.seed = seed;
        self.rng_state = if seed == 0 {
            Self::ZERO_SEED_FALLBACK
        } else {
            seed
        };
        self
    }

    /// Advances the xorshift state (shifts 13, 7, 17) and returns its upper
    /// 32 bits as the raw bits of a `FixedQ32`.
    ///
    /// NOTE(review): presumably `FixedQ32` is Q32.32, which would put the
    /// result in `[0, 1)` — confirm against `telltale_types`.
    fn random_float(&mut self) -> FixedQ32 {
        self.rng_state ^= self.rng_state << 13;
        self.rng_state ^= self.rng_state >> 7;
        self.rng_state ^= self.rng_state << 17;
        // After `>> 32` the value fits in 32 bits, so the conversion cannot
        // fail; the fallback only satisfies the type checker.
        let frac_bits = i64::try_from(self.rng_state >> 32).unwrap_or(i64::MAX);
        FixedQ32::from_bits(frac_bits)
    }

    /// Draws one random value and reports whether it is strictly below the
    /// configured drop rate. Every call advances the generator.
    fn should_drop(&mut self) -> bool {
        self.random_float() < self.drop_rate
    }
}
impl<T: SimulatedTransport> SimulatedTransport for FaultyTransport<T> {
    /// Forwards `envelope` to the inner transport unless this send is chosen
    /// to be dropped.
    ///
    /// A dropped message is discarded and still reported as `Ok(())`. One
    /// random draw is made on every call.
    fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
        if !self.should_drop() {
            return T::send(&mut self.inner, to, envelope);
        }
        Ok(())
    }

    /// Receives from the inner transport; no fault is injected here.
    fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
        T::recv(&mut self.inner, from)
    }

    /// Asks the inner transport whether a message from `from` is available.
    fn peek(&self, from: &RoleName) -> bool {
        T::peek(&self.inner, from)
    }

    /// Returns whatever the inner transport reports as pending.
    fn pending_messages(&self) -> Vec<&ProtocolEnvelope> {
        T::pending_messages(&self.inner)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a "Test"/"Msg" envelope with a fixed three-byte payload going
    /// from role `from` to role `to`.
    fn make_envelope(from: &str, to: &str) -> ProtocolEnvelope {
        let builder = ProtocolEnvelope::builder().protocol("Test");
        let builder = builder.sender(RoleName::new(from).unwrap());
        let builder = builder.recipient(RoleName::new(to).unwrap());
        builder
            .message_type("Msg")
            .payload(vec![1, 2, 3])
            .build()
            .unwrap()
    }

    /// A message sent by one transport is visible to, and received by, a
    /// second transport sharing the same queues.
    #[test]
    fn test_in_memory_transport() {
        let shared = Arc::new(Mutex::new(BTreeMap::new()));
        let client_role = RoleName::from_static("Client");
        let server_role = RoleName::from_static("Server");

        let mut client = InMemoryTransport::with_shared_queues(Arc::clone(&shared));
        client.set_role(RoleName::from_static("Client"));
        let mut server = InMemoryTransport::with_shared_queues(Arc::clone(&shared));
        server.set_role(RoleName::from_static("Server"));

        let outgoing = make_envelope("Client", "Server");
        SimulatedTransport::send(&mut client, &server_role, outgoing).unwrap();

        assert!(server.peek(&client_role));
        let received = SimulatedTransport::recv(&mut server, &client_role).unwrap();
        assert_eq!(received.from_role.as_str(), "Client");
        assert_eq!(received.to_role.as_str(), "Server");
    }

    /// Receiving from an empty transport yields `NoMessage`.
    #[test]
    fn test_no_message_error() {
        let mut transport = InMemoryTransport::new();
        transport.set_role(RoleName::from_static("Client"));

        let peer = RoleName::from_static("Server");
        let outcome = SimulatedTransport::recv(&mut transport, &peer);
        assert!(matches!(outcome, Err(TransportError::NoMessage(_))));
    }

    /// With the drop rate at one, a sent message never reaches the inner
    /// transport.
    #[test]
    fn test_faulty_transport_drops() {
        let mut faulty = FaultyTransport::new(InMemoryTransport::new())
            .with_drop_rate(FixedQ32::one())
            .with_seed(42);
        faulty.inner.set_role(RoleName::from_static("Client"));

        let peer = RoleName::from_static("Server");
        let outgoing = make_envelope("Client", "Server");
        faulty.send(&peer, outgoing).unwrap();

        assert_eq!(faulty.inner.pending_count(), 0);
    }
}