use crate::indexes::config::{StorageBackendConfig, StorageBackendRef, StorageBackendSpec};
use crate::indexes::IndexBackendPlugin;
use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
use drasi_core::interface::{CreatedIndexes, IndexSet, NoOpSessionControl};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
/// Errors reported while resolving or constructing index storage backends.
///
/// The [`fmt::Display`] implementation below produces the user-facing text
/// for each variant.
#[derive(Debug)]
pub enum IndexError {
    /// No backend is registered under the given name.
    UnknownStore(String),
    /// Connecting to a backend failed; the payload carries the details.
    ConnectionFailed(String),
    /// A storage path problem; the payload carries the details.
    PathError(String),
    /// A backend could not be initialized; the payload carries the details.
    InitializationFailed(String),
    /// The requested operation is not supported.
    NotSupported,
}

impl fmt::Display for IndexError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnknownStore(name) => write!(
                f,
                "Unknown storage backend: '{name}'. Check that the backend is defined in storage_backends configuration."
            ),
            Self::ConnectionFailed(details) => {
                write!(f, "Failed to connect to storage backend: {details}")
            }
            Self::PathError(details) => write!(f, "Storage path error: {details}"),
            Self::InitializationFailed(details) => {
                write!(f, "Failed to initialize storage backend: {details}")
            }
            // Fixed text with nothing to interpolate.
            Self::NotSupported => f.write_str("Operation not supported"),
        }
    }
}

impl std::error::Error for IndexError {}
/// Wraps a core index error as [`IndexError::InitializationFailed`], keeping
/// only the rendered message of the original error.
impl From<drasi_core::interface::IndexError> for IndexError {
    fn from(err: drasi_core::interface::IndexError) -> Self {
        let details = err.to_string();
        Self::InitializationFailed(details)
    }
}
/// Resolves storage backend references into concrete index sets for queries.
///
/// Declared backends are split by kind at construction time; plugin-backed
/// storage additionally needs a provider registered under the backend name.
pub struct IndexFactory {
/// Declared in-memory backends: backend id -> `enable_archive` flag.
memory_backends: HashMap<String, bool>,
/// Declared plugin backends: backend id -> plugin `kind` string.
plugin_backends: HashMap<String, String>,
/// Injected index providers, looked up by backend name when building.
providers: HashMap<String, Arc<dyn IndexBackendPlugin>>,
/// Optional default backend reference, exposed via `default_backend()`.
default_backend: Option<StorageBackendRef>,
}
/// Manual `Debug` that prints the provider map as its key list only.
impl fmt::Debug for IndexFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the names of the injected providers are shown, not the
        // provider objects themselves.
        let provider_names: Vec<_> = self.providers.keys().collect();

        let mut out = f.debug_struct("IndexFactory");
        out.field("memory_backends", &self.memory_backends);
        out.field("plugin_backends", &self.plugin_backends);
        out.field("providers", &provider_names);
        out.field("default_backend", &self.default_backend);
        out.finish()
    }
}
impl IndexFactory {
pub fn new(
backends: Vec<StorageBackendConfig>,
providers: HashMap<String, Arc<dyn IndexBackendPlugin>>,
) -> Self {
Self::new_with_default(backends, providers, None)
}
pub fn new_with_default(
backends: Vec<StorageBackendConfig>,
providers: HashMap<String, Arc<dyn IndexBackendPlugin>>,
default_backend: Option<StorageBackendRef>,
) -> Self {
let mut memory_backends = HashMap::new();
let mut plugin_backends = HashMap::new();
for b in backends {
match b.spec {
StorageBackendSpec::Memory { enable_archive } => {
memory_backends.insert(b.id, enable_archive);
}
StorageBackendSpec::Plugin { kind, .. } => {
plugin_backends.insert(b.id, kind);
}
}
}
Self {
memory_backends,
plugin_backends,
providers,
default_backend,
}
}
pub fn default_backend(&self) -> Option<&StorageBackendRef> {
self.default_backend.as_ref()
}
pub async fn build(
&self,
backend_ref: &StorageBackendRef,
query_id: &str,
) -> Result<CreatedIndexes, IndexError> {
match backend_ref {
StorageBackendRef::Named(name) => {
if let Some(provider) = self.providers.get(name) {
self.build_from_plugin(provider, query_id).await
} else if let Some(enable_archive) = self.memory_backends.get(name) {
self.build_memory_indexes(*enable_archive)
} else if let Some(kind) = self.plugin_backends.get(name) {
Err(IndexError::InitializationFailed(format!(
"Storage backend '{name}' (kind '{kind}') is declared but no index provider \
was injected for it. Use DrasiLib::builder().with_index_provider(\"{name}\", ...) \
to provide one."
)))
} else {
Err(IndexError::UnknownStore(name.clone()))
}
}
StorageBackendRef::Inline(StorageBackendSpec::Memory { enable_archive }) => {
self.build_memory_indexes(*enable_archive)
}
StorageBackendRef::Inline(StorageBackendSpec::Plugin { kind, .. }) => {
Err(IndexError::InitializationFailed(format!(
"Inline plugin storage backend (kind '{kind}') is not supported in embedded mode. \
Declare a named storage backend and inject a provider via \
DrasiLib::builder().with_index_provider(name, ...)."
)))
}
}
}
fn build_memory_indexes(&self, enable_archive: bool) -> Result<CreatedIndexes, IndexError> {
let mut element_index = InMemoryElementIndex::new();
if enable_archive {
element_index.enable_archive();
}
let element_index = Arc::new(element_index);
let result_index = InMemoryResultIndex::new();
let future_queue = InMemoryFutureQueue::new();
Ok(CreatedIndexes {
set: IndexSet {
element_index: element_index.clone(),
archive_index: element_index,
result_index: Arc::new(result_index),
future_queue: Arc::new(future_queue),
session_control: Arc::new(NoOpSessionControl),
},
checkpoint_store: None,
outbox_writer: None,
live_results_writer: None,
})
}
async fn build_from_plugin(
&self,
plugin: &Arc<dyn IndexBackendPlugin>,
query_id: &str,
) -> Result<CreatedIndexes, IndexError> {
plugin.create_indexes(query_id).await.map_err(|e| {
log::error!("Failed to create indexes for query '{query_id}': {e}");
IndexError::InitializationFailed(format!(
"Failed to create indexes for query '{query_id}': {e}"
))
})
}
pub fn is_volatile(&self, backend_ref: &StorageBackendRef) -> bool {
match backend_ref {
StorageBackendRef::Named(name) => {
if let Some(provider) = self.providers.get(name) {
provider.is_volatile()
} else if self.memory_backends.contains_key(name) {
true
} else {
false
}
}
StorageBackendRef::Inline(spec) => spec.is_volatile(),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;

    /// Test double for an index provider: always succeeds with in-memory
    /// indexes and reports the configured volatility.
    struct MockPlugin {
        volatile: bool,
    }

    #[async_trait]
    impl IndexBackendPlugin for MockPlugin {
        async fn create_indexes(
            &self,
            _query_id: &str,
        ) -> Result<drasi_core::interface::CreatedIndexes, drasi_core::interface::IndexError>
        {
            let element_index = Arc::new(InMemoryElementIndex::new());
            Ok(CreatedIndexes {
                set: IndexSet {
                    element_index: element_index.clone(),
                    archive_index: element_index,
                    result_index: Arc::new(InMemoryResultIndex::new()),
                    future_queue: Arc::new(InMemoryFutureQueue::new()),
                    session_control: Arc::new(NoOpSessionControl),
                },
                checkpoint_store: None,
                outbox_writer: None,
                live_results_writer: None,
            })
        }

        fn is_volatile(&self) -> bool {
            self.volatile
        }
    }

    /// Provider map holding a single `MockPlugin` registered under `name`.
    fn providers_with(name: &str, volatile: bool) -> HashMap<String, Arc<dyn IndexBackendPlugin>> {
        let mut m: HashMap<String, Arc<dyn IndexBackendPlugin>> = HashMap::new();
        m.insert(name.to_string(), Arc::new(MockPlugin { volatile }));
        m
    }

    /// Declared in-memory backend with the given id and archive flag.
    fn memory_backend(id: &str, enable_archive: bool) -> StorageBackendConfig {
        StorageBackendConfig {
            id: id.to_string(),
            spec: StorageBackendSpec::Memory { enable_archive },
        }
    }

    /// Plugin spec of kind `rocksdb` whose config carries only `path`.
    fn rocksdb_spec(path: &str) -> StorageBackendSpec {
        StorageBackendSpec::Plugin {
            kind: "rocksdb".to_string(),
            config: serde_json::json!({ "path": path }),
        }
    }

    /// Declared `rocksdb` plugin backend with the given id and path.
    fn rocksdb_backend(id: &str, path: &str) -> StorageBackendConfig {
        StorageBackendConfig {
            id: id.to_string(),
            spec: rocksdb_spec(path),
        }
    }

    /// Named backend reference.
    fn named(name: &str) -> StorageBackendRef {
        StorageBackendRef::Named(name.to_string())
    }

    /// Inline in-memory backend reference.
    fn inline_memory(enable_archive: bool) -> StorageBackendRef {
        StorageBackendRef::Inline(StorageBackendSpec::Memory { enable_archive })
    }

    #[test]
    fn test_index_factory_default_backend_accessor() {
        let factory = IndexFactory::new(vec![], HashMap::new());
        assert!(factory.default_backend().is_none());

        let factory = IndexFactory::new_with_default(
            vec![],
            providers_with("rocks", false),
            Some(named("rocks")),
        );
        match factory.default_backend() {
            Some(StorageBackendRef::Named(name)) => assert_eq!(name, "rocks"),
            other => panic!("expected default Named(\"rocks\"), got {other:?}"),
        }
    }

    #[test]
    fn test_index_factory_new_splits_backends() {
        let backends = vec![
            memory_backend("memory_test", true),
            rocksdb_backend("rocks_test", "/tmp/test"),
        ];
        let factory = IndexFactory::new(backends, HashMap::new());

        assert_eq!(factory.memory_backends.len(), 1);
        assert!(factory.memory_backends.contains_key("memory_test"));
        assert_eq!(factory.plugin_backends.len(), 1);
        assert_eq!(
            factory
                .plugin_backends
                .get("rocks_test")
                .map(|s| s.as_str()),
            Some("rocksdb")
        );
    }

    #[tokio::test]
    async fn test_build_memory_indexes() {
        let factory = IndexFactory::new(vec![memory_backend("memory_test", true)], HashMap::new());
        let result = factory.build(&named("memory_test"), "test_query").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_build_named_provider() {
        let factory = IndexFactory::new(vec![], providers_with("rocks", false));
        let result = factory.build(&named("rocks"), "test_query").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_build_unknown_backend() {
        let factory = IndexFactory::new(vec![], HashMap::new());
        match factory.build(&named("nonexistent"), "test_query").await {
            Err(IndexError::UnknownStore(name)) => assert_eq!(name, "nonexistent"),
            other => panic!("Expected UnknownStore error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_build_declared_plugin_without_provider_errors() {
        let factory = IndexFactory::new(
            vec![rocksdb_backend("rocks", "/data/test")],
            HashMap::new(),
        );
        match factory.build(&named("rocks"), "test_query").await {
            Err(IndexError::InitializationFailed(msg)) => {
                assert!(msg.contains("no index provider"));
                assert!(msg.contains("rocks"));
            }
            other => panic!("Expected InitializationFailed error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_build_inline_memory() {
        let factory = IndexFactory::new(vec![], HashMap::new());
        let result = factory.build(&inline_memory(false), "test_query").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_build_inline_plugin_errors() {
        // A provider is registered, but inline plugin specs never consult it.
        let factory = IndexFactory::new(vec![], providers_with("rocks", false));
        let backend_ref = StorageBackendRef::Inline(rocksdb_spec("/data/test"));
        match factory.build(&backend_ref, "test_query").await {
            Err(IndexError::InitializationFailed(msg)) => {
                assert!(msg.contains("Inline plugin"));
            }
            other => panic!("Expected InitializationFailed error, got {other:?}"),
        }
    }

    #[test]
    fn test_is_volatile_memory() {
        let factory =
            IndexFactory::new(vec![memory_backend("memory_test", false)], HashMap::new());
        assert!(factory.is_volatile(&named("memory_test")));
    }

    #[test]
    fn test_is_volatile_provider_delegates() {
        let factory = IndexFactory::new(vec![], providers_with("rocks", false));
        assert!(!factory.is_volatile(&named("rocks")));

        let factory = IndexFactory::new(vec![], providers_with("vol", true));
        assert!(factory.is_volatile(&named("vol")));
    }

    #[test]
    fn test_is_volatile_declared_plugin_without_provider() {
        let factory = IndexFactory::new(
            vec![rocksdb_backend("rocks", "/data/test")],
            HashMap::new(),
        );
        assert!(!factory.is_volatile(&named("rocks")));
    }

    #[test]
    fn test_is_volatile_inline() {
        let factory = IndexFactory::new(vec![], HashMap::new());
        assert!(factory.is_volatile(&inline_memory(false)));

        let backend_ref = StorageBackendRef::Inline(rocksdb_spec("/data/test"));
        assert!(!factory.is_volatile(&backend_ref));
    }

    #[test]
    fn test_is_volatile_unknown_backend() {
        let factory = IndexFactory::new(vec![], HashMap::new());
        assert!(!factory.is_volatile(&named("nonexistent")));
    }

    #[test]
    fn test_index_error_display_unknown_store() {
        let display = IndexError::UnknownStore("my_backend".to_string()).to_string();
        assert!(display.contains("Unknown storage backend"));
        assert!(display.contains("my_backend"));
    }

    #[test]
    fn test_index_error_display_connection_failed() {
        let display = IndexError::ConnectionFailed("Connection refused".to_string()).to_string();
        assert!(display.contains("Failed to connect"));
        assert!(display.contains("Connection refused"));
    }

    #[test]
    fn test_index_error_display_path_error() {
        let display = IndexError::PathError("/invalid/path".to_string()).to_string();
        assert!(display.contains("Storage path error"));
        assert!(display.contains("/invalid/path"));
    }

    #[test]
    fn test_index_error_display_initialization_failed() {
        let display =
            IndexError::InitializationFailed("Database init failed".to_string()).to_string();
        assert!(display.contains("Failed to initialize"));
        assert!(display.contains("Database init failed"));
    }

    #[test]
    fn test_index_error_display_not_supported() {
        let display = IndexError::NotSupported.to_string();
        assert!(display.contains("not supported"));
    }

    #[test]
    fn test_index_error_is_std_error() {
        let error = IndexError::UnknownStore("test".to_string());
        let _: &dyn std::error::Error = &error;
    }

    #[test]
    fn test_index_error_from_drasi_core_index_error() {
        let io_error = std::io::Error::new(std::io::ErrorKind::Other, "test error");
        let core_error = drasi_core::interface::IndexError::other(io_error);
        let error: IndexError = core_error.into();
        match error {
            IndexError::InitializationFailed(msg) => {
                assert!(msg.contains("test error"));
            }
            other => panic!("Expected InitializationFailed error, got {other:?}"),
        }
    }

    // Uses `#[tokio::test]` like the other async tests instead of building a
    // runtime by hand.
    #[tokio::test]
    async fn test_index_set_debug() {
        let factory =
            IndexFactory::new(vec![memory_backend("memory_test", false)], HashMap::new());
        let index_set = factory
            .build(&named("memory_test"), "q1")
            .await
            .unwrap();

        let debug_str = format!("{index_set:?}");
        assert!(debug_str.contains("IndexSet"));
        assert!(debug_str.contains("element_index"));
        assert!(debug_str.contains("archive_index"));
        assert!(debug_str.contains("result_index"));
        assert!(debug_str.contains("future_queue"));
    }

    #[test]
    fn test_index_factory_debug() {
        let factory = IndexFactory::new(vec![memory_backend("memory_test", true)], HashMap::new());
        let debug_str = format!("{factory:?}");
        assert!(debug_str.contains("IndexFactory"));
        assert!(debug_str.contains("memory_backends"));
        assert!(debug_str.contains("memory_test"));
    }

    #[test]
    fn test_index_factory_debug_with_provider() {
        let factory = IndexFactory::new(vec![], providers_with("rocks", false));
        let debug_str = format!("{factory:?}");
        assert!(debug_str.contains("IndexFactory"));
        assert!(debug_str.contains("providers"));
        assert!(debug_str.contains("rocks"));
    }

    #[tokio::test]
    async fn test_build_memory_without_archive() {
        let factory = IndexFactory::new(vec![], HashMap::new());
        let result = factory.build(&inline_memory(false), "test_query").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_build_memory_with_archive() {
        let factory = IndexFactory::new(vec![], HashMap::new());
        let result = factory.build(&inline_memory(true), "test_query").await;
        assert!(result.is_ok());
    }
}