#[cfg(feature = "test-harness")]
pub mod mock;
pub mod runtime_sender;
pub mod takeover;
pub mod transcripts;
#[cfg(feature = "test-harness")]
pub use mock::{MockAdminRpc, MockRequest};
pub use runtime_sender::WriterAdminSender;
pub use takeover::{HumanTakeover, SendReplyArgs};
pub use transcripts::TranscriptStream;
use std::sync::{Arc, OnceLock};
use async_trait::async_trait;
use dashmap::DashMap;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use thiserror::Error;
use tokio::sync::oneshot;
use uuid::Uuid;
/// Shared, thread-safe closure that returns the operator token hash as a `String`.
///
/// `AdminClient` stores at most one of these and invokes it every time it
/// stamps a request, so the closure may return a different value per call.
pub type OperatorHashSource = Arc<dyn Fn() -> String + Send + Sync>;
/// Response timeout (30 seconds) used by `AdminClient::new`.
pub const DEFAULT_ADMIN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Errors surfaced by admin RPC calls.
///
/// Most variants are produced by `AdminError::from_rpc_error`, which maps the
/// `code` of a JSON-RPC error object onto a variant; a few are also built
/// locally by `AdminClient` (see the per-variant notes).
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum AdminError {
/// The RPC error carried code `-32004`.
#[error("capability_not_granted: {capability} for method {method}")]
CapabilityNotGranted {
/// Value of `data.capability` in the error object, or `"(unknown)"` when absent.
capability: String,
/// Value of `data.method` in the error object, or `"(unknown)"` when absent.
method: String,
},
/// The RPC error carried code `-32602`, or `AdminClient::call` could not
/// serialize the caller's params to JSON.
#[error("invalid_params: {0}")]
InvalidParams(String),
/// The RPC error carried code `-32601` and its message does not contain `"not_found"`.
#[error("method_not_found: {0}")]
MethodNotFound(String),
/// The RPC error carried code `-32601` and its message contains `"not_found"`.
#[error("not_found: {0}")]
NotFound(String),
/// Any other (or missing) RPC error code; also used by `AdminClient` when a
/// frame cannot be serialized or a result cannot be deserialized.
#[error("internal: {0}")]
Internal(String),
/// Built by `AdminClient::call_raw` when the response channel closes or the
/// timeout elapses. Senders may also return it for delivery failures.
#[error("transport: {0}")]
Transport(String),
}
impl AdminError {
pub fn from_rpc_error(error: &Value) -> Self {
let code = error.get("code").and_then(Value::as_i64).unwrap_or(-32603);
let message = error
.get("message")
.and_then(Value::as_str)
.unwrap_or("(no message)")
.to_string();
let data = error.get("data");
match code {
-32004 => {
let capability = data
.and_then(|d| d.get("capability"))
.and_then(Value::as_str)
.unwrap_or("(unknown)")
.to_string();
let method = data
.and_then(|d| d.get("method"))
.and_then(Value::as_str)
.unwrap_or("(unknown)")
.to_string();
AdminError::CapabilityNotGranted { capability, method }
}
-32602 => AdminError::InvalidParams(message),
-32601 => {
if message.contains("not_found") {
AdminError::NotFound(message)
} else {
AdminError::MethodNotFound(message)
}
}
_ => AdminError::Internal(message),
}
}
}
/// Outbound half of the admin RPC transport.
///
/// `AdminClient` hands each request to `send_line` as one JSON-serialized
/// frame. How the string is framed on the wire (for example whether a line
/// terminator is appended) is up to the implementation and not visible here.
#[async_trait]
pub trait AdminSender: Send + Sync + std::fmt::Debug {
/// Delivers one serialized request frame; an `Err` is returned to the caller
/// of `AdminClient::call_raw` unchanged.
async fn send_line(&self, line: String) -> Result<(), AdminError>;
}
/// Client that sends JSON-RPC admin requests through an `AdminSender` and
/// matches inbound responses back to the waiting caller by request id.
///
/// Every field is behind an `Arc` (or is `Copy`), so clones share the same
/// sender, pending-request table and operator hash source.
#[derive(Clone)]
pub struct AdminClient {
// Transport used to write outbound request frames.
sender: Arc<dyn AdminSender>,
// Request id -> channel on which the matching response (or error) is delivered.
pending: Arc<DashMap<String, oneshot::Sender<Result<Value, AdminError>>>>,
// How long `call_raw` waits for a response before giving up.
timeout: std::time::Duration,
// Set-once closure yielding the operator token hash; empty until registered.
operator_hash_source: Arc<OnceLock<OperatorHashSource>>,
}
impl std::fmt::Debug for AdminClient {
    /// Formats the client for diagnostics.
    ///
    /// The pending table is summarised by its length and the operator hash
    /// source (a closure, which has no `Debug` form) by whether it is set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let hash_source_label: &str = match self.operator_hash_source.get() {
            Some(_) => "Some(<closure>)",
            None => "None",
        };
        let in_flight = self.pending.len();

        let mut builder = f.debug_struct("AdminClient");
        builder.field("sender", &self.sender);
        builder.field("pending_len", &in_flight);
        builder.field("timeout", &self.timeout);
        builder.field("operator_hash_source", &hash_source_label);
        builder.finish()
    }
}
impl AdminClient {
pub fn new(sender: Arc<dyn AdminSender>) -> Self {
Self::with_timeout(sender, DEFAULT_ADMIN_TIMEOUT)
}
pub fn with_timeout(sender: Arc<dyn AdminSender>, timeout: std::time::Duration) -> Self {
Self {
sender,
pending: Arc::new(DashMap::new()),
timeout,
operator_hash_source: Arc::new(OnceLock::new()),
}
}
pub fn set_operator_token_hash<F>(&self, source: F)
where
F: Fn() -> String + Send + Sync + 'static,
{
let arc: OperatorHashSource = Arc::new(source);
if self.operator_hash_source.set(arc).is_err() {
tracing::warn!(
"AdminClient::set_operator_token_hash called twice; \
keeping the source registered first"
);
}
}
fn maybe_stamp_operator(&self, method: &str, params: &mut Value) {
let Some(source) = self.operator_hash_source.get() else {
return;
};
if !nexo_tool_meta::admin::operator_stamping::is_operator_stamped(method) {
return;
}
let Some(obj) = params.as_object_mut() else {
return;
};
obj.insert("operator_token_hash".to_string(), Value::String(source()));
}
pub fn next_request_id() -> String {
format!("app:{}", Uuid::new_v4())
}
pub async fn call<P: Serialize, R: DeserializeOwned>(
&self,
method: &str,
params: P,
) -> Result<R, AdminError> {
let raw = self
.call_raw(
method,
serde_json::to_value(¶ms)
.map_err(|e| AdminError::InvalidParams(format!("serialize params: {e}")))?,
)
.await?;
serde_json::from_value(raw)
.map_err(|e| AdminError::Internal(format!("deserialise result: {e}")))
}
pub async fn call_raw(&self, method: &str, params: Value) -> Result<Value, AdminError> {
let id = Self::next_request_id();
let (tx, rx) = oneshot::channel();
self.pending.insert(id.clone(), tx);
let mut params = params;
self.maybe_stamp_operator(method, &mut params);
let frame = serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
});
let line = serde_json::to_string(&frame)
.map_err(|e| AdminError::Internal(format!("serialize frame: {e}")))?;
if let Err(e) = self.sender.send_line(line).await {
self.pending.remove(&id);
return Err(e);
}
match tokio::time::timeout(self.timeout, rx).await {
Ok(Ok(result)) => result,
Ok(Err(_)) => {
self.pending.remove(&id);
Err(AdminError::Transport("response channel closed".into()))
}
Err(_) => {
self.pending.remove(&id);
Err(AdminError::Transport(format!(
"timeout after {:?} waiting for {method}",
self.timeout
)))
}
}
}
pub fn on_inbound_response(&self, id: &str, frame: &Value) -> bool {
let Some((_, tx)) = self.pending.remove(id) else {
return false;
};
let payload = if let Some(error) = frame.get("error") {
Err(AdminError::from_rpc_error(error))
} else {
Ok(frame.get("result").cloned().unwrap_or(Value::Null))
};
let _ = tx.send(payload);
true
}
pub fn pending_len(&self) -> usize {
self.pending.len()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex as StdMutex;

    /// Sender that records every outbound line so tests can inspect it.
    #[derive(Debug, Default, Clone)]
    struct CaptureSender {
        lines: Arc<StdMutex<Vec<String>>>,
    }

    #[async_trait]
    impl AdminSender for CaptureSender {
        async fn send_line(&self, line: String) -> Result<(), AdminError> {
            self.lines.lock().unwrap().push(line);
            Ok(())
        }
    }

    /// Sender whose every send fails with a transport error.
    #[derive(Debug, Default)]
    struct FailingSender;

    #[async_trait]
    impl AdminSender for FailingSender {
        async fn send_line(&self, _line: String) -> Result<(), AdminError> {
            Err(AdminError::Transport("simulated stdin closed".into()))
        }
    }

    /// Polls (for up to about one second) until `sender` has captured a
    /// line, then returns the first captured line parsed as JSON.
    async fn first_captured_frame(sender: &CaptureSender) -> Value {
        for _ in 0..200 {
            if !sender.lines.lock().unwrap().is_empty() {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }
        let line = sender
            .lines
            .lock()
            .unwrap()
            .first()
            .cloned()
            .expect("frame written");
        serde_json::from_str(&line).unwrap()
    }

    /// Starts a call in the background and returns the frame it wrote.
    /// No response is delivered; the spawned call is left to time out.
    async fn drive_send_only(
        client: &AdminClient,
        sender: &CaptureSender,
        method: &str,
        params: Value,
    ) -> Value {
        let client_for_call = client.clone();
        let method = method.to_string();
        tokio::spawn(async move {
            let _ = client_for_call.call_raw(&method, params).await;
        });
        first_captured_frame(sender).await
    }

    #[tokio::test]
    async fn admin_client_round_trip_via_capture_sender() {
        let sender = Arc::new(CaptureSender::default());
        let client = AdminClient::new(sender.clone());
        let client_for_call = client.clone();
        let call = tokio::spawn(async move {
            client_for_call
                .call_raw("nexo/admin/echo", serde_json::json!({ "x": 1 }))
                .await
        });

        let frame = first_captured_frame(&sender).await;
        assert_eq!(frame["method"], "nexo/admin/echo");
        assert!(frame["id"].as_str().unwrap().starts_with("app:"));

        let id = frame["id"].as_str().unwrap().to_string();
        let response = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "result": { "echoed": { "x": 1 } }
        });
        assert!(client.on_inbound_response(&id, &response));

        let result = call.await.unwrap().unwrap();
        assert_eq!(result["echoed"]["x"], 1);
    }

    #[tokio::test]
    async fn admin_client_capability_not_granted_maps_to_typed_error() {
        let sender = Arc::new(CaptureSender::default());
        let client = AdminClient::with_timeout(sender.clone(), std::time::Duration::from_secs(2));
        let client_for_call = client.clone();
        let call = tokio::spawn(async move {
            client_for_call
                .call_raw("nexo/admin/agents/list", serde_json::json!({}))
                .await
        });

        let frame = first_captured_frame(&sender).await;
        let id = frame["id"].as_str().unwrap().to_string();
        let error_frame = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "error": {
                "code": -32004,
                "message": "capability_not_granted",
                "data": {
                    "capability": "agents_crud",
                    "microapp_id": "agent-creator",
                    "method": "nexo/admin/agents/list"
                }
            }
        });
        // The request must still be pending when the error frame arrives.
        assert!(client.on_inbound_response(&id, &error_frame));

        let err = call.await.unwrap().unwrap_err();
        match err {
            AdminError::CapabilityNotGranted { capability, method } => {
                assert_eq!(capability, "agents_crud");
                assert_eq!(method, "nexo/admin/agents/list");
            }
            other => panic!("expected CapabilityNotGranted, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn admin_client_transport_error_returned_when_send_fails() {
        let client = AdminClient::new(Arc::new(FailingSender));
        let err = client
            .call_raw("nexo/admin/echo", Value::Null)
            .await
            .unwrap_err();
        assert!(matches!(err, AdminError::Transport(_)));
        assert_eq!(
            client.pending_len(),
            0,
            "pending entry cleared on send fail"
        );
    }

    #[test]
    fn next_request_id_uses_app_prefix() {
        let id = AdminClient::next_request_id();
        assert!(id.starts_with("app:"));
        // A hyphenated UUID is 36 characters long.
        assert_eq!(id.len(), "app:".len() + 36);
    }

    #[tokio::test]
    async fn client_without_source_passes_params_through() {
        let sender = Arc::new(CaptureSender::default());
        let client =
            AdminClient::with_timeout(sender.clone(), std::time::Duration::from_millis(50));
        let frame = drive_send_only(
            &client,
            &sender,
            "nexo/admin/processing/pause",
            serde_json::json!({ "scope": "x" }),
        )
        .await;
        assert!(frame["params"]
            .as_object()
            .unwrap()
            .get("operator_token_hash")
            .is_none());
    }

    #[tokio::test]
    async fn client_with_source_stamps_processing_pause() {
        let sender = Arc::new(CaptureSender::default());
        let client =
            AdminClient::with_timeout(sender.clone(), std::time::Duration::from_millis(50));
        client.set_operator_token_hash(|| "abc123".to_string());
        let frame = drive_send_only(
            &client,
            &sender,
            "nexo/admin/processing/pause",
            serde_json::json!({ "scope": "x" }),
        )
        .await;
        assert_eq!(frame["params"]["operator_token_hash"], "abc123");
    }

    #[tokio::test]
    async fn client_with_source_overrides_caller_value() {
        let sender = Arc::new(CaptureSender::default());
        let client =
            AdminClient::with_timeout(sender.clone(), std::time::Duration::from_millis(50));
        client.set_operator_token_hash(|| "trusted".to_string());
        let frame = drive_send_only(
            &client,
            &sender,
            "nexo/admin/processing/pause",
            serde_json::json!({
                "scope": "x",
                "operator_token_hash": "untrusted-from-client"
            }),
        )
        .await;
        assert_eq!(frame["params"]["operator_token_hash"], "trusted");
    }

    #[tokio::test]
    async fn client_with_source_skips_non_stamped_methods() {
        let sender = Arc::new(CaptureSender::default());
        let client =
            AdminClient::with_timeout(sender.clone(), std::time::Duration::from_millis(50));
        client.set_operator_token_hash(|| "should-not-leak".to_string());
        let frame = drive_send_only(
            &client,
            &sender,
            "nexo/admin/agents/list",
            serde_json::json!({}),
        )
        .await;
        assert!(frame["params"]
            .as_object()
            .unwrap()
            .get("operator_token_hash")
            .is_none());
    }

    #[tokio::test]
    async fn client_source_called_per_request_supports_hot_swap() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let sender = Arc::new(CaptureSender::default());
        let client =
            AdminClient::with_timeout(sender.clone(), std::time::Duration::from_millis(50));
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_for_closure = counter.clone();
        client.set_operator_token_hash(move || {
            let n = counter_for_closure.fetch_add(1, Ordering::SeqCst);
            format!("hash-{n}")
        });

        // No response is delivered, so each call ends when its 50 ms
        // timeout elapses; only the stamping side effect is checked.
        let client_a = client.clone();
        let h1 = tokio::spawn(async move {
            let _ = client_a
                .call_raw(
                    "nexo/admin/processing/pause",
                    serde_json::json!({"scope": "a"}),
                )
                .await;
        });
        let client_b = client.clone();
        let h2 = tokio::spawn(async move {
            let _ = client_b
                .call_raw(
                    "nexo/admin/processing/resume",
                    serde_json::json!({"scope": "b"}),
                )
                .await;
        });
        let _ = h1.await;
        let _ = h2.await;
        assert_eq!(
            counter.load(Ordering::SeqCst),
            2,
            "closure called once per stamped request"
        );

        // A method that is not operator-stamped must not invoke the source.
        let client_c = client.clone();
        let h3 = tokio::spawn(async move {
            let _ = client_c
                .call_raw("nexo/admin/agents/list", serde_json::json!({}))
                .await;
        });
        let _ = h3.await;
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn client_with_source_skips_when_params_not_object() {
        let sender = Arc::new(CaptureSender::default());
        let client =
            AdminClient::with_timeout(sender.clone(), std::time::Duration::from_millis(50));
        client.set_operator_token_hash(|| "abc".to_string());
        let frame =
            drive_send_only(&client, &sender, "nexo/admin/processing/pause", Value::Null).await;
        assert_eq!(frame["params"], Value::Null);
    }

    #[test]
    fn from_rpc_error_falls_back_to_internal_for_unknown_code() {
        let err = AdminError::from_rpc_error(&serde_json::json!({
            "code": -99999,
            "message": "alien error"
        }));
        match err {
            AdminError::Internal(m) => assert!(m.contains("alien error")),
            other => panic!("expected Internal, got {other:?}"),
        }
    }
}