Skip to main content

drasi_lib/indexes/
factory.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::indexes::config::{StorageBackendConfig, StorageBackendRef, StorageBackendSpec};
16use crate::indexes::IndexBackendPlugin;
17use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
18use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
19use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
20use drasi_core::interface::{ElementArchiveIndex, ElementIndex, FutureQueue, ResultIndex};
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::Arc;
24
/// Failures reported by the index factory while resolving or creating storage.
#[derive(Debug)]
pub enum IndexError {
    /// A named storage backend was referenced but is not configured;
    /// carries the name that was looked up
    UnknownStore(String),
    /// A connection to Redis/Garnet could not be established
    ConnectionFailed(String),
    /// A RocksDB path could not be used (doesn't exist, no permissions, etc.)
    PathError(String),
    /// Any other failure while initializing a storage backend
    InitializationFailed(String),
    /// The requested feature is not supported
    NotSupported,
}
39
40impl fmt::Display for IndexError {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        match self {
43            IndexError::UnknownStore(name) => {
44                write!(f, "Unknown storage backend: '{name}'. Check that the backend is defined in storage_backends configuration.")
45            }
46            IndexError::ConnectionFailed(details) => {
47                write!(f, "Failed to connect to storage backend: {details}")
48            }
49            IndexError::PathError(details) => {
50                write!(f, "Storage path error: {details}")
51            }
52            IndexError::InitializationFailed(details) => {
53                write!(f, "Failed to initialize storage backend: {details}")
54            }
55            IndexError::NotSupported => {
56                write!(f, "Operation not supported")
57            }
58        }
59    }
60}
61
62impl std::error::Error for IndexError {}
63
64impl From<drasi_core::interface::IndexError> for IndexError {
65    fn from(err: drasi_core::interface::IndexError) -> Self {
66        IndexError::InitializationFailed(err.to_string())
67    }
68}
69
70/// Set of indexes for a query
71pub struct IndexSet {
72    /// Element index for storing graph elements
73    pub element_index: Arc<dyn ElementIndex>,
74    /// Archive index for storing historical elements (for past() function)
75    pub archive_index: Arc<dyn ElementArchiveIndex>,
76    /// Result index for storing query results
77    pub result_index: Arc<dyn ResultIndex>,
78    /// Future queue for temporal queries
79    pub future_queue: Arc<dyn FutureQueue>,
80}
81
82impl fmt::Debug for IndexSet {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("IndexSet")
85            .field("element_index", &"<trait object>")
86            .field("archive_index", &"<trait object>")
87            .field("result_index", &"<trait object>")
88            .field("future_queue", &"<trait object>")
89            .finish()
90    }
91}
92
93/// Factory for creating index sets based on storage backend configuration
94pub struct IndexFactory {
95    /// Map of backend ID to backend specification
96    backends: HashMap<String, StorageBackendSpec>,
97    /// Optional index backend plugin for persistent storage (RocksDB, Redis/Garnet)
98    plugin: Option<Arc<dyn IndexBackendPlugin>>,
99}
100
101impl fmt::Debug for IndexFactory {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        f.debug_struct("IndexFactory")
104            .field("backends", &self.backends)
105            .field("plugin", &self.plugin.as_ref().map(|_| "<plugin>"))
106            .finish()
107    }
108}
109
110impl IndexFactory {
111    /// Create a new IndexFactory from a list of backend configurations
112    ///
113    /// # Arguments
114    ///
115    /// * `backends` - List of storage backend configurations
116    /// * `plugin` - Optional index backend plugin for persistent storage (RocksDB, Redis/Garnet).
117    ///   When using RocksDB or Redis backends, this plugin MUST be provided.
118    ///
119    /// # Example
120    /// ```no_run
121    /// # use drasi_lib::indexes::{IndexFactory, StorageBackendConfig, StorageBackendSpec};
122    /// // For in-memory only (no plugin needed)
123    /// let backends = vec![
124    ///     StorageBackendConfig {
125    ///         id: "memory_test".to_string(),
126    ///         spec: StorageBackendSpec::Memory {
127    ///             enable_archive: true,
128    ///         },
129    ///     },
130    /// ];
131    /// let factory = IndexFactory::new(backends, None);
132    ///
133    /// // For persistent storage (plugin required)
134    /// // use drasi_index_rocksdb::RocksDbIndexProvider;
135    /// // let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
136    /// // let factory = IndexFactory::new(backends, Some(Arc::new(provider)));
137    /// ```
138    pub fn new(
139        backends: Vec<StorageBackendConfig>,
140        plugin: Option<Arc<dyn IndexBackendPlugin>>,
141    ) -> Self {
142        let backends = backends.into_iter().map(|b| (b.id, b.spec)).collect();
143        Self { backends, plugin }
144    }
145
146    /// Build an IndexSet for a query using the specified storage backend
147    ///
148    /// # Arguments
149    /// * `backend_ref` - Reference to storage backend (named or inline)
150    /// * `query_id` - Unique identifier for the query
151    ///
152    /// # Errors
153    /// Returns `IndexError` if:
154    /// - Named backend reference doesn't exist
155    /// - Backend initialization fails (connection, path, etc.)
156    /// - Invalid configuration
157    pub async fn build(
158        &self,
159        backend_ref: &StorageBackendRef,
160        query_id: &str,
161    ) -> Result<IndexSet, IndexError> {
162        let spec = match backend_ref {
163            StorageBackendRef::Named(name) => self
164                .backends
165                .get(name)
166                .ok_or_else(|| IndexError::UnknownStore(name.clone()))?,
167            StorageBackendRef::Inline(spec) => spec,
168        };
169
170        self.build_from_spec(spec, query_id).await
171    }
172
173    /// Build an IndexSet from a storage backend specification
174    async fn build_from_spec(
175        &self,
176        spec: &StorageBackendSpec,
177        query_id: &str,
178    ) -> Result<IndexSet, IndexError> {
179        // Validate configuration before building
180        spec.validate().map_err(IndexError::InitializationFailed)?;
181
182        match spec {
183            StorageBackendSpec::Memory { enable_archive } => {
184                self.build_memory_indexes(*enable_archive)
185            }
186            StorageBackendSpec::RocksDb { .. } | StorageBackendSpec::Redis { .. } => {
187                // Delegate to the plugin for persistent storage backends
188                match &self.plugin {
189                    Some(plugin) => self.build_from_plugin(plugin, query_id).await,
190                    None => Err(IndexError::InitializationFailed(
191                        "RocksDB or Redis backend requested but no index provider configured. \
192                         Use DrasiLib::builder().with_index_provider(...) to provide one."
193                            .to_string(),
194                    )),
195                }
196            }
197        }
198    }
199
200    /// Build in-memory indexes
201    fn build_memory_indexes(&self, enable_archive: bool) -> Result<IndexSet, IndexError> {
202        let mut element_index = InMemoryElementIndex::new();
203        if enable_archive {
204            element_index.enable_archive();
205        }
206        let element_index = Arc::new(element_index);
207        let result_index = InMemoryResultIndex::new();
208        let future_queue = InMemoryFutureQueue::new();
209
210        Ok(IndexSet {
211            element_index: element_index.clone(),
212            archive_index: element_index,
213            result_index: Arc::new(result_index),
214            future_queue: Arc::new(future_queue),
215        })
216    }
217
218    /// Build indexes using the provided plugin
219    async fn build_from_plugin(
220        &self,
221        plugin: &Arc<dyn IndexBackendPlugin>,
222        query_id: &str,
223    ) -> Result<IndexSet, IndexError> {
224        let element_index = plugin.create_element_index(query_id).await.map_err(|e| {
225            log::error!("Failed to create element index for query '{query_id}': {e}");
226            IndexError::InitializationFailed(format!(
227                "Failed to create element index for query '{query_id}': {e}"
228            ))
229        })?;
230
231        let archive_index = plugin.create_archive_index(query_id).await.map_err(|e| {
232            log::error!("Failed to create archive index for query '{query_id}': {e}");
233            IndexError::InitializationFailed(format!(
234                "Failed to create archive index for query '{query_id}': {e}"
235            ))
236        })?;
237
238        let result_index = plugin.create_result_index(query_id).await.map_err(|e| {
239            log::error!("Failed to create result index for query '{query_id}': {e}");
240            IndexError::InitializationFailed(format!(
241                "Failed to create result index for query '{query_id}': {e}"
242            ))
243        })?;
244
245        let future_queue = plugin.create_future_queue(query_id).await.map_err(|e| {
246            log::error!("Failed to create future queue for query '{query_id}': {e}");
247            IndexError::InitializationFailed(format!(
248                "Failed to create future queue for query '{query_id}': {e}"
249            ))
250        })?;
251
252        Ok(IndexSet {
253            element_index,
254            archive_index,
255            result_index,
256            future_queue,
257        })
258    }
259
260    /// Check if a storage backend is volatile (requires re-bootstrap after restart)
261    ///
262    /// # Returns
263    /// - `true` for Memory backend (no persistence)
264    /// - `false` for RocksDB and Redis backends (persistent)
265    pub fn is_volatile(&self, backend_ref: &StorageBackendRef) -> bool {
266        let spec = match backend_ref {
267            StorageBackendRef::Named(name) => match self.backends.get(name) {
268                Some(spec) => spec,
269                None => return false, // Unknown backend, assume not volatile
270            },
271            StorageBackendRef::Inline(spec) => spec,
272        };
273
274        spec.is_volatile()
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281
282    #[test]
283    fn test_index_factory_new() {
284        let backends = vec![
285            StorageBackendConfig {
286                id: "memory_test".to_string(),
287                spec: StorageBackendSpec::Memory {
288                    enable_archive: true,
289                },
290            },
291            StorageBackendConfig {
292                id: "rocks_test".to_string(),
293                spec: StorageBackendSpec::RocksDb {
294                    path: "/tmp/test".to_string(),
295                    enable_archive: false,
296                    direct_io: false,
297                },
298            },
299        ];
300
301        let factory = IndexFactory::new(backends, None);
302        assert_eq!(factory.backends.len(), 2);
303        assert!(factory.backends.contains_key("memory_test"));
304        assert!(factory.backends.contains_key("rocks_test"));
305    }
306
307    #[tokio::test]
308    async fn test_build_memory_indexes() {
309        let backends = vec![StorageBackendConfig {
310            id: "memory_test".to_string(),
311            spec: StorageBackendSpec::Memory {
312                enable_archive: true,
313            },
314        }];
315
316        let factory = IndexFactory::new(backends, None);
317        let backend_ref = StorageBackendRef::Named("memory_test".to_string());
318        let result = factory.build(&backend_ref, "test_query").await;
319
320        assert!(result.is_ok());
321    }
322
323    #[tokio::test]
324    async fn test_build_unknown_backend() {
325        let factory = IndexFactory::new(vec![], None);
326        let backend_ref = StorageBackendRef::Named("nonexistent".to_string());
327        let result = factory.build(&backend_ref, "test_query").await;
328
329        assert!(result.is_err());
330        match result.unwrap_err() {
331            IndexError::UnknownStore(name) => {
332                assert_eq!(name, "nonexistent");
333            }
334            _ => panic!("Expected UnknownStore error"),
335        }
336    }
337
338    #[tokio::test]
339    async fn test_build_inline_memory() {
340        let factory = IndexFactory::new(vec![], None);
341        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
342            enable_archive: false,
343        });
344        let result = factory.build(&backend_ref, "test_query").await;
345
346        assert!(result.is_ok());
347    }
348
349    #[tokio::test]
350    async fn test_build_rocksdb_without_plugin_errors() {
351        // Verify that attempting to use RocksDB without a plugin returns an error
352        let factory = IndexFactory::new(vec![], None);
353        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
354            path: "/data/test".to_string(),
355            enable_archive: false,
356            direct_io: false,
357        });
358        let result = factory.build(&backend_ref, "test_query").await;
359
360        assert!(result.is_err());
361        match result.unwrap_err() {
362            IndexError::InitializationFailed(msg) => {
363                assert!(msg.contains("no index provider configured"));
364            }
365            _ => panic!("Expected InitializationFailed error"),
366        }
367    }
368
369    #[tokio::test]
370    async fn test_build_redis_without_plugin_errors() {
371        // Verify that attempting to use Redis without a plugin returns an error
372        let factory = IndexFactory::new(vec![], None);
373        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
374            connection_string: "redis://localhost:6379".to_string(), // DevSkim: ignore DS162092
375            cache_size: None,
376        });
377        let result = factory.build(&backend_ref, "test_query").await;
378
379        assert!(result.is_err());
380        match result.unwrap_err() {
381            IndexError::InitializationFailed(msg) => {
382                assert!(msg.contains("no index provider configured"));
383            }
384            _ => panic!("Expected InitializationFailed error"),
385        }
386    }
387
388    #[test]
389    fn test_is_volatile_memory() {
390        let backends = vec![StorageBackendConfig {
391            id: "memory_test".to_string(),
392            spec: StorageBackendSpec::Memory {
393                enable_archive: false,
394            },
395        }];
396
397        let factory = IndexFactory::new(backends, None);
398        let backend_ref = StorageBackendRef::Named("memory_test".to_string());
399        assert!(factory.is_volatile(&backend_ref));
400    }
401
402    #[test]
403    fn test_is_volatile_rocksdb() {
404        let backends = vec![StorageBackendConfig {
405            id: "rocks_test".to_string(),
406            spec: StorageBackendSpec::RocksDb {
407                path: "/data/test".to_string(),
408                enable_archive: false,
409                direct_io: false,
410            },
411        }];
412
413        let factory = IndexFactory::new(backends, None);
414        let backend_ref = StorageBackendRef::Named("rocks_test".to_string());
415        assert!(!factory.is_volatile(&backend_ref));
416    }
417
418    #[test]
419    fn test_is_volatile_inline() {
420        let factory = IndexFactory::new(vec![], None);
421        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
422            enable_archive: false,
423        });
424        assert!(factory.is_volatile(&backend_ref));
425
426        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
427            path: "/data/test".to_string(),
428            enable_archive: false,
429            direct_io: false,
430        });
431        assert!(!factory.is_volatile(&backend_ref));
432    }
433
434    #[test]
435    fn test_is_volatile_inline_redis() {
436        let factory = IndexFactory::new(vec![], None);
437        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
438            connection_string: "redis://localhost:6379".to_string(), // DevSkim: ignore DS162092
439            cache_size: Some(1000),
440        });
441        assert!(!factory.is_volatile(&backend_ref));
442    }
443
444    #[test]
445    fn test_is_volatile_unknown_backend() {
446        // When a named backend doesn't exist, is_volatile returns false
447        let factory = IndexFactory::new(vec![], None);
448        let backend_ref = StorageBackendRef::Named("nonexistent".to_string());
449        assert!(!factory.is_volatile(&backend_ref));
450    }
451
452    #[test]
453    fn test_index_error_display_unknown_store() {
454        let error = IndexError::UnknownStore("my_backend".to_string());
455        let display = format!("{error}");
456        assert!(display.contains("Unknown storage backend"));
457        assert!(display.contains("my_backend"));
458    }
459
460    #[test]
461    fn test_index_error_display_connection_failed() {
462        let error = IndexError::ConnectionFailed("Connection refused".to_string());
463        let display = format!("{error}");
464        assert!(display.contains("Failed to connect"));
465        assert!(display.contains("Connection refused"));
466    }
467
468    #[test]
469    fn test_index_error_display_path_error() {
470        let error = IndexError::PathError("/invalid/path".to_string());
471        let display = format!("{error}");
472        assert!(display.contains("Storage path error"));
473        assert!(display.contains("/invalid/path"));
474    }
475
476    #[test]
477    fn test_index_error_display_initialization_failed() {
478        let error = IndexError::InitializationFailed("Database init failed".to_string());
479        let display = format!("{error}");
480        assert!(display.contains("Failed to initialize"));
481        assert!(display.contains("Database init failed"));
482    }
483
484    #[test]
485    fn test_index_error_display_not_supported() {
486        let error = IndexError::NotSupported;
487        let display = format!("{error}");
488        assert!(display.contains("not supported"));
489    }
490
491    #[test]
492    fn test_index_error_is_std_error() {
493        let error = IndexError::UnknownStore("test".to_string());
494        // Verify it implements std::error::Error
495        let _: &dyn std::error::Error = &error;
496    }
497
498    #[test]
499    fn test_index_error_from_drasi_core_index_error() {
500        // Create a std::io::Error to wrap in IndexError::other
501        let io_error = std::io::Error::new(std::io::ErrorKind::Other, "test error");
502        let core_error = drasi_core::interface::IndexError::other(io_error);
503        let error: IndexError = core_error.into();
504        match error {
505            IndexError::InitializationFailed(msg) => {
506                assert!(msg.contains("test error"));
507            }
508            _ => panic!("Expected InitializationFailed error"),
509        }
510    }
511
512    #[test]
513    fn test_index_set_debug() {
514        // We can't easily construct an IndexSet without going through the factory,
515        // but we can test via build
516        let backends = vec![StorageBackendConfig {
517            id: "memory_test".to_string(),
518            spec: StorageBackendSpec::Memory {
519                enable_archive: false,
520            },
521        }];
522        let factory = IndexFactory::new(backends, None);
523
524        // Use tokio runtime for async test
525        let rt = tokio::runtime::Runtime::new().unwrap();
526        let index_set = rt
527            .block_on(factory.build(&StorageBackendRef::Named("memory_test".to_string()), "q1"))
528            .unwrap();
529
530        let debug_str = format!("{index_set:?}");
531        assert!(debug_str.contains("IndexSet"));
532        assert!(debug_str.contains("element_index"));
533        assert!(debug_str.contains("archive_index"));
534        assert!(debug_str.contains("result_index"));
535        assert!(debug_str.contains("future_queue"));
536    }
537
538    #[test]
539    fn test_index_factory_debug() {
540        let backends = vec![StorageBackendConfig {
541            id: "memory_test".to_string(),
542            spec: StorageBackendSpec::Memory {
543                enable_archive: true,
544            },
545        }];
546        let factory = IndexFactory::new(backends, None);
547        let debug_str = format!("{factory:?}");
548        assert!(debug_str.contains("IndexFactory"));
549        assert!(debug_str.contains("backends"));
550        assert!(debug_str.contains("memory_test"));
551    }
552
553    #[test]
554    fn test_index_factory_debug_with_plugin() {
555        use crate::indexes::IndexBackendPlugin;
556        use async_trait::async_trait;
557
558        // Create a mock plugin for testing
559        struct MockPlugin;
560
561        #[async_trait]
562        impl IndexBackendPlugin for MockPlugin {
563            async fn create_element_index(
564                &self,
565                _query_id: &str,
566            ) -> Result<
567                Arc<dyn drasi_core::interface::ElementIndex>,
568                drasi_core::interface::IndexError,
569            > {
570                unimplemented!()
571            }
572
573            async fn create_archive_index(
574                &self,
575                _query_id: &str,
576            ) -> Result<
577                Arc<dyn drasi_core::interface::ElementArchiveIndex>,
578                drasi_core::interface::IndexError,
579            > {
580                unimplemented!()
581            }
582
583            async fn create_result_index(
584                &self,
585                _query_id: &str,
586            ) -> Result<
587                Arc<dyn drasi_core::interface::ResultIndex>,
588                drasi_core::interface::IndexError,
589            > {
590                unimplemented!()
591            }
592
593            async fn create_future_queue(
594                &self,
595                _query_id: &str,
596            ) -> Result<
597                Arc<dyn drasi_core::interface::FutureQueue>,
598                drasi_core::interface::IndexError,
599            > {
600                unimplemented!()
601            }
602
603            fn is_volatile(&self) -> bool {
604                false
605            }
606        }
607
608        let factory = IndexFactory::new(vec![], Some(Arc::new(MockPlugin)));
609        let debug_str = format!("{factory:?}");
610        assert!(debug_str.contains("IndexFactory"));
611        assert!(debug_str.contains("plugin"));
612        assert!(debug_str.contains("<plugin>"));
613    }
614
615    #[tokio::test]
616    async fn test_build_memory_without_archive() {
617        let factory = IndexFactory::new(vec![], None);
618        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
619            enable_archive: false,
620        });
621        let result = factory.build(&backend_ref, "test_query").await;
622        assert!(result.is_ok());
623    }
624
625    #[tokio::test]
626    async fn test_build_memory_with_archive() {
627        let factory = IndexFactory::new(vec![], None);
628        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
629            enable_archive: true,
630        });
631        let result = factory.build(&backend_ref, "test_query").await;
632        assert!(result.is_ok());
633    }
634}