// heldar_kernel/services/fleet_register.rs

//! Edge-side fleet self-registration.
//!
//! When this node is configured with a control-plane URL (`HELDAR_CP_URL`), its own reachable URL
//! (`HELDAR_PUBLIC_BASE_URL`) and a site id (`HELDAR_SITE_ID`), it POSTs its identity to the control
//! plane's `POST /api/v1/fleet/nodes` on boot and then on a heartbeat cadence. The control plane then
//! drains this node's outbox without any static config or restart — "add a node and it joins the fleet".
//!
//! Registration is idempotent (the control plane upserts on node id), so the heartbeat also re-teaches
//! a control plane that restarted or lost its registry. The reported token is the bearer the control
//! plane presents when draining this node's outbox; when this node runs with auth disabled (the LAN
//! default) it can be empty.
//!
//! This is a pure outbound HTTP client — the kernel has NO code dependency on the control-plane crate;
//! the only seam is the configurable URL. The fleet is strictly opt-in: with `HELDAR_CP_URL` unset (the
//! default) the registration loop parks forever and the node never phones home.
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Context;
21use serde_json::{json, Value};
22
23use crate::config::Config;
24
/// Builds the control-plane registration endpoint from the control plane's base URL.
///
/// Any trailing `/` characters on `cp_url` are stripped first, so `http://cp:9100` and
/// `http://cp:9100/` both yield `http://cp:9100/api/v1/fleet/nodes`.
fn register_url(cp_url: &str) -> String {
    const REGISTER_PATH: &str = "/api/v1/fleet/nodes";
    let base = cp_url.trim_end_matches('/');
    let mut endpoint = String::with_capacity(base.len() + REGISTER_PATH.len());
    endpoint.push_str(base);
    endpoint.push_str(REGISTER_PATH);
    endpoint
}
29
30/// The registration body this node reports: its fleet identity, the URL the control plane should reach
31/// it on, and the bearer the control plane presents when draining this node's outbox.
32fn register_body(site_id: &str, base_url: &str, token: &str) -> Value {
33    json!({ "id": site_id, "base_url": base_url, "token": token })
34}
35
36/// Build the HTTP client used to register, configuring mTLS (client identity + control-plane CA) when
37/// `HELDAR_CP_TLS_*` is set. Errors only on unreadable/invalid cert material.
38fn build_client(cfg: &Config) -> anyhow::Result<reqwest::Client> {
39    let mut builder = reqwest::Client::builder().timeout(Duration::from_secs(10));
40    if let Some(t) = &cfg.cp_tls {
41        let cert = std::fs::read(&t.client_cert)
42            .with_context(|| format!("reading client cert {}", t.client_cert.display()))?;
43        let key = std::fs::read(&t.client_key)
44            .with_context(|| format!("reading client key {}", t.client_key.display()))?;
45        let ca = std::fs::read(&t.server_ca)
46            .with_context(|| format!("reading control-plane CA {}", t.server_ca.display()))?;
47        // reqwest's PEM identity wants key + cert chain in one buffer.
48        let mut identity_pem = key;
49        identity_pem.extend_from_slice(&cert);
50        let identity =
51            reqwest::Identity::from_pem(&identity_pem).context("building client identity")?;
52        let root = reqwest::Certificate::from_pem(&ca).context("parsing control-plane CA")?;
53        builder = builder.identity(identity).add_root_certificate(root);
54    }
55    builder.build().context("building HTTP client")
56}
57
58/// Edge-side self-registration loop. Parks forever unless fully configured for the fleet (control-plane
59/// URL + this node's site id + this node's reachable URL); otherwise POSTs its identity on boot and on
60/// every heartbeat. Never returns (returning would have the supervisor respawn it).
61pub async fn run(cfg: Arc<Config>) {
62    let (Some(cp_url), Some(site_id), Some(base_url)) = (
63        cfg.cp_url.as_deref(),
64        cfg.site_id.as_deref(),
65        cfg.public_base_url.as_deref(),
66    ) else {
67        // Not configured for the fleet (any of CP URL / site id / reachable URL missing): park.
68        std::future::pending::<()>().await;
69        return;
70    };
71
72    let client = match build_client(&cfg) {
73        Ok(c) => c,
74        Err(e) => {
75            // A persistent cert-config error: park rather than tight-loop respawning.
76            tracing::error!(error = %e, "fleet self-registration disabled: bad mTLS config");
77            std::future::pending::<()>().await;
78            return;
79        }
80    };
81    let url = register_url(cp_url);
82    let body = register_body(site_id, base_url, &cfg.cp_token);
83    // `interval`'s first tick fires immediately → register on boot, then heartbeat on cadence.
84    let mut tick = tokio::time::interval(Duration::from_secs(cfg.cp_register_interval_s.max(1)));
85    loop {
86        tick.tick().await;
87        match client.post(&url).json(&body).send().await {
88            Ok(r) if r.status().is_success() => {
89                tracing::debug!(node = %site_id, control_plane = %cp_url, "registered with fleet control plane")
90            }
91            Ok(r) => {
92                tracing::warn!(node = %site_id, status = %r.status(), "fleet registration rejected")
93            }
94            Err(e) => {
95                tracing::warn!(node = %site_id, error = %e, "fleet registration failed; retry next heartbeat")
96            }
97        }
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// The registration path is appended once, whether or not the base URL ends in `/`.
    #[test]
    fn register_url_appends_path_and_trims_trailing_slash() {
        for base in ["http://cp.lan:9100", "http://cp.lan:9100/"] {
            assert_eq!(register_url(base), "http://cp.lan:9100/api/v1/fleet/nodes");
        }
    }

    /// The body carries the identity, reachable URL and bearer token it was given.
    #[test]
    fn register_body_carries_identity_url_and_token() {
        let body = register_body("site-a", "https://edge-a.lan", "tok");
        let expected = [
            ("id", "site-a"),
            ("base_url", "https://edge-a.lan"),
            ("token", "tok"),
        ];
        for (field, want) in expected {
            assert_eq!(body[field], want, "field `{field}`");
        }
    }
}