rrq-runner 0.11.1

RRQ runner runtime for Rust.
Documentation
use async_trait::async_trait;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use std::time::Instant;

use tracing::Instrument;

use crate::telemetry::{NoopTelemetry, Telemetry};
use crate::types::{ExecutionOutcome, ExecutionRequest, OutcomeStatus};

/// A unit of work that can execute a single [`ExecutionRequest`].
///
/// The `Send + Sync` supertraits allow implementors to be stored and shared as
/// `Arc<dyn Handler>` trait objects.
#[async_trait]
pub trait Handler: Send + Sync {
    /// Takes ownership of `request` and resolves to the [`ExecutionOutcome`]
    /// produced for it.
    async fn handle(&self, request: ExecutionRequest) -> ExecutionOutcome;
}

/// Adapter that lets a plain async function or closure act as a [`Handler`].
struct FnHandler<F>(F);

#[async_trait]
impl<F, Fut> Handler for FnHandler<F>
where
    Fut: Future<Output = ExecutionOutcome> + Send,
    F: Fn(ExecutionRequest) -> Fut + Send + Sync,
{
    /// Calls the wrapped function with `request` and awaits the future it returns.
    async fn handle(&self, request: ExecutionRequest) -> ExecutionOutcome {
        let FnHandler(callback) = self;
        let pending = callback(request);
        pending.await
    }
}

/// Lookup table from function name to the [`Handler`] that executes it.
///
/// Handlers are stored behind `Arc`, so the derived `Clone` copies the map
/// while sharing the handler objects themselves.
#[derive(Clone, Default)]
pub struct Registry {
    // Keyed by the function name a handler was registered under.
    handlers: HashMap<String, Arc<dyn Handler>>,
}

impl Registry {
    /// Creates a registry with no handlers registered.
    #[must_use]
    pub fn new() -> Self {
        Self {
            handlers: HashMap::new(),
        }
    }

    /// Registers an async function or closure as the handler for `name`.
    ///
    /// Registering a second handler under the same name replaces the earlier
    /// one, because the underlying map insert overwrites the existing entry.
    pub fn register<F, Fut>(&mut self, name: impl Into<String>, handler: F)
    where
        F: Fn(ExecutionRequest) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = ExecutionOutcome> + Send + 'static,
    {
        let handler = Arc::new(FnHandler(handler)) as Arc<dyn Handler>;
        self.handlers.insert(name.into(), handler);
    }

    /// Returns a shared pointer to the handler registered under `name`, or
    /// `None` when no handler has that name.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<Arc<dyn Handler>> {
        self.handlers.get(name).cloned()
    }

    /// Executes `request` using [`NoopTelemetry`].
    ///
    /// Equivalent to calling [`Registry::execute_with`] with a `NoopTelemetry`
    /// value.
    pub async fn execute(&self, request: ExecutionRequest) -> ExecutionOutcome {
        let telemetry = NoopTelemetry;
        self.execute_with(request, &telemetry).await
    }

    /// Dispatches `request` to the handler named by `request.function_name`
    /// and records the result on the span supplied by `telemetry`.
    ///
    /// * If a handler is registered, its future runs instrumented with the
    ///   span returned by `telemetry.runner_span`.
    /// * If no handler is registered, a `handler_not_found` outcome is
    ///   produced instead of calling anything.
    ///
    /// In both cases a missing `job_id` / `request_id` on the outcome is
    /// filled in from the request before the outcome is recorded and returned.
    pub async fn execute_with<T: Telemetry + ?Sized>(
        &self,
        request: ExecutionRequest,
        telemetry: &T,
    ) -> ExecutionOutcome {
        let span = telemetry.runner_span(&request);
        let start = Instant::now();
        // `request` is moved into the handler below, so copy out the fields
        // that are still needed once the handler has finished.
        let function_name = request.function_name.clone();
        let job_id = request.job_id.clone();
        let request_id = request.request_id.clone();
        let mut outcome = match self.get(&function_name) {
            Some(handler) => handler.handle(request).instrument(span.clone()).await,
            None => ExecutionOutcome::handler_not_found(
                job_id.clone(),
                request_id.clone(),
                format!("No handler registered for function '{function_name}'"),
            ),
        };
        // Last use of both identifiers: move them rather than cloning again.
        if outcome.job_id.is_none() {
            outcome.job_id = Some(job_id);
        }
        if outcome.request_id.is_none() {
            outcome.request_id = Some(request_id);
        }
        record_outcome(&span, function_name.as_str(), &outcome, start.elapsed());
        outcome
    }
}

fn record_outcome(
    span: &tracing::Span,
    function_name: &str,
    outcome: &ExecutionOutcome,
    duration: std::time::Duration,
) {
    let duration_ms = duration.as_secs_f64() * 1000.0;
    crate::telemetry::record_job_outcome(function_name, outcome.status.clone(), duration);
    span.record("rrq.duration_ms", duration_ms);
    match outcome.status {
        OutcomeStatus::Success => {
            span.record("rrq.outcome", "success");
        }
        OutcomeStatus::Retry => {
            span.record("rrq.outcome", "retry");
            if let Some(delay) = outcome.retry_after_seconds {
                span.record("rrq.retry_delay_ms", delay * 1000.0);
            }
        }
        OutcomeStatus::Timeout => {
            span.record("rrq.outcome", "timeout");
        }
        OutcomeStatus::Error => {
            span.record("rrq.outcome", "error");
        }
    }
    if let Some(error) = outcome.error.as_ref() {
        span.record("rrq.error_message", error.message.as_str());
        if let Some(error_type) = error.error_type.as_deref() {
            span.record("rrq.error_type", error_type);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ExecutionOutcome, ExecutionRequest, OutcomeStatus};
    use serde_json::json;

    /// Deserializes a first-attempt request on the `default` queue for the
    /// given identifiers and target function.
    fn make_request(job_id: &str, request_id: &str, function_name: &str) -> ExecutionRequest {
        let payload = json!({
            "job_id": job_id,
            "request_id": request_id,
            "function_name": function_name,
            "args": [],
            "kwargs": {},
            "context": {
                "job_id": job_id,
                "attempt": 1,
                "enqueue_time": "2024-01-01T00:00:00Z",
                "queue_name": "default",
                "deadline": null,
                "trace_context": null,
                "worker_id": null
            }
        });
        serde_json::from_value(payload).unwrap()
    }

    /// Returns a registry whose only handler, `echo`, succeeds and reports the
    /// request's job id back in its result.
    fn echo_registry() -> Registry {
        let mut registry = Registry::new();
        registry.register("echo", |request| async move {
            ExecutionOutcome::success(
                request.job_id.clone(),
                request.request_id.clone(),
                json!({ "job_id": request.job_id }),
            )
        });
        registry
    }

    #[tokio::test]
    async fn registry_invokes_handler() {
        let registry = echo_registry();
        let handler = registry.get("echo").expect("handler not found");

        let outcome = handler.handle(make_request("job-1", "req-1", "echo")).await;

        assert_eq!(outcome.status, OutcomeStatus::Success);
        assert_eq!(outcome.result, Some(json!({ "job_id": "job-1" })));
    }

    #[tokio::test]
    async fn registry_execute_with_noop_telemetry() {
        let registry = echo_registry();

        let outcome = registry
            .execute_with(make_request("job-2", "req-2", "echo"), &NoopTelemetry)
            .await;

        assert_eq!(outcome.status, OutcomeStatus::Success);
    }

    #[tokio::test]
    async fn registry_execute_handler_not_found() {
        let registry = Registry::new();

        let outcome = registry
            .execute_with(make_request("job-3", "req-3", "missing"), &NoopTelemetry)
            .await;

        assert_eq!(outcome.status, OutcomeStatus::Error);
        let error_type = outcome
            .error
            .as_ref()
            .and_then(|error| error.error_type.as_deref());
        assert_eq!(error_type, Some("handler_not_found"));
    }
}