coap_server/app/
app_handler.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4use std::pin::Pin;
5use std::sync::Arc;
6
7use coap_lite::{BlockHandler, CoapRequest, MessageClass, MessageType, Packet};
8use futures::Stream;
9use log::{debug, warn};
10use tokio::sync::mpsc::UnboundedSender;
11use tokio::sync::Mutex;
12use tokio_stream::wrappers::UnboundedReceiverStream;
13
14use crate::app::app_builder::AppBuilder;
15use crate::app::block_handler_util::new_block_handler;
16use crate::app::coap_utils::new_pong_message;
17use crate::app::core_handler::CoreRequestHandler;
18use crate::app::error::CoapError;
19use crate::app::path_matcher::{MatchedResult, PathMatcher};
20use crate::app::resource_builder::BuildParameters;
21use crate::app::resource_handler::ResourceHandler;
22use crate::app::retransmission_manager::{RetransmissionManager, TransmissionParameters};
23use crate::app::Request;
24use crate::packet_handler::PacketHandler;
25
/// Whether block-wise transfer is enabled when nothing overrides it.
pub(crate) const DEFAULT_BLOCK_TRANSFER: bool = true;
/// Used by `AppHandler::from_builder` when the app configuration leaves `discoverable` unset.
const DEFAULT_DISCOVERABLE: bool = true;
28
29/// Main PacketHandler for an application suite of handlers.  Efficiency and concurrency are
30/// the primary goals of this implementation, but with the need to balance developer friendliness
31/// of the main API.
32pub struct AppHandler<Endpoint: Debug + Clone + Ord + Eq + Hash> {
33    retransmission_manager: Arc<Mutex<RetransmissionManager<Endpoint>>>,
34
35    /// Special internal [`coap_lite::BlockHandler`] that we use only for formatting errors
36    /// that might be larger than MTU.
37    error_block_handler: Arc<Mutex<BlockHandler<Endpoint>>>,
38
39    /// Full set of handlers registered for this app, grouped by path but searchable using inexact
40    /// matching.  See [`PathMatcher`] for more.
41    handlers_by_path: Arc<PathMatcher<ResourceHandler<Endpoint>>>,
42}
43
44impl<Endpoint: Debug + Clone + Ord + Eq + Hash> Clone for AppHandler<Endpoint> {
45    fn clone(&self) -> Self {
46        Self {
47            retransmission_manager: self.retransmission_manager.clone(),
48            error_block_handler: self.error_block_handler.clone(),
49            handlers_by_path: self.handlers_by_path.clone(),
50        }
51    }
52}
53
54impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> PacketHandler<Endpoint>
55    for AppHandler<Endpoint>
56{
57    fn handle<'a>(
58        &'a self,
59        packet: Packet,
60        peer: Endpoint,
61    ) -> Pin<Box<dyn Stream<Item = Packet> + Send + 'a>> {
62        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
63
64        // TODO: This spawn is technically unnecessary as we could implement a Stream ourselves
65        // similar to how async-stream crate does it, but the boiler plate doesn't really seem
66        // worth it for now.
67        tokio::spawn({
68            let cloned_self = self.clone();
69            async move {
70                cloned_self.handle_packet(tx, packet, peer).await;
71            }
72        });
73        Box::pin(UnboundedReceiverStream::new(rx))
74    }
75}
76
77impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> AppHandler<Endpoint> {
78    pub fn from_builder(builder: AppBuilder<Endpoint>, mtu: Option<u32>) -> Self {
79        let retransmission_manager = Arc::new(Mutex::new(RetransmissionManager::new(
80            TransmissionParameters::default(),
81        )));
82
83        let build_params = BuildParameters {
84            mtu,
85            retransmission_manager: retransmission_manager.clone(),
86        };
87
88        let error_block_handler = Arc::new(Mutex::new(new_block_handler(mtu)));
89
90        let mut discoverable_resources = Vec::new();
91        let mut handlers = HashMap::new();
92        for resource_builder in builder.resources {
93            let resource = resource_builder.build(build_params.clone());
94            if let Some(discoverable) = resource.discoverable {
95                discoverable_resources.push(discoverable);
96            }
97            handlers.insert(resource.path, resource.handler);
98        }
99
100        let discoverable = builder.config.discoverable.unwrap_or(DEFAULT_DISCOVERABLE);
101        if discoverable {
102            let core_resource = CoreRequestHandler::new_resource_builder(discoverable_resources)
103                .build(build_params.clone());
104            handlers.insert(core_resource.path, core_resource.handler);
105        }
106
107        let handlers_by_path = Arc::new(PathMatcher::from_path_strings(handlers));
108
109        Self {
110            retransmission_manager,
111            error_block_handler,
112            handlers_by_path,
113        }
114    }
115
116    async fn handle_packet(&self, tx: UnboundedSender<Packet>, packet: Packet, peer: Endpoint) {
117        match packet.header.code {
118            MessageClass::Request(_) => {
119                self.handle_get(tx, packet, peer).await;
120            }
121            MessageClass::Response(_) => {
122                warn!("Spurious response message from {peer:?}, ignoring...");
123            }
124            MessageClass::Empty => {
125                match packet.header.get_type() {
126                    t @ (MessageType::Acknowledgement | MessageType::Reset) => {
127                        let mut retransmission_manager = self.retransmission_manager.lock().await;
128                        if let Err(packet) =
129                            retransmission_manager.maybe_handle_reply(packet, &peer)
130                        {
131                            let message_id = packet.header.message_id;
132                            debug!(
133                                "Got {t:?} from {peer:?} for unrecognized message ID {message_id}"
134                            );
135                        }
136                    }
137                    MessageType::Confirmable => {
138                        // A common way in CoAP to trigger a cheap "ping" to make sure
139                        // the server is alive.
140                        tx.send(new_pong_message(&packet)).unwrap();
141                    }
142                    MessageType::NonConfirmable => {
143                        debug!("Ignoring Non-Confirmable Empty message from {peer:?}");
144                    }
145                }
146            }
147            code => {
148                warn!("Unhandled message code {code} from {peer:?}, ignoring...");
149            }
150        }
151    }
152
153    async fn handle_get(&self, tx: UnboundedSender<Packet>, packet: Packet, peer: Endpoint) {
154        let mut request = CoapRequest::from_packet(packet, peer);
155        if let Err(e) = self.try_handle_get(&tx, &mut request).await {
156            if request.apply_from_error(e.into_handling_error()) {
157                // If the error happens to need block2 handling, let's do that here...
158                let _ = self
159                    .error_block_handler
160                    .lock()
161                    .await
162                    .intercept_response(&mut request);
163                tx.send(request.response.unwrap().message).unwrap();
164            }
165        }
166    }
167
168    async fn try_handle_get(
169        &self,
170        tx: &UnboundedSender<Packet>,
171        request: &mut CoapRequest<Endpoint>,
172    ) -> Result<(), CoapError> {
173        let paths = request.get_path_as_vec().map_err(CoapError::bad_request)?;
174
175        let resource = self.handlers_by_path.lookup(&paths);
176        if log::log_enabled!(log::Level::Debug) {
177            let peer = &request.source;
178            let method = request.get_method();
179            let path = request.get_path();
180            let handler_label = resource
181                .as_ref()
182                .map_or_else(|| ": <no resource>!", |_| ": matched resource...");
183            debug!("Received from [{peer:?}]: {method:?} /{path}{handler_label}");
184        }
185
186        match resource {
187            Some(MatchedResult {
188                matched_index,
189                value,
190            }) => {
191                let wrapped_request = Request {
192                    original: request.clone(),
193                    unmatched_path: Vec::from(&paths[matched_index..]),
194                };
195
196                value.handle(tx, wrapped_request).await
197            }
198            None => Err(CoapError::not_found()),
199        }
200    }
201}