embedded_nal_coap/lib.rs
1//! A CoAP server and client implementation built on [embedded_nal_async].
2//!
3//! Usage and operation
4//! -------------------
5//!
6//! An example of how to use this will be available as part of the [coap-message-demos] crate.
7//!
//! * Allocate a [CoAPShared] with a `CONCURRENT_REQUESTS` const of the number of outgoing requests
//!   that should be serviceable simultaneously.
10//!
11//! * [CoAPShared::split()] that into a client and a server part.
12//!
13//! * Use the client's [CoAPRuntimeClient::to()] method to create a [coap_request::Stack] that can
14//! be used to send CoAP requests and receive a response. (Multiple responses, eg. from
15//! observation, are planned but not implemented yet).
16//!
17//! The [coap_request_implementations] crate contains suitable (albeit immature) building blocks
18//! for constructing requests.
19//!
//! * Into the server's [CoAPRunner::run()] method, pass an unconnected [UDP
//!   socket](embedded_nal_async::UnconnectedUdp), a source of low-grade entropy (for
//!   retransmission jitter and the like) and a [CoAP server application](coap_handler::Handler).
23//!
24//! The [coap_handler_implementations] crate contains suitable building blocks for constructing
25//! such a server application (including some to combine handlers for individual resources into a
26//! handler that picks sub-handlers from the URI path).
27//!
28//! The future returned by the run function needs to be polled by an executor; note that it is
29//! not Send, and some executors need configuration to allow that.
30//!
31//!
32//! Caveats
33//! -------
34//!
35//! * The server does not perform any amplification mitigation (and the handler can't for lack of
36//! remote information); use this only in environments where this is acceptable (e.g. in closed
37//! networks).
38//!
39//! This will be mitigated in a future version.
40//!
41//! FIXME only provide the 3x buffer for responses / when the handler indicates that it needs
42//! more, 4.01 Echo? (The handler may be unhappy that it gets dropped; -handlers may need
43//! guidance on this)
44//!
45//! * This server does not uphold NSTART and PROBING_RATE, the fundamental flow control parameters
46//! of CoAP that make it OK for generic Internet applications.
47//!
48//! This will be addressed in a future version.
49//!
50//! FIXME pass in a time source
51//!
52//! * The server does not perform any message deduplication. All handler functions must therefore
53//! be idempotent.
54//!
55//! * Messages are created with as little copying as [embedded_nal] permits. For writable messages,
56//! that means that they need to be written to in ascending CoAP option number. This is in
57//! accordance with the implemented [coap_message::MinimalWritableMessage] and
58//! [coap_message::MutableWritableMessage] traits.
59//!
60//! That restriction enables this crate to not only be `no_std`, but to not require `alloc`
61//! either.
62//!
63//! Choices
64//! -------
65//!
66//! This implementation of CoAP chooses to go with a single task, thus only ever allocating a
67//! single buffer as part of the task. There are certainly alternative choices to be made here,
68//! which may either be implemented in a different crate or altered later (for example, if it turns
69//! out that a more effective implementation uses different tasks for send and receive, but uses a
70//! single buffer or an at-least-1-sized pool that gets locked by the tasks).
71#![no_std]
72
73mod udp_format;
74mod udp_read;
75mod udp_write;
76
77use core::net::{IpAddr, Ipv6Addr, SocketAddr};
78
79use udp_format::Type::{self, *};
80use udp_read::{ParseError::*, ParsedMessage};
81use udp_write::{finish_outgoing, MessageWritten, OutgoingRequestMessage, OutgoingResponseMessage};
82
83use coap_message::ReadableMessage;
84
/// Maximum size of a CoAP message we need to expect
///
/// Also used in creating an output buffer as it's allocated the same way anyway: the one buffer
/// of this size is what datagrams are received into and what outgoing messages are built in.
// NOTE(review): 1152 looks like the message size upper bound suggested in RFC 7252 Section 4.6 --
// confirm.
const MAX_SIZE: usize = 1152;

/// Maximum token length (in bytes) handed to the message parser and used for storing the token of
/// an incoming request.
// NOTE(review): presumably longer tokens are what produces the parser's `TokenTooLong` error --
// confirm against `udp_read`.
// FIXME make configurable
const TKL_MAX: usize = 8;
92
93// If we went with a second buffer to be able to send 5.03s early on, that'd need to be sized to
94// the maximum of token length we support
95
/// Ways to reply to a message
///
/// This is the decision taken right after an incoming datagram was parsed. `R` is whatever the
/// handler's `extract_request_data()` returned for a request.
enum Extracted<R> {
    /// Send nothing at all.
    Silent,
    /// Send just an empty RST message.
    JustRst,
    /// Hand the message to the client task that owns the given request slot, then send an empty
    /// ACK.
    ProcessResponseThenAck(usize),
    /// Hand the message to the client task that owns the given request slot, and send nothing.
    ProcessResponseThenSilent(usize),
    /// The message is a request; a response is built from the extracted data.
    ReactTo {
        /// Type of the incoming message (a CON is answered with an ACK, anything else with a NON)
        msgtype: Type,
        /// Token of the incoming request
        token: heapless::Vec<u8, TKL_MAX>,
        /// Outcome of the handler's request data extraction
        extracted: R,
    },
}
use Extracted::*;
109
/// Properties by which the state associated with a pending outgoing request is matched
// Copy and Clone are mainly there because this makes it easier to pass this around while going
// through the state machine of RequestState
#[derive(Copy, Clone, Debug)]
struct RequestMatchingDetails {
    // If we, as an implementation detail, decide never to have pending requests to different
    // remotes, may we just elide this?
    /// Address the request is sent to
    remote: SocketAddr,
    /// Message ID the request is sent with; incoming ACKs and RSTs are compared against it
    mid: u16,
    /// Token the request is sent with; incoming 2-byte tokens are compared against it (read as
    /// big-endian)
    token: u16,
}
121
/// State in which any of the slots used for outgoing requests can be
///
/// A slot is advanced alternately by the client task that claimed it and by the runner loop; the
/// two notify each other through the signals in `CoAPShared`.
// Copy and Clone are mainly there because this makes it easier to pass this around while going
// through the state machine
#[derive(Copy, Clone, Debug)]
enum RequestState {
    /// Slot is unused
    Empty,
    /// A client application component places this when it wants to send a request
    ///
    /// FIXME: Would it make sense to also have a state where an ExpectMore can be turned into a
    /// Pending, eg. to actively cancel an observation? (If RequestMatchingDetails chose not to
    /// carry the remote, it'd need to be provided anew, with erroneous input from the application
    /// just resulting in weird messages that are sent)
    AllocateRequest { remote: SocketAddr },
    /// The loop allows this application component to write the buffer, and awaits signalling
    /// through `.app_to_loop`.
    FillBuffer(RequestMatchingDetails),
    /// The application component is done filling the buffer
    BufferFilled {
        details: RequestMatchingDetails,
        /// Output of build_outgoing
        written: MessageWritten,
    },

    /// State while the loop is awaiting a response
    RequestSent(RequestMatchingDetails),

    /// The loop sets this once a response has arrived. Once the client is done, it needs to
    /// signal through `.app_to_loop`.
    // len? Or indices? (see also `.request()` comments)
    ResponsePending {
        details: RequestMatchingDetails,
        /// Number of bytes of the shared buffer that make up the received message
        len: usize,
    },
    // After that, the client either sets Empty or RequestSent.
}
use RequestState::*;
159
/// The shared state of a CoAP main loop, which would typically run as a server and client
/// simultaneously.
///
/// This is usually created once and then [`.split()`] up into a client and a server part.
///
/// Relevant generics:
/// * `CONCURRENT_REQUESTS` indicates how many requests may be outstanding simultaneously on the
///   client side. Note that this is distinct from `NSTART` in two ways: `NSTART` is counted per
///   peer (this is total), and is decremented as soon as the request is ACK'ed (this is until the
///   (final) response has been processed).
pub struct CoAPShared<const CONCURRENT_REQUESTS: usize> {
    // The single message buffer, used both for receiving and for building outgoing messages.
    //
    // This is held by run (even, even mainly, particularly across await points) except while it is
    // asking a specific request to act on it.
    buffer: core::cell::RefCell<[u8; MAX_SIZE]>,

    // One state machine per request slot.
    //
    // This can be taken by anybody currently executing on our loop, and is not held across await
    // points.
    requests: core::cell::RefCell<[RequestState; CONCURRENT_REQUESTS]>,

    // After anything but run() updates requests, it signals app_to_loop so that run can take
    // any signals out of there and let the pending clients do their work.
    //
    // FIXME is NoopRawMutex safe in the presence of any executor?
    app_to_loop: embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>,
    // When run() needs anything from the client application, it pings it through this (FIXME:
    // There has to be a better way than CONCURRENT_REQUESTS signals)
    loop_to_app: [embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>;
        CONCURRENT_REQUESTS],
    // Signalled when any slot is set to Empty. This is a stopgap measure against failing
    // when more requests than CONCURRENT_REQUESTS should be sent. Should be avoided, though, as
    // it's not a queue, and signaling it will cause something like the stampeding herd problem.
    //
    // We could signal the slot number here, but frankly, that wouldn't simplify code, and
    // efficiency is out in this case already.
    request_became_empty:
        embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>,
}
196
/// Access to the client side of a [CoAPShared]
///
/// From this, requests can be started through the [`.to()`] method.
///
/// This is the "runtime" variant because it can be copied around arbitrarily, and requests made
/// through it take any available of the `CONCURRENT_REQUESTS` slots.
///
/// Note that requests sent through this only make progress while the socket part of it is
/// [`.run()`] (and being awaited).
#[derive(Copy, Clone)]
pub struct CoAPRuntimeClient<'a, const CONCURRENT_REQUESTS: usize> {
    /// The state shared with the runner and with all other client handles
    shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}
210
/// Access to the server side of a [CoAPShared]
///
/// This needs to be [`.run()`] both in order to serve peers and to make progress on any
/// requests issued through the CoAPRuntimeClient on the same Shared.
pub struct CoAPRunner<'a, const CONCURRENT_REQUESTS: usize> {
    /// The state shared with the client handles
    shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}
218
/// Parts of the `.run()` loop that are exclusively held
///
/// Relevant generics:
/// * `Socket` indicates which kind of UDP socket this will run on
/// * `Handler` is the CoAP request handler type -- the application's implementation of a CoAP
///   server
/// * `RandomSource` is the entropy source handed to `.run()`
struct RunParts<'a, Socket, Handler, RandomSource, const CONCURRENT_REQUESTS: usize>
where
    Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
    Handler: coap_handler::Handler,
    RandomSource: rand_core::RngCore,
{
    /// State shared with the client tasks
    shared: &'a CoAPShared<CONCURRENT_REQUESTS>,

    /// Socket on which all messages are received and sent
    socket: &'a mut Socket,
    /// Application that incoming requests are passed to
    handler: &'a mut Handler,

    rand: &'a mut RandomSource,

    // FIXME: There is time stuff involved actually -- not sending 64k messages within short times,
    // and outbound rate limiting. This probably becomes trivial when applying NSTART and
    // PROBING_RATE, and anything that has sub-ms RTTs is probably special enough.
    // FIXME: While they were still locals of the loop() function we could imagine they wouldn't
    // even be used when there are no ways to get requests; now, should we do away with their
    // storage when not sending as a client?
    /// Counter from which the message ID of each outgoing request is taken
    next_mid: u16,
    /// Counter from which the token of each outgoing request is taken
    next_token: u16,
}
246
247impl<const CONCURRENT_REQUESTS: usize> CoAPShared<CONCURRENT_REQUESTS> {
248 pub fn new() -> Self {
249 // weird array initialization but OK
250 const EMPTY_SIGNAL: embassy_sync::signal::Signal<
251 embassy_sync::blocking_mutex::raw::NoopRawMutex,
252 (),
253 > = embassy_sync::signal::Signal::new();
254 Self {
255 buffer: [0; MAX_SIZE].into(),
256 requests: [Empty; CONCURRENT_REQUESTS].into(),
257 app_to_loop: Default::default(),
258 loop_to_app: [EMPTY_SIGNAL; CONCURRENT_REQUESTS].into(),
259 request_became_empty: Default::default(),
260 }
261 }
262
263 /// Split a CoAPShared into a client and a server/runner part.
264 ///
265 /// While technically both of them are just thin wrappers around a shared reference, this split
266 /// ensures that the otherwise-panicking constraints about who grabs which mutices when are
267 /// upheld.
268 pub fn split<'a>(
269 &'a self,
270 ) -> (
271 CoAPRuntimeClient<'a, CONCURRENT_REQUESTS>,
272 CoAPRunner<'a, CONCURRENT_REQUESTS>,
273 ) {
274 (
275 CoAPRuntimeClient { shared: self },
276 CoAPRunner { shared: self },
277 )
278 }
279}
280
281impl<'a, const CONCURRENT_REQUESTS: usize> CoAPRunner<'a, CONCURRENT_REQUESTS> {
282 /// Service the socket, sending any incoming requests to the handler, simultaneously taking
283 /// care to interact with the structures shared with the client parts to send out the requests
284 /// and file back the responses.
285 ///
286 /// If any error occurs on the socket, this terminates.
287 pub async fn run<Socket, Handler, RandomSource>(
288 &self,
289 socket: &mut Socket,
290 handler: &mut Handler,
291 rand: &mut RandomSource,
292 ) -> Result<(), Socket::Error>
293 where
294 Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
295 Handler: coap_handler::Handler,
296 RandomSource: rand_core::RngCore,
297 {
298 let start_values = rand.next_u32();
299 let next_mid = (start_values >> 16) as u16;
300 let next_token = start_values as u16;
301
302 let mut run_parts = RunParts {
303 shared: self.shared,
304 socket,
305 handler,
306 next_mid,
307 next_token,
308 rand,
309 };
310
311 run_parts.run().await
312 }
313}
314
315impl<'a, const CONCURRENT_REQUESTS: usize> CoAPRuntimeClient<'a, CONCURRENT_REQUESTS> {
316 /// Set up a request to a particular network peer
317 ///
318 /// This starts off a builder that (as of now) is immediately usable as a
319 /// [coap_request::Stack]; on the long run, this may gain extra builder steps (such as a
320 /// `.reliably()` or `.unreliably()`, or `.longrunning()` to tap into a larger token space), or
321 /// more diverse `.to_...()` methods for better control.
322 ///
323 /// FIXME: Currently, requests set up this way are sent as CONs but without retransmission.
324 pub fn to(&self, address: SocketAddr) -> RequestingCoAPClient<'_, CONCURRENT_REQUESTS> {
325 RequestingCoAPClient {
326 shared: self.shared,
327 address,
328 }
329 }
330}
331
/// The actual [coap_request::Stack] implementation derived from a [CoAPShared] by putting in an
/// address through `.to()`.
pub struct RequestingCoAPClient<'a, const CONCURRENT_REQUESTS: usize> {
    /// Peer that requests made through this are sent to
    address: SocketAddr,
    /// State shared with the runner, through which requests are handed over
    shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}
338
/// Errors a request made through a [RequestingCoAPClient] can fail with
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[non_exhaustive]
pub enum TransportError {
    /// Writing to the request buffer failed
    ///
    /// This can be justified as a transport error -- for example, writing too large a payload
    /// just doesn't work over UDP (but may work over TCP).
    CouldNotWrite(coap_message_implementations::inmemory_write::WriteError),
    /// An RST was sent in response to the request message
    GotRst,
}
351
352impl<'a, const CONCURRENT_REQUESTS: usize> coap_request::Stack
353 for RequestingCoAPClient<'a, CONCURRENT_REQUESTS>
354{
355 type RequestUnionError = coap_message_implementations::inmemory_write::WriteError;
356 type RequestMessage<'b> = coap_message_implementations::inmemory_write::Message<'b> where Self: 'b;
357 type ResponseMessage<'b> = coap_message_implementations::inmemory::Message<'b> where Self: 'b;
358 type TransportError = TransportError;
359
360 async fn request<Req: coap_request::Request<Self>>(
361 &mut self,
362 mut request: Req,
363 ) -> Result<Req::Output, TransportError> {
364 /// The clean-up code of this function lives in this guard's Drop. It ensures that even if
365 /// the request is dropped prematurely, the runner will not try to wake up a task that's
366 /// not expecting responses any more anyway, and that the slot does not get leaked.
367 struct DropGuard<'a, const CONCURRENT_REQUESTS: usize> {
368 shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
369 slot: usize,
370 }
371 impl<'a, const CONCURRENT_REQUESTS: usize> Drop for DropGuard<'a, CONCURRENT_REQUESTS> {
372 fn drop(&mut self) {
373 let mut requests = self.shared.requests.borrow_mut();
374 requests[self.slot] = Empty;
375 drop(requests);
376 self.shared.request_became_empty.signal(());
377 self.shared.app_to_loop.signal(());
378 }
379 }
380
381 let slot;
382 'found: loop {
383 let mut requests = self.shared.requests.borrow_mut();
384 for (i, request) in requests.iter_mut().enumerate() {
385 if matches!(*request, Empty) {
386 *request = AllocateRequest {
387 remote: self.address,
388 };
389 drop(requests);
390 self.shared.app_to_loop.signal(());
391 slot = i;
392 break 'found;
393 }
394 }
395 drop(requests);
396 self.shared.request_became_empty.wait().await;
397 }
398
399 // We're in control of that slot, but if our execution does get dropped, let's not block
400 // that one forever.
401 let make_empty = DropGuard {
402 shared: self.shared,
403 slot,
404 };
405
406 self.shared.loop_to_app[slot].wait().await;
407 // We're blocking the CoAP thread now. While we're allowed to keep the buffer across that,
408 // we can't keep requests across await points (other clients may want to enqueue their
409 // requests there), but we don't need to leave requests[slot] in any sensible state because
410 // we're "owning" that now.
411
412 let mut buf = self.shared.buffer.borrow_mut();
413
414 let requests = self.shared.requests.borrow();
415 let FillBuffer(details) = requests[slot] else {
416 panic!(
417 "Lockstep violation: Runner signalled us even though it did not give us the buffer"
418 );
419 };
420 drop(requests);
421
422 let mut outgoing = OutgoingRequestMessage::new(buf.as_mut());
423 let carry = request
424 .build_request(outgoing.message())
425 .await
426 .map_err(TransportError::CouldNotWrite)?;
427 let written = outgoing.done();
428
429 let mut requests = self.shared.requests.borrow_mut();
430 requests[slot] = BufferFilled { details, written };
431 drop(requests);
432
433 drop(buf);
434
435 // Not blocking any more after this
436 self.shared.app_to_loop.signal(());
437
438 self.shared.loop_to_app[slot].wait().await;
439 // Once more, we have exclusive control of the buffer while we're reading, and we block the
440 // runner; as before, requests are not to be held across await.
441
442 let buf = self.shared.buffer.borrow();
443
444 let requests = self.shared.requests.borrow();
445 let ResponsePending { len, .. } = requests[slot] else {
446 panic!("Lockstep violation: Runner signalled us even though no response is pending");
447 };
448 drop(requests);
449
450 // FIXME: There is overhead in running this twice. Should we pass indices? Or just parse
451 // result that incorporates an AsRef<[u8]> that is the locked buffer?
452 let Ok(ParsedMessage {
453 message, msgtype, ..
454 }) = udp_read::parse::<TKL_MAX>(&buf[..len])
455 else {
456 unreachable!("Message was parsed once already");
457 };
458
459 if msgtype == RST {
460 return Err(TransportError::GotRst);
461 }
462
463 let response = request.process_response(&message, carry).await;
464
465 drop(buf);
466
467 // Run cleanup-up code, setting requests[slot] to Empty and signalling the runner that it
468 // owns the buffer again.
469 drop(make_empty);
470
471 Ok(response)
472 }
473}
474
475impl<'a, Socket, Handler, RandomSource, const CONCURRENT_REQUESTS: usize>
476 RunParts<'a, Socket, Handler, RandomSource, CONCURRENT_REQUESTS>
477where
478 Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
479 Handler: coap_handler::Handler,
480 RandomSource: rand_core::RngCore,
481{
482 async fn run(&mut self) -> Result<(), Socket::Error> {
483 loop {
484 let mut buf = self.shared.buffer.borrow_mut();
485 // Receive step
486 match embassy_futures::select::select(
487 self.shared.app_to_loop.wait(),
488 self.socket.receive_into(&mut *buf),
489 )
490 .await
491 {
492 // app_to_loop signalled
493 embassy_futures::select::Either::First(_) => {
494 // Nothing was written in there, but the application will need to write here
495 // soon
496 drop(buf);
497
498 self.process_app_to_loop().await?;
499 }
500 // Message was received
501 embassy_futures::select::Either::Second(receive_result) => {
502 let (len, local, remote) = receive_result?;
503
504 self.process_incoming(buf, len, local, remote).await?;
505 }
506 };
507 }
508 }
509
510 /// Build the request by signalling and awaiting the sending task, and send it
511 async fn send_request(&mut self, slot: usize) -> Result<(), Socket::Error> {
512 self.shared.loop_to_app[slot].signal(());
513 // Across here we should only have the slot number as a local variable
514 self.shared.app_to_loop.wait().await;
515 // FIXME: Actually, there could be any reason, plus we'd really need to clear: So
516 // we should loop, clearing the signal, while our buffer is not BufferFilled, and
517 // then at any rate signal again so that we don't lose signals.
518
519 let mut requests = self.shared.requests.borrow_mut();
520 let BufferFilled { details, written } = &requests[slot] else {
521 // FIXME: allow it to declare that it has changed its mind
522 panic!("Sequence violation by client");
523 };
524
525 let mut buf = self.shared.buffer.borrow_mut();
526 let len = finish_outgoing(buf.as_mut(), true, details.mid, details.token, *written);
527
528 // FIXME can this be any-protocol?
529 const SOCK_UNSPEC: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0);
530 // FIXME: Should we allow the requester to do follow-ups from the same address, or
531 // will they just not see the change and suffer breakage mid-blockwise?
532 let local = SOCK_UNSPEC;
533 let remote = details.remote;
534
535 let new_state = RequestSent(*details);
536 requests[slot] = new_state;
537 drop(requests);
538
539 self.socket.send(local, remote, &buf[..len]).await?;
540
541 Ok(())
542 }
543
544 async fn process_app_to_loop(&mut self) -> Result<(), Socket::Error> {
545 // We now have to process *any* slots that are due for us to handle, lest we may lose
546 // information that was in the `app_to_loop`. It suffices to process any slot once, for
547 // even if this is later changed to allow running on multiple threads, the slot will only
548 // enter an application processable state after app_to_loop has been cleared, and the new
549 // signal will be pending on the next run of the main loop.
550
551 for slot in 0..CONCURRENT_REQUESTS {
552 // At least as long as this is based on a Mutex that is not Send, there should be no
553 // need for any optimizations that avoid the drop-and-borrow across the loop (for the
554 // compiler can do them).
555 let mut requests = self.shared.requests.borrow_mut();
556
557 // That's the only relevant state to consider here. The other states an app can state and
558 // then signal (BufferFilled, and the RequestSent or Empty after reading a response) are
559 // processed at different places (down here, and in the receive position, respectively).
560 if let AllocateRequest { remote } = &mut requests[slot] {
561 self.next_mid += 1;
562 self.next_token += 1;
563 requests[slot] = FillBuffer(RequestMatchingDetails {
564 mid: self.next_mid,
565 token: self.next_token,
566 remote: *remote,
567 });
568 drop(requests);
569
570 self.send_request(slot).await?;
571 }
572 }
573 Ok(())
574 }
575
576 /// A message has arrived.
577 ///
578 /// Process it as a request or response, sending any response, RST or ACK as needed.
579 async fn process_incoming(
580 &mut self,
581 buf: core::cell::RefMut<'_, [u8; MAX_SIZE]>,
582 len: usize,
583 local: SocketAddr,
584 remote: SocketAddr,
585 ) -> Result<(), Socket::Error> {
586 let action = match udp_read::parse::<TKL_MAX>(&buf[..len]) {
587 Err(MessageTooShort | WrongVersion) => Silent,
588 Err(TokenTooLong { msgtype: CON }) => JustRst,
589 Err(TokenTooLong { msgtype: _ }) => Silent,
590 Ok(ParsedMessage {
591 msgtype,
592 msgid,
593 token,
594 message,
595 }) => {
596 match coap_numbers::code::classify(message.code()) {
597 coap_numbers::code::Range::Response(_) | coap_numbers::code::Range::Empty => {
598 // FIXME: If the response is coming in on a RST, we should probably not
599 // even bother the message handler. (Well at least it should cancel
600 // retransmissions, so it probably *should* go in there).
601
602 match (msgtype, self.match_to_slot(msgtype, msgid, &token)) {
603 (CON, Some(slot)) => ProcessResponseThenAck(slot),
604 (CON, None) => JustRst,
605 (_, Some(slot)) => ProcessResponseThenSilent(slot),
606 (_, None) => Silent,
607 }
608 }
609 coap_numbers::code::Range::Request => {
610 let action = if msgtype == ACK || msgtype == RST {
611 // These should never be responses; ignoring them as protocol errors.
612 Silent
613 } else {
614 ReactTo {
615 msgtype,
616 token: token.try_into().expect("TKL was suitably constrained"),
617 extracted: self.handler.extract_request_data(&message),
618 }
619 };
620 action
621 }
622 _ => Silent,
623 }
624 }
625 };
626
627 let mut buf = match &action {
628 ProcessResponseThenAck(slot) | ProcessResponseThenSilent(slot) => {
629 drop(buf);
630 self.handle_response(*slot, len).await;
631 self.shared.buffer.borrow_mut()
632 }
633 _ => buf,
634 };
635
636 // Send step
637 let written = match action {
638 ReactTo {
639 msgtype,
640 token,
641 extracted,
642 } => {
643 let responsetype = match msgtype {
644 CON => ACK,
645 _ => NON,
646 };
647
648 // Note that we never really use token, but that may change when we start async'ing
649 // the server side (FIXME: does it pay to reduce token to its tkl parts if all we
650 // do is go back later anyway?)
651 let mut outgoing = OutgoingResponseMessage::new::<TKL_MAX>(
652 buf.as_mut(),
653 responsetype,
654 token.len(),
655 );
656
657 // FIXME amplification mitigation
658
659 use coap_message::error::RenderableOnMinimal;
660 use coap_message::MinimalWritableMessage;
661 let outmsg = outgoing.message();
662 match extracted {
663 Ok(extracted) => {
664 let rendered = self.handler.build_response(outmsg, extracted);
665 if let Err(e) = rendered {
666 // Response building errors get two chances to render
667 outmsg.reset();
668 if let Err(e2) = e.render(outmsg) {
669 outmsg.reset();
670 if let Err(_) = e2.render(outmsg) {
671 outmsg.reset();
672 outmsg.set_code(coap_numbers::code::INTERNAL_SERVER_ERROR);
673 }
674 }
675 }
676 }
677 Err(e) => {
678 // Extraction time errors get two chances to render
679 if let Err(e2) = e.render(outmsg) {
680 outmsg.reset();
681 if let Err(_) = e2.render(outmsg) {
682 outmsg.reset();
683 outmsg.set_code(coap_numbers::code::INTERNAL_SERVER_ERROR);
684 }
685 }
686 }
687 }
688 outgoing.done()
689 }
690 JustRst => OutgoingResponseMessage::empty(buf.as_mut(), RST),
691 ProcessResponseThenAck(_) => OutgoingResponseMessage::empty(buf.as_mut(), ACK),
692 Silent | ProcessResponseThenSilent(_) => return Ok(()),
693 };
694
695 self.socket.send(local, remote, &buf[..written]).await?;
696
697 Ok(())
698 }
699
700 fn match_to_slot(&mut self, msgtype: Type, mid: u16, token: &[u8]) -> Option<usize> {
701 // FIXME Should we check for the address as well? That'd require reliable determination of
702 // being multicast.
703
704 // If it's not 2-byte we didn't send it.
705 let token = u16::from_be_bytes(token[..].try_into().ok()?);
706
707 let requests = self.shared.requests.borrow();
708 for (slot, r) in requests.iter().enumerate() {
709 if let RequestSent(details) = r {
710 // This includes piggy-backed ACKs w/o token, and RSTs, neither of which contain a
711 // message, but at least the latter is relevant to requests. Have to filter out
712 // NONs and CONs because they're from the sender's space, and could match randomly.
713 if (msgtype == ACK || msgtype == RST) && details.mid == mid {
714 return Some(slot);
715 }
716 // We're only falling through here when receiving NON responses, or CON responses.
717 if details.token == token {
718 // If it's an ACK or RST in here, something went seriously wrong in the peer.
719 return Some(slot);
720 }
721 }
722 }
723 None
724 }
725
726 /// Returns true if the response was expected
727 async fn handle_response(&mut self, slot: usize, len: usize) {
728 let mut requests = self.shared.requests.borrow_mut();
729 let RequestSent(details) = requests[slot] else {
730 unreachable!();
731 };
732 requests[slot] = ResponsePending { details, len };
733 drop(requests);
734
735 self.shared.loop_to_app[slot].signal(());
736 loop {
737 // Across here we should only have the slot number as a local variable
738 self.shared.app_to_loop.wait().await;
739 let requests = self.shared.requests.borrow();
740 if let ResponsePending { .. } = requests[slot] {
741 // Something else sent a signal
742 continue;
743 } else {
744 // Not caring for the moment if it was Empty or RequestSent -- the latter would be
745 // useful for observations and other non-traditional responses, but our client
746 // doesn't do that yet.
747 break;
748 }
749 }
750 // Signaling self no matter whether ResponsePending was found inbetween or not -- even if,
751 // the app_to_loop.await() we just did could have masked another client tasks's request to
752 // allocate something, and signalling self is easier than setting up a flag and an extra
753 // code path.
754 self.shared.app_to_loop.signal(());
755 }
756}