astrid_uplink/
admin_client.rs1use std::time::Duration;
22
23use anyhow::{Context, Result, anyhow};
24use astrid_core::PrincipalId;
25use astrid_core::kernel_api::{
26 AdminKernelRequest, AdminKernelResponse, AdminRequestKind, AdminResponseBody,
27};
28use astrid_types::ipc::{IpcMessage, IpcPayload};
29use serde_json::Value;
30use uuid::Uuid;
31
32use crate::socket_client::SocketClient;
33
/// Topic prefix for outgoing admin requests; `request_topic` appends the
/// per-request suffix from `topic_suffix` to it.
const ADMIN_INPUT_PREFIX: &str = "astrid.v1.admin.";
/// Topic prefix for admin responses; `response_topic` appends the same
/// per-request suffix to it.
const ADMIN_RESPONSE_PREFIX: &str = "astrid.v1.admin.response.";

/// Response timeout a freshly connected `AdminClient` starts with
/// (overridable through `AdminClient::with_timeout`).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(15);
42
43#[must_use]
50pub const fn topic_suffix(req: &AdminRequestKind) -> &'static str {
51 match req {
52 AdminRequestKind::AgentCreate { .. } => "agent.create",
53 AdminRequestKind::AgentDelete { .. } => "agent.delete",
54 AdminRequestKind::AgentEnable { .. } => "agent.enable",
55 AdminRequestKind::AgentDisable { .. } => "agent.disable",
56 AdminRequestKind::AgentModify { .. } => "agent.modify",
57 AdminRequestKind::AgentList => "agent.list",
58 AdminRequestKind::QuotaSet { .. } => "quota.set",
59 AdminRequestKind::QuotaGet { .. } => "quota.get",
60 AdminRequestKind::UsageGet { .. } => "usage.get",
61 AdminRequestKind::GroupCreate { .. } => "group.create",
62 AdminRequestKind::GroupDelete { .. } => "group.delete",
63 AdminRequestKind::GroupModify { .. } => "group.modify",
64 AdminRequestKind::GroupList => "group.list",
65 AdminRequestKind::CapsGrant { .. } => "caps.grant",
66 AdminRequestKind::CapsRevoke { .. } => "caps.revoke",
67 AdminRequestKind::InviteIssue { .. } => "invite.issue",
68 AdminRequestKind::InviteRedeem { .. } => "invite.redeem",
69 AdminRequestKind::InviteList => "invite.list",
70 AdminRequestKind::InviteRevoke { .. } => "invite.revoke",
71 AdminRequestKind::PairDeviceIssue { .. } => "auth.pair.issue",
72 AdminRequestKind::PairDeviceRedeem { .. } => "auth.pair.redeem",
73 }
74}
75
76#[must_use]
78pub fn request_topic(req: &AdminRequestKind) -> String {
79 format!("{ADMIN_INPUT_PREFIX}{}", topic_suffix(req))
80}
81
82#[must_use]
84pub fn response_topic(req: &AdminRequestKind) -> String {
85 format!("{ADMIN_RESPONSE_PREFIX}{}", topic_suffix(req))
86}
87
/// Client for issuing admin requests to the daemon over a socket connection
/// and waiting for the correlated response.
pub struct AdminClient {
    /// Underlying socket connection used to send messages and read raw frames.
    inner: SocketClient,
    /// Principal attached to every outgoing request message.
    caller: PrincipalId,
    /// Maximum time `request` waits for a matching response.
    timeout: Duration,
}
95
96impl AdminClient {
97 pub async fn connect(caller: PrincipalId) -> Result<Self> {
106 let session_id = astrid_core::SessionId::from_uuid(Uuid::new_v4());
107 let inner = SocketClient::connect(session_id)
108 .await
109 .context("Failed to connect to Astrid daemon. Run `astrid start` to launch it.")?;
110 Ok(Self {
111 inner,
112 caller,
113 timeout: DEFAULT_TIMEOUT,
114 })
115 }
116
117 #[must_use]
119 pub const fn with_timeout(mut self, timeout: Duration) -> Self {
120 self.timeout = timeout;
121 self
122 }
123
124 #[must_use]
126 pub const fn caller(&self) -> &PrincipalId {
127 &self.caller
128 }
129
130 pub async fn request(&mut self, kind: AdminRequestKind) -> Result<AdminResponseBody> {
141 let request_id = Uuid::new_v4().to_string();
142 let topic = request_topic(&kind);
143 let want_response = response_topic(&kind);
144
145 let req = AdminKernelRequest::with_request_id(request_id.clone(), kind);
146 let payload =
147 serde_json::to_value(&req).context("Failed to serialize AdminKernelRequest")?;
148 let msg = IpcMessage::new(topic, IpcPayload::RawJson(payload), Uuid::nil())
149 .with_principal(self.caller.to_string());
150 self.inner.send_message(msg).await?;
151
152 let deadline = tokio::time::Instant::now()
153 .checked_add(self.timeout)
154 .unwrap_or_else(tokio::time::Instant::now);
155 loop {
156 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
157 if remaining.is_zero() {
158 anyhow::bail!(
159 "Admin request timed out after {:?} waiting for {want_response}",
160 self.timeout
161 );
162 }
163 let read = tokio::time::timeout(remaining, self.inner.read_raw_frame()).await;
164 let frame = match read {
165 Ok(Ok(Some(bytes))) => bytes,
166 Ok(Ok(None)) => {
167 anyhow::bail!("Daemon closed the connection before responding");
168 },
169 Ok(Err(e)) => return Err(e),
170 Err(_) => {
171 anyhow::bail!(
172 "Admin request timed out after {:?} waiting for {want_response}",
173 self.timeout
174 );
175 },
176 };
177
178 let raw: Value = match serde_json::from_slice(&frame) {
185 Ok(v) => v,
186 Err(e) => {
187 tracing::debug!(error = %e, "frame is not JSON, skipping");
188 continue;
189 },
190 };
191 let topic = raw
192 .get("topic")
193 .and_then(|t| t.as_str())
194 .unwrap_or_default();
195 if topic != want_response {
196 tracing::debug!(topic = %topic, "ignoring non-matching message");
197 continue;
198 }
199 let Some(payload) = raw.get("payload").cloned() else {
200 tracing::warn!(topic = %topic, "matched response missing payload");
201 continue;
202 };
203 let response_value = if payload
208 .as_object()
209 .is_some_and(|m| m.contains_key("type") && m.contains_key("value"))
210 {
211 payload.get("value").cloned().unwrap_or(payload)
212 } else {
213 payload
214 };
215 match serde_json::from_value::<AdminKernelResponse>(response_value) {
216 Ok(resp) => {
217 if resp.request_id.as_deref() == Some(&request_id) {
218 return Ok(resp.body);
219 }
220 tracing::debug!(
221 echoed = ?resp.request_id,
222 expected = %request_id,
223 "ignoring response with non-matching request_id"
224 );
225 },
226 Err(e) => {
227 tracing::warn!(error = %e, "failed to deserialize admin response");
228 },
229 }
230 }
231 }
232}
233
234pub fn into_result(body: AdminResponseBody) -> Result<AdminResponseBody> {
242 match body {
243 AdminResponseBody::Error(msg) => Err(anyhow!("kernel rejected request: {msg}")),
244 other => Ok(other),
245 }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use astrid_core::PrincipalId;

    /// Spot-checks the suffix table across a field-carrying variant, two unit
    /// variants and a principal-carrying variant.
    #[test]
    fn topic_suffixes_match_kernel_constants() {
        let create = AdminRequestKind::AgentCreate {
            name: "x".into(),
            groups: vec![],
            grants: vec![],
        };
        let quota_get = AdminRequestKind::QuotaGet {
            principal: PrincipalId::default(),
        };
        let cases = [
            (create, "agent.create"),
            (AdminRequestKind::AgentList, "agent.list"),
            (AdminRequestKind::GroupList, "group.list"),
            (quota_get, "quota.get"),
        ];
        for (kind, expected) in &cases {
            assert_eq!(topic_suffix(kind), *expected);
        }
    }

    /// Request and response topics share a suffix but use different prefixes.
    #[test]
    fn request_topic_uses_admin_prefix() {
        let kind = AdminRequestKind::AgentList;
        let outgoing = request_topic(&kind);
        let incoming = response_topic(&kind);
        assert_eq!(outgoing, "astrid.v1.admin.agent.list");
        assert_eq!(incoming, "astrid.v1.admin.response.agent.list");
    }

    /// Covers the invite issue, redeem and list suffixes.
    #[test]
    fn invite_topic_suffixes() {
        let issue = AdminRequestKind::InviteIssue {
            group: "agent".into(),
            expires_secs: Some(3600),
            max_uses: 1,
            metadata: None,
        };
        let redeem = AdminRequestKind::InviteRedeem {
            token: "x".into(),
            public_key: String::new(),
            display_name: None,
        };
        let cases = [
            (issue, "invite.issue"),
            (redeem, "invite.redeem"),
            (AdminRequestKind::InviteList, "invite.list"),
        ];
        for (kind, expected) in &cases {
            assert_eq!(topic_suffix(kind), *expected);
        }
    }

    /// An `Error` body becomes an `Err` that still carries the kernel's message.
    #[test]
    fn into_result_lifts_error_variant() {
        let outcome = into_result(AdminResponseBody::Error("permission denied".into()));
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("permission denied"), "got: {msg}");
    }

    /// A non-error body is returned untouched inside `Ok`.
    #[test]
    fn into_result_passes_through_success() {
        let outcome = into_result(AdminResponseBody::AgentList(vec![]));
        assert!(matches!(outcome, Ok(AdminResponseBody::AgentList(_))));
    }
}