1use crate::metrics::ServerMetrics;
9use crate::server::{MessageHandler, SessionManager};
10use crate::transport::{TcpTransport, TcpTransportListener};
11use std::io;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::broadcast;
15use tokio::task::JoinSet;
16use tokio::time::timeout;
17
18#[derive(Debug)]
26pub struct ReplServer {
27 address: String,
29
30 session_manager: Arc<SessionManager>,
32
33 shutdown_tx: broadcast::Sender<()>,
35
36 shutdown_timeout: Duration,
38
39 metrics: Option<Arc<ServerMetrics>>,
41}
42
43#[derive(Debug, Clone)]
48pub struct ShutdownHandle {
49 tx: broadcast::Sender<()>,
50}
51
52impl ShutdownHandle {
53 pub fn shutdown(&self) {
58 let _ = self.tx.send(());
60 }
61}
62
63impl ReplServer {
64 pub fn new(address: impl Into<String>) -> Self {
80 let (shutdown_tx, _) = broadcast::channel(1);
81 Self {
82 address: address.into(),
83 session_manager: Arc::new(SessionManager::new()),
84 shutdown_tx,
85 shutdown_timeout: Duration::from_secs(30), metrics: None,
87 }
88 }
89
90 pub fn with_metrics(mut self) -> Self {
108 self.metrics = Some(Arc::new(ServerMetrics::new()));
109 self
110 }
111
112 pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
129 self.shutdown_timeout = timeout;
130 self
131 }
132
133 pub fn shutdown_handle(&self) -> ShutdownHandle {
154 ShutdownHandle { tx: self.shutdown_tx.clone() }
155 }
156
157 pub async fn start(&mut self) -> io::Result<()> {
196 eprintln!("[INFO] Starting REPL server on {}", self.address);
197
198 let listener = TcpTransportListener::bind(&self.address)
199 .await
200 .map_err(|e| io::Error::other(format!("{:?}", e)))?;
201 eprintln!("[INFO] Server listening on {}", self.address);
202
203 let mut shutdown_rx = self.shutdown_tx.subscribe();
205
206 let mut active_connections = JoinSet::new();
208
209 loop {
210 tokio::select! {
211 _ = shutdown_rx.recv() => {
213 eprintln!("[INFO] Shutdown signal received");
214 break;
215 }
216
217 result = listener.accept() => {
219 match result {
220 Ok(transport) => {
221 let peer_addr = transport
222 .peer_addr()
223 .map(|a| a.to_string())
224 .unwrap_or_else(|_| "unknown".to_string());
225
226 eprintln!("[INFO] Accepted connection from {}", peer_addr);
227
228 if let Some(ref metrics) = self.metrics {
230 metrics.connection_accepted();
231 }
232
233 let session_manager = Arc::clone(&self.session_manager);
235 let mut shutdown_rx = self.shutdown_tx.subscribe();
236 let metrics = self.metrics.as_ref().map(Arc::clone);
237
238 active_connections.spawn(async move {
239 tokio::select! {
241 result = Self::handle_connection(transport, session_manager, metrics.as_deref()) => {
242 match result {
243 Ok(_) => {
244 eprintln!("[INFO] Connection closed ({})", peer_addr);
245 }
246 Err(e) => {
247 eprintln!("[ERROR] Connection error ({}): {}", peer_addr, e);
248 }
249 }
250 }
251 _ = shutdown_rx.recv() => {
252 eprintln!("[INFO] Connection interrupted by shutdown ({})", peer_addr);
253 }
254 }
255 if let Some(ref metrics) = metrics {
257 metrics.connection_closed();
258 }
259 });
260 }
261 Err(e) => {
262 eprintln!("[ERROR] Failed to accept connection: {}", e);
263 }
265 }
266 }
267 }
268 }
269
270 eprintln!(
272 "[INFO] Waiting for {} active connections to finish (timeout: {:?})",
273 active_connections.len(),
274 self.shutdown_timeout
275 );
276
277 match timeout(self.shutdown_timeout, async {
279 while active_connections.join_next().await.is_some() {
280 }
282 })
283 .await
284 {
285 Ok(_) => {
286 eprintln!("[INFO] All connections closed gracefully");
287 }
288 Err(_) => {
289 eprintln!(
290 "[WARN] Shutdown timeout reached, {} connections still active",
291 active_connections.len()
292 );
293 active_connections.shutdown().await;
295 }
296 }
297
298 if let Ok(count) = self.session_manager.close_all() {
300 eprintln!("[INFO] Closed {} sessions", count);
301 }
302
303 eprintln!("[INFO] Server shutdown complete");
304 Ok(())
305 }
306
307 async fn handle_connection(
312 mut transport: TcpTransport,
313 session_manager: Arc<SessionManager>,
314 metrics: Option<&ServerMetrics>,
315 ) -> io::Result<()> {
316 use crate::protocol::{Operation, OperationResult};
317 use crate::transport::{Transport, TransportError};
318
319 let handler = MessageHandler::new((*session_manager).clone());
320
321 loop {
322 let request = match transport.recv_request().await {
324 Ok(req) => req,
325 Err(TransportError::ConnectionClosed) => {
326 eprintln!("[INFO] Connection closed cleanly");
328 return Ok(());
329 }
330 Err(e) => {
331 eprintln!("[ERROR] Failed to read request: {:?}", e);
333 return Err(io::Error::other(format!("{:?}", e)));
334 }
335 };
336
337 if let Some(m) = metrics {
339 let operation_name = match &request.operation {
340 Operation::CreateSession { .. } => "create_session",
341 Operation::Clone { .. } => "clone",
342 Operation::Eval { .. } => "eval",
343 Operation::Close => "close",
344 Operation::LsSessions => "ls_sessions",
345 Operation::LoadFile { .. } => "load_file",
346 Operation::Interrupt => "interrupt",
347 Operation::Describe { .. } => "describe",
348 Operation::History { .. } => "history",
349 Operation::ClearOutput => "clear_output",
350 Operation::GetServerStats => "get_server_stats",
351 Operation::GetSessionStats => "get_session_stats",
352 Operation::GetSubprocessStats => "get_subprocess_stats",
353 Operation::GetSystemInfo => "get_system_info",
354 };
355 m.request_received(operation_name);
356
357 match &request.operation {
359 Operation::CreateSession { .. } => m.session_created(),
360 Operation::Close => m.session_closed(),
361 _ => {}
362 }
363 }
364
365 let response = handler.handle(request).await;
367
368 if let Some(m) = metrics {
370 let status = match &response.result {
371 OperationResult::Success { .. } => "success",
372 OperationResult::Error { .. } => "error",
373 OperationResult::Sessions { .. } => "success",
374 OperationResult::HistoryEntries { .. } => "success",
375 OperationResult::ServerStats { .. } => "success",
376 OperationResult::SessionStats { .. } => "success",
377 OperationResult::SubprocessStats { .. } => "success",
378 OperationResult::SystemInfo { .. } => "success",
379 };
380 m.response_sent(status);
381 }
382
383 if let Err(e) = transport.send_response(&response).await {
385 match e {
386 TransportError::ConnectionClosed => {
387 eprintln!("[INFO] Connection closed after response");
389 return Ok(());
390 }
391 _ => {
392 eprintln!("[ERROR] Failed to write response: {:?}", e);
393 return Err(io::Error::other(format!("{:?}", e)));
394 }
395 }
396 }
397 }
398 }
399}
400
401#[cfg(test)]
402mod tests {
403 use super::*;
404 use crate::protocol::{MessageId, Operation, OperationResult, ReplMode, Request, SessionId};
405 use crate::transport::Transport;
406 use std::time::Duration;
407 use tokio::time::timeout;
408
409 #[tokio::test]
410 async fn test_server_creation() {
411 let server = ReplServer::new("127.0.0.1:0");
412 assert_eq!(server.address, "127.0.0.1:0");
413 }
414
415 #[tokio::test]
416 async fn test_server_start_and_connect() {
417 let _server = ReplServer::new("127.0.0.1:0");
419
420 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
422 let addr = listener.local_addr().unwrap();
423
424 let server_handle = tokio::spawn(async move {
426 let transport = listener.accept().await.unwrap();
428 let session_manager = Arc::new(SessionManager::new());
429 ReplServer::handle_connection(transport, session_manager, None).await.unwrap();
430 });
431
432 tokio::time::sleep(Duration::from_millis(100)).await;
434
435 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
437
438 let request = Request {
440 id: MessageId::new(1),
441 session_id: SessionId::new("test"),
442 operation: Operation::CreateSession { mode: ReplMode::Lisp },
443 };
444
445 client.send_request(&request).await.unwrap();
446
447 let response = client.recv_response().await.unwrap();
449
450 assert_eq!(response.request_id, MessageId::new(1));
451 assert!(matches!(response.result, OperationResult::Success { .. }));
452
453 drop(client);
455
456 timeout(Duration::from_secs(1), server_handle).await.unwrap().unwrap();
458 }
459
460 #[tokio::test]
461 async fn test_multiple_requests_same_connection() {
462 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
463 let addr = listener.local_addr().unwrap();
464
465 let server_handle = tokio::spawn(async move {
467 let transport = listener.accept().await.unwrap();
468 let session_manager = Arc::new(SessionManager::new());
469 ReplServer::handle_connection(transport, session_manager, None).await.ok();
470 });
471
472 tokio::time::sleep(Duration::from_millis(50)).await;
473
474 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
476
477 let req1 = Request {
479 id: MessageId::new(1),
480 session_id: SessionId::new("test"),
481 operation: Operation::CreateSession { mode: ReplMode::Lisp },
482 };
483
484 client.send_request(&req1).await.unwrap();
485 let resp1 = client.recv_response().await.unwrap();
486 assert_eq!(resp1.request_id, MessageId::new(1));
487
488 let req2 = Request {
490 id: MessageId::new(2),
491 session_id: SessionId::new("test"),
492 operation: Operation::Eval { code: "(+ 1 2)".to_string(), mode: ReplMode::Lisp },
493 };
494
495 client.send_request(&req2).await.unwrap();
496 let resp2 = client.recv_response().await.unwrap();
497 assert_eq!(resp2.request_id, MessageId::new(2));
498
499 if let OperationResult::Success { value, .. } = resp2.result {
500 assert_eq!(value, Some("3".to_string()));
501 } else {
502 panic!("Expected success response");
503 }
504
505 let req3 = Request {
507 id: MessageId::new(3),
508 session_id: SessionId::new("test"),
509 operation: Operation::Close,
510 };
511
512 client.send_request(&req3).await.unwrap();
513 let resp3 = client.recv_response().await.unwrap();
514 assert_eq!(resp3.request_id, MessageId::new(3));
515
516 drop(client);
517 timeout(Duration::from_secs(1), server_handle).await.ok();
518 }
519
520 #[tokio::test]
521 async fn test_concurrent_connections() {
522 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
523 let addr = listener.local_addr().unwrap();
524
525 let session_manager = Arc::new(SessionManager::new());
526
527 let sm = Arc::clone(&session_manager);
529 let server_handle = tokio::spawn(async move {
530 let transport1 = listener.accept().await.unwrap();
532 let sm1 = Arc::clone(&sm);
533 tokio::spawn(async move {
534 ReplServer::handle_connection(transport1, sm1, None).await.ok();
535 });
536
537 let transport2 = listener.accept().await.unwrap();
539 ReplServer::handle_connection(transport2, sm, None).await.ok();
540 });
541
542 tokio::time::sleep(Duration::from_millis(50)).await;
543
544 let mut client1 = TcpTransport::connect(addr.to_string()).await.unwrap();
546 let req1 = Request {
547 id: MessageId::new(1),
548 session_id: SessionId::new("session-1"),
549 operation: Operation::CreateSession { mode: ReplMode::Lisp },
550 };
551 client1.send_request(&req1).await.unwrap();
552 let resp1 = client1.recv_response().await.unwrap();
553 assert_eq!(resp1.request_id, MessageId::new(1));
554
555 let mut client2 = TcpTransport::connect(addr.to_string()).await.unwrap();
557 let req2 = Request {
558 id: MessageId::new(1),
559 session_id: SessionId::new("session-2"),
560 operation: Operation::CreateSession { mode: ReplMode::Sexpr },
561 };
562 client2.send_request(&req2).await.unwrap();
563 let resp2 = client2.recv_response().await.unwrap();
564 assert_eq!(resp2.request_id, MessageId::new(1));
565
566 assert_eq!(session_manager.count().unwrap(), 2);
568
569 drop(client1);
570 drop(client2);
571
572 timeout(Duration::from_secs(1), server_handle).await.ok();
573 }
574
575 #[tokio::test]
576 async fn test_connection_error_handling() {
577 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
578 let addr = listener.local_addr().unwrap();
579
580 let server_handle = tokio::spawn(async move {
581 let transport = listener.accept().await.unwrap();
582 let session_manager = Arc::new(SessionManager::new());
583 ReplServer::handle_connection(transport, session_manager, None).await
584 });
585
586 tokio::time::sleep(Duration::from_millis(50)).await;
587
588 let client = TcpTransport::connect(addr.to_string()).await.unwrap();
590 drop(client);
591
592 let result = timeout(Duration::from_secs(1), server_handle).await.unwrap().unwrap();
594
595 assert!(result.is_ok()); }
597
598 #[tokio::test]
599 async fn test_shutdown_handle_functionality() {
600 let server = ReplServer::new("127.0.0.1:5555");
601 let handle1 = server.shutdown_handle();
602 let handle2 = handle1.clone();
603
604 handle2.shutdown();
607
608 }
611
612 #[tokio::test]
613 async fn test_with_shutdown_timeout() {
614 let server =
615 ReplServer::new("127.0.0.1:5555").with_shutdown_timeout(Duration::from_secs(60));
616
617 assert_eq!(server.shutdown_timeout, Duration::from_secs(60));
618 }
619
620 #[tokio::test]
621 async fn test_with_metrics() {
622 let server = ReplServer::new("127.0.0.1:0").with_metrics();
623 assert!(server.metrics.is_some());
624 }
625
626 #[tokio::test]
627 async fn test_server_builder_chain() {
628 let server = ReplServer::new("127.0.0.1:0")
629 .with_metrics()
630 .with_shutdown_timeout(Duration::from_secs(45));
631
632 assert!(server.metrics.is_some());
633 assert_eq!(server.shutdown_timeout, Duration::from_secs(45));
634 }
635
636 #[tokio::test]
637 async fn test_handle_connection_with_metrics() {
638 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
639 let addr = listener.local_addr().unwrap();
640
641 let metrics = Arc::new(ServerMetrics::new());
642 let metrics_clone = Arc::clone(&metrics);
643
644 let server_handle = tokio::spawn(async move {
646 let transport = listener.accept().await.unwrap();
647 let session_manager = Arc::new(SessionManager::new());
648 ReplServer::handle_connection(transport, session_manager, Some(&metrics_clone)).await
649 });
650
651 tokio::time::sleep(Duration::from_millis(50)).await;
652
653 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
655
656 let request = Request {
658 id: MessageId::new(1),
659 session_id: SessionId::new("test-metrics"),
660 operation: Operation::CreateSession { mode: ReplMode::Lisp },
661 };
662 client.send_request(&request).await.unwrap();
663 let response = client.recv_response().await.unwrap();
664 assert!(matches!(response.result, OperationResult::Success { .. }));
665
666 let request = Request {
668 id: MessageId::new(2),
669 session_id: SessionId::new("test-metrics"),
670 operation: Operation::Eval { code: "(+ 1 1)".to_string(), mode: ReplMode::Lisp },
671 };
672 client.send_request(&request).await.unwrap();
673 let _response = client.recv_response().await.unwrap();
674
675 let request = Request {
677 id: MessageId::new(3),
678 session_id: SessionId::new("test-metrics"),
679 operation: Operation::Close,
680 };
681 client.send_request(&request).await.unwrap();
682 let _response = client.recv_response().await.unwrap();
683
684 drop(client);
685 timeout(Duration::from_secs(1), server_handle).await.ok();
686 }
687
688 #[tokio::test]
689 async fn test_handle_connection_ls_sessions() {
690 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
691 let addr = listener.local_addr().unwrap();
692
693 let server_handle = tokio::spawn(async move {
694 let transport = listener.accept().await.unwrap();
695 let session_manager = Arc::new(SessionManager::new());
696 ReplServer::handle_connection(transport, session_manager, None).await
697 });
698
699 tokio::time::sleep(Duration::from_millis(50)).await;
700
701 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
702
703 let request = Request {
705 id: MessageId::new(1),
706 session_id: SessionId::new("unused"),
707 operation: Operation::LsSessions,
708 };
709 client.send_request(&request).await.unwrap();
710 let response = client.recv_response().await.unwrap();
711 assert!(matches!(response.result, OperationResult::Sessions { .. }));
712
713 drop(client);
714 timeout(Duration::from_secs(1), server_handle).await.ok();
715 }
716
717 #[tokio::test]
718 async fn test_handle_connection_history() {
719 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
720 let addr = listener.local_addr().unwrap();
721
722 let server_handle = tokio::spawn(async move {
723 let transport = listener.accept().await.unwrap();
724 let session_manager = Arc::new(SessionManager::new());
725 ReplServer::handle_connection(transport, session_manager, None).await
726 });
727
728 tokio::time::sleep(Duration::from_millis(50)).await;
729
730 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
731
732 let request = Request {
734 id: MessageId::new(1),
735 session_id: SessionId::new("test-history"),
736 operation: Operation::CreateSession { mode: ReplMode::Lisp },
737 };
738 client.send_request(&request).await.unwrap();
739 let _response = client.recv_response().await.unwrap();
740
741 let request = Request {
743 id: MessageId::new(2),
744 session_id: SessionId::new("test-history"),
745 operation: Operation::History { limit: Some(10) },
746 };
747 client.send_request(&request).await.unwrap();
748 let response = client.recv_response().await.unwrap();
749 assert!(matches!(
751 response.result,
752 OperationResult::HistoryEntries { .. } | OperationResult::Error { .. }
753 ));
754
755 drop(client);
756 timeout(Duration::from_secs(1), server_handle).await.ok();
757 }
758
759 #[tokio::test]
760 async fn test_handle_connection_get_server_stats() {
761 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
762 let addr = listener.local_addr().unwrap();
763
764 let server_handle = tokio::spawn(async move {
765 let transport = listener.accept().await.unwrap();
766 let session_manager = Arc::new(SessionManager::new());
767 ReplServer::handle_connection(transport, session_manager, None).await
768 });
769
770 tokio::time::sleep(Duration::from_millis(50)).await;
771
772 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
773
774 let request = Request {
776 id: MessageId::new(1),
777 session_id: SessionId::new("unused"),
778 operation: Operation::GetServerStats,
779 };
780 client.send_request(&request).await.unwrap();
781 let response = client.recv_response().await.unwrap();
782 assert!(matches!(
784 response.result,
785 OperationResult::ServerStats { .. } | OperationResult::Error { .. }
786 ));
787
788 drop(client);
789 timeout(Duration::from_secs(1), server_handle).await.ok();
790 }
791
792 #[tokio::test]
793 async fn test_handle_connection_get_session_stats() {
794 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
795 let addr = listener.local_addr().unwrap();
796
797 let server_handle = tokio::spawn(async move {
798 let transport = listener.accept().await.unwrap();
799 let session_manager = Arc::new(SessionManager::new());
800 ReplServer::handle_connection(transport, session_manager, None).await
801 });
802
803 tokio::time::sleep(Duration::from_millis(50)).await;
804
805 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
806
807 let request = Request {
809 id: MessageId::new(1),
810 session_id: SessionId::new("test-stats"),
811 operation: Operation::CreateSession { mode: ReplMode::Lisp },
812 };
813 client.send_request(&request).await.unwrap();
814 let _response = client.recv_response().await.unwrap();
815
816 let request = Request {
818 id: MessageId::new(2),
819 session_id: SessionId::new("test-stats"),
820 operation: Operation::GetSessionStats,
821 };
822 client.send_request(&request).await.unwrap();
823 let response = client.recv_response().await.unwrap();
824 assert!(matches!(response.result, OperationResult::SessionStats { .. }));
825
826 drop(client);
827 timeout(Duration::from_secs(1), server_handle).await.ok();
828 }
829
830 #[tokio::test]
831 async fn test_handle_connection_get_subprocess_stats() {
832 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
833 let addr = listener.local_addr().unwrap();
834
835 let server_handle = tokio::spawn(async move {
836 let transport = listener.accept().await.unwrap();
837 let session_manager = Arc::new(SessionManager::new());
838 ReplServer::handle_connection(transport, session_manager, None).await
839 });
840
841 tokio::time::sleep(Duration::from_millis(50)).await;
842
843 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
844
845 let request = Request {
847 id: MessageId::new(1),
848 session_id: SessionId::new("test-sub-stats"),
849 operation: Operation::CreateSession { mode: ReplMode::Lisp },
850 };
851 client.send_request(&request).await.unwrap();
852 let _response = client.recv_response().await.unwrap();
853
854 let request = Request {
856 id: MessageId::new(2),
857 session_id: SessionId::new("test-sub-stats"),
858 operation: Operation::GetSubprocessStats,
859 };
860 client.send_request(&request).await.unwrap();
861 let response = client.recv_response().await.unwrap();
862 assert!(matches!(
864 response.result,
865 OperationResult::SubprocessStats { .. } | OperationResult::Error { .. }
866 ));
867
868 drop(client);
869 timeout(Duration::from_secs(1), server_handle).await.ok();
870 }
871
872 #[tokio::test]
873 async fn test_handle_connection_get_system_info() {
874 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
875 let addr = listener.local_addr().unwrap();
876
877 let server_handle = tokio::spawn(async move {
878 let transport = listener.accept().await.unwrap();
879 let session_manager = Arc::new(SessionManager::new());
880 ReplServer::handle_connection(transport, session_manager, None).await
881 });
882
883 tokio::time::sleep(Duration::from_millis(50)).await;
884
885 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
886
887 let request = Request {
889 id: MessageId::new(1),
890 session_id: SessionId::new("unused"),
891 operation: Operation::GetSystemInfo,
892 };
893 client.send_request(&request).await.unwrap();
894 let response = client.recv_response().await.unwrap();
895 assert!(matches!(
897 response.result,
898 OperationResult::SystemInfo { .. } | OperationResult::Error { .. }
899 ));
900
901 drop(client);
902 timeout(Duration::from_secs(1), server_handle).await.ok();
903 }
904
905 #[tokio::test]
906 async fn test_handle_connection_clear_output() {
907 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
908 let addr = listener.local_addr().unwrap();
909
910 let server_handle = tokio::spawn(async move {
911 let transport = listener.accept().await.unwrap();
912 let session_manager = Arc::new(SessionManager::new());
913 ReplServer::handle_connection(transport, session_manager, None).await
914 });
915
916 tokio::time::sleep(Duration::from_millis(50)).await;
917
918 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
919
920 let request = Request {
922 id: MessageId::new(1),
923 session_id: SessionId::new("test-clear"),
924 operation: Operation::CreateSession { mode: ReplMode::Lisp },
925 };
926 client.send_request(&request).await.unwrap();
927 let _response = client.recv_response().await.unwrap();
928
929 let request = Request {
931 id: MessageId::new(2),
932 session_id: SessionId::new("test-clear"),
933 operation: Operation::ClearOutput,
934 };
935 client.send_request(&request).await.unwrap();
936 let response = client.recv_response().await.unwrap();
937 assert!(matches!(
939 response.result,
940 OperationResult::Success { .. } | OperationResult::Error { .. }
941 ));
942
943 drop(client);
944 timeout(Duration::from_secs(1), server_handle).await.ok();
945 }
946
947 #[tokio::test]
948 async fn test_handle_connection_interrupt() {
949 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
950 let addr = listener.local_addr().unwrap();
951
952 let server_handle = tokio::spawn(async move {
953 let transport = listener.accept().await.unwrap();
954 let session_manager = Arc::new(SessionManager::new());
955 ReplServer::handle_connection(transport, session_manager, None).await
956 });
957
958 tokio::time::sleep(Duration::from_millis(50)).await;
959
960 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
961
962 let request = Request {
964 id: MessageId::new(1),
965 session_id: SessionId::new("test-interrupt"),
966 operation: Operation::CreateSession { mode: ReplMode::Lisp },
967 };
968 client.send_request(&request).await.unwrap();
969 let _response = client.recv_response().await.unwrap();
970
971 let request = Request {
973 id: MessageId::new(2),
974 session_id: SessionId::new("test-interrupt"),
975 operation: Operation::Interrupt,
976 };
977 client.send_request(&request).await.unwrap();
978 let response = client.recv_response().await.unwrap();
979 assert!(matches!(
981 response.result,
982 OperationResult::Success { .. } | OperationResult::Error { .. }
983 ));
984
985 drop(client);
986 timeout(Duration::from_secs(1), server_handle).await.ok();
987 }
988
989 #[tokio::test]
990 async fn test_handle_connection_describe() {
991 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
992 let addr = listener.local_addr().unwrap();
993
994 let server_handle = tokio::spawn(async move {
995 let transport = listener.accept().await.unwrap();
996 let session_manager = Arc::new(SessionManager::new());
997 ReplServer::handle_connection(transport, session_manager, None).await
998 });
999
1000 tokio::time::sleep(Duration::from_millis(50)).await;
1001
1002 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
1003
1004 let request = Request {
1006 id: MessageId::new(1),
1007 session_id: SessionId::new("test-describe"),
1008 operation: Operation::CreateSession { mode: ReplMode::Lisp },
1009 };
1010 client.send_request(&request).await.unwrap();
1011 let _response = client.recv_response().await.unwrap();
1012
1013 let request = Request {
1015 id: MessageId::new(2),
1016 session_id: SessionId::new("test-describe"),
1017 operation: Operation::Describe { symbol: "test".to_string() },
1018 };
1019 client.send_request(&request).await.unwrap();
1020 let response = client.recv_response().await.unwrap();
1021 assert!(matches!(
1023 response.result,
1024 OperationResult::Success { .. } | OperationResult::Error { .. }
1025 ));
1026
1027 drop(client);
1028 timeout(Duration::from_secs(1), server_handle).await.ok();
1029 }
1030
1031 #[tokio::test]
1032 async fn test_handle_connection_load_file() {
1033 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
1034 let addr = listener.local_addr().unwrap();
1035
1036 let server_handle = tokio::spawn(async move {
1037 let transport = listener.accept().await.unwrap();
1038 let session_manager = Arc::new(SessionManager::new());
1039 ReplServer::handle_connection(transport, session_manager, None).await
1040 });
1041
1042 tokio::time::sleep(Duration::from_millis(50)).await;
1043
1044 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
1045
1046 let request = Request {
1048 id: MessageId::new(1),
1049 session_id: SessionId::new("test-load"),
1050 operation: Operation::CreateSession { mode: ReplMode::Lisp },
1051 };
1052 client.send_request(&request).await.unwrap();
1053 let _response = client.recv_response().await.unwrap();
1054
1055 let request = Request {
1057 id: MessageId::new(2),
1058 session_id: SessionId::new("test-load"),
1059 operation: Operation::LoadFile {
1060 path: "/nonexistent/test.lisp".to_string(),
1061 mode: ReplMode::Lisp,
1062 },
1063 };
1064 client.send_request(&request).await.unwrap();
1065 let response = client.recv_response().await.unwrap();
1066 assert!(matches!(response.result, OperationResult::Error { .. }));
1068
1069 drop(client);
1070 timeout(Duration::from_secs(1), server_handle).await.ok();
1071 }
1072
1073 #[tokio::test]
1074 async fn test_handle_connection_clone_session() {
1075 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
1076 let addr = listener.local_addr().unwrap();
1077
1078 let server_handle = tokio::spawn(async move {
1079 let transport = listener.accept().await.unwrap();
1080 let session_manager = Arc::new(SessionManager::new());
1081 ReplServer::handle_connection(transport, session_manager, None).await
1082 });
1083
1084 tokio::time::sleep(Duration::from_millis(50)).await;
1085
1086 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
1087
1088 let request = Request {
1090 id: MessageId::new(1),
1091 session_id: SessionId::new("test-clone-src"),
1092 operation: Operation::CreateSession { mode: ReplMode::Lisp },
1093 };
1094 client.send_request(&request).await.unwrap();
1095 let _response = client.recv_response().await.unwrap();
1096
1097 let request = Request {
1099 id: MessageId::new(2),
1100 session_id: SessionId::new("test-clone-dst"),
1101 operation: Operation::Clone { source_session_id: SessionId::new("test-clone-src") },
1102 };
1103 client.send_request(&request).await.unwrap();
1104 let response = client.recv_response().await.unwrap();
1105 assert!(matches!(response.result, OperationResult::Success { .. }));
1106
1107 drop(client);
1108 timeout(Duration::from_secs(1), server_handle).await.ok();
1109 }
1110
1111 #[tokio::test]
1112 async fn test_handle_connection_error_response_with_metrics() {
1113 let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
1114 let addr = listener.local_addr().unwrap();
1115
1116 let metrics = Arc::new(ServerMetrics::new());
1117 let metrics_clone = Arc::clone(&metrics);
1118
1119 let server_handle = tokio::spawn(async move {
1120 let transport = listener.accept().await.unwrap();
1121 let session_manager = Arc::new(SessionManager::new());
1122 ReplServer::handle_connection(transport, session_manager, Some(&metrics_clone)).await
1123 });
1124
1125 tokio::time::sleep(Duration::from_millis(50)).await;
1126
1127 let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
1128
1129 let request = Request {
1131 id: MessageId::new(1),
1132 session_id: SessionId::new("nonexistent"),
1133 operation: Operation::Eval { code: "(+ 1 1)".to_string(), mode: ReplMode::Lisp },
1134 };
1135 client.send_request(&request).await.unwrap();
1136 let response = client.recv_response().await.unwrap();
1137 assert!(matches!(response.result, OperationResult::Error { .. }));
1138
1139 drop(client);
1140 timeout(Duration::from_secs(1), server_handle).await.ok();
1141 }
1142
1143 #[tokio::test]
1144 async fn test_full_server_start_and_shutdown() {
1145 let mut server = ReplServer::new("127.0.0.1:0");
1146 let shutdown_handle = server.shutdown_handle();
1147
1148 let server_task = tokio::spawn(async move { server.start().await });
1150
1151 tokio::time::sleep(Duration::from_millis(100)).await;
1153
1154 shutdown_handle.shutdown();
1156
1157 let result = timeout(Duration::from_secs(5), server_task).await;
1159 assert!(result.is_ok());
1160 }
1161
1162 #[tokio::test]
1163 async fn test_server_with_metrics_full_lifecycle() {
1164 let mut server = ReplServer::new("127.0.0.1:0")
1165 .with_metrics()
1166 .with_shutdown_timeout(Duration::from_secs(5));
1167 let shutdown_handle = server.shutdown_handle();
1168
1169 let server_task = tokio::spawn(async move { server.start().await });
1170
1171 tokio::time::sleep(Duration::from_millis(100)).await;
1172
1173 shutdown_handle.shutdown();
1174
1175 let result = timeout(Duration::from_secs(5), server_task).await;
1176 assert!(result.is_ok());
1177 }
1178
1179 #[tokio::test]
1180 async fn test_shutdown_handle_clone() {
1181 let server = ReplServer::new("127.0.0.1:0");
1182 let handle1 = server.shutdown_handle();
1183 let handle2 = handle1.clone();
1184
1185 handle1.shutdown();
1187 handle2.shutdown();
1188 }
1189
1190 #[tokio::test]
1191 async fn test_server_default_shutdown_timeout() {
1192 let server = ReplServer::new("127.0.0.1:0");
1193 assert_eq!(server.shutdown_timeout, Duration::from_secs(30));
1194 }
1195}