embedded_nal_coap/
lib.rs

//! A CoAP server and client implementation built on [embedded_nal_async].
//!
//! Usage and operation
//! -------------------
//!
//! An example of how to use this will be available as part of the [coap-message-demos] crate; a
//! condensed sketch follows the list below.
//!
//! * Allocate a [CoAPShared] with a `CONCURRENT_REQUESTS` const of the number of outgoing requests
//!   that should be serviceable simultaneously.
//!
//! * [CoAPShared::split()] that into a client and a server part.
//!
//! * Use the client's [CoAPRuntimeClient::to()] method to create a [coap_request::Stack] that can
//!   be used to send CoAP requests and receive a response. (Multiple responses, e.g. from
//!   observation, are planned but not implemented yet.)
//!
//!   The [coap_request_implementations] crate contains suitable (albeit immature) building blocks
//!   for constructing requests.
//!
//! * Into the server's [CoAPRunner::run()] method, pass an unconnected [UDP
//!   socket](embedded_nal_async::UnconnectedUdp), a source of low-grade entropy (for
//!   retransmission jitter and the like) and a [CoAP server application](coap_handler::Handler).
//!
//!   The [coap_handler_implementations] crate contains suitable building blocks for constructing
//!   such a server application (including some to combine handlers for individual resources into a
//!   handler that picks sub-handlers from the URI path).
//!
//!   The future returned by the run function needs to be polled by an executor; note that it is
//!   not `Send`, and some executors need configuration to allow that.
//!
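//! A condensed sketch of these steps (`my_socket`, `my_handler`, `my_rng` and `my_request` stand
//! in for an [embedded_nal_async::UnconnectedUdp] socket, a [coap_handler::Handler], a
//! [rand_core::RngCore] and a [coap_request::Request] provided by the application):
//!
//! ```ignore
//! let shared: CoAPShared<3> = CoAPShared::new();
//! let (client, server) = shared.split();
//!
//! // Serves incoming requests; client requests only make progress while this is polled.
//! let serve = server.run(&mut my_socket, &mut my_handler, &mut my_rng);
//!
//! let request = async {
//!     use coap_request::Stack;
//!     client
//!         .to("[2001:db8::1]:5683".parse().unwrap())
//!         .request(my_request)
//!         .await
//! };
//!
//! // For example, on a single (local) executor:
//! embassy_futures::join::join(serve, request).await;
//! ```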
//!
//! Caveats
//! -------
//!
//! * The server does not perform any amplification mitigation (and the handler can't for lack of
//!   remote information); use this only in environments where this is acceptable (e.g. in closed
//!   networks).
//!
//!   This will be mitigated in a future version.
//!
//!   FIXME only provide the 3x buffer for responses / when the handler indicates that it needs
//!   more, 4.01 Echo? (The handler may be unhappy that it gets dropped; handlers may need
//!   guidance on this)
//!
//! * This server does not uphold NSTART and PROBING_RATE, the fundamental flow control parameters
//!   of CoAP that make it OK for generic Internet applications.
//!
//!   This will be addressed in a future version.
//!
//!   FIXME pass in a time source
//!
//! * The server does not perform any message deduplication. All handler functions must therefore
//!   be idempotent.
//!
//! * Messages are created with as little copying as [embedded_nal] permits. For writable messages,
//!   that means that options need to be written in ascending CoAP option number order, as
//!   sketched below. This is in accordance with the implemented
//!   [coap_message::MinimalWritableMessage] and [coap_message::MutableWritableMessage] traits.
//!
//!   That restriction enables this crate to not only be `no_std`, but to not require `alloc`
//!   either.
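//!
//!   For illustration, a sketch of writing options in ascending order (assuming the
//!   `coap-message` writable message API and the option constants from `coap_numbers`; `msg`
//!   stands for any writable message handed out by this crate):
//!
//!   ```ignore
//!   use coap_message::MinimalWritableMessage;
//!   msg.set_code(coap_numbers::code::GET);
//!   msg.add_option(coap_numbers::option::URI_PATH, b"sensors")?; // option 11
//!   msg.add_option(coap_numbers::option::URI_QUERY, b"unit=C")?; // option 15, only after 11
//!   msg.set_payload(b"")?;
//!   ```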
//!
//! Choices
//! -------
//!
//! This implementation of CoAP chooses to go with a single task, thus only ever allocating a
//! single buffer as part of the task. There are certainly alternative choices to be made here,
//! which may either be implemented in a different crate or altered later (for example, if it turns
//! out that a more effective implementation uses different tasks for send and receive, but uses a
//! single buffer or an at-least-1-sized pool that gets locked by the tasks).
#![no_std]

mod udp_format;
mod udp_read;
mod udp_write;

use core::net::{IpAddr, Ipv6Addr, SocketAddr};

use udp_format::Type::{self, *};
use udp_read::{ParseError::*, ParsedMessage};
use udp_write::{finish_outgoing, MessageWritten, OutgoingRequestMessage, OutgoingResponseMessage};

use coap_message::ReadableMessage;

/// Maximum size of a CoAP message we need to expect
///
/// Also used in creating an output buffer as it's allocated the same way anyway.
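///
/// (1152 presumably follows RFC 7252 Section 4.6, which suggests assuming a message size of 1152
/// bytes when nothing is known about the path MTU.)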
const MAX_SIZE: usize = 1152;

// FIXME make configurable
const TKL_MAX: usize = 8;

// If we went with a second buffer to be able to send 5.03s early on, that'd need to be sized to
// the maximum token length we support

/// Ways to reply to a message
enum Extracted<R> {
    Silent,
    JustRst,
    ProcessResponseThenAck(usize),
    ProcessResponseThenSilent(usize),
    ReactTo {
        msgtype: Type,
        token: heapless::Vec<u8, TKL_MAX>,
        extracted: R,
    },
}
use Extracted::*;

/// Properties by which the state associated with a pending outgoing request is matched
// Copy and Clone are mainly there because this makes it easier to pass this around while going
// through the state machine of RequestState
#[derive(Copy, Clone, Debug)]
struct RequestMatchingDetails {
    // If we, as an implementation detail, decide never to have pending requests to different
    // remotes, may we just elide this?
    remote: SocketAddr,
    mid: u16,
    token: u16,
}

/// State in which any of the slots used for outgoing requests can be
// Copy and Clone are mainly there because this makes it easier to pass this around while going
// through the state machine
#[derive(Copy, Clone, Debug)]
enum RequestState {
    /// Slot is unused
    Empty,
    /// A client application component places this when it wants to send a request
    ///
    /// FIXME: Would it make sense to also have a state where an ExpectMore can be turned into a
    /// Pending, e.g. to actively cancel an observation? (If RequestMatchingDetails chose not to
    /// carry the remote, it'd need to be provided anew, with erroneous input from the application
    /// just resulting in weird messages that are sent)
    AllocateRequest { remote: SocketAddr },
    /// The loop allows this application component to write the buffer, and awaits signalling
    /// through `.app_to_loop`.
    FillBuffer(RequestMatchingDetails),
    /// The application component is done filling the buffer
    BufferFilled {
        details: RequestMatchingDetails,
        /// Output of build_outgoing
        written: MessageWritten,
    },

    /// State while the loop is awaiting a response
    RequestSent(RequestMatchingDetails),

    /// This is the state the loop sets once a response has arrived. Once the client is done, it
    /// needs to signal through `.app_to_loop`.
    // len? Or indices? (see also `.request()` comments)
    ResponsePending {
        details: RequestMatchingDetails,
        len: usize,
    },
    // After that, the client either sets Empty or RequestSent.
}
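
// A sketch of a slot's lifecycle, as driven by `request()` below together with the loop's
// `send_request()` and `handle_response()` (Empty is additionally reachable from any state
// through the client's drop guard):
//
//   Empty -> AllocateRequest -> FillBuffer -> BufferFilled -> RequestSent -> ResponsePending
//         -> Empty (or, once multi-response exchanges exist, back to RequestSent)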
use RequestState::*;

/// The shared state of a CoAP main loop, which would typically run as a server and client
/// simultaneously.
///
/// This is usually created once and then [`.split()`] up into a client and a server part.
///
/// Relevant generics:
/// * `CONCURRENT_REQUESTS` indicates how many requests may be outstanding simultaneously on the
///   client side. Note that this is distinct from `NSTART` in two ways: `NSTART` is counted per
///   peer (this is total), and is decremented as soon as the request is ACK'ed (this counts until
///   the (final) response has been processed).
pub struct CoAPShared<const CONCURRENT_REQUESTS: usize> {
    // This is held by run (even, and in particular, across await points) except while it is
    // asking a specific request to act on it.
    buffer: core::cell::RefCell<[u8; MAX_SIZE]>,

    // This can be taken by anybody currently executing on our loop, and is not held across await
    // points.
    requests: core::cell::RefCell<[RequestState; CONCURRENT_REQUESTS]>,
    // After anything but run() updates requests, it signals `app_to_loop` so that run can take
    // any signals out of there and let the pending clients do their work.

    // FIXME is NoopRawMutex safe in the presence of any executor?
    app_to_loop: embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>,
    // When run() needs anything from the client application, it pings it through this (FIXME:
    // There has to be a better way than CONCURRENT_REQUESTS signals)
    loop_to_app: [embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>;
        CONCURRENT_REQUESTS],
    // Signalled whenever a slot is set back to Empty (currently through the client's drop guard).
    // This is a stopgap measure against failing when more requests than CONCURRENT_REQUESTS
    // should be sent. Should be avoided, though, as it's not a queue, and signalling it will
    // cause something like the thundering herd problem.
    //
    // We could signal the slot number here, but frankly, that wouldn't simplify code, and
    // efficiency is out in this case already.
    request_became_empty:
        embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>,
}
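
// Buffer ownership, in short: run() holds `buffer` (including across its await points), except
// between a loop_to_app[i] signal and the matching app_to_loop answer, during which the client
// task owning slot i may take it (to fill in a request in FillBuffer, or to read a response in
// ResponsePending) while run() stays away from it.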

/// Access to the client side of a [CoAPShared]
///
/// From this, requests can be started through the [`.to()`] method.
///
/// This is the "runtime" variant because it can be copied around arbitrarily, and requests made
/// through it take whichever of the `CONCURRENT_REQUESTS` slots is available.
///
/// Note that requests sent through this only make progress while the runner part of it is being
/// [`.run()`] (and awaited).
#[derive(Copy, Clone)]
pub struct CoAPRuntimeClient<'a, const CONCURRENT_REQUESTS: usize> {
    shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}

/// Access to the server side of a [CoAPShared]
///
/// This needs to be [`.run()`] both in order to serve peers and to make progress on any
/// requests issued through the CoAPRuntimeClient on the same Shared.
pub struct CoAPRunner<'a, const CONCURRENT_REQUESTS: usize> {
    shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}

/// Parts of the `.run()` loop that are exclusively held
///
/// Relevant generics:
/// * `Socket` indicates which kind of UDP socket this will run on
/// * `Handler` is the CoAP request handler type -- the application's implementation of a CoAP
///   server
struct RunParts<'a, Socket, Handler, RandomSource, const CONCURRENT_REQUESTS: usize>
where
    Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
    Handler: coap_handler::Handler,
    RandomSource: rand_core::RngCore,
{
    shared: &'a CoAPShared<CONCURRENT_REQUESTS>,

    socket: &'a mut Socket,
    handler: &'a mut Handler,

    rand: &'a mut RandomSource,

    // FIXME: There is time stuff involved actually -- not sending 64k messages within short times,
    // and outbound rate limiting. This probably becomes trivial when applying NSTART and
    // PROBING_RATE, and anything that has sub-ms RTTs is probably special enough.
    // FIXME: While they were still locals of the loop() function we could imagine they wouldn't
    // even be used when there are no ways to get requests; now, should we do away with their
    // storage when not sending as a client?
    next_mid: u16,
    next_token: u16,
}

impl<const CONCURRENT_REQUESTS: usize> CoAPShared<CONCURRENT_REQUESTS> {
    pub fn new() -> Self {
        // weird array initialization but OK
        const EMPTY_SIGNAL: embassy_sync::signal::Signal<
            embassy_sync::blocking_mutex::raw::NoopRawMutex,
            (),
        > = embassy_sync::signal::Signal::new();
        Self {
            buffer: [0; MAX_SIZE].into(),
            requests: [Empty; CONCURRENT_REQUESTS].into(),
            app_to_loop: Default::default(),
            loop_to_app: [EMPTY_SIGNAL; CONCURRENT_REQUESTS].into(),
            request_became_empty: Default::default(),
        }
    }

    /// Split a CoAPShared into a client and a server/runner part.
    ///
    /// While technically both of them are just thin wrappers around a shared reference, this split
    /// ensures that the otherwise-panicking constraints about who grabs which mutexes when are
    /// upheld.
    pub fn split<'a>(
        &'a self,
    ) -> (
        CoAPRuntimeClient<'a, CONCURRENT_REQUESTS>,
        CoAPRunner<'a, CONCURRENT_REQUESTS>,
    ) {
        (
            CoAPRuntimeClient { shared: self },
            CoAPRunner { shared: self },
        )
    }
}

impl<'a, const CONCURRENT_REQUESTS: usize> CoAPRunner<'a, CONCURRENT_REQUESTS> {
    /// Service the socket, sending any incoming requests to the handler, simultaneously taking
    /// care to interact with the structures shared with the client parts to send out the requests
    /// and file back the responses.
    ///
    /// If any error occurs on the socket, this terminates.
    pub async fn run<Socket, Handler, RandomSource>(
        &self,
        socket: &mut Socket,
        handler: &mut Handler,
        rand: &mut RandomSource,
    ) -> Result<(), Socket::Error>
    where
        Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
        Handler: coap_handler::Handler,
        RandomSource: rand_core::RngCore,
    {
        let start_values = rand.next_u32();
        let next_mid = (start_values >> 16) as u16;
        let next_token = start_values as u16;

        let mut run_parts = RunParts {
            shared: self.shared,
            socket,
            handler,
            next_mid,
            next_token,
            rand,
        };

        run_parts.run().await
    }
}

impl<'a, const CONCURRENT_REQUESTS: usize> CoAPRuntimeClient<'a, CONCURRENT_REQUESTS> {
    /// Set up a request to a particular network peer
    ///
    /// This starts off a builder that (as of now) is immediately usable as a
    /// [coap_request::Stack]; in the long run, this may gain extra builder steps (such as a
    /// `.reliably()` or `.unreliably()`, or `.longrunning()` to tap into a larger token space), or
    /// more diverse `.to_...()` methods for better control.
    ///
    /// FIXME: Currently, requests set up this way are sent as CONs but without retransmission.
    pub fn to(&self, address: SocketAddr) -> RequestingCoAPClient<'_, CONCURRENT_REQUESTS> {
        RequestingCoAPClient {
            shared: self.shared,
            address,
        }
    }
}

/// The actual [coap_request::Stack] implementation derived from a [CoAPShared] by putting in an
/// address through `.to()`.
pub struct RequestingCoAPClient<'a, const CONCURRENT_REQUESTS: usize> {
    address: SocketAddr,
    shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}

#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[non_exhaustive]
pub enum TransportError {
    /// Writing to the request buffer failed
    ///
    /// This can be justified as a transport error -- for example, writing too large a payload
    /// just doesn't work over UDP (but may work over TCP).
    CouldNotWrite(coap_message_implementations::inmemory_write::WriteError),
    /// An RST was sent in response to the request message
    GotRst,
}

impl<'a, const CONCURRENT_REQUESTS: usize> coap_request::Stack
    for RequestingCoAPClient<'a, CONCURRENT_REQUESTS>
{
    type RequestUnionError = coap_message_implementations::inmemory_write::WriteError;
    type RequestMessage<'b> = coap_message_implementations::inmemory_write::Message<'b> where Self: 'b;
    type ResponseMessage<'b> = coap_message_implementations::inmemory::Message<'b> where Self: 'b;
    type TransportError = TransportError;

    async fn request<Req: coap_request::Request<Self>>(
        &mut self,
        mut request: Req,
    ) -> Result<Req::Output, TransportError> {
        /// The clean-up code of this function lives in this guard's Drop. It ensures that even if
        /// the request is dropped prematurely, the runner will not try to wake up a task that's
        /// not expecting responses any more anyway, and that the slot does not get leaked.
        struct DropGuard<'a, const CONCURRENT_REQUESTS: usize> {
            shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
            slot: usize,
        }
        impl<'a, const CONCURRENT_REQUESTS: usize> Drop for DropGuard<'a, CONCURRENT_REQUESTS> {
            fn drop(&mut self) {
                let mut requests = self.shared.requests.borrow_mut();
                requests[self.slot] = Empty;
                drop(requests);
                self.shared.request_became_empty.signal(());
                self.shared.app_to_loop.signal(());
            }
        }

        // Find a free slot (or wait for one to become free), and mark it as allocated.
        let slot = loop {
            let mut requests = self.shared.requests.borrow_mut();
            if let Some(i) = requests.iter().position(|r| matches!(r, Empty)) {
                requests[i] = AllocateRequest {
                    remote: self.address,
                };
                drop(requests);
                self.shared.app_to_loop.signal(());
                break i;
            }
            drop(requests);
            self.shared.request_became_empty.wait().await;
        };

        // We're in control of that slot, but if our execution does get dropped, let's not block
        // that one forever.
        let make_empty = DropGuard {
            shared: self.shared,
            slot,
        };

        self.shared.loop_to_app[slot].wait().await;
        // We're blocking the CoAP thread now. While we're allowed to keep the buffer across that,
        // we can't keep requests across await points (other clients may want to enqueue their
        // requests there), but we don't need to leave requests[slot] in any sensible state because
        // we're "owning" that now.

        let mut buf = self.shared.buffer.borrow_mut();

        let requests = self.shared.requests.borrow();
        let FillBuffer(details) = requests[slot] else {
            panic!(
                "Lockstep violation: Runner signalled us even though it did not give us the buffer"
            );
        };
        drop(requests);

        let mut outgoing = OutgoingRequestMessage::new(buf.as_mut());
        let carry = request
            .build_request(outgoing.message())
            .await
            .map_err(TransportError::CouldNotWrite)?;
        let written = outgoing.done();

        let mut requests = self.shared.requests.borrow_mut();
        requests[slot] = BufferFilled { details, written };
        drop(requests);

        drop(buf);

        // Not blocking any more after this
        self.shared.app_to_loop.signal(());

        self.shared.loop_to_app[slot].wait().await;
        // Once more, we have exclusive control of the buffer while we're reading, and we block the
        // runner; as before, requests are not to be held across await.

        let buf = self.shared.buffer.borrow();

        let requests = self.shared.requests.borrow();
        let ResponsePending { len, .. } = requests[slot] else {
            panic!("Lockstep violation: Runner signalled us even though no response is pending");
        };
        drop(requests);

        // FIXME: There is overhead in running this twice. Should we pass indices? Or just a parse
        // result that incorporates an AsRef<[u8]> that is the locked buffer?
        let Ok(ParsedMessage {
            message, msgtype, ..
        }) = udp_read::parse::<TKL_MAX>(&buf[..len])
        else {
            unreachable!("Message was parsed once already");
        };

        if msgtype == RST {
            return Err(TransportError::GotRst);
        }

        let response = request.process_response(&message, carry).await;

        drop(buf);

        // Run the clean-up code, setting requests[slot] to Empty and signalling the runner that
        // it owns the buffer again.
        drop(make_empty);

        Ok(response)
    }
}

impl<'a, Socket, Handler, RandomSource, const CONCURRENT_REQUESTS: usize>
    RunParts<'a, Socket, Handler, RandomSource, CONCURRENT_REQUESTS>
where
    Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
    Handler: coap_handler::Handler,
    RandomSource: rand_core::RngCore,
{
    async fn run(&mut self) -> Result<(), Socket::Error> {
        loop {
            let mut buf = self.shared.buffer.borrow_mut();
            // Receive step
            match embassy_futures::select::select(
                self.shared.app_to_loop.wait(),
                self.socket.receive_into(&mut *buf),
            )
            .await
            {
                // app_to_loop signalled
                embassy_futures::select::Either::First(_) => {
                    // Nothing was written in there, but the application will need to write here
                    // soon
                    drop(buf);

                    self.process_app_to_loop().await?;
                }
                // Message was received
                embassy_futures::select::Either::Second(receive_result) => {
                    let (len, local, remote) = receive_result?;

                    self.process_incoming(buf, len, local, remote).await?;
                }
            };
        }
    }

    /// Build the request by signalling and awaiting the sending task, and send it
    async fn send_request(&mut self, slot: usize) -> Result<(), Socket::Error> {
        self.shared.loop_to_app[slot].signal(());
        // Across here we should only have the slot number as a local variable
        self.shared.app_to_loop.wait().await;
        // FIXME: Actually, the signal could have come for any reason, and we'd really need to
        // clear it: we should loop, clearing the signal, while our slot is not in BufferFilled,
        // and then signal again at any rate so that we don't lose signals.

        let mut requests = self.shared.requests.borrow_mut();
        let BufferFilled { details, written } = &requests[slot] else {
            // FIXME: allow it to declare that it has changed its mind
            panic!("Sequence violation by client");
        };

        let mut buf = self.shared.buffer.borrow_mut();
        let len = finish_outgoing(buf.as_mut(), true, details.mid, details.token, *written);

        // FIXME can this be any-protocol?
        const SOCK_UNSPEC: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0);
        // FIXME: Should we allow the requester to do follow-ups from the same address, or
        // will they just not see the change and suffer breakage mid-blockwise?
        let local = SOCK_UNSPEC;
        let remote = details.remote;

        let new_state = RequestSent(*details);
        requests[slot] = new_state;
        drop(requests);

        self.socket.send(local, remote, &buf[..len]).await?;

        Ok(())
    }

    async fn process_app_to_loop(&mut self) -> Result<(), Socket::Error> {
        // We now have to process *any* slots that are due for us to handle, lest we lose
        // information that was in the `app_to_loop` signal. It suffices to process any slot once,
        // for even if this is later changed to allow running on multiple threads, the slot will
        // only enter an application-processable state after app_to_loop has been cleared, and the
        // new signal will be pending on the next run of the main loop.

        for slot in 0..CONCURRENT_REQUESTS {
            // At least as long as this is based on a Mutex that is not Send, there should be no
            // need for any optimizations that avoid the drop-and-borrow across the loop (for the
            // compiler can do them).
            let mut requests = self.shared.requests.borrow_mut();

            // That's the only relevant state to consider here. The other states an app can set
            // and then signal (BufferFilled, and the RequestSent or Empty after reading a
            // response) are processed at different places (in send_request just below, and in the
            // receive path, respectively).
            if let AllocateRequest { remote } = &mut requests[slot] {
                self.next_mid += 1;
                self.next_token += 1;
                requests[slot] = FillBuffer(RequestMatchingDetails {
                    mid: self.next_mid,
                    token: self.next_token,
                    remote: *remote,
                });
                drop(requests);

                self.send_request(slot).await?;
            }
        }
        Ok(())
    }

    /// A message has arrived.
    ///
    /// Process it as a request or response, sending any response, RST or ACK as needed.
    async fn process_incoming(
        &mut self,
        buf: core::cell::RefMut<'_, [u8; MAX_SIZE]>,
        len: usize,
        local: SocketAddr,
        remote: SocketAddr,
    ) -> Result<(), Socket::Error> {
        let action = match udp_read::parse::<TKL_MAX>(&buf[..len]) {
            Err(MessageTooShort | WrongVersion) => Silent,
            Err(TokenTooLong { msgtype: CON }) => JustRst,
            Err(TokenTooLong { msgtype: _ }) => Silent,
            Ok(ParsedMessage {
                msgtype,
                msgid,
                token,
                message,
            }) => {
                match coap_numbers::code::classify(message.code()) {
                    coap_numbers::code::Range::Response(_) | coap_numbers::code::Range::Empty => {
                        // FIXME: If the response is coming in as an RST, we should probably not
                        // even bother the message handler. (Well, at least it should cancel
                        // retransmissions, so it probably *should* go in there.)

                        match (msgtype, self.match_to_slot(msgtype, msgid, &token)) {
                            (CON, Some(slot)) => ProcessResponseThenAck(slot),
                            (CON, None) => JustRst,
                            (_, Some(slot)) => ProcessResponseThenSilent(slot),
                            (_, None) => Silent,
                        }
                    }
                    coap_numbers::code::Range::Request => {
                        if msgtype == ACK || msgtype == RST {
                            // Requests should never arrive in these message types; ignoring them
                            // as protocol errors.
                            Silent
                        } else {
                            ReactTo {
                                msgtype,
                                token: token.try_into().expect("TKL was suitably constrained"),
                                extracted: self.handler.extract_request_data(&message),
                            }
                        }
                    }
                    _ => Silent,
                }
            }
        };

        let mut buf = match &action {
            ProcessResponseThenAck(slot) | ProcessResponseThenSilent(slot) => {
                drop(buf);
                self.handle_response(*slot, len).await;
                self.shared.buffer.borrow_mut()
            }
            _ => buf,
        };

        // Send step
        let written = match action {
            ReactTo {
                msgtype,
                token,
                extracted,
            } => {
                let responsetype = match msgtype {
                    CON => ACK,
                    _ => NON,
                };

                // Note that we never really use token, but that may change when we start
                // async'ing the server side (FIXME: does it pay to reduce token to its tkl parts
                // if all we do is go back later anyway?)
                let mut outgoing = OutgoingResponseMessage::new::<TKL_MAX>(
                    buf.as_mut(),
                    responsetype,
                    token.len(),
                );

                // FIXME amplification mitigation

                use coap_message::error::RenderableOnMinimal;
                use coap_message::MinimalWritableMessage;
                let outmsg = outgoing.message();
                match extracted {
                    Ok(extracted) => {
                        let rendered = self.handler.build_response(outmsg, extracted);
                        if let Err(e) = rendered {
                            // Response building errors get two chances to render
                            outmsg.reset();
                            if let Err(e2) = e.render(outmsg) {
                                outmsg.reset();
                                if e2.render(outmsg).is_err() {
                                    outmsg.reset();
                                    outmsg.set_code(coap_numbers::code::INTERNAL_SERVER_ERROR);
                                }
                            }
                        }
                    }
                    Err(e) => {
                        // Extraction-time errors get two chances to render
                        if let Err(e2) = e.render(outmsg) {
                            outmsg.reset();
                            if e2.render(outmsg).is_err() {
                                outmsg.reset();
                                outmsg.set_code(coap_numbers::code::INTERNAL_SERVER_ERROR);
                            }
                        }
                    }
                }
                outgoing.done()
            }
            JustRst => OutgoingResponseMessage::empty(buf.as_mut(), RST),
            ProcessResponseThenAck(_) => OutgoingResponseMessage::empty(buf.as_mut(), ACK),
            Silent | ProcessResponseThenSilent(_) => return Ok(()),
        };

        self.socket.send(local, remote, &buf[..written]).await?;

        Ok(())
    }

    fn match_to_slot(&mut self, msgtype: Type, mid: u16, token: &[u8]) -> Option<usize> {
        // FIXME: Should we check for the address as well? That'd require reliable determination
        // of being multicast.

        // If it's not 2 bytes long, we didn't send it.
        let token = u16::from_be_bytes(token[..].try_into().ok()?);

        let requests = self.shared.requests.borrow();
        for (slot, r) in requests.iter().enumerate() {
            if let RequestSent(details) = r {
                // This covers ACKs that carry no token (i.e. are not piggy-backed) and RSTs,
                // neither of which contains a matchable token, but at least the latter is
                // relevant to requests. Have to filter out NONs and CONs because their MIDs are
                // from the sender's MID space, and could match randomly.
                if (msgtype == ACK || msgtype == RST) && details.mid == mid {
                    return Some(slot);
                }
                // We're only falling through here when receiving NON responses, or CON responses.
                if details.token == token {
                    // If it's an ACK or RST in here, something went seriously wrong in the peer.
                    return Some(slot);
                }
            }
        }
        None
    }

    /// Hands a received response to the client task waiting in the given slot, and waits until
    /// that client has processed it and released the buffer.
    async fn handle_response(&mut self, slot: usize, len: usize) {
        let mut requests = self.shared.requests.borrow_mut();
        let RequestSent(details) = requests[slot] else {
            unreachable!();
        };
        requests[slot] = ResponsePending { details, len };
        drop(requests);

        self.shared.loop_to_app[slot].signal(());
        loop {
            // Across here we should only have the slot number as a local variable
            self.shared.app_to_loop.wait().await;
            let requests = self.shared.requests.borrow();
            if let ResponsePending { .. } = requests[slot] {
                // Something else sent a signal
                continue;
            } else {
                // Not caring for the moment whether it was Empty or RequestSent -- the latter
                // would be useful for observations and other non-traditional responses, but our
                // client doesn't do that yet.
                break;
            }
        }
        // Signalling self no matter whether the state changed away from ResponsePending on the
        // first pass or not: the app_to_loop.wait() we just did could have masked another client
        // task's request to allocate something, and signalling self is easier than setting up a
        // flag and an extra code path.
        self.shared.app_to_loop.signal(());
    }
}