use core::num::NonZeroU8;
use crate::bdx::{BdxBuffer, BdxUploadInitiator};
use crate::dm::{Cluster, Dataver, HandlerContext, InvokeContext};
use crate::error::{Error, ErrorCode};
use crate::tlv::{Octets, TLVBuilderParent};
use crate::transport::exchange::{Exchange, MAX_EXCHANGE_RX_BUF_SIZE};
use crate::utils::cell::RefCell;
use crate::utils::storage::pooled::Buffers;
use crate::utils::sync::blocking::Mutex;
use crate::utils::sync::{Notification, Signal};
use crate::with;
pub use crate::dm::clusters::decl::diagnostic_logs::*;
pub mod client;
/// Size, in bytes, of the buffer used when log content is returned inline in the
/// `RetrieveLogsResponse`. Logs whose reported size does not exceed this value are always
/// answered inline, even when a BDX transfer was requested.
pub const MAX_INLINE_LOG: usize = 1024;
/// Maximum accepted length, in bytes, of the BDX transfer file designator; also the capacity
/// of the string in which a pending job stores it.
const MAX_FILE_DESIGNATOR: usize = 32;
/// Source of diagnostic log data served by [`DiagLogsHandler`].
pub trait DiagLogs {
/// Returns the size of the log available for `intent`, or `None` when there is no log to
/// return (the handler then answers with `StatusEnum::NoLogs`).
async fn size(&self, intent: IntentEnum) -> Option<u64>;
/// Reads log data for `intent`, starting at `offset`, into `buf` and returns the number of
/// bytes placed in `buf`.
///
/// The handler treats `Ok(0)` as "no more data" and rejects a count larger than `buf.len()`.
async fn read(&self, intent: IntentEnum, offset: u64, buf: &mut [u8]) -> Result<usize, Error>;
/// Hook invoked by the handler after it has built an inline response carrying the log, or
/// after a BDX transfer of the log finished successfully. Does nothing by default.
async fn retrieved(&self, _intent: IntentEnum) {}
}
/// Lets a shared reference to a log source be used wherever a log source is expected,
/// by forwarding every call to the referenced value.
impl<T> DiagLogs for &T
where
    T: DiagLogs,
{
    async fn size(&self, intent: IntentEnum) -> Option<u64> {
        (**self).size(intent).await
    }

    async fn read(&self, intent: IntentEnum, offset: u64, buf: &mut [u8]) -> Result<usize, Error> {
        (**self).read(intent, offset, buf).await
    }

    async fn retrieved(&self, intent: IntentEnum) {
        (**self).retrieved(intent).await
    }
}
/// A BDX log upload requested by a `RetrieveLogsRequest` command and waiting to be (or being)
/// carried out by the handler's `run` loop.
struct Job {
// Fabric index taken from the invoking command.
fab_idx: NonZeroU8,
// Peer node ID of the session the command arrived on; the upload exchange is initiated
// towards this node.
node_id: u64,
// File designator supplied by the requester, passed on when starting the upload.
fd: heapless::String<MAX_FILE_DESIGNATOR>,
// Which log the requester asked for.
intent: IntentEnum,
}
/// State of the single BDX upload slot shared between the command handler and the `run` loop.
enum Bdx {
// No upload is pending or running; a new request may claim the slot.
Idle,
// The command handler posted a job that `run` has not picked up yet.
Requested(Job),
// `run` has taken the job and is executing the transfer.
InProgress,
}
/// Handler for the Diagnostic Logs cluster.
///
/// `B` provides the buffers used for inline responses and BDX transfers; `P` is the
/// [`DiagLogs`] implementation supplying the log data.
pub struct DiagLogsHandler<B, P> {
// Cluster data version, exposed through `dataver()` / `dataver_changed()`.
dataver: Dataver,
// Buffer provider; a buffer is requested for every inline reply and every BDX transfer.
buffers: B,
// Log data source.
logs: P,
// The single BDX upload slot.
bdx: Mutex<RefCell<Bdx>>,
// Notified by the command handler once it has placed a job in `bdx`; awaited by `run`.
job_posted: Notification,
// Outcome of setting up the upload: signalled with `true` once the upload has been set up,
// `false` if setup failed. Awaited by the command handler before it replies.
handshake: Signal<Option<bool>>,
}
impl<B, P> DiagLogsHandler<B, P> {
pub const fn new(dataver: Dataver, buffers: B, logs: P) -> Self {
Self {
dataver,
buffers,
logs,
bdx: Mutex::new(RefCell::new(Bdx::Idle)),
job_posted: Notification::new(),
handshake: Signal::new(None),
}
}
pub const fn adapt(self) -> HandlerAsyncAdaptor<Self> {
HandlerAsyncAdaptor(self)
}
}
impl<B, P> DiagLogsHandler<B, P>
where
    B: Buffers<BdxBuffer>,
    P: DiagLogs,
{
    /// Fills `buf` with log data for `intent`, starting at log offset `offset`.
    ///
    /// Calls `DiagLogs::read` repeatedly until either `buf` is full or the source reports
    /// zero bytes, and returns the number of bytes placed in `buf`.
    ///
    /// # Errors
    ///
    /// - `ErrorCode::Invalid` if the read offset would overflow `u64`, or if the source claims
    ///   to have produced more bytes than the space it was given
    /// - any error returned by the log source
    async fn fill(&self, intent: IntentEnum, offset: u64, buf: &mut [u8]) -> Result<usize, Error> {
        let mut filled = 0;

        while filled < buf.len() {
            let read_offset = offset
                .checked_add(filled as u64)
                .ok_or(ErrorCode::Invalid)?;

            let n = self
                .logs
                .read(intent, read_offset, &mut buf[filled..])
                .await?;

            if n == 0 {
                // The source has no more data.
                break;
            }

            if n > buf.len() - filled {
                // A misbehaving source must not push `filled` past the end of `buf`.
                return Err(ErrorCode::Invalid.into());
            }

            filled += n;
        }

        Ok(filled)
    }

    /// Completes `response` with up to `MAX_INLINE_LOG` bytes read from the start of the log
    /// for `intent`, reported with the given `status`.
    ///
    /// If no buffer can be obtained, the response is completed with `StatusEnum::Busy` and
    /// empty content instead. On success `DiagLogs::retrieved` is invoked before returning.
    async fn reply_inline<Q: TLVBuilderParent>(
        &self,
        intent: IntentEnum,
        status: StatusEnum,
        response: RetrieveLogsResponseBuilder<Q>,
    ) -> Result<Q, Error> {
        let Some(mut buf) = self.buffers.get().await else {
            return reply_status(response, StatusEnum::Busy);
        };

        unwrap!(buf.resize_default(MAX_INLINE_LOG));

        let n = self.fill(intent, 0, buf.as_mut_slice()).await?;

        let result = response
            .status(status)?
            .log_content(Octets(&buf[..n]))?
            .utc_time_stamp(None)?
            .time_since_boot(None)?
            .end()?;

        self.logs.retrieved(intent).await;

        Ok(result)
    }

    /// Uploads the log described by `job` to the requesting node.
    ///
    /// `self.handshake` is signalled exactly once per call: with `false` if a buffer could not
    /// be obtained, the exchange could not be initiated or the upload could not be started,
    /// and with `true` as soon as the upload has been started. Errors that happen after that
    /// point are only reported through the returned `Result`.
    ///
    /// # Errors
    ///
    /// - `ErrorCode::NoSpace` if no buffer is available
    /// - `ErrorCode::Invalid` if the running transfer offset would overflow `u64`
    /// - any error from initiating the exchange, starting the upload, reading the log or
    ///   writing/finishing the transfer
    async fn run_transfer(&self, ctx: &impl HandlerContext, job: &Job) -> Result<(), Error> {
        let Some(mut buf) = self.buffers.get().await else {
            self.handshake.signal(false);
            return Err(ErrorCode::NoSpace.into());
        };

        unwrap!(buf.resize_default(MAX_EXCHANGE_RX_BUF_SIZE));

        let exchange = Exchange::initiate(ctx.matter(), ctx.crypto(), job.fab_idx, job.node_id)
            .await
            .map_err(|e| {
                // Let the waiting command handler know the transfer will not happen.
                self.handshake.signal(false);
                e
            })?;

        let mut writer = exchange
            .upload(buf.as_mut_slice(), job.fd.as_bytes(), None)
            .await
            .map_err(|e| {
                // Let the waiting command handler know the transfer will not happen.
                self.handshake.signal(false);
                e
            })?;

        // From here on the command handler may answer the request; the data flows separately.
        self.handshake.signal(true);

        let mut offset = 0_u64;

        loop {
            let n = self.fill(job.intent, offset, writer.block_buf()).await?;
            if n == 0 {
                break;
            }

            writer.commit(n).await?;

            // Checked, like the offset arithmetic in `fill`, so that an overflow surfaces as an
            // error instead of a panic or a silent wrap back to the start of the log.
            offset = offset.checked_add(n as u64).ok_or(ErrorCode::Invalid)?;
        }

        writer.finish().await
    }
}
impl<B, P> ClusterAsyncHandler for DiagLogsHandler<B, P>
where
    B: Buffers<BdxBuffer>,
    P: DiagLogs,
{
    const CLUSTER: Cluster<'static> = FULL_CLUSTER.with_attrs(with!(required));

    fn dataver(&self) -> u32 {
        self.dataver.get()
    }

    fn dataver_changed(&self) {
        self.dataver.changed();
    }

    /// Handles the `RetrieveLogsRequest` command.
    ///
    /// - A malformed intent/protocol, or a BDX request without a file designator, fails with
    ///   `ErrorCode::InvalidCommand`; a BDX file designator longer than `MAX_FILE_DESIGNATOR`
    ///   fails with `ErrorCode::ConstraintError`.
    /// - If the log source reports no log, the reply carries `StatusEnum::NoLogs`.
    /// - If BDX was not requested, or the log is no larger than `MAX_INLINE_LOG`, the log is
    ///   returned inline (`Success` for a `ResponsePayload` request, `Exhausted` otherwise).
    /// - Otherwise a job is posted for the `run` loop. The reply is `Busy` if the BDX slot is
    ///   not idle, else `Success` or `Denied` depending on the handshake outcome reported
    ///   by the transfer.
    async fn handle_retrieve_logs_request<Q: TLVBuilderParent>(
        &self,
        ctx: impl InvokeContext,
        request: RetrieveLogsRequestRequest<'_>,
        response: RetrieveLogsResponseBuilder<Q>,
    ) -> Result<Q, Error> {
        let intent = request.intent().map_err(|_| ErrorCode::InvalidCommand)?;
        let protocol = request
            .requested_protocol()
            .map_err(|_| ErrorCode::InvalidCommand)?;
        let file_designator = request.transfer_file_designator()?;

        let wants_bdx = matches!(protocol, TransferProtocolEnum::BDX);

        // The file designator only matters - and is only validated - for BDX requests.
        if wants_bdx {
            match &file_designator {
                Some(fd) if fd.len() <= MAX_FILE_DESIGNATOR => {}
                Some(_) => return Err(ErrorCode::ConstraintError.into()),
                None => return Err(ErrorCode::InvalidCommand.into()),
            }
        }

        let size = match self.logs.size(intent).await {
            Some(size) => size,
            None => return reply_status(response, StatusEnum::NoLogs),
        };

        let use_bdx = wants_bdx && size > MAX_INLINE_LOG as u64;
        if !use_bdx {
            let status = match protocol {
                TransferProtocolEnum::ResponsePayload => StatusEnum::Success,
                _ => StatusEnum::Exhausted,
            };

            return self.reply_inline(intent, status, response).await;
        }

        // Present and short enough: both were checked above for BDX requests.
        let fd = unwrap!(file_designator);

        let fab_idx = NonZeroU8::new(ctx.cmd().fab_idx).ok_or(ErrorCode::Invalid)?;

        let exchange = ctx.exchange();
        let peer_node_id = exchange.with_state(|state| {
            Ok(exchange
                .id()
                .session(&mut state.sessions)
                .get_peer_node_id())
        })?;
        let node_id = peer_node_id.ok_or(ErrorCode::Invalid)?;

        let mut fd_str = heapless::String::new();
        unwrap!(fd_str.push_str(fd));

        let job = Job {
            fab_idx,
            node_id,
            fd: fd_str,
            intent,
        };

        // Claim the single BDX slot; only an idle slot may take a new job.
        let claimed = self.bdx.lock(|cell| {
            let mut slot = cell.borrow_mut();
            let idle = matches!(&*slot, Bdx::Idle);
            if idle {
                *slot = Bdx::Requested(job);
            }
            idle
        });

        if !claimed {
            return reply_status(response, StatusEnum::Busy);
        }

        // Wake up `run` and hold the reply until the transfer reports how its setup went.
        self.job_posted.notify();

        let accepted = self.handshake.wait_signalled().await;
        let status = if accepted {
            StatusEnum::Success
        } else {
            StatusEnum::Denied
        };

        reply_status(response, status)
    }

    /// Background loop executing the BDX uploads posted by the command handler.
    ///
    /// Waits for a posted job, moves the slot to `InProgress`, runs the transfer, puts the
    /// slot back to `Idle` and logs the outcome. `DiagLogs::retrieved` is invoked only after
    /// a successful transfer. The loop never exits on its own.
    async fn run(&self, ctx: impl HandlerContext) -> Result<(), Error> {
        loop {
            self.job_posted.wait().await;

            let pending = self.bdx.lock(|cell| {
                let mut slot = cell.borrow_mut();

                match core::mem::replace(&mut *slot, Bdx::InProgress) {
                    Bdx::Requested(job) => Some(job),
                    previous => {
                        // Nothing was requested: restore whatever state was there.
                        *slot = previous;
                        None
                    }
                }
            });

            if let Some(job) = pending {
                let outcome = self.run_transfer(&ctx, &job).await;

                // Free the slot before reporting, so that new requests can be accepted.
                self.bdx.lock(|cell| *cell.borrow_mut() = Bdx::Idle);

                match outcome {
                    Err(e) => warn!("Diagnostic logs transfer: StatusReport Error: {:?}", e),
                    Ok(()) => {
                        info!("Diagnostic logs transfer: Success");
                        self.logs.retrieved(job.intent).await;
                    }
                }
            }
        }
    }
}
/// Completes `response` with the given `status`, empty log content and no timestamps,
/// returning the parent builder.
fn reply_status<Q: TLVBuilderParent>(
    response: RetrieveLogsResponseBuilder<Q>,
    status: StatusEnum,
) -> Result<Q, Error> {
    let builder = response.status(status)?;
    let builder = builder.log_content(Octets(&[]))?;
    let builder = builder.utc_time_stamp(None)?;
    let builder = builder.time_since_boot(None)?;

    builder.end()
}