use crate::actors::{Handler, Message, OrchestrationActor};
use crate::orchestration::lifecycle::step_enqueuer_services::{
StepEnqueuerService, StepEnqueuerServiceResult,
};
use async_trait::async_trait;
use std::sync::Arc;
use tasker_shared::system_context::SystemContext;
use tasker_shared::TaskerResult;
use tracing::{debug, info};
/// Message asking the step enqueuer actor to run one batch-processing pass.
///
/// The message carries no payload, so every value of this type is
/// interchangeable. `Default`, `PartialEq` and `Eq` are derived in addition to
/// `Debug` and `Clone` so the message can be default-constructed and compared
/// (for example in assertions) without extra boilerplate.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProcessBatchMessage;
/// Registers [`ProcessBatchMessage`] as an actor message whose reply type is
/// [`StepEnqueuerServiceResult`].
impl Message for ProcessBatchMessage {
    type Response = StepEnqueuerServiceResult;
}
/// Actor that exposes [`StepEnqueuerService`] batch processing through the
/// message-handling interface.
#[derive(Debug)]
pub struct StepEnqueuerActor {
    /// Shared system context; handed back by `OrchestrationActor::context`.
    context: Arc<SystemContext>,
    /// Service invoked whenever a [`ProcessBatchMessage`] is handled.
    service: Arc<StepEnqueuerService>,
}
impl StepEnqueuerActor {
pub fn new(context: Arc<SystemContext>, service: Arc<StepEnqueuerService>) -> Self {
Self { context, service }
}
}
/// Lifecycle integration for [`StepEnqueuerActor`]: identity, context access,
/// and start/stop logging.
impl OrchestrationActor for StepEnqueuerActor {
    /// Fixed name used to tag this actor's log events.
    fn name(&self) -> &'static str {
        "StepEnqueuerActor"
    }

    /// Borrows the shared system context held by the actor.
    fn context(&self) -> &Arc<SystemContext> {
        &self.context
    }

    /// Emits an info-level event announcing the actor has started.
    ///
    /// Performs no other work and always returns `Ok(())`.
    fn started(&mut self) -> TaskerResult<()> {
        let actor_name = self.name();
        info!(
            actor = %actor_name,
            "StepEnqueuerActor started - ready to process ready tasks and enqueue steps"
        );
        Ok(())
    }

    /// Emits an info-level event announcing the actor has stopped.
    ///
    /// Performs no other work and always returns `Ok(())`.
    fn stopped(&mut self) -> TaskerResult<()> {
        let actor_name = self.name();
        info!(
            actor = %actor_name,
            "StepEnqueuerActor stopped"
        );
        Ok(())
    }
}
/// Handles [`ProcessBatchMessage`] by delegating to the wrapped service.
#[async_trait]
impl Handler<ProcessBatchMessage> for StepEnqueuerActor {
    type Response = StepEnqueuerServiceResult;

    /// Runs a single `process_batch` call on the service and returns its result.
    ///
    /// A debug event is logged before the call and, on success, a second one
    /// reports the processed/failed task counts and the cycle duration taken
    /// from the result. The message itself carries no data and is ignored.
    ///
    /// # Errors
    ///
    /// Any error from `process_batch` is propagated with `?`; in that case the
    /// completion event is not logged.
    async fn handle(&self, _msg: ProcessBatchMessage) -> TaskerResult<Self::Response> {
        let actor_name = self.name();
        debug!(
            actor = %actor_name,
            "Processing batch of ready tasks for step enqueueing"
        );

        let summary = self.service.process_batch().await?;

        debug!(
            actor = %actor_name,
            tasks_processed = summary.tasks_processed,
            tasks_failed = summary.tasks_failed,
            cycle_duration_ms = summary.cycle_duration_ms,
            "Batch processing completed successfully"
        );
        Ok(summary)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // The helpers below have empty bodies on purpose: instantiating one with a
    // concrete type is a compile-time proof that the type meets the bound.

    /// Compiles only when `A` implements `OrchestrationActor`.
    fn requires_orchestration_actor<A>()
    where
        A: OrchestrationActor,
    {
    }

    /// Compiles only when `H` can handle `ProcessBatchMessage`.
    fn requires_batch_handler<H>()
    where
        H: Handler<ProcessBatchMessage>,
    {
    }

    /// Compiles only when `M` implements `Message`.
    fn requires_message<M>()
    where
        M: Message,
    {
    }

    /// The actor must satisfy both the lifecycle trait and the handler trait.
    #[test]
    fn test_step_enqueuer_actor_implements_traits() {
        requires_orchestration_actor::<StepEnqueuerActor>();
        requires_batch_handler::<StepEnqueuerActor>();
    }

    /// The batch message must be usable as an actor message.
    #[test]
    fn test_process_batch_message_implements_message() {
        requires_message::<ProcessBatchMessage>();
    }
}