Skip to main content

astrid_uplink/
kernel_client.rs

//! Kernel-API IPC client.
//!
//! Mirrors [`AdminClient`](crate::admin_client::AdminClient) but for
//! the `KernelRequest` family that flows over the
//! `astrid.v1.request.*` / `astrid.v1.response.*` topic prefixes
//! instead of the `astrid.v1.admin.*` admin surface. Used by the
//! HTTP gateway's capsule + system routes (`GET /api/capsules`,
//! `GET /api/sys/status`, etc.).
//!
//! ## Why a separate client
//!
//! [`KernelRequest`](astrid_core::kernel_api::KernelRequest) and
//! [`AdminRequestKind`](astrid_core::kernel_api::AdminRequestKind)
//! are two distinct typed wire formats with two distinct topic
//! prefixes; the kernel dispatchers are siblings but separate. Keeping
//! a separate client mirror keeps the type-level boundaries clean: a
//! route handler for `/api/capsules` cannot accidentally publish an
//! `AdminRequestKind`, and vice versa.
//!
//! ## Concurrency safety
//!
//! Unlike `AdminClient`, the `KernelRequest` payload has no
//! `request_id` field for correlation. To make concurrent HTTP
//! requests safe on a shared bus, every outbound message gets a
//! per-request UUID embedded in the topic suffix
//! (`astrid.v1.request.<wire-name>.<uuid>`). The kernel router echoes
//! the full suffix on its response (`response_topic` does
//! `topic.strip_prefix("astrid.v1.request.")` and then prepends the
//! response prefix), so the client can match a unique
//! `astrid.v1.response.<wire-name>.<uuid>` and never confuse two
//! in-flight `GetStatus` calls with each other.

33use std::time::Duration;
34
35use anyhow::{Context, Result, anyhow};
36use astrid_core::PrincipalId;
37use astrid_core::kernel_api::{KernelRequest, KernelResponse};
38use astrid_types::ipc::{IpcMessage, IpcPayload};
39use uuid::Uuid;
40
41use crate::socket_client::SocketClient;
42
/// Topic prefix put in front of every outbound kernel-management request.
const REQUEST_PREFIX: &str = "astrid.v1.request.";
/// Topic prefix the kernel replies on for kernel-management requests.
const RESPONSE_PREFIX: &str = "astrid.v1.response.";

/// Response timeout a freshly connected client starts with.
///
/// Deliberately generous: some kernel operations (capsule reload,
/// status under load) can take a few seconds to answer.
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(15_000);

52/// Stable wire-name component of the topic suffix for a
53/// [`KernelRequest`] variant.
54///
55/// These names are hand-curated rather than auto-derived — they
56/// match the CLI's existing conventions
57/// (`crates/astrid-cli/src/commands/{who,daemon,ps,doctor}.rs`) so
58/// the gateway and CLI publish on the same topics and the kernel
59/// can't accidentally see two different name conventions for the
60/// same payload.
61#[must_use]
62pub const fn topic_suffix(req: &KernelRequest) -> &'static str {
63    match req {
64        KernelRequest::InstallCapsule { .. } => "install_capsule",
65        KernelRequest::ApproveCapability { .. } => "approve_capability",
66        KernelRequest::ListCapsules => "list_capsules",
67        KernelRequest::ReloadCapsules => "reload_capsules",
68        KernelRequest::GetCommands => "get_commands",
69        KernelRequest::GetCapsuleMetadata => "metadata",
70        KernelRequest::Shutdown { .. } => "shutdown",
71        KernelRequest::GetStatus => "status",
72    }
73}
74
75/// A connected kernel-management client. One short-lived
76/// `KernelClient` per HTTP request; the per-request connection (plus
77/// the per-call UUID embedded in the topic) avoids cross-talk
78/// between concurrent dashboard calls.
79pub struct KernelClient {
80    inner: SocketClient,
81    caller: PrincipalId,
82    timeout: Duration,
83}
84
85impl KernelClient {
86    /// Connect to the running daemon, authenticate via the existing
87    /// handshake, and bind the client to `caller`. Every outbound
88    /// request stamps `IpcMessage.principal = caller` so the kernel's
89    /// `resolve_caller` reads it for Layer 5 capability checks.
90    ///
91    /// # Errors
92    /// Returns an error if the socket file is missing (no daemon),
93    /// connection fails, or the handshake is rejected.
94    pub async fn connect(caller: PrincipalId) -> Result<Self> {
95        let session_id = astrid_core::SessionId::from_uuid(Uuid::new_v4());
96        let inner = SocketClient::connect(session_id)
97            .await
98            .context("Failed to connect to Astrid daemon. Run `astrid start` to launch it.")?;
99        Ok(Self {
100            inner,
101            caller,
102            timeout: DEFAULT_TIMEOUT,
103        })
104    }
105
106    /// Override the response read timeout.
107    #[must_use]
108    pub const fn with_timeout(mut self, timeout: Duration) -> Self {
109        self.timeout = timeout;
110        self
111    }
112
113    /// Send a [`KernelRequest`] and await the matching
114    /// [`KernelResponse`].
115    ///
116    /// # Errors
117    /// Returns an error on serialization failure, send failure, or
118    /// timeout / connection drop before a matching response arrives.
119    pub async fn request(&mut self, req: KernelRequest) -> Result<KernelResponse> {
120        // Per-call UUID suffix. The kernel's `handle_request` strips
121        // the `astrid.v1.request.` prefix and reprefixes with
122        // `astrid.v1.response.`, so embedding a UUID here gives us a
123        // private response channel that other concurrent in-flight
124        // calls can't collide on.
125        let correlation = Uuid::new_v4().simple().to_string();
126        let suffix = format!("{}.{correlation}", topic_suffix(&req));
127        let request_topic = format!("{REQUEST_PREFIX}{suffix}");
128        let want_response = format!("{RESPONSE_PREFIX}{suffix}");
129
130        let payload = serde_json::to_value(&req).context("serialise KernelRequest")?;
131        let msg = IpcMessage::new(request_topic, IpcPayload::RawJson(payload), Uuid::nil())
132            .with_principal(self.caller.to_string());
133        self.inner.send_message(msg).await?;
134
135        let raw = self
136            .inner
137            .read_until_topic(&want_response, self.timeout)
138            .await
139            .with_context(|| format!("waiting on {want_response}"))?;
140
141        SocketClient::extract_kernel_response(&raw).ok_or_else(|| {
142            anyhow!("kernel response on {want_response} did not deserialize as KernelResponse")
143        })
144    }
145
146    /// Borrow the principal this client stamps on outbound messages.
147    #[must_use]
148    pub const fn caller(&self) -> &PrincipalId {
149        &self.caller
150    }
151}
152
153/// Convenience: lift a [`KernelResponse::Error`] into `Err`.
154///
155/// # Errors
156/// Returns an error wrapping the kernel's error message when the
157/// response is `KernelResponse::Error`.
158pub fn into_result(resp: KernelResponse) -> Result<KernelResponse> {
159    match resp {
160        KernelResponse::Error(msg) => Err(anyhow!("kernel rejected request: {msg}")),
161        other => Ok(other),
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn topic_suffixes_match_cli_conventions() {
        // These strings are pinned to what the CLI's kernel-request code
        // already publishes (who.rs, daemon.rs, ps.rs, doctor.rs). If the
        // two drift apart, the gateway and CLI end up on different topics
        // for the same payload, and nothing fails loudly.
        let expectations = [
            (KernelRequest::GetStatus, "status"),
            (KernelRequest::ListCapsules, "list_capsules"),
            (KernelRequest::GetCommands, "get_commands"),
            (KernelRequest::GetCapsuleMetadata, "metadata"),
            (KernelRequest::ReloadCapsules, "reload_capsules"),
            (KernelRequest::Shutdown { reason: None }, "shutdown"),
        ];
        for (request, wire_name) in &expectations {
            assert_eq!(
                topic_suffix(request),
                *wire_name,
                "expected wire name {wire_name}"
            );
        }
    }

    #[test]
    fn into_result_lifts_error_variant() {
        let outcome = into_result(KernelResponse::Error("not allowed".into()));
        let message = outcome
            .expect_err("KernelResponse::Error must become Err")
            .to_string();
        assert!(message.contains("not allowed"));
    }
}