Skip to main content

drasi_lib/
builder.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Fluent builders for DrasiLib and its components.
16//!
17//! This module provides the builder pattern for constructing DrasiLib instances
18//! and their components in a type-safe, ergonomic way.
19//!
20//! # Overview
21//!
22//! - [`DrasiLibBuilder`] - Main builder for creating a DrasiLib instance
23//! - [`Query`] - Builder for query configurations
24//!
25//! # Plugin Architecture
26//!
27//! **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
28//! reactions are created externally as fully-configured instances implementing
29//! `Source` and `Reaction` traits, then passed to DrasiLibBuilder via
30//! `with_source()` and `with_reaction()`.
31//!
32//! # Examples
33//!
34//! ## Basic Usage with Pre-built Instances
35//!
36//! ```no_run
37//! use drasi_lib::{DrasiLib, Query};
38//!
39//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
40//! // Source and reaction instances are created externally by plugins
41//! // Ownership is transferred to DrasiLib when added
42//! // let my_source = my_source_plugin::create(...);
43//! // let my_reaction = my_reaction_plugin::create(...);
44//!
45//! let core = DrasiLib::builder()
46//!     .with_id("my-server")
47//!     // .with_source(my_source)      // Ownership transferred
48//!     // .with_reaction(my_reaction)  // Ownership transferred
49//!     .with_query(
50//!         Query::cypher("my-query")
51//!             .query("MATCH (n:Person) RETURN n")
52//!             .from_source("events")
53//!             .build()
54//!     )
55//!     .build()
56//!     .await?;
57//!
58//! core.start().await?;
59//! # Ok(())
60//! # }
61//! ```
62
63use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67    DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::identity::IdentityProvider;
71use crate::indexes::IndexBackendPlugin;
72use crate::indexes::StorageBackendConfig;
73use crate::lib_core::DrasiLib;
74use crate::reactions::Reaction as ReactionTrait;
75use crate::secret_store::SecretStoreProvider;
76use crate::sources::Source as SourceTrait;
77use crate::state_store::StateStoreProvider;
78use drasi_core::models::SourceMiddlewareConfig;
79
80// ============================================================================
81// DrasiLibBuilder
82// ============================================================================
83
84/// Fluent builder for creating DrasiLib instances.
85///
86/// Use `DrasiLib::builder()` to get started.
87///
88/// # Plugin Architecture
89///
90/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
91/// reactions are created externally as fully-configured instances implementing
92/// `Source` and `Reaction` traits, then passed via `with_source()` and `with_reaction()`.
93///
94/// # Example
95///
96/// ```no_run
97/// use drasi_lib::{DrasiLib, Query};
98///
99/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
100/// // Source and reaction instances are created externally by plugins
101/// // Ownership is transferred to DrasiLib when added
102/// // let my_source = my_source_plugin::create(...);
103/// // let my_reaction = my_reaction_plugin::create(...);
104///
105/// let core = DrasiLib::builder()
106///     .with_id("my-server")
107///     // .with_source(my_source)      // Ownership transferred
108///     // .with_reaction(my_reaction)  // Ownership transferred
109///     .with_query(
110///         Query::cypher("my-query")
111///             .query("MATCH (n) RETURN n")
112///             .from_source("my-source")
113///             .build()
114///     )
115///     .build()
116///     .await?;
117/// # Ok(())
118/// # }
119/// ```
120pub struct DrasiLibBuilder {
121    server_id: Option<String>,
122    priority_queue_capacity: Option<usize>,
123    dispatch_buffer_capacity: Option<usize>,
124    storage_backends: Vec<StorageBackendConfig>,
125    query_configs: Vec<QueryConfig>,
126    source_instances: Vec<(
127        Box<dyn SourceTrait>,
128        std::collections::HashMap<String, String>,
129    )>,
130    reaction_instances: Vec<(
131        Box<dyn ReactionTrait>,
132        std::collections::HashMap<String, String>,
133    )>,
134    /// Bootstrap provider metadata to register in the component graph.
135    /// Each entry: (source_id, kind, properties).
136    bootstrap_metadata: Vec<(
137        String,
138        String,
139        std::collections::HashMap<String, serde_json::Value>,
140    )>,
141    index_provider: Option<Arc<dyn IndexBackendPlugin>>,
142    state_store_provider: Option<Arc<dyn StateStoreProvider>>,
143    identity_provider: Option<Arc<dyn IdentityProvider>>,
144    secret_store_provider: Option<Arc<dyn SecretStoreProvider>>,
145    default_recovery_policy: Option<crate::recovery::RecoveryPolicy>,
146}
147
148impl Default for DrasiLibBuilder {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154impl DrasiLibBuilder {
155    /// Create a new builder with default values.
156    pub fn new() -> Self {
157        Self {
158            server_id: None,
159            priority_queue_capacity: None,
160            dispatch_buffer_capacity: None,
161            storage_backends: Vec::new(),
162            query_configs: Vec::new(),
163            source_instances: Vec::new(),
164            reaction_instances: Vec::new(),
165            bootstrap_metadata: Vec::new(),
166            index_provider: None,
167            state_store_provider: None,
168            identity_provider: None,
169            secret_store_provider: None,
170            default_recovery_policy: None,
171        }
172    }
173
174    /// Set the server ID.
175    pub fn with_id(mut self, id: impl Into<String>) -> Self {
176        self.server_id = Some(id.into());
177        self
178    }
179
180    /// Set the default priority queue capacity for components.
181    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
182        self.priority_queue_capacity = Some(capacity);
183        self
184    }
185
186    /// Set the default dispatch buffer capacity for components.
187    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
188        self.dispatch_buffer_capacity = Some(capacity);
189        self
190    }
191
192    /// Add a storage backend configuration.
193    pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
194        self.storage_backends.push(config);
195        self
196    }
197
198    /// Set the index backend provider for persistent storage.
199    ///
200    /// When using RocksDB or Redis/Garnet storage backends, you must provide
201    /// an index provider that implements `IndexBackendPlugin`. The provider
202    /// is responsible for creating the actual index instances.
203    ///
204    /// If no index provider is set, only in-memory storage backends can be used.
205    /// Attempting to use RocksDB or Redis backends without a provider will result
206    /// in an error.
207    ///
208    /// # Example
209    /// ```ignore
210    /// use drasi_index_rocksdb::RocksDbIndexProvider;
211    /// use std::sync::Arc;
212    ///
213    /// let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
214    /// let core = DrasiLib::builder()
215    ///     .with_index_provider(Arc::new(provider))
216    ///     .build()
217    ///     .await?;
218    /// ```
219    pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
220        self.index_provider = Some(provider);
221        self
222    }
223
224    /// Set the state store provider for plugin state persistence.
225    ///
226    /// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
227    /// to store and retrieve runtime state that can persist across runs of DrasiLib.
228    ///
229    /// If no state store provider is set, the default in-memory provider will be used.
230    /// The in-memory provider does not persist state across restarts.
231    ///
232    /// # Example
233    /// ```ignore
234    /// use drasi_state_store_json::JsonStateStoreProvider;
235    /// use std::sync::Arc;
236    ///
237    /// let state_store = JsonStateStoreProvider::new("/data/state");
238    /// let core = DrasiLib::builder()
239    ///     .with_state_store_provider(Arc::new(state_store))
240    ///     .build()
241    ///     .await?;
242    /// ```
243    pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
244        self.state_store_provider = Some(provider);
245        self
246    }
247
248    /// Set the identity provider for credential injection.
249    ///
250    /// Identity providers supply authentication credentials (passwords, tokens,
251    /// certificates) to sources and reactions that need them for connecting to
252    /// external systems.
253    ///
254    /// If no identity provider is set, sources and reactions will receive `None`
255    /// for `context.identity_provider`.
256    ///
257    /// # Example
258    /// ```ignore
259    /// use drasi_identity_azure::AzureIdentityProvider;
260    /// use std::sync::Arc;
261    ///
262    /// let provider = AzureIdentityProvider::with_default_credentials("user@tenant.onmicrosoft.com")?;
263    /// let core = DrasiLib::builder()
264    ///     .with_identity_provider(Arc::new(provider))
265    ///     .build()
266    ///     .await?;
267    /// ```
268    pub fn with_identity_provider(mut self, provider: Arc<dyn IdentityProvider>) -> Self {
269        self.identity_provider = Some(provider);
270        self
271    }
272
273    /// Set the secret store provider for resolving `ConfigValue::Secret` references.
274    ///
275    /// Secret store providers resolve named secret references (e.g., database passwords,
276    /// API keys) from external secret management systems like Azure Key Vault, OS keyrings,
277    /// or local secret files.
278    ///
279    /// The secret store provider is initialized **before** any source/reaction/bootstrap
280    /// plugins, and its resolved values are injected into `DtoMapper` via the global
281    /// secret resolver registry.
282    ///
283    /// If no secret store provider is set, `ConfigValue::Secret` references will fail
284    /// with a "not implemented" error.
285    ///
286    /// # Example
287    /// ```ignore
288    /// use drasi_secret_store_file::FileSecretStoreProvider;
289    /// use std::sync::Arc;
290    ///
291    /// let secrets = FileSecretStoreProvider::new("/etc/drasi/secrets.json").await?;
292    /// let core = DrasiLib::builder()
293    ///     .with_secret_store_provider(Arc::new(secrets))
294    ///     .build()
295    ///     .await?;
296    /// ```
297    pub fn with_secret_store_provider(mut self, provider: Arc<dyn SecretStoreProvider>) -> Self {
298        self.secret_store_provider = Some(provider);
299        self
300    }
301
302    /// Set the global default recovery policy for all queries.
303    ///
304    /// Per-query `QueryConfig::recovery_policy` overrides this.
305    /// If neither is set, defaults to [`RecoveryPolicy::Strict`](crate::RecoveryPolicy::Strict).
306    pub fn with_default_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
307        self.default_recovery_policy = Some(policy);
308        self
309    }
310
311    /// Add a source instance, taking ownership.
312    ///
313    /// Source instances are created externally by plugins with their own typed configurations.
314    /// drasi-lib only knows about the `Source` trait - it has no knowledge of which plugins exist.
315    ///
316    /// # Example
317    /// ```ignore
318    /// let source = MySource::new("my-source", config)?;
319    /// let core = DrasiLib::builder()
320    ///     .with_source(source)  // Ownership transferred
321    ///     .build()
322    ///     .await?;
323    /// ```
324    pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
325        self.source_instances
326            .push((Box::new(source), std::collections::HashMap::new()));
327        self
328    }
329
330    /// Add a source instance with additional component metadata.
331    ///
332    /// Like [`with_source`](Self::with_source) but merges `extra_metadata`
333    /// (e.g. `pluginId`) into the component graph node.
334    pub fn with_source_metadata(
335        mut self,
336        source: impl SourceTrait + 'static,
337        extra_metadata: std::collections::HashMap<String, String>,
338    ) -> Self {
339        self.source_instances
340            .push((Box::new(source), extra_metadata));
341        self
342    }
343
344    /// Add a query configuration.
345    pub fn with_query(mut self, config: QueryConfig) -> Self {
346        self.query_configs.push(config);
347        self
348    }
349
350    /// Register bootstrap provider metadata for a source.
351    ///
352    /// This records the bootstrap provider's kind and configuration properties
353    /// so they can be persisted in the component graph and included in
354    /// `snapshot_configuration()` output.
355    ///
356    /// # Arguments
357    /// * `source_id` - The ID of the source this bootstrap provider is attached to
358    /// * `kind` - Bootstrap provider kind (e.g., "http", "postgres", "scriptfile")
359    /// * `properties` - Configuration properties for persistence (raw config with
360    ///   ConfigValue envelopes intact for lossless roundtripping)
361    ///
362    /// # Example
363    /// ```ignore
364    /// let builder = DrasiLib::builder()
365    ///     .with_source(my_source)
366    ///     .with_bootstrap_for_source("my-source", "http", config_properties);
367    /// ```
368    pub fn with_bootstrap_for_source(
369        mut self,
370        source_id: impl Into<String>,
371        kind: impl Into<String>,
372        properties: std::collections::HashMap<String, serde_json::Value>,
373    ) -> Self {
374        self.bootstrap_metadata
375            .push((source_id.into(), kind.into(), properties));
376        self
377    }
378
379    /// Add a reaction instance, taking ownership.
380    ///
381    /// Reaction instances are created externally by plugins with their own typed configurations.
382    /// drasi-lib only knows about the `Reaction` trait - it has no knowledge of which plugins exist.
383    ///
384    /// # Example
385    /// ```ignore
386    /// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
387    /// let core = DrasiLib::builder()
388    ///     .with_reaction(reaction)  // Ownership transferred
389    ///     .build()
390    ///     .await?;
391    /// ```
392    pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
393        self.reaction_instances
394            .push((Box::new(reaction), std::collections::HashMap::new()));
395        self
396    }
397
398    /// Add a reaction instance with additional component metadata.
399    ///
400    /// Like [`with_reaction`](Self::with_reaction) but merges `extra_metadata`
401    /// (e.g. `pluginId`) into the component graph node.
402    pub fn with_reaction_metadata(
403        mut self,
404        reaction: impl ReactionTrait + 'static,
405        extra_metadata: std::collections::HashMap<String, String>,
406    ) -> Self {
407        self.reaction_instances
408            .push((Box::new(reaction), extra_metadata));
409        self
410    }
411
412    /// Build the DrasiLib instance.
413    ///
414    /// This validates the configuration, creates all components, and initializes the server.
415    /// After building, you can call `start()` to begin processing.
416    pub async fn build(self) -> Result<DrasiLib> {
417        // Build the configuration
418        let config = DrasiLibConfig {
419            id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
420            priority_queue_capacity: self.priority_queue_capacity,
421            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
422            storage_backends: self.storage_backends,
423            queries: self.query_configs.clone(),
424        };
425
426        // Validate the configuration
427        config
428            .validate()
429            .map_err(|e| DrasiError::validation(e.to_string()))?;
430
431        // Create runtime config and server with optional index and state store providers
432        let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
433            config,
434            self.index_provider,
435            self.state_store_provider,
436            self.identity_provider,
437            self.secret_store_provider,
438            self.default_recovery_policy,
439        ));
440        let mut core = DrasiLib::new(runtime_config);
441
442        // Inject state store before provisioning sources (they need it for initialization)
443        let state_store = core.config.state_store_provider.clone();
444        core.source_manager
445            .inject_state_store(state_store.clone())
446            .await;
447        core.reaction_manager.inject_state_store(state_store).await;
448
449        // Register the component graph source BEFORE initialize (which loads query config).
450        // Queries reference sources, so sources must exist in the graph first.
451        {
452            use crate::sources::component_graph_source::ComponentGraphSource;
453            let graph_source = ComponentGraphSource::new(
454                core.component_event_broadcast_tx.clone(),
455                core.config.id.clone(),
456                core.component_graph.clone(),
457            )
458            .map_err(|e| {
459                DrasiError::operation_failed(
460                    "source",
461                    "component-graph",
462                    "add",
463                    format!("Failed to create: {e}"),
464                )
465            })?;
466
467            let source_id = graph_source.id().to_string();
468            let source_type = graph_source.type_name().to_string();
469            {
470                let mut graph = core.component_graph.write().await;
471                let mut metadata = std::collections::HashMap::new();
472                metadata.insert("kind".to_string(), source_type);
473                metadata.insert(
474                    "autoStart".to_string(),
475                    graph_source.auto_start().to_string(),
476                );
477                graph.register_source(&source_id, metadata).map_err(|e| {
478                    DrasiError::operation_failed(
479                        "source",
480                        &source_id,
481                        "add",
482                        format!("Failed to register: {e}"),
483                    )
484                })?;
485            }
486            if let Err(e) = core.source_manager.provision_source(graph_source).await {
487                let mut graph = core.component_graph.write().await;
488                let _ = graph.deregister(&source_id);
489                return Err(DrasiError::operation_failed(
490                    "source",
491                    &source_id,
492                    "add",
493                    format!("Failed to provision: {e}"),
494                ));
495            }
496        }
497
498        // Inject pre-built source instances BEFORE initialize.
499        // Queries reference sources by ID, so sources must be in the graph first.
500        for (source, extra_metadata) in self.source_instances {
501            let source_id = source.id().to_string();
502            let source_type = source.type_name().to_string();
503            let auto_start = source.auto_start();
504
505            {
506                let mut graph = core.component_graph.write().await;
507                let mut metadata = std::collections::HashMap::new();
508                metadata.insert("kind".to_string(), source_type);
509                metadata.insert("autoStart".to_string(), auto_start.to_string());
510                metadata.extend(extra_metadata);
511                graph.register_source(&source_id, metadata).map_err(|e| {
512                    DrasiError::operation_failed(
513                        "source",
514                        &source_id,
515                        "add",
516                        format!("Failed to register: {e}"),
517                    )
518                })?;
519            }
520            if let Err(e) = core.source_manager.provision_source(source).await {
521                let mut graph = core.component_graph.write().await;
522                let _ = graph.deregister(&source_id);
523                return Err(DrasiError::operation_failed(
524                    "source",
525                    &source_id,
526                    "add",
527                    format!("Failed to provision: {e}"),
528                ));
529            }
530        }
531
532        // Register bootstrap provider metadata in the component graph.
533        // This enables snapshot_configuration() to persist and reconstruct
534        // bootstrap provider configurations.
535        for (source_id, kind, properties) in self.bootstrap_metadata {
536            let bp_id = format!("{source_id}-bootstrap");
537            let mut metadata = std::collections::HashMap::new();
538            metadata.insert("kind".to_string(), kind);
539            for (key, value) in properties {
540                metadata.insert(key, serde_json::to_string(&value).unwrap_or_default());
541            }
542            let mut graph = core.component_graph.write().await;
543            if let Err(e) =
544                graph.register_bootstrap_provider(&bp_id, metadata, &[source_id.clone()])
545            {
546                log::warn!(
547                    "Failed to register bootstrap provider metadata for source '{source_id}': {e}"
548                );
549            }
550        }
551
552        // Initialize the server (loads query configurations — sources must already be registered)
553        core.initialize().await?;
554
555        // Inject pre-built reaction instances
556        for (reaction, extra_metadata) in self.reaction_instances {
557            let reaction_id = reaction.id().to_string();
558            let reaction_type = reaction.type_name().to_string();
559            let query_ids = reaction.query_ids();
560
561            // Register in graph first, then provision
562            {
563                let mut graph = core.component_graph.write().await;
564                let mut metadata = std::collections::HashMap::new();
565                metadata.insert("kind".to_string(), reaction_type);
566                metadata.extend(extra_metadata);
567                graph
568                    .register_reaction(&reaction_id, metadata, &query_ids)
569                    .map_err(|e| {
570                        DrasiError::operation_failed(
571                            "reaction",
572                            &reaction_id,
573                            "add",
574                            format!("Failed to register: {e}"),
575                        )
576                    })?;
577            }
578            if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
579                let mut graph = core.component_graph.write().await;
580                let _ = graph.deregister(&reaction_id);
581                return Err(DrasiError::operation_failed(
582                    "reaction",
583                    &reaction_id,
584                    "add",
585                    format!("Failed to provision: {e}"),
586                ));
587            }
588        }
589
590        // Register the identity provider in the component graph (if configured).
591        // This creates an IdentityProvider node with Authenticates edges to all
592        // sources and reactions that receive credentials from it.
593        if core.config.identity_provider.is_some() {
594            let mut graph = core.component_graph.write().await;
595            let component_ids: Vec<String> = graph
596                .list_by_kind(&crate::component_graph::ComponentKind::Source)
597                .into_iter()
598                .chain(graph.list_by_kind(&crate::component_graph::ComponentKind::Reaction))
599                .map(|(id, _)| id)
600                .collect();
601
602            let mut metadata = std::collections::HashMap::new();
603            metadata.insert("kind".to_string(), "identity_provider".to_string());
604            graph
605                .register_identity_provider("identity-provider", metadata, &component_ids)
606                .map_err(|e| {
607                    DrasiError::operation_failed(
608                        "identity_provider",
609                        "identity-provider",
610                        "add",
611                        format!("Failed to register: {e}"),
612                    )
613                })?;
614        }
615
616        Ok(core)
617    }
618}
619
620// ============================================================================
621// Query Builder
622// ============================================================================
623
624/// Fluent builder for query configurations.
625///
626/// Use `Query::cypher()` or `Query::gql()` to get started.
627///
628/// # Example
629///
630/// ```no_run
631/// use drasi_lib::Query;
632///
633/// let query_config = Query::cypher("my-query")
634///     .query("MATCH (n:Person) RETURN n.name, n.age")
635///     .from_source("my-source")
636///     .auto_start(true)
637///     .build();
638/// ```
639pub struct Query {
640    id: String,
641    query: String,
642    query_language: QueryLanguage,
643    sources: Vec<SourceSubscriptionConfig>,
644    middleware: Vec<SourceMiddlewareConfig>,
645    auto_start: bool,
646    joins: Option<Vec<QueryJoinConfig>>,
647    enable_bootstrap: bool,
648    bootstrap_buffer_size: usize,
649    priority_queue_capacity: Option<usize>,
650    dispatch_buffer_capacity: Option<usize>,
651    dispatch_mode: Option<DispatchMode>,
652    storage_backend: Option<crate::indexes::StorageBackendRef>,
653    recovery_policy: Option<crate::recovery::RecoveryPolicy>,
654    outbox_capacity: usize,
655    bootstrap_timeout_secs: u64,
656}
657
658impl Query {
659    /// Create a new Cypher query builder.
660    pub fn cypher(id: impl Into<String>) -> Self {
661        Self {
662            id: id.into(),
663            query: String::new(),
664            query_language: QueryLanguage::Cypher,
665            sources: Vec::new(),
666            middleware: Vec::new(),
667            auto_start: true,
668            joins: None,
669            enable_bootstrap: true,
670            bootstrap_buffer_size: 10000,
671            priority_queue_capacity: None,
672            dispatch_buffer_capacity: None,
673            dispatch_mode: None,
674            storage_backend: None,
675            recovery_policy: None,
676            outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
677            bootstrap_timeout_secs: 300,
678        }
679    }
680
681    /// Create a new GQL (ISO 9074:2024) query builder.
682    pub fn gql(id: impl Into<String>) -> Self {
683        Self {
684            id: id.into(),
685            query: String::new(),
686            query_language: QueryLanguage::GQL,
687            sources: Vec::new(),
688            middleware: Vec::new(),
689            auto_start: true,
690            joins: None,
691            enable_bootstrap: true,
692            bootstrap_buffer_size: 10000,
693            priority_queue_capacity: None,
694            dispatch_buffer_capacity: None,
695            dispatch_mode: None,
696            storage_backend: None,
697            recovery_policy: None,
698            outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
699            bootstrap_timeout_secs: 300,
700        }
701    }
702
703    /// Set the query string.
704    pub fn query(mut self, query: impl Into<String>) -> Self {
705        self.query = query.into();
706        self
707    }
708
709    /// Subscribe to a source.
710    pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
711        self.sources.push(SourceSubscriptionConfig {
712            source_id: source_id.into(),
713            nodes: Vec::new(),
714            relations: Vec::new(),
715            pipeline: Vec::new(),
716        });
717        self
718    }
719
720    /// Subscribe to a source with a middleware pipeline.
721    ///
722    /// The pipeline is a list of middleware names (strings) that will be applied to
723    /// data from this source before it reaches the query.
724    pub fn from_source_with_pipeline(
725        mut self,
726        source_id: impl Into<String>,
727        pipeline: Vec<String>,
728    ) -> Self {
729        self.sources.push(SourceSubscriptionConfig {
730            source_id: source_id.into(),
731            nodes: Vec::new(),
732            relations: Vec::new(),
733            pipeline,
734        });
735        self
736    }
737
738    /// Add middleware to the query.
739    pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
740        self.middleware.push(middleware);
741        self
742    }
743
744    /// Set whether the query should auto-start.
745    pub fn auto_start(mut self, auto_start: bool) -> Self {
746        self.auto_start = auto_start;
747        self
748    }
749
750    /// Set the join configuration.
751    pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
752        self.joins = Some(joins);
753        self
754    }
755
756    /// Enable or disable bootstrap.
757    pub fn enable_bootstrap(mut self, enable: bool) -> Self {
758        self.enable_bootstrap = enable;
759        self
760    }
761
762    /// Set the bootstrap buffer size.
763    pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
764        self.bootstrap_buffer_size = size;
765        self
766    }
767
768    /// Set the priority queue capacity.
769    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
770        self.priority_queue_capacity = Some(capacity);
771        self
772    }
773
774    /// Set the dispatch buffer capacity.
775    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
776        self.dispatch_buffer_capacity = Some(capacity);
777        self
778    }
779
780    /// Set the dispatch mode.
781    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
782        self.dispatch_mode = Some(mode);
783        self
784    }
785
786    /// Set the storage backend reference.
787    pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
788        self.storage_backend = Some(backend);
789        self
790    }
791
792    /// Set the recovery policy. Applies only to queries with a persistent
793    /// storage backend. See [`RecoveryPolicy`](crate::RecoveryPolicy).
794    pub fn with_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
795        self.recovery_policy = Some(policy);
796        self
797    }
798
799    /// Set the outbox capacity (number of recent QueryResult emissions retained).
800    /// Default: 1000.
801    pub fn with_outbox_capacity(mut self, capacity: usize) -> Self {
802        self.outbox_capacity = capacity;
803        self
804    }
805
806    /// Set the bootstrap timeout in seconds.
807    /// This controls how long `fetch_snapshot` / `fetch_outbox` will wait for
808    /// the query to finish bootstrapping before returning `FetchError::TimedOut`.
809    /// Default: 300 (5 minutes).
810    pub fn with_bootstrap_timeout_secs(mut self, secs: u64) -> Self {
811        self.bootstrap_timeout_secs = secs;
812        self
813    }
814
815    /// Build the query configuration.
816    pub fn build(self) -> QueryConfig {
817        QueryConfig {
818            id: self.id,
819            query: self.query,
820            query_language: self.query_language,
821            sources: self.sources,
822            middleware: self.middleware,
823            auto_start: self.auto_start,
824            joins: self.joins,
825            enable_bootstrap: self.enable_bootstrap,
826            bootstrap_buffer_size: self.bootstrap_buffer_size,
827            priority_queue_capacity: self.priority_queue_capacity,
828            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
829            dispatch_mode: self.dispatch_mode,
830            storage_backend: self.storage_backend,
831            recovery_policy: self.recovery_policy,
832            outbox_capacity: self.outbox_capacity,
833            bootstrap_timeout_secs: self.bootstrap_timeout_secs,
834        }
835    }
836}
837
838// ============================================================================
839// Tests
840// ============================================================================
841
#[cfg(test)]
mod tests {
    // Unit and integration tests for the fluent builders in this module.
    //
    // The `#[test]` cases inspect builder fields directly (the test module is a
    // child of the builder module, so private fields are reachable), while the
    // `#[tokio::test]` cases go through `build().await` and check the resulting
    // `DrasiLib` instance.

    use super::*;
    use crate::DrasiLib;

    // ==========================================================================
    // Query Builder Tests
    // ==========================================================================

    #[test]
    fn test_query_builder_cypher() {
        let config = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .auto_start(false)
            .build();

        assert_eq!(config.id, "test-query");
        assert_eq!(config.query, "MATCH (n) RETURN n");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert!(!config.auto_start);
        assert_eq!(config.sources.len(), 1);
        assert_eq!(config.sources[0].source_id, "source1");
    }

    #[test]
    fn test_query_builder_gql() {
        let config = Query::gql("test-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source1")
            .build();

        assert_eq!(config.query_language, QueryLanguage::GQL);
    }

    #[test]
    fn test_query_builder_multiple_sources() {
        let config = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .from_source("source2")
            .build();

        assert_eq!(config.sources.len(), 2);
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_empty() {
        let core = DrasiLibBuilder::new().build().await.unwrap();

        assert!(!core.is_running().await);
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_with_id() {
        let core = DrasiLibBuilder::new()
            .with_id("test-server")
            .build()
            .await
            .unwrap();

        assert_eq!(core.get_config().id, "test-server");
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_with_query_no_source() {
        // Test builder with query configuration that has no source subscriptions
        // In the instance-based approach, sources are added after build()
        let core = DrasiLibBuilder::new()
            .with_id("test-server")
            .with_query(
                Query::cypher("query1")
                    .query("MATCH (n) RETURN n")
                    // No from_source() call - query has no source subscriptions
                    .auto_start(false)
                    .build(),
            )
            .build()
            .await
            .unwrap();

        let queries = core.list_queries().await.unwrap();
        assert_eq!(queries.len(), 1);
    }

    // ==========================================================================
    // DrasiLib Builder Integration Tests (from builder_tests.rs)
    // ==========================================================================

    #[tokio::test]
    async fn test_builder_creates_initialized_server() {
        let core = DrasiLib::builder().with_id("builder-test").build().await;

        assert!(core.is_ok(), "Builder should create initialized server");
        let core = core.unwrap();
        assert!(
            core.state_guard.is_initialized(),
            "Server should be initialized"
        );
    }

    #[tokio::test]
    async fn test_builder_with_query() {
        // In the instance-based approach, sources and reactions are added as instances
        // after the builder creates the core. Here we just test query config addition.
        // Source must be registered before a query can reference it
        let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
        let core = DrasiLib::builder()
            .with_id("complex-server")
            .with_source(source)
            .with_query(
                Query::cypher("query1")
                    .query("MATCH (n) RETURN n")
                    .from_source("source1")
                    .build(),
            )
            .build()
            .await;

        assert!(core.is_ok(), "Builder with query should succeed");
        let core = core.unwrap();
        assert!(core.state_guard.is_initialized());
        assert_eq!(core.config.queries.len(), 1);
    }

    // ==========================================================================
    // DrasiLibBuilder Unit Tests
    // ==========================================================================

    #[test]
    fn test_builder_with_id_sets_id() {
        let builder = DrasiLibBuilder::new().with_id("my-server");
        assert_eq!(builder.server_id, Some("my-server".to_string()));
    }

    #[test]
    fn test_builder_with_id_accepts_string() {
        let builder = DrasiLibBuilder::new().with_id(String::from("owned-id"));
        assert_eq!(builder.server_id, Some("owned-id".to_string()));
    }

    #[test]
    fn test_builder_with_priority_queue_capacity() {
        let builder = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
        assert_eq!(builder.priority_queue_capacity, Some(50000));
    }

    #[test]
    fn test_builder_with_dispatch_buffer_capacity() {
        let builder = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
        assert_eq!(builder.dispatch_buffer_capacity, Some(2000));
    }

    #[test]
    fn test_builder_with_query_adds_to_list() {
        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
        let builder = DrasiLibBuilder::new().with_query(q);
        assert_eq!(builder.query_configs.len(), 1);
        assert_eq!(builder.query_configs[0].id, "q1");
    }

    #[test]
    fn test_builder_with_multiple_queries() {
        let q1 = Query::cypher("q1").query("MATCH (a) RETURN a").build();
        let q2 = Query::gql("q2").query("MATCH (b) RETURN b").build();
        let builder = DrasiLibBuilder::new().with_query(q1).with_query(q2);
        assert_eq!(builder.query_configs.len(), 2);
        assert_eq!(builder.query_configs[0].id, "q1");
        assert_eq!(builder.query_configs[1].id, "q2");
    }

    #[test]
    fn test_builder_add_storage_backend() {
        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

        let backend = StorageBackendConfig {
            id: "mem1".to_string(),
            spec: StorageBackendSpec::Memory {
                enable_archive: false,
            },
        };
        let builder = DrasiLibBuilder::new().add_storage_backend(backend);
        assert_eq!(builder.storage_backends.len(), 1);
        assert_eq!(builder.storage_backends[0].id, "mem1");
    }

    #[test]
    fn test_builder_add_multiple_storage_backends() {
        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

        let b1 = StorageBackendConfig {
            id: "mem1".to_string(),
            spec: StorageBackendSpec::Memory {
                enable_archive: false,
            },
        };
        let b2 = StorageBackendConfig {
            id: "mem2".to_string(),
            spec: StorageBackendSpec::Memory {
                enable_archive: true,
            },
        };
        let builder = DrasiLibBuilder::new()
            .add_storage_backend(b1)
            .add_storage_backend(b2);
        assert_eq!(builder.storage_backends.len(), 2);
        assert_eq!(builder.storage_backends[0].id, "mem1");
        assert_eq!(builder.storage_backends[1].id, "mem2");
    }

    #[test]
    fn test_builder_default_values() {
        let builder = DrasiLibBuilder::new();
        assert_eq!(builder.server_id, None);
        assert_eq!(builder.priority_queue_capacity, None);
        assert_eq!(builder.dispatch_buffer_capacity, None);
        assert!(builder.storage_backends.is_empty());
        assert!(builder.query_configs.is_empty());
        assert!(builder.source_instances.is_empty());
        assert!(builder.reaction_instances.is_empty());
        assert!(builder.index_provider.is_none());
        assert!(builder.state_store_provider.is_none());
    }

    #[test]
    fn test_builder_fluent_chaining() {
        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

        let backend = StorageBackendConfig {
            id: "mem".to_string(),
            spec: StorageBackendSpec::Memory {
                enable_archive: false,
            },
        };
        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();

        let builder = DrasiLibBuilder::new()
            .with_id("chained")
            .with_priority_queue_capacity(20000)
            .with_dispatch_buffer_capacity(3000)
            .add_storage_backend(backend)
            .with_query(q);

        assert_eq!(builder.server_id, Some("chained".to_string()));
        assert_eq!(builder.priority_queue_capacity, Some(20000));
        assert_eq!(builder.dispatch_buffer_capacity, Some(3000));
        assert_eq!(builder.storage_backends.len(), 1);
        assert_eq!(builder.query_configs.len(), 1);
    }

    #[tokio::test]
    async fn test_builder_default_id_when_none_set() {
        let core = DrasiLibBuilder::new().build().await.unwrap();
        assert_eq!(core.get_config().id, "drasi-lib");
    }

    #[tokio::test]
    async fn test_builder_with_storage_backend_builds_ok() {
        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

        let backend = StorageBackendConfig {
            id: "test-mem".to_string(),
            spec: StorageBackendSpec::Memory {
                enable_archive: false,
            },
        };
        let core = DrasiLibBuilder::new()
            .add_storage_backend(backend)
            .build()
            .await;
        assert!(core.is_ok(), "Builder with storage backend should succeed");
    }

    // ==========================================================================
    // Query Builder Unit Tests
    // ==========================================================================

    #[test]
    fn test_query_cypher_sets_id_and_language() {
        let q = Query::cypher("cypher-q");
        assert_eq!(q.id, "cypher-q");
        assert_eq!(q.query_language, QueryLanguage::Cypher);
    }

    #[test]
    fn test_query_gql_sets_id_and_language() {
        let q = Query::gql("gql-q");
        assert_eq!(q.id, "gql-q");
        assert_eq!(q.query_language, QueryLanguage::GQL);
    }

    #[test]
    fn test_query_from_source_adds_source() {
        let q = Query::cypher("q").from_source("src1");
        assert_eq!(q.sources.len(), 1);
        assert_eq!(q.sources[0].source_id, "src1");
    }

    #[test]
    fn test_query_from_source_chaining() {
        let q = Query::cypher("q")
            .from_source("src1")
            .from_source("src2")
            .from_source("src3");
        assert_eq!(q.sources.len(), 3);
        assert_eq!(q.sources[0].source_id, "src1");
        assert_eq!(q.sources[1].source_id, "src2");
        assert_eq!(q.sources[2].source_id, "src3");
    }

    #[test]
    fn test_query_auto_start_default_true() {
        let q = Query::cypher("q");
        assert!(q.auto_start);
    }

    #[test]
    fn test_query_auto_start_false() {
        let q = Query::cypher("q").auto_start(false);
        assert!(!q.auto_start);
    }

    #[test]
    fn test_query_enable_bootstrap_default_true() {
        let q = Query::cypher("q");
        assert!(q.enable_bootstrap);
    }

    #[test]
    fn test_query_enable_bootstrap_false() {
        let q = Query::cypher("q").enable_bootstrap(false);
        assert!(!q.enable_bootstrap);
    }

    #[test]
    fn test_query_bootstrap_buffer_size_default() {
        let q = Query::cypher("q");
        assert_eq!(q.bootstrap_buffer_size, 10000);
    }

    #[test]
    fn test_query_with_bootstrap_buffer_size() {
        let q = Query::cypher("q").with_bootstrap_buffer_size(5000);
        assert_eq!(q.bootstrap_buffer_size, 5000);
    }

    #[test]
    fn test_query_with_dispatch_mode_broadcast() {
        let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
        assert_eq!(q.dispatch_mode, Some(DispatchMode::Broadcast));
    }

    #[test]
    fn test_query_with_dispatch_mode_channel() {
        let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
        assert_eq!(q.dispatch_mode, Some(DispatchMode::Channel));
    }

    #[test]
    fn test_query_dispatch_mode_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.dispatch_mode, None);
    }

    #[test]
    fn test_query_with_priority_queue_capacity() {
        let q = Query::cypher("q").with_priority_queue_capacity(50000);
        assert_eq!(q.priority_queue_capacity, Some(50000));
    }

    #[test]
    fn test_query_priority_queue_capacity_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.priority_queue_capacity, None);
    }

    #[test]
    fn test_query_with_dispatch_buffer_capacity() {
        let q = Query::cypher("q").with_dispatch_buffer_capacity(5000);
        assert_eq!(q.dispatch_buffer_capacity, Some(5000));
    }

    #[test]
    fn test_query_dispatch_buffer_capacity_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.dispatch_buffer_capacity, None);
    }

    // The three setters below previously had no coverage at all.

    #[test]
    fn test_query_with_joins_sets_some() {
        // An empty join list must still be recorded as `Some`, not collapsed to `None`.
        let q = Query::cypher("q").with_joins(Vec::new());
        assert_eq!(q.joins.as_ref().map(Vec::len), Some(0));
    }

    #[test]
    fn test_query_with_outbox_capacity() {
        let q = Query::cypher("q").with_outbox_capacity(42);
        assert_eq!(q.outbox_capacity, 42);
    }

    #[test]
    fn test_query_with_bootstrap_timeout_secs() {
        let q = Query::cypher("q").with_bootstrap_timeout_secs(60);
        assert_eq!(q.bootstrap_timeout_secs, 60);
    }

    #[test]
    fn test_query_build_propagates_all_fields() {
        let config = Query::cypher("full-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source-a")
            .from_source("source-b")
            .auto_start(false)
            .enable_bootstrap(false)
            .with_bootstrap_buffer_size(5000)
            .with_priority_queue_capacity(50000)
            .with_dispatch_buffer_capacity(2500)
            .with_dispatch_mode(DispatchMode::Broadcast)
            .build();

        assert_eq!(config.id, "full-query");
        assert_eq!(config.query, "MATCH (n:Person) RETURN n.name");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert_eq!(config.sources.len(), 2);
        assert_eq!(config.sources[0].source_id, "source-a");
        assert_eq!(config.sources[1].source_id, "source-b");
        assert!(!config.auto_start);
        assert!(!config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 5000);
        assert_eq!(config.priority_queue_capacity, Some(50000));
        assert_eq!(config.dispatch_buffer_capacity, Some(2500));
        assert_eq!(config.dispatch_mode, Some(DispatchMode::Broadcast));
        assert!(config.joins.is_none());
        assert!(config.middleware.is_empty());
        assert!(config.storage_backend.is_none());
    }

    #[test]
    fn test_query_build_propagates_joins_outbox_and_timeout() {
        // Covers the fields that `test_query_build_propagates_all_fields` does
        // not set explicitly, so a dropped or swapped field in `build()` is caught.
        let config = Query::cypher("extra-fields")
            .with_joins(Vec::new())
            .with_outbox_capacity(250)
            .with_bootstrap_timeout_secs(45)
            .build();

        assert_eq!(config.id, "extra-fields");
        assert_eq!(config.joins.as_ref().map(Vec::len), Some(0));
        assert_eq!(config.outbox_capacity, 250);
        assert_eq!(config.bootstrap_timeout_secs, 45);
    }

    #[test]
    fn test_query_build_gql_propagates_language() {
        let config = Query::gql("gql-full")
            .query("MATCH (n) RETURN n")
            .from_source("src")
            .build();

        assert_eq!(config.id, "gql-full");
        assert_eq!(config.query_language, QueryLanguage::GQL);
        assert_eq!(config.query, "MATCH (n) RETURN n");
        assert_eq!(config.sources.len(), 1);
        // Verify defaults are preserved through build
        assert!(config.auto_start);
        assert!(config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 10000);
    }

    #[test]
    fn test_query_build_defaults() {
        let config = Query::cypher("defaults-only").build();

        assert_eq!(config.id, "defaults-only");
        assert_eq!(config.query, "");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert!(config.sources.is_empty());
        assert!(config.middleware.is_empty());
        assert!(config.auto_start);
        assert!(config.joins.is_none());
        assert!(config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 10000);
        assert_eq!(config.priority_queue_capacity, None);
        assert_eq!(config.dispatch_buffer_capacity, None);
        assert_eq!(config.dispatch_mode, None);
        assert!(config.storage_backend.is_none());
    }
}