use tracing::{debug, info, warn};
use std::sync::Arc;
use crate::dal::DAL;
use crate::database::universal_types::UniversalUuid;
use crate::error::ValidationError;
use crate::models::task_execution::TaskExecution;
use crate::Runtime;
use super::context_manager::ContextManager;
use super::trigger_rules::{TriggerCondition, TriggerRule};
pub struct StateManager<'a> {
dal: &'a DAL,
runtime: Arc<Runtime>,
}
impl<'a> StateManager<'a> {
pub fn new(dal: &'a DAL, runtime: Arc<Runtime>) -> Self {
Self { dal, runtime }
}
pub async fn update_workflow_task_readiness(
&self,
workflow_execution_id: UniversalUuid,
pending_tasks: &[TaskExecution],
) -> Result<(), ValidationError> {
for task_execution in pending_tasks {
let dependencies_satisfied = self.check_task_dependencies(task_execution).await?;
if dependencies_satisfied {
let trigger_rules_satisfied = self.evaluate_trigger_rules(task_execution).await?;
if trigger_rules_satisfied {
self.dal
.task_execution()
.mark_ready(task_execution.id)
.await?;
info!("Task ready: {} (workflow execution: {}, dependencies satisfied, trigger rules passed)",
task_execution.task_name, workflow_execution_id);
} else {
self.dal
.task_execution()
.mark_skipped(task_execution.id, "Trigger rules not satisfied")
.await?;
info!("Task skipped: {} (workflow execution: {}, dependencies satisfied, trigger rules failed)",
task_execution.task_name, workflow_execution_id);
}
}
}
Ok(())
}
pub async fn check_task_dependencies(
&self,
task_execution: &TaskExecution,
) -> Result<bool, ValidationError> {
let workflow_execution = self
.dal
.workflow_execution()
.get_by_id(task_execution.workflow_execution_id)
.await?;
let workflow = match self.runtime.get_workflow(&workflow_execution.workflow_name) {
Some(wf) => wf,
None => {
return Err(ValidationError::WorkflowNotFound(
workflow_execution.workflow_name.clone(),
));
}
};
let task_namespace = crate::task::TaskNamespace::from_string(&task_execution.task_name)
.map_err(ValidationError::InvalidTaskName)?;
let dependencies = workflow
.get_dependencies(&task_namespace)
.map_err(|e| ValidationError::InvalidTaskName(e.to_string()))?;
if dependencies.is_empty() {
return Ok(true);
}
let dependency_names = dependencies.iter().map(|ns| ns.to_string()).collect();
let status_map = self
.dal
.task_execution()
.get_task_statuses_batch(task_execution.workflow_execution_id, dependency_names)
.await?;
let all_satisfied = dependencies.iter().all(|dependency| {
status_map
.get(&dependency.to_string())
.map(|status| matches!(status.as_str(), "Completed" | "Failed" | "Skipped"))
.unwrap_or_else(|| {
warn!(
"Dependency task '{}' not found for task '{}'",
dependency, task_execution.task_name
);
false
})
});
Ok(all_satisfied)
}
pub async fn evaluate_trigger_rules(
&self,
task_execution: &TaskExecution,
) -> Result<bool, ValidationError> {
let trigger_rule: TriggerRule = serde_json::from_str(&task_execution.trigger_rules)
.map_err(|e| ValidationError::InvalidTriggerRule(e.to_string()))?;
match &trigger_rule {
TriggerRule::Always => {
debug!(
"Trigger rule evaluation: Always -> true (task: {})",
task_execution.task_name
);
Ok(true)
}
TriggerRule::All { conditions } => {
debug!(
"Trigger rule evaluation: All({} conditions) (task: {})",
conditions.len(),
task_execution.task_name
);
for (i, condition) in conditions.iter().enumerate() {
let condition_result =
self.evaluate_condition(condition, task_execution).await?;
debug!(
" └─ Condition {}: {:?} -> {}",
i + 1,
condition,
condition_result
);
if !condition_result {
debug!(
"Trigger rule result: All -> false (condition {} failed)",
i + 1
);
return Ok(false);
}
}
debug!("Trigger rule result: All -> true (all conditions passed)");
Ok(true)
}
TriggerRule::Any { conditions } => {
debug!(
"Trigger rule evaluation: Any({} conditions) (task: {})",
conditions.len(),
task_execution.task_name
);
for (i, condition) in conditions.iter().enumerate() {
let condition_result =
self.evaluate_condition(condition, task_execution).await?;
debug!(
" └─ Condition {}: {:?} -> {}",
i + 1,
condition,
condition_result
);
if condition_result {
debug!(
"Trigger rule result: Any -> true (condition {} passed)",
i + 1
);
return Ok(true);
}
}
debug!("Trigger rule result: Any -> false (no conditions passed)");
Ok(false)
}
TriggerRule::None { conditions } => {
debug!(
"Trigger rule evaluation: None({} conditions) (task: {})",
conditions.len(),
task_execution.task_name
);
for (i, condition) in conditions.iter().enumerate() {
let condition_result =
self.evaluate_condition(condition, task_execution).await?;
debug!(
" └─ Condition {}: {:?} -> {}",
i + 1,
condition,
condition_result
);
if condition_result {
debug!(
"Trigger rule result: None -> false (condition {} passed)",
i + 1
);
return Ok(false);
}
}
debug!("Trigger rule result: None -> true (no conditions passed)");
Ok(true)
}
}
}
async fn evaluate_condition(
&self,
condition: &TriggerCondition,
task_execution: &TaskExecution,
) -> Result<bool, ValidationError> {
match condition {
TriggerCondition::TaskSuccess { task_name } => {
tracing::debug!(
"[DEBUG] Scheduler evaluating TaskSuccess trigger rule: looking up task_name '{}' in workflow execution {}",
task_name, task_execution.workflow_execution_id
);
let status = self
.dal
.task_execution()
.get_task_status(task_execution.workflow_execution_id, task_name)
.await?;
let result = status == "Completed";
debug!(
" TaskSuccess('{}') -> {} (status: {})",
task_name, result, status
);
Ok(result)
}
TriggerCondition::TaskFailed { task_name } => {
tracing::debug!(
"[DEBUG] Scheduler evaluating TaskFailed trigger rule: looking up task_name '{}' in workflow execution {}",
task_name, task_execution.workflow_execution_id
);
let status = self
.dal
.task_execution()
.get_task_status(task_execution.workflow_execution_id, task_name)
.await?;
let result = status == "Failed";
debug!(
" TaskFailed('{}') -> {} (status: {})",
task_name, result, status
);
Ok(result)
}
TriggerCondition::TaskSkipped { task_name } => {
tracing::debug!(
"[DEBUG] Scheduler evaluating TaskSkipped trigger rule: looking up task_name '{}' in workflow execution {}",
task_name, task_execution.workflow_execution_id
);
let status = self
.dal
.task_execution()
.get_task_status(task_execution.workflow_execution_id, task_name)
.await?;
let result = status == "Skipped";
debug!(
" TaskSkipped('{}') -> {} (status: {})",
task_name, result, status
);
Ok(result)
}
TriggerCondition::ContextValue {
key,
operator,
value,
} => {
let context_manager = ContextManager::new(self.dal, self.runtime.clone());
let context = context_manager
.load_context_for_task(task_execution)
.await?;
let actual_value = context.get(key);
let result =
ContextManager::evaluate_context_condition(&context, key, operator, value)?;
debug!(
" ContextValue('{}', {:?}, {}) -> {} (actual: {:?})",
key, operator, value, result, actual_value
);
Ok(result)
}
}
}
}