Skip to main content

drasi_lib/indexes/
factory.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::indexes::config::{StorageBackendConfig, StorageBackendRef, StorageBackendSpec};
16use crate::indexes::IndexBackendPlugin;
17use drasi_core::in_memory_index::in_memory_element_index::InMemoryElementIndex;
18use drasi_core::in_memory_index::in_memory_future_queue::InMemoryFutureQueue;
19use drasi_core::in_memory_index::in_memory_result_index::InMemoryResultIndex;
20use drasi_core::interface::{CreatedIndexes, IndexSet, NoOpSessionControl};
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::Arc;
24
25/// Error type for index factory operations
26#[derive(Debug)]
27pub enum IndexError {
28    /// Referenced storage backend does not exist
29    UnknownStore(String),
30    /// Failed to connect to Redis/Garnet
31    ConnectionFailed(String),
32    /// RocksDB path error (doesn't exist, no permissions, etc.)
33    PathError(String),
34    /// Generic initialization failure
35    InitializationFailed(String),
36    /// Feature not supported
37    NotSupported,
38}
39
40impl fmt::Display for IndexError {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        match self {
43            IndexError::UnknownStore(name) => {
44                write!(f, "Unknown storage backend: '{name}'. Check that the backend is defined in storage_backends configuration.")
45            }
46            IndexError::ConnectionFailed(details) => {
47                write!(f, "Failed to connect to storage backend: {details}")
48            }
49            IndexError::PathError(details) => {
50                write!(f, "Storage path error: {details}")
51            }
52            IndexError::InitializationFailed(details) => {
53                write!(f, "Failed to initialize storage backend: {details}")
54            }
55            IndexError::NotSupported => {
56                write!(f, "Operation not supported")
57            }
58        }
59    }
60}
61
62impl std::error::Error for IndexError {}
63
64impl From<drasi_core::interface::IndexError> for IndexError {
65    fn from(err: drasi_core::interface::IndexError) -> Self {
66        IndexError::InitializationFailed(err.to_string())
67    }
68}
69
70/// Factory for creating index sets based on storage backend configuration
71pub struct IndexFactory {
72    /// Map of backend ID to backend specification
73    backends: HashMap<String, StorageBackendSpec>,
74    /// Optional index backend plugin for persistent storage (RocksDB, Redis/Garnet)
75    plugin: Option<Arc<dyn IndexBackendPlugin>>,
76}
77
78impl fmt::Debug for IndexFactory {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        f.debug_struct("IndexFactory")
81            .field("backends", &self.backends)
82            .field("plugin", &self.plugin.as_ref().map(|_| "<plugin>"))
83            .finish()
84    }
85}
86
87impl IndexFactory {
88    /// Create a new IndexFactory from a list of backend configurations
89    ///
90    /// # Arguments
91    ///
92    /// * `backends` - List of storage backend configurations
93    /// * `plugin` - Optional index backend plugin for persistent storage (RocksDB, Redis/Garnet).
94    ///   When using RocksDB or Redis backends, this plugin MUST be provided.
95    ///
96    /// # Example
97    /// ```no_run
98    /// # use drasi_lib::indexes::{IndexFactory, StorageBackendConfig, StorageBackendSpec};
99    /// // For in-memory only (no plugin needed)
100    /// let backends = vec![
101    ///     StorageBackendConfig {
102    ///         id: "memory_test".to_string(),
103    ///         spec: StorageBackendSpec::Memory {
104    ///             enable_archive: true,
105    ///         },
106    ///     },
107    /// ];
108    /// let factory = IndexFactory::new(backends, None);
109    ///
110    /// // For persistent storage (plugin required)
111    /// // use drasi_index_rocksdb::RocksDbIndexProvider;
112    /// // let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
113    /// // let factory = IndexFactory::new(backends, Some(Arc::new(provider)));
114    /// ```
115    pub fn new(
116        backends: Vec<StorageBackendConfig>,
117        plugin: Option<Arc<dyn IndexBackendPlugin>>,
118    ) -> Self {
119        let backends = backends.into_iter().map(|b| (b.id, b.spec)).collect();
120        Self { backends, plugin }
121    }
122
123    /// Build a CreatedIndexes for a query using the specified storage backend
124    ///
125    /// # Arguments
126    /// * `backend_ref` - Reference to storage backend (named or inline)
127    /// * `query_id` - Unique identifier for the query
128    ///
129    /// # Errors
130    /// Returns `IndexError` if:
131    /// - Named backend reference doesn't exist
132    /// - Backend initialization fails (connection, path, etc.)
133    /// - Invalid configuration
134    pub async fn build(
135        &self,
136        backend_ref: &StorageBackendRef,
137        query_id: &str,
138    ) -> Result<CreatedIndexes, IndexError> {
139        let spec = match backend_ref {
140            StorageBackendRef::Named(name) => self
141                .backends
142                .get(name)
143                .ok_or_else(|| IndexError::UnknownStore(name.clone()))?,
144            StorageBackendRef::Inline(spec) => spec,
145        };
146
147        self.build_from_spec(spec, query_id).await
148    }
149
150    /// Build a CreatedIndexes from a storage backend specification
151    async fn build_from_spec(
152        &self,
153        spec: &StorageBackendSpec,
154        query_id: &str,
155    ) -> Result<CreatedIndexes, IndexError> {
156        // Validate configuration before building
157        spec.validate().map_err(IndexError::InitializationFailed)?;
158
159        match spec {
160            StorageBackendSpec::Memory { enable_archive } => {
161                self.build_memory_indexes(*enable_archive)
162            }
163            StorageBackendSpec::RocksDb { .. } | StorageBackendSpec::Redis { .. } => {
164                // Delegate to the plugin for persistent storage backends
165                match &self.plugin {
166                    Some(plugin) => self.build_from_plugin(plugin, query_id).await,
167                    None => Err(IndexError::InitializationFailed(
168                        "RocksDB or Redis backend requested but no index provider configured. \
169                         Use DrasiLib::builder().with_index_provider(...) to provide one."
170                            .to_string(),
171                    )),
172                }
173            }
174        }
175    }
176
177    /// Build in-memory indexes (returns checkpoint_store: None — caller provides InMemoryCheckpointStore)
178    fn build_memory_indexes(&self, enable_archive: bool) -> Result<CreatedIndexes, IndexError> {
179        let mut element_index = InMemoryElementIndex::new();
180        if enable_archive {
181            element_index.enable_archive();
182        }
183        let element_index = Arc::new(element_index);
184        let result_index = InMemoryResultIndex::new();
185        let future_queue = InMemoryFutureQueue::new();
186
187        Ok(CreatedIndexes {
188            set: IndexSet {
189                element_index: element_index.clone(),
190                archive_index: element_index,
191                result_index: Arc::new(result_index),
192                future_queue: Arc::new(future_queue),
193                session_control: Arc::new(NoOpSessionControl),
194            },
195            checkpoint_store: None,
196        })
197    }
198
199    /// Build indexes using the provided plugin
200    async fn build_from_plugin(
201        &self,
202        plugin: &Arc<dyn IndexBackendPlugin>,
203        query_id: &str,
204    ) -> Result<CreatedIndexes, IndexError> {
205        plugin.create_indexes(query_id).await.map_err(|e| {
206            log::error!("Failed to create indexes for query '{query_id}': {e}");
207            IndexError::InitializationFailed(format!(
208                "Failed to create indexes for query '{query_id}': {e}"
209            ))
210        })
211    }
212
213    /// Check if a storage backend is volatile (requires re-bootstrap after restart)
214    ///
215    /// # Returns
216    /// - `true` for Memory backend (no persistence)
217    /// - `false` for RocksDB and Redis backends (persistent)
218    pub fn is_volatile(&self, backend_ref: &StorageBackendRef) -> bool {
219        let spec = match backend_ref {
220            StorageBackendRef::Named(name) => match self.backends.get(name) {
221                Some(spec) => spec,
222                None => return false, // Unknown backend, assume not volatile
223            },
224            StorageBackendRef::Inline(spec) => spec,
225        };
226
227        spec.is_volatile()
228    }
229}
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234
235    #[test]
236    fn test_index_factory_new() {
237        let backends = vec![
238            StorageBackendConfig {
239                id: "memory_test".to_string(),
240                spec: StorageBackendSpec::Memory {
241                    enable_archive: true,
242                },
243            },
244            StorageBackendConfig {
245                id: "rocks_test".to_string(),
246                spec: StorageBackendSpec::RocksDb {
247                    path: "/tmp/test".to_string(),
248                    enable_archive: false,
249                    direct_io: false,
250                },
251            },
252        ];
253
254        let factory = IndexFactory::new(backends, None);
255        assert_eq!(factory.backends.len(), 2);
256        assert!(factory.backends.contains_key("memory_test"));
257        assert!(factory.backends.contains_key("rocks_test"));
258    }
259
260    #[tokio::test]
261    async fn test_build_memory_indexes() {
262        let backends = vec![StorageBackendConfig {
263            id: "memory_test".to_string(),
264            spec: StorageBackendSpec::Memory {
265                enable_archive: true,
266            },
267        }];
268
269        let factory = IndexFactory::new(backends, None);
270        let backend_ref = StorageBackendRef::Named("memory_test".to_string());
271        let result = factory.build(&backend_ref, "test_query").await;
272
273        assert!(result.is_ok());
274    }
275
276    #[tokio::test]
277    async fn test_build_unknown_backend() {
278        let factory = IndexFactory::new(vec![], None);
279        let backend_ref = StorageBackendRef::Named("nonexistent".to_string());
280        let result = factory.build(&backend_ref, "test_query").await;
281
282        assert!(result.is_err());
283        match result.unwrap_err() {
284            IndexError::UnknownStore(name) => {
285                assert_eq!(name, "nonexistent");
286            }
287            _ => panic!("Expected UnknownStore error"),
288        }
289    }
290
291    #[tokio::test]
292    async fn test_build_inline_memory() {
293        let factory = IndexFactory::new(vec![], None);
294        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
295            enable_archive: false,
296        });
297        let result = factory.build(&backend_ref, "test_query").await;
298
299        assert!(result.is_ok());
300    }
301
302    #[tokio::test]
303    async fn test_build_rocksdb_without_plugin_errors() {
304        // Verify that attempting to use RocksDB without a plugin returns an error
305        let factory = IndexFactory::new(vec![], None);
306        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
307            path: "/data/test".to_string(),
308            enable_archive: false,
309            direct_io: false,
310        });
311        let result = factory.build(&backend_ref, "test_query").await;
312
313        assert!(result.is_err());
314        match result.unwrap_err() {
315            IndexError::InitializationFailed(msg) => {
316                assert!(msg.contains("no index provider configured"));
317            }
318            _ => panic!("Expected InitializationFailed error"),
319        }
320    }
321
322    #[tokio::test]
323    async fn test_build_redis_without_plugin_errors() {
324        // Verify that attempting to use Redis without a plugin returns an error
325        let factory = IndexFactory::new(vec![], None);
326        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
327            connection_string: "redis://localhost:6379".to_string(), // DevSkim: ignore DS162092
328            cache_size: None,
329        });
330        let result = factory.build(&backend_ref, "test_query").await;
331
332        assert!(result.is_err());
333        match result.unwrap_err() {
334            IndexError::InitializationFailed(msg) => {
335                assert!(msg.contains("no index provider configured"));
336            }
337            _ => panic!("Expected InitializationFailed error"),
338        }
339    }
340
341    #[test]
342    fn test_is_volatile_memory() {
343        let backends = vec![StorageBackendConfig {
344            id: "memory_test".to_string(),
345            spec: StorageBackendSpec::Memory {
346                enable_archive: false,
347            },
348        }];
349
350        let factory = IndexFactory::new(backends, None);
351        let backend_ref = StorageBackendRef::Named("memory_test".to_string());
352        assert!(factory.is_volatile(&backend_ref));
353    }
354
355    #[test]
356    fn test_is_volatile_rocksdb() {
357        let backends = vec![StorageBackendConfig {
358            id: "rocks_test".to_string(),
359            spec: StorageBackendSpec::RocksDb {
360                path: "/data/test".to_string(),
361                enable_archive: false,
362                direct_io: false,
363            },
364        }];
365
366        let factory = IndexFactory::new(backends, None);
367        let backend_ref = StorageBackendRef::Named("rocks_test".to_string());
368        assert!(!factory.is_volatile(&backend_ref));
369    }
370
371    #[test]
372    fn test_is_volatile_inline() {
373        let factory = IndexFactory::new(vec![], None);
374        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
375            enable_archive: false,
376        });
377        assert!(factory.is_volatile(&backend_ref));
378
379        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::RocksDb {
380            path: "/data/test".to_string(),
381            enable_archive: false,
382            direct_io: false,
383        });
384        assert!(!factory.is_volatile(&backend_ref));
385    }
386
387    #[test]
388    fn test_is_volatile_inline_redis() {
389        let factory = IndexFactory::new(vec![], None);
390        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Redis {
391            connection_string: "redis://localhost:6379".to_string(), // DevSkim: ignore DS162092
392            cache_size: Some(1000),
393        });
394        assert!(!factory.is_volatile(&backend_ref));
395    }
396
397    #[test]
398    fn test_is_volatile_unknown_backend() {
399        // When a named backend doesn't exist, is_volatile returns false
400        let factory = IndexFactory::new(vec![], None);
401        let backend_ref = StorageBackendRef::Named("nonexistent".to_string());
402        assert!(!factory.is_volatile(&backend_ref));
403    }
404
405    #[test]
406    fn test_index_error_display_unknown_store() {
407        let error = IndexError::UnknownStore("my_backend".to_string());
408        let display = format!("{error}");
409        assert!(display.contains("Unknown storage backend"));
410        assert!(display.contains("my_backend"));
411    }
412
413    #[test]
414    fn test_index_error_display_connection_failed() {
415        let error = IndexError::ConnectionFailed("Connection refused".to_string());
416        let display = format!("{error}");
417        assert!(display.contains("Failed to connect"));
418        assert!(display.contains("Connection refused"));
419    }
420
421    #[test]
422    fn test_index_error_display_path_error() {
423        let error = IndexError::PathError("/invalid/path".to_string());
424        let display = format!("{error}");
425        assert!(display.contains("Storage path error"));
426        assert!(display.contains("/invalid/path"));
427    }
428
429    #[test]
430    fn test_index_error_display_initialization_failed() {
431        let error = IndexError::InitializationFailed("Database init failed".to_string());
432        let display = format!("{error}");
433        assert!(display.contains("Failed to initialize"));
434        assert!(display.contains("Database init failed"));
435    }
436
437    #[test]
438    fn test_index_error_display_not_supported() {
439        let error = IndexError::NotSupported;
440        let display = format!("{error}");
441        assert!(display.contains("not supported"));
442    }
443
444    #[test]
445    fn test_index_error_is_std_error() {
446        let error = IndexError::UnknownStore("test".to_string());
447        // Verify it implements std::error::Error
448        let _: &dyn std::error::Error = &error;
449    }
450
451    #[test]
452    fn test_index_error_from_drasi_core_index_error() {
453        // Create a std::io::Error to wrap in IndexError::other
454        let io_error = std::io::Error::new(std::io::ErrorKind::Other, "test error");
455        let core_error = drasi_core::interface::IndexError::other(io_error);
456        let error: IndexError = core_error.into();
457        match error {
458            IndexError::InitializationFailed(msg) => {
459                assert!(msg.contains("test error"));
460            }
461            _ => panic!("Expected InitializationFailed error"),
462        }
463    }
464
465    #[test]
466    fn test_index_set_debug() {
467        // We can't easily construct an IndexSet without going through the factory,
468        // but we can test via build
469        let backends = vec![StorageBackendConfig {
470            id: "memory_test".to_string(),
471            spec: StorageBackendSpec::Memory {
472                enable_archive: false,
473            },
474        }];
475        let factory = IndexFactory::new(backends, None);
476
477        // Use tokio runtime for async test
478        let rt = tokio::runtime::Runtime::new().unwrap();
479        let index_set = rt
480            .block_on(factory.build(&StorageBackendRef::Named("memory_test".to_string()), "q1"))
481            .unwrap();
482
483        let debug_str = format!("{index_set:?}");
484        assert!(debug_str.contains("IndexSet"));
485        assert!(debug_str.contains("element_index"));
486        assert!(debug_str.contains("archive_index"));
487        assert!(debug_str.contains("result_index"));
488        assert!(debug_str.contains("future_queue"));
489    }
490
491    #[test]
492    fn test_index_factory_debug() {
493        let backends = vec![StorageBackendConfig {
494            id: "memory_test".to_string(),
495            spec: StorageBackendSpec::Memory {
496                enable_archive: true,
497            },
498        }];
499        let factory = IndexFactory::new(backends, None);
500        let debug_str = format!("{factory:?}");
501        assert!(debug_str.contains("IndexFactory"));
502        assert!(debug_str.contains("backends"));
503        assert!(debug_str.contains("memory_test"));
504    }
505
506    #[test]
507    fn test_index_factory_debug_with_plugin() {
508        use crate::indexes::IndexBackendPlugin;
509        use async_trait::async_trait;
510
511        // Create a mock plugin for testing
512        struct MockPlugin;
513
514        #[async_trait]
515        impl IndexBackendPlugin for MockPlugin {
516            async fn create_indexes(
517                &self,
518                _query_id: &str,
519            ) -> Result<drasi_core::interface::CreatedIndexes, drasi_core::interface::IndexError>
520            {
521                unimplemented!()
522            }
523
524            fn is_volatile(&self) -> bool {
525                false
526            }
527        }
528
529        let factory = IndexFactory::new(vec![], Some(Arc::new(MockPlugin)));
530        let debug_str = format!("{factory:?}");
531        assert!(debug_str.contains("IndexFactory"));
532        assert!(debug_str.contains("plugin"));
533        assert!(debug_str.contains("<plugin>"));
534    }
535
536    #[tokio::test]
537    async fn test_build_memory_without_archive() {
538        let factory = IndexFactory::new(vec![], None);
539        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
540            enable_archive: false,
541        });
542        let result = factory.build(&backend_ref, "test_query").await;
543        assert!(result.is_ok());
544    }
545
546    #[tokio::test]
547    async fn test_build_memory_with_archive() {
548        let factory = IndexFactory::new(vec![], None);
549        let backend_ref = StorageBackendRef::Inline(StorageBackendSpec::Memory {
550            enable_archive: true,
551        });
552        let result = factory.build(&backend_ref, "test_query").await;
553        assert!(result.is_ok());
554    }
555}