Skip to main content

kanade_shared/
nats_client.rs

//! Shared NATS client constructor.
//!
//! Token resolution (first match wins):
//!
//!   1. Windows registry — `HKLM\SOFTWARE\kanade\agent\NatsToken`
//!      (`REG_SZ`). Production path. A hardened ACL (SYSTEM + Admins
//!      only) keeps the token out of low-privilege users' reach,
//!      which Machine-scope env vars cannot do.
//!   2. `$KANADE_NATS_TOKEN` environment variable. Dev / fallback
//!      path. The agent service runs as LocalSystem, so user-session
//!      env vars never reach it; this branch only fires for
//!      `cargo run` / interactive shells.
//!   3. No token — connect unauthenticated. This only works against a
//!      broker started without an `authorization { … }` block.
//!
//! For the mTLS / NKeys / NATS-JWT modes (the full design in spec
//! §2.7.1), the plan is to grow the `ConnectOptions` setup here, so
//! every binary picks up the upgrade for free.

20use anyhow::{Context, Result};
21
22use crate::secrets;
23
/// Registry subkey (read under HKLM) that holds the NATS token value.
const REG_SUBKEY: &str = r"SOFTWARE\kanade\agent";
/// Name of the `REG_SZ` value under [`REG_SUBKEY`] carrying the token.
const REG_VALUE: &str = "NatsToken";
/// Environment variable consulted when the registry yields no token.
const ENV_TOKEN: &str = "KANADE_NATS_TOKEN";
27
28fn resolve_token() -> Option<String> {
29    if let Some(t) = secrets::read_hklm_value(REG_SUBKEY, REG_VALUE) {
30        return Some(t);
31    }
32    match std::env::var(ENV_TOKEN) {
33        Ok(t) if !t.is_empty() => Some(t),
34        _ => None,
35    }
36}
37
38/// Connect to NATS at `url`. Resolves the bearer token from registry
39/// (Windows) or `$KANADE_NATS_TOKEN`; connects unauthenticated when
40/// neither is set.
41pub async fn connect(url: &str) -> Result<async_nats::Client> {
42    connect_inner(url, None::<fn(async_nats::Event) -> std::future::Ready<()>>).await
43}
44
45/// Same as [`connect`] but also wires an `event_callback` that fires
46/// whenever async-nats publishes a `ConnectEvent` (Connected,
47/// Disconnected, ServerError, etc.). The callback's `Future` runs on
48/// the async-nats internal task — keep it cheap and non-blocking
49/// (set a flag, send on a channel, that kind of thing) so the
50/// connection state machine isn't held up.
51///
52/// Used by the agent's v0.26 Layer 2 staleness tracker: the callback
53/// stamps a shared `Mutex<Option<Instant>>` on every Connected event,
54/// so `decide()` at fire time can answer "how long ago were we last
55/// definitely-talking-to-the-broker" without a polling loop.
56pub async fn connect_with_event_callback<F, Fut>(url: &str, cb: F) -> Result<async_nats::Client>
57where
58    F: Fn(async_nats::Event) -> Fut + Send + Sync + 'static,
59    Fut: std::future::Future<Output = ()> + Send + Sync + 'static,
60{
61    connect_inner(url, Some(cb)).await
62}
63
64async fn connect_inner<F, Fut>(url: &str, cb: Option<F>) -> Result<async_nats::Client>
65where
66    F: Fn(async_nats::Event) -> Fut + Send + Sync + 'static,
67    Fut: std::future::Future<Output = ()> + Send + Sync + 'static,
68{
69    // v0.38 / #137: offline-tolerant boot. Without
70    // `retry_on_initial_connect`, `opts.connect(url).await` blocks-then-
71    // errors when the broker is unreachable at startup — the agent
72    // process dies, SCM ticks its restart counter, and the offline-
73    // tolerant subsystems (local_scheduler, outbox drain) never spawn.
74    // With this flag, connect() returns `Ok(Client)` immediately and
75    // async-nats does the reconnect in the background; subscribe()
76    // calls queue the SUB frame until the link is up.
77    let opts = async_nats::ConnectOptions::new().retry_on_initial_connect();
78    let opts = match resolve_token() {
79        Some(token) => opts.token(token),
80        None => opts,
81    };
82    let opts = match cb {
83        Some(cb) => opts.event_callback(cb),
84        None => opts,
85    };
86    opts.connect(url)
87        .await
88        .with_context(|| format!("connect to NATS at {url}"))
89}