pub mod driver;
pub mod hdlc;
pub mod tcp_client;
pub mod tcp_server;
pub mod udp;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
use tokio::task;
use tokio_util::sync::CancellationToken;
use crate::hash::AddressHash;
use crate::hash::Hash;
use crate::packet::Packet;
pub use driver::{InterfaceDriver, InterfaceDriverFactory};
/// Sending half of a per-interface transmit channel carrying [`TxMessage`]s.
pub type InterfaceTxSender = mpsc::Sender<TxMessage>;
/// Receiving half of a per-interface transmit channel carrying [`TxMessage`]s.
pub type InterfaceTxReceiver = mpsc::Receiver<TxMessage>;
/// Sending half of the receive channel carrying [`RxMessage`]s.
pub type InterfaceRxSender = mpsc::Sender<RxMessage>;
/// Receiving half of the receive channel carrying [`RxMessage`]s.
pub type InterfaceRxReceiver = mpsc::Receiver<RxMessage>;
/// Routing mode of an outgoing [`TxMessage`], interpreted by
/// `InterfaceManager::send`.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum TxMessageType {
/// Deliver to every registered interface. When the payload is `Some`, the
/// interface with that address is skipped.
Broadcast(Option<AddressHash>),
/// Deliver only to the interface whose address equals the payload.
Direct(AddressHash),
}
/// An outgoing packet together with the routing mode that selects which
/// interfaces it is handed to.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct TxMessage {
/// How the packet is routed across the registered interfaces.
pub tx_type: TxMessageType,
/// The packet to transmit.
pub packet: Packet,
}
/// A packet travelling over the receive channel, tagged with an address.
///
/// * `address` - presumably the address of the interface that received the
///   packet; the producers are not visible in this file (TODO confirm).
/// * `packet` - the packet itself.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct RxMessage {
pub address: AddressHash, pub packet: Packet, }
/// The channel endpoints and identity handed to a single interface.
pub struct InterfaceChannel {
/// Address assigned to the interface.
pub address: AddressHash,
/// Sender used to push [`RxMessage`]s onto the receive channel.
pub rx_channel: InterfaceRxSender,
/// Receiver from which the interface takes [`TxMessage`]s to transmit.
pub tx_channel: InterfaceTxReceiver,
/// Per-interface token. `InterfaceManager` keeps a clone and stops
/// sending to, and eventually forgets, the interface once it is cancelled.
pub stop: CancellationToken,
}
impl InterfaceChannel {
pub fn make_rx_channel(cap: usize) -> (InterfaceRxSender, InterfaceRxReceiver) {
mpsc::channel(cap)
}
pub fn make_tx_channel(cap: usize) -> (InterfaceTxSender, InterfaceTxReceiver) {
mpsc::channel(cap)
}
pub fn new(
rx_channel: InterfaceRxSender,
tx_channel: InterfaceTxReceiver,
address: AddressHash,
stop: CancellationToken,
) -> Self {
Self {
address,
rx_channel,
tx_channel,
stop,
}
}
pub fn address(&self) -> &AddressHash {
&self.address
}
pub fn split(self) -> (InterfaceRxSender, InterfaceTxReceiver) {
(self.rx_channel, self.tx_channel)
}
}
/// Marker trait implemented by every interface type managed by
/// `InterfaceManager`.
pub trait Interface {
/// Associated function (no `self`) returning a size for the interface type;
/// presumably the maximum transmission unit in bytes (TODO confirm).
fn mtu() -> usize;
}
/// Manager-side record of one registered interface.
struct LocalInterface {
// Address assigned to the interface when its channel was created.
address: AddressHash,
// Sender feeding the interface's transmit channel.
tx_send: InterfaceTxSender,
// Clone of the token handed out in the matching `InterfaceChannel`; once it
// is cancelled the entry is skipped by `send` and removed by `cleanup`.
stop: CancellationToken,
}
/// Everything an interface worker receives from `InterfaceManager`.
pub struct InterfaceContext<T: Interface> {
/// The interface value, shared behind an `Arc<Mutex<_>>`.
pub inner: Arc<Mutex<T>>,
/// Channel endpoints, address and stop token of this interface.
pub channel: InterfaceChannel,
/// Clone of the manager-wide token, which is cancelled when the
/// `InterfaceManager` is dropped.
pub cancel: CancellationToken,
}
/// Registry of interfaces: hands out channels, routes outgoing messages and
/// owns the shared receive channel.
pub struct InterfaceManager {
// Incremented for every created channel and hashed into its address.
counter: usize,
// Receiving end of the shared receive channel, handed out via `receiver()`.
rx_recv: Arc<tokio::sync::Mutex<InterfaceRxReceiver>>,
// Sending end of the shared receive channel; cloned into every new channel.
rx_send: InterfaceRxSender,
// Manager-wide token, cancelled when the manager is dropped.
cancel: CancellationToken,
// Interfaces currently known to the manager.
ifaces: Vec<LocalInterface>,
}
impl InterfaceManager {
    /// Creates a manager with no registered interfaces.
    ///
    /// `rx_cap` is the capacity of the single receive channel shared by all
    /// interfaces created through this manager.
    pub fn new(rx_cap: usize) -> Self {
        let (rx_send, rx_recv) = InterfaceChannel::make_rx_channel(rx_cap);
        let rx_recv = Arc::new(tokio::sync::Mutex::new(rx_recv));
        Self {
            counter: 0,
            rx_recv,
            rx_send,
            cancel: CancellationToken::new(),
            ifaces: Vec::new(),
        }
    }

    /// Registers a new interface and returns the channel it should use.
    ///
    /// The interface address is derived from an internal counter that is
    /// incremented on every call. `tx_cap` is the capacity of the transmit
    /// channel created for this interface.
    pub fn new_channel(&mut self, tx_cap: usize) -> InterfaceChannel {
        self.counter += 1;
        let counter_bytes = self.counter.to_le_bytes();
        let address = AddressHash::new_from_hash(&Hash::new_from_slice(&counter_bytes[..]));

        let (tx_send, tx_recv) = InterfaceChannel::make_tx_channel(tx_cap);
        log::debug!("iface: create channel {}", address);

        // The manager keeps one clone of the stop token so that it can
        // notice when the interface has shut down.
        let stop = CancellationToken::new();
        self.ifaces.push(LocalInterface {
            address,
            tx_send,
            stop: stop.clone(),
        });

        InterfaceChannel {
            rx_channel: self.rx_send.clone(),
            tx_channel: tx_recv,
            address,
            stop,
        }
    }

    /// Registers `inner` as a new interface and bundles it with its channel
    /// and a clone of the manager-wide cancellation token.
    ///
    /// The transmit channel is created with a capacity of 1.
    pub fn new_context<T: Interface>(&mut self, inner: T) -> InterfaceContext<T> {
        let channel = self.new_channel(1);
        InterfaceContext {
            // The context is the only owner at this point, so the freshly
            // created `Arc` is moved in directly instead of being cloned.
            inner: Arc::new(Mutex::new(inner)),
            channel,
            cancel: self.cancel.clone(),
        }
    }

    /// Registers `inner`, builds the worker future from its context and spawns
    /// it as a Tokio task.
    ///
    /// Returns the address assigned to the new interface. The task handle is
    /// discarded, so the task runs detached.
    ///
    /// # Panics
    ///
    /// Panics if called outside of a Tokio runtime (see `tokio::task::spawn`).
    pub fn spawn<T: Interface, F, R>(&mut self, inner: T, worker: F) -> AddressHash
    where
        F: FnOnce(InterfaceContext<T>) -> R,
        R: std::future::Future<Output = ()> + Send + 'static,
        R::Output: Send + 'static,
    {
        let context = self.new_context(inner);
        // Copy the address out before the context is moved into the worker.
        let address = *context.channel.address();
        task::spawn(worker(context));
        address
    }

    /// Returns a shared handle to the receiving end of the receive channel.
    pub fn receiver(&self) -> Arc<tokio::sync::Mutex<InterfaceRxReceiver>> {
        Arc::clone(&self.rx_recv)
    }

    /// Forgets every interface that can no longer accept messages.
    ///
    /// An interface is removed when its stop token has been cancelled, or when
    /// the receiving half of its transmit channel has been dropped or closed.
    /// The second check catches interfaces that went away without cancelling
    /// the token, e.g. after `InterfaceChannel::split` discarded it.
    pub fn cleanup(&mut self) {
        self.ifaces
            .retain(|iface| !iface.stop.is_cancelled() && !iface.tx_send.is_closed());
    }

    /// Routes `message` to the matching interfaces.
    ///
    /// * `Broadcast(None)` goes to every interface.
    /// * `Broadcast(Some(addr))` goes to every interface except `addr`.
    /// * `Direct(addr)` goes only to the interface with address `addr`.
    ///
    /// Interfaces whose stop token is cancelled are skipped. Delivery is
    /// sequential: the call waits for room in each target's transmit channel
    /// before moving on to the next one. A failed delivery (the interface
    /// dropped its receiver) is logged and does not abort the remaining ones.
    pub async fn send(&self, message: TxMessage) {
        for iface in &self.ifaces {
            let is_target = match message.tx_type {
                // `None` never equals `Some(_)`, so a broadcast without an
                // excluded address matches every interface.
                TxMessageType::Broadcast(excluded) => excluded != Some(iface.address),
                TxMessageType::Direct(target) => target == iface.address,
            };

            if !is_target || iface.stop.is_cancelled() {
                continue;
            }

            if iface.tx_send.send(message).await.is_err() {
                log::warn!(
                    "iface: failed to send to {}: tx channel closed",
                    iface.address
                );
            }
        }
    }
}
impl Drop for InterfaceManager {
    /// Cancels the manager-wide token when the manager is dropped.
    ///
    /// Every `InterfaceContext` created by `new_context` holds a clone of the
    /// token, so this signals their workers.
    fn drop(&mut self) {
        let Self { cancel, .. } = self;
        cancel.cancel();
    }
}