sochdb_kernel/plugin.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Plugin Architecture
19//!
20//! SochDB uses a plugin architecture to keep the kernel minimal while
21//! allowing rich functionality through extensions.
22//!
23//! ## Design Philosophy
24//!
25//! 1. **Core is Minimal**: The kernel contains only ACID-critical code
26//! 2. **Extensions Add Features**: Storage backends, indices, observability are plugins
27//! 3. **No Bloat**: Users only pay for what they use
28//! 4. **Vendor Neutral**: No lock-in to specific monitoring/storage systems
29//!
30//! ## Plugin Categories
31//!
32//! - `StorageExtension`: Alternative storage backends (LSCS, RocksDB, etc.)
33//! - `IndexExtension`: Custom index types (vector, learned, etc.)
34//! - `ObservabilityExtension`: Metrics, tracing, logging backends
35//! - `CompressionExtension`: Compression algorithms
36//!
//! ## Example: Adding Prometheus Metrics
//!
//! ```ignore
//! // In a separate crate: sochdb-prometheus-plugin
//! struct PrometheusPlugin { /* ... */ }
//!
//! // `Extension` must be implemented as well (omitted here).
//! impl ObservabilityExtension for PrometheusPlugin {
//!     fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
//!         // Push to Prometheus
//!     }
//!     // ... plus gauge_set, histogram_observe, span_start, span_end, span_event
//! }
//!
//! // Usage:
//! let db = KernelDB::open(path)?;
//! db.plugins().register_observability(Arc::new(PrometheusPlugin::new()))?;
//! ```
53
54use crate::error::{KernelError, KernelResult};
55use crate::kernel_api::{HealthInfo, RowId, TableId};
56use crate::transaction::TransactionId;
57use parking_lot::RwLock;
58use std::any::Any;
59use std::collections::HashMap;
60use std::sync::Arc;
61
62// ============================================================================
63// Extension Trait Definitions
64// ============================================================================
65
66/// Information about an extension
67#[derive(Debug, Clone)]
68pub struct ExtensionInfo {
69 /// Unique extension name (e.g., "prometheus-metrics")
70 pub name: String,
71 /// Extension version
72 pub version: String,
73 /// Human-readable description
74 pub description: String,
75 /// Extension author
76 pub author: String,
77 /// Capabilities provided
78 pub capabilities: Vec<ExtensionCapability>,
79}
80
/// The kinds of functionality an extension may advertise.
///
/// Values of this enum are listed in `ExtensionInfo::capabilities`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExtensionCapability {
    /// Provides an alternative storage backend.
    Storage,
    /// Provides a custom index type.
    Index,
    /// Provides metrics, tracing, or logging.
    Observability,
    /// Provides a compression algorithm.
    Compression,
    /// Provides query optimization.
    QueryOptimizer,
    /// Provides authentication or authorization.
    Auth,
    /// Free-form capability label, intended for third-party extensions.
    Custom(String),
}
99
100/// Base trait for all extensions
101pub trait Extension: Send + Sync {
102 /// Get extension information
103 fn info(&self) -> ExtensionInfo;
104
105 /// Initialize the extension
106 fn init(&mut self) -> KernelResult<()> {
107 Ok(())
108 }
109
110 /// Shutdown the extension gracefully
111 fn shutdown(&mut self) -> KernelResult<()> {
112 Ok(())
113 }
114
115 /// Cast to Any for downcasting
116 fn as_any(&self) -> &dyn Any;
117
118 /// Cast to mutable Any for downcasting
119 fn as_any_mut(&mut self) -> &mut dyn Any;
120}
121
122// ============================================================================
123// Storage Extension
124// ============================================================================
125
126/// Storage backend extension
127///
128/// Implement this to provide alternative storage engines (LSCS, RocksDB, etc.)
129pub trait StorageExtension: Extension {
130 /// Read data for a key
131 fn get(&self, table_id: TableId, key: &[u8]) -> KernelResult<Option<Vec<u8>>>;
132
133 /// Write data for a key
134 fn put(
135 &self,
136 table_id: TableId,
137 key: &[u8],
138 value: &[u8],
139 txn_id: TransactionId,
140 ) -> KernelResult<()>;
141
142 /// Delete a key
143 fn delete(&self, table_id: TableId, key: &[u8], txn_id: TransactionId) -> KernelResult<()>;
144
145 /// Scan a range of keys
146 fn scan(
147 &self,
148 table_id: TableId,
149 start: &[u8],
150 end: &[u8],
151 limit: usize,
152 ) -> KernelResult<Vec<(Vec<u8>, Vec<u8>)>>;
153
154 /// Flush pending writes
155 fn flush(&self) -> KernelResult<()>;
156
157 /// Compact storage (if applicable)
158 fn compact(&self) -> KernelResult<()> {
159 Ok(()) // Default: no-op
160 }
161
162 /// Get storage statistics
163 fn stats(&self) -> StorageStats {
164 StorageStats::default()
165 }
166}
167
/// Statistics reported by a storage backend.
///
/// `Default` yields all-zero statistics, which is what
/// `StorageExtension::stats` returns unless a backend overrides it.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Total number of bytes stored.
    pub bytes_stored: u64,
    /// Number of keys stored.
    pub key_count: u64,
    /// Number of bytes awaiting compaction.
    pub pending_compaction_bytes: u64,
    /// Write amplification factor.
    pub write_amplification: f64,
}
180
181// ============================================================================
182// Index Extension
183// ============================================================================
184
185/// Index extension
186///
187/// Implement this for custom index types (vector, learned, full-text, etc.)
188pub trait IndexExtension: Extension {
189 /// Index type name (e.g., "hnsw", "learned", "btree")
190 fn index_type(&self) -> &str;
191
192 /// Build index on existing data
193 fn build(
194 &mut self,
195 table_id: TableId,
196 column_id: u16,
197 data: &[(RowId, Vec<u8>)],
198 ) -> KernelResult<()>;
199
200 /// Insert a key-value pair into the index
201 fn insert(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
202
203 /// Delete a key from the index
204 fn delete(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
205
206 /// Point lookup
207 fn lookup(&self, key: &[u8]) -> KernelResult<Vec<RowId>>;
208
209 /// Range scan
210 fn range(&self, start: &[u8], end: &[u8], limit: usize) -> KernelResult<Vec<RowId>>;
211
212 /// Nearest neighbor search (for vector indices)
213 fn nearest(&self, _query: &[u8], _k: usize) -> KernelResult<Vec<(RowId, f32)>> {
214 Err(KernelError::Plugin {
215 message: "nearest neighbor not supported by this index type".into(),
216 })
217 }
218
219 /// Get index size in bytes
220 fn size_bytes(&self) -> u64;
221}
222
223// ============================================================================
224// Observability Extension (PLUGIN ARCHITECTURE)
225// ============================================================================
226
227/// Observability extension
228///
229/// Implement this for metrics, tracing, and logging backends.
230///
231/// ## Why Plugin Architecture?
232///
233/// 1. **No Dependency Bloat**: Core doesn't pull in Prometheus, DataDog, etc.
234/// 2. **Vendor Neutral**: Users choose their observability stack
235/// 3. **Flexible**: Can run without any observability in embedded scenarios
236///
237/// ## Available Plugins (separate crates):
238///
239/// - `sochdb-prometheus`: Prometheus metrics
240/// - `sochdb-datadog`: DataDog integration
241/// - `sochdb-opentelemetry`: OpenTelemetry support
242/// - `sochdb-logging-json`: JSON structured logging
243/// - `sochdb-logging-logfmt`: logfmt style logging
244pub trait ObservabilityExtension: Extension {
245 // -------------------------------------------------------------------------
246 // Metrics
247 // -------------------------------------------------------------------------
248
249 /// Record a counter increment
250 fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]);
251
252 /// Record a gauge value
253 fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]);
254
255 /// Record a histogram observation
256 fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]);
257
258 // -------------------------------------------------------------------------
259 // Tracing
260 // -------------------------------------------------------------------------
261
262 /// Start a new span
263 fn span_start(&self, name: &str, parent: Option<u64>) -> u64;
264
265 /// End a span
266 fn span_end(&self, span_id: u64);
267
268 /// Add an event to a span
269 fn span_event(&self, span_id: u64, name: &str, attributes: &[(&str, &str)]);
270
271 // -------------------------------------------------------------------------
272 // Logging
273 // -------------------------------------------------------------------------
274
275 /// Log at debug level
276 fn log_debug(&self, message: &str, fields: &[(&str, &str)]) {
277 let _ = (message, fields); // Default: no-op
278 }
279
280 /// Log at info level
281 fn log_info(&self, message: &str, fields: &[(&str, &str)]) {
282 let _ = (message, fields); // Default: no-op
283 }
284
285 /// Log at warn level
286 fn log_warn(&self, message: &str, fields: &[(&str, &str)]) {
287 let _ = (message, fields); // Default: no-op
288 }
289
290 /// Log at error level
291 fn log_error(&self, message: &str, fields: &[(&str, &str)]) {
292 let _ = (message, fields); // Default: no-op
293 }
294
295 // -------------------------------------------------------------------------
296 // Health Reporting
297 // -------------------------------------------------------------------------
298
299 /// Report health status (called periodically by kernel)
300 fn report_health(&self, health: &HealthInfo) {
301 let _ = health; // Default: no-op
302 }
303}
304
305// ============================================================================
306// Compression Extension
307// ============================================================================
308
309/// Compression algorithm extension
310pub trait CompressionExtension: Extension {
311 /// Algorithm name (e.g., "lz4", "zstd", "snappy")
312 fn algorithm(&self) -> &str;
313
314 /// Compress data
315 fn compress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;
316
317 /// Decompress data
318 fn decompress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;
319
320 /// Compression level (if applicable)
321 fn set_level(&mut self, _level: i32) -> KernelResult<()> {
322 Ok(())
323 }
324}
325
326// ============================================================================
327// Plugin Manager
328// ============================================================================
329
330/// Plugin manager - registry for all extensions
331///
332/// The kernel uses this to discover and invoke extensions.
333pub struct PluginManager {
334 /// Storage extensions by name
335 storage: RwLock<HashMap<String, Arc<dyn StorageExtension>>>,
336 /// Index extensions by name
337 indices: RwLock<HashMap<String, Arc<RwLock<dyn IndexExtension>>>>,
338 /// Observability extensions (can have multiple)
339 observability: RwLock<Vec<Arc<dyn ObservabilityExtension>>>,
340 /// Compression extensions by algorithm name
341 compression: RwLock<HashMap<String, Arc<dyn CompressionExtension>>>,
342 /// Active storage backend name
343 active_storage: RwLock<Option<String>>,
344}
345
346impl Default for PluginManager {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
352impl PluginManager {
353 /// Create a new plugin manager
354 pub fn new() -> Self {
355 Self {
356 storage: RwLock::new(HashMap::new()),
357 indices: RwLock::new(HashMap::new()),
358 observability: RwLock::new(Vec::new()),
359 compression: RwLock::new(HashMap::new()),
360 active_storage: RwLock::new(None),
361 }
362 }
363
364 // -------------------------------------------------------------------------
365 // Storage Extensions
366 // -------------------------------------------------------------------------
367
368 /// Register a storage extension
369 pub fn register_storage(&self, ext: Arc<dyn StorageExtension>) -> KernelResult<()> {
370 let name = ext.info().name.clone();
371 let mut storage = self.storage.write();
372
373 if storage.contains_key(&name) {
374 return Err(KernelError::Plugin {
375 message: format!("storage extension '{}' already registered", name),
376 });
377 }
378
379 storage.insert(name.clone(), ext);
380
381 // Set as active if it's the first one
382 let mut active = self.active_storage.write();
383 if active.is_none() {
384 *active = Some(name);
385 }
386
387 Ok(())
388 }
389
390 /// Set the active storage backend
391 pub fn set_active_storage(&self, name: &str) -> KernelResult<()> {
392 let storage = self.storage.read();
393 if !storage.contains_key(name) {
394 return Err(KernelError::Plugin {
395 message: format!("storage extension '{}' not found", name),
396 });
397 }
398 *self.active_storage.write() = Some(name.to_string());
399 Ok(())
400 }
401
402 /// Get the active storage backend
403 pub fn storage(&self) -> Option<Arc<dyn StorageExtension>> {
404 let active = self.active_storage.read();
405 active
406 .as_ref()
407 .and_then(|name| self.storage.read().get(name).cloned())
408 }
409
410 // -------------------------------------------------------------------------
411 // Index Extensions
412 // -------------------------------------------------------------------------
413
414 /// Register an index extension
415 pub fn register_index(&self, ext: Arc<RwLock<dyn IndexExtension>>) -> KernelResult<()> {
416 let name = ext.read().info().name.clone();
417 let mut indices = self.indices.write();
418
419 if indices.contains_key(&name) {
420 return Err(KernelError::Plugin {
421 message: format!("index extension '{}' already registered", name),
422 });
423 }
424
425 indices.insert(name, ext);
426 Ok(())
427 }
428
429 /// Get an index extension by name
430 pub fn index(&self, name: &str) -> Option<Arc<RwLock<dyn IndexExtension>>> {
431 self.indices.read().get(name).cloned()
432 }
433
434 /// List registered index types
435 pub fn list_index_types(&self) -> Vec<String> {
436 self.indices.read().keys().cloned().collect()
437 }
438
439 // -------------------------------------------------------------------------
440 // Observability Extensions
441 // -------------------------------------------------------------------------
442
443 /// Register an observability extension
444 ///
445 /// Multiple observability extensions can be registered (fan-out to all)
446 pub fn register_observability(&self, ext: Arc<dyn ObservabilityExtension>) -> KernelResult<()> {
447 self.observability.write().push(ext);
448 Ok(())
449 }
450
451 /// Record a counter across all observability extensions
452 pub fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
453 for ext in self.observability.read().iter() {
454 ext.counter_inc(name, value, labels);
455 }
456 }
457
458 /// Record a gauge across all observability extensions
459 pub fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
460 for ext in self.observability.read().iter() {
461 ext.gauge_set(name, value, labels);
462 }
463 }
464
465 /// Record a histogram observation across all observability extensions
466 pub fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
467 for ext in self.observability.read().iter() {
468 ext.histogram_observe(name, value, labels);
469 }
470 }
471
472 /// Report health to all observability extensions
473 pub fn report_health(&self, health: &HealthInfo) {
474 for ext in self.observability.read().iter() {
475 ext.report_health(health);
476 }
477 }
478
479 /// Check if any observability is configured
480 pub fn has_observability(&self) -> bool {
481 !self.observability.read().is_empty()
482 }
483
484 // -------------------------------------------------------------------------
485 // Compression Extensions
486 // -------------------------------------------------------------------------
487
488 /// Register a compression extension
489 pub fn register_compression(&self, ext: Arc<dyn CompressionExtension>) -> KernelResult<()> {
490 let algo = ext.algorithm().to_string();
491 let mut compression = self.compression.write();
492
493 if compression.contains_key(&algo) {
494 return Err(KernelError::Plugin {
495 message: format!("compression '{}' already registered", algo),
496 });
497 }
498
499 compression.insert(algo, ext);
500 Ok(())
501 }
502
503 /// Get a compression extension by algorithm name
504 pub fn compression(&self, algorithm: &str) -> Option<Arc<dyn CompressionExtension>> {
505 self.compression.read().get(algorithm).cloned()
506 }
507
508 /// List available compression algorithms
509 pub fn list_compression(&self) -> Vec<String> {
510 self.compression.read().keys().cloned().collect()
511 }
512
513 // -------------------------------------------------------------------------
514 // Lifecycle
515 // -------------------------------------------------------------------------
516
517 /// Shutdown all extensions gracefully
518 pub fn shutdown_all(&self) -> KernelResult<()> {
519 // Extensions are immutable through Arc, so we can't call shutdown
520 // In a real implementation, we'd use Arc<RwLock<dyn Extension>>
521 // For now, this is a no-op placeholder
522 Ok(())
523 }
524
525 /// Get information about all registered extensions
526 pub fn list_extensions(&self) -> Vec<ExtensionInfo> {
527 let mut result = Vec::new();
528
529 for ext in self.storage.read().values() {
530 result.push(ext.info());
531 }
532
533 for ext in self.indices.read().values() {
534 result.push(ext.read().info());
535 }
536
537 for ext in self.observability.read().iter() {
538 result.push(ext.info());
539 }
540
541 for ext in self.compression.read().values() {
542 result.push(ext.info());
543 }
544
545 result
546 }
547}
548
549// ============================================================================
550// Null Observability (Default - No-Op)
551// ============================================================================
552
/// Null observability extension: every operation is a no-op.
///
/// Used when no observability is configured. Being a field-less unit struct,
/// it carries no state, so it is `Copy` and can be created with
/// `NullObservability` or `NullObservability::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullObservability;
557
558impl Extension for NullObservability {
559 fn info(&self) -> ExtensionInfo {
560 ExtensionInfo {
561 name: "null-observability".into(),
562 version: "0.0.0".into(),
563 description: "No-op observability (default)".into(),
564 author: "SochDB".into(),
565 capabilities: vec![ExtensionCapability::Observability],
566 }
567 }
568
569 fn as_any(&self) -> &dyn Any {
570 self
571 }
572
573 fn as_any_mut(&mut self) -> &mut dyn Any {
574 self
575 }
576}
577
578impl ObservabilityExtension for NullObservability {
579 fn counter_inc(&self, _name: &str, _value: u64, _labels: &[(&str, &str)]) {}
580 fn gauge_set(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
581 fn histogram_observe(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
582 fn span_start(&self, _name: &str, _parent: Option<u64>) -> u64 {
583 0
584 }
585 fn span_end(&self, _span_id: u64) {}
586 fn span_event(&self, _span_id: u64, _name: &str, _attributes: &[(&str, &str)]) {}
587}
588
589// ============================================================================
590// Dynamic Plugin Loading (Optional Feature)
591// ============================================================================
592
#[cfg(feature = "dynamic-plugins")]
pub mod dynamic {
    //! Dynamic plugin loading support
    //!
    //! Enabled with the `dynamic-plugins` feature.
    //! Allows loading plugins from shared libraries at runtime.

    use super::*;
    use libloading::{Library, Symbol};
    use std::mem;
    use std::path::Path;

    /// Loads plugins from shared libraries.
    ///
    /// Libraries loaded through this type are never unloaded: plugin objects
    /// handed out by the loader point at code inside the library, so
    /// unmapping it while such an object is still alive would leave the
    /// object dangling. See the `Drop` implementation.
    pub struct DynamicPluginLoader {
        /// Handles of the libraries loaded so far.
        _libraries: Vec<Library>,
    }

    impl DynamicPluginLoader {
        /// Creates a loader with no libraries loaded.
        pub fn new() -> Self {
            Self {
                _libraries: Vec::new(),
            }
        }

        /// Loads an observability plugin from the shared library at `path`.
        ///
        /// The library must export a constructor under the unmangled symbol
        /// name `create_observability_plugin` with exactly this Rust
        /// signature, because that is the type the symbol is called as:
        ///
        /// ```ignore
        /// #[no_mangle]
        /// pub fn create_observability_plugin() -> Box<dyn ObservabilityExtension> {
        ///     Box::new(MyPlugin::new())
        /// }
        /// ```
        ///
        /// The constructor uses the Rust ABI and returns a trait object, and
        /// neither has a stable layout across compiler versions. The plugin
        /// therefore has to be built with the same compiler and against the
        /// same version of this crate as the host.
        ///
        /// Loading a library executes arbitrary code from it; only load
        /// libraries you trust.
        ///
        /// # Errors
        ///
        /// Returns `KernelError::Plugin` if the library cannot be loaded or
        /// does not export `create_observability_plugin`.
        pub fn load_observability(
            &mut self,
            path: &Path,
        ) -> KernelResult<Arc<dyn ObservabilityExtension>> {
            // SAFETY: loading a library runs its initialisation routines. The
            // caller vouches for the library at `path` (see the doc comment).
            let lib = unsafe { Library::new(path) }.map_err(|e| KernelError::Plugin {
                message: format!("failed to load library: {}", e),
            })?;

            let plugin = {
                // SAFETY: the symbol is trusted to have the documented
                // signature; a mismatch is undefined behavior and cannot be
                // detected here. The `Symbol` borrows `lib` and is dropped at
                // the end of this scope, before `lib` is moved.
                let create_fn: Symbol<fn() -> Box<dyn ObservabilityExtension>> =
                    unsafe { lib.get(b"create_observability_plugin") }.map_err(|e| {
                        KernelError::Plugin {
                            message: format!("symbol not found: {}", e),
                        }
                    })?;
                create_fn()
            };

            self._libraries.push(lib);

            Ok(Arc::from(plugin))
        }
    }

    impl Drop for DynamicPluginLoader {
        /// Deliberately leaks the library handles instead of unloading them.
        ///
        /// `load_observability` returns `Arc`s that can outlive the loader.
        /// Their vtables and drop glue live inside the loaded libraries, so
        /// unloading here would turn any later use (or drop) of those plugins
        /// into a use-after-unload.
        fn drop(&mut self) {
            for lib in self._libraries.drain(..) {
                mem::forget(lib);
            }
        }
    }

    impl Default for DynamicPluginLoader {
        fn default() -> Self {
            Self::new()
        }
    }
}
653
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh manager has neither a storage backend nor observability sinks.
    #[test]
    fn test_plugin_manager_creation() {
        let manager = PluginManager::new();
        assert!(manager.storage().is_none());
        assert!(!manager.has_observability());
    }

    /// Every hook of the null sink can be called without panicking.
    #[test]
    fn test_null_observability() {
        let sink = NullObservability;
        sink.counter_inc("test", 1, &[]);
        sink.gauge_set("test", 1.0, &[]);
        sink.histogram_observe("test", 1.0, &[]);

        let span_id = sink.span_start("test", None);
        sink.span_event(span_id, "event", &[]);
        sink.span_end(span_id);
    }

    /// Registering a sink flips `has_observability` from false to true.
    #[test]
    fn test_register_observability() {
        let manager = PluginManager::new();
        assert!(!manager.has_observability());

        manager
            .register_observability(Arc::new(NullObservability))
            .unwrap();

        assert!(manager.has_observability());
    }
}