1use std::io::{Read, Write};
17use std::time::Duration;
18
19use interprocess::local_socket::traits::Stream as _;
20use interprocess::local_socket::Stream;
21use prost::Message;
22
/// Deadline that [`connect`] passes to [`connect_with_deadline`]: the dial plus
/// the Hello round-trip must finish within this time or the call fails with a
/// `TimedOut` I/O error.
pub const DEFAULT_HELLO_DEADLINE: Duration = Duration::from_secs(3);
32
33use crate::broker::lifecycle::names::PipePathError;
34use crate::broker::lifecycle::names_v2::v2_program_pipe;
35use crate::broker::lifecycle::sid::{user_sid_hash, SidError};
36use crate::broker::protocol::{
37 hello_reply, read_frame, write_frame, FramingError, Hello, HelloReply, Negotiated, Refused,
38 ENVELOPE_VERSION,
39};
40
/// Errors produced while dialing a v2 broker and performing the Hello handshake.
#[derive(Debug, thiserror::Error)]
pub enum BrokerV2Error {
    /// Propagated from `user_sid_hash()`, which is called before the pipe name
    /// is derived.
    #[error(transparent)]
    Sid(#[from] SidError),

    /// Propagated from `v2_program_pipe()`; the tests in this file expect this
    /// variant for invalid program names.
    #[error(transparent)]
    PipeName(#[from] PipePathError),

    /// The socket path could not be turned into a local-socket name, or
    /// connecting to it failed.
    #[error("dial v2 broker pipe at {socket_path:?}: {source}")]
    Dial {
        /// Resolved socket path that was being dialed.
        socket_path: String,
        /// Underlying I/O error (kind `InvalidInput` when name wrapping failed).
        #[source]
        source: std::io::Error,
    },

    /// A framing-layer error. NOTE(review): presumably what `write_frame` /
    /// `read_frame` return; their signatures are not visible here, confirm.
    #[error(transparent)]
    Framing(#[from] FramingError),

    /// An I/O failure during the handshake. Also used, with kind `TimedOut`,
    /// when the connect deadline expires.
    #[error("Hello round-trip io: {0}")]
    Io(#[from] std::io::Error),

    /// The reply frame could not be decoded as a `HelloReply`.
    #[error("HelloReply decode: {0}")]
    Decode(#[from] prost::DecodeError),

    /// The `HelloReply` decoded, but its `result` field was unset.
    #[error("HelloReply.result missing")]
    MissingResult,

    /// The broker answered the Hello with a `Refused` result.
    #[error("broker refused Hello: {reason}")]
    Refused {
        /// Copy of `details.reason`, used in the error message.
        reason: String,
        /// Copy of `details.retry_after_ms`, exposed at the top level.
        retry_after_ms: u64,
        /// The complete `Refused` message as received, boxed.
        details: Box<Refused>,
    },

    /// The outgoing `Hello` message could not be encoded.
    #[error("Hello encode: {0}")]
    Encode(#[from] prost::EncodeError),
}
102
/// A connected broker stream together with the `Negotiated` reply obtained
/// from the Hello handshake performed on it.
#[derive(Debug)]
pub struct ClientSession {
    // Local-socket stream the handshake was performed on.
    stream: Stream,
    // Parameters the broker returned in its `Negotiated` reply.
    negotiated: Negotiated,
}
114
115impl ClientSession {
116 pub fn negotiated(&self) -> &Negotiated {
118 &self.negotiated
119 }
120
121 pub fn into_inner(self) -> (Stream, Negotiated) {
126 (self.stream, self.negotiated)
127 }
128}
129
130pub fn connect(program: &str, version_hint: &str) -> Result<ClientSession, BrokerV2Error> {
144 connect_with_deadline(program, version_hint, DEFAULT_HELLO_DEADLINE)
145}
146
147pub fn connect_with_deadline(
156 program: &str,
157 version_hint: &str,
158 deadline: Duration,
159) -> Result<ClientSession, BrokerV2Error> {
160 let program = program.to_owned();
161 let version_hint = version_hint.to_owned();
162 let (tx, rx) = std::sync::mpsc::channel();
163 std::thread::spawn(move || {
164 let _ = tx.send(connect_unbounded(&program, &version_hint));
165 });
166 match rx.recv_timeout(deadline) {
167 Ok(result) => result,
168 Err(_) => Err(BrokerV2Error::Io(std::io::Error::new(
169 std::io::ErrorKind::TimedOut,
170 format!("v2 broker Hello did not complete within {deadline:?}"),
171 ))),
172 }
173}
174
175fn connect_unbounded(program: &str, version_hint: &str) -> Result<ClientSession, BrokerV2Error> {
178 let sid = user_sid_hash()?;
179 let pipe_name = v2_program_pipe(program, &sid, 0)?;
180 let socket_path = resolve_socket_path(&pipe_name);
181 let name = wrap_socket_name(&socket_path).map_err(|err| BrokerV2Error::Dial {
182 socket_path: socket_path.clone(),
183 source: std::io::Error::new(std::io::ErrorKind::InvalidInput, err),
184 })?;
185 let mut stream = Stream::connect(name).map_err(|source| BrokerV2Error::Dial {
186 socket_path: socket_path.clone(),
187 source,
188 })?;
189 let negotiated = hello_round_trip(&mut stream, program, version_hint)?;
190 Ok(ClientSession { stream, negotiated })
191}
192
193fn hello_round_trip<S: Read + Write>(
194 stream: &mut S,
195 program: &str,
196 version_hint: &str,
197) -> Result<Negotiated, BrokerV2Error> {
198 let hello = Hello {
199 client_min_protocol: ENVELOPE_VERSION as u32,
200 client_max_protocol: ENVELOPE_VERSION as u32,
201 service_name: program.to_string(),
202 wanted_version: version_hint.to_string(),
203 client_version: env!("CARGO_PKG_VERSION").to_string(),
204 client_capabilities: 0,
205 auth_token: Vec::new(),
206 request_id: format!("client_v2-{program}-{}", std::process::id()),
207 connection_id: 0,
208 peer_pid: std::process::id(),
209 client_lib_name: "running-process broker::client_v2".to_string(),
210 client_lib_version: env!("CARGO_PKG_VERSION").to_string(),
211 peer_attestation_nonce: Vec::new(),
212 capability_token: Vec::new(),
213 client_keepalive_secs: 0,
214 };
215 let mut body = Vec::with_capacity(hello.encoded_len());
216 hello.encode(&mut body)?;
217 write_frame(stream, &body)?;
218
219 let reply_bytes = read_frame(stream)?;
220 let reply = HelloReply::decode(reply_bytes.as_slice())?;
221 match reply.result {
222 Some(hello_reply::Result::Negotiated(n)) => Ok(n),
223 Some(hello_reply::Result::Refused(r)) => Err(BrokerV2Error::Refused {
224 reason: r.reason.clone(),
225 retry_after_ms: r.retry_after_ms,
226 details: Box::new(r),
227 }),
228 None => Err(BrokerV2Error::MissingResult),
229 }
230}
231
232fn resolve_socket_path(bare_name: &str) -> String {
233 #[cfg(windows)]
234 {
235 format!(r"\\.\pipe\{bare_name}")
236 }
237 #[cfg(unix)]
238 {
239 use std::path::PathBuf;
240 let dir: PathBuf = {
241 #[cfg(target_os = "macos")]
242 {
243 let uid = unsafe { libc::getuid() };
244 let tmp = std::env::var_os("TMPDIR")
245 .map(PathBuf::from)
246 .unwrap_or_else(|| PathBuf::from("/tmp"));
247 tmp.join(format!(".rp-{uid}-broker-v2"))
248 }
249 #[cfg(not(target_os = "macos"))]
250 {
251 if let Some(d) = std::env::var_os("XDG_RUNTIME_DIR") {
252 PathBuf::from(d).join("running-process").join("broker-v2")
253 } else {
254 let uid = unsafe { libc::getuid() };
255 PathBuf::from(format!("/tmp/running-process-{uid}/broker-v2"))
256 }
257 }
258 };
259 let leaf = if cfg!(target_os = "macos") {
260 let mut hash = blake3::Hasher::new();
261 hash.update(bare_name.as_bytes());
262 let bytes = hash.finalize();
263 let mut hex = String::with_capacity(16);
264 for b in bytes.as_bytes().iter().take(8) {
265 use std::fmt::Write as _;
266 let _ = write!(hex, "{b:02x}");
267 }
268 format!("{hex}.sock")
269 } else {
270 format!("{bare_name}.sock")
271 };
272 dir.join(leaf).to_string_lossy().into_owned()
273 }
274}
275
276fn wrap_socket_name(socket_path: &str) -> Result<interprocess::local_socket::Name<'_>, String> {
277 use interprocess::local_socket::prelude::*;
278 #[cfg(windows)]
279 {
280 use interprocess::local_socket::GenericNamespaced;
281 let bare = socket_path
282 .strip_prefix(r"\\.\pipe\")
283 .unwrap_or(socket_path);
284 bare.to_ns_name::<GenericNamespaced>()
285 .map_err(|e| format!("to_ns_name: {e}"))
286 }
287 #[cfg(unix)]
288 {
289 use interprocess::local_socket::GenericFilePath;
290 socket_path
291 .to_fs_name::<GenericFilePath>()
292 .map_err(|e| format!("to_fs_name: {e}"))
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use super::*;
299 use interprocess::local_socket::traits::Listener as _;
300 use interprocess::local_socket::ListenerOptions;
301 use std::sync::mpsc;
302 use std::thread;
303 use std::time::{Duration, Instant};
304
/// Guard that removes the file at the wrapped path when it goes out of scope.
/// The stub brokers use it so their socket files do not outlive the test.
#[cfg(unix)]
struct SocketCleanup(std::path::PathBuf);

#[cfg(unix)]
impl Drop for SocketCleanup {
    fn drop(&mut self) {
        // Best effort: a missing file or a failed removal is deliberately ignored.
        let _ignored = std::fs::remove_file(self.0.as_path());
    }
}
323
324 fn spawn_stub_broker(socket_path: String) -> mpsc::Receiver<()> {
329 let (tx, rx) = mpsc::channel();
330 thread::spawn(move || {
331 let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
332 #[cfg(unix)]
333 let _cleanup = {
334 let _ = std::fs::create_dir_all(
335 std::path::Path::new(&socket_path).parent().unwrap(),
336 );
337 let _ = std::fs::remove_file(&socket_path);
338 SocketCleanup(std::path::PathBuf::from(&socket_path))
339 };
340 let listener = ListenerOptions::new()
341 .name(name)
342 .create_sync()
343 .expect("ListenerOptions create_sync");
344 tx.send(()).expect("send listener-ready signal");
345 let mut stream = listener.accept().expect("accept");
346 let bytes = read_frame(&mut stream).expect("read Hello frame");
347 let hello = Hello::decode(bytes.as_slice()).expect("decode Hello");
348 let reply = HelloReply {
349 result: Some(hello_reply::Result::Negotiated(Negotiated {
350 negotiated_protocol: ENVELOPE_VERSION as u32,
351 daemon_version: "stub-1.2.3".to_string(),
352 backend_pipe: String::new(),
353 warnings: Vec::new(),
354 server_capabilities: 0,
355 keepalive_interval_secs: 0,
356 handle_passed_token: Vec::new(),
357 connection_id: 0x00C0_FFEE,
358 })),
359 };
360 let mut body = Vec::with_capacity(reply.encoded_len());
361 reply.encode(&mut body).expect("encode HelloReply");
362 write_frame(&mut stream, &body).expect("write HelloReply frame");
363 let _ = hello.service_name;
366 });
367 rx
368 }
369
/// Happy path: `connect` against the stub broker yields a session whose
/// negotiated values are exactly the ones the stub sent.
#[test]
fn connect_completes_hello_round_trip_against_stub_broker() {
    let program = "client-v2-stub";
    let sid = user_sid_hash().expect("user_sid_hash");
    let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
    let socket_path = resolve_socket_path(&pipe_name);

    spawn_stub_broker(socket_path.clone())
        .recv_timeout(Duration::from_secs(2))
        .expect("stub broker listening");

    // Retry failed connects for up to two seconds before giving up.
    let retry_budget = Duration::from_secs(2);
    let start = Instant::now();
    let session = loop {
        let err = match connect(program, "0.0.0") {
            Ok(session) => break session,
            Err(err) => err,
        };
        if start.elapsed() >= retry_budget {
            panic!("connect failed after retries: {err}");
        }
        eprintln!("connect retry after error: {err}");
        std::thread::sleep(Duration::from_millis(50));
    };

    let neg = session.negotiated();
    assert_eq!(neg.negotiated_protocol, ENVELOPE_VERSION as u32);
    assert_eq!(neg.connection_id, 0x00C0_FFEE);
    assert_eq!(neg.daemon_version, "stub-1.2.3");
}
404
/// With nothing listening for the program, `connect` must fail with the
/// `Dial` variant.
#[test]
fn connect_with_no_broker_returns_dial_error() {
    let outcome = connect("client-v2-no-broker-ever", "0.0.0");
    let err = outcome.expect_err("no broker => Dial error");
    assert!(
        matches!(err, BrokerV2Error::Dial { .. }),
        "expected Dial, got: {err:?}"
    );
}
414
415 fn spawn_stall_broker(socket_path: String) -> mpsc::Receiver<()> {
419 let (tx, rx) = mpsc::channel();
420 thread::spawn(move || {
421 let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
422 #[cfg(unix)]
423 let _cleanup = {
424 let _ = std::fs::create_dir_all(
425 std::path::Path::new(&socket_path).parent().unwrap(),
426 );
427 let _ = std::fs::remove_file(&socket_path);
428 SocketCleanup(std::path::PathBuf::from(&socket_path))
429 };
430 let listener = ListenerOptions::new()
431 .name(name)
432 .create_sync()
433 .expect("ListenerOptions create_sync");
434 tx.send(()).expect("send listener-ready signal");
435 let _stream = listener.accept().expect("accept");
436 thread::sleep(Duration::from_secs(60));
439 });
440 rx
441 }
442
/// A broker that accepts but never replies must make `connect_with_deadline`
/// return `Io(TimedOut)` promptly rather than block.
#[test]
fn connect_with_deadline_fires_on_stalling_broker() {
    let program = "client-v2-stall-deadline";
    let sid = user_sid_hash().expect("user_sid_hash");
    let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
    let socket_path = resolve_socket_path(&pipe_name);

    spawn_stall_broker(socket_path)
        .recv_timeout(Duration::from_secs(2))
        .expect("stall broker listening");

    let start = Instant::now();
    let outcome = connect_with_deadline(program, "0.0.0", Duration::from_millis(200));
    let elapsed = start.elapsed();

    let err = outcome.expect_err("stall broker => deadline TimedOut");
    let kind = match &err {
        BrokerV2Error::Io(io) => io.kind(),
        other => panic!("expected Io(TimedOut), got: {other:?}"),
    };
    assert_eq!(kind, std::io::ErrorKind::TimedOut);
    assert!(
        elapsed < Duration::from_secs(2),
        "deadline should fire within budget; took {elapsed:?}"
    );
}
468
469 fn spawn_refusing_broker(socket_path: String, retry_after_ms: u64) -> mpsc::Receiver<()> {
474 let (tx, rx) = mpsc::channel();
475 thread::spawn(move || {
476 let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
477 #[cfg(unix)]
478 let _cleanup = {
479 let _ = std::fs::create_dir_all(
480 std::path::Path::new(&socket_path).parent().unwrap(),
481 );
482 let _ = std::fs::remove_file(&socket_path);
483 SocketCleanup(std::path::PathBuf::from(&socket_path))
484 };
485 let listener = ListenerOptions::new()
486 .name(name)
487 .create_sync()
488 .expect("ListenerOptions create_sync");
489 tx.send(()).expect("send listener-ready signal");
490 let mut stream = listener.accept().expect("accept");
491 let _bytes = read_frame(&mut stream).expect("read Hello frame");
492 let reply = HelloReply {
493 result: Some(hello_reply::Result::Refused(Refused {
494 code: 0,
495 reason: "stub refusal".to_string(),
496 retry_after_ms,
497 ..Refused::default()
498 })),
499 };
500 let mut body = Vec::with_capacity(reply.encoded_len());
501 reply.encode(&mut body).expect("encode HelloReply");
502 write_frame(&mut stream, &body).expect("write HelloReply frame");
503 });
504 rx
505 }
506
507 fn spawn_multi_accept_stub_broker(socket_path: String, count: usize) -> mpsc::Receiver<()> {
512 let (tx, rx) = mpsc::channel();
513 thread::spawn(move || {
514 let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
515 #[cfg(unix)]
516 let _cleanup = {
517 let _ = std::fs::create_dir_all(
518 std::path::Path::new(&socket_path).parent().unwrap(),
519 );
520 let _ = std::fs::remove_file(&socket_path);
521 SocketCleanup(std::path::PathBuf::from(&socket_path))
522 };
523 let listener = ListenerOptions::new()
524 .name(name)
525 .create_sync()
526 .expect("ListenerOptions create_sync");
527 tx.send(()).expect("send listener-ready signal");
528 for _ in 0..count {
529 let mut stream = match listener.accept() {
530 Ok(s) => s,
531 Err(_) => break,
532 };
533 let _ = read_frame(&mut stream).expect("read Hello frame");
534 let reply = HelloReply {
535 result: Some(hello_reply::Result::Negotiated(Negotiated {
536 negotiated_protocol: ENVELOPE_VERSION as u32,
537 daemon_version: "stub-multi-1".to_string(),
538 backend_pipe: String::new(),
539 warnings: Vec::new(),
540 server_capabilities: 0,
541 keepalive_interval_secs: 0,
542 handle_passed_token: Vec::new(),
543 connection_id: 0x0FFF_F1EE,
544 })),
545 };
546 let mut body = Vec::with_capacity(reply.encoded_len());
547 reply.encode(&mut body).expect("encode HelloReply");
548 write_frame(&mut stream, &body).expect("write HelloReply frame");
549 }
550 });
551 rx
552 }
553
/// Eight threads connect at the same time to a broker that serves eight
/// connections; all of them must succeed, within five seconds overall, and
/// each must see the stub's negotiated values.
#[test]
fn concurrent_connects_against_multi_accept_broker() {
    const N: usize = 8;

    let program = "client-v2-concurrent-multi";
    let sid = user_sid_hash().expect("user_sid_hash");
    let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
    let socket_path = resolve_socket_path(&pipe_name);

    spawn_multi_accept_stub_broker(socket_path, N)
        .recv_timeout(Duration::from_secs(2))
        .expect("multi-accept broker listening");

    let start = Instant::now();
    let mut workers = Vec::with_capacity(N);
    for _ in 0..N {
        let owned_program = program.to_string();
        workers.push(thread::spawn(move || {
            connect_with_deadline(&owned_program, "0.0.0", Duration::from_secs(2))
        }));
    }
    let mut results = Vec::with_capacity(N);
    for worker in workers {
        results.push(worker.join().unwrap());
    }
    let elapsed = start.elapsed();

    let ok = results.iter().filter(|r| r.is_ok()).count();
    assert_eq!(
        ok, N,
        "all {N} concurrent connects must succeed; got {ok} ok, full results: {results:?}"
    );
    assert!(
        elapsed < Duration::from_secs(5),
        "concurrent connect took {elapsed:?}; expected < 5s"
    );
    for session in results.iter().filter_map(|r| r.as_ref().ok()) {
        let negotiated = session.negotiated();
        assert_eq!(negotiated.connection_id, 0x0FFF_F1EE);
        assert_eq!(negotiated.daemon_version, "stub-multi-1");
    }
}
595
596 fn spawn_missing_result_broker(socket_path: String) -> mpsc::Receiver<()> {
601 let (tx, rx) = mpsc::channel();
602 thread::spawn(move || {
603 let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
604 #[cfg(unix)]
605 let _cleanup = {
606 let _ = std::fs::create_dir_all(
607 std::path::Path::new(&socket_path).parent().unwrap(),
608 );
609 let _ = std::fs::remove_file(&socket_path);
610 SocketCleanup(std::path::PathBuf::from(&socket_path))
611 };
612 let listener = ListenerOptions::new()
613 .name(name)
614 .create_sync()
615 .expect("ListenerOptions create_sync");
616 tx.send(()).expect("send listener-ready signal");
617 let mut stream = listener.accept().expect("accept");
618 let _ = read_frame(&mut stream).expect("read Hello frame");
619 let reply = HelloReply { result: None };
620 let mut body = Vec::with_capacity(reply.encoded_len());
621 reply.encode(&mut body).expect("encode HelloReply");
622 write_frame(&mut stream, &body).expect("write HelloReply frame");
623 });
624 rx
625 }
626
/// A `HelloReply` with no `result` set must surface as
/// `BrokerV2Error::MissingResult`.
#[test]
fn connect_rejects_hello_reply_with_missing_result_oneof() {
    let program = "client-v2-missing-result";
    let sid = user_sid_hash().expect("user_sid_hash");
    let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
    let socket_path = resolve_socket_path(&pipe_name);
    let ready = spawn_missing_result_broker(socket_path);
    ready
        .recv_timeout(Duration::from_secs(2))
        .expect("missing-result broker listening");

    // The stub serves exactly one connection, so a retry after an unexpected
    // `Ok` could only fail for an unrelated reason (nobody is listening any
    // more) and hide the real problem. Fail immediately instead.
    let err = match connect(program, "0.0.0") {
        Err(e) => e,
        Ok(_) => panic!("expected MissingResult, got Ok"),
    };
    assert!(
        matches!(err, BrokerV2Error::MissingResult),
        "expected MissingResult, got: {err:?}"
    );
}
653
654 fn spawn_drop_on_accept_broker(socket_path: String) -> mpsc::Receiver<()> {
659 let (tx, rx) = mpsc::channel();
660 thread::spawn(move || {
661 let name = wrap_socket_name(&socket_path).expect("wrap_socket_name");
662 #[cfg(unix)]
663 let _cleanup = {
664 let _ = std::fs::create_dir_all(
665 std::path::Path::new(&socket_path).parent().unwrap(),
666 );
667 let _ = std::fs::remove_file(&socket_path);
668 SocketCleanup(std::path::PathBuf::from(&socket_path))
669 };
670 let listener = ListenerOptions::new()
671 .name(name)
672 .create_sync()
673 .expect("ListenerOptions create_sync");
674 tx.send(()).expect("send listener-ready signal");
675 let stream = listener.accept().expect("accept");
676 drop(stream); });
678 rx
679 }
680
/// When the broker closes the connection right after accepting it, the client
/// must return a transport-level error (`Framing`, `Io` or `Dial`) and must not
/// hang.
#[test]
fn connect_returns_err_on_premature_disconnect() {
    let program = "client-v2-prem-disconnect";
    let sid = user_sid_hash().expect("user_sid_hash");
    let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
    let socket_path = resolve_socket_path(&pipe_name);
    let ready = spawn_drop_on_accept_broker(socket_path);
    ready
        .recv_timeout(Duration::from_secs(2))
        .expect("drop-on-accept broker listening");

    let start = Instant::now();
    // The stub serves exactly one connection. Retrying after an unexpected
    // `Ok` would hit a vanished listener, and the resulting `Dial` error is an
    // accepted variant below, so the test would pass for the wrong reason.
    // Fail immediately instead.
    let err = match connect_with_deadline(program, "0.0.0", Duration::from_millis(500)) {
        Err(e) => e,
        Ok(_) => panic!("expected transport error, got Ok"),
    };
    match err {
        BrokerV2Error::Framing(_) | BrokerV2Error::Io(_) | BrokerV2Error::Dial { .. } => {}
        other => panic!("expected transport variant, got: {other:?}"),
    }
    assert!(
        start.elapsed() < Duration::from_secs(2),
        "must not hang past deadline; took {:?}",
        start.elapsed()
    );
}
717
/// Each of these malformed program names must make `connect` fail with
/// `BrokerV2Error::PipeName`.
#[test]
fn connect_rejects_invalid_program_names_before_dial() {
    let too_long = "a".repeat(65);
    for bad in [
        "zccache\0evil",
        "../etc/passwd",
        r"a\b",
        "Zccache",
        "a b",
        too_long.as_str(),
        "",
    ] {
        // Build the failure message only on the failure path; passing
        // `&format!(..)` to `expect_err` would format it on every iteration.
        let err = match connect(bad, "0.0.0") {
            Err(err) => err,
            Ok(session) => {
                panic!("invalid program name {bad:?} must be rejected: {session:?}")
            }
        };
        assert!(
            matches!(err, BrokerV2Error::PipeName(_)),
            "expected PipeName for {bad:?}, got: {err:?}"
        );
    }
}
744
/// A refusal carrying `retry_after_ms == u64::MAX` must arrive unchanged, both
/// in the top-level field and in `details`, and the value must be usable to
/// build a `Duration`.
#[test]
fn refused_with_u64_max_retry_after_ms_round_trips() {
    let program = "client-v2-refused-u64-max";
    let sid = user_sid_hash().expect("user_sid_hash");
    let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
    let socket_path = resolve_socket_path(&pipe_name);
    let ready = spawn_refusing_broker(socket_path, u64::MAX);
    ready
        .recv_timeout(Duration::from_secs(2))
        .expect("refusing broker listening");

    // The stub serves exactly one connection, so retrying after an unexpected
    // `Ok` could only produce an unrelated error. Fail immediately instead.
    let err = match connect(program, "0.0.0") {
        Err(e) => e,
        Ok(_) => panic!("expected Refused, got Ok"),
    };
    match err {
        BrokerV2Error::Refused {
            retry_after_ms,
            details,
            ..
        } => {
            assert_eq!(retry_after_ms, u64::MAX);
            assert_eq!(details.retry_after_ms, u64::MAX);
            // Converting the extreme value must not panic.
            let _safe_duration = Duration::from_millis(retry_after_ms);
        }
        other => panic!("expected Refused, got: {other:?}"),
    }
}
783
/// A refusal must expose `retry_after_ms` and `reason` as top-level fields of
/// `BrokerV2Error::Refused`, while `details` still carries the full message.
#[test]
fn refused_exposes_retry_after_ms_top_level() {
    let program = "client-v2-refused-retry";
    let sid = user_sid_hash().expect("user_sid_hash");
    let pipe_name = v2_program_pipe(program, &sid, 0).expect("pipe name");
    let socket_path = resolve_socket_path(&pipe_name);
    let ready = spawn_refusing_broker(socket_path, 1234);
    ready
        .recv_timeout(Duration::from_secs(2))
        .expect("refusing broker listening");

    // The stub serves exactly one connection, so retrying after an unexpected
    // `Ok` could only produce an unrelated error. Fail immediately instead.
    let err = match connect(program, "0.0.0") {
        Err(e) => e,
        Ok(_) => panic!("expected Refused"),
    };
    match err {
        BrokerV2Error::Refused {
            retry_after_ms,
            reason,
            details,
        } => {
            assert_eq!(
                retry_after_ms, 1234,
                "retry hint must surface top-level (was: {retry_after_ms})"
            );
            assert_eq!(reason, "stub refusal");
            assert_eq!(
                details.retry_after_ms, 1234,
                "details payload still carries the field for full diagnostics"
            );
        }
        other => panic!("expected Refused, got: {other:?}"),
    }
}
824}