rs_matter/dm/clusters/app/webrtc_prov.rs
1/*
2 *
3 * Copyright (c) 2026 Project CHIP Authors
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18//! The WebRTC Transport Provider cluster (0x0553).
19//!
20//! Implements the server side of the Matter WebRTC signalling used by Matter
21//! cameras to expose audio/video streams to Matter controllers via the standard
22//! WebRTC Offer/Answer flow and trickle-ICE exchange.
23//!
24//! # Architecture (Pattern B1 — "Hooks")
25//!
26//! [`WebRtcProvHandler`] implements the spec-defined session table and
27//! command state machine. All media-specific work (SDP generation, ICE
28//! gathering, media-source lifecycle) is delegated to a user-supplied
29//! [`WebRtcHooks`] implementation.
30//!
31//! ```text
32//! ┌───────────────────┐ ClusterAsyncHandler ┌────────────────┐
33//! │ │◀── inbound commands ─────│ rs-matter IM │
34//! │ WebRtcProvHandler│ │ dispatcher │
35//! └───────┬───────────┘ └────────────────┘
36//! │ delegates SDP/ICE work
37//! ▼
38//! ┌───────────────────┐
39//! │ WebRtcHooks │ user-supplied (e.g. str0m-based for std)
40//! └───────────────────┘
41//! ```
42//!
43//! # Const generics
44//!
45//! Every static allocation is explicit at the call-site:
46//!
47//! * `N_SESSIONS` — maximum concurrent WebRTC sessions held in the table.
48//! The Matter spec mandates MinLimit = 3; typical values are 3..=8.
49//! * `SDP_LEN` — maximum SDP blob size, in bytes. Full WebRTC SDPs range
50//! 2–8 KiB depending on codec support.
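//! * `OUT_LEN` — scratch-buffer size, in bytes, for the outbound invoke
//!   payload produced by [`WebRtcProvHandler::run`]. Must be large enough
//!   for a trickle-ICE batch; typical value 1 KiB.
//! * `CAND_LEN` — maximum length, in bytes, of a single ICE candidate
//!   string held in the per-invoke trickle-ICE snapshot.
//! * `MAX_CAND` — maximum number of ICE candidates carried by one outbound
//!   `ICECandidates` invoke; candidates beyond this bound are not sent.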
54//!
55//! On `no_std` / embedded targets, [`WebRtcProvHandler::run`] holds an
56//! `[u8; OUT_LEN]` plus, on the `Offer`/`Answer` paths, an `[u8; SDP_LEN]`
57//! in its async-future state. Pick `SDP_LEN` to fit the smallest SDP your
58//! deployment will negotiate (e.g. 2 KiB for a single H.264 + Opus track)
59//! to keep the future small enough for the executor's task slot.
60//!
61//! # Scope
62//!
63//! * Inbound command state machine: fully implemented for
64//! `SolicitOffer` / `ProvideOffer` / `ProvideAnswer` /
65//! `ProvideICECandidates` / `EndSession`.
66//! * Fabric-scoped `CurrentSessions` attribute.
67//! * `WebRtcHooks` trait with `async` hooks for the media-plane work.
68//! * Outbound push to `WebRTCTransportRequestor`:
69//! [`OutboundWork::Offer`] (deferred-offer flow — camera-initiated
70//! SDP Offer following an earlier `SolicitOffer`),
71//! [`OutboundWork::Answer`] (SDP Answer for `ProvideOffer`),
72//! [`OutboundWork::IceCandidates`] (trickle ICE to traverse NAT) and
73//! [`OutboundWork::End`] (camera-initiated teardown) are driven from
74//! [`WebRtcProvHandler::run`] via [`WebRtcHooks::next_outbound`].
75
76use core::cell::{Cell, RefCell};
77use core::future::Future;
78
79use crate::dm::clusters::decl::globals::{
80 ICECandidateStruct, StreamUsageEnum, WebRTCEndReasonEnum, WebRTCSessionStructArrayBuilder,
81 WebRTCSessionStructBuilder,
82};
83use crate::dm::{
84 ArrayAttributeRead, Cluster, Dataver, EndptId, HandlerContext, InvokeContext, ReadContext,
85};
86use crate::error::{Error, ErrorCode};
87use crate::tlv::{Nullable, TLVArray, TLVBuilderParent};
88use crate::transport::exchange::Exchange;
89use crate::utils::storage::Vec;
90use crate::utils::sync::blocking::Mutex;
91use crate::with;
92
93use super::super::decl::web_rtc_transport_provider as decl;
94use super::super::decl::web_rtc_transport_requestor::WebRtcTransportRequestorClient;
95
96#[allow(unused_imports)]
97pub use crate::dm::clusters::decl::web_rtc_transport_provider::*;
98
/// Failure categories a [`WebRtcHooks`] implementation can report back to
/// the cluster handler.
///
/// Every variant is turned into a crate-level `Error` by the
/// `From<WebRtcError>` conversion, which in turn determines the status
/// the peer sees.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum WebRtcError {
    /// `INVALID_IN_STATE`: the session is not in a state where the
    /// command makes sense (for instance a `ProvideAnswer` arriving
    /// before any Offer went out).
    InvalidInState,
    /// `INVALID_COMMAND`: the command fields are structurally wrong
    /// (for instance a `SolicitOffer` that names no stream at all).
    InvalidCommand,
    /// `DYNAMIC_CONSTRAINT_ERROR`: the request names stream IDs, codecs
    /// or capabilities that the hooks are unable to provide.
    DynamicConstraint,
    /// `RESOURCE_EXHAUSTED`: the media source has no room for another
    /// session.
    ResourceExhausted,
    /// `FAILURE`: catch-all for every other hooks-level problem.
    Failure,
}
118
119impl From<WebRtcError> for Error {
120 fn from(e: WebRtcError) -> Self {
121 match e {
122 WebRtcError::InvalidInState => ErrorCode::InvalidAction.into(),
123 WebRtcError::InvalidCommand => ErrorCode::InvalidCommand.into(),
124 WebRtcError::DynamicConstraint => ErrorCode::DynamicConstraintError.into(),
125 WebRtcError::ResourceExhausted => ErrorCode::ResourceExhausted.into(),
126 WebRtcError::Failure => ErrorCode::Failure.into(),
127 }
128 }
129}
130
131/// Owned parameters of a `SolicitOffer` / `ProvideOffer` request.
132///
133/// Owned (not TLV-borrowed) so it can cross `.await` points when passed
134/// to async hooks.
135#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
136#[cfg_attr(feature = "defmt", derive(defmt::Format))]
137pub struct OfferParams {
138 /// The stream usage requested by the peer.
139 pub stream_usage: StreamUsageEnum,
140 /// Originating endpoint of the controller hosting the
141 /// `WebRTCTransportRequestor`.
142 pub originating_endpoint_id: EndptId,
143 /// Requested video-stream ID.
144 /// - `None` → field absent from request
145 /// - `Some(None)` → explicit NULL
146 /// - `Some(Some(id))` → concrete ID
147 pub video_stream_id: Option<Option<u16>>,
148 /// Requested audio-stream ID, same encoding as `video_stream_id`.
149 pub audio_stream_id: Option<Option<u16>>,
150 /// Whether the peer opted in to metadata delivery.
151 pub metadata_enabled: bool,
152}
153
/// What [`WebRtcHooks::on_solicit_offer`] decided for a freshly allocated
/// session.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct SolicitOutcome {
    /// Value reported as `deferredOffer` in the `SolicitOfferResponse`.
    ///
    /// * `false` — the hooks are expected to enqueue an
    ///   [`OutboundWork::Offer`] shortly, so that the Offer is pushed via
    ///   `WebRTCTransportRequestor::Offer`.
    /// * `true` — the media source was not ready yet; the hooks must
    ///   enqueue the Offer once it becomes available.
    pub deferred: bool,
    /// Video-stream ID resolved for the session, if any. The handler
    /// stores it in the session row and reports it in the response.
    pub video_stream_id: Option<u16>,
    /// Audio-stream ID resolved for the session, if any. The handler
    /// stores it in the session row and reports it in the response.
    pub audio_stream_id: Option<u16>,
}
170
/// What [`WebRtcHooks::on_offer`] resolved for a peer-supplied Offer.
///
/// The SDP Answer is not part of this value: the hooks keep it buffered
/// and it is delivered later through [`OutboundWork::Answer`] /
/// [`WebRtcHooks::take_answer_sdp`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct AnswerOutcome {
    /// Video-stream ID resolved for the session; `None` stands for NULL
    /// (the session carries no video).
    pub video_stream_id: Option<u16>,
    /// Audio-stream ID resolved for the session; `None` stands for NULL
    /// (the session carries no audio).
    pub audio_stream_id: Option<u16>,
}
184
185/// A descriptor of an outbound invocation, returned by
186/// [`WebRtcHooks::next_outbound`] and dispatched by
187/// [`WebRtcProvHandler::run`].
188///
189/// The payload itself is NOT carried here: for [`OutboundWork::IceCandidates`]
190/// the handler calls [`WebRtcHooks::take_ice_candidates`] before opening
191/// the invoke, snapshotting the queue once into a stack-allocated buffer;
192/// the sync build closure then iterates that snapshot, so MRP retransmits
193/// re-emit the same TLV array.
194#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
195#[cfg_attr(feature = "defmt", derive(defmt::Format))]
196pub enum OutboundWork {
197 /// `WebRTCTransportRequestor::Offer` — deferred-offer flow:
198 /// the controller previously called `SolicitOffer` and the device
199 /// returned `deferredOffer = true`; once the local media source
200 /// produces an SDP Offer, the hooks enqueue this work item so the
201 /// handler pushes it to the controller. The handler invokes
202 /// [`WebRtcHooks::take_offer_sdp`] to obtain the SDP bytes.
203 Offer {
204 /// Target session.
205 session_id: u16,
206 },
207 /// `WebRTCTransportRequestor::Answer` — push the SDP Answer for a
208 /// session whose Offer arrived via `ProvideOffer`. The handler
209 /// invokes [`WebRtcHooks::fill_answer`] to obtain the SDP bytes.
210 Answer {
211 /// Target session.
212 session_id: u16,
213 },
214 /// `WebRTCTransportRequestor::ICECandidates` — trickle a batch of
215 /// locally-gathered ICE candidates to the peer (needed for NAT
216 /// traversal).
217 IceCandidates {
218 /// Target session.
219 session_id: u16,
220 },
221 /// `WebRTCTransportRequestor::End` — notify the peer that we are
222 /// tearing down the session (e.g. camera power-down, media source
223 /// lost).
224 End {
225 /// Target session.
226 session_id: u16,
227 /// Reason for termination.
228 reason: WebRTCEndReasonEnum,
229 },
230}
231
232/// Receiver for [`WebRtcHooks::take_ice_candidates`]. The hook pushes
233/// one candidate SDP string at a time; storage (size, layout, backing
234/// allocator) is owned by the caller — typically
235/// [`WebRtcProvHandler::push_outbound`], which stack-allocates a
236/// bounded buffer for the duration of one outbound `IceCandidates`
237/// invoke.
238pub trait IceCandidateSink {
239 /// Append `candidate` to the snapshot. Returns
240 /// [`WebRtcError::ResourceExhausted`] if the sink's bounded
241 /// storage is full — the hook MAY choose to drop further
242 /// candidates and continue (deliver next round) or surface the
243 /// error to the caller.
244 fn push(&mut self, candidate: &str) -> Result<(), WebRtcError>;
245}
246
247/// Default `IceCandidateSink` backed by a `crate::utils::storage::Vec`
248/// of bounded-length `heapless::String`s. `WebRtcProvHandler::push_outbound`
249/// stack-allocates one of these per outbound `IceCandidates` invoke and
250/// passes a `&mut` to the hook.
251struct VecCandidateSink<'a, const CAND_LEN: usize, const MAX_CAND: usize> {
252 buf: &'a mut Vec<heapless::String<CAND_LEN>, MAX_CAND>,
253}
254
255impl<const CAND_LEN: usize, const MAX_CAND: usize> IceCandidateSink
256 for VecCandidateSink<'_, CAND_LEN, MAX_CAND>
257{
258 fn push(&mut self, candidate: &str) -> Result<(), WebRtcError> {
259 let mut s = heapless::String::<CAND_LEN>::new();
260 s.push_str(candidate)
261 .map_err(|_| WebRtcError::ResourceExhausted)?;
262 self.buf
263 .push(s)
264 .map_err(|_| WebRtcError::ResourceExhausted)?;
265 Ok(())
266 }
267}
268
269/// The device-side WebRTC media-plane plumbing. Cluster state (session
270/// table, dataver, attribute/command dispatch) is owned by
271/// [`WebRtcProvHandler`]; this trait captures every side-effect the
272/// spec commands have on the actual WebRTC peer.
273///
274/// All methods are `async` and invoked inline from the handler's command
275/// dispatch. Implementations MUST NOT perform blocking I/O.
276pub trait WebRtcHooks {
277 /// Handle an inbound `SolicitOffer` command. The session ID has been
278 /// allocated; the hook decides whether to respond with `deferred = false`
279 /// (in which case it MUST shortly enqueue an [`OutboundWork::Offer`] so
280 /// the handler can push the SDP Offer via
281 /// `WebRTCTransportRequestor::Offer`) or `deferred = true` (Offer
282 /// pushed when the media source becomes ready).
283 ///
284 /// `SolicitOfferResponse` itself does NOT carry an SDP — see Matter Spec -
285 /// so this hook never returns Offer bytes inline.
286 async fn on_solicit_offer(
287 &self,
288 session_id: u16,
289 params: &OfferParams,
290 ) -> Result<SolicitOutcome, WebRtcError>;
291
292 /// Handle an inbound `ProvideOffer` command. The SDP Offer has been
293 /// validated as UTF-8; the hooks buffers the SDP Answer internally
294 /// and enqueues an [`OutboundWork::Answer`] so the handler can push
295 /// it to the peer's `WebRTCTransportRequestor::Answer` command.
296 async fn on_offer(
297 &self,
298 session_id: u16,
299 sdp: &str,
300 params: &OfferParams,
301 ) -> Result<AnswerOutcome, WebRtcError>;
302
303 /// Handle an inbound `ProvideAnswer` for a session whose Offer THIS
304 /// node originally sent (deferred-offer flow).
305 async fn on_answer(&self, session_id: u16, sdp: &str) -> Result<(), WebRtcError>;
306
307 /// Handle an inbound batch of remote ICE candidates.
308 async fn on_ice_candidates(
309 &self,
310 session_id: u16,
311 candidates: &TLVArray<'_, ICECandidateStruct<'_>>,
312 ) -> Result<(), WebRtcError>;
313
314 /// Handle an inbound `EndSession`. The session entry is removed from
315 /// the table immediately after this call returns.
316 async fn on_end_session(
317 &self,
318 session_id: u16,
319 reason: WebRTCEndReasonEnum,
320 ) -> Result<(), WebRtcError>;
321
322 /// Await the next outbound invocation the camera wants to push to the
323 /// controller. Called in a tight loop from [`WebRtcProvHandler::run`]:
324 /// when the hook has nothing to send it MUST `.await` a future that
325 /// never completes (e.g. [`core::future::pending`]) or parks on an
326 /// internal signal.
327 ///
328 /// A default implementation is provided that parks forever, i.e.
329 /// opts the implementor out of outbound support. Override to enable
330 /// trickle-ICE and camera-initiated session end.
331 async fn next_outbound(&self) -> OutboundWork {
332 core::future::pending().await
333 }
334
335 /// Snapshot-and-consume the queued ICE candidates for
336 /// `session_id` into `out`, in queue order. Called by the handler
337 /// **outside** the sync build closure of an outbound
338 /// `IceCandidates` invoke — analogous to [`Self::take_offer_sdp`] /
339 /// [`Self::take_answer_sdp`]. The handler then iterates the
340 /// snapshot inside the (sync, idempotent) build closure to write
341 /// the wire array; MRP retransmits re-iterate the same snapshot,
342 /// so the closure stays idempotent without the hook seeing the
343 /// retransmit.
344 ///
345 /// If the invoke fails after this hook returns, the snapshotted
346 /// candidates are lost — same semantics as the SDP paths. The
347 /// implementation MAY decide to keep a backup if it wants
348 /// at-least-once delivery, but the default contract is at-most-once.
349 ///
350 /// Default: returns `Err(WebRtcError::InvalidInState)` — override
351 /// alongside [`Self::next_outbound`] when emitting
352 /// [`OutboundWork::IceCandidates`].
353 async fn take_ice_candidates(
354 &self,
355 _session_id: u16,
356 _out: &mut dyn IceCandidateSink,
357 ) -> Result<(), WebRtcError> {
358 Err(WebRtcError::InvalidInState)
359 }
360
361 /// Write the SDP Answer bytes for `session_id` into the supplied
362 /// buffer, returning the number of bytes written. Called by the
363 /// handler immediately after [`Self::next_outbound`] returns
364 /// [`OutboundWork::Answer`].
365 ///
366 /// If the buffer is too small the hooks SHOULD return
367 /// [`WebRtcError::ResourceExhausted`]; if no Answer is queued for
368 /// `session_id` return [`WebRtcError::InvalidInState`].
369 ///
370 /// Default: errors out — override when overriding [`Self::on_offer`].
371 async fn take_answer_sdp(
372 &self,
373 _session_id: u16,
374 _sdp_out: &mut [u8],
375 ) -> Result<usize, WebRtcError> {
376 Err(WebRtcError::InvalidInState)
377 }
378
379 /// Write the SDP Offer bytes for `session_id` into the supplied
380 /// buffer, returning the number of bytes written. Called by the
381 /// handler immediately after [`Self::next_outbound`] returns
382 /// [`OutboundWork::Offer`].
383 ///
384 /// If the buffer is too small the hooks SHOULD return
385 /// [`WebRtcError::ResourceExhausted`]; if no Offer is queued for
386 /// `session_id` return [`WebRtcError::InvalidInState`].
387 ///
388 /// Default: errors out — override to enable the deferred-offer
389 /// flow ([`SolicitOutcome::deferred = true`](SolicitOutcome) on a
390 /// `SolicitOffer` response, followed by an [`OutboundWork::Offer`]
391 /// from [`Self::next_outbound`]).
392 async fn take_offer_sdp(
393 &self,
394 _session_id: u16,
395 _sdp_out: &mut [u8],
396 ) -> Result<usize, WebRtcError> {
397 Err(WebRtcError::InvalidInState)
398 }
399}
400
401impl<T> WebRtcHooks for &T
402where
403 T: WebRtcHooks,
404{
405 fn on_solicit_offer(
406 &self,
407 session_id: u16,
408 params: &OfferParams,
409 ) -> impl Future<Output = Result<SolicitOutcome, WebRtcError>> {
410 (*self).on_solicit_offer(session_id, params)
411 }
412
413 fn on_offer(
414 &self,
415 session_id: u16,
416 sdp: &str,
417 params: &OfferParams,
418 ) -> impl Future<Output = Result<AnswerOutcome, WebRtcError>> {
419 (*self).on_offer(session_id, sdp, params)
420 }
421
422 fn on_answer(
423 &self,
424 session_id: u16,
425 sdp: &str,
426 ) -> impl Future<Output = Result<(), WebRtcError>> {
427 (*self).on_answer(session_id, sdp)
428 }
429
430 fn on_ice_candidates(
431 &self,
432 session_id: u16,
433 candidates: &TLVArray<'_, ICECandidateStruct<'_>>,
434 ) -> impl Future<Output = Result<(), WebRtcError>> {
435 (*self).on_ice_candidates(session_id, candidates)
436 }
437
438 fn on_end_session(
439 &self,
440 session_id: u16,
441 reason: WebRTCEndReasonEnum,
442 ) -> impl Future<Output = Result<(), WebRtcError>> {
443 (*self).on_end_session(session_id, reason)
444 }
445
446 fn next_outbound(&self) -> impl Future<Output = OutboundWork> {
447 (*self).next_outbound()
448 }
449
450 fn take_ice_candidates(
451 &self,
452 session_id: u16,
453 out: &mut dyn IceCandidateSink,
454 ) -> impl Future<Output = Result<(), WebRtcError>> {
455 (*self).take_ice_candidates(session_id, out)
456 }
457
458 fn take_answer_sdp(
459 &self,
460 session_id: u16,
461 sdp_out: &mut [u8],
462 ) -> impl Future<Output = Result<usize, WebRtcError>> {
463 (*self).take_answer_sdp(session_id, sdp_out)
464 }
465
466 fn take_offer_sdp(
467 &self,
468 session_id: u16,
469 sdp_out: &mut [u8],
470 ) -> impl Future<Output = Result<usize, WebRtcError>> {
471 (*self).take_offer_sdp(session_id, sdp_out)
472 }
473}
474
/// Signalling phase of a session, as tracked privately by the handler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
enum SessionState {
    /// The session was created by a `SolicitOffer` whose hook reported a
    /// deferred Offer; the handler is waiting for the hooks to enqueue
    /// [`OutboundWork::Offer`].
    AwaitingDeferredOffer,
    /// An Offer is on its way to (or has reached) the peer — either the
    /// `SolicitOffer` hook reported a non-deferred Offer, or an
    /// [`OutboundWork::Offer`] is being pushed. The next expected command
    /// is `ProvideAnswer`.
    AwaitingAnswer,
    /// Offer/Answer signalling is complete, either because the peer's
    /// `ProvideOffer` was accepted or because `ProvideAnswer` arrived for
    /// an Offer sent earlier. ICE candidates may still be exchanged.
    Established,
}
490
491/// Owned session-table row. Kept small + `Copy` so snapshotting for
492/// iteration is cheap.
493#[derive(Debug, Clone, Copy)]
494#[cfg_attr(feature = "defmt", derive(defmt::Format))]
495struct SessionEntry {
496 id: u16,
497 fab_idx: u8,
498 peer_node_id: u64,
499 peer_endpoint_id: EndptId,
500 stream_usage: StreamUsageEnum,
501 /// `None` = NULL in spec; `Some(x)` = concrete stream ID.
502 video_stream_id: Option<u16>,
503 /// `None` = NULL in spec; `Some(x)` = concrete stream ID.
504 audio_stream_id: Option<u16>,
505 metadata_enabled: bool,
506 state: SessionState,
507}
508
509/// The WebRTC Transport Provider cluster handler.
510///
511/// See the [module documentation](self) for the architecture and usage.
512pub struct WebRtcProvHandler<
513 H: WebRtcHooks,
514 const N_SESSIONS: usize,
515 const SDP_LEN: usize,
516 const OUT_LEN: usize,
517 const CAND_LEN: usize,
518 const MAX_CAND: usize,
519> {
520 dataver: Dataver,
521 endpoint_id: EndptId,
522 hooks: H,
523 sessions: Mutex<RefCell<Vec<SessionEntry, N_SESSIONS>>>,
524 next_id: Mutex<Cell<u16>>,
525}
526
527impl<
528 H: WebRtcHooks,
529 const N_SESSIONS: usize,
530 const SDP_LEN: usize,
531 const OUT_LEN: usize,
532 const CAND_LEN: usize,
533 const MAX_CAND: usize,
534 > WebRtcProvHandler<H, N_SESSIONS, SDP_LEN, OUT_LEN, CAND_LEN, MAX_CAND>
535{
536 /// Cluster metadata exposed to the data-model dispatcher.
537 pub const CLUSTER: Cluster<'static> = decl::FULL_CLUSTER
538 .with_revision(2)
539 .with_attrs(with!(required))
540 .with_cmds(with!(
541 decl::CommandId::SolicitOffer
542 | decl::CommandId::ProvideOffer
543 | decl::CommandId::ProvideAnswer
544 | decl::CommandId::ProvideICECandidates
545 | decl::CommandId::EndSession
546 ));
547
548 /// Construct a new handler.
549 pub const fn new(dataver: Dataver, endpoint_id: EndptId, hooks: H) -> Self {
550 Self {
551 dataver,
552 endpoint_id,
553 hooks,
554 sessions: Mutex::new(RefCell::new(Vec::new())),
555 next_id: Mutex::new(Cell::new(1)),
556 }
557 }
558
559 /// Wrap in the generic async adaptor for registration with a
560 /// `rs-matter` `Node`.
561 pub const fn adapt(self) -> decl::HandlerAsyncAdaptor<Self> {
562 decl::HandlerAsyncAdaptor(self)
563 }
564
565 /// Remove every session owned by the given fabric index. Callers MUST
566 /// invoke this when a fabric is removed (spec §"Fabric-scoped data"
567 /// for fabric removal). Hooks are NOT notified.
568 pub fn remove_fabric_sessions(&self, fab_idx: u8) {
569 let changed = self.sessions.lock(|cell| {
570 let mut sessions = cell.borrow_mut();
571 let before = sessions.len();
572 sessions.retain(|s| s.fab_idx != fab_idx);
573 before != sessions.len()
574 });
575 if changed {
576 self.dataver.changed();
577 }
578 }
579
580 /// Get the endpoint this handler is mounted on.
581 pub const fn endpoint_id(&self) -> EndptId {
582 self.endpoint_id
583 }
584
585 fn allocate_id(&self) -> u16 {
586 self.sessions.lock(|cell| {
587 let sessions = cell.borrow();
588 self.next_id.lock(|n| loop {
589 let candidate = n.get();
590 // Wrap and skip zero (spec: session ID MUST be non-zero).
591 let next = if candidate == u16::MAX {
592 1
593 } else {
594 candidate + 1
595 };
596 n.set(next);
597 if candidate != 0 && !sessions.iter().any(|s| s.id == candidate) {
598 return candidate;
599 }
600 })
601 })
602 }
603
604 fn session_copy(&self, id: u16) -> Option<SessionEntry> {
605 self.sessions
606 .lock(|cell| cell.borrow().iter().find(|s| s.id == id).copied())
607 }
608
609 fn upsert_session(&self, entry: SessionEntry) -> Result<(), Error> {
610 self.sessions.lock(|cell| {
611 let mut sessions = cell.borrow_mut();
612 if let Some(existing) = sessions.iter_mut().find(|s| s.id == entry.id) {
613 *existing = entry;
614 Ok(())
615 } else {
616 sessions
617 .push(entry)
618 .map_err(|_| Error::from(ErrorCode::ResourceExhausted))
619 }
620 })
621 }
622
623 fn remove_session(&self, id: u16) {
624 self.sessions.lock(|cell| {
625 let mut sessions = cell.borrow_mut();
626 sessions.retain(|s| s.id != id);
627 });
628 }
629
630 fn set_state(&self, id: u16, state: SessionState) {
631 self.sessions.lock(|cell| {
632 if let Some(s) = cell.borrow_mut().iter_mut().find(|s| s.id == id) {
633 s.state = state;
634 }
635 });
636 }
637
638 fn check_peer(&self, s: &SessionEntry, fab_idx: u8, peer: u64) -> Result<(), Error> {
639 // Spec: NOT_FOUND if the session does not belong to the accessing
640 // fabric. Peer-node mismatch within the same fabric is also
641 // NOT_FOUND to avoid leaking existence.
642 if s.fab_idx != fab_idx || s.peer_node_id != peer {
643 Err(ErrorCode::NotFound.into())
644 } else {
645 Ok(())
646 }
647 }
648
649 /// Encode and send a single [`OutboundWork`] item.
650 ///
651 /// Opens a new initiator exchange to the session's peer via
652 /// [`Exchange::initiate`] (which in turn uses the CASE session cache
653 /// and mDNS if needed), encodes the request payload into a fixed-size
654 /// scratch buffer, and invokes the paired `WebRTCTransportRequestor`
655 /// cluster on the peer's originating endpoint.
656 ///
657 /// Returns `Ok(())` without doing anything if the session has been
658 /// removed meanwhile (e.g. peer sent `EndSession` in the race window).
659 async fn push_outbound(
660 &self,
661 ctx: &impl HandlerContext,
662 work: OutboundWork,
663 ) -> Result<(), Error> {
664 let session_id = match work {
665 OutboundWork::Offer { session_id } => session_id,
666 OutboundWork::Answer { session_id } => session_id,
667 OutboundWork::IceCandidates { session_id } => session_id,
668 OutboundWork::End { session_id, .. } => session_id,
669 };
670 let Some(session) = self.session_copy(session_id) else {
671 // Session dropped concurrently — not an error, just a race.
672 return Ok(());
673 };
674
675 // Pre-fetch any async-supplied request payload (SDPs for
676 // Offer/Answer; the ICE-candidate snapshot is similarly
677 // taken outside its own build closure further down). The
678 // `take_*` hooks consume the respective queues, so they must
679 // run exactly once — outside the sync `FnMut` build closure
680 // that MRP may re-invoke on retransmit. The build closure
681 // then works off the snapshot and stays idempotent.
682 // `OutboundWork::End` carries its parameters by value and
683 // needs no pre-fetch.
684 let mut sdp_buf = [0u8; SDP_LEN];
685 let sdp_len = match &work {
686 OutboundWork::Offer { session_id } => self
687 .hooks
688 .take_offer_sdp(*session_id, &mut sdp_buf)
689 .await
690 .map_err(Error::from)?,
691 OutboundWork::Answer { session_id } => self
692 .hooks
693 .take_answer_sdp(*session_id, &mut sdp_buf)
694 .await
695 .map_err(Error::from)?,
696 _ => 0,
697 };
698 let sdp = core::str::from_utf8(&sdp_buf[..sdp_len])
699 .map_err(|_| Error::from(ErrorCode::Invalid))?;
700
701 // Offer side-effect: transition the session to AwaitingAnswer
702 // so a subsequent ProvideAnswer is accepted. Doing this here
703 // (rather than inside the build closure) keeps the transition
704 // idempotent under retransmit.
705 if let OutboundWork::Offer { session_id } = &work {
706 self.set_state(*session_id, SessionState::AwaitingAnswer);
707 }
708
709 // A WebRTC peer session is always on a real (non-zero) fabric.
710 let fab_idx = core::num::NonZeroU8::new(session.fab_idx).ok_or(ErrorCode::Invalid)?;
711 let exchange =
712 Exchange::initiate(ctx.matter(), ctx.crypto(), fab_idx, session.peer_node_id).await?;
713
714 // Single-shot client-trait invoke for all four commands. The
715 // codegen `WebRtcTransportRequestorClient` methods bake in
716 // cluster ID (0x0554), command ID, the request opcode
717 // (`InvokeRequest`), the MRP retransmit loop, chunk draining
718 // (trailing `StatusResponse(Success)` ACK) and IM-status-to-
719 // `Error` conversion. The build closure is a sync `FnMut`,
720 // so it must be idempotent under MRP retransmit; any state-
721 // changing hook (e.g. consuming the trickle-ICE queue) runs
722 // *outside* the closure.
723 let endpoint = session.peer_endpoint_id;
724 match &work {
725 OutboundWork::Offer { session_id } => {
726 let session_id = *session_id;
727 exchange
728 .web_rtc_transport_requestor()
729 .offer(endpoint, |req| {
730 req.web_rtc_session_id(session_id)?
731 .sdp(sdp)?
732 .ice_servers()?
733 .none()
734 .ice_transport_policy(None)?
735 .end()
736 })
737 .await?;
738 }
739 OutboundWork::Answer { session_id } => {
740 let session_id = *session_id;
741 exchange
742 .web_rtc_transport_requestor()
743 .answer(endpoint, |req| {
744 req.web_rtc_session_id(session_id)?.sdp(sdp)?.end()
745 })
746 .await?;
747 }
748 OutboundWork::IceCandidates { session_id } => {
749 let session_id = *session_id;
750 // Snapshot the candidates OUTSIDE the build closure
751 // (mirrors `take_offer_sdp` / `take_answer_sdp`). The
752 // hook consumes the queue; the sync FnMut build
753 // closure below iterates the snapshot, so MRP
754 // retransmits re-iterate the same data and the
755 // closure stays idempotent. `CAND_LEN` / `MAX_CAND`
756 // bound the per-invoke snapshot — candidates beyond
757 // the bound are dropped by `IceCandidateSink::push`
758 // and will not be sent this round (or any future one,
759 // since the hook consumed them).
760 let mut ice_buf: crate::utils::storage::Vec<heapless::String<CAND_LEN>, MAX_CAND> =
761 crate::utils::storage::Vec::new();
762 {
763 let mut sink = VecCandidateSink { buf: &mut ice_buf };
764 self.hooks
765 .take_ice_candidates(session_id, &mut sink)
766 .await
767 .map_err(Error::from)?;
768 }
769 exchange
770 .web_rtc_transport_requestor()
771 .ice_candidates(endpoint, |req| {
772 let req = req.web_rtc_session_id(session_id)?;
773 let mut arr = req.ice_candidates()?;
774 for cand in ice_buf.iter() {
775 arr = arr
776 .push()?
777 .candidate(cand.as_str())?
778 .sdp_mid(Nullable::none())?
779 .sdpm_line_index(Nullable::none())?
780 .end()?;
781 }
782 arr.end()?.end()
783 })
784 .await?;
785 }
786 OutboundWork::End { session_id, reason } => {
787 let session_id = *session_id;
788 let reason = *reason;
789 exchange
790 .web_rtc_transport_requestor()
791 .end(endpoint, |req| {
792 req.web_rtc_session_id(session_id)?.reason(reason)?.end()
793 })
794 .await?;
795 }
796 }
797 Ok(())
798 }
799}
800
801// ──────────────────────────────────────────────────────────────────────
802// ClusterAsyncHandler
803// ──────────────────────────────────────────────────────────────────────
804
805impl<
806 H: WebRtcHooks,
807 const N_SESSIONS: usize,
808 const SDP_LEN: usize,
809 const OUT_LEN: usize,
810 const CAND_LEN: usize,
811 const MAX_CAND: usize,
812 > decl::ClusterAsyncHandler
813 for WebRtcProvHandler<H, N_SESSIONS, SDP_LEN, OUT_LEN, CAND_LEN, MAX_CAND>
814{
815 const CLUSTER: Cluster<'static> = Self::CLUSTER;
816
817 fn dataver(&self) -> u32 {
818 self.dataver.get()
819 }
820
821 fn dataver_changed(&self) {
822 self.dataver.changed();
823 }
824
825 async fn run(&self, ctx: impl HandlerContext) -> Result<(), Error> {
826 // Drain loop: the hook parks in `next_outbound().await` until it
827 // has something to say, then we push it to the paired
828 // `WebRTCTransportRequestor` on the remote. Errors are logged
829 // (when a `log`/`defmt` feature is enabled) and the loop
830 // continues — a single failed push must not take down the whole
831 // cluster.
832 loop {
833 let work = self.hooks.next_outbound().await;
834 if let Err(err) = self.push_outbound(&ctx, work).await {
835 warn!("webrtc_prov: outbound push failed: {}", err);
836 }
837 }
838 }
839
840 async fn current_sessions<P: TLVBuilderParent>(
841 &self,
842 ctx: impl ReadContext,
843 builder: ArrayAttributeRead<
844 WebRTCSessionStructArrayBuilder<P>,
845 WebRTCSessionStructBuilder<P>,
846 >,
847 ) -> Result<P, Error> {
848 let attr = ctx.attr();
849
850 // Snapshot the filtered list so we don't hold the Mutex across
851 // any `?` bail-outs inside the builder chain.
852 let mut snapshot: Vec<SessionEntry, N_SESSIONS> = Vec::new();
853 self.sessions.lock(|cell| {
854 for s in cell.borrow().iter() {
855 if !attr.fab_filter || s.fab_idx == attr.fab_idx {
856 // `push` cannot fail: the snapshot is the same size
857 // as the source.
858 let _ = snapshot.push(*s);
859 }
860 }
861 });
862
863 match builder {
864 ArrayAttributeRead::ReadAll(mut arr) => {
865 for s in &snapshot {
866 arr = encode_session_struct(arr.push()?, s)?;
867 }
868 arr.end()
869 }
870 ArrayAttributeRead::ReadOne(index, b) => {
871 let s = snapshot
872 .get(index as usize)
873 .ok_or(Error::from(ErrorCode::ConstraintError))?;
874 encode_session_struct(b, s)
875 }
876 ArrayAttributeRead::ReadNone(b) => b.end(),
877 }
878 }
879
880 async fn handle_solicit_offer<P: TLVBuilderParent>(
881 &self,
882 ctx: impl InvokeContext,
883 request: decl::SolicitOfferRequest<'_>,
884 response: decl::SolicitOfferResponseBuilder<P>,
885 ) -> Result<P, Error> {
886 let cmd = ctx.cmd();
887 let fab_idx = cmd.fab_idx;
888 let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
889
890 let params = OfferParams {
891 stream_usage: request.stream_usage()?,
892 originating_endpoint_id: request.originating_endpoint_id()?,
893 video_stream_id: request.video_stream_id()?.map(|n| n.into_option()),
894 audio_stream_id: request.audio_stream_id()?.map(|n| n.into_option()),
895 metadata_enabled: request.metadata_enabled()?.unwrap_or(false),
896 };
897
898 let session_id = self.allocate_id();
899
900 let outcome = self
901 .hooks
902 .on_solicit_offer(session_id, ¶ms)
903 .await
904 .map_err(Error::from)?;
905 let state = if outcome.deferred {
906 SessionState::AwaitingDeferredOffer
907 } else {
908 SessionState::AwaitingAnswer
909 };
910
911 self.upsert_session(SessionEntry {
912 id: session_id,
913 fab_idx,
914 peer_node_id,
915 peer_endpoint_id: params.originating_endpoint_id,
916 stream_usage: params.stream_usage,
917 video_stream_id: outcome.video_stream_id,
918 audio_stream_id: outcome.audio_stream_id,
919 metadata_enabled: params.metadata_enabled,
920 state,
921 })?;
922 ctx.notify_own_attr_changed(AttributeId::CurrentSessions as _);
923
924 response
925 .web_rtc_session_id(session_id)?
926 .deferred_offer(outcome.deferred)?
927 .video_stream_id(wrap_opt_u16_nullable(outcome.video_stream_id))?
928 .audio_stream_id(wrap_opt_u16_nullable(outcome.audio_stream_id))?
929 .end()
930 }
931
932 async fn handle_provide_offer<P: TLVBuilderParent>(
933 &self,
934 ctx: impl InvokeContext,
935 request: decl::ProvideOfferRequest<'_>,
936 response: decl::ProvideOfferResponseBuilder<P>,
937 ) -> Result<P, Error> {
938 let cmd = ctx.cmd();
939 let fab_idx = cmd.fab_idx;
940 let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
941
942 let sdp = request.sdp()?;
943 if sdp.len() > SDP_LEN {
944 return Err(ErrorCode::ConstraintError.into());
945 }
946
947 let params = OfferParams {
948 stream_usage: request.stream_usage()?,
949 originating_endpoint_id: request.originating_endpoint_id()?,
950 video_stream_id: request.video_stream_id()?.map(|n| n.into_option()),
951 audio_stream_id: request.audio_stream_id()?.map(|n| n.into_option()),
952 metadata_enabled: request.metadata_enabled()?.unwrap_or(false),
953 };
954
955 // Spec: NULL session ID = allocate new; non-null = existing session.
956 let session_id = match request.web_rtc_session_id()?.into_option() {
957 None => self.allocate_id(),
958 Some(id) => {
959 let s = self
960 .session_copy(id)
961 .ok_or(Error::from(ErrorCode::NotFound))?;
962 self.check_peer(&s, fab_idx, peer_node_id)?;
963 id
964 }
965 };
966
967 let outcome = self
968 .hooks
969 .on_offer(session_id, sdp, ¶ms)
970 .await
971 .map_err(Error::from)?;
972
973 self.upsert_session(SessionEntry {
974 id: session_id,
975 fab_idx,
976 peer_node_id,
977 peer_endpoint_id: params.originating_endpoint_id,
978 stream_usage: params.stream_usage,
979 video_stream_id: outcome.video_stream_id,
980 audio_stream_id: outcome.audio_stream_id,
981 metadata_enabled: params.metadata_enabled,
982 state: SessionState::Established,
983 })?;
984 ctx.notify_own_attr_changed(AttributeId::CurrentSessions as _);
985
986 response
987 .web_rtc_session_id(session_id)?
988 .video_stream_id(wrap_opt_u16_nullable(outcome.video_stream_id))?
989 .audio_stream_id(wrap_opt_u16_nullable(outcome.audio_stream_id))?
990 .end()
991 }
992
993 async fn handle_provide_answer(
994 &self,
995 ctx: impl InvokeContext,
996 request: decl::ProvideAnswerRequest<'_>,
997 ) -> Result<(), Error> {
998 let cmd = ctx.cmd();
999 let fab_idx = cmd.fab_idx;
1000 let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
1001
1002 let session_id = request.web_rtc_session_id()?;
1003 let sdp = request.sdp()?;
1004 if sdp.len() > SDP_LEN {
1005 return Err(ErrorCode::ConstraintError.into());
1006 }
1007
1008 let session = self
1009 .session_copy(session_id)
1010 .ok_or(Error::from(ErrorCode::NotFound))?;
1011 self.check_peer(&session, fab_idx, peer_node_id)?;
1012
1013 // Spec: ProvideAnswer is only valid when we sent the Offer.
1014 match session.state {
1015 SessionState::AwaitingAnswer | SessionState::AwaitingDeferredOffer => {}
1016 SessionState::Established => return Err(ErrorCode::InvalidAction.into()),
1017 }
1018
1019 self.hooks
1020 .on_answer(session_id, sdp)
1021 .await
1022 .map_err(Error::from)?;
1023 self.set_state(session_id, SessionState::Established);
1024 Ok(())
1025 }
1026
1027 async fn handle_provide_ice_candidates(
1028 &self,
1029 ctx: impl InvokeContext,
1030 request: decl::ProvideICECandidatesRequest<'_>,
1031 ) -> Result<(), Error> {
1032 let cmd = ctx.cmd();
1033 let fab_idx = cmd.fab_idx;
1034 let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
1035
1036 let session_id = request.web_rtc_session_id()?;
1037 let session = self
1038 .session_copy(session_id)
1039 .ok_or(Error::from(ErrorCode::NotFound))?;
1040 self.check_peer(&session, fab_idx, peer_node_id)?;
1041
1042 let candidates = request.ice_candidates()?;
1043 self.hooks
1044 .on_ice_candidates(session_id, &candidates)
1045 .await
1046 .map_err(Error::from)
1047 }
1048
1049 async fn handle_end_session(
1050 &self,
1051 ctx: impl InvokeContext,
1052 request: decl::EndSessionRequest<'_>,
1053 ) -> Result<(), Error> {
1054 let cmd = ctx.cmd();
1055 let fab_idx = cmd.fab_idx;
1056 let peer_node_id = exchange_peer_node_id(ctx.exchange())?;
1057
1058 let session_id = request.web_rtc_session_id()?;
1059 let reason = request.reason()?;
1060
1061 let session = self
1062 .session_copy(session_id)
1063 .ok_or(Error::from(ErrorCode::NotFound))?;
1064 self.check_peer(&session, fab_idx, peer_node_id)?;
1065
1066 // Best-effort notify the hooks; even if they fail we drop the row.
1067 let _ = self.hooks.on_end_session(session_id, reason).await;
1068 self.remove_session(session_id);
1069 ctx.notify_own_attr_changed(AttributeId::CurrentSessions as _);
1070 Ok(())
1071 }
1072}
1073
1074// ──────────────────────────────────────────────────────────────────────
1075// Helpers
1076// ──────────────────────────────────────────────────────────────────────
1077
1078/// Extract the peer node ID from an incoming exchange's session state.
1079fn exchange_peer_node_id(exchange: &Exchange<'_>) -> Result<u64, Error> {
1080 exchange.with_state(|state| {
1081 let sess = exchange.id().session(&mut state.sessions);
1082 sess.get_peer_node_id().ok_or(ErrorCode::Invalid.into())
1083 })
1084}
1085
1086/// Convert `Option<u16>` (internal storage) into `Option<Nullable<u16>>`
1087/// suitable for response builders that treat the field as BOTH optional
1088/// and nullable. We always emit the field (`Some(_)`); `None` stream ID
1089/// is represented as `Nullable::none()`.
1090fn wrap_opt_u16_nullable(v: Option<u16>) -> Option<Nullable<u16>> {
1091 Some(match v {
1092 Some(x) => Nullable::some(x),
1093 None => Nullable::none(),
1094 })
1095}
1096
1097/// Emit a single `WebRTCSessionStruct` into the TLV stream.
1098fn encode_session_struct<P: TLVBuilderParent>(
1099 b: WebRTCSessionStructBuilder<P>,
1100 s: &SessionEntry,
1101) -> Result<P, Error> {
1102 let video = match s.video_stream_id {
1103 Some(x) => Nullable::some(x),
1104 None => Nullable::none(),
1105 };
1106 let audio = match s.audio_stream_id {
1107 Some(x) => Nullable::some(x),
1108 None => Nullable::none(),
1109 };
1110 b.id(s.id)?
1111 .peer_node_id(s.peer_node_id)?
1112 .peer_endpoint_id(s.peer_endpoint_id)?
1113 .stream_usage(s.stream_usage)?
1114 .video_stream_id(video)?
1115 .audio_stream_id(audio)?
1116 .metadata_enabled(s.metadata_enabled)?
1117 .video_streams()?
1118 .none()
1119 .audio_streams()?
1120 .none()
1121 .fabric_index(Some(s.fab_idx))?
1122 .end()
1123}