use crate::context::Context;
use crate::control_flow::{ControlFlow, ExecuteResult};
use crate::error::ExecutorError;
use crate::executor::Executor;
use crate::item::ExecutableItem;
use crate::payload::Payload;
use crate::trigger::TriggerDeclarer;
use crate::{Publisher, Subscriber};
type BeforeSendCb<T> = Option<Box<dyn FnMut(&mut T) -> bool + Send + 'static>>;
type AfterRecvCb<T> = Option<Box<dyn FnMut(&T) -> bool + Send + 'static>>;
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
#[non_exhaustive]
pub enum TakePolicy {
Single,
All,
}
pub fn pair<T: Payload + Default + Copy + Send>(
exec: &mut Executor,
topic: &str,
) -> Result<(SignalItem<T>, SlotItem<T>), ExecutorError> {
let ch = exec.channel::<T>(topic)?;
let publisher = ch.publisher()?;
let subscriber = ch.subscriber()?;
Ok((
SignalItem {
publisher,
before_send: None,
_marker: core::marker::PhantomData,
},
SlotItem {
subscriber,
policy: TakePolicy::Single,
after_recv: None,
_marker: core::marker::PhantomData,
},
))
}
pub struct SignalItem<T: Payload + Default + Copy + Send> {
publisher: Publisher<T>,
before_send: BeforeSendCb<T>,
_marker: core::marker::PhantomData<T>,
}
impl<T: Payload + Default + Copy + Send> SignalItem<T> {
#[must_use]
pub fn before_send<F>(mut self, f: F) -> Self
where
F: FnMut(&mut T) -> bool + Send + 'static,
{
self.before_send = Some(Box::new(f));
self
}
}
impl<T: Payload + Default + Copy + Send> ExecutableItem for SignalItem<T> {
fn declare_triggers(&mut self, _d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
Ok(())
}
fn execute(&mut self, _ctx: &mut Context<'_>) -> ExecuteResult {
let outcome = if let Some(cb) = self.before_send.as_mut() {
self.publisher
.loan_send(|t: &mut T| (cb)(t))
.map_err(|e| -> crate::error::ItemError { Box::new(e) })?
} else {
self.publisher
.loan_send(|_| true)
.map_err(|e| -> crate::error::ItemError { Box::new(e) })?
};
if outcome.sent {
Ok(ControlFlow::Continue)
} else {
Ok(ControlFlow::StopChain)
}
}
}
pub struct SlotItem<T: Payload + Copy + Send> {
subscriber: Subscriber<T>,
policy: TakePolicy,
after_recv: AfterRecvCb<T>,
_marker: core::marker::PhantomData<T>,
}
impl<T: Payload + Copy + Send> SlotItem<T> {
#[must_use]
pub const fn take_policy(mut self, p: TakePolicy) -> Self {
self.policy = p;
self
}
#[must_use]
pub fn after_recv<F>(mut self, f: F) -> Self
where
F: FnMut(&T) -> bool + Send + 'static,
{
self.after_recv = Some(Box::new(f));
self
}
#[must_use]
pub fn from_subscriber(subscriber: Subscriber<T>) -> Self {
Self {
subscriber,
policy: TakePolicy::Single,
after_recv: None,
_marker: core::marker::PhantomData,
}
}
}
impl<T: Payload + Copy + Send> ExecutableItem for SlotItem<T> {
fn declare_triggers(&mut self, d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
d.subscriber(&self.subscriber);
Ok(())
}
fn execute(&mut self, _ctx: &mut Context<'_>) -> ExecuteResult {
let mut delivered_any = false;
while let Some(sample) = self
.subscriber
.take()
.map_err(|e| -> crate::error::ItemError { Box::new(e) })?
{
delivered_any = true;
if let Some(cb) = self.after_recv.as_mut() {
if !(cb)(sample.payload()) {
return Ok(ControlFlow::StopChain);
}
}
if matches!(self.policy, TakePolicy::Single) {
break;
}
}
if delivered_any {
Ok(ControlFlow::Continue)
} else {
Ok(ControlFlow::StopChain)
}
}
}