use std::io::{self, BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::thread::JoinHandle;
use std::time::Duration;
use serde_json::Value;
use socket2::{Domain, Protocol, Socket, Type};
use crate::debug::DebugError;
/// Name of the header that carries the body size of each frame.
/// `write_frame` emits it verbatim; `read_frame` matches it case-insensitively.
const CONTENT_LENGTH: &str = "Content-Length";
/// Largest body size in bytes (16 MiB) that `read_frame` accepts. A larger
/// declared length is rejected before the body buffer is allocated.
const MAX_CONTENT_LENGTH: usize = 16 * 1024 * 1024;
/// Interval used in `serve` for the accept-loop sleep, the socket read timeout
/// and the outbound-queue `recv_timeout`, so each loop re-checks the shutdown
/// flag at this cadence.
const POLL_INTERVAL: Duration = Duration::from_millis(5);
/// Serializes `value` to JSON and writes it to `out` as a single frame:
/// a `Content-Length: <byte count>` header, a blank line, then the body.
///
/// The length in the header is the number of bytes in the serialized body,
/// not the number of characters. The writer is flushed before returning.
///
/// # Errors
///
/// Returns `InvalidData` when serialization fails, otherwise whatever I/O
/// error the writer reports.
pub(crate) fn write_frame<W: Write>(out: &mut W, value: &Value) -> io::Result<()> {
    let payload = match serde_json::to_vec(value) {
        Ok(bytes) => bytes,
        Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidData, err)),
    };

    // Header first, then the raw body bytes, then a flush so the peer sees the
    // complete frame.
    write!(out, "{CONTENT_LENGTH}: {}\r\n\r\n", payload.len())?;
    out.write_all(payload.as_slice())?;
    out.flush()?;
    Ok(())
}
/// Reads one `Content-Length`-framed JSON message from `reader`.
///
/// The header block is a run of lines ended by an empty line; both `\r\n` and
/// bare `\n` terminators are accepted. Only the `Content-Length` header
/// (matched case-insensitively) is interpreted. Other header lines are
/// skipped, and if `Content-Length` repeats, the last occurrence wins. Exactly
/// that many bytes are then consumed as the body, so the next frame in the
/// stream is left untouched.
///
/// Returns `Ok(None)` when the stream is at EOF before any header byte was
/// read (a clean end between frames), and `Ok(Some(value))` for a parsed frame.
///
/// # Errors
///
/// * `UnexpectedEof` — EOF inside the header block or inside the body.
/// * `InvalidData` — a header line longer than the per-line cap, a
///   non-numeric or missing `Content-Length`, a length above
///   `MAX_CONTENT_LENGTH`, or a body that is not valid UTF-8 JSON.
/// * Any other I/O error reported by `reader`.
pub(crate) fn read_frame<R: BufRead>(reader: &mut R) -> io::Result<Option<Value>> {
    // Cap on a single header line, terminator included. Without it a peer that
    // never sends `\n` would make `read_line` buffer without bound, defeating
    // the body-size guard below.
    const MAX_HEADER_LINE: usize = 8 * 1024;

    let mut content_length: Option<usize> = None;
    let mut saw_any_header_byte = false;
    // One buffer reused for every header line.
    let mut line = String::new();
    loop {
        line.clear();
        // `Take` over `&mut R` is itself `BufRead`, so this bounds the line
        // without losing bytes already buffered in `reader`.
        // (usize -> u64 is a widening cast for this small constant.)
        let n = io::Read::take(&mut *reader, MAX_HEADER_LINE as u64).read_line(&mut line)?;
        if n == 0 {
            if saw_any_header_byte {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "EOF in the middle of a frame header block",
                ));
            }
            return Ok(None);
        }
        if n == MAX_HEADER_LINE && !line.ends_with('\n') {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("frame header line exceeds the maximum {MAX_HEADER_LINE} bytes"),
            ));
        }
        saw_any_header_byte = true;

        let trimmed = line.trim_end_matches(['\r', '\n']);
        if trimmed.is_empty() {
            // A blank line ends the header block.
            break;
        }

        // Lines without a colon, and headers other than Content-Length, are
        // tolerated and ignored.
        let Some((name, value)) = trimmed.split_once(':') else {
            continue;
        };
        if !name.trim().eq_ignore_ascii_case(CONTENT_LENGTH) {
            continue;
        }
        let parsed = value.trim().parse::<usize>().map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("invalid Content-Length value: {value:?}"),
            )
        })?;
        content_length = Some(parsed);
    }

    let len = content_length.ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "frame header block missing Content-Length",
        )
    })?;
    // Reject oversized frames before allocating the body buffer.
    if len > MAX_CONTENT_LENGTH {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Content-Length {len} exceeds the maximum {MAX_CONTENT_LENGTH}"),
        ));
    }

    let mut body = vec![0u8; len];
    reader.read_exact(&mut body)?;
    let text = String::from_utf8(body)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    let value = serde_json::from_str::<Value>(&text)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    Ok(Some(value))
}
/// Handle to the framed-JSON TCP transport.
///
/// `Transport::start` builds it either disabled (no socket, no thread) or
/// enabled, in which case a background thread runs `serve` and this struct
/// holds the channel ends and the join handle for that thread.
pub(crate) struct Transport {
/// Frames received from the client. In a disabled transport the sending side
/// has already been dropped, so receiving reports a disconnected channel.
inbound: Receiver<Value>,
/// Queue of frames to write to the client. `None` when the transport is
/// disabled or after `shutdown()` / drop has released it.
outbound: Option<Sender<Value>>,
/// Join handle of the thread running `serve`. `None` when the transport is
/// disabled or once the handle has been taken for joining.
handle: Option<JoinHandle<()>>,
/// Address the listener is bound to; `None` when disabled.
local_addr: Option<SocketAddr>,
/// Flag that the loops in `serve` poll to know when to stop.
shutdown: Arc<AtomicBool>,
}
impl Transport {
    /// Starts the transport.
    ///
    /// With `listen == None` the transport is disabled: no socket is opened and
    /// no thread is spawned. `inbound()` is then an already-closed channel and
    /// `send` always reports `DebugError::Disconnected`.
    ///
    /// With `Some(addr)` a TCP listener is created for the address family of
    /// `addr` (IPv4 or IPv6), address reuse is enabled, and the listener is
    /// bound, switched to non-blocking mode and handed to a background thread
    /// running `serve`.
    ///
    /// # Errors
    ///
    /// Any failure while creating, configuring or binding the socket is
    /// returned as `DebugError::Bind`.
    pub(crate) fn start(listen: Option<SocketAddr>) -> Result<Self, DebugError> {
        let Some(addr) = listen else {
            // The sender is dropped on return, so the receiver reports
            // `Disconnected` instead of blocking forever.
            let (_dead_tx, inbound) = std::sync::mpsc::channel::<Value>();
            return Ok(Self {
                inbound,
                outbound: None,
                handle: None,
                local_addr: None,
                shutdown: Arc::new(AtomicBool::new(false)),
            });
        };

        // Derive the socket family from the requested address. A hard-coded
        // IPv4 domain makes every IPv6 address fail at `bind`.
        let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))
            .map_err(DebugError::Bind)?;
        socket.set_reuse_address(true).map_err(DebugError::Bind)?;
        socket.bind(&addr.into()).map_err(DebugError::Bind)?;
        // Backlog of 1.
        socket.listen(1).map_err(DebugError::Bind)?;

        let listener = TcpListener::from(socket);
        // Read back the bound address so a request for port 0 yields the port
        // the OS actually assigned.
        let local_addr = listener.local_addr().map_err(DebugError::Bind)?;
        // Non-blocking accept lets `serve` poll the shutdown flag while waiting.
        listener.set_nonblocking(true).map_err(DebugError::Bind)?;

        let (in_tx, in_rx) = std::sync::mpsc::channel::<Value>();
        let (out_tx, out_rx) = std::sync::mpsc::channel::<Value>();
        let shutdown = Arc::new(AtomicBool::new(false));
        let serve_shutdown = Arc::clone(&shutdown);
        let handle = std::thread::spawn(move || {
            serve(listener, in_tx, out_rx, serve_shutdown);
        });

        Ok(Self {
            inbound: in_rx,
            outbound: Some(out_tx),
            handle: Some(handle),
            local_addr: Some(local_addr),
            shutdown,
        })
    }

    /// Address the listener is bound to, or `None` for a disabled transport.
    pub(crate) fn local_addr(&self) -> Option<SocketAddr> {
        self.local_addr
    }

    /// Receiving end of the channel that carries frames read from the client.
    pub(crate) fn inbound(&self) -> &Receiver<Value> {
        &self.inbound
    }

    /// Queues `value` to be written to the client as one frame.
    ///
    /// # Errors
    ///
    /// Returns `DebugError::Disconnected` when there is no outbound queue
    /// (disabled transport, or after `shutdown`) or when its receiving side
    /// has gone away.
    pub(crate) fn send(&self, value: Value) -> Result<(), DebugError> {
        match &self.outbound {
            Some(tx) => tx.send(value).map_err(|_| DebugError::Disconnected),
            None => Err(DebugError::Disconnected),
        }
    }

    /// Signals the background thread to stop and releases the outbound queue.
    /// It does not wait for the thread; use `join` (or drop) for that. Calling
    /// it more than once is harmless.
    // NOTE(review): `dead_code` is allowed presumably because the only callers
    // visible in this file are tests and `Drop` — confirm against the crate.
    #[allow(dead_code)]
    pub(crate) fn shutdown(&mut self) {
        self.shutdown.store(true, Ordering::Release);
        self.outbound = None;
    }

    /// Waits for the background thread to finish, if there is one and it has
    /// not been joined already. The thread's panic status is ignored.
    #[allow(dead_code)]
    pub(crate) fn join(&mut self) {
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }

    /// Test hook: raises the shutdown flag while leaving the outbound sender
    /// alive, so tests can verify the flag alone stops the writer loop.
    #[cfg(test)]
    fn signal_shutdown_flag_only(&self) {
        self.shutdown.store(true, Ordering::Release);
    }
}
/// Dropping a `Transport` tears it down synchronously: the shutdown flag is
/// raised, the outbound queue is released, and the background thread (if any)
/// is joined before `drop` returns.
impl Drop for Transport {
    fn drop(&mut self) {
        // Same three steps as calling `shutdown()` followed by `join()`.
        self.shutdown();
        self.join();
    }
}
/// Body of the transport's background thread.
///
/// 1. Polls the non-blocking `listener` until one client connects, checking
///    `shutdown` between attempts. The listener is dropped as soon as a client
///    is accepted.
/// 2. Spawns a reader thread that parses frames from the client and forwards
///    them on `in_tx`. It ends on EOF, on a read or parse error, when the
///    receiver of `in_tx` is gone, or when `shutdown` is set.
/// 3. Runs the writer loop on the current thread: frames from `out_rx` are
///    written to the client until a write fails, the sender side is dropped,
///    or `shutdown` is set (queued frames are flushed first).
/// 4. Shuts the socket down in both directions, which also unblocks a reader
///    stuck in the middle of a frame, and joins the reader thread.
///
/// Setup errors make the function return silently.
fn serve(
    listener: TcpListener,
    in_tx: Sender<Value>,
    out_rx: Receiver<Value>,
    shutdown: Arc<AtomicBool>,
) {
    let stream = loop {
        if shutdown.load(Ordering::Acquire) {
            return;
        }
        match listener.accept() {
            Ok((s, _peer)) => {
                // Make the accepted stream blocking regardless of what it
                // inherited from the non-blocking listener.
                if s.set_nonblocking(false).is_err() {
                    return;
                }
                drop(listener);
                break s;
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                std::thread::sleep(POLL_INTERVAL);
                continue;
            }
            Err(_) => return,
        }
    };

    let write_half = match stream.try_clone() {
        Ok(w) => w,
        Err(_) => return,
    };
    // A short read timeout lets the reader wake up to check `shutdown` while
    // the connection is idle.
    if stream.set_read_timeout(Some(POLL_INTERVAL)).is_err() {
        return;
    }

    let reader_shutdown = Arc::clone(&shutdown);
    let reader_handle = std::thread::spawn(move || {
        let mut reader = BufReader::new(stream);
        loop {
            if reader_shutdown.load(Ordering::Acquire) {
                return;
            }
            // Wait (bounded by the read timeout) until data is available.
            match reader.fill_buf() {
                // EOF: the client closed the connection.
                Ok([]) => return,
                Ok(_) => {}
                // Timeouts are the idle case. `Interrupted` means a signal cut
                // the read short; both are retryable, not a reason to drop
                // the connection.
                Err(e)
                    if matches!(
                        e.kind(),
                        io::ErrorKind::WouldBlock
                            | io::ErrorKind::TimedOut
                            | io::ErrorKind::Interrupted
                    ) =>
                {
                    continue;
                }
                Err(_) => return,
            }
            // A frame has started: read it without a timeout so a slow sender
            // does not surface as a spurious error in the middle of the frame.
            if reader.get_ref().set_read_timeout(None).is_err() {
                return;
            }
            let parsed = read_frame(&mut reader);
            if reader.get_ref().set_read_timeout(Some(POLL_INTERVAL)).is_err() {
                return;
            }
            match parsed {
                Ok(Some(value)) => {
                    if in_tx.send(value).is_err() {
                        return;
                    }
                }
                Ok(None) => return,
                Err(_) => return,
            }
        }
    });

    let mut writer = write_half;
    'writer: loop {
        if shutdown.load(Ordering::Acquire) {
            // Flush whatever is already queued, then stop.
            while let Ok(value) = out_rx.try_recv() {
                if write_frame(&mut writer, &value).is_err() {
                    break;
                }
            }
            break;
        }
        match out_rx.recv_timeout(POLL_INTERVAL) {
            Ok(value) => {
                if write_frame(&mut writer, &value).is_err() {
                    break 'writer;
                }
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break 'writer,
        }
    }

    // Closing both directions wakes a reader blocked in an untimed read.
    let _ = writer.shutdown(std::net::Shutdown::Both);
    let _ = reader_handle.join();
}
// Unit tests for the framing functions (`write_frame` / `read_frame`, driven
// through in-memory cursors) and for `Transport` (driven through real loopback
// TCP connections on OS-assigned ports).
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use std::net::TcpStream;
use std::sync::mpsc::RecvTimeoutError;
use std::time::Duration;
use serde_json::json;
// Upper bound on how long a test waits for a frame or for a thread to exit
// before failing instead of hanging.
const WATCHDOG: Duration = Duration::from_secs(10);
// Opens a client connection to the transport under test.
fn connect_client(addr: SocketAddr) -> io::Result<TcpStream> {
TcpStream::connect(addr)
}
// A frame written by `write_frame` is read back unchanged by `read_frame`.
#[test]
fn frame_round_trip_ascii() {
let value = json!({
"seq": 1,
"type": "request",
"command": "initialize",
"arguments": { "adapterID": "pasta" }
});
let mut buf: Vec<u8> = Vec::new();
write_frame(&mut buf, &value).expect("write_frame must succeed");
let mut reader = Cursor::new(buf);
let read = read_frame(&mut reader)
.expect("read_frame must succeed")
.expect("a frame must be present");
assert_eq!(read, value, "round-trip must preserve the JSON value");
}
// The header reports the serialized body's byte length; a multi-byte payload
// (5 characters, 15 bytes) must still round-trip.
#[test]
fn content_length_is_byte_length_not_char_count() {
let payload = "こんにちは";
assert_eq!(payload.chars().count(), 5);
assert_eq!(payload.len(), 15);
let value = json!({ "text": payload });
let mut buf: Vec<u8> = Vec::new();
write_frame(&mut buf, &value).expect("write must succeed");
let text = String::from_utf8(buf.clone()).expect("frame is UTF-8");
let body = serde_json::to_vec(&value).unwrap();
let expected_header = format!("Content-Length: {}\r\n\r\n", body.len());
assert!(
text.starts_with(&expected_header),
"header must report the BYTE length ({}), got frame starting: {:?}",
body.len(),
&text[..expected_header.len().min(text.len())]
);
let mut reader = Cursor::new(buf);
let read = read_frame(&mut reader)
.expect("read must succeed")
.expect("a frame must be present");
assert_eq!(read, value, "multi-byte body must round-trip intact");
assert_eq!(read["text"], json!(payload));
}
// Unknown headers around a lower-case `content-length` header are ignored.
#[test]
fn read_frame_tolerates_extra_and_reordered_headers() {
let body = br#"{"ok":true}"#;
let mut frame: Vec<u8> = Vec::new();
frame.extend_from_slice(b"X-Extra: hello\r\n");
frame.extend_from_slice(format!("content-length: {}\r\n", body.len()).as_bytes());
frame.extend_from_slice(b"X-Another: world\r\n");
frame.extend_from_slice(b"\r\n");
frame.extend_from_slice(body);
let mut reader = Cursor::new(frame);
let read = read_frame(&mut reader)
.expect("read must succeed with extra/reordered headers")
.expect("a frame must be present");
assert_eq!(read, json!({ "ok": true }));
}
// Two back-to-back frames are read one at a time, then a clean EOF yields
// `Ok(None)`.
#[test]
fn read_frame_reads_exactly_n_bytes_and_leaves_the_next_frame() {
let first = json!({ "a": 1 });
let second = json!({ "b": 2 });
let mut buf: Vec<u8> = Vec::new();
write_frame(&mut buf, &first).unwrap();
write_frame(&mut buf, &second).unwrap();
let mut reader = Cursor::new(buf);
let r1 = read_frame(&mut reader).unwrap().expect("first frame");
assert_eq!(r1, first, "first frame parsed");
let r2 = read_frame(&mut reader).unwrap().expect("second frame");
assert_eq!(r2, second, "second frame intact (no over-read of the first)");
assert!(
read_frame(&mut reader).unwrap().is_none(),
"clean EOF between frames yields Ok(None)"
);
}
// A header block without `Content-Length` is rejected as `InvalidData`.
#[test]
fn read_frame_missing_content_length_is_error() {
let mut frame: Vec<u8> = Vec::new();
frame.extend_from_slice(b"X-Only: nope\r\n\r\n");
frame.extend_from_slice(br#"{"x":1}"#);
let mut reader = Cursor::new(frame);
let err = read_frame(&mut reader).expect_err("missing Content-Length must error");
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
// A body shorter than the declared length is `UnexpectedEof`.
#[test]
fn read_frame_truncated_body_is_error() {
let mut frame: Vec<u8> = Vec::new();
frame.extend_from_slice(b"Content-Length: 20\r\n\r\n");
frame.extend_from_slice(br#"{"x":1}"#);
let mut reader = Cursor::new(frame);
let err = read_frame(&mut reader).expect_err("truncated body must error");
assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
}
// A non-numeric `Content-Length` is `InvalidData` and the message names the
// header.
#[test]
fn read_frame_non_numeric_content_length_is_error() {
let mut frame: Vec<u8> = Vec::new();
frame.extend_from_slice(b"Content-Length: abc\r\n\r\n");
frame.extend_from_slice(br#"{"x":1}"#);
let mut reader = Cursor::new(frame);
let err = read_frame(&mut reader).expect_err("non-numeric Content-Length must error");
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(
err.to_string().contains("invalid Content-Length"),
"error should name the bad header, got: {err}"
);
}
// EOF after a header line but before the blank separator is `UnexpectedEof`.
#[test]
fn read_frame_eof_mid_header_is_unexpected_eof() {
let frame: Vec<u8> = b"Content-Length: 7\r\n".to_vec();
let mut reader = Cursor::new(frame);
let err = read_frame(&mut reader).expect_err("EOF mid-header must error");
assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
assert!(
err.to_string().contains("header"),
"error should mention the header block, got: {err}"
);
}
// A body that is not valid UTF-8 is `InvalidData`.
#[test]
fn read_frame_non_utf8_body_is_error() {
let body: &[u8] = &[0xFF, 0xFE, 0xFD, 0xFC]; let mut frame: Vec<u8> = Vec::new();
frame.extend_from_slice(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes());
frame.extend_from_slice(body);
let mut reader = Cursor::new(frame);
let err = read_frame(&mut reader).expect_err("non-UTF-8 body must error");
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
// A body that is not JSON is `InvalidData`; so is a zero-length body, even
// when a valid frame follows it in the stream.
#[test]
fn read_frame_invalid_json_body_is_error() {
let body = b"not json at all";
let mut frame: Vec<u8> = Vec::new();
frame.extend_from_slice(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes());
frame.extend_from_slice(body);
let mut reader = Cursor::new(frame);
let err = read_frame(&mut reader).expect_err("invalid JSON body must error");
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
let mut zero: Vec<u8> = b"Content-Length: 0\r\n\r\n".to_vec();
write_frame(&mut zero, &json!({"after": true})).unwrap();
let mut reader = Cursor::new(zero);
let err = read_frame(&mut reader).expect_err("zero-length body must error");
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
// One byte over the cap is rejected by the size guard; exactly at the cap
// passes the guard and then fails only because the body is missing.
#[test]
fn read_frame_rejects_oversized_content_length_before_allocating() {
let mut over: Vec<u8> = Vec::new();
over.extend_from_slice(
format!("Content-Length: {}\r\n\r\n", MAX_CONTENT_LENGTH + 1).as_bytes(),
);
let mut reader = Cursor::new(over);
let err = read_frame(&mut reader).expect_err("oversized Content-Length must error");
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(
err.to_string().contains("exceeds"),
"error should name the exceeded cap, got: {err}"
);
let mut at: Vec<u8> = Vec::new();
at.extend_from_slice(format!("Content-Length: {MAX_CONTENT_LENGTH}\r\n\r\n").as_bytes());
let mut reader = Cursor::new(at);
let err = read_frame(&mut reader).expect_err("missing body must error");
assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof, "at-limit passes the guard");
}
// Header lines ended by a bare `\n` (no `\r`) are accepted.
#[test]
fn read_frame_tolerates_bare_lf_header_endings() {
let body = br#"{"lf":true}"#;
let mut frame: Vec<u8> = Vec::new();
frame.extend_from_slice(format!("Content-Length: {}\n", body.len()).as_bytes());
frame.extend_from_slice(b"\n");
frame.extend_from_slice(body);
let mut reader = Cursor::new(frame);
let read = read_frame(&mut reader)
.expect("bare-LF headers must parse")
.expect("a frame must be present");
assert_eq!(read, json!({ "lf": true }));
}
// `start(None)` binds nothing: no local address, a closed inbound channel,
// and `send` reports `Disconnected`.
#[test]
fn disabled_listen_none_opens_no_port() {
let transport = Transport::start(None).expect("disabled start must succeed");
assert!(
transport.local_addr().is_none(),
"disabled transport must not bind a port (R5.5)"
);
match transport.inbound().recv_timeout(Duration::from_millis(50)) {
Err(RecvTimeoutError::Disconnected) => {}
other => panic!("disabled inbound must be a closed channel, got {other:?}"),
}
assert!(
matches!(transport.send(json!({"x":1})), Err(DebugError::Disconnected)),
"disabled send must report Disconnected (no socket)"
);
}
// On a disabled transport `join` and `shutdown` can be called repeatedly in
// any order without effect.
#[test]
fn disabled_join_and_shutdown_are_noops() {
let mut transport = Transport::start(None).expect("disabled start must succeed");
transport.join(); transport.shutdown();
transport.shutdown(); transport.join(); assert!(transport.local_addr().is_none());
}
// After `shutdown()` an enabled transport refuses `send` with `Disconnected`.
#[test]
fn send_after_shutdown_reports_disconnected() {
let mut transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
assert!(transport.local_addr().is_some(), "enabled transport binds");
transport.shutdown();
assert!(
matches!(transport.send(json!({"x":1})), Err(DebugError::Disconnected)),
"send after shutdown must report Disconnected"
);
transport.shutdown();
let _ = connect_client(transport.local_addr().unwrap());
join_transport_with_watchdog(transport, WATCHDOG);
}
// An enabled transport exposes a concrete loopback address with an
// OS-assigned (non-zero) port.
#[test]
fn enabled_socket2_path_binds_and_exposes_local_addr() {
let mut transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("socket2 bind must succeed");
let addr = transport
.local_addr()
.expect("socket2-built enabled transport must expose its bound addr (R3.1)");
assert_eq!(addr.ip().to_string(), "127.0.0.1");
assert_ne!(addr.port(), 0, "OS must assign a concrete port via the socket2 listener");
transport.shutdown();
let _ = connect_client(addr);
join_transport_with_watchdog(transport, WATCHDOG);
}
// End to end: a frame written by the client arrives on `inbound()`, and a
// value passed to `send` arrives at the client as a frame.
#[test]
fn enabled_round_trips_framed_json_both_directions() {
let transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
let addr = transport
.local_addr()
.expect("enabled transport must expose its bound addr (R3.1)");
assert_eq!(addr.ip().to_string(), "127.0.0.1");
assert_ne!(addr.port(), 0, "OS must assign a concrete port");
let client = connect_client(addr).expect("client connect must succeed");
client
.set_read_timeout(Some(WATCHDOG))
.expect("TEST-ONLY read timeout");
let client_write = client.try_clone().expect("clone client");
let mut client_reader = BufReader::new(client);
let mut client_writer = client_write;
let request = json!({ "seq": 7, "command": "setBreakpoints" });
write_frame(&mut client_writer, &request).expect("client write must succeed");
let delivered = transport
.inbound()
.recv_timeout(WATCHDOG)
.expect("transport must deliver the inbound frame (R3.1)");
assert_eq!(delivered, request, "inbound JSON must match what the client sent");
let response = json!({ "seq": 7, "type": "response", "success": true });
transport.send(response.clone()).expect("transport send must succeed");
let received = read_frame(&mut client_reader)
.expect("client read must succeed")
.expect("client must receive a frame");
assert_eq!(received, response, "outbound JSON must match what the transport sent");
drop(client_reader);
drop(client_writer);
let mut transport = transport;
transport.shutdown();
join_transport_with_watchdog(transport, WATCHDOG);
}
// The transport forwards any JSON value (here a top-level array) without
// inspecting its shape.
#[test]
fn transport_carries_raw_values_without_interpreting_dap() {
let transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
let addr = transport.local_addr().unwrap();
let client = connect_client(addr).expect("connect");
client.set_read_timeout(Some(WATCHDOG)).unwrap();
let mut client_writer = client.try_clone().unwrap();
let weird = json!([1, "two", { "three": [true, null] }, "日本語"]);
write_frame(&mut client_writer, &weird).expect("client write");
let delivered = transport
.inbound()
.recv_timeout(WATCHDOG)
.expect("inbound frame delivered");
assert_eq!(delivered, weird, "raw value must pass through uninterpreted");
drop(client);
let mut transport = transport;
transport.shutdown();
join_transport_with_watchdog(transport, WATCHDOG);
}
// When the client disconnects, the inbound channel closes (instead of
// hanging) and the transport can still be shut down.
#[test]
fn client_disconnect_winds_down_without_hang() {
let transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
let addr = transport.local_addr().unwrap();
let client = connect_client(addr).expect("connect");
drop(client);
match transport.inbound().recv_timeout(WATCHDOG) {
Err(RecvTimeoutError::Disconnected) => {} Err(RecvTimeoutError::Timeout) => {
panic!("inbound did not close after client disconnect (hang?)")
}
Ok(v) => panic!("unexpected inbound frame after disconnect: {v:?}"),
}
let mut transport = transport;
transport.shutdown();
join_transport_with_watchdog(transport, WATCHDOG);
}
// With no client ever connecting, `shutdown()` still lets the background
// thread exit within the watchdog.
#[test]
fn no_client_shutdown_unblocks_serve() {
let mut transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
assert!(transport.local_addr().is_some(), "enabled transport binds");
transport.shutdown();
join_transport_with_watchdog(transport, WATCHDOG);
}
// Raising only the shutdown flag (outbound sender still alive, client still
// connected) must be enough for the background thread to exit.
#[test]
fn connected_writer_honors_shutdown_flag_while_outbound_alive() {
let mut transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
let addr = transport.local_addr().expect("enabled transport binds");
let client = connect_client(addr).expect("client connect must succeed");
client
.set_read_timeout(Some(WATCHDOG))
.expect("TEST-ONLY read timeout");
// Pause before raising the flag — presumably to let the background thread
// accept the connection first; nothing here verifies that it has.
std::thread::sleep(Duration::from_millis(50));
transport.signal_shutdown_flag_only();
let handle = transport.handle.take().expect("enabled transport has a handle");
// Join on a helper thread so the wait can be bounded by the watchdog.
let (done_tx, done_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let _ = done_tx.send(handle.join());
});
done_rx
.recv_timeout(WATCHDOG)
.expect("connected writer must break on the shutdown FLAG and serve() must return (R2.5)")
.expect("listener thread must not panic");
drop(client);
}
// After a full request/response exchange, `shutdown()` lets the background
// thread exit even though the client keeps its connection open.
#[test]
fn connected_shutdown_tears_down_synchronously_with_client_alive() {
let transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
let addr = transport.local_addr().expect("enabled transport binds");
let client = connect_client(addr).expect("client connect must succeed");
client
.set_read_timeout(Some(WATCHDOG))
.expect("TEST-ONLY read timeout");
let mut client_writer = client.try_clone().expect("clone client");
let mut client_reader = BufReader::new(client.try_clone().expect("clone client"));
write_frame(&mut client_writer, &json!({ "seq": 1, "command": "ping" }))
.expect("client write");
let delivered = transport
.inbound()
.recv_timeout(WATCHDOG)
.expect("inbound frame delivered");
assert_eq!(delivered["command"], json!("ping"));
transport.send(json!({ "seq": 1, "type": "response" })).expect("send");
let received = read_frame(&mut client_reader)
.expect("client read")
.expect("client frame");
assert_eq!(received["type"], json!("response"));
let mut transport = transport;
transport.shutdown();
let handle = transport.handle.take().expect("enabled transport has a handle");
let (done_tx, done_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let _ = done_tx.send(handle.join());
});
done_rx
.recv_timeout(WATCHDOG)
.expect("connected shutdown() must tear down synchronously (R2.5)")
.expect("listener thread must not panic");
drop(client_writer);
drop(client_reader);
drop(client);
}
// Dropping the transport joins the background thread, so the same port can
// be bound immediately afterwards by a plain `TcpListener`.
#[test]
fn drop_synchronously_frees_port_for_plain_rebind() {
use std::net::TcpListener as PlainTcpListener;
let transport =
Transport::start(Some("127.0.0.1:0".parse().unwrap())).expect("bind must succeed");
let addr = transport
.local_addr()
.expect("enabled transport must expose its bound addr");
let port = addr.port();
assert_ne!(port, 0, "OS must assign a concrete port");
drop(transport);
let rebind = PlainTcpListener::bind(("127.0.0.1", port));
assert!(
rebind.is_ok(),
"plain rebind of port {port} immediately after drop must succeed \
(synchronous serve join freed the port); got: {:?}",
rebind.err()
);
}
// Start/teardown cycles on one fixed port: each cycle must be able to bind
// the port the previous cycle released.
#[test]
fn repeated_teardown_rebind_same_port_succeeds() {
// Obtain a free port number by binding port 0 once and dropping the transport.
let port = {
let throwaway = Transport::start(Some("127.0.0.1:0".parse().unwrap()))
.expect("throwaway bind must succeed");
let p = throwaway
.local_addr()
.expect("enabled transport must expose its bound addr")
.port();
assert_ne!(p, 0, "OS must assign a concrete port");
drop(throwaway); p
};
let fixed: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
for cycle in 0..3 {
let mut transport = Transport::start(Some(fixed)).unwrap_or_else(|e| {
panic!(
"cycle {cycle}: rebind of fixed port {port} must succeed \
(prior cycle's listener was synchronously joined); got {e:?}"
)
});
assert_eq!(
transport.local_addr().map(|a| a.port()),
Some(port),
"cycle {cycle}: must rebind the SAME fixed port",
);
// Take the handle first so `Drop` does not join; the join is done on a
// helper thread below and bounded by the watchdog.
let handle = transport.handle.take();
transport.shutdown.store(true, Ordering::Release);
transport.outbound = None;
if let Some(handle) = handle {
let (done_tx, done_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let _ = done_tx.send(handle.join());
});
done_rx
.recv_timeout(WATCHDOG)
.unwrap_or_else(|_| panic!(
"cycle {cycle}: listener must wind down within the watchdog \
(no hang) so the port is freed for the next rebind"
))
.unwrap_or_else(|_| panic!("cycle {cycle}: listener thread must not panic"));
}
}
}
// Shuts `transport` down and waits for its background thread on a helper
// thread, failing the test if the thread does not finish within `timeout` or
// if it panicked. The handle is taken before the transport is dropped so that
// `Drop` cannot block on an unbounded join.
fn join_transport_with_watchdog(mut transport: Transport, timeout: Duration) {
let handle = transport.handle.take();
transport.shutdown.store(true, Ordering::Release);
transport.outbound = None;
if let Some(handle) = handle {
let (done_tx, done_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let _ = done_tx.send(handle.join());
});
done_rx
.recv_timeout(timeout)
.expect("listener thread must wind down within the watchdog (no hang)")
.expect("listener thread must not panic");
}
}
}