1use crate::soch_ql::{SelectQuery, SochResult, SochValue, WhereClause};
43use parking_lot::RwLock;
44use std::collections::HashMap;
45use std::sync::Arc;
46
/// Logical type of a column exposed by a virtual table.
///
/// The variant only describes the declared type of a column; the cell values
/// themselves are carried as `SochValue`s.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VirtualColumnType {
    /// Boolean column.
    Bool,
    /// Signed 64-bit integer column.
    Int64,
    /// Unsigned 64-bit integer column.
    UInt64,
    /// 64-bit floating point column.
    Float64,
    /// Text column.
    Text,
    /// Binary column.
    Binary,
    /// Timestamp column (the concrete representation is not defined here).
    Timestamp,
    /// JSON column.
    Json,
}
63
64#[derive(Debug, Clone)]
66pub struct VirtualColumnDef {
67 pub name: String,
69 pub col_type: VirtualColumnType,
71 pub nullable: bool,
73 pub primary_key: bool,
75 pub description: Option<String>,
77}
78
79#[derive(Debug, Clone)]
81pub struct VirtualTableSchema {
82 pub name: String,
84 pub columns: Vec<VirtualColumnDef>,
86 pub estimated_rows: Option<u64>,
88 pub description: Option<String>,
90}
91
92pub trait VirtualTable: Send + Sync {
96 fn schema(&self) -> &VirtualTableSchema;
98
99 fn scan(
101 &self,
102 columns: &[String],
103 filter: Option<&VirtualFilter>,
104 limit: Option<usize>,
105 offset: Option<usize>,
106 ) -> Result<Vec<VirtualRow>, VirtualTableError>;
107
108 fn get(&self, key: &SochValue) -> Result<Option<VirtualRow>, VirtualTableError> {
110 let schema = self.schema();
112 let pk_col = schema
113 .columns
114 .iter()
115 .find(|c| c.primary_key)
116 .map(|c| c.name.clone());
117
118 if let Some(pk) = pk_col {
119 let filter = VirtualFilter::Eq(pk, key.clone());
120 let rows = self.scan(&[], Some(&filter), Some(1), None)?;
121 Ok(rows.into_iter().next())
122 } else {
123 Err(VirtualTableError::NoPrimaryKey)
124 }
125 }
126
127 fn stats(&self) -> VirtualTableStats {
129 VirtualTableStats {
130 row_count: self.schema().estimated_rows,
131 size_bytes: None,
132 last_modified: None,
133 }
134 }
135
136 fn refresh(&self) -> Result<(), VirtualTableError> {
138 Ok(()) }
140}
141
142#[derive(Debug, Clone)]
144pub struct VirtualRow {
145 pub values: Vec<SochValue>,
147}
148
149impl VirtualRow {
150 pub fn new(values: Vec<SochValue>) -> Self {
152 Self { values }
153 }
154
155 pub fn get(&self, idx: usize) -> Option<&SochValue> {
157 self.values.get(idx)
158 }
159
160 pub fn get_by_name<'a>(
162 &'a self,
163 name: &str,
164 schema: &VirtualTableSchema,
165 ) -> Option<&'a SochValue> {
166 schema
167 .columns
168 .iter()
169 .position(|c| c.name == name)
170 .and_then(|idx| self.values.get(idx))
171 }
172}
173
174#[derive(Debug, Clone)]
176pub enum VirtualFilter {
177 Eq(String, SochValue),
179 Ne(String, SochValue),
181 Lt(String, SochValue),
183 Le(String, SochValue),
185 Gt(String, SochValue),
187 Ge(String, SochValue),
189 Like(String, String),
191 In(String, Vec<SochValue>),
193 Between(String, SochValue, SochValue),
195 IsNull(String),
197 IsNotNull(String),
199 And(Vec<VirtualFilter>),
201 Or(Vec<VirtualFilter>),
203 Not(Box<VirtualFilter>),
205}
206
207impl VirtualFilter {
208 pub fn from_where_clause(where_clause: &WhereClause) -> Self {
210 let filters: Vec<VirtualFilter> = where_clause
211 .conditions
212 .iter()
213 .map(|c| {
214 use crate::soch_ql::ComparisonOp::*;
215 match c.operator {
216 Eq => VirtualFilter::Eq(c.column.clone(), c.value.clone()),
217 Ne => VirtualFilter::Ne(c.column.clone(), c.value.clone()),
218 Lt => VirtualFilter::Lt(c.column.clone(), c.value.clone()),
219 Le => VirtualFilter::Le(c.column.clone(), c.value.clone()),
220 Gt => VirtualFilter::Gt(c.column.clone(), c.value.clone()),
221 Ge => VirtualFilter::Ge(c.column.clone(), c.value.clone()),
222 Like => {
223 if let SochValue::Text(pattern) = &c.value {
224 VirtualFilter::Like(c.column.clone(), pattern.clone())
225 } else {
226 VirtualFilter::Like(c.column.clone(), "".to_string())
227 }
228 }
229 In => VirtualFilter::In(c.column.clone(), vec![c.value.clone()]),
230 SimilarTo => {
231 if let SochValue::Text(pattern) = &c.value {
234 VirtualFilter::Like(c.column.clone(), pattern.clone())
235 } else {
236 VirtualFilter::Like(c.column.clone(), "".to_string())
237 }
238 }
239 }
240 })
241 .collect();
242
243 match where_clause.operator {
244 crate::soch_ql::LogicalOp::And => VirtualFilter::And(filters),
245 crate::soch_ql::LogicalOp::Or => VirtualFilter::Or(filters),
246 }
247 }
248
249 pub fn matches(&self, row: &VirtualRow, schema: &VirtualTableSchema) -> bool {
251 match self {
252 VirtualFilter::Eq(col, value) => row
253 .get_by_name(col, schema)
254 .map(|v| v == value)
255 .unwrap_or(false),
256 VirtualFilter::Ne(col, value) => row
257 .get_by_name(col, schema)
258 .map(|v| v != value)
259 .unwrap_or(false),
260 VirtualFilter::Lt(col, value) => {
261 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a < b)
262 }
263 VirtualFilter::Le(col, value) => {
264 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a <= b)
265 }
266 VirtualFilter::Gt(col, value) => {
267 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a > b)
268 }
269 VirtualFilter::Ge(col, value) => {
270 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a >= b)
271 }
272 VirtualFilter::Like(col, pattern) => row
273 .get_by_name(col, schema)
274 .and_then(|v| match v {
275 SochValue::Text(s) => Some(Self::match_like(s, pattern)),
276 _ => None,
277 })
278 .unwrap_or(false),
279 VirtualFilter::In(col, values) => row
280 .get_by_name(col, schema)
281 .map(|v| values.contains(v))
282 .unwrap_or(false),
283 VirtualFilter::Between(col, low, high) => row
284 .get_by_name(col, schema)
285 .map(|v| {
286 Self::compare_values(Some(v), low, |a, b| a >= b)
287 && Self::compare_values(Some(v), high, |a, b| a <= b)
288 })
289 .unwrap_or(false),
290 VirtualFilter::IsNull(col) => row
291 .get_by_name(col, schema)
292 .map(|v| *v == SochValue::Null)
293 .unwrap_or(true),
294 VirtualFilter::IsNotNull(col) => row
295 .get_by_name(col, schema)
296 .map(|v| *v != SochValue::Null)
297 .unwrap_or(false),
298 VirtualFilter::And(filters) => filters.iter().all(|f| f.matches(row, schema)),
299 VirtualFilter::Or(filters) => filters.iter().any(|f| f.matches(row, schema)),
300 VirtualFilter::Not(filter) => !filter.matches(row, schema),
301 }
302 }
303
304 fn compare_values<F>(val: Option<&SochValue>, other: &SochValue, cmp: F) -> bool
305 where
306 F: Fn(i64, i64) -> bool,
307 {
308 match (val, other) {
309 (Some(SochValue::Int(a)), SochValue::Int(b)) => cmp(*a, *b),
310 (Some(SochValue::UInt(a)), SochValue::UInt(b)) => cmp(*a as i64, *b as i64),
311 (Some(SochValue::Float(a)), SochValue::Float(b)) => {
312 cmp((*a * 1000.0) as i64, (*b * 1000.0) as i64)
313 }
314 _ => false,
315 }
316 }
317
318 fn match_like(s: &str, pattern: &str) -> bool {
319 if pattern.starts_with('%') && pattern.ends_with('%') {
321 let inner = &pattern[1..pattern.len() - 1];
322 s.contains(inner)
323 } else if let Some(suffix) = pattern.strip_prefix('%') {
324 s.ends_with(suffix)
325 } else if let Some(prefix) = pattern.strip_suffix('%') {
326 s.starts_with(prefix)
327 } else {
328 s == pattern
329 }
330 }
331}
332
/// Optional statistics about a virtual table. Every field is `None` when the
/// corresponding figure is not known.
#[derive(Debug, Clone, Default)]
pub struct VirtualTableStats {
    /// Number of rows, if known.
    pub row_count: Option<u64>,
    /// Size in bytes, if known.
    pub size_bytes: Option<u64>,
    /// Last-modification marker, if known (unit and epoch are not defined in
    /// this file).
    pub last_modified: Option<u64>,
}
343
/// Errors reported by virtual tables and the virtual table registry.
#[derive(Debug, Clone)]
pub enum VirtualTableError {
    /// No virtual table is known under the given name.
    NotFound(String),
    /// The named column does not exist.
    ColumnNotFound(String),
    /// A key lookup was attempted on a table without a primary-key column.
    NoPrimaryKey,
    /// Plugin-level failure, described by the message.
    PluginError(String),
    /// A scan failed, described by the message.
    ScanFailed(String),
    /// A filter could not be used, described by the message.
    InvalidFilter(String),
}

impl std::fmt::Display for VirtualTableError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use VirtualTableError::*;

        match self {
            NoPrimaryKey => out.write_str("no primary key defined"),
            NotFound(table) => write!(out, "virtual table not found: {}", table),
            ColumnNotFound(column) => write!(out, "column not found: {}", column),
            PluginError(detail) => write!(out, "plugin error: {}", detail),
            ScanFailed(detail) => write!(out, "scan failed: {}", detail),
            InvalidFilter(detail) => write!(out, "invalid filter: {}", detail),
        }
    }
}

// No error source is exposed; the default trait methods are sufficient.
impl std::error::Error for VirtualTableError {}
375
376pub struct PluginVirtualTable {
382 plugin_name: String,
384 table_name: String,
386 schema: VirtualTableSchema,
388 cache: RwLock<Option<CachedData>>,
390 cache_ttl_secs: u64,
392}
393
394struct CachedData {
396 rows: Vec<VirtualRow>,
397 cached_at: std::time::Instant,
398}
399
400impl PluginVirtualTable {
401 pub fn new(plugin_name: &str, table_name: &str, schema: VirtualTableSchema) -> Self {
403 Self {
404 plugin_name: plugin_name.to_string(),
405 table_name: table_name.to_string(),
406 schema,
407 cache: RwLock::new(None),
408 cache_ttl_secs: 60, }
410 }
411
412 pub fn with_cache_ttl(mut self, secs: u64) -> Self {
414 self.cache_ttl_secs = secs;
415 self
416 }
417
418 pub fn qualified_name(&self) -> String {
420 format!("{}.{}", self.plugin_name, self.table_name)
421 }
422
423 fn is_cache_valid(&self) -> bool {
425 if let Some(cached) = self.cache.read().as_ref() {
426 cached.cached_at.elapsed().as_secs() < self.cache_ttl_secs
427 } else {
428 false
429 }
430 }
431
432 fn update_cache(&self, rows: Vec<VirtualRow>) {
434 *self.cache.write() = Some(CachedData {
435 rows,
436 cached_at: std::time::Instant::now(),
437 });
438 }
439}
440
441impl VirtualTable for PluginVirtualTable {
442 fn schema(&self) -> &VirtualTableSchema {
443 &self.schema
444 }
445
446 fn scan(
447 &self,
448 columns: &[String],
449 filter: Option<&VirtualFilter>,
450 limit: Option<usize>,
451 offset: Option<usize>,
452 ) -> Result<Vec<VirtualRow>, VirtualTableError> {
453 if self.is_cache_valid()
455 && let Some(cached) = self.cache.read().as_ref()
456 {
457 let mut rows = cached.rows.clone();
458
459 if let Some(f) = filter {
461 rows.retain(|r| f.matches(r, &self.schema));
462 }
463
464 if let Some(o) = offset {
466 rows = rows.into_iter().skip(o).collect();
467 }
468
469 if let Some(l) = limit {
471 rows.truncate(l);
472 }
473
474 if !columns.is_empty() && columns[0] != "*" {
476 rows = self.project_columns(&rows, columns);
477 }
478
479 return Ok(rows);
480 }
481
482 let mock_rows = self.generate_mock_data(limit.unwrap_or(100));
485
486 self.update_cache(mock_rows.clone());
488
489 let mut result = mock_rows;
491 if let Some(f) = filter {
492 result.retain(|r| f.matches(r, &self.schema));
493 }
494
495 if let Some(o) = offset {
496 result = result.into_iter().skip(o).collect();
497 }
498
499 if let Some(l) = limit {
500 result.truncate(l);
501 }
502
503 Ok(result)
504 }
505
506 fn refresh(&self) -> Result<(), VirtualTableError> {
507 *self.cache.write() = None;
509 Ok(())
510 }
511}
512
513impl PluginVirtualTable {
514 fn project_columns(&self, rows: &[VirtualRow], columns: &[String]) -> Vec<VirtualRow> {
516 let indices: Vec<usize> = columns
517 .iter()
518 .filter_map(|col| self.schema.columns.iter().position(|c| c.name == *col))
519 .collect();
520
521 rows.iter()
522 .map(|row| {
523 let values: Vec<SochValue> = indices
524 .iter()
525 .map(|&i| row.values.get(i).cloned().unwrap_or(SochValue::Null))
526 .collect();
527 VirtualRow::new(values)
528 })
529 .collect()
530 }
531
532 fn generate_mock_data(&self, count: usize) -> Vec<VirtualRow> {
534 (0..count)
535 .map(|i| {
536 let values: Vec<SochValue> = self
537 .schema
538 .columns
539 .iter()
540 .enumerate()
541 .map(|(col_idx, col)| match col.col_type {
542 VirtualColumnType::Int64 => SochValue::Int(i as i64 + col_idx as i64),
543 VirtualColumnType::UInt64 => SochValue::UInt(i as u64 + col_idx as u64),
544 VirtualColumnType::Float64 => SochValue::Float(i as f64 * 0.1),
545 VirtualColumnType::Text => SochValue::Text(format!("{}_{}", col.name, i)),
546 VirtualColumnType::Bool => SochValue::Bool(i % 2 == 0),
547 _ => SochValue::Null,
548 })
549 .collect();
550 VirtualRow::new(values)
551 })
552 .collect()
553 }
554}
555
556pub struct VirtualTableRegistry {
562 tables: RwLock<HashMap<String, Arc<dyn VirtualTable>>>,
564}
565
566impl Default for VirtualTableRegistry {
567 fn default() -> Self {
568 Self::new()
569 }
570}
571
572impl VirtualTableRegistry {
573 pub fn new() -> Self {
575 Self {
576 tables: RwLock::new(HashMap::new()),
577 }
578 }
579
580 pub fn register(
582 &self,
583 qualified_name: &str,
584 table: Arc<dyn VirtualTable>,
585 ) -> Result<(), VirtualTableError> {
586 let mut tables = self.tables.write();
587
588 if tables.contains_key(qualified_name) {
589 return Err(VirtualTableError::PluginError(format!(
590 "table '{}' already registered",
591 qualified_name
592 )));
593 }
594
595 tables.insert(qualified_name.to_string(), table);
596 Ok(())
597 }
598
599 pub fn unregister(&self, qualified_name: &str) -> Result<(), VirtualTableError> {
601 let mut tables = self.tables.write();
602
603 if tables.remove(qualified_name).is_none() {
604 return Err(VirtualTableError::NotFound(qualified_name.to_string()));
605 }
606
607 Ok(())
608 }
609
610 pub fn get(&self, qualified_name: &str) -> Option<Arc<dyn VirtualTable>> {
612 self.tables.read().get(qualified_name).cloned()
613 }
614
615 pub fn list(&self) -> Vec<String> {
617 self.tables.read().keys().cloned().collect()
618 }
619
620 pub fn execute_select(&self, query: &SelectQuery) -> Result<SochResult, VirtualTableError> {
622 let table = self
623 .get(&query.table)
624 .ok_or_else(|| VirtualTableError::NotFound(query.table.clone()))?;
625
626 let schema = table.schema();
627
628 let filter = query
630 .where_clause
631 .as_ref()
632 .map(VirtualFilter::from_where_clause);
633
634 let rows = table.scan(&query.columns, filter.as_ref(), query.limit, query.offset)?;
636
637 let columns = if query.columns.is_empty() || query.columns[0] == "*" {
639 schema.columns.iter().map(|c| c.name.clone()).collect()
640 } else {
641 query.columns.clone()
642 };
643
644 let result_rows: Vec<Vec<SochValue>> = rows.into_iter().map(|r| r.values).collect();
645
646 Ok(SochResult {
647 table: query.table.clone(),
648 columns,
649 rows: result_rows,
650 })
651 }
652}
653
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a column definition without a description.
    fn column(
        name: &str,
        col_type: VirtualColumnType,
        nullable: bool,
        primary_key: bool,
    ) -> VirtualColumnDef {
        VirtualColumnDef {
            name: name.to_string(),
            col_type,
            nullable,
            primary_key,
            description: None,
        }
    }

    /// Schema shared by the tests: `id` (Int64, primary key), `name` (Text)
    /// and `score` (nullable Float64).
    fn create_test_schema() -> VirtualTableSchema {
        VirtualTableSchema {
            name: "test_table".to_string(),
            columns: vec![
                column("id", VirtualColumnType::Int64, false, true),
                column("name", VirtualColumnType::Text, false, false),
                column("score", VirtualColumnType::Float64, true, false),
            ],
            estimated_rows: Some(1000),
            description: None,
        }
    }

    #[test]
    fn test_plugin_virtual_table_creation() {
        let table = PluginVirtualTable::new("test_plugin", "test_table", create_test_schema());

        assert_eq!(table.qualified_name(), "test_plugin.test_table");
        assert_eq!(table.schema().columns.len(), 3);
    }

    #[test]
    fn test_virtual_table_scan() {
        let table = PluginVirtualTable::new("plugin", "table", create_test_schema());

        let rows = table.scan(&[], None, Some(10), None).unwrap();

        assert_eq!(rows.len(), 10);
        // One value per schema column.
        assert_eq!(rows[0].values.len(), 3);
    }

    #[test]
    fn test_virtual_table_scan_with_filter() {
        let schema = create_test_schema();
        let table = PluginVirtualTable::new("plugin", "table", schema.clone());

        let only_large_ids = VirtualFilter::Gt("id".to_string(), SochValue::Int(5));
        let rows = table
            .scan(&[], Some(&only_large_ids), Some(100), None)
            .unwrap();

        // Every integer id that comes back must satisfy the filter.
        let ids = rows
            .iter()
            .filter_map(|row| match row.get_by_name("id", &schema) {
                Some(SochValue::Int(id)) => Some(*id),
                _ => None,
            });
        for id in ids {
            assert!(id > 5);
        }
    }

    #[test]
    fn test_virtual_filter_matches() {
        let schema = create_test_schema();
        let row = VirtualRow::new(vec![
            SochValue::Int(42),
            SochValue::Text("Alice".to_string()),
            SochValue::Float(95.5),
        ]);

        let by_id = VirtualFilter::Eq("id".to_string(), SochValue::Int(42));
        assert!(by_id.matches(&row, &schema));

        let by_prefix = VirtualFilter::Like("name".to_string(), "Al%".to_string());
        assert!(by_prefix.matches(&row, &schema));

        let by_score = VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0));
        assert!(by_score.matches(&row, &schema));

        let combined = VirtualFilter::And(vec![
            VirtualFilter::Eq("id".to_string(), SochValue::Int(42)),
            VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0)),
        ]);
        assert!(combined.matches(&row, &schema));
    }

    #[test]
    fn test_registry_operations() {
        let registry = VirtualTableRegistry::new();
        let table = Arc::new(PluginVirtualTable::new(
            "plugin",
            "table",
            create_test_schema(),
        ));

        registry.register("plugin.table", table).unwrap();
        assert_eq!(registry.list().len(), 1);

        assert!(registry.get("plugin.table").is_some());

        registry.unregister("plugin.table").unwrap();
        assert!(registry.list().is_empty());
    }

    #[test]
    fn test_registry_execute_select() {
        let registry = VirtualTableRegistry::new();
        let table = Arc::new(PluginVirtualTable::new(
            "plugin",
            "data",
            create_test_schema(),
        ));
        registry.register("plugin.data", table).unwrap();

        let query = SelectQuery {
            columns: vec!["id".to_string(), "name".to_string()],
            table: "plugin.data".to_string(),
            where_clause: None,
            order_by: None,
            limit: Some(5),
            offset: None,
        };

        let result = registry.execute_select(&query).unwrap();

        assert_eq!(result.table, "plugin.data");
        assert_eq!(result.columns, vec!["id", "name"]);
        assert_eq!(result.rows.len(), 5);
    }

    #[test]
    fn test_cache_behavior() {
        let table =
            PluginVirtualTable::new("plugin", "cached", create_test_schema()).with_cache_ttl(1);

        // The first scan populates the cache.
        let first = table.scan(&[], None, Some(5), None).unwrap();
        assert!(table.is_cache_valid());

        // The second scan is answered while the cache is still valid.
        let second = table.scan(&[], None, Some(5), None).unwrap();
        assert_eq!(first.len(), second.len());

        // Refreshing drops the cached rows.
        table.refresh().unwrap();
        assert!(!table.is_cache_valid());
    }

    #[test]
    fn test_column_projection() {
        let table = PluginVirtualTable::new("plugin", "table", create_test_schema());

        let wanted = ["id".to_string(), "name".to_string()];
        let rows = table.scan(&wanted, None, Some(5), None).unwrap();

        assert!(!rows.is_empty());
    }
}