use celers_core::{Result, SerializedTask};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
/// State value stored on a [`Workflow`] (see its `state` field).
///
/// Because of `rename_all = "lowercase"`, serde reads and writes every variant
/// as its lowercase name, e.g. `Cancelled` <-> `"cancelled"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkflowState {
/// Serialized as `"pending"`.
Pending,
/// Serialized as `"running"`.
Running,
/// Serialized as `"completed"`.
Completed,
/// Serialized as `"failed"`.
Failed,
/// Serialized as `"cancelled"`.
Cancelled,
}
impl std::fmt::Display for WorkflowState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkflowState::Pending => write!(f, "pending"),
WorkflowState::Running => write!(f, "running"),
WorkflowState::Completed => write!(f, "completed"),
WorkflowState::Failed => write!(f, "failed"),
WorkflowState::Cancelled => write!(f, "cancelled"),
}
}
}
/// State value stored on a [`WorkflowStage`] (see its `state` field).
///
/// Because of `rename_all = "lowercase"`, serde reads and writes every variant
/// as its lowercase name, e.g. `Skipped` <-> `"skipped"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StageState {
/// Serialized as `"pending"`.
Pending,
/// Serialized as `"running"`.
Running,
/// Serialized as `"completed"`.
Completed,
/// Serialized as `"failed"`.
Failed,
/// Serialized as `"skipped"`.
Skipped,
}
impl std::fmt::Display for StageState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StageState::Pending => write!(f, "pending"),
StageState::Running => write!(f, "running"),
StageState::Completed => write!(f, "completed"),
StageState::Failed => write!(f, "failed"),
StageState::Skipped => write!(f, "skipped"),
}
}
}
/// Serializable record describing one workflow and its current status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workflow {
/// Identifier of this workflow.
pub id: Uuid,
/// Name of the workflow.
pub workflow_name: String,
/// Current [`WorkflowState`].
pub state: WorkflowState,
/// Arbitrary JSON value; its schema is not defined in this file.
pub config: serde_json::Value,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Start timestamp (UTC), if set.
/// NOTE(review): presumably `None` until the workflow starts running — confirm against the code that fills it in.
pub started_at: Option<DateTime<Utc>>,
/// Completion timestamp (UTC), if set.
/// NOTE(review): presumably `None` until the workflow reaches a final state — confirm.
pub completed_at: Option<DateTime<Utc>>,
/// Error text, if set.
/// NOTE(review): presumably populated only for failed workflows — confirm.
pub error_message: Option<String>,
}
/// Serializable record describing one stage that belongs to a workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStage {
/// Identifier of this stage.
pub id: Uuid,
/// Identifier of the workflow this stage refers to.
pub workflow_id: Uuid,
/// Number of the stage.
/// NOTE(review): presumably its position within the workflow; whether numbering starts at 0 or 1 is not visible here.
pub stage_number: i32,
/// Name of the stage.
pub stage_name: String,
/// Current [`StageState`].
pub state: StageState,
/// NOTE(review): presumably the total number of tasks in the stage — confirm.
pub task_count: i32,
/// NOTE(review): presumably how many of those tasks have completed — confirm.
pub completed_count: i32,
/// NOTE(review): presumably how many of those tasks have failed — confirm.
pub failed_count: i32,
/// Start timestamp (UTC), if set.
pub started_at: Option<DateTime<Utc>>,
/// Completion timestamp (UTC), if set.
pub completed_at: Option<DateTime<Utc>>,
}
/// Serializable record linking a task to a parent task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskDependency {
/// Identifier of this dependency record.
pub id: Uuid,
/// Identifier of the task on one side of the link.
/// NOTE(review): presumably the dependent task, i.e. the one that waits — confirm.
pub task_id: Uuid,
/// Identifier of the parent task.
/// NOTE(review): presumably the task that must finish first — confirm.
pub parent_task_id: Uuid,
/// Workflow identifier, when one is attached to the record.
pub workflow_id: Option<Uuid>,
/// Stage identifier, when one is attached to the record.
pub stage_id: Option<Uuid>,
/// Whether the dependency is marked as satisfied.
pub satisfied: bool,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Timestamp (UTC) associated with `satisfied`, if set.
/// NOTE(review): presumably recorded when `satisfied` becomes `true` — confirm.
pub satisfied_at: Option<DateTime<Utc>>,
}
/// Serializable summary of a workflow: identifying fields, timestamps, and
/// stage/task counters.
///
/// NOTE(review): how the counters and `duration_secs` are computed is not
/// visible in this file; the per-field notes below are read off the field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStatistics {
/// Identifier of the workflow being summarized.
pub workflow_id: Uuid,
/// Name of the workflow.
pub workflow_name: String,
/// [`WorkflowState`] of the workflow.
pub workflow_state: WorkflowState,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Start timestamp (UTC), if set.
pub started_at: Option<DateTime<Utc>>,
/// Completion timestamp (UTC), if set.
pub completed_at: Option<DateTime<Utc>>,
/// Stage count (presumably all stages of the workflow).
pub total_stages: i64,
/// Stage count (presumably stages in the completed state).
pub completed_stages: i64,
/// Stage count (presumably stages in the failed state).
pub failed_stages: i64,
/// Stage count (presumably stages in the running state).
pub running_stages: i64,
/// Task count (presumably summed over all stages).
pub total_tasks: i64,
/// Task count (presumably completed tasks).
pub completed_tasks: i64,
/// Task count (presumably failed tasks).
pub failed_tasks: i64,
/// Duration in seconds, if available.
/// NOTE(review): presumably derived from `started_at` and `completed_at` — confirm.
pub duration_secs: Option<i64>,
}
/// In-memory description of a workflow under construction: a name plus the
/// stages added so far, kept in insertion order.
#[derive(Debug, Clone)]
pub struct WorkflowBuilder {
// Name given to `WorkflowBuilder::new`.
workflow_name: String,
// Stages in the order they were added; the last one receives new tasks and dependencies.
stages: Vec<WorkflowStageBuilder>,
}
/// One stage collected by a [`WorkflowBuilder`]: a name, its tasks, and a list
/// of dependency strings.
#[derive(Debug, Clone)]
pub struct WorkflowStageBuilder {
// Name of the stage.
stage_name: String,
// Tasks added to the stage, in insertion order.
tasks: Vec<SerializedTask>,
// NOTE(review): presumably names of the stages this one depends on — confirm against the consumer.
dependencies: Vec<String>, }
impl WorkflowBuilder {
    /// Creates a builder for a workflow called `workflow_name` with no stages.
    pub fn new(workflow_name: String) -> Self {
        let stages = Vec::new();
        Self {
            workflow_name,
            stages,
        }
    }

    /// Appends a new, empty stage named `stage_name`.
    ///
    /// The new stage becomes the "current" stage that
    /// [`add_task_to_stage`](Self::add_task_to_stage) and
    /// [`add_stage_dependencies`](Self::add_stage_dependencies) operate on.
    pub fn add_stage(mut self, stage_name: String) -> Self {
        let fresh_stage = WorkflowStageBuilder {
            stage_name,
            tasks: Vec::new(),
            dependencies: Vec::new(),
        };
        self.stages.push(fresh_stage);
        self
    }

    /// Appends `task` to the most recently added stage.
    ///
    /// If no stage has been added yet, the task is dropped and the builder is
    /// returned unchanged.
    pub fn add_task_to_stage(mut self, task: SerializedTask) -> Self {
        if let Some(current) = self.stages.last_mut() {
            current.tasks.push(task);
        }
        self
    }

    /// Replaces the dependency list of the most recently added stage with
    /// `dependencies`.
    ///
    /// If no stage has been added yet, the list is dropped and the builder is
    /// returned unchanged.
    pub fn add_stage_dependencies(mut self, dependencies: Vec<String>) -> Self {
        if let Some(current) = self.stages.last_mut() {
            current.dependencies = dependencies;
        }
        self
    }

    /// Returns the workflow name.
    pub fn workflow_name(&self) -> &str {
        self.workflow_name.as_str()
    }

    /// Returns the stages added so far, in insertion order.
    pub fn stages(&self) -> &[WorkflowStageBuilder] {
        self.stages.as_slice()
    }
}
impl WorkflowStageBuilder {
    /// Returns the name of this stage.
    pub fn stage_name(&self) -> &str {
        self.stage_name.as_str()
    }

    /// Returns the tasks added to this stage, in insertion order.
    pub fn tasks(&self) -> &[SerializedTask] {
        self.tasks.as_slice()
    }

    /// Returns the dependency strings recorded for this stage.
    pub fn dependencies(&self) -> &[String] {
        self.dependencies.as_slice()
    }
}
/// Shared, thread-safe hook callback.
///
/// A hook receives the [`HookContext`] and the task by reference and returns a
/// boxed, pinned, `Send` future resolving to `Result<()>`. The boxed future has
/// no lifetime tied to the arguments (a boxed trait object defaults to
/// `'static`), so a hook has to copy or clone whatever it needs from `ctx` or
/// the task before building the future.
pub type HookFn = Arc<
dyn Fn(
&HookContext,
&SerializedTask,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
+ Send
+ Sync,
>;
/// Information handed to every hook callback alongside the task (see [`HookFn`]).
#[derive(Debug, Clone)]
pub struct HookContext {
/// Name of the queue.
pub queue_name: String,
/// Task identifier, when one is available.
/// NOTE(review): presumably `None` at hook points where no id has been assigned yet — confirm with the callers.
pub task_id: Option<Uuid>,
/// Timestamp (UTC) attached to the context.
/// NOTE(review): presumably the time the hook point was reached — confirm.
pub timestamp: DateTime<Utc>,
/// Arbitrary JSON value; its schema is not defined in this file.
pub metadata: serde_json::Value,
}
/// A [`HookFn`] tagged with the hook point it is registered for.
///
/// `TaskHooks::add` uses the variant to pick the list the callback is
/// appended to.
#[derive(Clone)]
pub enum TaskHook {
/// Stored in the `before_enqueue` list.
BeforeEnqueue(HookFn),
/// Stored in the `after_enqueue` list.
AfterEnqueue(HookFn),
/// Stored in the `before_dequeue` list.
BeforeDequeue(HookFn),
/// Stored in the `after_dequeue` list.
AfterDequeue(HookFn),
/// Stored in the `before_ack` list.
BeforeAck(HookFn),
/// Stored in the `after_ack` list.
AfterAck(HookFn),
/// Stored in the `before_reject` list.
BeforeReject(HookFn),
/// Stored in the `after_reject` list.
AfterReject(HookFn),
}
/// Registry of hook callbacks, one list per hook point.
///
/// The derived `Default` yields a registry in which every list is empty.
/// Cloning copies the lists of `Arc` handles, not the callbacks themselves.
#[derive(Clone, Default)]
pub struct TaskHooks {
// Each list keeps callbacks in registration order.
pub(crate) before_enqueue: Vec<HookFn>,
pub(crate) after_enqueue: Vec<HookFn>,
pub(crate) before_dequeue: Vec<HookFn>,
pub(crate) after_dequeue: Vec<HookFn>,
pub(crate) before_ack: Vec<HookFn>,
pub(crate) after_ack: Vec<HookFn>,
pub(crate) before_reject: Vec<HookFn>,
pub(crate) after_reject: Vec<HookFn>,
}
impl TaskHooks {
pub fn new() -> Self {
Self::default()
}
pub fn add(&mut self, hook: TaskHook) {
match hook {
TaskHook::BeforeEnqueue(f) => self.before_enqueue.push(f),
TaskHook::AfterEnqueue(f) => self.after_enqueue.push(f),
TaskHook::BeforeDequeue(f) => self.before_dequeue.push(f),
TaskHook::AfterDequeue(f) => self.after_dequeue.push(f),
TaskHook::BeforeAck(f) => self.before_ack.push(f),
TaskHook::AfterAck(f) => self.after_ack.push(f),
TaskHook::BeforeReject(f) => self.before_reject.push(f),
TaskHook::AfterReject(f) => self.after_reject.push(f),
}
}
pub fn clear(&mut self) {
self.before_enqueue.clear();
self.after_enqueue.clear();
self.before_dequeue.clear();
self.after_dequeue.clear();
self.before_ack.clear();
self.after_ack.clear();
self.before_reject.clear();
self.after_reject.clear();
}
pub(crate) async fn run_before_enqueue(
&self,
ctx: &HookContext,
task: &SerializedTask,
) -> Result<()> {
for hook in &self.before_enqueue {
hook(ctx, task).await?;
}
Ok(())
}
pub(crate) async fn run_after_enqueue(
&self,
ctx: &HookContext,
task: &SerializedTask,
) -> Result<()> {
for hook in &self.after_enqueue {
hook(ctx, task).await?;
}
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn run_before_ack(
&self,
ctx: &HookContext,
task: &SerializedTask,
) -> Result<()> {
for hook in &self.before_ack {
hook(ctx, task).await?;
}
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn run_after_ack(
&self,
ctx: &HookContext,
task: &SerializedTask,
) -> Result<()> {
for hook in &self.after_ack {
hook(ctx, task).await?;
}
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn run_before_reject(
&self,
ctx: &HookContext,
task: &SerializedTask,
) -> Result<()> {
for hook in &self.before_reject {
hook(ctx, task).await?;
}
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn run_after_reject(
&self,
ctx: &HookContext,
task: &SerializedTask,
) -> Result<()> {
for hook in &self.after_reject {
hook(ctx, task).await?;
}
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn run_after_dequeue(
&self,
ctx: &HookContext,
task: &SerializedTask,
) -> Result<()> {
for hook in &self.after_dequeue {
hook(ctx, task).await?;
}
Ok(())
}
}