use crate::{
event,
inet::datagram,
io::{rx, tx},
path::Tuple,
};
use core::task::{Context, Poll};
use std::sync::{Arc, Mutex};
pub type Handle = Tuple;
#[derive(Clone, Debug, Default)]
pub struct Channel {
messages: Arc<Mutex<Vec<Message>>>,
}
impl Channel {
pub fn push(&self, message: Message) {
self.messages.lock().unwrap().push(message);
}
pub fn pop(&self) -> Option<Message> {
self.messages.lock().unwrap().pop()
}
#[inline]
fn queue<F: FnOnce(&mut Queue<'static>)>(&mut self, f: F) {
if let Ok(mut messages) = self.messages.lock() {
let messages: &mut Vec<_> = &mut *messages;
let messages: &'static mut _ = unsafe {
core::mem::transmute(messages)
};
let mut queue = Queue { messages };
f(&mut queue);
}
}
}
#[derive(Clone, Debug)]
pub struct Message {
pub header: datagram::Header<Tuple>,
pub payload: Vec<u8>,
}
impl Default for Message {
fn default() -> Self {
Self {
header: datagram::Header {
ecn: Default::default(),
path: Tuple {
local_address: Default::default(),
remote_address: Default::default(),
},
},
payload: Default::default(),
}
}
}
impl tx::Tx for Channel {
type PathHandle = Tuple;
type Queue = Queue<'static>;
type Error = ();
#[inline]
fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Pending
}
#[inline]
fn queue<F: FnOnce(&mut Self::Queue)>(&mut self, f: F) {
Self::queue(self, f)
}
#[inline]
fn handle_error<E: event::EndpointPublisher>(self, _error: Self::Error, _event: &mut E) {
}
}
impl rx::Rx for Channel {
type PathHandle = Tuple;
type Queue = Queue<'static>;
type Error = ();
#[inline]
fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let messages = self.messages.lock().map_err(|_| ())?;
if messages.is_empty() {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
#[inline]
fn queue<F: FnOnce(&mut Self::Queue)>(&mut self, f: F) {
Self::queue(self, f)
}
#[inline]
fn handle_error<E: event::EndpointPublisher>(self, _error: Self::Error, _event: &mut E) {
}
}
pub struct Queue<'a> {
messages: &'a mut Vec<Message>,
}
impl tx::Queue for Queue<'_> {
type Handle = Tuple;
#[inline]
fn push<M: tx::Message<Handle = Self::Handle>>(
&mut self,
mut message: M,
) -> Result<tx::Outcome, tx::Error> {
let mut out = Message::default();
out.header.ecn = message.ecn();
out.header.path = *message.path_handle();
out.payload.resize(1500, 0);
let buffer = tx::PayloadBuffer::new(&mut out.payload);
let len = message.write_payload(buffer, 0)?;
self.messages.push(out);
let outcome = tx::Outcome { index: 0, len };
Ok(outcome)
}
#[inline]
fn capacity(&self) -> usize {
usize::MAX - self.messages.len()
}
}
impl rx::Queue for Queue<'_> {
type Handle = Tuple;
#[inline]
fn for_each<F: FnMut(datagram::Header<Self::Handle>, &mut [u8])>(&mut self, mut on_packet: F) {
for mut message in self.messages.drain(..) {
on_packet(message.header, &mut message.payload);
}
}
#[inline]
fn is_empty(&self) -> bool {
self.messages.is_empty()
}
}