runtara_protocol/
server.rs

1// Copyright (C) 2025 SyncMyOrders Sp. z o.o.
2// SPDX-License-Identifier: AGPL-3.0-or-later
3//! QUIC server helpers for runtara-core.
4
5use std::net::SocketAddr;
6use std::sync::Arc;
7
8use quinn::{Endpoint, Incoming, RecvStream, SendStream, ServerConfig, TransportConfig};
9use thiserror::Error;
10use tracing::{debug, error, info, instrument, warn};
11
12use crate::frame::{Frame, FrameError, FramedStream, read_frame, write_frame};
13
14/// Errors that can occur in the QUIC server
15#[derive(Debug, Error)]
16pub enum ServerError {
17    #[error("bind error: {0}")]
18    Bind(#[from] std::io::Error),
19
20    #[error("connection error: {0}")]
21    Connection(#[from] quinn::ConnectionError),
22
23    #[error("frame error: {0}")]
24    Frame(#[from] FrameError),
25
26    #[error("TLS error: {0}")]
27    Tls(String),
28
29    #[error("server closed")]
30    Closed,
31}
32
33/// Configuration for the QUIC server
34#[derive(Debug, Clone)]
35pub struct RuntaraServerConfig {
36    /// Address to bind to
37    pub bind_addr: SocketAddr,
38    /// TLS certificate chain (PEM format)
39    pub cert_pem: Vec<u8>,
40    /// TLS private key (PEM format)
41    pub key_pem: Vec<u8>,
42    /// Maximum pending incoming connections (handshakes in progress)
43    pub max_incoming: u32,
44    /// Maximum concurrent bidirectional streams per connection
45    pub max_bi_streams: u32,
46    /// Maximum concurrent unidirectional streams per connection
47    pub max_uni_streams: u32,
48    /// Idle timeout in milliseconds
49    pub idle_timeout_ms: u64,
50    /// Server-side keep-alive interval in milliseconds (0 to disable)
51    pub keep_alive_interval_ms: u64,
52    /// UDP receive buffer size in bytes (0 for OS default)
53    pub udp_receive_buffer_size: usize,
54    /// UDP send buffer size in bytes (0 for OS default)
55    pub udp_send_buffer_size: usize,
56    /// Maximum concurrent connection handlers (0 for unlimited)
57    pub max_concurrent_handlers: u32,
58}
59
60impl Default for RuntaraServerConfig {
61    fn default() -> Self {
62        Self {
63            bind_addr: "0.0.0.0:7001".parse().unwrap(),
64            cert_pem: Vec::new(),
65            key_pem: Vec::new(),
66            max_incoming: 10_000,
67            max_bi_streams: 1_000,
68            max_uni_streams: 100,
69            idle_timeout_ms: 120_000,
70            keep_alive_interval_ms: 15_000,
71            udp_receive_buffer_size: 2 * 1024 * 1024, // 2MB
72            udp_send_buffer_size: 2 * 1024 * 1024,    // 2MB
73            max_concurrent_handlers: 0,               // unlimited by default
74        }
75    }
76}
77
78impl RuntaraServerConfig {
79    /// Create a configuration from environment variables with defaults.
80    ///
81    /// Environment variables:
82    /// - `RUNTARA_QUIC_MAX_INCOMING`: Max pending handshakes (default: 10000)
83    /// - `RUNTARA_QUIC_MAX_BI_STREAMS`: Max bidirectional streams per connection (default: 1000)
84    /// - `RUNTARA_QUIC_MAX_UNI_STREAMS`: Max unidirectional streams per connection (default: 100)
85    /// - `RUNTARA_QUIC_IDLE_TIMEOUT_MS`: Idle timeout in ms (default: 120000)
86    /// - `RUNTARA_QUIC_KEEP_ALIVE_MS`: Keep-alive interval in ms, 0 to disable (default: 15000)
87    /// - `RUNTARA_QUIC_UDP_RECV_BUFFER`: UDP receive buffer size in bytes (default: 2097152)
88    /// - `RUNTARA_QUIC_UDP_SEND_BUFFER`: UDP send buffer size in bytes (default: 2097152)
89    /// - `RUNTARA_QUIC_MAX_HANDLERS`: Max concurrent connection handlers, 0 for unlimited (default: 0)
90    pub fn from_env() -> Self {
91        let default = Self::default();
92
93        Self {
94            bind_addr: default.bind_addr,
95            cert_pem: default.cert_pem,
96            key_pem: default.key_pem,
97            max_incoming: std::env::var("RUNTARA_QUIC_MAX_INCOMING")
98                .ok()
99                .and_then(|v| v.parse().ok())
100                .unwrap_or(default.max_incoming),
101            max_bi_streams: std::env::var("RUNTARA_QUIC_MAX_BI_STREAMS")
102                .ok()
103                .and_then(|v| v.parse().ok())
104                .unwrap_or(default.max_bi_streams),
105            max_uni_streams: std::env::var("RUNTARA_QUIC_MAX_UNI_STREAMS")
106                .ok()
107                .and_then(|v| v.parse().ok())
108                .unwrap_or(default.max_uni_streams),
109            idle_timeout_ms: std::env::var("RUNTARA_QUIC_IDLE_TIMEOUT_MS")
110                .ok()
111                .and_then(|v| v.parse().ok())
112                .unwrap_or(default.idle_timeout_ms),
113            keep_alive_interval_ms: std::env::var("RUNTARA_QUIC_KEEP_ALIVE_MS")
114                .ok()
115                .and_then(|v| v.parse().ok())
116                .unwrap_or(default.keep_alive_interval_ms),
117            udp_receive_buffer_size: std::env::var("RUNTARA_QUIC_UDP_RECV_BUFFER")
118                .ok()
119                .and_then(|v| v.parse().ok())
120                .unwrap_or(default.udp_receive_buffer_size),
121            udp_send_buffer_size: std::env::var("RUNTARA_QUIC_UDP_SEND_BUFFER")
122                .ok()
123                .and_then(|v| v.parse().ok())
124                .unwrap_or(default.udp_send_buffer_size),
125            max_concurrent_handlers: std::env::var("RUNTARA_QUIC_MAX_HANDLERS")
126                .ok()
127                .and_then(|v| v.parse().ok())
128                .unwrap_or(default.max_concurrent_handlers),
129        }
130    }
131}
132
133/// QUIC server for runtara-core
134pub struct RuntaraServer {
135    endpoint: Endpoint,
136    config: RuntaraServerConfig,
137}
138
139impl RuntaraServer {
140    /// Create a new server with the given configuration
141    pub fn new(config: RuntaraServerConfig) -> Result<Self, ServerError> {
142        use socket2::{Domain, Protocol, Socket, Type};
143
144        let server_config = Self::build_server_config(&config)?;
145
146        // Create UDP socket with custom buffer sizes using socket2
147        let domain = if config.bind_addr.is_ipv6() {
148            Domain::IPV6
149        } else {
150            Domain::IPV4
151        };
152        let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
153
154        if config.udp_receive_buffer_size > 0
155            && let Err(e) = socket.set_recv_buffer_size(config.udp_receive_buffer_size)
156        {
157            warn!(
158                size = config.udp_receive_buffer_size,
159                error = %e,
160                "Failed to set UDP receive buffer size"
161            );
162        }
163        if config.udp_send_buffer_size > 0
164            && let Err(e) = socket.set_send_buffer_size(config.udp_send_buffer_size)
165        {
166            warn!(
167                size = config.udp_send_buffer_size,
168                error = %e,
169                "Failed to set UDP send buffer size"
170            );
171        }
172
173        // Bind and convert to std socket
174        socket.bind(&config.bind_addr.into())?;
175        let std_socket: std::net::UdpSocket = socket.into();
176
177        let runtime = quinn::default_runtime()
178            .ok_or_else(|| ServerError::Bind(std::io::Error::other("no async runtime found")))?;
179        let endpoint = Endpoint::new_with_abstract_socket(
180            quinn::EndpointConfig::default(),
181            Some(server_config),
182            runtime.wrap_udp_socket(std_socket)?,
183            runtime,
184        )?;
185
186        info!(
187            addr = %config.bind_addr,
188            max_incoming = config.max_incoming,
189            max_bi_streams = config.max_bi_streams,
190            idle_timeout_ms = config.idle_timeout_ms,
191            keep_alive_ms = config.keep_alive_interval_ms,
192            udp_recv_buffer = config.udp_receive_buffer_size,
193            udp_send_buffer = config.udp_send_buffer_size,
194            max_handlers = config.max_concurrent_handlers,
195            "QUIC server bound"
196        );
197
198        Ok(Self { endpoint, config })
199    }
200
201    /// Create a server with self-signed certificate for local development
202    pub fn localhost(bind_addr: SocketAddr) -> Result<Self, ServerError> {
203        Self::localhost_with_config(bind_addr, RuntaraServerConfig::from_env())
204    }
205
206    /// Create a server with self-signed certificate and custom config
207    pub fn localhost_with_config(
208        bind_addr: SocketAddr,
209        mut config: RuntaraServerConfig,
210    ) -> Result<Self, ServerError> {
211        let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()])
212            .map_err(|e| ServerError::Tls(e.to_string()))?;
213
214        config.bind_addr = bind_addr;
215        config.cert_pem = cert.cert.pem().into_bytes();
216        config.key_pem = cert.key_pair.serialize_pem().into_bytes();
217
218        Self::new(config)
219    }
220
221    /// Get the server configuration
222    pub fn config(&self) -> &RuntaraServerConfig {
223        &self.config
224    }
225
226    fn build_server_config(config: &RuntaraServerConfig) -> Result<ServerConfig, ServerError> {
227        let certs = rustls_pemfile::certs(&mut config.cert_pem.as_slice())
228            .collect::<Result<Vec<_>, _>>()
229            .map_err(|e| ServerError::Tls(format!("failed to parse certificates: {}", e)))?;
230
231        let key = rustls_pemfile::private_key(&mut config.key_pem.as_slice())
232            .map_err(|e| ServerError::Tls(format!("failed to parse private key: {}", e)))?
233            .ok_or_else(|| ServerError::Tls("no private key found".to_string()))?;
234
235        let crypto = rustls::ServerConfig::builder()
236            .with_no_client_auth()
237            .with_single_cert(certs, key)
238            .map_err(|e| ServerError::Tls(e.to_string()))?;
239
240        let mut transport = TransportConfig::default();
241        transport.max_idle_timeout(Some(
242            std::time::Duration::from_millis(config.idle_timeout_ms)
243                .try_into()
244                .unwrap(),
245        ));
246        transport.max_concurrent_bidi_streams(config.max_bi_streams.into());
247        transport.max_concurrent_uni_streams(config.max_uni_streams.into());
248
249        // Server-side keep-alive
250        if config.keep_alive_interval_ms > 0 {
251            transport.keep_alive_interval(Some(std::time::Duration::from_millis(
252                config.keep_alive_interval_ms,
253            )));
254        }
255
256        let mut server_config = ServerConfig::with_crypto(Arc::new(
257            quinn::crypto::rustls::QuicServerConfig::try_from(crypto)
258                .map_err(|e| ServerError::Tls(e.to_string()))?,
259        ));
260        server_config.transport_config(Arc::new(transport));
261
262        // Limit pending handshakes
263        server_config.max_incoming(config.max_incoming as usize);
264
265        Ok(server_config)
266    }
267
268    /// Accept the next incoming connection
269    pub async fn accept(&self) -> Option<Incoming> {
270        self.endpoint.accept().await
271    }
272
273    /// Get the local address the server is bound to
274    pub fn local_addr(&self) -> Result<SocketAddr, ServerError> {
275        Ok(self.endpoint.local_addr()?)
276    }
277
278    /// Close the server
279    pub fn close(&self) {
280        self.endpoint.close(0u32.into(), b"server closing");
281    }
282
283    /// Run the server with a connection handler
284    #[instrument(skip(self, handler))]
285    pub async fn run<H, Fut>(&self, handler: H) -> Result<(), ServerError>
286    where
287        H: Fn(ConnectionHandler) -> Fut + Send + Sync + Clone + 'static,
288        Fut: std::future::Future<Output = ()> + Send + 'static,
289    {
290        use tokio::sync::Semaphore;
291
292        info!("QUIC server running");
293
294        // Create semaphore for backpressure if configured
295        let semaphore = if self.config.max_concurrent_handlers > 0 {
296            Some(Arc::new(Semaphore::new(
297                self.config.max_concurrent_handlers as usize,
298            )))
299        } else {
300            None
301        };
302
303        while let Some(incoming) = self.accept().await {
304            let handler = handler.clone();
305            let semaphore = semaphore.clone();
306
307            tokio::spawn(async move {
308                // Acquire permit if semaphore is configured
309                let _permit = if let Some(ref sem) = semaphore {
310                    match sem.clone().acquire_owned().await {
311                        Ok(permit) => Some(permit),
312                        Err(_) => {
313                            warn!("semaphore closed, dropping connection");
314                            return;
315                        }
316                    }
317                } else {
318                    None
319                };
320
321                match incoming.await {
322                    Ok(connection) => {
323                        let remote_addr = connection.remote_address();
324                        debug!(%remote_addr, "accepted connection");
325
326                        let conn_handler = ConnectionHandler::new(connection);
327                        handler(conn_handler).await;
328                    }
329                    Err(e) => {
330                        warn!("failed to accept connection: {}", e);
331                    }
332                }
333            });
334        }
335
336        Ok(())
337    }
338}
339
340/// Handler for an individual QUIC connection
341pub struct ConnectionHandler {
342    connection: quinn::Connection,
343}
344
345impl ConnectionHandler {
346    pub fn new(connection: quinn::Connection) -> Self {
347        Self { connection }
348    }
349
350    /// Get the remote address of the connection
351    pub fn remote_address(&self) -> SocketAddr {
352        self.connection.remote_address()
353    }
354
355    /// Accept the next bidirectional stream
356    pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), ServerError> {
357        Ok(self.connection.accept_bi().await?)
358    }
359
360    /// Accept the next unidirectional stream (for receiving)
361    pub async fn accept_uni(&self) -> Result<RecvStream, ServerError> {
362        Ok(self.connection.accept_uni().await?)
363    }
364
365    /// Open a bidirectional stream
366    pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ServerError> {
367        Ok(self.connection.open_bi().await?)
368    }
369
370    /// Open a unidirectional stream (for sending)
371    pub async fn open_uni(&self) -> Result<SendStream, ServerError> {
372        Ok(self.connection.open_uni().await?)
373    }
374
375    /// Run the connection handler with a stream handler
376    #[instrument(skip(self, handler), fields(remote = %self.remote_address()))]
377    pub async fn run<H, Fut>(&self, handler: H)
378    where
379        H: Fn(StreamHandler) -> Fut + Send + Sync + Clone + 'static,
380        Fut: std::future::Future<Output = ()> + Send + 'static,
381    {
382        loop {
383            tokio::select! {
384                result = self.accept_bi() => {
385                    match result {
386                        Ok((send, recv)) => {
387                            let handler = handler.clone();
388                            tokio::spawn(async move {
389                                let stream_handler = StreamHandler::new(send, recv);
390                                handler(stream_handler).await;
391                            });
392                        }
393                        Err(e) => {
394                            match &e {
395                                ServerError::Connection(quinn::ConnectionError::ApplicationClosed(_)) |
396                                ServerError::Connection(quinn::ConnectionError::LocallyClosed) => {
397                                    debug!("connection closed");
398                                }
399                                _ => {
400                                    error!("error accepting stream: {}", e);
401                                }
402                            }
403                            break;
404                        }
405                    }
406                }
407            }
408        }
409    }
410
411    /// Check if the connection is still open
412    pub fn is_open(&self) -> bool {
413        self.connection.close_reason().is_none()
414    }
415
416    /// Close the connection
417    pub fn close(&self, code: u32, reason: &[u8]) {
418        self.connection.close(code.into(), reason);
419    }
420}
421
422/// Handler for an individual QUIC stream (bidirectional)
423pub struct StreamHandler {
424    send: SendStream,
425    recv: RecvStream,
426}
427
428impl StreamHandler {
429    pub fn new(send: SendStream, recv: RecvStream) -> Self {
430        Self { send, recv }
431    }
432
433    /// Read the next frame from the stream
434    pub async fn read_frame(&mut self) -> Result<Frame, ServerError> {
435        Ok(read_frame(&mut self.recv).await?)
436    }
437
438    /// Write a frame to the stream
439    pub async fn write_frame(&mut self, frame: &Frame) -> Result<(), ServerError> {
440        Ok(write_frame(&mut self.send, frame).await?)
441    }
442
443    /// Handle a request/response pattern
444    pub async fn handle_request<Req, Resp, H, Fut>(&mut self, handler: H) -> Result<(), ServerError>
445    where
446        Req: prost::Message + Default,
447        Resp: prost::Message,
448        H: FnOnce(Req) -> Fut,
449        Fut: std::future::Future<Output = Result<Resp, ServerError>>,
450    {
451        // Read request
452        let request_frame = self.read_frame().await?;
453        let request: Req = request_frame.decode()?;
454
455        // Process and respond
456        match handler(request).await {
457            Ok(response) => {
458                let response_frame = Frame::response(&response)?;
459                self.write_frame(&response_frame).await?;
460            }
461            Err(e) => {
462                error!("request handler error: {}", e);
463                // Send error frame with empty payload
464                // The frame type itself indicates an error
465                let error_frame = Frame {
466                    message_type: crate::frame::MessageType::Error,
467                    payload: bytes::Bytes::new(),
468                };
469                self.write_frame(&error_frame).await?;
470            }
471        }
472
473        Ok(())
474    }
475
476    /// Convert to a FramedStream for more complex patterns
477    pub fn into_framed(self) -> FramedStream<(SendStream, RecvStream)> {
478        FramedStream::new((self.send, self.recv))
479    }
480
481    /// Finish the send stream (signal no more data)
482    pub fn finish(&mut self) -> Result<(), ServerError> {
483        self.send
484            .finish()
485            .map_err(|e| ServerError::Frame(FrameError::Io(std::io::Error::other(e))))?;
486        Ok(())
487    }
488
489    /// Read raw bytes from the stream (for streaming uploads)
490    /// Returns the number of bytes read, or 0 if EOF
491    pub async fn read_bytes(&mut self, buf: &mut [u8]) -> Result<usize, ServerError> {
492        match self.recv.read(buf).await {
493            Ok(Some(n)) => Ok(n),
494            Ok(None) => Ok(0), // EOF
495            Err(e) => Err(ServerError::Frame(FrameError::Io(std::io::Error::other(
496                e.to_string(),
497            )))),
498        }
499    }
500
501    /// Read exact number of bytes from the stream
502    pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ServerError> {
503        self.recv.read_exact(buf).await.map_err(|e| {
504            ServerError::Frame(FrameError::Io(std::io::Error::other(e.to_string())))
505        })?;
506        Ok(())
507    }
508
509    /// Read all remaining bytes from the stream until EOF (with size limit)
510    pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ServerError> {
511        self.recv
512            .read_to_end(size_limit)
513            .await
514            .map_err(|e| ServerError::Frame(FrameError::Io(std::io::Error::other(e.to_string()))))
515    }
516
517    /// Stream bytes to a writer (for large uploads without buffering all in memory)
518    pub async fn stream_to_writer<W: tokio::io::AsyncWrite + Unpin>(
519        &mut self,
520        writer: &mut W,
521        expected_size: Option<u64>,
522    ) -> Result<u64, ServerError> {
523        use tokio::io::AsyncWriteExt;
524
525        let mut total = 0u64;
526        let mut buf = [0u8; 64 * 1024]; // 64KB chunks
527
528        loop {
529            let n = match self.recv.read(&mut buf).await {
530                Ok(Some(n)) => n,
531                Ok(None) => 0, // EOF
532                Err(e) => {
533                    return Err(ServerError::Frame(FrameError::Io(std::io::Error::other(
534                        e.to_string(),
535                    ))));
536                }
537            };
538            if n == 0 {
539                break;
540            }
541            writer.write_all(&buf[..n]).await?;
542            total += n as u64;
543        }
544
545        if let Some(expected) = expected_size
546            && total != expected
547        {
548            return Err(ServerError::Frame(FrameError::Io(std::io::Error::new(
549                std::io::ErrorKind::UnexpectedEof,
550                format!("Expected {} bytes, got {}", expected, total),
551            ))));
552        }
553
554        Ok(total)
555    }
556}
557
558#[cfg(test)]
559mod tests {
560    use super::*;
561
562    #[test]
563    fn test_default_config() {
564        let config = RuntaraServerConfig::default();
565        assert_eq!(config.bind_addr, "0.0.0.0:7001".parse().unwrap());
566        assert_eq!(config.max_incoming, 10_000);
567    }
568
569    #[test]
570    fn test_default_config_all_fields() {
571        let config = RuntaraServerConfig::default();
572        assert_eq!(config.bind_addr, "0.0.0.0:7001".parse().unwrap());
573        assert!(config.cert_pem.is_empty());
574        assert!(config.key_pem.is_empty());
575        assert_eq!(config.max_incoming, 10_000);
576        assert_eq!(config.max_bi_streams, 1_000);
577        assert_eq!(config.max_uni_streams, 100);
578        assert_eq!(config.idle_timeout_ms, 120_000);
579        assert_eq!(config.keep_alive_interval_ms, 15_000);
580        assert_eq!(config.udp_receive_buffer_size, 2 * 1024 * 1024);
581        assert_eq!(config.udp_send_buffer_size, 2 * 1024 * 1024);
582        assert_eq!(config.max_concurrent_handlers, 0);
583    }
584
585    #[test]
586    fn test_config_clone() {
587        let config = RuntaraServerConfig {
588            bind_addr: "127.0.0.1:9000".parse().unwrap(),
589            cert_pem: b"test-cert".to_vec(),
590            key_pem: b"test-key".to_vec(),
591            max_incoming: 5000,
592            max_bi_streams: 50,
593            max_uni_streams: 25,
594            idle_timeout_ms: 60000,
595            keep_alive_interval_ms: 10000,
596            udp_receive_buffer_size: 1024 * 1024,
597            udp_send_buffer_size: 1024 * 1024,
598            max_concurrent_handlers: 500,
599        };
600        let cloned = config.clone();
601        assert_eq!(config.bind_addr, cloned.bind_addr);
602        assert_eq!(config.cert_pem, cloned.cert_pem);
603        assert_eq!(config.key_pem, cloned.key_pem);
604        assert_eq!(config.max_incoming, cloned.max_incoming);
605        assert_eq!(config.max_bi_streams, cloned.max_bi_streams);
606        assert_eq!(config.max_uni_streams, cloned.max_uni_streams);
607        assert_eq!(config.idle_timeout_ms, cloned.idle_timeout_ms);
608        assert_eq!(config.keep_alive_interval_ms, cloned.keep_alive_interval_ms);
609        assert_eq!(
610            config.udp_receive_buffer_size,
611            cloned.udp_receive_buffer_size
612        );
613        assert_eq!(config.udp_send_buffer_size, cloned.udp_send_buffer_size);
614        assert_eq!(
615            config.max_concurrent_handlers,
616            cloned.max_concurrent_handlers
617        );
618    }
619
620    #[test]
621    fn test_config_debug() {
622        let config = RuntaraServerConfig::default();
623        let debug_str = format!("{:?}", config);
624        assert!(debug_str.contains("RuntaraServerConfig"));
625        assert!(debug_str.contains("bind_addr"));
626        assert!(debug_str.contains("max_incoming"));
627    }
628
629    #[tokio::test]
630    async fn test_server_localhost_creation() {
631        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
632        let server = RuntaraServer::localhost(addr);
633        assert!(
634            server.is_ok(),
635            "Failed to create localhost server: {:?}",
636            server.err()
637        );
638    }
639
640    #[tokio::test]
641    async fn test_server_localhost_local_addr() {
642        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
643        let server = RuntaraServer::localhost(addr).unwrap();
644        let local_addr = server.local_addr();
645        assert!(local_addr.is_ok());
646        // Port 0 should have been assigned a real port
647        assert!(local_addr.unwrap().port() > 0);
648    }
649
650    #[tokio::test]
651    async fn test_server_close() {
652        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
653        let server = RuntaraServer::localhost(addr).unwrap();
654        // Closing should not panic
655        server.close();
656    }
657
658    #[test]
659    fn test_server_with_invalid_cert() {
660        let config = RuntaraServerConfig {
661            bind_addr: "127.0.0.1:0".parse().unwrap(),
662            cert_pem: b"invalid-cert".to_vec(),
663            key_pem: b"invalid-key".to_vec(),
664            ..Default::default()
665        };
666        let server = RuntaraServer::new(config);
667        assert!(server.is_err());
668    }
669
670    #[test]
671    fn test_server_error_display() {
672        let err = ServerError::Tls("invalid certificate".to_string());
673        assert_eq!(format!("{}", err), "TLS error: invalid certificate");
674
675        let err = ServerError::Closed;
676        assert_eq!(format!("{}", err), "server closed");
677    }
678
679    #[test]
680    fn test_connection_handler_new() {
681        // We can't easily create a real Connection in tests without network,
682        // but we can test that the struct exists and has expected methods
683        // This is primarily a compile-time check
684    }
685
686    #[test]
687    fn test_stream_handler_new() {
688        // Similar to above - this verifies the API exists
689        // Real testing requires integration tests with actual streams
690    }
691
692    #[tokio::test]
693    async fn test_server_accept_after_close() {
694        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
695        let server = RuntaraServer::localhost(addr).unwrap();
696        server.close();
697        // After close, accept should return None
698        let result = server.accept().await;
699        assert!(result.is_none());
700    }
701
702    #[test]
703    fn test_build_server_config_empty_cert() {
704        let config = RuntaraServerConfig {
705            cert_pem: Vec::new(),
706            key_pem: Vec::new(),
707            ..Default::default()
708        };
709        let result = RuntaraServer::build_server_config(&config);
710        // Empty cert should fail
711        assert!(result.is_err());
712    }
713
714    #[test]
715    fn test_build_server_config_missing_key() {
716        // Generate a valid cert but provide no key
717        let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap();
718        let config = RuntaraServerConfig {
719            cert_pem: cert.cert.pem().into_bytes(),
720            key_pem: Vec::new(),
721            ..Default::default()
722        };
723        let result = RuntaraServer::build_server_config(&config);
724        assert!(result.is_err());
725    }
726
727    #[test]
728    fn test_build_server_config_valid() {
729        let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap();
730        let config = RuntaraServerConfig {
731            cert_pem: cert.cert.pem().into_bytes(),
732            key_pem: cert.key_pair.serialize_pem().into_bytes(),
733            ..Default::default()
734        };
735        let result = RuntaraServer::build_server_config(&config);
736        assert!(result.is_ok());
737    }
738
739    #[test]
740    fn test_build_server_config_with_custom_limits() {
741        let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap();
742        let config = RuntaraServerConfig {
743            bind_addr: "0.0.0.0:0".parse().unwrap(),
744            cert_pem: cert.cert.pem().into_bytes(),
745            key_pem: cert.key_pair.serialize_pem().into_bytes(),
746            max_incoming: 1000,
747            max_bi_streams: 200,
748            max_uni_streams: 50,
749            idle_timeout_ms: 120000,
750            keep_alive_interval_ms: 20000,
751            udp_receive_buffer_size: 4 * 1024 * 1024,
752            udp_send_buffer_size: 4 * 1024 * 1024,
753            max_concurrent_handlers: 200,
754        };
755        let result = RuntaraServer::build_server_config(&config);
756        assert!(result.is_ok());
757    }
758
759    #[tokio::test]
760    async fn test_server_new_with_valid_config() {
761        let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap();
762        let config = RuntaraServerConfig {
763            bind_addr: "127.0.0.1:0".parse().unwrap(),
764            cert_pem: cert.cert.pem().into_bytes(),
765            key_pem: cert.key_pair.serialize_pem().into_bytes(),
766            ..Default::default()
767        };
768        let server = RuntaraServer::new(config);
769        assert!(server.is_ok());
770    }
771
772    // ========== Additional ServerError Tests ==========
773
774    #[test]
775    fn test_server_error_display_bind() {
776        let io_err = std::io::Error::new(std::io::ErrorKind::AddrInUse, "address in use");
777        let err = ServerError::Bind(io_err);
778        let msg = format!("{}", err);
779        assert!(msg.contains("bind error"));
780    }
781
782    #[test]
783    fn test_server_error_display_frame() {
784        let frame_err = FrameError::FrameTooLarge(100);
785        let err = ServerError::Frame(frame_err);
786        let msg = format!("{}", err);
787        assert!(msg.contains("frame error"));
788    }
789
790    #[test]
791    fn test_server_error_debug() {
792        let err = ServerError::Tls("test error".to_string());
793        let debug = format!("{:?}", err);
794        assert!(debug.contains("Tls"));
795
796        let err = ServerError::Closed;
797        let debug = format!("{:?}", err);
798        assert!(debug.contains("Closed"));
799    }
800
801    #[test]
802    fn test_server_error_from_io() {
803        let io_err = std::io::Error::new(std::io::ErrorKind::Other, "test");
804        let err: ServerError = io_err.into();
805        match err {
806            ServerError::Bind(_) => {}
807            _ => panic!("Expected Bind error"),
808        }
809    }
810
811    #[test]
812    fn test_server_error_from_frame_error() {
813        let frame_err = FrameError::ConnectionClosed;
814        let err: ServerError = frame_err.into();
815        match err {
816            ServerError::Frame(_) => {}
817            _ => panic!("Expected Frame error"),
818        }
819    }
820
821    // ========== Config from_env Tests ==========
822
823    #[test]
824    fn test_config_from_env_returns_config() {
825        // Simply verify from_env() returns a valid config
826        // Note: We can't safely modify env vars in Rust 2024 edition without unsafe blocks
827        let config = RuntaraServerConfig::from_env();
828
829        // Verify the config has the expected structure
830        // Values may be defaults or from env vars - both are valid
831        assert!(config.max_incoming > 0);
832        assert!(config.max_bi_streams > 0);
833        assert!(config.max_uni_streams > 0);
834        assert!(config.idle_timeout_ms > 0);
835        // keep_alive_interval_ms can be 0 (disabled)
836        // udp buffer sizes should be positive
837        assert!(config.udp_receive_buffer_size > 0);
838        assert!(config.udp_send_buffer_size > 0);
839        // max_concurrent_handlers can be 0 (unlimited)
840    }
841
842    // ========== Server Config Access Tests ==========
843
844    #[tokio::test]
845    async fn test_server_config_accessor() {
846        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
847        let server = RuntaraServer::localhost(addr).unwrap();
848        let config = server.config();
849        // Config should be accessible and have reasonable values
850        assert_eq!(config.max_incoming, 10_000);
851    }
852
853    #[tokio::test]
854    async fn test_server_localhost_with_custom_config() {
855        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
856        let custom_config = RuntaraServerConfig {
857            max_incoming: 5000,
858            max_bi_streams: 500,
859            ..RuntaraServerConfig::from_env()
860        };
861        let server = RuntaraServer::localhost_with_config(addr, custom_config);
862        assert!(server.is_ok());
863        let server = server.unwrap();
864        let config = server.config();
865        assert_eq!(config.max_incoming, 5000);
866        assert_eq!(config.max_bi_streams, 500);
867    }
868
869    // ========== Build Server Config Edge Cases ==========
870
871    #[test]
872    fn test_build_server_config_no_keepalive() {
873        let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap();
874        let config = RuntaraServerConfig {
875            cert_pem: cert.cert.pem().into_bytes(),
876            key_pem: cert.key_pair.serialize_pem().into_bytes(),
877            keep_alive_interval_ms: 0, // Disabled
878            ..Default::default()
879        };
880        let result = RuntaraServer::build_server_config(&config);
881        assert!(result.is_ok());
882    }
883
884    #[test]
885    fn test_build_server_config_malformed_cert() {
886        let config = RuntaraServerConfig {
887            cert_pem: b"-----BEGIN CERTIFICATE-----\nMALFORMED\n-----END CERTIFICATE-----".to_vec(),
888            key_pem: b"-----BEGIN PRIVATE KEY-----\nMALFORMED\n-----END PRIVATE KEY-----".to_vec(),
889            ..Default::default()
890        };
891        let result = RuntaraServer::build_server_config(&config);
892        assert!(result.is_err());
893    }
894
895    // ========== IPv6 Support Test ==========
896
897    #[tokio::test]
898    async fn test_server_ipv6_binding() {
899        // Try to bind to IPv6 localhost
900        let addr: SocketAddr = "[::1]:0".parse().unwrap();
901        let server = RuntaraServer::localhost(addr);
902        // May fail on systems without IPv6, that's ok
903        if let Ok(server) = server {
904            let local_addr = server.local_addr().unwrap();
905            assert!(local_addr.is_ipv6());
906            server.close();
907        }
908    }
909
910    // ========== Multiple Server Instances ==========
911
912    #[tokio::test]
913    async fn test_multiple_server_instances() {
914        let addr1: SocketAddr = "127.0.0.1:0".parse().unwrap();
915        let addr2: SocketAddr = "127.0.0.1:0".parse().unwrap();
916
917        let server1 = RuntaraServer::localhost(addr1).unwrap();
918        let server2 = RuntaraServer::localhost(addr2).unwrap();
919
920        // Both should have different ports
921        let port1 = server1.local_addr().unwrap().port();
922        let port2 = server2.local_addr().unwrap().port();
923        assert_ne!(port1, port2);
924
925        server1.close();
926        server2.close();
927    }
928}