use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use astrid_core::PrincipalId;
use astrid_core::kernel_api::{
AdminKernelRequest, AdminKernelResponse, AdminRequestKind, AdminResponseBody,
};
use astrid_types::ipc::{IpcMessage, IpcPayload};
use serde_json::Value;
use uuid::Uuid;
use crate::socket_client::SocketClient;
/// Topic prefix that `request_topic` prepends to a request's suffix.
const ADMIN_INPUT_PREFIX: &str = "astrid.v1.admin.";
/// Topic prefix that `response_topic` prepends to a request's suffix.
const ADMIN_RESPONSE_PREFIX: &str = "astrid.v1.admin.response.";
/// Timeout given to a new `AdminClient` by `AdminClient::connect`.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(15);
/// Returns the dotted topic suffix that identifies an admin request variant.
///
/// The suffix is what [`request_topic`] and [`response_topic`] append to
/// their respective prefixes. The match has no wildcard arm, so every
/// variant listed here must be given an explicit suffix.
#[must_use]
pub const fn topic_suffix(req: &AdminRequestKind) -> &'static str {
    match *req {
        // Agents.
        AdminRequestKind::AgentCreate { .. } => "agent.create",
        AdminRequestKind::AgentDelete { .. } => "agent.delete",
        AdminRequestKind::AgentEnable { .. } => "agent.enable",
        AdminRequestKind::AgentDisable { .. } => "agent.disable",
        AdminRequestKind::AgentModify { .. } => "agent.modify",
        AdminRequestKind::AgentList => "agent.list",

        // Quotas and usage.
        AdminRequestKind::QuotaSet { .. } => "quota.set",
        AdminRequestKind::QuotaGet { .. } => "quota.get",
        AdminRequestKind::UsageGet { .. } => "usage.get",

        // Groups.
        AdminRequestKind::GroupCreate { .. } => "group.create",
        AdminRequestKind::GroupDelete { .. } => "group.delete",
        AdminRequestKind::GroupModify { .. } => "group.modify",
        AdminRequestKind::GroupList => "group.list",

        // Capabilities.
        AdminRequestKind::CapsGrant { .. } => "caps.grant",
        AdminRequestKind::CapsRevoke { .. } => "caps.revoke",

        // Invites.
        AdminRequestKind::InviteIssue { .. } => "invite.issue",
        AdminRequestKind::InviteRedeem { .. } => "invite.redeem",
        AdminRequestKind::InviteList => "invite.list",
        AdminRequestKind::InviteRevoke { .. } => "invite.revoke",

        // Device pairing (note the `auth.` namespace).
        AdminRequestKind::PairDeviceIssue { .. } => "auth.pair.issue",
        AdminRequestKind::PairDeviceRedeem { .. } => "auth.pair.redeem",
    }
}
/// Builds the full topic a request of this kind is published on.
///
/// The result is `ADMIN_INPUT_PREFIX` followed by [`topic_suffix`] for `req`.
#[must_use]
pub fn request_topic(req: &AdminRequestKind) -> String {
    let suffix = topic_suffix(req);
    [ADMIN_INPUT_PREFIX, suffix].concat()
}
/// Builds the full topic on which the response to a request of this kind
/// is expected.
///
/// The result is `ADMIN_RESPONSE_PREFIX` followed by [`topic_suffix`] for `req`.
#[must_use]
pub fn response_topic(req: &AdminRequestKind) -> String {
    let suffix = topic_suffix(req);
    [ADMIN_RESPONSE_PREFIX, suffix].concat()
}
/// Client that sends admin requests over a `SocketClient` and waits for the
/// matching response.
pub struct AdminClient {
/// Socket connection used to send messages and read raw response frames.
inner: SocketClient,
/// Principal stamped onto every outgoing request message.
caller: PrincipalId,
/// Upper bound on how long `request` waits for a matching response.
timeout: Duration,
}
impl AdminClient {
pub async fn connect(caller: PrincipalId) -> Result<Self> {
let session_id = astrid_core::SessionId::from_uuid(Uuid::new_v4());
let inner = SocketClient::connect(session_id)
.await
.context("Failed to connect to Astrid daemon. Run `astrid start` to launch it.")?;
Ok(Self {
inner,
caller,
timeout: DEFAULT_TIMEOUT,
})
}
#[must_use]
pub const fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
#[must_use]
pub const fn caller(&self) -> &PrincipalId {
&self.caller
}
pub async fn request(&mut self, kind: AdminRequestKind) -> Result<AdminResponseBody> {
let request_id = Uuid::new_v4().to_string();
let topic = request_topic(&kind);
let want_response = response_topic(&kind);
let req = AdminKernelRequest::with_request_id(request_id.clone(), kind);
let payload =
serde_json::to_value(&req).context("Failed to serialize AdminKernelRequest")?;
let msg = IpcMessage::new(topic, IpcPayload::RawJson(payload), Uuid::nil())
.with_principal(self.caller.to_string());
self.inner.send_message(msg).await?;
let deadline = tokio::time::Instant::now()
.checked_add(self.timeout)
.unwrap_or_else(tokio::time::Instant::now);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
anyhow::bail!(
"Admin request timed out after {:?} waiting for {want_response}",
self.timeout
);
}
let read = tokio::time::timeout(remaining, self.inner.read_raw_frame()).await;
let frame = match read {
Ok(Ok(Some(bytes))) => bytes,
Ok(Ok(None)) => {
anyhow::bail!("Daemon closed the connection before responding");
},
Ok(Err(e)) => return Err(e),
Err(_) => {
anyhow::bail!(
"Admin request timed out after {:?} waiting for {want_response}",
self.timeout
);
},
};
let raw: Value = match serde_json::from_slice(&frame) {
Ok(v) => v,
Err(e) => {
tracing::debug!(error = %e, "frame is not JSON, skipping");
continue;
},
};
let topic = raw
.get("topic")
.and_then(|t| t.as_str())
.unwrap_or_default();
if topic != want_response {
tracing::debug!(topic = %topic, "ignoring non-matching message");
continue;
}
let Some(payload) = raw.get("payload").cloned() else {
tracing::warn!(topic = %topic, "matched response missing payload");
continue;
};
let response_value = if payload
.as_object()
.is_some_and(|m| m.contains_key("type") && m.contains_key("value"))
{
payload.get("value").cloned().unwrap_or(payload)
} else {
payload
};
match serde_json::from_value::<AdminKernelResponse>(response_value) {
Ok(resp) => {
if resp.request_id.as_deref() == Some(&request_id) {
return Ok(resp.body);
}
tracing::debug!(
echoed = ?resp.request_id,
expected = %request_id,
"ignoring response with non-matching request_id"
);
},
Err(e) => {
tracing::warn!(error = %e, "failed to deserialize admin response");
},
}
}
}
}
pub fn into_result(body: AdminResponseBody) -> Result<AdminResponseBody> {
match body {
AdminResponseBody::Error(msg) => Err(anyhow!("kernel rejected request: {msg}")),
other => Ok(other),
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use astrid_core::PrincipalId;

    /// Spot-checks a few variants against their expected topic suffixes.
    #[test]
    fn topic_suffixes_match_kernel_constants() {
        let agent_create = AdminRequestKind::AgentCreate {
            name: "x".into(),
            groups: vec![],
            grants: vec![],
        };
        let quota_get = AdminRequestKind::QuotaGet {
            principal: PrincipalId::default(),
        };

        assert_eq!(topic_suffix(&agent_create), "agent.create");
        assert_eq!(topic_suffix(&AdminRequestKind::AgentList), "agent.list");
        assert_eq!(topic_suffix(&AdminRequestKind::GroupList), "group.list");
        assert_eq!(topic_suffix(&quota_get), "quota.get");
    }

    /// Request and response topics are the suffix under their own prefixes.
    #[test]
    fn request_topic_uses_admin_prefix() {
        let list = AdminRequestKind::AgentList;
        assert_eq!(request_topic(&list), "astrid.v1.admin.agent.list");
        assert_eq!(response_topic(&list), "astrid.v1.admin.response.agent.list");
    }

    /// Invite-related variants map to the `invite.*` suffixes.
    #[test]
    fn invite_topic_suffixes() {
        let issue = AdminRequestKind::InviteIssue {
            group: "agent".into(),
            expires_secs: Some(3600),
            max_uses: 1,
            metadata: None,
        };
        let redeem = AdminRequestKind::InviteRedeem {
            token: "x".into(),
            public_key: String::new(),
            display_name: None,
        };

        assert_eq!(topic_suffix(&issue), "invite.issue");
        assert_eq!(topic_suffix(&redeem), "invite.redeem");
        assert_eq!(topic_suffix(&AdminRequestKind::InviteList), "invite.list");
    }

    /// An `Error` body becomes an `Err` that still carries the kernel message.
    #[test]
    fn into_result_lifts_error_variant() {
        let outcome = into_result(AdminResponseBody::Error("permission denied".into()));
        let Err(failure) = outcome else {
            panic!("expected an Err for an Error body");
        };
        let msg = failure.to_string();
        assert!(msg.contains("permission denied"), "got: {msg}");
    }

    /// Non-error bodies are returned unchanged inside `Ok`.
    #[test]
    fn into_result_passes_through_success() {
        let outcome = into_result(AdminResponseBody::AgentList(vec![]));
        assert!(matches!(outcome, Ok(AdminResponseBody::AgentList(_))));
    }
}