Skip to main content

stormchaser_engine/handler/
runner.rs

1#![allow(clippy::explicit_auto_deref)]
2use anyhow::{Context, Result};
3use serde_json::Value;
4use sqlx::PgPool;
5use stormchaser_model::runner::RunnerStatus;
6use tracing::{debug, error, info};
7
8/// Handle runner registration.
9pub async fn handle_runner_registration(payload: Value, pool: PgPool) -> Result<()> {
10    let runner_id = payload["runner_id"].as_str().context("Missing runner_id")?;
11    let runner_type = payload["runner_type"]
12        .as_str()
13        .context("Missing runner_type")?;
14    let protocol_version = payload["protocol_version"]
15        .as_str()
16        .context("Missing protocol_version")?;
17    let nats_subject = payload["nats_subject"]
18        .as_str()
19        .context("Missing nats_subject")?;
20
21    let capabilities: Vec<String> = if let Some(caps) = payload["capabilities"].as_array() {
22        caps.iter()
23            .filter_map(|c| c.as_str().map(|s| s.to_string()))
24            .collect()
25    } else {
26        Vec::new()
27    };
28
29    info!("Registering runner: {} (type: {})", runner_id, runner_type);
30
31    let mut step_defs = Vec::new();
32    if let Some(step_types) = payload["step_types"].as_array() {
33        for st in step_types {
34            let step_type = st["step_type"]
35                .as_str()
36                .context("Missing step_type in step_types array")?;
37            let schema = st["schema"].clone();
38            let documentation = st["documentation"].as_str();
39
40            step_defs.push(crate::db::StepDefinitionInput {
41                step_type: step_type.to_string(),
42                schema,
43                documentation: documentation.map(|s| s.to_string()),
44            });
45        }
46    }
47
48    let mut conn = pool.acquire().await?;
49    crate::db::register_runner_with_steps(
50        &mut *conn,
51        runner_id,
52        runner_type,
53        protocol_version,
54        &capabilities,
55        nats_subject,
56        step_defs,
57    )
58    .await
59    .with_context(|| format!("Failed to register runner {} with steps", runner_id))?;
60
61    Ok(())
62}
63
64/// Handle wasm registration.
65pub async fn handle_wasm_registration(payload: Value, pool: PgPool) -> Result<()> {
66    let step_type = payload["step_type"].as_str().context("Missing step_type")?;
67    let module = payload["wasm_module"]
68        .as_str()
69        .context("Missing wasm_module")?;
70    let function = payload["wasm_function"].as_str().unwrap_or("run");
71    let config = payload["wasm_config"].clone();
72    let schema = payload["schema"].clone();
73    let documentation = payload["documentation"].as_str();
74
75    info!("Registering WASM step type: {}", step_type);
76
77    crate::db::upsert_step_definition_with_wasm(
78        &pool,
79        step_type,
80        &schema,
81        documentation,
82        module,
83        function,
84        &config,
85    )
86    .await
87    .with_context(|| format!("Failed to register WASM step type {}", step_type))?;
88
89    Ok(())
90}
91
92/// Handle runner heartbeat.
93pub async fn handle_runner_heartbeat(payload: Value, pool: PgPool) -> Result<()> {
94    let runner_id = payload["runner_id"].as_str().context("Missing runner_id")?;
95
96    debug!("Received heartbeat from runner: {}", runner_id);
97
98    let rows_affected = crate::db::update_runner_heartbeat(&pool, RunnerStatus::Online, runner_id)
99        .await
100        .with_context(|| format!("Failed to update heartbeat for runner {}", runner_id))?
101        .rows_affected();
102
103    if rows_affected == 0 {
104        error!("Received heartbeat from unknown runner: {}", runner_id);
105    }
106
107    Ok(())
108}
109
110/// Handle runner offline.
111pub async fn handle_runner_offline(payload: Value, pool: PgPool) -> Result<()> {
112    let runner_id = payload["runner_id"].as_str().context("Missing runner_id")?;
113
114    info!("Runner going offline: {}", runner_id);
115
116    crate::db::update_runner_status(&pool, RunnerStatus::Offline, runner_id)
117        .await
118        .with_context(|| format!("Failed to set runner {} to offline", runner_id))?;
119
120    Ok(())
121}