stormchaser_engine/handler/
runner.rs1#![allow(clippy::explicit_auto_deref)]
2use anyhow::{Context, Result};
3use serde_json::Value;
4use sqlx::PgPool;
5use stormchaser_model::runner::RunnerStatus;
6use tracing::{debug, error, info};
7
8pub async fn handle_runner_registration(payload: Value, pool: PgPool) -> Result<()> {
10 let runner_id = payload["runner_id"].as_str().context("Missing runner_id")?;
11 let runner_type = payload["runner_type"]
12 .as_str()
13 .context("Missing runner_type")?;
14 let protocol_version = payload["protocol_version"]
15 .as_str()
16 .context("Missing protocol_version")?;
17 let nats_subject = payload["nats_subject"]
18 .as_str()
19 .context("Missing nats_subject")?;
20
21 let capabilities: Vec<String> = if let Some(caps) = payload["capabilities"].as_array() {
22 caps.iter()
23 .filter_map(|c| c.as_str().map(|s| s.to_string()))
24 .collect()
25 } else {
26 Vec::new()
27 };
28
29 info!("Registering runner: {} (type: {})", runner_id, runner_type);
30
31 let mut step_defs = Vec::new();
32 if let Some(step_types) = payload["step_types"].as_array() {
33 for st in step_types {
34 let step_type = st["step_type"]
35 .as_str()
36 .context("Missing step_type in step_types array")?;
37 let schema = st["schema"].clone();
38 let documentation = st["documentation"].as_str();
39
40 step_defs.push(crate::db::StepDefinitionInput {
41 step_type: step_type.to_string(),
42 schema,
43 documentation: documentation.map(|s| s.to_string()),
44 });
45 }
46 }
47
48 let mut conn = pool.acquire().await?;
49 crate::db::register_runner_with_steps(
50 &mut *conn,
51 runner_id,
52 runner_type,
53 protocol_version,
54 &capabilities,
55 nats_subject,
56 step_defs,
57 )
58 .await
59 .with_context(|| format!("Failed to register runner {} with steps", runner_id))?;
60
61 Ok(())
62}
63
64pub async fn handle_wasm_registration(payload: Value, pool: PgPool) -> Result<()> {
66 let step_type = payload["step_type"].as_str().context("Missing step_type")?;
67 let module = payload["wasm_module"]
68 .as_str()
69 .context("Missing wasm_module")?;
70 let function = payload["wasm_function"].as_str().unwrap_or("run");
71 let config = payload["wasm_config"].clone();
72 let schema = payload["schema"].clone();
73 let documentation = payload["documentation"].as_str();
74
75 info!("Registering WASM step type: {}", step_type);
76
77 crate::db::upsert_step_definition_with_wasm(
78 &pool,
79 step_type,
80 &schema,
81 documentation,
82 module,
83 function,
84 &config,
85 )
86 .await
87 .with_context(|| format!("Failed to register WASM step type {}", step_type))?;
88
89 Ok(())
90}
91
92pub async fn handle_runner_heartbeat(payload: Value, pool: PgPool) -> Result<()> {
94 let runner_id = payload["runner_id"].as_str().context("Missing runner_id")?;
95
96 debug!("Received heartbeat from runner: {}", runner_id);
97
98 let rows_affected = crate::db::update_runner_heartbeat(&pool, RunnerStatus::Online, runner_id)
99 .await
100 .with_context(|| format!("Failed to update heartbeat for runner {}", runner_id))?
101 .rows_affected();
102
103 if rows_affected == 0 {
104 error!("Received heartbeat from unknown runner: {}", runner_id);
105 }
106
107 Ok(())
108}
109
110pub async fn handle_runner_offline(payload: Value, pool: PgPool) -> Result<()> {
112 let runner_id = payload["runner_id"].as_str().context("Missing runner_id")?;
113
114 info!("Runner going offline: {}", runner_id);
115
116 crate::db::update_runner_status(&pool, RunnerStatus::Offline, runner_id)
117 .await
118 .with_context(|| format!("Failed to set runner {} to offline", runner_id))?;
119
120 Ok(())
121}