Skip to main content

amqp_dds_endpoint/
server.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Multi-Connection-Accept-Loop mit thread-per-connection.
5//!
6//! Spec dds-amqp-1.0 §2.1 Cl. 2 — Server-Side, akzeptiert
7//! eingehende AMQP-1.0-Connections.
8
9use std::io;
10use std::net::{TcpListener, TcpStream, ToSocketAddrs};
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::thread;
14use std::time::Duration;
15
16use zerodds_amqp_endpoint::MetricsHub;
17
18use crate::handler::{HandlerConfig, handle_connection};
19
20// Re-export fuer einheitlichen Public-Surface.
21pub use crate::frame_io::AmqpProtocol;
22
23/// Server-Konfiguration fuer `run_server`.
24#[derive(Debug, Clone)]
25pub struct ServerConfig {
26    /// Listen-Address (z.B. `"0.0.0.0:5672"`).
27    pub listen_addr: String,
28    /// Container-Id, die der Server in Open-Frames meldet.
29    pub container_id: String,
30    /// Max Frame-Size (DoS-Cap).
31    pub max_frame_size: u32,
32    /// Ist TLS aktiv? (Beeinflusst SASL-PLAIN-Akzeptanz §10.2.1.)
33    pub tls_active: bool,
34    /// Read-Timeout pro Connection (None = unlimited; im Daemon
35    /// typisch 60s = idle-timeout).
36    pub read_timeout: Option<Duration>,
37    /// Per-Connection Write-Timeout.
38    pub write_timeout: Option<Duration>,
39}
40
41impl ServerConfig {
42    /// Default-Konfiguration fuer `0.0.0.0:5672`.
43    #[must_use]
44    pub fn default_listen() -> Self {
45        Self {
46            listen_addr: "0.0.0.0:5672".to_string(),
47            container_id: "zerodds-amqp-endpoint".to_string(),
48            max_frame_size: 1_048_576,
49            tls_active: false,
50            read_timeout: Some(Duration::from_secs(60)),
51            write_timeout: Some(Duration::from_secs(60)),
52        }
53    }
54}
55
56/// Server-Fehler.
57#[derive(Debug)]
58pub enum ServerError {
59    /// Bind-Fehler.
60    Bind(io::Error),
61    /// Allgemeiner IO-Fehler.
62    Io(io::Error),
63}
64
65impl core::fmt::Display for ServerError {
66    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
67        match self {
68            Self::Bind(e) => write!(f, "bind error: {e}"),
69            Self::Io(e) => write!(f, "io error: {e}"),
70        }
71    }
72}
73
74impl std::error::Error for ServerError {}
75
76/// Spec §2.1 Cl. 2 — TCP-Listener-Loop.
77///
78/// Akzeptiert blocking auf `listen_addr`, spawnt pro
79/// akzeptierter Connection einen Thread, der
80/// `handle_connection` durchlaeuft.
81///
82/// `shutdown_signal` ist ein Atomic-Flag, das von aussen auf
83/// `true` gesetzt werden kann — der Accept-Loop pollt es
84/// periodisch (per `set_nonblocking` + sleep) und beendet
85/// sauber.
86///
87/// `metrics` ist der prozess-globale Counter-Hub, der pro
88/// Connection und pro Frame inkrementiert wird.
89///
90/// # Errors
91/// `Bind` wenn der Listener nicht binden kann; `Io` bei
92/// schweren Accept-Fehlern.
93pub fn run_server(
94    cfg: ServerConfig,
95    metrics: Arc<MetricsHub>,
96    shutdown_signal: Arc<AtomicBool>,
97) -> Result<(), ServerError> {
98    let listener = bind_listener(&cfg.listen_addr).map_err(ServerError::Bind)?;
99    listener.set_nonblocking(true).map_err(ServerError::Io)?;
100
101    eprintln!(
102        "amqp-dds-endpoint listening on {} (container_id={}, max_frame_size={})",
103        cfg.listen_addr, cfg.container_id, cfg.max_frame_size
104    );
105
106    while !shutdown_signal.load(Ordering::Relaxed) {
107        match listener.accept() {
108            Ok((stream, peer)) => {
109                let cfg = cfg.clone();
110                let metrics = metrics.clone();
111                let _ = thread::Builder::new()
112                    .name(format!("amqp-conn-{peer}"))
113                    .spawn(move || {
114                        if let Err(e) = serve_one(stream, &cfg, &metrics) {
115                            eprintln!("connection from {peer} ended: {e}");
116                        }
117                    });
118            }
119            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
120                thread::sleep(Duration::from_millis(50));
121            }
122            Err(e) => {
123                eprintln!("accept error: {e}");
124                return Err(ServerError::Io(e));
125            }
126        }
127    }
128    eprintln!("amqp-dds-endpoint shutting down");
129    Ok(())
130}
131
132fn bind_listener<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
133    TcpListener::bind(addr)
134}
135
136fn serve_one(
137    mut stream: TcpStream,
138    cfg: &ServerConfig,
139    metrics: &Arc<MetricsHub>,
140) -> Result<(), Box<dyn std::error::Error>> {
141    if let Some(t) = cfg.read_timeout {
142        stream.set_read_timeout(Some(t))?;
143    }
144    if let Some(t) = cfg.write_timeout {
145        stream.set_write_timeout(Some(t))?;
146    }
147    let mut handler_cfg = HandlerConfig::for_tests(metrics.clone());
148    handler_cfg.container_id = cfg.container_id.clone();
149    handler_cfg.max_frame_size = cfg.max_frame_size;
150    handler_cfg.tls_active = cfg.tls_active;
151    handle_connection(&mut stream, &handler_cfg)?;
152    Ok(())
153}
154
155#[cfg(test)]
156#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
157mod tests {
158    use super::*;
159    use std::io::Write;
160    use std::net::TcpStream;
161
162    #[test]
163    fn server_config_default_has_sensible_values() {
164        let c = ServerConfig::default_listen();
165        assert!(c.listen_addr.ends_with(":5672"));
166        assert!(c.max_frame_size >= 65_536);
167        assert!(!c.tls_active);
168        assert!(c.read_timeout.is_some());
169    }
170
171    /// E2E: Server bindet auf 127.0.0.1:0 (zufaelliger Port),
172    /// Klient connectet, schickt AMQP-Header + Close.
173    #[test]
174    fn server_accepts_connection_and_handles_open_close() {
175        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
176        let port = listener.local_addr().unwrap().port();
177        listener.set_nonblocking(true).unwrap();
178        let metrics = Arc::new(MetricsHub::new());
179        let shutdown = Arc::new(AtomicBool::new(false));
180
181        let cfg = ServerConfig {
182            listen_addr: format!("127.0.0.1:{port}"),
183            container_id: "test-server".into(),
184            max_frame_size: 65_536,
185            tls_active: false,
186            read_timeout: Some(Duration::from_secs(2)),
187            write_timeout: Some(Duration::from_secs(2)),
188        };
189
190        // Listener wieder freigeben — der Test-Server bindet
191        // ihn neu in einem Thread.
192        drop(listener);
193
194        let server_metrics = metrics.clone();
195        let server_shutdown = shutdown.clone();
196        let server_thread = thread::spawn(move || {
197            let _ = run_server(cfg, server_metrics, server_shutdown);
198        });
199
200        // Bisschen warten bis der Server bindet.
201        thread::sleep(Duration::from_millis(100));
202
203        // Klient connectet.
204        let mut client = TcpStream::connect(format!("127.0.0.1:{port}")).unwrap();
205        client
206            .set_read_timeout(Some(Duration::from_secs(2)))
207            .unwrap();
208        client
209            .set_write_timeout(Some(Duration::from_secs(2)))
210            .unwrap();
211
212        // AMQP-Protocol-Header senden.
213        client.write_all(&AmqpProtocol::Amqp.as_bytes()).unwrap();
214
215        // Open-Performative + Close-Performative senden.
216        let open = zerodds_amqp_bridge::performatives::open("client").unwrap();
217        let h = zerodds_amqp_bridge::frame::FrameHeader {
218            size: 8 + open.len() as u32,
219            doff: 2,
220            frame_type: zerodds_amqp_bridge::frame::FrameType::Amqp,
221            channel: 0,
222        };
223        client
224            .write_all(&zerodds_amqp_bridge::frame::encode_frame_header(h))
225            .unwrap();
226        client.write_all(&open).unwrap();
227
228        let close = zerodds_amqp_bridge::performatives::close().unwrap();
229        let h = zerodds_amqp_bridge::frame::FrameHeader {
230            size: 8 + close.len() as u32,
231            doff: 2,
232            frame_type: zerodds_amqp_bridge::frame::FrameType::Amqp,
233            channel: 0,
234        };
235        client
236            .write_all(&zerodds_amqp_bridge::frame::encode_frame_header(h))
237            .unwrap();
238        client.write_all(&close).unwrap();
239
240        // Server schickt: AMQP-Header + Open-Reply + Close-Reply.
241        let mut buf = [0u8; 8];
242        std::io::Read::read_exact(&mut client, &mut buf).unwrap();
243        assert_eq!(&buf[0..4], b"AMQP");
244
245        // Wir lesen nicht alle Server-Frames detailliert; uns
246        // reicht: Server hat geantwortet + Stats sind hochgezaehlt.
247        // Klient schliesst.
248        drop(client);
249
250        // Bisschen warten bis Server-Side fertig ist.
251        thread::sleep(Duration::from_millis(200));
252
253        assert_eq!(metrics.snapshot("connections.total"), Some(1));
254
255        // Shutdown.
256        shutdown.store(true, Ordering::Relaxed);
257        server_thread.join().unwrap();
258    }
259}