Skip to main content

running_process/broker/
client_v2.rs

1//! v2 broker client (slice 4 of #488).
2//!
3//! Counterpart of [`super::client`]. Single public entry point
4//! [`connect`]: dial the v2 broker pipe by program name, exchange a
5//! Hello / Negotiated, return a [`ClientSession`] handle.
6//!
7//! The v2 broker fronts each program via the namespace defined by
8//! [`super::lifecycle::names_v2::v2_program_pipe`]. The Hello round-trip
9//! itself reuses v1's framing (`protocol::{read_frame, write_frame}`)
10//! and message shapes (`Hello`, `HelloReply`) per #470's coexistence
11//! table. Subsequent slices add post-Hello operations (streaming,
12//! HTTP endpoint discovery, etc.); this slice exposes only the
13//! handshake so downstream consumers (zccache et al.) can pin against
14//! a stable v2 client API while the broker side grows under them.
15
16use std::io::{Read, Write};
17use std::time::Duration;
18
19use interprocess::local_socket::traits::Stream as _;
20use interprocess::local_socket::Stream;
21use prost::Message;
22
/// Default deadline for the Hello round-trip in [`connect`].
///
/// [`connect`] passes this value straight to [`connect_with_deadline`],
/// which enforces it with a helper thread + `recv_timeout`.
///
/// Mirrors v1's `AsyncBrokerSession::adopt` budget (~3s). A v2 broker
/// that accepts the dial but stalls (deadlock, GC pause, hung backend
/// resolver, ENOSPC log write) would otherwise hang the caller
/// indefinitely — local-socket streams have no portable read deadline,
/// so the only bound is via a helper thread + `recv_timeout`. Fixes
/// #517.
pub const DEFAULT_HELLO_DEADLINE: Duration = Duration::from_secs(3);
32
33use crate::broker::lifecycle::names::PipePathError;
34use crate::broker::lifecycle::names_v2::v2_program_pipe;
35use crate::broker::lifecycle::sid::{user_sid_hash, SidError};
36use crate::broker::protocol::{
37    hello_reply, read_frame, write_frame, FramingError, Hello, HelloReply, Negotiated, Refused,
38    ENVELOPE_VERSION,
39};
40
/// Errors surfaced by [`connect`] and [`connect_with_deadline`].
///
/// Most variants wrap an underlying error via `#[from]`, so the
/// handshake code can propagate them with `?`.
#[derive(Debug, thiserror::Error)]
pub enum BrokerV2Error {
    /// `user_sid_hash` failed.
    #[error(transparent)]
    Sid(#[from] SidError),

    /// Building the v2 pipe name failed.
    #[error(transparent)]
    PipeName(#[from] PipePathError),

    /// Dialing the v2 broker pipe failed (no listener, permission denied, ...).
    ///
    /// Also returned when the resolved socket path cannot be converted
    /// into a local-socket name; in that case `source` has kind
    /// `InvalidInput`.
    #[error("dial v2 broker pipe at {socket_path:?}: {source}")]
    Dial {
        /// Path the client attempted to dial.
        socket_path: String,
        /// Underlying IO error.
        #[source]
        source: std::io::Error,
    },

    /// Framing-layer error on read or write (envelope version mismatch,
    /// truncated body, oversized frame, ...).
    #[error(transparent)]
    Framing(#[from] FramingError),

    /// Underlying IO failure during Hello / HelloReply exchange.
    ///
    /// [`connect_with_deadline`] also uses this variant, with kind
    /// `TimedOut`, when the round-trip does not finish within the
    /// caller's deadline.
    #[error("Hello round-trip io: {0}")]
    Io(#[from] std::io::Error),

    /// `HelloReply` payload failed to decode.
    #[error("HelloReply decode: {0}")]
    Decode(#[from] prost::DecodeError),

    /// `HelloReply` was syntactically valid but missing its `result` oneof.
    #[error("HelloReply.result missing")]
    MissingResult,

    /// Broker explicitly refused the Hello (returned a `Refused` reply).
    ///
    /// `retry_after_ms` is promoted from `details.retry_after_ms` to a
    /// top-level field so RateLimited callers don't have to thread the
    /// boxed prost payload back out to honor broker-supplied backoff.
    /// Matches the shape of v1's `BrokerClientError::Refused`. Fixes
    /// #518. `details` is kept so any future scalar / nested field in
    /// the prost message stays accessible without another API break.
    #[error("broker refused Hello: {reason}")]
    Refused {
        /// Human-readable refusal text (copied from `details.reason`).
        reason: String,
        /// Suggested back-off before retrying (0 = no hint). Mirrors the
        /// proto wire type (`Refused.retry_after_ms` is `uint64`).
        retry_after_ms: u64,
        /// Decoded refused payload for further inspection by callers.
        /// Boxed so the payload is stored behind a pointer rather than
        /// inline in the enum.
        details: Box<Refused>,
    },

    /// Encoding the outbound `Hello` failed.
    #[error("Hello encode: {0}")]
    Encode(#[from] prost::EncodeError),
}
102
/// A live session with the v2 broker.
///
/// Wraps the underlying [`Stream`] plus the broker's [`Negotiated`]
/// reply. Future slices add operations on top (streaming frames, HTTP
/// endpoint discovery, etc.); slice 4 exposes only the handshake
/// result so downstream consumers can pin the API shape now.
///
/// Both fields are private: read the reply through
/// [`ClientSession::negotiated`], or take ownership of both parts with
/// [`ClientSession::into_inner`].
#[derive(Debug)]
pub struct ClientSession {
    // Connected local-socket stream the Hello round-trip ran over.
    stream: Stream,
    // The broker's `Negotiated` answer to our `Hello`.
    negotiated: Negotiated,
}
114
115impl ClientSession {
116    /// The broker's negotiated reply to our `Hello`.
117    pub fn negotiated(&self) -> &Negotiated {
118        &self.negotiated
119    }
120
121    /// Consume the session into the raw byte stream + negotiated reply.
122    ///
123    /// Slices that add post-handshake operations build them on this
124    /// raw stream until the v2 client surface stabilizes.
125    pub fn into_inner(self) -> (Stream, Negotiated) {
126        (self.stream, self.negotiated)
127    }
128}
129
130/// Dial the v2 broker for `program` and exchange Hello / Negotiated.
131///
132/// Computes the pipe name via [`v2_program_pipe`], dials it, sends a
133/// Hello carrying `program` as `service_name` and `version_hint` as
134/// `wanted_version`, reads the HelloReply, and either returns a
135/// [`ClientSession`] (on `Negotiated`) or a [`BrokerV2Error::Refused`]
136/// (on `Refused`).
137///
138/// `connection_id` on the outbound Hello is left at 0 — the broker
139/// assigns one and echoes it in the Negotiated reply.
140///
141/// Bounded by [`DEFAULT_HELLO_DEADLINE`]; for a custom deadline use
142/// [`connect_with_deadline`].
143pub fn connect(program: &str, version_hint: &str) -> Result<ClientSession, BrokerV2Error> {
144    connect_with_deadline(program, version_hint, DEFAULT_HELLO_DEADLINE)
145}
146
147/// Same as [`connect`] but with a caller-supplied deadline for the
148/// Hello round-trip. On deadline returns
149/// `BrokerV2Error::Io(ErrorKind::TimedOut)` and the helper thread
150/// continues to drain (there is no portable way to cancel a sync
151/// `Stream::connect` / framed read mid-call).
152///
153/// Fixes #517 — without this bound, a v2 broker that accepts the dial
154/// then stalls hangs the caller indefinitely.
155pub fn connect_with_deadline(
156    program: &str,
157    version_hint: &str,
158    deadline: Duration,
159) -> Result<ClientSession, BrokerV2Error> {
160    let program = program.to_owned();
161    let version_hint = version_hint.to_owned();
162    let (tx, rx) = std::sync::mpsc::channel();
163    std::thread::spawn(move || {
164        let _ = tx.send(connect_unbounded(&program, &version_hint));
165    });
166    match rx.recv_timeout(deadline) {
167        Ok(result) => result,
168        Err(_) => Err(BrokerV2Error::Io(std::io::Error::new(
169            std::io::ErrorKind::TimedOut,
170            format!("v2 broker Hello did not complete within {deadline:?}"),
171        ))),
172    }
173}
174
175/// Inner connect without a deadline. Called from inside the helper
176/// thread spawned by [`connect_with_deadline`].
177fn connect_unbounded(program: &str, version_hint: &str) -> Result<ClientSession, BrokerV2Error> {
178    let sid = user_sid_hash()?;
179    let pipe_name = v2_program_pipe(program, &sid, 0)?;
180    let socket_path = resolve_socket_path(&pipe_name);
181    let name = wrap_socket_name(&socket_path).map_err(|err| BrokerV2Error::Dial {
182        socket_path: socket_path.clone(),
183        source: std::io::Error::new(std::io::ErrorKind::InvalidInput, err),
184    })?;
185    let mut stream = Stream::connect(name).map_err(|source| BrokerV2Error::Dial {
186        socket_path: socket_path.clone(),
187        source,
188    })?;
189    let negotiated = hello_round_trip(&mut stream, program, version_hint)?;
190    Ok(ClientSession { stream, negotiated })
191}
192
193fn hello_round_trip<S: Read + Write>(
194    stream: &mut S,
195    program: &str,
196    version_hint: &str,
197) -> Result<Negotiated, BrokerV2Error> {
198    let hello = Hello {
199        client_min_protocol: ENVELOPE_VERSION as u32,
200        client_max_protocol: ENVELOPE_VERSION as u32,
201        service_name: program.to_string(),
202        wanted_version: version_hint.to_string(),
203        client_version: env!("CARGO_PKG_VERSION").to_string(),
204        client_capabilities: 0,
205        auth_token: Vec::new(),
206        request_id: format!("client_v2-{program}-{}", std::process::id()),
207        connection_id: 0,
208        peer_pid: std::process::id(),
209        client_lib_name: "running-process broker::client_v2".to_string(),
210        client_lib_version: env!("CARGO_PKG_VERSION").to_string(),
211        peer_attestation_nonce: Vec::new(),
212        capability_token: Vec::new(),
213        client_keepalive_secs: 0,
214    };
215    let mut body = Vec::with_capacity(hello.encoded_len());
216    hello.encode(&mut body)?;
217    write_frame(stream, &body)?;
218
219    let reply_bytes = read_frame(stream)?;
220    let reply = HelloReply::decode(reply_bytes.as_slice())?;
221    match reply.result {
222        Some(hello_reply::Result::Negotiated(n)) => Ok(n),
223        Some(hello_reply::Result::Refused(r)) => Err(BrokerV2Error::Refused {
224            reason: r.reason.clone(),
225            retry_after_ms: r.retry_after_ms,
226            details: Box::new(r),
227        }),
228        None => Err(BrokerV2Error::MissingResult),
229    }
230}
231
232fn resolve_socket_path(bare_name: &str) -> String {
233    #[cfg(windows)]
234    {
235        format!(r"\\.\pipe\{bare_name}")
236    }
237    #[cfg(unix)]
238    {
239        use std::path::PathBuf;
240        let dir: PathBuf = {
241            #[cfg(target_os = "macos")]
242            {
243                let uid = unsafe { libc::getuid() };
244                let tmp = std::env::var_os("TMPDIR")
245                    .map(PathBuf::from)
246                    .unwrap_or_else(|| PathBuf::from("/tmp"));
247                tmp.join(format!(".rp-{uid}-broker-v2"))
248            }
249            #[cfg(not(target_os = "macos"))]
250            {
251                if let Some(d) = std::env::var_os("XDG_RUNTIME_DIR") {
252                    PathBuf::from(d).join("running-process").join("broker-v2")
253                } else {
254                    let uid = unsafe { libc::getuid() };
255                    PathBuf::from(format!("/tmp/running-process-{uid}/broker-v2"))
256                }
257            }
258        };
259        let leaf = if cfg!(target_os = "macos") {
260            let mut hash = blake3::Hasher::new();
261            hash.update(bare_name.as_bytes());
262            let bytes = hash.finalize();
263            let mut hex = String::with_capacity(16);
264            for b in bytes.as_bytes().iter().take(8) {
265                use std::fmt::Write as _;
266                let _ = write!(hex, "{b:02x}");
267            }
268            format!("{hex}.sock")
269        } else {
270            format!("{bare_name}.sock")
271        };
272        dir.join(leaf).to_string_lossy().into_owned()
273    }
274}
275
276fn wrap_socket_name(socket_path: &str) -> Result<interprocess::local_socket::Name<'_>, String> {
277    use interprocess::local_socket::prelude::*;
278    #[cfg(windows)]
279    {
280        use interprocess::local_socket::GenericNamespaced;
281        let bare = socket_path
282            .strip_prefix(r"\\.\pipe\")
283            .unwrap_or(socket_path);
284        bare.to_ns_name::<GenericNamespaced>()
285            .map_err(|e| format!("to_ns_name: {e}"))
286    }
287    #[cfg(unix)]
288    {
289        use interprocess::local_socket::GenericFilePath;
290        socket_path
291            .to_fs_name::<GenericFilePath>()
292            .map_err(|e| format!("to_fs_name: {e}"))
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299    use interprocess::local_socket::traits::Listener as _;
300    use interprocess::local_socket::ListenerOptions;
301    use std::sync::mpsc;
302    use std::thread;
303    use std::time::{Duration, Instant};
304
    /// RAII guard: on `Drop`, removes the socket file at the wrapped
    /// path. Every `spawn_*` stub broker in this module creates one
    /// before binding its listener, so the `.sock` file is removed when
    /// the broker thread's closure exits — including by panic unwind —
    /// and cannot poison the next test run.
    ///
    /// Fixes #519: previously cleanup was an explicit `remove_file` at
    /// the end of the stub, so any panic after the listener-ready
    /// `tx.send` left a stale socket. The next test run
    /// either got `EADDRINUSE` on bind or `ECONNREFUSED` on connect to
    /// the dead socket — both masking the real failure.
    #[cfg(unix)]
    struct SocketCleanup(std::path::PathBuf);

    #[cfg(unix)]
    impl Drop for SocketCleanup {
        fn drop(&mut self) {
            // Best-effort: the file may already be gone, and a destructor
            // has no way to report the failure.
            let _ = std::fs::remove_file(&self.0);
        }
    }
323
324    /// In-process stub broker: listens on the given path, accepts ONE
325    /// connection, reads a Hello, sends back a `Negotiated` with
326    /// `connection_id = 0xC0FFEE`. Returns nothing — the test asserts
327    /// against the ClientSession the real client builds.
328    fn spawn_stub_broker(socket_path: String) -> mpsc::Receiver<()> {
329        let (tx, rx) = mpsc::channel();
330        thread::spawn(move || {
331            let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
332            #[cfg(unix)]
333            let _cleanup = {
334                let _ = std::fs::create_dir_all(
335                    std::path::Path::new(&socket_path).parent().unwrap(),
336                );
337                let _ = std::fs::remove_file(&socket_path);
338                SocketCleanup(std::path::PathBuf::from(&socket_path))
339            };
340            let listener = ListenerOptions::new()
341                .name(name)
342                .create_sync()
343                .expect("ListenerOptions create_sync");
344            tx.send(()).expect("send listener-ready signal");
345            let mut stream = listener.accept().expect("accept");
346            let bytes = read_frame(&mut stream).expect("read Hello frame");
347            let hello = Hello::decode(bytes.as_slice()).expect("decode Hello");
348            let reply = HelloReply {
349                result: Some(hello_reply::Result::Negotiated(Negotiated {
350                    negotiated_protocol: ENVELOPE_VERSION as u32,
351                    daemon_version: "stub-1.2.3".to_string(),
352                    backend_pipe: String::new(),
353                    warnings: Vec::new(),
354                    server_capabilities: 0,
355                    keepalive_interval_secs: 0,
356                    handle_passed_token: Vec::new(),
357                    connection_id: 0x00C0_FFEE,
358                })),
359            };
360            let mut body = Vec::with_capacity(reply.encoded_len());
361            reply.encode(&mut body).expect("encode HelloReply");
362            write_frame(&mut stream, &body).expect("write HelloReply frame");
363            // RAII guard removes the socket on scope exit; the explicit
364            // remove that lived here previously was a no-op leftover.
365            let _ = hello.service_name;
366        });
367        rx
368    }
369
370    #[test]
371    fn connect_completes_hello_round_trip_against_stub_broker() {
372        // Use a per-test program name so parallel tests don't collide.
373        let program = "client-v2-stub";
374        let sid = user_sid_hash().expect("user_sid_hash");
375        let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
376        let socket_path = resolve_socket_path(&pipe_name);
377
378        let ready = spawn_stub_broker(socket_path.clone());
379        ready
380            .recv_timeout(Duration::from_secs(2))
381            .expect("stub broker listening");
382
383        // The Listener on Windows is fully ready as soon as `create_sync`
384        // returns; on Unix the same holds. But a short retry loop is
385        // resilient to spawning race in CI.
386        let start = Instant::now();
387        let session = loop {
388            match connect(program, "0.0.0") {
389                Ok(s) => break s,
390                Err(err) if start.elapsed() < Duration::from_secs(2) => {
391                    eprintln!("connect retry after error: {err}");
392                    std::thread::sleep(Duration::from_millis(50));
393                    continue;
394                }
395                Err(err) => panic!("connect failed after retries: {err}"),
396            }
397        };
398
399        let neg = session.negotiated();
400        assert_eq!(neg.negotiated_protocol, ENVELOPE_VERSION as u32);
401        assert_eq!(neg.connection_id, 0x00C0_FFEE);
402        assert_eq!(neg.daemon_version, "stub-1.2.3");
403    }
404
405    #[test]
406    fn connect_with_no_broker_returns_dial_error() {
407        let err = connect("client-v2-no-broker-ever", "0.0.0")
408            .expect_err("no broker => Dial error");
409        match err {
410            BrokerV2Error::Dial { .. } => {}
411            other => panic!("expected Dial, got: {other:?}"),
412        }
413    }
414
415    /// In-process stub that accepts the dial then sleeps forever — the
416    /// pathological case that motivated #517. Without the helper-thread
417    /// deadline, the client hangs indefinitely.
418    fn spawn_stall_broker(socket_path: String) -> mpsc::Receiver<()> {
419        let (tx, rx) = mpsc::channel();
420        thread::spawn(move || {
421            let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
422            #[cfg(unix)]
423            let _cleanup = {
424                let _ = std::fs::create_dir_all(
425                    std::path::Path::new(&socket_path).parent().unwrap(),
426                );
427                let _ = std::fs::remove_file(&socket_path);
428                SocketCleanup(std::path::PathBuf::from(&socket_path))
429            };
430            let listener = ListenerOptions::new()
431                .name(name)
432                .create_sync()
433                .expect("ListenerOptions create_sync");
434            tx.send(()).expect("send listener-ready signal");
435            let _stream = listener.accept().expect("accept");
436            // Stall — never reads the Hello, never replies. The deadline
437            // bound on the client side is what releases it.
438            thread::sleep(Duration::from_secs(60));
439        });
440        rx
441    }
442
443    /// `connect_with_deadline` returns `TimedOut` when the broker
444    /// accepts then stalls. Fixes #517.
445    #[test]
446    fn connect_with_deadline_fires_on_stalling_broker() {
447        let program = "client-v2-stall-deadline";
448        let sid = user_sid_hash().expect("user_sid_hash");
449        let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
450        let socket_path = resolve_socket_path(&pipe_name);
451        let ready = spawn_stall_broker(socket_path);
452        ready
453            .recv_timeout(Duration::from_secs(2))
454            .expect("stall broker listening");
455        let start = Instant::now();
456        let err = connect_with_deadline(program, "0.0.0", Duration::from_millis(200))
457            .expect_err("stall broker => deadline TimedOut");
458        let elapsed = start.elapsed();
459        match err {
460            BrokerV2Error::Io(io) => assert_eq!(io.kind(), std::io::ErrorKind::TimedOut),
461            other => panic!("expected Io(TimedOut), got: {other:?}"),
462        }
463        assert!(
464            elapsed < Duration::from_secs(2),
465            "deadline should fire within budget; took {elapsed:?}"
466        );
467    }
468
469    /// `BrokerV2Error::Refused` exposes `retry_after_ms` as a top-level
470    /// field, mirroring v1's `BrokerClientError::Refused`. Fixes #518.
471    /// Constructs a stub broker that replies with Refused, asserts the
472    /// retry hint surfaces top-level (not buried in `details`).
473    fn spawn_refusing_broker(socket_path: String, retry_after_ms: u64) -> mpsc::Receiver<()> {
474        let (tx, rx) = mpsc::channel();
475        thread::spawn(move || {
476            let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
477            #[cfg(unix)]
478            let _cleanup = {
479                let _ = std::fs::create_dir_all(
480                    std::path::Path::new(&socket_path).parent().unwrap(),
481                );
482                let _ = std::fs::remove_file(&socket_path);
483                SocketCleanup(std::path::PathBuf::from(&socket_path))
484            };
485            let listener = ListenerOptions::new()
486                .name(name)
487                .create_sync()
488                .expect("ListenerOptions create_sync");
489            tx.send(()).expect("send listener-ready signal");
490            let mut stream = listener.accept().expect("accept");
491            let _bytes = read_frame(&mut stream).expect("read Hello frame");
492            let reply = HelloReply {
493                result: Some(hello_reply::Result::Refused(Refused {
494                    code: 0,
495                    reason: "stub refusal".to_string(),
496                    retry_after_ms,
497                    ..Refused::default()
498                })),
499            };
500            let mut body = Vec::with_capacity(reply.encoded_len());
501            reply.encode(&mut body).expect("encode HelloReply");
502            write_frame(&mut stream, &body).expect("write HelloReply frame");
503        });
504        rx
505    }
506
507    /// Stress stub: accepts `count` connections in a loop, replying
508    /// Negotiated to each. Used by the concurrent-connect stress test
509    /// to prove the client side doesn't deadlock or leak handles when
510    /// many threads dial simultaneously.
511    fn spawn_multi_accept_stub_broker(socket_path: String, count: usize) -> mpsc::Receiver<()> {
512        let (tx, rx) = mpsc::channel();
513        thread::spawn(move || {
514            let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
515            #[cfg(unix)]
516            let _cleanup = {
517                let _ = std::fs::create_dir_all(
518                    std::path::Path::new(&socket_path).parent().unwrap(),
519                );
520                let _ = std::fs::remove_file(&socket_path);
521                SocketCleanup(std::path::PathBuf::from(&socket_path))
522            };
523            let listener = ListenerOptions::new()
524                .name(name)
525                .create_sync()
526                .expect("ListenerOptions create_sync");
527            tx.send(()).expect("send listener-ready signal");
528            for _ in 0..count {
529                let mut stream = match listener.accept() {
530                    Ok(s) => s,
531                    Err(_) => break,
532                };
533                let _ = read_frame(&mut stream).expect("read Hello frame");
534                let reply = HelloReply {
535                    result: Some(hello_reply::Result::Negotiated(Negotiated {
536                        negotiated_protocol: ENVELOPE_VERSION as u32,
537                        daemon_version: "stub-multi-1".to_string(),
538                        backend_pipe: String::new(),
539                        warnings: Vec::new(),
540                        server_capabilities: 0,
541                        keepalive_interval_secs: 0,
542                        handle_passed_token: Vec::new(),
543                        connection_id: 0x0FFF_F1EE,
544                    })),
545                };
546                let mut body = Vec::with_capacity(reply.encoded_len());
547                reply.encode(&mut body).expect("encode HelloReply");
548                write_frame(&mut stream, &body).expect("write HelloReply frame");
549            }
550        });
551        rx
552    }
553
554    /// Stress test: 8 concurrent `connect_with_deadline` calls against a
555    /// multi-accept stub broker. All must succeed within wall-clock
556    /// budget — the helper-thread + `recv_timeout` pattern must scale
557    /// to concurrent callers without serializing on a global mutex or
558    /// deadlocking on the channel.
559    #[test]
560    fn concurrent_connects_against_multi_accept_broker() {
561        let program = "client-v2-concurrent-multi";
562        let sid = user_sid_hash().expect("user_sid_hash");
563        let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
564        let socket_path = resolve_socket_path(&pipe_name);
565        const N: usize = 8;
566        let ready = spawn_multi_accept_stub_broker(socket_path, N);
567        ready
568            .recv_timeout(Duration::from_secs(2))
569            .expect("multi-accept broker listening");
570
571        let start = Instant::now();
572        let handles: Vec<_> = (0..N)
573            .map(|_| {
574                let p = program.to_string();
575                thread::spawn(move || connect_with_deadline(&p, "0.0.0", Duration::from_secs(2)))
576            })
577            .collect();
578        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
579        let elapsed = start.elapsed();
580
581        let ok = results.iter().filter(|r| r.is_ok()).count();
582        assert_eq!(
583            ok, N,
584            "all {N} concurrent connects must succeed; got {ok} ok, full results: {results:?}"
585        );
586        assert!(
587            elapsed < Duration::from_secs(5),
588            "concurrent connect took {elapsed:?}; expected < 5s"
589        );
590        for session in results.iter().flatten() {
591            assert_eq!(session.negotiated().connection_id, 0x0FFF_F1EE);
592            assert_eq!(session.negotiated().daemon_version, "stub-multi-1");
593        }
594    }
595
596    /// Adversarial stub: accepts, reads Hello, replies with a HelloReply
597    /// whose `result` oneof is `None` (proto3 default — easy bug if a
598    /// future broker forgets to set the variant). Must surface as
599    /// `BrokerV2Error::MissingResult`, not be mis-routed as success.
600    fn spawn_missing_result_broker(socket_path: String) -> mpsc::Receiver<()> {
601        let (tx, rx) = mpsc::channel();
602        thread::spawn(move || {
603            let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
604            #[cfg(unix)]
605            let _cleanup = {
606                let _ = std::fs::create_dir_all(
607                    std::path::Path::new(&socket_path).parent().unwrap(),
608                );
609                let _ = std::fs::remove_file(&socket_path);
610                SocketCleanup(std::path::PathBuf::from(&socket_path))
611            };
612            let listener = ListenerOptions::new()
613                .name(name)
614                .create_sync()
615                .expect("ListenerOptions create_sync");
616            tx.send(()).expect("send listener-ready signal");
617            let mut stream = listener.accept().expect("accept");
618            let _ = read_frame(&mut stream).expect("read Hello frame");
619            let reply = HelloReply { result: None };
620            let mut body = Vec::with_capacity(reply.encoded_len());
621            reply.encode(&mut body).expect("encode HelloReply");
622            write_frame(&mut stream, &body).expect("write HelloReply frame");
623        });
624        rx
625    }
626
627    #[test]
628    fn connect_rejects_hello_reply_with_missing_result_oneof() {
629        let program = "client-v2-missing-result";
630        let sid = user_sid_hash().expect("user_sid_hash");
631        let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
632        let socket_path = resolve_socket_path(&pipe_name);
633        let ready = spawn_missing_result_broker(socket_path);
634        ready
635            .recv_timeout(Duration::from_secs(2))
636            .expect("missing-result broker listening");
637        let start = Instant::now();
638        let err = loop {
639            match connect(program, "0.0.0") {
640                Err(e) => break e,
641                Ok(_) if start.elapsed() < Duration::from_secs(2) => {
642                    thread::sleep(Duration::from_millis(50));
643                    continue;
644                }
645                Ok(_) => panic!("expected MissingResult, got Ok"),
646            }
647        };
648        assert!(
649            matches!(err, BrokerV2Error::MissingResult),
650            "expected MissingResult, got: {err:?}"
651        );
652    }
653
654    /// Adversarial: broker accepts then immediately drops the stream
655    /// without reading the Hello or writing a reply. Must surface as
656    /// a typed transport error (Framing/Io), never as a successful
657    /// session, never hang past the deadline.
658    fn spawn_drop_on_accept_broker(socket_path: String) -> mpsc::Receiver<()> {
659        let (tx, rx) = mpsc::channel();
660        thread::spawn(move || {
661            let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
662            #[cfg(unix)]
663            let _cleanup = {
664                let _ = std::fs::create_dir_all(
665                    std::path::Path::new(&socket_path).parent().unwrap(),
666                );
667                let _ = std::fs::remove_file(&socket_path);
668                SocketCleanup(std::path::PathBuf::from(&socket_path))
669            };
670            let listener = ListenerOptions::new()
671                .name(name)
672                .create_sync()
673                .expect("ListenerOptions create_sync");
674            tx.send(()).expect("send listener-ready signal");
675            let stream = listener.accept().expect("accept");
676            drop(stream); // immediate close
677        });
678        rx
679    }
680
681    #[test]
682    fn connect_returns_err_on_premature_disconnect() {
683        let program = "client-v2-prem-disconnect";
684        let sid = user_sid_hash().expect("user_sid_hash");
685        let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
686        let socket_path = resolve_socket_path(&pipe_name);
687        let ready = spawn_drop_on_accept_broker(socket_path);
688        ready
689            .recv_timeout(Duration::from_secs(2))
690            .expect("drop-on-accept broker listening");
691        let start = Instant::now();
692        let err = loop {
693            match connect_with_deadline(program, "0.0.0", Duration::from_millis(500)) {
694                Err(e) => break e,
695                Ok(_) if start.elapsed() < Duration::from_secs(2) => {
696                    thread::sleep(Duration::from_millis(50));
697                    continue;
698                }
699                Ok(_) => panic!("expected transport error, got Ok"),
700            }
701        };
702        // The exact variant depends on whether the write or read hits the
703        // disconnect first: Framing(UnexpectedEof), Io(BrokenPipe), or
704        // Dial (rare race). All are transport-class — none is a session.
705        match err {
706            BrokerV2Error::Framing(_)
707            | BrokerV2Error::Io(_)
708            | BrokerV2Error::Dial { .. } => {}
709            other => panic!("expected transport variant, got: {other:?}"),
710        }
711        assert!(
712            start.elapsed() < Duration::from_secs(2),
713            "must not hang past deadline; took {:?}",
714            start.elapsed()
715        );
716    }
717
718    /// Adversarial: every malformed program name must be rejected BEFORE
719    /// `Stream::connect` runs — proves `v2_program_pipe`'s validation is
720    /// the front gate. Catches NUL injection, path traversal, uppercase,
721    /// over-long names, and empties. The expected error variant is
722    /// `BrokerV2Error::PipeName(_)` because `v2_program_pipe`'s
723    /// `validate_service_name` fires before any IO.
724    #[test]
725    fn connect_rejects_invalid_program_names_before_dial() {
726        let too_long = "a".repeat(65);
727        for bad in [
728            "zccache\0evil",
729            "../etc/passwd",
730            r"a\b",
731            "Zccache",
732            "a b",
733            too_long.as_str(),
734            "",
735        ] {
736            let err = connect(bad, "0.0.0")
737                .expect_err(&format!("invalid program name {bad:?} must be rejected"));
738            assert!(
739                matches!(err, BrokerV2Error::PipeName(_)),
740                "expected PipeName for {bad:?}, got: {err:?}"
741            );
742        }
743    }
744
745    /// Pin u64::MAX round-trips through `retry_after_ms` without overflow.
746    /// `Duration::from_millis(u64::MAX)` is valid (~584M years); locks
747    /// the contract for any caller doing `Duration::from_millis(retry_after_ms)`.
748    #[test]
749    fn refused_with_u64_max_retry_after_ms_round_trips() {
750        let program = "client-v2-refused-u64-max";
751        let sid = user_sid_hash().expect("user_sid_hash");
752        let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
753        let socket_path = resolve_socket_path(&pipe_name);
754        let ready = spawn_refusing_broker(socket_path, u64::MAX);
755        ready
756            .recv_timeout(Duration::from_secs(2))
757            .expect("refusing broker listening");
758        let start = Instant::now();
759        let err = loop {
760            match connect(program, "0.0.0") {
761                Err(e) => break e,
762                Ok(_) if start.elapsed() < Duration::from_secs(2) => {
763                    thread::sleep(Duration::from_millis(50));
764                    continue;
765                }
766                Ok(_) => panic!("expected Refused, got Ok"),
767            }
768        };
769        match err {
770            BrokerV2Error::Refused {
771                retry_after_ms,
772                details,
773                ..
774            } => {
775                assert_eq!(retry_after_ms, u64::MAX);
776                assert_eq!(details.retry_after_ms, u64::MAX);
777                // Caller-side contract: this Duration construction must not panic.
778                let _safe_duration = Duration::from_millis(retry_after_ms);
779            }
780            other => panic!("expected Refused, got: {other:?}"),
781        }
782    }
783
784    #[test]
785    fn refused_exposes_retry_after_ms_top_level() {
786        let program = "client-v2-refused-retry";
787        let sid = user_sid_hash().expect("user_sid_hash");
788        let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
789        let socket_path = resolve_socket_path(&pipe_name);
790        let ready = spawn_refusing_broker(socket_path, 1234);
791        ready
792            .recv_timeout(Duration::from_secs(2))
793            .expect("refusing broker listening");
794        let start = Instant::now();
795        let err = loop {
796            match connect(program, "0.0.0") {
797                Err(e) => break e,
798                Ok(_) if start.elapsed() < Duration::from_secs(2) => {
799                    thread::sleep(Duration::from_millis(50));
800                    continue;
801                }
802                Ok(_) => panic!("expected Refused"),
803            }
804        };
805        match err {
806            BrokerV2Error::Refused {
807                retry_after_ms,
808                reason,
809                details,
810            } => {
811                assert_eq!(
812                    retry_after_ms, 1234,
813                    "retry hint must surface top-level (was: {retry_after_ms})"
814                );
815                assert_eq!(reason, "stub refusal");
816                assert_eq!(
817                    details.retry_after_ms, 1234,
818                    "details payload still carries the field for full diagnostics"
819                );
820            }
821            other => panic!("expected Refused, got: {other:?}"),
822        }
823    }
824}