Skip to main content

drasi_lib/
builder.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Fluent builders for DrasiLib and its components.
16//!
17//! This module provides the builder pattern for constructing DrasiLib instances
18//! and their components in a type-safe, ergonomic way.
19//!
20//! # Overview
21//!
22//! - [`DrasiLibBuilder`] - Main builder for creating a DrasiLib instance
23//! - [`Query`] - Builder for query configurations
24//!
25//! # Plugin Architecture
26//!
27//! **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
28//! reactions are created externally as fully-configured instances implementing
29//! `Source` and `Reaction` traits, then passed to DrasiLibBuilder via
30//! `with_source()` and `with_reaction()`.
31//!
32//! # Examples
33//!
34//! ## Basic Usage with Pre-built Instances
35//!
36//! ```no_run
37//! use drasi_lib::{DrasiLib, Query};
38//!
39//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
40//! // Source and reaction instances are created externally by plugins
41//! // Ownership is transferred to DrasiLib when added
42//! // let my_source = my_source_plugin::create(...);
43//! // let my_reaction = my_reaction_plugin::create(...);
44//!
45//! let core = DrasiLib::builder()
46//!     .with_id("my-server")
47//!     // .with_source(my_source)      // Ownership transferred
48//!     // .with_reaction(my_reaction)  // Ownership transferred
49//!     .with_query(
50//!         Query::cypher("my-query")
51//!             .query("MATCH (n:Person) RETURN n")
52//!             .from_source("events")
53//!             .build()
54//!     )
55//!     .build()
56//!     .await?;
57//!
58//! core.start().await?;
59//! # Ok(())
60//! # }
61//! ```
62
63use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67    DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::indexes::IndexBackendPlugin;
71use crate::indexes::StorageBackendConfig;
72use crate::lib_core::DrasiLib;
73use crate::reactions::Reaction as ReactionTrait;
74use crate::sources::Source as SourceTrait;
75use crate::state_store::StateStoreProvider;
76use drasi_core::models::SourceMiddlewareConfig;
77
78// ============================================================================
79// DrasiLibBuilder
80// ============================================================================
81
82/// Fluent builder for creating DrasiLib instances.
83///
84/// Use `DrasiLib::builder()` to get started.
85///
86/// # Plugin Architecture
87///
88/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
89/// reactions are created externally as fully-configured instances implementing
90/// `Source` and `Reaction` traits, then passed via `with_source()` and `with_reaction()`.
91///
92/// # Example
93///
94/// ```no_run
95/// use drasi_lib::{DrasiLib, Query};
96///
97/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
98/// // Source and reaction instances are created externally by plugins
99/// // Ownership is transferred to DrasiLib when added
100/// // let my_source = my_source_plugin::create(...);
101/// // let my_reaction = my_reaction_plugin::create(...);
102///
103/// let core = DrasiLib::builder()
104///     .with_id("my-server")
105///     // .with_source(my_source)      // Ownership transferred
106///     // .with_reaction(my_reaction)  // Ownership transferred
107///     .with_query(
108///         Query::cypher("my-query")
109///             .query("MATCH (n) RETURN n")
110///             .from_source("my-source")
111///             .build()
112///     )
113///     .build()
114///     .await?;
115/// # Ok(())
116/// # }
117/// ```
pub struct DrasiLibBuilder {
    /// Server ID set via `with_id()`; `build()` falls back to `"drasi-lib"` when unset.
    server_id: Option<String>,
    /// Default priority queue capacity for components, set via
    /// `with_priority_queue_capacity()` and forwarded to `DrasiLibConfig`.
    priority_queue_capacity: Option<usize>,
    /// Default dispatch buffer capacity for components, set via
    /// `with_dispatch_buffer_capacity()` and forwarded to `DrasiLibConfig`.
    dispatch_buffer_capacity: Option<usize>,
    /// Storage backend configurations, in the order they were added.
    storage_backends: Vec<StorageBackendConfig>,
    /// Query configurations, in the order they were added.
    query_configs: Vec<QueryConfig>,
    /// Pre-built source instances, each paired with extra metadata that `build()`
    /// merges into the source's component graph node.
    source_instances: Vec<(
        Box<dyn SourceTrait>,
        std::collections::HashMap<String, String>,
    )>,
    /// Pre-built reaction instances, each paired with extra metadata that `build()`
    /// merges into the reaction's component graph node.
    reaction_instances: Vec<(
        Box<dyn ReactionTrait>,
        std::collections::HashMap<String, String>,
    )>,
    /// Optional index backend provider handed to the runtime configuration.
    index_provider: Option<Arc<dyn IndexBackendPlugin>>,
    /// Optional state store provider handed to the runtime configuration.
    state_store_provider: Option<Arc<dyn StateStoreProvider>>,
}
135
136impl Default for DrasiLibBuilder {
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl DrasiLibBuilder {
143    /// Create a new builder with default values.
144    pub fn new() -> Self {
145        Self {
146            server_id: None,
147            priority_queue_capacity: None,
148            dispatch_buffer_capacity: None,
149            storage_backends: Vec::new(),
150            query_configs: Vec::new(),
151            source_instances: Vec::new(),
152            reaction_instances: Vec::new(),
153            index_provider: None,
154            state_store_provider: None,
155        }
156    }
157
158    /// Set the server ID.
159    pub fn with_id(mut self, id: impl Into<String>) -> Self {
160        self.server_id = Some(id.into());
161        self
162    }
163
164    /// Set the default priority queue capacity for components.
165    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
166        self.priority_queue_capacity = Some(capacity);
167        self
168    }
169
170    /// Set the default dispatch buffer capacity for components.
171    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
172        self.dispatch_buffer_capacity = Some(capacity);
173        self
174    }
175
176    /// Add a storage backend configuration.
177    pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
178        self.storage_backends.push(config);
179        self
180    }
181
182    /// Set the index backend provider for persistent storage.
183    ///
184    /// When using RocksDB or Redis/Garnet storage backends, you must provide
185    /// an index provider that implements `IndexBackendPlugin`. The provider
186    /// is responsible for creating the actual index instances.
187    ///
188    /// If no index provider is set, only in-memory storage backends can be used.
189    /// Attempting to use RocksDB or Redis backends without a provider will result
190    /// in an error.
191    ///
192    /// # Example
193    /// ```ignore
194    /// use drasi_index_rocksdb::RocksDbIndexProvider;
195    /// use std::sync::Arc;
196    ///
197    /// let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
198    /// let core = DrasiLib::builder()
199    ///     .with_index_provider(Arc::new(provider))
200    ///     .build()
201    ///     .await?;
202    /// ```
203    pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
204        self.index_provider = Some(provider);
205        self
206    }
207
208    /// Set the state store provider for plugin state persistence.
209    ///
210    /// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
211    /// to store and retrieve runtime state that can persist across runs of DrasiLib.
212    ///
213    /// If no state store provider is set, the default in-memory provider will be used.
214    /// The in-memory provider does not persist state across restarts.
215    ///
216    /// # Example
217    /// ```ignore
218    /// use drasi_state_store_json::JsonStateStoreProvider;
219    /// use std::sync::Arc;
220    ///
221    /// let state_store = JsonStateStoreProvider::new("/data/state");
222    /// let core = DrasiLib::builder()
223    ///     .with_state_store_provider(Arc::new(state_store))
224    ///     .build()
225    ///     .await?;
226    /// ```
227    pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
228        self.state_store_provider = Some(provider);
229        self
230    }
231
232    /// Add a source instance, taking ownership.
233    ///
234    /// Source instances are created externally by plugins with their own typed configurations.
235    /// drasi-lib only knows about the `Source` trait - it has no knowledge of which plugins exist.
236    ///
237    /// # Example
238    /// ```ignore
239    /// let source = MySource::new("my-source", config)?;
240    /// let core = DrasiLib::builder()
241    ///     .with_source(source)  // Ownership transferred
242    ///     .build()
243    ///     .await?;
244    /// ```
245    pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
246        self.source_instances
247            .push((Box::new(source), std::collections::HashMap::new()));
248        self
249    }
250
251    /// Add a source instance with additional component metadata.
252    ///
253    /// Like [`with_source`](Self::with_source) but merges `extra_metadata`
254    /// (e.g. `pluginId`, `pluginGeneration`) into the component graph node.
255    pub fn with_source_metadata(
256        mut self,
257        source: impl SourceTrait + 'static,
258        extra_metadata: std::collections::HashMap<String, String>,
259    ) -> Self {
260        self.source_instances
261            .push((Box::new(source), extra_metadata));
262        self
263    }
264
265    /// Add a query configuration.
266    pub fn with_query(mut self, config: QueryConfig) -> Self {
267        self.query_configs.push(config);
268        self
269    }
270
271    /// Add a reaction instance, taking ownership.
272    ///
273    /// Reaction instances are created externally by plugins with their own typed configurations.
274    /// drasi-lib only knows about the `Reaction` trait - it has no knowledge of which plugins exist.
275    ///
276    /// # Example
277    /// ```ignore
278    /// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
279    /// let core = DrasiLib::builder()
280    ///     .with_reaction(reaction)  // Ownership transferred
281    ///     .build()
282    ///     .await?;
283    /// ```
284    pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
285        self.reaction_instances
286            .push((Box::new(reaction), std::collections::HashMap::new()));
287        self
288    }
289
290    /// Add a reaction instance with additional component metadata.
291    ///
292    /// Like [`with_reaction`](Self::with_reaction) but merges `extra_metadata`
293    /// (e.g. `pluginId`, `pluginGeneration`) into the component graph node.
294    pub fn with_reaction_metadata(
295        mut self,
296        reaction: impl ReactionTrait + 'static,
297        extra_metadata: std::collections::HashMap<String, String>,
298    ) -> Self {
299        self.reaction_instances
300            .push((Box::new(reaction), extra_metadata));
301        self
302    }
303
304    /// Build the DrasiLib instance.
305    ///
306    /// This validates the configuration, creates all components, and initializes the server.
307    /// After building, you can call `start()` to begin processing.
308    pub async fn build(self) -> Result<DrasiLib> {
309        // Build the configuration
310        let config = DrasiLibConfig {
311            id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
312            priority_queue_capacity: self.priority_queue_capacity,
313            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
314            storage_backends: self.storage_backends,
315            queries: self.query_configs.clone(),
316        };
317
318        // Validate the configuration
319        config
320            .validate()
321            .map_err(|e| DrasiError::validation(e.to_string()))?;
322
323        // Create runtime config and server with optional index and state store providers
324        let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
325            config,
326            self.index_provider,
327            self.state_store_provider,
328        ));
329        let mut core = DrasiLib::new(runtime_config);
330
331        // Inject state store before provisioning sources (they need it for initialization)
332        let state_store = core.config.state_store_provider.clone();
333        core.source_manager
334            .inject_state_store(state_store.clone())
335            .await;
336        core.reaction_manager.inject_state_store(state_store).await;
337
338        // Register the component graph source BEFORE initialize (which loads query config).
339        // Queries reference sources, so sources must exist in the graph first.
340        {
341            use crate::sources::component_graph_source::ComponentGraphSource;
342            let graph_source = ComponentGraphSource::new(
343                core.component_event_broadcast_tx.clone(),
344                core.config.id.clone(),
345                core.component_graph.clone(),
346            )
347            .map_err(|e| {
348                DrasiError::operation_failed(
349                    "source",
350                    "component-graph",
351                    "add",
352                    format!("Failed to create: {e}"),
353                )
354            })?;
355
356            let source_id = graph_source.id().to_string();
357            let source_type = graph_source.type_name().to_string();
358            {
359                let mut graph = core.component_graph.write().await;
360                let mut metadata = std::collections::HashMap::new();
361                metadata.insert("kind".to_string(), source_type);
362                metadata.insert(
363                    "autoStart".to_string(),
364                    graph_source.auto_start().to_string(),
365                );
366                graph.register_source(&source_id, metadata).map_err(|e| {
367                    DrasiError::operation_failed(
368                        "source",
369                        &source_id,
370                        "add",
371                        format!("Failed to register: {e}"),
372                    )
373                })?;
374            }
375            if let Err(e) = core.source_manager.provision_source(graph_source).await {
376                let mut graph = core.component_graph.write().await;
377                let _ = graph.deregister(&source_id);
378                return Err(DrasiError::operation_failed(
379                    "source",
380                    &source_id,
381                    "add",
382                    format!("Failed to provision: {e}"),
383                ));
384            }
385        }
386
387        // Inject pre-built source instances BEFORE initialize.
388        // Queries reference sources by ID, so sources must be in the graph first.
389        for (source, extra_metadata) in self.source_instances {
390            let source_id = source.id().to_string();
391            let source_type = source.type_name().to_string();
392            let auto_start = source.auto_start();
393
394            {
395                let mut graph = core.component_graph.write().await;
396                let mut metadata = std::collections::HashMap::new();
397                metadata.insert("kind".to_string(), source_type);
398                metadata.insert("autoStart".to_string(), auto_start.to_string());
399                metadata.extend(extra_metadata);
400                graph.register_source(&source_id, metadata).map_err(|e| {
401                    DrasiError::operation_failed(
402                        "source",
403                        &source_id,
404                        "add",
405                        format!("Failed to register: {e}"),
406                    )
407                })?;
408            }
409            if let Err(e) = core.source_manager.provision_source(source).await {
410                let mut graph = core.component_graph.write().await;
411                let _ = graph.deregister(&source_id);
412                return Err(DrasiError::operation_failed(
413                    "source",
414                    &source_id,
415                    "add",
416                    format!("Failed to provision: {e}"),
417                ));
418            }
419        }
420
421        // Initialize the server (loads query configurations — sources must already be registered)
422        core.initialize().await?;
423
424        // Inject pre-built reaction instances
425        for (reaction, extra_metadata) in self.reaction_instances {
426            let reaction_id = reaction.id().to_string();
427            let reaction_type = reaction.type_name().to_string();
428            let query_ids = reaction.query_ids();
429
430            // Register in graph first, then provision
431            {
432                let mut graph = core.component_graph.write().await;
433                let mut metadata = std::collections::HashMap::new();
434                metadata.insert("kind".to_string(), reaction_type);
435                metadata.extend(extra_metadata);
436                graph
437                    .register_reaction(&reaction_id, metadata, &query_ids)
438                    .map_err(|e| {
439                        DrasiError::operation_failed(
440                            "reaction",
441                            &reaction_id,
442                            "add",
443                            format!("Failed to register: {e}"),
444                        )
445                    })?;
446            }
447            if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
448                let mut graph = core.component_graph.write().await;
449                let _ = graph.deregister(&reaction_id);
450                return Err(DrasiError::operation_failed(
451                    "reaction",
452                    &reaction_id,
453                    "add",
454                    format!("Failed to provision: {e}"),
455                ));
456            }
457        }
458
459        Ok(core)
460    }
461}
462
463// ============================================================================
464// Query Builder
465// ============================================================================
466
467/// Fluent builder for query configurations.
468///
469/// Use `Query::cypher()` or `Query::gql()` to get started.
470///
471/// # Example
472///
473/// ```no_run
474/// use drasi_lib::Query;
475///
476/// let query_config = Query::cypher("my-query")
477///     .query("MATCH (n:Person) RETURN n.name, n.age")
478///     .from_source("my-source")
479///     .auto_start(true)
480///     .build();
481/// ```
pub struct Query {
    /// Query identifier, supplied to `Query::cypher()` / `Query::gql()`.
    id: String,
    /// Query text; empty until `query()` is called.
    query: String,
    /// Language of the query text, fixed by the constructor used.
    query_language: QueryLanguage,
    /// Source subscriptions, in the order they were added.
    sources: Vec<SourceSubscriptionConfig>,
    /// Middleware configurations added via `with_middleware()`.
    middleware: Vec<SourceMiddlewareConfig>,
    /// Whether the query should auto-start; constructors default this to `true`.
    auto_start: bool,
    /// Optional join configuration set via `with_joins()`.
    joins: Option<Vec<QueryJoinConfig>>,
    /// Whether bootstrap is enabled; constructors default this to `true`.
    enable_bootstrap: bool,
    /// Bootstrap buffer size; constructors default this to `10000`.
    bootstrap_buffer_size: usize,
    /// Optional per-query priority queue capacity.
    priority_queue_capacity: Option<usize>,
    /// Optional per-query dispatch buffer capacity.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional per-query dispatch mode.
    dispatch_mode: Option<DispatchMode>,
    /// Optional reference to a storage backend.
    storage_backend: Option<crate::indexes::StorageBackendRef>,
}
497
498impl Query {
499    /// Create a new Cypher query builder.
500    pub fn cypher(id: impl Into<String>) -> Self {
501        Self {
502            id: id.into(),
503            query: String::new(),
504            query_language: QueryLanguage::Cypher,
505            sources: Vec::new(),
506            middleware: Vec::new(),
507            auto_start: true,
508            joins: None,
509            enable_bootstrap: true,
510            bootstrap_buffer_size: 10000,
511            priority_queue_capacity: None,
512            dispatch_buffer_capacity: None,
513            dispatch_mode: None,
514            storage_backend: None,
515        }
516    }
517
518    /// Create a new GQL (ISO 9074:2024) query builder.
519    pub fn gql(id: impl Into<String>) -> Self {
520        Self {
521            id: id.into(),
522            query: String::new(),
523            query_language: QueryLanguage::GQL,
524            sources: Vec::new(),
525            middleware: Vec::new(),
526            auto_start: true,
527            joins: None,
528            enable_bootstrap: true,
529            bootstrap_buffer_size: 10000,
530            priority_queue_capacity: None,
531            dispatch_buffer_capacity: None,
532            dispatch_mode: None,
533            storage_backend: None,
534        }
535    }
536
537    /// Set the query string.
538    pub fn query(mut self, query: impl Into<String>) -> Self {
539        self.query = query.into();
540        self
541    }
542
543    /// Subscribe to a source.
544    pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
545        self.sources.push(SourceSubscriptionConfig {
546            source_id: source_id.into(),
547            nodes: Vec::new(),
548            relations: Vec::new(),
549            pipeline: Vec::new(),
550        });
551        self
552    }
553
554    /// Subscribe to a source with a middleware pipeline.
555    ///
556    /// The pipeline is a list of middleware names (strings) that will be applied to
557    /// data from this source before it reaches the query.
558    pub fn from_source_with_pipeline(
559        mut self,
560        source_id: impl Into<String>,
561        pipeline: Vec<String>,
562    ) -> Self {
563        self.sources.push(SourceSubscriptionConfig {
564            source_id: source_id.into(),
565            nodes: Vec::new(),
566            relations: Vec::new(),
567            pipeline,
568        });
569        self
570    }
571
572    /// Add middleware to the query.
573    pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
574        self.middleware.push(middleware);
575        self
576    }
577
578    /// Set whether the query should auto-start.
579    pub fn auto_start(mut self, auto_start: bool) -> Self {
580        self.auto_start = auto_start;
581        self
582    }
583
584    /// Set the join configuration.
585    pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
586        self.joins = Some(joins);
587        self
588    }
589
590    /// Enable or disable bootstrap.
591    pub fn enable_bootstrap(mut self, enable: bool) -> Self {
592        self.enable_bootstrap = enable;
593        self
594    }
595
596    /// Set the bootstrap buffer size.
597    pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
598        self.bootstrap_buffer_size = size;
599        self
600    }
601
602    /// Set the priority queue capacity.
603    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
604        self.priority_queue_capacity = Some(capacity);
605        self
606    }
607
608    /// Set the dispatch buffer capacity.
609    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
610        self.dispatch_buffer_capacity = Some(capacity);
611        self
612    }
613
614    /// Set the dispatch mode.
615    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
616        self.dispatch_mode = Some(mode);
617        self
618    }
619
620    /// Set the storage backend reference.
621    pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
622        self.storage_backend = Some(backend);
623        self
624    }
625
626    /// Build the query configuration.
627    pub fn build(self) -> QueryConfig {
628        QueryConfig {
629            id: self.id,
630            query: self.query,
631            query_language: self.query_language,
632            sources: self.sources,
633            middleware: self.middleware,
634            auto_start: self.auto_start,
635            joins: self.joins,
636            enable_bootstrap: self.enable_bootstrap,
637            bootstrap_buffer_size: self.bootstrap_buffer_size,
638            priority_queue_capacity: self.priority_queue_capacity,
639            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
640            dispatch_mode: self.dispatch_mode,
641            storage_backend: self.storage_backend,
642        }
643    }
644}
645
646// ============================================================================
647// Tests
648// ============================================================================
649
650#[cfg(test)]
651mod tests {
652    use super::*;
653    use crate::DrasiLib;
654
655    // ==========================================================================
656    // Query Builder Tests
657    // ==========================================================================
658
659    #[test]
660    fn test_query_builder_cypher() {
661        let config = Query::cypher("test-query")
662            .query("MATCH (n) RETURN n")
663            .from_source("source1")
664            .auto_start(false)
665            .build();
666
667        assert_eq!(config.id, "test-query");
668        assert_eq!(config.query, "MATCH (n) RETURN n");
669        assert_eq!(config.query_language, QueryLanguage::Cypher);
670        assert!(!config.auto_start);
671        assert_eq!(config.sources.len(), 1);
672        assert_eq!(config.sources[0].source_id, "source1");
673    }
674
675    #[test]
676    fn test_query_builder_gql() {
677        let config = Query::gql("test-query")
678            .query("MATCH (n:Person) RETURN n.name")
679            .from_source("source1")
680            .build();
681
682        assert_eq!(config.query_language, QueryLanguage::GQL);
683    }
684
685    #[test]
686    fn test_query_builder_multiple_sources() {
687        let config = Query::cypher("test-query")
688            .query("MATCH (n) RETURN n")
689            .from_source("source1")
690            .from_source("source2")
691            .build();
692
693        assert_eq!(config.sources.len(), 2);
694    }
695
696    #[tokio::test]
697    async fn test_drasi_lib_builder_empty() {
698        let core = DrasiLibBuilder::new().build().await.unwrap();
699
700        assert!(!core.is_running().await);
701    }
702
703    #[tokio::test]
704    async fn test_drasi_lib_builder_with_id() {
705        let core = DrasiLibBuilder::new()
706            .with_id("test-server")
707            .build()
708            .await
709            .unwrap();
710
711        assert_eq!(core.get_config().id, "test-server");
712    }
713
714    #[tokio::test]
715    async fn test_drasi_lib_builder_with_query_no_source() {
716        // Test builder with query configuration that has no source subscriptions
717        // In the instance-based approach, sources are added after build()
718        let core = DrasiLibBuilder::new()
719            .with_id("test-server")
720            .with_query(
721                Query::cypher("query1")
722                    .query("MATCH (n) RETURN n")
723                    // No from_source() call - query has no source subscriptions
724                    .auto_start(false)
725                    .build(),
726            )
727            .build()
728            .await
729            .unwrap();
730
731        let queries = core.list_queries().await.unwrap();
732        assert_eq!(queries.len(), 1);
733    }
734
735    // ==========================================================================
736    // DrasiLib Builder Integration Tests (from builder_tests.rs)
737    // ==========================================================================
738
739    #[tokio::test]
740    async fn test_builder_creates_initialized_server() {
741        let core = DrasiLib::builder().with_id("builder-test").build().await;
742
743        assert!(core.is_ok(), "Builder should create initialized server");
744        let core = core.unwrap();
745        assert!(
746            core.state_guard.is_initialized(),
747            "Server should be initialized"
748        );
749    }
750
751    #[tokio::test]
752    async fn test_builder_with_query() {
753        // In the instance-based approach, sources and reactions are added as instances
754        // after the builder creates the core. Here we just test query config addition.
755        // Source must be registered before a query can reference it
756        let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
757        let core = DrasiLib::builder()
758            .with_id("complex-server")
759            .with_source(source)
760            .with_query(
761                Query::cypher("query1")
762                    .query("MATCH (n) RETURN n")
763                    .from_source("source1")
764                    .build(),
765            )
766            .build()
767            .await;
768
769        assert!(core.is_ok(), "Builder with query should succeed");
770        let core = core.unwrap();
771        assert!(core.state_guard.is_initialized());
772        assert_eq!(core.config.queries.len(), 1);
773    }
774
775    // ==========================================================================
776    // DrasiLibBuilder Unit Tests
777    // ==========================================================================
778
779    #[test]
780    fn test_builder_with_id_sets_id() {
781        let builder = DrasiLibBuilder::new().with_id("my-server");
782        assert_eq!(builder.server_id, Some("my-server".to_string()));
783    }
784
785    #[test]
786    fn test_builder_with_id_accepts_string() {
787        let builder = DrasiLibBuilder::new().with_id(String::from("owned-id"));
788        assert_eq!(builder.server_id, Some("owned-id".to_string()));
789    }
790
791    #[test]
792    fn test_builder_with_priority_queue_capacity() {
793        let builder = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
794        assert_eq!(builder.priority_queue_capacity, Some(50000));
795    }
796
797    #[test]
798    fn test_builder_with_dispatch_buffer_capacity() {
799        let builder = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
800        assert_eq!(builder.dispatch_buffer_capacity, Some(2000));
801    }
802
803    #[test]
804    fn test_builder_with_query_adds_to_list() {
805        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
806        let builder = DrasiLibBuilder::new().with_query(q);
807        assert_eq!(builder.query_configs.len(), 1);
808        assert_eq!(builder.query_configs[0].id, "q1");
809    }
810
811    #[test]
812    fn test_builder_with_multiple_queries() {
813        let q1 = Query::cypher("q1").query("MATCH (a) RETURN a").build();
814        let q2 = Query::gql("q2").query("MATCH (b) RETURN b").build();
815        let builder = DrasiLibBuilder::new().with_query(q1).with_query(q2);
816        assert_eq!(builder.query_configs.len(), 2);
817        assert_eq!(builder.query_configs[0].id, "q1");
818        assert_eq!(builder.query_configs[1].id, "q2");
819    }
820
/// `add_storage_backend` appends the given backend config to the builder.
#[test]
fn test_builder_add_storage_backend() {
    use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

    let spec = StorageBackendSpec::Memory {
        enable_archive: false,
    };
    let builder = DrasiLibBuilder::new().add_storage_backend(StorageBackendConfig {
        id: String::from("mem1"),
        spec,
    });

    let backends = &builder.storage_backends;
    assert_eq!(backends.len(), 1);
    assert_eq!(backends[0].id, "mem1");
}
835
/// Repeated `add_storage_backend` calls accumulate backends in call order.
#[test]
fn test_builder_add_multiple_storage_backends() {
    use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

    // Local factory for in-memory backend configs that differ only by id/archive flag.
    let memory_backend = |id: &str, enable_archive: bool| StorageBackendConfig {
        id: id.to_string(),
        spec: StorageBackendSpec::Memory { enable_archive },
    };

    let builder = DrasiLibBuilder::new()
        .add_storage_backend(memory_backend("mem1", false))
        .add_storage_backend(memory_backend("mem2", true));

    // Comparing the collected ids checks both the count and the ordering.
    let ids: Vec<&str> = builder
        .storage_backends
        .iter()
        .map(|backend| backend.id.as_str())
        .collect();
    assert_eq!(ids, ["mem1", "mem2"]);
}
859
/// A freshly created builder has every optional setting unset and every
/// collection empty.
#[test]
fn test_builder_default_values() {
    let fresh = DrasiLibBuilder::new();

    // Optional scalar settings.
    assert!(fresh.server_id.is_none());
    assert!(fresh.priority_queue_capacity.is_none());
    assert!(fresh.dispatch_buffer_capacity.is_none());

    // Collections.
    assert!(fresh.storage_backends.is_empty());
    assert!(fresh.query_configs.is_empty());
    assert!(fresh.source_instances.is_empty());
    assert!(fresh.reaction_instances.is_empty());

    // Optional providers.
    assert!(fresh.index_provider.is_none());
    assert!(fresh.state_store_provider.is_none());
}
873
/// Chains the id, capacity, storage-backend and query setters in a single
/// expression and verifies that each value lands on the builder.
///
/// Besides the collection lengths, the ids of the stored backend and query
/// are checked so that a setter storing the wrong item cannot pass unnoticed.
#[test]
fn test_builder_fluent_chaining() {
    use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

    let backend = StorageBackendConfig {
        id: "mem".to_string(),
        spec: StorageBackendSpec::Memory {
            enable_archive: false,
        },
    };
    let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();

    let builder = DrasiLibBuilder::new()
        .with_id("chained")
        .with_priority_queue_capacity(20000)
        .with_dispatch_buffer_capacity(3000)
        .add_storage_backend(backend)
        .with_query(q);

    assert_eq!(builder.server_id, Some("chained".to_string()));
    assert_eq!(builder.priority_queue_capacity, Some(20000));
    assert_eq!(builder.dispatch_buffer_capacity, Some(3000));

    // Length alone does not prove the right items were stored; check ids too.
    assert_eq!(builder.storage_backends.len(), 1);
    assert_eq!(builder.storage_backends[0].id, "mem");
    assert_eq!(builder.query_configs.len(), 1);
    assert_eq!(builder.query_configs[0].id, "q1");
}
899
900    #[tokio::test]
901    async fn test_builder_default_id_when_none_set() {
902        let core = DrasiLibBuilder::new().build().await.unwrap();
903        assert_eq!(core.get_config().id, "drasi-lib");
904    }
905
906    #[tokio::test]
907    async fn test_builder_with_storage_backend_builds_ok() {
908        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
909
910        let backend = StorageBackendConfig {
911            id: "test-mem".to_string(),
912            spec: StorageBackendSpec::Memory {
913                enable_archive: false,
914            },
915        };
916        let core = DrasiLibBuilder::new()
917            .add_storage_backend(backend)
918            .build()
919            .await;
920        assert!(core.is_ok(), "Builder with storage backend should succeed");
921    }
922
923    // ==========================================================================
924    // Query Builder Unit Tests
925    // ==========================================================================
926
/// `Query::cypher` records the given id and selects the Cypher language.
#[test]
fn test_query_cypher_sets_id_and_language() {
    let query_builder = Query::cypher("cypher-q");

    assert_eq!(query_builder.id, "cypher-q");
    assert_eq!(query_builder.query_language, QueryLanguage::Cypher);
}
933
/// `Query::gql` records the given id and selects the GQL language.
#[test]
fn test_query_gql_sets_id_and_language() {
    let query_builder = Query::gql("gql-q");

    assert_eq!(query_builder.id, "gql-q");
    assert_eq!(query_builder.query_language, QueryLanguage::GQL);
}
940
/// A single `from_source` call adds one source entry with the given id.
#[test]
fn test_query_from_source_adds_source() {
    let query_builder = Query::cypher("q").from_source("src1");

    let subscriptions = &query_builder.sources;
    assert_eq!(subscriptions.len(), 1);
    assert_eq!(subscriptions[0].source_id, "src1");
}
947
/// Chained `from_source` calls accumulate source entries in call order.
#[test]
fn test_query_from_source_chaining() {
    let expected_ids = ["src1", "src2", "src3"];

    let mut query_builder = Query::cypher("q");
    for source_id in expected_ids {
        query_builder = query_builder.from_source(source_id);
    }

    assert_eq!(query_builder.sources.len(), 3);
    // The length check above guarantees the zip visits every entry.
    for (subscription, expected_id) in query_builder.sources.iter().zip(expected_ids) {
        assert_eq!(subscription.source_id, expected_id);
    }
}
959
/// A new query builder has `auto_start` enabled.
#[test]
fn test_query_auto_start_default_true() {
    let auto_start = Query::cypher("q").auto_start;
    assert!(auto_start);
}
965
/// `auto_start(false)` turns the `auto_start` flag off.
#[test]
fn test_query_auto_start_false() {
    let auto_start = Query::cypher("q").auto_start(false).auto_start;
    assert!(!auto_start);
}
971
/// A new query builder has `enable_bootstrap` switched on.
#[test]
fn test_query_enable_bootstrap_default_true() {
    let bootstrap_enabled = Query::cypher("q").enable_bootstrap;
    assert!(bootstrap_enabled);
}
977
/// `enable_bootstrap(false)` turns the `enable_bootstrap` flag off.
#[test]
fn test_query_enable_bootstrap_false() {
    let bootstrap_enabled = Query::cypher("q").enable_bootstrap(false).enable_bootstrap;
    assert!(!bootstrap_enabled);
}
983
/// A new query builder starts with a bootstrap buffer size of 10000.
#[test]
fn test_query_bootstrap_buffer_size_default() {
    let untouched = Query::cypher("q");
    let buffer_size = untouched.bootstrap_buffer_size;
    assert_eq!(buffer_size, 10000);
}
989
/// `with_bootstrap_buffer_size` overrides the bootstrap buffer size.
#[test]
fn test_query_with_bootstrap_buffer_size() {
    let requested = 5000;
    let query_builder = Query::cypher("q").with_bootstrap_buffer_size(requested);
    assert_eq!(query_builder.bootstrap_buffer_size, requested);
}
995
/// `with_dispatch_mode(Broadcast)` stores `Some(Broadcast)` on the builder.
#[test]
fn test_query_with_dispatch_mode_broadcast() {
    let expected = Some(DispatchMode::Broadcast);
    let query_builder = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
    assert_eq!(query_builder.dispatch_mode, expected);
}
1001
/// `with_dispatch_mode(Channel)` stores `Some(Channel)` on the builder.
#[test]
fn test_query_with_dispatch_mode_channel() {
    let expected = Some(DispatchMode::Channel);
    let query_builder = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
    assert_eq!(query_builder.dispatch_mode, expected);
}
1007
/// A new query builder has no dispatch mode set.
#[test]
fn test_query_dispatch_mode_default_none() {
    let untouched = Query::cypher("q");
    assert!(untouched.dispatch_mode.is_none());
}
1013
/// `with_priority_queue_capacity` stores the requested value on the query builder.
#[test]
fn test_query_with_priority_queue_capacity() {
    let capacity = 50000;
    let query_builder = Query::cypher("q").with_priority_queue_capacity(capacity);
    assert_eq!(query_builder.priority_queue_capacity, Some(capacity));
}
1019
/// A new query builder has no priority queue capacity set.
#[test]
fn test_query_priority_queue_capacity_default_none() {
    let untouched = Query::cypher("q");
    assert!(untouched.priority_queue_capacity.is_none());
}
1025
/// `with_dispatch_buffer_capacity` stores the requested value on the query builder.
#[test]
fn test_query_with_dispatch_buffer_capacity() {
    let capacity = 5000;
    let query_builder = Query::cypher("q").with_dispatch_buffer_capacity(capacity);
    assert_eq!(query_builder.dispatch_buffer_capacity, Some(capacity));
}
1031
/// A new query builder has no dispatch buffer capacity set.
#[test]
fn test_query_dispatch_buffer_capacity_default_none() {
    let untouched = Query::cypher("q");
    assert!(untouched.dispatch_buffer_capacity.is_none());
}
1037
/// Sets every option exposed by the query builder and checks that `build`
/// carries each one into the resulting config, while the options that were
/// never touched (joins, middleware, storage backend) stay unset/empty.
#[test]
fn test_query_build_propagates_all_fields() {
    // Identity and query text.
    let query_builder = Query::cypher("full-query")
        .query("MATCH (n:Person) RETURN n.name")
        .from_source("source-a")
        .from_source("source-b");
    // Lifecycle flags and sizing.
    let query_builder = query_builder
        .auto_start(false)
        .enable_bootstrap(false)
        .with_bootstrap_buffer_size(5000)
        .with_priority_queue_capacity(50000)
        .with_dispatch_buffer_capacity(2500)
        .with_dispatch_mode(DispatchMode::Broadcast);
    let built = query_builder.build();

    assert_eq!(built.id, "full-query");
    assert_eq!(built.query, "MATCH (n:Person) RETURN n.name");
    assert_eq!(built.query_language, QueryLanguage::Cypher);

    assert_eq!(built.sources.len(), 2);
    // The length check above guarantees the zip visits both entries.
    for (subscription, expected_id) in built.sources.iter().zip(["source-a", "source-b"]) {
        assert_eq!(subscription.source_id, expected_id);
    }

    assert!(!built.auto_start);
    assert!(!built.enable_bootstrap);
    assert_eq!(built.bootstrap_buffer_size, 5000);
    assert_eq!(built.priority_queue_capacity, Some(50000));
    assert_eq!(built.dispatch_buffer_capacity, Some(2500));
    assert_eq!(built.dispatch_mode, Some(DispatchMode::Broadcast));

    // Options that were never set on the builder.
    assert!(built.joins.is_none());
    assert!(built.middleware.is_empty());
    assert!(built.storage_backend.is_none());
}
1068
/// Builds a GQL query with one source and checks that the id, language,
/// query text and source survive `build`, and that untouched options keep
/// their default values.
///
/// The source id is asserted as well as the source count, so a build that
/// kept the right number of sources but the wrong id would be caught.
#[test]
fn test_query_build_gql_propagates_language() {
    let config = Query::gql("gql-full")
        .query("MATCH (n) RETURN n")
        .from_source("src")
        .build();

    assert_eq!(config.id, "gql-full");
    assert_eq!(config.query_language, QueryLanguage::GQL);
    assert_eq!(config.query, "MATCH (n) RETURN n");
    assert_eq!(config.sources.len(), 1);
    assert_eq!(config.sources[0].source_id, "src");
    // Verify defaults are preserved through build
    assert!(config.auto_start);
    assert!(config.enable_bootstrap);
    assert_eq!(config.bootstrap_buffer_size, 10000);
}
1085
/// Building straight from `Query::cypher` with no further calls yields a
/// config populated entirely with default values.
#[test]
fn test_query_build_defaults() {
    let built = Query::cypher("defaults-only").build();

    // Identity and query text.
    assert_eq!(built.id, "defaults-only");
    assert_eq!(built.query, "");
    assert_eq!(built.query_language, QueryLanguage::Cypher);

    // Collections and optional structure.
    assert!(built.sources.is_empty());
    assert!(built.middleware.is_empty());
    assert!(built.auto_start);
    assert!(built.joins.is_none());

    // Bootstrap behaviour.
    assert!(built.enable_bootstrap);
    assert_eq!(built.bootstrap_buffer_size, 10000);

    // Optional tuning knobs are all unset.
    assert!(built.priority_queue_capacity.is_none());
    assert!(built.dispatch_buffer_capacity.is_none());
    assert!(built.dispatch_mode.is_none());
    assert!(built.storage_backend.is_none());
}
1104}