1use std::net::SocketAddr;
6use std::sync::Arc;
7
8use quinn::{Endpoint, Incoming, RecvStream, SendStream, ServerConfig, TransportConfig};
9use thiserror::Error;
10use tracing::{debug, error, info, instrument, warn};
11
12use crate::frame::{Frame, FrameError, FramedStream, read_frame, write_frame};
13
14#[derive(Debug, Error)]
16pub enum ServerError {
17 #[error("bind error: {0}")]
18 Bind(#[from] std::io::Error),
19
20 #[error("connection error: {0}")]
21 Connection(#[from] quinn::ConnectionError),
22
23 #[error("frame error: {0}")]
24 Frame(#[from] FrameError),
25
26 #[error("TLS error: {0}")]
27 Tls(String),
28
29 #[error("server closed")]
30 Closed,
31}
32
/// Tunables for [`RuntaraServer`]: bind address, TLS material and QUIC/UDP limits.
#[derive(Debug, Clone)]
pub struct RuntaraServerConfig {
    /// Local address the UDP socket is bound to.
    pub bind_addr: SocketAddr,
    /// PEM-encoded certificate chain presented to clients.
    pub cert_pem: Vec<u8>,
    /// PEM-encoded private key matching `cert_pem`.
    pub key_pem: Vec<u8>,
    /// Value handed to quinn's `ServerConfig::max_incoming`.
    pub max_incoming: u32,
    /// Maximum concurrent bidirectional streams per connection.
    pub max_bi_streams: u32,
    /// Maximum concurrent unidirectional streams per connection.
    pub max_uni_streams: u32,
    /// Connection idle timeout, in milliseconds.
    pub idle_timeout_ms: u64,
    /// Keep-alive interval in milliseconds; `0` leaves keep-alive unset.
    pub keep_alive_interval_ms: u64,
    /// Requested UDP receive buffer size in bytes; `0` keeps the OS default.
    pub udp_receive_buffer_size: usize,
    /// Requested UDP send buffer size in bytes; `0` keeps the OS default.
    pub udp_send_buffer_size: usize,
    /// Cap on connection handlers running at once; `0` means no cap.
    pub max_concurrent_handlers: u32,
}
59
60impl Default for RuntaraServerConfig {
61 fn default() -> Self {
62 Self {
63 bind_addr: "0.0.0.0:7001".parse().unwrap(),
64 cert_pem: Vec::new(),
65 key_pem: Vec::new(),
66 max_incoming: 10_000,
67 max_bi_streams: 1_000,
68 max_uni_streams: 100,
69 idle_timeout_ms: 120_000,
70 keep_alive_interval_ms: 15_000,
71 udp_receive_buffer_size: 2 * 1024 * 1024, udp_send_buffer_size: 2 * 1024 * 1024, max_concurrent_handlers: 0, }
75 }
76}
77
78impl RuntaraServerConfig {
79 pub fn from_env() -> Self {
91 let default = Self::default();
92
93 Self {
94 bind_addr: default.bind_addr,
95 cert_pem: default.cert_pem,
96 key_pem: default.key_pem,
97 max_incoming: std::env::var("RUNTARA_QUIC_MAX_INCOMING")
98 .ok()
99 .and_then(|v| v.parse().ok())
100 .unwrap_or(default.max_incoming),
101 max_bi_streams: std::env::var("RUNTARA_QUIC_MAX_BI_STREAMS")
102 .ok()
103 .and_then(|v| v.parse().ok())
104 .unwrap_or(default.max_bi_streams),
105 max_uni_streams: std::env::var("RUNTARA_QUIC_MAX_UNI_STREAMS")
106 .ok()
107 .and_then(|v| v.parse().ok())
108 .unwrap_or(default.max_uni_streams),
109 idle_timeout_ms: std::env::var("RUNTARA_QUIC_IDLE_TIMEOUT_MS")
110 .ok()
111 .and_then(|v| v.parse().ok())
112 .unwrap_or(default.idle_timeout_ms),
113 keep_alive_interval_ms: std::env::var("RUNTARA_QUIC_KEEP_ALIVE_MS")
114 .ok()
115 .and_then(|v| v.parse().ok())
116 .unwrap_or(default.keep_alive_interval_ms),
117 udp_receive_buffer_size: std::env::var("RUNTARA_QUIC_UDP_RECV_BUFFER")
118 .ok()
119 .and_then(|v| v.parse().ok())
120 .unwrap_or(default.udp_receive_buffer_size),
121 udp_send_buffer_size: std::env::var("RUNTARA_QUIC_UDP_SEND_BUFFER")
122 .ok()
123 .and_then(|v| v.parse().ok())
124 .unwrap_or(default.udp_send_buffer_size),
125 max_concurrent_handlers: std::env::var("RUNTARA_QUIC_MAX_HANDLERS")
126 .ok()
127 .and_then(|v| v.parse().ok())
128 .unwrap_or(default.max_concurrent_handlers),
129 }
130 }
131}
132
133pub struct RuntaraServer {
135 endpoint: Endpoint,
136 config: RuntaraServerConfig,
137}
138
139impl RuntaraServer {
140 pub fn new(config: RuntaraServerConfig) -> Result<Self, ServerError> {
142 use socket2::{Domain, Protocol, Socket, Type};
143
144 let server_config = Self::build_server_config(&config)?;
145
146 let domain = if config.bind_addr.is_ipv6() {
148 Domain::IPV6
149 } else {
150 Domain::IPV4
151 };
152 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
153
154 if config.udp_receive_buffer_size > 0
155 && let Err(e) = socket.set_recv_buffer_size(config.udp_receive_buffer_size)
156 {
157 warn!(
158 size = config.udp_receive_buffer_size,
159 error = %e,
160 "Failed to set UDP receive buffer size"
161 );
162 }
163 if config.udp_send_buffer_size > 0
164 && let Err(e) = socket.set_send_buffer_size(config.udp_send_buffer_size)
165 {
166 warn!(
167 size = config.udp_send_buffer_size,
168 error = %e,
169 "Failed to set UDP send buffer size"
170 );
171 }
172
173 socket.bind(&config.bind_addr.into())?;
175 let std_socket: std::net::UdpSocket = socket.into();
176
177 let runtime = quinn::default_runtime()
178 .ok_or_else(|| ServerError::Bind(std::io::Error::other("no async runtime found")))?;
179 let endpoint = Endpoint::new_with_abstract_socket(
180 quinn::EndpointConfig::default(),
181 Some(server_config),
182 runtime.wrap_udp_socket(std_socket)?,
183 runtime,
184 )?;
185
186 info!(
187 addr = %config.bind_addr,
188 max_incoming = config.max_incoming,
189 max_bi_streams = config.max_bi_streams,
190 idle_timeout_ms = config.idle_timeout_ms,
191 keep_alive_ms = config.keep_alive_interval_ms,
192 udp_recv_buffer = config.udp_receive_buffer_size,
193 udp_send_buffer = config.udp_send_buffer_size,
194 max_handlers = config.max_concurrent_handlers,
195 "QUIC server bound"
196 );
197
198 Ok(Self { endpoint, config })
199 }
200
201 pub fn localhost(bind_addr: SocketAddr) -> Result<Self, ServerError> {
203 Self::localhost_with_config(bind_addr, RuntaraServerConfig::from_env())
204 }
205
206 pub fn localhost_with_config(
208 bind_addr: SocketAddr,
209 mut config: RuntaraServerConfig,
210 ) -> Result<Self, ServerError> {
211 let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()])
212 .map_err(|e| ServerError::Tls(e.to_string()))?;
213
214 config.bind_addr = bind_addr;
215 config.cert_pem = cert.cert.pem().into_bytes();
216 config.key_pem = cert.key_pair.serialize_pem().into_bytes();
217
218 Self::new(config)
219 }
220
221 pub fn config(&self) -> &RuntaraServerConfig {
223 &self.config
224 }
225
226 fn build_server_config(config: &RuntaraServerConfig) -> Result<ServerConfig, ServerError> {
227 let certs = rustls_pemfile::certs(&mut config.cert_pem.as_slice())
228 .collect::<Result<Vec<_>, _>>()
229 .map_err(|e| ServerError::Tls(format!("failed to parse certificates: {}", e)))?;
230
231 let key = rustls_pemfile::private_key(&mut config.key_pem.as_slice())
232 .map_err(|e| ServerError::Tls(format!("failed to parse private key: {}", e)))?
233 .ok_or_else(|| ServerError::Tls("no private key found".to_string()))?;
234
235 let crypto = rustls::ServerConfig::builder()
236 .with_no_client_auth()
237 .with_single_cert(certs, key)
238 .map_err(|e| ServerError::Tls(e.to_string()))?;
239
240 let mut transport = TransportConfig::default();
241 transport.max_idle_timeout(Some(
242 std::time::Duration::from_millis(config.idle_timeout_ms)
243 .try_into()
244 .unwrap(),
245 ));
246 transport.max_concurrent_bidi_streams(config.max_bi_streams.into());
247 transport.max_concurrent_uni_streams(config.max_uni_streams.into());
248
249 if config.keep_alive_interval_ms > 0 {
251 transport.keep_alive_interval(Some(std::time::Duration::from_millis(
252 config.keep_alive_interval_ms,
253 )));
254 }
255
256 let mut server_config = ServerConfig::with_crypto(Arc::new(
257 quinn::crypto::rustls::QuicServerConfig::try_from(crypto)
258 .map_err(|e| ServerError::Tls(e.to_string()))?,
259 ));
260 server_config.transport_config(Arc::new(transport));
261
262 server_config.max_incoming(config.max_incoming as usize);
264
265 Ok(server_config)
266 }
267
268 pub async fn accept(&self) -> Option<Incoming> {
270 self.endpoint.accept().await
271 }
272
273 pub fn local_addr(&self) -> Result<SocketAddr, ServerError> {
275 Ok(self.endpoint.local_addr()?)
276 }
277
278 pub fn close(&self) {
280 self.endpoint.close(0u32.into(), b"server closing");
281 }
282
283 #[instrument(skip(self, handler))]
285 pub async fn run<H, Fut>(&self, handler: H) -> Result<(), ServerError>
286 where
287 H: Fn(ConnectionHandler) -> Fut + Send + Sync + Clone + 'static,
288 Fut: std::future::Future<Output = ()> + Send + 'static,
289 {
290 use tokio::sync::Semaphore;
291
292 info!("QUIC server running");
293
294 let semaphore = if self.config.max_concurrent_handlers > 0 {
296 Some(Arc::new(Semaphore::new(
297 self.config.max_concurrent_handlers as usize,
298 )))
299 } else {
300 None
301 };
302
303 while let Some(incoming) = self.accept().await {
304 let handler = handler.clone();
305 let semaphore = semaphore.clone();
306
307 tokio::spawn(async move {
308 let _permit = if let Some(ref sem) = semaphore {
310 match sem.clone().acquire_owned().await {
311 Ok(permit) => Some(permit),
312 Err(_) => {
313 warn!("semaphore closed, dropping connection");
314 return;
315 }
316 }
317 } else {
318 None
319 };
320
321 match incoming.await {
322 Ok(connection) => {
323 let remote_addr = connection.remote_address();
324 debug!(%remote_addr, "accepted connection");
325
326 let conn_handler = ConnectionHandler::new(connection);
327 handler(conn_handler).await;
328 }
329 Err(e) => {
330 warn!("failed to accept connection: {}", e);
331 }
332 }
333 });
334 }
335
336 Ok(())
337 }
338}
339
340pub struct ConnectionHandler {
342 connection: quinn::Connection,
343}
344
345impl ConnectionHandler {
346 pub fn new(connection: quinn::Connection) -> Self {
347 Self { connection }
348 }
349
350 pub fn remote_address(&self) -> SocketAddr {
352 self.connection.remote_address()
353 }
354
355 pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), ServerError> {
357 Ok(self.connection.accept_bi().await?)
358 }
359
360 pub async fn accept_uni(&self) -> Result<RecvStream, ServerError> {
362 Ok(self.connection.accept_uni().await?)
363 }
364
365 pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ServerError> {
367 Ok(self.connection.open_bi().await?)
368 }
369
370 pub async fn open_uni(&self) -> Result<SendStream, ServerError> {
372 Ok(self.connection.open_uni().await?)
373 }
374
375 #[instrument(skip(self, handler), fields(remote = %self.remote_address()))]
377 pub async fn run<H, Fut>(&self, handler: H)
378 where
379 H: Fn(StreamHandler) -> Fut + Send + Sync + Clone + 'static,
380 Fut: std::future::Future<Output = ()> + Send + 'static,
381 {
382 loop {
383 tokio::select! {
384 result = self.accept_bi() => {
385 match result {
386 Ok((send, recv)) => {
387 let handler = handler.clone();
388 tokio::spawn(async move {
389 let stream_handler = StreamHandler::new(send, recv);
390 handler(stream_handler).await;
391 });
392 }
393 Err(e) => {
394 match &e {
395 ServerError::Connection(quinn::ConnectionError::ApplicationClosed(_)) |
396 ServerError::Connection(quinn::ConnectionError::LocallyClosed) => {
397 debug!("connection closed");
398 }
399 _ => {
400 error!("error accepting stream: {}", e);
401 }
402 }
403 break;
404 }
405 }
406 }
407 }
408 }
409 }
410
411 pub fn is_open(&self) -> bool {
413 self.connection.close_reason().is_none()
414 }
415
416 pub fn close(&self, code: u32, reason: &[u8]) {
418 self.connection.close(code.into(), reason);
419 }
420}
421
422pub struct StreamHandler {
424 send: SendStream,
425 recv: RecvStream,
426}
427
428impl StreamHandler {
429 pub fn new(send: SendStream, recv: RecvStream) -> Self {
430 Self { send, recv }
431 }
432
433 pub async fn read_frame(&mut self) -> Result<Frame, ServerError> {
435 Ok(read_frame(&mut self.recv).await?)
436 }
437
438 pub async fn write_frame(&mut self, frame: &Frame) -> Result<(), ServerError> {
440 Ok(write_frame(&mut self.send, frame).await?)
441 }
442
443 pub async fn handle_request<Req, Resp, H, Fut>(&mut self, handler: H) -> Result<(), ServerError>
445 where
446 Req: prost::Message + Default,
447 Resp: prost::Message,
448 H: FnOnce(Req) -> Fut,
449 Fut: std::future::Future<Output = Result<Resp, ServerError>>,
450 {
451 let request_frame = self.read_frame().await?;
453 let request: Req = request_frame.decode()?;
454
455 match handler(request).await {
457 Ok(response) => {
458 let response_frame = Frame::response(&response)?;
459 self.write_frame(&response_frame).await?;
460 }
461 Err(e) => {
462 error!("request handler error: {}", e);
463 let error_frame = Frame {
466 message_type: crate::frame::MessageType::Error,
467 payload: bytes::Bytes::new(),
468 };
469 self.write_frame(&error_frame).await?;
470 }
471 }
472
473 Ok(())
474 }
475
476 pub fn into_framed(self) -> FramedStream<(SendStream, RecvStream)> {
478 FramedStream::new((self.send, self.recv))
479 }
480
481 pub fn finish(&mut self) -> Result<(), ServerError> {
483 self.send
484 .finish()
485 .map_err(|e| ServerError::Frame(FrameError::Io(std::io::Error::other(e))))?;
486 Ok(())
487 }
488
489 pub async fn read_bytes(&mut self, buf: &mut [u8]) -> Result<usize, ServerError> {
492 match self.recv.read(buf).await {
493 Ok(Some(n)) => Ok(n),
494 Ok(None) => Ok(0), Err(e) => Err(ServerError::Frame(FrameError::Io(std::io::Error::other(
496 e.to_string(),
497 )))),
498 }
499 }
500
501 pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ServerError> {
503 self.recv.read_exact(buf).await.map_err(|e| {
504 ServerError::Frame(FrameError::Io(std::io::Error::other(e.to_string())))
505 })?;
506 Ok(())
507 }
508
509 pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ServerError> {
511 self.recv
512 .read_to_end(size_limit)
513 .await
514 .map_err(|e| ServerError::Frame(FrameError::Io(std::io::Error::other(e.to_string()))))
515 }
516
517 pub async fn stream_to_writer<W: tokio::io::AsyncWrite + Unpin>(
519 &mut self,
520 writer: &mut W,
521 expected_size: Option<u64>,
522 ) -> Result<u64, ServerError> {
523 use tokio::io::AsyncWriteExt;
524
525 let mut total = 0u64;
526 let mut buf = [0u8; 64 * 1024]; loop {
529 let n = match self.recv.read(&mut buf).await {
530 Ok(Some(n)) => n,
531 Ok(None) => 0, Err(e) => {
533 return Err(ServerError::Frame(FrameError::Io(std::io::Error::other(
534 e.to_string(),
535 ))));
536 }
537 };
538 if n == 0 {
539 break;
540 }
541 writer.write_all(&buf[..n]).await?;
542 total += n as u64;
543 }
544
545 if let Some(expected) = expected_size
546 && total != expected
547 {
548 return Err(ServerError::Frame(FrameError::Io(std::io::Error::new(
549 std::io::ErrorKind::UnexpectedEof,
550 format!("Expected {} bytes, got {}", expected, total),
551 ))));
552 }
553
554 Ok(total)
555 }
556}
557
#[cfg(test)]
mod tests {
    use super::*;

    /// Loopback address with an OS-assigned port.
    fn loopback() -> SocketAddr {
        "127.0.0.1:0".parse().unwrap()
    }

    /// Freshly generated self-signed `(cert_pem, key_pem)` for `localhost`.
    fn self_signed_pem() -> (Vec<u8>, Vec<u8>) {
        let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap();
        (
            cert.cert.pem().into_bytes(),
            cert.key_pair.serialize_pem().into_bytes(),
        )
    }

    /// Default configuration carrying a valid self-signed certificate.
    fn tls_config() -> RuntaraServerConfig {
        let (cert_pem, key_pem) = self_signed_pem();
        RuntaraServerConfig {
            cert_pem,
            key_pem,
            ..Default::default()
        }
    }

    #[test]
    fn test_default_config() {
        let config = RuntaraServerConfig::default();
        assert_eq!(
            config.bind_addr,
            "0.0.0.0:7001".parse::<SocketAddr>().unwrap()
        );
        assert_eq!(config.max_incoming, 10_000);
    }

    #[test]
    fn test_default_config_all_fields() {
        let config = RuntaraServerConfig::default();
        assert_eq!(
            config.bind_addr,
            "0.0.0.0:7001".parse::<SocketAddr>().unwrap()
        );
        assert!(config.cert_pem.is_empty());
        assert!(config.key_pem.is_empty());
        assert_eq!(config.max_incoming, 10_000);
        assert_eq!(config.max_bi_streams, 1_000);
        assert_eq!(config.max_uni_streams, 100);
        assert_eq!(config.idle_timeout_ms, 120_000);
        assert_eq!(config.keep_alive_interval_ms, 15_000);
        assert_eq!(config.udp_receive_buffer_size, 2 * 1024 * 1024);
        assert_eq!(config.udp_send_buffer_size, 2 * 1024 * 1024);
        assert_eq!(config.max_concurrent_handlers, 0);
    }

    #[test]
    fn test_config_clone() {
        let config = RuntaraServerConfig {
            bind_addr: "127.0.0.1:9000".parse().unwrap(),
            cert_pem: b"test-cert".to_vec(),
            key_pem: b"test-key".to_vec(),
            max_incoming: 5000,
            max_bi_streams: 50,
            max_uni_streams: 25,
            idle_timeout_ms: 60000,
            keep_alive_interval_ms: 10000,
            udp_receive_buffer_size: 1024 * 1024,
            udp_send_buffer_size: 1024 * 1024,
            max_concurrent_handlers: 500,
        };
        let cloned = config.clone();

        // Every field must survive the clone unchanged.
        assert_eq!(config.bind_addr, cloned.bind_addr);
        assert_eq!(config.cert_pem, cloned.cert_pem);
        assert_eq!(config.key_pem, cloned.key_pem);
        assert_eq!(config.max_incoming, cloned.max_incoming);
        assert_eq!(config.max_bi_streams, cloned.max_bi_streams);
        assert_eq!(config.max_uni_streams, cloned.max_uni_streams);
        assert_eq!(config.idle_timeout_ms, cloned.idle_timeout_ms);
        assert_eq!(config.keep_alive_interval_ms, cloned.keep_alive_interval_ms);
        assert_eq!(
            config.udp_receive_buffer_size,
            cloned.udp_receive_buffer_size
        );
        assert_eq!(config.udp_send_buffer_size, cloned.udp_send_buffer_size);
        assert_eq!(
            config.max_concurrent_handlers,
            cloned.max_concurrent_handlers
        );
    }

    #[test]
    fn test_config_debug() {
        let debug_str = format!("{:?}", RuntaraServerConfig::default());
        for needle in ["RuntaraServerConfig", "bind_addr", "max_incoming"] {
            assert!(debug_str.contains(needle));
        }
    }

    #[tokio::test]
    async fn test_server_localhost_creation() {
        let server = RuntaraServer::localhost(loopback());
        assert!(
            server.is_ok(),
            "Failed to create localhost server: {:?}",
            server.err()
        );
    }

    #[tokio::test]
    async fn test_server_localhost_local_addr() {
        let server = RuntaraServer::localhost(loopback()).unwrap();
        let local_addr = server.local_addr();
        assert!(local_addr.is_ok());
        // Binding to port 0 must have produced a concrete port.
        assert!(local_addr.unwrap().port() > 0);
    }

    #[tokio::test]
    async fn test_server_close() {
        // Passes as long as closing a fresh server does not panic.
        let server = RuntaraServer::localhost(loopback()).unwrap();
        server.close();
    }

    #[test]
    fn test_server_with_invalid_cert() {
        let config = RuntaraServerConfig {
            bind_addr: loopback(),
            cert_pem: b"invalid-cert".to_vec(),
            key_pem: b"invalid-key".to_vec(),
            ..Default::default()
        };
        assert!(RuntaraServer::new(config).is_err());
    }

    #[test]
    fn test_server_error_display() {
        let tls = ServerError::Tls("invalid certificate".to_string());
        assert_eq!(format!("{}", tls), "TLS error: invalid certificate");

        let closed = ServerError::Closed;
        assert_eq!(format!("{}", closed), "server closed");
    }

    #[test]
    fn test_connection_handler_new() {
        // Intentionally empty: `ConnectionHandler::new` takes a
        // `quinn::Connection`, which this unit test does not set up.
    }

    #[test]
    fn test_stream_handler_new() {
        // Intentionally empty: `StreamHandler::new` takes quinn stream
        // halves, which this unit test does not set up.
    }

    #[tokio::test]
    async fn test_server_accept_after_close() {
        let server = RuntaraServer::localhost(loopback()).unwrap();
        server.close();
        let result = server.accept().await;
        assert!(result.is_none());
    }

    #[test]
    fn test_build_server_config_empty_cert() {
        let config = RuntaraServerConfig {
            cert_pem: Vec::new(),
            key_pem: Vec::new(),
            ..Default::default()
        };
        assert!(RuntaraServer::build_server_config(&config).is_err());
    }

    #[test]
    fn test_build_server_config_missing_key() {
        let (cert_pem, _) = self_signed_pem();
        let config = RuntaraServerConfig {
            cert_pem,
            key_pem: Vec::new(),
            ..Default::default()
        };
        assert!(RuntaraServer::build_server_config(&config).is_err());
    }

    #[test]
    fn test_build_server_config_valid() {
        let config = tls_config();
        assert!(RuntaraServer::build_server_config(&config).is_ok());
    }

    #[test]
    fn test_build_server_config_with_custom_limits() {
        let (cert_pem, key_pem) = self_signed_pem();
        let config = RuntaraServerConfig {
            bind_addr: "0.0.0.0:0".parse().unwrap(),
            cert_pem,
            key_pem,
            max_incoming: 1000,
            max_bi_streams: 200,
            max_uni_streams: 50,
            idle_timeout_ms: 120000,
            keep_alive_interval_ms: 20000,
            udp_receive_buffer_size: 4 * 1024 * 1024,
            udp_send_buffer_size: 4 * 1024 * 1024,
            max_concurrent_handlers: 200,
        };
        assert!(RuntaraServer::build_server_config(&config).is_ok());
    }

    #[tokio::test]
    async fn test_server_new_with_valid_config() {
        let config = RuntaraServerConfig {
            bind_addr: loopback(),
            ..tls_config()
        };
        assert!(RuntaraServer::new(config).is_ok());
    }

    #[test]
    fn test_server_error_display_bind() {
        let io_err = std::io::Error::new(std::io::ErrorKind::AddrInUse, "address in use");
        let msg = ServerError::Bind(io_err).to_string();
        assert!(msg.contains("bind error"));
    }

    #[test]
    fn test_server_error_display_frame() {
        let msg = ServerError::Frame(FrameError::FrameTooLarge(100)).to_string();
        assert!(msg.contains("frame error"));
    }

    #[test]
    fn test_server_error_debug() {
        let tls = format!("{:?}", ServerError::Tls("test error".to_string()));
        assert!(tls.contains("Tls"));

        let closed = format!("{:?}", ServerError::Closed);
        assert!(closed.contains("Closed"));
    }

    #[test]
    fn test_server_error_from_io() {
        let err: ServerError = std::io::Error::other("test").into();
        let ServerError::Bind(_) = err else {
            panic!("Expected Bind error");
        };
    }

    #[test]
    fn test_server_error_from_frame_error() {
        let err: ServerError = FrameError::ConnectionClosed.into();
        let ServerError::Frame(_) = err else {
            panic!("Expected Frame error");
        };
    }

    #[test]
    fn test_config_from_env_returns_config() {
        // Only checks that the values are positive; exact values depend on
        // the environment the test runs in.
        let config = RuntaraServerConfig::from_env();

        assert!(config.max_incoming > 0);
        assert!(config.max_bi_streams > 0);
        assert!(config.max_uni_streams > 0);
        assert!(config.idle_timeout_ms > 0);
        assert!(config.udp_receive_buffer_size > 0);
        assert!(config.udp_send_buffer_size > 0);
    }

    #[tokio::test]
    async fn test_server_config_accessor() {
        let server = RuntaraServer::localhost(loopback()).unwrap();
        assert_eq!(server.config().max_incoming, 10_000);
    }

    #[tokio::test]
    async fn test_server_localhost_with_custom_config() {
        let custom_config = RuntaraServerConfig {
            max_incoming: 5000,
            max_bi_streams: 500,
            ..RuntaraServerConfig::from_env()
        };
        let server = RuntaraServer::localhost_with_config(loopback(), custom_config);
        assert!(server.is_ok());

        let server = server.unwrap();
        let config = server.config();
        assert_eq!(config.max_incoming, 5000);
        assert_eq!(config.max_bi_streams, 500);
    }

    #[test]
    fn test_build_server_config_no_keepalive() {
        let config = RuntaraServerConfig {
            keep_alive_interval_ms: 0,
            ..tls_config()
        };
        assert!(RuntaraServer::build_server_config(&config).is_ok());
    }

    #[test]
    fn test_build_server_config_malformed_cert() {
        let config = RuntaraServerConfig {
            cert_pem: b"-----BEGIN CERTIFICATE-----\nMALFORMED\n-----END CERTIFICATE-----".to_vec(),
            key_pem: b"-----BEGIN PRIVATE KEY-----\nMALFORMED\n-----END PRIVATE KEY-----".to_vec(),
            ..Default::default()
        };
        assert!(RuntaraServer::build_server_config(&config).is_err());
    }

    #[tokio::test]
    async fn test_server_ipv6_binding() {
        let addr: SocketAddr = "[::1]:0".parse().unwrap();
        // A failure to create the server is tolerated; assertions run only
        // when binding to the IPv6 loopback succeeds.
        if let Ok(server) = RuntaraServer::localhost(addr) {
            let local_addr = server.local_addr().unwrap();
            assert!(local_addr.is_ipv6());
            server.close();
        }
    }

    #[tokio::test]
    async fn test_multiple_server_instances() {
        let server1 = RuntaraServer::localhost(loopback()).unwrap();
        let server2 = RuntaraServer::localhost(loopback()).unwrap();

        // Two live servers must have been given distinct ports.
        let port1 = server1.local_addr().unwrap().port();
        let port2 = server2.local_addr().unwrap().port();
        assert_ne!(port1, port2);

        server1.close();
        server2.close();
    }
}