use std::sync::Arc;
use std::time::Duration;
use forge_core::CircuitBreakerClient;
use forge_core::job::{JobContext, ProgressUpdate};
use tokio::time::timeout;
use super::queue::{JobQueue, JobRecord};
use super::registry::{JobEntry, JobRegistry};
/// Executes claimed jobs: resolves the registered handler for the job type,
/// runs it with progress reporting, heartbeats, and a timeout, and records
/// the outcome (complete / fail / cancel) back onto the queue.
pub struct JobExecutor {
    // Persistent queue used for all status transitions (start, progress,
    // heartbeat, complete, fail, cancel).
    queue: JobQueue,
    // Handler registry keyed by job type; shared so lookups return Arc'd entries.
    registry: Arc<JobRegistry>,
    // Database pool handed to each per-job `JobContext`.
    db_pool: sqlx::PgPool,
    // Circuit-breaker-wrapped HTTP client cloned into each `JobContext`.
    http_client: CircuitBreakerClient,
}
impl JobExecutor {
    /// Interval between liveness pings sent to the queue while a handler runs.
    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);

    /// Builds an executor over the given queue, handler registry, and DB pool.
    /// The HTTP client is constructed here with default circuit-breaker
    /// settings around a fresh `reqwest::Client`.
    pub fn new(queue: JobQueue, registry: JobRegistry, db_pool: sqlx::PgPool) -> Self {
        Self {
            queue,
            registry: Arc::new(registry),
            db_pool,
            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
        }
    }

    /// Runs a single claimed job end to end and returns its outcome.
    ///
    /// Sequence: resolve handler → honor pre-existing cancellation → mark
    /// started → spawn progress-forwarding and heartbeat tasks → run the
    /// handler under the entry's timeout → stop the heartbeat → persist the
    /// outcome and emit an execution signal.
    ///
    /// Note: on timeout the handler future is simply dropped; compensation is
    /// NOT run in the timeout path (only in the cancellation path).
    pub async fn execute(&self, job: &JobRecord) -> ExecutionResult {
        // Unknown job types are a permanent (non-retryable) failure — there is
        // no handler that could ever succeed on retry.
        let entry = match self.registry.get(&job.job_type) {
            Some(e) => e,
            None => {
                return ExecutionResult::Failed {
                    error: format!("Unknown job type: {}", job.job_type),
                    retryable: false,
                };
            }
        };
        // Bail out early if the record was already cancelled before we picked it up.
        if matches!(job.status, forge_core::job::JobStatus::Cancelled) {
            return ExecutionResult::Cancelled {
                reason: Self::cancellation_reason(job, "Job cancelled"),
            };
        }
        // Transition the row to "started". RowNotFound is interpreted as a
        // concurrent cancellation/removal (NOTE(review): presumably `start`
        // only matches rows in a runnable state — confirm in JobQueue); any
        // other DB error is treated as transient and retryable.
        if let Err(e) = self.queue.start(job.id).await {
            if matches!(e, sqlx::Error::RowNotFound) {
                return ExecutionResult::Cancelled {
                    reason: Self::cancellation_reason(job, "Job cancelled"),
                };
            }
            return ExecutionResult::Failed {
                error: format!("Failed to start job: {}", e),
                retryable: true,
            };
        }
        // Bridge synchronous progress updates from the handler into async DB
        // writes. A std (sync) channel is used because `JobContext` exposes a
        // sync sender, so this task has to poll: `try_recv` + 50ms sleep. It
        // exits when the sender side (held by `ctx`) is dropped.
        let (progress_tx, progress_rx) = std::sync::mpsc::channel::<ProgressUpdate>();
        let progress_queue = self.queue.clone();
        let progress_job_id = job.id;
        tokio::spawn(async move {
            loop {
                match progress_rx.try_recv() {
                    Ok(update) => {
                        // Progress persistence is best-effort; failures are
                        // logged at debug and never fail the job.
                        // NOTE(review): `percentage as i32` truncates — assumes
                        // the value is already in a sane 0..=100 range; confirm.
                        if let Err(e) = progress_queue
                            .update_progress(
                                progress_job_id,
                                update.percentage as i32,
                                &update.message,
                            )
                            .await
                        {
                            tracing::debug!(job_id = %progress_job_id, error = %e, "Failed to update job progress");
                        }
                    }
                    Err(std::sync::mpsc::TryRecvError::Empty) => {
                        // Nothing queued yet; back off briefly instead of spinning.
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                    }
                    Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                        // All senders dropped (handler + ctx finished) — we're done.
                        break;
                    }
                }
            }
        });
        // Per-job context handed to the handler: identity, attempt counters,
        // shared resources, previously saved state, and the progress sender.
        let mut ctx = JobContext::new(
            job.id,
            job.job_type.clone(),
            job.attempts as u32,
            job.max_attempts as u32,
            self.db_pool.clone(),
            self.http_client.clone(),
        )
        .with_saved(job.job_context.clone())
        .with_progress(progress_tx);
        ctx.set_http_timeout(entry.info.http_timeout);
        // Periodic heartbeat while the handler runs. NOTE(review): assumed to
        // let the queue detect dead/stalled workers — confirm against
        // `JobQueue::heartbeat`. A watch channel signals shutdown; the task is
        // awaited below before the outcome is persisted.
        let heartbeat_queue = self.queue.clone();
        let heartbeat_job_id = job.id;
        let (heartbeat_stop_tx, mut heartbeat_stop_rx) = tokio::sync::watch::channel(false);
        let heartbeat_task = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(Self::HEARTBEAT_INTERVAL) => {
                        // Best-effort: a missed heartbeat write is only logged.
                        if let Err(e) = heartbeat_queue.heartbeat(heartbeat_job_id).await {
                            tracing::debug!(job_id = %heartbeat_job_id, error = %e, "Failed to update job heartbeat");
                        }
                    }
                    changed = heartbeat_stop_rx.changed() => {
                        // Stop on an explicit `true` or if the sender was dropped.
                        if changed.is_err() || *heartbeat_stop_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });
        // Run the handler under the entry's configured timeout, measuring wall
        // time for the execution signal.
        let job_timeout = entry.info.timeout;
        let exec_start = std::time::Instant::now();
        let result = timeout(job_timeout, self.run_handler(&entry, &ctx, &job.input)).await;
        let exec_duration_ms = exec_start.elapsed().as_millis() as i32;
        // Stop and join the heartbeat task before recording the outcome so it
        // cannot race with the terminal status write.
        let _ = heartbeat_stop_tx.send(true);
        let _ = heartbeat_task.await;
        let ttl = entry.info.ttl;
        match result {
            // Handler succeeded within the timeout.
            Ok(Ok(output)) => {
                if let Err(e) = self.queue.complete(job.id, output.clone(), ttl).await {
                    tracing::debug!(job_id = %job.id, error = %e, "Failed to mark job as complete");
                }
                crate::signals::emit_server_execution(
                    &job.job_type,
                    "job",
                    exec_duration_ms,
                    true,
                    None,
                );
                ExecutionResult::Completed { output }
            }
            // Handler returned an error within the timeout.
            Ok(Err(e)) => {
                let error_msg = e.to_string();
                // A cancellation requested mid-run may surface either as a
                // JobCancelled error or via the cancel flag; check both. A
                // failed flag check defaults to "not cancelled".
                let cancel_requested = match ctx.is_cancel_requested().await {
                    Ok(value) => value,
                    Err(err) => {
                        tracing::debug!(job_id = %job.id, error = %err, "Failed to check cancellation status");
                        false
                    }
                };
                if matches!(e, forge_core::ForgeError::JobCancelled(_)) || cancel_requested {
                    let reason = Self::cancellation_reason(job, "Job cancellation requested");
                    if let Err(e) = self.queue.cancel(job.id, Some(&reason), ttl).await {
                        tracing::debug!(job_id = %job.id, error = %e, "Failed to cancel job");
                    }
                    // Compensation runs only on cancellation, after the cancel
                    // status is recorded; its failure is logged, not propagated.
                    if let Err(e) = self
                        .run_compensation(&entry, &ctx, &job.input, &reason)
                        .await
                    {
                        tracing::error!(job_id = %job.id, error = %e, "Job compensation failed");
                    }
                    crate::signals::emit_server_execution(
                        &job.job_type,
                        "job",
                        exec_duration_ms,
                        false,
                        Some(format!("cancelled: {}", reason)),
                    );
                    return ExecutionResult::Cancelled { reason };
                }
                // Genuine failure: retry while attempts remain, using the
                // entry's backoff policy (falling back to 60s if the std→chrono
                // duration conversion fails).
                let should_retry = job.attempts < job.max_attempts;
                let retry_delay = if should_retry {
                    Some(entry.info.retry.calculate_backoff(job.attempts as u32))
                } else {
                    None
                };
                let chrono_delay = retry_delay.map(|d| {
                    chrono::Duration::from_std(d).unwrap_or(chrono::Duration::seconds(60))
                });
                if let Err(e) = self.queue.fail(job.id, &error_msg, chrono_delay, ttl).await {
                    tracing::error!(job_id = %job.id, error = %e, "Failed to record job failure");
                }
                crate::signals::emit_server_execution(
                    &job.job_type,
                    "job",
                    exec_duration_ms,
                    false,
                    Some(error_msg.clone()),
                );
                ExecutionResult::Failed {
                    error: error_msg,
                    retryable: should_retry,
                }
            }
            // Handler exceeded the timeout; its future has been dropped.
            // Timeouts use a flat 60s retry delay rather than the backoff policy.
            Err(_) => {
                let error_msg = format!("Job timed out after {:?}", job_timeout);
                let should_retry = job.attempts < job.max_attempts;
                let retry_delay = if should_retry {
                    Some(chrono::Duration::seconds(60))
                } else {
                    None
                };
                if let Err(e) = self.queue.fail(job.id, &error_msg, retry_delay, ttl).await {
                    tracing::error!(job_id = %job.id, error = %e, "Failed to record job timeout");
                }
                crate::signals::emit_server_execution(
                    &job.job_type,
                    "job",
                    exec_duration_ms,
                    false,
                    Some(error_msg),
                );
                ExecutionResult::TimedOut {
                    retryable: should_retry,
                }
            }
        }
    }

    /// Invokes the registered handler for this entry with the job's input.
    async fn run_handler(
        &self,
        entry: &Arc<JobEntry>,
        ctx: &JobContext,
        input: &serde_json::Value,
    ) -> forge_core::Result<serde_json::Value> {
        (entry.handler)(ctx, input.clone()).await
    }

    /// Invokes the entry's compensation callback (cleanup after cancellation),
    /// passing the original input and the cancellation reason.
    async fn run_compensation(
        &self,
        entry: &Arc<JobEntry>,
        ctx: &JobContext,
        input: &serde_json::Value,
        reason: &str,
    ) -> forge_core::Result<()> {
        (entry.compensation)(ctx, input.clone(), reason).await
    }

    /// Returns the job's recorded cancel reason, or `fallback` if none was set.
    fn cancellation_reason(job: &JobRecord, fallback: &str) -> String {
        job.cancel_reason
            .clone()
            .unwrap_or_else(|| fallback.to_string())
    }
}
/// Outcome of a single job execution attempt, as reported by `JobExecutor::execute`.
#[derive(Debug)]
pub enum ExecutionResult {
    /// Handler finished successfully; `output` was also reported to the queue.
    Completed { output: serde_json::Value },
    /// Handler (or setup) failed; `retryable` says whether another attempt is allowed.
    Failed { error: String, retryable: bool },
    /// Handler exceeded its configured timeout and was dropped.
    TimedOut { retryable: bool },
    /// Job was cancelled before or during execution; `reason` is the recorded cause.
    Cancelled { reason: String },
}
impl ExecutionResult {
pub fn is_success(&self) -> bool {
matches!(self, Self::Completed { .. })
}
pub fn should_retry(&self) -> bool {
match self {
Self::Failed { retryable, .. } => *retryable,
Self::TimedOut { retryable } => *retryable,
_ => false,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn completed_is_success_and_never_retries() {
        let outcome = ExecutionResult::Completed {
            output: serde_json::json!({}),
        };
        assert!(outcome.is_success());
        assert!(!outcome.should_retry());
    }

    #[test]
    fn failed_outcome_honors_retryable_flag() {
        for (retryable, expected_retry) in [(true, true), (false, false)] {
            let outcome = ExecutionResult::Failed {
                error: String::from("test error"),
                retryable,
            };
            assert!(!outcome.is_success());
            assert_eq!(outcome.should_retry(), expected_retry);
        }
    }

    #[test]
    fn timeout_outcome_honors_retryable_flag() {
        let outcome = ExecutionResult::TimedOut { retryable: true };
        assert!(!outcome.is_success());
        assert!(outcome.should_retry());
    }

    #[test]
    fn cancelled_outcome_never_retries() {
        let outcome = ExecutionResult::Cancelled {
            reason: String::from("user request"),
        };
        assert!(!outcome.is_success());
        assert!(!outcome.should_retry());
    }
}