Skip to main content

rs_matter/dm/clusters/app/
webrtc_prov.rs

1/*
2 *
3 *    Copyright (c) 2026 Project CHIP Authors
4 *
5 *    Licensed under the Apache License, Version 2.0 (the "License");
6 *    you may not use this file except in compliance with the License.
7 *    You may obtain a copy of the License at
8 *
9 *        http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *    Unless required by applicable law or agreed to in writing, software
12 *    distributed under the License is distributed on an "AS IS" BASIS,
13 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *    See the License for the specific language governing permissions and
15 *    limitations under the License.
16 */
17
18//! The WebRTC Transport Provider cluster (0x0553).
19//!
20//! Implements the server side of the Matter WebRTC signalling used by Matter
21//! cameras to expose audio/video streams to Matter controllers via the standard
22//! WebRTC Offer/Answer flow and trickle-ICE exchange.
23//!
24//! # Architecture (Pattern B1 — "Hooks")
25//!
26//! [`WebRtcProvHandler`] implements the spec-defined session table and
27//! command state machine. All media-specific work (SDP generation, ICE
28//! gathering, media-source lifecycle) is delegated to a user-supplied
29//! [`WebRtcHooks`] implementation.
30//!
31//! ```text
32//! ┌───────────────────┐   ClusterAsyncHandler    ┌────────────────┐
33//! │                   │◀── inbound commands ─────│   rs-matter IM │
34//! │  WebRtcProvHandler│                          │    dispatcher  │
35//! └───────┬───────────┘                          └────────────────┘
36//!         │ delegates SDP/ICE work
37//!         ▼
38//! ┌───────────────────┐
39//! │    WebRtcHooks    │  user-supplied (e.g. str0m-based for std)
40//! └───────────────────┘
41//! ```
42//!
43//! # Const generics
44//!
45//! Every static allocation is explicit at the call-site:
46//!
47//! * `N_SESSIONS` — maximum concurrent WebRTC sessions held in the table.
48//!   The Matter spec mandates MinLimit = 3; typical values are 3..=8.
49//! * `SDP_LEN` — maximum SDP blob size, in bytes. Full WebRTC SDPs range
50//!   2–8 KiB depending on codec support.
51//! * `OUT_LEN` — scratch-buffer size, in bytes, for the outbound invoke
52//!   payload produced by [`WebRtcProvHandler::run`]. Must be large enough
53//!   for a trickle-ICE batch; typical value 1 KiB.
54//!
55//! On `no_std` / embedded targets, [`WebRtcProvHandler::run`] holds an
56//! `[u8; OUT_LEN]` plus, on the `Offer`/`Answer` paths, an `[u8; SDP_LEN]`
57//! in its async-future state. Pick `SDP_LEN` to fit the smallest SDP your
58//! deployment will negotiate (e.g. 2 KiB for a single H.264 + Opus track)
59//! to keep the future small enough for the executor's task slot.
60//!
61//! # Scope
62//!
63//! * Inbound command state machine: fully implemented for
64//!   `SolicitOffer` / `ProvideOffer` / `ProvideAnswer` /
65//!   `ProvideICECandidates` / `EndSession`.
66//! * Fabric-scoped `CurrentSessions` attribute.
67//! * `WebRtcHooks` trait with `async` hooks for the media-plane work.
68//! * Outbound push to `WebRTCTransportRequestor`:
69//!   [`OutboundWork::Offer`] (deferred-offer flow — camera-initiated
70//!   SDP Offer following an earlier `SolicitOffer`),
71//!   [`OutboundWork::Answer`] (SDP Answer for `ProvideOffer`),
72//!   [`OutboundWork::IceCandidates`] (trickle ICE to traverse NAT) and
73//!   [`OutboundWork::End`] (camera-initiated teardown) are driven from
74//!   [`WebRtcProvHandler::run`] via [`WebRtcHooks::next_outbound`].
75
76use core::cell::{Cell, RefCell};
77use core::future::Future;
78
79use crate::dm::clusters::decl::globals::{
80    ICECandidateStruct, StreamUsageEnum, WebRTCEndReasonEnum, WebRTCSessionStructArrayBuilder,
81    WebRTCSessionStructBuilder,
82};
83use crate::dm::{
84    ArrayAttributeRead, Cluster, Dataver, EndptId, HandlerContext, InvokeContext, ReadContext,
85};
86use crate::error::{Error, ErrorCode};
87use crate::tlv::{Nullable, TLVArray, TLVBuilderParent};
88use crate::transport::exchange::Exchange;
89use crate::utils::storage::Vec;
90use crate::utils::sync::blocking::Mutex;
91use crate::with;
92
93use super::super::decl::web_rtc_transport_provider as decl;
94use super::super::decl::web_rtc_transport_requestor::WebRtcTransportRequestorClient;
95
96#[allow(unused_imports)]
97pub use crate::dm::clusters::decl::web_rtc_transport_provider::*;
98
99/// Errors surfaced by [`WebRtcHooks`] implementations. These map to
100/// Matter cluster-status codes.
101#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
102#[cfg_attr(feature = "defmt", derive(defmt::Format))]
103pub enum WebRtcError {
104    /// `INVALID_IN_STATE` — the command cannot be processed in the current
105    /// session state (e.g. `ProvideAnswer` before an Offer was sent).
106    InvalidInState,
107    /// `INVALID_COMMAND` — the command parameters are structurally invalid
108    /// (e.g. `SolicitOffer` with no stream IDs at all).
109    InvalidCommand,
110    /// `DYNAMIC_CONSTRAINT_ERROR` — the request refers to stream IDs,
111    /// codecs or capabilities the hooks cannot satisfy.
112    DynamicConstraint,
113    /// `RESOURCE_EXHAUSTED` — the media source cannot accept a new session.
114    ResourceExhausted,
115    /// `FAILURE` — any other hooks-level failure.
116    Failure,
117}
118
119impl From<WebRtcError> for Error {
120    fn from(e: WebRtcError) -> Self {
121        match e {
122            WebRtcError::InvalidInState => ErrorCode::InvalidAction.into(),
123            WebRtcError::InvalidCommand => ErrorCode::InvalidCommand.into(),
124            WebRtcError::DynamicConstraint => ErrorCode::DynamicConstraintError.into(),
125            WebRtcError::ResourceExhausted => ErrorCode::ResourceExhausted.into(),
126            WebRtcError::Failure => ErrorCode::Failure.into(),
127        }
128    }
129}
130
131/// Owned parameters of a `SolicitOffer` / `ProvideOffer` request.
132///
133/// Owned (not TLV-borrowed) so it can cross `.await` points when passed
134/// to async hooks.
135#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
136#[cfg_attr(feature = "defmt", derive(defmt::Format))]
137pub struct OfferParams {
138    /// The stream usage requested by the peer.
139    pub stream_usage: StreamUsageEnum,
140    /// Originating endpoint of the controller hosting the
141    /// `WebRTCTransportRequestor`.
142    pub originating_endpoint_id: EndptId,
143    /// Requested video-stream ID.
144    /// - `None`           → field absent from request
145    /// - `Some(None)`     → explicit NULL
146    /// - `Some(Some(id))` → concrete ID
147    pub video_stream_id: Option<Option<u16>>,
148    /// Requested audio-stream ID, same encoding as `video_stream_id`.
149    pub audio_stream_id: Option<Option<u16>>,
150    /// Whether the peer opted in to metadata delivery.
151    pub metadata_enabled: bool,
152}
153
154/// Outcome of [`WebRtcHooks::on_solicit_offer`].
155#[derive(Debug, Clone, PartialEq, Eq, Hash)]
156#[cfg_attr(feature = "defmt", derive(defmt::Format))]
157pub struct SolicitOutcome {
158    /// If `false`, the `SolicitOfferResponse` has `deferredOffer = false`
159    /// and the hooks are expected to enqueue [`OutboundWork::Offer`]
160    /// shortly so the Offer can be pushed via
161    /// `WebRTCTransportRequestor::Offer`. If `true`, the media source was
162    /// not ready; the hooks must enqueue the Offer when it becomes
163    /// available.
164    pub deferred: bool,
165    /// Resolved video-stream ID (absent = omit field in response).
166    pub video_stream_id: Option<u16>,
167    /// Resolved audio-stream ID (absent = omit field in response).
168    pub audio_stream_id: Option<u16>,
169}
170
171/// Outcome of [`WebRtcHooks::on_offer`].
172///
173/// The SDP Answer itself is buffered by the hooks implementation and
174/// pushed asynchronously via [`OutboundWork::Answer`] /
175/// [`WebRtcHooks::fill_answer`].
176#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
177#[cfg_attr(feature = "defmt", derive(defmt::Format))]
178pub struct AnswerOutcome {
179    /// Resolved video-stream ID, or NULL (= session has no video).
180    pub video_stream_id: Option<u16>,
181    /// Resolved audio-stream ID, or NULL (= session has no audio).
182    pub audio_stream_id: Option<u16>,
183}
184
185/// A descriptor of an outbound invocation, returned by
186/// [`WebRtcHooks::next_outbound`] and dispatched by
187/// [`WebRtcProvHandler::run`].
188///
189/// The payload itself is NOT carried here: for [`OutboundWork::IceCandidates`]
190/// the handler calls [`WebRtcHooks::take_ice_candidates`] before opening
191/// the invoke, snapshotting the queue once into a stack-allocated buffer;
192/// the sync build closure then iterates that snapshot, so MRP retransmits
193/// re-emit the same TLV array.
194#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
195#[cfg_attr(feature = "defmt", derive(defmt::Format))]
196pub enum OutboundWork {
197    /// `WebRTCTransportRequestor::Offer` — deferred-offer flow:
198    /// the controller previously called `SolicitOffer` and the device
199    /// returned `deferredOffer = true`; once the local media source
200    /// produces an SDP Offer, the hooks enqueue this work item so the
201    /// handler pushes it to the controller. The handler invokes
202    /// [`WebRtcHooks::take_offer_sdp`] to obtain the SDP bytes.
203    Offer {
204        /// Target session.
205        session_id: u16,
206    },
207    /// `WebRTCTransportRequestor::Answer` — push the SDP Answer for a
208    /// session whose Offer arrived via `ProvideOffer`. The handler
209    /// invokes [`WebRtcHooks::fill_answer`] to obtain the SDP bytes.
210    Answer {
211        /// Target session.
212        session_id: u16,
213    },
214    /// `WebRTCTransportRequestor::ICECandidates` — trickle a batch of
215    /// locally-gathered ICE candidates to the peer (needed for NAT
216    /// traversal).
217    IceCandidates {
218        /// Target session.
219        session_id: u16,
220    },
221    /// `WebRTCTransportRequestor::End` — notify the peer that we are
222    /// tearing down the session (e.g. camera power-down, media source
223    /// lost).
224    End {
225        /// Target session.
226        session_id: u16,
227        /// Reason for termination.
228        reason: WebRTCEndReasonEnum,
229    },
230}
231
232/// Receiver for [`WebRtcHooks::take_ice_candidates`]. The hook pushes
233/// one candidate SDP string at a time; storage (size, layout, backing
234/// allocator) is owned by the caller — typically
235/// [`WebRtcProvHandler::push_outbound`], which stack-allocates a
236/// bounded buffer for the duration of one outbound `IceCandidates`
237/// invoke.
238pub trait IceCandidateSink {
239    /// Append `candidate` to the snapshot. Returns
240    /// [`WebRtcError::ResourceExhausted`] if the sink's bounded
241    /// storage is full — the hook MAY choose to drop further
242    /// candidates and continue (deliver next round) or surface the
243    /// error to the caller.
244    fn push(&mut self, candidate: &str) -> Result<(), WebRtcError>;
245}
246
247/// Default `IceCandidateSink` backed by a `crate::utils::storage::Vec`
248/// of bounded-length `heapless::String`s. `WebRtcProvHandler::push_outbound`
249/// stack-allocates one of these per outbound `IceCandidates` invoke and
250/// passes a `&mut` to the hook.
251struct VecCandidateSink<'a, const CAND_LEN: usize, const MAX_CAND: usize> {
252    buf: &'a mut Vec<heapless::String<CAND_LEN>, MAX_CAND>,
253}
254
255impl<const CAND_LEN: usize, const MAX_CAND: usize> IceCandidateSink
256    for VecCandidateSink<'_, CAND_LEN, MAX_CAND>
257{
258    fn push(&mut self, candidate: &str) -> Result<(), WebRtcError> {
259        let mut s = heapless::String::<CAND_LEN>::new();
260        s.push_str(candidate)
261            .map_err(|_| WebRtcError::ResourceExhausted)?;
262        self.buf
263            .push(s)
264            .map_err(|_| WebRtcError::ResourceExhausted)?;
265        Ok(())
266    }
267}
268
269/// The device-side WebRTC media-plane plumbing. Cluster state (session
270/// table, dataver, attribute/command dispatch) is owned by
271/// [`WebRtcProvHandler`]; this trait captures every side-effect the
272/// spec commands have on the actual WebRTC peer.
273///
274/// All methods are `async` and invoked inline from the handler's command
275/// dispatch. Implementations MUST NOT perform blocking I/O.
276pub trait WebRtcHooks {
277    /// Handle an inbound `SolicitOffer` command. The session ID has been
278    /// allocated; the hook decides whether to respond with `deferred = false`
279    /// (in which case it MUST shortly enqueue an [`OutboundWork::Offer`] so
280    /// the handler can push the SDP Offer via
281    /// `WebRTCTransportRequestor::Offer`) or `deferred = true` (Offer
282    /// pushed when the media source becomes ready).
283    ///
284    /// `SolicitOfferResponse` itself does NOT carry an SDP — see Matter Spec -
285    /// so this hook never returns Offer bytes inline.
286    async fn on_solicit_offer(
287        &self,
288        session_id: u16,
289        params: &OfferParams,
290    ) -> Result<SolicitOutcome, WebRtcError>;
291
292    /// Handle an inbound `ProvideOffer` command. The SDP Offer has been
293    /// validated as UTF-8; the hooks buffers the SDP Answer internally
294    /// and enqueues an [`OutboundWork::Answer`] so the handler can push
295    /// it to the peer's `WebRTCTransportRequestor::Answer` command.
296    async fn on_offer(
297        &self,
298        session_id: u16,
299        sdp: &str,
300        params: &OfferParams,
301    ) -> Result<AnswerOutcome, WebRtcError>;
302
303    /// Handle an inbound `ProvideAnswer` for a session whose Offer THIS
304    /// node originally sent (deferred-offer flow).
305    async fn on_answer(&self, session_id: u16, sdp: &str) -> Result<(), WebRtcError>;
306
307    /// Handle an inbound batch of remote ICE candidates.
308    async fn on_ice_candidates(
309        &self,
310        session_id: u16,
311        candidates: &TLVArray<'_, ICECandidateStruct<'_>>,
312    ) -> Result<(), WebRtcError>;
313
314    /// Handle an inbound `EndSession`. The session entry is removed from
315    /// the table immediately after this call returns.
316    async fn on_end_session(
317        &self,
318        session_id: u16,
319        reason: WebRTCEndReasonEnum,
320    ) -> Result<(), WebRtcError>;
321
322    /// Await the next outbound invocation the camera wants to push to the
323    /// controller. Called in a tight loop from [`WebRtcProvHandler::run`]:
324    /// when the hook has nothing to send it MUST `.await` a future that
325    /// never completes (e.g. [`core::future::pending`]) or parks on an
326    /// internal signal.
327    ///
328    /// A default implementation is provided that parks forever, i.e.
329    /// opts the implementor out of outbound support. Override to enable
330    /// trickle-ICE and camera-initiated session end.
331    async fn next_outbound(&self) -> OutboundWork {
332        core::future::pending().await
333    }
334
335    /// Snapshot-and-consume the queued ICE candidates for
336    /// `session_id` into `out`, in queue order. Called by the handler
337    /// **outside** the sync build closure of an outbound
338    /// `IceCandidates` invoke — analogous to [`Self::take_offer_sdp`] /
339    /// [`Self::take_answer_sdp`]. The handler then iterates the
340    /// snapshot inside the (sync, idempotent) build closure to write
341    /// the wire array; MRP retransmits re-iterate the same snapshot,
342    /// so the closure stays idempotent without the hook seeing the
343    /// retransmit.
344    ///
345    /// If the invoke fails after this hook returns, the snapshotted
346    /// candidates are lost — same semantics as the SDP paths. The
347    /// implementation MAY decide to keep a backup if it wants
348    /// at-least-once delivery, but the default contract is at-most-once.
349    ///
350    /// Default: returns `Err(WebRtcError::InvalidInState)` — override
351    /// alongside [`Self::next_outbound`] when emitting
352    /// [`OutboundWork::IceCandidates`].
353    async fn take_ice_candidates(
354        &self,
355        _session_id: u16,
356        _out: &mut dyn IceCandidateSink,
357    ) -> Result<(), WebRtcError> {
358        Err(WebRtcError::InvalidInState)
359    }
360
361    /// Write the SDP Answer bytes for `session_id` into the supplied
362    /// buffer, returning the number of bytes written. Called by the
363    /// handler immediately after [`Self::next_outbound`] returns
364    /// [`OutboundWork::Answer`].
365    ///
366    /// If the buffer is too small the hooks SHOULD return
367    /// [`WebRtcError::ResourceExhausted`]; if no Answer is queued for
368    /// `session_id` return [`WebRtcError::InvalidInState`].
369    ///
370    /// Default: errors out — override when overriding [`Self::on_offer`].
371    async fn take_answer_sdp(
372        &self,
373        _session_id: u16,
374        _sdp_out: &mut [u8],
375    ) -> Result<usize, WebRtcError> {
376        Err(WebRtcError::InvalidInState)
377    }
378
379    /// Write the SDP Offer bytes for `session_id` into the supplied
380    /// buffer, returning the number of bytes written. Called by the
381    /// handler immediately after [`Self::next_outbound`] returns
382    /// [`OutboundWork::Offer`].
383    ///
384    /// If the buffer is too small the hooks SHOULD return
385    /// [`WebRtcError::ResourceExhausted`]; if no Offer is queued for
386    /// `session_id` return [`WebRtcError::InvalidInState`].
387    ///
388    /// Default: errors out — override to enable the deferred-offer
389    /// flow ([`SolicitOutcome::deferred = true`](SolicitOutcome) on a
390    /// `SolicitOffer` response, followed by an [`OutboundWork::Offer`]
391    /// from [`Self::next_outbound`]).
392    async fn take_offer_sdp(
393        &self,
394        _session_id: u16,
395        _sdp_out: &mut [u8],
396    ) -> Result<usize, WebRtcError> {
397        Err(WebRtcError::InvalidInState)
398    }
399}
400
401impl<T> WebRtcHooks for &T
402where
403    T: WebRtcHooks,
404{
405    fn on_solicit_offer(
406        &self,
407        session_id: u16,
408        params: &OfferParams,
409    ) -> impl Future<Output = Result<SolicitOutcome, WebRtcError>> {
410        (*self).on_solicit_offer(session_id, params)
411    }
412
413    fn on_offer(
414        &self,
415        session_id: u16,
416        sdp: &str,
417        params: &OfferParams,
418    ) -> impl Future<Output = Result<AnswerOutcome, WebRtcError>> {
419        (*self).on_offer(session_id, sdp, params)
420    }
421
422    fn on_answer(
423        &self,
424        session_id: u16,
425        sdp: &str,
426    ) -> impl Future<Output = Result<(), WebRtcError>> {
427        (*self).on_answer(session_id, sdp)
428    }
429
430    fn on_ice_candidates(
431        &self,
432        session_id: u16,
433        candidates: &TLVArray<'_, ICECandidateStruct<'_>>,
434    ) -> impl Future<Output = Result<(), WebRtcError>> {
435        (*self).on_ice_candidates(session_id, candidates)
436    }
437
438    fn on_end_session(
439        &self,
440        session_id: u16,
441        reason: WebRTCEndReasonEnum,
442    ) -> impl Future<Output = Result<(), WebRtcError>> {
443        (*self).on_end_session(session_id, reason)
444    }
445
446    fn next_outbound(&self) -> impl Future<Output = OutboundWork> {
447        (*self).next_outbound()
448    }
449
450    fn take_ice_candidates(
451        &self,
452        session_id: u16,
453        out: &mut dyn IceCandidateSink,
454    ) -> impl Future<Output = Result<(), WebRtcError>> {
455        (*self).take_ice_candidates(session_id, out)
456    }
457
458    fn take_answer_sdp(
459        &self,
460        session_id: u16,
461        sdp_out: &mut [u8],
462    ) -> impl Future<Output = Result<usize, WebRtcError>> {
463        (*self).take_answer_sdp(session_id, sdp_out)
464    }
465
466    fn take_offer_sdp(
467        &self,
468        session_id: u16,
469        sdp_out: &mut [u8],
470    ) -> impl Future<Output = Result<usize, WebRtcError>> {
471        (*self).take_offer_sdp(session_id, sdp_out)
472    }
473}
474
475/// Internal session-state tracked by the handler.
476#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
477#[cfg_attr(feature = "defmt", derive(defmt::Format))]
478enum SessionState {
479    /// Created via `SolicitOffer` with a deferred Offer; awaiting the
480    /// hooks to enqueue [`OutboundWork::Offer`].
481    AwaitingDeferredOffer,
482    /// Offer was sent (immediately via SolicitOfferResponse or later via
483    /// the deferred flow). Awaiting `ProvideAnswer`.
484    AwaitingAnswer,
485    /// `ProvideOffer` arrived peer-initiated and was answered inline, OR
486    /// `ProvideAnswer` arrived for an already-sent Offer. Signalling
487    /// done; ICE candidates may still flow.
488    Established,
489}
490
491/// Owned session-table row. Kept small + `Copy` so snapshotting for
492/// iteration is cheap.
493#[derive(Debug, Clone, Copy)]
494#[cfg_attr(feature = "defmt", derive(defmt::Format))]
495struct SessionEntry {
496    id: u16,
497    fab_idx: u8,
498    peer_node_id: u64,
499    peer_endpoint_id: EndptId,
500    stream_usage: StreamUsageEnum,
501    /// `None` = NULL in spec; `Some(x)` = concrete stream ID.
502    video_stream_id: Option<u16>,
503    /// `None` = NULL in spec; `Some(x)` = concrete stream ID.
504    audio_stream_id: Option<u16>,
505    metadata_enabled: bool,
506    state: SessionState,
507}
508
509/// The WebRTC Transport Provider cluster handler.
510///
511/// See the [module documentation](self) for the architecture and usage.
512pub struct WebRtcProvHandler<
513    H: WebRtcHooks,
514    const N_SESSIONS: usize,
515    const SDP_LEN: usize,
516    const OUT_LEN: usize,
517    const CAND_LEN: usize,
518    const MAX_CAND: usize,
519> {
520    dataver: Dataver,
521    endpoint_id: EndptId,
522    hooks: H,
523    sessions: Mutex<RefCell<Vec<SessionEntry, N_SESSIONS>>>,
524    next_id: Mutex<Cell<u16>>,
525}
526
527impl<
528        H: WebRtcHooks,
529        const N_SESSIONS: usize,
530        const SDP_LEN: usize,
531        const OUT_LEN: usize,
532        const CAND_LEN: usize,
533        const MAX_CAND: usize,
534    > WebRtcProvHandler<H, N_SESSIONS, SDP_LEN, OUT_LEN, CAND_LEN, MAX_CAND>
535{
536    /// Cluster metadata exposed to the data-model dispatcher.
537    pub const CLUSTER: Cluster<'static> = decl::FULL_CLUSTER
538        .with_revision(2)
539        .with_attrs(with!(required))
540        .with_cmds(with!(
541            decl::CommandId::SolicitOffer
542                | decl::CommandId::ProvideOffer
543                | decl::CommandId::ProvideAnswer
544                | decl::CommandId::ProvideICECandidates
545                | decl::CommandId::EndSession
546        ));
547
548    /// Construct a new handler.
549    pub const fn new(dataver: Dataver, endpoint_id: EndptId, hooks: H) -> Self {
550        Self {
551            dataver,
552            endpoint_id,
553            hooks,
554            sessions: Mutex::new(RefCell::new(Vec::new())),
555            next_id: Mutex::new(Cell::new(1)),
556        }
557    }
558
559    /// Wrap in the generic async adaptor for registration with a
560    /// `rs-matter` `Node`.
561    pub const fn adapt(self) -> decl::HandlerAsyncAdaptor<Self> {
562        decl::HandlerAsyncAdaptor(self)
563    }
564
565    /// Remove every session owned by the given fabric index. Callers MUST
566    /// invoke this when a fabric is removed (spec §"Fabric-scoped data"
567    /// for fabric removal). Hooks are NOT notified.
568    pub fn remove_fabric_sessions(&self, fab_idx: u8) {
569        let changed = self.sessions.lock(|cell| {
570            let mut sessions = cell.borrow_mut();
571            let before = sessions.len();
572            sessions.retain(|s| s.fab_idx != fab_idx);
573            before != sessions.len()
574        });
575        if changed {
576            self.dataver.changed();
577        }
578    }
579
580    /// Get the endpoint this handler is mounted on.
581    pub const fn endpoint_id(&self) -> EndptId {
582        self.endpoint_id
583    }
584
585    fn allocate_id(&self) -> u16 {
586        self.sessions.lock(|cell| {
587            let sessions = cell.borrow();
588            self.next_id.lock(|n| loop {
589                let candidate = n.get();
590                // Wrap and skip zero (spec: session ID MUST be non-zero).
591                let next = if candidate == u16::MAX {
592                    1
593                } else {
594                    candidate + 1
595                };
596                n.set(next);
597                if candidate != 0 && !sessions.iter().any(|s| s.id == candidate) {
598                    return candidate;
599                }
600            })
601        })
602    }
603
604    fn session_copy(&self, id: u16) -> Option<SessionEntry> {
605        self.sessions
606            .lock(|cell| cell.borrow().iter().find(|s| s.id == id).copied())
607    }
608
609    fn upsert_session(&self, entry: SessionEntry) -> Result<(), Error> {
610        self.sessions.lock(|cell| {
611            let mut sessions = cell.borrow_mut();
612            if let Some(existing) = sessions.iter_mut().find(|s| s.id == entry.id) {
613                *existing = entry;
614                Ok(())
615            } else {
616                sessions
617                    .push(entry)
618                    .map_err(|_| Error::from(ErrorCode::ResourceExhausted))
619            }
620        })
621    }
622
623    fn remove_session(&self, id: u16) {
624        self.sessions.lock(|cell| {
625            let mut sessions = cell.borrow_mut();
626            sessions.retain(|s| s.id != id);
627        });
628    }
629
630    fn set_state(&self, id: u16, state: SessionState) {
631        self.sessions.lock(|cell| {
632            if let Some(s) = cell.borrow_mut().iter_mut().find(|s| s.id == id) {
633                s.state = state;
634            }
635        });
636    }
637
638    fn check_peer(&self, s: &SessionEntry, fab_idx: u8, peer: u64) -> Result<(), Error> {
639        // Spec: NOT_FOUND if the session does not belong to the accessing
640        // fabric. Peer-node mismatch within the same fabric is also
641        // NOT_FOUND to avoid leaking existence.
642        if s.fab_idx != fab_idx || s.peer_node_id != peer {
643            Err(ErrorCode::NotFound.into())
644        } else {
645            Ok(())
646        }
647    }
648
649    /// Encode and send a single [`OutboundWork`] item.
650    ///
651    /// Opens a new initiator exchange to the session's peer via
652    /// [`Exchange::initiate`] (which in turn uses the CASE session cache
653    /// and mDNS if needed), encodes the request payload into a fixed-size
654    /// scratch buffer, and invokes the paired `WebRTCTransportRequestor`
655    /// cluster on the peer's originating endpoint.
656    ///
657    /// Returns `Ok(())` without doing anything if the session has been
658    /// removed meanwhile (e.g. peer sent `EndSession` in the race window).
659    async fn push_outbound(
660        &self,
661        ctx: &impl HandlerContext,
662        work: OutboundWork,
663    ) -> Result<(), Error> {
664        let session_id = match work {
665            OutboundWork::Offer { session_id } => session_id,
666            OutboundWork::Answer { session_id } => session_id,
667            OutboundWork::IceCandidates { session_id } => session_id,
668            OutboundWork::End { session_id, .. } => session_id,
669        };
670        let Some(session) = self.session_copy(session_id) else {
671            // Session dropped concurrently — not an error, just a race.
672            return Ok(());
673        };
674
675        // Pre-fetch any async-supplied request payload (SDPs for
676        // Offer/Answer; the ICE-candidate snapshot is similarly
677        // taken outside its own build closure further down). The
678        // `take_*` hooks consume the respective queues, so they must
679        // run exactly once — outside the sync `FnMut` build closure
680        // that MRP may re-invoke on retransmit. The build closure
681        // then works off the snapshot and stays idempotent.
682        // `OutboundWork::End` carries its parameters by value and
683        // needs no pre-fetch.
684        let mut sdp_buf = [0u8; SDP_LEN];
685        let sdp_len = match &work {
686            OutboundWork::Offer { session_id } => self
687                .hooks
688                .take_offer_sdp(*session_id, &mut sdp_buf)
689                .await
690                .map_err(Error::from)?,
691            OutboundWork::Answer { session_id } => self
692                .hooks
693                .take_answer_sdp(*session_id, &mut sdp_buf)
694                .await
695                .map_err(Error::from)?,
696            _ => 0,
697        };
698        let sdp = core::str::from_utf8(&sdp_buf[..sdp_len])
699            .map_err(|_| Error::from(ErrorCode::Invalid))?;
700
701        // Offer side-effect: transition the session to AwaitingAnswer
702        // so a subsequent ProvideAnswer is accepted. Doing this here
703        // (rather than inside the build closure) keeps the transition
704        // idempotent under retransmit.
705        if let OutboundWork::Offer { session_id } = &work {
706            self.set_state(*session_id, SessionState::AwaitingAnswer);
707        }
708
709        // A WebRTC peer session is always on a real (non-zero) fabric.
710        let fab_idx = core::num::NonZeroU8::new(session.fab_idx).ok_or(ErrorCode::Invalid)?;
711        let exchange =
712            Exchange::initiate(ctx.matter(), ctx.crypto(), fab_idx, session.peer_node_id).await?;
713
714        // Single-shot client-trait invoke for all four commands. The
715        // codegen `WebRtcTransportRequestorClient` methods bake in
716        // cluster ID (0x0554), command ID, the request opcode
717        // (`InvokeRequest`), the MRP retransmit loop, chunk draining
718        // (trailing `StatusResponse(Success)` ACK) and IM-status-to-
719        // `Error` conversion. The build closure is a sync `FnMut`,
720        // so it must be idempotent under MRP retransmit; any state-
721        // changing hook (e.g. consuming the trickle-ICE queue) runs
722        // *outside* the closure.
723        let endpoint = session.peer_endpoint_id;
724        match &work {
725            OutboundWork::Offer { session_id } => {
726                let session_id = *session_id;
727                exchange
728                    .web_rtc_transport_requestor()
729                    .offer(endpoint, |req| {
730                        req.web_rtc_session_id(session_id)?
731                            .sdp(sdp)?
732                            .ice_servers()?
733                            .none()
734                            .ice_transport_policy(None)?
735                            .end()
736                    })
737                    .await?;
738            }
739            OutboundWork::Answer { session_id } => {
740                let session_id = *session_id;
741                exchange
742                    .web_rtc_transport_requestor()
743                    .answer(endpoint, |req| {
744                        req.web_rtc_session_id(session_id)?.sdp(sdp)?.end()
745                    })
746                    .await?;
747            }
748            OutboundWork::IceCandidates { session_id } => {
749                let session_id = *session_id;
750                // Snapshot the candidates OUTSIDE the build closure
751                // (mirrors `take_offer_sdp` / `take_answer_sdp`). The
752                // hook consumes the queue; the sync FnMut build
753                // closure below iterates the snapshot, so MRP
754                // retransmits re-iterate the same data and the
755                // closure stays idempotent. `CAND_LEN` / `MAX_CAND`
756                // bound the per-invoke snapshot — candidates beyond
757                // the bound are dropped by `IceCandidateSink::push`
758                // and will not be sent this round (or any future one,
759                // since the hook consumed them).
760                let mut ice_buf: crate::utils::storage::Vec<heapless::String<CAND_LEN>, MAX_CAND> =
761                    crate::utils::storage::Vec::new();
762                {
763                    let mut sink = VecCandidateSink { buf: &mut ice_buf };
764                    self.hooks
765                        .take_ice_candidates(session_id, &mut sink)
766                        .await
767                        .map_err(Error::from)?;
768                }
769                exchange
770                    .web_rtc_transport_requestor()
771                    .ice_candidates(endpoint, |req| {
772                        let req = req.web_rtc_session_id(session_id)?;
773                        let mut arr = req.ice_candidates()?;
774                        for cand in ice_buf.iter() {
775                            arr = arr
776                                .push()?
777                                .candidate(cand.as_str())?
778                                .sdp_mid(Nullable::none())?
779                                .sdpm_line_index(Nullable::none())?
780                                .end()?;
781                        }
782                        arr.end()?.end()
783                    })
784                    .await?;
785            }
786            OutboundWork::End { session_id, reason } => {
787                let session_id = *session_id;
788                let reason = *reason;
789                exchange
790                    .web_rtc_transport_requestor()
791                    .end(endpoint, |req| {
792                        req.web_rtc_session_id(session_id)?.reason(reason)?.end()
793                    })
794                    .await?;
795            }
796        }
797        Ok(())
798    }
799}
800
801// ──────────────────────────────────────────────────────────────────────
802// ClusterAsyncHandler
803// ──────────────────────────────────────────────────────────────────────
804
805impl<
806        H: WebRtcHooks,
807        const N_SESSIONS: usize,
808        const SDP_LEN: usize,
809        const OUT_LEN: usize,
810        const CAND_LEN: usize,
811        const MAX_CAND: usize,
812    > decl::ClusterAsyncHandler
813    for WebRtcProvHandler<H, N_SESSIONS, SDP_LEN, OUT_LEN, CAND_LEN, MAX_CAND>
814{
815    const CLUSTER: Cluster<'static> = Self::CLUSTER;
816
817    fn dataver(&self) -> u32 {
818        self.dataver.get()
819    }
820
821    fn dataver_changed(&self) {
822        self.dataver.changed();
823    }
824
825    async fn run(&self, ctx: impl HandlerContext) -> Result<(), Error> {
826        // Drain loop: the hook parks in `next_outbound().await` until it
827        // has something to say, then we push it to the paired
828        // `WebRTCTransportRequestor` on the remote. Errors are logged
829        // (when a `log`/`defmt` feature is enabled) and the loop
830        // continues — a single failed push must not take down the whole
831        // cluster.
832        loop {
833            let work = self.hooks.next_outbound().await;
834            if let Err(err) = self.push_outbound(&ctx, work).await {
835                warn!("webrtc_prov: outbound push failed: {}", err);
836            }
837        }
838    }
839
840    async fn current_sessions<P: TLVBuilderParent>(
841        &self,
842        ctx: impl ReadContext,
843        builder: ArrayAttributeRead<
844            WebRTCSessionStructArrayBuilder<P>,
845            WebRTCSessionStructBuilder<P>,
846        >,
847    ) -> Result<P, Error> {
848        let attr = ctx.attr();
849
850        // Snapshot the filtered list so we don't hold the Mutex across
851        // any `?` bail-outs inside the builder chain.
852        let mut snapshot: Vec<SessionEntry, N_SESSIONS> = Vec::new();
853        self.sessions.lock(|cell| {
854            for s in cell.borrow().iter() {
855                if !attr.fab_filter || s.fab_idx == attr.fab_idx {
856                    // `push` cannot fail: the snapshot is the same size
857                    // as the source.
858                    let _ = snapshot.push(*s);
859                }
860            }
861        });
862
863        match builder {
864            ArrayAttributeRead::ReadAll(mut arr) => {
865                for s in &snapshot {
866                    arr = encode_session_struct(arr.push()?, s)?;
867                }
868                arr.end()
869            }
870            ArrayAttributeRead::ReadOne(index, b) => {
871                let s = snapshot
872                    .get(index as usize)
873                    .ok_or(Error::from(ErrorCode::ConstraintError))?;
874                encode_session_struct(b, s)
875            }
876            ArrayAttributeRead::ReadNone(b) => b.end(),
877        }
878    }
879
880    async fn handle_solicit_offer<P: TLVBuilderParent>(
881        &self,
882        ctx: impl InvokeContext,
883        request: decl::SolicitOfferRequest<'_>,
884        response: decl::SolicitOfferResponseBuilder<P>,
885    ) -> Result<P, Error> {
886        let cmd = ctx.cmd();
887        let fab_idx = cmd.fab_idx;
888        let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
889
890        let params = OfferParams {
891            stream_usage: request.stream_usage()?,
892            originating_endpoint_id: request.originating_endpoint_id()?,
893            video_stream_id: request.video_stream_id()?.map(|n| n.into_option()),
894            audio_stream_id: request.audio_stream_id()?.map(|n| n.into_option()),
895            metadata_enabled: request.metadata_enabled()?.unwrap_or(false),
896        };
897
898        let session_id = self.allocate_id();
899
900        let outcome = self
901            .hooks
902            .on_solicit_offer(session_id, &params)
903            .await
904            .map_err(Error::from)?;
905        let state = if outcome.deferred {
906            SessionState::AwaitingDeferredOffer
907        } else {
908            SessionState::AwaitingAnswer
909        };
910
911        self.upsert_session(SessionEntry {
912            id: session_id,
913            fab_idx,
914            peer_node_id,
915            peer_endpoint_id: params.originating_endpoint_id,
916            stream_usage: params.stream_usage,
917            video_stream_id: outcome.video_stream_id,
918            audio_stream_id: outcome.audio_stream_id,
919            metadata_enabled: params.metadata_enabled,
920            state,
921        })?;
922        ctx.notify_own_attr_changed(AttributeId::CurrentSessions as _);
923
924        response
925            .web_rtc_session_id(session_id)?
926            .deferred_offer(outcome.deferred)?
927            .video_stream_id(wrap_opt_u16_nullable(outcome.video_stream_id))?
928            .audio_stream_id(wrap_opt_u16_nullable(outcome.audio_stream_id))?
929            .end()
930    }
931
932    async fn handle_provide_offer<P: TLVBuilderParent>(
933        &self,
934        ctx: impl InvokeContext,
935        request: decl::ProvideOfferRequest<'_>,
936        response: decl::ProvideOfferResponseBuilder<P>,
937    ) -> Result<P, Error> {
938        let cmd = ctx.cmd();
939        let fab_idx = cmd.fab_idx;
940        let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
941
942        let sdp = request.sdp()?;
943        if sdp.len() > SDP_LEN {
944            return Err(ErrorCode::ConstraintError.into());
945        }
946
947        let params = OfferParams {
948            stream_usage: request.stream_usage()?,
949            originating_endpoint_id: request.originating_endpoint_id()?,
950            video_stream_id: request.video_stream_id()?.map(|n| n.into_option()),
951            audio_stream_id: request.audio_stream_id()?.map(|n| n.into_option()),
952            metadata_enabled: request.metadata_enabled()?.unwrap_or(false),
953        };
954
955        // Spec: NULL session ID = allocate new; non-null = existing session.
956        let session_id = match request.web_rtc_session_id()?.into_option() {
957            None => self.allocate_id(),
958            Some(id) => {
959                let s = self
960                    .session_copy(id)
961                    .ok_or(Error::from(ErrorCode::NotFound))?;
962                self.check_peer(&s, fab_idx, peer_node_id)?;
963                id
964            }
965        };
966
967        let outcome = self
968            .hooks
969            .on_offer(session_id, sdp, &params)
970            .await
971            .map_err(Error::from)?;
972
973        self.upsert_session(SessionEntry {
974            id: session_id,
975            fab_idx,
976            peer_node_id,
977            peer_endpoint_id: params.originating_endpoint_id,
978            stream_usage: params.stream_usage,
979            video_stream_id: outcome.video_stream_id,
980            audio_stream_id: outcome.audio_stream_id,
981            metadata_enabled: params.metadata_enabled,
982            state: SessionState::Established,
983        })?;
984        ctx.notify_own_attr_changed(AttributeId::CurrentSessions as _);
985
986        response
987            .web_rtc_session_id(session_id)?
988            .video_stream_id(wrap_opt_u16_nullable(outcome.video_stream_id))?
989            .audio_stream_id(wrap_opt_u16_nullable(outcome.audio_stream_id))?
990            .end()
991    }
992
993    async fn handle_provide_answer(
994        &self,
995        ctx: impl InvokeContext,
996        request: decl::ProvideAnswerRequest<'_>,
997    ) -> Result<(), Error> {
998        let cmd = ctx.cmd();
999        let fab_idx = cmd.fab_idx;
1000        let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
1001
1002        let session_id = request.web_rtc_session_id()?;
1003        let sdp = request.sdp()?;
1004        if sdp.len() > SDP_LEN {
1005            return Err(ErrorCode::ConstraintError.into());
1006        }
1007
1008        let session = self
1009            .session_copy(session_id)
1010            .ok_or(Error::from(ErrorCode::NotFound))?;
1011        self.check_peer(&session, fab_idx, peer_node_id)?;
1012
1013        // Spec: ProvideAnswer is only valid when we sent the Offer.
1014        match session.state {
1015            SessionState::AwaitingAnswer | SessionState::AwaitingDeferredOffer => {}
1016            SessionState::Established => return Err(ErrorCode::InvalidAction.into()),
1017        }
1018
1019        self.hooks
1020            .on_answer(session_id, sdp)
1021            .await
1022            .map_err(Error::from)?;
1023        self.set_state(session_id, SessionState::Established);
1024        Ok(())
1025    }
1026
1027    async fn handle_provide_ice_candidates(
1028        &self,
1029        ctx: impl InvokeContext,
1030        request: decl::ProvideICECandidatesRequest<'_>,
1031    ) -> Result<(), Error> {
1032        let cmd = ctx.cmd();
1033        let fab_idx = cmd.fab_idx;
1034        let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
1035
1036        let session_id = request.web_rtc_session_id()?;
1037        let session = self
1038            .session_copy(session_id)
1039            .ok_or(Error::from(ErrorCode::NotFound))?;
1040        self.check_peer(&session, fab_idx, peer_node_id)?;
1041
1042        let candidates = request.ice_candidates()?;
1043        self.hooks
1044            .on_ice_candidates(session_id, &candidates)
1045            .await
1046            .map_err(Error::from)
1047    }
1048
1049    async fn handle_end_session(
1050        &self,
1051        ctx: impl InvokeContext,
1052        request: decl::EndSessionRequest<'_>,
1053    ) -> Result<(), Error> {
1054        let cmd = ctx.cmd();
1055        let fab_idx = cmd.fab_idx;
1056        let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
1057
1058        let session_id = request.web_rtc_session_id()?;
1059        let reason = request.reason()?;
1060
1061        let session = self
1062            .session_copy(session_id)
1063            .ok_or(Error::from(ErrorCode::NotFound))?;
1064        self.check_peer(&session, fab_idx, peer_node_id)?;
1065
1066        // Best-effort notify the hooks; even if they fail we drop the row.
1067        let _ = self.hooks.on_end_session(session_id, reason).await;
1068        self.remove_session(session_id);
1069        ctx.notify_own_attr_changed(AttributeId::CurrentSessions as _);
1070        Ok(())
1071    }
1072}
1073
1074// ──────────────────────────────────────────────────────────────────────
1075// Helpers
1076// ──────────────────────────────────────────────────────────────────────
1077
1078/// Extract the peer node ID from an incoming exchange's session state.
1079fn exchange_peer_node_id(exchange: &Exchange<'_>) -> Result<u64, Error> {
1080    exchange.with_state(|state| {
1081        let sess = exchange.id().session(&mut state.sessions);
1082        sess.get_peer_node_id().ok_or(ErrorCode::Invalid.into())
1083    })
1084}
1085
1086/// Convert `Option<u16>` (internal storage) into `Option<Nullable<u16>>`
1087/// suitable for response builders that treat the field as BOTH optional
1088/// and nullable. We always emit the field (`Some(_)`); `None` stream ID
1089/// is represented as `Nullable::none()`.
1090fn wrap_opt_u16_nullable(v: Option<u16>) -> Option<Nullable<u16>> {
1091    Some(match v {
1092        Some(x) => Nullable::some(x),
1093        None => Nullable::none(),
1094    })
1095}
1096
1097/// Emit a single `WebRTCSessionStruct` into the TLV stream.
1098fn encode_session_struct<P: TLVBuilderParent>(
1099    b: WebRTCSessionStructBuilder<P>,
1100    s: &SessionEntry,
1101) -> Result<P, Error> {
1102    let video = match s.video_stream_id {
1103        Some(x) => Nullable::some(x),
1104        None => Nullable::none(),
1105    };
1106    let audio = match s.audio_stream_id {
1107        Some(x) => Nullable::some(x),
1108        None => Nullable::none(),
1109    };
1110    b.id(s.id)?
1111        .peer_node_id(s.peer_node_id)?
1112        .peer_endpoint_id(s.peer_endpoint_id)?
1113        .stream_usage(s.stream_usage)?
1114        .video_stream_id(video)?
1115        .audio_stream_id(audio)?
1116        .metadata_enabled(s.metadata_enabled)?
1117        .video_streams()?
1118        .none()
1119        .audio_streams()?
1120        .none()
1121        .fabric_index(Some(s.fab_idx))?
1122        .end()
1123}