Skip to main content

sochdb_kernel/
python_sandbox.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Modern Python Plugin Runtime
19//!
20//! AI-era design for running Python plugins in SochDB using:
21//! - **Pyodide**: Full CPython 3.12 with numpy, pandas, scikit-learn
22//! - **WASM Component Model**: Standard interfaces for cross-language composition
23//! - **AI Triggers**: Natural language → Python code generation
24//!
25//! ## Architecture
26//!
27//! ```text
28//! ┌──────────────────────────────────────────────────────────┐
29//! │                  Python Plugin System                     │
30//! ├──────────────────────────────────────────────────────────┤
31//! │  PythonPlugin → PyodideRuntime → WASM Sandbox            │
32//! │       ↓              ↓                ↓                  │
33//! │   packages:      micropip          Memory isolation      │
34//! │   numpy,pandas   install           Resource metering     │
35//! └──────────────────────────────────────────────────────────┘
36//! ```
37//!
38//! ## Example
39//!
40//! ```rust,ignore
//! let runtime = PyodideRuntime::new(RuntimeConfig::default());
42//! runtime.install_packages(&["numpy", "pandas"]).await?;
43//!
44//! let plugin = PythonPlugin::new("fraud_detector")
45//!     .with_code(r#"
46//!         import numpy as np
47//!         def on_insert(row):
48//!             if row["amount"] > 10000:
49//!                 raise TriggerAbort("Amount too high")
50//!             return row
51//!     "#)
52//!     .with_trigger("transactions", TriggerEvent::BeforeInsert);
53//!
54//! runtime.register(plugin)?;
55//! ```
56
57use crate::error::{KernelError, KernelResult};
58use parking_lot::RwLock;
59use std::collections::HashMap;
60use std::sync::atomic::{AtomicU64, Ordering};
61use std::sync::Arc;
62use std::time::{Duration, Instant};
63
64// ============================================================================
65// Runtime Configuration
66// ============================================================================
67
/// Configuration for the Pyodide runtime.
///
/// Used both as the runtime-wide default passed to `PyodideRuntime::new` and
/// as the optional per-plugin override stored in `PythonPlugin::config`.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Memory limit per plugin instance, in bytes.
    ///
    /// NOTE(review): not enforced by the simulated runtime in this module.
    pub memory_limit_bytes: u64,
    /// Execution time budget per plugin call, in milliseconds.
    ///
    /// The runtime measures this with `std::time::Instant`, i.e. it is a
    /// wall-clock budget, not a CPU-time limit.
    pub timeout_ms: u64,
    /// Packages to pre-install.
    pub packages: Vec<String>,
    /// Emit debug logging (written to stderr).
    pub debug: bool,
    /// Allow network access (for package installation).
    pub allow_network: bool,
    /// Custom wheel URLs.
    pub wheel_urls: Vec<String>,
}
84
85impl Default for RuntimeConfig {
86    fn default() -> Self {
87        Self {
88            memory_limit_bytes: 64 * 1024 * 1024, // 64 MB
89            timeout_ms: 5000,                      // 5 seconds
90            packages: vec![],
91            debug: false,
92            allow_network: false,
93            wheel_urls: vec![],
94        }
95    }
96}
97
98impl RuntimeConfig {
99    /// Create config with ML packages (numpy, pandas, sklearn)
100    pub fn with_ml_packages() -> Self {
101        Self {
102            packages: vec![
103                "numpy".into(),
104                "pandas".into(),
105                "scikit-learn".into(),
106            ],
107            memory_limit_bytes: 256 * 1024 * 1024, // 256 MB for ML
108            timeout_ms: 30000,                      // 30s for model inference
109            ..Default::default()
110        }
111    }
112
113    /// Create lightweight config for validation scripts
114    pub fn lightweight() -> Self {
115        Self {
116            memory_limit_bytes: 16 * 1024 * 1024, // 16 MB
117            timeout_ms: 100,                       // 100ms
118            packages: vec![],
119            ..Default::default()
120        }
121    }
122}
123
124// ============================================================================
125// Trigger Events
126// ============================================================================
127
/// Database events a plugin can be bound to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum TriggerEvent {
    /// Before a row is inserted.
    BeforeInsert,
    /// After a row is inserted.
    AfterInsert,
    /// Before a row is updated.
    BeforeUpdate,
    /// After a row is updated.
    AfterUpdate,
    /// Before a row is deleted.
    BeforeDelete,
    /// After a row is deleted.
    AfterDelete,
    /// Stream processing (micro-batch).
    OnBatch,
}
140
141impl TriggerEvent {
142    pub fn from_str(s: &str) -> Option<Self> {
143        match s.to_uppercase().replace(' ', "_").as_str() {
144            "BEFORE_INSERT" => Some(Self::BeforeInsert),
145            "AFTER_INSERT" => Some(Self::AfterInsert),
146            "BEFORE_UPDATE" => Some(Self::BeforeUpdate),
147            "AFTER_UPDATE" => Some(Self::AfterUpdate),
148            "BEFORE_DELETE" => Some(Self::BeforeDelete),
149            "AFTER_DELETE" => Some(Self::AfterDelete),
150            "ON_BATCH" => Some(Self::OnBatch),
151            _ => None,
152        }
153    }
154
155    pub fn handler_name(&self) -> &'static str {
156        match self {
157            Self::BeforeInsert => "on_before_insert",
158            Self::AfterInsert => "on_after_insert",
159            Self::BeforeUpdate => "on_before_update",
160            Self::AfterUpdate => "on_after_update",
161            Self::BeforeDelete => "on_before_delete",
162            Self::AfterDelete => "on_after_delete",
163            Self::OnBatch => "on_batch",
164        }
165    }
166
167    pub fn is_before(&self) -> bool {
168        matches!(self, Self::BeforeInsert | Self::BeforeUpdate | Self::BeforeDelete)
169    }
170}
171
172// ============================================================================
173// Python Plugin Definition
174// ============================================================================
175
176/// A Python plugin with code and trigger bindings
177#[derive(Debug, Clone)]
178pub struct PythonPlugin {
179    /// Unique plugin name
180    pub name: String,
181    /// Plugin version
182    pub version: String,
183    /// Python source code
184    pub code: String,
185    /// Required packages
186    pub packages: Vec<String>,
187    /// Custom wheel URLs (for private packages)
188    pub wheels: Vec<String>,
189    /// Table → Events mapping
190    pub triggers: HashMap<String, Vec<TriggerEvent>>,
191    /// Plugin-specific config overrides
192    pub config: Option<RuntimeConfig>,
193}
194
195impl PythonPlugin {
196    pub fn new(name: &str) -> Self {
197        Self {
198            name: name.to_string(),
199            version: "1.0.0".to_string(),
200            code: String::new(),
201            packages: vec![],
202            wheels: vec![],
203            triggers: HashMap::new(),
204            config: None,
205        }
206    }
207
208    pub fn with_version(mut self, version: &str) -> Self {
209        self.version = version.to_string();
210        self
211    }
212
213    pub fn with_code(mut self, code: &str) -> Self {
214        self.code = code.to_string();
215        self
216    }
217
218    pub fn with_packages(mut self, packages: Vec<&str>) -> Self {
219        self.packages = packages.into_iter().map(String::from).collect();
220        self
221    }
222
223    pub fn with_trigger(mut self, table: &str, event: TriggerEvent) -> Self {
224        self.triggers
225            .entry(table.to_string())
226            .or_default()
227            .push(event);
228        self
229    }
230
231    pub fn with_config(mut self, config: RuntimeConfig) -> Self {
232        self.config = Some(config);
233        self
234    }
235}
236
237// ============================================================================
238// Trigger Context & Result
239// ============================================================================
240
241/// Context passed to trigger execution
242#[derive(Debug, Clone)]
243pub struct TriggerContext {
244    /// Table being modified
245    pub table: String,
246    /// Event type
247    pub event: TriggerEvent,
248    /// Row data as JSON string (for Pyodide interop)
249    pub row_json: String,
250    /// Old row for UPDATE/DELETE (JSON)
251    pub old_row_json: Option<String>,
252    /// Transaction ID
253    pub txn_id: u64,
254    /// Batch of rows for ON_BATCH events
255    pub batch_json: Option<String>,
256}
257
/// Outcome of a trigger invocation.
#[derive(Clone, Debug)]
pub enum TriggerResult {
    /// Proceed, optionally with a modified row (JSON).
    Continue(Option<String>),
    /// Abort the operation with an error message and code.
    Abort { message: String, code: String },
    /// Skip this row.
    Skip,
    /// Batch result for ON_BATCH events.
    Batch(String),
}
270
271// ============================================================================
272// Pyodide Runtime (Simulated for now)
273// ============================================================================
274
/// Runtime activity counters.
///
/// Every field is an atomic, so counters can be bumped through a shared
/// reference.
#[derive(Default, Debug)]
pub struct RuntimeStats {
    /// Number of trigger executions.
    pub total_executions: AtomicU64,
    /// Total execution time, in microseconds.
    pub total_time_us: AtomicU64,
    /// Execution errors.
    pub errors: AtomicU64,
    /// Executions aborted by a plugin.
    pub aborts: AtomicU64,
    /// Packages installed.
    pub packages_installed: AtomicU64,
}
284
285/// Pyodide-based Python runtime
286///
287/// In production, this wraps actual Pyodide WASM module.
288/// Currently provides a simulation for API design validation.
289pub struct PyodideRuntime {
290    config: RuntimeConfig,
291    /// Registered plugins
292    plugins: RwLock<HashMap<String, PythonPlugin>>,
293    /// Table → Plugin mappings
294    trigger_map: RwLock<HashMap<(String, TriggerEvent), Vec<String>>>,
295    /// Installed packages
296    installed_packages: RwLock<Vec<String>>,
297    /// Runtime statistics
298    stats: Arc<RuntimeStats>,
299    /// Plugin instances (in production: actual WASM instances)
300    #[allow(dead_code)]
301    instances: RwLock<HashMap<String, PluginInstance>>,
302}
303
/// Bookkeeping for a loaded plugin instance.
// Nothing constructs this yet; it is reserved for the real WASM integration,
// hence the `dead_code` allowance.
#[allow(dead_code)]
struct PluginInstance {
    /// Name of the plugin this instance was loaded from.
    plugin_name: String,
    /// Load timestamp (units not established here — TODO confirm).
    loaded_at: u64,
    /// Memory used by the instance (presumably bytes — verify).
    memory_used: u64,
    /// Number of calls served.
    call_count: u64,
}
312
313impl PyodideRuntime {
314    /// Create a new runtime with configuration
315    pub fn new(config: RuntimeConfig) -> Self {
316        Self {
317            config,
318            plugins: RwLock::new(HashMap::new()),
319            trigger_map: RwLock::new(HashMap::new()),
320            installed_packages: RwLock::new(vec![]),
321            stats: Arc::new(RuntimeStats::default()),
322            instances: RwLock::new(HashMap::new()),
323        }
324    }
325
326    /// Install Python packages via micropip
327    ///
328    /// In production, this downloads and installs packages into the WASM environment.
329    pub async fn install_packages(&self, packages: &[&str]) -> KernelResult<()> {
330        let mut installed = self.installed_packages.write();
331        for pkg in packages {
332            if !installed.contains(&pkg.to_string()) {
333                // Simulate package installation
334                if self.config.debug {
335                    eprintln!("[Pyodide] Installing package: {}", pkg);
336                }
337                installed.push(pkg.to_string());
338                self.stats.packages_installed.fetch_add(1, Ordering::Relaxed);
339            }
340        }
341        Ok(())
342    }
343
344    /// Register a Python plugin
345    pub fn register(&self, plugin: PythonPlugin) -> KernelResult<()> {
346        // Validate plugin code
347        self.validate_code(&plugin.code)?;
348
349        // Register plugin
350        let name = plugin.name.clone();
351        {
352            let mut plugins = self.plugins.write();
353            plugins.insert(name.clone(), plugin.clone());
354        }
355
356        // Update trigger mappings
357        {
358            let mut trigger_map = self.trigger_map.write();
359            for (table, events) in &plugin.triggers {
360                for event in events {
361                    trigger_map
362                        .entry((table.clone(), *event))
363                        .or_default()
364                        .push(name.clone());
365                }
366            }
367        }
368
369        if self.config.debug {
370            eprintln!("[Pyodide] Registered plugin: {}", name);
371        }
372
373        Ok(())
374    }
375
376    /// Unregister a plugin
377    pub fn unregister(&self, name: &str) -> KernelResult<()> {
378        let mut plugins = self.plugins.write();
379        if let Some(plugin) = plugins.remove(name) {
380            // Remove from trigger map
381            let mut trigger_map = self.trigger_map.write();
382            for (table, events) in &plugin.triggers {
383                for event in events {
384                    if let Some(names) = trigger_map.get_mut(&(table.clone(), *event)) {
385                        names.retain(|n| n != name);
386                    }
387                }
388            }
389            Ok(())
390        } else {
391            Err(KernelError::Plugin {
392                message: format!("Plugin not found: {}", name),
393            })
394        }
395    }
396
397    /// Fire triggers for an event
398    pub async fn fire(
399        &self,
400        table: &str,
401        event: TriggerEvent,
402        context: &TriggerContext,
403    ) -> KernelResult<TriggerResult> {
404        let start = Instant::now();
405        self.stats.total_executions.fetch_add(1, Ordering::Relaxed);
406
407        // Find plugins to execute
408        let plugin_names = {
409            let trigger_map = self.trigger_map.read();
410            trigger_map
411                .get(&(table.to_string(), event))
412                .cloned()
413                .unwrap_or_default()
414        };
415
416        if plugin_names.is_empty() {
417            return Ok(TriggerResult::Continue(None));
418        }
419
420        // Execute each plugin in order
421        let mut current_row = context.row_json.clone();
422
423        for name in plugin_names {
424            let plugins = self.plugins.read();
425            if let Some(plugin) = plugins.get(&name) {
426                let result = self.execute_plugin(plugin, event, &current_row).await?;
427
428                match result {
429                    TriggerResult::Continue(Some(modified)) => {
430                        current_row = modified;
431                    }
432                    TriggerResult::Abort { message, code } => {
433                        self.stats.aborts.fetch_add(1, Ordering::Relaxed);
434                        return Ok(TriggerResult::Abort { message, code });
435                    }
436                    TriggerResult::Skip => {
437                        return Ok(TriggerResult::Skip);
438                    }
439                    _ => {}
440                }
441            }
442        }
443
444        let elapsed = start.elapsed().as_micros() as u64;
445        self.stats.total_time_us.fetch_add(elapsed, Ordering::Relaxed);
446
447        Ok(TriggerResult::Continue(Some(current_row)))
448    }
449
450    /// Execute a single plugin
451    async fn execute_plugin(
452        &self,
453        plugin: &PythonPlugin,
454        event: TriggerEvent,
455        row_json: &str,
456    ) -> KernelResult<TriggerResult> {
457        let timeout = Duration::from_millis(self.config.timeout_ms);
458        let start = Instant::now();
459
460        // In production, this would:
461        // 1. Get or create WASM instance for this plugin
462        // 2. Call the appropriate handler function
463        // 3. Marshal data between Rust and Python
464
465        // Simulate execution
466        let result = self.simulate_execution(plugin, event, row_json, timeout)?;
467
468        if self.config.debug {
469            eprintln!(
470                "[Pyodide] {} executed in {:?}",
471                plugin.name,
472                start.elapsed()
473            );
474        }
475
476        Ok(result)
477    }
478
479    /// Simulated execution (placeholder for real Pyodide)
480    fn simulate_execution(
481        &self,
482        plugin: &PythonPlugin,
483        event: TriggerEvent,
484        row_json: &str,
485        timeout: Duration,
486    ) -> KernelResult<TriggerResult> {
487        let start = Instant::now();
488
489        // Check timeout
490        if start.elapsed() > timeout {
491            return Err(KernelError::Plugin {
492                message: "Execution timed out".to_string(),
493            });
494        }
495
496        // Simulate common trigger logic based on code patterns
497        let code = &plugin.code;
498
499        // Check for abort conditions
500        if code.contains("TriggerAbort") || code.contains("raise") {
501            // Parse simulated condition from code
502            if code.contains("amount") && code.contains("> 10000") {
503                // Check if row has high amount
504                if row_json.contains("\"amount\":") {
505                    if let Some(amount) = self.extract_amount(row_json) {
506                        if amount > 10000.0 {
507                            return Ok(TriggerResult::Abort {
508                                message: "Amount too high".to_string(),
509                                code: "LIMIT_EXCEEDED".to_string(),
510                            });
511                        }
512                    }
513                }
514            }
515        }
516
517        // Check for transformations
518        if code.contains(".lower()") {
519            // Simulate lowercase transformation
520            let modified = row_json.to_lowercase();
521            return Ok(TriggerResult::Continue(Some(modified)));
522        }
523
524        // For BEFORE triggers, return potentially modified row
525        if event.is_before() {
526            Ok(TriggerResult::Continue(Some(row_json.to_string())))
527        } else {
528            Ok(TriggerResult::Continue(None))
529        }
530    }
531
532    fn extract_amount(&self, json: &str) -> Option<f64> {
533        // Simple extraction (in production, use serde_json)
534        if let Some(start) = json.find("\"amount\":") {
535            let rest = &json[start + 9..].trim_start();
536            let end = rest.find(|c: char| !c.is_numeric() && c != '.' && c != '-');
537            let num_str = match end {
538                Some(e) => &rest[..e],
539                None => rest,
540            };
541            num_str.trim().parse().ok()
542        } else {
543            None
544        }
545    }
546
547    /// Validate Python code
548    fn validate_code(&self, code: &str) -> KernelResult<()> {
549        // Check for obviously dangerous patterns
550        let forbidden = [
551            "__import__('os')",
552            "subprocess",
553            "eval(",
554            "exec(",
555            "compile(",
556            "open(",
557            "__builtins__",
558        ];
559
560        for pattern in forbidden {
561            if code.contains(pattern) {
562                return Err(KernelError::Plugin {
563                    message: format!("Forbidden pattern in code: {}", pattern),
564                });
565            }
566        }
567
568        // Check for required handler function
569        let handlers = ["on_insert", "on_before_insert", "on_after_insert", 
570                        "on_update", "on_delete", "on_batch", "handler"];
571        if !handlers.iter().any(|h| code.contains(&format!("def {}(", h))) {
572            return Err(KernelError::Plugin {
573                message: "Code must define a handler function".to_string(),
574            });
575        }
576
577        Ok(())
578    }
579
580    /// Get runtime statistics
581    pub fn stats(&self) -> &RuntimeStats {
582        &self.stats
583    }
584
585    /// List registered plugins
586    pub fn list_plugins(&self) -> Vec<String> {
587        self.plugins.read().keys().cloned().collect()
588    }
589}
590
591// ============================================================================
592// AI Trigger Generator (Future Feature)
593// ============================================================================
594
/// Generates Python trigger code from natural-language instructions.
// The fields are not read yet because the LLM integration is still a
// placeholder; hence the `dead_code` allowance.
#[allow(dead_code)]
pub struct AiTriggerGenerator {
    /// Model identifier, e.g. `"gpt-4o"`, `"claude-3"` or `"local:llama"`.
    model: String,
    /// Optional API endpoint.
    endpoint: Option<String>,
}
603
604#[allow(dead_code)]
605impl AiTriggerGenerator {
606    pub fn new(model: &str) -> Self {
607        Self {
608            model: model.to_string(),
609            endpoint: None,
610        }
611    }
612
613    /// Generate trigger code from natural language
614    pub async fn generate(&self, instruction: &str, table_schema: &str) -> KernelResult<String> {
615        // In production, call LLM API
616        // For now, return a template
617        let code = format!(
618            r#"
619# Generated from: {}
620# Table schema: {}
621
622def on_before_insert(row: dict) -> dict:
623    # TODO: Implement validation logic
624    return row
625"#,
626            instruction, table_schema
627        );
628        Ok(code)
629    }
630}
631
632// ============================================================================
633// Tests
634// ============================================================================
635
#[cfg(test)]
mod tests {
    use super::*;

    /// Python source that aborts rows whose amount exceeds 10000.
    const AMOUNT_CHECK: &str = r#"
def on_insert(row):
    if row["amount"] > 10000:
        raise TriggerAbort("Amount too high")
    return row
"#;

    /// Builds a `BeforeInsert` context for the `orders` table.
    fn orders_context(txn_id: u64, row_json: &str) -> TriggerContext {
        TriggerContext {
            table: "orders".to_string(),
            event: TriggerEvent::BeforeInsert,
            row_json: row_json.to_string(),
            old_row_json: None,
            txn_id,
            batch_json: None,
        }
    }

    #[test]
    fn test_plugin_builder() {
        let plugin = PythonPlugin::new("test")
            .with_version("2.0.0")
            .with_code("def on_insert(row): return row")
            .with_packages(vec!["numpy", "pandas"])
            .with_trigger("users", TriggerEvent::BeforeInsert);

        assert_eq!(plugin.name, "test");
        assert_eq!(plugin.version, "2.0.0");
        assert!(plugin.packages.iter().any(|pkg| pkg == "numpy"));
        assert!(plugin.triggers.contains_key("users"));
    }

    #[test]
    fn test_runtime_config() {
        let ml = RuntimeConfig::with_ml_packages();
        assert!(ml.packages.iter().any(|pkg| pkg == "numpy"));
        assert_eq!(ml.memory_limit_bytes, 256 * 1024 * 1024);

        assert_eq!(RuntimeConfig::lightweight().timeout_ms, 100);
    }

    #[tokio::test]
    async fn test_runtime_register() {
        let runtime = PyodideRuntime::new(RuntimeConfig::default());
        let validator = PythonPlugin::new("validator")
            .with_code("def on_insert(row): return row")
            .with_trigger("users", TriggerEvent::BeforeInsert);

        runtime.register(validator).unwrap();
        assert!(runtime.list_plugins().iter().any(|name| name == "validator"));
    }

    #[tokio::test]
    async fn test_runtime_fire_trigger() {
        let runtime = PyodideRuntime::new(RuntimeConfig::default());
        let amount_check = PythonPlugin::new("amount_check")
            .with_code(AMOUNT_CHECK)
            .with_trigger("orders", TriggerEvent::BeforeInsert);
        runtime.register(amount_check).unwrap();

        // Below the limit: the row continues.
        let small = orders_context(1, r#"{"amount": 500}"#);
        let outcome = runtime
            .fire("orders", TriggerEvent::BeforeInsert, &small)
            .await;
        assert!(matches!(outcome, Ok(TriggerResult::Continue(_))));

        // Above the limit: the plugin aborts.
        let large = orders_context(2, r#"{"amount": 50000}"#);
        let outcome = runtime
            .fire("orders", TriggerEvent::BeforeInsert, &large)
            .await;
        assert!(matches!(outcome, Ok(TriggerResult::Abort { .. })));
    }

    #[test]
    fn test_code_validation() {
        let runtime = PyodideRuntime::new(RuntimeConfig::default());
        let cases = [
            // Valid code.
            ("def on_insert(row): return row", true),
            // Forbidden pattern.
            ("import subprocess", false),
            // No handler function.
            ("x = 42", false),
        ];
        for &(code, expected_ok) in cases.iter() {
            assert_eq!(runtime.validate_code(code).is_ok(), expected_ok, "{}", code);
        }
    }
}