use core::cell::{Cell, RefCell};
use crate::dm::FabricIndex;
use crate::dm::{ArrayAttributeRead, Cluster, Dataver, EndptId, InvokeContext, ReadContext};
use crate::error::{Error, ErrorCode};
use crate::tlv::{TLVBuilderParent, TLVElement, TLVTag, ToTLV};
use crate::utils::storage::{Vec, WriteBuf};
use crate::utils::sync::blocking::Mutex;
use crate::with;
#[allow(unused_imports)]
pub use crate::dm::clusters::decl::push_av_stream_transport::*;
use super::super::decl::push_av_stream_transport as decl;
pub const MAX_TRANSPORT_OPTIONS_BYTES: usize = 768;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum PushAvError {
Failure,
NotFound,
ResourceExhausted,
DynamicConstraint,
InvalidInState,
}
impl From<PushAvError> for Error {
fn from(e: PushAvError) -> Self {
match e {
PushAvError::Failure => ErrorCode::Failure.into(),
PushAvError::NotFound => ErrorCode::NotFound.into(),
PushAvError::ResourceExhausted => ErrorCode::ResourceExhausted.into(),
PushAvError::DynamicConstraint => ErrorCode::ConstraintError.into(),
PushAvError::InvalidInState => ErrorCode::InvalidAction.into(),
}
}
}
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct SupportedFormat {
pub container_format: ContainerFormatEnum,
pub ingest_method: IngestMethodsEnum,
}
#[derive(Debug, Clone)]
pub struct PushConnection {
pub connection_id: u16,
pub fabric_index: FabricIndex,
pub status: TransportStatusEnum,
pub transport_options: Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES>,
}
#[derive(Debug, Clone, Copy)]
pub struct PushAvStreamConfig<'a> {
pub supported_formats: &'a [SupportedFormat],
}
pub trait PushAvStreamHooks {
fn on_allocate(
&self,
connection_id: u16,
fabric_index: FabricIndex,
request: &AllocatePushTransportRequest<'_>,
) -> impl core::future::Future<Output = Result<(), PushAvError>>;
fn on_deallocate(
&self,
_connection_id: u16,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
async { Ok(()) }
}
fn on_modify(
&self,
_connection_id: u16,
_request: &ModifyPushTransportRequest<'_>,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
async { Ok(()) }
}
fn on_set_status(
&self,
_connection_id: Option<u16>,
_status: TransportStatusEnum,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
async { Ok(()) }
}
fn on_manually_trigger(
&self,
_connection_id: u16,
_activation_reason: TriggerActivationReasonEnum,
_time_control: Option<TransportMotionTriggerTimeControlStruct<'_>>,
_user_defined: Option<&[u8]>,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
async { Ok(()) }
}
}
impl<T> PushAvStreamHooks for &T
where
T: PushAvStreamHooks,
{
fn on_allocate(
&self,
connection_id: u16,
fabric_index: FabricIndex,
request: &AllocatePushTransportRequest<'_>,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
(*self).on_allocate(connection_id, fabric_index, request)
}
fn on_deallocate(
&self,
connection_id: u16,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
(*self).on_deallocate(connection_id)
}
fn on_modify(
&self,
connection_id: u16,
request: &ModifyPushTransportRequest<'_>,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
(*self).on_modify(connection_id, request)
}
fn on_set_status(
&self,
connection_id: Option<u16>,
status: TransportStatusEnum,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
(*self).on_set_status(connection_id, status)
}
fn on_manually_trigger(
&self,
connection_id: u16,
activation_reason: TriggerActivationReasonEnum,
time_control: Option<TransportMotionTriggerTimeControlStruct<'_>>,
user_defined: Option<&[u8]>,
) -> impl core::future::Future<Output = Result<(), PushAvError>> {
(*self).on_manually_trigger(connection_id, activation_reason, time_control, user_defined)
}
}
struct State<const NC: usize> {
connections: Vec<PushConnection, NC>,
}
impl<const NC: usize> State<NC> {
const fn new() -> Self {
Self {
connections: Vec::new(),
}
}
}
pub struct PushAvStreamHandler<'a, H, const NC: usize>
where
H: PushAvStreamHooks,
{
dataver: Dataver,
endpoint_id: EndptId,
config: PushAvStreamConfig<'a>,
hooks: H,
state: Mutex<RefCell<State<NC>>>,
next_id: Mutex<Cell<u16>>,
}
impl<'a, H, const NC: usize> PushAvStreamHandler<'a, H, NC>
where
H: PushAvStreamHooks,
{
pub const CLUSTER: Cluster<'static> = decl::FULL_CLUSTER
.with_revision(2)
.with_attrs(with!(
required;
AttributeId::SupportedFormats | AttributeId::CurrentConnections
))
.with_cmds(with!(
decl::CommandId::AllocatePushTransport
| decl::CommandId::DeallocatePushTransport
| decl::CommandId::ModifyPushTransport
| decl::CommandId::SetTransportStatus
| decl::CommandId::ManuallyTriggerTransport
| decl::CommandId::FindTransport
));
pub const fn new(
dataver: Dataver,
endpoint_id: EndptId,
config: PushAvStreamConfig<'a>,
hooks: H,
) -> Self {
Self {
dataver,
endpoint_id,
config,
hooks,
state: Mutex::new(RefCell::new(State::new())),
next_id: Mutex::new(Cell::new(1)),
}
}
pub const fn adapt(self) -> decl::HandlerAsyncAdaptor<Self> {
decl::HandlerAsyncAdaptor(self)
}
pub const fn endpoint_id(&self) -> EndptId {
self.endpoint_id
}
pub fn connections(&self) -> Vec<PushConnection, NC> {
self.state.lock(|cell| cell.borrow().connections.clone())
}
fn alloc_id(&self) -> u16 {
self.next_id.lock(|cell| {
let mut id = cell.get();
if id == 0 {
id = 1;
}
cell.set(id.wrapping_add(1).max(1));
id
})
}
fn capture_options(
&self,
options: &TransportOptionsStruct<'_>,
) -> Result<Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES>, Error> {
let mut buf = [0u8; MAX_TRANSPORT_OPTIONS_BYTES];
let mut wb = WriteBuf::new(&mut buf);
options.to_tlv(&TLVTag::Anonymous, &mut wb)?;
let bytes = wb.as_slice();
let mut stored: Vec<u8, MAX_TRANSPORT_OPTIONS_BYTES> = Vec::new();
stored
.extend_from_slice(bytes)
.map_err(|_| Error::from(ErrorCode::ResourceExhausted))?;
Ok(stored)
}
}
impl<'a, H, const NC: usize> ClusterAsyncHandler for PushAvStreamHandler<'a, H, NC>
where
H: PushAvStreamHooks,
{
const CLUSTER: Cluster<'static> = Self::CLUSTER;
fn dataver(&self) -> u32 {
self.dataver.get()
}
fn dataver_changed(&self) {
self.dataver.changed();
}
async fn supported_formats<P: TLVBuilderParent>(
&self,
_ctx: impl ReadContext,
builder: ArrayAttributeRead<
SupportedFormatStructArrayBuilder<P>,
SupportedFormatStructBuilder<P>,
>,
) -> Result<P, Error> {
match builder {
ArrayAttributeRead::ReadAll(mut b) => {
for f in self.config.supported_formats {
b = write_supported_format(b.push()?, f)?;
}
b.end()
}
ArrayAttributeRead::ReadOne(idx, b) => {
let Some(f) = self.config.supported_formats.get(idx as usize) else {
return Err(ErrorCode::ConstraintError.into());
};
write_supported_format(b, f)
}
ArrayAttributeRead::ReadNone(b) => b.end(),
}
}
async fn current_connections<P: TLVBuilderParent>(
&self,
ctx: impl ReadContext,
builder: ArrayAttributeRead<
TransportConfigurationStructArrayBuilder<P>,
TransportConfigurationStructBuilder<P>,
>,
) -> Result<P, Error> {
let attr = ctx.attr();
let mut snapshot: Vec<PushConnection, NC> = Vec::new();
self.state.lock(|cell| {
for c in cell.borrow().connections.iter() {
if !attr.fab_filter || c.fabric_index == attr.fab_idx {
let _ = snapshot.push(c.clone());
}
}
});
match builder {
ArrayAttributeRead::ReadAll(mut b) => {
for c in snapshot.iter() {
b = write_connection(b.push()?, c)?;
}
b.end()
}
ArrayAttributeRead::ReadOne(idx, b) => {
let Some(c) = snapshot.get(idx as usize) else {
return Err(ErrorCode::ConstraintError.into());
};
write_connection(b, c)
}
ArrayAttributeRead::ReadNone(b) => b.end(),
}
}
async fn handle_allocate_push_transport<P: TLVBuilderParent>(
&self,
ctx: impl InvokeContext,
request: AllocatePushTransportRequest<'_>,
response: AllocatePushTransportResponseBuilder<P>,
) -> Result<P, Error> {
let cmd = ctx.cmd();
let fab_idx = cmd.fab_idx;
let options = request.transport_options()?;
let url = options.url()?;
if url.is_empty() {
return Err(ErrorCode::ConstraintError.into());
}
let full = self
.state
.lock(|cell| cell.borrow().connections.len() >= NC);
if full {
return Err(ErrorCode::ResourceExhausted.into());
}
let stored_options = self.capture_options(&options)?;
let connection_id = self.alloc_id();
self.hooks
.on_allocate(connection_id, fab_idx, &request)
.await?;
let pushed = self.state.lock(|cell| {
let mut state = cell.borrow_mut();
state
.connections
.push(PushConnection {
connection_id,
fabric_index: fab_idx,
status: TransportStatusEnum::Inactive,
transport_options: stored_options,
})
.is_ok()
});
if !pushed {
let _ = self.hooks.on_deallocate(connection_id).await;
return Err(ErrorCode::ResourceExhausted.into());
}
ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
let snapshot = self
.state
.lock(|cell| cell.borrow().connections.last().cloned())
.ok_or(Error::from(ErrorCode::Failure))?;
let cfg = response.transport_configuration()?;
write_connection(cfg, &snapshot)?.end()
}
async fn handle_deallocate_push_transport(
&self,
ctx: impl InvokeContext,
request: DeallocatePushTransportRequest<'_>,
) -> Result<(), Error> {
let cmd = ctx.cmd();
let fab_idx = cmd.fab_idx;
let connection_id = request.connection_id()?;
let exists = self.state.lock(|cell| {
cell.borrow()
.connections
.iter()
.any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
});
if !exists {
return Err(ErrorCode::NotFound.into());
}
self.hooks.on_deallocate(connection_id).await?;
self.state.lock(|cell| {
let mut state = cell.borrow_mut();
state
.connections
.retain(|c| !(c.connection_id == connection_id && c.fabric_index == fab_idx));
});
ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
Ok(())
}
async fn handle_modify_push_transport(
&self,
ctx: impl InvokeContext,
request: ModifyPushTransportRequest<'_>,
) -> Result<(), Error> {
let cmd = ctx.cmd();
let fab_idx = cmd.fab_idx;
let connection_id = request.connection_id()?;
let options = request.transport_options()?;
let url = options.url()?;
if url.is_empty() {
return Err(ErrorCode::ConstraintError.into());
}
let exists = self.state.lock(|cell| {
cell.borrow()
.connections
.iter()
.any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
});
if !exists {
return Err(ErrorCode::NotFound.into());
}
let stored_options = self.capture_options(&options)?;
self.hooks.on_modify(connection_id, &request).await?;
self.state.lock(|cell| {
let mut state = cell.borrow_mut();
if let Some(row) = state
.connections
.iter_mut()
.find(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
{
row.transport_options = stored_options;
}
});
ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
Ok(())
}
async fn handle_set_transport_status(
&self,
ctx: impl InvokeContext,
request: SetTransportStatusRequest<'_>,
) -> Result<(), Error> {
let cmd = ctx.cmd();
let fab_idx = cmd.fab_idx;
let connection_id = request.connection_id()?.into_option();
let status = request.transport_status()?;
if let Some(id) = connection_id {
let owned = self.state.lock(|cell| {
cell.borrow()
.connections
.iter()
.any(|c| c.connection_id == id && c.fabric_index == fab_idx)
});
if !owned {
return Err(ErrorCode::NotFound.into());
}
}
self.hooks.on_set_status(connection_id, status).await?;
self.state.lock(|cell| {
let mut state = cell.borrow_mut();
for c in state.connections.iter_mut() {
if c.fabric_index != fab_idx {
continue;
}
match connection_id {
Some(id) if c.connection_id != id => continue,
_ => {}
}
c.status = status;
}
});
ctx.notify_own_attr_changed(AttributeId::CurrentConnections as _);
Ok(())
}
async fn handle_manually_trigger_transport(
&self,
ctx: impl InvokeContext,
request: ManuallyTriggerTransportRequest<'_>,
) -> Result<(), Error> {
let cmd = ctx.cmd();
let fab_idx = cmd.fab_idx;
let connection_id = request.connection_id()?;
let reason = request.activation_reason()?;
let time_control = request.time_control()?;
let user_defined = request.user_defined()?;
let owned = self.state.lock(|cell| {
cell.borrow()
.connections
.iter()
.any(|c| c.connection_id == connection_id && c.fabric_index == fab_idx)
});
if !owned {
return Err(ErrorCode::NotFound.into());
}
self.hooks
.on_manually_trigger(
connection_id,
reason,
time_control,
user_defined.map(|s| s.0),
)
.await?;
Ok(())
}
async fn handle_find_transport<P: TLVBuilderParent>(
&self,
ctx: impl InvokeContext,
request: FindTransportRequest<'_>,
response: FindTransportResponseBuilder<P>,
) -> Result<P, Error> {
let cmd = ctx.cmd();
let fab_idx = cmd.fab_idx;
let connection_id = request.connection_id()?.into_option();
let mut snapshot: Vec<PushConnection, NC> = Vec::new();
self.state.lock(|cell| {
for c in cell.borrow().connections.iter() {
if c.fabric_index != fab_idx {
continue;
}
if let Some(id) = connection_id {
if c.connection_id != id {
continue;
}
}
let _ = snapshot.push(c.clone());
}
});
if snapshot.is_empty() {
return Err(ErrorCode::NotFound.into());
}
let mut arr = response.transport_configurations()?;
for c in snapshot.iter() {
arr = write_connection(arr.push()?, c)?;
}
arr.end()?.end()
}
}
fn write_supported_format<P: TLVBuilderParent>(
builder: SupportedFormatStructBuilder<P>,
f: &SupportedFormat,
) -> Result<P, Error> {
builder
.container_format(f.container_format)?
.ingest_method(f.ingest_method)?
.end()
}
fn write_connection<P: TLVBuilderParent>(
builder: TransportConfigurationStructBuilder<P>,
c: &PushConnection,
) -> Result<P, Error> {
let b = builder
.connection_id(c.connection_id)?
.transport_status(c.status)?;
let mut b = b.transport_options()?.none();
if !c.transport_options.is_empty() {
let element = TLVElement::new(c.transport_options.as_slice());
element.to_tlv(&TLVTag::Context(2), b.writer())?;
}
b.fabric_index(Some(c.fabric_index))?.end()
}