1use crate::config::ReplConfig;
6use crate::repl::info::show_system_info;
7use crate::repl::runner::{ReplClientAdapter, ReplRunner};
8use crate::repl::stats::{
9 parse_stats_command_with_resources, show_all_stats, show_server_stats, show_subprocess_stats,
10};
11use crate::repl::terminal::ReplTerminal;
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use oxur_repl::metadata::SystemMetadata;
15use oxur_repl::metrics::{ClientMetrics, ServerMetrics};
16use oxur_repl::protocol::{
17 MessageId, Operation, OperationResult, ReplMode, Request, Response, SessionId,
18};
19use oxur_repl::server::{MessageHandler, SessionManager};
20use oxur_repl::transport::{inprocess_channel, InProcessClient, InProcessServer, Transport};
21use std::sync::Arc;
22
23struct AdapterMetrics {
25 server: Arc<ServerMetrics>,
26 client: Arc<ClientMetrics>,
27}
28
29struct InProcessAdapter {
37 client: InProcessClient,
38 server: InProcessServer,
39 handler: MessageHandler,
40 session_manager: Arc<SessionManager>,
41 session_id: SessionId,
42 system_metadata: Arc<SystemMetadata>,
43 metrics: AdapterMetrics,
44}
45
46impl InProcessAdapter {
47 fn new(
48 client: InProcessClient,
49 server: InProcessServer,
50 handler: MessageHandler,
51 session_manager: Arc<SessionManager>,
52 session_id: SessionId,
53 system_metadata: Arc<SystemMetadata>,
54 metrics: AdapterMetrics,
55 ) -> Self {
56 Self { client, server, handler, session_manager, session_id, system_metadata, metrics }
57 }
58}
59
60#[async_trait]
61impl ReplClientAdapter for InProcessAdapter {
62 async fn send_eval(&mut self, request: Request) -> Result<()> {
63 let start_time = std::time::Instant::now();
65
66 let operation_name = match &request.operation {
68 Operation::CreateSession { .. } => "create_session",
69 Operation::Clone { .. } => "clone",
70 Operation::Eval { .. } => "eval",
71 Operation::Close => "close",
72 Operation::LsSessions => "ls_sessions",
73 Operation::LoadFile { .. } => "load_file",
74 Operation::Interrupt => "interrupt",
75 Operation::Describe { .. } => "describe",
76 Operation::History { .. } => "history",
77 Operation::ClearOutput => "clear_output",
78 Operation::GetServerStats => "get_server_stats",
79 Operation::GetSessionStats => "get_session_stats",
80 Operation::GetSubprocessStats => "get_subprocess_stats",
81 Operation::GetSystemInfo => "get_system_info",
82 _ => "unknown",
83 };
84 self.metrics.server.request_received(operation_name);
85
86 self.metrics.client.request_sent(operation_name);
88
89 match &request.operation {
91 Operation::CreateSession { .. } => self.metrics.server.session_created(),
92 Operation::Close => self.metrics.server.session_closed(),
93 _ => {}
94 }
95
96 self.client.send_request(&request).await.context("Failed to send request")?;
98
99 let response = self.handler.handle(request).await;
101
102 let status = match &response.result {
104 OperationResult::Success { .. } => "success",
105 OperationResult::Error { .. } => "error",
106 OperationResult::Sessions { .. } => "success",
107 OperationResult::HistoryEntries { .. } => "success",
108 OperationResult::ServerStats { .. } => "success",
109 OperationResult::SessionStats { .. } => "success",
110 OperationResult::SubprocessStats { .. } => "success",
111 OperationResult::SystemInfo { .. } => "success",
112 _ => "unknown",
113 };
114 self.metrics.server.response_sent(status);
115
116 let latency = start_time.elapsed();
118 self.metrics.client.response_received(status, latency);
119
120 self.server.send_response(&response).await.context("Failed to send response")?;
122
123 Ok(())
124 }
125
126 async fn recv_response(&mut self) -> Result<Response> {
127 self.client.recv_response().await.context("Failed to receive response")
128 }
129
130 async fn close(&mut self) -> Result<()> {
131 Ok(())
133 }
134
135 async fn handle_special_command(&mut self, input: &str, color_enabled: bool) -> Option<String> {
136 if input == "(info)" {
138 if let Ok(usage_metrics) = self.session_manager.get_usage_metrics(&self.session_id) {
140 if let Ok(mut metrics) = usage_metrics.lock() {
141 metrics.record_info();
142 }
143 }
144 return Some(show_system_info(&self.system_metadata, color_enabled));
145 }
146
147 if input == "(sessions)" {
149 if let Ok(usage_metrics) = self.session_manager.get_usage_metrics(&self.session_id) {
151 if let Ok(mut metrics) = usage_metrics.lock() {
152 metrics.record_sessions();
153 }
154 }
155 return match self.session_manager.list() {
156 Ok(sessions) => Some(crate::repl::stats::show_sessions(
157 &sessions,
158 &self.session_id,
159 color_enabled,
160 )),
161 Err(e) => Some(format!("Failed to list sessions: {}", e)),
162 };
163 }
164
165 if !input.starts_with("(stats") {
167 return None;
168 }
169
170 if let Ok(usage_metrics) = self.session_manager.get_usage_metrics(&self.session_id) {
172 if let Ok(mut metrics) = usage_metrics.lock() {
173 metrics.record_stats();
174 }
175 }
176
177 if input == "(stats server)" {
179 let snapshot = self.metrics.server.snapshot();
180 return Some(show_server_stats(&snapshot, color_enabled));
181 }
182
183 if input == "(stats client)" {
185 let snapshot = self.metrics.client.snapshot();
186 return Some(crate::repl::stats::show_client_stats(&snapshot, color_enabled));
187 }
188
189 if input == "(stats subprocess)" {
191 return match self.session_manager.get_subprocess_stats(&self.session_id) {
192 Ok(Some(snapshot)) => Some(show_subprocess_stats(&snapshot, color_enabled)),
193 Ok(None) => Some("Subprocess not running".to_string()),
194 Err(e) => Some(format!("Failed to get subprocess stats: {}", e)),
195 };
196 }
197
198 if input == "(stats usage)" {
200 return match self.session_manager.get_usage_metrics(&self.session_id) {
201 Ok(usage_metrics) => {
202 let metrics = usage_metrics.lock().unwrap();
203 let snapshot = metrics.snapshot();
204 Some(crate::repl::stats::show_usage_stats(&snapshot, color_enabled))
205 }
206 Err(e) => Some(format!("Failed to get usage metrics: {}", e)),
207 };
208 }
209
210 if input == "(stats)" {
212 return match self.session_manager.get_stats_collector(&self.session_id) {
213 Ok(stats_collector) => {
214 let (dir_stats, cache_stats) = self
215 .session_manager
216 .get_resource_stats(&self.session_id)
217 .unwrap_or((None, None));
218
219 let collector = stats_collector.lock().unwrap();
220
221 let server_snapshot = self.metrics.server.snapshot();
223 let client_snapshot = self.metrics.client.snapshot();
224 let subprocess_snapshot =
225 self.session_manager.get_subprocess_stats(&self.session_id).ok().flatten();
226 let usage_snapshot = self
227 .session_manager
228 .get_usage_metrics(&self.session_id)
229 .ok()
230 .and_then(|m| m.lock().ok().map(|metrics| metrics.snapshot()));
231
232 Some(show_all_stats(
233 &collector,
234 dir_stats.as_ref(),
235 cache_stats.as_ref(),
236 Some(&server_snapshot),
237 Some(&client_snapshot),
238 subprocess_snapshot.as_ref(),
239 usage_snapshot.as_ref(),
240 color_enabled,
241 ))
242 }
243 Err(e) => Some(format!("Failed to get stats: {}", e)),
244 };
245 }
246
247 match self.session_manager.get_stats_collector(&self.session_id) {
249 Ok(stats_collector) => {
250 let (dir_stats, cache_stats) = self
251 .session_manager
252 .get_resource_stats(&self.session_id)
253 .unwrap_or((None, None));
254
255 let collector = stats_collector.lock().unwrap();
256 parse_stats_command_with_resources(
257 input,
258 &collector,
259 dir_stats.as_ref(),
260 cache_stats.as_ref(),
261 color_enabled,
262 )
263 }
264 Err(e) => Some(format!("Failed to get stats: {}", e)),
265 }
266 }
267
268 fn record_usage(&mut self, command_type: oxur_repl::metrics::CommandType) {
269 if let Ok(usage_metrics) = self.session_manager.get_usage_metrics(&self.session_id) {
270 if let Ok(mut metrics) = usage_metrics.lock() {
271 match command_type {
272 oxur_repl::metrics::CommandType::Eval => metrics.record_eval(),
273 oxur_repl::metrics::CommandType::Help => metrics.record_help(),
274 oxur_repl::metrics::CommandType::Stats => metrics.record_stats(),
275 oxur_repl::metrics::CommandType::Info => metrics.record_info(),
276 oxur_repl::metrics::CommandType::Sessions => metrics.record_sessions(),
277 oxur_repl::metrics::CommandType::Clear => metrics.record_clear(),
278 oxur_repl::metrics::CommandType::Banner => metrics.record_banner(),
279 }
280 }
281 }
282 }
283
284 async fn create_session(&mut self, name: Option<String>) -> Result<SessionId> {
285 let new_id = SessionId::new(format!("session-{}", uuid::Uuid::new_v4()));
287
288 self.session_manager
290 .create(new_id.clone(), ReplMode::Lisp)
291 .context("Failed to create new session")?;
292
293 if let Some(session_name) = name {
295 let _ = session_name; }
299
300 Ok(new_id)
301 }
302
303 async fn switch_session(&mut self, session_id: SessionId) -> Result<()> {
304 self.session_manager.get_info(&session_id).context("Session not found")?;
306
307 self.session_id = session_id;
309
310 Ok(())
311 }
312
313 fn current_session(&self) -> &SessionId {
314 &self.session_id
315 }
316
317 async fn close_session(&mut self, session_id: Option<SessionId>) -> Result<()> {
318 let target_id = session_id.unwrap_or_else(|| self.session_id.clone());
319
320 if target_id == self.session_id {
322 return Err(anyhow::anyhow!(
323 "Cannot close current session. Switch to another session first."
324 ));
325 }
326
327 self.session_manager.close(&target_id).context("Failed to close session")?;
329
330 Ok(())
331 }
332}
333
334pub async fn run(config: ReplConfig) -> Result<()> {
344 let system_metadata = Arc::new(SystemMetadata::capture());
346
347 let (client, server_transport) = inprocess_channel();
349
350 let session_manager = Arc::new(SessionManager::new());
352
353 let server_metrics = Arc::new(ServerMetrics::new());
355
356 let client_metrics = Arc::new(ClientMetrics::new());
358
359 let handler = MessageHandler::with_metrics_and_metadata(
361 (*session_manager).clone(),
362 server_metrics.clone(),
363 system_metadata.clone(),
364 );
365
366 let session_id = SessionId::new(format!("interactive-{}", std::process::id()));
368
369 let create_req = Request {
371 id: MessageId::new(1),
372 session_id: session_id.clone(),
373 operation: Operation::CreateSession { mode: ReplMode::Lisp },
374 };
375
376 let _response = handler.handle(create_req).await;
378
379 let terminal = ReplTerminal::with_config(config.terminal, config.history)
381 .context("Failed to create terminal")?;
382
383 let metrics = AdapterMetrics { server: server_metrics, client: client_metrics };
385 let mut adapter = InProcessAdapter::new(
386 client,
387 server_transport,
388 handler,
389 session_manager,
390 session_id.clone(),
391 system_metadata.clone(),
392 metrics,
393 );
394
395 let mut runner = ReplRunner::new(terminal, session_id);
397 runner.print_banner(&system_metadata);
398 runner.run(&mut adapter).await?;
399 runner.finish(&mut adapter).await?;
400
401 Ok(())
402}