1use crate::indexes::config::{StorageBackendConfig, StorageBackendRef, StorageBackendSpec};
16use crate::indexes::IndexBackendPlugin;
17use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
18use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
19use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
20use drasi_core::interface::{CreatedIndexes, IndexSet, NoOpSessionControl};
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::Arc;
24
/// Errors produced while resolving a storage backend or creating its indexes.
#[derive(Debug)]
pub enum IndexError {
    /// A named backend reference did not match any registered backend.
    /// The payload is the name that was looked up.
    UnknownStore(String),
    /// Connecting to a storage backend failed; the payload carries the details.
    /// (Not constructed in this file.)
    ConnectionFailed(String),
    /// A storage path problem; the payload carries the details.
    /// (Not constructed in this file.)
    PathError(String),
    /// Backend initialization failed. In this file it is produced for spec
    /// validation failures, a missing index plugin, plugin index-creation
    /// failures, and conversions from `drasi_core::interface::IndexError`.
    InitializationFailed(String),
    /// The requested operation is not supported.
    /// (Not constructed in this file.)
    NotSupported,
}
39
40impl fmt::Display for IndexError {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 IndexError::UnknownStore(name) => {
44 write!(f, "Unknown storage backend: '{name}'. Check that the backend is defined in storage_backends configuration.")
45 }
46 IndexError::ConnectionFailed(details) => {
47 write!(f, "Failed to connect to storage backend: {details}")
48 }
49 IndexError::PathError(details) => {
50 write!(f, "Storage path error: {details}")
51 }
52 IndexError::InitializationFailed(details) => {
53 write!(f, "Failed to initialize storage backend: {details}")
54 }
55 IndexError::NotSupported => {
56 write!(f, "Operation not supported")
57 }
58 }
59 }
60}
61
// Empty impl: every `Error` method keeps its default, so no underlying
// `source()` is exposed for this error type.
impl std::error::Error for IndexError {}
63
64impl From<drasi_core::interface::IndexError> for IndexError {
65 fn from(err: drasi_core::interface::IndexError) -> Self {
66 IndexError::InitializationFailed(err.to_string())
67 }
68}
69
/// Creates index sets for queries from storage backend references.
pub struct IndexFactory {
    /// Registered backend specs, keyed by backend id.
    backends: HashMap<String, StorageBackendSpec>,
    /// Optional plugin used to create indexes for `RocksDb` and `Redis` specs.
    plugin: Option<Arc<dyn IndexBackendPlugin>>,
}
77
78impl fmt::Debug for IndexFactory {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 f.debug_struct("IndexFactory")
81 .field("backends", &self.backends)
82 .field("plugin", &self.plugin.as_ref().map(|_| "<plugin>"))
83 .finish()
84 }
85}
86
87impl IndexFactory {
88 pub fn new(
116 backends: Vec<StorageBackendConfig>,
117 plugin: Option<Arc<dyn IndexBackendPlugin>>,
118 ) -> Self {
119 let backends = backends.into_iter().map(|b| (b.id, b.spec)).collect();
120 Self { backends, plugin }
121 }
122
123 pub async fn build(
135 &self,
136 backend_ref: &StorageBackendRef,
137 query_id: &str,
138 ) -> Result<CreatedIndexes, IndexError> {
139 let spec = match backend_ref {
140 StorageBackendRef::Named(name) => self
141 .backends
142 .get(name)
143 .ok_or_else(|| IndexError::UnknownStore(name.clone()))?,
144 StorageBackendRef::Inline(spec) => spec,
145 };
146
147 self.build_from_spec(spec, query_id).await
148 }
149
150 async fn build_from_spec(
152 &self,
153 spec: &StorageBackendSpec,
154 query_id: &str,
155 ) -> Result<CreatedIndexes, IndexError> {
156 spec.validate().map_err(IndexError::InitializationFailed)?;
158
159 match spec {
160 StorageBackendSpec::Memory { enable_archive } => {
161 self.build_memory_indexes(*enable_archive)
162 }
163 StorageBackendSpec::RocksDb { .. } | StorageBackendSpec::Redis { .. } => {
164 match &self.plugin {
166 Some(plugin) => self.build_from_plugin(plugin, query_id).await,
167 None => Err(IndexError::InitializationFailed(
168 "RocksDB or Redis backend requested but no index provider configured. \
169 Use DrasiLib::builder().with_index_provider(...) to provide one."
170 .to_string(),
171 )),
172 }
173 }
174 }
175 }
176
177 fn build_memory_indexes(&self, enable_archive: bool) -> Result<CreatedIndexes, IndexError> {
179 let mut element_index = InMemoryElementIndex::new();
180 if enable_archive {
181 element_index.enable_archive();
182 }
183 let element_index = Arc::new(element_index);
184 let result_index = InMemoryResultIndex::new();
185 let future_queue = InMemoryFutureQueue::new();
186
187 Ok(CreatedIndexes {
188 set: IndexSet {
189 element_index: element_index.clone(),
190 archive_index: element_index,
191 result_index: Arc::new(result_index),
192 future_queue: Arc::new(future_queue),
193 session_control: Arc::new(NoOpSessionControl),
194 },
195 checkpoint_store: None,
196 })
197 }
198
199 async fn build_from_plugin(
201 &self,
202 plugin: &Arc<dyn IndexBackendPlugin>,
203 query_id: &str,
204 ) -> Result<CreatedIndexes, IndexError> {
205 plugin.create_indexes(query_id).await.map_err(|e| {
206 log::error!("Failed to create indexes for query '{query_id}': {e}");
207 IndexError::InitializationFailed(format!(
208 "Failed to create indexes for query '{query_id}': {e}"
209 ))
210 })
211 }
212
213 pub fn is_volatile(&self, backend_ref: &StorageBackendRef) -> bool {
219 let spec = match backend_ref {
220 StorageBackendRef::Named(name) => match self.backends.get(name) {
221 Some(spec) => spec,
222 None => return false, },
224 StorageBackendRef::Inline(spec) => spec,
225 };
226
227 spec.is_volatile()
228 }
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a named in-memory backend configuration.
    fn memory_backend(id: &str, enable_archive: bool) -> StorageBackendConfig {
        StorageBackendConfig {
            id: id.to_string(),
            spec: StorageBackendSpec::Memory { enable_archive },
        }
    }

    /// Builds a RocksDB spec at `path` with archive and direct I/O disabled.
    fn rocksdb_spec(path: &str) -> StorageBackendSpec {
        StorageBackendSpec::RocksDb {
            path: path.to_string(),
            enable_archive: false,
            direct_io: false,
        }
    }

    /// Reference to a backend registered under `id`.
    fn named(id: &str) -> StorageBackendRef {
        StorageBackendRef::Named(id.to_string())
    }

    /// Inline reference to an in-memory backend.
    fn inline_memory(enable_archive: bool) -> StorageBackendRef {
        StorageBackendRef::Inline(StorageBackendSpec::Memory { enable_archive })
    }

    /// Factory with no registered backends and no plugin.
    fn empty_factory() -> IndexFactory {
        IndexFactory::new(vec![], None)
    }

    /// Asserts that `result` failed because no index provider was configured.
    fn assert_missing_provider(result: Result<CreatedIndexes, IndexError>) {
        assert!(result.is_err());
        match result.unwrap_err() {
            IndexError::InitializationFailed(msg) => {
                assert!(msg.contains("no index provider configured"));
            }
            _ => panic!("Expected InitializationFailed error"),
        }
    }

    /// Asserts that the `Display` text of `error` contains every fragment.
    fn assert_display_contains(error: &IndexError, fragments: &[&str]) {
        let display = error.to_string();
        for &fragment in fragments {
            assert!(display.contains(fragment));
        }
    }

    #[test]
    fn test_index_factory_new() {
        let configs = vec![
            memory_backend("memory_test", true),
            StorageBackendConfig {
                id: "rocks_test".to_string(),
                spec: rocksdb_spec("/tmp/test"),
            },
        ];

        let factory = IndexFactory::new(configs, None);
        assert_eq!(factory.backends.len(), 2);
        for id in ["memory_test", "rocks_test"] {
            assert!(factory.backends.contains_key(id));
        }
    }

    #[tokio::test]
    async fn test_build_memory_indexes() {
        let factory = IndexFactory::new(vec![memory_backend("memory_test", true)], None);
        let backend_ref = named("memory_test");

        let outcome = factory.build(&backend_ref, "test_query").await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_build_unknown_backend() {
        let factory = empty_factory();
        let backend_ref = named("nonexistent");

        let outcome = factory.build(&backend_ref, "test_query").await;

        assert!(outcome.is_err());
        match outcome.unwrap_err() {
            IndexError::UnknownStore(name) => assert_eq!(name, "nonexistent"),
            _ => panic!("Expected UnknownStore error"),
        }
    }

    #[tokio::test]
    async fn test_build_inline_memory() {
        let factory = empty_factory();
        let backend_ref = inline_memory(false);

        let outcome = factory.build(&backend_ref, "test_query").await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_build_rocksdb_without_plugin_errors() {
        // No plugin is configured, so a RocksDB spec cannot be built.
        let factory = empty_factory();
        let backend_ref = StorageBackendRef::Inline(rocksdb_spec("/data/test"));

        let outcome = factory.build(&backend_ref, "test_query").await;

        assert_missing_provider(outcome);
    }

    #[tokio::test]
    async fn test_build_redis_without_plugin_errors() {
        // No plugin is configured, so a Redis spec cannot be built.
        let factory = empty_factory();
        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
            connection_string: "redis://localhost:6379".to_string(),
            cache_size: None,
        });

        let outcome = factory.build(&backend_ref, "test_query").await;

        assert_missing_provider(outcome);
    }

    #[test]
    fn test_is_volatile_memory() {
        let factory = IndexFactory::new(vec![memory_backend("memory_test", false)], None);

        assert!(factory.is_volatile(&named("memory_test")));
    }

    #[test]
    fn test_is_volatile_rocksdb() {
        let configs = vec![StorageBackendConfig {
            id: "rocks_test".to_string(),
            spec: rocksdb_spec("/data/test"),
        }];
        let factory = IndexFactory::new(configs, None);

        assert!(!factory.is_volatile(&named("rocks_test")));
    }

    #[test]
    fn test_is_volatile_inline() {
        let factory = empty_factory();

        let memory_ref = inline_memory(false);
        assert!(factory.is_volatile(&memory_ref));

        let rocks_ref = StorageBackendRef::Inline(rocksdb_spec("/data/test"));
        assert!(!factory.is_volatile(&rocks_ref));
    }

    #[test]
    fn test_is_volatile_inline_redis() {
        let factory = empty_factory();
        let redis_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
            connection_string: "redis://localhost:6379".to_string(),
            cache_size: Some(1000),
        });

        assert!(!factory.is_volatile(&redis_ref));
    }

    #[test]
    fn test_is_volatile_unknown_backend() {
        // An unregistered name is reported as not volatile.
        let factory = empty_factory();

        assert!(!factory.is_volatile(&named("nonexistent")));
    }

    #[test]
    fn test_index_error_display_unknown_store() {
        assert_display_contains(
            &IndexError::UnknownStore("my_backend".to_string()),
            &["Unknown storage backend", "my_backend"],
        );
    }

    #[test]
    fn test_index_error_display_connection_failed() {
        assert_display_contains(
            &IndexError::ConnectionFailed("Connection refused".to_string()),
            &["Failed to connect", "Connection refused"],
        );
    }

    #[test]
    fn test_index_error_display_path_error() {
        assert_display_contains(
            &IndexError::PathError("/invalid/path".to_string()),
            &["Storage path error", "/invalid/path"],
        );
    }

    #[test]
    fn test_index_error_display_initialization_failed() {
        assert_display_contains(
            &IndexError::InitializationFailed("Database init failed".to_string()),
            &["Failed to initialize", "Database init failed"],
        );
    }

    #[test]
    fn test_index_error_display_not_supported() {
        assert_display_contains(&IndexError::NotSupported, &["not supported"]);
    }

    #[test]
    fn test_index_error_is_std_error() {
        // Compile-time check: `&IndexError` must coerce to `&dyn Error`.
        fn accepts_std_error(_: &dyn std::error::Error) {}

        let error = IndexError::UnknownStore("test".to_string());
        accepts_std_error(&error);
    }

    #[test]
    fn test_index_error_from_drasi_core_index_error() {
        let io_error = std::io::Error::new(std::io::ErrorKind::Other, "test error");
        let core_error = drasi_core::interface::IndexError::other(io_error);

        match IndexError::from(core_error) {
            IndexError::InitializationFailed(msg) => {
                assert!(msg.contains("test error"));
            }
            _ => panic!("Expected InitializationFailed error"),
        }
    }

    #[test]
    fn test_index_set_debug() {
        let factory = IndexFactory::new(vec![memory_backend("memory_test", false)], None);
        let backend_ref = named("memory_test");

        // Synchronous test, so the async build is driven on a local runtime.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let index_set = runtime
            .block_on(factory.build(&backend_ref, "q1"))
            .unwrap();

        let debug_str = format!("{index_set:?}");
        for expected in [
            "IndexSet",
            "element_index",
            "archive_index",
            "result_index",
            "future_queue",
        ] {
            assert!(debug_str.contains(expected));
        }
    }

    #[test]
    fn test_index_factory_debug() {
        let factory = IndexFactory::new(vec![memory_backend("memory_test", true)], None);

        let debug_str = format!("{factory:?}");
        for expected in ["IndexFactory", "backends", "memory_test"] {
            assert!(debug_str.contains(expected));
        }
    }

    #[test]
    fn test_index_factory_debug_with_plugin() {
        use crate::indexes::IndexBackendPlugin;
        use async_trait::async_trait;

        /// Plugin stub; `create_indexes` is never invoked by this test.
        struct MockPlugin;

        #[async_trait]
        impl IndexBackendPlugin for MockPlugin {
            async fn create_indexes(
                &self,
                _query_id: &str,
            ) -> Result<drasi_core::interface::CreatedIndexes, drasi_core::interface::IndexError>
            {
                unimplemented!()
            }

            fn is_volatile(&self) -> bool {
                false
            }
        }

        let factory = IndexFactory::new(vec![], Some(Arc::new(MockPlugin)));

        let debug_str = format!("{factory:?}");
        for expected in ["IndexFactory", "plugin", "<plugin>"] {
            assert!(debug_str.contains(expected));
        }
    }

    #[tokio::test]
    async fn test_build_memory_without_archive() {
        let factory = empty_factory();
        let backend_ref = inline_memory(false);

        let outcome = factory.build(&backend_ref, "test_query").await;

        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_build_memory_with_archive() {
        let factory = empty_factory();
        let backend_ref = inline_memory(true);

        let outcome = factory.build(&backend_ref, "test_query").await;

        assert!(outcome.is_ok());
    }
}