mockforge_registry_server/run_queue.rs
1//! Helpers for pushing test_run jobs onto the Redis queue that the
2//! `mockforge-test-runner` worker consumes.
3//!
4//! The runner crate owns the wire format — see
5//! `mockforge_test_runner::queue::QueuedJobDescriptor` — but we don't
6//! depend on it here (registry-server → runner would be a backwards
7//! dep). Instead this module produces the same JSON shape inline so a
8//! drift between the two would be a serde test failure on the runner
9//! side, not a build break.
10
11use redis::AsyncCommands;
12use serde::Serialize;
13use uuid::Uuid;
14
15use crate::redis::RedisPool;
16
/// Name of the Redis list that queued test-run jobs are pushed onto by
/// default.
///
/// This is the same value the runner falls back to when its
/// `MOCKFORGE_RUNNER_QUEUE_KEY` environment variable is not set. When
/// several runner pools share a single Redis instance (e.g., dev +
/// staging), that variable is how each pool is pointed at its own list.
pub const DEFAULT_QUEUE_KEY: &str = "test_runs:queued";
21
22/// JSON shape the runner expects on the wire. Keep in sync with
23/// `mockforge_test_runner::queue::QueuedJobDescriptor`.
24#[derive(Debug, Serialize)]
25pub struct EnqueuedJob<'a> {
26 pub run_id: Uuid,
27 pub org_id: Uuid,
28 pub source_id: Uuid,
29 pub kind: &'a str,
30 pub payload: serde_json::Value,
31}
32
33/// Push a job descriptor onto the runner's queue. When Redis isn't
34/// configured (e.g., local dev without a Redis container) this logs a
35/// warning and returns Ok — the test_runs row still exists, it just
36/// never gets picked up. That matches the existing pattern in other
37/// places that treat Redis as optional.
38pub async fn enqueue(redis: Option<&RedisPool>, job: EnqueuedJob<'_>) -> redis::RedisResult<()> {
39 let Some(redis) = redis else {
40 tracing::warn!(
41 run_id = %job.run_id,
42 kind = job.kind,
43 "Redis not configured — test_run will sit in 'queued' until a runner connects",
44 );
45 return Ok(());
46 };
47
48 let raw = match serde_json::to_string(&job) {
49 Ok(s) => s,
50 Err(e) => {
51 tracing::error!(error = %e, "failed to serialize queue job; not enqueueing");
52 return Ok(());
53 }
54 };
55
56 // ConnectionManager is Clone, so deref the Arc + clone for the
57 // mutable handle redis async commands need.
58 let mut conn = (*redis.get_connection()).clone();
59 let _: () = conn.lpush(DEFAULT_QUEUE_KEY, raw).await?;
60 Ok(())
61}
62
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{json, Value};

    /// Guards the wire contract with the runner, which decodes queue entries
    /// into:
    ///
    /// ```text
    /// #[derive(Deserialize)]
    /// struct QueuedJobDescriptor {
    ///     run_id: Uuid, org_id: Uuid, source_id: Uuid,
    ///     kind: String, payload: serde_json::Value,
    /// }
    /// ```
    ///
    /// The job is round-tripped through its JSON string form and each field
    /// is checked under the name the runner expects.
    #[test]
    fn enqueued_job_serializes_to_runner_wire_format() {
        let (run_id, org_id, source_id) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());

        let wire = serde_json::to_string(&EnqueuedJob {
            run_id,
            org_id,
            source_id,
            kind: "unit",
            payload: json!({ "synthetic_steps": 5 }),
        })
        .unwrap();
        let decoded: Value = serde_json::from_str(&wire).unwrap();

        // UUIDs must appear as their canonical string form.
        for (field, expected) in [
            ("run_id", run_id),
            ("org_id", org_id),
            ("source_id", source_id),
        ] {
            assert_eq!(decoded[field], expected.to_string(), "field `{field}`");
        }
        assert_eq!(decoded["kind"], "unit");
        assert_eq!(decoded["payload"]["synthetic_steps"], 5);
    }
}