use crate::actors::{Handler, Message, OrchestrationActor};
use crate::orchestration::lifecycle::batch_processing::BatchProcessingService;
use async_trait::async_trait;
use sqlx::types::Uuid;
use std::sync::Arc;
use tasker_shared::messaging::execution_types::BatchProcessingOutcome;
use tasker_shared::models::WorkflowStep;
use tasker_shared::system_context::SystemContext;
use tasker_shared::StepExecutionResult;
use tasker_shared::TaskerResult;
use tracing::{debug, info};
/// Message asking `BatchProcessingActor` to process one batchable workflow step.
///
/// The actor's handler forwards all three fields to
/// `BatchProcessingService::process_batchable_step` and replies with the
/// resulting `BatchProcessingOutcome`.
#[derive(Debug, Clone)]
pub struct ProcessBatchableStepMessage {
/// UUID of the task; passed by value as the first argument to the service call.
pub task_uuid: Uuid,
/// The workflow step to process; its `workflow_step_uuid` is included in the handler's logs.
pub batchable_step: WorkflowStep,
/// Execution result handed to the service together with the step
/// (presumably the result of running `batchable_step` — confirm against the sender).
pub step_result: StepExecutionResult,
}
// Registers `ProcessBatchableStepMessage` as an actor message.
impl Message for ProcessBatchableStepMessage {
// A successful reply carries the `BatchProcessingOutcome` produced by the service.
type Response = BatchProcessingOutcome;
}
/// Actor that handles `ProcessBatchableStepMessage` by delegating to a shared
/// `BatchProcessingService`.
#[derive(Debug)]
pub struct BatchProcessingActor {
/// Shared system context, exposed through `OrchestrationActor::context`.
context: Arc<SystemContext>,
/// Service whose `process_batchable_step` does the actual work for each message.
service: Arc<BatchProcessingService>,
}
impl BatchProcessingActor {
pub fn new(context: Arc<SystemContext>, service: Arc<BatchProcessingService>) -> Self {
Self { context, service }
}
}
impl OrchestrationActor for BatchProcessingActor {
    /// Fixed actor name used in log output.
    fn name(&self) -> &'static str {
        "BatchProcessingActor"
    }

    /// Borrows the shared system context held by this actor.
    fn context(&self) -> &Arc<SystemContext> {
        &self.context
    }

    /// Lifecycle hook: logs that the actor has started. Always returns `Ok(())`.
    fn started(&mut self) -> TaskerResult<()> {
        let actor_name = self.name();
        info!(
            actor = %actor_name,
            "BatchProcessingActor started - ready to process batchable steps"
        );
        Ok(())
    }

    /// Lifecycle hook: logs that the actor has stopped. Always returns `Ok(())`.
    fn stopped(&mut self) -> TaskerResult<()> {
        let actor_name = self.name();
        info!(actor = %actor_name, "BatchProcessingActor stopped");
        Ok(())
    }
}
#[async_trait]
impl Handler<ProcessBatchableStepMessage> for BatchProcessingActor {
type Response = BatchProcessingOutcome;
async fn handle(&self, msg: ProcessBatchableStepMessage) -> TaskerResult<Self::Response> {
debug!(
actor = %self.name(),
task_uuid = %msg.task_uuid,
step_uuid = %msg.batchable_step.workflow_step_uuid,
"Processing batchable step message"
);
let outcome = self
.service
.process_batchable_step(msg.task_uuid, &msg.batchable_step, &msg.step_result)
.await
.map_err(|e| tasker_shared::TaskerError::OrchestrationError(e.to_string()))?;
debug!(
actor = %self.name(),
task_uuid = %msg.task_uuid,
step_uuid = %msg.batchable_step.workflow_step_uuid,
requires_workers = outcome.requires_worker_creation(),
"Batchable step processed successfully"
);
Ok(outcome)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check that the actor satisfies both actor traits.
    /// The helpers have empty bodies; the test passes if it type-checks.
    #[test]
    fn test_batch_processing_actor_implements_traits() {
        fn requires_orchestration_actor<A>()
        where
            A: OrchestrationActor,
        {
        }
        fn requires_handler<A>()
        where
            A: Handler<ProcessBatchableStepMessage>,
        {
        }

        requires_orchestration_actor::<BatchProcessingActor>();
        requires_handler::<BatchProcessingActor>();
    }

    /// Compile-time check that the message type implements `Message`.
    #[test]
    fn test_process_batchable_step_message_implements_message() {
        fn requires_message<M>()
        where
            M: Message,
        {
        }

        requires_message::<ProcessBatchableStepMessage>();
    }
}