//! forge-runtime 0.9.0
//!
//! Runtime executors and gateway for the Forge framework.
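//!
//! A minimal wiring sketch (hedged: `JobQueue::new`, `JobRegistry::new`, and
//! `claim_next` below are illustrative assumptions, not this crate's
//! confirmed API):
//!
//! ```ignore
//! use forge_runtime::{JobExecutor, JobQueue, JobRegistry};
//!
//! async fn worker(db_pool: sqlx::PgPool) {
//!     let queue = JobQueue::new(db_pool.clone()); // assumed constructor
//!     let registry = JobRegistry::new();          // assumed constructor
//!     let executor = JobExecutor::new(queue.clone(), registry, db_pool);
//!
//!     // Claim jobs and execute them until the queue is drained.
//!     while let Ok(Some(job)) = queue.claim_next().await { // assumed method
//!         let result = executor.execute(&job).await;
//!         if result.should_retry() {
//!             tracing::warn!(job_id = %job.id, "job failed, will be retried");
//!         }
//!     }
//! }
//! ```
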
use std::sync::Arc;
use std::time::Duration;

use forge_core::CircuitBreakerClient;
use forge_core::job::{JobContext, ProgressUpdate};
use tokio::time::timeout;

use super::queue::{JobQueue, JobRecord};
use super::registry::{JobEntry, JobRegistry};

/// Executes jobs with timeout and retry handling.
pub struct JobExecutor {
    queue: JobQueue,
    registry: Arc<JobRegistry>,
    db_pool: sqlx::PgPool,
    http_client: CircuitBreakerClient,
}

impl JobExecutor {
    /// How often the keepalive heartbeat refreshes a running job's claim.
    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);

    /// Create a new job executor.
    ///
    /// The executor builds its own HTTP client wrapped in a circuit breaker
    /// with default settings; job handlers reach it through their [`JobContext`].
    pub fn new(queue: JobQueue, registry: JobRegistry, db_pool: sqlx::PgPool) -> Self {
        Self {
            queue,
            registry: Arc::new(registry),
            db_pool,
            http_client: CircuitBreakerClient::with_defaults(reqwest::Client::new()),
        }
    }

    /// Execute a claimed job.
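    ///
    /// The returned [`ExecutionResult`] tells the caller what happened; the
    /// queue record has already been updated by the time it is returned. A
    /// dispatch sketch (the logging shown is illustrative, not prescribed):
    ///
    /// ```ignore
    /// match executor.execute(&job).await {
    ///     ExecutionResult::Completed { .. } => {
    ///         tracing::info!(job_id = %job.id, "job completed");
    ///     }
    ///     ExecutionResult::Failed { error, retryable } => {
    ///         tracing::warn!(job_id = %job.id, %error, retryable, "job failed");
    ///     }
    ///     ExecutionResult::TimedOut { retryable } => {
    ///         tracing::warn!(job_id = %job.id, retryable, "job timed out");
    ///     }
    ///     ExecutionResult::Cancelled { reason } => {
    ///         tracing::info!(job_id = %job.id, %reason, "job cancelled");
    ///     }
    /// }
    /// ```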
    pub async fn execute(&self, job: &JobRecord) -> ExecutionResult {
        let entry = match self.registry.get(&job.job_type) {
            Some(e) => e,
            None => {
                return ExecutionResult::Failed {
                    error: format!("Unknown job type: {}", job.job_type),
                    retryable: false,
                };
            }
        };

        if matches!(job.status, forge_core::job::JobStatus::Cancelled) {
            return ExecutionResult::Cancelled {
                reason: Self::cancellation_reason(job, "Job cancelled"),
            };
        }

        // Mark job as running
        if let Err(e) = self.queue.start(job.id).await {
            if matches!(e, sqlx::Error::RowNotFound) {
                return ExecutionResult::Cancelled {
                    reason: Self::cancellation_reason(job, "Job cancelled"),
                };
            }
            return ExecutionResult::Failed {
                error: format!("Failed to start job: {}", e),
                retryable: true,
            };
        }

        // Set up progress channel
        let (progress_tx, progress_rx) = std::sync::mpsc::channel::<ProgressUpdate>();

        // Spawn a task that drains progress updates into the database.
        // The std mpsc receiver has no async recv(), so poll try_recv()
        // with an async sleep instead of blocking a tokio worker thread.
        let progress_queue = self.queue.clone();
        let progress_job_id = job.id;
        tokio::spawn(async move {
            loop {
                match progress_rx.try_recv() {
                    Ok(update) => {
                        if let Err(e) = progress_queue
                            .update_progress(
                                progress_job_id,
                                update.percentage as i32,
                                &update.message,
                            )
                            .await
                        {
                            tracing::debug!(job_id = %progress_job_id, error = %e, "Failed to update job progress");
                        }
                    }
                    Err(std::sync::mpsc::TryRecvError::Empty) => {
                        // No message yet, sleep briefly and check again
                        tokio::time::sleep(Duration::from_millis(50)).await;
                    }
                    Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                        // Sender dropped (job finished), exit loop
                        break;
                    }
                }
            }
        });

        // Create job context with progress channel
        let mut ctx = JobContext::new(
            job.id,
            job.job_type.clone(),
            job.attempts as u32,
            job.max_attempts as u32,
            self.db_pool.clone(),
            self.http_client.clone(),
        )
        .with_saved(job.job_context.clone())
        .with_progress(progress_tx);
        ctx.set_http_timeout(entry.info.http_timeout);

        // Keepalive heartbeat: refresh the claim every HEARTBEAT_INTERVAL so
        // the stale-job cleanup doesn't reclaim a healthy long-running job.
        let heartbeat_queue = self.queue.clone();
        let heartbeat_job_id = job.id;
        let (heartbeat_stop_tx, mut heartbeat_stop_rx) = tokio::sync::watch::channel(false);
        let heartbeat_task = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(Self::HEARTBEAT_INTERVAL) => {
                        if let Err(e) = heartbeat_queue.heartbeat(heartbeat_job_id).await {
                            tracing::debug!(job_id = %heartbeat_job_id, error = %e, "Failed to update job heartbeat");
                        }
                    }
                    changed = heartbeat_stop_rx.changed() => {
                        if changed.is_err() || *heartbeat_stop_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });

        // Execute with timeout
        let job_timeout = entry.info.timeout;
        let exec_start = std::time::Instant::now();
        let result = timeout(job_timeout, self.run_handler(&entry, &ctx, &job.input)).await;
        let exec_duration_ms = exec_start.elapsed().as_millis() as i32;

        let _ = heartbeat_stop_tx.send(true);
        let _ = heartbeat_task.await;

        let ttl = entry.info.ttl;

        match result {
            Ok(Ok(output)) => {
                // Job completed successfully
                if let Err(e) = self.queue.complete(job.id, output.clone(), ttl).await {
                    tracing::debug!(job_id = %job.id, error = %e, "Failed to mark job as complete");
                }
                crate::signals::emit_server_execution(
                    &job.job_type,
                    "job",
                    exec_duration_ms,
                    true,
                    None,
                );
                ExecutionResult::Completed { output }
            }
            Ok(Err(e)) => {
                // Job failed
                let error_msg = e.to_string();
                // Treat an explicit cancellation error, or a cancellation
                // requested while the handler was running, as a cancellation
                // rather than a failure.
                let cancel_requested = match ctx.is_cancel_requested().await {
                    Ok(value) => value,
                    Err(err) => {
                        tracing::debug!(job_id = %job.id, error = %err, "Failed to check cancellation status");
                        false
                    }
                };
                if matches!(e, forge_core::ForgeError::JobCancelled(_)) || cancel_requested {
                    let reason = Self::cancellation_reason(job, "Job cancellation requested");
                    if let Err(e) = self.queue.cancel(job.id, Some(&reason), ttl).await {
                        tracing::debug!(job_id = %job.id, error = %e, "Failed to cancel job");
                    }
                    if let Err(e) = self
                        .run_compensation(&entry, &ctx, &job.input, &reason)
                        .await
                    {
                        tracing::error!(job_id = %job.id, error = %e, "Job compensation failed");
                    }
                    crate::signals::emit_server_execution(
                        &job.job_type,
                        "job",
                        exec_duration_ms,
                        false,
                        Some(format!("cancelled: {}", reason)),
                    );
                    return ExecutionResult::Cancelled { reason };
                }
                let should_retry = job.attempts < job.max_attempts;

                let retry_delay = if should_retry {
                    Some(entry.info.retry.calculate_backoff(job.attempts as u32))
                } else {
                    None
                };

                // chrono::Duration cannot represent every std Duration; fall
                // back to a 60-second delay if the conversion fails.
                let chrono_delay = retry_delay.map(|d| {
                    chrono::Duration::from_std(d).unwrap_or(chrono::Duration::seconds(60))
                });

                if let Err(e) = self.queue.fail(job.id, &error_msg, chrono_delay, ttl).await {
                    tracing::error!(job_id = %job.id, error = %e, "Failed to record job failure");
                }

                crate::signals::emit_server_execution(
                    &job.job_type,
                    "job",
                    exec_duration_ms,
                    false,
                    Some(error_msg.clone()),
                );

                ExecutionResult::Failed {
                    error: error_msg,
                    retryable: should_retry,
                }
            }
            Err(_) => {
                // Timeout
                let error_msg = format!("Job timed out after {:?}", job_timeout);
                let should_retry = job.attempts < job.max_attempts;

                let retry_delay = if should_retry {
                    Some(chrono::Duration::seconds(60))
                } else {
                    None
                };

                if let Err(e) = self.queue.fail(job.id, &error_msg, retry_delay, ttl).await {
                    tracing::error!(job_id = %job.id, error = %e, "Failed to record job timeout");
                }

                crate::signals::emit_server_execution(
                    &job.job_type,
                    "job",
                    exec_duration_ms,
                    false,
                    Some(error_msg),
                );

                ExecutionResult::TimedOut {
                    retryable: should_retry,
                }
            }
        }
    }

    /// Run the job handler.
    async fn run_handler(
        &self,
        entry: &Arc<JobEntry>,
        ctx: &JobContext,
        input: &serde_json::Value,
    ) -> forge_core::Result<serde_json::Value> {
        (entry.handler)(ctx, input.clone()).await
    }

    async fn run_compensation(
        &self,
        entry: &Arc<JobEntry>,
        ctx: &JobContext,
        input: &serde_json::Value,
        reason: &str,
    ) -> forge_core::Result<()> {
        (entry.compensation)(ctx, input.clone(), reason).await
    }

    /// Prefer the reason stored on the job record, falling back to the caller's message.
    fn cancellation_reason(job: &JobRecord, fallback: &str) -> String {
        job.cancel_reason
            .clone()
            .unwrap_or_else(|| fallback.to_string())
    }
}

/// Result of job execution.
#[derive(Debug)]
pub enum ExecutionResult {
    /// Job completed successfully.
    Completed { output: serde_json::Value },
    /// Job failed.
    Failed { error: String, retryable: bool },
    /// Job timed out.
    TimedOut { retryable: bool },
    /// Job cancelled.
    Cancelled { reason: String },
}

impl ExecutionResult {
    /// Check if execution was successful.
    pub fn is_success(&self) -> bool {
        matches!(self, Self::Completed { .. })
    }

    /// Check if the job should be retried.
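    ///
    /// A quick illustration (the `forge_runtime::ExecutionResult` import path
    /// is assumed, hence `ignore`):
    ///
    /// ```ignore
    /// use forge_runtime::ExecutionResult;
    ///
    /// let timed_out = ExecutionResult::TimedOut { retryable: true };
    /// assert!(timed_out.should_retry());
    ///
    /// let cancelled = ExecutionResult::Cancelled { reason: "user request".into() };
    /// assert!(!cancelled.should_retry());
    /// ```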
    pub fn should_retry(&self) -> bool {
        match self {
            Self::Failed { retryable, .. } => *retryable,
            Self::TimedOut { retryable } => *retryable,
            _ => false,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_execution_result_success() {
        let result = ExecutionResult::Completed {
            output: serde_json::json!({}),
        };
        assert!(result.is_success());
        assert!(!result.should_retry());
    }

    #[test]
    fn test_execution_result_failed_retryable() {
        let result = ExecutionResult::Failed {
            error: "test error".to_string(),
            retryable: true,
        };
        assert!(!result.is_success());
        assert!(result.should_retry());
    }

    #[test]
    fn test_execution_result_failed_not_retryable() {
        let result = ExecutionResult::Failed {
            error: "test error".to_string(),
            retryable: false,
        };
        assert!(!result.is_success());
        assert!(!result.should_retry());
    }

    #[test]
    fn test_execution_result_timeout() {
        let result = ExecutionResult::TimedOut { retryable: true };
        assert!(!result.is_success());
        assert!(result.should_retry());
    }

    #[test]
    fn test_execution_result_cancelled() {
        let result = ExecutionResult::Cancelled {
            reason: "user request".to_string(),
        };
        assert!(!result.is_success());
        assert!(!result.should_retry());
    }
}