use knx_rs_core::cemi::CemiFrame;
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::error::KnxIpError;
use crate::{KnxConnection, KnxFuture};
pub struct Multiplexer<C: KnxConnection> {
conn: C,
broadcast_tx: broadcast::Sender<CemiFrame>,
cmd_rx: mpsc::Receiver<MuxCmd>,
cmd_tx: mpsc::Sender<MuxCmd>,
}
enum MuxCmd {
Send(CemiFrame, oneshot::Sender<Result<(), KnxIpError>>),
}
impl<C: KnxConnection + 'static> Multiplexer<C> {
pub fn new(conn: C) -> Self {
let (broadcast_tx, _) = broadcast::channel(128);
let (cmd_tx, cmd_rx) = mpsc::channel(32);
Self {
conn,
broadcast_tx,
cmd_rx,
cmd_tx,
}
}
pub fn handle(&self) -> MultiplexHandle {
MultiplexHandle {
rx: self.broadcast_tx.subscribe(),
cmd_tx: self.cmd_tx.clone(),
}
}
pub async fn run(mut self) {
loop {
tokio::select! {
frame = self.conn.recv() => {
if let Some(cemi) = frame {
let _ = self.broadcast_tx.send(cemi);
} else {
tracing::debug!("multiplexer: connection closed");
break;
}
}
cmd = self.cmd_rx.recv() => {
match cmd {
Some(MuxCmd::Send(frame, reply)) => {
let result = self.conn.send(frame).await;
let _ = reply.send(result);
}
None => {
tracing::debug!("multiplexer: all handles dropped");
self.conn.close().await;
break;
}
}
}
}
}
}
}
pub struct MultiplexHandle {
rx: broadcast::Receiver<CemiFrame>,
cmd_tx: mpsc::Sender<MuxCmd>,
}
impl MultiplexHandle {
pub async fn send_frame(&self, frame: CemiFrame) -> Result<(), KnxIpError> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(MuxCmd::Send(frame, tx))
.await
.map_err(|_| KnxIpError::Closed)?;
rx.await.map_err(|_| KnxIpError::Closed)?
}
pub async fn recv(&mut self) -> Option<CemiFrame> {
loop {
match self.rx.recv().await {
Ok(frame) => return Some(frame),
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(missed = n, "multiplex handle lagged");
}
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
}
impl KnxConnection for MultiplexHandle {
fn send(&self, frame: CemiFrame) -> KnxFuture<'_, Result<(), KnxIpError>> {
let cmd_tx = self.cmd_tx.clone();
Box::pin(async move {
let (tx, rx) = oneshot::channel();
cmd_tx
.send(MuxCmd::Send(frame, tx))
.await
.map_err(|_| KnxIpError::Closed)?;
rx.await.map_err(|_| KnxIpError::Closed)?
})
}
fn recv(&mut self) -> KnxFuture<'_, Option<CemiFrame>> {
Box::pin(async move { Self::recv(self).await })
}
fn close(&mut self) -> KnxFuture<'_, ()> {
Box::pin(core::future::ready(()))
}
}