use std::collections::HashMap;
use std::future::Future;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TerminalStatus {
Succeeded,
Failed,
Cancelled,
}
impl TerminalStatus {
pub fn as_str(&self) -> &'static str {
match self {
TerminalStatus::Succeeded => "succeeded",
TerminalStatus::Failed => "failed",
TerminalStatus::Cancelled => "cancelled",
}
}
}
impl std::fmt::Display for TerminalStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Debug, Clone)]
pub struct RunOutcome {
pub run_id: String,
pub status: TerminalStatus,
pub result: Option<Vec<u8>>,
pub error: Option<String>,
pub headers: HashMap<String, String>,
pub final_step: u32,
}
pub trait TerminalHook: Send + Sync {
fn on_termination(&self, outcome: &RunOutcome) -> impl Future<Output = ()> + Send;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopTerminalHook;
impl TerminalHook for NoopTerminalHook {
async fn on_termination(&self, _outcome: &RunOutcome) {}
}
#[cfg(feature = "webhooks")]
mod webhook {
use super::{RunOutcome, TerminalHook, TerminalStatus};
use std::sync::Arc;
use std::time::Duration;
use taquba::Queue;
use taquba_webhooks::{WebhookRequest, enqueue_webhook};
pub struct WebhookTerminalHook {
queue: Arc<Queue>,
target_queue: String,
url_header: String,
timeout: Option<Duration>,
}
impl WebhookTerminalHook {
pub const URL_HEADER: &'static str = "callback_url";
pub fn new(queue: Arc<Queue>, target_queue: impl Into<String>) -> Self {
Self {
queue,
target_queue: target_queue.into(),
url_header: Self::URL_HEADER.to_string(),
timeout: None,
}
}
pub fn with_url_header(mut self, header: impl Into<String>) -> Self {
self.url_header = header.into();
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}
impl TerminalHook for WebhookTerminalHook {
async fn on_termination(&self, outcome: &RunOutcome) {
let Some(url) = outcome.headers.get(&self.url_header) else {
return;
};
let mut req = WebhookRequest::new(url)
.header("Workflow-Run-Id", &outcome.run_id)
.header("Workflow-Run-Status", outcome.status.as_str());
if let Some(t) = self.timeout {
req = req.timeout(t);
}
let body = match outcome.status {
TerminalStatus::Succeeded => outcome.result.clone().unwrap_or_default(),
TerminalStatus::Failed | TerminalStatus::Cancelled => {
outcome.error.clone().unwrap_or_default().into_bytes()
}
};
if let Err(e) = enqueue_webhook(&self.queue, &self.target_queue, req, body).await {
tracing::warn!(
run_id = %outcome.run_id,
error = %e,
"webhook terminal-hook enqueue failed"
);
}
}
}
}
#[cfg(feature = "webhooks")]
pub use webhook::WebhookTerminalHook;