// nirv_engine/connectors/file_connector.rs

1use async_trait::async_trait;
2use std::path::{Path, PathBuf};
3use std::fs;
4use glob::glob;
5use csv::ReaderBuilder;
6use serde_json;
7
8use crate::connectors::{Connector, ConnectorInitConfig, ConnectorCapabilities};
9use crate::utils::{
10    types::{
11        ConnectorType, ConnectorQuery, QueryResult, Schema, ColumnMetadata, DataType, 
12        Row, Value, PredicateOperator, PredicateValue
13    },
14    error::{ConnectorError, NirvResult},
15};
16
/// File system connector that serves CSV and JSON files from a base directory.
///
/// Query identifiers are resolved relative to `base_path`, which is only set while
/// the connector is connected. The accepted extension list defaults to `csv` and
/// `json` and can be overridden at connect time.
#[derive(Debug)]
pub struct FileConnector {
    /// Directory every query identifier is resolved against; `None` until connected.
    base_path: Option<PathBuf>,
    /// File extensions (without the leading dot) the connector will read; compared
    /// ASCII case-insensitively.
    supported_extensions: Vec<String>,
    /// Whether `connect` has succeeded and `disconnect` has not been called since.
    connected: bool,
}
23
24impl FileConnector {
25    /// Create a new file connector instance
26    pub fn new() -> Self {
27        Self {
28            base_path: None,
29            supported_extensions: vec!["csv".to_string(), "json".to_string()],
30            connected: false,
31        }
32    }
33
34    /// Check if a file extension is supported
35    fn is_supported_extension(&self, extension: &str) -> bool {
36        self.supported_extensions.iter().any(|ext| ext.eq_ignore_ascii_case(extension))
37    }
38
39    /// Resolve file path, handling patterns and relative paths
40    fn resolve_file_path(&self, identifier: &str) -> NirvResult<Vec<PathBuf>> {
41        let base_path = self.base_path.as_ref()
42            .ok_or_else(|| ConnectorError::ConnectionFailed("Not connected".to_string()))?;
43
44        let full_path = base_path.join(identifier);
45        
46        // Check if it's a glob pattern
47        if identifier.contains('*') || identifier.contains('?') {
48            let pattern = full_path.to_string_lossy();
49            let mut paths = Vec::new();
50            
51            match glob(&pattern) {
52                Ok(entries) => {
53                    for entry in entries {
54                        match entry {
55                            Ok(path) => {
56                                if path.is_file() {
57                                    if let Some(ext) = path.extension() {
58                                        if self.is_supported_extension(&ext.to_string_lossy()) {
59                                            paths.push(path);
60                                        }
61                                    }
62                                }
63                            }
64                            Err(_) => continue,
65                        }
66                    }
67                }
68                Err(e) => {
69                    return Err(ConnectorError::QueryExecutionFailed(
70                        format!("Pattern matching failed: {}", e)
71                    ).into());
72                }
73            }
74            
75            if paths.is_empty() {
76                return Err(ConnectorError::QueryExecutionFailed(
77                    format!("No files found matching pattern: {}", identifier)
78                ).into());
79            }
80            
81            Ok(paths)
82        } else {
83            // Single file
84            if !full_path.exists() {
85                return Err(ConnectorError::QueryExecutionFailed(
86                    format!("File not found: {}", identifier)
87                ).into());
88            }
89            
90            if !full_path.is_file() {
91                return Err(ConnectorError::QueryExecutionFailed(
92                    format!("Path is not a file: {}", identifier)
93                ).into());
94            }
95            
96            // Check file extension
97            if let Some(ext) = full_path.extension() {
98                if !self.is_supported_extension(&ext.to_string_lossy()) {
99                    return Err(ConnectorError::UnsupportedOperation(
100                        format!("Unsupported file extension: {}", ext.to_string_lossy())
101                    ).into());
102                }
103            } else {
104                return Err(ConnectorError::UnsupportedOperation(
105                    "File has no extension".to_string()
106                ).into());
107            }
108            
109            Ok(vec![full_path])
110        }
111    }
112
113    /// Parse CSV file and return structured data
114    fn parse_csv_file(&self, file_path: &Path) -> NirvResult<(Vec<ColumnMetadata>, Vec<Row>)> {
115        let content = fs::read_to_string(file_path)
116            .map_err(|e| ConnectorError::QueryExecutionFailed(
117                format!("Failed to read CSV file: {}", e)
118            ))?;
119
120        let mut reader = ReaderBuilder::new()
121            .has_headers(true)
122            .from_reader(content.as_bytes());
123
124        // Get headers
125        let headers = reader.headers()
126            .map_err(|e| ConnectorError::QueryExecutionFailed(
127                format!("Failed to read CSV headers: {}", e)
128            ))?;
129
130        let columns: Vec<ColumnMetadata> = headers.iter()
131            .map(|header| ColumnMetadata {
132                name: header.to_string(),
133                data_type: DataType::Text, // Default to text, could be improved with type inference
134                nullable: true,
135            })
136            .collect();
137
138        let mut rows = Vec::new();
139        for result in reader.records() {
140            let record = result
141                .map_err(|e| ConnectorError::QueryExecutionFailed(
142                    format!("Failed to read CSV record: {}", e)
143                ))?;
144
145            let values: Vec<Value> = record.iter()
146                .map(|field| {
147                    // Try to infer type from string value
148                    if field.is_empty() {
149                        Value::Null
150                    } else if let Ok(int_val) = field.parse::<i64>() {
151                        Value::Integer(int_val)
152                    } else if let Ok(float_val) = field.parse::<f64>() {
153                        Value::Float(float_val)
154                    } else if let Ok(bool_val) = field.parse::<bool>() {
155                        Value::Boolean(bool_val)
156                    } else {
157                        Value::Text(field.to_string())
158                    }
159                })
160                .collect();
161
162            rows.push(Row::new(values));
163        }
164
165        Ok((columns, rows))
166    }
167
168    /// Parse JSON file and return structured data
169    fn parse_json_file(&self, file_path: &Path) -> NirvResult<(Vec<ColumnMetadata>, Vec<Row>)> {
170        let content = fs::read_to_string(file_path)
171            .map_err(|e| ConnectorError::QueryExecutionFailed(
172                format!("Failed to read JSON file: {}", e)
173            ))?;
174
175        let json_data: serde_json::Value = serde_json::from_str(&content)
176            .map_err(|e| ConnectorError::QueryExecutionFailed(
177                format!("Failed to parse JSON: {}", e)
178            ))?;
179
180        match json_data {
181            serde_json::Value::Array(array) => {
182                if array.is_empty() {
183                    return Ok((Vec::new(), Vec::new()));
184                }
185
186                // Infer schema from first object
187                let mut columns = Vec::new();
188                if let Some(first_obj) = array.first() {
189                    if let serde_json::Value::Object(obj) = first_obj {
190                        for key in obj.keys() {
191                            columns.push(ColumnMetadata {
192                                name: key.clone(),
193                                data_type: DataType::Text, // Default to text
194                                nullable: true,
195                            });
196                        }
197                    }
198                }
199
200                // Convert array to rows
201                let mut rows = Vec::new();
202                for item in array {
203                    if let serde_json::Value::Object(obj) = item {
204                        let mut values = Vec::new();
205                        for column in &columns {
206                            let value = obj.get(&column.name)
207                                .map(|v| self.json_value_to_value(v))
208                                .unwrap_or(Value::Null);
209                            values.push(value);
210                        }
211                        rows.push(Row::new(values));
212                    }
213                }
214
215                Ok((columns, rows))
216            }
217            _ => Err(ConnectorError::QueryExecutionFailed(
218                "JSON file must contain an array of objects".to_string()
219            ).into())
220        }
221    }
222
223    /// Convert serde_json::Value to our Value type
224    fn json_value_to_value(&self, json_val: &serde_json::Value) -> Value {
225        match json_val {
226            serde_json::Value::Null => Value::Null,
227            serde_json::Value::Bool(b) => Value::Boolean(*b),
228            serde_json::Value::Number(n) => {
229                if let Some(i) = n.as_i64() {
230                    Value::Integer(i)
231                } else if let Some(f) = n.as_f64() {
232                    Value::Float(f)
233                } else {
234                    Value::Text(n.to_string())
235                }
236            }
237            serde_json::Value::String(s) => Value::Text(s.clone()),
238            serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
239                Value::Json(json_val.to_string())
240            }
241        }
242    }
243
244    /// Apply WHERE clause predicates to filter rows
245    fn apply_predicates(&self, columns: &[ColumnMetadata], rows: Vec<Row>, predicates: &[crate::utils::types::Predicate]) -> Vec<Row> {
246        if predicates.is_empty() {
247            return rows;
248        }
249
250        rows.into_iter()
251            .filter(|row| {
252                predicates.iter().all(|predicate| {
253                    // Find column index
254                    let column_index = columns.iter()
255                        .position(|col| col.name == predicate.column);
256                    
257                    if let Some(index) = column_index {
258                        if let Some(value) = row.values.get(index) {
259                            self.evaluate_predicate(value, &predicate.operator, &predicate.value)
260                        } else {
261                            false
262                        }
263                    } else {
264                        false // Column not found
265                    }
266                })
267            })
268            .collect()
269    }
270
271    /// Evaluate a single predicate against a value
272    fn evaluate_predicate(&self, value: &Value, operator: &PredicateOperator, predicate_value: &PredicateValue) -> bool {
273        match operator {
274            PredicateOperator::Equal => self.values_equal(value, predicate_value),
275            PredicateOperator::NotEqual => !self.values_equal(value, predicate_value),
276            PredicateOperator::GreaterThan => self.value_greater_than(value, predicate_value),
277            PredicateOperator::GreaterThanOrEqual => {
278                self.value_greater_than(value, predicate_value) || self.values_equal(value, predicate_value)
279            }
280            PredicateOperator::LessThan => self.value_less_than(value, predicate_value),
281            PredicateOperator::LessThanOrEqual => {
282                self.value_less_than(value, predicate_value) || self.values_equal(value, predicate_value)
283            }
284            PredicateOperator::Like => self.value_like(value, predicate_value),
285            PredicateOperator::In => self.value_in(value, predicate_value),
286            PredicateOperator::IsNull => matches!(value, Value::Null),
287            PredicateOperator::IsNotNull => !matches!(value, Value::Null),
288        }
289    }
290
291    /// Check if two values are equal
292    fn values_equal(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
293        match (value, predicate_value) {
294            (Value::Text(v), PredicateValue::String(p)) => v == p,
295            (Value::Integer(v), PredicateValue::Integer(p)) => v == p,
296            (Value::Float(v), PredicateValue::Number(p)) => (v - p).abs() < f64::EPSILON,
297            (Value::Boolean(v), PredicateValue::Boolean(p)) => v == p,
298            (Value::Null, PredicateValue::Null) => true,
299            // Type coercion
300            (Value::Integer(v), PredicateValue::Number(p)) => (*v as f64 - p).abs() < f64::EPSILON,
301            (Value::Float(v), PredicateValue::Integer(p)) => (v - *p as f64).abs() < f64::EPSILON,
302            _ => false,
303        }
304    }
305
306    /// Check if value is greater than predicate value
307    fn value_greater_than(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
308        match (value, predicate_value) {
309            (Value::Integer(v), PredicateValue::Integer(p)) => v > p,
310            (Value::Float(v), PredicateValue::Number(p)) => v > p,
311            (Value::Integer(v), PredicateValue::Number(p)) => (*v as f64) > *p,
312            (Value::Float(v), PredicateValue::Integer(p)) => *v > (*p as f64),
313            (Value::Text(v), PredicateValue::String(p)) => v > p,
314            _ => false,
315        }
316    }
317
318    /// Check if value is less than predicate value
319    fn value_less_than(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
320        match (value, predicate_value) {
321            (Value::Integer(v), PredicateValue::Integer(p)) => v < p,
322            (Value::Float(v), PredicateValue::Number(p)) => v < p,
323            (Value::Integer(v), PredicateValue::Number(p)) => (*v as f64) < *p,
324            (Value::Float(v), PredicateValue::Integer(p)) => *v < (*p as f64),
325            (Value::Text(v), PredicateValue::String(p)) => v < p,
326            _ => false,
327        }
328    }
329
330    /// Check if value matches LIKE pattern
331    fn value_like(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
332        match (value, predicate_value) {
333            (Value::Text(v), PredicateValue::String(pattern)) => {
334                // Simple LIKE implementation - convert SQL LIKE to regex
335                let regex_pattern = pattern
336                    .replace('%', ".*")
337                    .replace('_', ".");
338                
339                if let Ok(regex) = regex::Regex::new(&format!("^{}$", regex_pattern)) {
340                    regex.is_match(v)
341                } else {
342                    false
343                }
344            }
345            _ => false,
346        }
347    }
348
349    /// Check if value is in list
350    fn value_in(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
351        match predicate_value {
352            PredicateValue::List(list) => {
353                list.iter().any(|item| self.values_equal(value, item))
354            }
355            _ => false,
356        }
357    }
358}
359
360impl Default for FileConnector {
361    fn default() -> Self {
362        Self::new()
363    }
364}
365
366#[async_trait]
367impl Connector for FileConnector {
368    async fn connect(&mut self, config: ConnectorInitConfig) -> NirvResult<()> {
369        let base_path_str = config.connection_params.get("base_path")
370            .ok_or_else(|| ConnectorError::ConnectionFailed(
371                "base_path parameter is required".to_string()
372            ))?;
373
374        let base_path = PathBuf::from(base_path_str);
375        
376        if !base_path.exists() {
377            return Err(ConnectorError::ConnectionFailed(
378                format!("Base path does not exist: {}", base_path_str)
379            ).into());
380        }
381
382        if !base_path.is_dir() {
383            return Err(ConnectorError::ConnectionFailed(
384                format!("Base path is not a directory: {}", base_path_str)
385            ).into());
386        }
387
388        // Update supported extensions if provided
389        if let Some(extensions_str) = config.connection_params.get("file_extensions") {
390            self.supported_extensions = extensions_str
391                .split(',')
392                .map(|ext| ext.trim().to_lowercase())
393                .collect();
394        }
395
396        self.base_path = Some(base_path);
397        self.connected = true;
398
399        Ok(())
400    }
401
402    async fn execute_query(&self, query: ConnectorQuery) -> NirvResult<QueryResult> {
403        if !self.connected {
404            return Err(ConnectorError::ConnectionFailed(
405                "File connector is not connected".to_string()
406            ).into());
407        }
408
409        if query.query.sources.is_empty() {
410            return Err(ConnectorError::QueryExecutionFailed(
411                "No data source specified in query".to_string()
412            ).into());
413        }
414
415        let source = &query.query.sources[0]; // For now, handle single source
416        let file_paths = self.resolve_file_path(&source.identifier)?;
417
418        let mut all_columns: Option<Vec<ColumnMetadata>> = None;
419        let mut all_rows = Vec::new();
420
421        // Process each file (for pattern matching)
422        for file_path in file_paths {
423            let (columns, mut rows) = if let Some(ext) = file_path.extension() {
424                match ext.to_string_lossy().to_lowercase().as_str() {
425                    "csv" => self.parse_csv_file(&file_path)?,
426                    "json" => self.parse_json_file(&file_path)?,
427                    _ => return Err(ConnectorError::UnsupportedOperation(
428                        format!("Unsupported file extension: {}", ext.to_string_lossy())
429                    ).into()),
430                }
431            } else {
432                return Err(ConnectorError::UnsupportedOperation(
433                    "File has no extension".to_string()
434                ).into());
435            };
436
437            // Apply WHERE clause predicates (pushdown optimization)
438            rows = self.apply_predicates(&columns, rows, &query.query.predicates);
439
440            // For multiple files, ensure schema compatibility
441            if let Some(ref existing_columns) = all_columns {
442                if existing_columns.len() != columns.len() || 
443                   existing_columns.iter().zip(columns.iter()).any(|(a, b)| a.name != b.name) {
444                    return Err(ConnectorError::QueryExecutionFailed(
445                        "Schema mismatch between files in pattern".to_string()
446                    ).into());
447                }
448            } else {
449                all_columns = Some(columns);
450            }
451
452            all_rows.extend(rows);
453        }
454
455        let columns = all_columns.unwrap_or_default();
456
457        // Apply LIMIT if specified
458        if let Some(limit) = query.query.limit {
459            all_rows.truncate(limit as usize);
460        }
461
462        Ok(QueryResult {
463            columns,
464            rows: all_rows,
465            affected_rows: None,
466            execution_time: std::time::Duration::from_millis(0), // TODO: measure actual time
467        })
468    }
469
470    async fn get_schema(&self, object_name: &str) -> NirvResult<Schema> {
471        if !self.connected {
472            return Err(ConnectorError::ConnectionFailed(
473                "File connector is not connected".to_string()
474            ).into());
475        }
476
477        let file_paths = self.resolve_file_path(object_name)?;
478        
479        if file_paths.is_empty() {
480            return Err(ConnectorError::SchemaRetrievalFailed(
481                format!("No files found for: {}", object_name)
482            ).into());
483        }
484
485        // Use first file for schema (assuming all files in pattern have same schema)
486        let file_path = &file_paths[0];
487        
488        let (columns, _) = if let Some(ext) = file_path.extension() {
489            match ext.to_string_lossy().to_lowercase().as_str() {
490                "csv" => self.parse_csv_file(file_path)?,
491                "json" => self.parse_json_file(file_path)?,
492                _ => return Err(ConnectorError::UnsupportedOperation(
493                    format!("Unsupported file extension: {}", ext.to_string_lossy())
494                ).into()),
495            }
496        } else {
497            return Err(ConnectorError::UnsupportedOperation(
498                "File has no extension".to_string()
499            ).into());
500        };
501
502        Ok(Schema {
503            name: object_name.to_string(),
504            columns,
505            primary_key: None,
506            indexes: Vec::new(),
507        })
508    }
509
510    async fn disconnect(&mut self) -> NirvResult<()> {
511        self.base_path = None;
512        self.connected = false;
513        Ok(())
514    }
515
516    fn get_connector_type(&self) -> ConnectorType {
517        ConnectorType::File
518    }
519
520    fn supports_transactions(&self) -> bool {
521        false // File system doesn't support transactions
522    }
523
524    fn is_connected(&self) -> bool {
525        self.connected
526    }
527
528    fn get_capabilities(&self) -> ConnectorCapabilities {
529        ConnectorCapabilities {
530            supports_joins: false, // No cross-file joins for now
531            supports_aggregations: true, // Basic aggregations can be implemented
532            supports_subqueries: false,
533            supports_transactions: false,
534            supports_schema_introspection: true,
535            max_concurrent_queries: Some(1), // Single-threaded file access for now
536        }
537    }
538}