Skip to main content

drasi_lib/
builder.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Fluent builders for DrasiLib and its components.
16//!
17//! This module provides the builder pattern for constructing DrasiLib instances
18//! and their components in a type-safe, ergonomic way.
19//!
20//! # Overview
21//!
22//! - [`DrasiLibBuilder`] - Main builder for creating a DrasiLib instance
23//! - [`Query`] - Builder for query configurations
24//!
25//! # Plugin Architecture
26//!
27//! **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
28//! reactions are created externally as fully-configured instances implementing
29//! `Source` and `Reaction` traits, then passed to DrasiLibBuilder via
30//! `with_source()` and `with_reaction()`.
31//!
32//! # Examples
33//!
34//! ## Basic Usage with Pre-built Instances
35//!
36//! ```no_run
37//! use drasi_lib::{DrasiLib, Query};
38//!
39//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
40//! // Source and reaction instances are created externally by plugins
41//! // Ownership is transferred to DrasiLib when added
42//! // let my_source = my_source_plugin::create(...);
43//! // let my_reaction = my_reaction_plugin::create(...);
44//!
45//! let core = DrasiLib::builder()
46//!     .with_id("my-server")
47//!     // .with_source(my_source)      // Ownership transferred
48//!     // .with_reaction(my_reaction)  // Ownership transferred
49//!     .with_query(
50//!         Query::cypher("my-query")
51//!             .query("MATCH (n:Person) RETURN n")
52//!             .from_source("events")
53//!             .build()
54//!     )
55//!     .build()
56//!     .await?;
57//!
58//! core.start().await?;
59//! # Ok(())
60//! # }
61//! ```
62
63use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67    DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::indexes::IndexBackendPlugin;
71use crate::indexes::StorageBackendConfig;
72use crate::lib_core::DrasiLib;
73use crate::reactions::Reaction as ReactionTrait;
74use crate::sources::Source as SourceTrait;
75use crate::state_store::StateStoreProvider;
76use drasi_core::models::SourceMiddlewareConfig;
77
78// ============================================================================
79// DrasiLibBuilder
80// ============================================================================
81
82/// Fluent builder for creating DrasiLib instances.
83///
84/// Use `DrasiLib::builder()` to get started.
85///
86/// # Plugin Architecture
87///
88/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
89/// reactions are created externally as fully-configured instances implementing
90/// `Source` and `Reaction` traits, then passed via `with_source()` and `with_reaction()`.
91///
92/// # Example
93///
94/// ```no_run
95/// use drasi_lib::{DrasiLib, Query};
96///
97/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
98/// // Source and reaction instances are created externally by plugins
99/// // Ownership is transferred to DrasiLib when added
100/// // let my_source = my_source_plugin::create(...);
101/// // let my_reaction = my_reaction_plugin::create(...);
102///
103/// let core = DrasiLib::builder()
104///     .with_id("my-server")
105///     // .with_source(my_source)      // Ownership transferred
106///     // .with_reaction(my_reaction)  // Ownership transferred
107///     .with_query(
108///         Query::cypher("my-query")
109///             .query("MATCH (n) RETURN n")
110///             .from_source("my-source")
111///             .build()
112///     )
113///     .build()
114///     .await?;
115/// # Ok(())
116/// # }
117/// ```
118pub struct DrasiLibBuilder {
119    server_id: Option<String>,
120    priority_queue_capacity: Option<usize>,
121    dispatch_buffer_capacity: Option<usize>,
122    storage_backends: Vec<StorageBackendConfig>,
123    query_configs: Vec<QueryConfig>,
124    source_instances: Vec<Box<dyn SourceTrait>>,
125    reaction_instances: Vec<Box<dyn ReactionTrait>>,
126    index_provider: Option<Arc<dyn IndexBackendPlugin>>,
127    state_store_provider: Option<Arc<dyn StateStoreProvider>>,
128}
129
130impl Default for DrasiLibBuilder {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136impl DrasiLibBuilder {
137    /// Create a new builder with default values.
138    pub fn new() -> Self {
139        Self {
140            server_id: None,
141            priority_queue_capacity: None,
142            dispatch_buffer_capacity: None,
143            storage_backends: Vec::new(),
144            query_configs: Vec::new(),
145            source_instances: Vec::new(),
146            reaction_instances: Vec::new(),
147            index_provider: None,
148            state_store_provider: None,
149        }
150    }
151
152    /// Set the server ID.
153    pub fn with_id(mut self, id: impl Into<String>) -> Self {
154        self.server_id = Some(id.into());
155        self
156    }
157
158    /// Set the default priority queue capacity for components.
159    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
160        self.priority_queue_capacity = Some(capacity);
161        self
162    }
163
164    /// Set the default dispatch buffer capacity for components.
165    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
166        self.dispatch_buffer_capacity = Some(capacity);
167        self
168    }
169
170    /// Add a storage backend configuration.
171    pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
172        self.storage_backends.push(config);
173        self
174    }
175
176    /// Set the index backend provider for persistent storage.
177    ///
178    /// When using RocksDB or Redis/Garnet storage backends, you must provide
179    /// an index provider that implements `IndexBackendPlugin`. The provider
180    /// is responsible for creating the actual index instances.
181    ///
182    /// If no index provider is set, only in-memory storage backends can be used.
183    /// Attempting to use RocksDB or Redis backends without a provider will result
184    /// in an error.
185    ///
186    /// # Example
187    /// ```ignore
188    /// use drasi_index_rocksdb::RocksDbIndexProvider;
189    /// use std::sync::Arc;
190    ///
191    /// let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
192    /// let core = DrasiLib::builder()
193    ///     .with_index_provider(Arc::new(provider))
194    ///     .build()
195    ///     .await?;
196    /// ```
197    pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
198        self.index_provider = Some(provider);
199        self
200    }
201
202    /// Set the state store provider for plugin state persistence.
203    ///
204    /// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
205    /// to store and retrieve runtime state that can persist across runs of DrasiLib.
206    ///
207    /// If no state store provider is set, the default in-memory provider will be used.
208    /// The in-memory provider does not persist state across restarts.
209    ///
210    /// # Example
211    /// ```ignore
212    /// use drasi_state_store_json::JsonStateStoreProvider;
213    /// use std::sync::Arc;
214    ///
215    /// let state_store = JsonStateStoreProvider::new("/data/state");
216    /// let core = DrasiLib::builder()
217    ///     .with_state_store_provider(Arc::new(state_store))
218    ///     .build()
219    ///     .await?;
220    /// ```
221    pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
222        self.state_store_provider = Some(provider);
223        self
224    }
225
226    /// Add a source instance, taking ownership.
227    ///
228    /// Source instances are created externally by plugins with their own typed configurations.
229    /// drasi-lib only knows about the `Source` trait - it has no knowledge of which plugins exist.
230    ///
231    /// # Example
232    /// ```ignore
233    /// let source = MySource::new("my-source", config)?;
234    /// let core = DrasiLib::builder()
235    ///     .with_source(source)  // Ownership transferred
236    ///     .build()
237    ///     .await?;
238    /// ```
239    pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
240        self.source_instances.push(Box::new(source));
241        self
242    }
243
244    /// Add a query configuration.
245    pub fn with_query(mut self, config: QueryConfig) -> Self {
246        self.query_configs.push(config);
247        self
248    }
249
250    /// Add a reaction instance, taking ownership.
251    ///
252    /// Reaction instances are created externally by plugins with their own typed configurations.
253    /// drasi-lib only knows about the `Reaction` trait - it has no knowledge of which plugins exist.
254    ///
255    /// # Example
256    /// ```ignore
257    /// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
258    /// let core = DrasiLib::builder()
259    ///     .with_reaction(reaction)  // Ownership transferred
260    ///     .build()
261    ///     .await?;
262    /// ```
263    pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
264        self.reaction_instances.push(Box::new(reaction));
265        self
266    }
267
268    /// Build the DrasiLib instance.
269    ///
270    /// This validates the configuration, creates all components, and initializes the server.
271    /// After building, you can call `start()` to begin processing.
272    pub async fn build(self) -> Result<DrasiLib> {
273        // Build the configuration
274        let config = DrasiLibConfig {
275            id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
276            priority_queue_capacity: self.priority_queue_capacity,
277            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
278            storage_backends: self.storage_backends,
279            queries: self.query_configs.clone(),
280        };
281
282        // Validate the configuration
283        config
284            .validate()
285            .map_err(|e| DrasiError::startup_validation(e.to_string()))?;
286
287        // Create runtime config and server with optional index and state store providers
288        let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
289            config,
290            self.index_provider,
291            self.state_store_provider,
292        ));
293        let mut core = DrasiLib::new(runtime_config);
294
295        // Initialize the server
296        core.initialize().await?;
297
298        // Inject pre-built source instances
299        for source in self.source_instances {
300            let source_id = source.id().to_string();
301            core.source_manager.add_source(source).await.map_err(|e| {
302                DrasiError::provisioning(format!(
303                    "Failed to add source instance '{source_id}': {e}"
304                ))
305            })?;
306        }
307
308        // Inject pre-built reaction instances
309        for reaction in self.reaction_instances {
310            let reaction_id = reaction.id().to_string();
311            core.reaction_manager
312                .add_reaction(reaction)
313                .await
314                .map_err(|e| {
315                    DrasiError::provisioning(format!(
316                        "Failed to add reaction instance '{reaction_id}': {e}"
317                    ))
318                })?;
319        }
320
321        Ok(core)
322    }
323}
324
325// ============================================================================
326// Query Builder
327// ============================================================================
328
329/// Fluent builder for query configurations.
330///
331/// Use `Query::cypher()` or `Query::gql()` to get started.
332///
333/// # Example
334///
335/// ```no_run
336/// use drasi_lib::Query;
337///
338/// let query_config = Query::cypher("my-query")
339///     .query("MATCH (n:Person) RETURN n.name, n.age")
340///     .from_source("my-source")
341///     .auto_start(true)
342///     .build();
343/// ```
344pub struct Query {
345    id: String,
346    query: String,
347    query_language: QueryLanguage,
348    sources: Vec<SourceSubscriptionConfig>,
349    middleware: Vec<SourceMiddlewareConfig>,
350    auto_start: bool,
351    joins: Option<Vec<QueryJoinConfig>>,
352    enable_bootstrap: bool,
353    bootstrap_buffer_size: usize,
354    priority_queue_capacity: Option<usize>,
355    dispatch_buffer_capacity: Option<usize>,
356    dispatch_mode: Option<DispatchMode>,
357    storage_backend: Option<crate::indexes::StorageBackendRef>,
358}
359
360impl Query {
361    /// Create a new Cypher query builder.
362    pub fn cypher(id: impl Into<String>) -> Self {
363        Self {
364            id: id.into(),
365            query: String::new(),
366            query_language: QueryLanguage::Cypher,
367            sources: Vec::new(),
368            middleware: Vec::new(),
369            auto_start: true,
370            joins: None,
371            enable_bootstrap: true,
372            bootstrap_buffer_size: 10000,
373            priority_queue_capacity: None,
374            dispatch_buffer_capacity: None,
375            dispatch_mode: None,
376            storage_backend: None,
377        }
378    }
379
380    /// Create a new GQL query builder.
381    pub fn gql(id: impl Into<String>) -> Self {
382        Self {
383            id: id.into(),
384            query: String::new(),
385            query_language: QueryLanguage::GQL,
386            sources: Vec::new(),
387            middleware: Vec::new(),
388            auto_start: true,
389            joins: None,
390            enable_bootstrap: true,
391            bootstrap_buffer_size: 10000,
392            priority_queue_capacity: None,
393            dispatch_buffer_capacity: None,
394            dispatch_mode: None,
395            storage_backend: None,
396        }
397    }
398
399    /// Set the query string.
400    pub fn query(mut self, query: impl Into<String>) -> Self {
401        self.query = query.into();
402        self
403    }
404
405    /// Subscribe to a source.
406    pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
407        self.sources.push(SourceSubscriptionConfig {
408            source_id: source_id.into(),
409            nodes: Vec::new(),
410            relations: Vec::new(),
411            pipeline: Vec::new(),
412        });
413        self
414    }
415
416    /// Subscribe to a source with a middleware pipeline.
417    ///
418    /// The pipeline is a list of middleware names (strings) that will be applied to
419    /// data from this source before it reaches the query.
420    pub fn from_source_with_pipeline(
421        mut self,
422        source_id: impl Into<String>,
423        pipeline: Vec<String>,
424    ) -> Self {
425        self.sources.push(SourceSubscriptionConfig {
426            source_id: source_id.into(),
427            nodes: Vec::new(),
428            relations: Vec::new(),
429            pipeline,
430        });
431        self
432    }
433
434    /// Add middleware to the query.
435    pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
436        self.middleware.push(middleware);
437        self
438    }
439
440    /// Set whether the query should auto-start.
441    pub fn auto_start(mut self, auto_start: bool) -> Self {
442        self.auto_start = auto_start;
443        self
444    }
445
446    /// Set the join configuration.
447    pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
448        self.joins = Some(joins);
449        self
450    }
451
452    /// Enable or disable bootstrap.
453    pub fn enable_bootstrap(mut self, enable: bool) -> Self {
454        self.enable_bootstrap = enable;
455        self
456    }
457
458    /// Set the bootstrap buffer size.
459    pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
460        self.bootstrap_buffer_size = size;
461        self
462    }
463
464    /// Set the priority queue capacity.
465    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
466        self.priority_queue_capacity = Some(capacity);
467        self
468    }
469
470    /// Set the dispatch buffer capacity.
471    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
472        self.dispatch_buffer_capacity = Some(capacity);
473        self
474    }
475
476    /// Set the dispatch mode.
477    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
478        self.dispatch_mode = Some(mode);
479        self
480    }
481
482    /// Set the storage backend reference.
483    pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
484        self.storage_backend = Some(backend);
485        self
486    }
487
488    /// Build the query configuration.
489    pub fn build(self) -> QueryConfig {
490        QueryConfig {
491            id: self.id,
492            query: self.query,
493            query_language: self.query_language,
494            sources: self.sources,
495            middleware: self.middleware,
496            auto_start: self.auto_start,
497            joins: self.joins,
498            enable_bootstrap: self.enable_bootstrap,
499            bootstrap_buffer_size: self.bootstrap_buffer_size,
500            priority_queue_capacity: self.priority_queue_capacity,
501            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
502            dispatch_mode: self.dispatch_mode,
503            storage_backend: self.storage_backend,
504        }
505    }
506}
507
508// ============================================================================
509// Tests
510// ============================================================================
511
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DrasiLib;

    // ==========================================================================
    // Query Builder Tests
    // ==========================================================================

    /// A Cypher builder carries its id, text, language, auto-start flag and
    /// single source subscription through to the built configuration.
    #[test]
    fn test_query_builder_cypher() {
        let built = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .auto_start(false)
            .build();

        assert_eq!(built.id, "test-query");
        assert_eq!(built.query, "MATCH (n) RETURN n");
        assert_eq!(built.query_language, QueryLanguage::Cypher);
        assert!(!built.auto_start);
        assert_eq!(built.sources.len(), 1);
        assert_eq!(built.sources[0].source_id, "source1");
    }

    /// `Query::gql()` tags the configuration with the GQL language.
    #[test]
    fn test_query_builder_gql() {
        let built = Query::gql("test-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source1")
            .build();

        assert_eq!(built.query_language, QueryLanguage::GQL);
    }

    /// Each `from_source()` call adds one more subscription.
    #[test]
    fn test_query_builder_multiple_sources() {
        let built = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .from_source("source2")
            .build();

        assert_eq!(built.sources.len(), 2);
    }

    /// A builder with no settings at all still builds, and the result is not running.
    #[tokio::test]
    async fn test_drasi_lib_builder_empty() {
        let server = DrasiLibBuilder::new().build().await.unwrap();

        assert!(!server.is_running().await);
    }

    /// The ID passed to `with_id()` ends up in the server configuration.
    #[tokio::test]
    async fn test_drasi_lib_builder_with_id() {
        let builder = DrasiLibBuilder::new().with_id("test-server");
        let server = builder.build().await.unwrap();

        assert_eq!(server.get_config().id, "test-server");
    }

    /// A query that subscribes to no source at all is still accepted and listed.
    #[tokio::test]
    async fn test_drasi_lib_builder_with_query_no_source() {
        // Deliberately no from_source() call: the query has zero subscriptions
        let sourceless_query = Query::cypher("query1")
            .query("MATCH (n) RETURN n")
            .auto_start(false)
            .build();

        let server = DrasiLibBuilder::new()
            .with_id("test-server")
            .with_query(sourceless_query)
            .build()
            .await
            .unwrap();

        let listed = server.list_queries().await.unwrap();
        assert_eq!(listed.len(), 1);
    }

    // ==========================================================================
    // DrasiLib Builder Integration Tests (from builder_tests.rs)
    // ==========================================================================

    /// `build()` returns a server whose state guard already reports initialized.
    #[tokio::test]
    async fn test_builder_creates_initialized_server() {
        let outcome = DrasiLib::builder().with_id("builder-test").build().await;

        assert!(outcome.is_ok(), "Builder should create initialized server");
        let server = outcome.unwrap();
        assert!(
            server.state_guard.is_initialized().await,
            "Server should be initialized"
        );
    }

    /// Only a query configuration is supplied here (no source or reaction
    /// instances); the built server must be initialized and hold that one query.
    #[tokio::test]
    async fn test_builder_with_query() {
        let query_config = Query::cypher("query1")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .build();

        let outcome = DrasiLib::builder()
            .with_id("complex-server")
            .with_query(query_config)
            .build()
            .await;

        assert!(outcome.is_ok(), "Builder with query should succeed");
        let server = outcome.unwrap();
        assert!(server.state_guard.is_initialized().await);
        assert_eq!(server.config.queries.len(), 1);
    }
}
629        let core = core.unwrap();
630        assert!(core.state_guard.is_initialized().await);
631        assert_eq!(core.config.queries.len(), 1);
632    }
633}