Skip to main content

taktora_executor/
signal_slot.rs

1//! `signal_slot::pair` — pre-built [`ExecutableItem`]s wrapping a [`Channel<T>`](crate::Channel).
2
3use crate::context::Context;
4use crate::control_flow::{ControlFlow, ExecuteResult};
5use crate::error::ExecutorError;
6use crate::executor::Executor;
7use crate::item::ExecutableItem;
8use crate::payload::Payload;
9use crate::trigger::TriggerDeclarer;
10use crate::{Publisher, Subscriber};
11
/// Storage type for the optional hook a [`SignalItem`] runs before each send.
///
/// The hook receives mutable access to the payload and returns a `bool`;
/// `None` means no hook has been installed.
type BeforeSendCb<T> = Option<Box<dyn FnMut(&mut T) -> bool + Send + 'static>>;
14
/// Storage type for the optional hook a [`SlotItem`] runs on each received message.
///
/// The hook receives a shared reference to the payload and returns a `bool`;
/// `None` means no hook has been installed.
type AfterRecvCb<T> = Option<Box<dyn FnMut(&T) -> bool + Send + 'static>>;
17
/// Controls how many buffered messages a [`SlotItem`] consumes in one `execute` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TakePolicy {
    /// Consume at most one message per `execute`; when nothing is buffered
    /// the slot reports `StopChain`.
    Single,
    /// Keep consuming until no message is left, running `after_recv` on each
    /// one taken (draining stops early if that callback returns `false`).
    All,
}
27
28/// Open a fresh signal/slot pair backed by a `Channel<T>`.
29pub fn pair<T: Payload + Default + Copy + Send>(
30    exec: &mut Executor,
31    topic: &str,
32) -> Result<(SignalItem<T>, SlotItem<T>), ExecutorError> {
33    let ch = exec.channel::<T>(topic)?;
34    let publisher = ch.publisher()?;
35    let subscriber = ch.subscriber()?;
36    Ok((
37        SignalItem {
38            publisher,
39            before_send: None,
40            _marker: core::marker::PhantomData,
41        },
42        SlotItem {
43            subscriber,
44            policy: TakePolicy::Single,
45            after_recv: None,
46            _marker: core::marker::PhantomData,
47        },
48    ))
49}
50
51/// Signal half of a signal/slot pair: an [`ExecutableItem`] that, when fired,
52/// publishes a message on the underlying channel.
53pub struct SignalItem<T: Payload + Default + Copy + Send> {
54    publisher: Publisher<T>,
55    before_send: BeforeSendCb<T>,
56    _marker: core::marker::PhantomData<T>,
57}
58
59impl<T: Payload + Default + Copy + Send> SignalItem<T> {
60    /// Install a callback invoked just before each send. Returning `false`
61    /// skips the send and the `execute` call returns `StopChain`.
62    #[must_use]
63    pub fn before_send<F>(mut self, f: F) -> Self
64    where
65        F: FnMut(&mut T) -> bool + Send + 'static,
66    {
67        self.before_send = Some(Box::new(f));
68        self
69    }
70}
71
72impl<T: Payload + Default + Copy + Send> ExecutableItem for SignalItem<T> {
73    fn declare_triggers(&mut self, _d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
74        Ok(())
75    }
76
77    fn execute(&mut self, _ctx: &mut Context<'_>) -> ExecuteResult {
78        let outcome = if let Some(cb) = self.before_send.as_mut() {
79            self.publisher
80                .loan_send(|t: &mut T| (cb)(t))
81                .map_err(|e| -> crate::error::ItemError { Box::new(e) })?
82        } else {
83            self.publisher
84                .loan_send(|_| true)
85                .map_err(|e| -> crate::error::ItemError { Box::new(e) })?
86        };
87        if outcome.sent {
88            Ok(ControlFlow::Continue)
89        } else {
90            Ok(ControlFlow::StopChain)
91        }
92    }
93}
94
95/// Slot half of a signal/slot pair: an [`ExecutableItem`] that, when its
96/// channel receives a message, runs the optional `after_recv` callback.
97pub struct SlotItem<T: Payload + Copy + Send> {
98    subscriber: Subscriber<T>,
99    policy: TakePolicy,
100    after_recv: AfterRecvCb<T>,
101    _marker: core::marker::PhantomData<T>,
102}
103
104impl<T: Payload + Copy + Send> SlotItem<T> {
105    /// Override the default [`TakePolicy::Single`].
106    #[must_use]
107    pub const fn take_policy(mut self, p: TakePolicy) -> Self {
108        self.policy = p;
109        self
110    }
111
112    /// Install a callback invoked for each received message. Returning `false`
113    /// stops the chain (returns `StopChain`).
114    #[must_use]
115    pub fn after_recv<F>(mut self, f: F) -> Self
116    where
117        F: FnMut(&T) -> bool + Send + 'static,
118    {
119        self.after_recv = Some(Box::new(f));
120        self
121    }
122
123    /// Construct a slot from an existing subscriber rather than a fresh channel.
124    #[must_use]
125    pub fn from_subscriber(subscriber: Subscriber<T>) -> Self {
126        Self {
127            subscriber,
128            policy: TakePolicy::Single,
129            after_recv: None,
130            _marker: core::marker::PhantomData,
131        }
132    }
133}
134
135impl<T: Payload + Copy + Send> ExecutableItem for SlotItem<T> {
136    fn declare_triggers(&mut self, d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
137        d.subscriber(&self.subscriber);
138        Ok(())
139    }
140
141    fn execute(&mut self, _ctx: &mut Context<'_>) -> ExecuteResult {
142        let mut delivered_any = false;
143        while let Some(sample) = self
144            .subscriber
145            .take()
146            .map_err(|e| -> crate::error::ItemError { Box::new(e) })?
147        {
148            delivered_any = true;
149            if let Some(cb) = self.after_recv.as_mut() {
150                if !(cb)(sample.payload()) {
151                    return Ok(ControlFlow::StopChain);
152                }
153            }
154            if matches!(self.policy, TakePolicy::Single) {
155                break;
156            }
157        }
158        if delivered_any {
159            Ok(ControlFlow::Continue)
160        } else {
161            Ok(ControlFlow::StopChain)
162        }
163    }
164}