oxur_repl/server/
repl_server.rs

1//! REPL Server Implementation
2//!
3//! TCP-based server that accepts client connections and processes REPL requests.
4//! Manages multiple concurrent sessions with shared SessionManager.
5//!
6//! Based on ODD-0018: Oxur Remote REPL Protocol Design
7
8use crate::metrics::ServerMetrics;
9use crate::server::{MessageHandler, SessionManager};
10use crate::transport::{TcpTransport, TcpTransportListener};
11use std::io;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::broadcast;
15use tokio::task::JoinSet;
16use tokio::time::timeout;
17
18/// REPL server that accepts TCP connections and processes requests
19///
20/// Manages multiple concurrent client connections, each with independent
21/// request/response handling. All connections share the same SessionManager,
22/// allowing sessions to be accessed from multiple clients.
23///
24/// Supports graceful shutdown with configurable timeout for active connections.
25#[derive(Debug)]
26pub struct ReplServer {
27    /// Address to bind to
28    address: String,
29
30    /// Shared session manager
31    session_manager: Arc<SessionManager>,
32
33    /// Shutdown signal broadcaster
34    shutdown_tx: broadcast::Sender<()>,
35
36    /// Graceful shutdown timeout (how long to wait for connections to finish)
37    shutdown_timeout: Duration,
38
39    /// Optional server metrics for observability (Arc-wrapped for sharing across tasks)
40    metrics: Option<Arc<ServerMetrics>>,
41}
42
43/// Handle for triggering server shutdown
44///
45/// Obtained via `ReplServer::shutdown_handle()`, this handle can be used
46/// to trigger graceful server shutdown from another task.
47#[derive(Debug, Clone)]
48pub struct ShutdownHandle {
49    tx: broadcast::Sender<()>,
50}
51
52impl ShutdownHandle {
53    /// Trigger graceful server shutdown
54    ///
55    /// Sends shutdown signal to the server. The server will stop accepting
56    /// new connections and wait for active connections to complete.
57    pub fn shutdown(&self) {
58        // Ignore error if no receivers (server already stopped)
59        let _ = self.tx.send(());
60    }
61}
62
63impl ReplServer {
64    /// Create a new REPL server
65    ///
66    /// # Arguments
67    ///
68    /// * `address` - Address to bind to (e.g., "127.0.0.1:5555")
69    ///
70    /// # Example
71    ///
72    /// ```no_run
73    /// use oxur_repl::server::ReplServer;
74    ///
75    /// # async fn example() {
76    /// let server = ReplServer::new("127.0.0.1:5555");
77    /// # }
78    /// ```
79    pub fn new(address: impl Into<String>) -> Self {
80        let (shutdown_tx, _) = broadcast::channel(1);
81        Self {
82            address: address.into(),
83            session_manager: Arc::new(SessionManager::new()),
84            shutdown_tx,
85            shutdown_timeout: Duration::from_secs(30), // Default 30 second timeout
86            metrics: None,
87        }
88    }
89
90    /// Enable metrics collection for the server
91    ///
92    /// When enabled, the server records:
93    /// - Connection counts (total and active)
94    /// - Session counts (total and active)
95    /// - Request/response counts by type and status
96    ///
97    /// # Example
98    ///
99    /// ```no_run
100    /// use oxur_repl::server::ReplServer;
101    ///
102    /// # async fn example() {
103    /// let server = ReplServer::new("127.0.0.1:5555")
104    ///     .with_metrics();
105    /// # }
106    /// ```
107    pub fn with_metrics(mut self) -> Self {
108        self.metrics = Some(Arc::new(ServerMetrics::new()));
109        self
110    }
111
112    /// Set the graceful shutdown timeout
113    ///
114    /// This configures how long the server will wait for active connections
115    /// to finish when shutting down.
116    ///
117    /// # Example
118    ///
119    /// ```no_run
120    /// use oxur_repl::server::ReplServer;
121    /// use std::time::Duration;
122    ///
123    /// # async fn example() {
124    /// let mut server = ReplServer::new("127.0.0.1:5555")
125    ///     .with_shutdown_timeout(Duration::from_secs(60));
126    /// # }
127    /// ```
128    pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
129        self.shutdown_timeout = timeout;
130        self
131    }
132
133    /// Get a shutdown handle that can be used to trigger graceful shutdown
134    ///
135    /// Call `shutdown()` on the returned handle to initiate server shutdown.
136    ///
137    /// # Example
138    ///
139    /// ```no_run
140    /// use oxur_repl::server::ReplServer;
141    ///
142    /// # async fn example() {
143    /// let mut server = ReplServer::new("127.0.0.1:5555");
144    /// let shutdown_handle = server.shutdown_handle();
145    ///
146    /// // Later, from another task:
147    /// tokio::spawn(async move {
148    ///     // ... some condition ...
149    ///     shutdown_handle.shutdown();
150    /// });
151    /// # }
152    /// ```
153    pub fn shutdown_handle(&self) -> ShutdownHandle {
154        ShutdownHandle { tx: self.shutdown_tx.clone() }
155    }
156
157    /// Start the server and listen for connections
158    ///
159    /// This method blocks until the server is shut down via a shutdown signal.
160    /// When shutdown is triggered, the server will:
161    /// 1. Stop accepting new connections
162    /// 2. Wait for active connections to finish (up to shutdown_timeout)
163    /// 3. Close all sessions
164    ///
165    /// # Errors
166    ///
167    /// Returns error if:
168    /// - Failed to bind to address
169    /// - Failed to accept connections
170    ///
171    /// # Example
172    ///
173    /// ```no_run
174    /// use oxur_repl::server::ReplServer;
175    ///
176    /// # async fn example() -> std::io::Result<()> {
177    /// let mut server = ReplServer::new("127.0.0.1:5555");
178    ///
179    /// // Get shutdown handle before starting
180    /// let shutdown_handle = server.shutdown_handle();
181    ///
182    /// // Spawn server in background
183    /// let server_task = tokio::spawn(async move {
184    ///     server.start().await
185    /// });
186    ///
187    /// // Later, trigger shutdown
188    /// shutdown_handle.shutdown();
189    ///
190    /// // Wait for server to finish
191    /// server_task.await.unwrap()?;
192    /// # Ok(())
193    /// # }
194    /// ```
195    pub async fn start(&mut self) -> io::Result<()> {
196        eprintln!("[INFO] Starting REPL server on {}", self.address);
197
198        let listener = TcpTransportListener::bind(&self.address)
199            .await
200            .map_err(|e| io::Error::other(format!("{:?}", e)))?;
201        eprintln!("[INFO] Server listening on {}", self.address);
202
203        // Subscribe to shutdown signal
204        let mut shutdown_rx = self.shutdown_tx.subscribe();
205
206        // Track active connections
207        let mut active_connections = JoinSet::new();
208
209        loop {
210            tokio::select! {
211                // Wait for shutdown signal
212                _ = shutdown_rx.recv() => {
213                    eprintln!("[INFO] Shutdown signal received");
214                    break;
215                }
216
217                // Accept new connection
218                result = listener.accept() => {
219                    match result {
220                        Ok(transport) => {
221                            let peer_addr = transport
222                                .peer_addr()
223                                .map(|a| a.to_string())
224                                .unwrap_or_else(|_| "unknown".to_string());
225
226                            eprintln!("[INFO] Accepted connection from {}", peer_addr);
227
228                            // Record connection accepted metric
229                            if let Some(ref metrics) = self.metrics {
230                                metrics.connection_accepted();
231                            }
232
233                            // Spawn handler for this connection
234                            let session_manager = Arc::clone(&self.session_manager);
235                            let mut shutdown_rx = self.shutdown_tx.subscribe();
236                            let metrics = self.metrics.as_ref().map(Arc::clone);
237
238                            active_connections.spawn(async move {
239                                // Run connection handler, but cancel if shutdown is triggered
240                                tokio::select! {
241                                    result = Self::handle_connection(transport, session_manager, metrics.as_deref()) => {
242                                        match result {
243                                            Ok(_) => {
244                                                eprintln!("[INFO] Connection closed ({})", peer_addr);
245                                            }
246                                            Err(e) => {
247                                                eprintln!("[ERROR] Connection error ({}): {}", peer_addr, e);
248                                            }
249                                        }
250                                    }
251                                    _ = shutdown_rx.recv() => {
252                                        eprintln!("[INFO] Connection interrupted by shutdown ({})", peer_addr);
253                                    }
254                                }
255                                // Record connection closed metric
256                                if let Some(ref metrics) = metrics {
257                                    metrics.connection_closed();
258                                }
259                            });
260                        }
261                        Err(e) => {
262                            eprintln!("[ERROR] Failed to accept connection: {}", e);
263                            // Continue accepting other connections
264                        }
265                    }
266                }
267            }
268        }
269
270        // Shutdown initiated - wait for active connections to finish
271        eprintln!(
272            "[INFO] Waiting for {} active connections to finish (timeout: {:?})",
273            active_connections.len(),
274            self.shutdown_timeout
275        );
276
277        // Wait for all connections with timeout
278        match timeout(self.shutdown_timeout, async {
279            while active_connections.join_next().await.is_some() {
280                // Just wait for all tasks to finish
281            }
282        })
283        .await
284        {
285            Ok(_) => {
286                eprintln!("[INFO] All connections closed gracefully");
287            }
288            Err(_) => {
289                eprintln!(
290                    "[WARN] Shutdown timeout reached, {} connections still active",
291                    active_connections.len()
292                );
293                // Abort remaining tasks
294                active_connections.shutdown().await;
295            }
296        }
297
298        // Close all sessions
299        if let Ok(count) = self.session_manager.close_all() {
300            eprintln!("[INFO] Closed {} sessions", count);
301        }
302
303        eprintln!("[INFO] Server shutdown complete");
304        Ok(())
305    }
306
307    /// Handle a single client connection
308    ///
309    /// Reads requests from the transport, processes them via MessageHandler,
310    /// and writes responses back.
311    async fn handle_connection(
312        mut transport: TcpTransport,
313        session_manager: Arc<SessionManager>,
314        metrics: Option<&ServerMetrics>,
315    ) -> io::Result<()> {
316        use crate::protocol::{Operation, OperationResult};
317        use crate::transport::{Transport, TransportError};
318
319        let handler = MessageHandler::new((*session_manager).clone());
320
321        loop {
322            // Read request using Transport trait
323            let request = match transport.recv_request().await {
324                Ok(req) => req,
325                Err(TransportError::ConnectionClosed) => {
326                    // Clean connection close - not an error
327                    eprintln!("[INFO] Connection closed cleanly");
328                    return Ok(());
329                }
330                Err(e) => {
331                    // Other errors are actual errors
332                    eprintln!("[ERROR] Failed to read request: {:?}", e);
333                    return Err(io::Error::other(format!("{:?}", e)));
334                }
335            };
336
337            // Record request metric
338            if let Some(m) = metrics {
339                let operation_name = match &request.operation {
340                    Operation::CreateSession { .. } => "create_session",
341                    Operation::Clone { .. } => "clone",
342                    Operation::Eval { .. } => "eval",
343                    Operation::Close => "close",
344                    Operation::LsSessions => "ls_sessions",
345                    Operation::LoadFile { .. } => "load_file",
346                    Operation::Interrupt => "interrupt",
347                    Operation::Describe { .. } => "describe",
348                    Operation::History { .. } => "history",
349                    Operation::ClearOutput => "clear_output",
350                    Operation::GetServerStats => "get_server_stats",
351                    Operation::GetSessionStats => "get_session_stats",
352                    Operation::GetSubprocessStats => "get_subprocess_stats",
353                    Operation::GetSystemInfo => "get_system_info",
354                };
355                m.request_received(operation_name);
356
357                // Track session creation/close for session metrics
358                match &request.operation {
359                    Operation::CreateSession { .. } => m.session_created(),
360                    Operation::Close => m.session_closed(),
361                    _ => {}
362                }
363            }
364
365            // Process request
366            let response = handler.handle(request).await;
367
368            // Record response metric
369            if let Some(m) = metrics {
370                let status = match &response.result {
371                    OperationResult::Success { .. } => "success",
372                    OperationResult::Error { .. } => "error",
373                    OperationResult::Sessions { .. } => "success",
374                    OperationResult::HistoryEntries { .. } => "success",
375                    OperationResult::ServerStats { .. } => "success",
376                    OperationResult::SessionStats { .. } => "success",
377                    OperationResult::SubprocessStats { .. } => "success",
378                    OperationResult::SystemInfo { .. } => "success",
379                };
380                m.response_sent(status);
381            }
382
383            // Write response using Transport trait
384            if let Err(e) = transport.send_response(&response).await {
385                match e {
386                    TransportError::ConnectionClosed => {
387                        // Client closed connection after we processed request - that's OK
388                        eprintln!("[INFO] Connection closed after response");
389                        return Ok(());
390                    }
391                    _ => {
392                        eprintln!("[ERROR] Failed to write response: {:?}", e);
393                        return Err(io::Error::other(format!("{:?}", e)));
394                    }
395                }
396            }
397        }
398    }
399}
400
401#[cfg(test)]
402mod tests {
403    use super::*;
404    use crate::protocol::{MessageId, Operation, OperationResult, ReplMode, Request, SessionId};
405    use crate::transport::Transport;
406    use std::time::Duration;
407    use tokio::time::timeout;
408
409    #[tokio::test]
410    async fn test_server_creation() {
411        let server = ReplServer::new("127.0.0.1:0");
412        assert_eq!(server.address, "127.0.0.1:0");
413    }
414
415    #[tokio::test]
416    async fn test_server_start_and_connect() {
417        // Start server on random port
418        let _server = ReplServer::new("127.0.0.1:0");
419
420        // Get the actual bound address
421        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
422        let addr = listener.local_addr().unwrap();
423
424        // Spawn server in background
425        let server_handle = tokio::spawn(async move {
426            // Accept one connection then stop
427            let transport = listener.accept().await.unwrap();
428            let session_manager = Arc::new(SessionManager::new());
429            ReplServer::handle_connection(transport, session_manager, None).await.unwrap();
430        });
431
432        // Give server time to start
433        tokio::time::sleep(Duration::from_millis(100)).await;
434
435        // Connect client
436        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
437
438        // Send a request
439        let request = Request {
440            id: MessageId::new(1),
441            session_id: SessionId::new("test"),
442            operation: Operation::CreateSession { mode: ReplMode::Lisp },
443        };
444
445        client.send_request(&request).await.unwrap();
446
447        // Read response
448        let response = client.recv_response().await.unwrap();
449
450        assert_eq!(response.request_id, MessageId::new(1));
451        assert!(matches!(response.result, OperationResult::Success { .. }));
452
453        // Close client connection
454        drop(client);
455
456        // Wait for server to finish
457        timeout(Duration::from_secs(1), server_handle).await.unwrap().unwrap();
458    }
459
460    #[tokio::test]
461    async fn test_multiple_requests_same_connection() {
462        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
463        let addr = listener.local_addr().unwrap();
464
465        // Spawn server
466        let server_handle = tokio::spawn(async move {
467            let transport = listener.accept().await.unwrap();
468            let session_manager = Arc::new(SessionManager::new());
469            ReplServer::handle_connection(transport, session_manager, None).await.ok();
470        });
471
472        tokio::time::sleep(Duration::from_millis(50)).await;
473
474        // Connect client
475        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
476
477        // Request 1: Create session
478        let req1 = Request {
479            id: MessageId::new(1),
480            session_id: SessionId::new("test"),
481            operation: Operation::CreateSession { mode: ReplMode::Lisp },
482        };
483
484        client.send_request(&req1).await.unwrap();
485        let resp1 = client.recv_response().await.unwrap();
486        assert_eq!(resp1.request_id, MessageId::new(1));
487
488        // Request 2: Eval
489        let req2 = Request {
490            id: MessageId::new(2),
491            session_id: SessionId::new("test"),
492            operation: Operation::Eval { code: "(+ 1 2)".to_string(), mode: ReplMode::Lisp },
493        };
494
495        client.send_request(&req2).await.unwrap();
496        let resp2 = client.recv_response().await.unwrap();
497        assert_eq!(resp2.request_id, MessageId::new(2));
498
499        if let OperationResult::Success { value, .. } = resp2.result {
500            assert_eq!(value, Some("3".to_string()));
501        } else {
502            panic!("Expected success response");
503        }
504
505        // Request 3: Close session
506        let req3 = Request {
507            id: MessageId::new(3),
508            session_id: SessionId::new("test"),
509            operation: Operation::Close,
510        };
511
512        client.send_request(&req3).await.unwrap();
513        let resp3 = client.recv_response().await.unwrap();
514        assert_eq!(resp3.request_id, MessageId::new(3));
515
516        drop(client);
517        timeout(Duration::from_secs(1), server_handle).await.ok();
518    }
519
520    #[tokio::test]
521    async fn test_concurrent_connections() {
522        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
523        let addr = listener.local_addr().unwrap();
524
525        let session_manager = Arc::new(SessionManager::new());
526
527        // Spawn server that handles 2 connections
528        let sm = Arc::clone(&session_manager);
529        let server_handle = tokio::spawn(async move {
530            // Accept and handle first connection
531            let transport1 = listener.accept().await.unwrap();
532            let sm1 = Arc::clone(&sm);
533            tokio::spawn(async move {
534                ReplServer::handle_connection(transport1, sm1, None).await.ok();
535            });
536
537            // Accept and handle second connection
538            let transport2 = listener.accept().await.unwrap();
539            ReplServer::handle_connection(transport2, sm, None).await.ok();
540        });
541
542        tokio::time::sleep(Duration::from_millis(50)).await;
543
544        // Client 1
545        let mut client1 = TcpTransport::connect(addr.to_string()).await.unwrap();
546        let req1 = Request {
547            id: MessageId::new(1),
548            session_id: SessionId::new("session-1"),
549            operation: Operation::CreateSession { mode: ReplMode::Lisp },
550        };
551        client1.send_request(&req1).await.unwrap();
552        let resp1 = client1.recv_response().await.unwrap();
553        assert_eq!(resp1.request_id, MessageId::new(1));
554
555        // Client 2
556        let mut client2 = TcpTransport::connect(addr.to_string()).await.unwrap();
557        let req2 = Request {
558            id: MessageId::new(1),
559            session_id: SessionId::new("session-2"),
560            operation: Operation::CreateSession { mode: ReplMode::Sexpr },
561        };
562        client2.send_request(&req2).await.unwrap();
563        let resp2 = client2.recv_response().await.unwrap();
564        assert_eq!(resp2.request_id, MessageId::new(1));
565
566        // Both sessions should exist
567        assert_eq!(session_manager.count().unwrap(), 2);
568
569        drop(client1);
570        drop(client2);
571
572        timeout(Duration::from_secs(1), server_handle).await.ok();
573    }
574
575    #[tokio::test]
576    async fn test_connection_error_handling() {
577        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
578        let addr = listener.local_addr().unwrap();
579
580        let server_handle = tokio::spawn(async move {
581            let transport = listener.accept().await.unwrap();
582            let session_manager = Arc::new(SessionManager::new());
583            ReplServer::handle_connection(transport, session_manager, None).await
584        });
585
586        tokio::time::sleep(Duration::from_millis(50)).await;
587
588        // Connect and immediately close without sending anything
589        let client = TcpTransport::connect(addr.to_string()).await.unwrap();
590        drop(client);
591
592        // Server should handle this gracefully (UnexpectedEof)
593        let result = timeout(Duration::from_secs(1), server_handle).await.unwrap().unwrap();
594
595        assert!(result.is_ok()); // Should return Ok(()) for clean close
596    }
597
598    #[tokio::test]
599    async fn test_shutdown_handle_functionality() {
600        let server = ReplServer::new("127.0.0.1:5555");
601        let handle1 = server.shutdown_handle();
602        let handle2 = handle1.clone();
603
604        // Verify handles can be cloned
605        // Just trigger shutdown - no server is running, so this just tests the API
606        handle2.shutdown();
607
608        // This is a smoke test - the actual shutdown functionality is tested
609        // by the other tests that run the full server
610    }
611
612    #[tokio::test]
613    async fn test_with_shutdown_timeout() {
614        let server =
615            ReplServer::new("127.0.0.1:5555").with_shutdown_timeout(Duration::from_secs(60));
616
617        assert_eq!(server.shutdown_timeout, Duration::from_secs(60));
618    }
619
620    #[tokio::test]
621    async fn test_with_metrics() {
622        let server = ReplServer::new("127.0.0.1:0").with_metrics();
623        assert!(server.metrics.is_some());
624    }
625
626    #[tokio::test]
627    async fn test_server_builder_chain() {
628        let server = ReplServer::new("127.0.0.1:0")
629            .with_metrics()
630            .with_shutdown_timeout(Duration::from_secs(45));
631
632        assert!(server.metrics.is_some());
633        assert_eq!(server.shutdown_timeout, Duration::from_secs(45));
634    }
635
636    #[tokio::test]
637    async fn test_handle_connection_with_metrics() {
638        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
639        let addr = listener.local_addr().unwrap();
640
641        let metrics = Arc::new(ServerMetrics::new());
642        let metrics_clone = Arc::clone(&metrics);
643
644        // Spawn server with metrics
645        let server_handle = tokio::spawn(async move {
646            let transport = listener.accept().await.unwrap();
647            let session_manager = Arc::new(SessionManager::new());
648            ReplServer::handle_connection(transport, session_manager, Some(&metrics_clone)).await
649        });
650
651        tokio::time::sleep(Duration::from_millis(50)).await;
652
653        // Connect client
654        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
655
656        // Create session request
657        let request = Request {
658            id: MessageId::new(1),
659            session_id: SessionId::new("test-metrics"),
660            operation: Operation::CreateSession { mode: ReplMode::Lisp },
661        };
662        client.send_request(&request).await.unwrap();
663        let response = client.recv_response().await.unwrap();
664        assert!(matches!(response.result, OperationResult::Success { .. }));
665
666        // Eval request
667        let request = Request {
668            id: MessageId::new(2),
669            session_id: SessionId::new("test-metrics"),
670            operation: Operation::Eval { code: "(+ 1 1)".to_string(), mode: ReplMode::Lisp },
671        };
672        client.send_request(&request).await.unwrap();
673        let _response = client.recv_response().await.unwrap();
674
675        // Close session request - triggers session_closed metric
676        let request = Request {
677            id: MessageId::new(3),
678            session_id: SessionId::new("test-metrics"),
679            operation: Operation::Close,
680        };
681        client.send_request(&request).await.unwrap();
682        let _response = client.recv_response().await.unwrap();
683
684        drop(client);
685        timeout(Duration::from_secs(1), server_handle).await.ok();
686    }
687
688    #[tokio::test]
689    async fn test_handle_connection_ls_sessions() {
690        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
691        let addr = listener.local_addr().unwrap();
692
693        let server_handle = tokio::spawn(async move {
694            let transport = listener.accept().await.unwrap();
695            let session_manager = Arc::new(SessionManager::new());
696            ReplServer::handle_connection(transport, session_manager, None).await
697        });
698
699        tokio::time::sleep(Duration::from_millis(50)).await;
700
701        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
702
703        // LsSessions request
704        let request = Request {
705            id: MessageId::new(1),
706            session_id: SessionId::new("unused"),
707            operation: Operation::LsSessions,
708        };
709        client.send_request(&request).await.unwrap();
710        let response = client.recv_response().await.unwrap();
711        assert!(matches!(response.result, OperationResult::Sessions { .. }));
712
713        drop(client);
714        timeout(Duration::from_secs(1), server_handle).await.ok();
715    }
716
717    #[tokio::test]
718    async fn test_handle_connection_history() {
719        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
720        let addr = listener.local_addr().unwrap();
721
722        let server_handle = tokio::spawn(async move {
723            let transport = listener.accept().await.unwrap();
724            let session_manager = Arc::new(SessionManager::new());
725            ReplServer::handle_connection(transport, session_manager, None).await
726        });
727
728        tokio::time::sleep(Duration::from_millis(50)).await;
729
730        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
731
732        // Create session first
733        let request = Request {
734            id: MessageId::new(1),
735            session_id: SessionId::new("test-history"),
736            operation: Operation::CreateSession { mode: ReplMode::Lisp },
737        };
738        client.send_request(&request).await.unwrap();
739        let _response = client.recv_response().await.unwrap();
740
741        // History request
742        let request = Request {
743            id: MessageId::new(2),
744            session_id: SessionId::new("test-history"),
745            operation: Operation::History { limit: Some(10) },
746        };
747        client.send_request(&request).await.unwrap();
748        let response = client.recv_response().await.unwrap();
749        // History may return entries or error depending on session state
750        assert!(matches!(
751            response.result,
752            OperationResult::HistoryEntries { .. } | OperationResult::Error { .. }
753        ));
754
755        drop(client);
756        timeout(Duration::from_secs(1), server_handle).await.ok();
757    }
758
759    #[tokio::test]
760    async fn test_handle_connection_get_server_stats() {
761        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
762        let addr = listener.local_addr().unwrap();
763
764        let server_handle = tokio::spawn(async move {
765            let transport = listener.accept().await.unwrap();
766            let session_manager = Arc::new(SessionManager::new());
767            ReplServer::handle_connection(transport, session_manager, None).await
768        });
769
770        tokio::time::sleep(Duration::from_millis(50)).await;
771
772        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
773
774        // GetServerStats request
775        let request = Request {
776            id: MessageId::new(1),
777            session_id: SessionId::new("unused"),
778            operation: Operation::GetServerStats,
779        };
780        client.send_request(&request).await.unwrap();
781        let response = client.recv_response().await.unwrap();
782        // ServerStats may return stats or error depending on state
783        assert!(matches!(
784            response.result,
785            OperationResult::ServerStats { .. } | OperationResult::Error { .. }
786        ));
787
788        drop(client);
789        timeout(Duration::from_secs(1), server_handle).await.ok();
790    }
791
792    #[tokio::test]
793    async fn test_handle_connection_get_session_stats() {
794        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
795        let addr = listener.local_addr().unwrap();
796
797        let server_handle = tokio::spawn(async move {
798            let transport = listener.accept().await.unwrap();
799            let session_manager = Arc::new(SessionManager::new());
800            ReplServer::handle_connection(transport, session_manager, None).await
801        });
802
803        tokio::time::sleep(Duration::from_millis(50)).await;
804
805        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
806
807        // Create session first
808        let request = Request {
809            id: MessageId::new(1),
810            session_id: SessionId::new("test-stats"),
811            operation: Operation::CreateSession { mode: ReplMode::Lisp },
812        };
813        client.send_request(&request).await.unwrap();
814        let _response = client.recv_response().await.unwrap();
815
816        // GetSessionStats request
817        let request = Request {
818            id: MessageId::new(2),
819            session_id: SessionId::new("test-stats"),
820            operation: Operation::GetSessionStats,
821        };
822        client.send_request(&request).await.unwrap();
823        let response = client.recv_response().await.unwrap();
824        assert!(matches!(response.result, OperationResult::SessionStats { .. }));
825
826        drop(client);
827        timeout(Duration::from_secs(1), server_handle).await.ok();
828    }
829
830    #[tokio::test]
831    async fn test_handle_connection_get_subprocess_stats() {
832        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
833        let addr = listener.local_addr().unwrap();
834
835        let server_handle = tokio::spawn(async move {
836            let transport = listener.accept().await.unwrap();
837            let session_manager = Arc::new(SessionManager::new());
838            ReplServer::handle_connection(transport, session_manager, None).await
839        });
840
841        tokio::time::sleep(Duration::from_millis(50)).await;
842
843        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
844
845        // Create session first
846        let request = Request {
847            id: MessageId::new(1),
848            session_id: SessionId::new("test-sub-stats"),
849            operation: Operation::CreateSession { mode: ReplMode::Lisp },
850        };
851        client.send_request(&request).await.unwrap();
852        let _response = client.recv_response().await.unwrap();
853
854        // GetSubprocessStats request
855        let request = Request {
856            id: MessageId::new(2),
857            session_id: SessionId::new("test-sub-stats"),
858            operation: Operation::GetSubprocessStats,
859        };
860        client.send_request(&request).await.unwrap();
861        let response = client.recv_response().await.unwrap();
862        // SubprocessStats may return stats or error depending on subprocess state
863        assert!(matches!(
864            response.result,
865            OperationResult::SubprocessStats { .. } | OperationResult::Error { .. }
866        ));
867
868        drop(client);
869        timeout(Duration::from_secs(1), server_handle).await.ok();
870    }
871
872    #[tokio::test]
873    async fn test_handle_connection_get_system_info() {
874        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
875        let addr = listener.local_addr().unwrap();
876
877        let server_handle = tokio::spawn(async move {
878            let transport = listener.accept().await.unwrap();
879            let session_manager = Arc::new(SessionManager::new());
880            ReplServer::handle_connection(transport, session_manager, None).await
881        });
882
883        tokio::time::sleep(Duration::from_millis(50)).await;
884
885        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
886
887        // GetSystemInfo request
888        let request = Request {
889            id: MessageId::new(1),
890            session_id: SessionId::new("unused"),
891            operation: Operation::GetSystemInfo,
892        };
893        client.send_request(&request).await.unwrap();
894        let response = client.recv_response().await.unwrap();
895        // SystemInfo may return info or error
896        assert!(matches!(
897            response.result,
898            OperationResult::SystemInfo { .. } | OperationResult::Error { .. }
899        ));
900
901        drop(client);
902        timeout(Duration::from_secs(1), server_handle).await.ok();
903    }
904
905    #[tokio::test]
906    async fn test_handle_connection_clear_output() {
907        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
908        let addr = listener.local_addr().unwrap();
909
910        let server_handle = tokio::spawn(async move {
911            let transport = listener.accept().await.unwrap();
912            let session_manager = Arc::new(SessionManager::new());
913            ReplServer::handle_connection(transport, session_manager, None).await
914        });
915
916        tokio::time::sleep(Duration::from_millis(50)).await;
917
918        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
919
920        // Create session first
921        let request = Request {
922            id: MessageId::new(1),
923            session_id: SessionId::new("test-clear"),
924            operation: Operation::CreateSession { mode: ReplMode::Lisp },
925        };
926        client.send_request(&request).await.unwrap();
927        let _response = client.recv_response().await.unwrap();
928
929        // ClearOutput request
930        let request = Request {
931            id: MessageId::new(2),
932            session_id: SessionId::new("test-clear"),
933            operation: Operation::ClearOutput,
934        };
935        client.send_request(&request).await.unwrap();
936        let response = client.recv_response().await.unwrap();
937        // ClearOutput may return success or error depending on session state
938        assert!(matches!(
939            response.result,
940            OperationResult::Success { .. } | OperationResult::Error { .. }
941        ));
942
943        drop(client);
944        timeout(Duration::from_secs(1), server_handle).await.ok();
945    }
946
947    #[tokio::test]
948    async fn test_handle_connection_interrupt() {
949        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
950        let addr = listener.local_addr().unwrap();
951
952        let server_handle = tokio::spawn(async move {
953            let transport = listener.accept().await.unwrap();
954            let session_manager = Arc::new(SessionManager::new());
955            ReplServer::handle_connection(transport, session_manager, None).await
956        });
957
958        tokio::time::sleep(Duration::from_millis(50)).await;
959
960        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
961
962        // Create session first
963        let request = Request {
964            id: MessageId::new(1),
965            session_id: SessionId::new("test-interrupt"),
966            operation: Operation::CreateSession { mode: ReplMode::Lisp },
967        };
968        client.send_request(&request).await.unwrap();
969        let _response = client.recv_response().await.unwrap();
970
971        // Interrupt request
972        let request = Request {
973            id: MessageId::new(2),
974            session_id: SessionId::new("test-interrupt"),
975            operation: Operation::Interrupt,
976        };
977        client.send_request(&request).await.unwrap();
978        let response = client.recv_response().await.unwrap();
979        // Interrupt may return success or error depending on state
980        assert!(matches!(
981            response.result,
982            OperationResult::Success { .. } | OperationResult::Error { .. }
983        ));
984
985        drop(client);
986        timeout(Duration::from_secs(1), server_handle).await.ok();
987    }
988
989    #[tokio::test]
990    async fn test_handle_connection_describe() {
991        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
992        let addr = listener.local_addr().unwrap();
993
994        let server_handle = tokio::spawn(async move {
995            let transport = listener.accept().await.unwrap();
996            let session_manager = Arc::new(SessionManager::new());
997            ReplServer::handle_connection(transport, session_manager, None).await
998        });
999
1000        tokio::time::sleep(Duration::from_millis(50)).await;
1001
1002        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
1003
1004        // Create session first
1005        let request = Request {
1006            id: MessageId::new(1),
1007            session_id: SessionId::new("test-describe"),
1008            operation: Operation::CreateSession { mode: ReplMode::Lisp },
1009        };
1010        client.send_request(&request).await.unwrap();
1011        let _response = client.recv_response().await.unwrap();
1012
1013        // Describe request
1014        let request = Request {
1015            id: MessageId::new(2),
1016            session_id: SessionId::new("test-describe"),
1017            operation: Operation::Describe { symbol: "test".to_string() },
1018        };
1019        client.send_request(&request).await.unwrap();
1020        let response = client.recv_response().await.unwrap();
1021        // Response can be success or error depending on what's describable
1022        assert!(matches!(
1023            response.result,
1024            OperationResult::Success { .. } | OperationResult::Error { .. }
1025        ));
1026
1027        drop(client);
1028        timeout(Duration::from_secs(1), server_handle).await.ok();
1029    }
1030
1031    #[tokio::test]
1032    async fn test_handle_connection_load_file() {
1033        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
1034        let addr = listener.local_addr().unwrap();
1035
1036        let server_handle = tokio::spawn(async move {
1037            let transport = listener.accept().await.unwrap();
1038            let session_manager = Arc::new(SessionManager::new());
1039            ReplServer::handle_connection(transport, session_manager, None).await
1040        });
1041
1042        tokio::time::sleep(Duration::from_millis(50)).await;
1043
1044        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
1045
1046        // Create session first
1047        let request = Request {
1048            id: MessageId::new(1),
1049            session_id: SessionId::new("test-load"),
1050            operation: Operation::CreateSession { mode: ReplMode::Lisp },
1051        };
1052        client.send_request(&request).await.unwrap();
1053        let _response = client.recv_response().await.unwrap();
1054
1055        // LoadFile request - file doesn't need to exist, testing the path
1056        let request = Request {
1057            id: MessageId::new(2),
1058            session_id: SessionId::new("test-load"),
1059            operation: Operation::LoadFile {
1060                path: "/nonexistent/test.lisp".to_string(),
1061                mode: ReplMode::Lisp,
1062            },
1063        };
1064        client.send_request(&request).await.unwrap();
1065        let response = client.recv_response().await.unwrap();
1066        // Should return error for nonexistent file
1067        assert!(matches!(response.result, OperationResult::Error { .. }));
1068
1069        drop(client);
1070        timeout(Duration::from_secs(1), server_handle).await.ok();
1071    }
1072
1073    #[tokio::test]
1074    async fn test_handle_connection_clone_session() {
1075        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
1076        let addr = listener.local_addr().unwrap();
1077
1078        let server_handle = tokio::spawn(async move {
1079            let transport = listener.accept().await.unwrap();
1080            let session_manager = Arc::new(SessionManager::new());
1081            ReplServer::handle_connection(transport, session_manager, None).await
1082        });
1083
1084        tokio::time::sleep(Duration::from_millis(50)).await;
1085
1086        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
1087
1088        // Create session first
1089        let request = Request {
1090            id: MessageId::new(1),
1091            session_id: SessionId::new("test-clone-src"),
1092            operation: Operation::CreateSession { mode: ReplMode::Lisp },
1093        };
1094        client.send_request(&request).await.unwrap();
1095        let _response = client.recv_response().await.unwrap();
1096
1097        // Clone request - clone FROM the source session (session_id is the target)
1098        let request = Request {
1099            id: MessageId::new(2),
1100            session_id: SessionId::new("test-clone-dst"),
1101            operation: Operation::Clone { source_session_id: SessionId::new("test-clone-src") },
1102        };
1103        client.send_request(&request).await.unwrap();
1104        let response = client.recv_response().await.unwrap();
1105        assert!(matches!(response.result, OperationResult::Success { .. }));
1106
1107        drop(client);
1108        timeout(Duration::from_secs(1), server_handle).await.ok();
1109    }
1110
1111    #[tokio::test]
1112    async fn test_handle_connection_error_response_with_metrics() {
1113        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
1114        let addr = listener.local_addr().unwrap();
1115
1116        let metrics = Arc::new(ServerMetrics::new());
1117        let metrics_clone = Arc::clone(&metrics);
1118
1119        let server_handle = tokio::spawn(async move {
1120            let transport = listener.accept().await.unwrap();
1121            let session_manager = Arc::new(SessionManager::new());
1122            ReplServer::handle_connection(transport, session_manager, Some(&metrics_clone)).await
1123        });
1124
1125        tokio::time::sleep(Duration::from_millis(50)).await;
1126
1127        let mut client = TcpTransport::connect(addr.to_string()).await.unwrap();
1128
1129        // Try to eval on non-existent session - should get error response
1130        let request = Request {
1131            id: MessageId::new(1),
1132            session_id: SessionId::new("nonexistent"),
1133            operation: Operation::Eval { code: "(+ 1 1)".to_string(), mode: ReplMode::Lisp },
1134        };
1135        client.send_request(&request).await.unwrap();
1136        let response = client.recv_response().await.unwrap();
1137        assert!(matches!(response.result, OperationResult::Error { .. }));
1138
1139        drop(client);
1140        timeout(Duration::from_secs(1), server_handle).await.ok();
1141    }
1142
1143    #[tokio::test]
1144    async fn test_full_server_start_and_shutdown() {
1145        let mut server = ReplServer::new("127.0.0.1:0");
1146        let shutdown_handle = server.shutdown_handle();
1147
1148        // Start server in background
1149        let server_task = tokio::spawn(async move { server.start().await });
1150
1151        // Give server time to start
1152        tokio::time::sleep(Duration::from_millis(100)).await;
1153
1154        // Trigger shutdown
1155        shutdown_handle.shutdown();
1156
1157        // Wait for server to finish
1158        let result = timeout(Duration::from_secs(5), server_task).await;
1159        assert!(result.is_ok());
1160    }
1161
1162    #[tokio::test]
1163    async fn test_server_with_metrics_full_lifecycle() {
1164        let mut server = ReplServer::new("127.0.0.1:0")
1165            .with_metrics()
1166            .with_shutdown_timeout(Duration::from_secs(5));
1167        let shutdown_handle = server.shutdown_handle();
1168
1169        let server_task = tokio::spawn(async move { server.start().await });
1170
1171        tokio::time::sleep(Duration::from_millis(100)).await;
1172
1173        shutdown_handle.shutdown();
1174
1175        let result = timeout(Duration::from_secs(5), server_task).await;
1176        assert!(result.is_ok());
1177    }
1178
1179    #[tokio::test]
1180    async fn test_shutdown_handle_clone() {
1181        let server = ReplServer::new("127.0.0.1:0");
1182        let handle1 = server.shutdown_handle();
1183        let handle2 = handle1.clone();
1184
1185        // Both handles should be functional
1186        handle1.shutdown();
1187        handle2.shutdown();
1188    }
1189
1190    #[tokio::test]
1191    async fn test_server_default_shutdown_timeout() {
1192        let server = ReplServer::new("127.0.0.1:0");
1193        assert_eq!(server.shutdown_timeout, Duration::from_secs(30));
1194    }
1195}