Skip to main content

panopticon_core/hooks/
mod.rs

1pub mod core;
2
3use crate::imports::*;
4
5/// A structured event fired during pipeline execution.
6///
7/// `HookEvent` is the input to every [`Hook`] callback. The variants
8/// mirror the runtime's timeline: `Before*` events fire immediately
9/// before the associated action, `After*` events fire immediately
10/// after, and [`Complete`](Self::Complete) / [`Error`](Self::Error)
11/// bracket the end of the run. The borrowed references let hooks
12/// inspect step metadata, parameters, and outputs without cloning.
13#[derive(Debug)]
14pub enum HookEvent<'a> {
15    /// Fires immediately before a step's operation runs. Parameters
16    /// have been resolved but no outputs have been staged yet.
17    BeforeStep {
18        /// The step's name (prefixed if inside a child pipeline).
19        step_name: &'a str,
20        /// The operation's static metadata.
21        metadata: &'a OperationMetadata,
22        /// The unresolved parameters as declared at draft time.
23        params: &'a Parameters,
24        /// Present when the step is nested inside an iteration.
25        iter_context: Option<&'a IterContext<'a>>,
26    },
27    /// Fires immediately after a step's operation runs. Operation
28    /// outputs and global outputs are staged but not yet merged into
29    /// the runtime store.
30    AfterStep {
31        /// The step's name.
32        step_name: &'a str,
33        /// The operation's static metadata.
34        metadata: &'a OperationMetadata,
35        /// The unresolved parameters.
36        params: &'a Parameters,
37        /// Outputs scoped to the operation only (not merged into the
38        /// runtime store).
39        operation_outputs: &'a Store<StoreEntry>,
40        /// Outputs about to be merged into the runtime store.
41        global_outputs: &'a Store<StoreEntry>,
42        /// Present when the step is nested inside an iteration.
43        iter_context: Option<&'a IterContext<'a>>,
44    },
45    /// Fires before each iteration body executes.
46    BeforeIteration {
47        /// The current iteration's context.
48        iter_context: &'a IterContext<'a>,
49    },
50    /// Fires after each iteration body finishes.
51    AfterIteration {
52        /// The current iteration's context.
53        iter_context: &'a IterContext<'a>,
54    },
55    /// Fires before a return block resolves its parameters.
56    BeforeReturns {
57        /// The return block's name.
58        return_name: &'a str,
59        /// The unresolved parameters.
60        params: &'a Parameters,
61    },
62    /// Fires after a return block resolves its parameters.
63    AfterReturns {
64        /// The return block's name.
65        return_name: &'a str,
66        /// The unresolved parameters.
67        params: &'a Parameters,
68        /// The returns store with the resolved values.
69        outputs: &'a Store<StoreEntry>,
70    },
71    /// Fires when a guard's boolean source evaluates to true. The
72    /// guard body is about to execute.
73    GuardPassed {
74        /// The guard node's name.
75        guard_name: &'a str,
76    },
77    /// Fires when a guard's boolean source evaluates to false. The
78    /// guard body is skipped.
79    GuardFailed {
80        /// The guard node's name.
81        guard_name: &'a str,
82    },
83    /// Fires once at the end of a successful run, after all return
84    /// blocks have resolved.
85    Complete,
86    /// Fires once when the worker thread produces an error. The
87    /// pipeline is about to fail.
88    Error {
89        /// The error the worker is propagating.
90        error: &'a OperationError,
91    },
92}
93
/// The decision an interceptor hook returns to the runtime.
///
/// Observer hooks have no return value; interceptor hooks return a
/// `HookAction` to either let execution proceed ([`Continue`](Self::Continue))
/// or stop the pipeline with an error ([`Abort`](Self::Abort)).
///
/// The type derives `Debug`, `Clone`, `PartialEq`, and `Eq` so that
/// decisions can be logged, duplicated, and compared directly (for
/// example with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HookAction {
    /// Let execution proceed to the next step or event.
    Continue,
    /// Stop the pipeline. The runtime wraps the message in
    /// [`OperationError::HookAbort`].
    Abort(String),
}
106
107/// An observer callback: read-only access to every event.
108pub type ObserverCallback = Box<dyn Fn(&HookEvent, &Store<StoreEntry>) + Send>;
109
110/// An interceptor callback: read-only access to every event plus a
111/// [`HookAction`] return value for aborting the pipeline.
112pub type InterceptorCallback = Box<dyn Fn(&HookEvent, &Store<StoreEntry>) -> HookAction + Send>;
113
114/// A type-erased hook callback — either an observer or an
115/// interceptor. `#[non_exhaustive]` to leave room for future
116/// callback flavours.
117#[non_exhaustive]
118pub enum HookCallback {
119    /// A read-only observer.
120    Observer(ObserverCallback),
121    /// An interceptor that can abort the pipeline.
122    Interceptor(InterceptorCallback),
123}
124
125/// An observer or interceptor attached to a pipeline via
126/// [`Pipeline::hook`](crate::prelude::Pipeline#method.hook).
127///
128/// Construct with [`Hook::observer`] for read-only event tracing or
129/// [`Hook::interceptor`] for callbacks that can abort the pipeline.
130/// Built-in hooks ([`Logger`](crate::prelude::Logger),
131/// [`Profiler`](crate::prelude::Profiler),
132/// [`StepFilter`](crate::prelude::StepFilter), and friends) implement
133/// `Into<Hook>` so they can be passed directly to `Pipeline::hook`.
134pub struct Hook {
135    /// The hook's name, surfaced in abort messages.
136    pub name: String,
137    /// The underlying callback.
138    pub callback: HookCallback,
139}
140
141impl Hook {
142    /// Wraps a closure as a read-only observer hook. Observers cannot
143    /// abort the pipeline.
144    pub fn observer(
145        name: impl Into<String>,
146        f: impl Fn(&HookEvent, &Store<StoreEntry>) + Send + 'static,
147    ) -> Self {
148        Hook {
149            name: name.into(),
150            callback: HookCallback::Observer(Box::new(f)),
151        }
152    }
153
154    /// Wraps a closure as an interceptor hook. Interceptors inspect
155    /// each event and return a [`HookAction`] — use [`HookAction::Abort`]
156    /// to stop the pipeline.
157    pub fn interceptor(
158        name: impl Into<String>,
159        f: impl Fn(&HookEvent, &Store<StoreEntry>) -> HookAction + Send + 'static,
160    ) -> Self {
161        Hook {
162            name: name.into(),
163            callback: HookCallback::Interceptor(Box::new(f)),
164        }
165    }
166
167    pub(crate) fn emit(
168        &self,
169        event: &HookEvent,
170        store: &Store<StoreEntry>,
171    ) -> Result<(), OperationError> {
172        match &self.callback {
173            HookCallback::Observer(f) => {
174                f(event, store);
175                Ok(())
176            }
177            HookCallback::Interceptor(f) => match f(event, store) {
178                HookAction::Continue => Ok(()),
179                HookAction::Abort(reason) => Err(OperationError::HookAbort {
180                    hook: self.name.clone(),
181                    reason,
182                }),
183            },
184        }
185    }
186}
187
188pub(crate) fn emit_all(
189    hooks: &[Hook],
190    event: &HookEvent,
191    store: &Store<StoreEntry>,
192) -> Result<(), OperationError> {
193    for hook in hooks {
194        hook.emit(event, store)?;
195    }
196    Ok(())
197}