1use async_trait::async_trait;
2use std::path::{Path, PathBuf};
3use std::fs;
4use glob::glob;
5use csv::ReaderBuilder;
6use serde_json;
7
8use crate::connectors::{Connector, ConnectorInitConfig, ConnectorCapabilities};
9use crate::utils::{
10 types::{
11 ConnectorType, ConnectorQuery, QueryResult, Schema, ColumnMetadata, DataType,
12 Row, Value, PredicateOperator, PredicateValue
13 },
14 error::{ConnectorError, NirvResult},
15};
16
/// Connector that serves query results from CSV and JSON files under a base directory.
///
/// A freshly constructed connector is disconnected; `connect` validates and stores the
/// base directory, after which source identifiers are resolved relative to it.
#[derive(Debug)]
pub struct FileConnector {
    /// Directory that source identifiers are joined onto; `None` while disconnected.
    base_path: Option<PathBuf>,
    /// Extensions this connector accepts, compared case-insensitively against `Path::extension`.
    supported_extensions: Vec<String>,
    /// Set by a successful `connect` and cleared by `disconnect`.
    connected: bool,
}
23
24impl FileConnector {
25 pub fn new() -> Self {
27 Self {
28 base_path: None,
29 supported_extensions: vec!["csv".to_string(), "json".to_string()],
30 connected: false,
31 }
32 }
33
34 fn is_supported_extension(&self, extension: &str) -> bool {
36 self.supported_extensions.iter().any(|ext| ext.eq_ignore_ascii_case(extension))
37 }
38
39 fn resolve_file_path(&self, identifier: &str) -> NirvResult<Vec<PathBuf>> {
41 let base_path = self.base_path.as_ref()
42 .ok_or_else(|| ConnectorError::ConnectionFailed("Not connected".to_string()))?;
43
44 let full_path = base_path.join(identifier);
45
46 if identifier.contains('*') || identifier.contains('?') {
48 let pattern = full_path.to_string_lossy();
49 let mut paths = Vec::new();
50
51 match glob(&pattern) {
52 Ok(entries) => {
53 for entry in entries {
54 match entry {
55 Ok(path) => {
56 if path.is_file() {
57 if let Some(ext) = path.extension() {
58 if self.is_supported_extension(&ext.to_string_lossy()) {
59 paths.push(path);
60 }
61 }
62 }
63 }
64 Err(_) => continue,
65 }
66 }
67 }
68 Err(e) => {
69 return Err(ConnectorError::QueryExecutionFailed(
70 format!("Pattern matching failed: {}", e)
71 ).into());
72 }
73 }
74
75 if paths.is_empty() {
76 return Err(ConnectorError::QueryExecutionFailed(
77 format!("No files found matching pattern: {}", identifier)
78 ).into());
79 }
80
81 Ok(paths)
82 } else {
83 if !full_path.exists() {
85 return Err(ConnectorError::QueryExecutionFailed(
86 format!("File not found: {}", identifier)
87 ).into());
88 }
89
90 if !full_path.is_file() {
91 return Err(ConnectorError::QueryExecutionFailed(
92 format!("Path is not a file: {}", identifier)
93 ).into());
94 }
95
96 if let Some(ext) = full_path.extension() {
98 if !self.is_supported_extension(&ext.to_string_lossy()) {
99 return Err(ConnectorError::UnsupportedOperation(
100 format!("Unsupported file extension: {}", ext.to_string_lossy())
101 ).into());
102 }
103 } else {
104 return Err(ConnectorError::UnsupportedOperation(
105 "File has no extension".to_string()
106 ).into());
107 }
108
109 Ok(vec![full_path])
110 }
111 }
112
113 fn parse_csv_file(&self, file_path: &Path) -> NirvResult<(Vec<ColumnMetadata>, Vec<Row>)> {
115 let content = fs::read_to_string(file_path)
116 .map_err(|e| ConnectorError::QueryExecutionFailed(
117 format!("Failed to read CSV file: {}", e)
118 ))?;
119
120 let mut reader = ReaderBuilder::new()
121 .has_headers(true)
122 .from_reader(content.as_bytes());
123
124 let headers = reader.headers()
126 .map_err(|e| ConnectorError::QueryExecutionFailed(
127 format!("Failed to read CSV headers: {}", e)
128 ))?;
129
130 let columns: Vec<ColumnMetadata> = headers.iter()
131 .map(|header| ColumnMetadata {
132 name: header.to_string(),
133 data_type: DataType::Text, nullable: true,
135 })
136 .collect();
137
138 let mut rows = Vec::new();
139 for result in reader.records() {
140 let record = result
141 .map_err(|e| ConnectorError::QueryExecutionFailed(
142 format!("Failed to read CSV record: {}", e)
143 ))?;
144
145 let values: Vec<Value> = record.iter()
146 .map(|field| {
147 if field.is_empty() {
149 Value::Null
150 } else if let Ok(int_val) = field.parse::<i64>() {
151 Value::Integer(int_val)
152 } else if let Ok(float_val) = field.parse::<f64>() {
153 Value::Float(float_val)
154 } else if let Ok(bool_val) = field.parse::<bool>() {
155 Value::Boolean(bool_val)
156 } else {
157 Value::Text(field.to_string())
158 }
159 })
160 .collect();
161
162 rows.push(Row::new(values));
163 }
164
165 Ok((columns, rows))
166 }
167
168 fn parse_json_file(&self, file_path: &Path) -> NirvResult<(Vec<ColumnMetadata>, Vec<Row>)> {
170 let content = fs::read_to_string(file_path)
171 .map_err(|e| ConnectorError::QueryExecutionFailed(
172 format!("Failed to read JSON file: {}", e)
173 ))?;
174
175 let json_data: serde_json::Value = serde_json::from_str(&content)
176 .map_err(|e| ConnectorError::QueryExecutionFailed(
177 format!("Failed to parse JSON: {}", e)
178 ))?;
179
180 match json_data {
181 serde_json::Value::Array(array) => {
182 if array.is_empty() {
183 return Ok((Vec::new(), Vec::new()));
184 }
185
186 let mut columns = Vec::new();
188 if let Some(first_obj) = array.first() {
189 if let serde_json::Value::Object(obj) = first_obj {
190 for key in obj.keys() {
191 columns.push(ColumnMetadata {
192 name: key.clone(),
193 data_type: DataType::Text, nullable: true,
195 });
196 }
197 }
198 }
199
200 let mut rows = Vec::new();
202 for item in array {
203 if let serde_json::Value::Object(obj) = item {
204 let mut values = Vec::new();
205 for column in &columns {
206 let value = obj.get(&column.name)
207 .map(|v| self.json_value_to_value(v))
208 .unwrap_or(Value::Null);
209 values.push(value);
210 }
211 rows.push(Row::new(values));
212 }
213 }
214
215 Ok((columns, rows))
216 }
217 _ => Err(ConnectorError::QueryExecutionFailed(
218 "JSON file must contain an array of objects".to_string()
219 ).into())
220 }
221 }
222
223 fn json_value_to_value(&self, json_val: &serde_json::Value) -> Value {
225 match json_val {
226 serde_json::Value::Null => Value::Null,
227 serde_json::Value::Bool(b) => Value::Boolean(*b),
228 serde_json::Value::Number(n) => {
229 if let Some(i) = n.as_i64() {
230 Value::Integer(i)
231 } else if let Some(f) = n.as_f64() {
232 Value::Float(f)
233 } else {
234 Value::Text(n.to_string())
235 }
236 }
237 serde_json::Value::String(s) => Value::Text(s.clone()),
238 serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
239 Value::Json(json_val.to_string())
240 }
241 }
242 }
243
244 fn apply_predicates(&self, columns: &[ColumnMetadata], rows: Vec<Row>, predicates: &[crate::utils::types::Predicate]) -> Vec<Row> {
246 if predicates.is_empty() {
247 return rows;
248 }
249
250 rows.into_iter()
251 .filter(|row| {
252 predicates.iter().all(|predicate| {
253 let column_index = columns.iter()
255 .position(|col| col.name == predicate.column);
256
257 if let Some(index) = column_index {
258 if let Some(value) = row.values.get(index) {
259 self.evaluate_predicate(value, &predicate.operator, &predicate.value)
260 } else {
261 false
262 }
263 } else {
264 false }
266 })
267 })
268 .collect()
269 }
270
271 fn evaluate_predicate(&self, value: &Value, operator: &PredicateOperator, predicate_value: &PredicateValue) -> bool {
273 match operator {
274 PredicateOperator::Equal => self.values_equal(value, predicate_value),
275 PredicateOperator::NotEqual => !self.values_equal(value, predicate_value),
276 PredicateOperator::GreaterThan => self.value_greater_than(value, predicate_value),
277 PredicateOperator::GreaterThanOrEqual => {
278 self.value_greater_than(value, predicate_value) || self.values_equal(value, predicate_value)
279 }
280 PredicateOperator::LessThan => self.value_less_than(value, predicate_value),
281 PredicateOperator::LessThanOrEqual => {
282 self.value_less_than(value, predicate_value) || self.values_equal(value, predicate_value)
283 }
284 PredicateOperator::Like => self.value_like(value, predicate_value),
285 PredicateOperator::In => self.value_in(value, predicate_value),
286 PredicateOperator::IsNull => matches!(value, Value::Null),
287 PredicateOperator::IsNotNull => !matches!(value, Value::Null),
288 }
289 }
290
291 fn values_equal(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
293 match (value, predicate_value) {
294 (Value::Text(v), PredicateValue::String(p)) => v == p,
295 (Value::Integer(v), PredicateValue::Integer(p)) => v == p,
296 (Value::Float(v), PredicateValue::Number(p)) => (v - p).abs() < f64::EPSILON,
297 (Value::Boolean(v), PredicateValue::Boolean(p)) => v == p,
298 (Value::Null, PredicateValue::Null) => true,
299 (Value::Integer(v), PredicateValue::Number(p)) => (*v as f64 - p).abs() < f64::EPSILON,
301 (Value::Float(v), PredicateValue::Integer(p)) => (v - *p as f64).abs() < f64::EPSILON,
302 _ => false,
303 }
304 }
305
306 fn value_greater_than(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
308 match (value, predicate_value) {
309 (Value::Integer(v), PredicateValue::Integer(p)) => v > p,
310 (Value::Float(v), PredicateValue::Number(p)) => v > p,
311 (Value::Integer(v), PredicateValue::Number(p)) => (*v as f64) > *p,
312 (Value::Float(v), PredicateValue::Integer(p)) => *v > (*p as f64),
313 (Value::Text(v), PredicateValue::String(p)) => v > p,
314 _ => false,
315 }
316 }
317
318 fn value_less_than(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
320 match (value, predicate_value) {
321 (Value::Integer(v), PredicateValue::Integer(p)) => v < p,
322 (Value::Float(v), PredicateValue::Number(p)) => v < p,
323 (Value::Integer(v), PredicateValue::Number(p)) => (*v as f64) < *p,
324 (Value::Float(v), PredicateValue::Integer(p)) => *v < (*p as f64),
325 (Value::Text(v), PredicateValue::String(p)) => v < p,
326 _ => false,
327 }
328 }
329
330 fn value_like(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
332 match (value, predicate_value) {
333 (Value::Text(v), PredicateValue::String(pattern)) => {
334 let regex_pattern = pattern
336 .replace('%', ".*")
337 .replace('_', ".");
338
339 if let Ok(regex) = regex::Regex::new(&format!("^{}$", regex_pattern)) {
340 regex.is_match(v)
341 } else {
342 false
343 }
344 }
345 _ => false,
346 }
347 }
348
349 fn value_in(&self, value: &Value, predicate_value: &PredicateValue) -> bool {
351 match predicate_value {
352 PredicateValue::List(list) => {
353 list.iter().any(|item| self.values_equal(value, item))
354 }
355 _ => false,
356 }
357 }
358}
359
360impl Default for FileConnector {
361 fn default() -> Self {
362 Self::new()
363 }
364}
365
366#[async_trait]
367impl Connector for FileConnector {
368 async fn connect(&mut self, config: ConnectorInitConfig) -> NirvResult<()> {
369 let base_path_str = config.connection_params.get("base_path")
370 .ok_or_else(|| ConnectorError::ConnectionFailed(
371 "base_path parameter is required".to_string()
372 ))?;
373
374 let base_path = PathBuf::from(base_path_str);
375
376 if !base_path.exists() {
377 return Err(ConnectorError::ConnectionFailed(
378 format!("Base path does not exist: {}", base_path_str)
379 ).into());
380 }
381
382 if !base_path.is_dir() {
383 return Err(ConnectorError::ConnectionFailed(
384 format!("Base path is not a directory: {}", base_path_str)
385 ).into());
386 }
387
388 if let Some(extensions_str) = config.connection_params.get("file_extensions") {
390 self.supported_extensions = extensions_str
391 .split(',')
392 .map(|ext| ext.trim().to_lowercase())
393 .collect();
394 }
395
396 self.base_path = Some(base_path);
397 self.connected = true;
398
399 Ok(())
400 }
401
402 async fn execute_query(&self, query: ConnectorQuery) -> NirvResult<QueryResult> {
403 if !self.connected {
404 return Err(ConnectorError::ConnectionFailed(
405 "File connector is not connected".to_string()
406 ).into());
407 }
408
409 if query.query.sources.is_empty() {
410 return Err(ConnectorError::QueryExecutionFailed(
411 "No data source specified in query".to_string()
412 ).into());
413 }
414
415 let source = &query.query.sources[0]; let file_paths = self.resolve_file_path(&source.identifier)?;
417
418 let mut all_columns: Option<Vec<ColumnMetadata>> = None;
419 let mut all_rows = Vec::new();
420
421 for file_path in file_paths {
423 let (columns, mut rows) = if let Some(ext) = file_path.extension() {
424 match ext.to_string_lossy().to_lowercase().as_str() {
425 "csv" => self.parse_csv_file(&file_path)?,
426 "json" => self.parse_json_file(&file_path)?,
427 _ => return Err(ConnectorError::UnsupportedOperation(
428 format!("Unsupported file extension: {}", ext.to_string_lossy())
429 ).into()),
430 }
431 } else {
432 return Err(ConnectorError::UnsupportedOperation(
433 "File has no extension".to_string()
434 ).into());
435 };
436
437 rows = self.apply_predicates(&columns, rows, &query.query.predicates);
439
440 if let Some(ref existing_columns) = all_columns {
442 if existing_columns.len() != columns.len() ||
443 existing_columns.iter().zip(columns.iter()).any(|(a, b)| a.name != b.name) {
444 return Err(ConnectorError::QueryExecutionFailed(
445 "Schema mismatch between files in pattern".to_string()
446 ).into());
447 }
448 } else {
449 all_columns = Some(columns);
450 }
451
452 all_rows.extend(rows);
453 }
454
455 let columns = all_columns.unwrap_or_default();
456
457 if let Some(limit) = query.query.limit {
459 all_rows.truncate(limit as usize);
460 }
461
462 Ok(QueryResult {
463 columns,
464 rows: all_rows,
465 affected_rows: None,
466 execution_time: std::time::Duration::from_millis(0), })
468 }
469
470 async fn get_schema(&self, object_name: &str) -> NirvResult<Schema> {
471 if !self.connected {
472 return Err(ConnectorError::ConnectionFailed(
473 "File connector is not connected".to_string()
474 ).into());
475 }
476
477 let file_paths = self.resolve_file_path(object_name)?;
478
479 if file_paths.is_empty() {
480 return Err(ConnectorError::SchemaRetrievalFailed(
481 format!("No files found for: {}", object_name)
482 ).into());
483 }
484
485 let file_path = &file_paths[0];
487
488 let (columns, _) = if let Some(ext) = file_path.extension() {
489 match ext.to_string_lossy().to_lowercase().as_str() {
490 "csv" => self.parse_csv_file(file_path)?,
491 "json" => self.parse_json_file(file_path)?,
492 _ => return Err(ConnectorError::UnsupportedOperation(
493 format!("Unsupported file extension: {}", ext.to_string_lossy())
494 ).into()),
495 }
496 } else {
497 return Err(ConnectorError::UnsupportedOperation(
498 "File has no extension".to_string()
499 ).into());
500 };
501
502 Ok(Schema {
503 name: object_name.to_string(),
504 columns,
505 primary_key: None,
506 indexes: Vec::new(),
507 })
508 }
509
510 async fn disconnect(&mut self) -> NirvResult<()> {
511 self.base_path = None;
512 self.connected = false;
513 Ok(())
514 }
515
    /// Identifies this connector as `ConnectorType::File`.
    fn get_connector_type(&self) -> ConnectorType {
        ConnectorType::File
    }
519
    /// Always `false`: this connector reports no transaction support.
    fn supports_transactions(&self) -> bool {
        false
    }
523
    /// Returns the `connected` flag, which `connect` sets and `disconnect` clears.
    fn is_connected(&self) -> bool {
        self.connected
    }
527
    /// Reports the fixed capability set advertised by the file connector.
    ///
    /// NOTE(review): `supports_aggregations` is `true`, yet the `execute_query` visible in
    /// this file performs no aggregation — confirm where aggregation is expected to happen.
    fn get_capabilities(&self) -> ConnectorCapabilities {
        ConnectorCapabilities {
            supports_joins: false,
            supports_aggregations: true,
            supports_subqueries: false,
            supports_transactions: false,
            supports_schema_introspection: true,
            // Advertises at most one query at a time.
            max_concurrent_queries: Some(1),
        }
    }
538}