use running_process::broker::client::{
connect_local_socket, connect_to_backend, BackendConnectionRoute, ConnectBackendRequest,
};
use super::error::IpcError;
use super::{connect, running_process_disabled, ClientConnection};
pub const RUNNING_PROCESS_FAKE_BACKEND_ENV: &str = "RUNNING_PROCESS_FAKE_BACKEND";
pub const ZCCACHE_BROKER_CONNECT_ENV: &str = "ZCCACHE_BROKER_CONNECT";
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DaemonConnectRoute {
Direct,
Broker {
route: BackendConnectionRoute,
endpoint: String,
},
}
pub async fn connect_daemon(endpoint: &str) -> Result<ClientConnection, IpcError> {
connect_daemon_with_route(endpoint)
.await
.map(|(conn, _route)| conn)
}
pub async fn connect_daemon_with_route(
endpoint: &str,
) -> Result<(ClientConnection, DaemonConnectRoute), IpcError> {
if running_process_disabled() || !broker_lane_requested() {
let conn = connect(endpoint).await?;
return Ok((conn, DaemonConnectRoute::Direct));
}
if let Some((resolved, route)) = resolve_backend_endpoint().await {
match connect(&resolved).await {
Ok(conn) => {
return Ok((
conn,
DaemonConnectRoute::Broker {
route,
endpoint: resolved,
},
));
}
Err(err) => {
tracing::debug!(
resolved_endpoint = %resolved,
error = %err,
"broker-resolved endpoint unreachable; falling back to direct connect"
);
}
}
}
let conn = connect(endpoint).await?;
Ok((conn, DaemonConnectRoute::Direct))
}
fn broker_lane_requested() -> bool {
if std::env::var_os(RUNNING_PROCESS_FAKE_BACKEND_ENV).is_some_and(|value| !value.is_empty()) {
return true;
}
std::env::var(ZCCACHE_BROKER_CONNECT_ENV).is_ok_and(|value| value == "1")
}
async fn resolve_backend_endpoint() -> Option<(String, BackendConnectionRoute)> {
match tokio::task::spawn_blocking(resolve_backend_endpoint_blocking).await {
Ok(resolved) => resolved,
Err(err) => {
tracing::debug!(error = %err, "broker negotiation task failed; using direct connect");
None
}
}
}
fn resolve_backend_endpoint_blocking() -> Option<(String, BackendConnectionRoute)> {
if let Some(seam_endpoint) = fake_backend_endpoint_from_env() {
return match connect_local_socket(&seam_endpoint) {
Ok(stream) => {
drop(stream);
Some((
to_zccache_endpoint(&seam_endpoint),
BackendConnectionRoute::HelloSkip,
))
}
Err(err) => {
tracing::warn!(
endpoint = %seam_endpoint,
error = %err,
"RUNNING_PROCESS_FAKE_BACKEND endpoint unreachable; using direct connect"
);
None
}
};
}
let broker_endpoint = default_broker_endpoint()?;
let request = ConnectBackendRequest::new(
&broker_endpoint,
"zccache",
crate::core::VERSION,
crate::core::VERSION,
);
match connect_to_backend(request) {
Ok(connection) => Some((to_zccache_endpoint(&connection.endpoint), connection.route)),
Err(err) => {
tracing::debug!(
error = %err,
"running-process broker negotiation failed; using direct connect"
);
None
}
}
}
fn fake_backend_endpoint_from_env() -> Option<String> {
let value = std::env::var_os(RUNNING_PROCESS_FAKE_BACKEND_ENV)?;
let value = value.to_string_lossy();
if value.is_empty() {
return None;
}
Some(value.into_owned())
}
fn default_broker_endpoint() -> Option<String> {
let sid_hash = running_process::broker::lifecycle::user_sid_hash().ok()?;
let pipe = running_process::broker::lifecycle::names::shared_broker_pipe(&sid_hash).ok()?;
#[cfg(windows)]
{
pipe.windows
}
#[cfg(unix)]
{
pipe.unix.map(|path| path.to_string_lossy().into_owned())
}
}
fn to_zccache_endpoint(endpoint: &str) -> String {
#[cfg(windows)]
{
if endpoint.starts_with(r"\\.\pipe\") {
endpoint.to_string()
} else {
format!(r"\\.\pipe\{endpoint}")
}
}
#[cfg(unix)]
{
endpoint.to_string()
}
}
#[must_use]
pub fn to_running_process_endpoint(endpoint: &str) -> String {
#[cfg(windows)]
{
endpoint
.strip_prefix(r"\\.\pipe\")
.unwrap_or(endpoint)
.to_string()
}
#[cfg(unix)]
{
endpoint.to_string()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ipc::test_env::EnvVarGuard;
use crate::ipc::{unique_test_endpoint, IpcListener, RUNNING_PROCESS_DISABLE_ENV};
use crate::protocol::{Request, Response};
fn spawn_ping_server(listener: IpcListener) -> tokio::task::JoinHandle<usize> {
spawn_counting_ping_server(listener, 1)
}
async fn ping_roundtrip(conn: &mut super::ClientConnection) {
conn.send(&Request::Ping).await.unwrap();
let resp: Option<Response> = conn.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
}
#[tokio::test]
async fn default_route_is_direct() {
let _env = EnvVarGuard::unset_all(&[
RUNNING_PROCESS_DISABLE_ENV,
RUNNING_PROCESS_FAKE_BACKEND_ENV,
ZCCACHE_BROKER_CONNECT_ENV,
]);
let endpoint = unique_test_endpoint();
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_ping_server(listener);
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
assert_eq!(route, DaemonConnectRoute::Direct);
ping_roundtrip(&mut conn).await;
assert_eq!(server.await.unwrap(), 1);
}
#[tokio::test]
async fn fake_backend_seam_routes_through_connect_to_backend() {
let endpoint = unique_test_endpoint();
let _env = EnvVarGuard::set_all(&[
(RUNNING_PROCESS_DISABLE_ENV, None),
(
RUNNING_PROCESS_FAKE_BACKEND_ENV,
Some(to_running_process_endpoint(&endpoint)),
),
(ZCCACHE_BROKER_CONNECT_ENV, None),
]);
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_ping_server(listener);
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
match route {
DaemonConnectRoute::Broker {
route: BackendConnectionRoute::HelloSkip,
endpoint: resolved,
} => assert_eq!(resolved, endpoint),
other => panic!("expected broker HelloSkip route, got {other:?}"),
}
ping_roundtrip(&mut conn).await;
assert_eq!(server.await.unwrap(), 1);
}
#[tokio::test]
async fn disable_env_bypasses_broker_lane_entirely() {
let endpoint = unique_test_endpoint();
let _env = EnvVarGuard::set_all(&[
(RUNNING_PROCESS_DISABLE_ENV, Some("1".to_string())),
(
RUNNING_PROCESS_FAKE_BACKEND_ENV,
Some(to_running_process_endpoint(&unique_test_endpoint())),
),
(ZCCACHE_BROKER_CONNECT_ENV, Some("1".to_string())),
]);
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_ping_server(listener);
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
assert_eq!(route, DaemonConnectRoute::Direct);
ping_roundtrip(&mut conn).await;
assert_eq!(server.await.unwrap(), 1);
}
#[tokio::test]
async fn broker_failure_falls_back_to_direct_connect() {
let endpoint = unique_test_endpoint();
let _env = EnvVarGuard::set_all(&[
(RUNNING_PROCESS_DISABLE_ENV, None),
(
RUNNING_PROCESS_FAKE_BACKEND_ENV,
Some(to_running_process_endpoint(&unique_test_endpoint())),
),
(ZCCACHE_BROKER_CONNECT_ENV, None),
]);
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_ping_server(listener);
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
assert_eq!(route, DaemonConnectRoute::Direct);
ping_roundtrip(&mut conn).await;
assert_eq!(server.await.unwrap(), 1);
}
fn percentile_ms(sorted: &[f64], pct: f64) -> f64 {
let idx = ((sorted.len() as f64 * pct) as usize).min(sorted.len() - 1);
sorted[idx]
}
fn spawn_counting_ping_server(
mut listener: IpcListener,
pings: usize,
) -> tokio::task::JoinHandle<usize> {
tokio::spawn(async move {
let mut answered = 0;
while answered < pings {
let Ok(mut conn) = listener.accept().await else {
break;
};
match conn.recv::<Request>().await {
Ok(Some(Request::Ping)) => {
conn.send(&Response::Pong).await.unwrap();
answered += 1;
}
Ok(None) | Err(_) => continue,
Ok(Some(other)) => panic!("unexpected request: {other:?}"),
}
}
answered
})
}
async fn measure_connect_roundtrip_ms(samples: usize, expect_broker: bool) -> Vec<f64> {
let endpoint = unique_test_endpoint();
if expect_broker {
std::env::set_var(
RUNNING_PROCESS_FAKE_BACKEND_ENV,
to_running_process_endpoint(&endpoint),
);
}
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_counting_ping_server(listener, samples);
let mut samples_ms = Vec::with_capacity(samples);
for _ in 0..samples {
let start = std::time::Instant::now();
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
ping_roundtrip(&mut conn).await;
samples_ms.push(start.elapsed().as_secs_f64() * 1000.0);
drop(conn);
match (&route, expect_broker) {
(DaemonConnectRoute::Broker { .. }, true) => {}
(DaemonConnectRoute::Direct, false) => {}
(other, _) => panic!("unexpected route {other:?} (expect_broker={expect_broker})"),
}
}
assert_eq!(server.await.unwrap(), samples);
samples_ms
}
#[tokio::test]
async fn broker_lane_connect_latency_p50_p99() {
const WARMUP: usize = 5;
const SAMPLES: usize = 100;
let _env = EnvVarGuard::set_all(&[
(RUNNING_PROCESS_DISABLE_ENV, None),
(RUNNING_PROCESS_FAKE_BACKEND_ENV, None),
(ZCCACHE_BROKER_CONNECT_ENV, None),
]);
measure_connect_roundtrip_ms(WARMUP, false).await;
let mut direct = measure_connect_roundtrip_ms(SAMPLES, false).await;
measure_connect_roundtrip_ms(WARMUP, true).await;
let mut broker = measure_connect_roundtrip_ms(SAMPLES, true).await;
std::env::remove_var(RUNNING_PROCESS_FAKE_BACKEND_ENV);
direct.sort_by(|a, b| a.partial_cmp(b).unwrap());
broker.sort_by(|a, b| a.partial_cmp(b).unwrap());
let report = |label: &str, sorted: &[f64]| {
let p50 = percentile_ms(sorted, 0.50);
let p99 = percentile_ms(sorted, 0.99);
println!(
" {label:<28} p50={p50:>8.3}ms p99={p99:>8.3}ms min={:>8.3}ms max={:>8.3}ms (n={})",
sorted[0],
sorted[sorted.len() - 1],
sorted.len()
);
(p50, p99)
};
println!("broker lane connect+ping latency (running-process#383 evidence):");
let (_direct_p50, direct_p99) = report("direct lane", &direct);
let (_broker_p50, broker_p99) = report("broker lane (seam)", &broker);
assert!(
direct_p99 < 1000.0,
"direct lane p99 {direct_p99:.3}ms exceeded 1000ms budget"
);
assert!(
broker_p99 < 1000.0,
"broker lane p99 {broker_p99:.3}ms exceeded 1000ms budget"
);
}
#[test]
fn endpoint_translation_round_trips() {
let endpoint = unique_test_endpoint();
assert_eq!(
to_zccache_endpoint(&to_running_process_endpoint(&endpoint)),
endpoint
);
#[cfg(windows)]
{
assert_eq!(to_zccache_endpoint("name"), r"\\.\pipe\name");
assert_eq!(to_running_process_endpoint(r"\\.\pipe\name"), "name");
}
}
}