1use crate::indexes::config::{StorageBackendConfig, StorageBackendRef, StorageBackendSpec};
16use crate::indexes::IndexBackendPlugin;
17use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
18use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
19use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
20use drasi_core::interface::{ElementArchiveIndex, ElementIndex, FutureQueue, ResultIndex};
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::Arc;
24
25#[derive(Debug)]
27pub enum IndexError {
28 UnknownStore(String),
30 ConnectionFailed(String),
32 PathError(String),
34 InitializationFailed(String),
36 NotSupported,
38}
39
40impl fmt::Display for IndexError {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 IndexError::UnknownStore(name) => {
44 write!(f, "Unknown storage backend: '{name}'. Check that the backend is defined in storage_backends configuration.")
45 }
46 IndexError::ConnectionFailed(details) => {
47 write!(f, "Failed to connect to storage backend: {details}")
48 }
49 IndexError::PathError(details) => {
50 write!(f, "Storage path error: {details}")
51 }
52 IndexError::InitializationFailed(details) => {
53 write!(f, "Failed to initialize storage backend: {details}")
54 }
55 IndexError::NotSupported => {
56 write!(f, "Operation not supported")
57 }
58 }
59 }
60}
61
62impl std::error::Error for IndexError {}
63
64impl From<drasi_core::interface::IndexError> for IndexError {
65 fn from(err: drasi_core::interface::IndexError) -> Self {
66 IndexError::InitializationFailed(err.to_string())
67 }
68}
69
70pub struct IndexSet {
72 pub element_index: Arc<dyn ElementIndex>,
74 pub archive_index: Arc<dyn ElementArchiveIndex>,
76 pub result_index: Arc<dyn ResultIndex>,
78 pub future_queue: Arc<dyn FutureQueue>,
80}
81
82impl fmt::Debug for IndexSet {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f.debug_struct("IndexSet")
85 .field("element_index", &"<trait object>")
86 .field("archive_index", &"<trait object>")
87 .field("result_index", &"<trait object>")
88 .field("future_queue", &"<trait object>")
89 .finish()
90 }
91}
92
93pub struct IndexFactory {
95 backends: HashMap<String, StorageBackendSpec>,
97 plugin: Option<Arc<dyn IndexBackendPlugin>>,
99}
100
101impl fmt::Debug for IndexFactory {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 f.debug_struct("IndexFactory")
104 .field("backends", &self.backends)
105 .field("plugin", &self.plugin.as_ref().map(|_| "<plugin>"))
106 .finish()
107 }
108}
109
110impl IndexFactory {
111 pub fn new(
139 backends: Vec<StorageBackendConfig>,
140 plugin: Option<Arc<dyn IndexBackendPlugin>>,
141 ) -> Self {
142 let backends = backends.into_iter().map(|b| (b.id, b.spec)).collect();
143 Self { backends, plugin }
144 }
145
146 pub async fn build(
158 &self,
159 backend_ref: &StorageBackendRef,
160 query_id: &str,
161 ) -> Result<IndexSet, IndexError> {
162 let spec = match backend_ref {
163 StorageBackendRef::Named(name) => self
164 .backends
165 .get(name)
166 .ok_or_else(|| IndexError::UnknownStore(name.clone()))?,
167 StorageBackendRef::Inline(spec) => spec,
168 };
169
170 self.build_from_spec(spec, query_id).await
171 }
172
173 async fn build_from_spec(
175 &self,
176 spec: &StorageBackendSpec,
177 query_id: &str,
178 ) -> Result<IndexSet, IndexError> {
179 spec.validate().map_err(IndexError::InitializationFailed)?;
181
182 match spec {
183 StorageBackendSpec::Memory { enable_archive } => {
184 self.build_memory_indexes(*enable_archive)
185 }
186 StorageBackendSpec::RocksDb { .. } | StorageBackendSpec::Redis { .. } => {
187 match &self.plugin {
189 Some(plugin) => self.build_from_plugin(plugin, query_id).await,
190 None => Err(IndexError::InitializationFailed(
191 "RocksDB or Redis backend requested but no index provider configured. \
192 Use DrasiLib::builder().with_index_provider(...) to provide one."
193 .to_string(),
194 )),
195 }
196 }
197 }
198 }
199
200 fn build_memory_indexes(&self, enable_archive: bool) -> Result<IndexSet, IndexError> {
202 let mut element_index = InMemoryElementIndex::new();
203 if enable_archive {
204 element_index.enable_archive();
205 }
206 let element_index = Arc::new(element_index);
207 let result_index = InMemoryResultIndex::new();
208 let future_queue = InMemoryFutureQueue::new();
209
210 Ok(IndexSet {
211 element_index: element_index.clone(),
212 archive_index: element_index,
213 result_index: Arc::new(result_index),
214 future_queue: Arc::new(future_queue),
215 })
216 }
217
218 async fn build_from_plugin(
220 &self,
221 plugin: &Arc<dyn IndexBackendPlugin>,
222 query_id: &str,
223 ) -> Result<IndexSet, IndexError> {
224 let element_index = plugin.create_element_index(query_id).await.map_err(|e| {
225 log::error!("Failed to create element index for query '{query_id}': {e}");
226 IndexError::InitializationFailed(format!(
227 "Failed to create element index for query '{query_id}': {e}"
228 ))
229 })?;
230
231 let archive_index = plugin.create_archive_index(query_id).await.map_err(|e| {
232 log::error!("Failed to create archive index for query '{query_id}': {e}");
233 IndexError::InitializationFailed(format!(
234 "Failed to create archive index for query '{query_id}': {e}"
235 ))
236 })?;
237
238 let result_index = plugin.create_result_index(query_id).await.map_err(|e| {
239 log::error!("Failed to create result index for query '{query_id}': {e}");
240 IndexError::InitializationFailed(format!(
241 "Failed to create result index for query '{query_id}': {e}"
242 ))
243 })?;
244
245 let future_queue = plugin.create_future_queue(query_id).await.map_err(|e| {
246 log::error!("Failed to create future queue for query '{query_id}': {e}");
247 IndexError::InitializationFailed(format!(
248 "Failed to create future queue for query '{query_id}': {e}"
249 ))
250 })?;
251
252 Ok(IndexSet {
253 element_index,
254 archive_index,
255 result_index,
256 future_queue,
257 })
258 }
259
260 pub fn is_volatile(&self, backend_ref: &StorageBackendRef) -> bool {
266 let spec = match backend_ref {
267 StorageBackendRef::Named(name) => match self.backends.get(name) {
268 Some(spec) => spec,
269 None => return false, },
271 StorageBackendRef::Inline(spec) => spec,
272 };
273
274 spec.is_volatile()
275 }
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281
282 #[test]
283 fn test_index_factory_new() {
284 let backends = vec![
285 StorageBackendConfig {
286 id: "memory_test".to_string(),
287 spec: StorageBackendSpec::Memory {
288 enable_archive: true,
289 },
290 },
291 StorageBackendConfig {
292 id: "rocks_test".to_string(),
293 spec: StorageBackendSpec::RocksDb {
294 path: "/tmp/test".to_string(),
295 enable_archive: false,
296 direct_io: false,
297 },
298 },
299 ];
300
301 let factory = IndexFactory::new(backends, None);
302 assert_eq!(factory.backends.len(), 2);
303 assert!(factory.backends.contains_key("memory_test"));
304 assert!(factory.backends.contains_key("rocks_test"));
305 }
306
307 #[tokio::test]
308 async fn test_build_memory_indexes() {
309 let backends = vec![StorageBackendConfig {
310 id: "memory_test".to_string(),
311 spec: StorageBackendSpec::Memory {
312 enable_archive: true,
313 },
314 }];
315
316 let factory = IndexFactory::new(backends, None);
317 let backend_ref = StorageBackendRef::Named("memory_test".to_string());
318 let result = factory.build(&backend_ref, "test_query").await;
319
320 assert!(result.is_ok());
321 }
322
323 #[tokio::test]
324 async fn test_build_unknown_backend() {
325 let factory = IndexFactory::new(vec![], None);
326 let backend_ref = StorageBackendRef::Named("nonexistent".to_string());
327 let result = factory.build(&backend_ref, "test_query").await;
328
329 assert!(result.is_err());
330 match result.unwrap_err() {
331 IndexError::UnknownStore(name) => {
332 assert_eq!(name, "nonexistent");
333 }
334 _ => panic!("Expected UnknownStore error"),
335 }
336 }
337
338 #[tokio::test]
339 async fn test_build_inline_memory() {
340 let factory = IndexFactory::new(vec![], None);
341 let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
342 enable_archive: false,
343 });
344 let result = factory.build(&backend_ref, "test_query").await;
345
346 assert!(result.is_ok());
347 }
348
349 #[tokio::test]
350 async fn test_build_rocksdb_without_plugin_errors() {
351 let factory = IndexFactory::new(vec![], None);
353 let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
354 path: "/data/test".to_string(),
355 enable_archive: false,
356 direct_io: false,
357 });
358 let result = factory.build(&backend_ref, "test_query").await;
359
360 assert!(result.is_err());
361 match result.unwrap_err() {
362 IndexError::InitializationFailed(msg) => {
363 assert!(msg.contains("no index provider configured"));
364 }
365 _ => panic!("Expected InitializationFailed error"),
366 }
367 }
368
369 #[tokio::test]
370 async fn test_build_redis_without_plugin_errors() {
371 let factory = IndexFactory::new(vec![], None);
373 let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
374 connection_string: "redis://localhost:6379".to_string(), cache_size: None,
376 });
377 let result = factory.build(&backend_ref, "test_query").await;
378
379 assert!(result.is_err());
380 match result.unwrap_err() {
381 IndexError::InitializationFailed(msg) => {
382 assert!(msg.contains("no index provider configured"));
383 }
384 _ => panic!("Expected InitializationFailed error"),
385 }
386 }
387
388 #[test]
389 fn test_is_volatile_memory() {
390 let backends = vec![StorageBackendConfig {
391 id: "memory_test".to_string(),
392 spec: StorageBackendSpec::Memory {
393 enable_archive: false,
394 },
395 }];
396
397 let factory = IndexFactory::new(backends, None);
398 let backend_ref = StorageBackendRef::Named("memory_test".to_string());
399 assert!(factory.is_volatile(&backend_ref));
400 }
401
402 #[test]
403 fn test_is_volatile_rocksdb() {
404 let backends = vec![StorageBackendConfig {
405 id: "rocks_test".to_string(),
406 spec: StorageBackendSpec::RocksDb {
407 path: "/data/test".to_string(),
408 enable_archive: false,
409 direct_io: false,
410 },
411 }];
412
413 let factory = IndexFactory::new(backends, None);
414 let backend_ref = StorageBackendRef::Named("rocks_test".to_string());
415 assert!(!factory.is_volatile(&backend_ref));
416 }
417
418 #[test]
419 fn test_is_volatile_inline() {
420 let factory = IndexFactory::new(vec![], None);
421 let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
422 enable_archive: false,
423 });
424 assert!(factory.is_volatile(&backend_ref));
425
426 let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
427 path: "/data/test".to_string(),
428 enable_archive: false,
429 direct_io: false,
430 });
431 assert!(!factory.is_volatile(&backend_ref));
432 }
433
434 #[test]
435 fn test_is_volatile_inline_redis() {
436 let factory = IndexFactory::new(vec![], None);
437 let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
438 connection_string: "redis://localhost:6379".to_string(), cache_size: Some(1000),
440 });
441 assert!(!factory.is_volatile(&backend_ref));
442 }
443
444 #[test]
445 fn test_is_volatile_unknown_backend() {
446 let factory = IndexFactory::new(vec![], None);
448 let backend_ref = StorageBackendRef::Named("nonexistent".to_string());
449 assert!(!factory.is_volatile(&backend_ref));
450 }
451
452 #[test]
453 fn test_index_error_display_unknown_store() {
454 let error = IndexError::UnknownStore("my_backend".to_string());
455 let display = format!("{error}");
456 assert!(display.contains("Unknown storage backend"));
457 assert!(display.contains("my_backend"));
458 }
459
460 #[test]
461 fn test_index_error_display_connection_failed() {
462 let error = IndexError::ConnectionFailed("Connection refused".to_string());
463 let display = format!("{error}");
464 assert!(display.contains("Failed to connect"));
465 assert!(display.contains("Connection refused"));
466 }
467
468 #[test]
469 fn test_index_error_display_path_error() {
470 let error = IndexError::PathError("/invalid/path".to_string());
471 let display = format!("{error}");
472 assert!(display.contains("Storage path error"));
473 assert!(display.contains("/invalid/path"));
474 }
475
476 #[test]
477 fn test_index_error_display_initialization_failed() {
478 let error = IndexError::InitializationFailed("Database init failed".to_string());
479 let display = format!("{error}");
480 assert!(display.contains("Failed to initialize"));
481 assert!(display.contains("Database init failed"));
482 }
483
484 #[test]
485 fn test_index_error_display_not_supported() {
486 let error = IndexError::NotSupported;
487 let display = format!("{error}");
488 assert!(display.contains("not supported"));
489 }
490
491 #[test]
492 fn test_index_error_is_std_error() {
493 let error = IndexError::UnknownStore("test".to_string());
494 let _: &dyn std::error::Error = &error;
496 }
497
498 #[test]
499 fn test_index_error_from_drasi_core_index_error() {
500 let io_error = std::io::Error::new(std::io::ErrorKind::Other, "test error");
502 let core_error = drasi_core::interface::IndexError::other(io_error);
503 let error: IndexError = core_error.into();
504 match error {
505 IndexError::InitializationFailed(msg) => {
506 assert!(msg.contains("test error"));
507 }
508 _ => panic!("Expected InitializationFailed error"),
509 }
510 }
511
512 #[test]
513 fn test_index_set_debug() {
514 let backends = vec![StorageBackendConfig {
517 id: "memory_test".to_string(),
518 spec: StorageBackendSpec::Memory {
519 enable_archive: false,
520 },
521 }];
522 let factory = IndexFactory::new(backends, None);
523
524 let rt = tokio::runtime::Runtime::new().unwrap();
526 let index_set = rt
527 .block_on(factory.build(&StorageBackendRef::Named("memory_test".to_string()), "q1"))
528 .unwrap();
529
530 let debug_str = format!("{index_set:?}");
531 assert!(debug_str.contains("IndexSet"));
532 assert!(debug_str.contains("element_index"));
533 assert!(debug_str.contains("archive_index"));
534 assert!(debug_str.contains("result_index"));
535 assert!(debug_str.contains("future_queue"));
536 }
537
538 #[test]
539 fn test_index_factory_debug() {
540 let backends = vec![StorageBackendConfig {
541 id: "memory_test".to_string(),
542 spec: StorageBackendSpec::Memory {
543 enable_archive: true,
544 },
545 }];
546 let factory = IndexFactory::new(backends, None);
547 let debug_str = format!("{factory:?}");
548 assert!(debug_str.contains("IndexFactory"));
549 assert!(debug_str.contains("backends"));
550 assert!(debug_str.contains("memory_test"));
551 }
552
553 #[test]
554 fn test_index_factory_debug_with_plugin() {
555 use crate::indexes::IndexBackendPlugin;
556 use async_trait::async_trait;
557
558 struct MockPlugin;
560
561 #[async_trait]
562 impl IndexBackendPlugin for MockPlugin {
563 async fn create_element_index(
564 &self,
565 _query_id: &str,
566 ) -> Result<
567 Arc<dyn drasi_core::interface::ElementIndex>,
568 drasi_core::interface::IndexError,
569 > {
570 unimplemented!()
571 }
572
573 async fn create_archive_index(
574 &self,
575 _query_id: &str,
576 ) -> Result<
577 Arc<dyn drasi_core::interface::ElementArchiveIndex>,
578 drasi_core::interface::IndexError,
579 > {
580 unimplemented!()
581 }
582
583 async fn create_result_index(
584 &self,
585 _query_id: &str,
586 ) -> Result<
587 Arc<dyn drasi_core::interface::ResultIndex>,
588 drasi_core::interface::IndexError,
589 > {
590 unimplemented!()
591 }
592
593 async fn create_future_queue(
594 &self,
595 _query_id: &str,
596 ) -> Result<
597 Arc<dyn drasi_core::interface::FutureQueue>,
598 drasi_core::interface::IndexError,
599 > {
600 unimplemented!()
601 }
602
603 fn is_volatile(&self) -> bool {
604 false
605 }
606 }
607
608 let factory = IndexFactory::new(vec![], Some(Arc::new(MockPlugin)));
609 let debug_str = format!("{factory:?}");
610 assert!(debug_str.contains("IndexFactory"));
611 assert!(debug_str.contains("plugin"));
612 assert!(debug_str.contains("<plugin>"));
613 }
614
615 #[tokio::test]
616 async fn test_build_memory_without_archive() {
617 let factory = IndexFactory::new(vec![], None);
618 let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
619 enable_archive: false,
620 });
621 let result = factory.build(&backend_ref, "test_query").await;
622 assert!(result.is_ok());
623 }
624
625 #[tokio::test]
626 async fn test_build_memory_with_archive() {
627 let factory = IndexFactory::new(vec![], None);
628 let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
629 enable_archive: true,
630 });
631 let result = factory.build(&backend_ref, "test_query").await;
632 assert!(result.is_ok());
633 }
634}