use std::fmt::Debug;
use std::pin::Pin;
use coap_lite::Packet;
use futures::stream::Fuse;
use futures::{SinkExt, StreamExt};
use log::{error, trace, warn};
use tokio::sync::mpsc::{Receiver, Sender};
use crate::packet_handler::{IntoHandler, PacketHandler};
use crate::transport::{FramedBinding, FramedItem, FramedReadError, Transport, TransportError};
/// CoAP server that reads framed packets from a transport binding, hands each
/// one to a packet handler, and relays the generated responses back out
/// through the same binding.
pub struct CoapServer<Handler, Endpoint> {
/// Fused transport binding: polled for incoming items in `serve` and used to
/// send outgoing items in `handle_packet_relay`.
binding: Fuse<Pin<Box<dyn FramedBinding<Endpoint>>>>,
/// Receiving half of the relay channel, drained by the `serve` loop.
packet_relay_rx: Receiver<FramedItem<Endpoint>>,
/// Sending half of the relay channel; cloned into every spawned
/// response-generating task.
packet_relay_tx: Sender<FramedItem<Endpoint>>,
/// Packet handler; `None` after `bind` and populated at the start of `serve`.
handler: Option<Handler>,
}
impl<Handler, Endpoint: Debug + Send + Clone + 'static> CoapServer<Handler, Endpoint>
where
Handler: PacketHandler<Endpoint> + Send + 'static,
{
/// Binds `transport` and builds a server that is ready for [`Self::serve`].
///
/// No handler is installed yet; it is supplied when `serve` is called.
///
/// # Errors
///
/// Propagates the error returned by the transport's `bind`.
pub async fn bind<T>(transport: T) -> Result<Self, TransportError>
where
    T: Transport<Endpoint = Endpoint>,
{
    // Capacity of the channel carrying responses from handler tasks back to
    // the serve loop.
    const RELAY_CAPACITY: usize = 32;

    let binding = transport.bind().await?.fuse();
    let (packet_relay_tx, packet_relay_rx) = tokio::sync::mpsc::channel(RELAY_CAPACITY);
    Ok(Self {
        binding,
        packet_relay_rx,
        packet_relay_tx,
        handler: None,
    })
}
/// Runs the server loop, consuming the server.
///
/// The handler is built from `handler` using the binding's MTU. The loop then
/// multiplexes two sources: incoming items from the binding, which are
/// dispatched via `handle_rx_event`, and outgoing items produced by spawned
/// handler tasks, which are written back via `handle_packet_relay`.
///
/// Returns `Ok(())` once the binding's incoming stream ends.
///
/// # Errors
///
/// Returns a [`FatalServerError`] as soon as `handle_rx_event` reports one.
pub async fn serve(
    mut self,
    handler: impl IntoHandler<Handler, Endpoint>,
) -> Result<(), FatalServerError> {
    let mtu = self.binding.get_ref().mtu();
    self.handler = Some(handler.into_handler(mtu));
    loop {
        tokio::select! {
            // `next()` is used instead of `select_next_some()`: the latter
            // panics when polled after the stream has terminated, and
            // `tokio::select!` does not check `FusedFuture::is_terminated`
            // before polling.
            event = self.binding.next() => {
                match event {
                    Some(event) => self.handle_rx_event(event)?,
                    // The binding will never yield another item; stop serving.
                    None => return Ok(()),
                }
            }
            Some(item) = self.packet_relay_rx.recv() => {
                self.handle_packet_relay(item).await;
            }
        }
    }
}
/// Processes one item yielded by the binding's incoming stream.
///
/// A successfully read packet is passed to `do_handle_request`. A read error
/// is logged; it is treated as fatal only when it carries no peer.
///
/// # Errors
///
/// Returns the converted transport error when the read error has no peer, or
/// whatever `do_handle_request` returns.
fn handle_rx_event(
    &self,
    result: Result<FramedItem<Endpoint>, FramedReadError<Endpoint>>,
) -> Result<(), FatalServerError> {
    let (transport_err, peer) = match result {
        Ok((packet, peer)) => {
            trace!("Incoming packet from {peer:?}: {packet:?}");
            return self.do_handle_request(packet, peer);
        }
        Err(failure) => failure,
    };

    warn!("Error from {peer:?}: {transport_err}");
    match peer {
        // An error with a known peer is logged and the server keeps running.
        Some(_) => Ok(()),
        None => Err(transport_err.into()),
    }
}
/// Spawns a task that runs the handler for `packet` and relays its responses.
///
/// # Errors
///
/// Returns [`FatalServerError::InternalError`] if no handler has been set.
fn do_handle_request(&self, packet: Packet, peer: Endpoint) -> Result<(), FatalServerError> {
    match self.handler.as_ref() {
        Some(handler) => {
            // Each task gets its own handler clone and relay sender.
            let responder = handler.clone();
            let relay = self.packet_relay_tx.clone();
            tokio::spawn(Self::gen_and_send_responses(responder, relay, packet, peer));
            Ok(())
        }
        None => Err(FatalServerError::InternalError(
            "handler not set".to_string(),
        )),
    }
}
async fn gen_and_send_responses(
handler: Handler,
packet_tx: Sender<FramedItem<Endpoint>>,
packet: Packet,
peer: Endpoint,
) {
let mut stream = handler.handle(packet, peer.clone());
while let Some(response) = stream.next().await {
let cloned_peer = peer.clone();
packet_tx
.send((response, cloned_peer))
.await
.expect("packet_rx closed?");
}
}
/// Writes one outgoing item to the binding.
///
/// A send failure is logged and otherwise ignored, so the serve loop keeps
/// running.
async fn handle_packet_relay(&mut self, item: FramedItem<Endpoint>) {
    // Keep a copy of the peer for logging; `item` is moved into `send`.
    let peer = item.1.clone();
    trace!("Outgoing packet to {:?}: {:?}", peer, item.0);
    match self.binding.send(item).await {
        Ok(_) => {}
        Err(e) => {
            error!("Error sending to {peer:?}: {e:?}");
        }
    }
}
}
/// Errors that terminate the server loop.
#[derive(thiserror::Error, Debug)]
pub enum FatalServerError {
/// An internal invariant was violated; the message describes which one.
#[error("internal error: {0}")]
InternalError(String),
/// A transport error that the server cannot recover from. Converted
/// automatically from [`TransportError`] via `From`.
#[error("fatal transport error: {0}")]
Transport(#[from] TransportError),
}