Skip to main content

astrid_uplink/
admin_client.rs

1//! Layer 6 admin IPC client.
2//!
3//! Wraps [`SocketClient`] with the request/response correlation
4//! pattern introduced by issue #672. Each call generates a UUID v4
5//! `request_id`, sends an [`AdminKernelRequest`] on
6//! `astrid.v1.admin.<suffix>`, and reads messages from the daemon
7//! until one arrives on `astrid.v1.admin.response.<suffix>` whose
8//! echoed `request_id` matches.
9//!
10//! Messages on other topics or with a non-matching `request_id` are
11//! dropped — the admin client does not consume the chat event stream,
12//! so unrelated broadcasts (capsule-loaded notices, agent responses)
13//! are safe to discard while we wait.
14//!
15//! Trust shape: the caller passes `PrincipalId` explicitly. The CLI
16//! resolves it from its `context::active_agent()` file; the HTTP
17//! gateway resolves it from a verified bearer token. Both stamp the
18//! outbound `IpcMessage.principal` so the kernel's `resolve_caller`
19//! reads the right principal for Layer 5/6 capability checks.
20
21use std::time::Duration;
22
23use anyhow::{Context, Result, anyhow};
24use astrid_core::PrincipalId;
25use astrid_core::kernel_api::{
26    AdminKernelRequest, AdminKernelResponse, AdminRequestKind, AdminResponseBody,
27};
28use astrid_types::ipc::{IpcMessage, IpcPayload};
29use serde_json::Value;
30use uuid::Uuid;
31
32use crate::socket_client::SocketClient;
33
34/// Topic prefix for admin requests sent by uplinks.
35const ADMIN_INPUT_PREFIX: &str = "astrid.v1.admin.";
36/// Topic prefix for admin responses from the kernel.
37const ADMIN_RESPONSE_PREFIX: &str = "astrid.v1.admin.response.";
38
39/// Default timeout for the response read loop. Generous because admin
40/// writes can block on the kernel write lock.
41const DEFAULT_TIMEOUT: Duration = Duration::from_secs(15);
42
43/// Stable wire-name suffix for an [`AdminRequestKind`].
44///
45/// Mirrors `admin_request_method` on the kernel side — the suffix is
46/// the part after `astrid.v1.admin.`. This is the exact string the
47/// kernel router uses to derive the response topic, so the suffix MUST
48/// match between sides.
49#[must_use]
50pub const fn topic_suffix(req: &AdminRequestKind) -> &'static str {
51    match req {
52        AdminRequestKind::AgentCreate { .. } => "agent.create",
53        AdminRequestKind::AgentDelete { .. } => "agent.delete",
54        AdminRequestKind::AgentEnable { .. } => "agent.enable",
55        AdminRequestKind::AgentDisable { .. } => "agent.disable",
56        AdminRequestKind::AgentModify { .. } => "agent.modify",
57        AdminRequestKind::AgentList => "agent.list",
58        AdminRequestKind::QuotaSet { .. } => "quota.set",
59        AdminRequestKind::QuotaGet { .. } => "quota.get",
60        AdminRequestKind::UsageGet { .. } => "usage.get",
61        AdminRequestKind::GroupCreate { .. } => "group.create",
62        AdminRequestKind::GroupDelete { .. } => "group.delete",
63        AdminRequestKind::GroupModify { .. } => "group.modify",
64        AdminRequestKind::GroupList => "group.list",
65        AdminRequestKind::CapsGrant { .. } => "caps.grant",
66        AdminRequestKind::CapsRevoke { .. } => "caps.revoke",
67        AdminRequestKind::InviteIssue { .. } => "invite.issue",
68        AdminRequestKind::InviteRedeem { .. } => "invite.redeem",
69        AdminRequestKind::InviteList => "invite.list",
70        AdminRequestKind::InviteRevoke { .. } => "invite.revoke",
71        AdminRequestKind::PairDeviceIssue { .. } => "auth.pair.issue",
72        AdminRequestKind::PairDeviceRedeem { .. } => "auth.pair.redeem",
73    }
74}
75
76/// Build the request topic for an [`AdminRequestKind`].
77#[must_use]
78pub fn request_topic(req: &AdminRequestKind) -> String {
79    format!("{ADMIN_INPUT_PREFIX}{}", topic_suffix(req))
80}
81
82/// Build the response topic for an [`AdminRequestKind`].
83#[must_use]
84pub fn response_topic(req: &AdminRequestKind) -> String {
85    format!("{ADMIN_RESPONSE_PREFIX}{}", topic_suffix(req))
86}
87
88/// A connected admin client. Sends [`AdminRequestKind`] requests and
89/// awaits the matching [`AdminResponseBody`].
90pub struct AdminClient {
91    inner: SocketClient,
92    caller: PrincipalId,
93    timeout: Duration,
94}
95
96impl AdminClient {
97    /// Connect to the running daemon, authenticate via the existing
98    /// handshake, and bind the client to `caller`. Every outbound
99    /// request stamps `IpcMessage.principal = caller` so the kernel's
100    /// `resolve_caller` reads it for Layer 5/6 capability checks.
101    ///
102    /// # Errors
103    /// Returns an error if the socket file is missing (no daemon),
104    /// connection fails, or the handshake is rejected.
105    pub async fn connect(caller: PrincipalId) -> Result<Self> {
106        let session_id = astrid_core::SessionId::from_uuid(Uuid::new_v4());
107        let inner = SocketClient::connect(session_id)
108            .await
109            .context("Failed to connect to Astrid daemon. Run `astrid start` to launch it.")?;
110        Ok(Self {
111            inner,
112            caller,
113            timeout: DEFAULT_TIMEOUT,
114        })
115    }
116
117    /// Override the response read timeout. Used by tests.
118    #[must_use]
119    pub const fn with_timeout(mut self, timeout: Duration) -> Self {
120        self.timeout = timeout;
121        self
122    }
123
124    /// Borrow the principal this client stamps on outbound messages.
125    #[must_use]
126    pub const fn caller(&self) -> &PrincipalId {
127        &self.caller
128    }
129
130    /// Send an admin request and await the matching response.
131    ///
132    /// The `request_id` is generated as a fresh UUID v4 and echoed
133    /// back on the response. Messages with a different topic or a
134    /// non-matching `request_id` are dropped while we wait.
135    ///
136    /// # Errors
137    /// Returns an error if the request fails to serialize, the send
138    /// fails, the response is not received within the timeout, or
139    /// the connection drops before a matching response arrives.
140    pub async fn request(&mut self, kind: AdminRequestKind) -> Result<AdminResponseBody> {
141        let request_id = Uuid::new_v4().to_string();
142        let topic = request_topic(&kind);
143        let want_response = response_topic(&kind);
144
145        let req = AdminKernelRequest::with_request_id(request_id.clone(), kind);
146        let payload =
147            serde_json::to_value(&req).context("Failed to serialize AdminKernelRequest")?;
148        let msg = IpcMessage::new(topic, IpcPayload::RawJson(payload), Uuid::nil())
149            .with_principal(self.caller.to_string());
150        self.inner.send_message(msg).await?;
151
152        let deadline = tokio::time::Instant::now()
153            .checked_add(self.timeout)
154            .unwrap_or_else(tokio::time::Instant::now);
155        loop {
156            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
157            if remaining.is_zero() {
158                anyhow::bail!(
159                    "Admin request timed out after {:?} waiting for {want_response}",
160                    self.timeout
161                );
162            }
163            let read = tokio::time::timeout(remaining, self.inner.read_raw_frame()).await;
164            let frame = match read {
165                Ok(Ok(Some(bytes))) => bytes,
166                Ok(Ok(None)) => {
167                    anyhow::bail!("Daemon closed the connection before responding");
168                },
169                Ok(Err(e)) => return Err(e),
170                Err(_) => {
171                    anyhow::bail!(
172                        "Admin request timed out after {:?} waiting for {want_response}",
173                        self.timeout
174                    );
175                },
176            };
177
178            // The host serializes IPC envelopes through `to_guest_bytes`
179            // which strips the `type` tag for `IpcPayload::RawJson`, so
180            // the bytes the uplink sees from the proxy embed the
181            // response directly under `payload` (no `IpcPayload`
182            // wrapper). Match by topic, then deserialize
183            // `AdminKernelResponse` straight out of the `payload` field.
184            let raw: Value = match serde_json::from_slice(&frame) {
185                Ok(v) => v,
186                Err(e) => {
187                    tracing::debug!(error = %e, "frame is not JSON, skipping");
188                    continue;
189                },
190            };
191            let topic = raw
192                .get("topic")
193                .and_then(|t| t.as_str())
194                .unwrap_or_default();
195            if topic != want_response {
196                tracing::debug!(topic = %topic, "ignoring non-matching message");
197                continue;
198            }
199            let Some(payload) = raw.get("payload").cloned() else {
200                tracing::warn!(topic = %topic, "matched response missing payload");
201                continue;
202            };
203            // `payload` may be either the bare AdminKernelResponse JSON
204            // (after `to_guest_bytes` stripped the type tag) or a
205            // wrapped `{"type": "raw_json", "value": ...}`. Try the
206            // bare form first; fall back to extracting `value`.
207            let response_value = if payload
208                .as_object()
209                .is_some_and(|m| m.contains_key("type") && m.contains_key("value"))
210            {
211                payload.get("value").cloned().unwrap_or(payload)
212            } else {
213                payload
214            };
215            match serde_json::from_value::<AdminKernelResponse>(response_value) {
216                Ok(resp) => {
217                    if resp.request_id.as_deref() == Some(&request_id) {
218                        return Ok(resp.body);
219                    }
220                    tracing::debug!(
221                        echoed = ?resp.request_id,
222                        expected = %request_id,
223                        "ignoring response with non-matching request_id"
224                    );
225                },
226                Err(e) => {
227                    tracing::warn!(error = %e, "failed to deserialize admin response");
228                },
229            }
230        }
231    }
232}
233
234/// Convert an [`AdminResponseBody`] into a `Result`, lifting `Error`
235/// variants into `Err` so the caller can use `?` for cross-tenant
236/// permission denials and validation failures.
237///
238/// # Errors
239/// Returns an error wrapping the kernel's error message when the
240/// response body is [`AdminResponseBody::Error`].
241pub fn into_result(body: AdminResponseBody) -> Result<AdminResponseBody> {
242    match body {
243        AdminResponseBody::Error(msg) => Err(anyhow!("kernel rejected request: {msg}")),
244        other => Ok(other),
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use astrid_core::PrincipalId;
252
253    #[test]
254    fn topic_suffixes_match_kernel_constants() {
255        assert_eq!(
256            topic_suffix(&AdminRequestKind::AgentCreate {
257                name: "x".into(),
258                groups: vec![],
259                grants: vec![],
260            }),
261            "agent.create"
262        );
263        assert_eq!(topic_suffix(&AdminRequestKind::AgentList), "agent.list");
264        assert_eq!(topic_suffix(&AdminRequestKind::GroupList), "group.list");
265        let p = PrincipalId::default();
266        assert_eq!(
267            topic_suffix(&AdminRequestKind::QuotaGet { principal: p }),
268            "quota.get"
269        );
270    }
271
272    #[test]
273    fn request_topic_uses_admin_prefix() {
274        let req = AdminRequestKind::AgentList;
275        assert_eq!(request_topic(&req), "astrid.v1.admin.agent.list");
276        assert_eq!(response_topic(&req), "astrid.v1.admin.response.agent.list");
277    }
278
279    #[test]
280    fn invite_topic_suffixes() {
281        assert_eq!(
282            topic_suffix(&AdminRequestKind::InviteIssue {
283                group: "agent".into(),
284                expires_secs: Some(3600),
285                max_uses: 1,
286                metadata: None,
287            }),
288            "invite.issue"
289        );
290        assert_eq!(
291            topic_suffix(&AdminRequestKind::InviteRedeem {
292                token: "x".into(),
293                public_key: String::new(),
294                display_name: None,
295            }),
296            "invite.redeem"
297        );
298        assert_eq!(topic_suffix(&AdminRequestKind::InviteList), "invite.list");
299    }
300
301    #[test]
302    fn into_result_lifts_error_variant() {
303        let err = AdminResponseBody::Error("permission denied".into());
304        let res = into_result(err);
305        assert!(res.is_err());
306        let msg = res.unwrap_err().to_string();
307        assert!(msg.contains("permission denied"), "got: {msg}");
308    }
309
310    #[test]
311    fn into_result_passes_through_success() {
312        let ok = AdminResponseBody::AgentList(vec![]);
313        let res = into_result(ok);
314        assert!(matches!(res, Ok(AdminResponseBody::AgentList(_))));
315    }
316}