Skip to main content

taktora_executor/
trigger.rs

1//! Trigger declaration. Items hand iceoryx2 listeners / intervals / etc. to
2//! the [`TriggerDeclarer`]; the executor turns the recorded declarations into
3//! `WaitSet` attachments at add-time.
4
5use crate::Subscriber;
6use crate::payload::Payload;
7use core::time::Duration;
8use iceoryx2::port::listener::Listener as IxListener;
9use iceoryx2::prelude::ipc;
10use std::sync::Arc;
11
12/// Listener type the rest of the crate manipulates. Aliased so client code
13/// using `RawListener` keeps working if iceoryx2 renames its types.
14pub type RawListener = IxListener<ipc::Service>;
15
16/// Internal representation of a trigger declaration. Consumed by the executor.
17#[allow(dead_code, clippy::redundant_pub_crate)]
18#[derive(Clone, Debug)]
19pub(crate) enum TriggerDecl {
20    /// Wake when the listener (paired with a subscriber's channel) fires.
21    Subscriber {
22        /// Listener cloned from the subscriber's paired event service.
23        listener: Arc<RawListener>,
24    },
25    /// Wake periodically.
26    Interval(Duration),
27    /// Wake on the listener firing OR after `deadline` elapses without one.
28    ///
29    /// `listener` and `deadline` live in the same variant because iceoryx2's
30    /// `WaitSet::attach_deadline` takes both atomically; splitting them here
31    /// would create a footgun where one could be attached without the other.
32    Deadline {
33        /// Listener cloned from the subscriber's paired event service.
34        listener: Arc<RawListener>,
35        /// Deadline duration after which a missed-deadline event fires.
36        deadline: Duration,
37    },
38    /// Raw user-supplied listener, used as the escape hatch.
39    RawListener(Arc<RawListener>),
40}
41
42/// Records trigger intentions. Consumed by the executor at add-time.
43pub struct TriggerDeclarer<'a> {
44    _marker: core::marker::PhantomData<&'a mut ()>,
45    pub(crate) decls: Vec<TriggerDecl>,
46}
47
48impl TriggerDeclarer<'_> {
49    /// Internal constructor used by the executor when adding a task.
50    #[doc(hidden)]
51    #[allow(dead_code)]
52    pub(crate) const fn new_internal() -> Self {
53        Self {
54            _marker: core::marker::PhantomData,
55            decls: Vec::new(),
56        }
57    }
58
59    #[cfg(test)]
60    pub(crate) const fn new_test() -> Self {
61        Self::new_internal()
62    }
63
64    /// Declare that the item should fire when the given subscriber receives.
65    pub fn subscriber<T: Payload>(&mut self, sub: &Subscriber<T>) -> &mut Self {
66        self.decls.push(TriggerDecl::Subscriber {
67            listener: sub.listener_handle(),
68        });
69        self
70    }
71
72    /// Declare a periodic interval trigger.
73    pub fn interval(&mut self, period: impl Into<Duration>) -> &mut Self {
74        self.decls.push(TriggerDecl::Interval(period.into()));
75        self
76    }
77
78    /// Declare a subscriber trigger that *also* fires the deadline if no
79    /// event arrives within `deadline`.
80    pub fn deadline<T: Payload>(
81        &mut self,
82        sub: &Subscriber<T>,
83        deadline: impl Into<Duration>,
84    ) -> &mut Self {
85        self.decls.push(TriggerDecl::Deadline {
86            listener: sub.listener_handle(),
87            deadline: deadline.into(),
88        });
89        self
90    }
91
92    /// Escape hatch — attach a raw iceoryx2 listener directly.
93    pub fn raw_listener(&mut self, listener: Arc<RawListener>) -> &mut Self {
94        self.decls.push(TriggerDecl::RawListener(listener));
95        self
96    }
97
98    /// Declare that the item should fire when the server receives a request.
99    pub fn server<Req, Resp>(&mut self, srv: &crate::service::Server<Req, Resp>) -> &mut Self
100    where
101        Req: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
102        Resp: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
103    {
104        self.decls.push(TriggerDecl::Subscriber {
105            listener: srv.listener_handle(),
106        });
107        self
108    }
109
110    /// Declare that the item should fire when the client receives a response.
111    pub fn client<Req, Resp>(&mut self, cl: &crate::service::Client<Req, Resp>) -> &mut Self
112    where
113        Req: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
114        Resp: iceoryx2::prelude::ZeroCopySend + Default + core::fmt::Debug + Copy + 'static,
115    {
116        self.decls.push(TriggerDecl::Subscriber {
117            listener: cl.listener_handle(),
118        });
119        self
120    }
121
122    /// Drain the recorded declarations.
123    #[doc(hidden)]
124    #[allow(dead_code)]
125    pub(crate) fn into_decls(self) -> Vec<TriggerDecl> {
126        self.decls
127    }
128
129    /// True if any triggers were declared. Used by the executor to warn when
130    /// non-head items in a chain declare triggers (Task 12).
131    #[doc(hidden)]
132    #[allow(dead_code)]
133    pub(crate) fn is_empty(&self) -> bool {
134        self.decls.is_empty()
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use super::*;
141    use crate::error::ExecutorError;
142    use iceoryx2::prelude::*;
143
144    #[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
145    #[repr(C)]
146    struct Msg(u32);
147
148    fn make_subscriber(topic: &str) -> crate::Subscriber<Msg> {
149        let node = NodeBuilder::new().create::<ipc::Service>().unwrap();
150        let ch = crate::Channel::<Msg>::open_or_create(&node, topic).unwrap();
151        ch.subscriber().unwrap()
152    }
153
154    #[test]
155    fn collects_subscriber_decl() {
156        let sub = make_subscriber("taktora.test.trig.sub");
157        let expected = sub.listener_handle();
158        let mut d = TriggerDeclarer::new_test();
159        d.subscriber(&sub);
160        assert_eq!(d.decls.len(), 1);
161        let TriggerDecl::Subscriber { listener } = &d.decls[0] else {
162            panic!("expected Subscriber variant");
163        };
164        assert!(std::sync::Arc::ptr_eq(listener, &expected));
165    }
166
167    #[test]
168    fn collects_interval_decl() {
169        let mut d = TriggerDeclarer::new_test();
170        d.interval(Duration::from_millis(100));
171        assert!(
172            matches!(d.decls[0], TriggerDecl::Interval(dur) if dur == Duration::from_millis(100))
173        );
174    }
175
176    #[test]
177    fn collects_deadline_decl() {
178        let sub = make_subscriber("taktora.test.trig.deadline");
179        let expected_listener = sub.listener_handle();
180        let mut d = TriggerDeclarer::new_test();
181        d.deadline(&sub, Duration::from_millis(50));
182        let TriggerDecl::Deadline { listener, deadline } = &d.decls[0] else {
183            panic!("expected Deadline variant");
184        };
185        assert!(std::sync::Arc::ptr_eq(listener, &expected_listener));
186        assert_eq!(*deadline, Duration::from_millis(50));
187    }
188
189    #[test]
190    fn collects_raw_listener_decl() {
191        let sub = make_subscriber("taktora.test.trig.raw");
192        let handle = sub.listener_handle();
193        let expected = std::sync::Arc::clone(&handle);
194        let mut d = TriggerDeclarer::new_test();
195        d.raw_listener(handle);
196        let TriggerDecl::RawListener(stored) = &d.decls[0] else {
197            panic!("expected RawListener variant");
198        };
199        assert!(std::sync::Arc::ptr_eq(stored, &expected));
200    }
201
202    #[test]
203    #[allow(clippy::unnecessary_wraps)]
204    fn declarer_chains() -> Result<(), ExecutorError> {
205        let sub = make_subscriber("taktora.test.trig.chain");
206        let mut d = TriggerDeclarer::new_test();
207        d.subscriber(&sub).interval(Duration::from_millis(10));
208        assert_eq!(d.decls.len(), 2);
209        Ok(())
210    }
211}