panopticon_core/hooks/mod.rs
1pub mod core;
2
3use crate::imports::*;
4
5/// A structured event fired during pipeline execution.
6///
7/// `HookEvent` is the input to every [`Hook`] callback. The variants
8/// mirror the runtime's timeline: `Before*` events fire immediately
9/// before the associated action, `After*` events fire immediately
10/// after, and [`Complete`](Self::Complete) / [`Error`](Self::Error)
11/// bracket the end of the run. The borrowed references let hooks
12/// inspect step metadata, parameters, and outputs without cloning.
13#[derive(Debug)]
14pub enum HookEvent<'a> {
15 /// Fires immediately before a step's operation runs. Parameters
16 /// have been resolved but no outputs have been staged yet.
17 BeforeStep {
18 /// The step's name (prefixed if inside a child pipeline).
19 step_name: &'a str,
20 /// The operation's static metadata.
21 metadata: &'a OperationMetadata,
22 /// The unresolved parameters as declared at draft time.
23 params: &'a Parameters,
24 /// Present when the step is nested inside an iteration.
25 iter_context: Option<&'a IterContext<'a>>,
26 },
27 /// Fires immediately after a step's operation runs. Operation
28 /// outputs and global outputs are staged but not yet merged into
29 /// the runtime store.
30 AfterStep {
31 /// The step's name.
32 step_name: &'a str,
33 /// The operation's static metadata.
34 metadata: &'a OperationMetadata,
35 /// The unresolved parameters.
36 params: &'a Parameters,
37 /// Outputs scoped to the operation only (not merged into the
38 /// runtime store).
39 operation_outputs: &'a Store<StoreEntry>,
40 /// Outputs about to be merged into the runtime store.
41 global_outputs: &'a Store<StoreEntry>,
42 /// Present when the step is nested inside an iteration.
43 iter_context: Option<&'a IterContext<'a>>,
44 },
45 /// Fires before each iteration body executes.
46 BeforeIteration {
47 /// The current iteration's context.
48 iter_context: &'a IterContext<'a>,
49 },
50 /// Fires after each iteration body finishes.
51 AfterIteration {
52 /// The current iteration's context.
53 iter_context: &'a IterContext<'a>,
54 },
55 /// Fires before a return block resolves its parameters.
56 BeforeReturns {
57 /// The return block's name.
58 return_name: &'a str,
59 /// The unresolved parameters.
60 params: &'a Parameters,
61 },
62 /// Fires after a return block resolves its parameters.
63 AfterReturns {
64 /// The return block's name.
65 return_name: &'a str,
66 /// The unresolved parameters.
67 params: &'a Parameters,
68 /// The returns store with the resolved values.
69 outputs: &'a Store<StoreEntry>,
70 },
71 /// Fires when a guard's boolean source evaluates to true. The
72 /// guard body is about to execute.
73 GuardPassed {
74 /// The guard node's name.
75 guard_name: &'a str,
76 },
77 /// Fires when a guard's boolean source evaluates to false. The
78 /// guard body is skipped.
79 GuardFailed {
80 /// The guard node's name.
81 guard_name: &'a str,
82 },
83 /// Fires once at the end of a successful run, after all return
84 /// blocks have resolved.
85 Complete,
86 /// Fires once when the worker thread produces an error. The
87 /// pipeline is about to fail.
88 Error {
89 /// The error the worker is propagating.
90 error: &'a OperationError,
91 },
92}
93
94/// The decision an interceptor hook returns to the runtime.
95///
96/// Observer hooks have no return value; interceptor hooks return a
97/// `HookAction` to either let execution proceed ([`Continue`](Self::Continue))
98/// or stop the pipeline with an error ([`Abort`](Self::Abort)).
99pub enum HookAction {
100 /// Let execution proceed to the next step or event.
101 Continue,
102 /// Stop the pipeline. The runtime wraps the message in
103 /// [`OperationError::HookAbort`].
104 Abort(String),
105}
106
107/// An observer callback: read-only access to every event.
108pub type ObserverCallback = Box<dyn Fn(&HookEvent, &Store<StoreEntry>) + Send>;
109
110/// An interceptor callback: read-only access to every event plus a
111/// [`HookAction`] return value for aborting the pipeline.
112pub type InterceptorCallback = Box<dyn Fn(&HookEvent, &Store<StoreEntry>) -> HookAction + Send>;
113
114/// A type-erased hook callback — either an observer or an
115/// interceptor. `#[non_exhaustive]` to leave room for future
116/// callback flavours.
117#[non_exhaustive]
118pub enum HookCallback {
119 /// A read-only observer.
120 Observer(ObserverCallback),
121 /// An interceptor that can abort the pipeline.
122 Interceptor(InterceptorCallback),
123}
124
125/// An observer or interceptor attached to a pipeline via
126/// [`Pipeline::hook`](crate::prelude::Pipeline#method.hook).
127///
128/// Construct with [`Hook::observer`] for read-only event tracing or
129/// [`Hook::interceptor`] for callbacks that can abort the pipeline.
130/// Built-in hooks ([`Logger`](crate::prelude::Logger),
131/// [`Profiler`](crate::prelude::Profiler),
132/// [`StepFilter`](crate::prelude::StepFilter), and friends) implement
133/// `Into<Hook>` so they can be passed directly to `Pipeline::hook`.
134pub struct Hook {
135 /// The hook's name, surfaced in abort messages.
136 pub name: String,
137 /// The underlying callback.
138 pub callback: HookCallback,
139}
140
141impl Hook {
142 /// Wraps a closure as a read-only observer hook. Observers cannot
143 /// abort the pipeline.
144 pub fn observer(
145 name: impl Into<String>,
146 f: impl Fn(&HookEvent, &Store<StoreEntry>) + Send + 'static,
147 ) -> Self {
148 Hook {
149 name: name.into(),
150 callback: HookCallback::Observer(Box::new(f)),
151 }
152 }
153
154 /// Wraps a closure as an interceptor hook. Interceptors inspect
155 /// each event and return a [`HookAction`] — use [`HookAction::Abort`]
156 /// to stop the pipeline.
157 pub fn interceptor(
158 name: impl Into<String>,
159 f: impl Fn(&HookEvent, &Store<StoreEntry>) -> HookAction + Send + 'static,
160 ) -> Self {
161 Hook {
162 name: name.into(),
163 callback: HookCallback::Interceptor(Box::new(f)),
164 }
165 }
166
167 pub(crate) fn emit(
168 &self,
169 event: &HookEvent,
170 store: &Store<StoreEntry>,
171 ) -> Result<(), OperationError> {
172 match &self.callback {
173 HookCallback::Observer(f) => {
174 f(event, store);
175 Ok(())
176 }
177 HookCallback::Interceptor(f) => match f(event, store) {
178 HookAction::Continue => Ok(()),
179 HookAction::Abort(reason) => Err(OperationError::HookAbort {
180 hook: self.name.clone(),
181 reason,
182 }),
183 },
184 }
185 }
186}
187
188pub(crate) fn emit_all(
189 hooks: &[Hook],
190 event: &HookEvent,
191 store: &Store<StoreEntry>,
192) -> Result<(), OperationError> {
193 for hook in hooks {
194 hook.emit(event, store)?;
195 }
196 Ok(())
197}