Skip to main content

taktora_executor/
trigger.rs

1//! Trigger declaration. Items hand iceoryx2 listeners / intervals / etc. to
2//! the [`TriggerDeclarer`]; the executor turns the recorded declarations into
3//! `WaitSet` attachments at add-time.
4
5use crate::Subscriber;
6use crate::payload::Payload;
7use core::time::Duration;
8use iceoryx2::port::listener::Listener as IxListener;
9use iceoryx2::prelude::ipc;
10use std::sync::Arc;
11
12/// Listener type the rest of the crate manipulates. Aliased so client code
13/// using `RawListener` keeps working if iceoryx2 renames its types.
14pub type RawListener = IxListener<ipc::Service>;
15
16/// Internal representation of a trigger declaration. Consumed by the executor.
17#[allow(dead_code, clippy::redundant_pub_crate)]
18#[derive(Clone, Debug)]
19pub(crate) enum TriggerDecl {
20    /// Wake when the listener (paired with a subscriber's channel) fires.
21    Subscriber {
22        /// Listener cloned from the subscriber's paired event service.
23        listener: Arc<RawListener>,
24    },
25    /// Wake periodically.
26    Interval(Duration),
27    /// Wake on the listener firing OR after `deadline` elapses without one.
28    ///
29    /// `listener` and `deadline` live in the same variant because iceoryx2's
30    /// `WaitSet::attach_deadline` takes both atomically; splitting them here
31    /// would create a footgun where one could be attached without the other.
32    Deadline {
33        /// Listener cloned from the subscriber's paired event service.
34        listener: Arc<RawListener>,
35        /// Deadline duration after which a missed-deadline event fires.
36        deadline: Duration,
37    },
38    /// Raw user-supplied listener, used as the escape hatch.
39    RawListener(Arc<RawListener>),
40}
41
42/// Records trigger intentions. Consumed by the executor at add-time.
43pub struct TriggerDeclarer<'a> {
44    _marker: core::marker::PhantomData<&'a mut ()>,
45    pub(crate) decls: Vec<TriggerDecl>,
46    /// Optional per-task budget (`REQ_0070`). `None` means no per-task
47    /// budget; the executor-wide iteration budget (if set) still applies.
48    pub(crate) budget: Option<Duration>,
49}
50
51impl TriggerDeclarer<'_> {
52    /// Internal constructor used by the executor when adding a task.
53    #[doc(hidden)]
54    #[allow(dead_code)]
55    pub(crate) const fn new_internal() -> Self {
56        Self {
57            _marker: core::marker::PhantomData,
58            decls: Vec::new(),
59            budget: None,
60        }
61    }
62
63    #[cfg(test)]
64    pub(crate) const fn new_test() -> Self {
65        Self::new_internal()
66    }
67
68    /// Declare that the item should fire when the given subscriber receives.
69    pub fn subscriber<T: Payload>(&mut self, sub: &Subscriber<T>) -> &mut Self {
70        self.decls.push(TriggerDecl::Subscriber {
71            listener: sub.listener_handle(),
72        });
73        self
74    }
75
76    /// Declare a periodic interval trigger.
77    pub fn interval(&mut self, period: impl Into<Duration>) -> &mut Self {
78        self.decls.push(TriggerDecl::Interval(period.into()));
79        self
80    }
81
82    /// Declare a subscriber trigger that *also* fires the deadline if no
83    /// event arrives within `deadline`.
84    pub fn deadline<T: Payload>(
85        &mut self,
86        sub: &Subscriber<T>,
87        deadline: impl Into<Duration>,
88    ) -> &mut Self {
89        self.decls.push(TriggerDecl::Deadline {
90            listener: sub.listener_handle(),
91            deadline: deadline.into(),
92        });
93        self
94    }
95
96    /// Declare that this item's `execute()` must finish within `dur`.
97    /// Exceeding it transitions the task to `Faulted` state (`REQ_0070`).
98    /// Calling `budget` more than once on the same declarer keeps the
99    /// last value (consistent with how other trigger declarations would
100    /// behave if repeated).
101    pub const fn budget(&mut self, dur: Duration) {
102        self.budget = Some(dur);
103    }
104
105    /// Escape hatch — attach a raw iceoryx2 listener directly.
106    pub fn raw_listener(&mut self, listener: Arc<RawListener>) -> &mut Self {
107        self.decls.push(TriggerDecl::RawListener(listener));
108        self
109    }
110
111    /// Declare that the item should fire when the server receives a request.
112    pub fn server<Req, Resp>(&mut self, srv: &crate::service::Server<Req, Resp>) -> &mut Self
113    where
114        Req: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
115        Resp: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
116    {
117        self.decls.push(TriggerDecl::Subscriber {
118            listener: srv.listener_handle(),
119        });
120        self
121    }
122
123    /// Declare that the item should fire when the client receives a response.
124    pub fn client<Req, Resp>(&mut self, cl: &crate::service::Client<Req, Resp>) -> &mut Self
125    where
126        Req: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
127        Resp: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
128    {
129        self.decls.push(TriggerDecl::Subscriber {
130            listener: cl.listener_handle(),
131        });
132        self
133    }
134
135    /// Drain the recorded declarations.
136    #[doc(hidden)]
137    #[allow(dead_code)]
138    pub(crate) fn into_decls(self) -> Vec<TriggerDecl> {
139        self.decls
140    }
141
142    /// True if any triggers were declared. Used by the executor to warn when
143    /// non-head items in a chain declare triggers (Task 12).
144    #[doc(hidden)]
145    #[allow(dead_code)]
146    pub(crate) fn is_empty(&self) -> bool {
147        self.decls.is_empty()
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ExecutorError;
    use iceoryx2::prelude::*;

    /// Minimal payload type used only to instantiate channels in these tests.
    #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
    #[repr(C)]
    struct Msg(u32);

    /// Open (or create) the channel named `topic` on a fresh node and return
    /// a subscriber attached to it. Each test passes its own topic name.
    fn make_subscriber(topic: &str) -> crate::Subscriber<Msg> {
        let node = NodeBuilder::new().create::<ipc::Service>().unwrap();
        let channel = crate::Channel::<Msg>::open_or_create(&node, topic).unwrap();
        channel.subscriber().unwrap()
    }

    /// `subscriber()` records exactly one `Subscriber` declaration that
    /// shares the subscriber's own listener handle.
    #[test]
    fn collects_subscriber_decl() {
        let sub = make_subscriber("taktora.test.trig.sub");
        let want = sub.listener_handle();
        let mut declarer = TriggerDeclarer::new_test();
        declarer.subscriber(&sub);
        assert_eq!(declarer.decls.len(), 1);
        let TriggerDecl::Subscriber { listener } = &declarer.decls[0] else {
            panic!("expected Subscriber variant");
        };
        assert!(std::sync::Arc::ptr_eq(listener, &want));
    }

    /// `interval()` records an `Interval` declaration with the given period.
    #[test]
    fn collects_interval_decl() {
        let mut declarer = TriggerDeclarer::new_test();
        declarer.interval(Duration::from_millis(100));
        assert!(
            matches!(declarer.decls[0], TriggerDecl::Interval(dur) if dur == Duration::from_millis(100))
        );
    }

    /// `deadline()` records the listener handle and the deadline together.
    #[test]
    fn collects_deadline_decl() {
        let sub = make_subscriber("taktora.test.trig.deadline");
        let want_listener = sub.listener_handle();
        let mut declarer = TriggerDeclarer::new_test();
        declarer.deadline(&sub, Duration::from_millis(50));
        let TriggerDecl::Deadline { listener, deadline } = &declarer.decls[0] else {
            panic!("expected Deadline variant");
        };
        assert!(std::sync::Arc::ptr_eq(listener, &want_listener));
        assert_eq!(*deadline, Duration::from_millis(50));
    }

    /// `raw_listener()` stores the very `Arc` it was given.
    #[test]
    fn collects_raw_listener_decl() {
        let sub = make_subscriber("taktora.test.trig.raw");
        let handle = sub.listener_handle();
        // Keep a second reference so identity can be checked after `handle`
        // has been moved into the declarer.
        let want = std::sync::Arc::clone(&handle);
        let mut declarer = TriggerDeclarer::new_test();
        declarer.raw_listener(handle);
        let TriggerDecl::RawListener(stored) = &declarer.decls[0] else {
            panic!("expected RawListener variant");
        };
        assert!(std::sync::Arc::ptr_eq(stored, &want));
    }

    /// Declaration methods return `&mut Self`, so calls can be chained and
    /// each one appends its own entry.
    #[test]
    #[allow(clippy::unnecessary_wraps)]
    fn declarer_chains() -> Result<(), ExecutorError> {
        let sub = make_subscriber("taktora.test.trig.chain");
        let mut declarer = TriggerDeclarer::new_test();
        declarer.subscriber(&sub).interval(Duration::from_millis(10));
        assert_eq!(declarer.decls.len(), 2);
        Ok(())
    }

    /// `budget()` stores the given duration.
    #[test]
    fn declares_budget() {
        let mut declarer = TriggerDeclarer::new_test();
        declarer.budget(Duration::from_millis(8));
        assert_eq!(declarer.budget, Some(Duration::from_millis(8)));
    }

    /// A second `budget()` call replaces the first value.
    #[test]
    fn budget_overwrites_previous() {
        let mut declarer = TriggerDeclarer::new_test();
        declarer.budget(Duration::from_millis(8));
        declarer.budget(Duration::from_millis(4));
        assert_eq!(declarer.budget, Some(Duration::from_millis(4)));
    }
}
239}