use running_process::broker::adopt::{AdoptError, AsyncBrokerSession, OwnedConnectRequest};
use running_process::broker::client::{connect_local_socket, BackendConnectionRoute, RefusalKind};
use super::error::IpcError;
#[cfg(unix)]
use super::IpcConnection;
use super::{connect, running_process_disabled, ClientConnection};
/// Name of the environment variable holding a fake-backend ("seam") endpoint.
///
/// When it is set to a non-empty value the broker lane is considered
/// requested, and the value is probed as the backend endpoint instead of
/// negotiating with a real broker.
pub const RUNNING_PROCESS_FAKE_BACKEND_ENV: &str = "RUNNING_PROCESS_FAKE_BACKEND";
/// Name of the environment variable that opts into the broker lane.
///
/// Only the exact value `"1"` counts as an opt-in.
pub const ZCCACHE_BROKER_CONNECT_ENV: &str = "ZCCACHE_BROKER_CONNECT";
/// Describes how a daemon connection returned by
/// [`connect_daemon_with_route`] was established.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DaemonConnectRoute {
/// The connection was dialed straight at the caller-supplied endpoint.
Direct,
/// The connection came out of the broker lane (a real broker session or the
/// fake-backend seam).
Broker {
/// Route reported for the backend connection (`HelloSkip` for the
/// fake-backend seam, otherwise whatever the broker session reported).
route: BackendConnectionRoute,
/// Resolved backend endpoint, already translated to zccache form.
endpoint: String,
},
}
/// Connects to the daemon at `endpoint` and returns only the connection.
///
/// This is [`connect_daemon_with_route`] with the route information thrown
/// away.
///
/// # Errors
///
/// Propagates whatever [`connect_daemon_with_route`] returns.
pub async fn connect_daemon(endpoint: &str) -> Result<ClientConnection, IpcError> {
    let (connection, _route) = connect_daemon_with_route(endpoint).await?;
    Ok(connection)
}
/// Connects to the daemon and reports which lane produced the connection.
///
/// Decision order:
///
/// 1. If running-process support is disabled, or the broker lane was not
///    requested through the environment, dial `endpoint` directly.
/// 2. If a fake-backend seam endpoint is configured, probe it and re-dial the
///    resolved endpoint; a real broker is not consulted in this case.
/// 3. Otherwise negotiate with the default broker.
///
/// Any broker-lane attempt that yields nothing falls back to a direct
/// connection to `endpoint`.
///
/// # Errors
///
/// Returns the error of the direct `connect(endpoint)` call when that final
/// fallback fails. Broker-lane failures are never surfaced as errors.
pub async fn connect_daemon_with_route(
    endpoint: &str,
) -> Result<(ClientConnection, DaemonConnectRoute), IpcError> {
    // `&&` short-circuits exactly like the original `||` guard: the environment
    // is not inspected at all when running-process support is disabled.
    let broker_lane_enabled = !running_process_disabled() && broker_lane_requested();

    let brokered = if !broker_lane_enabled {
        None
    } else if let Some(seam_endpoint) = fake_backend_endpoint_from_env() {
        // The seam replaces the real broker entirely; if it does not pan out we
        // go straight to the direct lane.
        match resolve_fake_backend_seam_async(seam_endpoint).await {
            Some((resolved, route)) => redial_resolved(route, resolved).await,
            None => None,
        }
    } else {
        connect_via_broker().await
    };

    match brokered {
        Some(connected) => Ok(connected),
        None => {
            let direct = connect(endpoint).await?;
            Ok((direct, DaemonConnectRoute::Direct))
        }
    }
}
/// Reports whether the environment asks for the broker lane.
///
/// True when the fake-backend variable is set to a non-empty value, or when
/// the broker opt-in variable equals exactly `"1"`. The opt-in variable is only
/// read when the fake-backend variable did not already decide the answer.
fn broker_lane_requested() -> bool {
    let seam_configured =
        std::env::var_os(RUNNING_PROCESS_FAKE_BACKEND_ENV).is_some_and(|raw| !raw.is_empty());
    seam_configured || matches!(std::env::var(ZCCACHE_BROKER_CONNECT_ENV).as_deref(), Ok("1"))
}
/// Attempts to obtain a daemon connection through the default broker.
///
/// Builds a connect request for the `"zccache"` service, passing the crate
/// version for both version arguments, and hands it to
/// `AsyncBrokerSession::adopt`. On success the session's route and endpoint are
/// captured and the session is turned into a connection by
/// `adopt_session_connection`.
///
/// Returns `None` when no default broker endpoint can be determined, when the
/// broker reports itself disabled (silently), or when adoption fails for any
/// other reason (after logging it).
async fn connect_via_broker() -> Option<(ClientConnection, DaemonConnectRoute)> {
    let request = OwnedConnectRequest::new(
        default_broker_endpoint()?,
        "zccache",
        crate::core::VERSION,
        crate::core::VERSION,
    );

    let session = match AsyncBrokerSession::adopt(request).await {
        Ok(adopted) => adopted,
        Err(err) => {
            // A disabled broker is an expected configuration, not worth a log line.
            if !matches!(err, AdoptError::BrokerDisabled) {
                log_adopt_failure(&err);
            }
            return None;
        }
    };

    // Read everything we need from the session before it is consumed below.
    let session_route = session.route();
    let backend_endpoint = to_zccache_endpoint(session.endpoint());
    adopt_session_connection(session, session_route, backend_endpoint).await
}
/// Unix variant: turns an adopted broker session into a client connection.
///
/// First tries to take over the session's live backend socket. If the session
/// will not hand the socket over, or the socket cannot be wrapped into an
/// `IpcConnection`, the resolved endpoint is dialed afresh instead.
///
/// Returns `None` only when the re-dial fallback also fails.
#[cfg(unix)]
async fn adopt_session_connection(
    session: AsyncBrokerSession,
    route: BackendConnectionRoute,
    resolved: String,
) -> Option<(ClientConnection, DaemonConnectRoute)> {
    let backend_io = match session.into_backend_io() {
        Ok(backend_io) => backend_io,
        Err(err) => {
            tracing::debug!(
                error = %err,
                "into_backend_io declined the live socket; re-dialing resolved endpoint"
            );
            return redial_resolved(route, resolved).await;
        }
    };

    match unix_connection_from_backend_io(backend_io) {
        Ok(adopted) => {
            let via = DaemonConnectRoute::Broker {
                route,
                endpoint: resolved,
            };
            Some((adopted, via))
        }
        Err(err) => {
            tracing::debug!(
                error = %err,
                "adopting broker backend socket failed; re-dialing resolved endpoint"
            );
            redial_resolved(route, resolved).await
        }
    }
}
/// Windows variant: turns an adopted broker session into a client connection.
///
/// The session's live socket is not reused here; the session is released
/// first and the resolved endpoint is then dialed afresh.
///
/// Returns `None` when the re-dial fails.
#[cfg(windows)]
async fn adopt_session_connection(
    session: AsyncBrokerSession,
    route: BackendConnectionRoute,
    resolved: String,
) -> Option<(ClientConnection, DaemonConnectRoute)> {
    // Release the session before dialing so it is not held across the await.
    std::mem::drop(session);
    redial_resolved(route, resolved).await
}
/// Wraps the file descriptor owned by `io` into an `IpcConnection`.
///
/// The descriptor is converted into a std Unix stream, switched to
/// non-blocking mode (required before handing it to tokio), and then
/// registered as a tokio Unix stream.
///
/// # Errors
///
/// Returns the converted I/O error if switching to non-blocking mode or the
/// tokio registration fails.
#[cfg(unix)]
fn unix_connection_from_backend_io(
    io: running_process::broker::adopt::OwnedBackendIo,
) -> Result<IpcConnection, IpcError> {
    let blocking_stream: std::os::unix::net::UnixStream = io.into_owned_fd().into();
    blocking_stream.set_nonblocking(true)?;
    let async_stream = tokio::net::UnixStream::from_std(blocking_stream)?;
    let connection = IpcConnection::from_unix_stream(async_stream);
    Ok(connection)
}
/// Dials the broker-resolved endpoint and tags the result as a broker route.
///
/// On success returns the connection together with
/// `DaemonConnectRoute::Broker { route, endpoint: resolved }`. On failure the
/// error is logged at debug level and `None` is returned so the caller can
/// fall back to a direct connection.
async fn redial_resolved(
    route: BackendConnectionRoute,
    resolved: String,
) -> Option<(ClientConnection, DaemonConnectRoute)> {
    let dialed = match connect(&resolved).await {
        Ok(dialed) => dialed,
        Err(err) => {
            tracing::debug!(
                resolved_endpoint = %resolved,
                error = %err,
                "broker-resolved endpoint unreachable; falling back to direct connect"
            );
            return None;
        }
    };

    let via = DaemonConnectRoute::Broker {
        route,
        endpoint: resolved,
    };
    Some((dialed, via))
}
/// Runs [`resolve_fake_backend_seam`] on tokio's blocking pool.
///
/// The probe performs a synchronous socket connect, so it is moved off the
/// async worker. If the blocking task itself fails to complete, that is
/// logged at debug level and treated like an unreachable seam (`None`).
async fn resolve_fake_backend_seam_async(
    seam_endpoint: String,
) -> Option<(String, BackendConnectionRoute)> {
    let probe = tokio::task::spawn_blocking(move || resolve_fake_backend_seam(&seam_endpoint));
    match probe.await {
        Ok(outcome) => outcome,
        Err(err) => {
            tracing::debug!(error = %err, "fake-backend seam task failed; using direct connect");
            None
        }
    }
}
/// Probes the fake-backend seam endpoint with a throwaway connection.
///
/// If the endpoint accepts a connection, that connection is closed right away
/// and the endpoint (translated to zccache form) is returned together with the
/// `HelloSkip` route. If the endpoint cannot be reached, a warning is logged
/// and `None` is returned.
fn resolve_fake_backend_seam(seam_endpoint: &str) -> Option<(String, BackendConnectionRoute)> {
    // `map(drop)` closes the probe stream immediately; only reachability matters.
    if let Err(err) = connect_local_socket(seam_endpoint).map(drop) {
        tracing::warn!(
            endpoint = %seam_endpoint,
            error = %err,
            "RUNNING_PROCESS_FAKE_BACKEND endpoint unreachable; using direct connect"
        );
        return None;
    }

    let resolved = to_zccache_endpoint(seam_endpoint);
    Some((resolved, BackendConnectionRoute::HelloSkip))
}
/// Payload-free classification of a broker refusal.
///
/// Each variant mirrors the `RefusalKind` variant of the same name; see
/// `BrokerRefusal::from_kind` for the mapping.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BrokerRefusal {
/// Mapped from `RefusalKind::VersionUnsupported`.
VersionUnsupported,
/// Mapped from `RefusalKind::VersionBlocked`.
VersionBlocked,
/// Mapped from `RefusalKind::ServiceUnknown`.
ServiceUnknown,
/// Mapped from `RefusalKind::RateLimited`.
RateLimited,
/// Mapped from `RefusalKind::ShuttingDown`.
ShuttingDown,
/// Mapped from `RefusalKind::Other(_)`; the payload is discarded.
Other,
}
impl BrokerRefusal {
fn from_kind(kind: RefusalKind) -> Self {
match kind {
RefusalKind::VersionUnsupported => Self::VersionUnsupported,
RefusalKind::VersionBlocked => Self::VersionBlocked,
RefusalKind::ServiceUnknown => Self::ServiceUnknown,
RefusalKind::RateLimited => Self::RateLimited,
RefusalKind::ShuttingDown => Self::ShuttingDown,
RefusalKind::Other(_) => Self::Other,
}
}
}
/// Classifies an adoption error as a typed broker refusal, if it is one.
///
/// Only `AdoptError::Connect` errors whose `refusal_kind()` is `Some` are
/// classified; every other error yields `None`.
#[must_use]
pub fn classify_adopt_error(err: &AdoptError) -> Option<BrokerRefusal> {
    let AdoptError::Connect(connect_err) = err else {
        return None;
    };
    let kind = connect_err.refusal_kind()?;
    Some(BrokerRefusal::from_kind(kind))
}
/// Logs a failed broker adoption at debug level.
///
/// Errors that classify as a typed refusal are logged with the refusal kind
/// attached; everything else is logged as a generic negotiation failure.
fn log_adopt_failure(err: &AdoptError) {
    if let Some(refusal) = classify_adopt_error(err) {
        tracing::debug!(
            ?refusal,
            error = %err,
            "running-process broker refused negotiation; using direct connect"
        );
    } else {
        tracing::debug!(
            error = %err,
            "running-process broker negotiation failed (unreachable/dial error); using direct connect"
        );
    }
}
/// Reads the fake-backend seam endpoint from the environment.
///
/// Returns `None` when the variable is unset or empty. Values that are not
/// valid Unicode are converted lossily rather than rejected.
fn fake_backend_endpoint_from_env() -> Option<String> {
    std::env::var_os(RUNNING_PROCESS_FAKE_BACKEND_ENV)
        .map(|raw| raw.to_string_lossy().into_owned())
        .filter(|endpoint| !endpoint.is_empty())
}
fn default_broker_endpoint() -> Option<String> {
let sid_hash = running_process::broker::lifecycle::user_sid_hash().ok()?;
let pipe = running_process::broker::lifecycle::names::shared_broker_pipe(&sid_hash).ok()?;
#[cfg(windows)]
{
pipe.windows
}
#[cfg(unix)]
{
pipe.unix.map(|path| path.to_string_lossy().into_owned())
}
}
/// Translates a running-process endpoint into the form zccache dials.
///
/// On Windows the `\\.\pipe\` prefix is added unless it is already present.
/// On Unix the endpoint is returned unchanged.
fn to_zccache_endpoint(endpoint: &str) -> String {
    #[cfg(windows)]
    {
        if !endpoint.starts_with(r"\\.\pipe\") {
            return format!(r"\\.\pipe\{endpoint}");
        }
        endpoint.to_owned()
    }
    #[cfg(unix)]
    {
        String::from(endpoint)
    }
}
/// Translates a zccache endpoint into the form running-process expects.
///
/// On Windows a leading `\\.\pipe\` prefix is removed if present. On Unix the
/// endpoint is returned unchanged.
#[must_use]
pub fn to_running_process_endpoint(endpoint: &str) -> String {
    #[cfg(windows)]
    {
        let bare_name = match endpoint.strip_prefix(r"\\.\pipe\") {
            Some(rest) => rest,
            None => endpoint,
        };
        bare_name.to_owned()
    }
    #[cfg(unix)]
    {
        String::from(endpoint)
    }
}
// Tests for lane selection (direct vs. broker), the fake-backend seam,
// refusal classification, and endpoint translation.
//
// Several tests mutate process-global environment variables; they rely on
// `EnvVarGuard` for that. NOTE(review): presumably the guard restores the
// previous values on drop and serializes env-mutating tests — confirm in
// `crate::ipc::test_env`.
#[cfg(test)]
mod tests {
use super::*;
use crate::ipc::test_env::EnvVarGuard;
use crate::ipc::{unique_test_endpoint, IpcListener, RUNNING_PROCESS_DISABLE_ENV};
use crate::protocol::{Request, Response};
// Spawns a server task that answers exactly one `Ping` and then finishes,
// yielding the number of pings it answered.
fn spawn_ping_server(listener: IpcListener) -> tokio::task::JoinHandle<usize> {
spawn_counting_ping_server(listener, 1)
}
// Sends one `Ping` over `conn` and asserts that the reply is `Pong`.
async fn ping_roundtrip(conn: &mut super::ClientConnection) {
conn.send(&Request::Ping).await.unwrap();
let resp: Option<Response> = conn.recv().await.unwrap();
assert_eq!(resp, Some(Response::Pong));
}
// With none of the three environment variables set, the direct lane is used.
#[tokio::test]
async fn default_route_is_direct() {
let _env = EnvVarGuard::unset_all(&[
RUNNING_PROCESS_DISABLE_ENV,
RUNNING_PROCESS_FAKE_BACKEND_ENV,
ZCCACHE_BROKER_CONNECT_ENV,
]);
let endpoint = unique_test_endpoint();
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_ping_server(listener);
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
assert_eq!(route, DaemonConnectRoute::Direct);
ping_roundtrip(&mut conn).await;
assert_eq!(server.await.unwrap(), 1);
}
// Pointing the fake-backend variable at the listener's own endpoint must
// produce a `Broker` route tagged `HelloSkip` whose resolved endpoint equals
// the original endpoint.
#[tokio::test]
async fn fake_backend_seam_routes_through_connect_to_backend() {
let endpoint = unique_test_endpoint();
let _env = EnvVarGuard::set_all(&[
(RUNNING_PROCESS_DISABLE_ENV, None),
(
RUNNING_PROCESS_FAKE_BACKEND_ENV,
Some(to_running_process_endpoint(&endpoint)),
),
(ZCCACHE_BROKER_CONNECT_ENV, None),
]);
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_ping_server(listener);
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
match route {
DaemonConnectRoute::Broker {
route: BackendConnectionRoute::HelloSkip,
endpoint: resolved,
} => assert_eq!(resolved, endpoint),
other => panic!("expected broker HelloSkip route, got {other:?}"),
}
ping_roundtrip(&mut conn).await;
// The seam's probe connection sends no request, so the server skips it and
// only the real connection's ping is counted.
assert_eq!(server.await.unwrap(), 1);
}
// With the disable variable set to "1", both broker-lane opt-ins are ignored
// and the direct lane is used.
#[tokio::test]
async fn disable_env_bypasses_broker_lane_entirely() {
let endpoint = unique_test_endpoint();
let _env = EnvVarGuard::set_all(&[
(RUNNING_PROCESS_DISABLE_ENV, Some("1".to_string())),
(
RUNNING_PROCESS_FAKE_BACKEND_ENV,
Some(to_running_process_endpoint(&unique_test_endpoint())),
),
(ZCCACHE_BROKER_CONNECT_ENV, Some("1".to_string())),
]);
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_ping_server(listener);
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
assert_eq!(route, DaemonConnectRoute::Direct);
ping_roundtrip(&mut conn).await;
assert_eq!(server.await.unwrap(), 1);
}
// The fake-backend variable points at a second, freshly generated endpoint
// that this test never binds, so the seam probe is expected to fail and the
// connection must fall back to the direct lane.
#[tokio::test]
async fn broker_failure_falls_back_to_direct_connect() {
let endpoint = unique_test_endpoint();
let _env = EnvVarGuard::set_all(&[
(RUNNING_PROCESS_DISABLE_ENV, None),
(
RUNNING_PROCESS_FAKE_BACKEND_ENV,
Some(to_running_process_endpoint(&unique_test_endpoint())),
),
(ZCCACHE_BROKER_CONNECT_ENV, None),
]);
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_ping_server(listener);
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
assert_eq!(route, DaemonConnectRoute::Direct);
ping_roundtrip(&mut conn).await;
assert_eq!(server.await.unwrap(), 1);
}
// Returns the sample at position `floor(len * pct)`, clamped to the last
// index. `sorted` must be sorted ascending and non-empty: an empty slice
// panics here.
fn percentile_ms(sorted: &[f64], pct: f64) -> f64 {
let idx = ((sorted.len() as f64 * pct) as usize).min(sorted.len() - 1);
sorted[idx]
}
// Spawns a server task that accepts connections until it has answered
// `pings` `Ping` requests, then yields the number answered.
fn spawn_counting_ping_server(
mut listener: IpcListener,
pings: usize,
) -> tokio::task::JoinHandle<usize> {
tokio::spawn(async move {
let mut answered = 0;
while answered < pings {
// Stop early if the listener can no longer accept.
let Ok(mut conn) = listener.accept().await else {
break;
};
match conn.recv::<Request>().await {
Ok(Some(Request::Ping)) => {
conn.send(&Response::Pong).await.unwrap();
answered += 1;
}
// Connections that close or fail before sending a request (such as a
// reachability probe) are skipped without counting.
Ok(None) | Err(_) => continue,
Ok(Some(other)) => panic!("unexpected request: {other:?}"),
}
}
answered
})
}
// Measures `samples` connect+ping round trips against a fresh listener and
// returns the per-sample latency in milliseconds (unsorted).
//
// When `expect_broker` is true the fake-backend variable is set directly,
// without a guard, to this run's endpoint; the caller is responsible for
// removing it. Every sample's route is checked against `expect_broker`.
async fn measure_connect_roundtrip_ms(samples: usize, expect_broker: bool) -> Vec<f64> {
let endpoint = unique_test_endpoint();
if expect_broker {
std::env::set_var(
RUNNING_PROCESS_FAKE_BACKEND_ENV,
to_running_process_endpoint(&endpoint),
);
}
let listener = IpcListener::bind(&endpoint).unwrap();
let server = spawn_counting_ping_server(listener, samples);
let mut samples_ms = Vec::with_capacity(samples);
for _ in 0..samples {
let start = std::time::Instant::now();
let (mut conn, route) = connect_daemon_with_route(&endpoint).await.unwrap();
ping_roundtrip(&mut conn).await;
samples_ms.push(start.elapsed().as_secs_f64() * 1000.0);
drop(conn);
match (&route, expect_broker) {
(DaemonConnectRoute::Broker { .. }, true) => {}
(DaemonConnectRoute::Direct, false) => {}
(other, _) => panic!("unexpected route {other:?} (expect_broker={expect_broker})"),
}
}
assert_eq!(server.await.unwrap(), samples);
samples_ms
}
// Prints p50/p99/min/max latency for the direct lane and the seam-backed
// broker lane, and asserts that each lane's p99 stays under 1000 ms.
#[tokio::test]
async fn broker_lane_connect_latency_p50_p99() {
const WARMUP: usize = 5;
const SAMPLES: usize = 100;
let _env = EnvVarGuard::set_all(&[
(RUNNING_PROCESS_DISABLE_ENV, None),
(RUNNING_PROCESS_FAKE_BACKEND_ENV, None),
(ZCCACHE_BROKER_CONNECT_ENV, None),
]);
// Order matters: the direct-lane runs must finish before the first broker
// run sets the fake-backend variable.
measure_connect_roundtrip_ms(WARMUP, false).await;
let mut direct = measure_connect_roundtrip_ms(SAMPLES, false).await;
measure_connect_roundtrip_ms(WARMUP, true).await;
let mut broker = measure_connect_roundtrip_ms(SAMPLES, true).await;
// Undo the unguarded `set_var` done by the broker-lane measurements.
std::env::remove_var(RUNNING_PROCESS_FAKE_BACKEND_ENV);
direct.sort_by(|a, b| a.partial_cmp(b).unwrap());
broker.sort_by(|a, b| a.partial_cmp(b).unwrap());
// Prints one summary line for a sorted sample set and returns (p50, p99).
let report = |label: &str, sorted: &[f64]| {
let p50 = percentile_ms(sorted, 0.50);
let p99 = percentile_ms(sorted, 0.99);
println!(
" {label:<28} p50={p50:>8.3}ms p99={p99:>8.3}ms min={:>8.3}ms max={:>8.3}ms (n={})",
sorted[0],
sorted[sorted.len() - 1],
sorted.len()
);
(p50, p99)
};
println!("broker lane connect+ping latency (running-process#383 evidence):");
let (_direct_p50, direct_p99) = report("direct lane", &direct);
let (_broker_p50, broker_p99) = report("broker lane (seam)", &broker);
assert!(
direct_p99 < 1000.0,
"direct lane p99 {direct_p99:.3}ms exceeded 1000ms budget"
);
assert!(
broker_p99 < 1000.0,
"broker lane p99 {broker_p99:.3}ms exceeded 1000ms budget"
);
}
// Each broker refusal error code must classify to the matching
// `BrokerRefusal` variant; `ErrorPeerRejected` is expected to land in `Other`.
#[test]
fn classify_adopt_error_maps_typed_refusals() {
use running_process::broker::client::BrokerClientError;
use running_process::broker::protocol::ErrorCode;
// Builds an `AdoptError::Connect` wrapping a refusal with the given code.
let refusal = |code: ErrorCode| {
AdoptError::Connect(BrokerClientError::Refused {
code,
reason: "test".to_string(),
retry_after_ms: 0,
})
};
assert_eq!(
classify_adopt_error(&refusal(ErrorCode::ErrorVersionUnsupported)),
Some(BrokerRefusal::VersionUnsupported)
);
assert_eq!(
classify_adopt_error(&refusal(ErrorCode::ErrorVersionBlocked)),
Some(BrokerRefusal::VersionBlocked)
);
assert_eq!(
classify_adopt_error(&refusal(ErrorCode::ErrorServiceUnknown)),
Some(BrokerRefusal::ServiceUnknown)
);
assert_eq!(
classify_adopt_error(&refusal(ErrorCode::ErrorRateLimited)),
Some(BrokerRefusal::RateLimited)
);
assert_eq!(
classify_adopt_error(&refusal(ErrorCode::ErrorShuttingDown)),
Some(BrokerRefusal::ShuttingDown)
);
assert_eq!(
classify_adopt_error(&refusal(ErrorCode::ErrorPeerRejected)),
Some(BrokerRefusal::Other)
);
}
// `BrokerDisabled` is not an `AdoptError::Connect`, so it is not classified.
#[test]
fn classify_adopt_error_returns_none_for_disabled() {
assert_eq!(classify_adopt_error(&AdoptError::BrokerDisabled), None);
}
// Translating an endpoint to running-process form and back must return the
// original; on Windows the pipe-prefix handling is also checked directly.
#[test]
fn endpoint_translation_round_trips() {
let endpoint = unique_test_endpoint();
assert_eq!(
to_zccache_endpoint(&to_running_process_endpoint(&endpoint)),
endpoint
);
#[cfg(windows)]
{
assert_eq!(to_zccache_endpoint("name"), r"\\.\pipe\name");
assert_eq!(to_running_process_endpoint(r"\\.\pipe\name"), "name");
}
}
}