Skip to main content

sochdb_query/
plugin_table.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Plugin-as-Table Integration
19//!
20//! This module allows WASM plugins to expose virtual tables that can be
21//! queried using standard SELECT statements.
22//!
23//! ## Example
24//!
25//! A plugin exposing a virtual table:
26//!
27//! ```text
28//! SELECT * FROM plugin_name.table_name WHERE key = 'value' LIMIT 10
29//! ```
30//!
31//! This translates to plugin function calls:
32//!
//! 1. `describe_table(name)` - Get schema
//! 2. `scan_table(name, filter, limit)` - Get matching rows
35//!
36//! ## Virtual Table Protocol
37//!
38//! Plugins must export:
39//!
40//! - `describe_tables() -> Vec<TableDescriptor>` - List available tables
41//! - `describe_table(name) -> TableSchema` - Get table schema
42//! - `scan_table(name, filter, limit) -> Vec<Row>` - Scan with filter
43//! - `get_row(name, key) -> Option<Row>` - Point lookup (optional)
44
45use crate::soch_ql::{SelectQuery, SochResult, SochValue, WhereClause};
46use parking_lot::RwLock;
47use std::collections::HashMap;
48use std::sync::Arc;
49
50// ============================================================================
51// Virtual Table Trait
52// ============================================================================
53
/// Logical data type of a column exposed by a virtual table.
///
/// The type is `Copy`, so it can be passed by value and compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VirtualColumnType {
    /// Boolean column.
    Bool,
    /// Signed 64-bit integer column.
    Int64,
    /// Unsigned 64-bit integer column.
    UInt64,
    /// 64-bit floating-point column.
    Float64,
    /// Text (string) column.
    Text,
    /// Binary column.
    Binary,
    /// Timestamp column. The unit and epoch are not defined in this module.
    Timestamp,
    /// JSON column. The in-memory representation is not defined in this module.
    Json,
}
66
/// Definition of a single column inside a [`VirtualTableSchema`].
#[derive(Debug, Clone)]
pub struct VirtualColumnDef {
    /// Column name. Name-based lookups (`VirtualRow::get_by_name`, column
    /// projection) compare against this string exactly.
    pub name: String,
    /// Declared data type of the column.
    pub col_type: VirtualColumnType,
    /// Whether the column may hold nulls. The code visible in this module
    /// records the flag but does not enforce it.
    pub nullable: bool,
    /// Marks the primary-key column. The default `VirtualTable::get` filters on
    /// the first column that has this flag set.
    pub primary_key: bool,
    /// Optional human-readable description.
    pub description: Option<String>,
}
81
/// Schema describing a virtual table: its name and its ordered columns.
#[derive(Debug, Clone)]
pub struct VirtualTableSchema {
    /// Table name
    pub name: String,
    /// Column definitions. Order is significant: `VirtualRow::get_by_name`
    /// resolves a column name to its position here and reads the row value at
    /// that same position.
    pub columns: Vec<VirtualColumnDef>,
    /// Estimated row count (for query planning). The default
    /// `VirtualTable::stats` reports this value as `row_count`.
    pub estimated_rows: Option<u64>,
    /// Optional human-readable description.
    pub description: Option<String>,
}
94
95/// Virtual table trait
96///
97/// Plugins implement this trait to expose queryable tables.
98pub trait VirtualTable: Send + Sync {
99    /// Get table schema
100    fn schema(&self) -> &VirtualTableSchema;
101
102    /// Scan with optional filter
103    fn scan(
104        &self,
105        columns: &[String],
106        filter: Option<&VirtualFilter>,
107        limit: Option<usize>,
108        offset: Option<usize>,
109    ) -> Result<Vec<VirtualRow>, VirtualTableError>;
110
111    /// Point lookup by primary key
112    fn get(&self, key: &SochValue) -> Result<Option<VirtualRow>, VirtualTableError> {
113        // Default implementation: scan with equality filter on primary key
114        let schema = self.schema();
115        let pk_col = schema
116            .columns
117            .iter()
118            .find(|c| c.primary_key)
119            .map(|c| c.name.clone());
120
121        if let Some(pk) = pk_col {
122            let filter = VirtualFilter::Eq(pk, key.clone());
123            let rows = self.scan(&[], Some(&filter), Some(1), None)?;
124            Ok(rows.into_iter().next())
125        } else {
126            Err(VirtualTableError::NoPrimaryKey)
127        }
128    }
129
130    /// Get statistics for query planning
131    fn stats(&self) -> VirtualTableStats {
132        VirtualTableStats {
133            row_count: self.schema().estimated_rows,
134            size_bytes: None,
135            last_modified: None,
136        }
137    }
138
139    /// Refresh cached data (if applicable)
140    fn refresh(&self) -> Result<(), VirtualTableError> {
141        Ok(()) // Default: no-op
142    }
143}
144
/// A single row produced by a virtual table scan.
#[derive(Debug, Clone)]
pub struct VirtualRow {
    /// Column values, positionally aligned with the schema's `columns` when
    /// the row is read through `get_by_name`.
    pub values: Vec<SochValue>,
}
151
152impl VirtualRow {
153    /// Create a new row
154    pub fn new(values: Vec<SochValue>) -> Self {
155        Self { values }
156    }
157
158    /// Get value by index
159    pub fn get(&self, idx: usize) -> Option<&SochValue> {
160        self.values.get(idx)
161    }
162
163    /// Get value by column name (requires schema)
164    pub fn get_by_name<'a>(
165        &'a self,
166        name: &str,
167        schema: &VirtualTableSchema,
168    ) -> Option<&'a SochValue> {
169        schema
170            .columns
171            .iter()
172            .position(|c| c.name == name)
173            .and_then(|idx| self.values.get(idx))
174    }
175}
176
/// Predicate tree applied to rows during a virtual table scan.
///
/// Leaf variants name a column and carry the operand(s) to compare against;
/// `And`, `Or` and `Not` combine other filters.
#[derive(Debug, Clone)]
pub enum VirtualFilter {
    /// Equality: column = value
    Eq(String, SochValue),
    /// Not equal: column != value
    Ne(String, SochValue),
    /// Less than: column < value.
    /// Ordering comparisons (`Lt`, `Le`, `Gt`, `Ge`, `Between`) only match when
    /// the row value and the operand are the same numeric variant.
    Lt(String, SochValue),
    /// Less than or equal: column <= value
    Le(String, SochValue),
    /// Greater than: column > value
    Gt(String, SochValue),
    /// Greater than or equal: column >= value
    Ge(String, SochValue),
    /// Like pattern: column LIKE pattern.
    /// Only a `%` at the very start and/or very end of the pattern acts as a
    /// wildcard; anything else is compared literally.
    Like(String, String),
    /// In set: column IN (values)
    In(String, Vec<SochValue>),
    /// Between: column BETWEEN low AND high (both bounds inclusive)
    Between(String, SochValue, SochValue),
    /// Is null: column IS NULL.
    /// Also matches when the column cannot be resolved for the row.
    IsNull(String),
    /// Is not null: column IS NOT NULL
    IsNotNull(String),
    /// AND of filters (an empty list matches every row)
    And(Vec<VirtualFilter>),
    /// OR of filters (an empty list matches no row)
    Or(Vec<VirtualFilter>),
    /// NOT of filter
    Not(Box<VirtualFilter>),
}
209
210impl VirtualFilter {
211    /// Convert from WHERE clause
212    pub fn from_where_clause(where_clause: &WhereClause) -> Self {
213        let filters: Vec<VirtualFilter> = where_clause
214            .conditions
215            .iter()
216            .map(|c| {
217                use crate::soch_ql::ComparisonOp::*;
218                match c.operator {
219                    Eq => VirtualFilter::Eq(c.column.clone(), c.value.clone()),
220                    Ne => VirtualFilter::Ne(c.column.clone(), c.value.clone()),
221                    Lt => VirtualFilter::Lt(c.column.clone(), c.value.clone()),
222                    Le => VirtualFilter::Le(c.column.clone(), c.value.clone()),
223                    Gt => VirtualFilter::Gt(c.column.clone(), c.value.clone()),
224                    Ge => VirtualFilter::Ge(c.column.clone(), c.value.clone()),
225                    Like => {
226                        if let SochValue::Text(pattern) = &c.value {
227                            VirtualFilter::Like(c.column.clone(), pattern.clone())
228                        } else {
229                            VirtualFilter::Like(c.column.clone(), "".to_string())
230                        }
231                    }
232                    In => VirtualFilter::In(c.column.clone(), vec![c.value.clone()]),
233                    SimilarTo => {
234                        // SimilarTo is used for vector similarity search
235                        // For now, we treat it as a Like pattern for virtual tables
236                        if let SochValue::Text(pattern) = &c.value {
237                            VirtualFilter::Like(c.column.clone(), pattern.clone())
238                        } else {
239                            VirtualFilter::Like(c.column.clone(), "".to_string())
240                        }
241                    }
242                }
243            })
244            .collect();
245
246        match where_clause.operator {
247            crate::soch_ql::LogicalOp::And => VirtualFilter::And(filters),
248            crate::soch_ql::LogicalOp::Or => VirtualFilter::Or(filters),
249        }
250    }
251
252    /// Evaluate filter against a row
253    pub fn matches(&self, row: &VirtualRow, schema: &VirtualTableSchema) -> bool {
254        match self {
255            VirtualFilter::Eq(col, value) => row
256                .get_by_name(col, schema)
257                .map(|v| v == value)
258                .unwrap_or(false),
259            VirtualFilter::Ne(col, value) => row
260                .get_by_name(col, schema)
261                .map(|v| v != value)
262                .unwrap_or(false),
263            VirtualFilter::Lt(col, value) => {
264                Self::compare_values(row.get_by_name(col, schema), value, |a, b| a < b)
265            }
266            VirtualFilter::Le(col, value) => {
267                Self::compare_values(row.get_by_name(col, schema), value, |a, b| a <= b)
268            }
269            VirtualFilter::Gt(col, value) => {
270                Self::compare_values(row.get_by_name(col, schema), value, |a, b| a > b)
271            }
272            VirtualFilter::Ge(col, value) => {
273                Self::compare_values(row.get_by_name(col, schema), value, |a, b| a >= b)
274            }
275            VirtualFilter::Like(col, pattern) => row
276                .get_by_name(col, schema)
277                .and_then(|v| match v {
278                    SochValue::Text(s) => Some(Self::match_like(s, pattern)),
279                    _ => None,
280                })
281                .unwrap_or(false),
282            VirtualFilter::In(col, values) => row
283                .get_by_name(col, schema)
284                .map(|v| values.contains(v))
285                .unwrap_or(false),
286            VirtualFilter::Between(col, low, high) => row
287                .get_by_name(col, schema)
288                .map(|v| {
289                    Self::compare_values(Some(v), low, |a, b| a >= b)
290                        && Self::compare_values(Some(v), high, |a, b| a <= b)
291                })
292                .unwrap_or(false),
293            VirtualFilter::IsNull(col) => row
294                .get_by_name(col, schema)
295                .map(|v| *v == SochValue::Null)
296                .unwrap_or(true),
297            VirtualFilter::IsNotNull(col) => row
298                .get_by_name(col, schema)
299                .map(|v| *v != SochValue::Null)
300                .unwrap_or(false),
301            VirtualFilter::And(filters) => filters.iter().all(|f| f.matches(row, schema)),
302            VirtualFilter::Or(filters) => filters.iter().any(|f| f.matches(row, schema)),
303            VirtualFilter::Not(filter) => !filter.matches(row, schema),
304        }
305    }
306
307    fn compare_values<F>(val: Option<&SochValue>, other: &SochValue, cmp: F) -> bool
308    where
309        F: Fn(i64, i64) -> bool,
310    {
311        match (val, other) {
312            (Some(SochValue::Int(a)), SochValue::Int(b)) => cmp(*a, *b),
313            (Some(SochValue::UInt(a)), SochValue::UInt(b)) => cmp(*a as i64, *b as i64),
314            (Some(SochValue::Float(a)), SochValue::Float(b)) => {
315                cmp((*a * 1000.0) as i64, (*b * 1000.0) as i64)
316            }
317            _ => false,
318        }
319    }
320
321    fn match_like(s: &str, pattern: &str) -> bool {
322        // Simple LIKE implementation
323        if pattern.starts_with('%') && pattern.ends_with('%') {
324            let inner = &pattern[1..pattern.len() - 1];
325            s.contains(inner)
326        } else if let Some(suffix) = pattern.strip_prefix('%') {
327            s.ends_with(suffix)
328        } else if let Some(prefix) = pattern.strip_suffix('%') {
329            s.starts_with(prefix)
330        } else {
331            s == pattern
332        }
333    }
334}
335
/// Statistics a virtual table can report for query planning.
///
/// Every field is optional; `Default` yields a value with all fields `None`.
#[derive(Debug, Clone, Default)]
pub struct VirtualTableStats {
    /// Estimated row count
    pub row_count: Option<u64>,
    /// Estimated size in bytes
    pub size_bytes: Option<u64>,
    /// Last modification timestamp. The unit and epoch are not defined in this
    /// module.
    pub last_modified: Option<u64>,
}
346
/// Failures reported by virtual tables and by the virtual table registry.
#[derive(Debug, Clone)]
pub enum VirtualTableError {
    /// No table is registered under the given name.
    NotFound(String),
    /// The named column does not exist.
    ColumnNotFound(String),
    /// The table's schema has no primary-key column.
    NoPrimaryKey,
    /// Error raised by, or on behalf of, the backing plugin.
    PluginError(String),
    /// The scan could not be completed.
    ScanFailed(String),
    /// The supplied filter could not be used.
    InvalidFilter(String),
}

impl std::fmt::Display for VirtualTableError {
    /// Writes a lowercase, single-line message for the error.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            VirtualTableError::NoPrimaryKey => out.write_str("no primary key defined"),
            VirtualTableError::NotFound(table) => {
                write!(out, "virtual table not found: {}", table)
            }
            VirtualTableError::ColumnNotFound(column) => {
                write!(out, "column not found: {}", column)
            }
            VirtualTableError::PluginError(detail) => write!(out, "plugin error: {}", detail),
            VirtualTableError::ScanFailed(detail) => write!(out, "scan failed: {}", detail),
            VirtualTableError::InvalidFilter(detail) => {
                write!(out, "invalid filter: {}", detail)
            }
        }
    }
}

impl std::error::Error for VirtualTableError {}
378
379// ============================================================================
380// Plugin Virtual Table Wrapper
381// ============================================================================
382
/// Virtual table backed by a WASM plugin
///
/// Holds the table's identity, its schema, and an optional snapshot of rows
/// that is reused until it is older than `cache_ttl_secs`.
pub struct PluginVirtualTable {
    /// Plugin name (first half of the qualified `plugin.table` name)
    plugin_name: String,
    /// Table name (second half of the qualified `plugin.table` name)
    table_name: String,
    /// Schema supplied at construction; returned as-is by `schema()`
    schema: VirtualTableSchema,
    /// Cached rows; `None` until the first scan and again after `refresh()`
    cache: RwLock<Option<CachedData>>,
    /// Cache TTL in seconds (60 unless overridden via `with_cache_ttl`)
    cache_ttl_secs: u64,
}
396
/// Snapshot of table rows together with the moment it was taken.
struct CachedData {
    /// Rows as stored by `update_cache`; filtering, paging and projection are
    /// applied on top of these when a scan is served from the cache.
    rows: Vec<VirtualRow>,
    /// Creation time of the snapshot, compared against the TTL to decide
    /// whether the cache is still valid.
    cached_at: std::time::Instant,
}
402
403impl PluginVirtualTable {
404    /// Create a new plugin-backed virtual table
405    pub fn new(plugin_name: &str, table_name: &str, schema: VirtualTableSchema) -> Self {
406        Self {
407            plugin_name: plugin_name.to_string(),
408            table_name: table_name.to_string(),
409            schema,
410            cache: RwLock::new(None),
411            cache_ttl_secs: 60, // 1 minute default
412        }
413    }
414
415    /// Set cache TTL
416    pub fn with_cache_ttl(mut self, secs: u64) -> Self {
417        self.cache_ttl_secs = secs;
418        self
419    }
420
421    /// Get the fully qualified table name
422    pub fn qualified_name(&self) -> String {
423        format!("{}.{}", self.plugin_name, self.table_name)
424    }
425
426    /// Check if cache is valid
427    fn is_cache_valid(&self) -> bool {
428        if let Some(cached) = self.cache.read().as_ref() {
429            cached.cached_at.elapsed().as_secs() < self.cache_ttl_secs
430        } else {
431            false
432        }
433    }
434
435    /// Update cache
436    fn update_cache(&self, rows: Vec<VirtualRow>) {
437        *self.cache.write() = Some(CachedData {
438            rows,
439            cached_at: std::time::Instant::now(),
440        });
441    }
442}
443
444impl VirtualTable for PluginVirtualTable {
445    fn schema(&self) -> &VirtualTableSchema {
446        &self.schema
447    }
448
449    fn scan(
450        &self,
451        columns: &[String],
452        filter: Option<&VirtualFilter>,
453        limit: Option<usize>,
454        offset: Option<usize>,
455    ) -> Result<Vec<VirtualRow>, VirtualTableError> {
456        // Check cache first
457        if self.is_cache_valid()
458            && let Some(cached) = self.cache.read().as_ref()
459        {
460            let mut rows = cached.rows.clone();
461
462            // Apply filter
463            if let Some(f) = filter {
464                rows.retain(|r| f.matches(r, &self.schema));
465            }
466
467            // Apply offset
468            if let Some(o) = offset {
469                rows = rows.into_iter().skip(o).collect();
470            }
471
472            // Apply limit
473            if let Some(l) = limit {
474                rows.truncate(l);
475            }
476
477            // Project columns
478            if !columns.is_empty() && columns[0] != "*" {
479                rows = self.project_columns(&rows, columns);
480            }
481
482            return Ok(rows);
483        }
484
485        // In production, this would call the plugin's scan_table function
486        // For now, return mock data
487        let mock_rows = self.generate_mock_data(limit.unwrap_or(100));
488
489        // Update cache
490        self.update_cache(mock_rows.clone());
491
492        // Apply filter to mock data
493        let mut result = mock_rows;
494        if let Some(f) = filter {
495            result.retain(|r| f.matches(r, &self.schema));
496        }
497
498        if let Some(o) = offset {
499            result = result.into_iter().skip(o).collect();
500        }
501
502        if let Some(l) = limit {
503            result.truncate(l);
504        }
505
506        Ok(result)
507    }
508
509    fn refresh(&self) -> Result<(), VirtualTableError> {
510        // Clear cache
511        *self.cache.write() = None;
512        Ok(())
513    }
514}
515
516impl PluginVirtualTable {
517    /// Project only requested columns
518    fn project_columns(&self, rows: &[VirtualRow], columns: &[String]) -> Vec<VirtualRow> {
519        let indices: Vec<usize> = columns
520            .iter()
521            .filter_map(|col| self.schema.columns.iter().position(|c| c.name == *col))
522            .collect();
523
524        rows.iter()
525            .map(|row| {
526                let values: Vec<SochValue> = indices
527                    .iter()
528                    .map(|&i| row.values.get(i).cloned().unwrap_or(SochValue::Null))
529                    .collect();
530                VirtualRow::new(values)
531            })
532            .collect()
533    }
534
535    /// Generate mock data (for demonstration)
536    fn generate_mock_data(&self, count: usize) -> Vec<VirtualRow> {
537        (0..count)
538            .map(|i| {
539                let values: Vec<SochValue> = self
540                    .schema
541                    .columns
542                    .iter()
543                    .enumerate()
544                    .map(|(col_idx, col)| match col.col_type {
545                        VirtualColumnType::Int64 => SochValue::Int(i as i64 + col_idx as i64),
546                        VirtualColumnType::UInt64 => SochValue::UInt(i as u64 + col_idx as u64),
547                        VirtualColumnType::Float64 => SochValue::Float(i as f64 * 0.1),
548                        VirtualColumnType::Text => SochValue::Text(format!("{}_{}", col.name, i)),
549                        VirtualColumnType::Bool => SochValue::Bool(i % 2 == 0),
550                        _ => SochValue::Null,
551                    })
552                    .collect();
553                VirtualRow::new(values)
554            })
555            .collect()
556    }
557}
558
559// ============================================================================
560// Virtual Table Registry
561// ============================================================================
562
/// Registry for virtual tables
///
/// Maps names to shared table handles behind a read/write lock, so lookups
/// and registrations go through `&self`.
pub struct VirtualTableRegistry {
    /// Tables by qualified name (plugin.table)
    tables: RwLock<HashMap<String, Arc<dyn VirtualTable>>>,
}
568
569impl Default for VirtualTableRegistry {
570    fn default() -> Self {
571        Self::new()
572    }
573}
574
575impl VirtualTableRegistry {
576    /// Create a new registry
577    pub fn new() -> Self {
578        Self {
579            tables: RwLock::new(HashMap::new()),
580        }
581    }
582
583    /// Register a virtual table
584    pub fn register(
585        &self,
586        qualified_name: &str,
587        table: Arc<dyn VirtualTable>,
588    ) -> Result<(), VirtualTableError> {
589        let mut tables = self.tables.write();
590
591        if tables.contains_key(qualified_name) {
592            return Err(VirtualTableError::PluginError(format!(
593                "table '{}' already registered",
594                qualified_name
595            )));
596        }
597
598        tables.insert(qualified_name.to_string(), table);
599        Ok(())
600    }
601
602    /// Unregister a virtual table
603    pub fn unregister(&self, qualified_name: &str) -> Result<(), VirtualTableError> {
604        let mut tables = self.tables.write();
605
606        if tables.remove(qualified_name).is_none() {
607            return Err(VirtualTableError::NotFound(qualified_name.to_string()));
608        }
609
610        Ok(())
611    }
612
613    /// Get a virtual table
614    pub fn get(&self, qualified_name: &str) -> Option<Arc<dyn VirtualTable>> {
615        self.tables.read().get(qualified_name).cloned()
616    }
617
618    /// List all registered tables
619    pub fn list(&self) -> Vec<String> {
620        self.tables.read().keys().cloned().collect()
621    }
622
623    /// Execute a SELECT query on a virtual table
624    pub fn execute_select(&self, query: &SelectQuery) -> Result<SochResult, VirtualTableError> {
625        let table = self
626            .get(&query.table)
627            .ok_or_else(|| VirtualTableError::NotFound(query.table.clone()))?;
628
629        let schema = table.schema();
630
631        // Convert WHERE clause to filter
632        let filter = query
633            .where_clause
634            .as_ref()
635            .map(VirtualFilter::from_where_clause);
636
637        // Scan table
638        let rows = table.scan(&query.columns, filter.as_ref(), query.limit, query.offset)?;
639
640        // Convert to SochResult
641        let columns = if query.columns.is_empty() || query.columns[0] == "*" {
642            schema.columns.iter().map(|c| c.name.clone()).collect()
643        } else {
644            query.columns.clone()
645        };
646
647        let result_rows: Vec<Vec<SochValue>> = rows.into_iter().map(|r| r.values).collect();
648
649        Ok(SochResult {
650            table: query.table.clone(),
651            columns,
652            rows: result_rows,
653        })
654    }
655}
656
657// ============================================================================
658// Tests
659// ============================================================================
660
#[cfg(test)]
mod tests {
    use super::*;

    /// Three-column schema shared by the tests: `id` (Int64, primary key),
    /// `name` (Text) and `score` (Float64, nullable).
    fn create_test_schema() -> VirtualTableSchema {
        VirtualTableSchema {
            name: "test_table".to_string(),
            columns: vec![
                VirtualColumnDef {
                    name: "id".to_string(),
                    col_type: VirtualColumnType::Int64,
                    nullable: false,
                    primary_key: true,
                    description: None,
                },
                VirtualColumnDef {
                    name: "name".to_string(),
                    col_type: VirtualColumnType::Text,
                    nullable: false,
                    primary_key: false,
                    description: None,
                },
                VirtualColumnDef {
                    name: "score".to_string(),
                    col_type: VirtualColumnType::Float64,
                    nullable: true,
                    primary_key: false,
                    description: None,
                },
            ],
            estimated_rows: Some(1000),
            description: None,
        }
    }

    #[test]
    fn test_plugin_virtual_table_creation() {
        let schema = create_test_schema();
        let table = PluginVirtualTable::new("test_plugin", "test_table", schema);

        assert_eq!(table.qualified_name(), "test_plugin.test_table");
        assert_eq!(table.schema().columns.len(), 3);
    }

    #[test]
    fn test_virtual_table_scan() {
        let schema = create_test_schema();
        let table = PluginVirtualTable::new("plugin", "table", schema);

        let rows = table.scan(&[], None, Some(10), None).unwrap();

        assert_eq!(rows.len(), 10);
        assert_eq!(rows[0].values.len(), 3); // 3 columns
    }

    #[test]
    fn test_virtual_table_scan_with_filter() {
        let schema = create_test_schema();
        let table = PluginVirtualTable::new("plugin", "table", schema.clone());

        let filter = VirtualFilter::Gt("id".to_string(), SochValue::Int(5));
        let rows = table.scan(&[], Some(&filter), Some(100), None).unwrap();

        // Guard against a vacuous pass: the filter must leave some rows, and
        // every one of them must carry an integer id above the threshold.
        assert!(!rows.is_empty());
        for row in &rows {
            match row.get_by_name("id", &schema) {
                Some(SochValue::Int(id)) => assert!(*id > 5),
                other => panic!("expected an Int id, got {:?}", other),
            }
        }
    }

    #[test]
    fn test_virtual_filter_matches() {
        let schema = create_test_schema();
        let row = VirtualRow::new(vec![
            SochValue::Int(42),
            SochValue::Text("Alice".to_string()),
            SochValue::Float(95.5),
        ]);

        // Equality filter
        let filter = VirtualFilter::Eq("id".to_string(), SochValue::Int(42));
        assert!(filter.matches(&row, &schema));

        // Equality filter that must not match
        let filter = VirtualFilter::Eq("id".to_string(), SochValue::Int(7));
        assert!(!filter.matches(&row, &schema));

        // Like filter
        let filter = VirtualFilter::Like("name".to_string(), "Al%".to_string());
        assert!(filter.matches(&row, &schema));

        // Greater than
        let filter = VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0));
        assert!(filter.matches(&row, &schema));

        // AND filter
        let filter = VirtualFilter::And(vec![
            VirtualFilter::Eq("id".to_string(), SochValue::Int(42)),
            VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0)),
        ]);
        assert!(filter.matches(&row, &schema));
    }

    #[test]
    fn test_registry_operations() {
        let registry = VirtualTableRegistry::new();
        let schema = create_test_schema();

        let table = Arc::new(PluginVirtualTable::new("plugin", "table", schema));

        // Register
        registry.register("plugin.table", table).unwrap();
        assert_eq!(registry.list().len(), 1);

        // Get
        let retrieved = registry.get("plugin.table");
        assert!(retrieved.is_some());

        // Unregister
        registry.unregister("plugin.table").unwrap();
        assert!(registry.list().is_empty());
    }

    #[test]
    fn test_registry_execute_select() {
        let registry = VirtualTableRegistry::new();
        let schema = create_test_schema();

        let table = Arc::new(PluginVirtualTable::new("plugin", "data", schema));
        registry.register("plugin.data", table).unwrap();

        let query = SelectQuery {
            columns: vec!["id".to_string(), "name".to_string()],
            table: "plugin.data".to_string(),
            where_clause: None,
            order_by: None,
            limit: Some(5),
            offset: None,
        };

        let result = registry.execute_select(&query).unwrap();

        assert_eq!(result.table, "plugin.data");
        assert_eq!(result.columns, vec!["id", "name"]);
        assert_eq!(result.rows.len(), 5);
    }

    #[test]
    fn test_cache_behavior() {
        let schema = create_test_schema();
        let table = PluginVirtualTable::new("plugin", "cached", schema).with_cache_ttl(1); // 1 second TTL

        // First scan populates cache
        let rows1 = table.scan(&[], None, Some(5), None).unwrap();
        assert!(table.is_cache_valid());

        // Second scan uses cache
        let rows2 = table.scan(&[], None, Some(5), None).unwrap();
        assert_eq!(rows1.len(), rows2.len());

        // Refresh clears cache
        table.refresh().unwrap();
        assert!(!table.is_cache_valid());
    }

    #[test]
    fn test_column_projection() {
        let schema = create_test_schema();
        let table = PluginVirtualTable::new("plugin", "table", schema);

        // Request only id and name
        let rows = table
            .scan(&["id".to_string(), "name".to_string()], None, Some(5), None)
            .unwrap();

        // This is a smoke test: a scan with an explicit column list must
        // succeed and honour the limit. The width of the returned rows is
        // deliberately not asserted here.
        assert!(!rows.is_empty());
        assert_eq!(rows.len(), 5);
    }
}