nirv_engine/cli/
cli_runner.rs

1use clap::Parser;
2use colored::*;
3use crate::cli::{CliArgs, Commands, OutputFormatter};
4use crate::engine::{DefaultQueryParser, DefaultQueryExecutor, DefaultDispatcher, Dispatcher};
5use crate::connectors::{MockConnector, Connector};
6use crate::utils::error::NirvResult;
7
/// Main CLI runner that handles command execution.
///
/// Bundles the components the CLI commands need: a SQL parser, a query
/// executor and a dispatcher holding the registered connectors.
pub struct CliRunner {
    /// Parses raw SQL text into the engine's internal query representation.
    query_parser: DefaultQueryParser,
    /// Query executor created alongside the other components.
    /// NOTE(review): not read by any method visible in this part of the
    /// file — confirm whether it is still needed.
    query_executor: DefaultQueryExecutor,
    /// Routes parsed queries to connectors, executes them, and exposes the
    /// registered connector types.
    dispatcher: DefaultDispatcher,
}
14
15impl CliRunner {
16    /// Create a new CLI runner with default components
17    pub async fn new() -> NirvResult<Self> {
18        let query_parser = DefaultQueryParser::new()?;
19        let mut dispatcher = DefaultDispatcher::new();
20        
21        // Register and connect mock connector for testing
22        let mut mock_connector = Box::new(MockConnector::new());
23        let config = crate::connectors::ConnectorInitConfig::new();
24        mock_connector.connect(config).await?;
25        dispatcher.register_connector("mock", mock_connector).await?;
26        
27        let query_executor = DefaultQueryExecutor::new();
28        
29        Ok(Self {
30            query_parser,
31            query_executor,
32            dispatcher,
33        })
34    }
35    
36    /// Execute a SQL query and return formatted results
37    pub async fn execute_query(&self, sql: &str, format: &crate::cli::OutputFormat, verbose: bool) -> NirvResult<String> {
38        if verbose {
39            eprintln!("{}", OutputFormatter::format_info(&format!("Parsing query: {}", sql)));
40        }
41        
42        // Parse the SQL query
43        let internal_query = self.query_parser.parse(sql)?;
44        
45        if verbose {
46            eprintln!("{}", OutputFormatter::format_info(&format!("Query parsed successfully. Sources: {:?}", 
47                internal_query.sources.iter().map(|s| format!("{}.{}", s.object_type, s.identifier)).collect::<Vec<_>>())));
48        }
49        
50        // Route the query through the dispatcher
51        let connector_queries = self.dispatcher.route_query(&internal_query).await?;
52        
53        if verbose {
54            eprintln!("{}", OutputFormatter::format_info(&format!("Query routed to {} connector(s)", connector_queries.len())));
55        }
56        
57        // Execute the distributed query
58        let result = self.dispatcher.execute_distributed_query(connector_queries).await?;
59        
60        if verbose {
61            eprintln!("{}", OutputFormatter::format_info(&format!("Query executed successfully. {} rows returned", result.row_count())));
62        }
63        
64        // Format the results
65        Ok(OutputFormatter::format_result(&result, format))
66    }
67    
68    /// List available data sources
69    pub fn list_sources(&self, detailed: bool) -> String {
70        let available_types = self.dispatcher.list_available_types();
71        
72        if available_types.is_empty() {
73            return OutputFormatter::format_info("No data sources are currently registered.");
74        }
75        
76        let mut output = String::new();
77        output.push_str(&format!("{}\n", "Available Data Sources:".bold()));
78        
79        for data_type in &available_types {
80            if detailed {
81                if let Some(connector) = self.dispatcher.get_connector(data_type) {
82                    let capabilities = connector.get_capabilities();
83                    output.push_str(&format!("  {} {}\n", "•".green(), data_type.cyan().bold()));
84                    output.push_str(&format!("    Type: {:?}\n", connector.get_connector_type()));
85                    output.push_str(&format!("    Connected: {}\n", 
86                        if connector.is_connected() { "Yes".green() } else { "No".red() }));
87                    output.push_str(&format!("    Supports Joins: {}\n", 
88                        if capabilities.supports_joins { "Yes".green() } else { "No".red() }));
89                    output.push_str(&format!("    Supports Transactions: {}\n", 
90                        if capabilities.supports_transactions { "Yes".green() } else { "No".red() }));
91                    output.push_str(&format!("    Max Concurrent Queries: {}\n", 
92                        capabilities.max_concurrent_queries.map(|n| n.to_string()).unwrap_or_else(|| "Unlimited".to_string())));
93                } else {
94                    output.push_str(&format!("  {} {} (connector not found)\n", "•".red(), data_type));
95                }
96            } else {
97                output.push_str(&format!("  {} {}\n", "•".green(), data_type.cyan()));
98            }
99        }
100        
101        output
102    }
103    
104    /// Show schema information for a data source
105    pub async fn show_schema(&self, source: &str) -> NirvResult<String> {
106        // Parse source identifier (e.g., "postgres.users" -> type="postgres", identifier="users")
107        let parts: Vec<&str> = source.split('.').collect();
108        if parts.len() != 2 {
109            return Err(crate::utils::error::NirvError::Internal(
110                "Source must be in format 'type.identifier' (e.g., 'postgres.users')".to_string()
111            ));
112        }
113        
114        let object_type = parts[0];
115        let identifier = parts[1];
116        
117        // Check if the data object type is registered
118        if !self.dispatcher.is_type_registered(object_type) {
119            return Err(crate::utils::error::NirvError::Dispatcher(
120                crate::utils::error::DispatcherError::UnregisteredObjectType(
121                    format!("Data object type '{}' is not registered. Available types: {:?}", 
122                           object_type, 
123                           self.dispatcher.list_available_types())
124                )
125            ));
126        }
127        
128        // Get the connector and retrieve schema
129        if let Some(connector) = self.dispatcher.get_connector(object_type) {
130            let schema = connector.get_schema(identifier).await?;
131            
132            let mut output = String::new();
133            output.push_str(&format!("{} {}\n", "Schema for".bold(), source.cyan().bold()));
134            output.push_str(&format!("Name: {}\n", schema.name));
135            
136            if let Some(pk) = &schema.primary_key {
137                output.push_str(&format!("Primary Key: {}\n", pk.join(", ").yellow()));
138            }
139            
140            output.push_str(&format!("\n{}\n", "Columns:".bold()));
141            for col in &schema.columns {
142                let nullable_str = if col.nullable { "NULL" } else { "NOT NULL" };
143                let nullable_colored = if col.nullable { 
144                    nullable_str.yellow() 
145                } else { 
146                    nullable_str.green() 
147                };
148                
149                output.push_str(&format!("  {} {} {} {}\n", 
150                    "•".green(),
151                    col.name.cyan().bold(),
152                    format!("{:?}", col.data_type).blue(),
153                    nullable_colored
154                ));
155            }
156            
157            if !schema.indexes.is_empty() {
158                output.push_str(&format!("\n{}\n", "Indexes:".bold()));
159                for index in &schema.indexes {
160                    let unique_str = if index.unique { " (UNIQUE)" } else { "" };
161                    output.push_str(&format!("  {} {} on ({}){}\n", 
162                        "•".green(),
163                        index.name.cyan(),
164                        index.columns.join(", ").yellow(),
165                        unique_str.magenta()
166                    ));
167                }
168            }
169            
170            Ok(output)
171        } else {
172            Err(crate::utils::error::NirvError::Internal(
173                format!("Connector for type '{}' not found", object_type)
174            ))
175        }
176    }
177}
178
179/// Main entry point for CLI execution
180pub async fn run_cli() -> anyhow::Result<()> {
181    let args = CliArgs::parse();
182    
183    // Initialize CLI runner
184    let runner = match CliRunner::new().await {
185        Ok(runner) => runner,
186        Err(e) => {
187            eprintln!("{}", OutputFormatter::format_error(&e));
188            std::process::exit(1);
189        }
190    };
191    
192    // Execute the command
193    let result = match args.command {
194        Commands::Query { sql, format, config: _, verbose } => {
195            match runner.execute_query(&sql, &format, verbose).await {
196                Ok(output) => {
197                    println!("{}", output);
198                    Ok(())
199                }
200                Err(e) => {
201                    eprintln!("{}", OutputFormatter::format_error(&e));
202                    std::process::exit(1);
203                }
204            }
205        }
206        
207        Commands::Sources { detailed } => {
208            let output = runner.list_sources(detailed);
209            println!("{}", output);
210            Ok(())
211        }
212        
213        Commands::Schema { source } => {
214            match runner.show_schema(&source).await {
215                Ok(output) => {
216                    println!("{}", output);
217                    Ok(())
218                }
219                Err(e) => {
220                    eprintln!("{}", OutputFormatter::format_error(&e));
221                    std::process::exit(1);
222                }
223            }
224        }
225    };
226    
227    result
228}