Skip to main content

sochdb_kernel/
plugin.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Plugin Architecture
19//!
20//! SochDB uses a plugin architecture to keep the kernel minimal while
21//! allowing rich functionality through extensions.
22//!
23//! ## Design Philosophy
24//!
25//! 1. **Core is Minimal**: The kernel contains only ACID-critical code
26//! 2. **Extensions Add Features**: Storage backends, indices, observability are plugins
27//! 3. **No Bloat**: Users only pay for what they use
28//! 4. **Vendor Neutral**: No lock-in to specific monitoring/storage systems
29//!
30//! ## Plugin Categories
31//!
32//! - `StorageExtension`: Alternative storage backends (LSCS, RocksDB, etc.)
33//! - `IndexExtension`: Custom index types (vector, learned, etc.)
34//! - `ObservabilityExtension`: Metrics, tracing, logging backends
35//! - `CompressionExtension`: Compression algorithms
36//!
37//! ## Example: Adding Prometheus Metrics
38//!
39//! ```ignore
40//! // In a separate crate: sochdb-prometheus-plugin
41//! struct PrometheusPlugin { /* ... */ }
42//!
43//! impl ObservabilityExtension for PrometheusPlugin {
44//!     fn record_metric(&self, name: &str, value: f64, tags: &[(&str, &str)]) {
45//!         // Push to Prometheus
46//!     }
47//! }
48//!
49//! // Usage:
50//! let db = KernelDB::open(path)?;
51//! db.plugins().register_observability(Box::new(PrometheusPlugin::new()))?;
52//! ```
53
54use crate::error::{KernelError, KernelResult};
55use crate::kernel_api::{HealthInfo, RowId, TableId};
56use crate::transaction::TransactionId;
57use parking_lot::RwLock;
58use std::any::Any;
59use std::collections::HashMap;
60use std::sync::Arc;
61
62// ============================================================================
63// Extension Trait Definitions
64// ============================================================================
65
66/// Information about an extension
67#[derive(Debug, Clone)]
68pub struct ExtensionInfo {
69    /// Unique extension name (e.g., "prometheus-metrics")
70    pub name: String,
71    /// Extension version
72    pub version: String,
73    /// Human-readable description
74    pub description: String,
75    /// Extension author
76    pub author: String,
77    /// Capabilities provided
78    pub capabilities: Vec<ExtensionCapability>,
79}
80
81/// Capabilities an extension can provide
82#[derive(Debug, Clone, PartialEq, Eq, Hash)]
83pub enum ExtensionCapability {
84    /// Alternative storage backend
85    Storage,
86    /// Custom index type
87    Index,
88    /// Metrics/tracing/logging
89    Observability,
90    /// Compression algorithm
91    Compression,
92    /// Query optimization
93    QueryOptimizer,
94    /// Authentication/Authorization
95    Auth,
96    /// Custom - for third-party extensions
97    Custom(String),
98}
99
100/// Base trait for all extensions
101pub trait Extension: Send + Sync {
102    /// Get extension information
103    fn info(&self) -> ExtensionInfo;
104
105    /// Initialize the extension
106    fn init(&mut self) -> KernelResult<()> {
107        Ok(())
108    }
109
110    /// Shutdown the extension gracefully
111    fn shutdown(&mut self) -> KernelResult<()> {
112        Ok(())
113    }
114
115    /// Cast to Any for downcasting
116    fn as_any(&self) -> &dyn Any;
117
118    /// Cast to mutable Any for downcasting
119    fn as_any_mut(&mut self) -> &mut dyn Any;
120}
121
122// ============================================================================
123// Storage Extension
124// ============================================================================
125
126/// Storage backend extension
127///
128/// Implement this to provide alternative storage engines (LSCS, RocksDB, etc.)
129pub trait StorageExtension: Extension {
130    /// Read data for a key
131    fn get(&self, table_id: TableId, key: &[u8]) -> KernelResult<Option<Vec<u8>>>;
132
133    /// Write data for a key
134    fn put(
135        &self,
136        table_id: TableId,
137        key: &[u8],
138        value: &[u8],
139        txn_id: TransactionId,
140    ) -> KernelResult<()>;
141
142    /// Delete a key
143    fn delete(&self, table_id: TableId, key: &[u8], txn_id: TransactionId) -> KernelResult<()>;
144
145    /// Scan a range of keys
146    fn scan(
147        &self,
148        table_id: TableId,
149        start: &[u8],
150        end: &[u8],
151        limit: usize,
152    ) -> KernelResult<Vec<(Vec<u8>, Vec<u8>)>>;
153
154    /// Flush pending writes
155    fn flush(&self) -> KernelResult<()>;
156
157    /// Compact storage (if applicable)
158    fn compact(&self) -> KernelResult<()> {
159        Ok(()) // Default: no-op
160    }
161
162    /// Get storage statistics
163    fn stats(&self) -> StorageStats {
164        StorageStats::default()
165    }
166}
167
168/// Storage statistics
169#[derive(Debug, Clone, Default)]
170pub struct StorageStats {
171    /// Total bytes stored
172    pub bytes_stored: u64,
173    /// Number of keys
174    pub key_count: u64,
175    /// Pending compaction bytes
176    pub pending_compaction_bytes: u64,
177    /// Write amplification factor
178    pub write_amplification: f64,
179}
180
181// ============================================================================
182// Index Extension
183// ============================================================================
184
185/// Index extension
186///
187/// Implement this for custom index types (vector, learned, full-text, etc.)
188pub trait IndexExtension: Extension {
189    /// Index type name (e.g., "hnsw", "learned", "btree")
190    fn index_type(&self) -> &str;
191
192    /// Build index on existing data
193    fn build(
194        &mut self,
195        table_id: TableId,
196        column_id: u16,
197        data: &[(RowId, Vec<u8>)],
198    ) -> KernelResult<()>;
199
200    /// Insert a key-value pair into the index
201    fn insert(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
202
203    /// Delete a key from the index
204    fn delete(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
205
206    /// Point lookup
207    fn lookup(&self, key: &[u8]) -> KernelResult<Vec<RowId>>;
208
209    /// Range scan
210    fn range(&self, start: &[u8], end: &[u8], limit: usize) -> KernelResult<Vec<RowId>>;
211
212    /// Nearest neighbor search (for vector indices)
213    fn nearest(&self, _query: &[u8], _k: usize) -> KernelResult<Vec<(RowId, f32)>> {
214        Err(KernelError::Plugin {
215            message: "nearest neighbor not supported by this index type".into(),
216        })
217    }
218
219    /// Get index size in bytes
220    fn size_bytes(&self) -> u64;
221}
222
223// ============================================================================
224// Observability Extension (PLUGIN ARCHITECTURE)
225// ============================================================================
226
227/// Observability extension
228///
229/// Implement this for metrics, tracing, and logging backends.
230///
231/// ## Why Plugin Architecture?
232///
233/// 1. **No Dependency Bloat**: Core doesn't pull in Prometheus, DataDog, etc.
234/// 2. **Vendor Neutral**: Users choose their observability stack
235/// 3. **Flexible**: Can run without any observability in embedded scenarios
236///
237/// ## Available Plugins (separate crates):
238///
239/// - `sochdb-prometheus`: Prometheus metrics
240/// - `sochdb-datadog`: DataDog integration  
241/// - `sochdb-opentelemetry`: OpenTelemetry support
242/// - `sochdb-logging-json`: JSON structured logging
243/// - `sochdb-logging-logfmt`: logfmt style logging
244pub trait ObservabilityExtension: Extension {
245    // -------------------------------------------------------------------------
246    // Metrics
247    // -------------------------------------------------------------------------
248
249    /// Record a counter increment
250    fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]);
251
252    /// Record a gauge value
253    fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]);
254
255    /// Record a histogram observation
256    fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]);
257
258    // -------------------------------------------------------------------------
259    // Tracing
260    // -------------------------------------------------------------------------
261
262    /// Start a new span
263    fn span_start(&self, name: &str, parent: Option<u64>) -> u64;
264
265    /// End a span
266    fn span_end(&self, span_id: u64);
267
268    /// Add an event to a span
269    fn span_event(&self, span_id: u64, name: &str, attributes: &[(&str, &str)]);
270
271    // -------------------------------------------------------------------------
272    // Logging
273    // -------------------------------------------------------------------------
274
275    /// Log at debug level
276    fn log_debug(&self, message: &str, fields: &[(&str, &str)]) {
277        let _ = (message, fields); // Default: no-op
278    }
279
280    /// Log at info level
281    fn log_info(&self, message: &str, fields: &[(&str, &str)]) {
282        let _ = (message, fields); // Default: no-op
283    }
284
285    /// Log at warn level
286    fn log_warn(&self, message: &str, fields: &[(&str, &str)]) {
287        let _ = (message, fields); // Default: no-op
288    }
289
290    /// Log at error level
291    fn log_error(&self, message: &str, fields: &[(&str, &str)]) {
292        let _ = (message, fields); // Default: no-op
293    }
294
295    // -------------------------------------------------------------------------
296    // Health Reporting
297    // -------------------------------------------------------------------------
298
299    /// Report health status (called periodically by kernel)
300    fn report_health(&self, health: &HealthInfo) {
301        let _ = health; // Default: no-op
302    }
303}
304
305// ============================================================================
306// Compression Extension
307// ============================================================================
308
309/// Compression algorithm extension
310pub trait CompressionExtension: Extension {
311    /// Algorithm name (e.g., "lz4", "zstd", "snappy")
312    fn algorithm(&self) -> &str;
313
314    /// Compress data
315    fn compress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;
316
317    /// Decompress data
318    fn decompress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;
319
320    /// Compression level (if applicable)
321    fn set_level(&mut self, _level: i32) -> KernelResult<()> {
322        Ok(())
323    }
324}
325
326// ============================================================================
327// Plugin Manager
328// ============================================================================
329
330/// Plugin manager - registry for all extensions
331///
332/// The kernel uses this to discover and invoke extensions.
333pub struct PluginManager {
334    /// Storage extensions by name
335    storage: RwLock<HashMap<String, Arc<dyn StorageExtension>>>,
336    /// Index extensions by name
337    indices: RwLock<HashMap<String, Arc<RwLock<dyn IndexExtension>>>>,
338    /// Observability extensions (can have multiple)
339    observability: RwLock<Vec<Arc<dyn ObservabilityExtension>>>,
340    /// Compression extensions by algorithm name
341    compression: RwLock<HashMap<String, Arc<dyn CompressionExtension>>>,
342    /// Active storage backend name
343    active_storage: RwLock<Option<String>>,
344}
345
346impl Default for PluginManager {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352impl PluginManager {
353    /// Create a new plugin manager
354    pub fn new() -> Self {
355        Self {
356            storage: RwLock::new(HashMap::new()),
357            indices: RwLock::new(HashMap::new()),
358            observability: RwLock::new(Vec::new()),
359            compression: RwLock::new(HashMap::new()),
360            active_storage: RwLock::new(None),
361        }
362    }
363
364    // -------------------------------------------------------------------------
365    // Storage Extensions
366    // -------------------------------------------------------------------------
367
368    /// Register a storage extension
369    pub fn register_storage(&self, ext: Arc<dyn StorageExtension>) -> KernelResult<()> {
370        let name = ext.info().name.clone();
371        let mut storage = self.storage.write();
372
373        if storage.contains_key(&name) {
374            return Err(KernelError::Plugin {
375                message: format!("storage extension '{}' already registered", name),
376            });
377        }
378
379        storage.insert(name.clone(), ext);
380
381        // Set as active if it's the first one
382        let mut active = self.active_storage.write();
383        if active.is_none() {
384            *active = Some(name);
385        }
386
387        Ok(())
388    }
389
390    /// Set the active storage backend
391    pub fn set_active_storage(&self, name: &str) -> KernelResult<()> {
392        let storage = self.storage.read();
393        if !storage.contains_key(name) {
394            return Err(KernelError::Plugin {
395                message: format!("storage extension '{}' not found", name),
396            });
397        }
398        *self.active_storage.write() = Some(name.to_string());
399        Ok(())
400    }
401
402    /// Get the active storage backend
403    pub fn storage(&self) -> Option<Arc<dyn StorageExtension>> {
404        let active = self.active_storage.read();
405        active
406            .as_ref()
407            .and_then(|name| self.storage.read().get(name).cloned())
408    }
409
410    // -------------------------------------------------------------------------
411    // Index Extensions
412    // -------------------------------------------------------------------------
413
414    /// Register an index extension
415    pub fn register_index(&self, ext: Arc<RwLock<dyn IndexExtension>>) -> KernelResult<()> {
416        let name = ext.read().info().name.clone();
417        let mut indices = self.indices.write();
418
419        if indices.contains_key(&name) {
420            return Err(KernelError::Plugin {
421                message: format!("index extension '{}' already registered", name),
422            });
423        }
424
425        indices.insert(name, ext);
426        Ok(())
427    }
428
429    /// Get an index extension by name
430    pub fn index(&self, name: &str) -> Option<Arc<RwLock<dyn IndexExtension>>> {
431        self.indices.read().get(name).cloned()
432    }
433
434    /// List registered index types
435    pub fn list_index_types(&self) -> Vec<String> {
436        self.indices.read().keys().cloned().collect()
437    }
438
439    // -------------------------------------------------------------------------
440    // Observability Extensions
441    // -------------------------------------------------------------------------
442
443    /// Register an observability extension
444    ///
445    /// Multiple observability extensions can be registered (fan-out to all)
446    pub fn register_observability(&self, ext: Arc<dyn ObservabilityExtension>) -> KernelResult<()> {
447        self.observability.write().push(ext);
448        Ok(())
449    }
450
451    /// Record a counter across all observability extensions
452    pub fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
453        for ext in self.observability.read().iter() {
454            ext.counter_inc(name, value, labels);
455        }
456    }
457
458    /// Record a gauge across all observability extensions
459    pub fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
460        for ext in self.observability.read().iter() {
461            ext.gauge_set(name, value, labels);
462        }
463    }
464
465    /// Record a histogram observation across all observability extensions
466    pub fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
467        for ext in self.observability.read().iter() {
468            ext.histogram_observe(name, value, labels);
469        }
470    }
471
472    /// Report health to all observability extensions
473    pub fn report_health(&self, health: &HealthInfo) {
474        for ext in self.observability.read().iter() {
475            ext.report_health(health);
476        }
477    }
478
479    /// Check if any observability is configured
480    pub fn has_observability(&self) -> bool {
481        !self.observability.read().is_empty()
482    }
483
484    // -------------------------------------------------------------------------
485    // Compression Extensions
486    // -------------------------------------------------------------------------
487
488    /// Register a compression extension
489    pub fn register_compression(&self, ext: Arc<dyn CompressionExtension>) -> KernelResult<()> {
490        let algo = ext.algorithm().to_string();
491        let mut compression = self.compression.write();
492
493        if compression.contains_key(&algo) {
494            return Err(KernelError::Plugin {
495                message: format!("compression '{}' already registered", algo),
496            });
497        }
498
499        compression.insert(algo, ext);
500        Ok(())
501    }
502
503    /// Get a compression extension by algorithm name
504    pub fn compression(&self, algorithm: &str) -> Option<Arc<dyn CompressionExtension>> {
505        self.compression.read().get(algorithm).cloned()
506    }
507
508    /// List available compression algorithms
509    pub fn list_compression(&self) -> Vec<String> {
510        self.compression.read().keys().cloned().collect()
511    }
512
513    // -------------------------------------------------------------------------
514    // Lifecycle
515    // -------------------------------------------------------------------------
516
517    /// Shutdown all extensions gracefully
518    pub fn shutdown_all(&self) -> KernelResult<()> {
519        // Extensions are immutable through Arc, so we can't call shutdown
520        // In a real implementation, we'd use Arc<RwLock<dyn Extension>>
521        // For now, this is a no-op placeholder
522        Ok(())
523    }
524
525    /// Get information about all registered extensions
526    pub fn list_extensions(&self) -> Vec<ExtensionInfo> {
527        let mut result = Vec::new();
528
529        for ext in self.storage.read().values() {
530            result.push(ext.info());
531        }
532
533        for ext in self.indices.read().values() {
534            result.push(ext.read().info());
535        }
536
537        for ext in self.observability.read().iter() {
538            result.push(ext.info());
539        }
540
541        for ext in self.compression.read().values() {
542            result.push(ext.info());
543        }
544
545        result
546    }
547}
548
549// ============================================================================
550// Null Observability (Default - No-Op)
551// ============================================================================
552
553/// Null observability extension - does nothing
554///
555/// Used when no observability is configured. Zero overhead.
556pub struct NullObservability;
557
558impl Extension for NullObservability {
559    fn info(&self) -> ExtensionInfo {
560        ExtensionInfo {
561            name: "null-observability".into(),
562            version: "0.0.0".into(),
563            description: "No-op observability (default)".into(),
564            author: "SochDB".into(),
565            capabilities: vec![ExtensionCapability::Observability],
566        }
567    }
568
569    fn as_any(&self) -> &dyn Any {
570        self
571    }
572
573    fn as_any_mut(&mut self) -> &mut dyn Any {
574        self
575    }
576}
577
578impl ObservabilityExtension for NullObservability {
579    fn counter_inc(&self, _name: &str, _value: u64, _labels: &[(&str, &str)]) {}
580    fn gauge_set(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
581    fn histogram_observe(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
582    fn span_start(&self, _name: &str, _parent: Option<u64>) -> u64 {
583        0
584    }
585    fn span_end(&self, _span_id: u64) {}
586    fn span_event(&self, _span_id: u64, _name: &str, _attributes: &[(&str, &str)]) {}
587}
588
589// ============================================================================
590// Dynamic Plugin Loading (Optional Feature)
591// ============================================================================
592
593#[cfg(feature = "dynamic-plugins")]
594pub mod dynamic {
595    //! Dynamic plugin loading support
596    //!
597    //! Enabled with the `dynamic-plugins` feature.
598    //! Allows loading plugins from shared libraries at runtime.
599
600    use super::*;
601    use libloading::{Library, Symbol};
602    use std::path::Path;
603
604    /// Dynamic plugin loader
605    pub struct DynamicPluginLoader {
606        /// Loaded libraries (kept alive)
607        _libraries: Vec<Library>,
608    }
609
610    impl DynamicPluginLoader {
611        /// Create a new dynamic plugin loader
612        pub fn new() -> Self {
613            Self {
614                _libraries: Vec::new(),
615            }
616        }
617
618        /// Load an observability plugin from a shared library
619        ///
620        /// The library must export a function:
621        /// ```c
622        /// extern "C" fn create_observability_plugin() -> *mut dyn ObservabilityExtension
623        /// ```
624        pub fn load_observability(
625            &mut self,
626            path: &Path,
627        ) -> KernelResult<Arc<dyn ObservabilityExtension>> {
628            unsafe {
629                let lib = Library::new(path).map_err(|e| KernelError::Plugin {
630                    message: format!("failed to load library: {}", e),
631                })?;
632
633                let create_fn: Symbol<fn() -> Box<dyn ObservabilityExtension>> = lib
634                    .get(b"create_observability_plugin")
635                    .map_err(|e| KernelError::Plugin {
636                        message: format!("symbol not found: {}", e),
637                    })?;
638
639                let plugin = create_fn();
640                self._libraries.push(lib);
641
642                Ok(Arc::from(plugin))
643            }
644        }
645    }
646
647    impl Default for DynamicPluginLoader {
648        fn default() -> Self {
649            Self::new()
650        }
651    }
652}
653
654#[cfg(test)]
655mod tests {
656    use super::*;
657
658    #[test]
659    fn test_plugin_manager_creation() {
660        let pm = PluginManager::new();
661        assert!(!pm.has_observability());
662        assert!(pm.storage().is_none());
663    }
664
665    #[test]
666    fn test_null_observability() {
667        let null = NullObservability;
668        // Should not panic
669        null.counter_inc("test", 1, &[]);
670        null.gauge_set("test", 1.0, &[]);
671        null.histogram_observe("test", 1.0, &[]);
672        let span = null.span_start("test", None);
673        null.span_event(span, "event", &[]);
674        null.span_end(span);
675    }
676
677    #[test]
678    fn test_register_observability() {
679        let pm = PluginManager::new();
680        let null = Arc::new(NullObservability);
681
682        assert!(!pm.has_observability());
683        pm.register_observability(null).unwrap();
684        assert!(pm.has_observability());
685    }
686}