reinhardt_testkit/fixtures/
server.rs1use reinhardt_di::InjectionContext;
7use reinhardt_http::Handler;
8use reinhardt_http::{Request, Response};
9use reinhardt_server::{
10 HttpServer, RateLimitConfig, RateLimitHandler, ShutdownCoordinator, TimeoutHandler,
11};
12use reinhardt_urls::routers::ServerRouter as Router;
13use rstest::fixture;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::net::TcpListener;
18use tokio::task::JoinHandle;
19
20#[cfg(feature = "websockets")]
21use reinhardt_server::WebSocketServer;
22
23#[cfg(feature = "graphql")]
24use reinhardt_server::GraphQLHandler;
25
26pub struct TestServerGuard {
49 pub url: String,
51 pub coordinator: Arc<ShutdownCoordinator>,
53 server_task: Option<JoinHandle<()>>,
55}
56
57impl TestServerGuard {
58 async fn new(router: Router) -> Self {
70 let shutdown_timeout = Duration::from_secs(5);
71 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
73 let actual_addr = listener.local_addr().unwrap();
74 let url = format!("http://{}", actual_addr);
75
76 let coordinator = Arc::new(ShutdownCoordinator::new(shutdown_timeout));
78
79 let server_coordinator = (*coordinator).clone();
81 let handler: Arc<dyn Handler> = Arc::new(router);
82 let server = HttpServer::new(handler);
83 let mut shutdown_rx = server_coordinator.subscribe();
84 let server_task = tokio::spawn(async move {
85 loop {
86 tokio::select! {
87 result = listener.accept() => {
88 match result {
89 Ok((stream, socket_addr)) => {
90 let handler_clone = server.handler();
91 tokio::spawn(async move {
92 if let Err(e) =
93 HttpServer::handle_connection(stream, socket_addr, handler_clone, None)
94 .await
95 {
96 eprintln!("Error handling connection: {:?}", e);
97 }
98 });
99 }
100 Err(e) => {
101 eprintln!("Error accepting connection: {:?}", e);
102 break;
103 }
104 }
105 }
106 _ = shutdown_rx.recv() => {
107 break;
108 }
109 }
110 }
111 });
112
113 wait_for_server_ready(actual_addr)
116 .await
117 .expect("Test server failed to become ready");
118
119 Self {
120 url,
121 coordinator,
122 server_task: Some(server_task),
123 }
124 }
125}
126
127impl Drop for TestServerGuard {
128 fn drop(&mut self) {
129 self.coordinator.shutdown();
131
132 if let Some(task) = self.server_task.take() {
136 task.abort();
137 }
138 }
139}
140
141pub async fn test_server_guard(router: Router) -> TestServerGuard {
164 TestServerGuard::new(router).await
165}
166
167#[derive(Clone)]
173pub struct BasicHandler;
174
175#[async_trait::async_trait]
176impl Handler for BasicHandler {
177 async fn handle(&self, _request: Request) -> reinhardt_core::exception::Result<Response> {
178 Ok(Response::ok().with_body("OK"))
179 }
180}
181
182#[fixture]
206pub fn http_client() -> reqwest::Client {
207 reqwest::Client::builder()
208 .timeout(Duration::from_secs(10))
209 .build()
210 .expect("Failed to create HTTP client")
211}
212#[fixture]
234pub async fn http1_server() -> TestServer {
235 let handler = Arc::new(BasicHandler);
236 TestServer::builder()
237 .handler(handler)
238 .build()
239 .await
240 .expect("Failed to create HTTP/1.1 server")
241}
242
243#[fixture]
263pub async fn http2_server() -> TestServer {
264 let handler = Arc::new(BasicHandler);
265 TestServer::builder()
266 .handler(handler)
267 .http2(true)
268 .build()
269 .await
270 .expect("Failed to create HTTP/2 server")
271}
272
273#[fixture]
295pub async fn server_with_timeout(
296 #[default(Duration::from_secs(5))] timeout: Duration,
297) -> TestServer {
298 let handler = Arc::new(BasicHandler);
299 let timeout_handler = Arc::new(TimeoutHandler::new(handler, timeout));
300 TestServer::builder()
301 .handler(timeout_handler)
302 .build()
303 .await
304 .expect("Failed to create server with timeout")
305}
306
307#[fixture]
325pub async fn server_with_rate_limit(#[default(100)] limit: u32) -> TestServer {
326 let handler = Arc::new(BasicHandler);
327 let config = RateLimitConfig::per_minute(limit as usize);
328 let rate_limit_handler = Arc::new(RateLimitHandler::new(handler, config));
329 TestServer::builder()
330 .handler(rate_limit_handler)
331 .build()
332 .await
333 .expect("Failed to create server with rate limit")
334}
335
336#[fixture]
352pub async fn server_with_middleware_chain() -> TestServer {
353 let handler = Arc::new(BasicHandler);
354 let timeout_handler = Arc::new(TimeoutHandler::new(handler, Duration::from_secs(5)));
355 let config = RateLimitConfig::per_minute(100);
356 let rate_limit_handler = Arc::new(RateLimitHandler::new(timeout_handler, config));
357
358 TestServer::builder()
359 .handler(rate_limit_handler)
360 .build()
361 .await
362 .expect("Failed to create server with middleware chain")
363}
364
365#[fixture]
381pub async fn server_with_di() -> (TestServer, Arc<InjectionContext>) {
382 use reinhardt_di::SingletonScope;
383
384 let handler = Arc::new(BasicHandler);
385 let di_context = Arc::new(InjectionContext::builder(Arc::new(SingletonScope::new())).build());
386
387 let server = TestServer::builder()
388 .handler(handler)
389 .di_context(di_context.clone())
390 .build()
391 .await
392 .expect("Failed to create server with DI context");
393
394 (server, di_context)
395}
396
/// Fixture starting a [`TestServer`] with a WebSocket handler that returns
/// every received message unchanged.
///
/// # Panics
///
/// Panics if the server cannot be built.
#[cfg(feature = "websockets")]
#[fixture]
pub async fn websocket_server() -> TestServer {
    use reinhardt_server::WebSocketHandler;

    /// Handler that replies with the message it was given.
    #[derive(Clone)]
    struct EchoHandler;

    #[async_trait::async_trait]
    impl WebSocketHandler for EchoHandler {
        async fn handle_message(&self, message: String) -> Result<String, String> {
            Ok(message)
        }

        // Connection lifecycle hooks are intentionally no-ops.
        async fn on_connect(&self) {}
        async fn on_disconnect(&self) {}
    }

    let echo = Arc::new(EchoHandler);
    let built = TestServer::builder().websocket_handler(echo).build().await;
    built.expect("Failed to create WebSocket server")
}
441
/// Fixture that awaits the `websocket_server` fixture and opens a WebSocket
/// connection to it, using the server URL with `http://` swapped for `ws://`.
///
/// NOTE(review): the `TestServer` is owned by this function and is dropped
/// when it returns, and `TestServer`'s `Drop` signals shutdown and aborts the
/// server task. The returned stream may therefore outlive its server; confirm
/// whether the fixture should hand the server back to the caller as well.
///
/// # Panics
///
/// Panics if the WebSocket connection cannot be established.
#[cfg(feature = "websockets")]
#[fixture]
pub async fn websocket_client(
    #[from(websocket_server)]
    #[future]
    server: TestServer,
) -> tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>> {
    let running = server.await;
    let endpoint = running.url.replace("http://", "ws://");
    let connection = tokio_tungstenite::connect_async(&endpoint).await;
    // The handshake response is not needed by callers.
    let (stream, _) = connection.expect("Failed to connect WebSocket");
    stream
}
470
/// Fixture starting a [`TestServer`] that serves a GraphQL schema with a
/// single `hello` query field returning `"Hello, GraphQL!"`.
///
/// # Panics
///
/// Panics if the server cannot be built.
#[cfg(feature = "graphql")]
#[fixture]
pub async fn graphql_server() -> TestServer {
    use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema};

    /// Root query type of the test schema.
    struct Query;

    #[Object]
    impl Query {
        async fn hello(&self) -> &'static str {
            "Hello, GraphQL!"
        }
    }

    // No mutations or subscriptions are exposed by the test schema.
    let schema = Schema::build(Query, EmptyMutation, EmptySubscription).finish();
    let graphql_handler = Arc::new(GraphQLHandler::new(schema));

    TestServer::builder()
        .handler(graphql_handler)
        .build()
        .await
        .expect("Failed to create GraphQL server")
}
514
515pub struct TestServer {
521 pub url: String,
523 pub addr: SocketAddr,
525 pub coordinator: Arc<ShutdownCoordinator>,
527 server_task: Option<JoinHandle<()>>,
529}
530
531impl TestServer {
532 pub fn builder() -> TestServerBuilder {
534 TestServerBuilder::new()
535 }
536}
537
538impl Drop for TestServer {
539 fn drop(&mut self) {
540 self.coordinator.shutdown();
542
543 if let Some(task) = self.server_task.take() {
545 task.abort();
546 }
547 }
548}
549
550pub struct TestServerBuilder {
552 handler: Option<Arc<dyn Handler>>,
553 #[cfg(feature = "websockets")]
554 websocket_handler: Option<Arc<dyn reinhardt_server::WebSocketHandler>>,
555 di_context: Option<Arc<InjectionContext>>,
556 http2: bool,
557 shutdown_timeout: Duration,
558}
559
560impl TestServerBuilder {
561 fn new() -> Self {
562 Self {
563 handler: None,
564 #[cfg(feature = "websockets")]
565 websocket_handler: None,
566 di_context: None,
567 http2: false,
568 shutdown_timeout: Duration::from_secs(5),
569 }
570 }
571
572 pub fn handler(mut self, handler: Arc<dyn Handler>) -> Self {
574 self.handler = Some(handler);
575 self
576 }
577
578 #[cfg(feature = "websockets")]
579 pub fn websocket_handler(
581 mut self,
582 handler: Arc<dyn reinhardt_server::WebSocketHandler>,
583 ) -> Self {
584 self.websocket_handler = Some(handler);
585 self
586 }
587
588 pub fn di_context(mut self, context: Arc<InjectionContext>) -> Self {
590 self.di_context = Some(context);
591 self
592 }
593
594 pub fn http2(mut self, enabled: bool) -> Self {
596 self.http2 = enabled;
597 self
598 }
599
600 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
602 self.shutdown_timeout = timeout;
603 self
604 }
605
606 pub async fn build(self) -> Result<TestServer, Box<dyn std::error::Error>> {
608 let listener = TcpListener::bind("127.0.0.1:0").await?;
610 let actual_addr = listener.local_addr()?;
611 let url = format!("http://{}", actual_addr);
612
613 let coordinator = Arc::new(ShutdownCoordinator::new(self.shutdown_timeout));
615
616 let server_coordinator = (*coordinator).clone();
618
619 #[cfg(feature = "websockets")]
620 let websocket_handler = self.websocket_handler;
621
622 let handler = self.handler;
623 let di_context = self.di_context;
624 let http2 = self.http2;
625
626 let server_task = tokio::spawn(async move {
627 #[cfg(feature = "websockets")]
631 if let Some(ws_handler) = websocket_handler {
632 drop(listener);
633 let server = WebSocketServer::from_arc(ws_handler);
634 let _ = server
635 .listen_with_shutdown(actual_addr, server_coordinator)
636 .await;
637 return;
638 }
639
640 if let Some(h) = handler {
641 if http2 {
642 drop(listener);
643 let server = reinhardt_server::Http2Server::new(h);
644 let _ = server
645 .listen_with_shutdown(actual_addr, server_coordinator)
646 .await;
647 } else {
648 let server = HttpServer::new(h);
650 let mut shutdown_rx = server_coordinator.subscribe();
651 loop {
652 tokio::select! {
653 result = listener.accept() => {
654 match result {
655 Ok((stream, socket_addr)) => {
656 let handler_clone = server.handler();
657 let di_ctx = di_context.clone();
658 tokio::spawn(async move {
659 if let Err(e) =
660 HttpServer::handle_connection(stream, socket_addr, handler_clone, di_ctx)
661 .await
662 {
663 eprintln!("Error handling connection: {:?}", e);
664 }
665 });
666 }
667 Err(e) => {
668 eprintln!("Error accepting connection: {:?}", e);
669 break;
670 }
671 }
672 }
673 _ = shutdown_rx.recv() => {
674 break;
675 }
676 }
677 }
678 }
679 }
680 });
681
682 wait_for_server_ready(actual_addr)
685 .await
686 .expect("Test server failed to become ready");
687
688 Ok(TestServer {
689 url,
690 addr: actual_addr,
691 coordinator,
692 server_task: Some(server_task),
693 })
694 }
695}
696
697const SERVER_READY_MAX_ATTEMPTS: u32 = 20;
703
704const SERVER_READY_PROBE_INTERVAL_MS: u64 = 50;
706
707async fn wait_for_server_ready(addr: SocketAddr) -> Result<(), std::io::Error> {
717 for attempt in 1..=SERVER_READY_MAX_ATTEMPTS {
718 match tokio::net::TcpStream::connect(addr).await {
720 Ok(_) => return Ok(()),
721 Err(_) if attempt < SERVER_READY_MAX_ATTEMPTS => {
722 tokio::time::sleep(Duration::from_millis(SERVER_READY_PROBE_INTERVAL_MS)).await;
723 }
724 Err(e) => {
725 return Err(std::io::Error::new(
726 std::io::ErrorKind::TimedOut,
727 format!(
728 "Server at {} not ready after {} attempts: {}",
729 addr, SERVER_READY_MAX_ATTEMPTS, e
730 ),
731 ));
732 }
733 }
734 }
735
736 Err(std::io::Error::new(
737 std::io::ErrorKind::TimedOut,
738 format!(
739 "Server at {} not ready after {} attempts",
740 addr, SERVER_READY_MAX_ATTEMPTS
741 ),
742 ))
743}
744
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::*;

    /// `BasicHandler` answers a plain GET with status `OK`.
    #[rstest]
    #[tokio::test]
    async fn test_basic_handler_returns_ok() {
        // Arrange
        let get_root = Request::builder()
            .method(hyper::Method::GET)
            .uri("/")
            .build()
            .expect("Failed to build request");

        // Act
        let outcome = BasicHandler.handle(get_root).await;

        // Assert
        assert!(outcome.is_ok(), "Expected Ok response from BasicHandler");
        assert_eq!(outcome.unwrap().status, hyper::StatusCode::OK);
    }

    /// The guard exposes a loopback URL once the server is running.
    #[rstest]
    #[tokio::test]
    async fn test_test_server_guard_starts() {
        let guard = test_server_guard(Router::new()).await;

        let has_loopback_prefix = guard.url.starts_with("http://127.0.0.1:");
        assert!(
            has_loopback_prefix,
            "Expected URL to start with 'http://127.0.0.1:', got: {}",
            guard.url
        );
    }

    /// Building with only a handler succeeds.
    #[rstest]
    #[tokio::test]
    async fn test_test_server_builder_default() {
        let basic: Arc<dyn Handler> = Arc::new(BasicHandler);

        let built = TestServer::builder().handler(basic).build().await;

        assert!(
            built.is_ok(),
            "Expected TestServer::builder().handler().build() to succeed"
        );
    }

    /// The server reports a loopback URL and a non-zero port.
    #[rstest]
    #[tokio::test]
    async fn test_test_server_url_format() {
        let basic: Arc<dyn Handler> = Arc::new(BasicHandler);

        let running = TestServer::builder()
            .handler(basic)
            .build()
            .await
            .expect("Failed to build TestServer");

        let has_loopback_prefix = running.url.starts_with("http://127.0.0.1:");
        assert!(
            has_loopback_prefix,
            "Expected URL format 'http://127.0.0.1:<port>', got: {}",
            running.url
        );

        let port = running.addr.port();
        assert!(port > 0, "Expected non-zero port, got: {}", port);
    }

    /// A real HTTP GET against the server returns status `OK`.
    #[rstest]
    #[tokio::test]
    async fn test_test_server_responds_to_request() {
        let basic: Arc<dyn Handler> = Arc::new(BasicHandler);
        let running = TestServer::builder()
            .handler(basic)
            .build()
            .await
            .expect("Failed to build TestServer");

        let outcome = reqwest::Client::new().get(&running.url).send().await;

        assert!(outcome.is_ok(), "Expected GET request to succeed");
        assert_eq!(outcome.unwrap().status(), reqwest::StatusCode::OK);
    }

    /// The `http_client` fixture yields a `reqwest::Client`.
    #[rstest]
    fn test_http_client_fixture() {
        let fixture_client = http_client();

        // Compile-time type check only; there is nothing to assert at runtime.
        let _: &reqwest::Client = &fixture_client;
    }

    /// A custom shutdown timeout does not prevent the server from building.
    #[rstest]
    #[tokio::test]
    async fn test_test_server_shutdown_timeout() {
        let basic: Arc<dyn Handler> = Arc::new(BasicHandler);
        let ten_seconds = Duration::from_secs(10);

        let built = TestServer::builder()
            .handler(basic)
            .shutdown_timeout(ten_seconds)
            .build()
            .await;

        assert!(
            built.is_ok(),
            "Expected TestServer with custom shutdown timeout to build successfully"
        );
    }

    /// The readiness probe succeeds against an address with a bound listener.
    #[rstest]
    #[tokio::test]
    async fn test_wait_for_server_ready() {
        // The listener must stay alive for the duration of the probe.
        let bound = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("Failed to bind listener");
        let probe_addr = bound.local_addr().expect("Failed to get local addr");

        let outcome = wait_for_server_ready(probe_addr).await;

        assert!(
            outcome.is_ok(),
            "Expected wait_for_server_ready to succeed for a bound address"
        );
    }
}