1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757
//! A CoAP server and client implementation built on [embedded_nal_async].
//!
//! Usage and operation
//! -------------------
//!
//! An example of how to use this will be available as part of the [coap-message-demos] crate.
//!
//! * Allocate a [CoAPShared] with a `CONCURRENT_REQUESTS` const of the number of outgoing requests
//! that should be servicable simultaneously.
//!
//! * [CoAPShared::split()] that into a client and a server part.
//!
//! * Use the client's [CoAPRuntimeClient::to()] method to create a [coap_request::Stack] that can
//! be used to send CoAP requests and receive a response. (Multiple responses, eg. from
//! observation, are planned but not implemented yet).
//!
//! The [coap_request_implementations] crate contains suitable (albeit immature) building blocks
//! for constructing requests.
//!
//! * Into the server's [CoAPRunner::run()] method, pass an unconnected [UDP
//! socket](embedded_nal_async::UnconnectedUdp), a source of low-grade entropy (for
//! retransmission jitter and that like) and a [CoAP server application](coap_handler::Handler).
//!
//! The [coap_handler_implementations] crate contains suitable building blocks for constructing
//! such a server application (including some to combine handlers for individual resources into a
//! handler that picks sub-handlers from the URI path).
//!
//! The future returned by the run function needs to be polled by an executor; note that it is
//! not Send, and some executors need configuration to allow that.
//!
//!
//! Caveats
//! -------
//!
//! * The server does not perform any amplification mitigation (and the handler can't for lack of
//! remote information); use this only in environments where this is acceptable (e.g. in closed
//! networks).
//!
//! This will be mitigated in a future version.
//!
//! FIXME only provide the 3x buffer for responses / when the handler indicates that it needs
//! more, 4.01 Echo? (The handler may be unhappy that it gets dropped; -handlers may need
//! guidance on this)
//!
//! * This server does not uphold NSTART and PROBING_RATE, the fundamental flow control parameters
//! of CoAP that make it OK for generic Internet applications.
//!
//! This will be addressed in a future version.
//!
//! FIXME pass in a time source
//!
//! * The server does not perform any message deduplication. All handler functions must therefore
//! be idempotent.
//!
//! * Messages are created with as little copying as [embedded_nal] permits. For writable messages,
//! that means that they need to be written to in ascending CoAP option number. This is in
//! accordance with the implemented [coap_message::MinimalWritableMessage] and
//! [coap_message::MutableWritableMessage] traits.
//!
//! That restriction enables this crate to not only be `no_std`, but to not require `alloc`
//! either.
//!
//! Choices
//! -------
//!
//! This implementation of CoAP chooses to go with a single task, thus only ever allocating a
//! single buffer as part of the task. There are certainly alternative choices to be made here,
//! which may either be implemented in a different crate or altered later (for example, if it turns
//! out that a more effective implementation uses different tasks for send and receive, but uses a
//! single buffer or an at-least-1-sized pool that gets locked by the tasks).
#![no_std]
mod udp_format;
mod udp_read;
mod udp_write;
use udp_format::Type::{self, *};
use udp_read::{ParseError::*, ParsedMessage};
use udp_write::{finish_outgoing, MessageWritten, OutgoingRequestMessage, OutgoingResponseMessage};
use coap_message::ReadableMessage;
use embedded_nal_async::SocketAddr;
/// Maximum size of a CoAP message we need to expect
///
/// Also used in creating an output buffer as it's allocated the same way anyway.
const MAX_SIZE: usize = 1152;
// FIXME make configurable
const TKL_MAX: usize = 8;
// If we went with a second buffer to be able to send 5.03s early on, that'd need to be sized to
// the maximum of token length we support
/// Ways to reply to a message
enum Extracted<R> {
Silent,
JustRst,
ProcessResponseThenAck(usize),
ProcessResponseThenSilent(usize),
ReactTo {
msgtype: Type,
token: heapless::Vec<u8, TKL_MAX>,
extracted: R,
},
}
use Extracted::*;
/// Properties by which the state associated with a pending outgoing request is mached
// Copy and Clone are mainly there because this makes it easier to pass this around while going
// through the state machine of RequestState
#[derive(Copy, Clone, Debug)]
struct RequestMatchingDetails {
// If we, as an implementation detail, decide never to have pending requests to different
// remotes, may we just elide this?
remote: SocketAddr,
mid: u16,
token: u16,
}
/// State in which any of the slots used for outoging requests can be
// Copy and Clone are mainly there because this makes it easier to pass this around while going
// through the state machine
#[derive(Copy, Clone, Debug)]
enum RequestState {
/// Slot is unused
Empty,
/// A client application component places this when it wants to send a request
///
/// FIXME: Would it make sense to also have a state where an ExpectMore can be turned into a
/// Pending, eg. to actively cancel an observation? (If RequestMatchingDetails chose not to
/// carry the remote, it'd need to be provided anew, with erroneous input from the application
/// just resulting in weird messages that are sent)
AllocateRequest { remote: SocketAddr },
/// The loop allows this application component to write the buffer, and awaits signalling
/// through `.app_to_loop`.
FillBuffer(RequestMatchingDetails),
/// The application component is done filling the buffer
BufferFilled {
details: RequestMatchingDetails,
/// Output of build_outgoing
written: MessageWritten,
},
/// State while the loop is awaiting a response
RequestSent(RequestMatchingDetails),
/// This is the loop sets once a response has arrived. Once the client is done, it needs to
/// signal through `.app_to_loop`.
// len? Or indices? (see also `.request()` comments)
ResponsePending {
details: RequestMatchingDetails,
len: usize,
},
// After that, the client either sets Empty or RequestSent.
}
use RequestState::*;
/// The shared state of a CoAP main loop, which would typically run as a server and client
/// simultaneously.
///
/// This is usually created once and then [`.split()`] up into a client and a server part.
///
/// Relevant generics:
/// * `CONCURRENT_REQUESTS` indicates how many requests may be outstanding simultaneously on the
/// client side. Note that this distinct from `NSTART` in two ways: `NSTART` is counted per peer
/// (this is total), and is decremented as soon as the request is ACK'ed (this is until the
/// (final) response has been processed).
pub struct CoAPShared<const CONCURRENT_REQUESTS: usize> {
// This is held by run (even, even mainly, particular across await points) except while it is
// asking a specific request to act on it.
buffer: core::cell::RefCell<[u8; MAX_SIZE]>,
// This can be taken by anybody currently executing on our loop, and is not held across await
// points.
requests: core::cell::RefCell<[RequestState; CONCURRENT_REQUESTS]>,
// After anything but run() updates requests, it signals updated_request so that run can take
// any signals out of there and let the pending clients do their work.
// FIXME is NoopRawLoop safe in the presence of any executor?
app_to_loop: embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>,
// When run() needs anything from the client application, it pings it through this (FIXME:
// There has to be a better way than CONCURRENT_REQUESTS signals)
loop_to_app: [embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>;
CONCURRENT_REQUESTS],
// Signalled when the loop sets any slot to Empty. This is a stopgap measure against failing
// when more requests than CONCURRENT_REQUESTS should be sent. Should be avoided, though, as
// it's not a queue, and signaling it will cause something like the stampeding herd problem.
//
// We could signal the slot number here, but frankly, that wouldn't simplify code, and
// efficiency is out in this case already.
request_became_empty:
embassy_sync::signal::Signal<embassy_sync::blocking_mutex::raw::NoopRawMutex, ()>,
}
/// Access to the client side of a [CoAPShared]
///
/// From this, requests can be be started through the [`.to()`] method.
///
/// This is the "runtime" variant because it can be copied around arbitrarily, and requests made
/// through it take any available of the `CONCURRENT_REQUESTS` slots.
///
/// Note that requests sent through this only make progress while the socket part of it is
/// `[.run()]` (and being awaited).
#[derive(Copy, Clone)]
pub struct CoAPRuntimeClient<'a, const CONCURRENT_REQUESTS: usize> {
shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}
/// Access to the server side of a [CoAPShared]
///
/// This needs to be [`.run()`] both in order to serve peers and to make progress on any
/// requests issued through the CoAPRuntimeClient on the same Shared.
pub struct CoAPRunner<'a, const CONCURRENT_REQUESTS: usize> {
shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}
/// Parts of the `.run()` loop that are exclusively held
/// Relevant generics:
/// * `Socket` indicates which kind of UDP socket this will run on
/// * `Handler` is the CoAP request handler type -- the application's implementation of a CoAP
/// server
struct RunParts<'a, Socket, Handler, RandomSource, const CONCURRENT_REQUESTS: usize>
where
Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
Handler: coap_handler::Handler,
RandomSource: rand_core::RngCore,
{
shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
socket: &'a mut Socket,
handler: &'a mut Handler,
rand: &'a mut RandomSource,
// FIXME: There is time stuff involved actually -- not sending 64k messages within short times,
// and outbound rate limiting. This probably becomes trivial when applying NSTART and
// PROBING_RATE, and anything that has sub-ms RTTs is probably special enough.
// FIXME: While they were still locals of the loop() function we could imagine they wouldn't
// even be used when there are no ways to get requests; now, should we do away with their
// storage when not sending as a client?
next_mid: u16,
next_token: u16,
}
impl<const CONCURRENT_REQUESTS: usize> CoAPShared<CONCURRENT_REQUESTS> {
pub fn new() -> Self {
// weird array initialization but OK
const EMPTY_SIGNAL: embassy_sync::signal::Signal<
embassy_sync::blocking_mutex::raw::NoopRawMutex,
(),
> = embassy_sync::signal::Signal::new();
Self {
buffer: [0; MAX_SIZE].into(),
requests: [Empty; CONCURRENT_REQUESTS].into(),
app_to_loop: Default::default(),
loop_to_app: [EMPTY_SIGNAL; CONCURRENT_REQUESTS].into(),
request_became_empty: Default::default(),
}
}
/// Split a CoAPShared into a client and a server/runner part.
///
/// While technically both of them are just thin wrappers around a shared reference, this split
/// ensures that the otherwise-panicking constraints about who grabs which mutices when are
/// upheld.
pub fn split<'a>(
&'a self,
) -> (
CoAPRuntimeClient<'a, CONCURRENT_REQUESTS>,
CoAPRunner<'a, CONCURRENT_REQUESTS>,
) {
(
CoAPRuntimeClient { shared: self },
CoAPRunner { shared: self },
)
}
}
impl<'a, const CONCURRENT_REQUESTS: usize> CoAPRunner<'a, CONCURRENT_REQUESTS> {
/// Service the socket, sending any incoming requests to the handler, simultaneously taking
/// care to interact with the structures shared with the client parts to send out the requests
/// and file back the responses.
///
/// If any error occurs on the socket, this terminates.
pub async fn run<Socket, Handler, RandomSource>(
&self,
socket: &mut Socket,
handler: &mut Handler,
rand: &mut RandomSource,
) -> Result<(), Socket::Error>
where
Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
Handler: coap_handler::Handler,
RandomSource: rand_core::RngCore,
{
let start_values = rand.next_u32();
let next_mid = (start_values >> 16) as u16;
let next_token = start_values as u16;
let mut run_parts = RunParts {
shared: self.shared,
socket,
handler,
next_mid,
next_token,
rand,
};
run_parts.run().await
}
}
impl<'a, const CONCURRENT_REQUESTS: usize> CoAPRuntimeClient<'a, CONCURRENT_REQUESTS> {
/// Set up a request to a particular network peer
///
/// This starts off a builder that (as of now) is immediately usable as a
/// [coap_request::Stack]; on the long run, this may gain extra builder steps (such as a
/// `.reliably()` or `.unreliably()`, or `.longrunning()` to tap into a larger token space), or
/// more diverse `.to_...()` methods for better control.
///
/// FIXME: Currently, requests set up this way are sent as CONs but without retransmission.
pub fn to(&self, address: SocketAddr) -> RequestingCoAPClient<'_, CONCURRENT_REQUESTS> {
RequestingCoAPClient {
shared: self.shared,
address,
}
}
}
/// The actual [coap_request::Stack] implementation derived from a [CoAPShared] by putting in an
/// address through `.to()`.
pub struct RequestingCoAPClient<'a, const CONCURRENT_REQUESTS: usize> {
address: SocketAddr,
shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
}
#[derive(Debug)]
#[non_exhaustive]
pub enum TransportError {
/// Writing to the request buffer failed
///
/// This can be justified a transport error -- for example, writing too large a payload just
/// doesn't work over UDP (but may work over TCP).
CouldNotWrite(coap_message_implementations::inmemory_write::WriteError),
/// An RST was sent in response to the request message
GotRst,
}
impl<'a, const CONCURRENT_REQUESTS: usize> coap_request::Stack
for RequestingCoAPClient<'a, CONCURRENT_REQUESTS>
{
type RequestUnionError = coap_message_implementations::inmemory_write::WriteError;
type RequestMessage<'b> = coap_message_implementations::inmemory_write::Message<'b> where Self: 'b;
type ResponseMessage<'b> = coap_message_implementations::inmemory::Message<'b> where Self: 'b;
type TransportError = TransportError;
async fn request<Req: coap_request::Request<Self>>(
&mut self,
mut request: Req,
) -> Result<Req::Output, TransportError> {
/// The clean-up code of this function lives in this guard's Drop. It ensures that even if
/// the request is dropped prematurely, the runner will not try to wake up a task that's
/// not expecting responses any more anyway, and that the slot does not get leaked.
struct DropGuard<'a, const CONCURRENT_REQUESTS: usize> {
shared: &'a CoAPShared<CONCURRENT_REQUESTS>,
slot: usize,
}
impl<'a, const CONCURRENT_REQUESTS: usize> Drop for DropGuard<'a, CONCURRENT_REQUESTS> {
fn drop(&mut self) {
let mut requests = self.shared.requests.borrow_mut();
requests[self.slot] = Empty;
drop(requests);
self.shared.request_became_empty.signal(());
self.shared.app_to_loop.signal(());
}
}
let slot;
'found: loop {
let mut requests = self.shared.requests.borrow_mut();
for (i, request) in requests.iter_mut().enumerate() {
if matches!(*request, Empty) {
*request = AllocateRequest {
remote: self.address,
};
drop(requests);
self.shared.app_to_loop.signal(());
slot = i;
break 'found;
}
}
drop(requests);
self.shared.request_became_empty.wait().await;
}
// We're in control of that slot, but if our execution does get dropped, let's not block
// that one forever.
let make_empty = DropGuard {
shared: self.shared,
slot,
};
self.shared.loop_to_app[slot].wait().await;
// We're blocking the CoAP thread now. While we're allowed to keep the buffer across that,
// we can't keep requests across await points (other clients may want to enqueue their
// requests there), but we don't need to leave requests[slot] in any sensible state because
// we're "owning" that now.
let mut buf = self.shared.buffer.borrow_mut();
let requests = self.shared.requests.borrow();
let FillBuffer(details) = requests[slot] else {
panic!(
"Lockstep violation: Runner signalled us even though it did not give us the buffer"
);
};
drop(requests);
let mut outgoing = OutgoingRequestMessage::new(buf.as_mut());
let carry = request
.build_request(outgoing.message())
.await
.map_err(TransportError::CouldNotWrite)?;
let written = outgoing.done();
let mut requests = self.shared.requests.borrow_mut();
requests[slot] = BufferFilled { details, written };
drop(requests);
drop(buf);
// Not blocking any more after this
self.shared.app_to_loop.signal(());
self.shared.loop_to_app[slot].wait().await;
// Once more, we have exclusive control of the buffer while we're reading, and we block the
// runner; as before, requests are not to be held across await.
let buf = self.shared.buffer.borrow();
let requests = self.shared.requests.borrow();
let ResponsePending { len, .. } = requests[slot] else {
panic!("Lockstep violation: Runner signalled us even though no response is pending");
};
drop(requests);
// FIXME: There is overhead in running this twice. Should we pass indices? Or just parse
// result that incorporates an AsRef<[u8]> that is the locked buffer?
let Ok(ParsedMessage {
message, msgtype, ..
}) = udp_read::parse::<TKL_MAX>(&buf[..len])
else {
unreachable!("Message was parsed once already");
};
if msgtype == RST {
return Err(TransportError::GotRst);
}
let response = request.process_response(&message, carry).await;
drop(buf);
// Run cleanup-up code, setting requests[slot] to Empty and signalling the runner that it
// owns the buffer again.
drop(make_empty);
Ok(response)
}
}
impl<'a, Socket, Handler, RandomSource, const CONCURRENT_REQUESTS: usize>
RunParts<'a, Socket, Handler, RandomSource, CONCURRENT_REQUESTS>
where
Socket: embedded_nal_async::UnconnectedUdp + ?Sized,
Handler: coap_handler::Handler,
RandomSource: rand_core::RngCore,
{
async fn run(&mut self) -> Result<(), Socket::Error> {
loop {
let mut buf = self.shared.buffer.borrow_mut();
// Receive step
match embassy_futures::select::select(
self.shared.app_to_loop.wait(),
self.socket.receive_into(&mut *buf),
)
.await
{
// app_to_loop signalled
embassy_futures::select::Either::First(_) => {
// Nothing was written in there, but the application will need to write here
// soon
drop(buf);
self.process_app_to_loop().await?;
}
// Message was received
embassy_futures::select::Either::Second(receive_result) => {
let (len, local, remote) = receive_result?;
self.process_incoming(buf, len, local, remote).await?;
}
};
}
}
/// Build the request by signalling and awaiting the sending task, and send it
async fn send_request(&mut self, slot: usize) -> Result<(), Socket::Error> {
self.shared.loop_to_app[slot].signal(());
// Across here we should only have the slot number as a local variable
self.shared.app_to_loop.wait().await;
// FIXME: Actually, there could be any reason, plus we'd really need to clear: So
// we should loop, clearing the signal, while our buffer is not BufferFilled, and
// then at any rate signal again so that we don't lose signals.
let mut requests = self.shared.requests.borrow_mut();
let BufferFilled { details, written } = &requests[slot] else {
// FIXME: allow it to declare that it has changed its mind
panic!("Sequence violation by client");
};
let mut buf = self.shared.buffer.borrow_mut();
let len = finish_outgoing(buf.as_mut(), true, details.mid, details.token, *written);
// FIXME can this be any-protocol?
const SOCK_UNSPEC: SocketAddr = SocketAddr::new(
embedded_nal_async::IpAddr::V6(embedded_nal_async::Ipv6Addr::UNSPECIFIED),
0,
);
// FIXME: Should we allow the requester to do follow-ups from the same address, or
// will they just not see the change and suffer breakage mid-blockwise?
let local = SOCK_UNSPEC;
let remote = details.remote;
let new_state = RequestSent(*details);
requests[slot] = new_state;
drop(requests);
self.socket.send(local, remote, &buf[..len]).await?;
Ok(())
}
async fn process_app_to_loop(&mut self) -> Result<(), Socket::Error> {
// We now have to process *any* slots that are due for us to handle, lest we may lose
// information that was in the `app_to_loop`. It suffices to process any slot once, for
// even if this is later changed to allow running on multiple threads, the slot will only
// enter an application processable state after app_to_loop has been cleared, and the new
// signal will be pending on the next run of the main loop.
for slot in 0..CONCURRENT_REQUESTS {
// At least as long as this is based on a Mutex that is not Send, there should be no
// need for any optimizations that avoid the drop-and-borrow across the loop (for the
// compiler can do them).
let mut requests = self.shared.requests.borrow_mut();
// That's the only relevant state to consider here. The other states an app can state and
// then signal (BufferFilled, and the RequestSent or Empty after reading a response) are
// processed at different places (down here, and in the receive position, respectively).
if let AllocateRequest { remote } = &mut requests[slot] {
self.next_mid += 1;
self.next_token += 1;
requests[slot] = FillBuffer(RequestMatchingDetails {
mid: self.next_mid,
token: self.next_token,
remote: *remote,
});
drop(requests);
self.send_request(slot).await?;
}
}
Ok(())
}
/// A message has arrived.
///
/// Process it as a request or response, sending any response, RST or ACK as needed.
async fn process_incoming(
&mut self,
buf: core::cell::RefMut<'_, [u8; MAX_SIZE]>,
len: usize,
local: SocketAddr,
remote: SocketAddr,
) -> Result<(), Socket::Error> {
let action = match udp_read::parse::<TKL_MAX>(&buf[..len]) {
Err(MessageTooShort | WrongVersion) => Silent,
Err(TokenTooLong { msgtype: CON }) => JustRst,
Err(TokenTooLong { msgtype: _ }) => Silent,
Ok(ParsedMessage {
msgtype,
msgid,
token,
message,
}) => {
match coap_numbers::code::classify(message.code()) {
coap_numbers::code::Range::Response(_) | coap_numbers::code::Range::Empty => {
// FIXME: If the response is coming in on a RST, we should probably not
// even bother the message handler. (Well at least it should cancel
// retransmissions, so it probably *should* go in there).
match (msgtype, self.match_to_slot(msgtype, msgid, &token)) {
(CON, Some(slot)) => ProcessResponseThenAck(slot),
(CON, None) => JustRst,
(_, Some(slot)) => ProcessResponseThenSilent(slot),
(_, None) => Silent,
}
}
coap_numbers::code::Range::Request => {
let action = if msgtype == ACK || msgtype == RST {
// These should never be responses; ignoring them as protocol errors.
Silent
} else {
ReactTo {
msgtype,
token: token.try_into().expect("TKL was suitably constrained"),
extracted: self.handler.extract_request_data(&message),
}
};
action
}
_ => Silent,
}
}
};
let mut buf = match &action {
ProcessResponseThenAck(slot) | ProcessResponseThenSilent(slot) => {
drop(buf);
self.handle_response(*slot, len).await;
self.shared.buffer.borrow_mut()
}
_ => buf,
};
// Send step
let written = match action {
ReactTo {
msgtype,
token,
extracted,
} => {
let responsetype = match msgtype {
CON => ACK,
_ => NON,
};
// Note that we never really use token, but that may change when we start async'ing
// the server side (FIXME: does it pay to reduce token to its tkl parts if all we
// do is go back later anyway?)
let mut outgoing = OutgoingResponseMessage::new::<TKL_MAX>(
buf.as_mut(),
responsetype,
token.len(),
);
// FIXME amplification mitigation
use coap_message::error::RenderableOnMinimal;
use coap_message::MinimalWritableMessage;
let outmsg = outgoing.message();
match extracted {
Ok(extracted) => {
let rendered = self.handler.build_response(outmsg, extracted);
if let Err(e) = rendered {
// Response building errors get two chances to render
outmsg.reset();
if let Err(e2) = e.render(outmsg) {
outmsg.reset();
if let Err(_) = e2.render(outmsg) {
outmsg.reset();
outmsg.set_code(coap_numbers::code::INTERNAL_SERVER_ERROR);
}
}
}
}
Err(e) => {
// Extraction time errors get two chances to render
if let Err(e2) = e.render(outmsg) {
outmsg.reset();
if let Err(_) = e2.render(outmsg) {
outmsg.reset();
outmsg.set_code(coap_numbers::code::INTERNAL_SERVER_ERROR);
}
}
}
}
outgoing.done()
}
JustRst => OutgoingResponseMessage::empty(buf.as_mut(), RST),
ProcessResponseThenAck(_) => OutgoingResponseMessage::empty(buf.as_mut(), ACK),
Silent | ProcessResponseThenSilent(_) => return Ok(()),
};
self.socket.send(local, remote, &buf[..written]).await?;
Ok(())
}
fn match_to_slot(&mut self, msgtype: Type, mid: u16, token: &[u8]) -> Option<usize> {
// FIXME Should we check for the address as well? That'd require reliable determination of
// being multicast.
// If it's not 2-byte we didn't send it.
let token = u16::from_be_bytes(token[..].try_into().ok()?);
let requests = self.shared.requests.borrow();
for (slot, r) in requests.iter().enumerate() {
if let RequestSent(details) = r {
// This includes piggy-backed ACKs w/o token, and RSTs, neither of which contain a
// message, but at least the latter is relevant to requests. Have to filter out
// NONs and CONs because they're from the sender's space, and could match randomly.
if (msgtype == ACK || msgtype == RST) && details.mid == mid {
return Some(slot);
}
// We're only falling through here when receiving NON responses, or CON responses.
if details.token == token {
// If it's an ACK or RST in here, something went seriously wrong in the peer.
return Some(slot);
}
}
}
None
}
/// Returns true if the response was expected
async fn handle_response(&mut self, slot: usize, len: usize) {
let mut requests = self.shared.requests.borrow_mut();
let RequestSent(details) = requests[slot] else {
unreachable!();
};
requests[slot] = ResponsePending { details, len };
drop(requests);
self.shared.loop_to_app[slot].signal(());
loop {
// Across here we should only have the slot number as a local variable
self.shared.app_to_loop.wait().await;
let requests = self.shared.requests.borrow();
if let ResponsePending { .. } = requests[slot] {
// Something else sent a signal
continue;
} else {
// Not caring for the moment if it was Empty or RequestSent -- the latter would be
// useful for observations and other non-traditional responses, but our client
// doesn't do that yet.
break;
}
}
// Signaling self no matter whether ResponsePending was found inbetween or not -- even if,
// the app_to_loop.await() we just did could have masked another client tasks's request to
// allocate something, and signalling self is easier than setting up a flag and an extra
// code path.
self.shared.app_to_loop.signal(());
}
}