nirv_engine/connectors/
mock_connector.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3use std::time::{Duration, Instant};
4use crate::connectors::connector_trait::{Connector, ConnectorInitConfig, ConnectorCapabilities};
5use crate::utils::{
6    types::{
7        ConnectorType, ConnectorQuery, QueryResult, Schema, ColumnMetadata, 
8        DataType, Row, Value, Index, QueryOperation, PredicateOperator
9    },
10    error::{ConnectorError, NirvResult},
11};
12
13/// Mock connector for testing with deterministic in-memory data
14#[derive(Debug)]
15pub struct MockConnector {
16    connected: bool,
17    test_data: HashMap<String, TestTable>,
18    connection_delay_ms: u64,
19}
20
21/// Test table structure for mock data
22#[derive(Debug, Clone)]
23struct TestTable {
24    schema: Schema,
25    rows: Vec<Row>,
26}
27
28impl MockConnector {
29    /// Create a new mock connector with default test data
30    pub fn new() -> Self {
31        let mut connector = Self {
32            connected: false,
33            test_data: HashMap::new(),
34            connection_delay_ms: 10, // Simulate small connection delay
35        };
36        
37        connector.initialize_test_data();
38        connector
39    }
40    
41    /// Create a mock connector with custom connection delay
42    pub fn with_delay(delay_ms: u64) -> Self {
43        let mut connector = Self::new();
44        connector.connection_delay_ms = delay_ms;
45        connector
46    }
47    
48    /// Add custom test data for a table
49    pub fn add_test_data(&mut self, table_name: &str, rows: Vec<Vec<Value>>) {
50        self.add_test_data_with_schema(table_name, rows, None);
51    }
52    
53    /// Add custom test data for a table with column names
54    pub fn add_test_data_with_columns(&mut self, table_name: &str, column_names: Vec<&str>, rows: Vec<Vec<Value>>) {
55        // Create schema with provided column names
56        let columns = if let Some(first_row) = rows.first() {
57            first_row.iter().enumerate().map(|(i, value)| {
58                let data_type = match value {
59                    Value::Integer(_) => DataType::Integer,
60                    Value::Float(_) => DataType::Float,
61                    Value::Text(_) => DataType::Text,
62                    Value::Boolean(_) => DataType::Boolean,
63                    Value::Date(_) => DataType::Date,
64                    Value::DateTime(_) => DataType::DateTime,
65                    Value::Json(_) => DataType::Json,
66                    Value::Binary(_) => DataType::Binary,
67                    Value::Null => DataType::Text, // Default for null
68                };
69                
70                let column_name = column_names.get(i).unwrap_or(&format!("column_{}", i).as_str()).to_string();
71                
72                ColumnMetadata {
73                    name: column_name,
74                    data_type,
75                    nullable: true,
76                }
77            }).collect()
78        } else {
79            vec![]
80        };
81        
82        let schema = Schema {
83            name: table_name.to_string(),
84            columns,
85            primary_key: None,
86            indexes: vec![],
87        };
88        
89        let table_rows: Vec<Row> = rows.into_iter().map(Row::new).collect();
90        
91        self.test_data.insert(table_name.to_string(), TestTable {
92            schema,
93            rows: table_rows,
94        });
95    }
96    
97    /// Add custom test data for a table with optional schema
98    fn add_test_data_with_schema(&mut self, table_name: &str, rows: Vec<Vec<Value>>, _schema: Option<Schema>) {
99        // Create a simple schema based on the first row
100        let columns = if let Some(first_row) = rows.first() {
101            first_row.iter().enumerate().map(|(i, value)| {
102                let data_type = match value {
103                    Value::Integer(_) => DataType::Integer,
104                    Value::Float(_) => DataType::Float,
105                    Value::Text(_) => DataType::Text,
106                    Value::Boolean(_) => DataType::Boolean,
107                    Value::Date(_) => DataType::Date,
108                    Value::DateTime(_) => DataType::DateTime,
109                    Value::Json(_) => DataType::Json,
110                    Value::Binary(_) => DataType::Binary,
111                    Value::Null => DataType::Text, // Default for null
112                };
113                
114                ColumnMetadata {
115                    name: format!("column_{}", i),
116                    data_type,
117                    nullable: true,
118                }
119            }).collect()
120        } else {
121            vec![]
122        };
123        
124        let schema = Schema {
125            name: table_name.to_string(),
126            columns,
127            primary_key: None,
128            indexes: vec![],
129        };
130        
131        let table_rows: Vec<Row> = rows.into_iter().map(Row::new).collect();
132        
133        self.test_data.insert(table_name.to_string(), TestTable {
134            schema,
135            rows: table_rows,
136        });
137    }
138    
139    /// Initialize deterministic test data
140    fn initialize_test_data(&mut self) {
141        // Users table
142        let users_schema = Schema {
143            name: "users".to_string(),
144            columns: vec![
145                ColumnMetadata {
146                    name: "id".to_string(),
147                    data_type: DataType::Integer,
148                    nullable: false,
149                },
150                ColumnMetadata {
151                    name: "name".to_string(),
152                    data_type: DataType::Text,
153                    nullable: false,
154                },
155                ColumnMetadata {
156                    name: "email".to_string(),
157                    data_type: DataType::Text,
158                    nullable: true,
159                },
160                ColumnMetadata {
161                    name: "age".to_string(),
162                    data_type: DataType::Integer,
163                    nullable: true,
164                },
165                ColumnMetadata {
166                    name: "active".to_string(),
167                    data_type: DataType::Boolean,
168                    nullable: false,
169                },
170            ],
171            primary_key: Some(vec!["id".to_string()]),
172            indexes: vec![
173                Index {
174                    name: "idx_users_email".to_string(),
175                    columns: vec!["email".to_string()],
176                    unique: true,
177                },
178            ],
179        };
180        
181        let users_rows = vec![
182            Row::new(vec![
183                Value::Integer(1),
184                Value::Text("Alice Johnson".to_string()),
185                Value::Text("alice@example.com".to_string()),
186                Value::Integer(30),
187                Value::Boolean(true),
188            ]),
189            Row::new(vec![
190                Value::Integer(2),
191                Value::Text("Bob Smith".to_string()),
192                Value::Text("bob@example.com".to_string()),
193                Value::Integer(25),
194                Value::Boolean(true),
195            ]),
196            Row::new(vec![
197                Value::Integer(3),
198                Value::Text("Charlie Brown".to_string()),
199                Value::Null,
200                Value::Integer(35),
201                Value::Boolean(false),
202            ]),
203        ];
204        
205        self.test_data.insert("users".to_string(), TestTable {
206            schema: users_schema,
207            rows: users_rows,
208        });
209        
210        // Products table
211        let products_schema = Schema {
212            name: "products".to_string(),
213            columns: vec![
214                ColumnMetadata {
215                    name: "id".to_string(),
216                    data_type: DataType::Integer,
217                    nullable: false,
218                },
219                ColumnMetadata {
220                    name: "name".to_string(),
221                    data_type: DataType::Text,
222                    nullable: false,
223                },
224                ColumnMetadata {
225                    name: "price".to_string(),
226                    data_type: DataType::Float,
227                    nullable: false,
228                },
229                ColumnMetadata {
230                    name: "category".to_string(),
231                    data_type: DataType::Text,
232                    nullable: true,
233                },
234            ],
235            primary_key: Some(vec!["id".to_string()]),
236            indexes: vec![],
237        };
238        
239        let products_rows = vec![
240            Row::new(vec![
241                Value::Integer(1),
242                Value::Text("Laptop".to_string()),
243                Value::Float(999.99),
244                Value::Text("Electronics".to_string()),
245            ]),
246            Row::new(vec![
247                Value::Integer(2),
248                Value::Text("Coffee Mug".to_string()),
249                Value::Float(12.50),
250                Value::Text("Kitchen".to_string()),
251            ]),
252        ];
253        
254        self.test_data.insert("products".to_string(), TestTable {
255            schema: products_schema,
256            rows: products_rows,
257        });
258    }
259    
260    /// Apply WHERE clause filtering to rows
261    fn apply_filters(&self, rows: &[Row], query: &ConnectorQuery) -> Vec<Row> {
262        if query.query.predicates.is_empty() {
263            return rows.to_vec();
264        }
265        
266        let table_name = if let Some(source) = query.query.sources.first() {
267            &source.identifier
268        } else {
269            return rows.to_vec();
270        };
271        
272        let schema = if let Some(table) = self.test_data.get(table_name) {
273            &table.schema
274        } else {
275            return rows.to_vec();
276        };
277        
278        rows.iter()
279            .filter(|row| {
280                query.query.predicates.iter().all(|predicate| {
281                    // Find column index
282                    let col_index = schema.columns.iter()
283                        .position(|col| col.name == predicate.column);
284                    
285                    if let Some(index) = col_index {
286                        if let Some(value) = row.get(index) {
287                            self.evaluate_predicate(value, &predicate.operator, &predicate.value)
288                        } else {
289                            false
290                        }
291                    } else {
292                        false
293                    }
294                })
295            })
296            .cloned()
297            .collect()
298    }
299    
300    /// Evaluate a single predicate against a value
301    fn evaluate_predicate(&self, value: &Value, operator: &PredicateOperator, predicate_value: &crate::utils::types::PredicateValue) -> bool {
302        use crate::utils::types::PredicateValue;
303        
304        match operator {
305            PredicateOperator::Equal => {
306                match (value, predicate_value) {
307                    (Value::Integer(v), PredicateValue::Integer(p)) => v == p,
308                    (Value::Text(v), PredicateValue::String(p)) => v == p,
309                    (Value::Float(v), PredicateValue::Number(p)) => (v - p).abs() < f64::EPSILON,
310                    (Value::Boolean(v), PredicateValue::Boolean(p)) => v == p,
311                    (Value::Null, PredicateValue::Null) => true,
312                    _ => false,
313                }
314            },
315            PredicateOperator::NotEqual => !self.evaluate_predicate(value, &PredicateOperator::Equal, predicate_value),
316            PredicateOperator::GreaterThan => {
317                match (value, predicate_value) {
318                    (Value::Integer(v), PredicateValue::Integer(p)) => v > p,
319                    (Value::Float(v), PredicateValue::Number(p)) => v > p,
320                    _ => false,
321                }
322            },
323            PredicateOperator::GreaterThanOrEqual => {
324                self.evaluate_predicate(value, &PredicateOperator::GreaterThan, predicate_value) ||
325                self.evaluate_predicate(value, &PredicateOperator::Equal, predicate_value)
326            },
327            PredicateOperator::LessThan => {
328                match (value, predicate_value) {
329                    (Value::Integer(v), PredicateValue::Integer(p)) => v < p,
330                    (Value::Float(v), PredicateValue::Number(p)) => v < p,
331                    _ => false,
332                }
333            },
334            PredicateOperator::LessThanOrEqual => {
335                self.evaluate_predicate(value, &PredicateOperator::LessThan, predicate_value) ||
336                self.evaluate_predicate(value, &PredicateOperator::Equal, predicate_value)
337            },
338            PredicateOperator::Like => {
339                match (value, predicate_value) {
340                    (Value::Text(v), PredicateValue::String(p)) => {
341                        // Simple LIKE implementation (% as wildcard)
342                        let pattern = p.replace('%', ".*");
343                        regex::Regex::new(&pattern).map(|re| re.is_match(v)).unwrap_or(false)
344                    },
345                    _ => false,
346                }
347            },
348            PredicateOperator::IsNull => matches!(value, Value::Null),
349            PredicateOperator::IsNotNull => !matches!(value, Value::Null),
350            PredicateOperator::In => {
351                if let PredicateValue::List(values) = predicate_value {
352                    values.iter().any(|pv| self.evaluate_predicate(value, &PredicateOperator::Equal, pv))
353                } else {
354                    false
355                }
356            },
357        }
358    }
359    
360    /// Apply LIMIT clause to rows
361    fn apply_limit(&self, rows: Vec<Row>, limit: Option<u64>) -> Vec<Row> {
362        if let Some(limit_count) = limit {
363            rows.into_iter().take(limit_count as usize).collect()
364        } else {
365            rows
366        }
367    }
368}
369
370impl Default for MockConnector {
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
376#[async_trait]
377impl Connector for MockConnector {
378    async fn connect(&mut self, _config: ConnectorInitConfig) -> NirvResult<()> {
379        // Simulate connection delay
380        if self.connection_delay_ms > 0 {
381            tokio::time::sleep(Duration::from_millis(self.connection_delay_ms)).await;
382        }
383        
384        self.connected = true;
385        Ok(())
386    }
387    
388    async fn execute_query(&self, query: ConnectorQuery) -> NirvResult<QueryResult> {
389        if !self.connected {
390            return Err(ConnectorError::ConnectionFailed("Not connected".to_string()).into());
391        }
392        
393        let start_time = Instant::now();
394        
395        // Add a small delay to ensure execution time is recorded
396        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
397        
398        match query.query.operation {
399            QueryOperation::Select => {
400                if let Some(source) = query.query.sources.first() {
401                    if let Some(table) = self.test_data.get(&source.identifier) {
402                        let filtered_rows = self.apply_filters(&table.rows, &query);
403                        // Note: Limit is handled by the query executor, not the connector
404                        
405                        let result = QueryResult {
406                            columns: table.schema.columns.clone(),
407                            rows: filtered_rows,
408                            affected_rows: None,
409                            execution_time: start_time.elapsed(),
410                        };
411                        
412                        Ok(result)
413                    } else {
414                        Err(ConnectorError::QueryExecutionFailed(
415                            format!("Table '{}' not found", source.identifier)
416                        ).into())
417                    }
418                } else {
419                    Err(ConnectorError::QueryExecutionFailed(
420                        "No data source specified in query".to_string()
421                    ).into())
422                }
423            },
424            _ => Err(ConnectorError::UnsupportedOperation(
425                format!("Operation {:?} not supported by MockConnector", query.query.operation)
426            ).into()),
427        }
428    }
429    
430    async fn get_schema(&self, object_name: &str) -> NirvResult<Schema> {
431        if !self.connected {
432            return Err(ConnectorError::ConnectionFailed("Not connected".to_string()).into());
433        }
434        
435        if let Some(table) = self.test_data.get(object_name) {
436            Ok(table.schema.clone())
437        } else {
438            Err(ConnectorError::SchemaRetrievalFailed(
439                format!("Object '{}' not found", object_name)
440            ).into())
441        }
442    }
443    
444    async fn disconnect(&mut self) -> NirvResult<()> {
445        self.connected = false;
446        Ok(())
447    }
448    
449    fn get_connector_type(&self) -> ConnectorType {
450        ConnectorType::Mock
451    }
452    
453    fn supports_transactions(&self) -> bool {
454        false
455    }
456    
457    fn is_connected(&self) -> bool {
458        self.connected
459    }
460    
461    fn get_capabilities(&self) -> ConnectorCapabilities {
462        ConnectorCapabilities {
463            supports_joins: false,
464            supports_aggregations: false,
465            supports_subqueries: false,
466            supports_transactions: false,
467            supports_schema_introspection: true,
468            max_concurrent_queries: Some(10),
469        }
470    }
471}#[
472cfg(test)]
473mod tests {
474    use super::*;
475    use crate::utils::types::{
476        InternalQuery, QueryOperation, DataSource, Predicate, PredicateOperator, PredicateValue
477    };
478
479    #[tokio::test]
480    async fn test_mock_connector_creation() {
481        let connector = MockConnector::new();
482        
483        assert!(!connector.is_connected());
484        assert_eq!(connector.get_connector_type(), ConnectorType::Mock);
485        assert!(!connector.supports_transactions());
486        assert_eq!(connector.test_data.len(), 2); // users and products tables
487    }
488
489    #[tokio::test]
490    async fn test_mock_connector_with_delay() {
491        let connector = MockConnector::with_delay(50);
492        
493        assert!(!connector.is_connected());
494        assert_eq!(connector.connection_delay_ms, 50);
495    }
496
497    #[tokio::test]
498    async fn test_mock_connector_connection_lifecycle() {
499        let mut connector = MockConnector::new();
500        let config = ConnectorInitConfig::new();
501        
502        // Initially not connected
503        assert!(!connector.is_connected());
504        
505        // Connect
506        let result = connector.connect(config).await;
507        assert!(result.is_ok());
508        assert!(connector.is_connected());
509        
510        // Disconnect
511        let result = connector.disconnect().await;
512        assert!(result.is_ok());
513        assert!(!connector.is_connected());
514    }
515
516    #[tokio::test]
517    async fn test_mock_connector_connection_delay() {
518        let mut connector = MockConnector::with_delay(10);
519        let config = ConnectorInitConfig::new();
520        
521        let start = std::time::Instant::now();
522        let result = connector.connect(config).await;
523        let elapsed = start.elapsed();
524        
525        assert!(result.is_ok());
526        assert!(connector.is_connected());
527        assert!(elapsed >= std::time::Duration::from_millis(10));
528    }
529
530    #[tokio::test]
531    async fn test_mock_connector_query_without_connection() {
532        let connector = MockConnector::new();
533        let query = ConnectorQuery {
534            connector_type: ConnectorType::Mock,
535            query: InternalQuery::new(QueryOperation::Select),
536            connection_params: std::collections::HashMap::new(),
537        };
538        
539        let result = connector.execute_query(query).await;
540        assert!(result.is_err());
541        
542        match result.unwrap_err() {
543            crate::utils::error::NirvError::Connector(ConnectorError::ConnectionFailed(msg)) => {
544                assert_eq!(msg, "Not connected");
545            }
546            _ => panic!("Expected ConnectionFailed error"),
547        }
548    }
549
550    #[tokio::test]
551    async fn test_mock_connector_select_users_table() {
552        let mut connector = MockConnector::new();
553        let config = ConnectorInitConfig::new();
554        connector.connect(config).await.unwrap();
555        
556        let mut query = InternalQuery::new(QueryOperation::Select);
557        query.sources.push(DataSource {
558            object_type: "mock".to_string(),
559            identifier: "users".to_string(),
560            alias: None,
561        });
562        
563        let connector_query = ConnectorQuery {
564            connector_type: ConnectorType::Mock,
565            query,
566            connection_params: std::collections::HashMap::new(),
567        };
568        
569        let result = connector.execute_query(connector_query).await;
570        assert!(result.is_ok());
571        
572        let query_result = result.unwrap();
573        assert_eq!(query_result.columns.len(), 5); // id, name, email, age, active
574        assert_eq!(query_result.rows.len(), 3); // Alice, Bob, Charlie
575        assert!(query_result.execution_time > std::time::Duration::from_nanos(0));
576        
577        // Check first row (Alice)
578        let first_row = &query_result.rows[0];
579        assert_eq!(first_row.get(0), Some(&Value::Integer(1)));
580        assert_eq!(first_row.get(1), Some(&Value::Text("Alice Johnson".to_string())));
581        assert_eq!(first_row.get(2), Some(&Value::Text("alice@example.com".to_string())));
582        assert_eq!(first_row.get(3), Some(&Value::Integer(30)));
583        assert_eq!(first_row.get(4), Some(&Value::Boolean(true)));
584    }
585
586    #[tokio::test]
587    async fn test_mock_connector_select_products_table() {
588        let mut connector = MockConnector::new();
589        let config = ConnectorInitConfig::new();
590        connector.connect(config).await.unwrap();
591        
592        let mut query = InternalQuery::new(QueryOperation::Select);
593        query.sources.push(DataSource {
594            object_type: "mock".to_string(),
595            identifier: "products".to_string(),
596            alias: None,
597        });
598        
599        let connector_query = ConnectorQuery {
600            connector_type: ConnectorType::Mock,
601            query,
602            connection_params: std::collections::HashMap::new(),
603        };
604        
605        let result = connector.execute_query(connector_query).await;
606        assert!(result.is_ok());
607        
608        let query_result = result.unwrap();
609        assert_eq!(query_result.columns.len(), 4); // id, name, price, category
610        assert_eq!(query_result.rows.len(), 2); // Laptop, Coffee Mug
611        
612        // Check first row (Laptop)
613        let first_row = &query_result.rows[0];
614        assert_eq!(first_row.get(0), Some(&Value::Integer(1)));
615        assert_eq!(first_row.get(1), Some(&Value::Text("Laptop".to_string())));
616        assert_eq!(first_row.get(2), Some(&Value::Float(999.99)));
617        assert_eq!(first_row.get(3), Some(&Value::Text("Electronics".to_string())));
618    }
619
620    #[tokio::test]
621    async fn test_mock_connector_query_non_existent_table() {
622        let mut connector = MockConnector::new();
623        let config = ConnectorInitConfig::new();
624        connector.connect(config).await.unwrap();
625        
626        let mut query = InternalQuery::new(QueryOperation::Select);
627        query.sources.push(DataSource {
628            object_type: "mock".to_string(),
629            identifier: "non_existent".to_string(),
630            alias: None,
631        });
632        
633        let connector_query = ConnectorQuery {
634            connector_type: ConnectorType::Mock,
635            query,
636            connection_params: std::collections::HashMap::new(),
637        };
638        
639        let result = connector.execute_query(connector_query).await;
640        assert!(result.is_err());
641        
642        match result.unwrap_err() {
643            crate::utils::error::NirvError::Connector(ConnectorError::QueryExecutionFailed(msg)) => {
644                assert!(msg.contains("Table 'non_existent' not found"));
645            }
646            _ => panic!("Expected QueryExecutionFailed error"),
647        }
648    }
649
650    #[tokio::test]
651    async fn test_mock_connector_query_with_where_clause() {
652        let mut connector = MockConnector::new();
653        let config = ConnectorInitConfig::new();
654        connector.connect(config).await.unwrap();
655        
656        let mut query = InternalQuery::new(QueryOperation::Select);
657        query.sources.push(DataSource {
658            object_type: "mock".to_string(),
659            identifier: "users".to_string(),
660            alias: None,
661        });
662        
663        // Add WHERE age > 25
664        query.predicates.push(Predicate {
665            column: "age".to_string(),
666            operator: PredicateOperator::GreaterThan,
667            value: PredicateValue::Integer(25),
668        });
669        
670        let connector_query = ConnectorQuery {
671            connector_type: ConnectorType::Mock,
672            query,
673            connection_params: std::collections::HashMap::new(),
674        };
675        
676        let result = connector.execute_query(connector_query).await;
677        assert!(result.is_ok());
678        
679        let query_result = result.unwrap();
680        assert_eq!(query_result.rows.len(), 2); // Alice (30) and Charlie (35)
681        
682        // Check that returned users have age > 25
683        for row in &query_result.rows {
684            if let Some(Value::Integer(age)) = row.get(3) {
685                assert!(*age > 25);
686            }
687        }
688    }
689
690    #[tokio::test]
691    async fn test_mock_connector_query_with_limit() {
692        let mut connector = MockConnector::new();
693        let config = ConnectorInitConfig::new();
694        connector.connect(config).await.unwrap();
695        
696        let mut query = InternalQuery::new(QueryOperation::Select);
697        query.sources.push(DataSource {
698            object_type: "mock".to_string(),
699            identifier: "users".to_string(),
700            alias: None,
701        });
702        query.limit = Some(2);
703        
704        let connector_query = ConnectorQuery {
705            connector_type: ConnectorType::Mock,
706            query,
707            connection_params: std::collections::HashMap::new(),
708        };
709        
710        let result = connector.execute_query(connector_query).await;
711        assert!(result.is_ok());
712        
713        let query_result = result.unwrap();
714        assert_eq!(query_result.rows.len(), 2); // Limited to 2 rows
715    }
716
717    #[tokio::test]
718    async fn test_mock_connector_query_with_equal_predicate() {
719        let mut connector = MockConnector::new();
720        let config = ConnectorInitConfig::new();
721        connector.connect(config).await.unwrap();
722        
723        let mut query = InternalQuery::new(QueryOperation::Select);
724        query.sources.push(DataSource {
725            object_type: "mock".to_string(),
726            identifier: "users".to_string(),
727            alias: None,
728        });
729        
730        // Add WHERE name = 'Alice Johnson'
731        query.predicates.push(Predicate {
732            column: "name".to_string(),
733            operator: PredicateOperator::Equal,
734            value: PredicateValue::String("Alice Johnson".to_string()),
735        });
736        
737        let connector_query = ConnectorQuery {
738            connector_type: ConnectorType::Mock,
739            query,
740            connection_params: std::collections::HashMap::new(),
741        };
742        
743        let result = connector.execute_query(connector_query).await;
744        assert!(result.is_ok());
745        
746        let query_result = result.unwrap();
747        assert_eq!(query_result.rows.len(), 1); // Only Alice
748        
749        let alice_row = &query_result.rows[0];
750        assert_eq!(alice_row.get(1), Some(&Value::Text("Alice Johnson".to_string())));
751    }
752
753    #[tokio::test]
754    async fn test_mock_connector_query_with_null_predicate() {
755        let mut connector = MockConnector::new();
756        let config = ConnectorInitConfig::new();
757        connector.connect(config).await.unwrap();
758        
759        let mut query = InternalQuery::new(QueryOperation::Select);
760        query.sources.push(DataSource {
761            object_type: "mock".to_string(),
762            identifier: "users".to_string(),
763            alias: None,
764        });
765        
766        // Add WHERE email IS NULL
767        query.predicates.push(Predicate {
768            column: "email".to_string(),
769            operator: PredicateOperator::IsNull,
770            value: PredicateValue::Null,
771        });
772        
773        let connector_query = ConnectorQuery {
774            connector_type: ConnectorType::Mock,
775            query,
776            connection_params: std::collections::HashMap::new(),
777        };
778        
779        let result = connector.execute_query(connector_query).await;
780        assert!(result.is_ok());
781        
782        let query_result = result.unwrap();
783        assert_eq!(query_result.rows.len(), 1); // Only Charlie has null email
784        
785        let charlie_row = &query_result.rows[0];
786        assert_eq!(charlie_row.get(1), Some(&Value::Text("Charlie Brown".to_string())));
787        assert_eq!(charlie_row.get(2), Some(&Value::Null));
788    }
789
790    #[tokio::test]
791    async fn test_mock_connector_unsupported_operation() {
792        let mut connector = MockConnector::new();
793        let config = ConnectorInitConfig::new();
794        connector.connect(config).await.unwrap();
795        
796        let query = InternalQuery::new(QueryOperation::Insert);
797        let connector_query = ConnectorQuery {
798            connector_type: ConnectorType::Mock,
799            query,
800            connection_params: std::collections::HashMap::new(),
801        };
802        
803        let result = connector.execute_query(connector_query).await;
804        assert!(result.is_err());
805        
806        match result.unwrap_err() {
807            crate::utils::error::NirvError::Connector(ConnectorError::UnsupportedOperation(msg)) => {
808                assert!(msg.contains("Operation Insert not supported"));
809            }
810            _ => panic!("Expected UnsupportedOperation error"),
811        }
812    }
813
814    #[tokio::test]
815    async fn test_mock_connector_get_schema() {
816        let mut connector = MockConnector::new();
817        let config = ConnectorInitConfig::new();
818        connector.connect(config).await.unwrap();
819        
820        // Get users table schema
821        let result = connector.get_schema("users").await;
822        assert!(result.is_ok());
823        
824        let schema = result.unwrap();
825        assert_eq!(schema.name, "users");
826        assert_eq!(schema.columns.len(), 5);
827        assert_eq!(schema.primary_key, Some(vec!["id".to_string()]));
828        assert_eq!(schema.indexes.len(), 1);
829        
830        // Check column metadata
831        assert_eq!(schema.columns[0].name, "id");
832        assert_eq!(schema.columns[0].data_type, DataType::Integer);
833        assert!(!schema.columns[0].nullable);
834        
835        assert_eq!(schema.columns[1].name, "name");
836        assert_eq!(schema.columns[1].data_type, DataType::Text);
837        assert!(!schema.columns[1].nullable);
838        
839        assert_eq!(schema.columns[2].name, "email");
840        assert_eq!(schema.columns[2].data_type, DataType::Text);
841        assert!(schema.columns[2].nullable);
842    }
843
844    #[tokio::test]
845    async fn test_mock_connector_get_schema_non_existent() {
846        let mut connector = MockConnector::new();
847        let config = ConnectorInitConfig::new();
848        connector.connect(config).await.unwrap();
849        
850        let result = connector.get_schema("non_existent").await;
851        assert!(result.is_err());
852        
853        match result.unwrap_err() {
854            crate::utils::error::NirvError::Connector(ConnectorError::SchemaRetrievalFailed(msg)) => {
855                assert!(msg.contains("Object 'non_existent' not found"));
856            }
857            _ => panic!("Expected SchemaRetrievalFailed error"),
858        }
859    }
860
861    #[tokio::test]
862    async fn test_mock_connector_get_schema_without_connection() {
863        let connector = MockConnector::new();
864        
865        let result = connector.get_schema("users").await;
866        assert!(result.is_err());
867        
868        match result.unwrap_err() {
869            crate::utils::error::NirvError::Connector(ConnectorError::ConnectionFailed(msg)) => {
870                assert_eq!(msg, "Not connected");
871            }
872            _ => panic!("Expected ConnectionFailed error"),
873        }
874    }
875
876    #[tokio::test]
877    async fn test_mock_connector_capabilities() {
878        let connector = MockConnector::new();
879        let capabilities = connector.get_capabilities();
880        
881        assert!(!capabilities.supports_joins);
882        assert!(!capabilities.supports_aggregations);
883        assert!(!capabilities.supports_subqueries);
884        assert!(!capabilities.supports_transactions);
885        assert!(capabilities.supports_schema_introspection);
886        assert_eq!(capabilities.max_concurrent_queries, Some(10));
887    }
888
889    #[test]
890    fn test_mock_connector_default() {
891        let connector = MockConnector::default();
892        
893        assert!(!connector.is_connected());
894        assert_eq!(connector.get_connector_type(), ConnectorType::Mock);
895        assert_eq!(connector.test_data.len(), 2);
896    }
897}