coap_server/app/
app_handler.rs1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4use std::pin::Pin;
5use std::sync::Arc;
6
7use coap_lite::{BlockHandler, CoapRequest, MessageClass, MessageType, Packet};
8use futures::Stream;
9use log::{debug, warn};
10use tokio::sync::mpsc::UnboundedSender;
11use tokio::sync::Mutex;
12use tokio_stream::wrappers::UnboundedReceiverStream;
13
14use crate::app::app_builder::AppBuilder;
15use crate::app::block_handler_util::new_block_handler;
16use crate::app::coap_utils::new_pong_message;
17use crate::app::core_handler::CoreRequestHandler;
18use crate::app::error::CoapError;
19use crate::app::path_matcher::{MatchedResult, PathMatcher};
20use crate::app::resource_builder::BuildParameters;
21use crate::app::resource_handler::ResourceHandler;
22use crate::app::retransmission_manager::{RetransmissionManager, TransmissionParameters};
23use crate::app::Request;
24use crate::packet_handler::PacketHandler;
25
/// Fallback used by `AppHandler::from_builder` when the builder's config leaves
/// `discoverable` unset: the discovery (core) resource is registered by default.
const DEFAULT_DISCOVERABLE: bool = true;
/// Crate-wide default for block-wise transfer. Not referenced in this file;
/// presumably consumed where resources are built (verify against callers).
pub(crate) const DEFAULT_BLOCK_TRANSFER: bool = true;
28
29pub struct AppHandler<Endpoint: Debug + Clone + Ord + Eq + Hash> {
33 retransmission_manager: Arc<Mutex<RetransmissionManager<Endpoint>>>,
34
35 error_block_handler: Arc<Mutex<BlockHandler<Endpoint>>>,
38
39 handlers_by_path: Arc<PathMatcher<ResourceHandler<Endpoint>>>,
42}
43
44impl<Endpoint: Debug + Clone + Ord + Eq + Hash> Clone for AppHandler<Endpoint> {
45 fn clone(&self) -> Self {
46 Self {
47 retransmission_manager: self.retransmission_manager.clone(),
48 error_block_handler: self.error_block_handler.clone(),
49 handlers_by_path: self.handlers_by_path.clone(),
50 }
51 }
52}
53
54impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> PacketHandler<Endpoint>
55 for AppHandler<Endpoint>
56{
57 fn handle<'a>(
58 &'a self,
59 packet: Packet,
60 peer: Endpoint,
61 ) -> Pin<Box<dyn Stream<Item = Packet> + Send + 'a>> {
62 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
63
64 tokio::spawn({
68 let cloned_self = self.clone();
69 async move {
70 cloned_self.handle_packet(tx, packet, peer).await;
71 }
72 });
73 Box::pin(UnboundedReceiverStream::new(rx))
74 }
75}
76
77impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> AppHandler<Endpoint> {
78 pub fn from_builder(builder: AppBuilder<Endpoint>, mtu: Option<u32>) -> Self {
79 let retransmission_manager = Arc::new(Mutex::new(RetransmissionManager::new(
80 TransmissionParameters::default(),
81 )));
82
83 let build_params = BuildParameters {
84 mtu,
85 retransmission_manager: retransmission_manager.clone(),
86 };
87
88 let error_block_handler = Arc::new(Mutex::new(new_block_handler(mtu)));
89
90 let mut discoverable_resources = Vec::new();
91 let mut handlers = HashMap::new();
92 for resource_builder in builder.resources {
93 let resource = resource_builder.build(build_params.clone());
94 if let Some(discoverable) = resource.discoverable {
95 discoverable_resources.push(discoverable);
96 }
97 handlers.insert(resource.path, resource.handler);
98 }
99
100 let discoverable = builder.config.discoverable.unwrap_or(DEFAULT_DISCOVERABLE);
101 if discoverable {
102 let core_resource = CoreRequestHandler::new_resource_builder(discoverable_resources)
103 .build(build_params.clone());
104 handlers.insert(core_resource.path, core_resource.handler);
105 }
106
107 let handlers_by_path = Arc::new(PathMatcher::from_path_strings(handlers));
108
109 Self {
110 retransmission_manager,
111 error_block_handler,
112 handlers_by_path,
113 }
114 }
115
116 async fn handle_packet(&self, tx: UnboundedSender<Packet>, packet: Packet, peer: Endpoint) {
117 match packet.header.code {
118 MessageClass::Request(_) => {
119 self.handle_get(tx, packet, peer).await;
120 }
121 MessageClass::Response(_) => {
122 warn!("Spurious response message from {peer:?}, ignoring...");
123 }
124 MessageClass::Empty => {
125 match packet.header.get_type() {
126 t @ (MessageType::Acknowledgement | MessageType::Reset) => {
127 let mut retransmission_manager = self.retransmission_manager.lock().await;
128 if let Err(packet) =
129 retransmission_manager.maybe_handle_reply(packet, &peer)
130 {
131 let message_id = packet.header.message_id;
132 debug!(
133 "Got {t:?} from {peer:?} for unrecognized message ID {message_id}"
134 );
135 }
136 }
137 MessageType::Confirmable => {
138 tx.send(new_pong_message(&packet)).unwrap();
141 }
142 MessageType::NonConfirmable => {
143 debug!("Ignoring Non-Confirmable Empty message from {peer:?}");
144 }
145 }
146 }
147 code => {
148 warn!("Unhandled message code {code} from {peer:?}, ignoring...");
149 }
150 }
151 }
152
153 async fn handle_get(&self, tx: UnboundedSender<Packet>, packet: Packet, peer: Endpoint) {
154 let mut request = CoapRequest::from_packet(packet, peer);
155 if let Err(e) = self.try_handle_get(&tx, &mut request).await {
156 if request.apply_from_error(e.into_handling_error()) {
157 let _ = self
159 .error_block_handler
160 .lock()
161 .await
162 .intercept_response(&mut request);
163 tx.send(request.response.unwrap().message).unwrap();
164 }
165 }
166 }
167
168 async fn try_handle_get(
169 &self,
170 tx: &UnboundedSender<Packet>,
171 request: &mut CoapRequest<Endpoint>,
172 ) -> Result<(), CoapError> {
173 let paths = request.get_path_as_vec().map_err(CoapError::bad_request)?;
174
175 let resource = self.handlers_by_path.lookup(&paths);
176 if log::log_enabled!(log::Level::Debug) {
177 let peer = &request.source;
178 let method = request.get_method();
179 let path = request.get_path();
180 let handler_label = resource
181 .as_ref()
182 .map_or_else(|| ": <no resource>!", |_| ": matched resource...");
183 debug!("Received from [{peer:?}]: {method:?} /{path}{handler_label}");
184 }
185
186 match resource {
187 Some(MatchedResult {
188 matched_index,
189 value,
190 }) => {
191 let wrapped_request = Request {
192 original: request.clone(),
193 unmatched_path: Vec::from(&paths[matched_index..]),
194 };
195
196 value.handle(tx, wrapped_request).await
197 }
198 None => Err(CoapError::not_found()),
199 }
200 }
201}