1use std::sync::Arc;
9
10use crate::metadata::SystemMetadata;
11use crate::metrics::ServerMetrics;
12use crate::protocol::{
13 ErrorInfo, ErrorKind, Operation, OperationResult, Request, Response, SessionId, SessionInfo,
14 Status,
15};
16use crate::server::{SessionError, SessionManager};
17
18#[derive(Debug)]
23pub struct MessageHandler {
24 session_manager: SessionManager,
26 server_metrics: Option<Arc<ServerMetrics>>,
28 system_metadata: Option<Arc<SystemMetadata>>,
30}
31
32impl MessageHandler {
33 pub fn new(session_manager: SessionManager) -> Self {
35 Self { session_manager, server_metrics: None, system_metadata: None }
36 }
37
38 pub fn with_metrics(session_manager: SessionManager, metrics: Arc<ServerMetrics>) -> Self {
40 Self { session_manager, server_metrics: Some(metrics), system_metadata: None }
41 }
42
43 pub fn with_metadata(session_manager: SessionManager, metadata: Arc<SystemMetadata>) -> Self {
45 Self { session_manager, server_metrics: None, system_metadata: Some(metadata) }
46 }
47
48 pub fn with_metrics_and_metadata(
50 session_manager: SessionManager,
51 metrics: Arc<ServerMetrics>,
52 metadata: Arc<SystemMetadata>,
53 ) -> Self {
54 Self { session_manager, server_metrics: Some(metrics), system_metadata: Some(metadata) }
55 }
56
57 pub async fn handle(&self, request: Request) -> Response {
81 let result = match &request.operation {
82 Operation::CreateSession { mode } => {
83 self.handle_create_session(&request.session_id, *mode)
84 }
85
86 Operation::Clone { source_session_id } => {
87 self.handle_clone_session(source_session_id, &request.session_id)
88 }
89
90 Operation::Eval { code, mode } => {
91 self.handle_eval(&request.session_id, code, *mode).await
92 }
93
94 Operation::Close => self.handle_close(&request.session_id),
95
96 Operation::LsSessions => self.handle_list_sessions(),
97
98 Operation::GetServerStats => self.handle_get_server_stats(),
99
100 Operation::GetSessionStats => self.handle_get_session_stats(&request.session_id),
101
102 Operation::GetSubprocessStats => self.handle_get_subprocess_stats(&request.session_id),
103
104 Operation::GetSystemInfo => self.handle_get_system_info(),
105
106 Operation::LoadFile { .. }
108 | Operation::Interrupt
109 | Operation::Describe { .. }
110 | Operation::History { .. }
111 | Operation::ClearOutput => OperationResult::Error {
112 error: ErrorInfo {
113 kind: ErrorKind::InvalidRequest,
114 message: "Operation not yet implemented".to_string(),
115 location: None,
116 details: None,
117 },
118 stdout: None,
119 stderr: None,
120 },
121 };
122
123 Response { request_id: request.id, session_id: request.session_id, result }
124 }
125
126 fn handle_create_session(
128 &self,
129 session_id: &SessionId,
130 mode: crate::protocol::ReplMode,
131 ) -> OperationResult {
132 match self.session_manager.create(session_id.clone(), mode) {
133 Ok(_) => OperationResult::Success {
134 status: Status { tier: 0, cached: false, duration_ms: 0 },
135 value: Some(format!("Session {} created", session_id)),
136 stdout: None,
137 stderr: None,
138 },
139 Err(e) => self.error_result(e),
140 }
141 }
142
143 fn handle_clone_session(
145 &self,
146 source_id: &SessionId,
147 target_id: &SessionId,
148 ) -> OperationResult {
149 match self.session_manager.clone_session(source_id, target_id.clone()) {
150 Ok(_) => OperationResult::Success {
151 status: Status { tier: 0, cached: false, duration_ms: 0 },
152 value: Some(format!("Session {} cloned to {}", source_id, target_id)),
153 stdout: None,
154 stderr: None,
155 },
156 Err(e) => self.error_result(e),
157 }
158 }
159
160 async fn handle_eval(
162 &self,
163 session_id: &SessionId,
164 code: &str,
165 _mode: crate::protocol::ReplMode,
166 ) -> OperationResult {
167 match self.session_manager.eval(session_id, code).await {
169 Ok(result) => {
170 let tier_num = match result.tier {
172 crate::eval::ExecutionTier::Calculator => 1,
173 crate::eval::ExecutionTier::CachedLoaded => 2,
174 crate::eval::ExecutionTier::JustInTime => 3,
175 };
176
177 OperationResult::Success {
178 status: Status {
179 tier: tier_num,
180 cached: result.cached, duration_ms: result.duration_ms,
182 },
183 value: Some(result.value),
184 stdout: result.stdout,
185 stderr: result.stderr,
186 }
187 }
188 Err(e) => self.error_result(e),
189 }
190 }
191
192 fn handle_close(&self, session_id: &SessionId) -> OperationResult {
194 match self.session_manager.close(session_id) {
195 Ok(_) => OperationResult::Success {
196 status: Status { tier: 0, cached: false, duration_ms: 0 },
197 value: Some(format!("Session {} closed", session_id)),
198 stdout: None,
199 stderr: None,
200 },
201 Err(e) => self.error_result(e),
202 }
203 }
204
205 fn handle_list_sessions(&self) -> OperationResult {
207 match self.session_manager.list() {
208 Ok(sessions) => {
209 let session_infos = sessions
211 .into_iter()
212 .map(|s| SessionInfo {
213 id: s.id,
214 name: s.name,
215 mode: s.mode,
216 eval_count: s.eval_count,
217 created_at: s.created_at,
218 last_active_at: s.last_active_at,
219 timeout_ms: s.timeout_ms,
220 })
221 .collect();
222
223 OperationResult::Sessions { sessions: session_infos }
224 }
225 Err(e) => self.error_result(e),
226 }
227 }
228
229 fn handle_get_server_stats(&self) -> OperationResult {
231 match &self.server_metrics {
232 Some(metrics) => {
233 let snapshot = metrics.snapshot();
234 OperationResult::ServerStats { snapshot }
235 }
236 None => OperationResult::Error {
237 error: ErrorInfo {
238 kind: ErrorKind::InternalError,
239 message: "Server metrics not available".to_string(),
240 location: None,
241 details: None,
242 },
243 stdout: None,
244 stderr: None,
245 },
246 }
247 }
248
249 fn handle_get_session_stats(&self, session_id: &SessionId) -> OperationResult {
251 match self.session_manager.get_stats_collector(session_id) {
252 Ok(stats_collector) => {
253 let collector = stats_collector.lock().unwrap();
254 let snapshot = collector.snapshot();
255 OperationResult::SessionStats { snapshot }
256 }
257 Err(e) => self.error_result(e),
258 }
259 }
260
261 fn handle_get_subprocess_stats(&self, session_id: &SessionId) -> OperationResult {
263 match self.session_manager.get_subprocess_stats(session_id) {
264 Ok(Some(snapshot)) => OperationResult::SubprocessStats { snapshot },
265 Ok(None) => OperationResult::Error {
266 error: ErrorInfo {
267 kind: ErrorKind::InternalError,
268 message: "Subprocess metrics not available for this session".to_string(),
269 location: None,
270 details: None,
271 },
272 stdout: None,
273 stderr: None,
274 },
275 Err(e) => self.error_result(e),
276 }
277 }
278
279 fn handle_get_system_info(&self) -> OperationResult {
281 match &self.system_metadata {
282 Some(metadata) => OperationResult::SystemInfo { metadata: (**metadata).clone() },
283 None => OperationResult::Error {
284 error: ErrorInfo {
285 kind: ErrorKind::InternalError,
286 message: "System metadata not available".to_string(),
287 location: None,
288 details: None,
289 },
290 stdout: None,
291 stderr: None,
292 },
293 }
294 }
295
296 fn error_result(&self, error: SessionError) -> OperationResult {
298 let (kind, message) = match error {
299 SessionError::NotFound(id) => {
300 (ErrorKind::SessionNotFound, format!("Session not found: {}", id))
301 }
302 SessionError::AlreadyExists(id) => {
303 (ErrorKind::SessionAlreadyExists, format!("Session already exists: {}", id))
304 }
305 SessionError::LockPoisoned => (ErrorKind::InternalError, "Lock poisoned".to_string()),
306 SessionError::EvalFailed(eval_err) => {
307 use crate::eval::EvalError;
308 match eval_err {
309 EvalError::SyntaxError { msg, .. } => (ErrorKind::SyntaxError, msg),
310 EvalError::TypeError { msg, .. } => (ErrorKind::TypeError, msg),
311 EvalError::RuntimeError { msg, .. } => (ErrorKind::RuntimeError, msg),
312 EvalError::CompilationError { msg, .. } => (ErrorKind::CompilationError, msg),
313 EvalError::UnsupportedOperation { msg, .. } => (ErrorKind::RuntimeError, msg),
314 }
315 }
316 };
317
318 OperationResult::Error {
319 error: ErrorInfo { kind, message, location: None, details: None },
320 stdout: None,
321 stderr: None,
322 }
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{MessageId, ReplMode, SessionId};

    /// Handler over a fresh session manager, without metrics or metadata.
    fn new_handler() -> MessageHandler {
        MessageHandler::new(SessionManager::new())
    }

    /// Sends a single request through `handler` and returns its response.
    async fn send(
        handler: &MessageHandler,
        id: MessageId,
        session: &'static str,
        operation: Operation,
    ) -> Response {
        let request = Request { id, session_id: SessionId::new(session), operation };
        handler.handle(request).await
    }

    /// `CreateSession` operation for the given mode.
    fn create(mode: ReplMode) -> Operation {
        Operation::CreateSession { mode }
    }

    /// Lisp-mode `Eval` operation for `code`.
    fn eval_lisp(code: &str) -> Operation {
        Operation::Eval { code: code.to_string(), mode: ReplMode::Lisp }
    }

    /// `Clone` operation whose source is the session named `source`.
    fn clone_of(source: &'static str) -> Operation {
        Operation::Clone { source_session_id: SessionId::new(source) }
    }

    /// Fails the calling test unless `result` is an error of kind `expected`.
    #[track_caller]
    fn assert_error_kind(result: OperationResult, expected: ErrorKind) {
        match result {
            OperationResult::Error { error, .. } => assert_eq!(error.kind, expected),
            _ => panic!("Expected error response"),
        }
    }

    /// Creating a session succeeds and the response echoes both ids.
    #[tokio::test]
    async fn test_handle_create_session() {
        let handler = new_handler();

        let response = send(&handler, MessageId::new(1), "test-1", create(ReplMode::Lisp)).await;

        assert_eq!(response.request_id, MessageId::new(1));
        assert_eq!(response.session_id, SessionId::new("test-1"));
        assert!(matches!(response.result, OperationResult::Success { .. }));
    }

    /// A second create with the same id is rejected as `SessionAlreadyExists`.
    #[tokio::test]
    async fn test_handle_create_duplicate_session() {
        let handler = new_handler();
        send(&handler, MessageId::new(1), "test-1", create(ReplMode::Lisp)).await;

        let response = send(&handler, MessageId::new(2), "test-1", create(ReplMode::Lisp)).await;

        assert_eq!(response.request_id, MessageId::new(2));
        assert_error_kind(response.result, ErrorKind::SessionAlreadyExists);
    }

    /// Closing an existing session succeeds.
    #[tokio::test]
    async fn test_handle_close_session() {
        let handler = new_handler();
        send(&handler, MessageId::new(1), "test-1", create(ReplMode::Lisp)).await;

        let response = send(&handler, MessageId::new(2), "test-1", Operation::Close).await;

        assert!(matches!(response.result, OperationResult::Success { .. }));
    }

    /// Closing an unknown session reports `SessionNotFound`.
    #[tokio::test]
    async fn test_handle_close_nonexistent_session() {
        let handler = new_handler();

        let response = send(&handler, MessageId::new(1), "nonexistent", Operation::Close).await;

        assert_error_kind(response.result, ErrorKind::SessionNotFound);
    }

    /// Listing returns every created session with its id and mode.
    #[tokio::test]
    async fn test_handle_list_sessions() {
        let handler = new_handler();
        send(&handler, MessageId::new(1), "test-1", create(ReplMode::Lisp)).await;
        send(&handler, MessageId::new(2), "test-2", create(ReplMode::Sexpr)).await;

        // The session id is irrelevant to `LsSessions`, hence the empty string.
        let response = send(&handler, MessageId::new(3), "", Operation::LsSessions).await;

        // NOTE(review): the indexed assertions assume `list()` yields sessions
        // in creation order — confirm that ordering is guaranteed.
        match response.result {
            OperationResult::Sessions { sessions } => {
                assert_eq!(sessions.len(), 2);
                assert_eq!(sessions[0].id, SessionId::new("test-1"));
                assert_eq!(sessions[0].mode, ReplMode::Lisp);
                assert_eq!(sessions[1].id, SessionId::new("test-2"));
                assert_eq!(sessions[1].mode, ReplMode::Sexpr);
            }
            _ => panic!("Expected Sessions response"),
        }
    }

    /// Evaluating `(+ 1 2)` in an existing session yields the value `3`.
    #[tokio::test]
    async fn test_handle_eval() {
        let handler = new_handler();
        send(&handler, MessageId::new(1), "test", create(ReplMode::Lisp)).await;

        let response = send(&handler, MessageId::new(2), "test", eval_lisp("(+ 1 2)")).await;

        match response.result {
            OperationResult::Success { value, .. } => assert_eq!(value, Some("3".to_string())),
            _ => panic!("Expected success response"),
        }
    }

    /// Evaluating in an unknown session reports `SessionNotFound`.
    #[tokio::test]
    async fn test_handle_eval_nonexistent_session() {
        let handler = new_handler();

        let response =
            send(&handler, MessageId::new(1), "nonexistent", eval_lisp("(+ 1 2)")).await;

        assert_error_kind(response.result, ErrorKind::SessionNotFound);
    }

    /// Cloning an existing session into a fresh id succeeds.
    #[tokio::test]
    async fn test_handle_clone_session() {
        let handler = new_handler();
        send(&handler, MessageId::new(1), "source", create(ReplMode::Lisp)).await;

        let response = send(&handler, MessageId::new(2), "target", clone_of("source")).await;

        assert!(matches!(response.result, OperationResult::Success { .. }));
    }

    /// Cloning from an unknown source reports `SessionNotFound`.
    #[tokio::test]
    async fn test_handle_clone_nonexistent_session() {
        let handler = new_handler();

        let response =
            send(&handler, MessageId::new(1), "target", clone_of("nonexistent")).await;

        assert_error_kind(response.result, ErrorKind::SessionNotFound);
    }

    /// Cloning onto an id that is already in use reports `SessionAlreadyExists`.
    #[tokio::test]
    async fn test_handle_clone_to_existing_session() {
        let handler = new_handler();
        send(&handler, MessageId::new(1), "source", create(ReplMode::Lisp)).await;
        send(&handler, MessageId::new(2), "target", create(ReplMode::Lisp)).await;

        let response = send(&handler, MessageId::new(3), "target", clone_of("source")).await;

        assert_error_kind(response.result, ErrorKind::SessionAlreadyExists);
    }

    /// Evaluation in an existing session succeeds.
    ///
    /// NOTE(review): despite the name, only success is asserted; the captured
    /// stdout/stderr fields are not inspected.
    #[tokio::test]
    async fn test_eval_with_output_capture() {
        let handler = new_handler();
        send(&handler, MessageId::new(1), "test", create(ReplMode::Lisp)).await;

        let response = send(&handler, MessageId::new(2), "test", eval_lisp("(+ 1 2)")).await;

        assert!(matches!(response.result, OperationResult::Success { .. }));
    }

    /// Two evaluations followed by a close all succeed on the same session.
    #[tokio::test]
    async fn test_multiple_operations_same_session() {
        let handler = new_handler();
        send(&handler, MessageId::new(1), "test", create(ReplMode::Lisp)).await;

        let response1 = send(&handler, MessageId::new(2), "test", eval_lisp("(+ 1 2)")).await;
        let response2 = send(&handler, MessageId::new(3), "test", eval_lisp("(* 3 4)")).await;

        assert!(matches!(response1.result, OperationResult::Success { .. }));
        assert!(matches!(response2.result, OperationResult::Success { .. }));

        let response3 = send(&handler, MessageId::new(4), "test", Operation::Close).await;

        assert!(matches!(response3.result, OperationResult::Success { .. }));
    }

    /// Operations without a handler are rejected as `InvalidRequest`.
    #[tokio::test]
    async fn test_unimplemented_operation() {
        let handler = new_handler();
        let load_file =
            Operation::LoadFile { path: "test.lisp".to_string(), mode: ReplMode::Lisp };

        let response = send(&handler, MessageId::new(1), "test", load_file).await;

        match response.result {
            OperationResult::Error { error, .. } => {
                assert_eq!(error.kind, ErrorKind::InvalidRequest);
                assert!(error.message.contains("not yet implemented"));
            }
            _ => panic!("Expected error response"),
        }
    }
}