Skip to main content

drasi_lib/
builder.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Fluent builders for DrasiLib and its components.
16//!
17//! This module provides the builder pattern for constructing DrasiLib instances
18//! and their components in a type-safe, ergonomic way.
19//!
20//! # Overview
21//!
22//! - [`DrasiLibBuilder`] - Main builder for creating a DrasiLib instance
23//! - [`Query`] - Builder for query configurations
24//!
25//! # Plugin Architecture
26//!
27//! **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
28//! reactions are created externally as fully-configured instances implementing
29//! `Source` and `Reaction` traits, then passed to DrasiLibBuilder via
30//! `with_source()` and `with_reaction()`.
31//!
32//! # Examples
33//!
34//! ## Basic Usage with Pre-built Instances
35//!
36//! ```no_run
37//! use drasi_lib::{DrasiLib, Query};
38//!
39//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
40//! // Source and reaction instances are created externally by plugins
41//! // Ownership is transferred to DrasiLib when added
42//! // let my_source = my_source_plugin::create(...);
43//! // let my_reaction = my_reaction_plugin::create(...);
44//!
45//! let core = DrasiLib::builder()
46//!     .with_id("my-server")
47//!     // .with_source(my_source)      // Ownership transferred
48//!     // .with_reaction(my_reaction)  // Ownership transferred
49//!     .with_query(
50//!         Query::cypher("my-query")
51//!             .query("MATCH (n:Person) RETURN n")
52//!             .from_source("events")
53//!             .build()
54//!     )
55//!     .build()
56//!     .await?;
57//!
58//! core.start().await?;
59//! # Ok(())
60//! # }
61//! ```
62
63use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67    DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::identity::IdentityProvider;
71use crate::indexes::IndexBackendPlugin;
72use crate::indexes::StorageBackendConfig;
73use crate::lib_core::DrasiLib;
74use crate::reactions::Reaction as ReactionTrait;
75use crate::sources::Source as SourceTrait;
76use crate::state_store::StateStoreProvider;
77use drasi_core::models::SourceMiddlewareConfig;
78
79// ============================================================================
80// DrasiLibBuilder
81// ============================================================================
82
83/// Fluent builder for creating DrasiLib instances.
84///
85/// Use `DrasiLib::builder()` to get started.
86///
87/// # Plugin Architecture
88///
89/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
90/// reactions are created externally as fully-configured instances implementing
91/// `Source` and `Reaction` traits, then passed via `with_source()` and `with_reaction()`.
92///
93/// # Example
94///
95/// ```no_run
96/// use drasi_lib::{DrasiLib, Query};
97///
98/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
99/// // Source and reaction instances are created externally by plugins
100/// // Ownership is transferred to DrasiLib when added
101/// // let my_source = my_source_plugin::create(...);
102/// // let my_reaction = my_reaction_plugin::create(...);
103///
104/// let core = DrasiLib::builder()
105///     .with_id("my-server")
106///     // .with_source(my_source)      // Ownership transferred
107///     // .with_reaction(my_reaction)  // Ownership transferred
108///     .with_query(
109///         Query::cypher("my-query")
110///             .query("MATCH (n) RETURN n")
111///             .from_source("my-source")
112///             .build()
113///     )
114///     .build()
115///     .await?;
116/// # Ok(())
117/// # }
118/// ```
119pub struct DrasiLibBuilder {
120    server_id: Option<String>,
121    priority_queue_capacity: Option<usize>,
122    dispatch_buffer_capacity: Option<usize>,
123    storage_backends: Vec<StorageBackendConfig>,
124    query_configs: Vec<QueryConfig>,
125    source_instances: Vec<(
126        Box<dyn SourceTrait>,
127        std::collections::HashMap<String, String>,
128    )>,
129    reaction_instances: Vec<(
130        Box<dyn ReactionTrait>,
131        std::collections::HashMap<String, String>,
132    )>,
133    /// Bootstrap provider metadata to register in the component graph.
134    /// Each entry: (source_id, kind, properties).
135    bootstrap_metadata: Vec<(
136        String,
137        String,
138        std::collections::HashMap<String, serde_json::Value>,
139    )>,
140    index_provider: Option<Arc<dyn IndexBackendPlugin>>,
141    state_store_provider: Option<Arc<dyn StateStoreProvider>>,
142    identity_provider: Option<Arc<dyn IdentityProvider>>,
143    default_recovery_policy: Option<crate::recovery::RecoveryPolicy>,
144}
145
146impl Default for DrasiLibBuilder {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl DrasiLibBuilder {
153    /// Create a new builder with default values.
154    pub fn new() -> Self {
155        Self {
156            server_id: None,
157            priority_queue_capacity: None,
158            dispatch_buffer_capacity: None,
159            storage_backends: Vec::new(),
160            query_configs: Vec::new(),
161            source_instances: Vec::new(),
162            reaction_instances: Vec::new(),
163            bootstrap_metadata: Vec::new(),
164            index_provider: None,
165            state_store_provider: None,
166            identity_provider: None,
167            default_recovery_policy: None,
168        }
169    }
170
171    /// Set the server ID.
172    pub fn with_id(mut self, id: impl Into<String>) -> Self {
173        self.server_id = Some(id.into());
174        self
175    }
176
177    /// Set the default priority queue capacity for components.
178    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
179        self.priority_queue_capacity = Some(capacity);
180        self
181    }
182
183    /// Set the default dispatch buffer capacity for components.
184    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
185        self.dispatch_buffer_capacity = Some(capacity);
186        self
187    }
188
189    /// Add a storage backend configuration.
190    pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
191        self.storage_backends.push(config);
192        self
193    }
194
195    /// Set the index backend provider for persistent storage.
196    ///
197    /// When using RocksDB or Redis/Garnet storage backends, you must provide
198    /// an index provider that implements `IndexBackendPlugin`. The provider
199    /// is responsible for creating the actual index instances.
200    ///
201    /// If no index provider is set, only in-memory storage backends can be used.
202    /// Attempting to use RocksDB or Redis backends without a provider will result
203    /// in an error.
204    ///
205    /// # Example
206    /// ```ignore
207    /// use drasi_index_rocksdb::RocksDbIndexProvider;
208    /// use std::sync::Arc;
209    ///
210    /// let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
211    /// let core = DrasiLib::builder()
212    ///     .with_index_provider(Arc::new(provider))
213    ///     .build()
214    ///     .await?;
215    /// ```
216    pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
217        self.index_provider = Some(provider);
218        self
219    }
220
221    /// Set the state store provider for plugin state persistence.
222    ///
223    /// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
224    /// to store and retrieve runtime state that can persist across runs of DrasiLib.
225    ///
226    /// If no state store provider is set, the default in-memory provider will be used.
227    /// The in-memory provider does not persist state across restarts.
228    ///
229    /// # Example
230    /// ```ignore
231    /// use drasi_state_store_json::JsonStateStoreProvider;
232    /// use std::sync::Arc;
233    ///
234    /// let state_store = JsonStateStoreProvider::new("/data/state");
235    /// let core = DrasiLib::builder()
236    ///     .with_state_store_provider(Arc::new(state_store))
237    ///     .build()
238    ///     .await?;
239    /// ```
240    pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
241        self.state_store_provider = Some(provider);
242        self
243    }
244
245    /// Set the identity provider for credential injection.
246    ///
247    /// Identity providers supply authentication credentials (passwords, tokens,
248    /// certificates) to sources and reactions that need them for connecting to
249    /// external systems.
250    ///
251    /// If no identity provider is set, sources and reactions will receive `None`
252    /// for `context.identity_provider`.
253    ///
254    /// # Example
255    /// ```ignore
256    /// use drasi_identity_azure::AzureIdentityProvider;
257    /// use std::sync::Arc;
258    ///
259    /// let provider = AzureIdentityProvider::with_default_credentials("user@tenant.onmicrosoft.com")?;
260    /// let core = DrasiLib::builder()
261    ///     .with_identity_provider(Arc::new(provider))
262    ///     .build()
263    ///     .await?;
264    /// ```
265    pub fn with_identity_provider(mut self, provider: Arc<dyn IdentityProvider>) -> Self {
266        self.identity_provider = Some(provider);
267        self
268    }
269
270    /// Set the global default recovery policy for all queries.
271    ///
272    /// Per-query `QueryConfig::recovery_policy` overrides this.
273    /// If neither is set, defaults to [`RecoveryPolicy::Strict`](crate::RecoveryPolicy::Strict).
274    pub fn with_default_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
275        self.default_recovery_policy = Some(policy);
276        self
277    }
278
279    /// Add a source instance, taking ownership.
280    ///
281    /// Source instances are created externally by plugins with their own typed configurations.
282    /// drasi-lib only knows about the `Source` trait - it has no knowledge of which plugins exist.
283    ///
284    /// # Example
285    /// ```ignore
286    /// let source = MySource::new("my-source", config)?;
287    /// let core = DrasiLib::builder()
288    ///     .with_source(source)  // Ownership transferred
289    ///     .build()
290    ///     .await?;
291    /// ```
292    pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
293        self.source_instances
294            .push((Box::new(source), std::collections::HashMap::new()));
295        self
296    }
297
298    /// Add a source instance with additional component metadata.
299    ///
300    /// Like [`with_source`](Self::with_source) but merges `extra_metadata`
301    /// (e.g. `pluginId`) into the component graph node.
302    pub fn with_source_metadata(
303        mut self,
304        source: impl SourceTrait + 'static,
305        extra_metadata: std::collections::HashMap<String, String>,
306    ) -> Self {
307        self.source_instances
308            .push((Box::new(source), extra_metadata));
309        self
310    }
311
312    /// Add a query configuration.
313    pub fn with_query(mut self, config: QueryConfig) -> Self {
314        self.query_configs.push(config);
315        self
316    }
317
318    /// Register bootstrap provider metadata for a source.
319    ///
320    /// This records the bootstrap provider's kind and configuration properties
321    /// so they can be persisted in the component graph and included in
322    /// `snapshot_configuration()` output.
323    ///
324    /// # Arguments
325    /// * `source_id` - The ID of the source this bootstrap provider is attached to
326    /// * `kind` - Bootstrap provider kind (e.g., "http", "postgres", "scriptfile")
327    /// * `properties` - Configuration properties for persistence (raw config with
328    ///   ConfigValue envelopes intact for lossless roundtripping)
329    ///
330    /// # Example
331    /// ```ignore
332    /// let builder = DrasiLib::builder()
333    ///     .with_source(my_source)
334    ///     .with_bootstrap_for_source("my-source", "http", config_properties);
335    /// ```
336    pub fn with_bootstrap_for_source(
337        mut self,
338        source_id: impl Into<String>,
339        kind: impl Into<String>,
340        properties: std::collections::HashMap<String, serde_json::Value>,
341    ) -> Self {
342        self.bootstrap_metadata
343            .push((source_id.into(), kind.into(), properties));
344        self
345    }
346
347    /// Add a reaction instance, taking ownership.
348    ///
349    /// Reaction instances are created externally by plugins with their own typed configurations.
350    /// drasi-lib only knows about the `Reaction` trait - it has no knowledge of which plugins exist.
351    ///
352    /// # Example
353    /// ```ignore
354    /// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
355    /// let core = DrasiLib::builder()
356    ///     .with_reaction(reaction)  // Ownership transferred
357    ///     .build()
358    ///     .await?;
359    /// ```
360    pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
361        self.reaction_instances
362            .push((Box::new(reaction), std::collections::HashMap::new()));
363        self
364    }
365
366    /// Add a reaction instance with additional component metadata.
367    ///
368    /// Like [`with_reaction`](Self::with_reaction) but merges `extra_metadata`
369    /// (e.g. `pluginId`) into the component graph node.
370    pub fn with_reaction_metadata(
371        mut self,
372        reaction: impl ReactionTrait + 'static,
373        extra_metadata: std::collections::HashMap<String, String>,
374    ) -> Self {
375        self.reaction_instances
376            .push((Box::new(reaction), extra_metadata));
377        self
378    }
379
380    /// Build the DrasiLib instance.
381    ///
382    /// This validates the configuration, creates all components, and initializes the server.
383    /// After building, you can call `start()` to begin processing.
384    pub async fn build(self) -> Result<DrasiLib> {
385        // Build the configuration
386        let config = DrasiLibConfig {
387            id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
388            priority_queue_capacity: self.priority_queue_capacity,
389            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
390            storage_backends: self.storage_backends,
391            queries: self.query_configs.clone(),
392        };
393
394        // Validate the configuration
395        config
396            .validate()
397            .map_err(|e| DrasiError::validation(e.to_string()))?;
398
399        // Create runtime config and server with optional index and state store providers
400        let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
401            config,
402            self.index_provider,
403            self.state_store_provider,
404            self.identity_provider,
405            self.default_recovery_policy,
406        ));
407        let mut core = DrasiLib::new(runtime_config);
408
409        // Inject state store before provisioning sources (they need it for initialization)
410        let state_store = core.config.state_store_provider.clone();
411        core.source_manager
412            .inject_state_store(state_store.clone())
413            .await;
414        core.reaction_manager.inject_state_store(state_store).await;
415
416        // Register the component graph source BEFORE initialize (which loads query config).
417        // Queries reference sources, so sources must exist in the graph first.
418        {
419            use crate::sources::component_graph_source::ComponentGraphSource;
420            let graph_source = ComponentGraphSource::new(
421                core.component_event_broadcast_tx.clone(),
422                core.config.id.clone(),
423                core.component_graph.clone(),
424            )
425            .map_err(|e| {
426                DrasiError::operation_failed(
427                    "source",
428                    "component-graph",
429                    "add",
430                    format!("Failed to create: {e}"),
431                )
432            })?;
433
434            let source_id = graph_source.id().to_string();
435            let source_type = graph_source.type_name().to_string();
436            {
437                let mut graph = core.component_graph.write().await;
438                let mut metadata = std::collections::HashMap::new();
439                metadata.insert("kind".to_string(), source_type);
440                metadata.insert(
441                    "autoStart".to_string(),
442                    graph_source.auto_start().to_string(),
443                );
444                graph.register_source(&source_id, metadata).map_err(|e| {
445                    DrasiError::operation_failed(
446                        "source",
447                        &source_id,
448                        "add",
449                        format!("Failed to register: {e}"),
450                    )
451                })?;
452            }
453            if let Err(e) = core.source_manager.provision_source(graph_source).await {
454                let mut graph = core.component_graph.write().await;
455                let _ = graph.deregister(&source_id);
456                return Err(DrasiError::operation_failed(
457                    "source",
458                    &source_id,
459                    "add",
460                    format!("Failed to provision: {e}"),
461                ));
462            }
463        }
464
465        // Inject pre-built source instances BEFORE initialize.
466        // Queries reference sources by ID, so sources must be in the graph first.
467        for (source, extra_metadata) in self.source_instances {
468            let source_id = source.id().to_string();
469            let source_type = source.type_name().to_string();
470            let auto_start = source.auto_start();
471
472            {
473                let mut graph = core.component_graph.write().await;
474                let mut metadata = std::collections::HashMap::new();
475                metadata.insert("kind".to_string(), source_type);
476                metadata.insert("autoStart".to_string(), auto_start.to_string());
477                metadata.extend(extra_metadata);
478                graph.register_source(&source_id, metadata).map_err(|e| {
479                    DrasiError::operation_failed(
480                        "source",
481                        &source_id,
482                        "add",
483                        format!("Failed to register: {e}"),
484                    )
485                })?;
486            }
487            if let Err(e) = core.source_manager.provision_source(source).await {
488                let mut graph = core.component_graph.write().await;
489                let _ = graph.deregister(&source_id);
490                return Err(DrasiError::operation_failed(
491                    "source",
492                    &source_id,
493                    "add",
494                    format!("Failed to provision: {e}"),
495                ));
496            }
497        }
498
499        // Register bootstrap provider metadata in the component graph.
500        // This enables snapshot_configuration() to persist and reconstruct
501        // bootstrap provider configurations.
502        for (source_id, kind, properties) in self.bootstrap_metadata {
503            let bp_id = format!("{source_id}-bootstrap");
504            let mut metadata = std::collections::HashMap::new();
505            metadata.insert("kind".to_string(), kind);
506            for (key, value) in properties {
507                metadata.insert(key, serde_json::to_string(&value).unwrap_or_default());
508            }
509            let mut graph = core.component_graph.write().await;
510            if let Err(e) =
511                graph.register_bootstrap_provider(&bp_id, metadata, &[source_id.clone()])
512            {
513                log::warn!(
514                    "Failed to register bootstrap provider metadata for source '{source_id}': {e}"
515                );
516            }
517        }
518
519        // Initialize the server (loads query configurations — sources must already be registered)
520        core.initialize().await?;
521
522        // Inject pre-built reaction instances
523        for (reaction, extra_metadata) in self.reaction_instances {
524            let reaction_id = reaction.id().to_string();
525            let reaction_type = reaction.type_name().to_string();
526            let query_ids = reaction.query_ids();
527
528            // Register in graph first, then provision
529            {
530                let mut graph = core.component_graph.write().await;
531                let mut metadata = std::collections::HashMap::new();
532                metadata.insert("kind".to_string(), reaction_type);
533                metadata.extend(extra_metadata);
534                graph
535                    .register_reaction(&reaction_id, metadata, &query_ids)
536                    .map_err(|e| {
537                        DrasiError::operation_failed(
538                            "reaction",
539                            &reaction_id,
540                            "add",
541                            format!("Failed to register: {e}"),
542                        )
543                    })?;
544            }
545            if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
546                let mut graph = core.component_graph.write().await;
547                let _ = graph.deregister(&reaction_id);
548                return Err(DrasiError::operation_failed(
549                    "reaction",
550                    &reaction_id,
551                    "add",
552                    format!("Failed to provision: {e}"),
553                ));
554            }
555        }
556
557        // Register the identity provider in the component graph (if configured).
558        // This creates an IdentityProvider node with Authenticates edges to all
559        // sources and reactions that receive credentials from it.
560        if core.config.identity_provider.is_some() {
561            let mut graph = core.component_graph.write().await;
562            let component_ids: Vec<String> = graph
563                .list_by_kind(&crate::component_graph::ComponentKind::Source)
564                .into_iter()
565                .chain(graph.list_by_kind(&crate::component_graph::ComponentKind::Reaction))
566                .map(|(id, _)| id)
567                .collect();
568
569            let mut metadata = std::collections::HashMap::new();
570            metadata.insert("kind".to_string(), "identity_provider".to_string());
571            graph
572                .register_identity_provider("identity-provider", metadata, &component_ids)
573                .map_err(|e| {
574                    DrasiError::operation_failed(
575                        "identity_provider",
576                        "identity-provider",
577                        "add",
578                        format!("Failed to register: {e}"),
579                    )
580                })?;
581        }
582
583        Ok(core)
584    }
585}
586
587// ============================================================================
588// Query Builder
589// ============================================================================
590
591/// Fluent builder for query configurations.
592///
593/// Use `Query::cypher()` or `Query::gql()` to get started.
594///
595/// # Example
596///
597/// ```no_run
598/// use drasi_lib::Query;
599///
600/// let query_config = Query::cypher("my-query")
601///     .query("MATCH (n:Person) RETURN n.name, n.age")
602///     .from_source("my-source")
603///     .auto_start(true)
604///     .build();
605/// ```
606pub struct Query {
607    id: String,
608    query: String,
609    query_language: QueryLanguage,
610    sources: Vec<SourceSubscriptionConfig>,
611    middleware: Vec<SourceMiddlewareConfig>,
612    auto_start: bool,
613    joins: Option<Vec<QueryJoinConfig>>,
614    enable_bootstrap: bool,
615    bootstrap_buffer_size: usize,
616    priority_queue_capacity: Option<usize>,
617    dispatch_buffer_capacity: Option<usize>,
618    dispatch_mode: Option<DispatchMode>,
619    storage_backend: Option<crate::indexes::StorageBackendRef>,
620    recovery_policy: Option<crate::recovery::RecoveryPolicy>,
621    outbox_capacity: usize,
622    bootstrap_timeout_secs: u64,
623}
624
625impl Query {
626    /// Create a new Cypher query builder.
627    pub fn cypher(id: impl Into<String>) -> Self {
628        Self {
629            id: id.into(),
630            query: String::new(),
631            query_language: QueryLanguage::Cypher,
632            sources: Vec::new(),
633            middleware: Vec::new(),
634            auto_start: true,
635            joins: None,
636            enable_bootstrap: true,
637            bootstrap_buffer_size: 10000,
638            priority_queue_capacity: None,
639            dispatch_buffer_capacity: None,
640            dispatch_mode: None,
641            storage_backend: None,
642            recovery_policy: None,
643            outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
644            bootstrap_timeout_secs: 300,
645        }
646    }
647
648    /// Create a new GQL (ISO 9074:2024) query builder.
649    pub fn gql(id: impl Into<String>) -> Self {
650        Self {
651            id: id.into(),
652            query: String::new(),
653            query_language: QueryLanguage::GQL,
654            sources: Vec::new(),
655            middleware: Vec::new(),
656            auto_start: true,
657            joins: None,
658            enable_bootstrap: true,
659            bootstrap_buffer_size: 10000,
660            priority_queue_capacity: None,
661            dispatch_buffer_capacity: None,
662            dispatch_mode: None,
663            storage_backend: None,
664            recovery_policy: None,
665            outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
666            bootstrap_timeout_secs: 300,
667        }
668    }
669
670    /// Set the query string.
671    pub fn query(mut self, query: impl Into<String>) -> Self {
672        self.query = query.into();
673        self
674    }
675
676    /// Subscribe to a source.
677    pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
678        self.sources.push(SourceSubscriptionConfig {
679            source_id: source_id.into(),
680            nodes: Vec::new(),
681            relations: Vec::new(),
682            pipeline: Vec::new(),
683        });
684        self
685    }
686
687    /// Subscribe to a source with a middleware pipeline.
688    ///
689    /// The pipeline is a list of middleware names (strings) that will be applied to
690    /// data from this source before it reaches the query.
691    pub fn from_source_with_pipeline(
692        mut self,
693        source_id: impl Into<String>,
694        pipeline: Vec<String>,
695    ) -> Self {
696        self.sources.push(SourceSubscriptionConfig {
697            source_id: source_id.into(),
698            nodes: Vec::new(),
699            relations: Vec::new(),
700            pipeline,
701        });
702        self
703    }
704
705    /// Add middleware to the query.
706    pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
707        self.middleware.push(middleware);
708        self
709    }
710
711    /// Set whether the query should auto-start.
712    pub fn auto_start(mut self, auto_start: bool) -> Self {
713        self.auto_start = auto_start;
714        self
715    }
716
717    /// Set the join configuration.
718    pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
719        self.joins = Some(joins);
720        self
721    }
722
723    /// Enable or disable bootstrap.
724    pub fn enable_bootstrap(mut self, enable: bool) -> Self {
725        self.enable_bootstrap = enable;
726        self
727    }
728
729    /// Set the bootstrap buffer size.
730    pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
731        self.bootstrap_buffer_size = size;
732        self
733    }
734
735    /// Set the priority queue capacity.
736    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
737        self.priority_queue_capacity = Some(capacity);
738        self
739    }
740
741    /// Set the dispatch buffer capacity.
742    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
743        self.dispatch_buffer_capacity = Some(capacity);
744        self
745    }
746
747    /// Set the dispatch mode.
748    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
749        self.dispatch_mode = Some(mode);
750        self
751    }
752
753    /// Set the storage backend reference.
754    pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
755        self.storage_backend = Some(backend);
756        self
757    }
758
759    /// Set the recovery policy. Applies only to queries with a persistent
760    /// storage backend. See [`RecoveryPolicy`](crate::RecoveryPolicy).
761    pub fn with_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
762        self.recovery_policy = Some(policy);
763        self
764    }
765
766    /// Set the outbox capacity (number of recent QueryResult emissions retained).
767    /// Default: 1000.
768    pub fn with_outbox_capacity(mut self, capacity: usize) -> Self {
769        self.outbox_capacity = capacity;
770        self
771    }
772
773    /// Set the bootstrap timeout in seconds.
774    /// This controls how long `fetch_snapshot` / `fetch_outbox` will wait for
775    /// the query to finish bootstrapping before returning `FetchError::TimedOut`.
776    /// Default: 300 (5 minutes).
777    pub fn with_bootstrap_timeout_secs(mut self, secs: u64) -> Self {
778        self.bootstrap_timeout_secs = secs;
779        self
780    }
781
782    /// Build the query configuration.
783    pub fn build(self) -> QueryConfig {
784        QueryConfig {
785            id: self.id,
786            query: self.query,
787            query_language: self.query_language,
788            sources: self.sources,
789            middleware: self.middleware,
790            auto_start: self.auto_start,
791            joins: self.joins,
792            enable_bootstrap: self.enable_bootstrap,
793            bootstrap_buffer_size: self.bootstrap_buffer_size,
794            priority_queue_capacity: self.priority_queue_capacity,
795            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
796            dispatch_mode: self.dispatch_mode,
797            storage_backend: self.storage_backend,
798            recovery_policy: self.recovery_policy,
799            outbox_capacity: self.outbox_capacity,
800            bootstrap_timeout_secs: self.bootstrap_timeout_secs,
801        }
802    }
803}
804
805// ============================================================================
806// Tests
807// ============================================================================
808
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DrasiLib;

    // ==========================================================================
    // Test Helpers
    // ==========================================================================

    /// Create an in-memory storage backend configuration with the given id.
    ///
    /// Several builder tests need such a config; building it in one place keeps
    /// each test focused on the behavior it actually checks.
    fn memory_backend(
        id: &str,
        enable_archive: bool,
    ) -> crate::indexes::config::StorageBackendConfig {
        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

        StorageBackendConfig {
            id: id.to_string(),
            spec: StorageBackendSpec::Memory { enable_archive },
        }
    }

    // ==========================================================================
    // Query Builder Tests
    // ==========================================================================

    #[test]
    fn test_query_builder_cypher() {
        let config = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .auto_start(false)
            .build();

        assert_eq!(config.id, "test-query");
        assert_eq!(config.query, "MATCH (n) RETURN n");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert!(!config.auto_start);
        assert_eq!(config.sources.len(), 1);
        assert_eq!(config.sources[0].source_id, "source1");
    }

    #[test]
    fn test_query_builder_gql() {
        let config = Query::gql("test-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source1")
            .build();

        assert_eq!(config.query_language, QueryLanguage::GQL);
    }

    #[test]
    fn test_query_builder_multiple_sources() {
        let config = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .from_source("source2")
            .build();

        assert_eq!(config.sources.len(), 2);
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_empty() {
        let core = DrasiLibBuilder::new().build().await.unwrap();

        assert!(!core.is_running().await);
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_with_id() {
        let core = DrasiLibBuilder::new()
            .with_id("test-server")
            .build()
            .await
            .unwrap();

        assert_eq!(core.get_config().id, "test-server");
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_with_query_no_source() {
        // Test builder with query configuration that has no source subscriptions
        // In the instance-based approach, sources are added after build()
        let core = DrasiLibBuilder::new()
            .with_id("test-server")
            .with_query(
                Query::cypher("query1")
                    .query("MATCH (n) RETURN n")
                    // No from_source() call - query has no source subscriptions
                    .auto_start(false)
                    .build(),
            )
            .build()
            .await
            .unwrap();

        let queries = core.list_queries().await.unwrap();
        assert_eq!(queries.len(), 1);
    }

    // ==========================================================================
    // DrasiLib Builder Integration Tests (from builder_tests.rs)
    // ==========================================================================

    #[tokio::test]
    async fn test_builder_creates_initialized_server() {
        let core = DrasiLib::builder()
            .with_id("builder-test")
            .build()
            .await
            .expect("Builder should create initialized server");

        assert!(
            core.state_guard.is_initialized(),
            "Server should be initialized"
        );
    }

    #[tokio::test]
    async fn test_builder_with_query() {
        // The source must be registered before a query can reference it, so a
        // mock source is handed to the builder alongside the query config.
        let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
        let core = DrasiLib::builder()
            .with_id("complex-server")
            .with_source(source)
            .with_query(
                Query::cypher("query1")
                    .query("MATCH (n) RETURN n")
                    .from_source("source1")
                    .build(),
            )
            .build()
            .await
            .expect("Builder with query should succeed");

        assert!(core.state_guard.is_initialized());
        assert_eq!(core.config.queries.len(), 1);
    }

    // ==========================================================================
    // DrasiLibBuilder Unit Tests
    // ==========================================================================

    #[test]
    fn test_builder_with_id_sets_id() {
        let builder = DrasiLibBuilder::new().with_id("my-server");
        assert_eq!(builder.server_id, Some("my-server".to_string()));
    }

    #[test]
    fn test_builder_with_id_accepts_string() {
        let builder = DrasiLibBuilder::new().with_id(String::from("owned-id"));
        assert_eq!(builder.server_id, Some("owned-id".to_string()));
    }

    #[test]
    fn test_builder_with_priority_queue_capacity() {
        let builder = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
        assert_eq!(builder.priority_queue_capacity, Some(50000));
    }

    #[test]
    fn test_builder_with_dispatch_buffer_capacity() {
        let builder = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
        assert_eq!(builder.dispatch_buffer_capacity, Some(2000));
    }

    #[test]
    fn test_builder_with_query_adds_to_list() {
        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
        let builder = DrasiLibBuilder::new().with_query(q);
        assert_eq!(builder.query_configs.len(), 1);
        assert_eq!(builder.query_configs[0].id, "q1");
    }

    #[test]
    fn test_builder_with_multiple_queries() {
        let q1 = Query::cypher("q1").query("MATCH (a) RETURN a").build();
        let q2 = Query::gql("q2").query("MATCH (b) RETURN b").build();
        let builder = DrasiLibBuilder::new().with_query(q1).with_query(q2);
        assert_eq!(builder.query_configs.len(), 2);
        assert_eq!(builder.query_configs[0].id, "q1");
        assert_eq!(builder.query_configs[1].id, "q2");
    }

    #[test]
    fn test_builder_add_storage_backend() {
        let builder = DrasiLibBuilder::new().add_storage_backend(memory_backend("mem1", false));
        assert_eq!(builder.storage_backends.len(), 1);
        assert_eq!(builder.storage_backends[0].id, "mem1");
    }

    #[test]
    fn test_builder_add_multiple_storage_backends() {
        let builder = DrasiLibBuilder::new()
            .add_storage_backend(memory_backend("mem1", false))
            .add_storage_backend(memory_backend("mem2", true));
        assert_eq!(builder.storage_backends.len(), 2);
        assert_eq!(builder.storage_backends[0].id, "mem1");
        assert_eq!(builder.storage_backends[1].id, "mem2");
    }

    #[test]
    fn test_builder_default_values() {
        let builder = DrasiLibBuilder::new();
        assert_eq!(builder.server_id, None);
        assert_eq!(builder.priority_queue_capacity, None);
        assert_eq!(builder.dispatch_buffer_capacity, None);
        assert!(builder.storage_backends.is_empty());
        assert!(builder.query_configs.is_empty());
        assert!(builder.source_instances.is_empty());
        assert!(builder.reaction_instances.is_empty());
        assert!(builder.index_provider.is_none());
        assert!(builder.state_store_provider.is_none());
    }

    #[test]
    fn test_builder_fluent_chaining() {
        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();

        let builder = DrasiLibBuilder::new()
            .with_id("chained")
            .with_priority_queue_capacity(20000)
            .with_dispatch_buffer_capacity(3000)
            .add_storage_backend(memory_backend("mem", false))
            .with_query(q);

        assert_eq!(builder.server_id, Some("chained".to_string()));
        assert_eq!(builder.priority_queue_capacity, Some(20000));
        assert_eq!(builder.dispatch_buffer_capacity, Some(3000));
        assert_eq!(builder.storage_backends.len(), 1);
        assert_eq!(builder.query_configs.len(), 1);
    }

    #[tokio::test]
    async fn test_builder_default_id_when_none_set() {
        let core = DrasiLibBuilder::new().build().await.unwrap();
        assert_eq!(core.get_config().id, "drasi-lib");
    }

    #[tokio::test]
    async fn test_builder_with_storage_backend_builds_ok() {
        let core = DrasiLibBuilder::new()
            .add_storage_backend(memory_backend("test-mem", false))
            .build()
            .await;
        assert!(core.is_ok(), "Builder with storage backend should succeed");
    }

    // ==========================================================================
    // Query Builder Unit Tests
    // ==========================================================================

    #[test]
    fn test_query_cypher_sets_id_and_language() {
        let q = Query::cypher("cypher-q");
        assert_eq!(q.id, "cypher-q");
        assert_eq!(q.query_language, QueryLanguage::Cypher);
    }

    #[test]
    fn test_query_gql_sets_id_and_language() {
        let q = Query::gql("gql-q");
        assert_eq!(q.id, "gql-q");
        assert_eq!(q.query_language, QueryLanguage::GQL);
    }

    #[test]
    fn test_query_from_source_adds_source() {
        let q = Query::cypher("q").from_source("src1");
        assert_eq!(q.sources.len(), 1);
        assert_eq!(q.sources[0].source_id, "src1");
    }

    #[test]
    fn test_query_from_source_chaining() {
        let q = Query::cypher("q")
            .from_source("src1")
            .from_source("src2")
            .from_source("src3");
        assert_eq!(q.sources.len(), 3);
        assert_eq!(q.sources[0].source_id, "src1");
        assert_eq!(q.sources[1].source_id, "src2");
        assert_eq!(q.sources[2].source_id, "src3");
    }

    #[test]
    fn test_query_auto_start_default_true() {
        let q = Query::cypher("q");
        assert!(q.auto_start);
    }

    #[test]
    fn test_query_auto_start_false() {
        let q = Query::cypher("q").auto_start(false);
        assert!(!q.auto_start);
    }

    #[test]
    fn test_query_enable_bootstrap_default_true() {
        let q = Query::cypher("q");
        assert!(q.enable_bootstrap);
    }

    #[test]
    fn test_query_enable_bootstrap_false() {
        let q = Query::cypher("q").enable_bootstrap(false);
        assert!(!q.enable_bootstrap);
    }

    #[test]
    fn test_query_bootstrap_buffer_size_default() {
        let q = Query::cypher("q");
        assert_eq!(q.bootstrap_buffer_size, 10000);
    }

    #[test]
    fn test_query_with_bootstrap_buffer_size() {
        let q = Query::cypher("q").with_bootstrap_buffer_size(5000);
        assert_eq!(q.bootstrap_buffer_size, 5000);
    }

    #[test]
    fn test_query_with_dispatch_mode_broadcast() {
        let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
        assert_eq!(q.dispatch_mode, Some(DispatchMode::Broadcast));
    }

    #[test]
    fn test_query_with_dispatch_mode_channel() {
        let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
        assert_eq!(q.dispatch_mode, Some(DispatchMode::Channel));
    }

    #[test]
    fn test_query_dispatch_mode_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.dispatch_mode, None);
    }

    #[test]
    fn test_query_with_priority_queue_capacity() {
        let q = Query::cypher("q").with_priority_queue_capacity(50000);
        assert_eq!(q.priority_queue_capacity, Some(50000));
    }

    #[test]
    fn test_query_priority_queue_capacity_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.priority_queue_capacity, None);
    }

    #[test]
    fn test_query_with_dispatch_buffer_capacity() {
        let q = Query::cypher("q").with_dispatch_buffer_capacity(5000);
        assert_eq!(q.dispatch_buffer_capacity, Some(5000));
    }

    #[test]
    fn test_query_dispatch_buffer_capacity_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.dispatch_buffer_capacity, None);
    }

    #[test]
    fn test_query_with_outbox_capacity() {
        let q = Query::cypher("q").with_outbox_capacity(42);
        assert_eq!(q.outbox_capacity, 42);
    }

    #[test]
    fn test_query_with_bootstrap_timeout_secs() {
        let q = Query::cypher("q").with_bootstrap_timeout_secs(7);
        assert_eq!(q.bootstrap_timeout_secs, 7);
    }

    #[test]
    fn test_query_build_propagates_all_fields() {
        let config = Query::cypher("full-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source-a")
            .from_source("source-b")
            .auto_start(false)
            .enable_bootstrap(false)
            .with_bootstrap_buffer_size(5000)
            .with_priority_queue_capacity(50000)
            .with_dispatch_buffer_capacity(2500)
            .with_dispatch_mode(DispatchMode::Broadcast)
            .with_outbox_capacity(250)
            .with_bootstrap_timeout_secs(30)
            .build();

        assert_eq!(config.id, "full-query");
        assert_eq!(config.query, "MATCH (n:Person) RETURN n.name");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert_eq!(config.sources.len(), 2);
        assert_eq!(config.sources[0].source_id, "source-a");
        assert_eq!(config.sources[1].source_id, "source-b");
        assert!(!config.auto_start);
        assert!(!config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 5000);
        assert_eq!(config.priority_queue_capacity, Some(50000));
        assert_eq!(config.dispatch_buffer_capacity, Some(2500));
        assert_eq!(config.dispatch_mode, Some(DispatchMode::Broadcast));
        // Explicitly set values must survive build() rather than being
        // replaced by the builder's defaults.
        assert_eq!(config.outbox_capacity, 250);
        assert_eq!(config.bootstrap_timeout_secs, 30);
        assert!(config.joins.is_none());
        assert!(config.middleware.is_empty());
        assert!(config.storage_backend.is_none());
    }

    #[test]
    fn test_query_build_gql_propagates_language() {
        let config = Query::gql("gql-full")
            .query("MATCH (n) RETURN n")
            .from_source("src")
            .build();

        assert_eq!(config.id, "gql-full");
        assert_eq!(config.query_language, QueryLanguage::GQL);
        assert_eq!(config.query, "MATCH (n) RETURN n");
        assert_eq!(config.sources.len(), 1);
        // Verify defaults are preserved through build
        assert!(config.auto_start);
        assert!(config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 10000);
    }

    #[test]
    fn test_query_build_defaults() {
        let config = Query::cypher("defaults-only").build();

        assert_eq!(config.id, "defaults-only");
        assert_eq!(config.query, "");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert!(config.sources.is_empty());
        assert!(config.middleware.is_empty());
        assert!(config.auto_start);
        assert!(config.joins.is_none());
        assert!(config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 10000);
        assert_eq!(config.priority_queue_capacity, None);
        assert_eq!(config.dispatch_buffer_capacity, None);
        assert_eq!(config.dispatch_mode, None);
        assert!(config.storage_backend.is_none());
    }
}