use core::future::poll_fn;
use core::marker::PhantomData;
use core::task::{Context, Poll};
use emcyphal_encoding::{Deserialize, Serialize};
use crate::buffer::{self, BufferError};
use crate::core::{NodeId, Priority, PrioritySet, ServiceId, SubjectId, TransferId};
use crate::format::TransferCrc;
use crate::marker::{Message, Request, Response};
use crate::node::Hub;
use crate::registry::{self, RxRegKind, TxRegKind};
use crate::time::{Duration, Instant};
pub use crate::registry::RegistrationError;
pub use emcyphal_encoding::DeserializeError;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct TransferMeta {
pub priority: Priority,
pub address: Option<NodeId>,
pub transfer_id: TransferId,
pub timestamp: Instant,
pub loop_back: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Transfer<T> {
pub meta: TransferMeta,
pub payload: T,
}
#[allow(private_bounds)]
pub trait RxEndpointKind: RxRegKind {}
impl RxEndpointKind for Message {}
impl RxEndpointKind for Request {}
#[allow(private_bounds)]
pub trait TxEndpointKind: TxRegKind {}
impl TxEndpointKind for Message {}
impl TxEndpointKind for Response {}
pub struct Rx<'a, K: RxEndpointKind, T> {
reg: &'a (dyn registry::RxReg<K> + Sync),
reg_token: K::Token,
buf_token: buffer::BufferToken<'a>,
_phantom: PhantomData<T>,
}
impl<'a, T: Deserialize> Rx<'a, Message, T> {
pub fn create_message(
hub: Hub<'a>,
buffer: &'a mut dyn buffer::rx_msg::Buffer<T>,
subject: SubjectId,
timeout: Duration,
) -> Result<Self, RegistrationError> {
let reg = hub.rx_msg();
let (entry_ptr, buf_token) = unsafe { buffer.init(subject, timeout) };
let res = reg.register(entry_ptr, false);
if res.is_err() {
unsafe { entry_ptr.drop_in_place() };
}
Ok(Self {
reg,
reg_token: res?,
buf_token,
_phantom: PhantomData,
})
}
pub fn create_message_loop_back(
hub: Hub<'a>,
buffer: &'a mut dyn buffer::rx_msg::Buffer<T>,
subject: SubjectId,
timeout: Duration,
) -> Result<Self, RegistrationError> {
let reg = hub.rx_msg();
let (entry_ptr, buf_token) = unsafe { buffer.init(subject, timeout) };
let res = reg.register(entry_ptr, true);
if res.is_err() {
unsafe { entry_ptr.drop_in_place() };
}
Ok(Self {
reg,
reg_token: res?,
buf_token,
_phantom: PhantomData,
})
}
}
impl<'a, T: Deserialize> Rx<'a, Request, T> {
pub fn create_request(
hub: Hub<'a>,
buffer: &'a mut dyn buffer::rx_req::Buffer<T>,
service: ServiceId,
timeout: Duration,
) -> Result<Self, RegistrationError> {
let reg = hub.rx_req();
let (entry_ptr, buf_token) = unsafe { buffer.init(service, timeout) };
let res = reg.register(entry_ptr, false);
if res.is_err() {
unsafe { entry_ptr.drop_in_place() };
}
Ok(Self {
reg,
reg_token: res?,
buf_token,
_phantom: PhantomData,
})
}
}
impl<'a, K: RxEndpointKind, T: Deserialize> Rx<'a, K, T> {
pub fn pop_readiness(&mut self) -> PrioritySet {
self.reg.pop_readiness(&mut self.reg_token)
}
fn poll_pop_readiness(
&mut self,
cx: &mut Context<'_>,
priority_mask: PrioritySet,
) -> Poll<PrioritySet> {
self.reg
.poll_pop_readiness(&mut self.reg_token, cx, priority_mask)
}
pub async fn wait_pop_readiness(&mut self, priority_mask: PrioritySet) -> PrioritySet {
poll_fn(|cx| self.poll_pop_readiness(cx, priority_mask)).await
}
pub fn try_pop(
&mut self,
priority_mask: PrioritySet,
) -> Option<Result<Transfer<T>, DeserializeError>> {
let (meta, buffer) = self.reg.try_pop(
&mut self.reg_token,
self.buf_token.reborrow(),
priority_mask,
)?;
Some(T::deserialize_from_bytes(buffer).map(|payload| Transfer { meta, payload }))
}
pub async fn pop(
&mut self,
priority_mask: PrioritySet,
) -> Result<Transfer<T>, DeserializeError> {
loop {
self.wait_pop_readiness(priority_mask).await;
if let Some(res) = self.try_pop(priority_mask) {
return res;
}
}
}
}
impl<'a, K: RxEndpointKind, T> Drop for Rx<'a, K, T> {
fn drop(&mut self) {
self.reg.unregister(&mut self.reg_token);
}
}
pub struct Tx<'a, K: TxEndpointKind, T> {
reg: &'a (dyn registry::TxReg<K> + Sync),
reg_token: K::Token,
buf_token: buffer::BufferToken<'a>,
_phantom: PhantomData<T>,
}
impl<'a, T: Serialize> Tx<'a, Message, T> {
pub fn create_message(
hub: Hub<'a>,
buffer: &'a mut dyn buffer::tx_msg::Buffer<T>,
subject: SubjectId,
priority: Priority,
) -> Result<Self, RegistrationError> {
let reg = hub.tx_msg();
let (entry_ptr, buf_token) = unsafe { buffer.init(subject, priority) };
let res = reg.register(entry_ptr);
if res.is_err() {
unsafe { entry_ptr.drop_in_place() };
}
Ok(Self {
reg,
reg_token: res?,
buf_token,
_phantom: PhantomData,
})
}
}
impl<'a, T: Serialize> Tx<'a, Response, T> {
pub fn create_response(
hub: Hub<'a>,
buffer: &'a mut dyn buffer::tx_resp::Buffer<T>,
service: ServiceId,
) -> Result<Self, RegistrationError> {
let reg = hub.tx_resp();
let (entry_ptr, buf_token) = unsafe { buffer.init(service) };
let res = reg.register(entry_ptr);
if res.is_err() {
unsafe { entry_ptr.drop_in_place() };
}
Ok(Self {
reg,
reg_token: res?,
buf_token,
_phantom: PhantomData,
})
}
}
impl<'a, K: TxEndpointKind, T: Serialize> Tx<'a, K, T> {
pub fn all_sent(&mut self) -> bool {
self.reg.is_empty(&mut self.reg_token)
}
fn poll_all_sent(&mut self, cx: &mut Context<'_>) -> Poll<()> {
self.reg.poll_is_empty(&mut self.reg_token, cx)
}
pub async fn wait_all_sent(&mut self) {
poll_fn(|cx| self.poll_all_sent(cx)).await
}
pub fn push_readiness(&mut self) -> PrioritySet {
self.reg.push_readiness(&mut self.reg_token)
}
fn poll_push_readiness(
&mut self,
cx: &mut Context<'_>,
priority_mask: PrioritySet,
) -> Poll<PrioritySet> {
self.reg
.poll_push_readiness(&mut self.reg_token, cx, priority_mask)
}
pub async fn wait_push_readiness(&mut self, priority_mask: PrioritySet) -> PrioritySet {
poll_fn(|cx| self.poll_push_readiness(cx, priority_mask)).await
}
pub fn try_push(&mut self, transfer: Transfer<T>) -> Result<(), BufferError> {
let (priorities, buffer) = self
.reg
.get_scratchpad(&mut self.reg_token, self.buf_token.reborrow())
.ok_or(BufferError::PriorityNotReady)?;
if !priorities.contains(transfer.meta.priority) {
return Err(BufferError::PriorityNotReady);
}
transfer.payload.serialize_to_bytes(buffer);
let length = transfer.payload.size_bits().div_ceil(8);
let mut crc: TransferCrc = Default::default();
crc.add_bytes(&buffer[..length]);
self.reg.try_push(
&mut self.reg_token,
self.buf_token.reborrow(),
transfer.meta,
length,
crc,
)
}
}
impl<'a, K: TxEndpointKind, T> Drop for Tx<'a, K, T> {
fn drop(&mut self) {
self.reg.unregister(&mut self.reg_token);
}
}