1use crate::soch_ql::{SelectQuery, SochResult, SochValue, WhereClause};
46use parking_lot::RwLock;
47use std::collections::HashMap;
48use std::sync::Arc;
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum VirtualColumnType {
57 Bool,
58 Int64,
59 UInt64,
60 Float64,
61 Text,
62 Binary,
63 Timestamp,
64 Json,
65}
66
67#[derive(Debug, Clone)]
69pub struct VirtualColumnDef {
70 pub name: String,
72 pub col_type: VirtualColumnType,
74 pub nullable: bool,
76 pub primary_key: bool,
78 pub description: Option<String>,
80}
81
82#[derive(Debug, Clone)]
84pub struct VirtualTableSchema {
85 pub name: String,
87 pub columns: Vec<VirtualColumnDef>,
89 pub estimated_rows: Option<u64>,
91 pub description: Option<String>,
93}
94
95pub trait VirtualTable: Send + Sync {
99 fn schema(&self) -> &VirtualTableSchema;
101
102 fn scan(
104 &self,
105 columns: &[String],
106 filter: Option<&VirtualFilter>,
107 limit: Option<usize>,
108 offset: Option<usize>,
109 ) -> Result<Vec<VirtualRow>, VirtualTableError>;
110
111 fn get(&self, key: &SochValue) -> Result<Option<VirtualRow>, VirtualTableError> {
113 let schema = self.schema();
115 let pk_col = schema
116 .columns
117 .iter()
118 .find(|c| c.primary_key)
119 .map(|c| c.name.clone());
120
121 if let Some(pk) = pk_col {
122 let filter = VirtualFilter::Eq(pk, key.clone());
123 let rows = self.scan(&[], Some(&filter), Some(1), None)?;
124 Ok(rows.into_iter().next())
125 } else {
126 Err(VirtualTableError::NoPrimaryKey)
127 }
128 }
129
130 fn stats(&self) -> VirtualTableStats {
132 VirtualTableStats {
133 row_count: self.schema().estimated_rows,
134 size_bytes: None,
135 last_modified: None,
136 }
137 }
138
139 fn refresh(&self) -> Result<(), VirtualTableError> {
141 Ok(()) }
143}
144
145#[derive(Debug, Clone)]
147pub struct VirtualRow {
148 pub values: Vec<SochValue>,
150}
151
152impl VirtualRow {
153 pub fn new(values: Vec<SochValue>) -> Self {
155 Self { values }
156 }
157
158 pub fn get(&self, idx: usize) -> Option<&SochValue> {
160 self.values.get(idx)
161 }
162
163 pub fn get_by_name<'a>(
165 &'a self,
166 name: &str,
167 schema: &VirtualTableSchema,
168 ) -> Option<&'a SochValue> {
169 schema
170 .columns
171 .iter()
172 .position(|c| c.name == name)
173 .and_then(|idx| self.values.get(idx))
174 }
175}
176
177#[derive(Debug, Clone)]
179pub enum VirtualFilter {
180 Eq(String, SochValue),
182 Ne(String, SochValue),
184 Lt(String, SochValue),
186 Le(String, SochValue),
188 Gt(String, SochValue),
190 Ge(String, SochValue),
192 Like(String, String),
194 In(String, Vec<SochValue>),
196 Between(String, SochValue, SochValue),
198 IsNull(String),
200 IsNotNull(String),
202 And(Vec<VirtualFilter>),
204 Or(Vec<VirtualFilter>),
206 Not(Box<VirtualFilter>),
208}
209
210impl VirtualFilter {
211 pub fn from_where_clause(where_clause: &WhereClause) -> Self {
213 let filters: Vec<VirtualFilter> = where_clause
214 .conditions
215 .iter()
216 .map(|c| {
217 use crate::soch_ql::ComparisonOp::*;
218 match c.operator {
219 Eq => VirtualFilter::Eq(c.column.clone(), c.value.clone()),
220 Ne => VirtualFilter::Ne(c.column.clone(), c.value.clone()),
221 Lt => VirtualFilter::Lt(c.column.clone(), c.value.clone()),
222 Le => VirtualFilter::Le(c.column.clone(), c.value.clone()),
223 Gt => VirtualFilter::Gt(c.column.clone(), c.value.clone()),
224 Ge => VirtualFilter::Ge(c.column.clone(), c.value.clone()),
225 Like => {
226 if let SochValue::Text(pattern) = &c.value {
227 VirtualFilter::Like(c.column.clone(), pattern.clone())
228 } else {
229 VirtualFilter::Like(c.column.clone(), "".to_string())
230 }
231 }
232 In => VirtualFilter::In(c.column.clone(), vec![c.value.clone()]),
233 SimilarTo => {
234 if let SochValue::Text(pattern) = &c.value {
237 VirtualFilter::Like(c.column.clone(), pattern.clone())
238 } else {
239 VirtualFilter::Like(c.column.clone(), "".to_string())
240 }
241 }
242 }
243 })
244 .collect();
245
246 match where_clause.operator {
247 crate::soch_ql::LogicalOp::And => VirtualFilter::And(filters),
248 crate::soch_ql::LogicalOp::Or => VirtualFilter::Or(filters),
249 }
250 }
251
252 pub fn matches(&self, row: &VirtualRow, schema: &VirtualTableSchema) -> bool {
254 match self {
255 VirtualFilter::Eq(col, value) => row
256 .get_by_name(col, schema)
257 .map(|v| v == value)
258 .unwrap_or(false),
259 VirtualFilter::Ne(col, value) => row
260 .get_by_name(col, schema)
261 .map(|v| v != value)
262 .unwrap_or(false),
263 VirtualFilter::Lt(col, value) => {
264 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a < b)
265 }
266 VirtualFilter::Le(col, value) => {
267 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a <= b)
268 }
269 VirtualFilter::Gt(col, value) => {
270 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a > b)
271 }
272 VirtualFilter::Ge(col, value) => {
273 Self::compare_values(row.get_by_name(col, schema), value, |a, b| a >= b)
274 }
275 VirtualFilter::Like(col, pattern) => row
276 .get_by_name(col, schema)
277 .and_then(|v| match v {
278 SochValue::Text(s) => Some(Self::match_like(s, pattern)),
279 _ => None,
280 })
281 .unwrap_or(false),
282 VirtualFilter::In(col, values) => row
283 .get_by_name(col, schema)
284 .map(|v| values.contains(v))
285 .unwrap_or(false),
286 VirtualFilter::Between(col, low, high) => row
287 .get_by_name(col, schema)
288 .map(|v| {
289 Self::compare_values(Some(v), low, |a, b| a >= b)
290 && Self::compare_values(Some(v), high, |a, b| a <= b)
291 })
292 .unwrap_or(false),
293 VirtualFilter::IsNull(col) => row
294 .get_by_name(col, schema)
295 .map(|v| *v == SochValue::Null)
296 .unwrap_or(true),
297 VirtualFilter::IsNotNull(col) => row
298 .get_by_name(col, schema)
299 .map(|v| *v != SochValue::Null)
300 .unwrap_or(false),
301 VirtualFilter::And(filters) => filters.iter().all(|f| f.matches(row, schema)),
302 VirtualFilter::Or(filters) => filters.iter().any(|f| f.matches(row, schema)),
303 VirtualFilter::Not(filter) => !filter.matches(row, schema),
304 }
305 }
306
307 fn compare_values<F>(val: Option<&SochValue>, other: &SochValue, cmp: F) -> bool
308 where
309 F: Fn(i64, i64) -> bool,
310 {
311 match (val, other) {
312 (Some(SochValue::Int(a)), SochValue::Int(b)) => cmp(*a, *b),
313 (Some(SochValue::UInt(a)), SochValue::UInt(b)) => cmp(*a as i64, *b as i64),
314 (Some(SochValue::Float(a)), SochValue::Float(b)) => {
315 cmp((*a * 1000.0) as i64, (*b * 1000.0) as i64)
316 }
317 _ => false,
318 }
319 }
320
321 fn match_like(s: &str, pattern: &str) -> bool {
322 crate::like::like_match(s, pattern)
324 }
325}
326
327#[derive(Debug, Clone, Default)]
329pub struct VirtualTableStats {
330 pub row_count: Option<u64>,
332 pub size_bytes: Option<u64>,
334 pub last_modified: Option<u64>,
336}
337
338#[derive(Debug, Clone)]
340pub enum VirtualTableError {
341 NotFound(String),
343 ColumnNotFound(String),
345 NoPrimaryKey,
347 PluginError(String),
349 ScanFailed(String),
351 InvalidFilter(String),
353}
354
355impl std::fmt::Display for VirtualTableError {
356 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357 match self {
358 Self::NotFound(name) => write!(f, "virtual table not found: {}", name),
359 Self::ColumnNotFound(name) => write!(f, "column not found: {}", name),
360 Self::NoPrimaryKey => write!(f, "no primary key defined"),
361 Self::PluginError(msg) => write!(f, "plugin error: {}", msg),
362 Self::ScanFailed(msg) => write!(f, "scan failed: {}", msg),
363 Self::InvalidFilter(msg) => write!(f, "invalid filter: {}", msg),
364 }
365 }
366}
367
368impl std::error::Error for VirtualTableError {}
369
370pub struct PluginVirtualTable {
376 plugin_name: String,
378 table_name: String,
380 schema: VirtualTableSchema,
382 cache: RwLock<Option<CachedData>>,
384 cache_ttl_secs: u64,
386}
387
388struct CachedData {
390 rows: Vec<VirtualRow>,
391 cached_at: std::time::Instant,
392}
393
394impl PluginVirtualTable {
395 pub fn new(plugin_name: &str, table_name: &str, schema: VirtualTableSchema) -> Self {
397 Self {
398 plugin_name: plugin_name.to_string(),
399 table_name: table_name.to_string(),
400 schema,
401 cache: RwLock::new(None),
402 cache_ttl_secs: 60, }
404 }
405
406 pub fn with_cache_ttl(mut self, secs: u64) -> Self {
408 self.cache_ttl_secs = secs;
409 self
410 }
411
412 pub fn qualified_name(&self) -> String {
414 format!("{}.{}", self.plugin_name, self.table_name)
415 }
416
417 fn is_cache_valid(&self) -> bool {
419 if let Some(cached) = self.cache.read().as_ref() {
420 cached.cached_at.elapsed().as_secs() < self.cache_ttl_secs
421 } else {
422 false
423 }
424 }
425
426 fn update_cache(&self, rows: Vec<VirtualRow>) {
428 *self.cache.write() = Some(CachedData {
429 rows,
430 cached_at: std::time::Instant::now(),
431 });
432 }
433}
434
435impl VirtualTable for PluginVirtualTable {
436 fn schema(&self) -> &VirtualTableSchema {
437 &self.schema
438 }
439
440 fn scan(
441 &self,
442 columns: &[String],
443 filter: Option<&VirtualFilter>,
444 limit: Option<usize>,
445 offset: Option<usize>,
446 ) -> Result<Vec<VirtualRow>, VirtualTableError> {
447 if self.is_cache_valid()
449 && let Some(cached) = self.cache.read().as_ref()
450 {
451 let mut rows = cached.rows.clone();
452
453 if let Some(f) = filter {
455 rows.retain(|r| f.matches(r, &self.schema));
456 }
457
458 if let Some(o) = offset {
460 rows = rows.into_iter().skip(o).collect();
461 }
462
463 if let Some(l) = limit {
465 rows.truncate(l);
466 }
467
468 if !columns.is_empty() && columns[0] != "*" {
470 rows = self.project_columns(&rows, columns);
471 }
472
473 return Ok(rows);
474 }
475
476 let mock_rows = self.generate_mock_data(limit.unwrap_or(100));
479
480 self.update_cache(mock_rows.clone());
482
483 let mut result = mock_rows;
485 if let Some(f) = filter {
486 result.retain(|r| f.matches(r, &self.schema));
487 }
488
489 if let Some(o) = offset {
490 result = result.into_iter().skip(o).collect();
491 }
492
493 if let Some(l) = limit {
494 result.truncate(l);
495 }
496
497 Ok(result)
498 }
499
500 fn refresh(&self) -> Result<(), VirtualTableError> {
501 *self.cache.write() = None;
503 Ok(())
504 }
505}
506
507impl PluginVirtualTable {
508 fn project_columns(&self, rows: &[VirtualRow], columns: &[String]) -> Vec<VirtualRow> {
510 let indices: Vec<usize> = columns
511 .iter()
512 .filter_map(|col| self.schema.columns.iter().position(|c| c.name == *col))
513 .collect();
514
515 rows.iter()
516 .map(|row| {
517 let values: Vec<SochValue> = indices
518 .iter()
519 .map(|&i| row.values.get(i).cloned().unwrap_or(SochValue::Null))
520 .collect();
521 VirtualRow::new(values)
522 })
523 .collect()
524 }
525
526 fn generate_mock_data(&self, count: usize) -> Vec<VirtualRow> {
528 (0..count)
529 .map(|i| {
530 let values: Vec<SochValue> = self
531 .schema
532 .columns
533 .iter()
534 .enumerate()
535 .map(|(col_idx, col)| match col.col_type {
536 VirtualColumnType::Int64 => SochValue::Int(i as i64 + col_idx as i64),
537 VirtualColumnType::UInt64 => SochValue::UInt(i as u64 + col_idx as u64),
538 VirtualColumnType::Float64 => SochValue::Float(i as f64 * 0.1),
539 VirtualColumnType::Text => SochValue::Text(format!("{}_{}", col.name, i)),
540 VirtualColumnType::Bool => SochValue::Bool(i % 2 == 0),
541 _ => SochValue::Null,
542 })
543 .collect();
544 VirtualRow::new(values)
545 })
546 .collect()
547 }
548}
549
550pub struct VirtualTableRegistry {
556 tables: RwLock<HashMap<String, Arc<dyn VirtualTable>>>,
558}
559
560impl Default for VirtualTableRegistry {
561 fn default() -> Self {
562 Self::new()
563 }
564}
565
566impl VirtualTableRegistry {
567 pub fn new() -> Self {
569 Self {
570 tables: RwLock::new(HashMap::new()),
571 }
572 }
573
574 pub fn register(
576 &self,
577 qualified_name: &str,
578 table: Arc<dyn VirtualTable>,
579 ) -> Result<(), VirtualTableError> {
580 let mut tables = self.tables.write();
581
582 if tables.contains_key(qualified_name) {
583 return Err(VirtualTableError::PluginError(format!(
584 "table '{}' already registered",
585 qualified_name
586 )));
587 }
588
589 tables.insert(qualified_name.to_string(), table);
590 Ok(())
591 }
592
593 pub fn unregister(&self, qualified_name: &str) -> Result<(), VirtualTableError> {
595 let mut tables = self.tables.write();
596
597 if tables.remove(qualified_name).is_none() {
598 return Err(VirtualTableError::NotFound(qualified_name.to_string()));
599 }
600
601 Ok(())
602 }
603
604 pub fn get(&self, qualified_name: &str) -> Option<Arc<dyn VirtualTable>> {
606 self.tables.read().get(qualified_name).cloned()
607 }
608
609 pub fn list(&self) -> Vec<String> {
611 self.tables.read().keys().cloned().collect()
612 }
613
614 pub fn execute_select(&self, query: &SelectQuery) -> Result<SochResult, VirtualTableError> {
616 let table = self
617 .get(&query.table)
618 .ok_or_else(|| VirtualTableError::NotFound(query.table.clone()))?;
619
620 let schema = table.schema();
621
622 let filter = query
624 .where_clause
625 .as_ref()
626 .map(VirtualFilter::from_where_clause);
627
628 let rows = table.scan(&query.columns, filter.as_ref(), query.limit, query.offset)?;
630
631 let columns = if query.columns.is_empty() || query.columns[0] == "*" {
633 schema.columns.iter().map(|c| c.name.clone()).collect()
634 } else {
635 query.columns.clone()
636 };
637
638 let result_rows: Vec<Vec<SochValue>> = rows.into_iter().map(|r| r.values).collect();
639
640 Ok(SochResult {
641 table: query.table.clone(),
642 columns,
643 rows: result_rows,
644 })
645 }
646}
647
648#[cfg(test)]
653mod tests {
654 use super::*;
655
656 fn create_test_schema() -> VirtualTableSchema {
657 VirtualTableSchema {
658 name: "test_table".to_string(),
659 columns: vec![
660 VirtualColumnDef {
661 name: "id".to_string(),
662 col_type: VirtualColumnType::Int64,
663 nullable: false,
664 primary_key: true,
665 description: None,
666 },
667 VirtualColumnDef {
668 name: "name".to_string(),
669 col_type: VirtualColumnType::Text,
670 nullable: false,
671 primary_key: false,
672 description: None,
673 },
674 VirtualColumnDef {
675 name: "score".to_string(),
676 col_type: VirtualColumnType::Float64,
677 nullable: true,
678 primary_key: false,
679 description: None,
680 },
681 ],
682 estimated_rows: Some(1000),
683 description: None,
684 }
685 }
686
687 #[test]
688 fn test_plugin_virtual_table_creation() {
689 let schema = create_test_schema();
690 let table = PluginVirtualTable::new("test_plugin", "test_table", schema);
691
692 assert_eq!(table.qualified_name(), "test_plugin.test_table");
693 assert_eq!(table.schema().columns.len(), 3);
694 }
695
696 #[test]
697 fn test_virtual_table_scan() {
698 let schema = create_test_schema();
699 let table = PluginVirtualTable::new("plugin", "table", schema);
700
701 let rows = table.scan(&[], None, Some(10), None).unwrap();
702
703 assert_eq!(rows.len(), 10);
704 assert_eq!(rows[0].values.len(), 3); }
706
707 #[test]
708 fn test_virtual_table_scan_with_filter() {
709 let schema = create_test_schema();
710 let table = PluginVirtualTable::new("plugin", "table", schema.clone());
711
712 let filter = VirtualFilter::Gt("id".to_string(), SochValue::Int(5));
713 let rows = table.scan(&[], Some(&filter), Some(100), None).unwrap();
714
715 for row in &rows {
717 if let Some(SochValue::Int(id)) = row.get_by_name("id", &schema) {
718 assert!(*id > 5);
719 }
720 }
721 }
722
723 #[test]
724 fn test_virtual_filter_matches() {
725 let schema = create_test_schema();
726 let row = VirtualRow::new(vec![
727 SochValue::Int(42),
728 SochValue::Text("Alice".to_string()),
729 SochValue::Float(95.5),
730 ]);
731
732 let filter = VirtualFilter::Eq("id".to_string(), SochValue::Int(42));
734 assert!(filter.matches(&row, &schema));
735
736 let filter = VirtualFilter::Like("name".to_string(), "Al%".to_string());
738 assert!(filter.matches(&row, &schema));
739
740 let filter = VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0));
742 assert!(filter.matches(&row, &schema));
743
744 let filter = VirtualFilter::And(vec![
746 VirtualFilter::Eq("id".to_string(), SochValue::Int(42)),
747 VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0)),
748 ]);
749 assert!(filter.matches(&row, &schema));
750 }
751
752 #[test]
753 fn test_registry_operations() {
754 let registry = VirtualTableRegistry::new();
755 let schema = create_test_schema();
756
757 let table = Arc::new(PluginVirtualTable::new("plugin", "table", schema));
758
759 registry.register("plugin.table", table).unwrap();
761 assert_eq!(registry.list().len(), 1);
762
763 let retrieved = registry.get("plugin.table");
765 assert!(retrieved.is_some());
766
767 registry.unregister("plugin.table").unwrap();
769 assert!(registry.list().is_empty());
770 }
771
772 #[test]
773 fn test_registry_execute_select() {
774 let registry = VirtualTableRegistry::new();
775 let schema = create_test_schema();
776
777 let table = Arc::new(PluginVirtualTable::new("plugin", "data", schema));
778 registry.register("plugin.data", table).unwrap();
779
780 let query = SelectQuery {
781 columns: vec!["id".to_string(), "name".to_string()],
782 table: "plugin.data".to_string(),
783 where_clause: None,
784 order_by: None,
785 limit: Some(5),
786 offset: None,
787 };
788
789 let result = registry.execute_select(&query).unwrap();
790
791 assert_eq!(result.table, "plugin.data");
792 assert_eq!(result.columns, vec!["id", "name"]);
793 assert_eq!(result.rows.len(), 5);
794 }
795
796 #[test]
797 fn test_cache_behavior() {
798 let schema = create_test_schema();
799 let table = PluginVirtualTable::new("plugin", "cached", schema).with_cache_ttl(1); let rows1 = table.scan(&[], None, Some(5), None).unwrap();
803 assert!(table.is_cache_valid());
804
805 let rows2 = table.scan(&[], None, Some(5), None).unwrap();
807 assert_eq!(rows1.len(), rows2.len());
808
809 table.refresh().unwrap();
811 assert!(!table.is_cache_valid());
812 }
813
814 #[test]
815 fn test_column_projection() {
816 let schema = create_test_schema();
817 let table = PluginVirtualTable::new("plugin", "table", schema);
818
819 let rows = table
821 .scan(&["id".to_string(), "name".to_string()], None, Some(5), None)
822 .unwrap();
823
824 assert!(!rows.is_empty());
827 }
828}