Skip to main content

mockforge_registry_server/
run_queue.rs

1//! Helpers for pushing test_run jobs onto the Redis queue that the
2//! `mockforge-test-runner` worker consumes.
3//!
4//! The runner crate owns the wire format — see
5//! `mockforge_test_runner::queue::QueuedJobDescriptor` — but we don't
6//! depend on it here (registry-server → runner would be a backwards
7//! dep). Instead this module produces the same JSON shape inline so a
8//! drift between the two would be a serde test failure on the runner
9//! side, not a build break.
10
11use redis::AsyncCommands;
12use serde::Serialize;
13use uuid::Uuid;
14
15use crate::redis::RedisPool;
16
/// Default queue key: the Redis list that serialized test-run jobs are
/// `LPUSH`ed onto. Mirrors the runner's `MOCKFORGE_RUNNER_QUEUE_KEY`
/// default. Override at boot via env var if multiple runner pools share
/// one Redis (e.g., dev + staging).
///
/// NOTE(review): `enqueue` in this module always pushes to this constant
/// and does not read any env var itself — confirm whether the override is
/// expected to be honored on the producer side as well.
pub const DEFAULT_QUEUE_KEY: &str = "test_runs:queued";
21
/// JSON shape the runner expects on the wire. Keep in sync with
/// `mockforge_test_runner::queue::QueuedJobDescriptor`.
///
/// Serialized with `serde`; no rename attributes are applied, so each
/// field name below is also the JSON key. The unit test in this module
/// checks that the three `Uuid` fields compare equal to their string form
/// after a round trip through `serde_json`.
#[derive(Debug, Serialize)]
pub struct EnqueuedJob<'a> {
    /// Identifier of the run being queued; `enqueue` includes it in its
    /// log output. Presumably the id of the `test_runs` row — confirm
    /// against the caller.
    pub run_id: Uuid,
    /// Presumably the organization that owns the run — confirm against
    /// the caller.
    pub org_id: Uuid,
    /// Presumably the entity the run was created from — confirm against
    /// the caller.
    pub source_id: Uuid,
    /// Job kind, borrowed for the lifetime of the descriptor (the unit
    /// test uses `"unit"`). The set of valid values is not visible here.
    pub kind: &'a str,
    /// Free-form JSON serialized under the `payload` key.
    pub payload: serde_json::Value,
}
32
33/// Push a job descriptor onto the runner's queue. When Redis isn't
34/// configured (e.g., local dev without a Redis container) this logs a
35/// warning and returns Ok — the test_runs row still exists, it just
36/// never gets picked up. That matches the existing pattern in other
37/// places that treat Redis as optional.
38pub async fn enqueue(redis: Option<&RedisPool>, job: EnqueuedJob<'_>) -> redis::RedisResult<()> {
39    let Some(redis) = redis else {
40        tracing::warn!(
41            run_id = %job.run_id,
42            kind = job.kind,
43            "Redis not configured — test_run will sit in 'queued' until a runner connects",
44        );
45        return Ok(());
46    };
47
48    let raw = match serde_json::to_string(&job) {
49        Ok(s) => s,
50        Err(e) => {
51            tracing::error!(error = %e, "failed to serialize queue job; not enqueueing");
52            return Ok(());
53        }
54    };
55
56    // ConnectionManager is Clone, so deref the Arc + clone for the
57    // mutable handle redis async commands need.
58    let mut conn = (*redis.get_connection()).clone();
59    let _: () = conn.lpush(DEFAULT_QUEUE_KEY, raw).await?;
60    Ok(())
61}
62
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Guards the producer side of the wire contract with the runner.
    ///
    /// The runner deserializes via:
    ///   #[derive(Deserialize)]
    ///   struct QueuedJobDescriptor {
    ///       run_id: Uuid, org_id: Uuid, source_id: Uuid,
    ///       kind: String, payload: serde_json::Value,
    ///   }
    /// so every one of those keys must come out of `EnqueuedJob` with the
    /// value we put in.
    #[test]
    fn enqueued_job_serializes_to_runner_wire_format() {
        let (run, org, source) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());

        // Round-trip through the string form, since that is what actually
        // lands on the queue.
        let wire = serde_json::to_string(&EnqueuedJob {
            run_id: run,
            org_id: org,
            source_id: source,
            kind: "unit",
            payload: json!({ "synthetic_steps": 5 }),
        })
        .unwrap();
        let decoded: serde_json::Value = serde_json::from_str(&wire).unwrap();

        // Checked in the same order as the struct declares them.
        let expected_ids = [("run_id", run), ("org_id", org), ("source_id", source)];
        for (key, id) in expected_ids {
            assert_eq!(decoded[key], id.to_string());
        }
        assert_eq!(decoded["kind"], "unit");
        assert_eq!(decoded["payload"]["synthetic_steps"], 5);
    }
}