use crate::error::{KernelError, KernelResult};
use crate::kernel_api::{HealthInfo, RowId, TableId};
use crate::transaction::TransactionId;
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct ExtensionInfo {
pub name: String,
pub version: String,
pub description: String,
pub author: String,
pub capabilities: Vec<ExtensionCapability>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExtensionCapability {
Storage,
Index,
Observability,
Compression,
QueryOptimizer,
Auth,
Custom(String),
}
pub trait Extension: Send + Sync {
fn info(&self) -> ExtensionInfo;
fn init(&mut self) -> KernelResult<()> {
Ok(())
}
fn shutdown(&mut self) -> KernelResult<()> {
Ok(())
}
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
}
pub trait StorageExtension: Extension {
fn get(&self, table_id: TableId, key: &[u8]) -> KernelResult<Option<Vec<u8>>>;
fn put(
&self,
table_id: TableId,
key: &[u8],
value: &[u8],
txn_id: TransactionId,
) -> KernelResult<()>;
fn delete(&self, table_id: TableId, key: &[u8], txn_id: TransactionId) -> KernelResult<()>;
fn scan(
&self,
table_id: TableId,
start: &[u8],
end: &[u8],
limit: usize,
) -> KernelResult<Vec<(Vec<u8>, Vec<u8>)>>;
fn flush(&self) -> KernelResult<()>;
fn compact(&self) -> KernelResult<()> {
Ok(()) }
fn stats(&self) -> StorageStats {
StorageStats::default()
}
}
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
pub bytes_stored: u64,
pub key_count: u64,
pub pending_compaction_bytes: u64,
pub write_amplification: f64,
}
pub trait IndexExtension: Extension {
fn index_type(&self) -> &str;
fn build(
&mut self,
table_id: TableId,
column_id: u16,
data: &[(RowId, Vec<u8>)],
) -> KernelResult<()>;
fn insert(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
fn delete(&mut self, key: &[u8], row_id: RowId) -> KernelResult<()>;
fn lookup(&self, key: &[u8]) -> KernelResult<Vec<RowId>>;
fn range(&self, start: &[u8], end: &[u8], limit: usize) -> KernelResult<Vec<RowId>>;
fn nearest(&self, _query: &[u8], _k: usize) -> KernelResult<Vec<(RowId, f32)>> {
Err(KernelError::Plugin {
message: "nearest neighbor not supported by this index type".into(),
})
}
fn size_bytes(&self) -> u64;
}
pub trait ObservabilityExtension: Extension {
fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]);
fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]);
fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]);
fn span_start(&self, name: &str, parent: Option<u64>) -> u64;
fn span_end(&self, span_id: u64);
fn span_event(&self, span_id: u64, name: &str, attributes: &[(&str, &str)]);
fn log_debug(&self, message: &str, fields: &[(&str, &str)]) {
let _ = (message, fields); }
fn log_info(&self, message: &str, fields: &[(&str, &str)]) {
let _ = (message, fields); }
fn log_warn(&self, message: &str, fields: &[(&str, &str)]) {
let _ = (message, fields); }
fn log_error(&self, message: &str, fields: &[(&str, &str)]) {
let _ = (message, fields); }
fn report_health(&self, health: &HealthInfo) {
let _ = health; }
}
pub trait CompressionExtension: Extension {
fn algorithm(&self) -> &str;
fn compress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;
fn decompress(&self, input: &[u8]) -> KernelResult<Vec<u8>>;
fn set_level(&mut self, _level: i32) -> KernelResult<()> {
Ok(())
}
}
pub struct PluginManager {
storage: RwLock<HashMap<String, Arc<dyn StorageExtension>>>,
indices: RwLock<HashMap<String, Arc<RwLock<dyn IndexExtension>>>>,
observability: RwLock<Vec<Arc<dyn ObservabilityExtension>>>,
compression: RwLock<HashMap<String, Arc<dyn CompressionExtension>>>,
active_storage: RwLock<Option<String>>,
}
impl Default for PluginManager {
fn default() -> Self {
Self::new()
}
}
impl PluginManager {
pub fn new() -> Self {
Self {
storage: RwLock::new(HashMap::new()),
indices: RwLock::new(HashMap::new()),
observability: RwLock::new(Vec::new()),
compression: RwLock::new(HashMap::new()),
active_storage: RwLock::new(None),
}
}
pub fn register_storage(&self, ext: Arc<dyn StorageExtension>) -> KernelResult<()> {
let name = ext.info().name.clone();
let mut storage = self.storage.write();
if storage.contains_key(&name) {
return Err(KernelError::Plugin {
message: format!("storage extension '{}' already registered", name),
});
}
storage.insert(name.clone(), ext);
let mut active = self.active_storage.write();
if active.is_none() {
*active = Some(name);
}
Ok(())
}
pub fn set_active_storage(&self, name: &str) -> KernelResult<()> {
let storage = self.storage.read();
if !storage.contains_key(name) {
return Err(KernelError::Plugin {
message: format!("storage extension '{}' not found", name),
});
}
*self.active_storage.write() = Some(name.to_string());
Ok(())
}
pub fn storage(&self) -> Option<Arc<dyn StorageExtension>> {
let active = self.active_storage.read();
active
.as_ref()
.and_then(|name| self.storage.read().get(name).cloned())
}
pub fn register_index(&self, ext: Arc<RwLock<dyn IndexExtension>>) -> KernelResult<()> {
let name = ext.read().info().name.clone();
let mut indices = self.indices.write();
if indices.contains_key(&name) {
return Err(KernelError::Plugin {
message: format!("index extension '{}' already registered", name),
});
}
indices.insert(name, ext);
Ok(())
}
pub fn index(&self, name: &str) -> Option<Arc<RwLock<dyn IndexExtension>>> {
self.indices.read().get(name).cloned()
}
pub fn list_index_types(&self) -> Vec<String> {
self.indices.read().keys().cloned().collect()
}
pub fn register_observability(&self, ext: Arc<dyn ObservabilityExtension>) -> KernelResult<()> {
self.observability.write().push(ext);
Ok(())
}
pub fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
for ext in self.observability.read().iter() {
ext.counter_inc(name, value, labels);
}
}
pub fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
for ext in self.observability.read().iter() {
ext.gauge_set(name, value, labels);
}
}
pub fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
for ext in self.observability.read().iter() {
ext.histogram_observe(name, value, labels);
}
}
pub fn report_health(&self, health: &HealthInfo) {
for ext in self.observability.read().iter() {
ext.report_health(health);
}
}
pub fn has_observability(&self) -> bool {
!self.observability.read().is_empty()
}
pub fn register_compression(&self, ext: Arc<dyn CompressionExtension>) -> KernelResult<()> {
let algo = ext.algorithm().to_string();
let mut compression = self.compression.write();
if compression.contains_key(&algo) {
return Err(KernelError::Plugin {
message: format!("compression '{}' already registered", algo),
});
}
compression.insert(algo, ext);
Ok(())
}
pub fn compression(&self, algorithm: &str) -> Option<Arc<dyn CompressionExtension>> {
self.compression.read().get(algorithm).cloned()
}
pub fn list_compression(&self) -> Vec<String> {
self.compression.read().keys().cloned().collect()
}
pub fn shutdown_all(&self) -> KernelResult<()> {
Ok(())
}
pub fn list_extensions(&self) -> Vec<ExtensionInfo> {
let mut result = Vec::new();
for ext in self.storage.read().values() {
result.push(ext.info());
}
for ext in self.indices.read().values() {
result.push(ext.read().info());
}
for ext in self.observability.read().iter() {
result.push(ext.info());
}
for ext in self.compression.read().values() {
result.push(ext.info());
}
result
}
}
pub struct NullObservability;
impl Extension for NullObservability {
fn info(&self) -> ExtensionInfo {
ExtensionInfo {
name: "null-observability".into(),
version: "0.0.0".into(),
description: "No-op observability (default)".into(),
author: "SochDB".into(),
capabilities: vec![ExtensionCapability::Observability],
}
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl ObservabilityExtension for NullObservability {
fn counter_inc(&self, _name: &str, _value: u64, _labels: &[(&str, &str)]) {}
fn gauge_set(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
fn histogram_observe(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
fn span_start(&self, _name: &str, _parent: Option<u64>) -> u64 {
0
}
fn span_end(&self, _span_id: u64) {}
fn span_event(&self, _span_id: u64, _name: &str, _attributes: &[(&str, &str)]) {}
}
#[cfg(feature = "dynamic-plugins")]
pub mod dynamic {
use super::*;
use libloading::{Library, Symbol};
use std::path::Path;
pub struct DynamicPluginLoader {
_libraries: Vec<Library>,
}
impl DynamicPluginLoader {
pub fn new() -> Self {
Self {
_libraries: Vec::new(),
}
}
pub unsafe fn load_observability(
&mut self,
path: &Path,
) -> KernelResult<Arc<dyn ObservabilityExtension>> {
if !path.is_absolute() {
return Err(KernelError::Plugin {
message: format!(
"plugin path must be absolute to prevent path hijacking: {}",
path.display()
),
});
}
let canonical = path.canonicalize().map_err(|e| KernelError::Plugin {
message: format!("failed to canonicalize plugin path: {}", e),
})?;
unsafe {
let lib = Library::new(&canonical).map_err(|e| KernelError::Plugin {
message: format!("failed to load library: {}", e),
})?;
let create_fn: Symbol<fn() -> Box<dyn ObservabilityExtension>> = lib
.get(b"create_observability_plugin")
.map_err(|e| KernelError::Plugin {
message: format!("symbol not found: {}", e),
})?;
let plugin = create_fn();
self._libraries.push(lib);
Ok(Arc::from(plugin))
}
}
}
impl Default for DynamicPluginLoader {
fn default() -> Self {
Self::new()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_plugin_manager_creation() {
let pm = PluginManager::new();
assert!(!pm.has_observability());
assert!(pm.storage().is_none());
}
#[test]
fn test_null_observability() {
let null = NullObservability;
null.counter_inc("test", 1, &[]);
null.gauge_set("test", 1.0, &[]);
null.histogram_observe("test", 1.0, &[]);
let span = null.span_start("test", None);
null.span_event(span, "event", &[]);
null.span_end(span);
}
#[test]
fn test_register_observability() {
let pm = PluginManager::new();
let null = Arc::new(NullObservability);
assert!(!pm.has_observability());
pm.register_observability(null).unwrap();
assert!(pm.has_observability());
}
}