use std::collections::HashMap;
use std::fs;
use std::time::Instant;
use tabled::{builder::Builder, settings::Style};
use tracing::debug;
use crate::config::{Config, OutputFormat};
use crate::error::{ExecutionError, Result};
use crate::parser::{Command, ConfigCommand, ExportFormat, PipeCommand, QueryMode};
use super::admin::AdminExecutor;
use super::context::ExecutionContext;
use super::export::{CsvWriter, ExportCoordinator, FormatWriter, JsonLWriter, ProgressTracker};
use super::query::QueryExecutor;
use super::result::{ExecutionResult, ExecutionStats, ResultData};
use super::utility::UtilityExecutor;
pub struct CommandRouter {
context: ExecutionContext,
}
impl CommandRouter {
pub async fn new(context: ExecutionContext) -> Result<Self> {
Ok(Self { context })
}
pub async fn route(&self, command: Command) -> Result<ExecutionResult> {
debug!("Routing command: {:?}", command);
let start = Instant::now();
let result = match command {
Command::Query(query_cmd) => {
let executor = QueryExecutor::new(self.context.clone()).await?;
executor.execute(query_cmd, QueryMode::default()).await
}
Command::Admin(admin_cmd) => {
let executor = AdminExecutor::new(self.context.clone()).await?;
executor.execute(admin_cmd).await
}
Command::Utility(util_cmd) => {
let executor = UtilityExecutor::new(self.context.clone());
executor.execute(util_cmd).await
}
Command::Config(config_cmd) => self.execute_config(config_cmd).await,
Command::Pipe(base_cmd, pipe_cmd) => self.execute_pipe(*base_cmd, pipe_cmd).await,
Command::Help(topic) => self.execute_help(topic).await,
Command::Exit => Ok(ExecutionResult {
success: true,
data: ResultData::Message("Exiting...".to_string()),
stats: ExecutionStats::default(),
error: None,
}),
};
let elapsed = start.elapsed().as_millis() as u64;
debug!("Command executed in {}ms", elapsed);
result
}
fn execute_pipe(
&self,
base_cmd: Command,
pipe_cmd: PipeCommand,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<ExecutionResult>> + Send + '_>>
{
Box::pin(async move {
match pipe_cmd {
PipeCommand::Export { format, file } => {
let result = if let Command::Query(query_cmd) = base_cmd {
let executor = QueryExecutor::new(self.context.clone()).await?;
executor.execute(query_cmd, QueryMode::Streaming { batch_size: 1000 }).await?
} else {
return Err(ExecutionError::InvalidOperation(
"Export can only be used with query commands".to_string()
).into());
};
let query = match result.data {
ResultData::Stream(stream) => stream,
_ => {
return Err(ExecutionError::InvalidOperation(
"Query did not return streaming data for export".to_string()
).into());
}
};
let filename = file.unwrap_or_else(|| {
use chrono::Local;
let timestamp = Local::now().format("%Y-%m-%d_%H-%M-%S");
match format {
ExportFormat::JsonL => format!("export-{}.jsonl", timestamp),
ExportFormat::Csv => format!("export-{}.csv", timestamp),
}
});
let writer: Box<dyn FormatWriter> = match format {
ExportFormat::JsonL => Box::new(JsonLWriter::new(&filename).await?),
ExportFormat::Csv => Box::new(CsvWriter::new(&filename).await?),
};
let tracker = ProgressTracker::new(None, true);
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_token_clone = cancel_token.clone();
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
cancel_token_clone.cancel();
}
Err(err) => {
eprintln!("Failed to listen for Ctrl+C: {}", err);
}
}
});
let mut coordinator = ExportCoordinator::new(query, tracker, writer)
.with_cancellation(cancel_token);
let export_result = coordinator.execute().await?;
let message = if export_result.cancelled {
format!(
"Export cancelled. Exported {} documents to {} ({:.2} MB) before cancellation",
export_result.documents_exported,
filename,
export_result.file_size_bytes as f64 / 1024.0 / 1024.0
)
} else {
format!(
"Exported {} documents to {} ({:.2} MB) in {:.2}s",
export_result.documents_exported,
filename,
export_result.file_size_bytes as f64 / 1024.0 / 1024.0,
export_result.elapsed_ms as f64 / 1000.0
)
};
Ok(ExecutionResult {
success: true,
data: ResultData::Message(message),
stats: ExecutionStats {
execution_time_ms: export_result.elapsed_ms,
documents_returned: 0,
documents_affected: Some(export_result.documents_exported),
},
error: None,
})
}
PipeCommand::Explain => {
let result = self.route(base_cmd).await?;
Ok(ExecutionResult {
success: true,
data: ResultData::Message(
"Explain functionality not yet implemented".to_string(),
),
stats: result.stats,
error: None,
})
}
}
})
}
async fn execute_help(&self, topic: Option<String>) -> Result<ExecutionResult> {
let help_text = if let Some(t) = topic {
format!("Help for: {}\n(Not yet implemented)", t)
} else {
r#"MongoDB Shell Commands:
Configuration:
format [shell|json|json-pretty|table|compact] - Set/get output format
color [on|off] - Enable/disable color output
config - Show current configuration
Named Queries:
query - List all named queries
query <name> [args...] - Execute a named query with arguments
query save <name> <query> - Save a new named query
query delete <name> - Delete a named query
Parameter substitution:
'$1', '$2'... - String parameters (with quotes in template)
$1, $2... - Numeric/raw parameters (no quotes in template)
$* - Raw aggregation: 18, 25, 30
$@ - String aggregation: 'admin', 'user'
Examples:
query save user "db.users.find({name: '\$1', age: \$2})"
query user John 25 -> {name: 'John', age: 25}
Utility:
help - Show this help
help <command> - Show help for specific command
exit / quit - Exit shell
"#
.to_string()
};
Ok(ExecutionResult {
success: true,
data: ResultData::Message(help_text),
stats: ExecutionStats::default(),
error: None,
})
}
async fn execute_config(&self, cmd: ConfigCommand) -> Result<ExecutionResult> {
let shared_state = &self.context.shared_state;
let message = match cmd {
ConfigCommand::SetFormat(format_str) => {
let format = match format_str.to_lowercase().as_str() {
"shell" => OutputFormat::Shell,
"json" => OutputFormat::Json,
"json-pretty" | "jsonpretty" => OutputFormat::JsonPretty,
"table" => OutputFormat::Table,
"compact" => OutputFormat::Compact,
_ => {
return Ok(ExecutionResult {
success: false,
data: ResultData::Message(format!(
"Invalid format: '{}'\n\nSupported formats: shell, json, json-pretty, table, compact",
format_str
)),
stats: ExecutionStats::default(),
error: Some("Invalid format".to_string()),
});
}
};
shared_state.set_format(format);
format!("Output format set to: {}", format_str)
}
ConfigCommand::GetFormat => {
let format = shared_state.get_format();
let format_str = match format {
OutputFormat::Shell => "shell",
OutputFormat::Json => "json",
OutputFormat::JsonPretty => "json-pretty",
OutputFormat::Table => "table",
OutputFormat::Compact => "compact",
};
format!(
"Current format: {}\n\nSupported formats: shell, json, json-pretty, table, compact",
format_str
)
}
ConfigCommand::SetColor(enabled) => {
shared_state.set_color_enabled(enabled);
format!(
"Color output {}",
if enabled { "enabled" } else { "disabled" }
)
}
ConfigCommand::GetColor => {
let enabled = shared_state.get_color_enabled();
format!(
"Color output: {}",
if enabled { "enabled" } else { "disabled" }
)
}
ConfigCommand::ShowConfig => {
let format = shared_state.get_format();
let format_str = match format {
OutputFormat::Shell => "shell",
OutputFormat::Json => "json",
OutputFormat::JsonPretty => "json-pretty",
OutputFormat::Table => "table",
OutputFormat::Compact => "compact",
};
let color = if shared_state.get_color_enabled() {
"enabled"
} else {
"disabled"
};
format!(
r#"Current Configuration:
format: {}
color: {}
Available Commands:
format [shell|json|json-pretty|table|compact] - Set/get output format
color [on|off] - Set/get color output
config - Show this configuration"#,
format_str, color
)
}
ConfigCommand::ListNamedQueries => {
return self.list_named_query().await;
}
ConfigCommand::ExecuteNamedQuery { name, args } => {
return self.execute_named_query(&name, &args).await;
}
ConfigCommand::SaveNamedQuery { name, query } => {
return self.save_named_query(&name, &query).await;
}
ConfigCommand::DeleteNamedQuery(name) => {
return self.delete_named_query(&name).await;
}
};
Ok(ExecutionResult {
success: true,
data: ResultData::Message(message),
stats: ExecutionStats::default(),
error: None,
})
}
async fn load_named_query(&self) -> Result<HashMap<String, String>> {
let config_path = self
.context
.config_path
.as_ref()
.map(|p| p.clone())
.unwrap_or_else(|| Config::default_config_path());
if !config_path.exists() {
return Ok(HashMap::new());
}
let content = fs::read_to_string(&config_path).map_err(|e| {
crate::error::MongoshError::Config(crate::error::ConfigError::Generic(format!(
"Failed to read config file: {}",
e
)))
})?;
let config: Config = toml::from_str(&content).map_err(|e| {
crate::error::MongoshError::Config(crate::error::ConfigError::Generic(format!(
"Failed to parse config file: {}",
e
)))
})?;
Ok(config.named_query)
}
async fn save_config_with_query(&self, query: HashMap<String, String>) -> Result<()> {
let config_path = self
.context
.config_path
.as_ref()
.map(|p| p.clone())
.unwrap_or_else(|| Config::default_config_path());
let mut config = if config_path.exists() {
let content = fs::read_to_string(&config_path).map_err(|e| {
crate::error::MongoshError::Config(crate::error::ConfigError::Generic(format!(
"Failed to read config file: {}",
e
)))
})?;
toml::from_str(&content).unwrap_or_else(|_| Config::default())
} else {
Config::default()
};
config.named_query = query;
config.save_to_file(Some(&config_path))?;
Ok(())
}
async fn list_named_query(&self) -> Result<ExecutionResult> {
let query = self.load_named_query().await?;
if query.is_empty() {
return Ok(ExecutionResult {
success: true,
data: ResultData::Message("No named queries defined.".to_string()),
stats: ExecutionStats::default(),
error: None,
});
}
let mut builder = Builder::default();
builder.push_record(vec!["Name", "Query"]);
for (name, q) in query.iter() {
builder.push_record(vec![name.as_str(), q.as_str()]);
}
let mut table = builder.build();
table.with(Style::ascii());
Ok(ExecutionResult {
success: true,
data: ResultData::Message(table.to_string()),
stats: ExecutionStats::default(),
error: None,
})
}
async fn execute_named_query(&self, name: &str, args: &[String]) -> Result<ExecutionResult> {
let query = self.load_named_query().await?;
let query_template = query.get(name).ok_or_else(|| {
crate::error::MongoshError::Config(crate::error::ConfigError::Generic(format!(
"Named query '{}' not found",
name
)))
})?;
let substituted_query = self.substitute_parameters(query_template, args);
let mut parser = crate::parser::Parser::new();
let command = parser.parse(&substituted_query)?;
Box::pin(self.route(command)).await
}
fn substitute_parameters(&self, template: &str, args: &[String]) -> String {
let mut result = template.to_string();
for (i, arg) in args.iter().enumerate() {
let placeholder = format!("${}", i + 1);
let quoted_placeholder = format!("'{}'", placeholder);
let double_quoted_placeholder = format!("\"{}\"", placeholder);
if result.contains("ed_placeholder) {
result = result.replace("ed_placeholder, &format!("'{}'", arg));
} else if result.contains(&double_quoted_placeholder) {
result = result.replace(&double_quoted_placeholder, &format!("\"{}\"", arg));
} else {
result = result.replace(&placeholder, arg);
}
}
if result.contains("$@") {
let quoted_args: Vec<String> = args.iter().map(|s| format!("'{}'", s)).collect();
let aggregated = quoted_args.join(", ");
result = result.replace("$@", &aggregated);
}
if result.contains("$*") {
let aggregated = args.join(", ");
result = result.replace("$*", &aggregated);
}
result
}
async fn save_named_query(&self, name: &str, query: &str) -> Result<ExecutionResult> {
let mut query_map = self.load_named_query().await?;
query_map.insert(name.to_string(), query.to_string());
self.save_config_with_query(query_map).await?;
Ok(ExecutionResult {
success: true,
data: ResultData::Message(format!("Named query '{}' saved", name)),
stats: ExecutionStats::default(),
error: None,
})
}
async fn delete_named_query(&self, name: &str) -> Result<ExecutionResult> {
let mut query = self.load_named_query().await?;
if query.remove(name).is_none() {
return Ok(ExecutionResult {
success: false,
data: ResultData::Message(format!("Named query '{}' not found", name)),
stats: ExecutionStats::default(),
error: Some(format!("Query '{}' does not exist", name)),
});
}
self.save_config_with_query(query).await?;
Ok(ExecutionResult {
success: true,
data: ResultData::Message(format!("{}: Deleted", name)),
stats: ExecutionStats::default(),
error: None,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_substitute_parameters_with_numbers() {
let router = CommandRouter {
context: ExecutionContext::new(
crate::connection::ConnectionManager::new(
"mongodb://localhost:27017".to_string(),
crate::config::ConnectionConfig::default(),
),
crate::repl::SharedState::new("test".to_string()),
),
};
let template = "db.users.find({age: $1})";
let result = router.substitute_parameters(template, &["18".to_string()]);
assert_eq!(result, "db.users.find({age: 18})");
let template = "db.users.find({age: '$1'})";
let result = router.substitute_parameters(template, &["18".to_string()]);
assert_eq!(result, "db.users.find({age: '18'})");
let template = "db.users.findOne({name: '$1'})";
let result = router.substitute_parameters(template, &["davin".to_string()]);
assert_eq!(result, "db.users.findOne({name: 'davin'})");
let template = "db.users.find({name: '$1', age: $2})";
let result =
router.substitute_parameters(template, &["davin".to_string(), "25".to_string()]);
assert_eq!(result, "db.users.find({name: 'davin', age: 25})");
let template = "db.users.find({name: '$1', age: $2, city: '$3', active: $4})";
let result = router.substitute_parameters(
template,
&[
"John".to_string(),
"30".to_string(),
"New York".to_string(),
"true".to_string(),
],
);
assert_eq!(
result,
"db.users.find({name: 'John', age: 30, city: 'New York', active: true})"
);
}
#[test]
fn test_substitute_parameters_with_aggregation() {
let router = CommandRouter {
context: ExecutionContext::new(
crate::connection::ConnectionManager::new(
"mongodb://localhost:27017".to_string(),
crate::config::ConnectionConfig::default(),
),
crate::repl::SharedState::new("test".to_string()),
),
};
let template = "db.users.find({age: {$in: [$*]}})";
let result = router.substitute_parameters(
template,
&["18".to_string(), "25".to_string(), "30".to_string()],
);
assert_eq!(result, "db.users.find({age: {$in: [18, 25, 30]}})");
let template = "db.users.find({category: {$in: [$@]}})";
let result =
router.substitute_parameters(template, &["admin".to_string(), "user".to_string()]);
assert_eq!(
result,
"db.users.find({category: {$in: ['admin', 'user']}})"
);
}
#[tokio::test]
async fn test_command_router_help() {
}
}