use embassy_futures::select::{Either, select};
use emcyphal_core::ServiceId;
use emcyphal_encoding::{Deserialize, Serialize};
use crate::buffer::{self, BufferError};
use crate::core::{Priority, PrioritySet, SubjectId, TransferId};
use crate::endpoint::{Rx, Transfer, TransferMeta, Tx};
use crate::marker::{Message, Request, Response};
use crate::node::Hub;
use crate::time::{Duration, Instant};
pub use crate::registry::RegistrationError;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum MessageError {
NotReady,
}
pub struct Subscriber<'a, T> {
endpoint: Rx<'a, Message, T>,
}
impl<'a, T: Deserialize> Subscriber<'a, T> {
pub fn create(
hub: Hub<'a>,
buffer: &'a mut dyn buffer::rx_msg::Buffer<T>,
subject: SubjectId,
timeout: Duration,
) -> Result<Self, RegistrationError> {
let endpoint = Rx::create_message(hub, buffer, subject, timeout)?;
Ok(Self { endpoint })
}
pub fn try_pop(&mut self) -> Option<T> {
loop {
let res = self.endpoint.try_pop(PrioritySet::ALL)?;
if let Ok(transfer) = res {
return Some(transfer.payload);
}
}
}
pub async fn pop(&mut self) -> T {
loop {
let res = self.endpoint.pop(PrioritySet::ALL).await;
if let Ok(transfer) = res {
return transfer.payload;
}
}
}
}
pub struct Publisher<'a, T> {
endpoint: Tx<'a, Message, T>,
priority: Priority,
timeout: Duration,
loop_back: bool,
transfer_id: TransferId,
}
impl<'a, T: Serialize> Publisher<'a, T> {
pub fn create(
hub: Hub<'a>,
buffer: &'a mut dyn buffer::tx_msg::Buffer<T>,
subject: SubjectId,
priority: Priority,
timeout: Duration,
loop_back: bool,
) -> Result<Self, RegistrationError> {
let endpoint = Tx::create_message(hub, buffer, subject, priority)?;
Ok(Self {
endpoint,
priority,
timeout,
loop_back,
transfer_id: TransferId::default(),
})
}
pub fn try_push(&mut self, data: T) -> Result<(), MessageError> {
let deadline = Instant::now().saturating_add(self.timeout);
let transfer = Transfer {
meta: TransferMeta {
priority: self.priority,
address: None,
transfer_id: self.transfer_id,
timestamp: deadline,
loop_back: self.loop_back,
},
payload: data,
};
match self.endpoint.try_push(transfer) {
Ok(()) => {
self.transfer_id = self.transfer_id.next();
Ok(())
}
Err(BufferError::PriorityNotReady) => Err(MessageError::NotReady),
_ => unreachable!(),
}
}
pub async fn push(&mut self, data: T) {
self.endpoint
.wait_push_readiness(PrioritySet::new_eq(self.priority))
.await;
unwrap!(self.try_push(data))
}
}
pub struct Responder<'a, Req, Resp> {
req: Rx<'a, Request, Req>,
resp: Tx<'a, Response, Resp>,
resp_timeout: Duration,
}
impl<'a, Req: Deserialize, Resp: Serialize> Responder<'a, Req, Resp> {
pub fn create(
hub: Hub<'a>,
req_buffer: &'a mut dyn buffer::rx_req::Buffer<Req>,
resp_buffer: &'a mut dyn buffer::tx_resp::Buffer<Resp>,
service_id: ServiceId,
req_timeout: Duration,
resp_timeout: Duration,
) -> Result<Self, RegistrationError> {
let req = Rx::create_request(hub, req_buffer, service_id, req_timeout)?;
let resp = Tx::create_response(hub, resp_buffer, service_id)?;
Ok(Self {
req,
resp,
resp_timeout,
})
}
pub async fn run(&mut self, mut handle: impl core::ops::AsyncFnMut(&Req) -> Resp) {
loop {
self.process_request(&mut handle).await;
}
}
async fn process_request(&mut self, handle: &mut impl core::ops::AsyncFnMut(&Req) -> Resp) {
let mut req_priorities = self.req.wait_pop_readiness(PrioritySet::ALL).await;
let mut resp_priorities = self.resp.wait_push_readiness(PrioritySet::ALL).await;
while (req_priorities & resp_priorities).is_empty() {
match select(
self.req.wait_pop_readiness(req_priorities),
self.resp.wait_push_readiness(resp_priorities),
)
.await
{
Either::First(priority) => req_priorities |= priority,
Either::Second(priority) => resp_priorities |= priority,
}
}
let req_transfer = match self.req.try_pop(resp_priorities) {
Some(Ok(transfer)) => transfer,
_ => return,
};
let resp_payload = handle(&req_transfer.payload).await;
let resp_transfer = Transfer {
meta: TransferMeta {
priority: req_transfer.meta.priority,
address: req_transfer.meta.address,
transfer_id: req_transfer.meta.transfer_id,
timestamp: req_transfer
.meta
.timestamp
.saturating_add(self.resp_timeout),
loop_back: false,
},
payload: resp_payload,
};
unwrap!(self.resp.try_push(resp_transfer));
}
}