Skip to main content

taktora_executor/
item.rs

//! `ExecutableItem` trait and the closure adapter.
2
3use crate::context::Context;
4use crate::control_flow::ExecuteResult;
5use crate::error::ExecutorError;
6use crate::trigger::TriggerDeclarer;
7
8/// Trait implemented by every unit of work the executor schedules.
9///
10/// Implementors are moved into the executor and dispatched to pool workers.
11/// `Send + 'static` is required; `Sync` is **not** — the executor guarantees
12/// at most one thread at a time invokes `execute` on a given item.
13pub trait ExecutableItem: Send + 'static {
14    /// Called once when the item is added to an executor. The implementor
15    /// registers its trigger handles via the [`TriggerDeclarer`].
16    fn declare_triggers(&mut self, d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
17        let _ = d;
18        Ok(())
19    }
20
21    /// Called by the executor when any declared trigger fires.
22    fn execute(&mut self, ctx: &mut Context<'_>) -> ExecuteResult;
23
24    /// Optional human-readable id used for monitor/observer correlation.
25    /// `None` means "use the auto-assigned id".
26    fn task_id(&self) -> Option<&str> {
27        None
28    }
29
30    /// Optional application id; `Some(_)` enables Observer per-app callbacks.
31    fn app_id(&self) -> Option<u32> {
32        None
33    }
34
35    /// Optional application instance id.
36    fn app_instance_id(&self) -> Option<u32> {
37        None
38    }
39}
40
// ── Blanket impl for boxed trait objects ─────────────────────────────────────
42
43/// Allows `Vec<Box<dyn ExecutableItem>>` to be passed directly to
44/// [`crate::Executor::add_chain`] without a secondary wrapper.
45impl ExecutableItem for Box<dyn ExecutableItem> {
46    fn declare_triggers(&mut self, d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
47        (**self).declare_triggers(d)
48    }
49
50    fn execute(&mut self, ctx: &mut Context<'_>) -> ExecuteResult {
51        (**self).execute(ctx)
52    }
53
54    fn task_id(&self) -> Option<&str> {
55        (**self).task_id()
56    }
57
58    fn app_id(&self) -> Option<u32> {
59        (**self).app_id()
60    }
61
62    fn app_instance_id(&self) -> Option<u32> {
63        (**self).app_instance_id()
64    }
65}
66
// ── Closure adapter ────────────────────────────────────────────────────────
68
69/// Adapter turning a closure into an [`ExecutableItem`] with no triggers
70/// declared. Use [`item_with_triggers`] when triggers are needed.
71pub struct FnItem<F>(F);
72
73impl<F> ExecutableItem for FnItem<F>
74where
75    F: FnMut(&mut Context<'_>) -> ExecuteResult + Send + 'static,
76{
77    fn execute(&mut self, ctx: &mut Context<'_>) -> ExecuteResult {
78        (self.0)(ctx)
79    }
80}
81
82/// Wrap a closure as an [`ExecutableItem`].
83pub const fn item<F>(f: F) -> FnItem<F>
84where
85    F: FnMut(&mut Context<'_>) -> ExecuteResult + Send + 'static,
86{
87    FnItem(f)
88}
89
90/// Wrap a pair of closures (`declare`, `execute`) as an [`ExecutableItem`].
91pub struct FnItemWithTriggers<D, E> {
92    /// `Some` until the first `declare_triggers` call; `None` thereafter.
93    /// `Option::take` enforces the once-only guarantee — do not unwrap directly.
94    declare: Option<D>,
95    /// User-supplied execute closure invoked on every dispatch.
96    execute: E,
97}
98
99impl<D, E> ExecutableItem for FnItemWithTriggers<D, E>
100where
101    D: FnOnce(&mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> + Send + 'static,
102    E: FnMut(&mut Context<'_>) -> ExecuteResult + Send + 'static,
103{
104    fn declare_triggers(&mut self, d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
105        self.declare.take().map_or_else(|| Ok(()), |decl| decl(d))
106    }
107
108    fn execute(&mut self, ctx: &mut Context<'_>) -> ExecuteResult {
109        (self.execute)(ctx)
110    }
111}
112
113/// Wrap a `(declare, execute)` pair as an [`ExecutableItem`].
114pub const fn item_with_triggers<D, E>(declare: D, execute: E) -> FnItemWithTriggers<D, E>
115where
116    D: FnOnce(&mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> + Send + 'static,
117    E: FnMut(&mut Context<'_>) -> ExecuteResult + Send + 'static,
118{
119    FnItemWithTriggers {
120        declare: Some(declare),
121        execute,
122    }
123}
124
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    use super::*;
    use crate::context::ContextHarness;
    use crate::control_flow::ControlFlow;

    /// An item built with `item` runs its closure exactly once per `execute` call.
    #[test]
    fn closure_item_runs() {
        // Shared counter: the closure is moved into the item, so the test observes
        // the invocation count through a second `Arc` handle.
        let runs = Arc::new(AtomicU32::new(0));
        let runs_in_item = Arc::clone(&runs);

        let mut it = item(move |_ctx| {
            runs_in_item.fetch_add(1, Ordering::SeqCst);
            Ok(ControlFlow::Continue)
        });

        let harness = ContextHarness::new("test-task");
        for _ in 0..3 {
            it.execute(&mut harness.context()).unwrap();
        }

        assert_eq!(runs.load(Ordering::SeqCst), 3);
    }

    /// Repeated `declare_triggers` calls invoke the declare closure only the first time.
    #[test]
    fn item_with_triggers_calls_declare_once() {
        let calls = Arc::new(AtomicU32::new(0));
        let calls_d = Arc::clone(&calls);

        let mut it = item_with_triggers(
            move |_d| {
                calls_d.fetch_add(1, Ordering::SeqCst);
                Ok(())
            },
            |_ctx| Ok(ControlFlow::Continue),
        );

        let mut declarer_storage = TriggerDeclarer::new_test();
        // Only the first call may reach the closure; the rest must be successful no-ops.
        it.declare_triggers(&mut declarer_storage).unwrap();
        it.declare_triggers(&mut declarer_storage).unwrap();
        it.declare_triggers(&mut declarer_storage).unwrap();

        assert_eq!(
            calls.load(Ordering::SeqCst),
            1,
            "declare closure must be invoked at most once"
        );
    }
}