sochdb_kernel/plugin.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Plugin Architecture
19//!
20//! SochDB uses a plugin architecture to keep the kernel minimal while
21//! allowing rich functionality through extensions.
22//!
23//! ## Design Philosophy
24//!
25//! 1. **Core is Minimal**: The kernel contains only ACID-critical code
26//! 2. **Extensions Add Features**: Storage backends, indices, observability are plugins
27//! 3. **No Bloat**: Users only pay for what they use
28//! 4. **Vendor Neutral**: No lock-in to specific monitoring/storage systems
29//!
30//! ## Plugin Categories
31//!
32//! - `StorageExtension`: Alternative storage backends (LSCS, RocksDB, etc.)
33//! - `IndexExtension`: Custom index types (vector, learned, etc.)
34//! - `ObservabilityExtension`: Metrics, tracing, logging backends
35//! - `CompressionExtension`: Compression algorithms
36//!
37//! ## Example: Adding Prometheus Metrics
38//!
//! ```ignore
//! // In a separate crate: sochdb-prometheus-plugin
//! struct PrometheusPlugin { /* ... */ }
//!
//! impl ObservabilityExtension for PrometheusPlugin {
//!     fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
//!         // Push to Prometheus
//!     }
//!     // ... plus the remaining required `Extension` and
//!     // `ObservabilityExtension` methods (gauges, histograms, spans).
//! }
//!
//! // Usage (extensions are registered as `Arc<dyn ObservabilityExtension>`):
//! let db = KernelDB::open(path)?;
//! db.plugins().register_observability(Arc::new(PrometheusPlugin::new()))?;
//! ```
53
54use crate::error::{KernelError, KernelResult};
55use crate::kernel_api::{HealthInfo, RowId, TableId};
56use crate::transaction::TransactionId;
57use parking_lot::{Mutex, RwLock};
58use std::any::Any;
59use std::collections::HashMap;
60use std::sync::Arc;
61
62// ============================================================================
63// Extension Trait Definitions
64// ============================================================================
65
66/// Information about an extension
67#[derive(Debug, Clone)]
68pub struct ExtensionInfo {
69 /// Unique extension name (e.g., "prometheus-metrics")
70 pub name: String,
71 /// Extension version
72 pub version: String,
73 /// Human-readable description
74 pub description: String,
75 /// Extension author
76 pub author: String,
77 /// Capabilities provided
78 pub capabilities: Vec<ExtensionCapability>,
79}
80
/// The kinds of functionality an extension can advertise in
/// [`ExtensionInfo::capabilities`].
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum ExtensionCapability {
    /// Provides an alternative storage backend.
    Storage,
    /// Provides a custom index type.
    Index,
    /// Provides metrics, tracing, or logging.
    Observability,
    /// Provides a compression algorithm.
    Compression,
    /// Provides query optimization.
    QueryOptimizer,
    /// Provides authentication or authorization.
    Auth,
    /// Free-form capability label for third-party extensions.
    Custom(String),
}
99
/// Base trait for all extensions.
///
/// Every specialised extension trait in this module (`StorageExtension`,
/// `IndexExtension`, `ObservabilityExtension`, `CompressionExtension`) has
/// this trait as a supertrait. Implementors must be `Send + Sync` because the
/// plugin manager shares them behind `Arc`.
pub trait Extension: Send + Sync {
    /// Returns the extension's descriptive metadata.
    fn info(&self) -> ExtensionInfo;

    /// Initializes the extension.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn init(&mut self) -> KernelResult<()> {
        Ok(())
    }

    /// Shuts the extension down gracefully.
    ///
    /// Requires exclusive (`&mut`) access. The default implementation does
    /// nothing and returns `Ok(())`.
    fn shutdown(&mut self) -> KernelResult<()> {
        Ok(())
    }

    /// Returns `self` as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Returns `self` as `&mut dyn Any` so callers can downcast mutably.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
121
122// ============================================================================
123// Storage Extension
124// ============================================================================
125
/// Storage backend extension.
///
/// Implement this to provide alternative storage engines (LSCS, RocksDB, etc.).
/// All operations take `&self`, so implementations need their own interior
/// synchronization for writes.
pub trait StorageExtension: Extension {
    /// Reads the value stored under `key` in `table_id`.
    ///
    /// Returns `Ok(None)` when there is no value to return.
    fn get(&self, table_id: TableId, key: &[u8]) -> KernelResult<Option<Vec<u8>>>;

    /// Writes `value` under `key` in `table_id` on behalf of transaction `txn_id`.
    fn put(
        &self,
        table_id: TableId,
        key: &[u8],
        value: &[u8],
        txn_id: TransactionId,
    ) -> KernelResult<()>;

    /// Deletes `key` from `table_id` on behalf of transaction `txn_id`.
    fn delete(&self, table_id: TableId, key: &[u8], txn_id: TransactionId) -> KernelResult<()>;

    /// Scans keys between `start` and `end`, returning at most `limit`
    /// `(key, value)` pairs.
    ///
    /// NOTE(review): whether `end` is inclusive or exclusive is not specified
    /// by this trait; confirm against the implementations.
    fn scan(
        &self,
        table_id: TableId,
        start: &[u8],
        end: &[u8],
        limit: usize,
    ) -> KernelResult<Vec<(Vec<u8>, Vec<u8>)>>;

    /// Flushes pending writes.
    fn flush(&self) -> KernelResult<()>;

    /// Compacts storage (if applicable).
    ///
    /// The default implementation is a no-op that returns `Ok(())`.
    fn compact(&self) -> KernelResult<()> {
        Ok(()) // Default: no-op
    }

    /// Returns storage statistics.
    ///
    /// The default implementation returns `StorageStats::default()` (all zeros).
    fn stats(&self) -> StorageStats {
        StorageStats::default()
    }
}
167
/// Snapshot of storage backend statistics, as returned by
/// `StorageExtension::stats`. `Default` yields all-zero values.
#[derive(Clone, Debug, Default)]
pub struct StorageStats {
    /// Total number of bytes stored.
    pub bytes_stored: u64,
    /// Number of keys stored.
    pub key_count: u64,
    /// Bytes awaiting compaction.
    pub pending_compaction_bytes: u64,
    /// Write amplification factor.
    pub write_amplification: f64,
}
180
181// ============================================================================
182// Index Extension
183// ============================================================================
184
185/// Index extension
186///
187/// Implement this for custom index types (vector, learned, full-text, etc.)
188pub trait IndexExtension: Extension {
189 /// Index type name (e.g., "hnsw", "learned", "btree")
190 fn index_type(&self) -> &str;
191
192 /// Build index on existing data
193 fn build(
194 &mut self,
195 table_id: TableId,
196 column_id: u16,
197 data: &[(RowId, Vec<u8>)],
198 ) -> KernelResult<()>;
199
200 /// Insert a key-value pair into the index
201 fn insert(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
202
203 /// Delete a key from the index
204 fn delete(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
205
206 /// Point lookup
207 fn lookup(&self, key: &[u8]) -> KernelResult<Vec<RowId>>;
208
209 /// Range scan
210 fn range(&self, start: &[u8], end: &[u8], limit: usize) -> KernelResult<Vec<RowId>>;
211
212 /// Nearest neighbor search (for vector indices)
213 fn nearest(&self, _query: &[u8], _k: usize) -> KernelResult<Vec<(RowId, f32)>> {
214 Err(KernelError::Plugin {
215 message: "nearest neighbor not supported by this index type".into(),
216 })
217 }
218
219 /// Get index size in bytes
220 fn size_bytes(&self) -> u64;
221}
222
223// ============================================================================
224// Observability Extension (PLUGIN ARCHITECTURE)
225// ============================================================================
226
227/// Observability extension
228///
229/// Implement this for metrics, tracing, and logging backends.
230///
231/// ## Why Plugin Architecture?
232///
233/// 1. **No Dependency Bloat**: Core doesn't pull in Prometheus, DataDog, etc.
234/// 2. **Vendor Neutral**: Users choose their observability stack
235/// 3. **Flexible**: Can run without any observability in embedded scenarios
236///
237/// ## Available Plugins (separate crates):
238///
239/// - `sochdb-prometheus`: Prometheus metrics
240/// - `sochdb-datadog`: DataDog integration
241/// - `sochdb-opentelemetry`: OpenTelemetry support
242/// - `sochdb-logging-json`: JSON structured logging
243/// - `sochdb-logging-logfmt`: logfmt style logging
244pub trait ObservabilityExtension: Extension {
245 // -------------------------------------------------------------------------
246 // Metrics
247 // -------------------------------------------------------------------------
248
249 /// Record a counter increment
250 fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]);
251
252 /// Record a gauge value
253 fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]);
254
255 /// Record a histogram observation
256 fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]);
257
258 // -------------------------------------------------------------------------
259 // Tracing
260 // -------------------------------------------------------------------------
261
262 /// Start a new span
263 fn span_start(&self, name: &str, parent: Option<u64>) -> u64;
264
265 /// End a span
266 fn span_end(&self, span_id: u64);
267
268 /// Add an event to a span
269 fn span_event(&self, span_id: u64, name: &str, attributes: &[(&str, &str)]);
270
271 // -------------------------------------------------------------------------
272 // Logging
273 // -------------------------------------------------------------------------
274
275 /// Log at debug level
276 fn log_debug(&self, message: &str, fields: &[(&str, &str)]) {
277 let _ = (message, fields); // Default: no-op
278 }
279
280 /// Log at info level
281 fn log_info(&self, message: &str, fields: &[(&str, &str)]) {
282 let _ = (message, fields); // Default: no-op
283 }
284
285 /// Log at warn level
286 fn log_warn(&self, message: &str, fields: &[(&str, &str)]) {
287 let _ = (message, fields); // Default: no-op
288 }
289
290 /// Log at error level
291 fn log_error(&self, message: &str, fields: &[(&str, &str)]) {
292 let _ = (message, fields); // Default: no-op
293 }
294
295 // -------------------------------------------------------------------------
296 // Health Reporting
297 // -------------------------------------------------------------------------
298
299 /// Report health status (called periodically by kernel)
300 fn report_health(&self, health: &HealthInfo) {
301 let _ = health; // Default: no-op
302 }
303}
304
305// ============================================================================
306// Compression Extension
307// ============================================================================
308
/// Compression algorithm extension.
///
/// The plugin manager registers implementations under the string returned by
/// [`algorithm`](Self::algorithm), so that name must be unique per manager.
pub trait CompressionExtension: Extension {
    /// Algorithm name (e.g., "lz4", "zstd", "snappy").
    fn algorithm(&self) -> &str;

    /// Compresses `input` and returns the compressed bytes.
    fn compress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;

    /// Decompresses `input` and returns the original bytes.
    fn decompress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;

    /// Sets the compression level (if applicable).
    ///
    /// The default implementation ignores `_level` and returns `Ok(())`.
    fn set_level(&mut self, _level: i32) -> KernelResult<()> {
        Ok(())
    }
}
325
326// ============================================================================
327// Plugin Manager
328// ============================================================================
329
/// Plugin manager - registry for all extensions.
///
/// The kernel uses this to discover and invoke extensions. Every field is
/// individually lock-protected, so all methods take `&self`.
///
/// Each registry has a parallel `*_shutdown` collection. The `register_*`
/// methods store a second clone of the same `Arc` there, so a registered
/// extension is referenced at least twice by the manager itself.
pub struct PluginManager {
    /// Storage extensions by name
    storage: RwLock<HashMap<String, Arc<dyn StorageExtension>>>,
    /// Index extensions by name
    indices: RwLock<HashMap<String, Arc<RwLock<dyn IndexExtension>>>>,
    /// Observability extensions (can have multiple), in registration order
    observability: RwLock<Vec<Arc<dyn ObservabilityExtension>>>,
    /// Compression extensions by algorithm name
    compression: RwLock<HashMap<String, Arc<dyn CompressionExtension>>>,
    /// Active storage backend name (`None` until the first storage extension is registered)
    active_storage: RwLock<Option<String>>,
    /// Shutdown handles for storage extensions (a clone of the registered `Arc`, wrapped in a `Mutex`)
    storage_shutdown: RwLock<HashMap<String, Mutex<Arc<dyn StorageExtension>>>>,
    /// Shutdown handles for index extensions
    index_shutdown: RwLock<HashMap<String, Arc<Mutex<Arc<RwLock<dyn IndexExtension>>>>>>,
    /// Shutdown handles for observability, in registration order
    observability_shutdown: RwLock<Vec<Mutex<Arc<dyn ObservabilityExtension>>>>,
    /// Shutdown handles for compression
    compression_shutdown: RwLock<HashMap<String, Mutex<Arc<dyn CompressionExtension>>>>,
}
353
354impl Default for PluginManager {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360impl PluginManager {
361 /// Create a new plugin manager
362 pub fn new() -> Self {
363 Self {
364 storage: RwLock::new(HashMap::new()),
365 indices: RwLock::new(HashMap::new()),
366 observability: RwLock::new(Vec::new()),
367 compression: RwLock::new(HashMap::new()),
368 active_storage: RwLock::new(None),
369 storage_shutdown: RwLock::new(HashMap::new()),
370 index_shutdown: RwLock::new(HashMap::new()),
371 observability_shutdown: RwLock::new(Vec::new()),
372 compression_shutdown: RwLock::new(HashMap::new()),
373 }
374 }
375
376 // -------------------------------------------------------------------------
377 // Storage Extensions
378 // -------------------------------------------------------------------------
379
380 /// Register a storage extension
381 pub fn register_storage(&self, ext: Arc<dyn StorageExtension>) -> KernelResult<()> {
382 let name = ext.info().name.clone();
383 let mut storage = self.storage.write();
384
385 if storage.contains_key(&name) {
386 return Err(KernelError::Plugin {
387 message: format!("storage extension '{}' already registered", name),
388 });
389 }
390
391 // Also add to shutdown storage - need to wrap the Arc<dyn StorageExtension> in Mutex
392 let mut shutdown = self.storage_shutdown.write();
393 shutdown.insert(name.clone(), Mutex::new(Arc::clone(&ext)));
394
395 storage.insert(name.clone(), ext);
396
397 // Set as active if it's the first one
398 let mut active = self.active_storage.write();
399 if active.is_none() {
400 *active = Some(name);
401 }
402
403 Ok(())
404 }
405
406 /// Set the active storage backend
407 pub fn set_active_storage(&self, name: &str) -> KernelResult<()> {
408 let storage = self.storage.read();
409 if !storage.contains_key(name) {
410 return Err(KernelError::Plugin {
411 message: format!("storage extension '{}' not found", name),
412 });
413 }
414 *self.active_storage.write() = Some(name.to_string());
415 Ok(())
416 }
417
418 /// Get the active storage backend
419 pub fn storage(&self) -> Option<Arc<dyn StorageExtension>> {
420 let active = self.active_storage.read();
421 active
422 .as_ref()
423 .and_then(|name| self.storage.read().get(name).cloned())
424 }
425
426 // -------------------------------------------------------------------------
427 // Index Extensions
428 // -------------------------------------------------------------------------
429
430 /// Register an index extension
431 pub fn register_index(&self, ext: Arc<RwLock<dyn IndexExtension>>) -> KernelResult<()> {
432 let name = ext.read().info().name.clone();
433 let mut indices = self.indices.write();
434
435 if indices.contains_key(&name) {
436 return Err(KernelError::Plugin {
437 message: format!("index extension '{}' already registered", name),
438 });
439 }
440
441 indices.insert(name.clone(), ext.clone());
442
443 // Also add to shutdown storage
444 let mut shutdown = self.index_shutdown.write();
445 shutdown.insert(name, Arc::new(Mutex::new(ext)));
446 Ok(())
447 }
448
449 /// Get an index extension by name
450 pub fn index(&self, name: &str) -> Option<Arc<RwLock<dyn IndexExtension>>> {
451 self.indices.read().get(name).cloned()
452 }
453
454 /// List registered index types
455 pub fn list_index_types(&self) -> Vec<String> {
456 self.indices.read().keys().cloned().collect()
457 }
458
459 // -------------------------------------------------------------------------
460 // Observability Extensions
461 // -------------------------------------------------------------------------
462
463 /// Register an observability extension
464 ///
465 /// Multiple observability extensions can be registered (fan-out to all)
466 pub fn register_observability(&self, ext: Arc<dyn ObservabilityExtension>) -> KernelResult<()> {
467 self.observability.write().push(ext.clone());
468
469 // Also add to shutdown storage
470 let mut shutdown = self.observability_shutdown.write();
471 shutdown.push(Mutex::new(Arc::clone(&ext)));
472 Ok(())
473 }
474
475 /// Record a counter across all observability extensions
476 pub fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
477 for ext in self.observability.read().iter() {
478 ext.counter_inc(name, value, labels);
479 }
480 }
481
482 /// Record a gauge across all observability extensions
483 pub fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
484 for ext in self.observability.read().iter() {
485 ext.gauge_set(name, value, labels);
486 }
487 }
488
489 /// Record a histogram observation across all observability extensions
490 pub fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
491 for ext in self.observability.read().iter() {
492 ext.histogram_observe(name, value, labels);
493 }
494 }
495
496 /// Report health to all observability extensions
497 pub fn report_health(&self, health: &HealthInfo) {
498 for ext in self.observability.read().iter() {
499 ext.report_health(health);
500 }
501 }
502
503 /// Check if any observability is configured
504 pub fn has_observability(&self) -> bool {
505 !self.observability.read().is_empty()
506 }
507
508 // -------------------------------------------------------------------------
509 // Compression Extensions
510 // -------------------------------------------------------------------------
511
512 /// Register a compression extension
513 pub fn register_compression(&self, ext: Arc<dyn CompressionExtension>) -> KernelResult<()> {
514 let algo = ext.algorithm().to_string();
515 let mut compression = self.compression.write();
516
517 if compression.contains_key(&algo) {
518 return Err(KernelError::Plugin {
519 message: format!("compression '{}' already registered", algo),
520 });
521 }
522
523 compression.insert(algo.clone(), ext.clone());
524
525 // Also add to shutdown storage
526 let mut shutdown = self.compression_shutdown.write();
527 shutdown.insert(algo.clone(), Mutex::new(Arc::clone(&ext)));
528 Ok(())
529 }
530
531 /// Get a compression extension by algorithm name
532 pub fn compression(&self, algorithm: &str) -> Option<Arc<dyn CompressionExtension>> {
533 self.compression.read().get(algorithm).cloned()
534 }
535
536 /// List available compression algorithms
537 pub fn list_compression(&self) -> Vec<String> {
538 self.compression.read().keys().cloned().collect()
539 }
540
541 // -------------------------------------------------------------------------
542 // Lifecycle
543 // -------------------------------------------------------------------------
544
545 /// Shutdown all extensions gracefully
546 ///
547 /// Calls `shutdown()` on each registered extension in reverse registration order.
548 /// This allows extensions to release resources, close connections, flush buffers, etc.
549 pub fn shutdown_all(&self) -> KernelResult<()> {
550 let mut errors = Vec::new();
551
552 // Shutdown storage extensions (in reverse order)
553 {
554 let storage = self.storage_shutdown.read();
555 let names: Vec<_> = storage.keys().cloned().collect();
556 for name in names.into_iter().rev() {
557 if let Some(ext) = storage.get(&name) {
558 let mut ext_guard = ext.lock();
559 if let Some(ext) = Arc::get_mut(&mut ext_guard) {
560 if let Err(e) = ext.shutdown() {
561 errors.push(format!("storage '{}': {}", name, e));
562 }
563 }
564 }
565 }
566 }
567
568 // Shutdown index extensions (in reverse order)
569 {
570 let indices = self.index_shutdown.read();
571 let names: Vec<_> = indices.keys().cloned().collect();
572 for name in names.into_iter().rev() {
573 if let Some(ext) = indices.get(&name) {
574 let ext_guard = ext.lock();
575 let mut inner = ext_guard.write();
576 if let Err(e) = inner.shutdown() {
577 errors.push(format!("index '{}': {}", name, e));
578 }
579 }
580 }
581 }
582
583 // Shutdown observability extensions (in reverse order)
584 {
585 let observability = self.observability_shutdown.read();
586 for ext in observability.iter().rev() {
587 let mut ext_guard = ext.lock();
588 if let Some(ext) = Arc::get_mut(&mut ext_guard) {
589 if let Err(e) = ext.shutdown() {
590 errors.push(format!("observability: {}", e));
591 }
592 }
593 }
594 }
595
596 // Shutdown compression extensions (in reverse order)
597 {
598 let compression = self.compression_shutdown.read();
599 let names: Vec<_> = compression.keys().cloned().collect();
600 for name in names.into_iter().rev() {
601 if let Some(ext) = compression.get(&name) {
602 let mut ext_guard = ext.lock();
603 if let Some(ext) = Arc::get_mut(&mut ext_guard) {
604 if let Err(e) = ext.shutdown() {
605 errors.push(format!("compression '{}': {}", name, e));
606 }
607 }
608 }
609 }
610 }
611
612 if errors.is_empty() {
613 Ok(())
614 } else {
615 Err(KernelError::Plugin {
616 message: format!("shutdown errors: {}", errors.join("; ")),
617 })
618 }
619 }
620
621 /// Get information about all registered extensions
622 pub fn list_extensions(&self) -> Vec<ExtensionInfo> {
623 let mut result = Vec::new();
624
625 for ext in self.storage.read().values() {
626 result.push(ext.info());
627 }
628
629 for ext in self.indices.read().values() {
630 result.push(ext.read().info());
631 }
632
633 for ext in self.observability.read().iter() {
634 result.push(ext.info());
635 }
636
637 for ext in self.compression.read().values() {
638 result.push(ext.info());
639 }
640
641 result
642 }
643}
644
645// ============================================================================
646// Null Observability (Default - No-Op)
647// ============================================================================
648
/// Null observability extension: a zero-sized type whose observability
/// methods all do nothing.
///
/// Intended as the stand-in when no real observability backend is configured.
pub struct NullObservability;
653
654impl Extension for NullObservability {
655 fn info(&self) -> ExtensionInfo {
656 ExtensionInfo {
657 name: "null-observability".into(),
658 version: "0.0.0".into(),
659 description: "No-op observability (default)".into(),
660 author: "SochDB".into(),
661 capabilities: vec![ExtensionCapability::Observability],
662 }
663 }
664
665 fn as_any(&self) -> &dyn Any {
666 self
667 }
668
669 fn as_any_mut(&mut self) -> &mut dyn Any {
670 self
671 }
672}
673
674impl ObservabilityExtension for NullObservability {
675 fn counter_inc(&self, _name: &str, _value: u64, _labels: &[(&str, &str)]) {}
676 fn gauge_set(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
677 fn histogram_observe(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
678 fn span_start(&self, _name: &str, _parent: Option<u64>) -> u64 {
679 0
680 }
681 fn span_end(&self, _span_id: u64) {}
682 fn span_event(&self, _span_id: u64, _name: &str, _attributes: &[(&str, &str)]) {}
683}
684
685// ============================================================================
686// Dynamic Plugin Loading (Optional Feature)
687// ============================================================================
688
689#[cfg(feature = "dynamic-plugins")]
690pub mod dynamic {
691 //! Dynamic plugin loading support
692 //!
693 //! Enabled with the `dynamic-plugins` feature.
694 //! Allows loading plugins from shared libraries at runtime.
695 //!
696 //! # Security Warning
697 //!
698 //! Loading a native shared library (`dlopen`) executes arbitrary code
699 //! in the host process — including the library's `_init()` constructors
700 //! which run *before* any symbol lookup. A malicious `.so/.dylib` can:
701 //!
702 //! - Exfiltrate data from the database process
703 //! - Spawn background threads or open network connections
704 //! - Corrupt kernel memory (no isolation boundary)
705 //!
706 //! **Only load libraries you have built from audited source code.**
707 //! For untrusted extensions, use the WASM plugin sandbox instead.
708
709 use super::*;
710 use libloading::{Library, Symbol};
711 use std::path::Path;
712
    /// Environment variable that opts in to native (`dlopen`) plugin loading.
    ///
    /// Set to `1`/`true`/`yes`/`on` to allow a default-constructed
    /// [`DynamicPluginLoader`] to load native shared libraries. The value is
    /// trimmed and compared case-insensitively (ASCII). Absent or any other
    /// value keeps native loading disabled and steers callers to the WASM
    /// sandbox.
    pub const ALLOW_NATIVE_ENV: &str = "SOCHDB_ALLOW_NATIVE_PLUGINS";
720
721 fn env_opts_in_to_native() -> bool {
722 std::env::var(ALLOW_NATIVE_ENV)
723 .map(|v| {
724 let v = v.trim().to_ascii_lowercase();
725 v == "1" || v == "true" || v == "yes" || v == "on"
726 })
727 .unwrap_or(false)
728 }
729
    /// Dynamic plugin loader
    ///
    /// Native loading is **untrusted by default**. Even with the
    /// `dynamic-plugins` feature compiled in, a loader will refuse to `dlopen`
    /// a shared library unless it was explicitly marked trusted — either by
    /// constructing it with [`DynamicPluginLoader::new_trusted`] or by setting
    /// the [`ALLOW_NATIVE_ENV`] environment variable. Untrusted extensions
    /// should run in the WASM sandbox instead.
    ///
    /// The trust decision is fixed at construction time; the environment
    /// variable is not re-read afterwards.
    pub struct DynamicPluginLoader {
        /// Loaded libraries (kept alive for as long as the loader exists)
        _libraries: Vec<Library>,
        /// Whether native `dlopen` loading is permitted for this loader.
        trusted: bool,
    }
744
745 impl DynamicPluginLoader {
746 /// Create a new dynamic plugin loader.
747 ///
748 /// Native loading is **disabled** unless the [`ALLOW_NATIVE_ENV`]
749 /// environment variable opts in. Use [`new_trusted`](Self::new_trusted)
750 /// to enable native loading programmatically. This safe-by-default
751 /// stance means an attacker who can drop a `.so` on disk cannot get it
752 /// loaded without an explicit operator decision.
753 pub fn new() -> Self {
754 Self {
755 _libraries: Vec::new(),
756 trusted: env_opts_in_to_native(),
757 }
758 }
759
760 /// Create a loader that is explicitly permitted to load native
761 /// shared libraries.
762 ///
763 /// # Safety / Trust
764 ///
765 /// Calling this is an assertion by the operator that any library passed
766 /// to [`load_observability`](Self::load_observability) comes from
767 /// audited, trusted source code. Native libraries run with **no
768 /// isolation** and can execute arbitrary code in the database process
769 /// (see the module-level security warning). Prefer the WASM sandbox for
770 /// anything that is not fully trusted.
771 pub fn new_trusted() -> Self {
772 Self {
773 _libraries: Vec::new(),
774 trusted: true,
775 }
776 }
777
778 /// Whether this loader is permitted to load native libraries.
779 pub fn is_trusted(&self) -> bool {
780 self.trusted
781 }
782
        /// Load an observability plugin from a shared library.
        ///
        /// Steps, in order: trust gate, absolute-path check, canonicalization,
        /// `dlopen`, lookup of `create_observability_plugin`, invocation of
        /// that constructor. The library handle is then stored in the loader
        /// so it stays mapped while the loader is alive.
        ///
        /// # Safety
        ///
        /// This function is `unsafe` because loading a native shared library
        /// can execute arbitrary code. The caller MUST ensure:
        ///
        /// 1. The library at `path` was built from audited, trusted source code.
        /// 2. The path is an absolute path (it is canonicalized here, but the
        ///    file can still be swapped between canonicalization and loading).
        /// 3. The file permissions prevent modification by unprivileged users.
        ///
        /// NOTE(review): the returned `Arc` is not tied to the loader's
        /// lifetime. Presumably the loader must outlive every plugin it
        /// returned, since dropping it drops the `Library` handles; confirm and
        /// document for callers.
        ///
        /// The library must export:
        /// ```c
        /// extern "C" fn create_observability_plugin() -> *mut dyn ObservabilityExtension
        /// ```
        ///
        /// NOTE(review): the code below looks the symbol up as a Rust-ABI
        /// `fn() -> Box<dyn ObservabilityExtension>`, which does not match the
        /// `extern "C"` / raw-pointer signature documented above. Confirm which
        /// contract plugin authors are expected to follow and align the two.
        ///
        /// # Errors
        ///
        /// Returns `KernelError::Plugin` (without performing any `dlopen`) if
        /// this loader is not trusted — see [`new_trusted`](Self::new_trusted)
        /// and [`ALLOW_NATIVE_ENV`] — if `path` is not absolute, or if it cannot
        /// be canonicalized. Also returns `KernelError::Plugin` if the library
        /// fails to load or the symbol is missing.
        pub unsafe fn load_observability(
            &mut self,
            path: &Path,
        ) -> KernelResult<Arc<dyn ObservabilityExtension>> {
            // Untrusted-by-default gate: refuse native loading unless explicitly
            // enabled. This check happens BEFORE any filesystem access or
            // `dlopen`, so a malicious library is never even opened.
            if !self.trusted {
                return Err(KernelError::Plugin {
                    message: format!(
                        "native plugin loading is disabled (untrusted by default). \
                         Load untrusted extensions via the WASM sandbox, or opt in \
                         explicitly with DynamicPluginLoader::new_trusted() or by \
                         setting {ALLOW_NATIVE_ENV}=1. Refused: {}",
                        path.display()
                    ),
                });
            }

            // Validate the path is absolute to prevent relative-path hijacking
            if !path.is_absolute() {
                return Err(KernelError::Plugin {
                    message: format!(
                        "plugin path must be absolute to prevent path hijacking: {}",
                        path.display()
                    ),
                });
            }

            // Canonicalize to resolve symlinks so the real target is what gets
            // loaded. This also fails if the file does not exist. It does not by
            // itself prevent the file from being replaced before `Library::new`.
            let canonical = path.canonicalize().map_err(|e| KernelError::Plugin {
                message: format!("failed to canonicalize plugin path: {}", e),
            })?;

            unsafe {
                // Opening the library runs its initializers (arbitrary code).
                let lib = Library::new(&canonical).map_err(|e| KernelError::Plugin {
                    message: format!("failed to load library: {}", e),
                })?;

                let create_fn: Symbol<fn() -> Box<dyn ObservabilityExtension>> = lib
                    .get(b"create_observability_plugin")
                    .map_err(|e| KernelError::Plugin {
                        message: format!("symbol not found: {}", e),
                    })?;

                // Build the plugin first, then park the library handle in the
                // loader so it is not dropped at the end of this scope.
                let plugin = create_fn();
                self._libraries.push(lib);

                Ok(Arc::from(plugin))
            }
        }
853 }
854
855 impl Default for DynamicPluginLoader {
856 fn default() -> Self {
857 Self::new()
858 }
859 }
860
861 #[cfg(test)]
862 mod dynamic_tests {
863 use super::*;
864
865 #[test]
866 fn test_untrusted_loader_refuses_native_load() {
867 // Default loader (no env opt-in) must refuse to load a native lib,
868 // returning an error WITHOUT touching the filesystem.
869 let mut loader = DynamicPluginLoader::new_untrusted_for_test();
870 assert!(!loader.is_trusted());
871 let path = Path::new("/nonexistent/evil.so");
872 let result = unsafe { loader.load_observability(path) };
873 assert!(result.is_err(), "untrusted loader must refuse native load");
874 let msg = format!("{:?}", result.err().unwrap());
875 assert!(
876 msg.contains("untrusted") || msg.contains("disabled"),
877 "error should explain the untrusted-by-default policy, got: {msg}"
878 );
879 }
880
881 #[test]
882 fn test_trusted_loader_passes_gate_then_fails_on_missing_file() {
883 // A trusted loader passes the trust gate; with a missing file it
884 // fails later (canonicalize), proving the gate itself is not what
885 // rejects it.
886 let mut loader = DynamicPluginLoader::new_trusted();
887 assert!(loader.is_trusted());
888 let path = Path::new("/nonexistent/trusted.so");
889 let result = unsafe { loader.load_observability(path) };
890 assert!(result.is_err());
891 let msg = format!("{:?}", result.err().unwrap());
892 assert!(
893 !msg.contains("untrusted"),
894 "trusted loader must fail past the trust gate, got: {msg}"
895 );
896 }
897 }
898
899 impl DynamicPluginLoader {
900 /// Test-only constructor forcing the untrusted state regardless of the
901 /// ambient environment (so the env var cannot make the test flaky).
902 #[cfg(test)]
903 fn new_untrusted_for_test() -> Self {
904 Self {
905 _libraries: Vec::new(),
906 trusted: false,
907 }
908 }
909 }
910}
911
912#[cfg(test)]
913mod tests {
914 use super::*;
915
916 #[test]
917 fn test_plugin_manager_creation() {
918 let pm = PluginManager::new();
919 assert!(!pm.has_observability());
920 assert!(pm.storage().is_none());
921 }
922
923 #[test]
924 fn test_null_observability() {
925 let null = NullObservability;
926 // Should not panic
927 null.counter_inc("test", 1, &[]);
928 null.gauge_set("test", 1.0, &[]);
929 null.histogram_observe("test", 1.0, &[]);
930 let span = null.span_start("test", None);
931 null.span_event(span, "event", &[]);
932 null.span_end(span);
933 }
934
935 #[test]
936 fn test_register_observability() {
937 let pm = PluginManager::new();
938 let null = Arc::new(NullObservability);
939
940 assert!(!pm.has_observability());
941 pm.register_observability(null).unwrap();
942 assert!(pm.has_observability());
943 }
944
945 struct TestExtension {
946 shutdown_called: Arc<std::sync::atomic::AtomicBool>,
947 }
948
949 impl TestExtension {
950 fn new_with_flag(flag: Arc<std::sync::atomic::AtomicBool>) -> Self {
951 Self {
952 shutdown_called: flag,
953 }
954 }
955 }
956
957 impl ObservabilityExtension for TestExtension {
958 fn counter_inc(&self, _name: &str, _value: u64, _labels: &[(&str, &str)]) {}
959 fn gauge_set(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
960 fn histogram_observe(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
961 fn span_start(&self, _name: &str, _parent: Option<u64>) -> u64 {
962 0
963 }
964 fn span_event(&self, _span_id: u64, _name: &str, _labels: &[(&str, &str)]) {}
965 fn span_end(&self, _span_id: u64) {}
966 }
967
968 impl Extension for TestExtension {
969 fn info(&self) -> ExtensionInfo {
970 ExtensionInfo {
971 name: "test-extension".into(),
972 version: "0.0.1".into(),
973 description: "Test extension".into(),
974 author: "test".into(),
975 capabilities: vec![],
976 }
977 }
978
979 fn shutdown(&mut self) -> KernelResult<()> {
980 self.shutdown_called
981 .store(true, std::sync::atomic::Ordering::SeqCst);
982 Ok(())
983 }
984
985 fn as_any(&self) -> &dyn Any {
986 self
987 }
988
989 fn as_any_mut(&mut self) -> &mut dyn Any {
990 self
991 }
992 }
993
994 #[test]
995 fn test_shutdown_all_calls_shutdown() {
996 let pm = PluginManager::new();
997 let flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
998
999 {
1000 let ext = TestExtension::new_with_flag(flag.clone());
1001 let arc_ext: Arc<dyn ObservabilityExtension> = Arc::new(ext);
1002
1003 let mut shutdown = pm.observability_shutdown.write();
1004 shutdown.push(Mutex::new(arc_ext));
1005 }
1006
1007 pm.shutdown_all().unwrap();
1008
1009 assert!(
1010 flag.load(std::sync::atomic::Ordering::SeqCst),
1011 "shutdown() should have been called on the extension"
1012 );
1013 }
1014
1015 #[test]
1016 fn test_shutdown_all_with_single_ref() {
1017 use std::sync::atomic::Ordering;
1018
1019 struct SingleRefExtension {
1020 flag: Arc<std::sync::atomic::AtomicBool>,
1021 }
1022
1023 impl SingleRefExtension {
1024 fn new(flag: Arc<std::sync::atomic::AtomicBool>) -> Self {
1025 Self { flag }
1026 }
1027 }
1028
1029 impl ObservabilityExtension for SingleRefExtension {
1030 fn counter_inc(&self, _: &str, _: u64, _: &[(&str, &str)]) {}
1031 fn gauge_set(&self, _: &str, _: f64, _: &[(&str, &str)]) {}
1032 fn histogram_observe(&self, _: &str, _: f64, _: &[(&str, &str)]) {}
1033 fn span_start(&self, _: &str, _: Option<u64>) -> u64 {
1034 0
1035 }
1036 fn span_event(&self, _: u64, _: &str, _: &[(&str, &str)]) {}
1037 fn span_end(&self, _: u64) {}
1038 }
1039
1040 impl Extension for SingleRefExtension {
1041 fn info(&self) -> ExtensionInfo {
1042 ExtensionInfo {
1043 name: "single-ref".into(),
1044 version: "0.0.1".into(),
1045 description: "Test".into(),
1046 author: "test".into(),
1047 capabilities: vec![],
1048 }
1049 }
1050 fn shutdown(&mut self) -> KernelResult<()> {
1051 self.flag.store(true, Ordering::SeqCst);
1052 Ok(())
1053 }
1054 fn as_any(&self) -> &dyn Any {
1055 self
1056 }
1057 fn as_any_mut(&mut self) -> &mut dyn Any {
1058 self
1059 }
1060 }
1061
1062 let pm = PluginManager::new();
1063 let flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
1064
1065 {
1066 let ext = SingleRefExtension::new(flag.clone());
1067 let boxed: Box<dyn ObservabilityExtension> = Box::new(ext);
1068 let arc_ext: Arc<dyn ObservabilityExtension> = Arc::from(boxed);
1069
1070 let mut shutdown = pm.observability_shutdown.write();
1071 shutdown.push(Mutex::new(arc_ext));
1072 }
1073
1074 pm.shutdown_all().unwrap();
1075
1076 assert!(
1077 flag.load(Ordering::SeqCst),
1078 "shutdown should have been called"
1079 );
1080 }
1081}