Skip to main content

taquba_workflow/
terminal.rs

1use std::collections::HashMap;
2use std::future::Future;
3
/// How a workflow run ended; this is the value a [`TerminalHook`] receives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TerminalStatus {
    /// The run finished successfully: the runner returned
    /// [`crate::StepOutcome::Succeed`].
    Succeeded,
    /// The run failed. Any of the following produces this status:
    /// - a runner verdict, i.e. the runner returned
    ///   [`crate::StepOutcome::Fail`];
    /// - a step returned [`crate::StepError::permanent`];
    /// - a step used up its transient-retry budget;
    /// - the worker ran into a permanent runtime error, such as malformed
    ///   step headers.
    Failed,
    /// The run was cancelled, either because
    /// [`crate::WorkflowRuntime::cancel`] was called for it or because the
    /// runner returned [`crate::StepOutcome::Cancel`].
    ///
    /// As with a [`Self::Failed`] that originates from `StepOutcome::Fail`,
    /// cancellation is a clean run-level outcome and not an infrastructure
    /// error: the step is acked and nothing is dead-lettered.
    Cancelled,
}
26
27impl TerminalStatus {
28    /// Canonical lowercase identifier for this status, suitable for HTTP
29    /// headers, structured logs, and other wire-format use. Stable across
30    /// minor releases.
31    pub fn as_str(&self) -> &'static str {
32        match self {
33            TerminalStatus::Succeeded => "succeeded",
34            TerminalStatus::Failed => "failed",
35            TerminalStatus::Cancelled => "cancelled",
36        }
37    }
38}
39
40impl std::fmt::Display for TerminalStatus {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.write_str(self.as_str())
43    }
44}
45
46/// Information passed to a [`TerminalHook`] when a run reaches a terminal
47/// state.
48#[derive(Debug, Clone)]
49pub struct RunOutcome {
50    /// The run's identifier.
51    pub run_id: String,
52    /// Whether the run completed successfully or failed.
53    pub status: TerminalStatus,
54    /// Set when `status == Succeeded`: the bytes the runner returned via
55    /// [`crate::StepOutcome::Succeed`].
56    pub result: Option<Vec<u8>>,
57    /// - When `status == Failed`: the human-readable reason recorded on
58    ///   the terminal step's `last_error`.
59    /// - When `status == Cancelled`: `Some(reason)` if the runner
60    ///   returned [`crate::StepOutcome::Cancel`], or `None` if
61    ///   cancellation came from [`crate::WorkflowRuntime::cancel`]
62    ///   (which takes no reason at the API level).
63    /// - When `status == Succeeded`: always `None`.
64    pub error: Option<String>,
65    /// Submitter-supplied metadata, threaded through from
66    /// [`crate::RunSpec::headers`].
67    pub headers: HashMap<String, String>,
68    /// Step number of the step that produced the terminal outcome (zero-based).
69    pub final_step: u32,
70}
71
72/// User-implemented hook fired once per run when the run reaches a terminal
73/// state.
74///
75/// The hook is called from the worker task that processed the terminal step,
76/// after the step is acked / dead-lettered. Hook errors are not propagated;
77/// implementations should either be infallible or log internally.
78pub trait TerminalHook: Send + Sync {
79    /// Called when a run reaches [`TerminalStatus::Succeeded`],
80    /// [`TerminalStatus::Failed`], or [`TerminalStatus::Cancelled`].
81    fn on_termination(&self, outcome: &RunOutcome) -> impl Future<Output = ()> + Send;
82}
83
/// Terminal hook that does nothing.
///
/// Handy when runs are only observed through tracing or external state and
/// no per-run callback is wanted.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopTerminalHook;
88
89impl TerminalHook for NoopTerminalHook {
90    async fn on_termination(&self, _outcome: &RunOutcome) {}
91}
92
#[cfg(feature = "webhooks")]
mod webhook {
    use super::{RunOutcome, TerminalHook, TerminalStatus};
    use std::sync::Arc;
    use std::time::Duration;
    use taquba::Queue;
    use taquba_webhooks::{WebhookRequest, enqueue_webhook};

    /// Convenience terminal hook that enqueues an HTTP webhook delivery via
    /// `taquba-webhooks` whenever a run terminates.
    ///
    /// The target URL is read from the run's submission headers under the
    /// configured key, which is [`Self::URL_HEADER`] (`"callback_url"`)
    /// unless changed with [`Self::with_url_header`]. Runs without that
    /// header are simply ignored. The default key intentionally avoids the
    /// reserved `workflow.*` prefix so submitters can set it directly via
    /// [`crate::RunSpec::headers`].
    ///
    /// The webhook body depends on the terminal status:
    /// - succeeded runs: the raw `result` bytes (empty when `result` is
    ///   `None`);
    /// - failed and cancelled runs: the UTF-8 bytes of `error`, i.e. the
    ///   failure or cancellation reason (empty when `error` is `None`).
    ///
    /// The run identifier and terminal status are passed in the
    /// `Workflow-Run-Id` and `Workflow-Run-Status` HTTP headers
    /// respectively. A failure to enqueue the delivery is logged at `warn`
    /// level and otherwise ignored.
    pub struct WebhookTerminalHook {
        /// Taquba queue handle the deliveries are enqueued through.
        queue: Arc<Queue>,
        /// Name of the queue that receives the webhook deliveries.
        target_queue: String,
        /// Key looked up in [`RunOutcome::headers`] to find the callback URL.
        url_header: String,
        /// Optional per-delivery timeout applied to each request.
        timeout: Option<Duration>,
    }

    impl WebhookTerminalHook {
        /// Default header key the hook looks for on each [`RunOutcome`].
        /// Deliberately outside the reserved `workflow.*` prefix so submitters
        /// can set it on [`crate::RunSpec::headers`] without being
        /// rejected.
        pub const URL_HEADER: &'static str = "callback_url";

        /// Build a hook that enqueues webhook deliveries onto `target_queue`
        /// of the supplied Taquba queue. The submitter sets a callback URL
        /// per run via the [`Self::URL_HEADER`] header on
        /// [`crate::RunSpec::headers`].
        ///
        /// The new hook uses the default header key and no timeout.
        pub fn new(queue: Arc<Queue>, target_queue: impl Into<String>) -> Self {
            Self {
                queue,
                target_queue: target_queue.into(),
                url_header: Self::URL_HEADER.to_string(),
                timeout: None,
            }
        }

        /// Override the header key the hook reads. Defaults to
        /// [`Self::URL_HEADER`].
        pub fn with_url_header(mut self, header: impl Into<String>) -> Self {
            self.url_header = header.into();
            self
        }

        /// Set a per-delivery timeout passed through to the webhook worker.
        pub fn with_timeout(mut self, timeout: Duration) -> Self {
            self.timeout = Some(timeout);
            self
        }
    }

    /// Picks the webhook payload for a terminated run: the `result` bytes on
    /// success, otherwise the `error` text as UTF-8 bytes. A missing value
    /// yields an empty body.
    fn delivery_body(outcome: &RunOutcome) -> Vec<u8> {
        match outcome.status {
            TerminalStatus::Succeeded => outcome.result.clone().unwrap_or_default(),
            TerminalStatus::Failed | TerminalStatus::Cancelled => {
                outcome.error.clone().unwrap_or_default().into_bytes()
            }
        }
    }

    impl TerminalHook for WebhookTerminalHook {
        /// Enqueues one webhook delivery for `outcome`, or does nothing when
        /// the run carries no callback URL header.
        async fn on_termination(&self, outcome: &RunOutcome) {
            // No callback URL on this run: the submitter did not ask for a webhook.
            let Some(url) = outcome.headers.get(&self.url_header) else {
                return;
            };
            let mut req = WebhookRequest::new(url)
                .header("Workflow-Run-Id", &outcome.run_id)
                .header("Workflow-Run-Status", outcome.status.as_str());
            if let Some(t) = self.timeout {
                req = req.timeout(t);
            }
            let body = delivery_body(outcome);
            // Hook errors are not propagated, so an enqueue failure is only logged.
            if let Err(e) = enqueue_webhook(&self.queue, &self.target_queue, req, body).await {
                tracing::warn!(
                    run_id = %outcome.run_id,
                    error = %e,
                    "webhook terminal-hook enqueue failed"
                );
            }
        }
    }
}
182
183#[cfg(feature = "webhooks")]
184pub use webhook::WebhookTerminalHook;