use crate::actors::{Handler, Message, OrchestrationActor};
use crate::orchestration::lifecycle::result_processing::OrchestrationResultProcessor;
use async_trait::async_trait;
use std::sync::Arc;
use tasker_shared::system_context::SystemContext;
use tasker_shared::StepExecutionResult;
use tasker_shared::TaskerResult;
use tracing::{debug, info};
/// Message carrying a single step execution result to be processed.
///
/// Handled by [`ResultProcessorActor`], which forwards `result` to its
/// result-processing service.
#[derive(Debug, Clone)]
pub struct ProcessStepResultMessage {
/// The step execution result to process.
pub result: StepExecutionResult,
}
// Marks `ProcessStepResultMessage` as an actor message.
impl Message for ProcessStepResultMessage {
// No value is returned to the sender; the response type is unit.
type Response = ();
}
/// Actor that processes step execution results by delegating to an
/// [`OrchestrationResultProcessor`].
#[derive(Debug)]
pub struct ResultProcessorActor {
/// Shared system context, exposed through `OrchestrationActor::context`.
context: Arc<SystemContext>,
/// Service that the message handler delegates step-result handling to.
service: Arc<OrchestrationResultProcessor>,
}
impl ResultProcessorActor {
pub fn new(context: Arc<SystemContext>, service: Arc<OrchestrationResultProcessor>) -> Self {
Self { context, service }
}
}
impl OrchestrationActor for ResultProcessorActor {
    /// Static name of this actor; also emitted as the `actor` field in its logs.
    fn name(&self) -> &'static str {
        "ResultProcessorActor"
    }

    /// Borrows the shared system context this actor was constructed with.
    fn context(&self) -> &Arc<SystemContext> {
        &self.context
    }

    /// Start hook: logs that the actor is ready and always returns `Ok(())`.
    fn started(&mut self) -> TaskerResult<()> {
        let actor_name = self.name();
        info!(actor = %actor_name, "ResultProcessorActor started - ready to process step results");
        Ok(())
    }

    /// Stop hook: logs that the actor has stopped and always returns `Ok(())`.
    fn stopped(&mut self) -> TaskerResult<()> {
        let actor_name = self.name();
        info!(actor = %actor_name, "ResultProcessorActor stopped");
        Ok(())
    }
}
#[async_trait]
impl Handler<ProcessStepResultMessage> for ResultProcessorActor {
    type Response = ();

    /// Processes one step execution result by delegating to the wrapped service.
    ///
    /// Emits a debug event before delegating and another after the service
    /// reports success.
    ///
    /// # Errors
    ///
    /// Returns `TaskerError::OrchestrationError` carrying the string form of
    /// the service error when `handle_step_execution_result` fails.
    async fn handle(&self, msg: ProcessStepResultMessage) -> TaskerResult<Self::Response> {
        let actor_name = self.name();
        let step_result = &msg.result;

        debug!(
            actor = %actor_name,
            step_uuid = %step_result.step_uuid,
            status = %step_result.status,
            execution_time_ms = step_result.metadata.execution_time_ms,
            has_orchestration_metadata = step_result.orchestration_metadata.is_some(),
            "Processing step result message"
        );

        // Service errors are flattened to their display text at this boundary.
        let outcome = self.service.handle_step_execution_result(step_result).await;
        outcome.map_err(|err| tasker_shared::TaskerError::OrchestrationError(err.to_string()))?;

        debug!(
            actor = %actor_name,
            step_uuid = %step_result.step_uuid,
            status = %step_result.status,
            "Step result processed successfully"
        );

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that the actor implements both `OrchestrationActor`
    /// and `Handler<ProcessStepResultMessage>`.
    #[test]
    fn test_result_processor_actor_implements_traits() {
        fn requires_actor<A>()
        where
            A: OrchestrationActor,
        {
        }

        fn requires_handler<H>()
        where
            H: Handler<ProcessStepResultMessage>,
        {
        }

        requires_actor::<ResultProcessorActor>();
        requires_handler::<ResultProcessorActor>();
    }

    /// Compile-time check that the message type implements `Message`.
    #[test]
    fn test_process_step_result_message_implements_message() {
        fn requires_message<M>()
        where
            M: Message,
        {
        }

        requires_message::<ProcessStepResultMessage>();
    }
}