Skip to main content

drasi_lib/
builder.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Fluent builders for DrasiLib and its components.
16//!
17//! This module provides the builder pattern for constructing DrasiLib instances
18//! and their components in a type-safe, ergonomic way.
19//!
20//! # Overview
21//!
22//! - [`DrasiLibBuilder`] - Main builder for creating a DrasiLib instance
23//! - [`Query`] - Builder for query configurations
24//!
25//! # Plugin Architecture
26//!
27//! **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
28//! reactions are created externally as fully-configured instances implementing
29//! `Source` and `Reaction` traits, then passed to DrasiLibBuilder via
30//! `with_source()` and `with_reaction()`.
31//!
32//! # Examples
33//!
34//! ## Basic Usage with Pre-built Instances
35//!
36//! ```no_run
37//! use drasi_lib::{DrasiLib, Query};
38//!
39//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
40//! // Source and reaction instances are created externally by plugins
41//! // Ownership is transferred to DrasiLib when added
42//! // let my_source = my_source_plugin::create(...);
43//! // let my_reaction = my_reaction_plugin::create(...);
44//!
45//! let core = DrasiLib::builder()
46//!     .with_id("my-server")
47//!     // .with_source(my_source)      // Ownership transferred
48//!     // .with_reaction(my_reaction)  // Ownership transferred
49//!     .with_query(
50//!         Query::cypher("my-query")
51//!             .query("MATCH (n:Person) RETURN n")
52//!             .from_source("events")
53//!             .build()
54//!     )
55//!     .build()
56//!     .await?;
57//!
58//! core.start().await?;
59//! # Ok(())
60//! # }
61//! ```
62
63use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67    DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::identity::IdentityProvider;
71use crate::indexes::IndexBackendPlugin;
72use crate::indexes::StorageBackendConfig;
73use crate::lib_core::DrasiLib;
74use crate::reactions::Reaction as ReactionTrait;
75use crate::sources::Source as SourceTrait;
76use crate::state_store::StateStoreProvider;
77use drasi_core::models::SourceMiddlewareConfig;
78
79// ============================================================================
80// DrasiLibBuilder
81// ============================================================================
82
83/// Fluent builder for creating DrasiLib instances.
84///
85/// Use `DrasiLib::builder()` to get started.
86///
87/// # Plugin Architecture
88///
89/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
90/// reactions are created externally as fully-configured instances implementing
91/// `Source` and `Reaction` traits, then passed via `with_source()` and `with_reaction()`.
92///
93/// # Example
94///
95/// ```no_run
96/// use drasi_lib::{DrasiLib, Query};
97///
98/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
99/// // Source and reaction instances are created externally by plugins
100/// // Ownership is transferred to DrasiLib when added
101/// // let my_source = my_source_plugin::create(...);
102/// // let my_reaction = my_reaction_plugin::create(...);
103///
104/// let core = DrasiLib::builder()
105///     .with_id("my-server")
106///     // .with_source(my_source)      // Ownership transferred
107///     // .with_reaction(my_reaction)  // Ownership transferred
108///     .with_query(
109///         Query::cypher("my-query")
110///             .query("MATCH (n) RETURN n")
111///             .from_source("my-source")
112///             .build()
113///     )
114///     .build()
115///     .await?;
116/// # Ok(())
117/// # }
118/// ```
119pub struct DrasiLibBuilder {
120    server_id: Option<String>,
121    priority_queue_capacity: Option<usize>,
122    dispatch_buffer_capacity: Option<usize>,
123    storage_backends: Vec<StorageBackendConfig>,
124    query_configs: Vec<QueryConfig>,
125    source_instances: Vec<(
126        Box<dyn SourceTrait>,
127        std::collections::HashMap<String, String>,
128    )>,
129    reaction_instances: Vec<(
130        Box<dyn ReactionTrait>,
131        std::collections::HashMap<String, String>,
132    )>,
133    index_provider: Option<Arc<dyn IndexBackendPlugin>>,
134    state_store_provider: Option<Arc<dyn StateStoreProvider>>,
135    identity_provider: Option<Arc<dyn IdentityProvider>>,
136}
137
138impl Default for DrasiLibBuilder {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
144impl DrasiLibBuilder {
145    /// Create a new builder with default values.
146    pub fn new() -> Self {
147        Self {
148            server_id: None,
149            priority_queue_capacity: None,
150            dispatch_buffer_capacity: None,
151            storage_backends: Vec::new(),
152            query_configs: Vec::new(),
153            source_instances: Vec::new(),
154            reaction_instances: Vec::new(),
155            index_provider: None,
156            state_store_provider: None,
157            identity_provider: None,
158        }
159    }
160
161    /// Set the server ID.
162    pub fn with_id(mut self, id: impl Into<String>) -> Self {
163        self.server_id = Some(id.into());
164        self
165    }
166
167    /// Set the default priority queue capacity for components.
168    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
169        self.priority_queue_capacity = Some(capacity);
170        self
171    }
172
173    /// Set the default dispatch buffer capacity for components.
174    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
175        self.dispatch_buffer_capacity = Some(capacity);
176        self
177    }
178
179    /// Add a storage backend configuration.
180    pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
181        self.storage_backends.push(config);
182        self
183    }
184
185    /// Set the index backend provider for persistent storage.
186    ///
187    /// When using RocksDB or Redis/Garnet storage backends, you must provide
188    /// an index provider that implements `IndexBackendPlugin`. The provider
189    /// is responsible for creating the actual index instances.
190    ///
191    /// If no index provider is set, only in-memory storage backends can be used.
192    /// Attempting to use RocksDB or Redis backends without a provider will result
193    /// in an error.
194    ///
195    /// # Example
196    /// ```ignore
197    /// use drasi_index_rocksdb::RocksDbIndexProvider;
198    /// use std::sync::Arc;
199    ///
200    /// let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
201    /// let core = DrasiLib::builder()
202    ///     .with_index_provider(Arc::new(provider))
203    ///     .build()
204    ///     .await?;
205    /// ```
206    pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
207        self.index_provider = Some(provider);
208        self
209    }
210
211    /// Set the state store provider for plugin state persistence.
212    ///
213    /// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
214    /// to store and retrieve runtime state that can persist across runs of DrasiLib.
215    ///
216    /// If no state store provider is set, the default in-memory provider will be used.
217    /// The in-memory provider does not persist state across restarts.
218    ///
219    /// # Example
220    /// ```ignore
221    /// use drasi_state_store_json::JsonStateStoreProvider;
222    /// use std::sync::Arc;
223    ///
224    /// let state_store = JsonStateStoreProvider::new("/data/state");
225    /// let core = DrasiLib::builder()
226    ///     .with_state_store_provider(Arc::new(state_store))
227    ///     .build()
228    ///     .await?;
229    /// ```
230    pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
231        self.state_store_provider = Some(provider);
232        self
233    }
234
235    /// Set the identity provider for credential injection.
236    ///
237    /// Identity providers supply authentication credentials (passwords, tokens,
238    /// certificates) to sources and reactions that need them for connecting to
239    /// external systems.
240    ///
241    /// If no identity provider is set, sources and reactions will receive `None`
242    /// for `context.identity_provider`.
243    ///
244    /// # Example
245    /// ```ignore
246    /// use drasi_identity_azure::AzureIdentityProvider;
247    /// use std::sync::Arc;
248    ///
249    /// let provider = AzureIdentityProvider::with_default_credentials("user@tenant.onmicrosoft.com")?;
250    /// let core = DrasiLib::builder()
251    ///     .with_identity_provider(Arc::new(provider))
252    ///     .build()
253    ///     .await?;
254    /// ```
255    pub fn with_identity_provider(mut self, provider: Arc<dyn IdentityProvider>) -> Self {
256        self.identity_provider = Some(provider);
257        self
258    }
259
260    /// Add a source instance, taking ownership.
261    ///
262    /// Source instances are created externally by plugins with their own typed configurations.
263    /// drasi-lib only knows about the `Source` trait - it has no knowledge of which plugins exist.
264    ///
265    /// # Example
266    /// ```ignore
267    /// let source = MySource::new("my-source", config)?;
268    /// let core = DrasiLib::builder()
269    ///     .with_source(source)  // Ownership transferred
270    ///     .build()
271    ///     .await?;
272    /// ```
273    pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
274        self.source_instances
275            .push((Box::new(source), std::collections::HashMap::new()));
276        self
277    }
278
279    /// Add a source instance with additional component metadata.
280    ///
281    /// Like [`with_source`](Self::with_source) but merges `extra_metadata`
282    /// (e.g. `pluginId`) into the component graph node.
283    pub fn with_source_metadata(
284        mut self,
285        source: impl SourceTrait + 'static,
286        extra_metadata: std::collections::HashMap<String, String>,
287    ) -> Self {
288        self.source_instances
289            .push((Box::new(source), extra_metadata));
290        self
291    }
292
293    /// Add a query configuration.
294    pub fn with_query(mut self, config: QueryConfig) -> Self {
295        self.query_configs.push(config);
296        self
297    }
298
299    /// Add a reaction instance, taking ownership.
300    ///
301    /// Reaction instances are created externally by plugins with their own typed configurations.
302    /// drasi-lib only knows about the `Reaction` trait - it has no knowledge of which plugins exist.
303    ///
304    /// # Example
305    /// ```ignore
306    /// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
307    /// let core = DrasiLib::builder()
308    ///     .with_reaction(reaction)  // Ownership transferred
309    ///     .build()
310    ///     .await?;
311    /// ```
312    pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
313        self.reaction_instances
314            .push((Box::new(reaction), std::collections::HashMap::new()));
315        self
316    }
317
318    /// Add a reaction instance with additional component metadata.
319    ///
320    /// Like [`with_reaction`](Self::with_reaction) but merges `extra_metadata`
321    /// (e.g. `pluginId`) into the component graph node.
322    pub fn with_reaction_metadata(
323        mut self,
324        reaction: impl ReactionTrait + 'static,
325        extra_metadata: std::collections::HashMap<String, String>,
326    ) -> Self {
327        self.reaction_instances
328            .push((Box::new(reaction), extra_metadata));
329        self
330    }
331
332    /// Build the DrasiLib instance.
333    ///
334    /// This validates the configuration, creates all components, and initializes the server.
335    /// After building, you can call `start()` to begin processing.
336    pub async fn build(self) -> Result<DrasiLib> {
337        // Build the configuration
338        let config = DrasiLibConfig {
339            id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
340            priority_queue_capacity: self.priority_queue_capacity,
341            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
342            storage_backends: self.storage_backends,
343            queries: self.query_configs.clone(),
344        };
345
346        // Validate the configuration
347        config
348            .validate()
349            .map_err(|e| DrasiError::validation(e.to_string()))?;
350
351        // Create runtime config and server with optional index and state store providers
352        let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
353            config,
354            self.index_provider,
355            self.state_store_provider,
356            self.identity_provider,
357        ));
358        let mut core = DrasiLib::new(runtime_config);
359
360        // Inject state store before provisioning sources (they need it for initialization)
361        let state_store = core.config.state_store_provider.clone();
362        core.source_manager
363            .inject_state_store(state_store.clone())
364            .await;
365        core.reaction_manager.inject_state_store(state_store).await;
366
367        // Register the component graph source BEFORE initialize (which loads query config).
368        // Queries reference sources, so sources must exist in the graph first.
369        {
370            use crate::sources::component_graph_source::ComponentGraphSource;
371            let graph_source = ComponentGraphSource::new(
372                core.component_event_broadcast_tx.clone(),
373                core.config.id.clone(),
374                core.component_graph.clone(),
375            )
376            .map_err(|e| {
377                DrasiError::operation_failed(
378                    "source",
379                    "component-graph",
380                    "add",
381                    format!("Failed to create: {e}"),
382                )
383            })?;
384
385            let source_id = graph_source.id().to_string();
386            let source_type = graph_source.type_name().to_string();
387            {
388                let mut graph = core.component_graph.write().await;
389                let mut metadata = std::collections::HashMap::new();
390                metadata.insert("kind".to_string(), source_type);
391                metadata.insert(
392                    "autoStart".to_string(),
393                    graph_source.auto_start().to_string(),
394                );
395                graph.register_source(&source_id, metadata).map_err(|e| {
396                    DrasiError::operation_failed(
397                        "source",
398                        &source_id,
399                        "add",
400                        format!("Failed to register: {e}"),
401                    )
402                })?;
403            }
404            if let Err(e) = core.source_manager.provision_source(graph_source).await {
405                let mut graph = core.component_graph.write().await;
406                let _ = graph.deregister(&source_id);
407                return Err(DrasiError::operation_failed(
408                    "source",
409                    &source_id,
410                    "add",
411                    format!("Failed to provision: {e}"),
412                ));
413            }
414        }
415
416        // Inject pre-built source instances BEFORE initialize.
417        // Queries reference sources by ID, so sources must be in the graph first.
418        for (source, extra_metadata) in self.source_instances {
419            let source_id = source.id().to_string();
420            let source_type = source.type_name().to_string();
421            let auto_start = source.auto_start();
422
423            {
424                let mut graph = core.component_graph.write().await;
425                let mut metadata = std::collections::HashMap::new();
426                metadata.insert("kind".to_string(), source_type);
427                metadata.insert("autoStart".to_string(), auto_start.to_string());
428                metadata.extend(extra_metadata);
429                graph.register_source(&source_id, metadata).map_err(|e| {
430                    DrasiError::operation_failed(
431                        "source",
432                        &source_id,
433                        "add",
434                        format!("Failed to register: {e}"),
435                    )
436                })?;
437            }
438            if let Err(e) = core.source_manager.provision_source(source).await {
439                let mut graph = core.component_graph.write().await;
440                let _ = graph.deregister(&source_id);
441                return Err(DrasiError::operation_failed(
442                    "source",
443                    &source_id,
444                    "add",
445                    format!("Failed to provision: {e}"),
446                ));
447            }
448        }
449
450        // Initialize the server (loads query configurations — sources must already be registered)
451        core.initialize().await?;
452
453        // Inject pre-built reaction instances
454        for (reaction, extra_metadata) in self.reaction_instances {
455            let reaction_id = reaction.id().to_string();
456            let reaction_type = reaction.type_name().to_string();
457            let query_ids = reaction.query_ids();
458
459            // Register in graph first, then provision
460            {
461                let mut graph = core.component_graph.write().await;
462                let mut metadata = std::collections::HashMap::new();
463                metadata.insert("kind".to_string(), reaction_type);
464                metadata.extend(extra_metadata);
465                graph
466                    .register_reaction(&reaction_id, metadata, &query_ids)
467                    .map_err(|e| {
468                        DrasiError::operation_failed(
469                            "reaction",
470                            &reaction_id,
471                            "add",
472                            format!("Failed to register: {e}"),
473                        )
474                    })?;
475            }
476            if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
477                let mut graph = core.component_graph.write().await;
478                let _ = graph.deregister(&reaction_id);
479                return Err(DrasiError::operation_failed(
480                    "reaction",
481                    &reaction_id,
482                    "add",
483                    format!("Failed to provision: {e}"),
484                ));
485            }
486        }
487
488        // Register the identity provider in the component graph (if configured).
489        // This creates an IdentityProvider node with Authenticates edges to all
490        // sources and reactions that receive credentials from it.
491        if core.config.identity_provider.is_some() {
492            let mut graph = core.component_graph.write().await;
493            let component_ids: Vec<String> = graph
494                .list_by_kind(&crate::component_graph::ComponentKind::Source)
495                .into_iter()
496                .chain(graph.list_by_kind(&crate::component_graph::ComponentKind::Reaction))
497                .map(|(id, _)| id)
498                .collect();
499
500            let mut metadata = std::collections::HashMap::new();
501            metadata.insert("kind".to_string(), "identity_provider".to_string());
502            graph
503                .register_identity_provider("identity-provider", metadata, &component_ids)
504                .map_err(|e| {
505                    DrasiError::operation_failed(
506                        "identity_provider",
507                        "identity-provider",
508                        "add",
509                        format!("Failed to register: {e}"),
510                    )
511                })?;
512        }
513
514        Ok(core)
515    }
516}
517
518// ============================================================================
519// Query Builder
520// ============================================================================
521
522/// Fluent builder for query configurations.
523///
524/// Use `Query::cypher()` or `Query::gql()` to get started.
525///
526/// # Example
527///
528/// ```no_run
529/// use drasi_lib::Query;
530///
531/// let query_config = Query::cypher("my-query")
532///     .query("MATCH (n:Person) RETURN n.name, n.age")
533///     .from_source("my-source")
534///     .auto_start(true)
535///     .build();
536/// ```
537pub struct Query {
538    id: String,
539    query: String,
540    query_language: QueryLanguage,
541    sources: Vec<SourceSubscriptionConfig>,
542    middleware: Vec<SourceMiddlewareConfig>,
543    auto_start: bool,
544    joins: Option<Vec<QueryJoinConfig>>,
545    enable_bootstrap: bool,
546    bootstrap_buffer_size: usize,
547    priority_queue_capacity: Option<usize>,
548    dispatch_buffer_capacity: Option<usize>,
549    dispatch_mode: Option<DispatchMode>,
550    storage_backend: Option<crate::indexes::StorageBackendRef>,
551    recovery_policy: Option<crate::recovery::RecoveryPolicy>,
552}
553
554impl Query {
555    /// Create a new Cypher query builder.
556    pub fn cypher(id: impl Into<String>) -> Self {
557        Self {
558            id: id.into(),
559            query: String::new(),
560            query_language: QueryLanguage::Cypher,
561            sources: Vec::new(),
562            middleware: Vec::new(),
563            auto_start: true,
564            joins: None,
565            enable_bootstrap: true,
566            bootstrap_buffer_size: 10000,
567            priority_queue_capacity: None,
568            dispatch_buffer_capacity: None,
569            dispatch_mode: None,
570            storage_backend: None,
571            recovery_policy: None,
572        }
573    }
574
575    /// Create a new GQL (ISO 9074:2024) query builder.
576    pub fn gql(id: impl Into<String>) -> Self {
577        Self {
578            id: id.into(),
579            query: String::new(),
580            query_language: QueryLanguage::GQL,
581            sources: Vec::new(),
582            middleware: Vec::new(),
583            auto_start: true,
584            joins: None,
585            enable_bootstrap: true,
586            bootstrap_buffer_size: 10000,
587            priority_queue_capacity: None,
588            dispatch_buffer_capacity: None,
589            dispatch_mode: None,
590            storage_backend: None,
591            recovery_policy: None,
592        }
593    }
594
595    /// Set the query string.
596    pub fn query(mut self, query: impl Into<String>) -> Self {
597        self.query = query.into();
598        self
599    }
600
601    /// Subscribe to a source.
602    pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
603        self.sources.push(SourceSubscriptionConfig {
604            source_id: source_id.into(),
605            nodes: Vec::new(),
606            relations: Vec::new(),
607            pipeline: Vec::new(),
608        });
609        self
610    }
611
612    /// Subscribe to a source with a middleware pipeline.
613    ///
614    /// The pipeline is a list of middleware names (strings) that will be applied to
615    /// data from this source before it reaches the query.
616    pub fn from_source_with_pipeline(
617        mut self,
618        source_id: impl Into<String>,
619        pipeline: Vec<String>,
620    ) -> Self {
621        self.sources.push(SourceSubscriptionConfig {
622            source_id: source_id.into(),
623            nodes: Vec::new(),
624            relations: Vec::new(),
625            pipeline,
626        });
627        self
628    }
629
630    /// Add middleware to the query.
631    pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
632        self.middleware.push(middleware);
633        self
634    }
635
636    /// Set whether the query should auto-start.
637    pub fn auto_start(mut self, auto_start: bool) -> Self {
638        self.auto_start = auto_start;
639        self
640    }
641
642    /// Set the join configuration.
643    pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
644        self.joins = Some(joins);
645        self
646    }
647
648    /// Enable or disable bootstrap.
649    pub fn enable_bootstrap(mut self, enable: bool) -> Self {
650        self.enable_bootstrap = enable;
651        self
652    }
653
654    /// Set the bootstrap buffer size.
655    pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
656        self.bootstrap_buffer_size = size;
657        self
658    }
659
660    /// Set the priority queue capacity.
661    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
662        self.priority_queue_capacity = Some(capacity);
663        self
664    }
665
666    /// Set the dispatch buffer capacity.
667    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
668        self.dispatch_buffer_capacity = Some(capacity);
669        self
670    }
671
672    /// Set the dispatch mode.
673    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
674        self.dispatch_mode = Some(mode);
675        self
676    }
677
678    /// Set the storage backend reference.
679    pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
680        self.storage_backend = Some(backend);
681        self
682    }
683
684    /// Set the recovery policy. Applies only to queries with a persistent
685    /// storage backend. See [`RecoveryPolicy`](crate::RecoveryPolicy).
686    pub fn with_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
687        self.recovery_policy = Some(policy);
688        self
689    }
690
691    /// Build the query configuration.
692    pub fn build(self) -> QueryConfig {
693        QueryConfig {
694            id: self.id,
695            query: self.query,
696            query_language: self.query_language,
697            sources: self.sources,
698            middleware: self.middleware,
699            auto_start: self.auto_start,
700            joins: self.joins,
701            enable_bootstrap: self.enable_bootstrap,
702            bootstrap_buffer_size: self.bootstrap_buffer_size,
703            priority_queue_capacity: self.priority_queue_capacity,
704            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
705            dispatch_mode: self.dispatch_mode,
706            storage_backend: self.storage_backend,
707            recovery_policy: self.recovery_policy,
708        }
709    }
710}
711
712// ============================================================================
713// Tests
714// ============================================================================
715
716#[cfg(test)]
717mod tests {
718    use super::*;
719    use crate::DrasiLib;
720
721    // ==========================================================================
722    // Query Builder Tests
723    // ==========================================================================
724
725    #[test]
726    fn test_query_builder_cypher() {
727        let config = Query::cypher("test-query")
728            .query("MATCH (n) RETURN n")
729            .from_source("source1")
730            .auto_start(false)
731            .build();
732
733        assert_eq!(config.id, "test-query");
734        assert_eq!(config.query, "MATCH (n) RETURN n");
735        assert_eq!(config.query_language, QueryLanguage::Cypher);
736        assert!(!config.auto_start);
737        assert_eq!(config.sources.len(), 1);
738        assert_eq!(config.sources[0].source_id, "source1");
739    }
740
741    #[test]
742    fn test_query_builder_gql() {
743        let config = Query::gql("test-query")
744            .query("MATCH (n:Person) RETURN n.name")
745            .from_source("source1")
746            .build();
747
748        assert_eq!(config.query_language, QueryLanguage::GQL);
749    }
750
751    #[test]
752    fn test_query_builder_multiple_sources() {
753        let config = Query::cypher("test-query")
754            .query("MATCH (n) RETURN n")
755            .from_source("source1")
756            .from_source("source2")
757            .build();
758
759        assert_eq!(config.sources.len(), 2);
760    }
761
762    #[tokio::test]
763    async fn test_drasi_lib_builder_empty() {
764        let core = DrasiLibBuilder::new().build().await.unwrap();
765
766        assert!(!core.is_running().await);
767    }
768
769    #[tokio::test]
770    async fn test_drasi_lib_builder_with_id() {
771        let core = DrasiLibBuilder::new()
772            .with_id("test-server")
773            .build()
774            .await
775            .unwrap();
776
777        assert_eq!(core.get_config().id, "test-server");
778    }
779
780    #[tokio::test]
781    async fn test_drasi_lib_builder_with_query_no_source() {
782        // Test builder with query configuration that has no source subscriptions
783        // In the instance-based approach, sources are added after build()
784        let core = DrasiLibBuilder::new()
785            .with_id("test-server")
786            .with_query(
787                Query::cypher("query1")
788                    .query("MATCH (n) RETURN n")
789                    // No from_source() call - query has no source subscriptions
790                    .auto_start(false)
791                    .build(),
792            )
793            .build()
794            .await
795            .unwrap();
796
797        let queries = core.list_queries().await.unwrap();
798        assert_eq!(queries.len(), 1);
799    }
800
801    // ==========================================================================
802    // DrasiLib Builder Integration Tests (from builder_tests.rs)
803    // ==========================================================================
804
805    #[tokio::test]
806    async fn test_builder_creates_initialized_server() {
807        let core = DrasiLib::builder().with_id("builder-test").build().await;
808
809        assert!(core.is_ok(), "Builder should create initialized server");
810        let core = core.unwrap();
811        assert!(
812            core.state_guard.is_initialized(),
813            "Server should be initialized"
814        );
815    }
816
817    #[tokio::test]
818    async fn test_builder_with_query() {
819        // In the instance-based approach, sources and reactions are added as instances
820        // after the builder creates the core. Here we just test query config addition.
821        // Source must be registered before a query can reference it
822        let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
823        let core = DrasiLib::builder()
824            .with_id("complex-server")
825            .with_source(source)
826            .with_query(
827                Query::cypher("query1")
828                    .query("MATCH (n) RETURN n")
829                    .from_source("source1")
830                    .build(),
831            )
832            .build()
833            .await;
834
835        assert!(core.is_ok(), "Builder with query should succeed");
836        let core = core.unwrap();
837        assert!(core.state_guard.is_initialized());
838        assert_eq!(core.config.queries.len(), 1);
839    }
840
841    // ==========================================================================
842    // DrasiLibBuilder Unit Tests
843    // ==========================================================================
844
/// `with_id` called with a string slice stores that value as the server id.
#[test]
fn test_builder_with_id_sets_id() {
    let configured = DrasiLibBuilder::new().with_id("my-server");
    let expected_id = Some(String::from("my-server"));
    assert_eq!(configured.server_id, expected_id);
}
850
/// `with_id` also accepts an owned `String` and stores it as the server id.
#[test]
fn test_builder_with_id_accepts_string() {
    let owned_id: String = "owned-id".to_owned();
    let configured = DrasiLibBuilder::new().with_id(owned_id);
    let expected_id = Some(String::from("owned-id"));
    assert_eq!(configured.server_id, expected_id);
}
856
/// The priority queue capacity passed to the builder is recorded as `Some(..)`.
#[test]
fn test_builder_with_priority_queue_capacity() {
    let configured = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
    let expected = Some(50000);
    assert_eq!(configured.priority_queue_capacity, expected);
}
862
/// The dispatch buffer capacity passed to the builder is recorded as `Some(..)`.
#[test]
fn test_builder_with_dispatch_buffer_capacity() {
    let configured = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
    let expected = Some(2000);
    assert_eq!(configured.dispatch_buffer_capacity, expected);
}
868
/// A single `with_query` call leaves exactly that query config on the builder.
#[test]
fn test_builder_with_query_adds_to_list() {
    let query_config = Query::cypher("q1").query("MATCH (n) RETURN n").build();
    let configured = DrasiLibBuilder::new().with_query(query_config);

    let pending = &configured.query_configs;
    assert_eq!(pending.len(), 1);
    assert_eq!(pending[0].id, "q1");
}
876
/// Repeated `with_query` calls keep every config, in the order they were added.
#[test]
fn test_builder_with_multiple_queries() {
    let cypher_config = Query::cypher("q1").query("MATCH (a) RETURN a").build();
    let gql_config = Query::gql("q2").query("MATCH (b) RETURN b").build();

    let configured = DrasiLibBuilder::new()
        .with_query(cypher_config)
        .with_query(gql_config);

    assert_eq!(configured.query_configs.len(), 2);
    // The length check above guarantees the zip visits both stored configs.
    for (stored, expected_id) in configured.query_configs.iter().zip(["q1", "q2"]) {
        assert_eq!(stored.id, expected_id);
    }
}
886
/// `add_storage_backend` stores the given backend config on the builder.
#[test]
fn test_builder_add_storage_backend() {
    use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

    let spec = StorageBackendSpec::Memory {
        enable_archive: false,
    };
    let memory_backend = StorageBackendConfig {
        id: String::from("mem1"),
        spec,
    };

    let configured = DrasiLibBuilder::new().add_storage_backend(memory_backend);

    let registered = &configured.storage_backends;
    assert_eq!(registered.len(), 1);
    assert_eq!(registered[0].id, "mem1");
}
901
/// Two `add_storage_backend` calls keep both backends, in insertion order.
#[test]
fn test_builder_add_multiple_storage_backends() {
    use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

    // Two in-memory backends that differ in id and in the archive flag.
    let plain_backend = StorageBackendConfig {
        id: String::from("mem1"),
        spec: StorageBackendSpec::Memory {
            enable_archive: false,
        },
    };
    let archiving_backend = StorageBackendConfig {
        id: String::from("mem2"),
        spec: StorageBackendSpec::Memory {
            enable_archive: true,
        },
    };

    let configured = DrasiLibBuilder::new()
        .add_storage_backend(plain_backend)
        .add_storage_backend(archiving_backend);

    assert_eq!(configured.storage_backends.len(), 2);
    // The length check above guarantees the zip visits both stored backends.
    for (stored, expected_id) in configured.storage_backends.iter().zip(["mem1", "mem2"]) {
        assert_eq!(stored.id, expected_id);
    }
}
925
/// A freshly created builder has no id, no capacities, no providers, and
/// empty backend, query, source and reaction collections.
#[test]
fn test_builder_default_values() {
    let fresh = DrasiLibBuilder::new();

    // Borrow the fields of interest by reference; the builder itself is left intact.
    let DrasiLibBuilder {
        server_id,
        priority_queue_capacity,
        dispatch_buffer_capacity,
        storage_backends,
        query_configs,
        source_instances,
        reaction_instances,
        index_provider,
        state_store_provider,
        ..
    } = &fresh;

    assert_eq!(*server_id, None);
    assert_eq!(*priority_queue_capacity, None);
    assert_eq!(*dispatch_buffer_capacity, None);
    assert!(storage_backends.is_empty());
    assert!(query_configs.is_empty());
    assert!(source_instances.is_empty());
    assert!(reaction_instances.is_empty());
    assert!(index_provider.is_none());
    assert!(state_store_provider.is_none());
}
939
/// Chaining several builder methods in one expression records every setting.
#[test]
fn test_builder_fluent_chaining() {
    use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

    let spec = StorageBackendSpec::Memory {
        enable_archive: false,
    };
    let memory_backend = StorageBackendConfig {
        id: String::from("mem"),
        spec,
    };
    let query_config = Query::cypher("q1").query("MATCH (n) RETURN n").build();

    let configured = DrasiLibBuilder::new()
        .with_id("chained")
        .with_priority_queue_capacity(20000)
        .with_dispatch_buffer_capacity(3000)
        .add_storage_backend(memory_backend)
        .with_query(query_config);

    // Scalar settings.
    let expected_id = Some(String::from("chained"));
    assert_eq!(configured.server_id, expected_id);
    assert_eq!(configured.priority_queue_capacity, Some(20000));
    assert_eq!(configured.dispatch_buffer_capacity, Some(3000));

    // Collected settings.
    assert_eq!(configured.storage_backends.len(), 1);
    assert_eq!(configured.query_configs.len(), 1);
}
965
966    #[tokio::test]
967    async fn test_builder_default_id_when_none_set() {
968        let core = DrasiLibBuilder::new().build().await.unwrap();
969        assert_eq!(core.get_config().id, "drasi-lib");
970    }
971
972    #[tokio::test]
973    async fn test_builder_with_storage_backend_builds_ok() {
974        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
975
976        let backend = StorageBackendConfig {
977            id: "test-mem".to_string(),
978            spec: StorageBackendSpec::Memory {
979                enable_archive: false,
980            },
981        };
982        let core = DrasiLibBuilder::new()
983            .add_storage_backend(backend)
984            .build()
985            .await;
986        assert!(core.is_ok(), "Builder with storage backend should succeed");
987    }
988
989    // ==========================================================================
990    // Query Builder Unit Tests
991    // ==========================================================================
992
/// `Query::cypher` records the given id and selects the Cypher language.
#[test]
fn test_query_cypher_sets_id_and_language() {
    let query_builder = Query::cypher("cypher-q");

    assert_eq!(query_builder.id, "cypher-q");
    assert_eq!(query_builder.query_language, QueryLanguage::Cypher);
}
999
/// `Query::gql` records the given id and selects the GQL language.
#[test]
fn test_query_gql_sets_id_and_language() {
    let query_builder = Query::gql("gql-q");

    assert_eq!(query_builder.id, "gql-q");
    assert_eq!(query_builder.query_language, QueryLanguage::GQL);
}
1006
/// One `from_source` call adds one subscription carrying that source id.
#[test]
fn test_query_from_source_adds_source() {
    let query_builder = Query::cypher("q").from_source("src1");

    let subscriptions = &query_builder.sources;
    assert_eq!(subscriptions.len(), 1);
    assert_eq!(subscriptions[0].source_id, "src1");
}
1013
/// Chained `from_source` calls accumulate subscriptions in call order.
#[test]
fn test_query_from_source_chaining() {
    let query_builder = Query::cypher("q")
        .from_source("src1")
        .from_source("src2")
        .from_source("src3");

    assert_eq!(query_builder.sources.len(), 3);
    // The length check above guarantees the zip visits all three subscriptions.
    let expected_ids = ["src1", "src2", "src3"];
    for (subscription, expected_id) in query_builder.sources.iter().zip(expected_ids) {
        assert_eq!(subscription.source_id, expected_id);
    }
}
1025
/// A new query builder has `auto_start` enabled.
#[test]
fn test_query_auto_start_default_true() {
    let query_builder = Query::cypher("q");
    let auto_start = query_builder.auto_start;
    assert!(auto_start);
}
1031
/// `auto_start(false)` turns the flag off on the query builder.
#[test]
fn test_query_auto_start_false() {
    let query_builder = Query::cypher("q").auto_start(false);
    let auto_start = query_builder.auto_start;
    assert!(!auto_start);
}
1037
/// A new query builder has `enable_bootstrap` enabled.
#[test]
fn test_query_enable_bootstrap_default_true() {
    let query_builder = Query::cypher("q");
    let bootstrap_enabled = query_builder.enable_bootstrap;
    assert!(bootstrap_enabled);
}
1043
/// `enable_bootstrap(false)` turns the flag off on the query builder.
#[test]
fn test_query_enable_bootstrap_false() {
    let query_builder = Query::cypher("q").enable_bootstrap(false);
    let bootstrap_enabled = query_builder.enable_bootstrap;
    assert!(!bootstrap_enabled);
}
1049
/// A new query builder starts with a bootstrap buffer size of 10000.
#[test]
fn test_query_bootstrap_buffer_size_default() {
    let query_builder = Query::cypher("q");
    let expected_default = 10000;
    assert_eq!(query_builder.bootstrap_buffer_size, expected_default);
}
1055
/// `with_bootstrap_buffer_size` replaces the bootstrap buffer size on the builder.
#[test]
fn test_query_with_bootstrap_buffer_size() {
    let query_builder = Query::cypher("q").with_bootstrap_buffer_size(5000);
    let expected_size = 5000;
    assert_eq!(query_builder.bootstrap_buffer_size, expected_size);
}
1061
/// Selecting broadcast dispatch stores `Some(DispatchMode::Broadcast)`.
#[test]
fn test_query_with_dispatch_mode_broadcast() {
    let query_builder = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
    let expected_mode = Some(DispatchMode::Broadcast);
    assert_eq!(query_builder.dispatch_mode, expected_mode);
}
1067
/// Selecting channel dispatch stores `Some(DispatchMode::Channel)`.
#[test]
fn test_query_with_dispatch_mode_channel() {
    let query_builder = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
    let expected_mode = Some(DispatchMode::Channel);
    assert_eq!(query_builder.dispatch_mode, expected_mode);
}
1073
/// A new query builder has no dispatch mode set.
#[test]
fn test_query_dispatch_mode_default_none() {
    let query_builder = Query::cypher("q");
    let dispatch_mode = &query_builder.dispatch_mode;
    assert_eq!(*dispatch_mode, None);
}
1079
/// `with_priority_queue_capacity` stores the given capacity as `Some(..)`.
#[test]
fn test_query_with_priority_queue_capacity() {
    let query_builder = Query::cypher("q").with_priority_queue_capacity(50000);
    let expected = Some(50000);
    assert_eq!(query_builder.priority_queue_capacity, expected);
}
1085
/// A new query builder has no priority queue capacity set.
#[test]
fn test_query_priority_queue_capacity_default_none() {
    let query_builder = Query::cypher("q");
    let capacity = &query_builder.priority_queue_capacity;
    assert_eq!(*capacity, None);
}
1091
/// `with_dispatch_buffer_capacity` stores the given capacity as `Some(..)`.
#[test]
fn test_query_with_dispatch_buffer_capacity() {
    let query_builder = Query::cypher("q").with_dispatch_buffer_capacity(5000);
    let expected = Some(5000);
    assert_eq!(query_builder.dispatch_buffer_capacity, expected);
}
1097
/// A new query builder has no dispatch buffer capacity set.
#[test]
fn test_query_dispatch_buffer_capacity_default_none() {
    let query_builder = Query::cypher("q");
    let capacity = &query_builder.dispatch_buffer_capacity;
    assert_eq!(*capacity, None);
}
1103
/// Every setting applied to the query builder shows up on the built config,
/// while joins, middleware and storage backend stay unset.
#[test]
fn test_query_build_propagates_all_fields() {
    let query_builder = Query::cypher("full-query")
        .query("MATCH (n:Person) RETURN n.name")
        .from_source("source-a")
        .from_source("source-b")
        .auto_start(false)
        .enable_bootstrap(false)
        .with_bootstrap_buffer_size(5000)
        .with_priority_queue_capacity(50000)
        .with_dispatch_buffer_capacity(2500)
        .with_dispatch_mode(DispatchMode::Broadcast);
    let built = query_builder.build();

    // Identity and query text.
    assert_eq!(built.id, "full-query");
    assert_eq!(built.query, "MATCH (n:Person) RETURN n.name");
    assert_eq!(built.query_language, QueryLanguage::Cypher);

    // Source subscriptions, in the order they were added.
    assert_eq!(built.sources.len(), 2);
    for (subscription, expected_id) in built.sources.iter().zip(["source-a", "source-b"]) {
        assert_eq!(subscription.source_id, expected_id);
    }

    // Explicitly overridden settings.
    assert!(!built.auto_start);
    assert!(!built.enable_bootstrap);
    assert_eq!(built.bootstrap_buffer_size, 5000);
    assert_eq!(built.priority_queue_capacity, Some(50000));
    assert_eq!(built.dispatch_buffer_capacity, Some(2500));
    assert_eq!(built.dispatch_mode, Some(DispatchMode::Broadcast));

    // Settings the builder chain never touched.
    assert!(built.joins.is_none());
    assert!(built.middleware.is_empty());
    assert!(built.storage_backend.is_none());
}
1134
/// A GQL query keeps its language, id, text and source through `build()`,
/// and untouched settings keep the values a new builder starts with.
#[test]
fn test_query_build_gql_propagates_language() {
    let query_builder = Query::gql("gql-full")
        .query("MATCH (n) RETURN n")
        .from_source("src");
    let built = query_builder.build();

    assert_eq!(built.id, "gql-full");
    assert_eq!(built.query_language, QueryLanguage::GQL);
    assert_eq!(built.query, "MATCH (n) RETURN n");
    assert_eq!(built.sources.len(), 1);

    // Nothing below was set explicitly, so these are the builder's defaults.
    assert!(built.auto_start);
    assert!(built.enable_bootstrap);
    assert_eq!(built.bootstrap_buffer_size, 10000);
}
1151
/// Building straight from `Query::cypher` with no further calls yields a
/// config populated entirely with default values.
#[test]
fn test_query_build_defaults() {
    let built = Query::cypher("defaults-only").build();

    // Only the id was supplied; the query text is the empty string.
    assert_eq!(built.id, "defaults-only");
    assert_eq!(built.query, "");
    assert_eq!(built.query_language, QueryLanguage::Cypher);

    // Collections start empty and flags start enabled.
    assert!(built.sources.is_empty());
    assert!(built.middleware.is_empty());
    assert!(built.auto_start);
    assert!(built.joins.is_none());
    assert!(built.enable_bootstrap);

    // Sizing and dispatch settings.
    assert_eq!(built.bootstrap_buffer_size, 10000);
    assert_eq!(built.priority_queue_capacity, None);
    assert_eq!(built.dispatch_buffer_capacity, None);
    assert_eq!(built.dispatch_mode, None);
    assert!(built.storage_backend.is_none());
}
1170}