astrid_uplink/kernel_client.rs
1//! Kernel-API IPC client.
2//!
3//! Mirrors [`AdminClient`](crate::admin_client::AdminClient) but for
4//! the `KernelRequest` family that flows over the
5//! `astrid.v1.request.*` / `astrid.v1.response.*` topic prefixes
6//! instead of the `astrid.v1.admin.*` admin surface. Used by the
7//! HTTP gateway's capsule + system routes (`GET /api/capsules`,
8//! `GET /api/sys/status`, etc.).
9//!
10//! ## Why a separate client
11//!
12//! [`KernelRequest`](astrid_core::kernel_api::KernelRequest) and
13//! [`AdminRequestKind`](astrid_core::kernel_api::AdminRequestKind)
14//! are two distinct typed wire formats with two distinct topic
15//! prefixes; the kernel dispatchers are sibling-but-separate. Splitting
16//! the client mirror keeps the type-level boundaries clean — a route
17//! handler for `/api/capsules` literally cannot accidentally publish
18//! an `AdminRequestKind` and vice-versa.
19//!
20//! ## Concurrency safety
21//!
22//! Unlike `AdminClient`, the `KernelRequest` payload has no
23//! `request_id` field for correlation. To make concurrent HTTP
24//! requests safe on a shared bus, every outbound message gets a
25//! per-request UUID embedded in the topic suffix
26//! (`astrid.v1.request.<wire-name>.<uuid>`). The kernel router echoes
27//! the full suffix on its response (`response_topic` does
28//! `topic.strip_prefix("astrid.v1.request.")` then prepends the
29//! response prefix), so the client can match a unique
30//! `astrid.v1.response.<wire-name>.<uuid>` and never confuse two
31//! in-flight `GetStatus` calls for each other.
32
33use std::time::Duration;
34
35use anyhow::{Context, Result, anyhow};
36use astrid_core::PrincipalId;
37use astrid_core::kernel_api::{KernelRequest, KernelResponse};
38use astrid_types::ipc::{IpcMessage, IpcPayload};
39use uuid::Uuid;
40
41use crate::socket_client::SocketClient;
42
/// Prefix of every topic a kernel-management request is published on.
/// The per-request suffix (wire name plus correlation id) is appended
/// directly after the trailing dot.
const REQUEST_PREFIX: &str = "astrid.v1.request.";
/// Prefix of every topic a kernel-management response is awaited on.
/// The suffix mirrors the one used on the request topic.
const RESPONSE_PREFIX: &str = "astrid.v1.response.";

/// How long a client waits for a response unless overridden. Kept on
/// the generous side: capsule reloads and status queries under load
/// can take several seconds to answer.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(15);
51
52/// Stable wire-name component of the topic suffix for a
53/// [`KernelRequest`] variant.
54///
55/// These names are hand-curated rather than auto-derived — they
56/// match the CLI's existing conventions
57/// (`crates/astrid-cli/src/commands/{who,daemon,ps,doctor}.rs`) so
58/// the gateway and CLI publish on the same topics and the kernel
59/// can't accidentally see two different name conventions for the
60/// same payload.
61#[must_use]
62pub const fn topic_suffix(req: &KernelRequest) -> &'static str {
63 match req {
64 KernelRequest::InstallCapsule { .. } => "install_capsule",
65 KernelRequest::ApproveCapability { .. } => "approve_capability",
66 KernelRequest::ListCapsules => "list_capsules",
67 KernelRequest::ReloadCapsules => "reload_capsules",
68 KernelRequest::GetCommands => "get_commands",
69 KernelRequest::GetCapsuleMetadata => "metadata",
70 KernelRequest::Shutdown { .. } => "shutdown",
71 KernelRequest::GetStatus => "status",
72 }
73}
74
75/// A connected kernel-management client. One short-lived
76/// `KernelClient` per HTTP request; the per-request connection (plus
77/// the per-call UUID embedded in the topic) avoids cross-talk
78/// between concurrent dashboard calls.
79pub struct KernelClient {
80 inner: SocketClient,
81 caller: PrincipalId,
82 timeout: Duration,
83}
84
85impl KernelClient {
86 /// Connect to the running daemon, authenticate via the existing
87 /// handshake, and bind the client to `caller`. Every outbound
88 /// request stamps `IpcMessage.principal = caller` so the kernel's
89 /// `resolve_caller` reads it for Layer 5 capability checks.
90 ///
91 /// # Errors
92 /// Returns an error if the socket file is missing (no daemon),
93 /// connection fails, or the handshake is rejected.
94 pub async fn connect(caller: PrincipalId) -> Result<Self> {
95 let session_id = astrid_core::SessionId::from_uuid(Uuid::new_v4());
96 let inner = SocketClient::connect(session_id)
97 .await
98 .context("Failed to connect to Astrid daemon. Run `astrid start` to launch it.")?;
99 Ok(Self {
100 inner,
101 caller,
102 timeout: DEFAULT_TIMEOUT,
103 })
104 }
105
106 /// Override the response read timeout.
107 #[must_use]
108 pub const fn with_timeout(mut self, timeout: Duration) -> Self {
109 self.timeout = timeout;
110 self
111 }
112
113 /// Send a [`KernelRequest`] and await the matching
114 /// [`KernelResponse`].
115 ///
116 /// # Errors
117 /// Returns an error on serialization failure, send failure, or
118 /// timeout / connection drop before a matching response arrives.
119 pub async fn request(&mut self, req: KernelRequest) -> Result<KernelResponse> {
120 // Per-call UUID suffix. The kernel's `handle_request` strips
121 // the `astrid.v1.request.` prefix and reprefixes with
122 // `astrid.v1.response.`, so embedding a UUID here gives us a
123 // private response channel that other concurrent in-flight
124 // calls can't collide on.
125 let correlation = Uuid::new_v4().simple().to_string();
126 let suffix = format!("{}.{correlation}", topic_suffix(&req));
127 let request_topic = format!("{REQUEST_PREFIX}{suffix}");
128 let want_response = format!("{RESPONSE_PREFIX}{suffix}");
129
130 let payload = serde_json::to_value(&req).context("serialise KernelRequest")?;
131 let msg = IpcMessage::new(request_topic, IpcPayload::RawJson(payload), Uuid::nil())
132 .with_principal(self.caller.to_string());
133 self.inner.send_message(msg).await?;
134
135 let raw = self
136 .inner
137 .read_until_topic(&want_response, self.timeout)
138 .await
139 .with_context(|| format!("waiting on {want_response}"))?;
140
141 SocketClient::extract_kernel_response(&raw).ok_or_else(|| {
142 anyhow!("kernel response on {want_response} did not deserialize as KernelResponse")
143 })
144 }
145
146 /// Borrow the principal this client stamps on outbound messages.
147 #[must_use]
148 pub const fn caller(&self) -> &PrincipalId {
149 &self.caller
150 }
151}
152
153/// Convenience: lift a [`KernelResponse::Error`] into `Err`.
154///
155/// # Errors
156/// Returns an error wrapping the kernel's error message when the
157/// response is `KernelResponse::Error`.
158pub fn into_result(resp: KernelResponse) -> Result<KernelResponse> {
159 match resp {
160 KernelResponse::Error(msg) => Err(anyhow!("kernel rejected request: {msg}")),
161 other => Ok(other),
162 }
163}
164
#[cfg(test)]
mod tests {
    use super::*;

    /// The wire names must stay equal to the strings the CLI's
    /// kernel-request code already publishes (who.rs, daemon.rs, ps.rs,
    /// doctor.rs). If they drift, the gateway and CLI end up on
    /// different topics for the same payload and nothing fails loudly.
    #[test]
    fn topic_suffixes_match_cli_conventions() {
        let pinned = [
            (KernelRequest::GetStatus, "status"),
            (KernelRequest::ListCapsules, "list_capsules"),
            (KernelRequest::GetCommands, "get_commands"),
            (KernelRequest::GetCapsuleMetadata, "metadata"),
            (KernelRequest::ReloadCapsules, "reload_capsules"),
            (KernelRequest::Shutdown { reason: None }, "shutdown"),
        ];

        for (req, expected) in &pinned {
            assert_eq!(topic_suffix(req), *expected);
        }
    }

    /// `KernelResponse::Error` must come back as `Err`, with the
    /// kernel's message still visible in the rendered error.
    #[test]
    fn into_result_lifts_error_variant() {
        let lifted = into_result(KernelResponse::Error("not allowed".into()));
        assert!(matches!(&lifted, Err(e) if e.to_string().contains("not allowed")));
    }
}