use futures_util::{SinkExt, StreamExt};
use serde::Deserialize;
use serde_json::Value;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::timeout;
use tokio_tungstenite::{
connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream,
};
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_READ_TIMEOUT_SECS: u64 = 30;
fn read_timeout() -> Duration {
std::env::var("CAR_DAEMON_TIMEOUT")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())
.map(Duration::from_secs)
.unwrap_or(Duration::from_secs(DEFAULT_READ_TIMEOUT_SECS))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeMode {
Daemon,
Embedded,
}
impl RuntimeMode {
pub fn from_env() -> Self {
match std::env::var("CAR_FFI_MODE")
.ok()
.as_deref()
.map(str::trim)
.map(str::to_ascii_lowercase)
.as_deref()
{
Some("embedded") => Self::Embedded,
_ => Self::Daemon,
}
}
pub fn resolve_or_err() -> Result<Self, String> {
use ResolutionPolicy::*;
let policy = resolution_policy();
if policy == Embedded {
return Ok(Self::Embedded);
}
let probe_timeout = probe_timeout();
if car_proto::daemon::probe_daemon_port(probe_timeout) {
return Ok(Self::Daemon);
}
let should_spawn = matches!(policy, DaemonOnly | DaemonPrefer);
if should_spawn && car_proto::daemon::try_spawn_daemon().is_ok() {
for _ in 0..8 {
std::thread::sleep(std::time::Duration::from_millis(250));
if car_proto::daemon::probe_daemon_port(probe_timeout) {
return Ok(Self::Daemon);
}
}
}
if policy == DaemonOnly {
return Err(format!(
"CAR_FFI_MODE=daemon-only: daemon at {} unreachable and \
`car-server` could not be spawned. Start the daemon \
manually or set CAR_FFI_MODE=daemon-prefer to allow \
embedded fallback.",
daemon_ws_url()
));
}
warn_embedded_fallback();
Ok(Self::Embedded)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResolutionPolicy {
DaemonOnly,
DaemonPrefer,
DaemonNoSpawn,
Embedded,
}
pub fn resolution_policy() -> ResolutionPolicy {
let raw = std::env::var("CAR_FFI_MODE")
.ok()
.as_deref()
.map(str::trim)
.map(str::to_ascii_lowercase);
let canonical = match raw.as_deref() {
Some("embedded") => Some(ResolutionPolicy::Embedded),
Some("daemon-only") => Some(ResolutionPolicy::DaemonOnly),
Some("daemon-prefer") => Some(ResolutionPolicy::DaemonPrefer),
Some("daemon-no-spawn") => Some(ResolutionPolicy::DaemonNoSpawn),
Some("daemon") => None,
None => None,
Some(other) => {
eprintln!(
"warning: CAR_FFI_MODE={other:?} unrecognized; \
falling back to daemon-prefer (default). Valid values: \
embedded | daemon-only | daemon-prefer | daemon-no-spawn"
);
None
}
};
if let Some(policy) = canonical {
let legacy_req = is_env_flag_set("CAR_FFI_REQUIRE_DAEMON");
let legacy_no = is_env_flag_set("CAR_FFI_NO_AUTOSPAWN");
if legacy_req || legacy_no {
let conflicts: Vec<&str> = [
("CAR_FFI_REQUIRE_DAEMON=1", legacy_req),
("CAR_FFI_NO_AUTOSPAWN=1", legacy_no),
]
.into_iter()
.filter_map(|(name, set)| if set { Some(name) } else { None })
.collect();
panic!(
"CAR_FFI_MODE conflict: matrix value `{:?}` is set alongside \
legacy flag(s) {}. Pick one — the legacy flags are deprecated \
(mapped to daemon-only / daemon-no-spawn). Unset the legacy \
flag(s) and keep CAR_FFI_MODE.",
raw.as_deref().unwrap_or(""),
conflicts.join(", ")
);
}
return policy;
}
if is_env_flag_set("CAR_FFI_REQUIRE_DAEMON") {
warn_legacy_flag("CAR_FFI_REQUIRE_DAEMON=1", "CAR_FFI_MODE=daemon-only");
return ResolutionPolicy::DaemonOnly;
}
if is_env_flag_set("CAR_FFI_NO_AUTOSPAWN") {
warn_legacy_flag("CAR_FFI_NO_AUTOSPAWN=1", "CAR_FFI_MODE=daemon-no-spawn");
return ResolutionPolicy::DaemonNoSpawn;
}
ResolutionPolicy::DaemonPrefer
}
fn warn_legacy_flag(legacy: &str, replacement: &str) {
use std::sync::Mutex;
use std::sync::OnceLock;
static SEEN: OnceLock<Mutex<std::collections::HashSet<String>>> = OnceLock::new();
if is_env_flag_set("CAR_FFI_NO_DEPRECATION_WARNING") {
return;
}
let seen = SEEN.get_or_init(|| Mutex::new(std::collections::HashSet::new()));
let mut guard = match seen.lock() {
Ok(g) => g,
Err(_) => return,
};
if guard.insert(legacy.to_string()) {
eprintln!(
"warning: {legacy} is deprecated; use {replacement} instead. \
Suppress this warning with CAR_FFI_NO_DEPRECATION_WARNING=1."
);
}
}
pub fn daemon_port() -> u16 {
car_proto::daemon::daemon_port()
}
fn probe_timeout() -> std::time::Duration {
std::env::var("CAR_FFI_PROBE_TIMEOUT_MS")
.ok()
.and_then(|s| s.trim().parse::<u64>().ok())
.map(std::time::Duration::from_millis)
.unwrap_or(std::time::Duration::from_millis(100))
}
fn is_env_flag_set(name: &str) -> bool {
std::env::var(name)
.ok()
.as_deref()
.map(str::trim)
.map(|s| s == "1")
.unwrap_or(false)
}
fn warn_embedded_fallback() {
if is_env_flag_set("CAR_FFI_NO_DAEMON_WARNING") {
return;
}
eprintln!(
"warning: CAR daemon at {} unreachable and `car-server` \
could not be spawned. Falling back to an embedded runtime in this \
process — model loads, admission, and memgine are NOT shared with \
other CarRuntime consumers on this host. Start the daemon \
(`car-server`) for the singleton-daemon contract, set \
CAR_FFI_REQUIRE_DAEMON=1 for hard-fail, or \
CAR_FFI_NO_DAEMON_WARNING=1 to silence this warning.",
daemon_ws_url()
);
}
pub fn daemon_ws_url() -> String {
car_proto::daemon::daemon_ws_url()
}
#[derive(Debug, Deserialize)]
struct JsonRpcErrorPayload {
code: i64,
message: String,
}
#[derive(Debug, Deserialize)]
struct JsonRpcEnvelope {
result: Option<Value>,
error: Option<JsonRpcErrorPayload>,
id: Option<Value>,
}
pub struct DaemonClient {
conn: AsyncMutex<Option<WsStream>>,
url: String,
req_id: AtomicU64,
}
impl DaemonClient {
pub fn new() -> Arc<Self> {
Arc::new(Self {
conn: AsyncMutex::new(None),
url: daemon_ws_url(),
req_id: AtomicU64::new(1),
})
}
pub fn with_url(url: impl Into<String>) -> Arc<Self> {
Arc::new(Self {
conn: AsyncMutex::new(None),
url: url.into(),
req_id: AtomicU64::new(1),
})
}
pub async fn call(&self, method: &str, params: Value) -> Result<Value, String> {
let id = self.req_id.fetch_add(1, Ordering::Relaxed);
let mut guard = self.conn.lock().await;
if guard.is_none() {
let connect_fut = connect_async(&self.url);
let (socket, _) = match timeout(CONNECT_TIMEOUT, connect_fut).await {
Ok(Ok(pair)) => pair,
Ok(Err(e)) => return Err(format!("connect daemon at {}: {}", self.url, e)),
Err(_) => {
return Err(format!(
"connect daemon at {} timed out after {}s",
self.url,
CONNECT_TIMEOUT.as_secs()
));
}
};
*guard = Some(socket);
}
let socket = guard.as_mut().expect("just connected");
let rpc = serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
});
let payload = serde_json::to_string(&rpc)
.map_err(|e| format!("serialize {method} request: {e}"))?;
if let Err(e) = socket.send(Message::Text(payload.into())).await {
*guard = None;
return Err(format!("send {method} request: {e}"))
}
let read_to = read_timeout();
loop {
let next = timeout(read_to, socket.next()).await;
let msg = match next {
Ok(Some(Ok(m))) => m,
Ok(Some(Err(e))) => {
*guard = None;
return Err(format!("recv daemon response: {e}"));
}
Ok(None) => {
*guard = None;
return Err(format!(
"daemon stream ended without responding to {method}"
));
}
Err(_) => {
*guard = None;
return Err(format!(
"daemon read timeout on {method} after {}s",
read_to.as_secs()
));
}
};
match msg {
Message::Text(text) => {
let env: JsonRpcEnvelope = match serde_json::from_str(&text) {
Ok(v) => v,
Err(e) => {
*guard = None;
return Err(format!("parse daemon response: {e}"));
}
};
let env_id = env.id.as_ref().and_then(|v| v.as_u64());
if env_id.is_none() {
tracing::debug!(target: "car_ffi_common::proxy", method, "skipping notification frame");
continue;
}
if env_id != Some(id) {
*guard = None;
return Err(format!(
"id mismatch on {method}: expected {id}, got {env_id:?}"
));
}
if let Some(err) = env.error {
return Err(format!("rpc {} {}: {}", err.code, method, err.message));
}
if let Some(result) = env.result {
return Ok(result);
}
*guard = None;
return Err(format!("daemon response missing result for {method}"));
}
Message::Binary(b) => {
tracing::debug!(target: "car_ffi_common::proxy", method, len = b.len(), "skipping binary frame");
continue;
}
Message::Ping(_) | Message::Pong(_) => continue,
Message::Close(_) => {
*guard = None;
return Err(format!("daemon closed connection mid-{method}"));
}
Message::Frame(_) => continue,
}
}
}
}
pub async fn proxy_tools_register(
client: &DaemonClient,
name: &str,
) -> Result<(), String> {
let params = serde_json::json!([{ "name": name }]);
client.call("tools.register", params).await.map(|_| ())
}
pub async fn proxy_tools_register_schema(
client: &DaemonClient,
schema_json: &str,
) -> Result<(), String> {
let schema: Value = serde_json::from_str(schema_json)
.map_err(|e| format!("invalid ToolSchema JSON: {e}"))?;
let params = serde_json::json!([schema]);
client.call("tools.register", params).await.map(|_| ())
}
pub async fn proxy_policy_register(
client: &DaemonClient,
params_json: &str,
) -> Result<(), String> {
let params: Value = serde_json::from_str(params_json)
.map_err(|e| format!("invalid policy params JSON: {e}"))?;
client.call("policy.register", params).await.map(|_| ())
}
pub async fn proxy_register_agent_basics(client: &DaemonClient) -> Result<(), String> {
client
.call("agents.register_basics", Value::Null)
.await
.map(|_| ())
}
pub async fn proxy_state_set(
client: &DaemonClient,
key: &str,
value_json: &str,
) -> Result<(), String> {
let value: Value = serde_json::from_str(value_json)
.map_err(|e| format!("invalid value JSON for state.set: {e}"))?;
client
.call(
"state.set",
serde_json::json!({ "key": key, "value": value }),
)
.await
.map(|_| ())
}
pub async fn proxy_state_get(client: &DaemonClient, key: &str) -> Result<String, String> {
let v = client
.call("state.get", serde_json::json!({ "key": key }))
.await?;
Ok(serde_json::to_string(&v).unwrap_or_else(|_| "null".to_string()))
}
pub async fn proxy_infer(client: &DaemonClient, request_json: &str) -> Result<String, String> {
let req: Value = serde_json::from_str(request_json)
.map_err(|e| format!("invalid GenerateRequest JSON: {e}"))?;
let v = client.call("infer", req).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize infer result: {e}"))
}
pub async fn proxy_embed(
client: &DaemonClient,
texts_json: &str,
model: Option<&str>,
) -> Result<String, String> {
let texts: Value = serde_json::from_str(texts_json)
.map_err(|e| format!("invalid texts JSON: {e}"))?;
let mut params = serde_json::json!({ "texts": texts });
if let Some(m) = model {
params["model"] = Value::String(m.to_string());
}
let v = client.call("embed", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize embed result: {e}"))
}
pub async fn proxy_classify(
client: &DaemonClient,
text: &str,
labels_json: &str,
model: Option<&str>,
) -> Result<String, String> {
let labels: Value = serde_json::from_str(labels_json)
.map_err(|e| format!("invalid labels JSON: {e}"))?;
let mut params = serde_json::json!({ "text": text, "labels": labels });
if let Some(m) = model {
params["model"] = Value::String(m.to_string());
}
let v = client.call("classify", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize classify result: {e}"))
}
pub async fn proxy_verify(client: &DaemonClient, params_json: &str) -> Result<String, String> {
let params: Value = serde_json::from_str(params_json)
.map_err(|e| format!("invalid verify params JSON: {e}"))?;
let v = client.call("verify", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize verify result: {e}"))
}
pub async fn proxy_tokenize(
client: &DaemonClient,
model: &str,
text: &str,
) -> Result<String, String> {
let v = client
.call(
"tokenize",
serde_json::json!({ "model": model, "text": text }),
)
.await?;
serde_json::to_string(&v).map_err(|e| format!("serialize tokenize result: {e}"))
}
pub async fn proxy_detokenize(
client: &DaemonClient,
model: &str,
tokens: &[u32],
) -> Result<String, String> {
let v = client
.call(
"detokenize",
serde_json::json!({ "model": model, "tokens": tokens }),
)
.await?;
serde_json::to_string(&v).map_err(|e| format!("serialize detokenize result: {e}"))
}
pub async fn proxy_skills_distill(
client: &DaemonClient,
events_json: &str,
) -> Result<String, String> {
let events: Value = serde_json::from_str(events_json)
.map_err(|e| format!("invalid events JSON: {e}"))?;
let v = client
.call("skills.distill", serde_json::json!({ "events": events }))
.await?;
serde_json::to_string(&v).map_err(|e| format!("serialize skills.distill result: {e}"))
}
pub async fn proxy_memory_consolidate(client: &DaemonClient) -> Result<String, String> {
let v = client.call("memory.consolidate", Value::Null).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize consolidate result: {e}"))
}
pub async fn proxy_skills_ingest_distilled(
client: &DaemonClient,
skills_json: &str,
) -> Result<u32, String> {
let skills: Value = serde_json::from_str(skills_json)
.map_err(|e| format!("invalid skills JSON: {e}"))?;
let v = client
.call(
"skills.ingest_distilled",
serde_json::json!({ "skills": skills }),
)
.await?;
let n = v
.get("ingested")
.and_then(|x| x.as_u64())
.ok_or_else(|| format!("ingest_distilled returned unexpected shape: {v}"))?;
Ok(n as u32)
}
pub async fn proxy_skill_repair(
client: &DaemonClient,
skill_name: &str,
) -> Result<Option<String>, String> {
let v = client
.call(
"skill.repair",
serde_json::json!({ "skill_name": skill_name }),
)
.await?;
if v.is_null() {
return Ok(None);
}
Ok(v.get("code")
.and_then(|c| c.as_str())
.map(|s| s.to_string()))
}
pub async fn proxy_skills_evolve(
client: &DaemonClient,
events_json: &str,
domain: &str,
) -> Result<String, String> {
let events: Value = serde_json::from_str(events_json)
.map_err(|e| format!("invalid events JSON: {e}"))?;
let v = client
.call(
"skills.evolve",
serde_json::json!({ "events": events, "domain": domain }),
)
.await?;
serde_json::to_string(&v).map_err(|e| format!("serialize skills.evolve result: {e}"))
}
pub async fn proxy_skills_domains_needing_evolution(
client: &DaemonClient,
threshold: Option<f64>,
) -> Result<Vec<String>, String> {
let mut params = serde_json::json!({});
if let Some(t) = threshold {
params["threshold"] = serde_json::json!(t);
}
let v = client
.call("skills.domains_needing_evolution", params)
.await?;
serde_json::from_value(v).map_err(|e| format!("parse domains: {e}"))
}
pub async fn proxy_rerank(
client: &DaemonClient,
request_json: &str,
) -> Result<String, String> {
let req: Value = serde_json::from_str(request_json)
.map_err(|e| format!("invalid RerankRequest JSON: {e}"))?;
let v = client.call("rerank", req).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize rerank result: {e}"))
}
pub async fn proxy_transcribe(
client: &DaemonClient,
request_json: &str,
) -> Result<String, String> {
let req: Value = serde_json::from_str(request_json)
.map_err(|e| format!("invalid TranscribeRequest JSON: {e}"))?;
let v = client.call("transcribe", req).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize transcribe result: {e}"))
}
pub async fn proxy_synthesize(
client: &DaemonClient,
request_json: &str,
) -> Result<String, String> {
let req: Value = serde_json::from_str(request_json)
.map_err(|e| format!("invalid SynthesizeRequest JSON: {e}"))?;
let v = client.call("synthesize", req).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize synthesize result: {e}"))
}
pub async fn proxy_speech_prepare(client: &DaemonClient) -> Result<String, String> {
let v = client.call("speech.prepare", Value::Null).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize speech.prepare result: {e}"))
}
pub async fn proxy_models_route(
client: &DaemonClient,
prompt: &str,
) -> Result<String, String> {
let v = client
.call("models.route", serde_json::json!({ "prompt": prompt }))
.await?;
serde_json::to_string(&v).map_err(|e| format!("serialize models.route result: {e}"))
}
pub async fn proxy_models_stats(client: &DaemonClient) -> Result<String, String> {
let v = client.call("models.stats", Value::Null).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize models.stats result: {e}"))
}
pub async fn proxy_events_count(client: &DaemonClient) -> Result<u32, String> {
let v = client.call("events.count", Value::Null).await?;
v.as_u64()
.map(|n| n as u32)
.ok_or_else(|| format!("events.count returned non-u64: {v}"))
}
pub async fn proxy_replan_set_config(
client: &DaemonClient,
max_replans: u32,
delay_ms: u64,
verify_before_execute: bool,
) -> Result<(), String> {
client
.call(
"replan.set_config",
serde_json::json!({
"max_replans": max_replans,
"delay_ms": delay_ms,
"verify_before_execute": verify_before_execute,
}),
)
.await
.map(|_| ())
}
pub async fn proxy_memory_add_fact(
client: &DaemonClient,
subject: &str,
body: &str,
kind: Option<&str>,
confidence: Option<f64>,
) -> Result<u64, String> {
let mut params = serde_json::json!({
"subject": subject,
"body": body,
});
if let Some(k) = kind {
params["kind"] = Value::String(k.to_string());
}
if let Some(c) = confidence {
params["confidence"] = serde_json::json!(c);
}
let v = client.call("memory.add_fact", params).await?;
v.as_u64()
.ok_or_else(|| format!("memory.add_fact returned non-u64: {v}"))
}
pub async fn proxy_memory_query(
client: &DaemonClient,
query: &str,
k: Option<u32>,
) -> Result<String, String> {
let mut params = serde_json::json!({ "query": query });
if let Some(k) = k {
params["k"] = serde_json::json!(k);
}
let v = client.call("memory.query", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize memory.query result: {e}"))
}
pub async fn proxy_memory_fact_count(client: &DaemonClient) -> Result<u32, String> {
let v = client.call("memory.fact_count", Value::Null).await?;
v.as_u64()
.map(|n| n as u32)
.ok_or_else(|| format!("memory.fact_count returned non-u64: {v}"))
}
pub async fn proxy_memory_build_context(
client: &DaemonClient,
query: &str,
) -> Result<String, String> {
let v = client
.call("memory.build_context", serde_json::json!({ "query": query }))
.await?;
Ok(v.as_str().unwrap_or("").to_string())
}
pub async fn proxy_skill_ingest(
client: &DaemonClient,
params_json: &str,
) -> Result<String, String> {
let params: Value = serde_json::from_str(params_json)
.map_err(|e| format!("invalid skill.ingest params JSON: {e}"))?;
let v = client.call("skill.ingest", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize skill.ingest result: {e}"))
}
pub async fn proxy_skill_find(
client: &DaemonClient,
persona: &str,
url: &str,
task: &str,
max_results: Option<u32>,
) -> Result<String, String> {
let mut params = serde_json::json!({
"persona": persona,
"url": url,
"task": task,
});
if let Some(n) = max_results {
params["max_results"] = serde_json::json!(n);
}
let v = client.call("skill.find", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize skill.find result: {e}"))
}
pub async fn proxy_skill_report(
client: &DaemonClient,
skill_name: &str,
outcome: &str,
) -> Result<String, String> {
let v = client
.call(
"skill.report",
serde_json::json!({ "skill_name": skill_name, "outcome": outcome }),
)
.await?;
serde_json::to_string(&v).map_err(|e| format!("serialize skill.report result: {e}"))
}
pub async fn proxy_skills_list(
client: &DaemonClient,
params_json: Option<&str>,
) -> Result<String, String> {
let params = match params_json {
Some(s) => serde_json::from_str(s)
.map_err(|e| format!("invalid skills.list params JSON: {e}"))?,
None => Value::Null,
};
let v = client.call("skills.list", params).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize skills.list result: {e}"))
}
pub async fn proxy_models_list(client: &DaemonClient) -> Result<String, String> {
let v = client.call("models.list", Value::Null).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize models.list result: {e}"))
}
pub async fn proxy_models_list_unified(client: &DaemonClient) -> Result<String, String> {
let v = client.call("models.list_unified", Value::Null).await?;
serde_json::to_string(&v).map_err(|e| format!("serialize models.list_unified result: {e}"))
}
pub async fn proxy_models_pull(client: &DaemonClient, name: &str) -> Result<String, String> {
let v = client
.call("models.pull", serde_json::json!({ "name": name }))
.await?;
serde_json::to_string(&v).map_err(|e| format!("serialize models.pull result: {e}"))
}
#[cfg(test)]
mod tests {
use super::*;
static ENV_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
#[test]
fn mode_from_env_defaults_to_daemon() {
let _guard = ENV_TEST_LOCK.lock().unwrap();
let prev = std::env::var("CAR_FFI_MODE").ok();
std::env::remove_var("CAR_FFI_MODE");
assert_eq!(RuntimeMode::from_env(), RuntimeMode::Daemon);
std::env::set_var("CAR_FFI_MODE", "embedded");
assert_eq!(RuntimeMode::from_env(), RuntimeMode::Embedded);
std::env::set_var("CAR_FFI_MODE", "EMBEDDED");
assert_eq!(RuntimeMode::from_env(), RuntimeMode::Embedded);
std::env::set_var("CAR_FFI_MODE", " embedded ");
assert_eq!(RuntimeMode::from_env(), RuntimeMode::Embedded);
std::env::set_var("CAR_FFI_MODE", "daemon");
assert_eq!(RuntimeMode::from_env(), RuntimeMode::Daemon);
std::env::set_var("CAR_FFI_MODE", "garbage");
assert_eq!(RuntimeMode::from_env(), RuntimeMode::Daemon);
match prev {
Some(v) => std::env::set_var("CAR_FFI_MODE", v),
None => std::env::remove_var("CAR_FFI_MODE"),
}
}
#[test]
fn daemon_url_resolution() {
let _guard = ENV_TEST_LOCK.lock().unwrap();
let prev = std::env::var("CAR_DAEMON_URL").ok();
std::env::remove_var("CAR_DAEMON_URL");
assert_eq!(daemon_ws_url(), "ws://127.0.0.1:9100");
std::env::set_var("CAR_DAEMON_URL", "ws://other:1234");
assert_eq!(daemon_ws_url(), "ws://other:1234");
match prev {
Some(v) => std::env::set_var("CAR_DAEMON_URL", v),
None => std::env::remove_var("CAR_DAEMON_URL"),
}
}
#[test]
fn probe_dead_port_returns_false() {
let _guard = ENV_TEST_LOCK.lock().unwrap();
let prev = std::env::var("CAR_DAEMON_URL").ok();
std::env::set_var("CAR_DAEMON_URL", "ws://127.0.0.1:1");
assert!(
!car_proto::daemon::probe_daemon_port(std::time::Duration::from_millis(100)),
"probe of port 1 should fail"
);
match prev {
Some(v) => std::env::set_var("CAR_DAEMON_URL", v),
None => std::env::remove_var("CAR_DAEMON_URL"),
}
}
#[test]
fn resolution_policy_recognizes_canonical_modes() {
let _guard = ENV_TEST_LOCK.lock().unwrap();
let prev_mode = std::env::var("CAR_FFI_MODE").ok();
let prev_legacy_req = std::env::var("CAR_FFI_REQUIRE_DAEMON").ok();
let prev_legacy_no = std::env::var("CAR_FFI_NO_AUTOSPAWN").ok();
std::env::remove_var("CAR_FFI_REQUIRE_DAEMON");
std::env::remove_var("CAR_FFI_NO_AUTOSPAWN");
std::env::set_var("CAR_FFI_NO_DEPRECATION_WARNING", "1");
std::env::set_var("CAR_FFI_MODE", "embedded");
assert_eq!(resolution_policy(), ResolutionPolicy::Embedded);
std::env::set_var("CAR_FFI_MODE", "daemon-only");
assert_eq!(resolution_policy(), ResolutionPolicy::DaemonOnly);
std::env::set_var("CAR_FFI_MODE", "daemon-prefer");
assert_eq!(resolution_policy(), ResolutionPolicy::DaemonPrefer);
std::env::set_var("CAR_FFI_MODE", "daemon-no-spawn");
assert_eq!(resolution_policy(), ResolutionPolicy::DaemonNoSpawn);
std::env::set_var("CAR_FFI_MODE", " DAEMON-ONLY ");
assert_eq!(resolution_policy(), ResolutionPolicy::DaemonOnly);
std::env::remove_var("CAR_FFI_MODE");
assert_eq!(resolution_policy(), ResolutionPolicy::DaemonPrefer);
std::env::remove_var("CAR_FFI_NO_DEPRECATION_WARNING");
match prev_mode {
Some(v) => std::env::set_var("CAR_FFI_MODE", v),
None => std::env::remove_var("CAR_FFI_MODE"),
}
match prev_legacy_req {
Some(v) => std::env::set_var("CAR_FFI_REQUIRE_DAEMON", v),
None => std::env::remove_var("CAR_FFI_REQUIRE_DAEMON"),
}
match prev_legacy_no {
Some(v) => std::env::set_var("CAR_FFI_NO_AUTOSPAWN", v),
None => std::env::remove_var("CAR_FFI_NO_AUTOSPAWN"),
}
}
#[test]
fn legacy_flags_map_to_policy() {
let _guard = ENV_TEST_LOCK.lock().unwrap();
let prev_mode = std::env::var("CAR_FFI_MODE").ok();
let prev_req = std::env::var("CAR_FFI_REQUIRE_DAEMON").ok();
let prev_no = std::env::var("CAR_FFI_NO_AUTOSPAWN").ok();
std::env::remove_var("CAR_FFI_MODE");
std::env::remove_var("CAR_FFI_NO_AUTOSPAWN");
std::env::set_var("CAR_FFI_NO_DEPRECATION_WARNING", "1");
std::env::set_var("CAR_FFI_REQUIRE_DAEMON", "1");
assert_eq!(resolution_policy(), ResolutionPolicy::DaemonOnly);
std::env::remove_var("CAR_FFI_REQUIRE_DAEMON");
std::env::set_var("CAR_FFI_NO_AUTOSPAWN", "1");
assert_eq!(resolution_policy(), ResolutionPolicy::DaemonNoSpawn);
std::env::remove_var("CAR_FFI_NO_AUTOSPAWN");
std::env::set_var("CAR_FFI_MODE", "daemon");
assert_eq!(resolution_policy(), ResolutionPolicy::DaemonPrefer);
std::env::remove_var("CAR_FFI_NO_DEPRECATION_WARNING");
std::env::remove_var("CAR_FFI_MODE");
std::env::remove_var("CAR_FFI_REQUIRE_DAEMON");
match prev_mode {
Some(v) => std::env::set_var("CAR_FFI_MODE", v),
None => {}
}
match prev_req {
Some(v) => std::env::set_var("CAR_FFI_REQUIRE_DAEMON", v),
None => {}
}
match prev_no {
Some(v) => std::env::set_var("CAR_FFI_NO_AUTOSPAWN", v),
None => {}
}
}
#[test]
fn conflict_between_canonical_and_legacy_panics() {
let _guard = ENV_TEST_LOCK.lock().unwrap();
let prev_mode = std::env::var("CAR_FFI_MODE").ok();
let prev_req = std::env::var("CAR_FFI_REQUIRE_DAEMON").ok();
std::env::set_var("CAR_FFI_MODE", "daemon-prefer");
std::env::set_var("CAR_FFI_REQUIRE_DAEMON", "1");
let result = std::panic::catch_unwind(resolution_policy);
assert!(result.is_err(), "expected panic on conflict");
std::env::remove_var("CAR_FFI_MODE");
std::env::remove_var("CAR_FFI_REQUIRE_DAEMON");
match prev_mode {
Some(v) => std::env::set_var("CAR_FFI_MODE", v),
None => {}
}
match prev_req {
Some(v) => std::env::set_var("CAR_FFI_REQUIRE_DAEMON", v),
None => {}
}
}
#[test]
fn resolve_embedded_short_circuits_probe() {
let _guard = ENV_TEST_LOCK.lock().unwrap();
let prev = std::env::var("CAR_FFI_MODE").ok();
std::env::set_var("CAR_FFI_MODE", "embedded");
let started = std::time::Instant::now();
assert_eq!(
RuntimeMode::resolve_or_err().unwrap(),
RuntimeMode::Embedded
);
assert!(
started.elapsed() < std::time::Duration::from_millis(50),
"embedded short-circuit shouldn't pay the probe cost"
);
match prev {
Some(v) => std::env::set_var("CAR_FFI_MODE", v),
None => std::env::remove_var("CAR_FFI_MODE"),
}
}
#[tokio::test]
async fn client_call_against_dead_port_errors_clearly() {
let client = DaemonClient::with_url("ws://127.0.0.1:1");
let r = client.call("state.get", serde_json::json!({"key": "x"})).await;
assert!(r.is_err(), "expected error against dead port");
let msg = r.unwrap_err();
assert!(
msg.contains("connect daemon"),
"expected connect-error wording, got: {msg}"
);
}
#[tokio::test]
async fn client_recovers_from_failure_to_retry_connect() {
let client = DaemonClient::with_url("ws://127.0.0.1:1");
let _ = client
.call("state.get", serde_json::json!({"key": "x"}))
.await;
let r = client
.call("state.get", serde_json::json!({"key": "y"}))
.await;
assert!(r.is_err());
assert!(r.unwrap_err().contains("connect daemon"));
}
}