mctp_estack/
router.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2/*
3 * Copyright (c) 2024-2025 Code Construct
4 */
5
6//! MCTP Routing
7
8#[allow(unused)]
9use crate::fmt::{debug, error, info, trace, warn};
10
11use core::cell::RefCell;
12use core::future::{poll_fn, Future};
13use core::pin::pin;
14use core::task::Poll;
15
16use crate::reassemble::Reassembler;
17use crate::{
18    AppCookie, Fragmenter, ReceiveHandle, SendOutput, Stack, MAX_MTU,
19    MAX_PAYLOAD,
20};
21use mctp::{Eid, Error, MsgIC, MsgType, Result, Tag, TagValue};
22
23use embassy_sync::waitqueue::{MultiWakerRegistration, WakerRegistration};
24use embassy_sync::zerocopy_channel::{Channel, Receiver, Sender};
25
26use heapless::Vec;
27
28// TODO sizing is a bit arbitrary. They don't take up much space.
29const MAX_LISTENERS: usize = 20;
30const MAX_RECEIVERS: usize = 50;
31
32// TODO: feature to configure mutex?
33type RawMutex = embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
34type AsyncMutex<T> = embassy_sync::mutex::Mutex<RawMutex, T>;
35type BlockingMutex<T> =
36    embassy_sync::blocking_mutex::Mutex<RawMutex, RefCell<T>>;
37
38type PortRawMutex = embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
39// type PortRawMutex = embassy_sync::blocking_mutex::raw::NoopRawMutex;
40
41// Identifier for a Port
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
43pub struct PortId(pub u8);
44
45/// A trait implemented by applications to determine the routing table.
46pub trait PortLookup: Send {
47    /// Returns the `PortId` for a destination EID.
48    ///
49    /// `PortId` is an index into the array of `ports` provided to [`Router::new`]
50    ///
51    /// Return `None` to drop the packet as unreachable. This lookup
52    /// is only called for outbound packets - packets destined for the local EID
53    /// will not be passed to this callback.
54    ///
55    /// `source_port` is the incoming interface of a forwarded packet,
56    /// or `None` for locally generated packets.
57    fn by_eid(
58        &mut self,
59        eid: Eid,
60        source_port: Option<PortId>,
61    ) -> Option<PortId>;
62}
63
64/// Used like `heapless::Vec`, but lets the mut buffer be written into
65/// without zero-fill every time.
66struct PktBuf {
67    data: [u8; MAX_MTU],
68    len: usize,
69    dest: Eid,
70}
71
72impl PktBuf {
73    const fn new() -> Self {
74        Self {
75            data: [0u8; MAX_MTU],
76            len: 0,
77            dest: Eid(0),
78        }
79    }
80
81    fn set(&mut self, data: &[u8]) -> Result<()> {
82        let hdr = Reassembler::header(data);
83        debug_assert!(hdr.is_ok());
84        let hdr = hdr?;
85        let dst = self.data.get_mut(..data.len()).ok_or(Error::NoSpace)?;
86        dst.copy_from_slice(data);
87        self.len = data.len();
88        self.dest = Eid(hdr.dest_endpoint_id());
89        Ok(())
90    }
91}
92
93impl core::ops::Deref for PktBuf {
94    type Target = [u8];
95
96    fn deref(&self) -> &[u8] {
97        &self.data[..self.len]
98    }
99}
100
101/// The "producer" side of a queue of packets to send out a MCTP port/interface.
102///
103/// It will be used by `Routing` to enqueue packets to a port.
104pub struct PortTop<'a> {
105    /// Forwarded packet queue.
106    /// The outer mutex will not be held over an await.
107    packets: AsyncMutex<Sender<'a, PortRawMutex, PktBuf>>,
108
109    /// Temporary storage to flatten vectorised local sent messages
110    // prior to fragmentation and queueing.
111    message: AsyncMutex<Vec<u8, MAX_PAYLOAD>>,
112
113    mtu: usize,
114}
115
116impl PortTop<'_> {
117    /// Enqueues a packet.
118    ///
119    /// Do not call with locks held.
120    /// May block waiting for a port queue to flush.
121    /// Packet must be a valid MCTP packet, may panic otherwise.
122    async fn forward_packet(&self, pkt: &[u8]) -> Result<()> {
123        debug_assert!(Reassembler::header(pkt).is_ok());
124
125        let mut sender = self.packets.lock().await;
126        // Note: must not await while holding `sender`
127
128        // Check space first (can't rollback after try_send)
129        if pkt.len() > self.mtu {
130            debug!("Forward packet too large");
131            return Err(Error::NoSpace);
132        }
133
134        // Get a slot to send
135        let slot = sender.try_send().ok_or_else(|| {
136            debug!("Dropped forward packet");
137            Error::TxFailure
138        })?;
139
140        // Fill the buffer
141        // OK unwrap: pkt.len() checked above.
142        slot.set(pkt).unwrap();
143        sender.send_done();
144        Ok(())
145    }
146
147    /// Fragments and enqueues a message.
148    ///
149    /// Do not call with locks held.
150    /// May block waiting for a port queue to flush.
151    async fn send_message(
152        &self,
153        fragmenter: &mut Fragmenter,
154        pkt: &[&[u8]],
155    ) -> Result<Tag> {
156        trace!("send_message");
157        let mut msg;
158        let payload = if pkt.len() == 1 {
159            // Avoid the copy when sending a single slice
160            pkt[0]
161        } else {
162            msg = self.message.lock().await;
163            msg.clear();
164            for p in pkt {
165                msg.extend_from_slice(p).map_err(|_| {
166                    debug!("Message too large");
167                    Error::NoSpace
168                })?;
169            }
170            &msg
171        };
172
173        loop {
174            let mut sender = self.packets.lock().await;
175
176            let qpkt = sender.send().await;
177            qpkt.len = 0;
178            qpkt.dest = fragmenter.dest();
179            let r = fragmenter.fragment(payload, &mut qpkt.data);
180            match r {
181                SendOutput::Packet(p) => {
182                    qpkt.len = p.len();
183                    sender.send_done();
184                    if fragmenter.is_done() {
185                        break Ok(fragmenter.tag());
186                    }
187                }
188                SendOutput::Error { err, .. } => {
189                    debug!("Error packetising");
190                    sender.send_done();
191                    break Err(err);
192                }
193                SendOutput::Complete { .. } => unreachable!(),
194            }
195        }
196    }
197}
198
199/// The "consumer" side of a queue of packets to send out a MCTP interface,
200///
201/// An MCTP transport implementation will read packets to send with
202/// [`outbound()`](Self::outbound).
203pub struct PortBottom<'a> {
204    /// packet queue
205    packets: Receiver<'a, PortRawMutex, PktBuf>,
206}
207
208impl PortBottom<'_> {
209    /// Retrieve an outbound packet to send for this port.
210    ///
211    /// Should call [`outbound_done()`](Self::outbound_done) to consume the
212    /// packet and advance the queue.
213    /// `outbound()` may be called multiple times to peek at the same packet.
214    /// Also returns the destination EID.
215    pub async fn outbound(&mut self) -> (&[u8], Eid) {
216        if self.packets.len() > 1 {
217            trace!("packets avail {}", self.packets.len());
218        }
219        let pkt = self.packets.receive().await;
220        (pkt, pkt.dest)
221    }
222
223    /// Attempt to retrieve an outbound packet.
224    ///
225    /// This is the same as [`outbound()`](Self::outbound) but returns
226    /// `None` immediately if not available.
227    ///
228    /// Should call [`outbound_done()`](Self::outbound_done) to consume the
229    /// packet and advance the queue.
230    /// `try_outbound()` may be called multiple times to peek at the same packet.
231    pub fn try_outbound(&mut self) -> Option<(&[u8], Eid)> {
232        trace!("packets avail {} try", self.packets.len());
233        self.packets.try_receive().map(|pkt| (&**pkt, pkt.dest))
234    }
235
236    /// Consume the outbound packet and advance the queue.
237    pub fn outbound_done(&mut self) {
238        self.packets.receive_done()
239    }
240}
241
242/// Storage for a Port, being a physical MCTP interface.
243// TODO: instead of storing Vec<u8, N>, it could
244// store `&'r []` and a length field, which would allow different ports
245// have different MAX_MESSAGE/MAX_MTU. Does add another lifetime parameter.
246pub struct PortStorage<const FORWARD_QUEUE: usize = 4> {
247    /// forwarded packet queue
248    packets: [PktBuf; FORWARD_QUEUE],
249}
250
251impl<const FORWARD_QUEUE: usize> PortStorage<FORWARD_QUEUE> {
252    pub fn new() -> Self {
253        Self {
254            packets: [const { PktBuf::new() }; FORWARD_QUEUE],
255        }
256    }
257}
258
259impl<const FORWARD_QUEUE: usize> Default for PortStorage<FORWARD_QUEUE> {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265pub struct PortBuilder<'a> {
266    /// forwarded packet queue
267    packets: Channel<'a, PortRawMutex, PktBuf>,
268}
269
270impl<'a> PortBuilder<'a> {
271    pub fn new<const FORWARD_QUEUE: usize>(
272        storage: &'a mut PortStorage<FORWARD_QUEUE>,
273    ) -> Self {
274        // PortBuilder and PortStorage need to be separate structs, since
275        // zerocopy_channel::Channel takes a slice.
276        Self {
277            packets: Channel::new(storage.packets.as_mut_slice()),
278        }
279    }
280
281    pub fn build(
282        &mut self,
283        mtu: usize,
284    ) -> Result<(PortTop<'_>, PortBottom<'_>)> {
285        if mtu > MAX_MTU {
286            debug!("port mtu {} > MAX_MTU {}", mtu, MAX_MTU);
287            return Err(Error::BadArgument);
288        }
289
290        let (ps, pr) = self.packets.split();
291
292        let t = PortTop {
293            message: AsyncMutex::new(Vec::new()),
294            packets: AsyncMutex::new(ps),
295            mtu,
296        };
297        let b = PortBottom { packets: pr };
298        Ok((t, b))
299    }
300}
301
302/// An async MCTP stack with routing.
303///
304/// This interfaces between transport ports and MCTP using applications.
305///
306/// Applications can use [`req()`](Self::req) and [`listener()`](Self::listener)
307/// to obtain instances of the [`mctp`] async traits.
308///
309/// Device-provided input handlers feed input MCTP packets to
310/// [`inbound()`](Self::inbound).
311///
312/// For outbound packets each port has queue split into `PortTop` and `PortBottom`.
313/// `Router` will feed packets for a port into the top, and device output handlers
314/// will read from [`PortBottom`] and write out the specific MCTP transport.
315///
316/// [`update_time()`](Self::update_time) should be called periodically to
317/// handle timeouts.
318///
319/// Packets not destined for the local EID will be forwarded out a port
320/// determined by the user-provided [`PortLookup`] implementation.
321pub struct Router<'r> {
322    inner: AsyncMutex<RouterInner<'r>>,
323    ports: &'r [PortTop<'r>],
324
325    /// Listeners for different message types.
326    // Has a separate non-async Mutex so it can be used by RouterAsyncListener::drop()
327    // TODO filter by more than just MsgType, maybe have a Map of some sort?
328    app_listeners:
329        BlockingMutex<[Option<(MsgType, WakerRegistration)>; MAX_LISTENERS]>,
330}
331
332pub struct RouterInner<'r> {
333    /// Core MCTP stack
334    stack: Stack,
335
336    // Wakers for RouterAsyncReqChannel and RouterAsyncRespChannel
337    app_receive_wakers: MultiWakerRegistration<MAX_RECEIVERS>,
338
339    lookup: &'r mut dyn PortLookup,
340}
341
342impl<'r> Router<'r> {
343    /// Create a new Router.
344    ///
345    /// The EID of the provided `stack` is used to match local destination packets.
346    ///
347    /// `ports` is a list of transport interfaces for the router. The indices
348    /// of the `ports`  slice are used as `PortId` identifiers.
349    ///
350    /// `lookup` callbacks define the routing table for outbound packets.
351    pub fn new(
352        stack: Stack,
353        ports: &'r [PortTop<'r>],
354        lookup: &'r mut dyn PortLookup,
355    ) -> Self {
356        let inner = RouterInner {
357            stack,
358            app_receive_wakers: MultiWakerRegistration::new(),
359            lookup,
360        };
361
362        Self {
363            inner: AsyncMutex::new(inner),
364            app_listeners: BlockingMutex::new(RefCell::new(
365                [const { None }; MAX_LISTENERS],
366            )),
367            ports,
368        }
369    }
370
371    /// Called periodically to update the clock and check timeouts.
372    ///
373    /// A suitable interval (milliseconds) for the next call to `update_time()` will
374    /// be returned, currently a maximum of 100 ms.
375    pub async fn update_time(&self, now_millis: u64) -> Result<u64> {
376        let mut inner = self.inner.lock().await;
377        let (next, expired) = inner.stack.update(now_millis)?;
378        if expired {
379            // Wake pending sockets in case one was waiting on a now-expired response.
380            // TODO something more efficient, maybe Reassembler should hold a waker?
381            inner.app_receive_wakers.wake();
382        }
383        Ok(next)
384    }
385
386    /// Provide an incoming packet to the router.
387    ///
388    /// This expects a single MCTP packet, with no transport binding header.
389    ///
390    /// Returns the packet's MCTP source EID for any valid packet,
391    /// regardless of whether the packet is handled, forwarded, or dropped.
392    pub async fn inbound(&self, pkt: &[u8], port: PortId) -> Option<Eid> {
393        let mut inner = self.inner.lock().await;
394
395        let Ok(header) = Reassembler::header(pkt) else {
396            return None;
397        };
398        // Source EID is returned even if packet routing fails
399        let ret_src = Some(Eid(header.source_endpoint_id()));
400
401        // Handle locally if possible
402        if inner.stack.is_local_dest(pkt) {
403            match inner.stack.receive(pkt) {
404                // Complete message
405                Ok(Some((msg, handle))) => {
406                    let typ = msg.typ;
407                    let tag = msg.tag;
408                    drop(inner);
409                    self.incoming_local(tag, typ, handle).await;
410                    return ret_src;
411                }
412                // Fragment consumed, message is incomplete
413                Ok(None) => {
414                    return ret_src;
415                }
416                Err(e) => {
417                    debug!("Dropped local recv packet. {}", e);
418                    return ret_src;
419                }
420            }
421        }
422
423        // Look for a route to forward to
424        let dest_eid = Eid(header.dest_endpoint_id());
425
426        let Some(p) = inner.lookup.by_eid(dest_eid, Some(port)) else {
427            debug!("No route for recv {}", dest_eid);
428            return ret_src;
429        };
430        drop(inner);
431
432        let Some(top) = self.ports.get(p.0 as usize) else {
433            debug!("Bad port ID from lookup");
434            return ret_src;
435        };
436
437        let _ = top.forward_packet(pkt).await;
438        ret_src
439    }
440
441    async fn incoming_local(
442        &self,
443        tag: Tag,
444        typ: MsgType,
445        handle: ReceiveHandle,
446    ) {
447        trace!("incoming local, type {}", typ.0);
448        if tag.is_owner() {
449            self.incoming_listener(typ, handle).await
450        } else {
451            self.incoming_response(tag, handle).await
452        }
453    }
454
455    async fn incoming_listener(&self, typ: MsgType, handle: ReceiveHandle) {
456        let mut inner = self.inner.lock().await;
457        let mut handle = Some(handle);
458
459        // wake the packet listener
460        self.app_listeners.lock(|a| {
461            let mut a = a.borrow_mut();
462            // Find the matching listener
463            for (cookie, entry) in a.iter_mut().enumerate() {
464                if let Some((t, waker)) = entry {
465                    trace!("entry. {} vs {}", t.0, typ.0);
466                    if *t == typ {
467                        // OK unwrap: only set once
468                        let handle = handle.take().unwrap();
469                        inner
470                            .stack
471                            .set_cookie(&handle, Some(AppCookie(cookie)));
472                        inner.stack.return_handle(handle);
473                        waker.wake();
474                        trace!("listener match");
475                        break;
476                    }
477                }
478            }
479        });
480
481        if let Some(handle) = handle.take() {
482            trace!("listener no match");
483            inner.stack.finished_receive(handle);
484        }
485    }
486
487    async fn incoming_response(&self, _tag: Tag, handle: ReceiveHandle) {
488        let mut inner = self.inner.lock().await;
489        inner.stack.return_handle(handle);
490        // TODO: inefficient waking them all. should
491        // probably wake only the useful one.
492        inner.app_receive_wakers.wake();
493    }
494
495    fn app_bind(&self, typ: MsgType) -> Result<AppCookie> {
496        self.app_listeners.lock(|a| {
497            let mut a = a.borrow_mut();
498
499            // Check for existing binds with the same type
500            for bind in a.iter() {
501                if bind.as_ref().is_some_and(|(t, _)| *t == typ) {
502                    return Err(Error::AddrInUse);
503                }
504            }
505
506            // Find a free slot
507            if let Some((i, bind)) =
508                a.iter_mut().enumerate().find(|(_i, bind)| bind.is_none())
509            {
510                *bind = Some((typ, WakerRegistration::new()));
511                return Ok(AppCookie(i));
512            }
513
514            Err(Error::NoSpace)
515        })
516    }
517
518    fn app_unbind(&self, cookie: AppCookie) -> Result<()> {
519        self.app_listeners.lock(|a| {
520            let mut a = a.borrow_mut();
521            let bind = a.get_mut(cookie.0).ok_or(Error::BadArgument)?;
522
523            if bind.is_none() {
524                return Err(Error::BadArgument);
525            }
526
527            // Clear the bind.
528            *bind = None;
529            // No need to wake any waker, unbind only occurs
530            // on RouterAsyncListener::drop.
531            Ok(())
532        })
533    }
534
535    /// Receive a message.
536    ///
537    /// Listeners will pass the cookie returned from `[app_bind]`.
538    /// Other receivers will pass `tag_eid`.
539    async fn app_recv_message<'f>(
540        &self,
541        cookie: Option<AppCookie>,
542        tag_eid: Option<(Tag, Eid)>,
543        buf: &'f mut [u8],
544    ) -> Result<(&'f mut [u8], Eid, MsgType, Tag, MsgIC)> {
545        // Allow single use inside poll_fn
546        let mut buf = Some(buf);
547
548        poll_fn(|cx| {
549            // Lock it inside the poll_fn
550            let l = self.inner.lock();
551            let l = pin!(l);
552            let mut inner = match l.poll(cx) {
553                Poll::Ready(i) => i,
554                Poll::Pending => return Poll::Pending,
555            };
556
557            trace!("poll recv message");
558
559            // Find the message's handle
560            // TODO: get_deferred is inefficient lookup, does it matter?
561            let handle = match (cookie, tag_eid) {
562                // lookup by cookie for Listener
563                (Some(cookie), None) => {
564                    inner.stack.get_deferred_bycookie(&[cookie])
565                }
566                // lookup by tag/eid for ReqChannel
567                (None, Some((tag, eid))) => inner.stack.get_deferred(eid, tag),
568                // one of them must have been set
569                _ => unreachable!(),
570            };
571
572            let Some(handle) = handle else {
573                // No message handle. Maybe it hasn't arrived yet, find the waker
574                // to register.
575
576                if let Some(cookie) = cookie {
577                    // This is a Listener.
578                    trace!("listener, cookie index {}", cookie.0);
579                    self.app_listeners.lock(|a| {
580                        let mut a = a.borrow_mut();
581                        let Some(bind) = a.get_mut(cookie.0) else {
582                            debug_assert!(false, "recv bad cookie");
583                            return;
584                        };
585                        let Some((_typ, waker)) = bind else {
586                            debug_assert!(false, "recv no listener");
587                            return;
588                        };
589                        waker.register(cx.waker());
590                    });
591                } else {
592                    // Other receivers.
593                    trace!("other recv");
594                    inner.app_receive_wakers.register(cx.waker());
595                }
596                trace!("pending");
597                return Poll::Pending;
598            };
599
600            // A matching message was found. Fetch it, copy the contents to the caller,
601            // and finish with it for the stack.
602            trace!("got handle");
603
604            let msg = inner.stack.fetch_message(&handle);
605
606            // OK unwrap, set above and only hit once on Poll::Ready
607            let buf = buf.take().unwrap();
608            let res = if msg.payload.len() > buf.len() {
609                trace!("no space");
610                Err(Error::NoSpace)
611            } else {
612                trace!("good len {}", msg.payload.len());
613                let buf = &mut buf[..msg.payload.len()];
614                buf.copy_from_slice(msg.payload);
615                Ok((buf, msg.source, msg.typ, msg.tag, msg.ic))
616            };
617
618            inner.stack.finished_receive(handle);
619            Poll::Ready(res)
620        })
621        .await
622    }
623
624    /// Used by traits to send a message, see comment on .send_vectored() methods
625    ///
626    /// TODO should handle loopback if eid matches local stack's
627    async fn app_send_message(
628        &self,
629        eid: Eid,
630        typ: MsgType,
631        tag: Option<Tag>,
632        tag_expires: bool,
633        integrity_check: MsgIC,
634        buf: &[&[u8]],
635        cookie: Option<AppCookie>,
636    ) -> Result<Tag> {
637        let mut inner = self.inner.lock().await;
638
639        let Some(p) = inner.lookup.by_eid(eid, None) else {
640            debug!("No route for recv {}", eid);
641            return Err(Error::TxFailure);
642        };
643
644        let Some(top) = self.ports.get(p.0 as usize) else {
645            debug!("Bad port ID from lookup");
646            return Err(Error::TxFailure);
647        };
648
649        let mtu = top.mtu;
650        let mut fragmenter = inner
651            .stack
652            .start_send(
653                eid,
654                typ,
655                tag,
656                tag_expires,
657                integrity_check,
658                Some(mtu),
659                cookie,
660            )
661            .inspect_err(|e| trace!("error fragmenter {}", e))?;
662        // release to allow other ports to continue work
663        drop(inner);
664
665        top.send_message(&mut fragmenter, buf).await
666    }
667
668    /// Only needs to be called for tags allocated with tag_expires=false
669    ///
670    /// Must only be called for owned tags.
671    async fn app_release_tag(&self, eid: Eid, tag: Tag) {
672        let Tag::Owned(tv) = tag else { unreachable!() };
673        let mut inner = self.inner.lock().await;
674
675        if let Err(e) = inner.stack.cancel_flow(eid, tv) {
676            warn!("flow cancel failed {}", e);
677        }
678    }
679
680    /// Create a `AsyncReqChannel` instance.
681    pub fn req(&'r self, eid: Eid) -> RouterAsyncReqChannel<'r> {
682        RouterAsyncReqChannel::new(eid, self)
683    }
684
685    /// Create a `AsyncListener` instance.
686    ///
687    /// Will receive incoming messages with the TO bit set for the given `typ`.
688    pub fn listener(&'r self, typ: MsgType) -> Result<RouterAsyncListener<'r>> {
689        let cookie = self.app_bind(typ)?;
690        Ok(RouterAsyncListener {
691            cookie,
692            router: self,
693        })
694    }
695
696    /// Retrieve the EID assigned to the local stack
697    pub async fn get_eid(&self) -> Eid {
698        let inner = self.inner.lock().await;
699        inner.stack.own_eid
700    }
701
702    /// Set the EID assigned to the local stack
703    pub async fn set_eid(&self, eid: Eid) -> mctp::Result<()> {
704        let mut inner = self.inner.lock().await;
705        inner.stack.set_eid(eid.0)
706    }
707}
708
709/// A request channel.
710pub struct RouterAsyncReqChannel<'r> {
711    eid: Eid,
712    sent_tag: Option<Tag>,
713    router: &'r Router<'r>,
714    tag_expires: bool,
715}
716
717impl<'r> RouterAsyncReqChannel<'r> {
718    fn new(eid: Eid, router: &'r Router<'r>) -> Self {
719        RouterAsyncReqChannel {
720            eid,
721            sent_tag: None,
722            tag_expires: true,
723            router,
724        }
725    }
726
727    /// Set the tag to not expire. That allows multiple calls to `send()`.
728    ///
729    /// `async_drop` must be called prior to drop.
730    pub fn tag_noexpire(&mut self) -> Result<()> {
731        if self.sent_tag.is_some() {
732            return Err(Error::BadArgument);
733        }
734        self.tag_expires = false;
735        Ok(())
736    }
737
738    /// This must be called prior to drop whenever `tag_noexpire()` is used.
739    ///
740    /// A workaround until async drop is implemented in Rust itself.
741    /// <https://github.com/rust-lang/rust/issues/126482>
742    pub async fn async_drop(self) {
743        if !self.tag_expires {
744            if let Some(tag) = self.sent_tag {
745                self.router.app_release_tag(self.eid, tag).await;
746            }
747        }
748    }
749}
750
751impl Drop for RouterAsyncReqChannel<'_> {
752    fn drop(&mut self) {
753        if !self.tag_expires && self.sent_tag.is_some() {
754            warn!("Didn't call async_drop()");
755        }
756    }
757}
758
759/// A request channel
760///
761/// Created with [`Router::req()`](Router::req).
762impl mctp::AsyncReqChannel for RouterAsyncReqChannel<'_> {
763    /// Send a message.
764    ///
765    /// This will async block until the message has been enqueued to the physical port.
766    /// Note that it will return failure immediately if the MCTP stack has no available tags,
767    /// that behaviour may need changing in future.
768    ///
769    /// Subsequent calls will fail unless tag_noexpire() was performed.
770    async fn send_vectored(
771        &mut self,
772        typ: MsgType,
773        integrity_check: MsgIC,
774        bufs: &[&[u8]],
775    ) -> Result<()> {
776        // For the first call, we pass a None tag, get an Owned one allocated.
777        // Subsequent calls will fail unless tag_noexpire() was performed.
778        let tag = self
779            .router
780            .app_send_message(
781                self.eid,
782                typ,
783                self.sent_tag,
784                self.tag_expires,
785                integrity_check,
786                bufs,
787                None,
788            )
789            .await?;
790        debug_assert!(matches!(tag, Tag::Owned(_)));
791        self.sent_tag = Some(tag);
792        Ok(())
793    }
794
795    async fn recv<'f>(
796        &mut self,
797        buf: &'f mut [u8],
798    ) -> Result<(MsgType, MsgIC, &'f mut [u8])> {
799        let Some(Tag::Owned(tv)) = self.sent_tag else {
800            debug!("recv without send");
801            return Err(Error::BadArgument);
802        };
803        let recv_tag = Tag::Unowned(tv);
804        let (buf, eid, typ, tag, ic) = self
805            .router
806            .app_recv_message(None, Some((recv_tag, self.eid)), buf)
807            .await?;
808        debug_assert_eq!(tag, recv_tag);
809        debug_assert_eq!(eid, self.eid);
810        Ok((typ, ic, buf))
811    }
812
813    fn remote_eid(&self) -> Eid {
814        self.eid
815    }
816}
817
818/// A response channel.
819///
820/// Returned by [`RouterAsyncListener::recv`](mctp::AsyncListener::recv).
821pub struct RouterAsyncRespChannel<'r> {
822    eid: Eid,
823    tv: TagValue,
824    router: &'r Router<'r>,
825    typ: MsgType,
826}
827
828impl<'r> mctp::AsyncRespChannel for RouterAsyncRespChannel<'r> {
829    type ReqChannel<'a>
830        = RouterAsyncReqChannel<'r>
831    where
832        Self: 'a;
833
834    /// Send a message.
835    ///
836    /// See description of `RouterAsyncReqChannel::send_vectored()`.
837    async fn send_vectored(
838        &mut self,
839        integrity_check: MsgIC,
840        bufs: &[&[u8]],
841    ) -> Result<()> {
842        let tag = Some(Tag::Unowned(self.tv));
843        self.router
844            .app_send_message(
845                self.eid,
846                self.typ,
847                tag,
848                false,
849                integrity_check,
850                bufs,
851                None,
852            )
853            .await?;
854        Ok(())
855    }
856
857    fn remote_eid(&self) -> Eid {
858        self.eid
859    }
860
861    fn req_channel(&self) -> mctp::Result<Self::ReqChannel<'_>> {
862        Ok(RouterAsyncReqChannel::new(self.eid, self.router))
863    }
864}
865
866/// A listener.
867///
868/// Created with [`Router::listener()`](Router::listener).
869pub struct RouterAsyncListener<'r> {
870    router: &'r Router<'r>,
871    cookie: AppCookie,
872}
873
874impl<'r> mctp::AsyncListener for RouterAsyncListener<'r> {
875    // type RespChannel<'a> = RouterAsyncRespChannel<'a> where Self: 'a;
876    type RespChannel<'a>
877        = RouterAsyncRespChannel<'r>
878    where
879        Self: 'a;
880
881    async fn recv<'f>(
882        &mut self,
883        buf: &'f mut [u8],
884    ) -> mctp::Result<(MsgType, MsgIC, &'f mut [u8], Self::RespChannel<'_>)>
885    {
886        let (msg, eid, typ, tag, ic) = self
887            .router
888            .app_recv_message(Some(self.cookie), None, buf)
889            .await?;
890
891        let Tag::Owned(tv) = tag else {
892            debug_assert!(false, "listeners only accept owned tags");
893            return Err(Error::InternalError);
894        };
895
896        let resp = RouterAsyncRespChannel {
897            eid,
898            tv,
899            router: self.router,
900            typ,
901        };
902        Ok((typ, ic, msg, resp))
903    }
904}
905
906impl Drop for RouterAsyncListener<'_> {
907    fn drop(&mut self) {
908        if self.router.app_unbind(self.cookie).is_err() {
909            // should be infallible, cookie should be valid.
910            debug_assert!(false, "bad unbind");
911        }
912    }
913}