use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;
use serde::{Serialize, de::DeserializeOwned};
use super::context::WorkflowContext;
use crate::Result;
pub trait ForgeWorkflow: Send + Sync + 'static {
type Input: DeserializeOwned + Serialize + Send + Sync;
type Output: Serialize + Send;
fn info() -> WorkflowInfo;
fn execute(
ctx: &WorkflowContext,
input: Self::Input,
) -> Pin<Box<dyn Future<Output = Result<Self::Output>> + Send + '_>>;
}
#[derive(Debug, Clone)]
pub struct WorkflowInfo {
pub name: &'static str,
pub version: &'static str,
pub signature: &'static str,
pub is_active: bool,
pub is_deprecated: bool,
pub timeout: Duration,
pub http_timeout: Option<Duration>,
pub is_public: bool,
pub required_role: Option<&'static str>,
}
impl Default for WorkflowInfo {
fn default() -> Self {
Self {
name: "",
version: "v1",
signature: "",
is_active: true,
is_deprecated: false,
timeout: Duration::from_secs(86400), http_timeout: None,
is_public: false,
required_role: None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowStatus {
Created,
Running,
Waiting,
Completed,
Compensating,
Compensated,
Failed,
BlockedMissingVersion,
BlockedSignatureMismatch,
BlockedMissingHandler,
RetiredUnresumable,
CancelledByOperator,
}
impl WorkflowStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Created => "created",
Self::Running => "running",
Self::Waiting => "waiting",
Self::Completed => "completed",
Self::Compensating => "compensating",
Self::Compensated => "compensated",
Self::Failed => "failed",
Self::BlockedMissingVersion => "blocked_missing_version",
Self::BlockedSignatureMismatch => "blocked_signature_mismatch",
Self::BlockedMissingHandler => "blocked_missing_handler",
Self::RetiredUnresumable => "retired_unresumable",
Self::CancelledByOperator => "cancelled_by_operator",
}
}
pub fn is_terminal(&self) -> bool {
matches!(
self,
Self::Completed
| Self::Compensated
| Self::Failed
| Self::RetiredUnresumable
| Self::CancelledByOperator
)
}
pub fn is_blocked(&self) -> bool {
matches!(
self,
Self::BlockedMissingVersion
| Self::BlockedSignatureMismatch
| Self::BlockedMissingHandler
)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseWorkflowStatusError(pub String);
impl std::fmt::Display for ParseWorkflowStatusError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "invalid workflow status: '{}'", self.0)
}
}
impl std::error::Error for ParseWorkflowStatusError {}
impl FromStr for WorkflowStatus {
type Err = ParseWorkflowStatusError;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"created" => Ok(Self::Created),
"running" => Ok(Self::Running),
"waiting" => Ok(Self::Waiting),
"completed" => Ok(Self::Completed),
"compensating" => Ok(Self::Compensating),
"compensated" => Ok(Self::Compensated),
"failed" => Ok(Self::Failed),
"blocked_missing_version" => Ok(Self::BlockedMissingVersion),
"blocked_signature_mismatch" => Ok(Self::BlockedSignatureMismatch),
"blocked_missing_handler" => Ok(Self::BlockedMissingHandler),
"retired_unresumable" => Ok(Self::RetiredUnresumable),
"cancelled_by_operator" => Ok(Self::CancelledByOperator),
_ => Err(ParseWorkflowStatusError(s.to_string())),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
use super::*;
#[test]
fn test_workflow_info_default() {
let info = WorkflowInfo::default();
assert_eq!(info.name, "");
assert_eq!(info.version, "v1");
assert!(info.is_active);
assert!(!info.is_deprecated);
}
#[test]
fn test_workflow_status_conversion() {
assert_eq!(WorkflowStatus::Running.as_str(), "running");
assert_eq!(WorkflowStatus::Completed.as_str(), "completed");
assert_eq!(WorkflowStatus::Compensating.as_str(), "compensating");
assert_eq!(
WorkflowStatus::BlockedMissingVersion.as_str(),
"blocked_missing_version"
);
assert_eq!(
WorkflowStatus::CancelledByOperator.as_str(),
"cancelled_by_operator"
);
assert_eq!(
"running".parse::<WorkflowStatus>(),
Ok(WorkflowStatus::Running)
);
assert_eq!(
"completed".parse::<WorkflowStatus>(),
Ok(WorkflowStatus::Completed)
);
assert_eq!(
"blocked_missing_version".parse::<WorkflowStatus>(),
Ok(WorkflowStatus::BlockedMissingVersion)
);
assert_eq!(
"cancelled_by_operator".parse::<WorkflowStatus>(),
Ok(WorkflowStatus::CancelledByOperator)
);
}
#[test]
fn test_workflow_status_is_terminal() {
assert!(!WorkflowStatus::Running.is_terminal());
assert!(!WorkflowStatus::Waiting.is_terminal());
assert!(WorkflowStatus::Completed.is_terminal());
assert!(WorkflowStatus::Failed.is_terminal());
assert!(WorkflowStatus::Compensated.is_terminal());
assert!(WorkflowStatus::RetiredUnresumable.is_terminal());
assert!(WorkflowStatus::CancelledByOperator.is_terminal());
}
#[test]
fn test_workflow_status_is_blocked() {
assert!(!WorkflowStatus::Running.is_blocked());
assert!(WorkflowStatus::BlockedMissingVersion.is_blocked());
assert!(WorkflowStatus::BlockedSignatureMismatch.is_blocked());
assert!(WorkflowStatus::BlockedMissingHandler.is_blocked());
}
}