taktora_executor/
signal_slot.rs1use crate::context::Context;
4use crate::control_flow::{ControlFlow, ExecuteResult};
5use crate::error::ExecutorError;
6use crate::executor::Executor;
7use crate::item::ExecutableItem;
8use crate::payload::Payload;
9use crate::trigger::TriggerDeclarer;
10use crate::{Publisher, Subscriber};
11
/// Optional hook stored by a [`SignalItem`] and invoked with mutable access to
/// the outgoing value; its `bool` result is handed back to the publisher's
/// `loan_send` call.
type BeforeSendCb<T> = Option<Box<dyn FnMut(&mut T) -> bool + Send + 'static>>;

/// Optional hook stored by a [`SlotItem`] and invoked with a shared reference to
/// each received payload; returning `false` makes the slot stop the chain.
type AfterRecvCb<T> = Option<Box<dyn FnMut(&T) -> bool + Send + 'static>>;
17
/// Controls how many samples a [`SlotItem`] consumes during one `execute` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TakePolicy {
    /// Stop after the first sample that was taken in this execution.
    Single,
    /// Keep taking samples until the subscriber reports that none are left.
    All,
}
27
28pub fn pair<T: Payload + Default + Copy + Send>(
30 exec: &mut Executor,
31 topic: &str,
32) -> Result<(SignalItem<T>, SlotItem<T>), ExecutorError> {
33 let ch = exec.channel::<T>(topic)?;
34 let publisher = ch.publisher()?;
35 let subscriber = ch.subscriber()?;
36 Ok((
37 SignalItem {
38 publisher,
39 before_send: None,
40 _marker: core::marker::PhantomData,
41 },
42 SlotItem {
43 subscriber,
44 policy: TakePolicy::Single,
45 after_recv: None,
46 _marker: core::marker::PhantomData,
47 },
48 ))
49}
50
51pub struct SignalItem<T: Payload + Default + Copy + Send> {
54 publisher: Publisher<T>,
55 before_send: BeforeSendCb<T>,
56 _marker: core::marker::PhantomData<T>,
57}
58
59impl<T: Payload + Default + Copy + Send> SignalItem<T> {
60 #[must_use]
63 pub fn before_send<F>(mut self, f: F) -> Self
64 where
65 F: FnMut(&mut T) -> bool + Send + 'static,
66 {
67 self.before_send = Some(Box::new(f));
68 self
69 }
70}
71
72impl<T: Payload + Default + Copy + Send> ExecutableItem for SignalItem<T> {
73 fn declare_triggers(&mut self, _d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
74 Ok(())
75 }
76
77 fn execute(&mut self, _ctx: &mut Context<'_>) -> ExecuteResult {
78 let outcome = if let Some(cb) = self.before_send.as_mut() {
79 self.publisher
80 .loan_send(|t: &mut T| (cb)(t))
81 .map_err(|e| -> crate::error::ItemError { Box::new(e) })?
82 } else {
83 self.publisher
84 .loan_send(|_| true)
85 .map_err(|e| -> crate::error::ItemError { Box::new(e) })?
86 };
87 if outcome.sent {
88 Ok(ControlFlow::Continue)
89 } else {
90 Ok(ControlFlow::StopChain)
91 }
92 }
93}
94
95pub struct SlotItem<T: Payload + Copy + Send> {
98 subscriber: Subscriber<T>,
99 policy: TakePolicy,
100 after_recv: AfterRecvCb<T>,
101 _marker: core::marker::PhantomData<T>,
102}
103
104impl<T: Payload + Copy + Send> SlotItem<T> {
105 #[must_use]
107 pub const fn take_policy(mut self, p: TakePolicy) -> Self {
108 self.policy = p;
109 self
110 }
111
112 #[must_use]
115 pub fn after_recv<F>(mut self, f: F) -> Self
116 where
117 F: FnMut(&T) -> bool + Send + 'static,
118 {
119 self.after_recv = Some(Box::new(f));
120 self
121 }
122
123 #[must_use]
125 pub fn from_subscriber(subscriber: Subscriber<T>) -> Self {
126 Self {
127 subscriber,
128 policy: TakePolicy::Single,
129 after_recv: None,
130 _marker: core::marker::PhantomData,
131 }
132 }
133}
134
135impl<T: Payload + Copy + Send> ExecutableItem for SlotItem<T> {
136 fn declare_triggers(&mut self, d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
137 d.subscriber(&self.subscriber);
138 Ok(())
139 }
140
141 fn execute(&mut self, _ctx: &mut Context<'_>) -> ExecuteResult {
142 let mut delivered_any = false;
143 while let Some(sample) = self
144 .subscriber
145 .take()
146 .map_err(|e| -> crate::error::ItemError { Box::new(e) })?
147 {
148 delivered_any = true;
149 if let Some(cb) = self.after_recv.as_mut() {
150 if !(cb)(sample.payload()) {
151 return Ok(ControlFlow::StopChain);
152 }
153 }
154 if matches!(self.policy, TakePolicy::Single) {
155 break;
156 }
157 }
158 if delivered_any {
159 Ok(ControlFlow::Continue)
160 } else {
161 Ok(ControlFlow::StopChain)
162 }
163 }
164}