use std::future::Future;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use super::daemon::{Broker, Session};
use super::protocol::{CommsOut, CommsRequest};
/// Capacity of the per-link `mpsc` channel that `serve_link` creates for
/// outbound [`CommsOut`] messages queued on behalf of a connected link.
pub(crate) const LINK_CHANNEL_DEPTH: usize = 256;
/// 16 MiB, expressed in bytes.
///
/// NOTE(review): presumably the upper bound on a single encoded frame that
/// transports should accept; nothing in this file enforces it, so confirm
/// against the concrete link implementations.
pub const MAX_FRAME_BYTES: usize = 16 * 1024 * 1024;
/// Credentials of the peer on the other end of a [`CommsLink`], as returned
/// by [`CommsLink::peer_cred`].
///
/// Both fields are optional; `PeerCred::default()` has both set to `None`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PeerCred {
/// NOTE(review): presumably the peer's user id, `None` when the transport
/// cannot report one; confirm against the link implementations.
pub uid: Option<u32>,
/// NOTE(review): presumably the peer's process id, `None` when the transport
/// cannot report one; confirm against the link implementations.
pub pid: Option<u32>,
}
/// One bidirectional connection to a client: a source of [`CommsRequest`]s
/// and a sink for [`CommsOut`] messages.
///
/// Both async methods return `Send` futures so a link can be driven from a
/// task that may move between threads.
pub trait CommsLink: Send {
/// Waits for the next request from the peer.
///
/// `serve_link` stops serving the link on `Ok(None)` as well as on `Err(_)`.
/// It also polls this future inside `tokio::select!` next to the outbound
/// queue, so the future is dropped whenever an outbound message becomes
/// ready first.
/// NOTE(review): implementations presumably need to be cancel-safe (no
/// partially read request lost on drop); confirm for each transport.
fn recv(&mut self) -> impl Future<Output = std::io::Result<Option<CommsRequest>>> + Send;
/// Sends one outbound message to the peer.
///
/// `serve_link` stops serving the link as soon as this returns an error.
fn send(&mut self, out: CommsOut) -> impl Future<Output = std::io::Result<()>> + Send;
/// Returns the credentials known for the peer of this link.
fn peer_cred(&self) -> PeerCred;
}
/// A transport front end that is run against a shared [`Broker`].
///
/// NOTE(review): `serve` returns `impl Future`, which makes this trait not
/// dyn-compatible, so the `Box<Self>` receiver can only be used with a
/// concrete boxed type rather than `Box<dyn CommsFrontend>`; confirm this
/// is intended.
pub trait CommsFrontend: Send {
/// Consumes the boxed front end and runs it, yielding an I/O result when
/// it finishes.
///
/// * `broker` - shared broker handle given to the front end.
/// * `shutdown` - watch channel carrying a `bool`.
///   NOTE(review): presumably implementations return once it becomes
///   `true`; confirm against the implementations.
fn serve(
self: Box<Self>,
broker: Arc<Broker>,
shutdown: watch::Receiver<bool>,
) -> impl Future<Output = std::io::Result<()>> + Send;
}
/// Serves a single connected link until it closes or fails.
///
/// The broker is told about the link with `link_connected()` on entry and
/// with `link_disconnected()` on exit. The exit notification is issued from a
/// drop guard, so it also happens when this future is dropped before
/// completion (for example when the owning task is aborted) or when the
/// request handler panics, keeping the two calls paired on every path.
///
/// While connected, two sources are multiplexed onto the link:
///
/// * requests read from `link`, each passed to `Broker::handle` together with
///   the per-link [`Session`] and a sender for the link's outbound queue; the
///   result is written back as `CommsOut::Response`;
/// * messages placed on the outbound queue (capacity
///   [`LINK_CHANNEL_DEPTH`]), forwarded to the peer unchanged.
///
/// Serving stops when `recv` yields `Ok(None)` or an error, when a `send`
/// fails, or when the outbound queue is closed. I/O errors are not reported
/// to the caller; they only end the session.
pub(crate) async fn serve_link<L: CommsLink>(broker: Arc<Broker>, mut link: L) {
    /// Notifies the broker that the link is gone when dropped.
    struct ConnectedGuard<'a>(&'a Arc<Broker>);

    impl Drop for ConnectedGuard<'_> {
        fn drop(&mut self) {
            self.0.link_disconnected();
        }
    }

    broker.link_connected();
    let connected = ConnectedGuard(&broker);

    let (link_tx, mut link_rx) = mpsc::channel::<CommsOut>(LINK_CHANNEL_DEPTH);
    let mut session = Session::default();

    loop {
        // Whichever source is ready first decides the next outbound message.
        // The losing future is dropped by `select!` and recreated on the next
        // iteration.
        let outgoing = tokio::select! {
            inbound = link.recv() => {
                match inbound {
                    Ok(Some(req)) => {
                        CommsOut::Response(broker.handle(req, &mut session, &link_tx).await)
                    }
                    // Peer closed the link, or the transport failed.
                    Ok(None) | Err(_) => break,
                }
            }
            note = link_rx.recv() => {
                match note {
                    Some(out) => out,
                    None => break,
                }
            }
        };

        if link.send(outgoing).await.is_err() {
            break;
        }
    }

    // Explicit drop keeps the normal-path notification at the same point as
    // before: ahead of the session and channel teardown.
    drop(connected);
}