use crate::error::Result;
use crate::events::SchedulerEvent;
use crate::projection::DueTask;
use crate::task_handler::{TaskContext, TaskHandlerRegistry};
use azoth::AzothDb;
use azoth_core::traits::{CanonicalStore, CanonicalTxn};
use chrono::Utc;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info};
/// Runs one due task through its registered handler and records the outcome.
///
/// The handler for `due_task.task_type` is looked up in `handler_registry` and executed
/// via `tokio::task::spawn_blocking`, with the wait bounded by `due_task.timeout_secs`.
/// Once the handler has been resolved, a [`SchedulerEvent::TaskExecuted`] event is
/// appended to the canonical store for every outcome:
///
/// * **Success** – the handler's event payload is appended first and its event id is
///   stored as `triggered_event_id`; both appends share one write transaction.
/// * **Handler error, join failure, or timeout** – `success` is `false`,
///   `triggered_event_id` is `0`, and `error` carries a description of the failure.
///
/// # Errors
///
/// Returns an error if the handler lookup fails (in which case no event is recorded), or
/// if opening the write transaction, serializing the event, appending, or committing
/// fails. A failing handler is *not* reported as an error: it is recorded in the store
/// and the function returns `Ok(())`.
pub async fn execute_task(
    db: Arc<AzothDb>,
    handler_registry: Arc<TaskHandlerRegistry>,
    due_task: DueTask,
) -> Result<()> {
    let execution_id = uuid::Uuid::new_v4().to_string();
    let started_at = Utc::now().timestamp();

    debug!(
        task_id = %due_task.task_id,
        execution_id = %execution_id,
        "Starting task execution"
    );

    let handler = handler_registry.get(&due_task.task_type)?;

    // NOTE(review): `scheduled_at` is filled with the current time rather than a value
    // taken from `due_task` — confirm this is what handlers expect.
    let ctx = TaskContext {
        task_id: due_task.task_id.clone(),
        execution_id: execution_id.clone(),
        scheduled_at: Utc::now(),
        execution_attempt: due_task.retry_count,
    };

    let task_id = due_task.task_id.clone();
    let payload = due_task.payload.clone();
    let timeout_secs = due_task.timeout_secs;

    // The timeout only bounds how long we wait. A `spawn_blocking` closure that has
    // already started cannot be aborted, so a timed-out handler may keep running in the
    // background after this function has recorded the timeout.
    let result = tokio::time::timeout(
        Duration::from_secs(timeout_secs),
        tokio::task::spawn_blocking(move || handler.execute(&ctx, &payload)),
    )
    .await;

    let completed_at = Utc::now().timestamp();

    // Step 1: log and classify. `Ok` carries the handler's event, `Err` carries the
    // failure description that ends up in the recorded `TaskExecuted` event.
    let outcome = match result {
        Ok(Ok(Ok(task_event))) => {
            info!(
                task_id = %task_id,
                execution_id = %execution_id,
                event_type = %task_event.event_type,
                "Task executed successfully"
            );
            Ok(task_event)
        }
        Ok(Ok(Err(handler_error))) => {
            error!(
                task_id = %task_id,
                execution_id = %execution_id,
                error = %handler_error,
                "Task handler returned error"
            );
            Err(handler_error.to_string())
        }
        Ok(Err(join_error)) => {
            error!(
                task_id = %task_id,
                execution_id = %execution_id,
                error = %join_error,
                "Task handler panicked"
            );
            Err(format!("Task panicked: {}", join_error))
        }
        Err(_timeout) => {
            error!(
                task_id = %task_id,
                execution_id = %execution_id,
                timeout_secs = timeout_secs,
                "Task execution timed out"
            );
            Err(format!("Task timed out after {} seconds", timeout_secs))
        }
    };

    // Step 2: persist. A single code path writes the outcome so the success and failure
    // records cannot drift apart.
    let mut txn = db.canonical().write_txn()?;

    let (triggered_event_id, error) = match outcome {
        // On success the handler's own event goes in first; its id links the two events.
        Ok(task_event) => (txn.append_event(&task_event.payload)?, None),
        // Failures trigger no event; `0` is the placeholder id used for that case.
        Err(message) => (0, Some(message)),
    };

    let success = error.is_none();
    let executed_event = SchedulerEvent::TaskExecuted {
        task_id,
        execution_id,
        triggered_event_id,
        started_at,
        completed_at,
        success,
        error,
    };

    txn.append_event(&serde_json::to_vec(&executed_event)?)?;
    txn.commit()?;

    Ok(())
}