1use std::fmt::Debug;
2use std::pin::Pin;
3
4use coap_lite::Packet;
5use futures::stream::Fuse;
6use futures::{SinkExt, StreamExt};
7use log::{error, trace, warn};
8use tokio::sync::mpsc::{Receiver, Sender};
9
10use crate::packet_handler::{IntoHandler, PacketHandler};
11use crate::transport::{FramedBinding, FramedItem, FramedReadError, Transport, TransportError};
12
13pub struct CoapServer<Handler, Endpoint> {
15 binding: Fuse<Pin<Box<dyn FramedBinding<Endpoint>>>>,
16 packet_relay_rx: Receiver<FramedItem<Endpoint>>,
17 packet_relay_tx: Sender<FramedItem<Endpoint>>,
18 handler: Option<Handler>,
19}
20
21impl<Handler, Endpoint: Debug + Send + Clone + 'static> CoapServer<Handler, Endpoint>
22where
23 Handler: PacketHandler<Endpoint> + Send + 'static,
24{
25 pub async fn bind<T: Transport<Endpoint = Endpoint>>(
28 transport: T,
29 ) -> Result<Self, TransportError> {
30 let binding = transport.bind().await?;
31 let (packet_tx, packet_rx) = tokio::sync::mpsc::channel(32);
32 Ok(Self {
33 binding: binding.fuse(),
34 packet_relay_rx: packet_rx,
35 packet_relay_tx: packet_tx,
36 handler: None,
37 })
38 }
39
40 pub async fn serve(
45 mut self,
46 handler: impl IntoHandler<Handler, Endpoint>,
47 ) -> Result<(), FatalServerError> {
48 let mtu = self.binding.get_ref().mtu();
49 self.handler = Some(handler.into_handler(mtu));
50
51 loop {
52 tokio::select! {
53 event = self.binding.select_next_some() => {
54 self.handle_rx_event(event)?;
55 }
56 Some(item) = self.packet_relay_rx.recv() => {
57 self.handle_packet_relay(item).await;
58 }
59 }
60 }
61 }
62
63 fn handle_rx_event(
64 &self,
65 result: Result<FramedItem<Endpoint>, FramedReadError<Endpoint>>,
66 ) -> Result<(), FatalServerError> {
67 match result {
68 Ok((packet, peer)) => {
69 trace!("Incoming packet from {peer:?}: {packet:?}");
70 self.do_handle_request(packet, peer)?
71 }
72 Err((transport_err, peer)) => {
73 warn!("Error from {peer:?}: {transport_err}");
74 if peer.is_none() {
75 return Err(transport_err.into());
76 }
77 }
78 }
79
80 Ok(())
81 }
82
83 fn do_handle_request(&self, packet: Packet, peer: Endpoint) -> Result<(), FatalServerError> {
84 let handler = self
85 .handler
86 .as_ref()
87 .ok_or_else(|| FatalServerError::InternalError("handler not set".to_string()))?;
88 let reply_stream = Self::gen_and_send_responses(
89 handler.clone(),
90 self.packet_relay_tx.clone(),
91 packet,
92 peer,
93 );
94 tokio::spawn(reply_stream);
95 Ok(())
96 }
97
98 async fn gen_and_send_responses(
99 handler: Handler,
100 packet_tx: Sender<FramedItem<Endpoint>>,
101 packet: Packet,
102 peer: Endpoint,
103 ) {
104 let mut stream = handler.handle(packet, peer.clone());
105 while let Some(response) = stream.next().await {
106 let cloned_peer = peer.clone();
107 packet_tx
108 .send((response, cloned_peer))
109 .await
110 .expect("packet_rx closed?");
111 }
112 }
113
114 async fn handle_packet_relay(&mut self, item: FramedItem<Endpoint>) {
115 let peer = item.1.clone();
116 trace!("Outgoing packet to {:?}: {:?}", peer, item.0);
117 if let Err(e) = self.binding.send(item).await {
118 error!("Error sending to {peer:?}: {e:?}");
119 }
120 }
121}
122
123#[derive(thiserror::Error, Debug)]
126pub enum FatalServerError {
127 #[error("internal error: {0}")]
129 InternalError(String),
130
131 #[error("fatal transport error: {0}")]
134 Transport(#[from] TransportError),
135}