1use super::{ReplError, ReplResult};
7use crate::config::Config;
8use anyhow::Result;
9use cqlite_core::{Database, QueryResult};
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14#[derive(Debug, Clone, PartialEq)]
16pub enum SessionState {
17 Initializing,
19 Ready,
21 Executing,
23 Error(String),
25 Shutdown,
27}
28
29pub struct ReplSession {
31 database: Arc<Database>,
33 config: Config,
35 db_path: PathBuf,
37 state: SessionState,
39 current_keyspace: Option<String>,
41 data_dir: Option<PathBuf>,
43 variables: HashMap<String, String>,
45 connection_info: ConnectionInfo,
47 metrics: SessionMetrics,
49 schema_registry:
51 Option<Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>>,
52}
53
54#[derive(Debug, Clone)]
56pub struct ConnectionInfo {
57 pub version: String,
59 pub connected_at: std::time::SystemTime,
61 pub last_activity: std::time::SystemTime,
63 pub queries_executed: u64,
65 pub errors_count: u64,
67}
68
69#[derive(Debug, Clone, Default)]
71pub struct SessionMetrics {
72 pub total_execution_time_us: u64,
74 pub query_counts: HashMap<String, u64>,
76 pub avg_query_time_us: f64,
78 pub memory_usage_bytes: u64,
80 pub cache_hits: u64,
82 pub cache_misses: u64,
83}
84
85impl ReplSession {
86 pub fn new(db_path: &Path, config: Config, database: Database) -> ReplResult<Self> {
88 let connection_info = ConnectionInfo {
89 version: env!("CARGO_PKG_VERSION").to_string(),
90 connected_at: std::time::SystemTime::now(),
91 last_activity: std::time::SystemTime::now(),
92 queries_executed: 0,
93 errors_count: 0,
94 };
95
96 Ok(Self {
97 database: Arc::new(database),
98 config,
99 db_path: db_path.to_path_buf(),
100 state: SessionState::Initializing,
101 current_keyspace: None,
102 data_dir: None,
103 variables: HashMap::new(),
104 connection_info,
105 metrics: SessionMetrics::default(),
106 schema_registry: None,
107 })
108 }
109
110 pub async fn initialize(&mut self) -> ReplResult<()> {
112 self.state = SessionState::Ready;
113 self.connection_info.last_activity = std::time::SystemTime::now();
114
115 if let Some(default_keyspace) = self.config.default_keyspace.clone() {
117 if !default_keyspace.is_empty() {
118 let _ = self.use_keyspace(&default_keyspace).await;
119 }
120 }
121
122 if let Some(ref data_dir) = self.config.data_directory {
124 if !data_dir.as_os_str().is_empty() {
125 self.data_dir = Some(data_dir.clone());
126 }
127 }
128
129 Ok(())
130 }
131
132 pub fn state(&self) -> &SessionState {
134 &self.state
135 }
136
137 pub fn current_keyspace(&self) -> Option<&String> {
139 self.current_keyspace.as_ref()
140 }
141
142 pub fn db_path(&self) -> &Path {
144 &self.db_path
145 }
146
147 pub fn database(&self) -> Option<&Database> {
149 Some(&self.database)
150 }
151
152 pub fn data_dir(&self) -> Option<&Path> {
154 self.data_dir.as_deref()
155 }
156
157 pub fn set_data_dir(&mut self, path: Option<PathBuf>) {
159 self.data_dir = path;
160 }
161
162 pub async fn use_keyspace(&mut self, keyspace: &str) -> ReplResult<()> {
164 self.state = SessionState::Executing;
165 self.connection_info.last_activity = std::time::SystemTime::now();
166
167 let query = format!(
169 "SELECT keyspace_name FROM system.keyspaces WHERE keyspace_name = '{}'",
170 keyspace
171 );
172
173 match self.database.execute(&query).await {
174 Ok(result) => {
175 if result.rows.is_empty() {
176 self.state = SessionState::Ready;
177 return Err(ReplError::Session(format!(
178 "Keyspace '{}' not found",
179 keyspace
180 )));
181 }
182
183 self.current_keyspace = Some(keyspace.to_string());
184 self.state = SessionState::Ready;
185 Ok(())
186 }
187 Err(e) => {
188 self.current_keyspace = Some(keyspace.to_string());
190 self.state = SessionState::Ready;
191 self.connection_info.errors_count += 1;
192
193 log::warn!("Could not verify keyspace '{}': {}", keyspace, e);
195 Ok(())
196 }
197 }
198 }
199
200 pub async fn execute_query(&mut self, query: &str) -> ReplResult<QueryResult> {
202 self.state = SessionState::Executing;
203 self.connection_info.last_activity = std::time::SystemTime::now();
204
205 let start_time = std::time::Instant::now();
206
207 match self.database.execute(query).await {
208 Ok(result) => {
209 let elapsed = start_time.elapsed();
210 self.update_metrics(query, elapsed, true);
211 self.connection_info.queries_executed += 1;
212 self.state = SessionState::Ready;
213 Ok(result)
214 }
215 Err(e) => {
216 let elapsed = start_time.elapsed();
217 self.update_metrics(query, elapsed, false);
218 self.connection_info.errors_count += 1;
219 self.state = SessionState::Ready;
220 Err(ReplError::Database(e.into()))
221 }
222 }
223 }
224
225 pub async fn list_tables(&mut self) -> ReplResult<Vec<String>> {
227 self.state = SessionState::Executing;
228
229 let query = if let Some(ref keyspace) = self.current_keyspace {
230 format!(
231 "SELECT table_name FROM system.tables WHERE keyspace_name = '{}'",
232 keyspace
233 )
234 } else {
235 "SELECT keyspace_name, table_name FROM system.tables WHERE keyspace_name != 'system'"
236 .to_string()
237 };
238
239 match self.database.execute(&query).await {
240 Ok(result) => {
241 self.state = SessionState::Ready;
242 let mut tables = Vec::new();
243
244 for row in &result.rows {
245 if let Some(ref _keyspace) = self.current_keyspace {
246 if let Some(table_name) = row.get("table_name") {
248 tables.push(table_name.to_string());
249 }
250 } else {
251 if let (Some(keyspace_name), Some(table_name)) =
253 (row.get("keyspace_name"), row.get("table_name"))
254 {
255 tables.push(format!("{}.{}", keyspace_name, table_name));
256 }
257 }
258 }
259
260 Ok(tables)
261 }
262 Err(e) => {
263 self.state = SessionState::Ready;
264
265 if let Some(ref data_dir) = self.data_dir {
267 match self.scan_data_directory_tables(data_dir).await {
268 Ok(tables) => Ok(tables),
269 Err(_) => Err(ReplError::Database(e.into())),
270 }
271 } else {
272 Err(ReplError::Database(e.into()))
273 }
274 }
275 }
276 }
277
278 pub async fn list_keyspaces(&mut self) -> ReplResult<Vec<String>> {
280 self.state = SessionState::Executing;
281
282 let query = "SELECT keyspace_name FROM system.keyspaces";
283
284 match self.database.execute(query).await {
285 Ok(result) => {
286 self.state = SessionState::Ready;
287 let mut keyspaces = Vec::new();
288
289 for row in &result.rows {
290 if let Some(keyspace_name) = row.get("keyspace_name") {
291 keyspaces.push(keyspace_name.to_string());
292 }
293 }
294
295 Ok(keyspaces)
296 }
297 Err(e) => {
298 self.state = SessionState::Ready;
299
300 if let Some(ref data_dir) = self.data_dir {
302 match self.scan_data_directory_keyspaces(data_dir).await {
303 Ok(keyspaces) => Ok(keyspaces),
304 Err(_) => Err(ReplError::Database(e.into())),
305 }
306 } else {
307 Err(ReplError::Database(e.into()))
308 }
309 }
310 }
311 }
312
313 pub async fn describe_object(&mut self, object_name: &str) -> ReplResult<String> {
315 self.state = SessionState::Executing;
316
317 let (keyspace, table) = if object_name.contains('.') {
319 let parts: Vec<&str> = object_name.split('.').collect();
320 if parts.len() == 2 {
321 (Some(parts[0]), parts[1])
322 } else {
323 (self.current_keyspace.as_deref(), object_name)
324 }
325 } else {
326 (self.current_keyspace.as_deref(), object_name)
327 };
328
329 if let Some(ks) = keyspace {
330 match self.describe_table(ks, table).await {
331 Ok(description) => {
332 self.state = SessionState::Ready;
333 Ok(description)
334 }
335 Err(e) => {
336 self.state = SessionState::Ready;
337 Err(e)
338 }
339 }
340 } else {
341 self.state = SessionState::Ready;
342 Err(ReplError::Session(
343 "No keyspace specified and no current keyspace set".to_string(),
344 ))
345 }
346 }
347
348 async fn describe_table(&self, keyspace: &str, table: &str) -> ReplResult<String> {
350 let query = format!(
351 "SELECT column_name, type, kind FROM system.columns WHERE keyspace_name = '{}' AND table_name = '{}' ORDER BY position",
352 keyspace, table
353 );
354
355 match self.database.execute(&query).await {
356 Ok(result) => {
357 if result.rows.is_empty() {
358 return Err(ReplError::Session(format!(
359 "Table '{}.{}' not found",
360 keyspace, table
361 )));
362 }
363
364 let mut description = String::new();
365 description.push_str(&format!("Table: {}.{}\n", keyspace, table));
366 description.push_str("Columns:\n");
367
368 for row in &result.rows {
369 if let (Some(col_name), Some(col_type), Some(col_kind)) =
370 (row.get("column_name"), row.get("type"), row.get("kind"))
371 {
372 let kind_desc = match col_kind.to_string().as_str() {
373 "partition_key" => " (PARTITION KEY)",
374 "clustering" => " (CLUSTERING KEY)",
375 "regular" => "",
376 _ => "",
377 };
378 description
379 .push_str(&format!(" {} {}{}\n", col_name, col_type, kind_desc));
380 }
381 }
382
383 Ok(description)
384 }
385 Err(e) => Err(ReplError::Database(e.into())),
386 }
387 }
388
389 pub fn get_variable(&self, name: &str) -> Option<&String> {
391 self.variables.get(name)
392 }
393
394 pub fn set_variable(&mut self, name: String, value: String) {
396 self.variables.insert(name, value);
397 }
398
399 pub fn connection_info(&self) -> &ConnectionInfo {
401 &self.connection_info
402 }
403
404 pub fn metrics(&self) -> &SessionMetrics {
406 &self.metrics
407 }
408
409 fn update_metrics(&mut self, query: &str, elapsed: std::time::Duration, success: bool) {
411 let elapsed_us = elapsed.as_micros() as u64;
412 self.metrics.total_execution_time_us += elapsed_us;
413
414 let total_queries = self.connection_info.queries_executed + if success { 1 } else { 0 };
416 if total_queries > 0 {
417 self.metrics.avg_query_time_us =
418 self.metrics.total_execution_time_us as f64 / total_queries as f64;
419 }
420
421 let query_type = self.categorize_query(query);
423 *self.metrics.query_counts.entry(query_type).or_insert(0) += 1;
424 }
425
426 fn categorize_query(&self, query: &str) -> String {
428 let upper = query.to_uppercase();
429 let trimmed = upper.trim();
430
431 if trimmed.starts_with("SELECT") {
432 "SELECT".to_string()
433 } else if trimmed.starts_with("INSERT") {
434 "INSERT".to_string()
435 } else if trimmed.starts_with("UPDATE") {
436 "UPDATE".to_string()
437 } else if trimmed.starts_with("DELETE") {
438 "DELETE".to_string()
439 } else if trimmed.starts_with("CREATE") {
440 "CREATE".to_string()
441 } else if trimmed.starts_with("ALTER") {
442 "ALTER".to_string()
443 } else if trimmed.starts_with("DROP") {
444 "DROP".to_string()
445 } else if trimmed.starts_with("DESCRIBE") {
446 "DESCRIBE".to_string()
447 } else {
448 "OTHER".to_string()
449 }
450 }
451
452 async fn scan_data_directory_tables(&self, data_dir: &Path) -> Result<Vec<String>> {
454 use std::fs;
455
456 let mut tables = Vec::new();
457
458 if let Some(ref keyspace) = self.current_keyspace {
459 let keyspace_dir = data_dir.join(keyspace);
461 if keyspace_dir.exists() {
462 for entry in fs::read_dir(&keyspace_dir)? {
463 let entry = entry?;
464 if entry.path().is_dir() {
465 if let Some(dir_name) = entry.file_name().to_str() {
466 if let Some(table_name) = self.extract_table_name(dir_name) {
467 tables.push(table_name);
468 }
469 }
470 }
471 }
472 }
473 } else {
474 for entry in fs::read_dir(data_dir)? {
476 let entry = entry?;
477 if entry.path().is_dir() {
478 if let Some(keyspace_name) = entry.file_name().to_str() {
479 if keyspace_name.starts_with('.') || keyspace_name == "system" {
480 continue;
481 }
482
483 let keyspace_dir = entry.path();
484 for table_entry in fs::read_dir(&keyspace_dir)? {
485 let table_entry = table_entry?;
486 if table_entry.path().is_dir() {
487 if let Some(dir_name) = table_entry.file_name().to_str() {
488 if let Some(table_name) = self.extract_table_name(dir_name) {
489 tables.push(format!("{}.{}", keyspace_name, table_name));
490 }
491 }
492 }
493 }
494 }
495 }
496 }
497 }
498
499 Ok(tables)
500 }
501
502 async fn scan_data_directory_keyspaces(&self, data_dir: &Path) -> Result<Vec<String>> {
504 use std::fs;
505
506 let mut keyspaces = Vec::new();
507
508 for entry in fs::read_dir(data_dir)? {
509 let entry = entry?;
510 if entry.path().is_dir() {
511 if let Some(name) = entry.file_name().to_str() {
512 if !name.starts_with('.') && name != "system" {
513 keyspaces.push(name.to_string());
514 }
515 }
516 }
517 }
518
519 keyspaces.sort();
520 Ok(keyspaces)
521 }
522
523 fn extract_table_name(&self, dir_name: &str) -> Option<String> {
525 if let Some(dash_pos) = dir_name.find('-') {
527 let table_part = &dir_name[..dash_pos];
528 if !table_part.is_empty() && table_part.chars().all(|c| c.is_alphanumeric() || c == '_')
529 {
530 return Some(table_part.to_string());
531 }
532 }
533 None
534 }
535
536 pub async fn shutdown(&mut self) -> ReplResult<()> {
538 self.state = SessionState::Shutdown;
539
540 self.save_session_state().await?;
542
543 Ok(())
544 }
545
546 async fn save_session_state(&self) -> ReplResult<()> {
548 log::info!(
551 "Session ending. Queries executed: {}, Errors: {}",
552 self.connection_info.queries_executed,
553 self.connection_info.errors_count
554 );
555 Ok(())
556 }
557
558 pub fn export_metrics(&self) -> String {
560 let mut report = String::new();
561
562 report.push_str("=== CQLite Session Report ===\n");
563 report.push_str(&format!("Database: {}\n", self.db_path.display()));
564 report.push_str(&format!(
565 "Session Duration: {:?}\n",
566 self.connection_info
567 .last_activity
568 .duration_since(self.connection_info.connected_at)
569 .unwrap_or_default()
570 ));
571 report.push_str(&format!(
572 "Queries Executed: {}\n",
573 self.connection_info.queries_executed
574 ));
575 report.push_str(&format!("Errors: {}\n", self.connection_info.errors_count));
576 report.push_str(&format!(
577 "Average Query Time: {:.2}ms\n",
578 self.metrics.avg_query_time_us / 1000.0
579 ));
580
581 if !self.metrics.query_counts.is_empty() {
582 report.push_str("\nQuery Types:\n");
583 for (query_type, count) in &self.metrics.query_counts {
584 report.push_str(&format!(" {}: {}\n", query_type, count));
585 }
586 }
587
588 if let Some(ref keyspace) = self.current_keyspace {
589 report.push_str(&format!("Current Keyspace: {}\n", keyspace));
590 }
591
592 report
593 }
594
595 pub fn replace_database(&mut self, new_database: Database) -> ReplResult<()> {
600 self.database = Arc::new(new_database);
601 Ok(())
602 }
603
604 pub fn config(&self) -> &Config {
606 &self.config
607 }
608
609 pub fn schema_registry(
611 &self,
612 ) -> Option<Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>> {
613 self.schema_registry.clone()
614 }
615
616 pub fn set_schema_registry(
618 &mut self,
619 registry: Option<Arc<tokio::sync::RwLock<cqlite_core::schema::registry::SchemaRegistry>>>,
620 ) {
621 self.schema_registry = registry;
622 }
623}