use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use serde::Deserialize;
use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::{oneshot, Mutex as AsyncMutex};
use tokio::time::timeout;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
/// Full WebSocket stream type used for the daemon connection (plain or TLS over TCP).
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
/// Sending half of a split [`WsStream`].
type WsWrite = SplitSink<WsStream, Message>;
/// Receiving half of a split [`WsStream`].
type WsRead = SplitStream<WsStream>;
/// Outcome delivered to a waiting caller: the JSON result, or an error description.
type RpcResult = Result<Value, String>;
/// Handler for daemon-initiated requests (frames carrying both `method` and `id`).
/// It receives the request params and resolves to the result or an error message.
pub type ServerRequestHandler =
Arc<dyn Fn(Value) -> Pin<Box<dyn Future<Output = Result<Value, String>> + Send>> + Send + Sync>;
/// Handler for daemon notifications (frames carrying `method` but no `id`).
/// It is invoked synchronously with the notification params.
pub type NotificationHandler = Arc<dyn Fn(Value) + Send + Sync>;
/// Upper bound applied to establishing the WebSocket connection.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Read timeout in seconds used when `CAR_DAEMON_TIMEOUT` is unset or unparsable.
const DEFAULT_READ_TIMEOUT_SECS: u64 = 30;
fn read_timeout() -> Duration {
std::env::var("CAR_DAEMON_TIMEOUT")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())
.map(Duration::from_secs)
.unwrap_or(Duration::from_secs(DEFAULT_READ_TIMEOUT_SECS))
}
/// Execution mode of the FFI layer. `Daemon` is the only variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeMode {
    /// All work is proxied to the daemon.
    Daemon,
}

impl RuntimeMode {
    /// Returns the runtime mode. No environment variable is consulted; the
    /// answer is always [`RuntimeMode::Daemon`].
    pub fn from_env() -> Self {
        RuntimeMode::Daemon
    }

    /// Fallible counterpart of [`RuntimeMode::from_env`]; it always returns
    /// `Ok(RuntimeMode::Daemon)`.
    pub fn resolve_or_err() -> Result<Self, String> {
        let mode = RuntimeMode::Daemon;
        Ok(mode)
    }
}
/// Returns the daemon port as reported by `car_proto::daemon::daemon_port()`.
///
/// This is a plain re-export; the resolution logic lives in `car_proto`.
pub fn daemon_port() -> u16 {
car_proto::daemon::daemon_port()
}
/// Returns the daemon WebSocket URL as reported by `car_proto::daemon::daemon_ws_url()`.
///
/// The tests in this file expect `ws://127.0.0.1:9100` by default and expect the
/// `CAR_DAEMON_URL` environment variable to override it; that logic lives in `car_proto`.
pub fn daemon_ws_url() -> String {
car_proto::daemon::daemon_ws_url()
}
/// The `error` member of an incoming JSON-RPC frame.
#[derive(Debug, Deserialize)]
struct JsonRpcErrorPayload {
/// Numeric error code; `recv_loop` prefixes it to the message when failing a call.
code: i64,
/// Error text supplied by the daemon.
message: String,
}
/// Any JSON-RPC frame received from the daemon.
///
/// Every field is optional so one type can decode all three kinds. `recv_loop`
/// classifies a frame by which fields are present: `method` + `id` is a
/// daemon-initiated request, `method` alone is a notification, and a frame
/// without `method` is a response to one of our calls.
#[derive(Debug, Deserialize)]
struct IncomingFrame {
/// Method name; present on requests and notifications.
#[serde(default)]
method: Option<String>,
/// Parameters of a request or notification.
#[serde(default)]
params: Option<Value>,
/// Result of a successful response.
#[serde(default)]
result: Option<Value>,
/// Error of a failed response.
#[serde(default)]
error: Option<JsonRpcErrorPayload>,
/// Correlation id; kept as raw JSON because daemon-initiated ids are echoed back verbatim.
#[serde(default)]
id: Option<Value>,
}
/// Result of one `session.auth` attempt, as produced by `send_auth`.
enum AuthOutcome {
/// The daemon's reply carried no `error` member.
Accepted,
/// The daemon replied with an error; the payload is its message text.
Rejected(String),
}
/// Sends a `session.auth` JSON-RPC request carrying `token` and waits for the
/// daemon's verdict.
///
/// The first *text* frame received is taken as the reply: a frame with an
/// `error` member yields [`AuthOutcome::Rejected`] with its message, anything
/// else (including a text frame that does not parse as JSON) yields
/// [`AuthOutcome::Accepted`].
///
/// Non-text frames (ping, pong, binary, raw) that arrive before the reply are
/// skipped instead of being mistaken for acceptance, and a close frame is an
/// error. The whole wait is bounded by a single `read_timeout()` deadline.
///
/// # Errors
/// Returns a description when serialization or sending fails, when the socket
/// reports a read error, when the daemon closes the connection, or when no
/// reply arrives in time.
async fn send_auth(
    socket: &mut WsStream,
    token: &str,
    id: u64,
) -> Result<AuthOutcome, String> {
    let handshake = serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "method": "session.auth",
        "params": { "token": token },
    });
    let payload = serde_json::to_string(&handshake)
        .map_err(|e| format!("serialize session.auth: {e}"))?;
    socket
        .send(Message::Text(payload.into()))
        .await
        .map_err(|e| format!("send session.auth: {e}"))?;

    let auth_to = read_timeout();
    let wait_for_reply = async {
        loop {
            match socket.next().await {
                Some(Ok(Message::Text(text))) => {
                    let rejection = serde_json::from_str::<IncomingFrame>(&text)
                        .ok()
                        .and_then(|frame| frame.error);
                    return Ok(match rejection {
                        Some(err) => AuthOutcome::Rejected(err.message),
                        None => AuthOutcome::Accepted,
                    });
                }
                // A close frame means the daemon is going away; it is not an acceptance.
                Some(Ok(Message::Close(_))) | None => {
                    return Err("daemon closed during session.auth".to_string());
                }
                // Control/binary frames are not the auth reply; keep waiting.
                Some(Ok(_)) => continue,
                Some(Err(e)) => return Err(format!("recv session.auth response: {e}")),
            }
        }
    };
    match timeout(auth_to, wait_for_reply).await {
        Ok(outcome) => outcome,
        Err(_) => Err(format!(
            "session.auth timed out after {}s",
            auth_to.as_secs()
        )),
    }
}
/// JSON-RPC client for the daemon's WebSocket endpoint.
///
/// The connection is opened on demand by `call`. A background task
/// (`recv_loop`) routes incoming frames to waiting callers and to the
/// registered handlers.
pub struct DaemonClient {
/// Current connection, or `None` when not connected (initially, and after `reset`).
state: AsyncMutex<Option<ConnState>>,
/// WebSocket URL to connect to.
url: String,
/// Source of request ids; starts at 1 and is incremented for every call.
req_id: AtomicU64,
/// In-flight calls keyed by request id; the receive loop completes each sender.
pending: Arc<StdMutex<HashMap<u64, oneshot::Sender<RpcResult>>>>,
/// Handlers for daemon-initiated requests, keyed by method name.
handlers: Arc<StdMutex<HashMap<String, ServerRequestHandler>>>,
/// Handlers for daemon notifications, keyed by method name.
notif_handlers: Arc<StdMutex<HashMap<String, NotificationHandler>>>,
}
/// State of one established connection.
struct ConnState {
/// Write half of the socket, shared between callers and the receive task's replies.
write: Arc<AsyncMutex<WsWrite>>,
/// Handle used to abort the receive task when the connection is reset.
recv_task: tokio::task::AbortHandle,
}
impl DaemonClient {
pub fn new() -> Arc<Self> {
Arc::new(Self {
state: AsyncMutex::new(None),
url: daemon_ws_url(),
req_id: AtomicU64::new(1),
pending: Arc::new(StdMutex::new(HashMap::new())),
handlers: Arc::new(StdMutex::new(HashMap::new())),
notif_handlers: Arc::new(StdMutex::new(HashMap::new())),
})
}
pub fn with_url(url: impl Into<String>) -> Arc<Self> {
Arc::new(Self {
state: AsyncMutex::new(None),
url: url.into(),
req_id: AtomicU64::new(1),
pending: Arc::new(StdMutex::new(HashMap::new())),
handlers: Arc::new(StdMutex::new(HashMap::new())),
notif_handlers: Arc::new(StdMutex::new(HashMap::new())),
})
}
pub fn register_handler<F, Fut>(&self, method: &str, handler: F)
where
F: Fn(Value) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<Value, String>> + Send + 'static,
{
let h: ServerRequestHandler = Arc::new(move |params| Box::pin(handler(params)));
if let Ok(mut g) = self.handlers.lock() {
g.insert(method.to_string(), h);
}
}
pub fn unregister_handler(&self, method: &str) {
if let Ok(mut g) = self.handlers.lock() {
g.remove(method);
}
}
pub fn register_notification_handler<F>(&self, method: &str, handler: F)
where
F: Fn(Value) + Send + Sync + 'static,
{
let h: NotificationHandler = Arc::new(handler);
if let Ok(mut g) = self.notif_handlers.lock() {
g.insert(method.to_string(), h);
}
}
pub fn unregister_notification_handler(&self, method: &str) {
if let Ok(mut g) = self.notif_handlers.lock() {
g.remove(method);
}
}
pub async fn call(&self, method: &str, params: Value) -> Result<Value, String> {
let write = self.ensure_connected().await?;
let id = self.req_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = oneshot::channel::<RpcResult>();
if let Ok(mut g) = self.pending.lock() {
g.insert(id, tx);
}
let rpc = serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
});
let payload = match serde_json::to_string(&rpc) {
Ok(s) => s,
Err(e) => {
if let Ok(mut g) = self.pending.lock() {
g.remove(&id);
}
return Err(format!("serialize {method} request: {e}"));
}
};
if let Err(e) = write.lock().await.send(Message::Text(payload.into())).await {
if let Ok(mut g) = self.pending.lock() {
g.remove(&id);
}
self.reset().await;
return Err(format!("send {method} request: {e}"));
}
let read_to = read_timeout();
match timeout(read_to, rx).await {
Ok(Ok(result)) => match result {
Ok(v) => Ok(v),
Err(e) => Err(format!("rpc {method}: {e}")),
},
Ok(Err(_)) => Err(format!("rpc channel closed for {method}")),
Err(_) => {
if let Ok(mut g) = self.pending.lock() {
g.remove(&id);
}
Err(format!(
"daemon read timeout on {method} after {}s",
read_to.as_secs()
))
}
}
}
async fn ensure_connected(&self) -> Result<Arc<AsyncMutex<WsWrite>>, String> {
let mut state = self.state.lock().await;
if let Some(s) = state.as_ref() {
return Ok(s.write.clone());
}
let connect_fut = connect_async(&self.url);
let (mut socket, _) = match timeout(CONNECT_TIMEOUT, connect_fut).await {
Ok(Ok(pair)) => pair,
Ok(Err(e)) => return Err(format!("connect daemon at {}: {}", self.url, e)),
Err(_) => {
return Err(format!(
"connect daemon at {} timed out after {}s",
self.url,
CONNECT_TIMEOUT.as_secs()
));
}
};
if let Ok(Some(token)) = crate::auth_token::read() {
match send_auth(&mut socket, &token, 0).await? {
AuthOutcome::Accepted => {}
AuthOutcome::Rejected(msg) => {
let looks_like_mismatch = msg.to_lowercase().contains("mismatch");
if !looks_like_mismatch {
return Err(format!("session.auth rejected by daemon: {msg}"));
}
let fresh = crate::auth_token::read()
.map_err(|e| {
format!("session.auth rejected (token mismatch); re-read failed: {e}")
})?
.ok_or_else(|| {
"session.auth rejected (token mismatch); token file vanished on re-read"
.to_string()
})?;
if fresh == token {
return Err(format!(
"session.auth rejected by daemon: {msg} (re-read returned same token — \
true auth failure, not a rotation race)"
));
}
match send_auth(&mut socket, &fresh, 1).await? {
AuthOutcome::Accepted => {}
AuthOutcome::Rejected(msg2) => {
return Err(format!(
"session.auth rejected by daemon after rotation re-read: {msg2}"
));
}
}
}
}
}
let (write_half, read_half) = socket.split();
let write_arc = Arc::new(AsyncMutex::new(write_half));
let pending = self.pending.clone();
let handlers = self.handlers.clone();
let notif_handlers = self.notif_handlers.clone();
let write_for_task = write_arc.clone();
let url_for_task = self.url.clone();
let task = tokio::spawn(async move {
recv_loop(
read_half,
write_for_task,
pending,
handlers,
notif_handlers,
url_for_task,
)
.await;
});
*state = Some(ConnState {
write: write_arc.clone(),
recv_task: task.abort_handle(),
});
Ok(write_arc)
}
async fn reset(&self) {
let mut state = self.state.lock().await;
if let Some(s) = state.take() {
s.recv_task.abort();
}
if let Ok(mut g) = self.pending.lock() {
g.clear();
}
}
}
async fn recv_loop(
mut read: WsRead,
write: Arc<AsyncMutex<WsWrite>>,
pending: Arc<StdMutex<HashMap<u64, oneshot::Sender<RpcResult>>>>,
handlers: Arc<StdMutex<HashMap<String, ServerRequestHandler>>>,
notif_handlers: Arc<StdMutex<HashMap<String, NotificationHandler>>>,
url: String,
) {
while let Some(frame) = read.next().await {
let msg = match frame {
Ok(m) => m,
Err(e) => {
tracing::warn!(
target: "car_ffi_common::proxy",
url = %url,
error = %e,
"recv loop read error; closing"
);
break;
}
};
match msg {
Message::Text(text) => {
let parsed: IncomingFrame = match serde_json::from_str(&text) {
Ok(f) => f,
Err(e) => {
tracing::warn!(
target: "car_ffi_common::proxy",
error = %e,
"parse incoming frame failed"
);
continue;
}
};
if let Some(method) = parsed.method.as_deref() {
let Some(id) = parsed.id.clone() else {
let h = notif_handlers
.lock()
.ok()
.and_then(|g| g.get(method).cloned());
let params = parsed.params.unwrap_or(Value::Null);
if let Some(h) = h {
let method_owned = method.to_string();
if let Err(e) =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| h(params)))
{
let panic_msg = e
.downcast_ref::<&'static str>()
.map(|s| (*s).to_string())
.or_else(|| e.downcast_ref::<String>().cloned())
.unwrap_or_else(|| "<non-string panic payload>".to_string());
tracing::error!(
target: "car_ffi_common::proxy",
method = %method_owned,
panic = %panic_msg,
"notification handler panicked; recv loop continues"
);
}
} else {
tracing::debug!(
target: "car_ffi_common::proxy",
method = %method,
"no notification handler registered; dropping"
);
}
continue;
};
let handler = handlers.lock().ok().and_then(|g| g.get(method).cloned());
let params = parsed.params.unwrap_or(Value::Null);
let write_for_resp = write.clone();
let method_owned = method.to_string();
if let Some(h) = handler {
tokio::spawn(async move {
let resp = match h(params).await {
Ok(v) => serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"result": v,
}),
Err(e) => serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"error": { "code": -32000, "message": e },
}),
};
if let Ok(payload) = serde_json::to_string(&resp) {
let _ = write_for_resp
.lock()
.await
.send(Message::Text(payload.into()))
.await;
}
});
} else {
let resp = serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"error": {
"code": -32601,
"message": format!(
"no handler registered on FFI client for `{method_owned}`"
),
},
});
if let Ok(payload) = serde_json::to_string(&resp) {
let _ = write_for_resp
.lock()
.await
.send(Message::Text(payload.into()))
.await;
}
}
} else {
let Some(id) = parsed.id.as_ref().and_then(|v| v.as_u64()) else {
tracing::warn!(
target: "car_ffi_common::proxy",
"response with non-numeric id; ignoring"
);
continue;
};
let tx = pending.lock().ok().and_then(|mut g| g.remove(&id));
let Some(tx) = tx else {
tracing::debug!(
target: "car_ffi_common::proxy",
id,
"no pending request for response (likely timed out)"
);
continue;
};
let result: RpcResult = if let Some(err) = parsed.error {
Err(format!("{} {}", err.code, err.message))
} else {
Ok(parsed.result.unwrap_or(Value::Null))
};
let _ = tx.send(result);
}
}
Message::Binary(b) => {
tracing::debug!(
target: "car_ffi_common::proxy",
len = b.len(),
"skipping binary frame"
);
}
Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {}
Message::Close(_) => {
tracing::info!(
target: "car_ffi_common::proxy",
url = %url,
"daemon closed connection"
);
break;
}
}
}
if let Ok(mut g) = pending.lock() {
let count = g.len();
if count > 0 {
tracing::warn!(
target: "car_ffi_common::proxy",
url = %url,
count,
"recv loop ended with pending requests; dropping waiters"
);
}
g.clear();
}
}
/// Registers a tool by name through the daemon's `tools.register` method.
///
/// The params are a one-element array holding `{ "name": name }`; the reply
/// value is discarded.
pub async fn proxy_tools_register(client: &DaemonClient, name: &str) -> Result<(), String> {
    let tool_list = serde_json::json!([{ "name": name }]);
    client.call("tools.register", tool_list).await?;
    Ok(())
}
/// Registers a tool from a JSON schema through the daemon's `tools.register` method.
///
/// `schema_json` is parsed and sent as the single element of the params array.
/// Fails with `invalid ToolSchema JSON` when it is not valid JSON.
pub async fn proxy_tools_register_schema(
    client: &DaemonClient,
    schema_json: &str,
) -> Result<(), String> {
    let schema = match serde_json::from_str::<Value>(schema_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid ToolSchema JSON: {e}")),
    };
    client
        .call("tools.register", serde_json::json!([schema]))
        .await?;
    Ok(())
}
/// Forwards `params_json` (parsed as JSON) to the daemon's `policy.register`
/// method and discards the reply value.
pub async fn proxy_policy_register(client: &DaemonClient, params_json: &str) -> Result<(), String> {
    let policy = match serde_json::from_str::<Value>(params_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid policy params JSON: {e}")),
    };
    client.call("policy.register", policy).await?;
    Ok(())
}
/// Calls the daemon's `agents.register_basics` method with null params and
/// discards the reply value.
pub async fn proxy_register_agent_basics(client: &DaemonClient) -> Result<(), String> {
    client.call("agents.register_basics", Value::Null).await?;
    Ok(())
}
/// Calls the daemon's `voice.prepare_parakeet` method and returns the reply
/// rendered as JSON text.
pub async fn proxy_prepare_parakeet(client: &DaemonClient) -> Result<String, String> {
    client
        .call("voice.prepare_parakeet", Value::Null)
        .await
        .map(|reply| reply.to_string())
}
/// Calls the daemon's `voice.prepare_diarizer` method and returns the reply
/// rendered as JSON text.
pub async fn proxy_prepare_diarizer(client: &DaemonClient) -> Result<String, String> {
    client
        .call("voice.prepare_diarizer", Value::Null)
        .await
        .map(|reply| reply.to_string())
}
/// Stores a value under `key` through the daemon's `state.set` method.
///
/// `value_json` must be valid JSON; it is sent as the `value` member next to
/// `key`. The reply value is discarded.
pub async fn proxy_state_set(
    client: &DaemonClient,
    key: &str,
    value_json: &str,
) -> Result<(), String> {
    let value = serde_json::from_str::<Value>(value_json)
        .map_err(|e| format!("invalid value JSON for state.set: {e}"))?;
    let request = serde_json::json!({ "key": key, "value": value });
    client.call("state.set", request).await?;
    Ok(())
}
/// Fetches the value stored under `key` through the daemon's `state.get`
/// method and returns it as JSON text.
///
/// If the reply cannot be serialized, the text `null` is returned instead of
/// an error.
pub async fn proxy_state_get(client: &DaemonClient, key: &str) -> Result<String, String> {
    let request = serde_json::json!({ "key": key });
    let found = client.call("state.get", request).await?;
    match serde_json::to_string(&found) {
        Ok(text) => Ok(text),
        Err(_) => Ok("null".to_string()),
    }
}
pub async fn proxy_state_exists(client: &DaemonClient, key: &str) -> Result<bool, String> {
let v = client
.call("state.exists", serde_json::json!({ "key": key }))
.await?;
Ok(v.as_bool().unwrap_or(false))
}
/// Calls the daemon's `state.keys` method and decodes the reply as a list of
/// strings. Fails with `parse state.keys` when the reply has another shape.
pub async fn proxy_state_keys(client: &DaemonClient) -> Result<Vec<String>, String> {
    let reply = client.call("state.keys", Value::Null).await?;
    match serde_json::from_value::<Vec<String>>(reply) {
        Ok(keys) => Ok(keys),
        Err(e) => Err(format!("parse state.keys: {e}")),
    }
}
/// Calls the daemon's `state.snapshot` method and returns the reply as JSON text.
pub async fn proxy_state_snapshot(client: &DaemonClient) -> Result<String, String> {
    client
        .call("state.snapshot", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply).map_err(|e| format!("serialize state.snapshot: {e}"))
        })
}
pub async fn proxy_memory_build_context_fast(
client: &DaemonClient,
query: &str,
model_context_window: Option<u32>,
) -> Result<String, String> {
let mut params = serde_json::json!({ "query": query });
if let Some(w) = model_context_window {
params["model_context_window"] = serde_json::json!(w);
}
let v = client.call("memory.build_context_fast", params).await?;
Ok(v.as_str().unwrap_or("").to_string())
}
/// Forwards `request_json` (parsed as JSON) to the daemon's `infer` method and
/// returns the reply as JSON text.
pub async fn proxy_infer(client: &DaemonClient, request_json: &str) -> Result<String, String> {
    let request = match serde_json::from_str::<Value>(request_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid GenerateRequest JSON: {e}")),
    };
    client.call("infer", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize infer result: {e}"))
    })
}
/// Calls the daemon's `embed` method with the texts parsed from `texts_json`.
///
/// `model` is added to the params only when provided. Returns the reply as
/// JSON text.
pub async fn proxy_embed(
    client: &DaemonClient,
    texts_json: &str,
    model: Option<&str>,
) -> Result<String, String> {
    let texts = serde_json::from_str::<Value>(texts_json)
        .map_err(|e| format!("invalid texts JSON: {e}"))?;
    let params = match model {
        Some(m) => serde_json::json!({ "texts": texts, "model": m }),
        None => serde_json::json!({ "texts": texts }),
    };
    client.call("embed", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize embed result: {e}"))
    })
}
/// Calls the daemon's `classify` method for `text` with the labels parsed
/// from `labels_json`.
///
/// `model` is added to the params only when provided. Returns the reply as
/// JSON text.
pub async fn proxy_classify(
    client: &DaemonClient,
    text: &str,
    labels_json: &str,
    model: Option<&str>,
) -> Result<String, String> {
    let labels = serde_json::from_str::<Value>(labels_json)
        .map_err(|e| format!("invalid labels JSON: {e}"))?;
    let params = match model {
        Some(m) => serde_json::json!({ "text": text, "labels": labels, "model": m }),
        None => serde_json::json!({ "text": text, "labels": labels }),
    };
    client.call("classify", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize classify result: {e}"))
    })
}
/// Forwards `params_json` (parsed as JSON) to the daemon's `verify` method and
/// returns the reply as JSON text.
pub async fn proxy_verify(client: &DaemonClient, params_json: &str) -> Result<String, String> {
    let params = match serde_json::from_str::<Value>(params_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid verify params JSON: {e}")),
    };
    client.call("verify", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize verify result: {e}"))
    })
}
/// Calls the daemon's `tokenize` method with `model` and `text` and returns
/// the reply as JSON text.
pub async fn proxy_tokenize(
    client: &DaemonClient,
    model: &str,
    text: &str,
) -> Result<String, String> {
    let request = serde_json::json!({ "model": model, "text": text });
    client.call("tokenize", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize tokenize result: {e}"))
    })
}
/// Calls the daemon's `detokenize` method with `model` and `tokens` and
/// returns the reply as JSON text.
pub async fn proxy_detokenize(
    client: &DaemonClient,
    model: &str,
    tokens: &[u32],
) -> Result<String, String> {
    let request = serde_json::json!({ "model": model, "tokens": tokens });
    client.call("detokenize", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize detokenize result: {e}"))
    })
}
/// Calls the daemon's `skills.distill` method with the events parsed from
/// `events_json` and returns the reply as JSON text.
pub async fn proxy_skills_distill(
    client: &DaemonClient,
    events_json: &str,
) -> Result<String, String> {
    let events = match serde_json::from_str::<Value>(events_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid events JSON: {e}")),
    };
    let request = serde_json::json!({ "events": events });
    client.call("skills.distill", request).await.and_then(|reply| {
        serde_json::to_string(&reply)
            .map_err(|e| format!("serialize skills.distill result: {e}"))
    })
}
/// Calls the daemon's `memory.consolidate` method and returns the reply as JSON text.
pub async fn proxy_memory_consolidate(client: &DaemonClient) -> Result<String, String> {
    client
        .call("memory.consolidate", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize consolidate result: {e}"))
        })
}
/// Calls the daemon's `memory.persist` method for `path` and returns the
/// numeric reply.
///
/// # Errors
/// Fails when the call fails, when the reply is not an unsigned integer, or
/// when the number does not fit in `u32` (it used to be silently truncated).
pub async fn proxy_memory_persist(client: &DaemonClient, path: &str) -> Result<u32, String> {
    let v = client
        .call("memory.persist", serde_json::json!({ "path": path }))
        .await?;
    let n = v
        .as_u64()
        .ok_or_else(|| format!("memory.persist returned non-numeric: {v}"))?;
    // Fully qualified so the conversion resolves regardless of the crate's edition prelude.
    <u32 as std::convert::TryFrom<u64>>::try_from(n)
        .map_err(|_| format!("memory.persist returned a count that does not fit in u32: {n}"))
}
/// Calls the daemon's `memory.load` method for `path` and returns the numeric
/// reply.
///
/// # Errors
/// Fails when the call fails, when the reply is not an unsigned integer, or
/// when the number does not fit in `u32` (it used to be silently truncated).
pub async fn proxy_memory_load(client: &DaemonClient, path: &str) -> Result<u32, String> {
    let v = client
        .call("memory.load", serde_json::json!({ "path": path }))
        .await?;
    let n = v
        .as_u64()
        .ok_or_else(|| format!("memory.load returned non-numeric: {v}"))?;
    // Fully qualified so the conversion resolves regardless of the crate's edition prelude.
    <u32 as std::convert::TryFrom<u64>>::try_from(n)
        .map_err(|_| format!("memory.load returned a count that does not fit in u32: {n}"))
}
/// Calls the daemon's `skills.ingest_distilled` method with the skills parsed
/// from `skills_json` and returns the reply's `ingested` count.
///
/// # Errors
/// Fails when `skills_json` is not valid JSON, when the call fails, when the
/// reply has no unsigned-integer `ingested` member, or when that count does
/// not fit in `u32` (it used to be silently truncated).
pub async fn proxy_skills_ingest_distilled(
    client: &DaemonClient,
    skills_json: &str,
) -> Result<u32, String> {
    let skills: Value =
        serde_json::from_str(skills_json).map_err(|e| format!("invalid skills JSON: {e}"))?;
    let v = client
        .call(
            "skills.ingest_distilled",
            serde_json::json!({ "skills": skills }),
        )
        .await?;
    let n = v
        .get("ingested")
        .and_then(|x| x.as_u64())
        .ok_or_else(|| format!("ingest_distilled returned unexpected shape: {v}"))?;
    // Fully qualified so the conversion resolves regardless of the crate's edition prelude.
    <u32 as std::convert::TryFrom<u64>>::try_from(n)
        .map_err(|_| format!("ingest_distilled returned a count that does not fit in u32: {n}"))
}
pub async fn proxy_skill_repair(
client: &DaemonClient,
skill_name: &str,
) -> Result<Option<String>, String> {
let v = client
.call(
"skill.repair",
serde_json::json!({ "skill_name": skill_name }),
)
.await?;
if v.is_null() {
return Ok(None);
}
Ok(v.get("code")
.and_then(|c| c.as_str())
.map(|s| s.to_string()))
}
/// Calls the daemon's `skills.evolve` method with the events parsed from
/// `events_json` and the given `domain`; returns the reply as JSON text.
pub async fn proxy_skills_evolve(
    client: &DaemonClient,
    events_json: &str,
    domain: &str,
) -> Result<String, String> {
    let events = match serde_json::from_str::<Value>(events_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid events JSON: {e}")),
    };
    let request = serde_json::json!({ "events": events, "domain": domain });
    client.call("skills.evolve", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize skills.evolve result: {e}"))
    })
}
/// Calls the daemon's `skills.domains_needing_evolution` method and decodes
/// the reply as a list of strings.
///
/// `threshold` is included in the params only when provided.
pub async fn proxy_skills_domains_needing_evolution(
    client: &DaemonClient,
    threshold: Option<f64>,
) -> Result<Vec<String>, String> {
    let params = match threshold {
        Some(t) => serde_json::json!({ "threshold": t }),
        None => serde_json::json!({}),
    };
    let reply = client
        .call("skills.domains_needing_evolution", params)
        .await?;
    match serde_json::from_value::<Vec<String>>(reply) {
        Ok(domains) => Ok(domains),
        Err(e) => Err(format!("parse domains: {e}")),
    }
}
/// Forwards `request_json` (parsed as JSON) to the daemon's `rerank` method
/// and returns the reply as JSON text.
pub async fn proxy_rerank(client: &DaemonClient, request_json: &str) -> Result<String, String> {
    let request = match serde_json::from_str::<Value>(request_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid RerankRequest JSON: {e}")),
    };
    client.call("rerank", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize rerank result: {e}"))
    })
}
/// Forwards `request_json` (parsed as JSON) to the daemon's `transcribe`
/// method and returns the reply as JSON text.
pub async fn proxy_transcribe(client: &DaemonClient, request_json: &str) -> Result<String, String> {
    let request = match serde_json::from_str::<Value>(request_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid TranscribeRequest JSON: {e}")),
    };
    client.call("transcribe", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize transcribe result: {e}"))
    })
}
/// Forwards `request_json` (parsed as JSON) to the daemon's `synthesize`
/// method and returns the reply as JSON text.
pub async fn proxy_synthesize(client: &DaemonClient, request_json: &str) -> Result<String, String> {
    let request = match serde_json::from_str::<Value>(request_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid SynthesizeRequest JSON: {e}")),
    };
    client.call("synthesize", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize synthesize result: {e}"))
    })
}
/// Calls the daemon's `speech.prepare` method and returns the reply as JSON text.
pub async fn proxy_speech_prepare(client: &DaemonClient) -> Result<String, String> {
    client
        .call("speech.prepare", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize speech.prepare result: {e}"))
        })
}
/// Calls the daemon's `models.route` method for `prompt` and returns the
/// reply as JSON text.
pub async fn proxy_models_route(client: &DaemonClient, prompt: &str) -> Result<String, String> {
    let request = serde_json::json!({ "prompt": prompt });
    client.call("models.route", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize models.route result: {e}"))
    })
}
/// Calls the daemon's `models.stats` method and returns the reply as JSON text.
pub async fn proxy_models_stats(client: &DaemonClient) -> Result<String, String> {
    client
        .call("models.stats", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize models.stats result: {e}"))
        })
}
/// Calls the daemon's `events.count` method and returns the numeric reply.
///
/// # Errors
/// Fails when the call fails, when the reply is not an unsigned integer, or
/// when the number does not fit in `u32` (it used to be silently truncated).
pub async fn proxy_events_count(client: &DaemonClient) -> Result<u32, String> {
    let v = client.call("events.count", Value::Null).await?;
    let n = v
        .as_u64()
        .ok_or_else(|| format!("events.count returned non-u64: {v}"))?;
    // Fully qualified so the conversion resolves regardless of the crate's edition prelude.
    <u32 as std::convert::TryFrom<u64>>::try_from(n)
        .map_err(|_| format!("events.count returned a count that does not fit in u32: {n}"))
}
/// Calls the daemon's `events.stats` method and returns the reply as JSON text.
pub async fn proxy_events_stats(client: &DaemonClient) -> Result<String, String> {
    client
        .call("events.stats", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize events.stats result: {e}"))
        })
}
pub async fn proxy_events_truncate(
client: &DaemonClient,
max_events: Option<u32>,
max_spans: Option<u32>,
) -> Result<String, String> {
let mut params = serde_json::json!({});
if let Some(max) = max_events {
params["maxEvents"] = serde_json::json!(max);
}
if let Some(max) = max_spans {
params["maxSpans"] = serde_json::json!(max);
}
let v = client.call("events.truncate", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize events.truncate result: {e}"))
}
/// Calls the daemon's `events.clear` method and returns the reply as JSON text.
pub async fn proxy_events_clear(client: &DaemonClient) -> Result<String, String> {
    client
        .call("events.clear", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize events.clear result: {e}"))
        })
}
/// Sends the replan settings to the daemon's `replan.set_config` method and
/// discards the reply value.
pub async fn proxy_replan_set_config(
    client: &DaemonClient,
    max_replans: u32,
    delay_ms: u64,
    verify_before_execute: bool,
) -> Result<(), String> {
    let config = serde_json::json!({
        "max_replans": max_replans,
        "delay_ms": delay_ms,
        "verify_before_execute": verify_before_execute,
    });
    client.call("replan.set_config", config).await?;
    Ok(())
}
/// Calls the daemon's `memory.add_fact` method and returns the unsigned
/// integer it replies with.
///
/// `kind` and `confidence` are included in the params only when provided.
/// Fails with `memory.add_fact returned non-u64` for any other reply shape.
pub async fn proxy_memory_add_fact(
    client: &DaemonClient,
    subject: &str,
    body: &str,
    kind: Option<&str>,
    confidence: Option<f64>,
) -> Result<u64, String> {
    let mut fact = serde_json::Map::new();
    fact.insert("subject".to_string(), Value::from(subject));
    fact.insert("body".to_string(), Value::from(body));
    if let Some(k) = kind {
        fact.insert("kind".to_string(), Value::from(k));
    }
    if let Some(c) = confidence {
        fact.insert("confidence".to_string(), serde_json::json!(c));
    }
    let v = client.call("memory.add_fact", Value::Object(fact)).await?;
    match v.as_u64() {
        Some(fact_id) => Ok(fact_id),
        None => Err(format!("memory.add_fact returned non-u64: {v}")),
    }
}
/// Calls the daemon's `memory.query` method for `query` and returns the reply
/// as JSON text. `k` is included in the params only when provided.
pub async fn proxy_memory_query(
    client: &DaemonClient,
    query: &str,
    k: Option<u32>,
) -> Result<String, String> {
    let params = match k {
        Some(limit) => serde_json::json!({ "query": query, "k": limit }),
        None => serde_json::json!({ "query": query }),
    };
    client.call("memory.query", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize memory.query result: {e}"))
    })
}
/// Calls the daemon's `memory.fact_count` method and returns the numeric reply.
///
/// # Errors
/// Fails when the call fails, when the reply is not an unsigned integer, or
/// when the number does not fit in `u32` (it used to be silently truncated).
pub async fn proxy_memory_fact_count(client: &DaemonClient) -> Result<u32, String> {
    let v = client.call("memory.fact_count", Value::Null).await?;
    let n = v
        .as_u64()
        .ok_or_else(|| format!("memory.fact_count returned non-u64: {v}"))?;
    // Fully qualified so the conversion resolves regardless of the crate's edition prelude.
    <u32 as std::convert::TryFrom<u64>>::try_from(n)
        .map_err(|_| format!("memory.fact_count returned a count that does not fit in u32: {n}"))
}
pub async fn proxy_memory_build_context(
client: &DaemonClient,
query: &str,
) -> Result<String, String> {
let v = client
.call(
"memory.build_context",
serde_json::json!({ "query": query }),
)
.await?;
Ok(v.as_str().unwrap_or("").to_string())
}
/// Forwards `params_json` (parsed as JSON) to the daemon's `skill.ingest`
/// method and returns the reply as JSON text.
pub async fn proxy_skill_ingest(
    client: &DaemonClient,
    params_json: &str,
) -> Result<String, String> {
    let params = match serde_json::from_str::<Value>(params_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid skill.ingest params JSON: {e}")),
    };
    client.call("skill.ingest", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize skill.ingest result: {e}"))
    })
}
/// Calls the daemon's `skill.find` method with `persona`, `url` and `task`
/// and returns the reply as JSON text.
///
/// `max_results` is included in the params only when provided.
pub async fn proxy_skill_find(
    client: &DaemonClient,
    persona: &str,
    url: &str,
    task: &str,
    max_results: Option<u32>,
) -> Result<String, String> {
    let params = match max_results {
        Some(n) => serde_json::json!({
            "persona": persona,
            "url": url,
            "task": task,
            "max_results": n,
        }),
        None => serde_json::json!({
            "persona": persona,
            "url": url,
            "task": task,
        }),
    };
    client.call("skill.find", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize skill.find result: {e}"))
    })
}
/// Calls the daemon's `skill.report` method with `skill_name` and `outcome`
/// and returns the reply as JSON text.
pub async fn proxy_skill_report(
    client: &DaemonClient,
    skill_name: &str,
    outcome: &str,
) -> Result<String, String> {
    let request = serde_json::json!({ "skill_name": skill_name, "outcome": outcome });
    client.call("skill.report", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize skill.report result: {e}"))
    })
}
pub async fn proxy_skills_list(
client: &DaemonClient,
params_json: Option<&str>,
) -> Result<String, String> {
let params = match params_json {
Some(s) => {
serde_json::from_str(s).map_err(|e| format!("invalid skills.list params JSON: {e}"))?
}
None => Value::Null,
};
let v = client.call("skills.list", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize skills.list result: {e}"))
}
/// Calls the daemon's `models.list` method and returns the reply as JSON text.
pub async fn proxy_models_list(client: &DaemonClient) -> Result<String, String> {
    client
        .call("models.list", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize models.list result: {e}"))
        })
}
/// Calls the daemon's `models.list_unified` method and returns the reply as JSON text.
pub async fn proxy_models_list_unified(client: &DaemonClient) -> Result<String, String> {
    client
        .call("models.list_unified", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize models.list_unified result: {e}"))
        })
}
/// Calls the daemon's `models.pull` method for the model `name` and returns
/// the reply as JSON text.
pub async fn proxy_models_pull(client: &DaemonClient, name: &str) -> Result<String, String> {
    let request = serde_json::json!({ "name": name });
    client.call("models.pull", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize models.pull result: {e}"))
    })
}
/// Forwards `request_json` (parsed as JSON) to the daemon's `meeting.start`
/// method and returns the reply as JSON text.
pub async fn proxy_meeting_start(
    client: &DaemonClient,
    request_json: &str,
) -> Result<String, String> {
    let request = match serde_json::from_str::<Value>(request_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid meeting.start request JSON: {e}")),
    };
    client.call("meeting.start", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize meeting.start result: {e}"))
    })
}
/// Calls the daemon's `meeting.stop` method with `meeting_id` and the
/// `summarize` flag; returns the reply as JSON text.
pub async fn proxy_meeting_stop(
    client: &DaemonClient,
    meeting_id: &str,
    summarize: bool,
) -> Result<String, String> {
    let request = serde_json::json!({
        "meeting_id": meeting_id,
        "summarize": summarize,
    });
    client.call("meeting.stop", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize meeting.stop result: {e}"))
    })
}
/// Calls the daemon's `meeting.list` method and returns the reply as JSON text.
///
/// `root` is included in the params only when provided; otherwise an empty
/// object is sent.
pub async fn proxy_meeting_list(
    client: &DaemonClient,
    root: Option<&str>,
) -> Result<String, String> {
    let params = match root {
        Some(r) => serde_json::json!({ "root": r }),
        None => serde_json::json!({}),
    };
    client.call("meeting.list", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize meeting.list result: {e}"))
    })
}
/// Calls the daemon's `meeting.get` method for `meeting_id` and returns the
/// reply as JSON text. `root` is included in the params only when provided.
pub async fn proxy_meeting_get(
    client: &DaemonClient,
    meeting_id: &str,
    root: Option<&str>,
) -> Result<String, String> {
    let params = match root {
        Some(r) => serde_json::json!({ "meeting_id": meeting_id, "root": r }),
        None => serde_json::json!({ "meeting_id": meeting_id }),
    };
    client.call("meeting.get", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize meeting.get result: {e}"))
    })
}
/// Forwards `params_json` (parsed as JSON) to the daemon's `a2a.start` method
/// and returns the reply as JSON text.
pub async fn proxy_a2a_start(client: &DaemonClient, params_json: &str) -> Result<String, String> {
    let params = match serde_json::from_str::<Value>(params_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid a2a.start params JSON: {e}")),
    };
    client.call("a2a.start", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize a2a.start result: {e}"))
    })
}
/// Calls the daemon's `a2a.stop` method and returns the reply as JSON text.
pub async fn proxy_a2a_stop(client: &DaemonClient) -> Result<String, String> {
    client.call("a2a.stop", Value::Null).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize a2a.stop result: {e}"))
    })
}
/// Calls the daemon's `a2a.status` method and returns the reply as JSON text.
pub async fn proxy_a2a_status(client: &DaemonClient) -> Result<String, String> {
    client
        .call("a2a.status", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply).map_err(|e| format!("serialize a2a.status result: {e}"))
        })
}
/// Forwards `params_json` (parsed as JSON) to the daemon's `a2a.send` method
/// and returns the reply as JSON text.
pub async fn proxy_a2a_send(client: &DaemonClient, params_json: &str) -> Result<String, String> {
    let params = match serde_json::from_str::<Value>(params_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid a2a.send params JSON: {e}")),
    };
    client.call("a2a.send", params).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize a2a.send result: {e}"))
    })
}
/// Calls the daemon's `a2ui.capabilities` method and returns the reply as JSON text.
pub async fn proxy_a2ui_capabilities(client: &DaemonClient) -> Result<String, String> {
    client
        .call("a2ui.capabilities", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize a2ui.capabilities result: {e}"))
        })
}
/// Forwards `envelope_json` (parsed as JSON) to the daemon's `a2ui.apply`
/// method and returns the reply as JSON text.
pub async fn proxy_a2ui_apply(
    client: &DaemonClient,
    envelope_json: &str,
) -> Result<String, String> {
    let envelope = match serde_json::from_str::<Value>(envelope_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid A2UI envelope: {e}")),
    };
    client.call("a2ui.apply", envelope).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize a2ui.apply result: {e}"))
    })
}
/// Forwards `payload_json` (parsed as JSON) to the daemon's `a2ui.ingest`
/// method and returns the reply as JSON text.
pub async fn proxy_a2ui_ingest(
    client: &DaemonClient,
    payload_json: &str,
) -> Result<String, String> {
    let payload = match serde_json::from_str::<Value>(payload_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid A2UI payload: {e}")),
    };
    client.call("a2ui.ingest", payload).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize a2ui.ingest result: {e}"))
    })
}
/// Calls the daemon's `a2ui.surfaces` method and returns the reply as JSON text.
pub async fn proxy_a2ui_surfaces(client: &DaemonClient) -> Result<String, String> {
    client
        .call("a2ui.surfaces", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply)
                .map_err(|e| format!("serialize a2ui.surfaces result: {e}"))
        })
}
/// Calls the daemon's `a2ui.get` method for `surface_id` and returns the
/// reply as JSON text.
pub async fn proxy_a2ui_get(client: &DaemonClient, surface_id: &str) -> Result<String, String> {
    let request = serde_json::json!({ "surface_id": surface_id });
    client.call("a2ui.get", request).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize a2ui.get result: {e}"))
    })
}
/// Calls the daemon's `a2ui.reap` method and returns the reply as JSON text.
pub async fn proxy_a2ui_reap(client: &DaemonClient) -> Result<String, String> {
    client
        .call("a2ui.reap", Value::Null)
        .await
        .and_then(|reply| {
            serde_json::to_string(&reply).map_err(|e| format!("serialize a2ui.reap result: {e}"))
        })
}
/// Forwards `action_json` (parsed as JSON) to the daemon's `a2ui.action`
/// method and returns the reply as JSON text.
pub async fn proxy_a2ui_action(client: &DaemonClient, action_json: &str) -> Result<String, String> {
    let action = match serde_json::from_str::<Value>(action_json) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("invalid A2UI action JSON: {e}")),
    };
    client.call("a2ui.action", action).await.and_then(|reply| {
        serde_json::to_string(&reply).map_err(|e| format!("serialize a2ui.action result: {e}"))
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the tests that mutate process-wide environment variables.
    static ENV_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires [`ENV_TEST_LOCK`], recovering from poisoning so that one
    /// failed test does not make every other environment test panic too.
    fn lock_env() -> std::sync::MutexGuard<'static, ()> {
        ENV_TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Remembers an environment variable's value and restores it on drop,
    /// including when the test body panics on a failed assertion.
    struct EnvRestore {
        key: &'static str,
        prev: Option<String>,
    }

    impl EnvRestore {
        fn capture(key: &'static str) -> Self {
            Self {
                key,
                prev: std::env::var(key).ok(),
            }
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(v) => std::env::set_var(self.key, v),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// `CAR_FFI_MODE` has no influence: the mode is always `Daemon`.
    #[test]
    fn runtime_mode_is_daemon_only() {
        // Declared before the restore guard so the variable is restored
        // before the lock is released (locals drop in reverse order).
        let _guard = lock_env();
        let _restore = EnvRestore::capture("CAR_FFI_MODE");
        std::env::remove_var("CAR_FFI_MODE");
        assert_eq!(RuntimeMode::from_env(), RuntimeMode::Daemon);
        assert_eq!(RuntimeMode::resolve_or_err().unwrap(), RuntimeMode::Daemon);
        std::env::set_var("CAR_FFI_MODE", "embedded");
        assert_eq!(RuntimeMode::from_env(), RuntimeMode::Daemon);
        assert_eq!(RuntimeMode::resolve_or_err().unwrap(), RuntimeMode::Daemon);
    }

    /// The URL defaults to the local daemon and honours `CAR_DAEMON_URL`.
    #[test]
    fn daemon_url_resolution() {
        let _guard = lock_env();
        let _restore = EnvRestore::capture("CAR_DAEMON_URL");
        std::env::remove_var("CAR_DAEMON_URL");
        assert_eq!(daemon_ws_url(), "ws://127.0.0.1:9100");
        std::env::set_var("CAR_DAEMON_URL", "ws://other:1234");
        assert_eq!(daemon_ws_url(), "ws://other:1234");
    }

    /// Probing a port nothing listens on reports the daemon as unreachable.
    #[test]
    fn probe_dead_port_returns_false() {
        let _guard = lock_env();
        let _restore = EnvRestore::capture("CAR_DAEMON_URL");
        std::env::set_var("CAR_DAEMON_URL", "ws://127.0.0.1:1");
        assert!(
            !car_proto::daemon::probe_daemon_port(std::time::Duration::from_millis(100)),
            "probe of port 1 should fail"
        );
    }

    /// A call against an unreachable daemon fails with a connect error.
    #[tokio::test]
    async fn client_call_against_dead_port_errors_clearly() {
        let client = DaemonClient::with_url("ws://127.0.0.1:1");
        let r = client
            .call("state.get", serde_json::json!({"key": "x"}))
            .await;
        assert!(r.is_err(), "expected error against dead port");
        let msg = r.unwrap_err();
        assert!(
            msg.contains("connect daemon"),
            "expected connect-error wording, got: {msg}"
        );
    }

    /// A failed connect leaves the client usable: the next call tries to
    /// connect again instead of getting stuck on stale state.
    #[tokio::test]
    async fn client_recovers_from_failure_to_retry_connect() {
        let client = DaemonClient::with_url("ws://127.0.0.1:1");
        let _ = client
            .call("state.get", serde_json::json!({"key": "x"}))
            .await;
        let r = client
            .call("state.get", serde_json::json!({"key": "y"}))
            .await;
        assert!(r.is_err());
        assert!(r.unwrap_err().contains("connect daemon"));
    }
}