use crate::error::{ChannelError, Result};
use crate::protocol::EncodedMessage;
use std::collections::VecDeque;
use std::time::Duration;
/// Snapshot of the traffic counters and quality indicators a channel reports.
///
/// `Default` produces a snapshot with every field set to zero.
#[derive(Clone, Debug, Default)]
pub struct ChannelMetrics {
    /// Sum of `len()` over every message counted as sent.
    pub bytes_sent: u64,
    /// Sum of `len()` over every message handed out by `receive`.
    pub bytes_received: u64,
    /// Number of messages counted as sent.
    pub messages_sent: u64,
    /// Number of messages handed out by `receive`.
    pub messages_received: u64,
    /// Average latency in milliseconds (per the field name). None of the
    /// channel implementations visible in this module update it.
    pub latency_avg_ms: f32,
    /// Error rate. In this module only `LossyChannel` writes it, setting it to
    /// its configured loss rate when a message is dropped.
    pub error_rate: f32,
    /// Available bandwidth. Units are not established by the code visible in
    /// this module, and no visible channel updates it — TODO confirm.
    pub bandwidth_available: u32,
}
/// Common interface of a message transport that carries [`EncodedMessage`]s.
pub trait Channel {
    /// Hands `message` to the channel for delivery.
    ///
    /// # Errors
    /// Implementation-defined; the in-memory channel in this module fails when
    /// it is closed or when its outgoing queue is full.
    fn send(&mut self, message: EncodedMessage) -> Result<()>;

    /// Returns the next incoming message.
    ///
    /// `timeout` is the caller's wait budget. How it is honoured is up to the
    /// implementation (the in-memory channel in this module ignores it and
    /// answers immediately).
    ///
    /// # Errors
    /// Implementation-defined; typically raised when no message is available
    /// or the channel is closed.
    fn receive(&mut self, timeout: Duration) -> Result<EncodedMessage>;

    /// Reports whether the channel can currently be used.
    fn is_available(&self) -> bool;

    /// Returns a snapshot of the channel's metrics.
    fn metrics(&self) -> ChannelMetrics;

    /// Closes the channel.
    fn close(&mut self);
}
/// In-process channel backed by two FIFO queues.
///
/// `send` appends to the outgoing queue and `receive` pops from the incoming
/// queue; nothing moves between the two queues unless a caller does it
/// explicitly (for example via `pop_outgoing` / `push_incoming`).
#[derive(Debug)]
pub struct MemoryChannel {
    /// Outgoing messages, oldest first.
    tx_buffer: VecDeque<EncodedMessage>,
    /// Incoming messages waiting for `receive`, oldest first.
    rx_buffer: VecDeque<EncodedMessage>,
    /// Maximum number of messages `send` allows in `tx_buffer`.
    max_buffer_size: usize,
    /// `false` once the channel has been closed.
    is_open: bool,
    /// Counters updated by `send` and `receive`.
    metrics: ChannelMetrics,
}
impl MemoryChannel {
    /// Outgoing-queue limit used by [`MemoryChannel::new`].
    const DEFAULT_BUFFER_SIZE: usize = 1000;

    /// Largest capacity reserved up front by [`MemoryChannel::with_buffer_size`].
    const MAX_PREALLOCATED: usize = 1024;

    /// Creates an open channel with empty queues and an outgoing limit of
    /// 1000 messages.
    pub fn new() -> Self {
        Self {
            tx_buffer: VecDeque::new(),
            rx_buffer: VecDeque::new(),
            max_buffer_size: Self::DEFAULT_BUFFER_SIZE,
            is_open: true,
            metrics: ChannelMetrics::default(),
        }
    }

    /// Creates an open channel whose outgoing queue accepts at most
    /// `max_size` messages.
    ///
    /// `max_size` is a limit, not a size hint: only a bounded amount of
    /// capacity is reserved eagerly, so very large limits (up to
    /// `usize::MAX`) neither allocate huge buffers nor panic with a capacity
    /// overflow. The queues still grow on demand.
    pub fn with_buffer_size(max_size: usize) -> Self {
        let reserve = max_size.min(Self::MAX_PREALLOCATED);
        Self {
            tx_buffer: VecDeque::with_capacity(reserve),
            rx_buffer: VecDeque::with_capacity(reserve),
            max_buffer_size: max_size,
            is_open: true,
            metrics: ChannelMetrics::default(),
        }
    }

    /// Appends `message` to the incoming queue so a later `receive` returns it.
    ///
    /// No limit or open/closed check is applied, and metrics are not touched.
    pub fn push_incoming(&mut self, message: EncodedMessage) {
        self.rx_buffer.push_back(message);
    }

    /// Removes and returns the oldest outgoing message, or `None` when the
    /// outgoing queue is empty.
    pub fn pop_outgoing(&mut self) -> Option<EncodedMessage> {
        self.tx_buffer.pop_front()
    }

    /// Number of messages waiting in the outgoing queue.
    pub fn pending_outgoing(&self) -> usize {
        self.tx_buffer.len()
    }

    /// Number of messages waiting in the incoming queue.
    pub fn pending_incoming(&self) -> usize {
        self.rx_buffer.len()
    }

    /// Moves every outgoing message of `self`, in order, to the back of
    /// `other`'s incoming queue, leaving `self`'s outgoing queue empty.
    pub fn transfer_to(&mut self, other: &mut MemoryChannel) {
        other.rx_buffer.append(&mut self.tx_buffer);
    }
}
impl Default for MemoryChannel {
fn default() -> Self {
Self::new()
}
}
impl Channel for MemoryChannel {
    /// Appends `message` to the outgoing queue and records it in the metrics.
    ///
    /// # Errors
    /// - `ChannelError::Disconnected` when the channel has been closed.
    /// - `ChannelError::BufferFull` when the outgoing queue already holds
    ///   `max_buffer_size` messages.
    fn send(&mut self, message: EncodedMessage) -> Result<()> {
        if self.is_open {
            if self.tx_buffer.len() < self.max_buffer_size {
                // Measure before the message is moved into the queue.
                let size = message.len();
                self.tx_buffer.push_back(message);
                self.metrics.bytes_sent += size as u64;
                self.metrics.messages_sent += 1;
                Ok(())
            } else {
                Err(ChannelError::BufferFull.into())
            }
        } else {
            let closed = ChannelError::Disconnected {
                reason: String::from("Channel is closed"),
            };
            Err(closed.into())
        }
    }

    /// Pops the oldest incoming message and records it in the metrics.
    ///
    /// The timeout argument is ignored: the call never waits.
    ///
    /// # Errors
    /// - `ChannelError::Disconnected` when the channel has been closed.
    /// - `ChannelError::Timeout` (with `timeout_ms: 0`) when the incoming
    ///   queue is empty.
    fn receive(&mut self, _timeout: Duration) -> Result<EncodedMessage> {
        if !self.is_open {
            let closed = ChannelError::Disconnected {
                reason: String::from("Channel is closed"),
            };
            return Err(closed.into());
        }
        if let Some(incoming) = self.rx_buffer.pop_front() {
            self.metrics.bytes_received += incoming.len() as u64;
            self.metrics.messages_received += 1;
            return Ok(incoming);
        }
        Err(ChannelError::Timeout { timeout_ms: 0 }.into())
    }

    /// `true` until [`Channel::close`] has been called.
    fn is_available(&self) -> bool {
        self.is_open
    }

    /// Returns a copy of the current counters.
    fn metrics(&self) -> ChannelMetrics {
        self.metrics.clone()
    }

    /// Marks the channel as closed; queued messages are left in place.
    fn close(&mut self) {
        self.is_open = false;
    }
}
/// Two independent [`MemoryChannel`]s, one per direction between an emitter
/// and a receiver (as the field names indicate).
#[derive(Debug)]
pub struct ChannelPair {
    /// Channel for the emitter → receiver direction.
    pub emitter_to_receiver: MemoryChannel,
    /// Channel for the receiver → emitter direction.
    pub receiver_to_emitter: MemoryChannel,
}
impl ChannelPair {
pub fn new() -> Self {
Self {
emitter_to_receiver: MemoryChannel::new(),
receiver_to_emitter: MemoryChannel::new(),
}
}
pub fn transfer(&mut self) {
while let Some(msg) = self.emitter_to_receiver.pop_outgoing() {
self.emitter_to_receiver.push_incoming(msg);
}
while let Some(msg) = self.receiver_to_emitter.pop_outgoing() {
self.receiver_to_emitter.push_incoming(msg);
}
}
}
impl Default for ChannelPair {
fn default() -> Self {
Self::new()
}
}
/// A [`MemoryChannel`] wrapper that discards a fraction of the messages
/// passed to `send`, using a deterministic pseudo-random sequence.
#[derive(Debug)]
pub struct LossyChannel {
    /// Channel that stores the messages that are not discarded.
    inner: MemoryChannel,
    /// Threshold compared against each pseudo-random sample: a message is
    /// discarded when the sample is below this value.
    loss_rate: f32,
    /// Current state of the pseudo-random generator.
    rng_state: u64,
}
impl LossyChannel {
    /// Fixed generator seed, so every channel produces the same drop pattern.
    const INITIAL_SEED: u64 = 12345;

    /// Creates a lossy channel on top of a fresh `MemoryChannel`.
    ///
    /// `loss_rate` is clamped into `0.0..=1.0`: `0.0` never drops a message
    /// and `1.0` drops every message.
    pub fn new(loss_rate: f32) -> Self {
        Self {
            inner: MemoryChannel::new(),
            loss_rate: loss_rate.clamp(0.0, 1.0),
            rng_state: Self::INITIAL_SEED,
        }
    }

    /// Advances the linear congruential generator and returns a sample in
    /// the half-open range `[0.0, 1.0)`.
    fn next_random(&mut self) -> f32 {
        self.rng_state = self
            .rng_state
            .wrapping_mul(1_103_515_245)
            .wrapping_add(12_345);
        // Bits 16..=30 form a 15-bit value in 0..=0x7fff. Dividing by 2^15
        // (rather than 2^15 - 1) keeps the sample strictly below 1.0, so a
        // loss rate of exactly 1.0 drops every message.
        ((self.rng_state >> 16) & 0x7fff) as f32 / 32768.0
    }
}
impl Channel for LossyChannel {
    /// Sends `message`, silently discarding it with probability `loss_rate`.
    ///
    /// A discarded message still returns `Ok(())` and is still counted in
    /// `bytes_sent` / `messages_sent`, and `error_rate` is set to the
    /// configured loss rate. Messages that are not discarded are forwarded
    /// to the inner channel.
    ///
    /// # Errors
    /// Whatever the inner channel's `send` returns. A closed channel is
    /// always rejected, even for a message that would have been discarded.
    fn send(&mut self, message: EncodedMessage) -> Result<()> {
        // Check availability before rolling the dice: a closed channel must
        // never report success or count traffic. Delegating yields the inner
        // channel's own "closed" error.
        if !self.inner.is_available() {
            return self.inner.send(message);
        }
        if self.next_random() < self.loss_rate {
            // Simulated loss: the sender cannot tell, so account for the
            // message as sent without queueing it.
            self.inner.metrics.bytes_sent += message.len() as u64;
            self.inner.metrics.messages_sent += 1;
            self.inner.metrics.error_rate = self.loss_rate;
            return Ok(());
        }
        self.inner.send(message)
    }

    /// Forwards to the inner channel's `receive`.
    fn receive(&mut self, timeout: Duration) -> Result<EncodedMessage> {
        self.inner.receive(timeout)
    }

    /// Forwards to the inner channel's `is_available`.
    fn is_available(&self) -> bool {
        self.inner.is_available()
    }

    /// Returns the inner channel's metrics, which include discarded messages.
    fn metrics(&self) -> ChannelMetrics {
        self.inner.metrics()
    }

    /// Closes the inner channel.
    fn close(&mut self) {
        self.inner.close()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{MessageHeader, MessageType, Priority};

    /// Builds a small data message whose header carries `seq`.
    fn make_test_message(seq: u16) -> EncodedMessage {
        EncodedMessage::new(
            MessageHeader {
                version: 1,
                message_type: MessageType::Data,
                priority: Priority::P3Normal,
                sequence: seq,
                timestamp: 0,
                context_version: 0,
            },
            vec![0x00, 0x00, 0x42],
        )
    }

    #[test]
    fn test_memory_channel_send_receive() {
        let mut channel = MemoryChannel::new();
        let msg = make_test_message(1);
        channel.send(msg.clone()).unwrap();
        assert_eq!(channel.pending_outgoing(), 1);
        assert_eq!(channel.pending_incoming(), 0);

        // Loop the message back by hand: outgoing -> incoming.
        let outgoing = channel.pop_outgoing().unwrap();
        channel.push_incoming(outgoing);
        assert_eq!(channel.pending_outgoing(), 0);
        assert_eq!(channel.pending_incoming(), 1);

        let received = channel.receive(Duration::from_secs(1)).unwrap();
        assert_eq!(received.header.sequence, 1);
    }

    #[test]
    fn test_memory_channel_buffer_full() {
        let mut channel = MemoryChannel::with_buffer_size(2);
        channel.send(make_test_message(1)).unwrap();
        channel.send(make_test_message(2)).unwrap();
        assert!(channel.send(make_test_message(3)).is_err());

        // Draining one message frees a slot again.
        assert!(channel.pop_outgoing().is_some());
        assert!(channel.send(make_test_message(4)).is_ok());
        assert_eq!(channel.pending_outgoing(), 2);
    }

    #[test]
    fn test_memory_channel_closed() {
        let mut channel = MemoryChannel::new();
        assert!(channel.is_available());
        channel.close();
        assert!(!channel.is_available());
        assert!(channel.send(make_test_message(1)).is_err());
        assert!(channel.receive(Duration::from_millis(1)).is_err());
    }

    #[test]
    fn test_memory_channel_receive_empty() {
        let mut channel = MemoryChannel::new();
        assert!(channel.receive(Duration::from_millis(1)).is_err());
        assert_eq!(channel.metrics().messages_received, 0);
    }

    #[test]
    fn test_memory_channel_transfer_to() {
        let mut source = MemoryChannel::new();
        let mut sink = MemoryChannel::new();
        source.send(make_test_message(1)).unwrap();
        source.send(make_test_message(2)).unwrap();

        source.transfer_to(&mut sink);
        assert_eq!(source.pending_outgoing(), 0);
        assert_eq!(sink.pending_incoming(), 2);

        let first = sink.receive(Duration::from_secs(1)).unwrap();
        let second = sink.receive(Duration::from_secs(1)).unwrap();
        assert_eq!(first.header.sequence, 1);
        assert_eq!(second.header.sequence, 2);
    }

    #[test]
    fn test_channel_pair() {
        let mut pair = ChannelPair::new();
        pair.emitter_to_receiver.send(make_test_message(1)).unwrap();
        pair.emitter_to_receiver.send(make_test_message(2)).unwrap();
        pair.receiver_to_emitter.send(make_test_message(3)).unwrap();

        // Exercise the real delivery path instead of re-implementing it.
        pair.transfer();
        assert_eq!(pair.emitter_to_receiver.pending_outgoing(), 0);
        assert_eq!(pair.receiver_to_emitter.pending_outgoing(), 0);
        assert_eq!(pair.emitter_to_receiver.pending_incoming(), 2);
        assert_eq!(pair.receiver_to_emitter.pending_incoming(), 1);

        let msg1 = pair
            .emitter_to_receiver
            .receive(Duration::from_secs(1))
            .unwrap();
        let msg2 = pair
            .emitter_to_receiver
            .receive(Duration::from_secs(1))
            .unwrap();
        let reply = pair
            .receiver_to_emitter
            .receive(Duration::from_secs(1))
            .unwrap();
        assert_eq!(msg1.header.sequence, 1);
        assert_eq!(msg2.header.sequence, 2);
        assert_eq!(reply.header.sequence, 3);
    }

    #[test]
    fn test_channel_metrics() {
        let mut channel = MemoryChannel::new();
        let msg = make_test_message(1);
        let msg_size = msg.len();
        channel.send(msg).unwrap();

        let metrics = channel.metrics();
        assert_eq!(metrics.bytes_sent, msg_size as u64);
        assert_eq!(metrics.messages_sent, 1);
        assert_eq!(metrics.bytes_received, 0);
        assert_eq!(metrics.messages_received, 0);

        // Receiving updates the receive-side counters.
        let outgoing = channel.pop_outgoing().unwrap();
        channel.push_incoming(outgoing);
        channel.receive(Duration::from_secs(1)).unwrap();
        let metrics = channel.metrics();
        assert_eq!(metrics.bytes_received, msg_size as u64);
        assert_eq!(metrics.messages_received, 1);
    }

    #[test]
    fn test_lossy_channel() {
        let mut channel = LossyChannel::new(0.5);
        let mut sent = 0;
        for i in 0..100 {
            channel.send(make_test_message(i)).unwrap();
            sent += 1;
        }

        // Whatever was not dropped is still sitting in the outgoing queue.
        let mut received = 0;
        while let Some(msg) = channel.inner.pop_outgoing() {
            channel.inner.push_incoming(msg);
            received += 1;
        }
        assert!(received < sent);
        assert!(received > 0);
        // Dropped messages are still counted as sent.
        assert_eq!(channel.metrics().messages_sent, sent as u64);
    }
}