taquba_workflow/terminal.rs
1use std::collections::HashMap;
2use std::future::Future;
3
/// Final state reached by a workflow run; handed to a [`TerminalHook`]
/// as part of the run's outcome.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum TerminalStatus {
    /// The runner finished the run with [`crate::StepOutcome::Succeed`].
    Succeeded,
    /// The run failed. Any of the following leads here:
    /// - the runner returned [`crate::StepOutcome::Fail`] (a runner
    ///   verdict);
    /// - a step returned [`crate::StepError::permanent`];
    /// - a step used up its transient-retry budget; or
    /// - the worker ran into a permanent runtime error (for example,
    ///   malformed step headers).
    Failed,
    /// The run was cancelled, through one of two paths:
    /// - [`crate::WorkflowRuntime::cancel`] was invoked for this run; or
    /// - the runner returned [`crate::StepOutcome::Cancel`].
    ///
    /// As with a [`Self::Failed`] that originates from
    /// `StepOutcome::Fail`, this counts as a clean run-level outcome and
    /// not as an infrastructure error: the step is acked and nothing is
    /// dead-lettered.
    Cancelled,
}
26
27impl TerminalStatus {
28 /// Canonical lowercase identifier for this status, suitable for HTTP
29 /// headers, structured logs, and other wire-format use. Stable across
30 /// minor releases.
31 pub fn as_str(&self) -> &'static str {
32 match self {
33 TerminalStatus::Succeeded => "succeeded",
34 TerminalStatus::Failed => "failed",
35 TerminalStatus::Cancelled => "cancelled",
36 }
37 }
38}
39
40impl std::fmt::Display for TerminalStatus {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.write_str(self.as_str())
43 }
44}
45
46/// Information passed to a [`TerminalHook`] when a run reaches a terminal
47/// state.
48#[derive(Debug, Clone)]
49pub struct RunOutcome {
50 /// The run's identifier.
51 pub run_id: String,
52 /// Whether the run completed successfully or failed.
53 pub status: TerminalStatus,
54 /// Set when `status == Succeeded`: the bytes the runner returned via
55 /// [`crate::StepOutcome::Succeed`].
56 pub result: Option<Vec<u8>>,
57 /// - When `status == Failed`: the human-readable reason recorded on
58 /// the terminal step's `last_error`.
59 /// - When `status == Cancelled`: `Some(reason)` if the runner
60 /// returned [`crate::StepOutcome::Cancel`], or `None` if
61 /// cancellation came from [`crate::WorkflowRuntime::cancel`]
62 /// (which takes no reason at the API level).
63 /// - When `status == Succeeded`: always `None`.
64 pub error: Option<String>,
65 /// Submitter-supplied metadata, threaded through from
66 /// [`crate::RunSpec::headers`].
67 pub headers: HashMap<String, String>,
68 /// Step number of the step that produced the terminal outcome (zero-based).
69 pub final_step: u32,
70}
71
72/// User-implemented hook fired once per run when the run reaches a terminal
73/// state.
74///
75/// The hook is called from the worker task that processed the terminal step,
76/// after the step is acked / dead-lettered. Hook errors are not propagated;
77/// implementations should either be infallible or log internally.
78pub trait TerminalHook: Send + Sync {
79 /// Called when a run reaches [`TerminalStatus::Succeeded`],
80 /// [`TerminalStatus::Failed`], or [`TerminalStatus::Cancelled`].
81 fn on_termination(&self, outcome: &RunOutcome) -> impl Future<Output = ()> + Send;
82}
83
/// Terminal hook that does nothing. Handy when runs are only observed
/// through tracing or external state.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopTerminalHook;
88
89impl TerminalHook for NoopTerminalHook {
90 async fn on_termination(&self, _outcome: &RunOutcome) {}
91}
92
#[cfg(feature = "webhooks")]
mod webhook {
    use super::{RunOutcome, TerminalHook, TerminalStatus};
    use std::sync::Arc;
    use std::time::Duration;
    use taquba::Queue;
    use taquba_webhooks::{WebhookRequest, enqueue_webhook};

    /// Ready-made terminal hook that enqueues an HTTP webhook delivery
    /// through `taquba-webhooks` each time a run terminates.
    ///
    /// The target URL is looked up in the run's submission headers under
    /// the configured key ([`Self::URL_HEADER`], i.e. `"callback_url"`,
    /// unless overridden). A run that does not carry that header is
    /// skipped. The default key deliberately stays clear of the reserved
    /// `workflow.*` prefix so submitters can set it directly through
    /// [`crate::RunSpec::headers`].
    ///
    /// Webhook body:
    /// - succeeded runs: the raw `result` bytes (empty when `result` is
    ///   `None`);
    /// - failed and cancelled runs: the UTF-8 bytes of `error` (empty
    ///   when `error` is `None`).
    ///
    /// The run identifier and the terminal status travel in the
    /// `Workflow-Run-Id` and `Workflow-Run-Status` HTTP headers
    /// respectively.
    pub struct WebhookTerminalHook {
        queue: Arc<Queue>,
        target_queue: String,
        url_header: String,
        timeout: Option<Duration>,
    }

    impl WebhookTerminalHook {
        /// Header key the hook looks up on each [`RunOutcome`] by default.
        /// Chosen outside the reserved `workflow.*` prefix so that
        /// submitters can place it on [`crate::RunSpec::headers`] without
        /// being rejected.
        pub const URL_HEADER: &'static str = "callback_url";

        /// Creates a hook that enqueues webhook deliveries onto
        /// `target_queue` of the given Taquba queue. Submitters choose
        /// the callback URL per run by setting the [`Self::URL_HEADER`]
        /// header on [`crate::RunSpec::headers`]. No timeout is set
        /// initially.
        pub fn new(queue: Arc<Queue>, target_queue: impl Into<String>) -> Self {
            let target_queue = target_queue.into();
            let url_header = Self::URL_HEADER.to_owned();
            Self {
                queue,
                target_queue,
                url_header,
                timeout: None,
            }
        }

        /// Replaces the header key the hook reads the callback URL from.
        /// The key is [`Self::URL_HEADER`] until this is called.
        pub fn with_url_header(self, header: impl Into<String>) -> Self {
            Self {
                url_header: header.into(),
                ..self
            }
        }

        /// Sets a per-delivery timeout; it is applied to every webhook
        /// request the hook builds and so passed through to the webhook
        /// worker.
        pub fn with_timeout(self, timeout: Duration) -> Self {
            Self {
                timeout: Some(timeout),
                ..self
            }
        }
    }

    impl TerminalHook for WebhookTerminalHook {
        /// Enqueues one webhook delivery for `outcome`, or does nothing
        /// when the run carries no callback URL header. An enqueue
        /// failure is logged at `warn` level and otherwise swallowed.
        async fn on_termination(&self, outcome: &RunOutcome) {
            // Runs without a callback URL have nothing to deliver.
            let url = match outcome.headers.get(&self.url_header) {
                Some(target) => target,
                None => return,
            };

            // Successful runs ship their result bytes; failed and
            // cancelled runs ship the error / reason text. A missing
            // value becomes an empty body.
            let payload = match outcome.status {
                TerminalStatus::Succeeded => outcome.result.clone().unwrap_or_default(),
                TerminalStatus::Failed | TerminalStatus::Cancelled => outcome
                    .error
                    .clone()
                    .map(String::into_bytes)
                    .unwrap_or_default(),
            };

            let request = WebhookRequest::new(url)
                .header("Workflow-Run-Id", &outcome.run_id)
                .header("Workflow-Run-Status", outcome.status.as_str());
            // Only attach a timeout when one was configured.
            let request = match self.timeout {
                Some(limit) => request.timeout(limit),
                None => request,
            };

            if let Err(err) =
                enqueue_webhook(&self.queue, &self.target_queue, request, payload).await
            {
                tracing::warn!(
                    run_id = %outcome.run_id,
                    error = %err,
                    "webhook terminal-hook enqueue failed"
                );
            }
        }
    }
}
182
183#[cfg(feature = "webhooks")]
184pub use webhook::WebhookTerminalHook;