1use crate::indexes::config::{StorageBackendConfig, StorageBackendRef, StorageBackendSpec};
16use crate::indexes::IndexBackendPlugin;
17use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
18use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
19use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
20use drasi_core::interface::{IndexSet, NoOpSessionControl};
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::Arc;
24
/// Errors reported while resolving a storage backend or building its indexes.
#[derive(Debug)]
pub enum IndexError {
    /// A named backend reference did not match any configured backend.
    /// Carries the name that was requested.
    UnknownStore(String),
    /// Connecting to the storage backend failed. Carries the failure details.
    ConnectionFailed(String),
    /// A storage path problem. Carries the failure details.
    PathError(String),
    /// The storage backend could not be initialized. Carries the failure details.
    InitializationFailed(String),
    /// The requested operation is not supported.
    NotSupported,
}
39
40impl fmt::Display for IndexError {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 IndexError::UnknownStore(name) => {
44 write!(f, "Unknown storage backend: '{name}'. Check that the backend is defined in storage_backends configuration.")
45 }
46 IndexError::ConnectionFailed(details) => {
47 write!(f, "Failed to connect to storage backend: {details}")
48 }
49 IndexError::PathError(details) => {
50 write!(f, "Storage path error: {details}")
51 }
52 IndexError::InitializationFailed(details) => {
53 write!(f, "Failed to initialize storage backend: {details}")
54 }
55 IndexError::NotSupported => {
56 write!(f, "Operation not supported")
57 }
58 }
59 }
60}
61
// Nothing is overridden here: the `Debug` derive and the `Display` impl satisfy
// the trait's requirements, and `source()` keeps its default implementation.
impl std::error::Error for IndexError {}
63
64impl From<drasi_core::interface::IndexError> for IndexError {
65 fn from(err: drasi_core::interface::IndexError) -> Self {
66 IndexError::InitializationFailed(err.to_string())
67 }
68}
69
/// Builds index sets for queries from configured storage backends.
pub struct IndexFactory {
    /// Configured backend specs, keyed by backend id, used to resolve
    /// `StorageBackendRef::Named` references.
    backends: HashMap<String, StorageBackendSpec>,
    /// Optional plugin that creates index sets for `RocksDb` and `Redis`
    /// specs; building those specs fails when this is `None`.
    plugin: Option<Arc<dyn IndexBackendPlugin>>,
}
77
78impl fmt::Debug for IndexFactory {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 f.debug_struct("IndexFactory")
81 .field("backends", &self.backends)
82 .field("plugin", &self.plugin.as_ref().map(|_| "<plugin>"))
83 .finish()
84 }
85}
86
87impl IndexFactory {
88 pub fn new(
116 backends: Vec<StorageBackendConfig>,
117 plugin: Option<Arc<dyn IndexBackendPlugin>>,
118 ) -> Self {
119 let backends = backends.into_iter().map(|b| (b.id, b.spec)).collect();
120 Self { backends, plugin }
121 }
122
123 pub async fn build(
135 &self,
136 backend_ref: &StorageBackendRef,
137 query_id: &str,
138 ) -> Result<IndexSet, IndexError> {
139 let spec = match backend_ref {
140 StorageBackendRef::Named(name) => self
141 .backends
142 .get(name)
143 .ok_or_else(|| IndexError::UnknownStore(name.clone()))?,
144 StorageBackendRef::Inline(spec) => spec,
145 };
146
147 self.build_from_spec(spec, query_id).await
148 }
149
150 async fn build_from_spec(
152 &self,
153 spec: &StorageBackendSpec,
154 query_id: &str,
155 ) -> Result<IndexSet, IndexError> {
156 spec.validate().map_err(IndexError::InitializationFailed)?;
158
159 match spec {
160 StorageBackendSpec::Memory { enable_archive } => {
161 self.build_memory_indexes(*enable_archive)
162 }
163 StorageBackendSpec::RocksDb { .. } | StorageBackendSpec::Redis { .. } => {
164 match &self.plugin {
166 Some(plugin) => self.build_from_plugin(plugin, query_id).await,
167 None => Err(IndexError::InitializationFailed(
168 "RocksDB or Redis backend requested but no index provider configured. \
169 Use DrasiLib::builder().with_index_provider(...) to provide one."
170 .to_string(),
171 )),
172 }
173 }
174 }
175 }
176
177 fn build_memory_indexes(&self, enable_archive: bool) -> Result<IndexSet, IndexError> {
179 let mut element_index = InMemoryElementIndex::new();
180 if enable_archive {
181 element_index.enable_archive();
182 }
183 let element_index = Arc::new(element_index);
184 let result_index = InMemoryResultIndex::new();
185 let future_queue = InMemoryFutureQueue::new();
186
187 Ok(IndexSet {
188 element_index: element_index.clone(),
189 archive_index: element_index,
190 result_index: Arc::new(result_index),
191 future_queue: Arc::new(future_queue),
192 session_control: Arc::new(NoOpSessionControl),
193 })
194 }
195
196 async fn build_from_plugin(
198 &self,
199 plugin: &Arc<dyn IndexBackendPlugin>,
200 query_id: &str,
201 ) -> Result<IndexSet, IndexError> {
202 plugin.create_index_set(query_id).await.map_err(|e| {
203 log::error!("Failed to create index set for query '{query_id}': {e}");
204 IndexError::InitializationFailed(format!(
205 "Failed to create index set for query '{query_id}': {e}"
206 ))
207 })
208 }
209
210 pub fn is_volatile(&self, backend_ref: &StorageBackendRef) -> bool {
216 let spec = match backend_ref {
217 StorageBackendRef::Named(name) => match self.backends.get(name) {
218 Some(spec) => spec,
219 None => return false, },
221 StorageBackendRef::Inline(spec) => spec,
222 };
223
224 spec.is_volatile()
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Spec for the in-memory backend with the given archive setting.
    fn memory_spec(enable_archive: bool) -> StorageBackendSpec {
        StorageBackendSpec::Memory { enable_archive }
    }

    /// RocksDb spec at `path` with archive and direct I/O both disabled.
    fn rocksdb_spec(path: &str) -> StorageBackendSpec {
        StorageBackendSpec::RocksDb {
            path: path.to_string(),
            enable_archive: false,
            direct_io: false,
        }
    }

    /// Backend configuration entry pairing `id` with `spec`.
    fn backend(id: &str, spec: StorageBackendSpec) -> StorageBackendConfig {
        StorageBackendConfig {
            id: id.to_string(),
            spec,
        }
    }

    /// Named reference to the backend registered under `id`.
    fn named(id: &str) -> StorageBackendRef {
        StorageBackendRef::Named(id.to_string())
    }

    /// Factory over `backends` with no index backend plugin.
    fn plugin_less_factory(backends: Vec<StorageBackendConfig>) -> IndexFactory {
        IndexFactory::new(backends, None)
    }

    /// Builds `spec` inline on a plugin-less factory and checks that the
    /// build fails with the "no index provider configured" message.
    async fn assert_requires_plugin(spec: StorageBackendSpec) {
        let factory = plugin_less_factory(Vec::new());
        let backend_ref = StorageBackendRef::Inline(spec);
        let result = factory.build(&backend_ref, "test_query").await;

        assert!(result.is_err());
        match result.unwrap_err() {
            IndexError::InitializationFailed(msg) => {
                assert!(msg.contains("no index provider configured"));
            }
            _ => panic!("Expected InitializationFailed error"),
        }
    }

    #[test]
    fn test_index_factory_new() {
        let factory = plugin_less_factory(vec![
            backend("memory_test", memory_spec(true)),
            backend("rocks_test", rocksdb_spec("/tmp/test")),
        ]);

        assert_eq!(factory.backends.len(), 2);
        assert!(factory.backends.contains_key("memory_test"));
        assert!(factory.backends.contains_key("rocks_test"));
    }

    #[tokio::test]
    async fn test_build_memory_indexes() {
        let factory = plugin_less_factory(vec![backend("memory_test", memory_spec(true))]);
        let backend_ref = named("memory_test");

        let result = factory.build(&backend_ref, "test_query").await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_build_unknown_backend() {
        let factory = plugin_less_factory(Vec::new());
        let backend_ref = named("nonexistent");

        let result = factory.build(&backend_ref, "test_query").await;

        assert!(result.is_err());
        if let IndexError::UnknownStore(name) = result.unwrap_err() {
            assert_eq!(name, "nonexistent");
        } else {
            panic!("Expected UnknownStore error");
        }
    }

    #[tokio::test]
    async fn test_build_inline_memory() {
        let factory = plugin_less_factory(Vec::new());
        let backend_ref = StorageBackendRef::Inline(memory_spec(false));

        let result = factory.build(&backend_ref, "test_query").await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_build_rocksdb_without_plugin_errors() {
        assert_requires_plugin(rocksdb_spec("/data/test")).await;
    }

    #[tokio::test]
    async fn test_build_redis_without_plugin_errors() {
        assert_requires_plugin(StorageBackendSpec::Redis {
            connection_string: "redis://localhost:6379".to_string(),
            cache_size: None,
        })
        .await;
    }

    #[test]
    fn test_is_volatile_memory() {
        let factory = plugin_less_factory(vec![backend("memory_test", memory_spec(false))]);

        assert!(factory.is_volatile(&named("memory_test")));
    }

    #[test]
    fn test_is_volatile_rocksdb() {
        let factory = plugin_less_factory(vec![backend("rocks_test", rocksdb_spec("/data/test"))]);

        assert!(!factory.is_volatile(&named("rocks_test")));
    }

    #[test]
    fn test_is_volatile_inline() {
        let factory = plugin_less_factory(Vec::new());

        let memory_ref = StorageBackendRef::Inline(memory_spec(false));
        assert!(factory.is_volatile(&memory_ref));

        let rocksdb_ref = StorageBackendRef::Inline(rocksdb_spec("/data/test"));
        assert!(!factory.is_volatile(&rocksdb_ref));
    }

    #[test]
    fn test_is_volatile_inline_redis() {
        let factory = plugin_less_factory(Vec::new());
        let redis_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
            connection_string: "redis://localhost:6379".to_string(),
            cache_size: Some(1000),
        });

        assert!(!factory.is_volatile(&redis_ref));
    }

    #[test]
    fn test_is_volatile_unknown_backend() {
        // An unresolved named reference is reported as non-volatile.
        let factory = plugin_less_factory(Vec::new());

        assert!(!factory.is_volatile(&named("nonexistent")));
    }

    #[test]
    fn test_index_error_display_unknown_store() {
        let display = IndexError::UnknownStore("my_backend".to_string()).to_string();

        assert!(display.contains("Unknown storage backend"));
        assert!(display.contains("my_backend"));
    }

    #[test]
    fn test_index_error_display_connection_failed() {
        let display = IndexError::ConnectionFailed("Connection refused".to_string()).to_string();

        assert!(display.contains("Failed to connect"));
        assert!(display.contains("Connection refused"));
    }

    #[test]
    fn test_index_error_display_path_error() {
        let display = IndexError::PathError("/invalid/path".to_string()).to_string();

        assert!(display.contains("Storage path error"));
        assert!(display.contains("/invalid/path"));
    }

    #[test]
    fn test_index_error_display_initialization_failed() {
        let display =
            IndexError::InitializationFailed("Database init failed".to_string()).to_string();

        assert!(display.contains("Failed to initialize"));
        assert!(display.contains("Database init failed"));
    }

    #[test]
    fn test_index_error_display_not_supported() {
        let display = IndexError::NotSupported.to_string();

        assert!(display.contains("not supported"));
    }

    #[test]
    fn test_index_error_is_std_error() {
        // Compile-time check only: the call type-checks iff the bound holds.
        fn assert_std_error<E: std::error::Error>(_: &E) {}

        let error = IndexError::UnknownStore("test".to_string());
        assert_std_error(&error);
    }

    #[test]
    fn test_index_error_from_drasi_core_index_error() {
        let io_error = std::io::Error::new(std::io::ErrorKind::Other, "test error");
        let core_error = drasi_core::interface::IndexError::other(io_error);

        let converted = IndexError::from(core_error);

        if let IndexError::InitializationFailed(msg) = converted {
            assert!(msg.contains("test error"));
        } else {
            panic!("Expected InitializationFailed error");
        }
    }

    #[test]
    fn test_index_set_debug() {
        let factory = plugin_less_factory(vec![backend("memory_test", memory_spec(false))]);
        let backend_ref = named("memory_test");

        // This is a plain `#[test]`, so the async build is driven by hand.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let index_set = runtime
            .block_on(factory.build(&backend_ref, "q1"))
            .unwrap();

        let debug_str = format!("{index_set:?}");
        assert!(debug_str.contains("IndexSet"));
        assert!(debug_str.contains("element_index"));
        assert!(debug_str.contains("archive_index"));
        assert!(debug_str.contains("result_index"));
        assert!(debug_str.contains("future_queue"));
    }

    #[test]
    fn test_index_factory_debug() {
        let factory = plugin_less_factory(vec![backend("memory_test", memory_spec(true))]);

        let debug_str = format!("{factory:?}");

        assert!(debug_str.contains("IndexFactory"));
        assert!(debug_str.contains("backends"));
        assert!(debug_str.contains("memory_test"));
    }

    #[test]
    fn test_index_factory_debug_with_plugin() {
        use crate::indexes::IndexBackendPlugin;
        use async_trait::async_trait;

        // Minimal plugin: only its presence matters for the Debug output.
        struct MockPlugin;

        #[async_trait]
        impl IndexBackendPlugin for MockPlugin {
            async fn create_index_set(
                &self,
                _query_id: &str,
            ) -> Result<drasi_core::interface::IndexSet, drasi_core::interface::IndexError>
            {
                unimplemented!()
            }

            fn is_volatile(&self) -> bool {
                false
            }
        }

        let factory = IndexFactory::new(Vec::new(), Some(Arc::new(MockPlugin)));

        let debug_str = format!("{factory:?}");

        assert!(debug_str.contains("IndexFactory"));
        assert!(debug_str.contains("plugin"));
        assert!(debug_str.contains("<plugin>"));
    }

    #[tokio::test]
    async fn test_build_memory_without_archive() {
        let factory = plugin_less_factory(Vec::new());
        let backend_ref = StorageBackendRef::Inline(memory_spec(false));

        let result = factory.build(&backend_ref, "test_query").await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_build_memory_with_archive() {
        let factory = plugin_less_factory(Vec::new());
        let backend_ref = StorageBackendRef::Inline(memory_spec(true));

        let result = factory.build(&backend_ref, "test_query").await;

        assert!(result.is_ok());
    }
}