Skip to main content

rs_matter/dm/clusters/app/
push_av_stream.rs

1/*
2 *
3 *    Copyright (c) 2026 Project CHIP Authors
4 *
5 *    Licensed under the Apache License, Version 2.0 (the "License");
6 *    you may not use this file except in compliance with the License.
7 *    You may obtain a copy of the License at
8 *
9 *        http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *    Unless required by applicable law or agreed to in writing, software
12 *    distributed under the License is distributed on an "AS IS" BASIS,
13 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *    See the License for the specific language governing permissions and
15 *    limitations under the License.
16 */
17
18//! Implementation of the Matter Push AV Stream Transport cluster (0x0555).
19//!
20//! Lets a camera push audio/video streams to an external endpoint (e.g.
21//! a cloud recorder or NVR) using CMAF/DASH over TLS, as an alternative
22//! to the pull-style WebRTC transport.
23//!
24//! # Architecture (Pattern B1 — "Hooks")
25//!
26//! [`PushAvStreamHandler`] owns the spec-defined state — the
27//! `CurrentConnections` table, stable connection IDs, the
28//! `SupportedFormats` catalogue — and performs all spec validation
29//! before delegating the side-effecting bits (open / modify / close
30//! the actual outgoing TLS push) to a user-supplied
31//! [`PushAvStreamHooks`] implementation.
32//!
33//! # Storage
34//!
35//! `TransportOptionsStruct` is large (11 fields including 2 nested
36//! structs and 2 nullable arrays). Rather than mirror every leaf in
37//! a dedicated owned type, the handler captures the request's TLV
38//! bytes verbatim into a per-connection
39//! [`Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES>`] and replays them on
40//! attribute reads / `FindTransport`. This keeps the in-memory
41//! footprint predictable and faithful to whatever the controller
42//! sent without round-tripping through Rust types we'd have to keep
43//! in sync with codegen.
44//!
45//! # Const generics
46//!
47//! * `NC` — maximum number of concurrently allocated push connections.
48//!   Spec MinLimit is 1 for any device advertising the cluster;
49//!   typical values 2..=8.
50//!
51//! # Scope of v1
52//!
53//! * Full validation, allocation, modification, deallocation,
54//!   set-status, manually-trigger, find of push transport
55//!   connections.
56//! * `SupportedFormats` advertises the `(ContainerFormat, IngestMethod)`
57//!   pairs the device can produce.
58//! * Round-trip of `TransportOptionsStruct` is byte-for-byte (raw
59//!   TLV pass-through) on `CurrentConnections` reads and
60//!   `FindTransport` responses.
//! * NOT in scope (left to the application via hooks):
//!   `PerZoneSensitivity` / `Metadata` features beyond echoing the
//!   client-supplied bits in stored options. The cluster declares
//!   neither feature in [`PushAvStreamHandler::CLUSTER`]; build a
//!   custom `Cluster` value if you need them.
66
67use core::cell::{Cell, RefCell};
68
69use crate::dm::FabricIndex;
70use crate::dm::{ArrayAttributeRead, Cluster, Dataver, EndptId, InvokeContext, ReadContext};
71use crate::error::{Error, ErrorCode};
72use crate::tlv::{TLVBuilderParent, TLVElement, TLVTag, ToTLV};
73use crate::utils::storage::{Vec, WriteBuf};
74use crate::utils::sync::blocking::Mutex;
75use crate::with;
76
77#[allow(unused_imports)]
78pub use crate::dm::clusters::decl::push_av_stream_transport::*;
79
80use super::super::decl::push_av_stream_transport as decl;
81
/// Maximum size, in bytes, of a single connection's serialized
/// `TransportOptionsStruct`. Sized to fit the worst-case CMAF-with-CENC
/// shape (URL up to 256 chars, two CENC keys, motion zones).
///
/// This is a module-level constant, not a per-handler knob: it fixes
/// both the capacity of [`PushConnection::transport_options`] and the
/// scratch buffer the handler serializes request options into. Raising
/// the limit therefore means changing the value here.
pub const MAX_TRANSPORT_OPTIONS_BYTES: usize = 768;
88
/// Errors a [`PushAvStreamHooks`] implementation can surface back to
/// the cluster. Each variant is converted into a generic [`ErrorCode`]
/// by the `From<PushAvError> for Error` implementation in this module;
/// the variant docs name the status each one is meant to represent.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum PushAvError {
    /// `FAILURE` — generic hooks-level failure.
    Failure,
    /// `NOT_FOUND` — referenced connection ID does not exist.
    NotFound,
    /// `RESOURCE_EXHAUSTED` — application cannot accept another
    /// concurrent push.
    ResourceExhausted,
    /// `CONSTRAINT_ERROR` — combination of params is unsupported at
    /// runtime (e.g. URL host unreachable, codec unavailable on the
    /// requested stream).
    DynamicConstraint,
    /// `INVALID_IN_STATE` — cluster-specific (e.g. trigger before
    /// allocate). Note that the conversion to [`Error`] currently
    /// reports this variant as `ErrorCode::InvalidAction`.
    InvalidInState,
}
111
112impl From<PushAvError> for Error {
113    fn from(e: PushAvError) -> Self {
114        match e {
115            PushAvError::Failure => ErrorCode::Failure.into(),
116            PushAvError::NotFound => ErrorCode::NotFound.into(),
117            PushAvError::ResourceExhausted => ErrorCode::ResourceExhausted.into(),
118            PushAvError::DynamicConstraint => ErrorCode::ConstraintError.into(),
119            PushAvError::InvalidInState => ErrorCode::InvalidAction.into(),
120        }
121    }
122}
123
/// One row in the `SupportedFormats` attribute: a single
/// `(ContainerFormat, IngestMethod)` pair. The handler reports the
/// rows supplied through [`PushAvStreamConfig::supported_formats`]
/// as-is.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct SupportedFormat {
    /// Container format half of the pair.
    pub container_format: ContainerFormatEnum,
    /// Ingest method half of the pair.
    pub ingest_method: IngestMethodsEnum,
}
131
/// One row in the `CurrentConnections` attribute. Stored verbatim by
/// [`PushAvStreamHandler`], which clones rows out of its table when
/// answering attribute reads and `FindTransport`.
#[derive(Debug, Clone)]
pub struct PushConnection {
    /// Stable, server-assigned ID handed back in
    /// `AllocatePushTransportResponse`.
    pub connection_id: u16,
    /// Fabric the connection belongs to (taken from the invoking
    /// command's accessing fabric — fabric-scoped per spec).
    pub fabric_index: FabricIndex,
    /// Current transport status. New rows start as `Inactive`; the
    /// handler overwrites this field on `SetTransportStatus`.
    pub status: TransportStatusEnum,
    /// Raw TLV bytes of the `TransportOptionsStruct` as received in
    /// the original `AllocatePushTransport` (or, after a successful
    /// `ModifyPushTransport`, the new options). Replayed on
    /// attribute reads. An empty buffer means "no options": the
    /// field is then omitted when the row is written out.
    pub transport_options: Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES>,
}
151
/// Static configuration for a [`PushAvStreamHandler`]. Currently just
/// the supported-formats catalogue; expand here when you need
/// per-deployment knobs.
#[derive(Debug, Clone, Copy)]
pub struct PushAvStreamConfig<'a> {
    /// Rows returned, in slice order, by reads of the
    /// `SupportedFormats` attribute.
    pub supported_formats: &'a [SupportedFormat],
}
159
160/// Application hooks. All spec validation (URL non-empty, stream
161/// reference checks if you wire them up via the cam-av-stream
162/// reference-counting helpers, capacity, fabric scoping) is done by
163/// [`PushAvStreamHandler`] before any of these methods run.
164///
165/// Implementors only need to interact with their actual upload
166/// pipeline (open the TLS connection, push CMAF segments, etc.).
167pub trait PushAvStreamHooks {
168    /// Called when a new push connection has been validated and
169    /// assigned a stable ID. The implementation should provision the
170    /// upload pipeline; returning `Err` aborts the allocation (the
171    /// connection is NOT added to `CurrentConnections` and no ID is
172    /// returned to the controller).
173    fn on_allocate(
174        &self,
175        connection_id: u16,
176        fabric_index: FabricIndex,
177        request: &AllocatePushTransportRequest<'_>,
178    ) -> impl core::future::Future<Output = Result<(), PushAvError>>;
179
180    /// Called when a controller requests `DeallocatePushTransport`
181    /// AND the handler has confirmed the connection exists and is
182    /// owned by the accessing fabric. On `Err` the row is left in
183    /// place.
184    fn on_deallocate(
185        &self,
186        _connection_id: u16,
187    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
188        async { Ok(()) }
189    }
190
191    /// Called on `ModifyPushTransport`. The handler has already
192    /// confirmed existence and fabric scoping; on success the stored
193    /// raw options are replaced.
194    fn on_modify(
195        &self,
196        _connection_id: u16,
197        _request: &ModifyPushTransportRequest<'_>,
198    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
199        async { Ok(()) }
200    }
201
202    /// Called on `SetTransportStatus`. `connection_id == None` means
203    /// "all connections of the accessing fabric" per spec. Status is
204    /// applied to the cluster table on `Ok`.
205    fn on_set_status(
206        &self,
207        _connection_id: Option<u16>,
208        _status: TransportStatusEnum,
209    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
210        async { Ok(()) }
211    }
212
213    /// Called on `ManuallyTriggerTransport` — request the upload
214    /// pipeline to begin (or extend) a push. `time_control` carries
215    /// the optional motion-trigger envelope; `user_defined` carries
216    /// the optional opaque blob from the spec's `userDefined` field.
217    fn on_manually_trigger(
218        &self,
219        _connection_id: u16,
220        _activation_reason: TriggerActivationReasonEnum,
221        _time_control: Option<TransportMotionTriggerTimeControlStruct<'_>>,
222        _user_defined: Option<&[u8]>,
223    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
224        async { Ok(()) }
225    }
226}
227
228impl<T> PushAvStreamHooks for &T
229where
230    T: PushAvStreamHooks,
231{
232    fn on_allocate(
233        &self,
234        connection_id: u16,
235        fabric_index: FabricIndex,
236        request: &AllocatePushTransportRequest<'_>,
237    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
238        (*self).on_allocate(connection_id, fabric_index, request)
239    }
240
241    fn on_deallocate(
242        &self,
243        connection_id: u16,
244    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
245        (*self).on_deallocate(connection_id)
246    }
247
248    fn on_modify(
249        &self,
250        connection_id: u16,
251        request: &ModifyPushTransportRequest<'_>,
252    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
253        (*self).on_modify(connection_id, request)
254    }
255
256    fn on_set_status(
257        &self,
258        connection_id: Option<u16>,
259        status: TransportStatusEnum,
260    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
261        (*self).on_set_status(connection_id, status)
262    }
263
264    fn on_manually_trigger(
265        &self,
266        connection_id: u16,
267        activation_reason: TriggerActivationReasonEnum,
268        time_control: Option<TransportMotionTriggerTimeControlStruct<'_>>,
269        user_defined: Option<&[u8]>,
270    ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
271        (*self).on_manually_trigger(connection_id, activation_reason, time_control, user_defined)
272    }
273}
274
/// Mutable cluster state guarded by the handler's mutex: the table of
/// allocated push connections, bounded to `NC` rows.
struct State<const NC: usize> {
    // Rows are appended on allocation and removed on deallocation.
    connections: Vec<PushConnection, NC>,
}

impl<const NC: usize> State<NC> {
    /// Create an empty connection table.
    const fn new() -> Self {
        Self {
            connections: Vec::new(),
        }
    }
}
286
/// Handler for the Push AV Stream Transport cluster (0x0555).
///
/// Holds the connection table (at most `NC` rows) and the
/// connection-ID counter, and forwards side effects to the hooks
/// implementation `H`.
pub struct PushAvStreamHandler<'a, H, const NC: usize>
where
    H: PushAvStreamHooks,
{
    // Cluster data version, exposed through `dataver()` /
    // `dataver_changed()`.
    dataver: Dataver,
    // Endpoint the handler was constructed for.
    endpoint_id: EndptId,
    // Static configuration (supported-formats catalogue).
    config: PushAvStreamConfig<'a>,
    // Application callbacks.
    hooks: H,
    // Connection table.
    state: Mutex<RefCell<State<NC>>>,
    // Next connection ID to hand out; starts at 1.
    next_id: Mutex<Cell<u16>>,
}
299
300impl<'a, H, const NC: usize> PushAvStreamHandler<'a, H, NC>
301where
302    H: PushAvStreamHooks,
303{
    /// Cluster metadata advertising the mandatory attribute / command
    /// set without optional features. Build a custom [`Cluster`]
    /// value via `decl::FULL_CLUSTER.with_features(...)` for
    /// `PER_ZONE_SENSITIVITY` / `METADATA`.
    ///
    /// Derived from `decl::FULL_CLUSTER` with the revision set to 2,
    /// the attribute list narrowed to the required attributes plus
    /// `SupportedFormats` and `CurrentConnections`, and the command
    /// list narrowed to the six commands this handler implements.
    pub const CLUSTER: Cluster<'static> = decl::FULL_CLUSTER
        .with_revision(2)
        .with_attrs(with!(
            required;
            AttributeId::SupportedFormats | AttributeId::CurrentConnections
        ))
        .with_cmds(with!(
            decl::CommandId::AllocatePushTransport
                | decl::CommandId::DeallocatePushTransport
                | decl::CommandId::ModifyPushTransport
                | decl::CommandId::SetTransportStatus
                | decl::CommandId::ManuallyTriggerTransport
                | decl::CommandId::FindTransport
        ));
322
    /// Construct a new handler.
    ///
    /// * `dataver` — data-version tracker for this cluster instance.
    /// * `endpoint_id` — endpoint the handler is mounted on; returned
    ///   by [`Self::endpoint_id`].
    /// * `config` — static configuration (supported formats).
    /// * `hooks` — application callbacks driving the actual upload.
    ///
    /// The connection table starts empty and connection IDs start
    /// at 1.
    pub const fn new(
        dataver: Dataver,
        endpoint_id: EndptId,
        config: PushAvStreamConfig<'a>,
        hooks: H,
    ) -> Self {
        Self {
            dataver,
            endpoint_id,
            config,
            hooks,
            state: Mutex::new(RefCell::new(State::new())),
            next_id: Mutex::new(Cell::new(1)),
        }
    }
339
    /// Wrap in the generic async adaptor for registration with a
    /// `rs-matter` `Node`. Consumes the handler and returns it inside
    /// `decl::HandlerAsyncAdaptor`.
    pub const fn adapt(self) -> decl::HandlerAsyncAdaptor<Self> {
        decl::HandlerAsyncAdaptor(self)
    }
345
    /// Endpoint this handler is mounted on, as passed to
    /// [`Self::new`].
    pub const fn endpoint_id(&self) -> EndptId {
        self.endpoint_id
    }
350
351    /// Snapshot the current connections. Useful for diagnostics.
352    pub fn connections(&self) -> Vec<PushConnection, NC> {
353        self.state.lock(|cell| cell.borrow().connections.clone())
354    }
355
356    /// Allocate the next free `connection_id` (wraps to 1 on `u16`
357    /// overflow).
358    fn alloc_id(&self) -> u16 {
359        self.next_id.lock(|cell| {
360            let mut id = cell.get();
361            if id == 0 {
362                id = 1;
363            }
364            cell.set(id.wrapping_add(1).max(1));
365            id
366        })
367    }
368
369    /// Capture `options` (a request-side `TransportOptionsStruct`)
370    /// into a fresh raw-TLV buffer.
371    fn capture_options(
372        &self,
373        options: &TransportOptionsStruct<'_>,
374    ) -> Result<Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES>, Error> {
375        let mut buf = [0u8; MAX_TRANSPORT_OPTIONS_BYTES];
376        let mut wb = WriteBuf::new(&mut buf);
377        options.to_tlv(&TLVTag::Anonymous, &mut wb)?;
378        let bytes = wb.as_slice();
379        let mut stored: Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES> = Vec::new();
380        stored
381            .extend_from_slice(bytes)
382            .map_err(|_| Error::from(ErrorCode::ResourceExhausted))?;
383        Ok(stored)
384    }
385}
386
387impl<'a, H, const NC: usize> ClusterAsyncHandler for PushAvStreamHandler<'a, H, NC>
388where
389    H: PushAvStreamHooks,
390{
    // Reuse the inherent `PushAvStreamHandler::CLUSTER` metadata.
    const CLUSTER: Cluster<'static> = Self::CLUSTER;

    /// Current data version of this cluster instance.
    fn dataver(&self) -> u32 {
        self.dataver.get()
    }

    /// Record that cluster data changed by notifying the data-version
    /// tracker.
    fn dataver_changed(&self) {
        self.dataver.changed();
    }
400
401    // ----- Attributes -----
402
403    async fn supported_formats<P: TLVBuilderParent>(
404        &self,
405        _ctx: impl ReadContext,
406        builder: ArrayAttributeRead<
407            SupportedFormatStructArrayBuilder<P>,
408            SupportedFormatStructBuilder<P>,
409        >,
410    ) -> Result<P, Error> {
411        match builder {
412            ArrayAttributeRead::ReadAll(mut b) => {
413                for f in self.config.supported_formats {
414                    b = write_supported_format(b.push()?, f)?;
415                }
416                b.end()
417            }
418            ArrayAttributeRead::ReadOne(idx, b) => {
419                let Some(f) = self.config.supported_formats.get(idx as usize) else {
420                    return Err(ErrorCode::ConstraintError.into());
421                };
422                write_supported_format(b, f)
423            }
424            ArrayAttributeRead::ReadNone(b) => b.end(),
425        }
426    }
427
428    async fn current_connections<P: TLVBuilderParent>(
429        &self,
430        ctx: impl ReadContext,
431        builder: ArrayAttributeRead<
432            TransportConfigurationStructArrayBuilder<P>,
433            TransportConfigurationStructBuilder<P>,
434        >,
435    ) -> Result<P, Error> {
436        let attr = ctx.attr();
437        let mut snapshot: Vec<PushConnection, NC> = Vec::new();
438        self.state.lock(|cell| {
439            for c in cell.borrow().connections.iter() {
440                if !attr.fab_filter || c.fabric_index == attr.fab_idx {
441                    let _ = snapshot.push(c.clone());
442                }
443            }
444        });
445
446        match builder {
447            ArrayAttributeRead::ReadAll(mut b) => {
448                for c in snapshot.iter() {
449                    b = write_connection(b.push()?, c)?;
450                }
451                b.end()
452            }
453            ArrayAttributeRead::ReadOne(idx, b) => {
454                let Some(c) = snapshot.get(idx as usize) else {
455                    return Err(ErrorCode::ConstraintError.into());
456                };
457                write_connection(b, c)
458            }
459            ArrayAttributeRead::ReadNone(b) => b.end(),
460        }
461    }
462
463    // ----- Commands -----
464
465    async fn handle_allocate_push_transport<P: TLVBuilderParent>(
466        &self,
467        ctx: impl InvokeContext,
468        request: AllocatePushTransportRequest<'_>,
469        response: AllocatePushTransportResponseBuilder<P>,
470    ) -> Result<P, Error> {
471        let cmd = ctx.cmd();
472        let fab_idx = cmd.fab_idx;
473
474        let options = request.transport_options()?;
475
476        // Spec: URL must be non-empty.
477        let url = options.url()?;
478        if url.is_empty() {
479            return Err(ErrorCode::ConstraintError.into());
480        }
481
482        // Capacity.
483        let full = self
484            .state
485            .lock(|cell| cell.borrow().connections.len() >= NC);
486        if full {
487            return Err(ErrorCode::ResourceExhausted.into());
488        }
489
490        // Capture raw options before crossing the await boundary so
491        // we don't re-borrow the request after the hook returns.
492        let stored_options = self.capture_options(&options)?;
493
494        let connection_id = self.alloc_id();
495        self.hooks
496            .on_allocate(connection_id, fab_idx, &request)
497            .await?;
498
499        let pushed = self.state.lock(|cell| {
500            let mut state = cell.borrow_mut();
501            state
502                .connections
503                .push(PushConnection {
504                    connection_id,
505                    fabric_index: fab_idx,
506                    status: TransportStatusEnum::Inactive,
507                    transport_options: stored_options,
508                })
509                .is_ok()
510        });
511        if !pushed {
512            let _ = self.hooks.on_deallocate(connection_id).await;
513            return Err(ErrorCode::ResourceExhausted.into());
514        }
515        ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
516
517        // Build the response: echo the just-stored connection.
518        let snapshot = self
519            .state
520            .lock(|cell| cell.borrow().connections.last().cloned())
521            .ok_or(Error::from(ErrorCode::Failure))?;
522        let cfg = response.transport_configuration()?;
523        write_connection(cfg, &snapshot)?.end()
524    }
525
526    async fn handle_deallocate_push_transport(
527        &self,
528        ctx: impl InvokeContext,
529        request: DeallocatePushTransportRequest<'_>,
530    ) -> Result<(), Error> {
531        let cmd = ctx.cmd();
532        let fab_idx = cmd.fab_idx;
533        let connection_id = request.connection_id()?;
534
535        let exists = self.state.lock(|cell| {
536            cell.borrow()
537                .connections
538                .iter()
539                .any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
540        });
541        if !exists {
542            return Err(ErrorCode::NotFound.into());
543        }
544
545        self.hooks.on_deallocate(connection_id).await?;
546
547        self.state.lock(|cell| {
548            let mut state = cell.borrow_mut();
549            state
550                .connections
551                .retain(|c| !(c.connection_id == connection_id && c.fabric_index == fab_idx));
552        });
553        ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
554        Ok(())
555    }
556
557    async fn handle_modify_push_transport(
558        &self,
559        ctx: impl InvokeContext,
560        request: ModifyPushTransportRequest<'_>,
561    ) -> Result<(), Error> {
562        let cmd = ctx.cmd();
563        let fab_idx = cmd.fab_idx;
564        let connection_id = request.connection_id()?;
565        let options = request.transport_options()?;
566
567        let url = options.url()?;
568        if url.is_empty() {
569            return Err(ErrorCode::ConstraintError.into());
570        }
571
572        let exists = self.state.lock(|cell| {
573            cell.borrow()
574                .connections
575                .iter()
576                .any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
577        });
578        if !exists {
579            return Err(ErrorCode::NotFound.into());
580        }
581
582        let stored_options = self.capture_options(&options)?;
583
584        self.hooks.on_modify(connection_id, &request).await?;
585
586        self.state.lock(|cell| {
587            let mut state = cell.borrow_mut();
588            if let Some(row) = state
589                .connections
590                .iter_mut()
591                .find(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
592            {
593                row.transport_options = stored_options;
594            }
595        });
596        ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
597        Ok(())
598    }
599
600    async fn handle_set_transport_status(
601        &self,
602        ctx: impl InvokeContext,
603        request: SetTransportStatusRequest<'_>,
604    ) -> Result<(), Error> {
605        let cmd = ctx.cmd();
606        let fab_idx = cmd.fab_idx;
607        let connection_id = request.connection_id()?.into_option();
608        let status = request.transport_status()?;
609
610        // If a specific connection is targeted, validate existence + ownership.
611        if let Some(id) = connection_id {
612            let owned = self.state.lock(|cell| {
613                cell.borrow()
614                    .connections
615                    .iter()
616                    .any(|c| c.connection_id == id && c.fabric_index == fab_idx)
617            });
618            if !owned {
619                return Err(ErrorCode::NotFound.into());
620            }
621        }
622
623        self.hooks.on_set_status(connection_id, status).await?;
624
625        self.state.lock(|cell| {
626            let mut state = cell.borrow_mut();
627            for c in state.connections.iter_mut() {
628                if c.fabric_index != fab_idx {
629                    continue;
630                }
631                match connection_id {
632                    Some(id) if c.connection_id != id => continue,
633                    _ => {}
634                }
635                c.status = status;
636            }
637        });
638        ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
639        Ok(())
640    }
641
642    async fn handle_manually_trigger_transport(
643        &self,
644        ctx: impl InvokeContext,
645        request: ManuallyTriggerTransportRequest<'_>,
646    ) -> Result<(), Error> {
647        let cmd = ctx.cmd();
648        let fab_idx = cmd.fab_idx;
649        let connection_id = request.connection_id()?;
650        let reason = request.activation_reason()?;
651        let time_control = request.time_control()?;
652        let user_defined = request.user_defined()?;
653
654        let owned = self.state.lock(|cell| {
655            cell.borrow()
656                .connections
657                .iter()
658                .any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
659        });
660        if !owned {
661            return Err(ErrorCode::NotFound.into());
662        }
663
664        self.hooks
665            .on_manually_trigger(
666                connection_id,
667                reason,
668                time_control,
669                user_defined.map(|s| s.0),
670            )
671            .await?;
672        Ok(())
673    }
674
675    async fn handle_find_transport<P: TLVBuilderParent>(
676        &self,
677        ctx: impl InvokeContext,
678        request: FindTransportRequest<'_>,
679        response: FindTransportResponseBuilder<P>,
680    ) -> Result<P, Error> {
681        let cmd = ctx.cmd();
682        let fab_idx = cmd.fab_idx;
683        let connection_id = request.connection_id()?.into_option();
684
685        // Snapshot matching rows.
686        let mut snapshot: Vec<PushConnection, NC> = Vec::new();
687        self.state.lock(|cell| {
688            for c in cell.borrow().connections.iter() {
689                if c.fabric_index != fab_idx {
690                    continue;
691                }
692                if let Some(id) = connection_id {
693                    if c.connection_id != id {
694                        continue;
695                    }
696                }
697                let _ = snapshot.push(c.clone());
698            }
699        });
700
701        // Spec: NOT_FOUND if no matching transport.
702        if snapshot.is_empty() {
703            return Err(ErrorCode::NotFound.into());
704        }
705
706        let mut arr = response.transport_configurations()?;
707        for c in snapshot.iter() {
708            arr = write_connection(arr.push()?, c)?;
709        }
710        arr.end()?.end()
711    }
712}
713
714fn write_supported_format<P: TLVBuilderParent>(
715    builder: SupportedFormatStructBuilder<P>,
716    f: &SupportedFormat,
717) -> Result<P, Error> {
718    builder
719        .container_format(f.container_format)?
720        .ingest_method(f.ingest_method)?
721        .end()
722}
723
/// Replay a stored [`PushConnection`] into a
/// `TransportConfigurationStructBuilder<P>` and return the builder's
/// parent.
///
/// Slot 0 (`connection_id`) and slot 1 (`transport_status`) are
/// written via the typed builder; slot 2 (`transport_options`,
/// optional) is sidestepped via `OptionalBuilder::none()` and
/// the stored raw TLV bytes are spliced in directly under
/// `Context(2)` — this lets us round-trip arbitrarily complex
/// `TransportOptionsStruct` values without reconstructing every
/// nested struct field-by-field.
///
/// When the stored options buffer is empty nothing is spliced in,
/// so the optional field is simply absent from the output.
fn write_connection<P: TLVBuilderParent>(
    builder: TransportConfigurationStructBuilder<P>,
    c: &PushConnection,
) -> Result<P, Error> {
    let b = builder
        .connection_id(c.connection_id)?
        .transport_status(c.status)?;
    // Skip the typed builder for slot 2 and write the raw stored TLV
    // directly under Context(2). The const-generic state advances to
    // 254 either way, preserving slot ordering for `fabric_index`.
    let mut b = b.transport_options()?.none();
    if !c.transport_options.is_empty() {
        // The stored bytes hold one complete TLV element; it is
        // re-emitted here with the context tag of the options field.
        let element = TLVElement::new(c.transport_options.as_slice());
        element.to_tlv(&TLVTag::Context(2), b.writer())?;
    }
    b.fabric_index(Some(c.fabric_index))?.end()
}