1use core::cell::{Cell, RefCell};
68
69use crate::dm::FabricIndex;
70use crate::dm::{ArrayAttributeRead, Cluster, Dataver, EndptId, InvokeContext, ReadContext};
71use crate::error::{Error, ErrorCode};
72use crate::tlv::{TLVBuilderParent, TLVElement, TLVTag, ToTLV};
73use crate::utils::storage::{Vec, WriteBuf};
74use crate::utils::sync::blocking::Mutex;
75use crate::with;
76
77#[allow(unused_imports)]
78pub use crate::dm::clusters::decl::push_av_stream_transport::*;
79
80use super::super::decl::push_av_stream_transport as decl;
81
/// Capacity, in bytes, reserved for the encoded transport options of a single
/// connection.
///
/// It sizes both the scratch buffer used while encoding the options and the
/// per-connection storage that keeps the encoded bytes.
pub const MAX_TRANSPORT_OPTIONS_BYTES: usize = 768;
88
/// Failure reasons a hooks implementation can hand back to the handler.
///
/// The type is a plain, copyable tag with no payload; it is turned into the
/// crate-level error type when the handler propagates it.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum PushAvError {
    /// Unspecified failure (converted to `ErrorCode::Failure`).
    Failure,
    /// The referenced item does not exist (converted to `ErrorCode::NotFound`).
    NotFound,
    /// No resources left to serve the request (converted to
    /// `ErrorCode::ResourceExhausted`).
    ResourceExhausted,
    /// A constraint was violated (converted to `ErrorCode::ConstraintError`).
    DynamicConstraint,
    /// The request is not valid in the current state (converted to
    /// `ErrorCode::InvalidAction`).
    InvalidInState,
}
111
112impl From<PushAvError> for Error {
113 fn from(e: PushAvError) -> Self {
114 match e {
115 PushAvError::Failure => ErrorCode::Failure.into(),
116 PushAvError::NotFound => ErrorCode::NotFound.into(),
117 PushAvError::ResourceExhausted => ErrorCode::ResourceExhausted.into(),
118 PushAvError::DynamicConstraint => ErrorCode::ConstraintError.into(),
119 PushAvError::InvalidInState => ErrorCode::InvalidAction.into(),
120 }
121 }
122}
123
124#[derive(Debug, Clone, Copy)]
126#[cfg_attr(feature = "defmt", derive(defmt::Format))]
127pub struct SupportedFormat {
128 pub container_format: ContainerFormatEnum,
129 pub ingest_method: IngestMethodsEnum,
130}
131
132#[derive(Debug, Clone)]
135pub struct PushConnection {
136 pub connection_id: u16,
139 pub fabric_index: FabricIndex,
142 pub status: TransportStatusEnum,
145 pub transport_options: Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES>,
150}
151
152#[derive(Debug, Clone, Copy)]
156pub struct PushAvStreamConfig<'a> {
157 pub supported_formats: &'a [SupportedFormat],
158}
159
160pub trait PushAvStreamHooks {
168 fn on_allocate(
174 &self,
175 connection_id: u16,
176 fabric_index: FabricIndex,
177 request: &AllocatePushTransportRequest<'_>,
178 ) -> impl core::future::Future<Output = Result<(), PushAvError>>;
179
180 fn on_deallocate(
185 &self,
186 _connection_id: u16,
187 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
188 async { Ok(()) }
189 }
190
191 fn on_modify(
195 &self,
196 _connection_id: u16,
197 _request: &ModifyPushTransportRequest<'_>,
198 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
199 async { Ok(()) }
200 }
201
202 fn on_set_status(
206 &self,
207 _connection_id: Option<u16>,
208 _status: TransportStatusEnum,
209 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
210 async { Ok(()) }
211 }
212
213 fn on_manually_trigger(
218 &self,
219 _connection_id: u16,
220 _activation_reason: TriggerActivationReasonEnum,
221 _time_control: Option<TransportMotionTriggerTimeControlStruct<'_>>,
222 _user_defined: Option<&[u8]>,
223 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
224 async { Ok(()) }
225 }
226}
227
228impl<T> PushAvStreamHooks for &T
229where
230 T: PushAvStreamHooks,
231{
232 fn on_allocate(
233 &self,
234 connection_id: u16,
235 fabric_index: FabricIndex,
236 request: &AllocatePushTransportRequest<'_>,
237 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
238 (*self).on_allocate(connection_id, fabric_index, request)
239 }
240
241 fn on_deallocate(
242 &self,
243 connection_id: u16,
244 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
245 (*self).on_deallocate(connection_id)
246 }
247
248 fn on_modify(
249 &self,
250 connection_id: u16,
251 request: &ModifyPushTransportRequest<'_>,
252 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
253 (*self).on_modify(connection_id, request)
254 }
255
256 fn on_set_status(
257 &self,
258 connection_id: Option<u16>,
259 status: TransportStatusEnum,
260 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
261 (*self).on_set_status(connection_id, status)
262 }
263
264 fn on_manually_trigger(
265 &self,
266 connection_id: u16,
267 activation_reason: TriggerActivationReasonEnum,
268 time_control: Option<TransportMotionTriggerTimeControlStruct<'_>>,
269 user_defined: Option<&[u8]>,
270 ) -> impl core::future::Future<Output = Result<(), PushAvError>> {
271 (*self).on_manually_trigger(connection_id, activation_reason, time_control, user_defined)
272 }
273}
274
275struct State<const NC: usize> {
276 connections: Vec<PushConnection, NC>,
277}
278
279impl<const NC: usize> State<NC> {
280 const fn new() -> Self {
281 Self {
282 connections: Vec::new(),
283 }
284 }
285}
286
287pub struct PushAvStreamHandler<'a, H, const NC: usize>
289where
290 H: PushAvStreamHooks,
291{
292 dataver: Dataver,
293 endpoint_id: EndptId,
294 config: PushAvStreamConfig<'a>,
295 hooks: H,
296 state: Mutex<RefCell<State<NC>>>,
297 next_id: Mutex<Cell<u16>>,
298}
299
300impl<'a, H, const NC: usize> PushAvStreamHandler<'a, H, NC>
301where
302 H: PushAvStreamHooks,
303{
304 pub const CLUSTER: Cluster<'static> = decl::FULL_CLUSTER
309 .with_revision(2)
310 .with_attrs(with!(
311 required;
312 AttributeId::SupportedFormats | AttributeId::CurrentConnections
313 ))
314 .with_cmds(with!(
315 decl::CommandId::AllocatePushTransport
316 | decl::CommandId::DeallocatePushTransport
317 | decl::CommandId::ModifyPushTransport
318 | decl::CommandId::SetTransportStatus
319 | decl::CommandId::ManuallyTriggerTransport
320 | decl::CommandId::FindTransport
321 ));
322
323 pub const fn new(
325 dataver: Dataver,
326 endpoint_id: EndptId,
327 config: PushAvStreamConfig<'a>,
328 hooks: H,
329 ) -> Self {
330 Self {
331 dataver,
332 endpoint_id,
333 config,
334 hooks,
335 state: Mutex::new(RefCell::new(State::new())),
336 next_id: Mutex::new(Cell::new(1)),
337 }
338 }
339
340 pub const fn adapt(self) -> decl::HandlerAsyncAdaptor<Self> {
343 decl::HandlerAsyncAdaptor(self)
344 }
345
346 pub const fn endpoint_id(&self) -> EndptId {
348 self.endpoint_id
349 }
350
351 pub fn connections(&self) -> Vec<PushConnection, NC> {
353 self.state.lock(|cell| cell.borrow().connections.clone())
354 }
355
356 fn alloc_id(&self) -> u16 {
359 self.next_id.lock(|cell| {
360 let mut id = cell.get();
361 if id == 0 {
362 id = 1;
363 }
364 cell.set(id.wrapping_add(1).max(1));
365 id
366 })
367 }
368
369 fn capture_options(
372 &self,
373 options: &TransportOptionsStruct<'_>,
374 ) -> Result<Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES>, Error> {
375 let mut buf = [0u8; MAX_TRANSPORT_OPTIONS_BYTES];
376 let mut wb = WriteBuf::new(&mut buf);
377 options.to_tlv(&TLVTag::Anonymous, &mut wb)?;
378 let bytes = wb.as_slice();
379 let mut stored: Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES> = Vec::new();
380 stored
381 .extend_from_slice(bytes)
382 .map_err(|_| Error::from(ErrorCode::ResourceExhausted))?;
383 Ok(stored)
384 }
385}
386
387impl<'a, H, const NC: usize> ClusterAsyncHandler for PushAvStreamHandler<'a, H, NC>
388where
389 H: PushAvStreamHooks,
390{
391 const CLUSTER: Cluster<'static> = Self::CLUSTER;
392
393 fn dataver(&self) -> u32 {
394 self.dataver.get()
395 }
396
397 fn dataver_changed(&self) {
398 self.dataver.changed();
399 }
400
401 async fn supported_formats<P: TLVBuilderParent>(
404 &self,
405 _ctx: impl ReadContext,
406 builder: ArrayAttributeRead<
407 SupportedFormatStructArrayBuilder<P>,
408 SupportedFormatStructBuilder<P>,
409 >,
410 ) -> Result<P, Error> {
411 match builder {
412 ArrayAttributeRead::ReadAll(mut b) => {
413 for f in self.config.supported_formats {
414 b = write_supported_format(b.push()?, f)?;
415 }
416 b.end()
417 }
418 ArrayAttributeRead::ReadOne(idx, b) => {
419 let Some(f) = self.config.supported_formats.get(idx as usize) else {
420 return Err(ErrorCode::ConstraintError.into());
421 };
422 write_supported_format(b, f)
423 }
424 ArrayAttributeRead::ReadNone(b) => b.end(),
425 }
426 }
427
428 async fn current_connections<P: TLVBuilderParent>(
429 &self,
430 ctx: impl ReadContext,
431 builder: ArrayAttributeRead<
432 TransportConfigurationStructArrayBuilder<P>,
433 TransportConfigurationStructBuilder<P>,
434 >,
435 ) -> Result<P, Error> {
436 let attr = ctx.attr();
437 let mut snapshot: Vec<PushConnection, NC> = Vec::new();
438 self.state.lock(|cell| {
439 for c in cell.borrow().connections.iter() {
440 if !attr.fab_filter || c.fabric_index == attr.fab_idx {
441 let _ = snapshot.push(c.clone());
442 }
443 }
444 });
445
446 match builder {
447 ArrayAttributeRead::ReadAll(mut b) => {
448 for c in snapshot.iter() {
449 b = write_connection(b.push()?, c)?;
450 }
451 b.end()
452 }
453 ArrayAttributeRead::ReadOne(idx, b) => {
454 let Some(c) = snapshot.get(idx as usize) else {
455 return Err(ErrorCode::ConstraintError.into());
456 };
457 write_connection(b, c)
458 }
459 ArrayAttributeRead::ReadNone(b) => b.end(),
460 }
461 }
462
463 async fn handle_allocate_push_transport<P: TLVBuilderParent>(
466 &self,
467 ctx: impl InvokeContext,
468 request: AllocatePushTransportRequest<'_>,
469 response: AllocatePushTransportResponseBuilder<P>,
470 ) -> Result<P, Error> {
471 let cmd = ctx.cmd();
472 let fab_idx = cmd.fab_idx;
473
474 let options = request.transport_options()?;
475
476 let url = options.url()?;
478 if url.is_empty() {
479 return Err(ErrorCode::ConstraintError.into());
480 }
481
482 let full = self
484 .state
485 .lock(|cell| cell.borrow().connections.len() >= NC);
486 if full {
487 return Err(ErrorCode::ResourceExhausted.into());
488 }
489
490 let stored_options = self.capture_options(&options)?;
493
494 let connection_id = self.alloc_id();
495 self.hooks
496 .on_allocate(connection_id, fab_idx, &request)
497 .await?;
498
499 let pushed = self.state.lock(|cell| {
500 let mut state = cell.borrow_mut();
501 state
502 .connections
503 .push(PushConnection {
504 connection_id,
505 fabric_index: fab_idx,
506 status: TransportStatusEnum::Inactive,
507 transport_options: stored_options,
508 })
509 .is_ok()
510 });
511 if !pushed {
512 let _ = self.hooks.on_deallocate(connection_id).await;
513 return Err(ErrorCode::ResourceExhausted.into());
514 }
515 ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
516
517 let snapshot = self
519 .state
520 .lock(|cell| cell.borrow().connections.last().cloned())
521 .ok_or(Error::from(ErrorCode::Failure))?;
522 let cfg = response.transport_configuration()?;
523 write_connection(cfg, &snapshot)?.end()
524 }
525
526 async fn handle_deallocate_push_transport(
527 &self,
528 ctx: impl InvokeContext,
529 request: DeallocatePushTransportRequest<'_>,
530 ) -> Result<(), Error> {
531 let cmd = ctx.cmd();
532 let fab_idx = cmd.fab_idx;
533 let connection_id = request.connection_id()?;
534
535 let exists = self.state.lock(|cell| {
536 cell.borrow()
537 .connections
538 .iter()
539 .any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
540 });
541 if !exists {
542 return Err(ErrorCode::NotFound.into());
543 }
544
545 self.hooks.on_deallocate(connection_id).await?;
546
547 self.state.lock(|cell| {
548 let mut state = cell.borrow_mut();
549 state
550 .connections
551 .retain(|c| !(c.connection_id == connection_id && c.fabric_index == fab_idx));
552 });
553 ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
554 Ok(())
555 }
556
557 async fn handle_modify_push_transport(
558 &self,
559 ctx: impl InvokeContext,
560 request: ModifyPushTransportRequest<'_>,
561 ) -> Result<(), Error> {
562 let cmd = ctx.cmd();
563 let fab_idx = cmd.fab_idx;
564 let connection_id = request.connection_id()?;
565 let options = request.transport_options()?;
566
567 let url = options.url()?;
568 if url.is_empty() {
569 return Err(ErrorCode::ConstraintError.into());
570 }
571
572 let exists = self.state.lock(|cell| {
573 cell.borrow()
574 .connections
575 .iter()
576 .any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
577 });
578 if !exists {
579 return Err(ErrorCode::NotFound.into());
580 }
581
582 let stored_options = self.capture_options(&options)?;
583
584 self.hooks.on_modify(connection_id, &request).await?;
585
586 self.state.lock(|cell| {
587 let mut state = cell.borrow_mut();
588 if let Some(row) = state
589 .connections
590 .iter_mut()
591 .find(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
592 {
593 row.transport_options = stored_options;
594 }
595 });
596 ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
597 Ok(())
598 }
599
600 async fn handle_set_transport_status(
601 &self,
602 ctx: impl InvokeContext,
603 request: SetTransportStatusRequest<'_>,
604 ) -> Result<(), Error> {
605 let cmd = ctx.cmd();
606 let fab_idx = cmd.fab_idx;
607 let connection_id = request.connection_id()?.into_option();
608 let status = request.transport_status()?;
609
610 if let Some(id) = connection_id {
612 let owned = self.state.lock(|cell| {
613 cell.borrow()
614 .connections
615 .iter()
616 .any(|c| c.connection_id == id && c.fabric_index == fab_idx)
617 });
618 if !owned {
619 return Err(ErrorCode::NotFound.into());
620 }
621 }
622
623 self.hooks.on_set_status(connection_id, status).await?;
624
625 self.state.lock(|cell| {
626 let mut state = cell.borrow_mut();
627 for c in state.connections.iter_mut() {
628 if c.fabric_index != fab_idx {
629 continue;
630 }
631 match connection_id {
632 Some(id) if c.connection_id != id => continue,
633 _ => {}
634 }
635 c.status = status;
636 }
637 });
638 ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
639 Ok(())
640 }
641
642 async fn handle_manually_trigger_transport(
643 &self,
644 ctx: impl InvokeContext,
645 request: ManuallyTriggerTransportRequest<'_>,
646 ) -> Result<(), Error> {
647 let cmd = ctx.cmd();
648 let fab_idx = cmd.fab_idx;
649 let connection_id = request.connection_id()?;
650 let reason = request.activation_reason()?;
651 let time_control = request.time_control()?;
652 let user_defined = request.user_defined()?;
653
654 let owned = self.state.lock(|cell| {
655 cell.borrow()
656 .connections
657 .iter()
658 .any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
659 });
660 if !owned {
661 return Err(ErrorCode::NotFound.into());
662 }
663
664 self.hooks
665 .on_manually_trigger(
666 connection_id,
667 reason,
668 time_control,
669 user_defined.map(|s| s.0),
670 )
671 .await?;
672 Ok(())
673 }
674
675 async fn handle_find_transport<P: TLVBuilderParent>(
676 &self,
677 ctx: impl InvokeContext,
678 request: FindTransportRequest<'_>,
679 response: FindTransportResponseBuilder<P>,
680 ) -> Result<P, Error> {
681 let cmd = ctx.cmd();
682 let fab_idx = cmd.fab_idx;
683 let connection_id = request.connection_id()?.into_option();
684
685 let mut snapshot: Vec<PushConnection, NC> = Vec::new();
687 self.state.lock(|cell| {
688 for c in cell.borrow().connections.iter() {
689 if c.fabric_index != fab_idx {
690 continue;
691 }
692 if let Some(id) = connection_id {
693 if c.connection_id != id {
694 continue;
695 }
696 }
697 let _ = snapshot.push(c.clone());
698 }
699 });
700
701 if snapshot.is_empty() {
703 return Err(ErrorCode::NotFound.into());
704 }
705
706 let mut arr = response.transport_configurations()?;
707 for c in snapshot.iter() {
708 arr = write_connection(arr.push()?, c)?;
709 }
710 arr.end()?.end()
711 }
712}
713
714fn write_supported_format<P: TLVBuilderParent>(
715 builder: SupportedFormatStructBuilder<P>,
716 f: &SupportedFormat,
717) -> Result<P, Error> {
718 builder
719 .container_format(f.container_format)?
720 .ingest_method(f.ingest_method)?
721 .end()
722}
723
724fn write_connection<P: TLVBuilderParent>(
735 builder: TransportConfigurationStructBuilder<P>,
736 c: &PushConnection,
737) -> Result<P, Error> {
738 let b = builder
739 .connection_id(c.connection_id)?
740 .transport_status(c.status)?;
741 let mut b = b.transport_options()?.none();
745 if !c.transport_options.is_empty() {
746 let element = TLVElement::new(c.transport_options.as_slice());
747 element.to_tlv(&TLVTag::Context(2), b.writer())?;
748 }
749 b.fabric_index(Some(c.fabric_index))?.end()
750}