taktora_executor/
trigger.rs1use crate::Subscriber;
6use crate::payload::Payload;
7use core::time::Duration;
8use iceoryx2::port::listener::Listener as IxListener;
9use iceoryx2::prelude::ipc;
10use std::sync::Arc;
11
12pub type RawListener = IxListener<ipc::Service>;
15
16#[allow(dead_code, clippy::redundant_pub_crate)]
18#[derive(Clone, Debug)]
19pub(crate) enum TriggerDecl {
20 Subscriber {
22 listener: Arc<RawListener>,
24 },
25 Interval(Duration),
27 Deadline {
33 listener: Arc<RawListener>,
35 deadline: Duration,
37 },
38 RawListener(Arc<RawListener>),
40}
41
42pub struct TriggerDeclarer<'a> {
44 _marker: core::marker::PhantomData<&'a mut ()>,
45 pub(crate) decls: Vec<TriggerDecl>,
46 pub(crate) budget: Option<Duration>,
49}
50
51impl TriggerDeclarer<'_> {
52 #[doc(hidden)]
54 #[allow(dead_code)]
55 pub(crate) const fn new_internal() -> Self {
56 Self {
57 _marker: core::marker::PhantomData,
58 decls: Vec::new(),
59 budget: None,
60 }
61 }
62
63 #[cfg(test)]
64 pub(crate) const fn new_test() -> Self {
65 Self::new_internal()
66 }
67
68 pub fn subscriber<T: Payload>(&mut self, sub: &Subscriber<T>) -> &mut Self {
70 self.decls.push(TriggerDecl::Subscriber {
71 listener: sub.listener_handle(),
72 });
73 self
74 }
75
76 pub fn interval(&mut self, period: impl Into<Duration>) -> &mut Self {
78 self.decls.push(TriggerDecl::Interval(period.into()));
79 self
80 }
81
82 pub fn deadline<T: Payload>(
85 &mut self,
86 sub: &Subscriber<T>,
87 deadline: impl Into<Duration>,
88 ) -> &mut Self {
89 self.decls.push(TriggerDecl::Deadline {
90 listener: sub.listener_handle(),
91 deadline: deadline.into(),
92 });
93 self
94 }
95
96 pub const fn budget(&mut self, dur: Duration) {
102 self.budget = Some(dur);
103 }
104
105 pub fn raw_listener(&mut self, listener: Arc<RawListener>) -> &mut Self {
107 self.decls.push(TriggerDecl::RawListener(listener));
108 self
109 }
110
111 pub fn server<Req, Resp>(&mut self, srv: &crate::service::Server<Req, Resp>) -> &mut Self
113 where
114 Req: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
115 Resp: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
116 {
117 self.decls.push(TriggerDecl::Subscriber {
118 listener: srv.listener_handle(),
119 });
120 self
121 }
122
123 pub fn client<Req, Resp>(&mut self, cl: &crate::service::Client<Req, Resp>) -> &mut Self
125 where
126 Req: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
127 Resp: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
128 {
129 self.decls.push(TriggerDecl::Subscriber {
130 listener: cl.listener_handle(),
131 });
132 self
133 }
134
135 #[doc(hidden)]
137 #[allow(dead_code)]
138 pub(crate) fn into_decls(self) -> Vec<TriggerDecl> {
139 self.decls
140 }
141
142 #[doc(hidden)]
145 #[allow(dead_code)]
146 pub(crate) fn is_empty(&self) -> bool {
147 self.decls.is_empty()
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ExecutorError;
    use iceoryx2::prelude::*;

    /// Minimal zero-copy payload used to open channels in these tests.
    #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
    #[repr(C)]
    struct Msg(u32);

    /// Creates a fresh node, opens (or creates) the channel named `topic` on
    /// it, and returns a subscriber for that channel.
    fn make_subscriber(topic: &str) -> crate::Subscriber<Msg> {
        let node = NodeBuilder::new().create::<ipc::Service>().unwrap();
        // Keep the channel in a named local so it is dropped before `node`.
        let channel = crate::Channel::<Msg>::open_or_create(&node, topic).unwrap();
        channel.subscriber().unwrap()
    }

    /// `subscriber` records exactly one `Subscriber` declaration holding the
    /// subscriber's own listener handle.
    #[test]
    fn collects_subscriber_decl() {
        let sub = make_subscriber("taktora.test.trig.sub");
        let expected = sub.listener_handle();

        let mut declarer = TriggerDeclarer::new_test();
        declarer.subscriber(&sub);

        assert_eq!(declarer.decls.len(), 1);
        let TriggerDecl::Subscriber { listener } = &declarer.decls[0] else {
            panic!("expected Subscriber variant");
        };
        assert!(std::sync::Arc::ptr_eq(listener, &expected));
    }

    /// `interval` records an `Interval` declaration carrying the given period.
    #[test]
    fn collects_interval_decl() {
        let period = Duration::from_millis(100);

        let mut declarer = TriggerDeclarer::new_test();
        declarer.interval(period);

        let recorded = &declarer.decls[0];
        assert!(matches!(recorded, TriggerDecl::Interval(got) if *got == period));
    }

    /// `deadline` records both the subscriber's listener handle and the
    /// deadline duration.
    #[test]
    fn collects_deadline_decl() {
        let sub = make_subscriber("taktora.test.trig.deadline");
        let expected_listener = sub.listener_handle();
        let expected_deadline = Duration::from_millis(50);

        let mut declarer = TriggerDeclarer::new_test();
        declarer.deadline(&sub, expected_deadline);

        let TriggerDecl::Deadline { listener, deadline } = &declarer.decls[0] else {
            panic!("expected Deadline variant");
        };
        assert!(std::sync::Arc::ptr_eq(listener, &expected_listener));
        assert_eq!(*deadline, expected_deadline);
    }

    /// `raw_listener` stores the very handle it was given.
    #[test]
    fn collects_raw_listener_decl() {
        let sub = make_subscriber("taktora.test.trig.raw");
        let handle = sub.listener_handle();
        // Second reference to the same allocation, kept for the identity check.
        let expected = std::sync::Arc::clone(&handle);

        let mut declarer = TriggerDeclarer::new_test();
        declarer.raw_listener(handle);

        let TriggerDecl::RawListener(stored) = &declarer.decls[0] else {
            panic!("expected RawListener variant");
        };
        assert!(std::sync::Arc::ptr_eq(stored, &expected));
    }

    /// Declaring methods return `&mut Self`, so two declarations can be made
    /// in one chained expression.
    #[test]
    #[allow(clippy::unnecessary_wraps)]
    fn declarer_chains() -> Result<(), ExecutorError> {
        let sub = make_subscriber("taktora.test.trig.chain");

        let mut declarer = TriggerDeclarer::new_test();
        declarer
            .subscriber(&sub)
            .interval(Duration::from_millis(10));

        assert_eq!(declarer.decls.len(), 2);
        Ok(())
    }

    /// `budget` stores the declared duration.
    #[test]
    fn declares_budget() {
        let budget = Duration::from_millis(8);

        let mut declarer = TriggerDeclarer::new_test();
        declarer.budget(budget);

        assert_eq!(declarer.budget, Some(budget));
    }

    /// A second `budget` call replaces the first value.
    #[test]
    fn budget_overwrites_previous() {
        let mut declarer = TriggerDeclarer::new_test();
        declarer.budget(Duration::from_millis(8));
        declarer.budget(Duration::from_millis(4));

        assert_eq!(declarer.budget, Some(Duration::from_millis(4)));
    }
}