use crate::actors::{Handler, Message, OrchestrationActor};
use crate::orchestration::lifecycle::DecisionPointService;
use async_trait::async_trait;
use sqlx::types::Uuid;
use std::sync::Arc;
use tasker_shared::messaging::DecisionPointOutcome;
use tasker_shared::system_context::SystemContext;
use tasker_shared::TaskerResult;
use tracing::{debug, info};
#[derive(Debug, Clone)]
pub struct ProcessDecisionPointMessage {
pub workflow_step_uuid: Uuid,
pub task_uuid: Uuid,
pub outcome: DecisionPointOutcome,
}
impl Message for ProcessDecisionPointMessage {
type Response = DecisionPointProcessingResult;
}
#[derive(Debug, Clone, PartialEq)]
pub enum DecisionPointProcessingResult {
NoStepsCreated,
StepsCreated {
step_names: Vec<String>,
count: usize,
},
}
#[derive(Debug)]
pub struct DecisionPointActor {
context: Arc<SystemContext>,
service: Arc<DecisionPointService>,
}
impl DecisionPointActor {
pub fn new(context: Arc<SystemContext>, service: Arc<DecisionPointService>) -> Self {
Self { context, service }
}
}
impl OrchestrationActor for DecisionPointActor {
fn name(&self) -> &'static str {
"DecisionPointActor"
}
fn context(&self) -> &Arc<SystemContext> {
&self.context
}
fn started(&mut self) -> TaskerResult<()> {
info!(
actor = %self.name(),
"DecisionPointActor started - ready to process decision outcomes"
);
Ok(())
}
fn stopped(&mut self) -> TaskerResult<()> {
info!(
actor = %self.name(),
"DecisionPointActor stopped"
);
Ok(())
}
}
#[async_trait]
impl Handler<ProcessDecisionPointMessage> for DecisionPointActor {
type Response = DecisionPointProcessingResult;
async fn handle(&self, msg: ProcessDecisionPointMessage) -> TaskerResult<Self::Response> {
debug!(
actor = %self.name(),
workflow_step_uuid = %msg.workflow_step_uuid,
task_uuid = %msg.task_uuid,
requires_creation = msg.outcome.requires_step_creation(),
step_count = msg.outcome.step_names().len(),
"Processing decision point outcome"
);
if !msg.outcome.requires_step_creation() {
debug!(
actor = %self.name(),
workflow_step_uuid = %msg.workflow_step_uuid,
"No steps to create (NoBranches outcome)"
);
return Ok(DecisionPointProcessingResult::NoStepsCreated);
}
let step_mapping = self
.service
.process_decision_outcome(msg.workflow_step_uuid, msg.task_uuid, msg.outcome.clone())
.await
.map_err(|e| tasker_shared::TaskerError::OrchestrationError(e.to_string()))?;
let step_names: Vec<String> = step_mapping.keys().cloned().collect();
let count = step_names.len();
debug!(
actor = %self.name(),
workflow_step_uuid = %msg.workflow_step_uuid,
task_uuid = %msg.task_uuid,
steps_created = count,
"Decision point processing complete"
);
Ok(DecisionPointProcessingResult::StepsCreated { step_names, count })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_decision_point_actor_implements_traits() {
fn assert_orchestration_actor<T: OrchestrationActor>() {}
fn assert_handler<T: Handler<ProcessDecisionPointMessage>>() {}
assert_orchestration_actor::<DecisionPointActor>();
assert_handler::<DecisionPointActor>();
}
#[test]
fn test_process_decision_point_message_implements_message() {
fn assert_message<T: Message>() {}
assert_message::<ProcessDecisionPointMessage>();
}
#[test]
fn test_decision_point_processing_result_equality() {
let result1 = DecisionPointProcessingResult::NoStepsCreated;
let result2 = DecisionPointProcessingResult::NoStepsCreated;
assert_eq!(result1, result2);
let result3 = DecisionPointProcessingResult::StepsCreated {
step_names: vec!["branch_a".to_string()],
count: 1,
};
let result4 = DecisionPointProcessingResult::StepsCreated {
step_names: vec!["branch_a".to_string()],
count: 1,
};
assert_eq!(result3, result4);
}
#[tokio::test]
async fn test_no_branches_outcome_processing() {
let context = Arc::new(SystemContext::new_for_orchestration().await.unwrap());
let service = Arc::new(DecisionPointService::new(context.clone()));
let mut actor = DecisionPointActor::new(context, service);
actor.started().unwrap();
let msg = ProcessDecisionPointMessage {
workflow_step_uuid: Uuid::new_v4(),
task_uuid: Uuid::new_v4(),
outcome: DecisionPointOutcome::no_branches(),
};
let result = actor.handle(msg).await.unwrap();
assert_eq!(result, DecisionPointProcessingResult::NoStepsCreated);
actor.stopped().unwrap();
}
}