kanade_shared/nats_client.rs
1//! Shared NATS client constructor.
2//!
3//! Token resolution (first match wins):
4//!
5//! 1. Windows registry — `HKLM\SOFTWARE\kanade\agent\NatsToken`
6//! (`REG_SZ`). Production path. Hardened ACL (SYSTEM + Admin
7//! only) keeps the token out of low-privilege users' reach,
8//! which Machine-scope env vars cannot do.
9//! 2. `$KANADE_NATS_TOKEN` environment variable. Dev / fallback
10//! path. The agent service runs as LocalSystem so user-session
11//! env vars never reach it; this branch only fires for
12//! `cargo run` / interactive shells.
13//! 3. No token — connect unauthenticated. Works against a broker
14//! started without `authorization { … }`.
15//!
16//! For mTLS / NKeys / NATS-JWT modes (spec §2.7.1's full design), the
17//! plan is to grow ConnectOptions here — every binary picks up the
18//! upgrade for free.
19
20use anyhow::{Context, Result};
21
22use crate::secrets;
23
/// Name of the environment variable `resolve_token` reads as the
/// dev / fallback token source.
const ENV_TOKEN: &str = "KANADE_NATS_TOKEN";
/// Registry subkey handed to `secrets::read_hklm_value`; together with
/// [`REG_VALUE`] it addresses `HKLM\SOFTWARE\kanade\agent\NatsToken`.
const REG_SUBKEY: &str = r"SOFTWARE\kanade\agent";
/// Registry value name under [`REG_SUBKEY`] that holds the token.
const REG_VALUE: &str = "NatsToken";
27
28fn resolve_token() -> Option<String> {
29 if let Some(t) = secrets::read_hklm_value(REG_SUBKEY, REG_VALUE) {
30 return Some(t);
31 }
32 match std::env::var(ENV_TOKEN) {
33 Ok(t) if !t.is_empty() => Some(t),
34 _ => None,
35 }
36}
37
38/// Connect to NATS at `url`. Resolves the bearer token from registry
39/// (Windows) or `$KANADE_NATS_TOKEN`; connects unauthenticated when
40/// neither is set.
41pub async fn connect(url: &str) -> Result<async_nats::Client> {
42 connect_inner(url, None::<fn(async_nats::Event) -> std::future::Ready<()>>).await
43}
44
45/// Same as [`connect`] but also wires an `event_callback` that fires
46/// whenever async-nats publishes a `ConnectEvent` (Connected,
47/// Disconnected, ServerError, etc.). The callback's `Future` runs on
48/// the async-nats internal task — keep it cheap and non-blocking
49/// (set a flag, send on a channel, that kind of thing) so the
50/// connection state machine isn't held up.
51///
52/// Used by the agent's v0.26 Layer 2 staleness tracker: the callback
53/// stamps a shared `Mutex<Option<Instant>>` on every Connected event,
54/// so `decide()` at fire time can answer "how long ago were we last
55/// definitely-talking-to-the-broker" without a polling loop.
56pub async fn connect_with_event_callback<F, Fut>(url: &str, cb: F) -> Result<async_nats::Client>
57where
58 F: Fn(async_nats::Event) -> Fut + Send + Sync + 'static,
59 Fut: std::future::Future<Output = ()> + Send + Sync + 'static,
60{
61 connect_inner(url, Some(cb)).await
62}
63
64async fn connect_inner<F, Fut>(url: &str, cb: Option<F>) -> Result<async_nats::Client>
65where
66 F: Fn(async_nats::Event) -> Fut + Send + Sync + 'static,
67 Fut: std::future::Future<Output = ()> + Send + Sync + 'static,
68{
69 // v0.38 / #137: offline-tolerant boot. Without
70 // `retry_on_initial_connect`, `opts.connect(url).await` blocks-then-
71 // errors when the broker is unreachable at startup — the agent
72 // process dies, SCM ticks its restart counter, and the offline-
73 // tolerant subsystems (local_scheduler, outbox drain) never spawn.
74 // With this flag, connect() returns `Ok(Client)` immediately and
75 // async-nats does the reconnect in the background; subscribe()
76 // calls queue the SUB frame until the link is up.
77 let opts = async_nats::ConnectOptions::new().retry_on_initial_connect();
78 let opts = match resolve_token() {
79 Some(token) => opts.token(token),
80 None => opts,
81 };
82 let opts = match cb {
83 Some(cb) => opts.event_callback(cb),
84 None => opts,
85 };
86 opts.connect(url)
87 .await
88 .with_context(|| format!("connect to NATS at {url}"))
89}