Skip to main content

sochdb_query/
plugin_table.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Plugin-as-Table Integration
19//!
20//! This module allows WASM plugins to expose virtual tables that can be
21//! queried using standard SELECT statements.
22//!
23//! ## Example
24//!
25//! A plugin exposing a virtual table:
26//!
27//! ```text
28//! SELECT * FROM plugin_name.table_name WHERE key = 'value' LIMIT 10
29//! ```
30//!
31//! This translates to plugin function calls:
32//!
33//! 1. `describe_table()` - Get schema
34//! 2. `scan_table(filter, limit)` - Get matching rows
35//!
36//! ## Virtual Table Protocol
37//!
38//! Plugins must export:
39//!
40//! - `describe_tables() -> Vec<TableDescriptor>` - List available tables
41//! - `describe_table(name) -> TableSchema` - Get table schema
42//! - `scan_table(name, filter, limit) -> Vec<Row>` - Scan with filter
43//! - `get_row(name, key) -> Option<Row>` - Point lookup (optional)
44
45use crate::soch_ql::{SelectQuery, SochResult, SochValue, WhereClause};
46use parking_lot::RwLock;
47use std::collections::HashMap;
48use std::sync::Arc;
49
50// ============================================================================
51// Virtual Table Trait
52// ============================================================================
53
54/// Column type for virtual tables
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum VirtualColumnType {
57    Bool,
58    Int64,
59    UInt64,
60    Float64,
61    Text,
62    Binary,
63    Timestamp,
64    Json,
65}
66
67/// Column definition for a virtual table
68#[derive(Debug, Clone)]
69pub struct VirtualColumnDef {
70    /// Column name
71    pub name: String,
72    /// Column type
73    pub col_type: VirtualColumnType,
74    /// Is nullable
75    pub nullable: bool,
76    /// Is primary key
77    pub primary_key: bool,
78    /// Description
79    pub description: Option<String>,
80}
81
82/// Schema for a virtual table
83#[derive(Debug, Clone)]
84pub struct VirtualTableSchema {
85    /// Table name
86    pub name: String,
87    /// Column definitions
88    pub columns: Vec<VirtualColumnDef>,
89    /// Estimated row count (for query planning)
90    pub estimated_rows: Option<u64>,
91    /// Description
92    pub description: Option<String>,
93}
94
95/// Virtual table trait
96///
97/// Plugins implement this trait to expose queryable tables.
98pub trait VirtualTable: Send + Sync {
99    /// Get table schema
100    fn schema(&self) -> &VirtualTableSchema;
101
102    /// Scan with optional filter
103    fn scan(
104        &self,
105        columns: &[String],
106        filter: Option<&VirtualFilter>,
107        limit: Option<usize>,
108        offset: Option<usize>,
109    ) -> Result<Vec<VirtualRow>, VirtualTableError>;
110
111    /// Point lookup by primary key
112    fn get(&self, key: &SochValue) -> Result<Option<VirtualRow>, VirtualTableError> {
113        // Default implementation: scan with equality filter on primary key
114        let schema = self.schema();
115        let pk_col = schema
116            .columns
117            .iter()
118            .find(|c| c.primary_key)
119            .map(|c| c.name.clone());
120
121        if let Some(pk) = pk_col {
122            let filter = VirtualFilter::Eq(pk, key.clone());
123            let rows = self.scan(&[], Some(&filter), Some(1), None)?;
124            Ok(rows.into_iter().next())
125        } else {
126            Err(VirtualTableError::NoPrimaryKey)
127        }
128    }
129
130    /// Get statistics for query planning
131    fn stats(&self) -> VirtualTableStats {
132        VirtualTableStats {
133            row_count: self.schema().estimated_rows,
134            size_bytes: None,
135            last_modified: None,
136        }
137    }
138
139    /// Refresh cached data (if applicable)
140    fn refresh(&self) -> Result<(), VirtualTableError> {
141        Ok(()) // Default: no-op
142    }
143}
144
145/// Row from a virtual table
146#[derive(Debug, Clone)]
147pub struct VirtualRow {
148    /// Column values
149    pub values: Vec<SochValue>,
150}
151
152impl VirtualRow {
153    /// Create a new row
154    pub fn new(values: Vec<SochValue>) -> Self {
155        Self { values }
156    }
157
158    /// Get value by index
159    pub fn get(&self, idx: usize) -> Option<&SochValue> {
160        self.values.get(idx)
161    }
162
163    /// Get value by column name (requires schema)
164    pub fn get_by_name<'a>(
165        &'a self,
166        name: &str,
167        schema: &VirtualTableSchema,
168    ) -> Option<&'a SochValue> {
169        schema
170            .columns
171            .iter()
172            .position(|c| c.name == name)
173            .and_then(|idx| self.values.get(idx))
174    }
175}
176
177/// Filter for virtual table scans
178#[derive(Debug, Clone)]
179pub enum VirtualFilter {
180    /// Equality: column = value
181    Eq(String, SochValue),
182    /// Not equal: column != value
183    Ne(String, SochValue),
184    /// Less than: column < value
185    Lt(String, SochValue),
186    /// Less than or equal: column <= value
187    Le(String, SochValue),
188    /// Greater than: column > value
189    Gt(String, SochValue),
190    /// Greater than or equal: column >= value
191    Ge(String, SochValue),
192    /// Like pattern: column LIKE pattern
193    Like(String, String),
194    /// In set: column IN (values)
195    In(String, Vec<SochValue>),
196    /// Between: column BETWEEN low AND high
197    Between(String, SochValue, SochValue),
198    /// Is null: column IS NULL
199    IsNull(String),
200    /// Is not null: column IS NOT NULL
201    IsNotNull(String),
202    /// AND of filters
203    And(Vec<VirtualFilter>),
204    /// OR of filters
205    Or(Vec<VirtualFilter>),
206    /// NOT of filter
207    Not(Box<VirtualFilter>),
208}
209
210impl VirtualFilter {
211    /// Convert from WHERE clause
212    pub fn from_where_clause(where_clause: &WhereClause) -> Self {
213        let filters: Vec<VirtualFilter> = where_clause
214            .conditions
215            .iter()
216            .map(|c| {
217                use crate::soch_ql::ComparisonOp::*;
218                match c.operator {
219                    Eq => VirtualFilter::Eq(c.column.clone(), c.value.clone()),
220                    Ne => VirtualFilter::Ne(c.column.clone(), c.value.clone()),
221                    Lt => VirtualFilter::Lt(c.column.clone(), c.value.clone()),
222                    Le => VirtualFilter::Le(c.column.clone(), c.value.clone()),
223                    Gt => VirtualFilter::Gt(c.column.clone(), c.value.clone()),
224                    Ge => VirtualFilter::Ge(c.column.clone(), c.value.clone()),
225                    Like => {
226                        if let SochValue::Text(pattern) = &c.value {
227                            VirtualFilter::Like(c.column.clone(), pattern.clone())
228                        } else {
229                            VirtualFilter::Like(c.column.clone(), "".to_string())
230                        }
231                    }
232                    In => VirtualFilter::In(c.column.clone(), vec![c.value.clone()]),
233                    SimilarTo => {
234                        // SimilarTo is used for vector similarity search
235                        // For now, we treat it as a Like pattern for virtual tables
236                        if let SochValue::Text(pattern) = &c.value {
237                            VirtualFilter::Like(c.column.clone(), pattern.clone())
238                        } else {
239                            VirtualFilter::Like(c.column.clone(), "".to_string())
240                        }
241                    }
242                }
243            })
244            .collect();
245
246        match where_clause.operator {
247            crate::soch_ql::LogicalOp::And => VirtualFilter::And(filters),
248            crate::soch_ql::LogicalOp::Or => VirtualFilter::Or(filters),
249        }
250    }
251
252    /// Evaluate filter against a row
253    pub fn matches(&self, row: &VirtualRow, schema: &VirtualTableSchema) -> bool {
254        match self {
255            VirtualFilter::Eq(col, value) => row
256                .get_by_name(col, schema)
257                .map(|v| v == value)
258                .unwrap_or(false),
259            VirtualFilter::Ne(col, value) => row
260                .get_by_name(col, schema)
261                .map(|v| v != value)
262                .unwrap_or(false),
263            VirtualFilter::Lt(col, value) => {
264                Self::compare_values(row.get_by_name(col, schema), value, |a, b| a < b)
265            }
266            VirtualFilter::Le(col, value) => {
267                Self::compare_values(row.get_by_name(col, schema), value, |a, b| a <= b)
268            }
269            VirtualFilter::Gt(col, value) => {
270                Self::compare_values(row.get_by_name(col, schema), value, |a, b| a > b)
271            }
272            VirtualFilter::Ge(col, value) => {
273                Self::compare_values(row.get_by_name(col, schema), value, |a, b| a >= b)
274            }
275            VirtualFilter::Like(col, pattern) => row
276                .get_by_name(col, schema)
277                .and_then(|v| match v {
278                    SochValue::Text(s) => Some(Self::match_like(s, pattern)),
279                    _ => None,
280                })
281                .unwrap_or(false),
282            VirtualFilter::In(col, values) => row
283                .get_by_name(col, schema)
284                .map(|v| values.contains(v))
285                .unwrap_or(false),
286            VirtualFilter::Between(col, low, high) => row
287                .get_by_name(col, schema)
288                .map(|v| {
289                    Self::compare_values(Some(v), low, |a, b| a >= b)
290                        && Self::compare_values(Some(v), high, |a, b| a <= b)
291                })
292                .unwrap_or(false),
293            VirtualFilter::IsNull(col) => row
294                .get_by_name(col, schema)
295                .map(|v| *v == SochValue::Null)
296                .unwrap_or(true),
297            VirtualFilter::IsNotNull(col) => row
298                .get_by_name(col, schema)
299                .map(|v| *v != SochValue::Null)
300                .unwrap_or(false),
301            VirtualFilter::And(filters) => filters.iter().all(|f| f.matches(row, schema)),
302            VirtualFilter::Or(filters) => filters.iter().any(|f| f.matches(row, schema)),
303            VirtualFilter::Not(filter) => !filter.matches(row, schema),
304        }
305    }
306
307    fn compare_values<F>(val: Option<&SochValue>, other: &SochValue, cmp: F) -> bool
308    where
309        F: Fn(i64, i64) -> bool,
310    {
311        match (val, other) {
312            (Some(SochValue::Int(a)), SochValue::Int(b)) => cmp(*a, *b),
313            (Some(SochValue::UInt(a)), SochValue::UInt(b)) => cmp(*a as i64, *b as i64),
314            (Some(SochValue::Float(a)), SochValue::Float(b)) => {
315                cmp((*a * 1000.0) as i64, (*b * 1000.0) as i64)
316            }
317            _ => false,
318        }
319    }
320
321    fn match_like(s: &str, pattern: &str) -> bool {
322        // Canonical LIKE matcher (shared with scan / filter pushdown / eval paths).
323        crate::like::like_match(s, pattern)
324    }
325}
326
327/// Virtual table statistics
328#[derive(Debug, Clone, Default)]
329pub struct VirtualTableStats {
330    /// Estimated row count
331    pub row_count: Option<u64>,
332    /// Estimated size in bytes
333    pub size_bytes: Option<u64>,
334    /// Last modification timestamp
335    pub last_modified: Option<u64>,
336}
337
338/// Virtual table error
339#[derive(Debug, Clone)]
340pub enum VirtualTableError {
341    /// Table not found
342    NotFound(String),
343    /// Column not found
344    ColumnNotFound(String),
345    /// No primary key defined
346    NoPrimaryKey,
347    /// Plugin error
348    PluginError(String),
349    /// Scan failed
350    ScanFailed(String),
351    /// Invalid filter
352    InvalidFilter(String),
353}
354
355impl std::fmt::Display for VirtualTableError {
356    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357        match self {
358            Self::NotFound(name) => write!(f, "virtual table not found: {}", name),
359            Self::ColumnNotFound(name) => write!(f, "column not found: {}", name),
360            Self::NoPrimaryKey => write!(f, "no primary key defined"),
361            Self::PluginError(msg) => write!(f, "plugin error: {}", msg),
362            Self::ScanFailed(msg) => write!(f, "scan failed: {}", msg),
363            Self::InvalidFilter(msg) => write!(f, "invalid filter: {}", msg),
364        }
365    }
366}
367
368impl std::error::Error for VirtualTableError {}
369
370// ============================================================================
371// Plugin Virtual Table Wrapper
372// ============================================================================
373
374/// Virtual table backed by a WASM plugin
375pub struct PluginVirtualTable {
376    /// Plugin name
377    plugin_name: String,
378    /// Table name
379    table_name: String,
380    /// Cached schema
381    schema: VirtualTableSchema,
382    /// Cached rows (optional)
383    cache: RwLock<Option<CachedData>>,
384    /// Cache TTL in seconds
385    cache_ttl_secs: u64,
386}
387
388/// Cached table data
389struct CachedData {
390    rows: Vec<VirtualRow>,
391    cached_at: std::time::Instant,
392}
393
394impl PluginVirtualTable {
395    /// Create a new plugin-backed virtual table
396    pub fn new(plugin_name: &str, table_name: &str, schema: VirtualTableSchema) -> Self {
397        Self {
398            plugin_name: plugin_name.to_string(),
399            table_name: table_name.to_string(),
400            schema,
401            cache: RwLock::new(None),
402            cache_ttl_secs: 60, // 1 minute default
403        }
404    }
405
406    /// Set cache TTL
407    pub fn with_cache_ttl(mut self, secs: u64) -> Self {
408        self.cache_ttl_secs = secs;
409        self
410    }
411
412    /// Get the fully qualified table name
413    pub fn qualified_name(&self) -> String {
414        format!("{}.{}", self.plugin_name, self.table_name)
415    }
416
417    /// Check if cache is valid
418    fn is_cache_valid(&self) -> bool {
419        if let Some(cached) = self.cache.read().as_ref() {
420            cached.cached_at.elapsed().as_secs() < self.cache_ttl_secs
421        } else {
422            false
423        }
424    }
425
426    /// Update cache
427    fn update_cache(&self, rows: Vec<VirtualRow>) {
428        *self.cache.write() = Some(CachedData {
429            rows,
430            cached_at: std::time::Instant::now(),
431        });
432    }
433}
434
435impl VirtualTable for PluginVirtualTable {
436    fn schema(&self) -> &VirtualTableSchema {
437        &self.schema
438    }
439
440    fn scan(
441        &self,
442        columns: &[String],
443        filter: Option<&VirtualFilter>,
444        limit: Option<usize>,
445        offset: Option<usize>,
446    ) -> Result<Vec<VirtualRow>, VirtualTableError> {
447        // Check cache first
448        if self.is_cache_valid()
449            && let Some(cached) = self.cache.read().as_ref()
450        {
451            let mut rows = cached.rows.clone();
452
453            // Apply filter
454            if let Some(f) = filter {
455                rows.retain(|r| f.matches(r, &self.schema));
456            }
457
458            // Apply offset
459            if let Some(o) = offset {
460                rows = rows.into_iter().skip(o).collect();
461            }
462
463            // Apply limit
464            if let Some(l) = limit {
465                rows.truncate(l);
466            }
467
468            // Project columns
469            if !columns.is_empty() && columns[0] != "*" {
470                rows = self.project_columns(&rows, columns);
471            }
472
473            return Ok(rows);
474        }
475
476        // In production, this would call the plugin's scan_table function
477        // For now, return mock data
478        let mock_rows = self.generate_mock_data(limit.unwrap_or(100));
479
480        // Update cache
481        self.update_cache(mock_rows.clone());
482
483        // Apply filter to mock data
484        let mut result = mock_rows;
485        if let Some(f) = filter {
486            result.retain(|r| f.matches(r, &self.schema));
487        }
488
489        if let Some(o) = offset {
490            result = result.into_iter().skip(o).collect();
491        }
492
493        if let Some(l) = limit {
494            result.truncate(l);
495        }
496
497        Ok(result)
498    }
499
500    fn refresh(&self) -> Result<(), VirtualTableError> {
501        // Clear cache
502        *self.cache.write() = None;
503        Ok(())
504    }
505}
506
507impl PluginVirtualTable {
508    /// Project only requested columns
509    fn project_columns(&self, rows: &[VirtualRow], columns: &[String]) -> Vec<VirtualRow> {
510        let indices: Vec<usize> = columns
511            .iter()
512            .filter_map(|col| self.schema.columns.iter().position(|c| c.name == *col))
513            .collect();
514
515        rows.iter()
516            .map(|row| {
517                let values: Vec<SochValue> = indices
518                    .iter()
519                    .map(|&i| row.values.get(i).cloned().unwrap_or(SochValue::Null))
520                    .collect();
521                VirtualRow::new(values)
522            })
523            .collect()
524    }
525
526    /// Generate mock data (for demonstration)
527    fn generate_mock_data(&self, count: usize) -> Vec<VirtualRow> {
528        (0..count)
529            .map(|i| {
530                let values: Vec<SochValue> = self
531                    .schema
532                    .columns
533                    .iter()
534                    .enumerate()
535                    .map(|(col_idx, col)| match col.col_type {
536                        VirtualColumnType::Int64 => SochValue::Int(i as i64 + col_idx as i64),
537                        VirtualColumnType::UInt64 => SochValue::UInt(i as u64 + col_idx as u64),
538                        VirtualColumnType::Float64 => SochValue::Float(i as f64 * 0.1),
539                        VirtualColumnType::Text => SochValue::Text(format!("{}_{}", col.name, i)),
540                        VirtualColumnType::Bool => SochValue::Bool(i % 2 == 0),
541                        _ => SochValue::Null,
542                    })
543                    .collect();
544                VirtualRow::new(values)
545            })
546            .collect()
547    }
548}
549
550// ============================================================================
551// Virtual Table Registry
552// ============================================================================
553
554/// Registry for virtual tables
555pub struct VirtualTableRegistry {
556    /// Tables by qualified name (plugin.table)
557    tables: RwLock<HashMap<String, Arc<dyn VirtualTable>>>,
558}
559
560impl Default for VirtualTableRegistry {
561    fn default() -> Self {
562        Self::new()
563    }
564}
565
566impl VirtualTableRegistry {
567    /// Create a new registry
568    pub fn new() -> Self {
569        Self {
570            tables: RwLock::new(HashMap::new()),
571        }
572    }
573
574    /// Register a virtual table
575    pub fn register(
576        &self,
577        qualified_name: &str,
578        table: Arc<dyn VirtualTable>,
579    ) -> Result<(), VirtualTableError> {
580        let mut tables = self.tables.write();
581
582        if tables.contains_key(qualified_name) {
583            return Err(VirtualTableError::PluginError(format!(
584                "table '{}' already registered",
585                qualified_name
586            )));
587        }
588
589        tables.insert(qualified_name.to_string(), table);
590        Ok(())
591    }
592
593    /// Unregister a virtual table
594    pub fn unregister(&self, qualified_name: &str) -> Result<(), VirtualTableError> {
595        let mut tables = self.tables.write();
596
597        if tables.remove(qualified_name).is_none() {
598            return Err(VirtualTableError::NotFound(qualified_name.to_string()));
599        }
600
601        Ok(())
602    }
603
604    /// Get a virtual table
605    pub fn get(&self, qualified_name: &str) -> Option<Arc<dyn VirtualTable>> {
606        self.tables.read().get(qualified_name).cloned()
607    }
608
609    /// List all registered tables
610    pub fn list(&self) -> Vec<String> {
611        self.tables.read().keys().cloned().collect()
612    }
613
614    /// Execute a SELECT query on a virtual table
615    pub fn execute_select(&self, query: &SelectQuery) -> Result<SochResult, VirtualTableError> {
616        let table = self
617            .get(&query.table)
618            .ok_or_else(|| VirtualTableError::NotFound(query.table.clone()))?;
619
620        let schema = table.schema();
621
622        // Convert WHERE clause to filter
623        let filter = query
624            .where_clause
625            .as_ref()
626            .map(VirtualFilter::from_where_clause);
627
628        // Scan table
629        let rows = table.scan(&query.columns, filter.as_ref(), query.limit, query.offset)?;
630
631        // Convert to SochResult
632        let columns = if query.columns.is_empty() || query.columns[0] == "*" {
633            schema.columns.iter().map(|c| c.name.clone()).collect()
634        } else {
635            query.columns.clone()
636        };
637
638        let result_rows: Vec<Vec<SochValue>> = rows.into_iter().map(|r| r.values).collect();
639
640        Ok(SochResult {
641            table: query.table.clone(),
642            columns,
643            rows: result_rows,
644        })
645    }
646}
647
648// ============================================================================
649// Tests
650// ============================================================================
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655
656    fn create_test_schema() -> VirtualTableSchema {
657        VirtualTableSchema {
658            name: "test_table".to_string(),
659            columns: vec![
660                VirtualColumnDef {
661                    name: "id".to_string(),
662                    col_type: VirtualColumnType::Int64,
663                    nullable: false,
664                    primary_key: true,
665                    description: None,
666                },
667                VirtualColumnDef {
668                    name: "name".to_string(),
669                    col_type: VirtualColumnType::Text,
670                    nullable: false,
671                    primary_key: false,
672                    description: None,
673                },
674                VirtualColumnDef {
675                    name: "score".to_string(),
676                    col_type: VirtualColumnType::Float64,
677                    nullable: true,
678                    primary_key: false,
679                    description: None,
680                },
681            ],
682            estimated_rows: Some(1000),
683            description: None,
684        }
685    }
686
687    #[test]
688    fn test_plugin_virtual_table_creation() {
689        let schema = create_test_schema();
690        let table = PluginVirtualTable::new("test_plugin", "test_table", schema);
691
692        assert_eq!(table.qualified_name(), "test_plugin.test_table");
693        assert_eq!(table.schema().columns.len(), 3);
694    }
695
696    #[test]
697    fn test_virtual_table_scan() {
698        let schema = create_test_schema();
699        let table = PluginVirtualTable::new("plugin", "table", schema);
700
701        let rows = table.scan(&[], None, Some(10), None).unwrap();
702
703        assert_eq!(rows.len(), 10);
704        assert_eq!(rows[0].values.len(), 3); // 3 columns
705    }
706
707    #[test]
708    fn test_virtual_table_scan_with_filter() {
709        let schema = create_test_schema();
710        let table = PluginVirtualTable::new("plugin", "table", schema.clone());
711
712        let filter = VirtualFilter::Gt("id".to_string(), SochValue::Int(5));
713        let rows = table.scan(&[], Some(&filter), Some(100), None).unwrap();
714
715        // All rows should have id > 5
716        for row in &rows {
717            if let Some(SochValue::Int(id)) = row.get_by_name("id", &schema) {
718                assert!(*id > 5);
719            }
720        }
721    }
722
723    #[test]
724    fn test_virtual_filter_matches() {
725        let schema = create_test_schema();
726        let row = VirtualRow::new(vec![
727            SochValue::Int(42),
728            SochValue::Text("Alice".to_string()),
729            SochValue::Float(95.5),
730        ]);
731
732        // Equality filter
733        let filter = VirtualFilter::Eq("id".to_string(), SochValue::Int(42));
734        assert!(filter.matches(&row, &schema));
735
736        // Like filter
737        let filter = VirtualFilter::Like("name".to_string(), "Al%".to_string());
738        assert!(filter.matches(&row, &schema));
739
740        // Greater than
741        let filter = VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0));
742        assert!(filter.matches(&row, &schema));
743
744        // AND filter
745        let filter = VirtualFilter::And(vec![
746            VirtualFilter::Eq("id".to_string(), SochValue::Int(42)),
747            VirtualFilter::Gt("score".to_string(), SochValue::Float(90.0)),
748        ]);
749        assert!(filter.matches(&row, &schema));
750    }
751
752    #[test]
753    fn test_registry_operations() {
754        let registry = VirtualTableRegistry::new();
755        let schema = create_test_schema();
756
757        let table = Arc::new(PluginVirtualTable::new("plugin", "table", schema));
758
759        // Register
760        registry.register("plugin.table", table).unwrap();
761        assert_eq!(registry.list().len(), 1);
762
763        // Get
764        let retrieved = registry.get("plugin.table");
765        assert!(retrieved.is_some());
766
767        // Unregister
768        registry.unregister("plugin.table").unwrap();
769        assert!(registry.list().is_empty());
770    }
771
772    #[test]
773    fn test_registry_execute_select() {
774        let registry = VirtualTableRegistry::new();
775        let schema = create_test_schema();
776
777        let table = Arc::new(PluginVirtualTable::new("plugin", "data", schema));
778        registry.register("plugin.data", table).unwrap();
779
780        let query = SelectQuery {
781            columns: vec!["id".to_string(), "name".to_string()],
782            table: "plugin.data".to_string(),
783            where_clause: None,
784            order_by: None,
785            limit: Some(5),
786            offset: None,
787        };
788
789        let result = registry.execute_select(&query).unwrap();
790
791        assert_eq!(result.table, "plugin.data");
792        assert_eq!(result.columns, vec!["id", "name"]);
793        assert_eq!(result.rows.len(), 5);
794    }
795
796    #[test]
797    fn test_cache_behavior() {
798        let schema = create_test_schema();
799        let table = PluginVirtualTable::new("plugin", "cached", schema).with_cache_ttl(1); // 1 second TTL
800
801        // First scan populates cache
802        let rows1 = table.scan(&[], None, Some(5), None).unwrap();
803        assert!(table.is_cache_valid());
804
805        // Second scan uses cache
806        let rows2 = table.scan(&[], None, Some(5), None).unwrap();
807        assert_eq!(rows1.len(), rows2.len());
808
809        // Refresh clears cache
810        table.refresh().unwrap();
811        assert!(!table.is_cache_valid());
812    }
813
814    #[test]
815    fn test_column_projection() {
816        let schema = create_test_schema();
817        let table = PluginVirtualTable::new("plugin", "table", schema);
818
819        // Request only id and name
820        let rows = table
821            .scan(&["id".to_string(), "name".to_string()], None, Some(5), None)
822            .unwrap();
823
824        // Should still have all values (projection happens at registry level)
825        // In a real implementation, the plugin would handle projection
826        assert!(!rows.is_empty());
827    }
828}