Skip to main content

drasi_lib/indexes/
factory.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::indexes::config::{StorageBackendConfig, StorageBackendRef, StorageBackendSpec};
16use crate::indexes::IndexBackendPlugin;
17use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
18use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
19use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
20use drasi_core::interface::{IndexSet, NoOpSessionControl};
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::Arc;
24
25/// Error type for index factory operations
26#[derive(Debug)]
27pub enum IndexError {
28    /// Referenced storage backend does not exist
29    UnknownStore(String),
30    /// Failed to connect to Redis/Garnet
31    ConnectionFailed(String),
32    /// RocksDB path error (doesn't exist, no permissions, etc.)
33    PathError(String),
34    /// Generic initialization failure
35    InitializationFailed(String),
36    /// Feature not supported
37    NotSupported,
38}
39
40impl fmt::Display for IndexError {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        match self {
43            IndexError::UnknownStore(name) => {
44                write!(f, "Unknown storage backend: '{name}'. Check that the backend is defined in storage_backends configuration.")
45            }
46            IndexError::ConnectionFailed(details) => {
47                write!(f, "Failed to connect to storage backend: {details}")
48            }
49            IndexError::PathError(details) => {
50                write!(f, "Storage path error: {details}")
51            }
52            IndexError::InitializationFailed(details) => {
53                write!(f, "Failed to initialize storage backend: {details}")
54            }
55            IndexError::NotSupported => {
56                write!(f, "Operation not supported")
57            }
58        }
59    }
60}
61
62impl std::error::Error for IndexError {}
63
64impl From<drasi_core::interface::IndexError> for IndexError {
65    fn from(err: drasi_core::interface::IndexError) -> Self {
66        IndexError::InitializationFailed(err.to_string())
67    }
68}
69
70/// Factory for creating index sets based on storage backend configuration
71pub struct IndexFactory {
72    /// Map of backend ID to backend specification
73    backends: HashMap<String, StorageBackendSpec>,
74    /// Optional index backend plugin for persistent storage (RocksDB, Redis/Garnet)
75    plugin: Option<Arc<dyn IndexBackendPlugin>>,
76}
77
78impl fmt::Debug for IndexFactory {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        f.debug_struct("IndexFactory")
81            .field("backends", &self.backends)
82            .field("plugin", &self.plugin.as_ref().map(|_| "<plugin>"))
83            .finish()
84    }
85}
86
87impl IndexFactory {
88    /// Create a new IndexFactory from a list of backend configurations
89    ///
90    /// # Arguments
91    ///
92    /// * `backends` - List of storage backend configurations
93    /// * `plugin` - Optional index backend plugin for persistent storage (RocksDB, Redis/Garnet).
94    ///   When using RocksDB or Redis backends, this plugin MUST be provided.
95    ///
96    /// # Example
97    /// ```no_run
98    /// # use drasi_lib::indexes::{IndexFactory, StorageBackendConfig, StorageBackendSpec};
99    /// // For in-memory only (no plugin needed)
100    /// let backends = vec![
101    ///     StorageBackendConfig {
102    ///         id: "memory_test".to_string(),
103    ///         spec: StorageBackendSpec::Memory {
104    ///             enable_archive: true,
105    ///         },
106    ///     },
107    /// ];
108    /// let factory = IndexFactory::new(backends, None);
109    ///
110    /// // For persistent storage (plugin required)
111    /// // use drasi_index_rocksdb::RocksDbIndexProvider;
112    /// // let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
113    /// // let factory = IndexFactory::new(backends, Some(Arc::new(provider)));
114    /// ```
115    pub fn new(
116        backends: Vec<StorageBackendConfig>,
117        plugin: Option<Arc<dyn IndexBackendPlugin>>,
118    ) -> Self {
119        let backends = backends.into_iter().map(|b| (b.id, b.spec)).collect();
120        Self { backends, plugin }
121    }
122
123    /// Build an IndexSet for a query using the specified storage backend
124    ///
125    /// # Arguments
126    /// * `backend_ref` - Reference to storage backend (named or inline)
127    /// * `query_id` - Unique identifier for the query
128    ///
129    /// # Errors
130    /// Returns `IndexError` if:
131    /// - Named backend reference doesn't exist
132    /// - Backend initialization fails (connection, path, etc.)
133    /// - Invalid configuration
134    pub async fn build(
135        &self,
136        backend_ref: &StorageBackendRef,
137        query_id: &str,
138    ) -> Result<IndexSet, IndexError> {
139        let spec = match backend_ref {
140            StorageBackendRef::Named(name) => self
141                .backends
142                .get(name)
143                .ok_or_else(|| IndexError::UnknownStore(name.clone()))?,
144            StorageBackendRef::Inline(spec) => spec,
145        };
146
147        self.build_from_spec(spec, query_id).await
148    }
149
150    /// Build an IndexSet from a storage backend specification
151    async fn build_from_spec(
152        &self,
153        spec: &StorageBackendSpec,
154        query_id: &str,
155    ) -> Result<IndexSet, IndexError> {
156        // Validate configuration before building
157        spec.validate().map_err(IndexError::InitializationFailed)?;
158
159        match spec {
160            StorageBackendSpec::Memory { enable_archive } => {
161                self.build_memory_indexes(*enable_archive)
162            }
163            StorageBackendSpec::RocksDb { .. } | StorageBackendSpec::Redis { .. } => {
164                // Delegate to the plugin for persistent storage backends
165                match &self.plugin {
166                    Some(plugin) => self.build_from_plugin(plugin, query_id).await,
167                    None => Err(IndexError::InitializationFailed(
168                        "RocksDB or Redis backend requested but no index provider configured. \
169                         Use DrasiLib::builder().with_index_provider(...) to provide one."
170                            .to_string(),
171                    )),
172                }
173            }
174        }
175    }
176
177    /// Build in-memory indexes
178    fn build_memory_indexes(&self, enable_archive: bool) -> Result<IndexSet, IndexError> {
179        let mut element_index = InMemoryElementIndex::new();
180        if enable_archive {
181            element_index.enable_archive();
182        }
183        let element_index = Arc::new(element_index);
184        let result_index = InMemoryResultIndex::new();
185        let future_queue = InMemoryFutureQueue::new();
186
187        Ok(IndexSet {
188            element_index: element_index.clone(),
189            archive_index: element_index,
190            result_index: Arc::new(result_index),
191            future_queue: Arc::new(future_queue),
192            session_control: Arc::new(NoOpSessionControl),
193        })
194    }
195
196    /// Build indexes using the provided plugin
197    async fn build_from_plugin(
198        &self,
199        plugin: &Arc<dyn IndexBackendPlugin>,
200        query_id: &str,
201    ) -> Result<IndexSet, IndexError> {
202        plugin.create_index_set(query_id).await.map_err(|e| {
203            log::error!("Failed to create index set for query '{query_id}': {e}");
204            IndexError::InitializationFailed(format!(
205                "Failed to create index set for query '{query_id}': {e}"
206            ))
207        })
208    }
209
210    /// Check if a storage backend is volatile (requires re-bootstrap after restart)
211    ///
212    /// # Returns
213    /// - `true` for Memory backend (no persistence)
214    /// - `false` for RocksDB and Redis backends (persistent)
215    pub fn is_volatile(&self, backend_ref: &StorageBackendRef) -> bool {
216        let spec = match backend_ref {
217            StorageBackendRef::Named(name) => match self.backends.get(name) {
218                Some(spec) => spec,
219                None => return false, // Unknown backend, assume not volatile
220            },
221            StorageBackendRef::Inline(spec) => spec,
222        };
223
224        spec.is_volatile()
225    }
226}
227
228#[cfg(test)]
229mod tests {
230    use super::*;
231
232    #[test]
233    fn test_index_factory_new() {
234        let backends = vec![
235            StorageBackendConfig {
236                id: "memory_test".to_string(),
237                spec: StorageBackendSpec::Memory {
238                    enable_archive: true,
239                },
240            },
241            StorageBackendConfig {
242                id: "rocks_test".to_string(),
243                spec: StorageBackendSpec::RocksDb {
244                    path: "/tmp/test".to_string(),
245                    enable_archive: false,
246                    direct_io: false,
247                },
248            },
249        ];
250
251        let factory = IndexFactory::new(backends, None);
252        assert_eq!(factory.backends.len(), 2);
253        assert!(factory.backends.contains_key("memory_test"));
254        assert!(factory.backends.contains_key("rocks_test"));
255    }
256
257    #[tokio::test]
258    async fn test_build_memory_indexes() {
259        let backends = vec![StorageBackendConfig {
260            id: "memory_test".to_string(),
261            spec: StorageBackendSpec::Memory {
262                enable_archive: true,
263            },
264        }];
265
266        let factory = IndexFactory::new(backends, None);
267        let backend_ref = StorageBackendRef::Named("memory_test".to_string());
268        let result = factory.build(&backend_ref, "test_query").await;
269
270        assert!(result.is_ok());
271    }
272
273    #[tokio::test]
274    async fn test_build_unknown_backend() {
275        let factory = IndexFactory::new(vec![], None);
276        let backend_ref = StorageBackendRef::Named("nonexistent".to_string());
277        let result = factory.build(&backend_ref, "test_query").await;
278
279        assert!(result.is_err());
280        match result.unwrap_err() {
281            IndexError::UnknownStore(name) => {
282                assert_eq!(name, "nonexistent");
283            }
284            _ => panic!("Expected UnknownStore error"),
285        }
286    }
287
288    #[tokio::test]
289    async fn test_build_inline_memory() {
290        let factory = IndexFactory::new(vec![], None);
291        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
292            enable_archive: false,
293        });
294        let result = factory.build(&backend_ref, "test_query").await;
295
296        assert!(result.is_ok());
297    }
298
299    #[tokio::test]
300    async fn test_build_rocksdb_without_plugin_errors() {
301        // Verify that attempting to use RocksDB without a plugin returns an error
302        let factory = IndexFactory::new(vec![], None);
303        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
304            path: "/data/test".to_string(),
305            enable_archive: false,
306            direct_io: false,
307        });
308        let result = factory.build(&backend_ref, "test_query").await;
309
310        assert!(result.is_err());
311        match result.unwrap_err() {
312            IndexError::InitializationFailed(msg) => {
313                assert!(msg.contains("no index provider configured"));
314            }
315            _ => panic!("Expected InitializationFailed error"),
316        }
317    }
318
319    #[tokio::test]
320    async fn test_build_redis_without_plugin_errors() {
321        // Verify that attempting to use Redis without a plugin returns an error
322        let factory = IndexFactory::new(vec![], None);
323        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
324            connection_string: "redis://localhost:6379".to_string(), // DevSkim: ignore DS162092
325            cache_size: None,
326        });
327        let result = factory.build(&backend_ref, "test_query").await;
328
329        assert!(result.is_err());
330        match result.unwrap_err() {
331            IndexError::InitializationFailed(msg) => {
332                assert!(msg.contains("no index provider configured"));
333            }
334            _ => panic!("Expected InitializationFailed error"),
335        }
336    }
337
338    #[test]
339    fn test_is_volatile_memory() {
340        let backends = vec![StorageBackendConfig {
341            id: "memory_test".to_string(),
342            spec: StorageBackendSpec::Memory {
343                enable_archive: false,
344            },
345        }];
346
347        let factory = IndexFactory::new(backends, None);
348        let backend_ref = StorageBackendRef::Named("memory_test".to_string());
349        assert!(factory.is_volatile(&backend_ref));
350    }
351
352    #[test]
353    fn test_is_volatile_rocksdb() {
354        let backends = vec![StorageBackendConfig {
355            id: "rocks_test".to_string(),
356            spec: StorageBackendSpec::RocksDb {
357                path: "/data/test".to_string(),
358                enable_archive: false,
359                direct_io: false,
360            },
361        }];
362
363        let factory = IndexFactory::new(backends, None);
364        let backend_ref = StorageBackendRef::Named("rocks_test".to_string());
365        assert!(!factory.is_volatile(&backend_ref));
366    }
367
368    #[test]
369    fn test_is_volatile_inline() {
370        let factory = IndexFactory::new(vec![], None);
371        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
372            enable_archive: false,
373        });
374        assert!(factory.is_volatile(&backend_ref));
375
376        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
377            path: "/data/test".to_string(),
378            enable_archive: false,
379            direct_io: false,
380        });
381        assert!(!factory.is_volatile(&backend_ref));
382    }
383
384    #[test]
385    fn test_is_volatile_inline_redis() {
386        let factory = IndexFactory::new(vec![], None);
387        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
388            connection_string: "redis://localhost:6379".to_string(), // DevSkim: ignore DS162092
389            cache_size: Some(1000),
390        });
391        assert!(!factory.is_volatile(&backend_ref));
392    }
393
394    #[test]
395    fn test_is_volatile_unknown_backend() {
396        // When a named backend doesn't exist, is_volatile returns false
397        let factory = IndexFactory::new(vec![], None);
398        let backend_ref = StorageBackendRef::Named("nonexistent".to_string());
399        assert!(!factory.is_volatile(&backend_ref));
400    }
401
402    #[test]
403    fn test_index_error_display_unknown_store() {
404        let error = IndexError::UnknownStore("my_backend".to_string());
405        let display = format!("{error}");
406        assert!(display.contains("Unknown storage backend"));
407        assert!(display.contains("my_backend"));
408    }
409
410    #[test]
411    fn test_index_error_display_connection_failed() {
412        let error = IndexError::ConnectionFailed("Connection refused".to_string());
413        let display = format!("{error}");
414        assert!(display.contains("Failed to connect"));
415        assert!(display.contains("Connection refused"));
416    }
417
418    #[test]
419    fn test_index_error_display_path_error() {
420        let error = IndexError::PathError("/invalid/path".to_string());
421        let display = format!("{error}");
422        assert!(display.contains("Storage path error"));
423        assert!(display.contains("/invalid/path"));
424    }
425
426    #[test]
427    fn test_index_error_display_initialization_failed() {
428        let error = IndexError::InitializationFailed("Database init failed".to_string());
429        let display = format!("{error}");
430        assert!(display.contains("Failed to initialize"));
431        assert!(display.contains("Database init failed"));
432    }
433
434    #[test]
435    fn test_index_error_display_not_supported() {
436        let error = IndexError::NotSupported;
437        let display = format!("{error}");
438        assert!(display.contains("not supported"));
439    }
440
441    #[test]
442    fn test_index_error_is_std_error() {
443        let error = IndexError::UnknownStore("test".to_string());
444        // Verify it implements std::error::Error
445        let _: &dyn std::error::Error = &error;
446    }
447
448    #[test]
449    fn test_index_error_from_drasi_core_index_error() {
450        // Create a std::io::Error to wrap in IndexError::other
451        let io_error = std::io::Error::new(std::io::ErrorKind::Other, "test error");
452        let core_error = drasi_core::interface::IndexError::other(io_error);
453        let error: IndexError = core_error.into();
454        match error {
455            IndexError::InitializationFailed(msg) => {
456                assert!(msg.contains("test error"));
457            }
458            _ => panic!("Expected InitializationFailed error"),
459        }
460    }
461
462    #[test]
463    fn test_index_set_debug() {
464        // We can't easily construct an IndexSet without going through the factory,
465        // but we can test via build
466        let backends = vec![StorageBackendConfig {
467            id: "memory_test".to_string(),
468            spec: StorageBackendSpec::Memory {
469                enable_archive: false,
470            },
471        }];
472        let factory = IndexFactory::new(backends, None);
473
474        // Use tokio runtime for async test
475        let rt = tokio::runtime::Runtime::new().unwrap();
476        let index_set = rt
477            .block_on(factory.build(&StorageBackendRef::Named("memory_test".to_string()), "q1"))
478            .unwrap();
479
480        let debug_str = format!("{index_set:?}");
481        assert!(debug_str.contains("IndexSet"));
482        assert!(debug_str.contains("element_index"));
483        assert!(debug_str.contains("archive_index"));
484        assert!(debug_str.contains("result_index"));
485        assert!(debug_str.contains("future_queue"));
486    }
487
488    #[test]
489    fn test_index_factory_debug() {
490        let backends = vec![StorageBackendConfig {
491            id: "memory_test".to_string(),
492            spec: StorageBackendSpec::Memory {
493                enable_archive: true,
494            },
495        }];
496        let factory = IndexFactory::new(backends, None);
497        let debug_str = format!("{factory:?}");
498        assert!(debug_str.contains("IndexFactory"));
499        assert!(debug_str.contains("backends"));
500        assert!(debug_str.contains("memory_test"));
501    }
502
503    #[test]
504    fn test_index_factory_debug_with_plugin() {
505        use crate::indexes::IndexBackendPlugin;
506        use async_trait::async_trait;
507
508        // Create a mock plugin for testing
509        struct MockPlugin;
510
511        #[async_trait]
512        impl IndexBackendPlugin for MockPlugin {
513            async fn create_index_set(
514                &self,
515                _query_id: &str,
516            ) -> Result<drasi_core::interface::IndexSet, drasi_core::interface::IndexError>
517            {
518                unimplemented!()
519            }
520
521            fn is_volatile(&self) -> bool {
522                false
523            }
524        }
525
526        let factory = IndexFactory::new(vec![], Some(Arc::new(MockPlugin)));
527        let debug_str = format!("{factory:?}");
528        assert!(debug_str.contains("IndexFactory"));
529        assert!(debug_str.contains("plugin"));
530        assert!(debug_str.contains("<plugin>"));
531    }
532
533    #[tokio::test]
534    async fn test_build_memory_without_archive() {
535        let factory = IndexFactory::new(vec![], None);
536        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
537            enable_archive: false,
538        });
539        let result = factory.build(&backend_ref, "test_query").await;
540        assert!(result.is_ok());
541    }
542
543    #[tokio::test]
544    async fn test_build_memory_with_archive() {
545        let factory = IndexFactory::new(vec![], None);
546        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
547            enable_archive: true,
548        });
549        let result = factory.build(&backend_ref, "test_query").await;
550        assert!(result.is_ok());
551    }
552}