nirv_engine/cli/
cli_runner.rs1use clap::Parser;
2use colored::*;
3use crate::cli::{CliArgs, Commands, OutputFormatter};
4use crate::engine::{DefaultQueryParser, DefaultQueryExecutor, DefaultDispatcher, Dispatcher};
5use crate::connectors::{MockConnector, Connector};
6use crate::utils::error::NirvResult;
7
8pub struct CliRunner {
10 query_parser: DefaultQueryParser,
11 query_executor: DefaultQueryExecutor,
12 dispatcher: DefaultDispatcher,
13}
14
15impl CliRunner {
16 pub async fn new() -> NirvResult<Self> {
18 let query_parser = DefaultQueryParser::new()?;
19 let mut dispatcher = DefaultDispatcher::new();
20
21 let mut mock_connector = Box::new(MockConnector::new());
23 let config = crate::connectors::ConnectorInitConfig::new();
24 mock_connector.connect(config).await?;
25 dispatcher.register_connector("mock", mock_connector).await?;
26
27 let query_executor = DefaultQueryExecutor::new();
28
29 Ok(Self {
30 query_parser,
31 query_executor,
32 dispatcher,
33 })
34 }
35
36 pub async fn execute_query(&self, sql: &str, format: &crate::cli::OutputFormat, verbose: bool) -> NirvResult<String> {
38 if verbose {
39 eprintln!("{}", OutputFormatter::format_info(&format!("Parsing query: {}", sql)));
40 }
41
42 let internal_query = self.query_parser.parse(sql)?;
44
45 if verbose {
46 eprintln!("{}", OutputFormatter::format_info(&format!("Query parsed successfully. Sources: {:?}",
47 internal_query.sources.iter().map(|s| format!("{}.{}", s.object_type, s.identifier)).collect::<Vec<_>>())));
48 }
49
50 let connector_queries = self.dispatcher.route_query(&internal_query).await?;
52
53 if verbose {
54 eprintln!("{}", OutputFormatter::format_info(&format!("Query routed to {} connector(s)", connector_queries.len())));
55 }
56
57 let result = self.dispatcher.execute_distributed_query(connector_queries).await?;
59
60 if verbose {
61 eprintln!("{}", OutputFormatter::format_info(&format!("Query executed successfully. {} rows returned", result.row_count())));
62 }
63
64 Ok(OutputFormatter::format_result(&result, format))
66 }
67
68 pub fn list_sources(&self, detailed: bool) -> String {
70 let available_types = self.dispatcher.list_available_types();
71
72 if available_types.is_empty() {
73 return OutputFormatter::format_info("No data sources are currently registered.");
74 }
75
76 let mut output = String::new();
77 output.push_str(&format!("{}\n", "Available Data Sources:".bold()));
78
79 for data_type in &available_types {
80 if detailed {
81 if let Some(connector) = self.dispatcher.get_connector(data_type) {
82 let capabilities = connector.get_capabilities();
83 output.push_str(&format!(" {} {}\n", "•".green(), data_type.cyan().bold()));
84 output.push_str(&format!(" Type: {:?}\n", connector.get_connector_type()));
85 output.push_str(&format!(" Connected: {}\n",
86 if connector.is_connected() { "Yes".green() } else { "No".red() }));
87 output.push_str(&format!(" Supports Joins: {}\n",
88 if capabilities.supports_joins { "Yes".green() } else { "No".red() }));
89 output.push_str(&format!(" Supports Transactions: {}\n",
90 if capabilities.supports_transactions { "Yes".green() } else { "No".red() }));
91 output.push_str(&format!(" Max Concurrent Queries: {}\n",
92 capabilities.max_concurrent_queries.map(|n| n.to_string()).unwrap_or_else(|| "Unlimited".to_string())));
93 } else {
94 output.push_str(&format!(" {} {} (connector not found)\n", "•".red(), data_type));
95 }
96 } else {
97 output.push_str(&format!(" {} {}\n", "•".green(), data_type.cyan()));
98 }
99 }
100
101 output
102 }
103
104 pub async fn show_schema(&self, source: &str) -> NirvResult<String> {
106 let parts: Vec<&str> = source.split('.').collect();
108 if parts.len() != 2 {
109 return Err(crate::utils::error::NirvError::Internal(
110 "Source must be in format 'type.identifier' (e.g., 'postgres.users')".to_string()
111 ));
112 }
113
114 let object_type = parts[0];
115 let identifier = parts[1];
116
117 if !self.dispatcher.is_type_registered(object_type) {
119 return Err(crate::utils::error::NirvError::Dispatcher(
120 crate::utils::error::DispatcherError::UnregisteredObjectType(
121 format!("Data object type '{}' is not registered. Available types: {:?}",
122 object_type,
123 self.dispatcher.list_available_types())
124 )
125 ));
126 }
127
128 if let Some(connector) = self.dispatcher.get_connector(object_type) {
130 let schema = connector.get_schema(identifier).await?;
131
132 let mut output = String::new();
133 output.push_str(&format!("{} {}\n", "Schema for".bold(), source.cyan().bold()));
134 output.push_str(&format!("Name: {}\n", schema.name));
135
136 if let Some(pk) = &schema.primary_key {
137 output.push_str(&format!("Primary Key: {}\n", pk.join(", ").yellow()));
138 }
139
140 output.push_str(&format!("\n{}\n", "Columns:".bold()));
141 for col in &schema.columns {
142 let nullable_str = if col.nullable { "NULL" } else { "NOT NULL" };
143 let nullable_colored = if col.nullable {
144 nullable_str.yellow()
145 } else {
146 nullable_str.green()
147 };
148
149 output.push_str(&format!(" {} {} {} {}\n",
150 "•".green(),
151 col.name.cyan().bold(),
152 format!("{:?}", col.data_type).blue(),
153 nullable_colored
154 ));
155 }
156
157 if !schema.indexes.is_empty() {
158 output.push_str(&format!("\n{}\n", "Indexes:".bold()));
159 for index in &schema.indexes {
160 let unique_str = if index.unique { " (UNIQUE)" } else { "" };
161 output.push_str(&format!(" {} {} on ({}){}\n",
162 "•".green(),
163 index.name.cyan(),
164 index.columns.join(", ").yellow(),
165 unique_str.magenta()
166 ));
167 }
168 }
169
170 Ok(output)
171 } else {
172 Err(crate::utils::error::NirvError::Internal(
173 format!("Connector for type '{}' not found", object_type)
174 ))
175 }
176 }
177}
178
179pub async fn run_cli() -> anyhow::Result<()> {
181 let args = CliArgs::parse();
182
183 let runner = match CliRunner::new().await {
185 Ok(runner) => runner,
186 Err(e) => {
187 eprintln!("{}", OutputFormatter::format_error(&e));
188 std::process::exit(1);
189 }
190 };
191
192 let result = match args.command {
194 Commands::Query { sql, format, config: _, verbose } => {
195 match runner.execute_query(&sql, &format, verbose).await {
196 Ok(output) => {
197 println!("{}", output);
198 Ok(())
199 }
200 Err(e) => {
201 eprintln!("{}", OutputFormatter::format_error(&e));
202 std::process::exit(1);
203 }
204 }
205 }
206
207 Commands::Sources { detailed } => {
208 let output = runner.list_sources(detailed);
209 println!("{}", output);
210 Ok(())
211 }
212
213 Commands::Schema { source } => {
214 match runner.show_schema(&source).await {
215 Ok(output) => {
216 println!("{}", output);
217 Ok(())
218 }
219 Err(e) => {
220 eprintln!("{}", OutputFormatter::format_error(&e));
221 std::process::exit(1);
222 }
223 }
224 }
225 };
226
227 result
228}