use std::sync::Arc;
use crate::{
interface_manager::{
InterfaceState, Profile,
interface_impls::tokio_udp::TokioUdpInterface,
profiles::direct_edge::{DirectEdge, process_frame},
utils::std::{ReceiverError, StdQueue},
},
net_stack::NetStackHandle,
};
use crate::interface_manager::profiles::direct_edge::{CENTRAL_NODE_ID, EDGE_NODE_ID};
use crate::logging::{error, info, trace, warn};
use bbq2::prod_cons::framed::FramedConsumer;
use bbq2::traits::bbqhdl::BbqHandle;
use maitake_sync::WaitQueue;
use tokio::{net::UdpSocket, select};
pub type StdUdpClientIm = DirectEdge<TokioUdpInterface>;
pub struct RxWorker<N: NetStackHandle> {
stack: N,
skt: Arc<UdpSocket>,
closer: Arc<WaitQueue>,
net_id: u16,
}
impl<N> RxWorker<N>
where
N: NetStackHandle<Profile = DirectEdge<TokioUdpInterface>>,
{
pub async fn run(mut self) -> Result<(), ReceiverError> {
info!("Started rx_worker");
let res = self.run_inner().await;
info!("Finished rx_worker");
self.stack.stack().manage_profile(|im| {
_ = im.set_interface_state((), InterfaceState::Down);
});
res
}
pub async fn run_inner(&mut self) -> Result<(), ReceiverError> {
let mut raw_buf = [0u8; 4096];
let mut net_id = Some(self.net_id);
loop {
let rd = self.skt.recv_from(&mut raw_buf);
let close = self.closer.wait();
let ct = select! {
r = rd => {
match r {
Ok((0, _)) => {
warn!("received nothing, retrying");
continue
},
Err(e) => {
warn!("receiver error, retrying. error: {}", e);
continue
},
Ok((ct, remote_address)) => {
trace!("received {} bytes from {}", ct, remote_address);
ct
},
}
}
_c = close => {
return Err(ReceiverError::SocketClosed);
}
};
let buf = &mut raw_buf[..ct];
process_frame(&mut net_id, buf, &self.stack, ());
}
}
}
#[derive(Debug, PartialEq)]
pub struct SocketAlreadyActive;
pub enum InterfaceKind {
Target,
Controller,
}
pub async fn register_interface<N>(
stack: N,
socket: UdpSocket,
queue: StdQueue,
interface_kind: InterfaceKind,
ident: <<N as NetStackHandle>::Profile as Profile>::InterfaceIdent,
) -> Result<(), SocketAlreadyActive>
where
N: NetStackHandle<Profile = DirectEdge<TokioUdpInterface>>,
N: Send + 'static,
{
let arc_socket = Arc::new(socket);
let (rx, tx) = (arc_socket.clone(), arc_socket);
let net_id = 1_u16;
let closer = Arc::new(WaitQueue::new());
stack.stack().manage_profile(|im| {
match im.interface_state(()) {
Some(InterfaceState::Down) => {}
Some(InterfaceState::Inactive) => return Err(SocketAlreadyActive),
Some(InterfaceState::ActiveLocal { .. }) => return Err(SocketAlreadyActive),
Some(InterfaceState::Active { .. }) => return Err(SocketAlreadyActive),
None => {}
}
match interface_kind {
InterfaceKind::Controller => {
trace!("UDP controller is active");
im.set_interface_state(
ident,
InterfaceState::Active {
net_id,
node_id: CENTRAL_NODE_ID,
},
)
}
InterfaceKind::Target => {
trace!("UDP target is active");
im.set_interface_state(
ident,
InterfaceState::Active {
net_id,
node_id: EDGE_NODE_ID,
},
)
}
}
.map_err(|_| SocketAlreadyActive)?;
Ok(())
})?;
let rx_worker = RxWorker {
stack,
skt: rx,
closer: closer.clone(),
net_id,
};
tokio::task::spawn(tx_worker(tx, queue.framed_consumer(), closer.clone()));
tokio::task::spawn(rx_worker.run());
Ok(())
}
async fn tx_worker(tx: Arc<UdpSocket>, rx: FramedConsumer<StdQueue>, closer: Arc<WaitQueue>) {
info!("Started tx_worker");
loop {
let rxf = rx.wait_read();
let clf = closer.wait();
let frame = select! {
r = rxf => r,
_c = clf => {
break;
}
};
let len = frame.len();
trace!("sending pkt len:{}", len);
let res = tx.send(&frame).await;
frame.release();
if let Err(e) = res {
error!("Err: {e:?}");
break;
}
}
warn!("Closing interface");
}