use crate::soch_ql::{SelectQuery, SochResult, SochValue, WhereClause};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VirtualColumnType {
Bool,
Int64,
UInt64,
Float64,
Text,
Binary,
Timestamp,
Json,
}
#[derive(Debug, Clone)]
pub struct VirtualColumnDef {
pub name: String,
pub col_type: VirtualColumnType,
pub nullable: bool,
pub primary_key: bool,
pub description: Option<String>,
}
#[derive(Debug, Clone)]
pub struct VirtualTableSchema {
pub name: String,
pub columns: Vec<VirtualColumnDef>,
pub estimated_rows: Option<u64>,
pub description: Option<String>,
}
pub trait VirtualTable: Send + Sync {
fn schema(&self) -> &VirtualTableSchema;
fn scan(
&self,
columns: &[String],
filter: Option<&VirtualFilter>,
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Vec<VirtualRow>, VirtualTableError>;
fn get(&self, key: &SochValue) -> Result<Option<VirtualRow>, VirtualTableError> {
let schema = self.schema();
let pk_col = schema
.columns
.iter()
.find(|c| c.primary_key)
.map(|c| c.name.clone());
if let Some(pk) = pk_col {
let filter = VirtualFilter::Eq(pk, key.clone());
let rows = self.scan(&[], Some(&filter), Some(1), None)?;
Ok(rows.into_iter().next())
} else {
Err(VirtualTableError::NoPrimaryKey)
}
}
fn stats(&self) -> VirtualTableStats {
VirtualTableStats {
row_count: self.schema().estimated_rows,
size_bytes: None,
last_modified: None,
}
}
fn refresh(&self) -> Result<(), VirtualTableError> {
Ok(()) }
}
#[derive(Debug, Clone)]
pub struct VirtualRow {
pub values: Vec<SochValue>,
}
impl VirtualRow {
pub fn new(values: Vec<SochValue>) -> Self {
Self { values }
}
pub fn get(&self, idx: usize) -> Option<&SochValue> {
self.values.get(idx)
}
pub fn get_by_name<'a>(
&'a self,
name: &str,
schema: &VirtualTableSchema,
) -> Option<&'a SochValue> {
schema
.columns
.iter()
.position(|c| c.name == name)
.and_then(|idx| self.values.get(idx))
}
}
#[derive(Debug, Clone)]
pub enum VirtualFilter {
Eq(String, SochValue),
Ne(String, SochValue),
Lt(String, SochValue),
Le(String, SochValue),
Gt(String, SochValue),
Ge(String, SochValue),
Like(String, String),
In(String, Vec<SochValue>),
Between(String, SochValue, SochValue),
IsNull(String),
IsNotNull(String),
And(Vec<VirtualFilter>),
Or(Vec<VirtualFilter>),
Not(Box<VirtualFilter>),
}
impl VirtualFilter {
pub fn from_where_clause(where_clause: &WhereClause) -> Self {
let filters: Vec<VirtualFilter> = where_clause
.conditions
.iter()
.map(|c| {
use crate::soch_ql::ComparisonOp::*;
match c.operator {
Eq => VirtualFilter::Eq(c.column.clone(), c.value.clone()),
Ne => VirtualFilter::Ne(c.column.clone(), c.value.clone()),
Lt => VirtualFilter::Lt(c.column.clone(), c.value.clone()),
Le => VirtualFilter::Le(c.column.clone(), c.value.clone()),
Gt => VirtualFilter::Gt(c.column.clone(), c.value.clone()),
Ge => VirtualFilter::Ge(c.column.clone(), c.value.clone()),
Like => {
if let SochValue::Text(pattern) = &c.value {
VirtualFilter::Like(c.column.clone(), pattern.clone())
} else {
VirtualFilter::Like(c.column.clone(), "".to_string())
}
}
In => VirtualFilter::In(c.column.clone(), vec![c.value.clone()]),
SimilarTo => {
if let SochValue::Text(pattern) = &c.value {
VirtualFilter::Like(c.column.clone(), pattern.clone())
} else {
VirtualFilter::Like(c.column.clone(), "".to_string())
}
}
}
})
.collect();
match where_clause.operator {
crate::soch_ql::LogicalOp::And => VirtualFilter::And(filters),
crate::soch_ql::LogicalOp::Or => VirtualFilter::Or(filters),
}
}
pub fn matches(&self, row: &VirtualRow, schema: &VirtualTableSchema) -> bool {
match self {
VirtualFilter::Eq(col, value) => row
.get_by_name(col, schema)
.map(|v| v == value)
.unwrap_or(false),
VirtualFilter::Ne(col, value) => row
.get_by_name(col, schema)
.map(|v| v != value)
.unwrap_or(false),
VirtualFilter::Lt(col, value) => {
Self::compare_values(row.get_by_name(col, schema), value, |a, b| a < b)
}
VirtualFilter::Le(col, value) => {
Self::compare_values(row.get_by_name(col, schema), value, |a, b| a <= b)
}
VirtualFilter::Gt(col, value) => {
Self::compare_values(row.get_by_name(col, schema), value, |a, b| a > b)
}
VirtualFilter::Ge(col, value) => {
Self::compare_values(row.get_by_name(col, schema), value, |a, b| a >= b)
}
VirtualFilter::Like(col, pattern) => row
.get_by_name(col, schema)
.and_then(|v| match v {
SochValue::Text(s) => Some(Self::match_like(s, pattern)),
_ => None,
})
.unwrap_or(false),
VirtualFilter::In(col, values) => row
.get_by_name(col, schema)
.map(|v| values.contains(v))
.unwrap_or(false),
VirtualFilter::Between(col, low, high) => row
.get_by_name(col, schema)
.map(|v| {
Self::compare_values(Some(v), low, |a, b| a >= b)
&& Self::compare_values(Some(v), high, |a, b| a <= b)
})
.unwrap_or(false),
VirtualFilter::IsNull(col) => row
.get_by_name(col, schema)
.map(|v| *v == SochValue::Null)
.unwrap_or(true),
VirtualFilter::IsNotNull(col) => row
.get_by_name(col, schema)
.map(|v| *v != SochValue::Null)
.unwrap_or(false),
VirtualFilter::And(filters) => filters.iter().all(|f| f.matches(row, schema)),
VirtualFilter::Or(filters) => filters.iter().any(|f| f.matches(row, schema)),
VirtualFilter::Not(filter) => !filter.matches(row, schema),
}
}
fn compare_values<F>(val: Option<&SochValue>, other: &SochValue, cmp: F) -> bool
where
F: Fn(i64, i64) -> bool,
{
match (val, other) {
(Some(SochValue::Int(a)), SochValue::Int(b)) => cmp(*a, *b),
(Some(SochValue::UInt(a)), SochValue::UInt(b)) => cmp(*a as i64, *b as i64),
(Some(SochValue::Float(a)), SochValue::Float(b)) => {
cmp((*a * 1000.0) as i64, (*b * 1000.0) as i64)
}
_ => false,
}
}
fn match_like(s: &str, pattern: &str) -> bool {
crate::like::like_match(s, pattern)
}
}
#[derive(Debug, Clone, Default)]
pub struct VirtualTableStats {
pub row_count: Option<u64>,
pub size_bytes: Option<u64>,
pub last_modified: Option<u64>,
}
#[derive(Debug, Clone)]
pub enum VirtualTableError {
NotFound(String),
ColumnNotFound(String),
NoPrimaryKey,
PluginError(String),
ScanFailed(String),
InvalidFilter(String),
}
impl std::fmt::Display for VirtualTableError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NotFound(name) => write!(f, "virtual table not found: {}", name),
Self::ColumnNotFound(name) => write!(f, "column not found: {}", name),
Self::NoPrimaryKey => write!(f, "no primary key defined"),
Self::PluginError(msg) => write!(f, "plugin error: {}", msg),
Self::ScanFailed(msg) => write!(f, "scan failed: {}", msg),
Self::InvalidFilter(msg) => write!(f, "invalid filter: {}", msg),
}
}
}
impl std::error::Error for VirtualTableError {}
pub struct PluginVirtualTable {
plugin_name: String,
table_name: String,
schema: VirtualTableSchema,
cache: RwLock<Option<CachedData>>,
cache_ttl_secs: u64,
}
struct CachedData {
rows: Vec<VirtualRow>,
cached_at: std::time::Instant,
}
impl PluginVirtualTable {
pub fn new(plugin_name: &str, table_name: &str, schema: VirtualTableSchema) -> Self {
Self {
plugin_name: plugin_name.to_string(),
table_name: table_name.to_string(),
schema,
cache: RwLock::new(None),
cache_ttl_secs: 60, }
}
pub fn with_cache_ttl(mut self, secs: u64) -> Self {
self.cache_ttl_secs = secs;
self
}
pub fn qualified_name(&self) -> String {
format!("{}.{}", self.plugin_name, self.table_name)
}
fn is_cache_valid(&self) -> bool {
if let Some(cached) = self.cache.read().as_ref() {
cached.cached_at.elapsed().as_secs() < self.cache_ttl_secs
} else {
false
}
}
fn update_cache(&self, rows: Vec<VirtualRow>) {
*self.cache.write() = Some(CachedData {
rows,
cached_at: std::time::Instant::now(),
});
}
}
impl VirtualTable for PluginVirtualTable {
fn schema(&self) -> &VirtualTableSchema {
&self.schema
}
fn scan(
&self,
columns: &[String],
filter: Option<&VirtualFilter>,
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Vec<VirtualRow>, VirtualTableError> {
if self.is_cache_valid()
&& let Some(cached) = self.cache.read().as_ref()
{
let mut rows = cached.rows.clone();
if let Some(f) = filter {
rows.retain(|r| f.matches(r, &self.schema));
}
if let Some(o) = offset {
rows = rows.into_iter().skip(o).collect();
}
if let Some(l) = limit {
rows.truncate(l);
}
if !columns.is_empty() && columns[0] != "*" {
rows = self.project_columns(&rows, columns);
}
return Ok(rows);
}
let mock_rows = self.generate_mock_data(limit.unwrap_or(100));
self.update_cache(mock_rows.clone());
let mut result = mock_rows;
if let Some(f) = filter {
result.retain(|r| f.matches(r, &self.schema));
}
if let Some(o) = offset {
result = result.into_iter().skip(o).collect();
}
if let Some(l) = limit {
result.truncate(l);
}
Ok(result)
}
fn refresh(&self) -> Result<(), VirtualTableError> {
*self.cache.write() = None;
Ok(())
}
}
impl PluginVirtualTable {
fn project_columns(&self, rows: &[VirtualRow], columns: &[String]) -> Vec<VirtualRow> {
let indices: Vec<usize> = columns
.iter()
.filter_map(|col| self.schema.columns.iter().position(|c| c.name == *col))
.collect();
rows.iter()
.map(|row| {
let values: Vec<SochValue> = indices
.iter()
.map(|&i| row.values.get(i).cloned().unwrap_or(SochValue::Null))
.collect();
VirtualRow::new(values)
})
.collect()
}
fn generate_mock_data(&self, count: usize) -> Vec<VirtualRow> {
(0..count)
.map(|i| {
let values: Vec<SochValue> = self
.schema
.columns
.iter()
.enumerate()
.map(|(col_idx, col)| match col.col_type {
VirtualColumnType::Int64 => SochValue::Int(i as i64 + col_idx as i64),
VirtualColumnType::UInt64 => SochValue::UInt(i as u64 + col_idx as u64),
VirtualColumnType::Float64 => SochValue::Float(i as f64 * 0.1),
VirtualColumnType::Text => SochValue::Text(format!("{}_{}", col.name, i)),
VirtualColumnType::Bool => SochValue::Bool(i % 2 == 0),
_ => SochValue::Null,
})
.collect();
VirtualRow::new(values)
})
.collect()
}
}
pub struct VirtualTableRegistry {
tables: RwLock<HashMap<String, Arc<dyn VirtualTable>>>,
}
impl Default for VirtualTableRegistry {
fn default() -> Self {
Self::new()
}
}
impl VirtualTableRegistry {
pub fn new() -> Self {
Self {
tables: RwLock::new(HashMap::new()),
}
}
pub fn register(
&self,
qualified_name: &str,
table: Arc<dyn VirtualTable>,
) -> Result<(), VirtualTableError> {
let mut tables = self.tables.write();
if tables.contains_key(qualified_name) {
return Err(VirtualTableError::PluginError(format!(
"table '{}' already registered",
qualified_name
)));
}
tables.insert(qualified_name.to_string(), table);
Ok(())
}
pub fn unregister(&self, qualified_name: &str) -> Result<(), VirtualTableError> {
let mut tables = self.tables.write();
if tables.remove(qualified_name).is_none() {
return Err(VirtualTableError::NotFound(qualified_name.to_string()));
}
Ok(())
}
pub fn get(&self, qualified_name: &str) -> Option<Arc<dyn VirtualTable>> {
self.tables.read().get(qualified_name).cloned()
}
pub fn list(&self) -> Vec<String> {
self.tables.read().keys().cloned().collect()
}
pub fn execute_select(&self, query: &SelectQuery) -> Result<SochResult, VirtualTableError> {
let table = self
.get(&query.table)
.ok_or_else(|| VirtualTableError::NotFound(query.table.clone()))?;
let schema = table.schema();
let filter = query
.where_clause
.as_ref()
.map(VirtualFilter::from_where_clause);
let rows = table.scan(&query.columns, filter.as_ref(), query.limit, query.offset)?;
let columns = if query.columns.is_empty() || query.columns[0] == "*" {
schema.columns.iter().map(|c| c.name.clone()).collect()
} else {
query.columns.clone()
};
let result_rows: Vec<Vec<SochValue>> = rows.into_iter().map(|r| r.values).collect();
Ok(SochResult {
table: query.table.clone(),
columns,
rows: result_rows,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_schema() -> VirtualTableSchema {
VirtualTableSchema {
name: "test_table".to_string(),
columns: vec![
VirtualColumnDef {
name: "id".to_string(),
col_type: VirtualColumnType::Int64,
nullable: false,
primary_key: true,
description: None,
},
VirtualColumnDef {
name: "name".to_string(),
col_type: VirtualColumnType::Text,
nullable: false,
primary_key: false,
description: None,
},
VirtualColumnDef {
name: "score".to_string(),
col_type: VirtualColumnType::Float64,
nullable: true,
primary_key: false,
description: None,
},
],
estimated_rows: Some(1000),
description: None,
}
}
#[test]
fn test_plugin_virtual_table_creation() {
let schema = create_test_schema();
let table = PluginVirtualTable::new("test_plugin", "test_table", schema);
assert_eq!(table.qualified_name(), "test_plugin.test_table");
assert_eq!(table.schema().columns.len(), 3);
}
#[test]
fn test_virtual_table_scan() {
let schema = create_test_schema();
let table = PluginVirtualTable::new("plugin", "table", schema);
let rows = table.scan(&[], None, Some(10), None).unwrap();
assert_eq!(rows.len(), 10);
assert_eq!(rows[0].values.len(), 3); }
#[test]
fn test_virtual_table_scan_with_filter() {
let schema = create_test_schema();
let table = PluginVirtualTable::new("plugin", "table", schema.clone());
let filter = VirtualFilter::Gt("id".to_string(), SochValue::Int(5));
let rows = table.scan(&[], Some(&filter), Some(100), None).unwrap();
for row in &rows {
if let Some(SochValue::Int(id)) = row.get_by_name("id", &schema) {
assert!(*id > 5);
}
}
}
#[test]
fn test_virtual_filter_matches() {
let schema = create_test_schema();
let row = VirtualRow::new(vec![
SochValue::Int(42),
SochValue::Text("Alice".to_string()),
SochValue::Float(95.5),
]);
let filter = VirtualFilter::Eq("id".to_string(), SochValue::Int(42));
assert!(filter.matches(&row, &schema));
let filter = VirtualFilter::Like("name".to_string(), "Al%".to_string());
assert!(filter.matches(&row, &schema));
let filter = VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0));
assert!(filter.matches(&row, &schema));
let filter = VirtualFilter::And(vec![
VirtualFilter::Eq("id".to_string(), SochValue::Int(42)),
VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0)),
]);
assert!(filter.matches(&row, &schema));
}
#[test]
fn test_registry_operations() {
let registry = VirtualTableRegistry::new();
let schema = create_test_schema();
let table = Arc::new(PluginVirtualTable::new("plugin", "table", schema));
registry.register("plugin.table", table).unwrap();
assert_eq!(registry.list().len(), 1);
let retrieved = registry.get("plugin.table");
assert!(retrieved.is_some());
registry.unregister("plugin.table").unwrap();
assert!(registry.list().is_empty());
}
#[test]
fn test_registry_execute_select() {
let registry = VirtualTableRegistry::new();
let schema = create_test_schema();
let table = Arc::new(PluginVirtualTable::new("plugin", "data", schema));
registry.register("plugin.data", table).unwrap();
let query = SelectQuery {
columns: vec!["id".to_string(), "name".to_string()],
table: "plugin.data".to_string(),
where_clause: None,
order_by: None,
limit: Some(5),
offset: None,
};
let result = registry.execute_select(&query).unwrap();
assert_eq!(result.table, "plugin.data");
assert_eq!(result.columns, vec!["id", "name"]);
assert_eq!(result.rows.len(), 5);
}
#[test]
fn test_cache_behavior() {
let schema = create_test_schema();
let table = PluginVirtualTable::new("plugin", "cached", schema).with_cache_ttl(1);
let rows1 = table.scan(&[], None, Some(5), None).unwrap();
assert!(table.is_cache_valid());
let rows2 = table.scan(&[], None, Some(5), None).unwrap();
assert_eq!(rows1.len(), rows2.len());
table.refresh().unwrap();
assert!(!table.is_cache_valid());
}
#[test]
fn test_column_projection() {
let schema = create_test_schema();
let table = PluginVirtualTable::new("plugin", "table", schema);
let rows = table
.scan(&["id".to_string(), "name".to_string()], None, Some(5), None)
.unwrap();
assert!(!rows.is_empty());
}
}