1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use super::channel::Channel;
use super::packet::{Packet, read_packet, write_packet};
pub struct Connection {
writer: Mutex<Box<dyn Write + Send>>,
/// Per-channel packet senders. The background reader thread dispatches
/// incoming packets to the appropriate channel's sender.
channel_senders: Mutex<HashMap<u32, Sender<Packet>>>,
next_channel_id: AtomicU32,
server_exited: AtomicBool,
}
impl Connection {
pub fn new(mut reader: Box<dyn Read + Send>, writer: Box<dyn Write + Send>) -> Arc<Self> {
let conn = Arc::new(Self {
writer: Mutex::new(writer),
channel_senders: Mutex::new(HashMap::new()),
// channel 0 is reserved for the control channel
next_channel_id: AtomicU32::new(1),
server_exited: AtomicBool::new(false),
});
// Background reader thread: reads all packets from the stream and
// dispatches them to the appropriate channel's receiver queue.
let conn_for_reader = Arc::clone(&conn);
std::thread::spawn(move || {
loop {
match read_packet(&mut reader) {
Ok(packet) => {
let senders = conn_for_reader.channel_senders.lock().unwrap();
if let Some(sender) = senders.get(&packet.channel) {
// If the receiver is dropped, the send fails — that's fine,
// the channel was closed.
let _ = sender.send(packet);
}
// Packets for unknown channels are silently dropped.
}
Err(_) => {
// Stream closed or error — mark server as exited and stop.
conn_for_reader.server_exited.store(true, Ordering::SeqCst);
// Drop all senders so any thread blocked on channel.recv()
// unblocks with RecvError instead of hanging forever.
conn_for_reader.channel_senders.lock().unwrap().clear();
break;
}
}
}
});
conn
}
pub fn control_channel(self: &Arc<Self>) -> Channel {
self.register_channel(0)
}
pub fn new_channel(self: &Arc<Self>) -> Channel {
let next = self.next_channel_id.fetch_add(1, Ordering::SeqCst);
// client channels use odd ids
let channel_id = (next << 1) | 1;
self.register_channel(channel_id)
}
pub fn connect_channel(self: &Arc<Self>, channel_id: u32) -> Channel {
self.register_channel(channel_id)
}
fn register_channel(self: &Arc<Self>, channel_id: u32) -> Channel {
let (tx, rx) = mpsc::channel();
let mut senders = self.channel_senders.lock().unwrap();
senders.insert(channel_id, tx);
// If the server already exited, the background reader's clear() either
// already ran (so our insert is orphaned) or is blocked on this lock
// (and will clear it). Remove the sender now so recv() unblocks
// immediately with RecvError.
if self.server_has_exited() {
senders.remove(&channel_id);
}
drop(senders);
Channel::new(channel_id, Arc::clone(self), rx)
}
pub fn unregister_channel(&self, channel_id: u32) {
self.channel_senders.lock().unwrap().remove(&channel_id);
}
// nocov start
pub fn mark_server_exited(&self) {
self.server_exited.store(true, Ordering::SeqCst);
// nocov end
}
pub fn server_has_exited(&self) -> bool {
self.server_exited.load(Ordering::SeqCst)
}
// nocov start
fn server_crashed_error() -> std::io::Error {
std::io::Error::new(
std::io::ErrorKind::ConnectionAborted,
// nocov end
super::SERVER_CRASHED_MESSAGE,
)
}
pub fn send_packet(&self, packet: &Packet) -> std::io::Result<()> {
let mut writer = self.writer.lock().unwrap();
match write_packet(&mut **writer, packet) {
Ok(()) => Ok(()),
Err(_) if self.server_has_exited() => Err(Self::server_crashed_error()), // nocov
Err(e) => Err(e), // nocov
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::net::UnixStream;
/// Verify that when the reader stream closes (simulating server crash),
/// channels unblock promptly instead of hanging forever.
#[test]
fn test_channel_unblocks_on_reader_close() {
// Create a connection whose reader returns EOF immediately.
// This simulates the server process dying.
let (_, write_end) = UnixStream::pair().unwrap();
let conn = Connection::new(Box::new(std::io::empty()), Box::new(write_end));
// Wait for the background reader to detect EOF
while !conn.server_has_exited() {
std::thread::yield_now();
}
// Channel created AFTER server exit must still unblock, not hang.
let mut channel = conn.new_channel();
let result = channel.receive_request();
assert!(result.is_err());
}
}