amqp_dds_endpoint/
server.rs1use std::io;
10use std::net::{TcpListener, TcpStream, ToSocketAddrs};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::thread;
14use std::time::Duration;
15
16use zerodds_amqp_endpoint::MetricsHub;
17
18use crate::handler::{HandlerConfig, handle_connection};
19
20pub use crate::frame_io::AmqpProtocol;
22
23#[derive(Debug, Clone)]
25pub struct ServerConfig {
26 pub listen_addr: String,
28 pub container_id: String,
30 pub max_frame_size: u32,
32 pub tls_active: bool,
34 pub read_timeout: Option<Duration>,
37 pub write_timeout: Option<Duration>,
39}
40
41impl ServerConfig {
42 #[must_use]
44 pub fn default_listen() -> Self {
45 Self {
46 listen_addr: "0.0.0.0:5672".to_string(),
47 container_id: "zerodds-amqp-endpoint".to_string(),
48 max_frame_size: 1_048_576,
49 tls_active: false,
50 read_timeout: Some(Duration::from_secs(60)),
51 write_timeout: Some(Duration::from_secs(60)),
52 }
53 }
54}
55
56#[derive(Debug)]
58pub enum ServerError {
59 Bind(io::Error),
61 Io(io::Error),
63}
64
65impl core::fmt::Display for ServerError {
66 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
67 match self {
68 Self::Bind(e) => write!(f, "bind error: {e}"),
69 Self::Io(e) => write!(f, "io error: {e}"),
70 }
71 }
72}
73
74impl std::error::Error for ServerError {}
75
76pub fn run_server(
94 cfg: ServerConfig,
95 metrics: Arc<MetricsHub>,
96 shutdown_signal: Arc<AtomicBool>,
97) -> Result<(), ServerError> {
98 let listener = bind_listener(&cfg.listen_addr).map_err(ServerError::Bind)?;
99 listener.set_nonblocking(true).map_err(ServerError::Io)?;
100
101 eprintln!(
102 "amqp-dds-endpoint listening on {} (container_id={}, max_frame_size={})",
103 cfg.listen_addr, cfg.container_id, cfg.max_frame_size
104 );
105
106 while !shutdown_signal.load(Ordering::Relaxed) {
107 match listener.accept() {
108 Ok((stream, peer)) => {
109 let cfg = cfg.clone();
110 let metrics = metrics.clone();
111 let _ = thread::Builder::new()
112 .name(format!("amqp-conn-{peer}"))
113 .spawn(move || {
114 if let Err(e) = serve_one(stream, &cfg, &metrics) {
115 eprintln!("connection from {peer} ended: {e}");
116 }
117 });
118 }
119 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
120 thread::sleep(Duration::from_millis(50));
121 }
122 Err(e) => {
123 eprintln!("accept error: {e}");
124 return Err(ServerError::Io(e));
125 }
126 }
127 }
128 eprintln!("amqp-dds-endpoint shutting down");
129 Ok(())
130}
131
132fn bind_listener<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
133 TcpListener::bind(addr)
134}
135
136fn serve_one(
137 mut stream: TcpStream,
138 cfg: &ServerConfig,
139 metrics: &Arc<MetricsHub>,
140) -> Result<(), Box<dyn std::error::Error>> {
141 if let Some(t) = cfg.read_timeout {
142 stream.set_read_timeout(Some(t))?;
143 }
144 if let Some(t) = cfg.write_timeout {
145 stream.set_write_timeout(Some(t))?;
146 }
147 let mut handler_cfg = HandlerConfig::for_tests(metrics.clone());
148 handler_cfg.container_id = cfg.container_id.clone();
149 handler_cfg.max_frame_size = cfg.max_frame_size;
150 handler_cfg.tls_active = cfg.tls_active;
151 handle_connection(&mut stream, &handler_cfg)?;
152 Ok(())
153}
154
155#[cfg(test)]
156#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
157mod tests {
158 use super::*;
159 use std::io::Write;
160 use std::net::TcpStream;
161
162 #[test]
163 fn server_config_default_has_sensible_values() {
164 let c = ServerConfig::default_listen();
165 assert!(c.listen_addr.ends_with(":5672"));
166 assert!(c.max_frame_size >= 65_536);
167 assert!(!c.tls_active);
168 assert!(c.read_timeout.is_some());
169 }
170
171 #[test]
174 fn server_accepts_connection_and_handles_open_close() {
175 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
176 let port = listener.local_addr().unwrap().port();
177 listener.set_nonblocking(true).unwrap();
178 let metrics = Arc::new(MetricsHub::new());
179 let shutdown = Arc::new(AtomicBool::new(false));
180
181 let cfg = ServerConfig {
182 listen_addr: format!("127.0.0.1:{port}"),
183 container_id: "test-server".into(),
184 max_frame_size: 65_536,
185 tls_active: false,
186 read_timeout: Some(Duration::from_secs(2)),
187 write_timeout: Some(Duration::from_secs(2)),
188 };
189
190 drop(listener);
193
194 let server_metrics = metrics.clone();
195 let server_shutdown = shutdown.clone();
196 let server_thread = thread::spawn(move || {
197 let _ = run_server(cfg, server_metrics, server_shutdown);
198 });
199
200 thread::sleep(Duration::from_millis(100));
202
203 let mut client = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
205 client
206 .set_read_timeout(Some(Duration::from_secs(2)))
207 .unwrap();
208 client
209 .set_write_timeout(Some(Duration::from_secs(2)))
210 .unwrap();
211
212 client.write_all(&AmqpProtocol::Amqp.as_bytes()).unwrap();
214
215 let open = zerodds_amqp_bridge::performatives::open("client").unwrap();
217 let h = zerodds_amqp_bridge::frame::FrameHeader {
218 size: 8 + open.len() as u32,
219 doff: 2,
220 frame_type: zerodds_amqp_bridge::frame::FrameType::Amqp,
221 channel: 0,
222 };
223 client
224 .write_all(&zerodds_amqp_bridge::frame::encode_frame_header(h))
225 .unwrap();
226 client.write_all(&open).unwrap();
227
228 let close = zerodds_amqp_bridge::performatives::close().unwrap();
229 let h = zerodds_amqp_bridge::frame::FrameHeader {
230 size: 8 + close.len() as u32,
231 doff: 2,
232 frame_type: zerodds_amqp_bridge::frame::FrameType::Amqp,
233 channel: 0,
234 };
235 client
236 .write_all(&zerodds_amqp_bridge::frame::encode_frame_header(h))
237 .unwrap();
238 client.write_all(&close).unwrap();
239
240 let mut buf = [0u8; 8];
242 std::io::Read::read_exact(&mut client, &mut buf).unwrap();
243 assert_eq!(&buf[0..4], b"AMQP");
244
245 drop(client);
249
250 thread::sleep(Duration::from_millis(200));
252
253 assert_eq!(metrics.snapshot("connections.total"), Some(1));
254
255 shutdown.store(true, Ordering::Relaxed);
257 server_thread.join().unwrap();
258 }
259}