coap_server/
server.rs

1use std::fmt::Debug;
2use std::pin::Pin;
3
4use coap_lite::Packet;
5use futures::stream::Fuse;
6use futures::{SinkExt, StreamExt};
7use log::{error, trace, warn};
8use tokio::sync::mpsc::{Receiver, Sender};
9
10use crate::packet_handler::{IntoHandler, PacketHandler};
11use crate::transport::{FramedBinding, FramedItem, FramedReadError, Transport, TransportError};
12
13/// Primary server API to configure, bind, and ultimately run the CoAP server.
14pub struct CoapServer<Handler, Endpoint> {
15    binding: Fuse<Pin<Box<dyn FramedBinding<Endpoint>>>>,
16    packet_relay_rx: Receiver<FramedItem<Endpoint>>,
17    packet_relay_tx: Sender<FramedItem<Endpoint>>,
18    handler: Option<Handler>,
19}
20
21impl<Handler, Endpoint: Debug + Send + Clone + 'static> CoapServer<Handler, Endpoint>
22where
23    Handler: PacketHandler<Endpoint> + Send + 'static,
24{
25    /// Bind the server to a specific source of incoming packets in a transport-agnostic way.  Most
26    /// customers will wish to use [`crate::udp::UdpTransport`].
27    pub async fn bind<T: Transport<Endpoint = Endpoint>>(
28        transport: T,
29    ) -> Result<Self, TransportError> {
30        let binding = transport.bind().await?;
31        let (packet_tx, packet_rx) = tokio::sync::mpsc::channel(32);
32        Ok(Self {
33            binding: binding.fuse(),
34            packet_relay_rx: packet_rx,
35            packet_relay_tx: packet_tx,
36            handler: None,
37        })
38    }
39
40    /// Run the server "forever".  Note that the function may return a fatal error if the server
41    /// encounters unrecoverable issues, typically due to programmer error in this crate itself
42    /// or transport errors not related to a specific peer.  The intention is that this crate
43    /// should be highly reliable and run indefinitely for properly configured use cases.
44    pub async fn serve(
45        mut self,
46        handler: impl IntoHandler<Handler, Endpoint>,
47    ) -> Result<(), FatalServerError> {
48        let mtu = self.binding.get_ref().mtu();
49        self.handler = Some(handler.into_handler(mtu));
50
51        loop {
52            tokio::select! {
53                event = self.binding.select_next_some() => {
54                    self.handle_rx_event(event)?;
55                }
56                Some(item) = self.packet_relay_rx.recv() => {
57                    self.handle_packet_relay(item).await;
58                }
59            }
60        }
61    }
62
63    fn handle_rx_event(
64        &self,
65        result: Result<FramedItem<Endpoint>, FramedReadError<Endpoint>>,
66    ) -> Result<(), FatalServerError> {
67        match result {
68            Ok((packet, peer)) => {
69                trace!("Incoming packet from {peer:?}: {packet:?}");
70                self.do_handle_request(packet, peer)?
71            }
72            Err((transport_err, peer)) => {
73                warn!("Error from {peer:?}: {transport_err}");
74                if peer.is_none() {
75                    return Err(transport_err.into());
76                }
77            }
78        }
79
80        Ok(())
81    }
82
83    fn do_handle_request(&self, packet: Packet, peer: Endpoint) -> Result<(), FatalServerError> {
84        let handler = self
85            .handler
86            .as_ref()
87            .ok_or_else(|| FatalServerError::InternalError("handler not set".to_string()))?;
88        let reply_stream = Self::gen_and_send_responses(
89            handler.clone(),
90            self.packet_relay_tx.clone(),
91            packet,
92            peer,
93        );
94        tokio::spawn(reply_stream);
95        Ok(())
96    }
97
98    async fn gen_and_send_responses(
99        handler: Handler,
100        packet_tx: Sender<FramedItem<Endpoint>>,
101        packet: Packet,
102        peer: Endpoint,
103    ) {
104        let mut stream = handler.handle(packet, peer.clone());
105        while let Some(response) = stream.next().await {
106            let cloned_peer = peer.clone();
107            packet_tx
108                .send((response, cloned_peer))
109                .await
110                .expect("packet_rx closed?");
111        }
112    }
113
114    async fn handle_packet_relay(&mut self, item: FramedItem<Endpoint>) {
115        let peer = item.1.clone();
116        trace!("Outgoing packet to {:?}: {:?}", peer, item.0);
117        if let Err(e) = self.binding.send(item).await {
118            error!("Error sending to {peer:?}: {e:?}");
119        }
120    }
121}
122
123/// Fatal error preventing the server from starting or continuing.  Typically the result of
124/// programmer error or misconfiguration.
125#[derive(thiserror::Error, Debug)]
126pub enum FatalServerError {
127    /// Programmer error within this crate, file a bug!
128    #[error("internal error: {0}")]
129    InternalError(String),
130
131    /// Transport error that is not related to any individual peer but would prevent any future
132    /// packet exchanges on the transport.  Must abort the server.
133    #[error("fatal transport error: {0}")]
134    Transport(#[from] TransportError),
135}