Skip to main content

sochdb_kernel/
plugin.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Plugin Architecture
19//!
20//! SochDB uses a plugin architecture to keep the kernel minimal while
21//! allowing rich functionality through extensions.
22//!
23//! ## Design Philosophy
24//!
25//! 1. **Core is Minimal**: The kernel contains only ACID-critical code
26//! 2. **Extensions Add Features**: Storage backends, indices, observability are plugins
27//! 3. **No Bloat**: Users only pay for what they use
28//! 4. **Vendor Neutral**: No lock-in to specific monitoring/storage systems
29//!
30//! ## Plugin Categories
31//!
32//! - `StorageExtension`: Alternative storage backends (LSCS, RocksDB, etc.)
33//! - `IndexExtension`: Custom index types (vector, learned, etc.)
34//! - `ObservabilityExtension`: Metrics, tracing, logging backends
35//! - `CompressionExtension`: Compression algorithms
36//!
37//! ## Example: Adding Prometheus Metrics
38//!
39//! ```ignore
40//! // In a separate crate: sochdb-prometheus-plugin
41//! struct PrometheusPlugin { /* ... */ }
42//!
43//! impl ObservabilityExtension for PrometheusPlugin {
44//!     fn record_metric(&self, name: &str, value: f64, tags: &[(&str, &str)]) {
45//!         // Push to Prometheus
46//!     }
47//! }
48//!
49//! // Usage:
50//! let db = KernelDB::open(path)?;
51//! db.plugins().register_observability(Box::new(PrometheusPlugin::new()))?;
52//! ```
53
54use crate::error::{KernelError, KernelResult};
55use crate::kernel_api::{HealthInfo, RowId, TableId};
56use crate::transaction::TransactionId;
57use parking_lot::{Mutex, RwLock};
58use std::any::Any;
59use std::collections::HashMap;
60use std::sync::Arc;
61
62// ============================================================================
63// Extension Trait Definitions
64// ============================================================================
65
66/// Information about an extension
67#[derive(Debug, Clone)]
68pub struct ExtensionInfo {
69    /// Unique extension name (e.g., "prometheus-metrics")
70    pub name: String,
71    /// Extension version
72    pub version: String,
73    /// Human-readable description
74    pub description: String,
75    /// Extension author
76    pub author: String,
77    /// Capabilities provided
78    pub capabilities: Vec<ExtensionCapability>,
79}
80
81/// Capabilities an extension can provide
82#[derive(Debug, Clone, PartialEq, Eq, Hash)]
83pub enum ExtensionCapability {
84    /// Alternative storage backend
85    Storage,
86    /// Custom index type
87    Index,
88    /// Metrics/tracing/logging
89    Observability,
90    /// Compression algorithm
91    Compression,
92    /// Query optimization
93    QueryOptimizer,
94    /// Authentication/Authorization
95    Auth,
96    /// Custom - for third-party extensions
97    Custom(String),
98}
99
100/// Base trait for all extensions
101pub trait Extension: Send + Sync {
102    /// Get extension information
103    fn info(&self) -> ExtensionInfo;
104
105    /// Initialize the extension
106    fn init(&mut self) -> KernelResult<()> {
107        Ok(())
108    }
109
110    /// Shutdown the extension gracefully
111    fn shutdown(&mut self) -> KernelResult<()> {
112        Ok(())
113    }
114
115    /// Cast to Any for downcasting
116    fn as_any(&self) -> &dyn Any;
117
118    /// Cast to mutable Any for downcasting
119    fn as_any_mut(&mut self) -> &mut dyn Any;
120}
121
122// ============================================================================
123// Storage Extension
124// ============================================================================
125
126/// Storage backend extension
127///
128/// Implement this to provide alternative storage engines (LSCS, RocksDB, etc.)
129pub trait StorageExtension: Extension {
130    /// Read data for a key
131    fn get(&self, table_id: TableId, key: &[u8]) -> KernelResult<Option<Vec<u8>>>;
132
133    /// Write data for a key
134    fn put(
135        &self,
136        table_id: TableId,
137        key: &[u8],
138        value: &[u8],
139        txn_id: TransactionId,
140    ) -> KernelResult<()>;
141
142    /// Delete a key
143    fn delete(&self, table_id: TableId, key: &[u8], txn_id: TransactionId) -> KernelResult<()>;
144
145    /// Scan a range of keys
146    fn scan(
147        &self,
148        table_id: TableId,
149        start: &[u8],
150        end: &[u8],
151        limit: usize,
152    ) -> KernelResult<Vec<(Vec<u8>, Vec<u8>)>>;
153
154    /// Flush pending writes
155    fn flush(&self) -> KernelResult<()>;
156
157    /// Compact storage (if applicable)
158    fn compact(&self) -> KernelResult<()> {
159        Ok(()) // Default: no-op
160    }
161
162    /// Get storage statistics
163    fn stats(&self) -> StorageStats {
164        StorageStats::default()
165    }
166}
167
168/// Storage statistics
169#[derive(Debug, Clone, Default)]
170pub struct StorageStats {
171    /// Total bytes stored
172    pub bytes_stored: u64,
173    /// Number of keys
174    pub key_count: u64,
175    /// Pending compaction bytes
176    pub pending_compaction_bytes: u64,
177    /// Write amplification factor
178    pub write_amplification: f64,
179}
180
181// ============================================================================
182// Index Extension
183// ============================================================================
184
185/// Index extension
186///
187/// Implement this for custom index types (vector, learned, full-text, etc.)
188pub trait IndexExtension: Extension {
189    /// Index type name (e.g., "hnsw", "learned", "btree")
190    fn index_type(&self) -> &str;
191
192    /// Build index on existing data
193    fn build(
194        &mut self,
195        table_id: TableId,
196        column_id: u16,
197        data: &[(RowId, Vec<u8>)],
198    ) -> KernelResult<()>;
199
200    /// Insert a key-value pair into the index
201    fn insert(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
202
203    /// Delete a key from the index
204    fn delete(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
205
206    /// Point lookup
207    fn lookup(&self, key: &[u8]) -> KernelResult<Vec<RowId>>;
208
209    /// Range scan
210    fn range(&self, start: &[u8], end: &[u8], limit: usize) -> KernelResult<Vec<RowId>>;
211
212    /// Nearest neighbor search (for vector indices)
213    fn nearest(&self, _query: &[u8], _k: usize) -> KernelResult<Vec<(RowId, f32)>> {
214        Err(KernelError::Plugin {
215            message: "nearest neighbor not supported by this index type".into(),
216        })
217    }
218
219    /// Get index size in bytes
220    fn size_bytes(&self) -> u64;
221}
222
223// ============================================================================
224// Observability Extension (PLUGIN ARCHITECTURE)
225// ============================================================================
226
227/// Observability extension
228///
229/// Implement this for metrics, tracing, and logging backends.
230///
231/// ## Why Plugin Architecture?
232///
233/// 1. **No Dependency Bloat**: Core doesn't pull in Prometheus, DataDog, etc.
234/// 2. **Vendor Neutral**: Users choose their observability stack
235/// 3. **Flexible**: Can run without any observability in embedded scenarios
236///
237/// ## Available Plugins (separate crates):
238///
239/// - `sochdb-prometheus`: Prometheus metrics
240/// - `sochdb-datadog`: DataDog integration  
241/// - `sochdb-opentelemetry`: OpenTelemetry support
242/// - `sochdb-logging-json`: JSON structured logging
243/// - `sochdb-logging-logfmt`: logfmt style logging
244pub trait ObservabilityExtension: Extension {
245    // -------------------------------------------------------------------------
246    // Metrics
247    // -------------------------------------------------------------------------
248
249    /// Record a counter increment
250    fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]);
251
252    /// Record a gauge value
253    fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]);
254
255    /// Record a histogram observation
256    fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]);
257
258    // -------------------------------------------------------------------------
259    // Tracing
260    // -------------------------------------------------------------------------
261
262    /// Start a new span
263    fn span_start(&self, name: &str, parent: Option<u64>) -> u64;
264
265    /// End a span
266    fn span_end(&self, span_id: u64);
267
268    /// Add an event to a span
269    fn span_event(&self, span_id: u64, name: &str, attributes: &[(&str, &str)]);
270
271    // -------------------------------------------------------------------------
272    // Logging
273    // -------------------------------------------------------------------------
274
275    /// Log at debug level
276    fn log_debug(&self, message: &str, fields: &[(&str, &str)]) {
277        let _ = (message, fields); // Default: no-op
278    }
279
280    /// Log at info level
281    fn log_info(&self, message: &str, fields: &[(&str, &str)]) {
282        let _ = (message, fields); // Default: no-op
283    }
284
285    /// Log at warn level
286    fn log_warn(&self, message: &str, fields: &[(&str, &str)]) {
287        let _ = (message, fields); // Default: no-op
288    }
289
290    /// Log at error level
291    fn log_error(&self, message: &str, fields: &[(&str, &str)]) {
292        let _ = (message, fields); // Default: no-op
293    }
294
295    // -------------------------------------------------------------------------
296    // Health Reporting
297    // -------------------------------------------------------------------------
298
299    /// Report health status (called periodically by kernel)
300    fn report_health(&self, health: &HealthInfo) {
301        let _ = health; // Default: no-op
302    }
303}
304
305// ============================================================================
306// Compression Extension
307// ============================================================================
308
309/// Compression algorithm extension
310pub trait CompressionExtension: Extension {
311    /// Algorithm name (e.g., "lz4", "zstd", "snappy")
312    fn algorithm(&self) -> &str;
313
314    /// Compress data
315    fn compress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;
316
317    /// Decompress data
318    fn decompress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;
319
320    /// Compression level (if applicable)
321    fn set_level(&mut self, _level: i32) -> KernelResult<()> {
322        Ok(())
323    }
324}
325
326// ============================================================================
327// Plugin Manager
328// ============================================================================
329
330/// Plugin manager - registry for all extensions
331///
332/// The kernel uses this to discover and invoke extensions.
333pub struct PluginManager {
334    /// Storage extensions by name
335    storage: RwLock<HashMap<String, Arc<dyn StorageExtension>>>,
336    /// Index extensions by name
337    indices: RwLock<HashMap<String, Arc<RwLock<dyn IndexExtension>>>>,
338    /// Observability extensions (can have multiple)
339    observability: RwLock<Vec<Arc<dyn ObservabilityExtension>>>,
340    /// Compression extensions by algorithm name
341    compression: RwLock<HashMap<String, Arc<dyn CompressionExtension>>>,
342    /// Active storage backend name
343    active_storage: RwLock<Option<String>>,
344    /// Shutdown handles for storage extensions (wraps extension in Mutex for interior mutability)
345    storage_shutdown: RwLock<HashMap<String, Mutex<Arc<dyn StorageExtension>>>>,
346    /// Shutdown handles for index extensions
347    index_shutdown: RwLock<HashMap<String, Arc<Mutex<Arc<RwLock<dyn IndexExtension>>>>>>,
348    /// Shutdown handles for observability
349    observability_shutdown: RwLock<Vec<Mutex<Arc<dyn ObservabilityExtension>>>>,
350    /// Shutdown handles for compression
351    compression_shutdown: RwLock<HashMap<String, Mutex<Arc<dyn CompressionExtension>>>>,
352}
353
354impl Default for PluginManager {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360impl PluginManager {
361    /// Create a new plugin manager
362    pub fn new() -> Self {
363        Self {
364            storage: RwLock::new(HashMap::new()),
365            indices: RwLock::new(HashMap::new()),
366            observability: RwLock::new(Vec::new()),
367            compression: RwLock::new(HashMap::new()),
368            active_storage: RwLock::new(None),
369            storage_shutdown: RwLock::new(HashMap::new()),
370            index_shutdown: RwLock::new(HashMap::new()),
371            observability_shutdown: RwLock::new(Vec::new()),
372            compression_shutdown: RwLock::new(HashMap::new()),
373        }
374    }
375
376    // -------------------------------------------------------------------------
377    // Storage Extensions
378    // -------------------------------------------------------------------------
379
380    /// Register a storage extension
381    pub fn register_storage(&self, ext: Arc<dyn StorageExtension>) -> KernelResult<()> {
382        let name = ext.info().name.clone();
383        let mut storage = self.storage.write();
384
385        if storage.contains_key(&name) {
386            return Err(KernelError::Plugin {
387                message: format!("storage extension '{}' already registered", name),
388            });
389        }
390
391        // Also add to shutdown storage - need to wrap the Arc<dyn StorageExtension> in Mutex
392        let mut shutdown = self.storage_shutdown.write();
393        shutdown.insert(name.clone(), Mutex::new(Arc::clone(&ext)));
394
395        storage.insert(name.clone(), ext);
396
397        // Set as active if it's the first one
398        let mut active = self.active_storage.write();
399        if active.is_none() {
400            *active = Some(name);
401        }
402
403        Ok(())
404    }
405
406    /// Set the active storage backend
407    pub fn set_active_storage(&self, name: &str) -> KernelResult<()> {
408        let storage = self.storage.read();
409        if !storage.contains_key(name) {
410            return Err(KernelError::Plugin {
411                message: format!("storage extension '{}' not found", name),
412            });
413        }
414        *self.active_storage.write() = Some(name.to_string());
415        Ok(())
416    }
417
418    /// Get the active storage backend
419    pub fn storage(&self) -> Option<Arc<dyn StorageExtension>> {
420        let active = self.active_storage.read();
421        active
422            .as_ref()
423            .and_then(|name| self.storage.read().get(name).cloned())
424    }
425
426    // -------------------------------------------------------------------------
427    // Index Extensions
428    // -------------------------------------------------------------------------
429
430    /// Register an index extension
431    pub fn register_index(&self, ext: Arc<RwLock<dyn IndexExtension>>) -> KernelResult<()> {
432        let name = ext.read().info().name.clone();
433        let mut indices = self.indices.write();
434
435        if indices.contains_key(&name) {
436            return Err(KernelError::Plugin {
437                message: format!("index extension '{}' already registered", name),
438            });
439        }
440
441        indices.insert(name.clone(), ext.clone());
442
443        // Also add to shutdown storage
444        let mut shutdown = self.index_shutdown.write();
445        shutdown.insert(name, Arc::new(Mutex::new(ext)));
446        Ok(())
447    }
448
449    /// Get an index extension by name
450    pub fn index(&self, name: &str) -> Option<Arc<RwLock<dyn IndexExtension>>> {
451        self.indices.read().get(name).cloned()
452    }
453
454    /// List registered index types
455    pub fn list_index_types(&self) -> Vec<String> {
456        self.indices.read().keys().cloned().collect()
457    }
458
459    // -------------------------------------------------------------------------
460    // Observability Extensions
461    // -------------------------------------------------------------------------
462
463    /// Register an observability extension
464    ///
465    /// Multiple observability extensions can be registered (fan-out to all)
466    pub fn register_observability(&self, ext: Arc<dyn ObservabilityExtension>) -> KernelResult<()> {
467        self.observability.write().push(ext.clone());
468
469        // Also add to shutdown storage
470        let mut shutdown = self.observability_shutdown.write();
471        shutdown.push(Mutex::new(Arc::clone(&ext)));
472        Ok(())
473    }
474
475    /// Record a counter across all observability extensions
476    pub fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
477        for ext in self.observability.read().iter() {
478            ext.counter_inc(name, value, labels);
479        }
480    }
481
482    /// Record a gauge across all observability extensions
483    pub fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
484        for ext in self.observability.read().iter() {
485            ext.gauge_set(name, value, labels);
486        }
487    }
488
489    /// Record a histogram observation across all observability extensions
490    pub fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
491        for ext in self.observability.read().iter() {
492            ext.histogram_observe(name, value, labels);
493        }
494    }
495
496    /// Report health to all observability extensions
497    pub fn report_health(&self, health: &HealthInfo) {
498        for ext in self.observability.read().iter() {
499            ext.report_health(health);
500        }
501    }
502
503    /// Check if any observability is configured
504    pub fn has_observability(&self) -> bool {
505        !self.observability.read().is_empty()
506    }
507
508    // -------------------------------------------------------------------------
509    // Compression Extensions
510    // -------------------------------------------------------------------------
511
512    /// Register a compression extension
513    pub fn register_compression(&self, ext: Arc<dyn CompressionExtension>) -> KernelResult<()> {
514        let algo = ext.algorithm().to_string();
515        let mut compression = self.compression.write();
516
517        if compression.contains_key(&algo) {
518            return Err(KernelError::Plugin {
519                message: format!("compression '{}' already registered", algo),
520            });
521        }
522
523        compression.insert(algo.clone(), ext.clone());
524
525        // Also add to shutdown storage
526        let mut shutdown = self.compression_shutdown.write();
527        shutdown.insert(algo.clone(), Mutex::new(Arc::clone(&ext)));
528        Ok(())
529    }
530
531    /// Get a compression extension by algorithm name
532    pub fn compression(&self, algorithm: &str) -> Option<Arc<dyn CompressionExtension>> {
533        self.compression.read().get(algorithm).cloned()
534    }
535
536    /// List available compression algorithms
537    pub fn list_compression(&self) -> Vec<String> {
538        self.compression.read().keys().cloned().collect()
539    }
540
541    // -------------------------------------------------------------------------
542    // Lifecycle
543    // -------------------------------------------------------------------------
544
545    /// Shutdown all extensions gracefully
546    ///
547    /// Calls `shutdown()` on each registered extension in reverse registration order.
548    /// This allows extensions to release resources, close connections, flush buffers, etc.
549    pub fn shutdown_all(&self) -> KernelResult<()> {
550        let mut errors = Vec::new();
551
552        // Shutdown storage extensions (in reverse order)
553        {
554            let storage = self.storage_shutdown.read();
555            let names: Vec<_> = storage.keys().cloned().collect();
556            for name in names.into_iter().rev() {
557                if let Some(ext) = storage.get(&name) {
558                    let mut ext_guard = ext.lock();
559                    if let Some(ext) = Arc::get_mut(&mut ext_guard) {
560                        if let Err(e) = ext.shutdown() {
561                            errors.push(format!("storage '{}': {}", name, e));
562                        }
563                    }
564                }
565            }
566        }
567
568        // Shutdown index extensions (in reverse order)
569        {
570            let indices = self.index_shutdown.read();
571            let names: Vec<_> = indices.keys().cloned().collect();
572            for name in names.into_iter().rev() {
573                if let Some(ext) = indices.get(&name) {
574                    let ext_guard = ext.lock();
575                    let mut inner = ext_guard.write();
576                    if let Err(e) = inner.shutdown() {
577                        errors.push(format!("index '{}': {}", name, e));
578                    }
579                }
580            }
581        }
582
583        // Shutdown observability extensions (in reverse order)
584        {
585            let observability = self.observability_shutdown.read();
586            for ext in observability.iter().rev() {
587                let mut ext_guard = ext.lock();
588                if let Some(ext) = Arc::get_mut(&mut ext_guard) {
589                    if let Err(e) = ext.shutdown() {
590                        errors.push(format!("observability: {}", e));
591                    }
592                }
593            }
594        }
595
596        // Shutdown compression extensions (in reverse order)
597        {
598            let compression = self.compression_shutdown.read();
599            let names: Vec<_> = compression.keys().cloned().collect();
600            for name in names.into_iter().rev() {
601                if let Some(ext) = compression.get(&name) {
602                    let mut ext_guard = ext.lock();
603                    if let Some(ext) = Arc::get_mut(&mut ext_guard) {
604                        if let Err(e) = ext.shutdown() {
605                            errors.push(format!("compression '{}': {}", name, e));
606                        }
607                    }
608                }
609            }
610        }
611
612        if errors.is_empty() {
613            Ok(())
614        } else {
615            Err(KernelError::Plugin {
616                message: format!("shutdown errors: {}", errors.join("; ")),
617            })
618        }
619    }
620
621    /// Get information about all registered extensions
622    pub fn list_extensions(&self) -> Vec<ExtensionInfo> {
623        let mut result = Vec::new();
624
625        for ext in self.storage.read().values() {
626            result.push(ext.info());
627        }
628
629        for ext in self.indices.read().values() {
630            result.push(ext.read().info());
631        }
632
633        for ext in self.observability.read().iter() {
634            result.push(ext.info());
635        }
636
637        for ext in self.compression.read().values() {
638            result.push(ext.info());
639        }
640
641        result
642    }
643}
644
645// ============================================================================
646// Null Observability (Default - No-Op)
647// ============================================================================
648
649/// Null observability extension - does nothing
650///
651/// Used when no observability is configured. Zero overhead.
652pub struct NullObservability;
653
654impl Extension for NullObservability {
655    fn info(&self) -> ExtensionInfo {
656        ExtensionInfo {
657            name: "null-observability".into(),
658            version: "0.0.0".into(),
659            description: "No-op observability (default)".into(),
660            author: "SochDB".into(),
661            capabilities: vec![ExtensionCapability::Observability],
662        }
663    }
664
665    fn as_any(&self) -> &dyn Any {
666        self
667    }
668
669    fn as_any_mut(&mut self) -> &mut dyn Any {
670        self
671    }
672}
673
674impl ObservabilityExtension for NullObservability {
675    fn counter_inc(&self, _name: &str, _value: u64, _labels: &[(&str, &str)]) {}
676    fn gauge_set(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
677    fn histogram_observe(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
678    fn span_start(&self, _name: &str, _parent: Option<u64>) -> u64 {
679        0
680    }
681    fn span_end(&self, _span_id: u64) {}
682    fn span_event(&self, _span_id: u64, _name: &str, _attributes: &[(&str, &str)]) {}
683}
684
685// ============================================================================
686// Dynamic Plugin Loading (Optional Feature)
687// ============================================================================
688
689#[cfg(feature = "dynamic-plugins")]
690pub mod dynamic {
691    //! Dynamic plugin loading support
692    //!
693    //! Enabled with the `dynamic-plugins` feature.
694    //! Allows loading plugins from shared libraries at runtime.
695    //!
696    //! # Security Warning
697    //!
698    //! Loading a native shared library (`dlopen`) executes arbitrary code
699    //! in the host process — including the library's `_init()` constructors
700    //! which run *before* any symbol lookup.  A malicious `.so/.dylib` can:
701    //!
702    //! - Exfiltrate data from the database process
703    //! - Spawn background threads or open network connections
704    //! - Corrupt kernel memory (no isolation boundary)
705    //!
706    //! **Only load libraries you have built from audited source code.**
707    //! For untrusted extensions, use the WASM plugin sandbox instead.
708
709    use super::*;
710    use libloading::{Library, Symbol};
711    use std::path::Path;
712
713    /// Environment variable that opts in to native (`dlopen`) plugin loading.
714    ///
715    /// Set to `1`/`true`/`yes` to allow a default-constructed
716    /// [`DynamicPluginLoader`] to load native shared libraries. Absent or any
717    /// other value keeps native loading disabled and steers callers to the WASM
718    /// sandbox.
719    pub const ALLOW_NATIVE_ENV: &str = "SOCHDB_ALLOW_NATIVE_PLUGINS";
720
721    fn env_opts_in_to_native() -> bool {
722        std::env::var(ALLOW_NATIVE_ENV)
723            .map(|v| {
724                let v = v.trim().to_ascii_lowercase();
725                v == "1" || v == "true" || v == "yes" || v == "on"
726            })
727            .unwrap_or(false)
728    }
729
730    /// Dynamic plugin loader
731    ///
732    /// Native loading is **untrusted by default**. Even with the
733    /// `dynamic-plugins` feature compiled in, a loader will refuse to `dlopen`
734    /// a shared library unless it was explicitly marked trusted — either by
735    /// constructing it with [`DynamicPluginLoader::new_trusted`] or by setting
736    /// the [`ALLOW_NATIVE_ENV`] environment variable. Untrusted extensions
737    /// should run in the WASM sandbox instead.
738    pub struct DynamicPluginLoader {
739        /// Loaded libraries (kept alive)
740        _libraries: Vec<Library>,
741        /// Whether native `dlopen` loading is permitted for this loader.
742        trusted: bool,
743    }
744
745    impl DynamicPluginLoader {
746        /// Create a new dynamic plugin loader.
747        ///
748        /// Native loading is **disabled** unless the [`ALLOW_NATIVE_ENV`]
749        /// environment variable opts in. Use [`new_trusted`](Self::new_trusted)
750        /// to enable native loading programmatically. This safe-by-default
751        /// stance means an attacker who can drop a `.so` on disk cannot get it
752        /// loaded without an explicit operator decision.
753        pub fn new() -> Self {
754            Self {
755                _libraries: Vec::new(),
756                trusted: env_opts_in_to_native(),
757            }
758        }
759
760        /// Create a loader that is explicitly permitted to load native
761        /// shared libraries.
762        ///
763        /// # Safety / Trust
764        ///
765        /// Calling this is an assertion by the operator that any library passed
766        /// to [`load_observability`](Self::load_observability) comes from
767        /// audited, trusted source code. Native libraries run with **no
768        /// isolation** and can execute arbitrary code in the database process
769        /// (see the module-level security warning). Prefer the WASM sandbox for
770        /// anything that is not fully trusted.
771        pub fn new_trusted() -> Self {
772            Self {
773                _libraries: Vec::new(),
774                trusted: true,
775            }
776        }
777
778        /// Whether this loader is permitted to load native libraries.
779        pub fn is_trusted(&self) -> bool {
780            self.trusted
781        }
782
783        /// Load an observability plugin from a shared library.
784        ///
785        /// # Safety
786        ///
787        /// This function is `unsafe` because loading a native shared library
788        /// can execute arbitrary code.  The caller MUST ensure:
789        ///
790        /// 1. The library at `path` was built from audited, trusted source code.
791        /// 2. The path is an absolute, canonicalized path (no symlink races).
792        /// 3. The file permissions prevent modification by unprivileged users.
793        ///
794        /// The library must export:
795        /// ```c
796        /// extern "C" fn create_observability_plugin() -> *mut dyn ObservabilityExtension
797        /// ```
798        ///
799        /// Returns an error (without performing any `dlopen`) if this loader is
800        /// not trusted — see [`new_trusted`](Self::new_trusted) and
801        /// [`ALLOW_NATIVE_ENV`].
802        pub unsafe fn load_observability(
803            &mut self,
804            path: &Path,
805        ) -> KernelResult<Arc<dyn ObservabilityExtension>> {
806            // Untrusted-by-default gate: refuse native loading unless explicitly
807            // enabled. This check happens BEFORE any filesystem access or
808            // `dlopen`, so a malicious library is never even opened.
809            if !self.trusted {
810                return Err(KernelError::Plugin {
811                    message: format!(
812                        "native plugin loading is disabled (untrusted by default). \
813                         Load untrusted extensions via the WASM sandbox, or opt in \
814                         explicitly with DynamicPluginLoader::new_trusted() or by \
815                         setting {ALLOW_NATIVE_ENV}=1. Refused: {}",
816                        path.display()
817                    ),
818                });
819            }
820
821            // Validate the path is absolute to prevent relative-path hijacking
822            if !path.is_absolute() {
823                return Err(KernelError::Plugin {
824                    message: format!(
825                        "plugin path must be absolute to prevent path hijacking: {}",
826                        path.display()
827                    ),
828                });
829            }
830
831            // Canonicalize to resolve symlinks and detect TOCTOU races
832            let canonical = path.canonicalize().map_err(|e| KernelError::Plugin {
833                message: format!("failed to canonicalize plugin path: {}", e),
834            })?;
835
836            unsafe {
837                let lib = Library::new(&canonical).map_err(|e| KernelError::Plugin {
838                    message: format!("failed to load library: {}", e),
839                })?;
840
841                let create_fn: Symbol<fn() -> Box<dyn ObservabilityExtension>> = lib
842                    .get(b"create_observability_plugin")
843                    .map_err(|e| KernelError::Plugin {
844                        message: format!("symbol not found: {}", e),
845                    })?;
846
847                let plugin = create_fn();
848                self._libraries.push(lib);
849
850                Ok(Arc::from(plugin))
851            }
852        }
853    }
854
855    impl Default for DynamicPluginLoader {
856        fn default() -> Self {
857            Self::new()
858        }
859    }
860
861    #[cfg(test)]
862    mod dynamic_tests {
863        use super::*;
864
865        #[test]
866        fn test_untrusted_loader_refuses_native_load() {
867            // Default loader (no env opt-in) must refuse to load a native lib,
868            // returning an error WITHOUT touching the filesystem.
869            let mut loader = DynamicPluginLoader::new_untrusted_for_test();
870            assert!(!loader.is_trusted());
871            let path = Path::new("/nonexistent/evil.so");
872            let result = unsafe { loader.load_observability(path) };
873            assert!(result.is_err(), "untrusted loader must refuse native load");
874            let msg = format!("{:?}", result.err().unwrap());
875            assert!(
876                msg.contains("untrusted") || msg.contains("disabled"),
877                "error should explain the untrusted-by-default policy, got: {msg}"
878            );
879        }
880
881        #[test]
882        fn test_trusted_loader_passes_gate_then_fails_on_missing_file() {
883            // A trusted loader passes the trust gate; with a missing file it
884            // fails later (canonicalize), proving the gate itself is not what
885            // rejects it.
886            let mut loader = DynamicPluginLoader::new_trusted();
887            assert!(loader.is_trusted());
888            let path = Path::new("/nonexistent/trusted.so");
889            let result = unsafe { loader.load_observability(path) };
890            assert!(result.is_err());
891            let msg = format!("{:?}", result.err().unwrap());
892            assert!(
893                !msg.contains("untrusted"),
894                "trusted loader must fail past the trust gate, got: {msg}"
895            );
896        }
897    }
898
899    impl DynamicPluginLoader {
900        /// Test-only constructor forcing the untrusted state regardless of the
901        /// ambient environment (so the env var cannot make the test flaky).
902        #[cfg(test)]
903        fn new_untrusted_for_test() -> Self {
904            Self {
905                _libraries: Vec::new(),
906                trusted: false,
907            }
908        }
909    }
910}
911
912#[cfg(test)]
913mod tests {
914    use super::*;
915
916    #[test]
917    fn test_plugin_manager_creation() {
918        let pm = PluginManager::new();
919        assert!(!pm.has_observability());
920        assert!(pm.storage().is_none());
921    }
922
923    #[test]
924    fn test_null_observability() {
925        let null = NullObservability;
926        // Should not panic
927        null.counter_inc("test", 1, &[]);
928        null.gauge_set("test", 1.0, &[]);
929        null.histogram_observe("test", 1.0, &[]);
930        let span = null.span_start("test", None);
931        null.span_event(span, "event", &[]);
932        null.span_end(span);
933    }
934
935    #[test]
936    fn test_register_observability() {
937        let pm = PluginManager::new();
938        let null = Arc::new(NullObservability);
939
940        assert!(!pm.has_observability());
941        pm.register_observability(null).unwrap();
942        assert!(pm.has_observability());
943    }
944
945    struct TestExtension {
946        shutdown_called: Arc<std::sync::atomic::AtomicBool>,
947    }
948
949    impl TestExtension {
950        fn new_with_flag(flag: Arc<std::sync::atomic::AtomicBool>) -> Self {
951            Self {
952                shutdown_called: flag,
953            }
954        }
955    }
956
957    impl ObservabilityExtension for TestExtension {
958        fn counter_inc(&self, _name: &str, _value: u64, _labels: &[(&str, &str)]) {}
959        fn gauge_set(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
960        fn histogram_observe(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
961        fn span_start(&self, _name: &str, _parent: Option<u64>) -> u64 {
962            0
963        }
964        fn span_event(&self, _span_id: u64, _name: &str, _labels: &[(&str, &str)]) {}
965        fn span_end(&self, _span_id: u64) {}
966    }
967
968    impl Extension for TestExtension {
969        fn info(&self) -> ExtensionInfo {
970            ExtensionInfo {
971                name: "test-extension".into(),
972                version: "0.0.1".into(),
973                description: "Test extension".into(),
974                author: "test".into(),
975                capabilities: vec![],
976            }
977        }
978
979        fn shutdown(&mut self) -> KernelResult<()> {
980            self.shutdown_called
981                .store(true, std::sync::atomic::Ordering::SeqCst);
982            Ok(())
983        }
984
985        fn as_any(&self) -> &dyn Any {
986            self
987        }
988
989        fn as_any_mut(&mut self) -> &mut dyn Any {
990            self
991        }
992    }
993
994    #[test]
995    fn test_shutdown_all_calls_shutdown() {
996        let pm = PluginManager::new();
997        let flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
998
999        {
1000            let ext = TestExtension::new_with_flag(flag.clone());
1001            let arc_ext: Arc<dyn ObservabilityExtension> = Arc::new(ext);
1002
1003            let mut shutdown = pm.observability_shutdown.write();
1004            shutdown.push(Mutex::new(arc_ext));
1005        }
1006
1007        pm.shutdown_all().unwrap();
1008
1009        assert!(
1010            flag.load(std::sync::atomic::Ordering::SeqCst),
1011            "shutdown() should have been called on the extension"
1012        );
1013    }
1014
1015    #[test]
1016    fn test_shutdown_all_with_single_ref() {
1017        use std::sync::atomic::Ordering;
1018
1019        struct SingleRefExtension {
1020            flag: Arc<std::sync::atomic::AtomicBool>,
1021        }
1022
1023        impl SingleRefExtension {
1024            fn new(flag: Arc<std::sync::atomic::AtomicBool>) -> Self {
1025                Self { flag }
1026            }
1027        }
1028
1029        impl ObservabilityExtension for SingleRefExtension {
1030            fn counter_inc(&self, _: &str, _: u64, _: &[(&str, &str)]) {}
1031            fn gauge_set(&self, _: &str, _: f64, _: &[(&str, &str)]) {}
1032            fn histogram_observe(&self, _: &str, _: f64, _: &[(&str, &str)]) {}
1033            fn span_start(&self, _: &str, _: Option<u64>) -> u64 {
1034                0
1035            }
1036            fn span_event(&self, _: u64, _: &str, _: &[(&str, &str)]) {}
1037            fn span_end(&self, _: u64) {}
1038        }
1039
1040        impl Extension for SingleRefExtension {
1041            fn info(&self) -> ExtensionInfo {
1042                ExtensionInfo {
1043                    name: "single-ref".into(),
1044                    version: "0.0.1".into(),
1045                    description: "Test".into(),
1046                    author: "test".into(),
1047                    capabilities: vec![],
1048                }
1049            }
1050            fn shutdown(&mut self) -> KernelResult<()> {
1051                self.flag.store(true, Ordering::SeqCst);
1052                Ok(())
1053            }
1054            fn as_any(&self) -> &dyn Any {
1055                self
1056            }
1057            fn as_any_mut(&mut self) -> &mut dyn Any {
1058                self
1059            }
1060        }
1061
1062        let pm = PluginManager::new();
1063        let flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
1064
1065        {
1066            let ext = SingleRefExtension::new(flag.clone());
1067            let boxed: Box<dyn ObservabilityExtension> = Box::new(ext);
1068            let arc_ext: Arc<dyn ObservabilityExtension> = Arc::from(boxed);
1069
1070            let mut shutdown = pm.observability_shutdown.write();
1071            shutdown.push(Mutex::new(arc_ext));
1072        }
1073
1074        pm.shutdown_all().unwrap();
1075
1076        assert!(
1077            flag.load(Ordering::SeqCst),
1078            "shutdown should have been called"
1079        );
1080    }
1081}