1use crate::soch_ql::{SelectQuery, SochResult, SochValue, WhereClause};
46use parking_lot::RwLock;
47use std::collections::HashMap;
48use std::sync::Arc;
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum VirtualColumnType {
57 Bool,
58 Int64,
59 UInt64,
60 Float64,
61 Text,
62 Binary,
63 Timestamp,
64 Json,
65}
66
67#[derive(Debug, Clone)]
69pub struct VirtualColumnDef {
70 pub name: String,
72 pub col_type: VirtualColumnType,
74 pub nullable: bool,
76 pub primary_key: bool,
78 pub description: Option<String>,
80}
81
82#[derive(Debug, Clone)]
84pub struct VirtualTableSchema {
85 pub name: String,
87 pub columns: Vec<VirtualColumnDef>,
89 pub estimated_rows: Option<u64>,
91 pub description: Option<String>,
93}
94
95pub trait VirtualTable: Send + Sync {
99 fn schema(&self) -> &VirtualTableSchema;
101
102 fn scan(
104 &self,
105 columns: &[String],
106 filter: Option<&VirtualFilter>,
107 limit: Option<usize>,
108 offset: Option<usize>,
109 ) -> Result<Vec<VirtualRow>, VirtualTableError>;
110
111 fn get(&self, key: &SochValue) -> Result<Option<VirtualRow>, VirtualTableError> {
113 let schema = self.schema();
115 let pk_col = schema
116 .columns
117 .iter()
118 .find(|c| c.primary_key)
119 .map(|c| c.name.clone());
120
121 if let Some(pk) = pk_col {
122 let filter = VirtualFilter::Eq(pk, key.clone());
123 let rows = self.scan(&[], Some(&filter), Some(1), None)?;
124 Ok(rows.into_iter().next())
125 } else {
126 Err(VirtualTableError::NoPrimaryKey)
127 }
128 }
129
130 fn stats(&self) -> VirtualTableStats {
132 VirtualTableStats {
133 row_count: self.schema().estimated_rows,
134 size_bytes: None,
135 last_modified: None,
136 }
137 }
138
139 fn refresh(&self) -> Result<(), VirtualTableError> {
141 Ok(()) }
143}
144
145#[derive(Debug, Clone)]
147pub struct VirtualRow {
148 pub values: Vec<SochValue>,
150}
151
152impl VirtualRow {
153 pub fn new(values: Vec<SochValue>) -> Self {
155 Self { values }
156 }
157
158 pub fn get(&self, idx: usize) -> Option<&SochValue> {
160 self.values.get(idx)
161 }
162
163 pub fn get_by_name<'a>(
165 &'a self,
166 name: &str,
167 schema: &VirtualTableSchema,
168 ) -> Option<&'a SochValue> {
169 schema
170 .columns
171 .iter()
172 .position(|c| c.name == name)
173 .and_then(|idx| self.values.get(idx))
174 }
175}
176
177#[derive(Debug, Clone)]
179pub enum VirtualFilter {
180 Eq(String, SochValue),
182 Ne(String, SochValue),
184 Lt(String, SochValue),
186 Le(String, SochValue),
188 Gt(String, SochValue),
190 Ge(String, SochValue),
192 Like(String, String),
194 In(String, Vec<SochValue>),
196 Between(String, SochValue, SochValue),
198 IsNull(String),
200 IsNotNull(String),
202 And(Vec<VirtualFilter>),
204 Or(Vec<VirtualFilter>),
206 Not(Box<VirtualFilter>),
208}
209
210impl VirtualFilter {
211 pub fn from_where_clause(where_clause: &WhereClause) -> Self {
213 let filters: Vec<VirtualFilter> = where_clause
214 .conditions
215 .iter()
216 .map(|c| {
217 use crate::soch_ql::ComparisonOp::*;
218 match c.operator {
219 Eq => VirtualFilter::Eq(c.column.clone(), c.value.clone()),
220 Ne => VirtualFilter::Ne(c.column.clone(), c.value.clone()),
221 Lt => VirtualFilter::Lt(c.column.clone(), c.value.clone()),
222 Le => VirtualFilter::Le(c.column.clone(), c.value.clone()),
223 Gt => VirtualFilter::Gt(c.column.clone(), c.value.clone()),
224 Ge => VirtualFilter::Ge(c.column.clone(), c.value.clone()),
225 Like => {
226 if let SochValue::Text(pattern) = &c.value {
227 VirtualFilter::Like(c.column.clone(), pattern.clone())
228 } else {
229 VirtualFilter::Like(c.column.clone(), "".to_string())
230 }
231 }
232 In => VirtualFilter::In(c.column.clone(), vec![c.value.clone()]),
233 SimilarTo => {
234 if let SochValue::Text(pattern) = &c.value {
237 VirtualFilter::Like(c.column.clone(), pattern.clone())
238 } else {
239 VirtualFilter::Like(c.column.clone(), "".to_string())
240 }
241 }
242 }
243 })
244 .collect();
245
246 match where_clause.operator {
247 crate::soch_ql::LogicalOp::And => VirtualFilter::And(filters),
248 crate::soch_ql::LogicalOp::Or => VirtualFilter::Or(filters),
249 }
250 }
251
252 pub fn matches(&self, row: &VirtualRow, schema: &VirtualTableSchema) -> bool {
254 match self {
255 VirtualFilter::Eq(col, value) => row
256 .get_by_name(col, schema)
257 .map(|v| v == value)
258 .unwrap_or(false),
259 VirtualFilter::Ne(col, value) => row
260 .get_by_name(col, schema)
261 .map(|v| v != value)
262 .unwrap_or(false),
263 VirtualFilter::Lt(col, value) => {
264 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a < b)
265 }
266 VirtualFilter::Le(col, value) => {
267 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a <= b)
268 }
269 VirtualFilter::Gt(col, value) => {
270 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a > b)
271 }
272 VirtualFilter::Ge(col, value) => {
273 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a >= b)
274 }
275 VirtualFilter::Like(col, pattern) => row
276 .get_by_name(col, schema)
277 .and_then(|v| match v {
278 SochValue::Text(s) => Some(Self::match_like(s, pattern)),
279 _ => None,
280 })
281 .unwrap_or(false),
282 VirtualFilter::In(col, values) => row
283 .get_by_name(col, schema)
284 .map(|v| values.contains(v))
285 .unwrap_or(false),
286 VirtualFilter::Between(col, low, high) => row
287 .get_by_name(col, schema)
288 .map(|v| {
289 Self::compare_values(Some(v), low, |a, b| a >= b)
290 && Self::compare_values(Some(v), high, |a, b| a <= b)
291 })
292 .unwrap_or(false),
293 VirtualFilter::IsNull(col) => row
294 .get_by_name(col, schema)
295 .map(|v| *v == SochValue::Null)
296 .unwrap_or(true),
297 VirtualFilter::IsNotNull(col) => row
298 .get_by_name(col, schema)
299 .map(|v| *v != SochValue::Null)
300 .unwrap_or(false),
301 VirtualFilter::And(filters) => filters.iter().all(|f| f.matches(row, schema)),
302 VirtualFilter::Or(filters) => filters.iter().any(|f| f.matches(row, schema)),
303 VirtualFilter::Not(filter) => !filter.matches(row, schema),
304 }
305 }
306
307 fn compare_values<F>(val: Option<&SochValue>, other: &SochValue, cmp: F) -> bool
308 where
309 F: Fn(i64, i64) -> bool,
310 {
311 match (val, other) {
312 (Some(SochValue::Int(a)), SochValue::Int(b)) => cmp(*a, *b),
313 (Some(SochValue::UInt(a)), SochValue::UInt(b)) => cmp(*a as i64, *b as i64),
314 (Some(SochValue::Float(a)), SochValue::Float(b)) => {
315 cmp((*a * 1000.0) as i64, (*b * 1000.0) as i64)
316 }
317 _ => false,
318 }
319 }
320
321 fn match_like(s: &str, pattern: &str) -> bool {
322 if pattern.starts_with('%') && pattern.ends_with('%') {
324 let inner = &pattern[1..pattern.len() - 1];
325 s.contains(inner)
326 } else if let Some(suffix) = pattern.strip_prefix('%') {
327 s.ends_with(suffix)
328 } else if let Some(prefix) = pattern.strip_suffix('%') {
329 s.starts_with(prefix)
330 } else {
331 s == pattern
332 }
333 }
334}
335
336#[derive(Debug, Clone, Default)]
338pub struct VirtualTableStats {
339 pub row_count: Option<u64>,
341 pub size_bytes: Option<u64>,
343 pub last_modified: Option<u64>,
345}
346
347#[derive(Debug, Clone)]
349pub enum VirtualTableError {
350 NotFound(String),
352 ColumnNotFound(String),
354 NoPrimaryKey,
356 PluginError(String),
358 ScanFailed(String),
360 InvalidFilter(String),
362}
363
364impl std::fmt::Display for VirtualTableError {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 match self {
367 Self::NotFound(name) => write!(f, "virtual table not found: {}", name),
368 Self::ColumnNotFound(name) => write!(f, "column not found: {}", name),
369 Self::NoPrimaryKey => write!(f, "no primary key defined"),
370 Self::PluginError(msg) => write!(f, "plugin error: {}", msg),
371 Self::ScanFailed(msg) => write!(f, "scan failed: {}", msg),
372 Self::InvalidFilter(msg) => write!(f, "invalid filter: {}", msg),
373 }
374 }
375}
376
377impl std::error::Error for VirtualTableError {}
378
379pub struct PluginVirtualTable {
385 plugin_name: String,
387 table_name: String,
389 schema: VirtualTableSchema,
391 cache: RwLock<Option<CachedData>>,
393 cache_ttl_secs: u64,
395}
396
397struct CachedData {
399 rows: Vec<VirtualRow>,
400 cached_at: std::time::Instant,
401}
402
403impl PluginVirtualTable {
404 pub fn new(plugin_name: &str, table_name: &str, schema: VirtualTableSchema) -> Self {
406 Self {
407 plugin_name: plugin_name.to_string(),
408 table_name: table_name.to_string(),
409 schema,
410 cache: RwLock::new(None),
411 cache_ttl_secs: 60, }
413 }
414
415 pub fn with_cache_ttl(mut self, secs: u64) -> Self {
417 self.cache_ttl_secs = secs;
418 self
419 }
420
421 pub fn qualified_name(&self) -> String {
423 format!("{}.{}", self.plugin_name, self.table_name)
424 }
425
426 fn is_cache_valid(&self) -> bool {
428 if let Some(cached) = self.cache.read().as_ref() {
429 cached.cached_at.elapsed().as_secs() < self.cache_ttl_secs
430 } else {
431 false
432 }
433 }
434
435 fn update_cache(&self, rows: Vec<VirtualRow>) {
437 *self.cache.write() = Some(CachedData {
438 rows,
439 cached_at: std::time::Instant::now(),
440 });
441 }
442}
443
444impl VirtualTable for PluginVirtualTable {
445 fn schema(&self) -> &VirtualTableSchema {
446 &self.schema
447 }
448
449 fn scan(
450 &self,
451 columns: &[String],
452 filter: Option<&VirtualFilter>,
453 limit: Option<usize>,
454 offset: Option<usize>,
455 ) -> Result<Vec<VirtualRow>, VirtualTableError> {
456 if self.is_cache_valid()
458 && let Some(cached) = self.cache.read().as_ref()
459 {
460 let mut rows = cached.rows.clone();
461
462 if let Some(f) = filter {
464 rows.retain(|r| f.matches(r, &self.schema));
465 }
466
467 if let Some(o) = offset {
469 rows = rows.into_iter().skip(o).collect();
470 }
471
472 if let Some(l) = limit {
474 rows.truncate(l);
475 }
476
477 if !columns.is_empty() && columns[0] != "*" {
479 rows = self.project_columns(&rows, columns);
480 }
481
482 return Ok(rows);
483 }
484
485 let mock_rows = self.generate_mock_data(limit.unwrap_or(100));
488
489 self.update_cache(mock_rows.clone());
491
492 let mut result = mock_rows;
494 if let Some(f) = filter {
495 result.retain(|r| f.matches(r, &self.schema));
496 }
497
498 if let Some(o) = offset {
499 result = result.into_iter().skip(o).collect();
500 }
501
502 if let Some(l) = limit {
503 result.truncate(l);
504 }
505
506 Ok(result)
507 }
508
509 fn refresh(&self) -> Result<(), VirtualTableError> {
510 *self.cache.write() = None;
512 Ok(())
513 }
514}
515
516impl PluginVirtualTable {
517 fn project_columns(&self, rows: &[VirtualRow], columns: &[String]) -> Vec<VirtualRow> {
519 let indices: Vec<usize> = columns
520 .iter()
521 .filter_map(|col| self.schema.columns.iter().position(|c| c.name == *col))
522 .collect();
523
524 rows.iter()
525 .map(|row| {
526 let values: Vec<SochValue> = indices
527 .iter()
528 .map(|&i| row.values.get(i).cloned().unwrap_or(SochValue::Null))
529 .collect();
530 VirtualRow::new(values)
531 })
532 .collect()
533 }
534
535 fn generate_mock_data(&self, count: usize) -> Vec<VirtualRow> {
537 (0..count)
538 .map(|i| {
539 let values: Vec<SochValue> = self
540 .schema
541 .columns
542 .iter()
543 .enumerate()
544 .map(|(col_idx, col)| match col.col_type {
545 VirtualColumnType::Int64 => SochValue::Int(i as i64 + col_idx as i64),
546 VirtualColumnType::UInt64 => SochValue::UInt(i as u64 + col_idx as u64),
547 VirtualColumnType::Float64 => SochValue::Float(i as f64 * 0.1),
548 VirtualColumnType::Text => SochValue::Text(format!("{}_{}", col.name, i)),
549 VirtualColumnType::Bool => SochValue::Bool(i % 2 == 0),
550 _ => SochValue::Null,
551 })
552 .collect();
553 VirtualRow::new(values)
554 })
555 .collect()
556 }
557}
558
559pub struct VirtualTableRegistry {
565 tables: RwLock<HashMap<String, Arc<dyn VirtualTable>>>,
567}
568
569impl Default for VirtualTableRegistry {
570 fn default() -> Self {
571 Self::new()
572 }
573}
574
575impl VirtualTableRegistry {
576 pub fn new() -> Self {
578 Self {
579 tables: RwLock::new(HashMap::new()),
580 }
581 }
582
583 pub fn register(
585 &self,
586 qualified_name: &str,
587 table: Arc<dyn VirtualTable>,
588 ) -> Result<(), VirtualTableError> {
589 let mut tables = self.tables.write();
590
591 if tables.contains_key(qualified_name) {
592 return Err(VirtualTableError::PluginError(format!(
593 "table '{}' already registered",
594 qualified_name
595 )));
596 }
597
598 tables.insert(qualified_name.to_string(), table);
599 Ok(())
600 }
601
602 pub fn unregister(&self, qualified_name: &str) -> Result<(), VirtualTableError> {
604 let mut tables = self.tables.write();
605
606 if tables.remove(qualified_name).is_none() {
607 return Err(VirtualTableError::NotFound(qualified_name.to_string()));
608 }
609
610 Ok(())
611 }
612
613 pub fn get(&self, qualified_name: &str) -> Option<Arc<dyn VirtualTable>> {
615 self.tables.read().get(qualified_name).cloned()
616 }
617
618 pub fn list(&self) -> Vec<String> {
620 self.tables.read().keys().cloned().collect()
621 }
622
623 pub fn execute_select(&self, query: &SelectQuery) -> Result<SochResult, VirtualTableError> {
625 let table = self
626 .get(&query.table)
627 .ok_or_else(|| VirtualTableError::NotFound(query.table.clone()))?;
628
629 let schema = table.schema();
630
631 let filter = query
633 .where_clause
634 .as_ref()
635 .map(VirtualFilter::from_where_clause);
636
637 let rows = table.scan(&query.columns, filter.as_ref(), query.limit, query.offset)?;
639
640 let columns = if query.columns.is_empty() || query.columns[0] == "*" {
642 schema.columns.iter().map(|c| c.name.clone()).collect()
643 } else {
644 query.columns.clone()
645 };
646
647 let result_rows: Vec<Vec<SochValue>> = rows.into_iter().map(|r| r.values).collect();
648
649 Ok(SochResult {
650 table: query.table.clone(),
651 columns,
652 rows: result_rows,
653 })
654 }
655}
656
657#[cfg(test)]
662mod tests {
663 use super::*;
664
665 fn create_test_schema() -> VirtualTableSchema {
666 VirtualTableSchema {
667 name: "test_table".to_string(),
668 columns: vec![
669 VirtualColumnDef {
670 name: "id".to_string(),
671 col_type: VirtualColumnType::Int64,
672 nullable: false,
673 primary_key: true,
674 description: None,
675 },
676 VirtualColumnDef {
677 name: "name".to_string(),
678 col_type: VirtualColumnType::Text,
679 nullable: false,
680 primary_key: false,
681 description: None,
682 },
683 VirtualColumnDef {
684 name: "score".to_string(),
685 col_type: VirtualColumnType::Float64,
686 nullable: true,
687 primary_key: false,
688 description: None,
689 },
690 ],
691 estimated_rows: Some(1000),
692 description: None,
693 }
694 }
695
696 #[test]
697 fn test_plugin_virtual_table_creation() {
698 let schema = create_test_schema();
699 let table = PluginVirtualTable::new("test_plugin", "test_table", schema);
700
701 assert_eq!(table.qualified_name(), "test_plugin.test_table");
702 assert_eq!(table.schema().columns.len(), 3);
703 }
704
705 #[test]
706 fn test_virtual_table_scan() {
707 let schema = create_test_schema();
708 let table = PluginVirtualTable::new("plugin", "table", schema);
709
710 let rows = table.scan(&[], None, Some(10), None).unwrap();
711
712 assert_eq!(rows.len(), 10);
713 assert_eq!(rows[0].values.len(), 3); }
715
716 #[test]
717 fn test_virtual_table_scan_with_filter() {
718 let schema = create_test_schema();
719 let table = PluginVirtualTable::new("plugin", "table", schema.clone());
720
721 let filter = VirtualFilter::Gt("id".to_string(), SochValue::Int(5));
722 let rows = table.scan(&[], Some(&filter), Some(100), None).unwrap();
723
724 for row in &rows {
726 if let Some(SochValue::Int(id)) = row.get_by_name("id", &schema) {
727 assert!(*id > 5);
728 }
729 }
730 }
731
732 #[test]
733 fn test_virtual_filter_matches() {
734 let schema = create_test_schema();
735 let row = VirtualRow::new(vec![
736 SochValue::Int(42),
737 SochValue::Text("Alice".to_string()),
738 SochValue::Float(95.5),
739 ]);
740
741 let filter = VirtualFilter::Eq("id".to_string(), SochValue::Int(42));
743 assert!(filter.matches(&row, &schema));
744
745 let filter = VirtualFilter::Like("name".to_string(), "Al%".to_string());
747 assert!(filter.matches(&row, &schema));
748
749 let filter = VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0));
751 assert!(filter.matches(&row, &schema));
752
753 let filter = VirtualFilter::And(vec![
755 VirtualFilter::Eq("id".to_string(), SochValue::Int(42)),
756 VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0)),
757 ]);
758 assert!(filter.matches(&row, &schema));
759 }
760
761 #[test]
762 fn test_registry_operations() {
763 let registry = VirtualTableRegistry::new();
764 let schema = create_test_schema();
765
766 let table = Arc::new(PluginVirtualTable::new("plugin", "table", schema));
767
768 registry.register("plugin.table", table).unwrap();
770 assert_eq!(registry.list().len(), 1);
771
772 let retrieved = registry.get("plugin.table");
774 assert!(retrieved.is_some());
775
776 registry.unregister("plugin.table").unwrap();
778 assert!(registry.list().is_empty());
779 }
780
781 #[test]
782 fn test_registry_execute_select() {
783 let registry = VirtualTableRegistry::new();
784 let schema = create_test_schema();
785
786 let table = Arc::new(PluginVirtualTable::new("plugin", "data", schema));
787 registry.register("plugin.data", table).unwrap();
788
789 let query = SelectQuery {
790 columns: vec!["id".to_string(), "name".to_string()],
791 table: "plugin.data".to_string(),
792 where_clause: None,
793 order_by: None,
794 limit: Some(5),
795 offset: None,
796 };
797
798 let result = registry.execute_select(&query).unwrap();
799
800 assert_eq!(result.table, "plugin.data");
801 assert_eq!(result.columns, vec!["id", "name"]);
802 assert_eq!(result.rows.len(), 5);
803 }
804
805 #[test]
806 fn test_cache_behavior() {
807 let schema = create_test_schema();
808 let table = PluginVirtualTable::new("plugin", "cached", schema).with_cache_ttl(1); let rows1 = table.scan(&[], None, Some(5), None).unwrap();
812 assert!(table.is_cache_valid());
813
814 let rows2 = table.scan(&[], None, Some(5), None).unwrap();
816 assert_eq!(rows1.len(), rows2.len());
817
818 table.refresh().unwrap();
820 assert!(!table.is_cache_valid());
821 }
822
823 #[test]
824 fn test_column_projection() {
825 let schema = create_test_schema();
826 let table = PluginVirtualTable::new("plugin", "table", schema);
827
828 let rows = table
830 .scan(&["id".to_string(), "name".to_string()], None, Some(5), None)
831 .unwrap();
832
833 assert!(!rows.is_empty());
836 }
837}