Skip to main content

drasi_lib/
builder.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Fluent builders for DrasiLib and its components.
16//!
17//! This module provides the builder pattern for constructing DrasiLib instances
18//! and their components in a type-safe, ergonomic way.
19//!
20//! # Overview
21//!
22//! - [`DrasiLibBuilder`] - Main builder for creating a DrasiLib instance
23//! - [`Query`] - Builder for query configurations
24//!
25//! # Plugin Architecture
26//!
27//! **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
28//! reactions are created externally as fully-configured instances implementing
29//! `Source` and `Reaction` traits, then passed to DrasiLibBuilder via
30//! `with_source()` and `with_reaction()`.
31//!
32//! # Examples
33//!
34//! ## Basic Usage with Pre-built Instances
35//!
36//! ```no_run
37//! use drasi_lib::{DrasiLib, Query};
38//!
39//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
40//! // Source and reaction instances are created externally by plugins
41//! // Ownership is transferred to DrasiLib when added
42//! // let my_source = my_source_plugin::create(...);
43//! // let my_reaction = my_reaction_plugin::create(...);
44//!
45//! let core = DrasiLib::builder()
46//!     .with_id("my-server")
47//!     // .with_source(my_source)      // Ownership transferred
48//!     // .with_reaction(my_reaction)  // Ownership transferred
49//!     .with_query(
50//!         Query::cypher("my-query")
51//!             .query("MATCH (n:Person) RETURN n")
52//!             .from_source("events")
53//!             .build()
54//!     )
55//!     .build()
56//!     .await?;
57//!
58//! core.start().await?;
59//! # Ok(())
60//! # }
61//! ```
62
63use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67    DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::identity::IdentityProvider;
71use crate::indexes::IndexBackendPlugin;
72use crate::indexes::StorageBackendConfig;
73use crate::lib_core::DrasiLib;
74use crate::reactions::Reaction as ReactionTrait;
75use crate::sources::Source as SourceTrait;
76use crate::state_store::StateStoreProvider;
77use drasi_core::models::SourceMiddlewareConfig;
78
79// ============================================================================
80// DrasiLibBuilder
81// ============================================================================
82
/// Fluent builder for creating DrasiLib instances.
///
/// Use `DrasiLib::builder()` to get started.
///
/// Every setter consumes the builder and returns it, so calls can be chained.
/// Nothing is validated or constructed until `build()` is awaited.
///
/// # Plugin Architecture
///
/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
/// reactions are created externally as fully-configured instances implementing
/// `Source` and `Reaction` traits, then passed via `with_source()` and `with_reaction()`.
///
/// # Example
///
/// ```no_run
/// use drasi_lib::{DrasiLib, Query};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Source and reaction instances are created externally by plugins
/// // Ownership is transferred to DrasiLib when added
/// // let my_source = my_source_plugin::create(...);
/// // let my_reaction = my_reaction_plugin::create(...);
///
/// let core = DrasiLib::builder()
///     .with_id("my-server")
///     // .with_source(my_source)      // Ownership transferred
///     // .with_reaction(my_reaction)  // Ownership transferred
///     .with_query(
///         Query::cypher("my-query")
///             .query("MATCH (n) RETURN n")
///             .from_source("my-source")
///             .build()
///     )
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct DrasiLibBuilder {
    /// Server ID; when left unset, `build()` falls back to `"drasi-lib"`.
    server_id: Option<String>,
    /// Optional default priority queue capacity, forwarded into the config as-is.
    priority_queue_capacity: Option<usize>,
    /// Optional default dispatch buffer capacity, forwarded into the config as-is.
    dispatch_buffer_capacity: Option<usize>,
    /// Storage backend configurations, in the order they were added.
    storage_backends: Vec<StorageBackendConfig>,
    /// Query configurations, in the order they were added.
    query_configs: Vec<QueryConfig>,
    /// Pre-built sources, each paired with extra metadata that `build()` merges
    /// into the source's component graph node (empty for plain `with_source`).
    source_instances: Vec<(
        Box<dyn SourceTrait>,
        std::collections::HashMap<String, String>,
    )>,
    /// Pre-built reactions, each paired with extra metadata that `build()` merges
    /// into the reaction's component graph node (empty for plain `with_reaction`).
    reaction_instances: Vec<(
        Box<dyn ReactionTrait>,
        std::collections::HashMap<String, String>,
    )>,
    /// Optional index backend provider handed to the runtime configuration.
    index_provider: Option<Arc<dyn IndexBackendPlugin>>,
    /// Optional state store provider handed to the runtime configuration.
    state_store_provider: Option<Arc<dyn StateStoreProvider>>,
    /// Optional identity provider handed to the runtime configuration.
    identity_provider: Option<Arc<dyn IdentityProvider>>,
}
137
138impl Default for DrasiLibBuilder {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
144impl DrasiLibBuilder {
145    /// Create a new builder with default values.
146    pub fn new() -> Self {
147        Self {
148            server_id: None,
149            priority_queue_capacity: None,
150            dispatch_buffer_capacity: None,
151            storage_backends: Vec::new(),
152            query_configs: Vec::new(),
153            source_instances: Vec::new(),
154            reaction_instances: Vec::new(),
155            index_provider: None,
156            state_store_provider: None,
157            identity_provider: None,
158        }
159    }
160
161    /// Set the server ID.
162    pub fn with_id(mut self, id: impl Into<String>) -> Self {
163        self.server_id = Some(id.into());
164        self
165    }
166
167    /// Set the default priority queue capacity for components.
168    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
169        self.priority_queue_capacity = Some(capacity);
170        self
171    }
172
173    /// Set the default dispatch buffer capacity for components.
174    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
175        self.dispatch_buffer_capacity = Some(capacity);
176        self
177    }
178
179    /// Add a storage backend configuration.
180    pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
181        self.storage_backends.push(config);
182        self
183    }
184
185    /// Set the index backend provider for persistent storage.
186    ///
187    /// When using RocksDB or Redis/Garnet storage backends, you must provide
188    /// an index provider that implements `IndexBackendPlugin`. The provider
189    /// is responsible for creating the actual index instances.
190    ///
191    /// If no index provider is set, only in-memory storage backends can be used.
192    /// Attempting to use RocksDB or Redis backends without a provider will result
193    /// in an error.
194    ///
195    /// # Example
196    /// ```ignore
197    /// use drasi_index_rocksdb::RocksDbIndexProvider;
198    /// use std::sync::Arc;
199    ///
200    /// let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
201    /// let core = DrasiLib::builder()
202    ///     .with_index_provider(Arc::new(provider))
203    ///     .build()
204    ///     .await?;
205    /// ```
206    pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
207        self.index_provider = Some(provider);
208        self
209    }
210
211    /// Set the state store provider for plugin state persistence.
212    ///
213    /// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
214    /// to store and retrieve runtime state that can persist across runs of DrasiLib.
215    ///
216    /// If no state store provider is set, the default in-memory provider will be used.
217    /// The in-memory provider does not persist state across restarts.
218    ///
219    /// # Example
220    /// ```ignore
221    /// use drasi_state_store_json::JsonStateStoreProvider;
222    /// use std::sync::Arc;
223    ///
224    /// let state_store = JsonStateStoreProvider::new("/data/state");
225    /// let core = DrasiLib::builder()
226    ///     .with_state_store_provider(Arc::new(state_store))
227    ///     .build()
228    ///     .await?;
229    /// ```
230    pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
231        self.state_store_provider = Some(provider);
232        self
233    }
234
235    /// Set the identity provider for credential injection.
236    ///
237    /// Identity providers supply authentication credentials (passwords, tokens,
238    /// certificates) to sources and reactions that need them for connecting to
239    /// external systems.
240    ///
241    /// If no identity provider is set, sources and reactions will receive `None`
242    /// for `context.identity_provider`.
243    ///
244    /// # Example
245    /// ```ignore
246    /// use drasi_identity_azure::AzureIdentityProvider;
247    /// use std::sync::Arc;
248    ///
249    /// let provider = AzureIdentityProvider::with_default_credentials("user@tenant.onmicrosoft.com")?;
250    /// let core = DrasiLib::builder()
251    ///     .with_identity_provider(Arc::new(provider))
252    ///     .build()
253    ///     .await?;
254    /// ```
255    pub fn with_identity_provider(mut self, provider: Arc<dyn IdentityProvider>) -> Self {
256        self.identity_provider = Some(provider);
257        self
258    }
259
260    /// Add a source instance, taking ownership.
261    ///
262    /// Source instances are created externally by plugins with their own typed configurations.
263    /// drasi-lib only knows about the `Source` trait - it has no knowledge of which plugins exist.
264    ///
265    /// # Example
266    /// ```ignore
267    /// let source = MySource::new("my-source", config)?;
268    /// let core = DrasiLib::builder()
269    ///     .with_source(source)  // Ownership transferred
270    ///     .build()
271    ///     .await?;
272    /// ```
273    pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
274        self.source_instances
275            .push((Box::new(source), std::collections::HashMap::new()));
276        self
277    }
278
279    /// Add a source instance with additional component metadata.
280    ///
281    /// Like [`with_source`](Self::with_source) but merges `extra_metadata`
282    /// (e.g. `pluginId`) into the component graph node.
283    pub fn with_source_metadata(
284        mut self,
285        source: impl SourceTrait + 'static,
286        extra_metadata: std::collections::HashMap<String, String>,
287    ) -> Self {
288        self.source_instances
289            .push((Box::new(source), extra_metadata));
290        self
291    }
292
293    /// Add a query configuration.
294    pub fn with_query(mut self, config: QueryConfig) -> Self {
295        self.query_configs.push(config);
296        self
297    }
298
299    /// Add a reaction instance, taking ownership.
300    ///
301    /// Reaction instances are created externally by plugins with their own typed configurations.
302    /// drasi-lib only knows about the `Reaction` trait - it has no knowledge of which plugins exist.
303    ///
304    /// # Example
305    /// ```ignore
306    /// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
307    /// let core = DrasiLib::builder()
308    ///     .with_reaction(reaction)  // Ownership transferred
309    ///     .build()
310    ///     .await?;
311    /// ```
312    pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
313        self.reaction_instances
314            .push((Box::new(reaction), std::collections::HashMap::new()));
315        self
316    }
317
318    /// Add a reaction instance with additional component metadata.
319    ///
320    /// Like [`with_reaction`](Self::with_reaction) but merges `extra_metadata`
321    /// (e.g. `pluginId`) into the component graph node.
322    pub fn with_reaction_metadata(
323        mut self,
324        reaction: impl ReactionTrait + 'static,
325        extra_metadata: std::collections::HashMap<String, String>,
326    ) -> Self {
327        self.reaction_instances
328            .push((Box::new(reaction), extra_metadata));
329        self
330    }
331
332    /// Build the DrasiLib instance.
333    ///
334    /// This validates the configuration, creates all components, and initializes the server.
335    /// After building, you can call `start()` to begin processing.
336    pub async fn build(self) -> Result<DrasiLib> {
337        // Build the configuration
338        let config = DrasiLibConfig {
339            id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
340            priority_queue_capacity: self.priority_queue_capacity,
341            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
342            storage_backends: self.storage_backends,
343            queries: self.query_configs.clone(),
344        };
345
346        // Validate the configuration
347        config
348            .validate()
349            .map_err(|e| DrasiError::validation(e.to_string()))?;
350
351        // Create runtime config and server with optional index and state store providers
352        let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
353            config,
354            self.index_provider,
355            self.state_store_provider,
356            self.identity_provider,
357        ));
358        let mut core = DrasiLib::new(runtime_config);
359
360        // Inject state store before provisioning sources (they need it for initialization)
361        let state_store = core.config.state_store_provider.clone();
362        core.source_manager
363            .inject_state_store(state_store.clone())
364            .await;
365        core.reaction_manager.inject_state_store(state_store).await;
366
367        // Register the component graph source BEFORE initialize (which loads query config).
368        // Queries reference sources, so sources must exist in the graph first.
369        {
370            use crate::sources::component_graph_source::ComponentGraphSource;
371            let graph_source = ComponentGraphSource::new(
372                core.component_event_broadcast_tx.clone(),
373                core.config.id.clone(),
374                core.component_graph.clone(),
375            )
376            .map_err(|e| {
377                DrasiError::operation_failed(
378                    "source",
379                    "component-graph",
380                    "add",
381                    format!("Failed to create: {e}"),
382                )
383            })?;
384
385            let source_id = graph_source.id().to_string();
386            let source_type = graph_source.type_name().to_string();
387            {
388                let mut graph = core.component_graph.write().await;
389                let mut metadata = std::collections::HashMap::new();
390                metadata.insert("kind".to_string(), source_type);
391                metadata.insert(
392                    "autoStart".to_string(),
393                    graph_source.auto_start().to_string(),
394                );
395                graph.register_source(&source_id, metadata).map_err(|e| {
396                    DrasiError::operation_failed(
397                        "source",
398                        &source_id,
399                        "add",
400                        format!("Failed to register: {e}"),
401                    )
402                })?;
403            }
404            if let Err(e) = core.source_manager.provision_source(graph_source).await {
405                let mut graph = core.component_graph.write().await;
406                let _ = graph.deregister(&source_id);
407                return Err(DrasiError::operation_failed(
408                    "source",
409                    &source_id,
410                    "add",
411                    format!("Failed to provision: {e}"),
412                ));
413            }
414        }
415
416        // Inject pre-built source instances BEFORE initialize.
417        // Queries reference sources by ID, so sources must be in the graph first.
418        for (source, extra_metadata) in self.source_instances {
419            let source_id = source.id().to_string();
420            let source_type = source.type_name().to_string();
421            let auto_start = source.auto_start();
422
423            {
424                let mut graph = core.component_graph.write().await;
425                let mut metadata = std::collections::HashMap::new();
426                metadata.insert("kind".to_string(), source_type);
427                metadata.insert("autoStart".to_string(), auto_start.to_string());
428                metadata.extend(extra_metadata);
429                graph.register_source(&source_id, metadata).map_err(|e| {
430                    DrasiError::operation_failed(
431                        "source",
432                        &source_id,
433                        "add",
434                        format!("Failed to register: {e}"),
435                    )
436                })?;
437            }
438            if let Err(e) = core.source_manager.provision_source(source).await {
439                let mut graph = core.component_graph.write().await;
440                let _ = graph.deregister(&source_id);
441                return Err(DrasiError::operation_failed(
442                    "source",
443                    &source_id,
444                    "add",
445                    format!("Failed to provision: {e}"),
446                ));
447            }
448        }
449
450        // Initialize the server (loads query configurations — sources must already be registered)
451        core.initialize().await?;
452
453        // Inject pre-built reaction instances
454        for (reaction, extra_metadata) in self.reaction_instances {
455            let reaction_id = reaction.id().to_string();
456            let reaction_type = reaction.type_name().to_string();
457            let query_ids = reaction.query_ids();
458
459            // Register in graph first, then provision
460            {
461                let mut graph = core.component_graph.write().await;
462                let mut metadata = std::collections::HashMap::new();
463                metadata.insert("kind".to_string(), reaction_type);
464                metadata.extend(extra_metadata);
465                graph
466                    .register_reaction(&reaction_id, metadata, &query_ids)
467                    .map_err(|e| {
468                        DrasiError::operation_failed(
469                            "reaction",
470                            &reaction_id,
471                            "add",
472                            format!("Failed to register: {e}"),
473                        )
474                    })?;
475            }
476            if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
477                let mut graph = core.component_graph.write().await;
478                let _ = graph.deregister(&reaction_id);
479                return Err(DrasiError::operation_failed(
480                    "reaction",
481                    &reaction_id,
482                    "add",
483                    format!("Failed to provision: {e}"),
484                ));
485            }
486        }
487
488        // Register the identity provider in the component graph (if configured).
489        // This creates an IdentityProvider node with Authenticates edges to all
490        // sources and reactions that receive credentials from it.
491        if core.config.identity_provider.is_some() {
492            let mut graph = core.component_graph.write().await;
493            let component_ids: Vec<String> = graph
494                .list_by_kind(&crate::component_graph::ComponentKind::Source)
495                .into_iter()
496                .chain(graph.list_by_kind(&crate::component_graph::ComponentKind::Reaction))
497                .map(|(id, _)| id)
498                .collect();
499
500            let mut metadata = std::collections::HashMap::new();
501            metadata.insert("kind".to_string(), "identity_provider".to_string());
502            graph
503                .register_identity_provider("identity-provider", metadata, &component_ids)
504                .map_err(|e| {
505                    DrasiError::operation_failed(
506                        "identity_provider",
507                        "identity-provider",
508                        "add",
509                        format!("Failed to register: {e}"),
510                    )
511                })?;
512        }
513
514        Ok(core)
515    }
516}
517
518// ============================================================================
519// Query Builder
520// ============================================================================
521
/// Fluent builder for query configurations.
///
/// Use `Query::cypher()` or `Query::gql()` to get started.
///
/// Every setter consumes the builder and returns it; `build()` converts the
/// accumulated settings into a `QueryConfig` field by field.
///
/// # Example
///
/// ```no_run
/// use drasi_lib::Query;
///
/// let query_config = Query::cypher("my-query")
///     .query("MATCH (n:Person) RETURN n.name, n.age")
///     .from_source("my-source")
///     .auto_start(true)
///     .build();
/// ```
pub struct Query {
    /// Query identifier supplied to the constructor.
    id: String,
    /// Query text; empty until `query()` is called.
    query: String,
    /// Language selected by the constructor (`cypher()` or `gql()`).
    query_language: QueryLanguage,
    /// Source subscriptions, in the order they were added.
    sources: Vec<SourceSubscriptionConfig>,
    /// Middleware configurations, in the order they were added.
    middleware: Vec<SourceMiddlewareConfig>,
    /// Whether the query should auto-start; constructors default this to `true`.
    auto_start: bool,
    /// Optional join configuration; `None` unless `with_joins()` is called.
    joins: Option<Vec<QueryJoinConfig>>,
    /// Whether bootstrap is enabled; constructors default this to `true`.
    enable_bootstrap: bool,
    /// Bootstrap buffer size; constructors default this to `10000`.
    bootstrap_buffer_size: usize,
    /// Optional per-query priority queue capacity.
    priority_queue_capacity: Option<usize>,
    /// Optional per-query dispatch buffer capacity.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional per-query dispatch mode.
    dispatch_mode: Option<DispatchMode>,
    /// Optional reference to a storage backend.
    storage_backend: Option<crate::indexes::StorageBackendRef>,
    /// Optional recovery policy.
    recovery_policy: Option<crate::recovery::RecoveryPolicy>,
    /// Outbox capacity; constructors default this to `DEFAULT_OUTBOX_CAPACITY`.
    outbox_capacity: usize,
    /// Bootstrap timeout in seconds; constructors default this to `300`.
    bootstrap_timeout_secs: u64,
}
555
556impl Query {
557    /// Create a new Cypher query builder.
558    pub fn cypher(id: impl Into<String>) -> Self {
559        Self {
560            id: id.into(),
561            query: String::new(),
562            query_language: QueryLanguage::Cypher,
563            sources: Vec::new(),
564            middleware: Vec::new(),
565            auto_start: true,
566            joins: None,
567            enable_bootstrap: true,
568            bootstrap_buffer_size: 10000,
569            priority_queue_capacity: None,
570            dispatch_buffer_capacity: None,
571            dispatch_mode: None,
572            storage_backend: None,
573            recovery_policy: None,
574            outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
575            bootstrap_timeout_secs: 300,
576        }
577    }
578
579    /// Create a new GQL (ISO 9074:2024) query builder.
580    pub fn gql(id: impl Into<String>) -> Self {
581        Self {
582            id: id.into(),
583            query: String::new(),
584            query_language: QueryLanguage::GQL,
585            sources: Vec::new(),
586            middleware: Vec::new(),
587            auto_start: true,
588            joins: None,
589            enable_bootstrap: true,
590            bootstrap_buffer_size: 10000,
591            priority_queue_capacity: None,
592            dispatch_buffer_capacity: None,
593            dispatch_mode: None,
594            storage_backend: None,
595            recovery_policy: None,
596            outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
597            bootstrap_timeout_secs: 300,
598        }
599    }
600
601    /// Set the query string.
602    pub fn query(mut self, query: impl Into<String>) -> Self {
603        self.query = query.into();
604        self
605    }
606
607    /// Subscribe to a source.
608    pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
609        self.sources.push(SourceSubscriptionConfig {
610            source_id: source_id.into(),
611            nodes: Vec::new(),
612            relations: Vec::new(),
613            pipeline: Vec::new(),
614        });
615        self
616    }
617
618    /// Subscribe to a source with a middleware pipeline.
619    ///
620    /// The pipeline is a list of middleware names (strings) that will be applied to
621    /// data from this source before it reaches the query.
622    pub fn from_source_with_pipeline(
623        mut self,
624        source_id: impl Into<String>,
625        pipeline: Vec<String>,
626    ) -> Self {
627        self.sources.push(SourceSubscriptionConfig {
628            source_id: source_id.into(),
629            nodes: Vec::new(),
630            relations: Vec::new(),
631            pipeline,
632        });
633        self
634    }
635
636    /// Add middleware to the query.
637    pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
638        self.middleware.push(middleware);
639        self
640    }
641
642    /// Set whether the query should auto-start.
643    pub fn auto_start(mut self, auto_start: bool) -> Self {
644        self.auto_start = auto_start;
645        self
646    }
647
648    /// Set the join configuration.
649    pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
650        self.joins = Some(joins);
651        self
652    }
653
654    /// Enable or disable bootstrap.
655    pub fn enable_bootstrap(mut self, enable: bool) -> Self {
656        self.enable_bootstrap = enable;
657        self
658    }
659
660    /// Set the bootstrap buffer size.
661    pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
662        self.bootstrap_buffer_size = size;
663        self
664    }
665
666    /// Set the priority queue capacity.
667    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
668        self.priority_queue_capacity = Some(capacity);
669        self
670    }
671
672    /// Set the dispatch buffer capacity.
673    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
674        self.dispatch_buffer_capacity = Some(capacity);
675        self
676    }
677
678    /// Set the dispatch mode.
679    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
680        self.dispatch_mode = Some(mode);
681        self
682    }
683
684    /// Set the storage backend reference.
685    pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
686        self.storage_backend = Some(backend);
687        self
688    }
689
690    /// Set the recovery policy. Applies only to queries with a persistent
691    /// storage backend. See [`RecoveryPolicy`](crate::RecoveryPolicy).
692    pub fn with_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
693        self.recovery_policy = Some(policy);
694        self
695    }
696
697    /// Set the outbox capacity (number of recent QueryResult emissions retained).
698    /// Default: 1000.
699    pub fn with_outbox_capacity(mut self, capacity: usize) -> Self {
700        self.outbox_capacity = capacity;
701        self
702    }
703
704    /// Set the bootstrap timeout in seconds.
705    /// This controls how long `fetch_snapshot` / `fetch_outbox` will wait for
706    /// the query to finish bootstrapping before returning `FetchError::TimedOut`.
707    /// Default: 300 (5 minutes).
708    pub fn with_bootstrap_timeout_secs(mut self, secs: u64) -> Self {
709        self.bootstrap_timeout_secs = secs;
710        self
711    }
712
713    /// Build the query configuration.
714    pub fn build(self) -> QueryConfig {
715        QueryConfig {
716            id: self.id,
717            query: self.query,
718            query_language: self.query_language,
719            sources: self.sources,
720            middleware: self.middleware,
721            auto_start: self.auto_start,
722            joins: self.joins,
723            enable_bootstrap: self.enable_bootstrap,
724            bootstrap_buffer_size: self.bootstrap_buffer_size,
725            priority_queue_capacity: self.priority_queue_capacity,
726            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
727            dispatch_mode: self.dispatch_mode,
728            storage_backend: self.storage_backend,
729            recovery_policy: self.recovery_policy,
730            outbox_capacity: self.outbox_capacity,
731            bootstrap_timeout_secs: self.bootstrap_timeout_secs,
732        }
733    }
734}
735
736// ============================================================================
737// Tests
738// ============================================================================
739
740#[cfg(test)]
741mod tests {
742    use super::*;
743    use crate::DrasiLib;
744
745    // ==========================================================================
746    // Query Builder Tests
747    // ==========================================================================
748
749    #[test]
750    fn test_query_builder_cypher() {
751        let config = Query::cypher("test-query")
752            .query("MATCH (n) RETURN n")
753            .from_source("source1")
754            .auto_start(false)
755            .build();
756
757        assert_eq!(config.id, "test-query");
758        assert_eq!(config.query, "MATCH (n) RETURN n");
759        assert_eq!(config.query_language, QueryLanguage::Cypher);
760        assert!(!config.auto_start);
761        assert_eq!(config.sources.len(), 1);
762        assert_eq!(config.sources[0].source_id, "source1");
763    }
764
765    #[test]
766    fn test_query_builder_gql() {
767        let config = Query::gql("test-query")
768            .query("MATCH (n:Person) RETURN n.name")
769            .from_source("source1")
770            .build();
771
772        assert_eq!(config.query_language, QueryLanguage::GQL);
773    }
774
775    #[test]
776    fn test_query_builder_multiple_sources() {
777        let config = Query::cypher("test-query")
778            .query("MATCH (n) RETURN n")
779            .from_source("source1")
780            .from_source("source2")
781            .build();
782
783        assert_eq!(config.sources.len(), 2);
784    }
785
786    #[tokio::test]
787    async fn test_drasi_lib_builder_empty() {
788        let core = DrasiLibBuilder::new().build().await.unwrap();
789
790        assert!(!core.is_running().await);
791    }
792
793    #[tokio::test]
794    async fn test_drasi_lib_builder_with_id() {
795        let core = DrasiLibBuilder::new()
796            .with_id("test-server")
797            .build()
798            .await
799            .unwrap();
800
801        assert_eq!(core.get_config().id, "test-server");
802    }
803
804    #[tokio::test]
805    async fn test_drasi_lib_builder_with_query_no_source() {
806        // Test builder with query configuration that has no source subscriptions
807        // In the instance-based approach, sources are added after build()
808        let core = DrasiLibBuilder::new()
809            .with_id("test-server")
810            .with_query(
811                Query::cypher("query1")
812                    .query("MATCH (n) RETURN n")
813                    // No from_source() call - query has no source subscriptions
814                    .auto_start(false)
815                    .build(),
816            )
817            .build()
818            .await
819            .unwrap();
820
821        let queries = core.list_queries().await.unwrap();
822        assert_eq!(queries.len(), 1);
823    }
824
825    // ==========================================================================
826    // DrasiLib Builder Integration Tests (from builder_tests.rs)
827    // ==========================================================================
828
829    #[tokio::test]
830    async fn test_builder_creates_initialized_server() {
831        let core = DrasiLib::builder().with_id("builder-test").build().await;
832
833        assert!(core.is_ok(), "Builder should create initialized server");
834        let core = core.unwrap();
835        assert!(
836            core.state_guard.is_initialized(),
837            "Server should be initialized"
838        );
839    }
840
841    #[tokio::test]
842    async fn test_builder_with_query() {
843        // In the instance-based approach, sources and reactions are added as instances
844        // after the builder creates the core. Here we just test query config addition.
845        // Source must be registered before a query can reference it
846        let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
847        let core = DrasiLib::builder()
848            .with_id("complex-server")
849            .with_source(source)
850            .with_query(
851                Query::cypher("query1")
852                    .query("MATCH (n) RETURN n")
853                    .from_source("source1")
854                    .build(),
855            )
856            .build()
857            .await;
858
859        assert!(core.is_ok(), "Builder with query should succeed");
860        let core = core.unwrap();
861        assert!(core.state_guard.is_initialized());
862        assert_eq!(core.config.queries.len(), 1);
863    }
864
865    // ==========================================================================
866    // DrasiLibBuilder Unit Tests
867    // ==========================================================================
868
869    #[test]
870    fn test_builder_with_id_sets_id() {
871        let builder = DrasiLibBuilder::new().with_id("my-server");
872        assert_eq!(builder.server_id, Some("my-server".to_string()));
873    }
874
875    #[test]
876    fn test_builder_with_id_accepts_string() {
877        let builder = DrasiLibBuilder::new().with_id(String::from("owned-id"));
878        assert_eq!(builder.server_id, Some("owned-id".to_string()));
879    }
880
881    #[test]
882    fn test_builder_with_priority_queue_capacity() {
883        let builder = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
884        assert_eq!(builder.priority_queue_capacity, Some(50000));
885    }
886
887    #[test]
888    fn test_builder_with_dispatch_buffer_capacity() {
889        let builder = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
890        assert_eq!(builder.dispatch_buffer_capacity, Some(2000));
891    }
892
893    #[test]
894    fn test_builder_with_query_adds_to_list() {
895        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
896        let builder = DrasiLibBuilder::new().with_query(q);
897        assert_eq!(builder.query_configs.len(), 1);
898        assert_eq!(builder.query_configs[0].id, "q1");
899    }
900
901    #[test]
902    fn test_builder_with_multiple_queries() {
903        let q1 = Query::cypher("q1").query("MATCH (a) RETURN a").build();
904        let q2 = Query::gql("q2").query("MATCH (b) RETURN b").build();
905        let builder = DrasiLibBuilder::new().with_query(q1).with_query(q2);
906        assert_eq!(builder.query_configs.len(), 2);
907        assert_eq!(builder.query_configs[0].id, "q1");
908        assert_eq!(builder.query_configs[1].id, "q2");
909    }
910
911    #[test]
912    fn test_builder_add_storage_backend() {
913        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
914
915        let backend = StorageBackendConfig {
916            id: "mem1".to_string(),
917            spec: StorageBackendSpec::Memory {
918                enable_archive: false,
919            },
920        };
921        let builder = DrasiLibBuilder::new().add_storage_backend(backend);
922        assert_eq!(builder.storage_backends.len(), 1);
923        assert_eq!(builder.storage_backends[0].id, "mem1");
924    }
925
926    #[test]
927    fn test_builder_add_multiple_storage_backends() {
928        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
929
930        let b1 = StorageBackendConfig {
931            id: "mem1".to_string(),
932            spec: StorageBackendSpec::Memory {
933                enable_archive: false,
934            },
935        };
936        let b2 = StorageBackendConfig {
937            id: "mem2".to_string(),
938            spec: StorageBackendSpec::Memory {
939                enable_archive: true,
940            },
941        };
942        let builder = DrasiLibBuilder::new()
943            .add_storage_backend(b1)
944            .add_storage_backend(b2);
945        assert_eq!(builder.storage_backends.len(), 2);
946        assert_eq!(builder.storage_backends[0].id, "mem1");
947        assert_eq!(builder.storage_backends[1].id, "mem2");
948    }
949
950    #[test]
951    fn test_builder_default_values() {
952        let builder = DrasiLibBuilder::new();
953        assert_eq!(builder.server_id, None);
954        assert_eq!(builder.priority_queue_capacity, None);
955        assert_eq!(builder.dispatch_buffer_capacity, None);
956        assert!(builder.storage_backends.is_empty());
957        assert!(builder.query_configs.is_empty());
958        assert!(builder.source_instances.is_empty());
959        assert!(builder.reaction_instances.is_empty());
960        assert!(builder.index_provider.is_none());
961        assert!(builder.state_store_provider.is_none());
962    }
963
964    #[test]
965    fn test_builder_fluent_chaining() {
966        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
967
968        let backend = StorageBackendConfig {
969            id: "mem".to_string(),
970            spec: StorageBackendSpec::Memory {
971                enable_archive: false,
972            },
973        };
974        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
975
976        let builder = DrasiLibBuilder::new()
977            .with_id("chained")
978            .with_priority_queue_capacity(20000)
979            .with_dispatch_buffer_capacity(3000)
980            .add_storage_backend(backend)
981            .with_query(q);
982
983        assert_eq!(builder.server_id, Some("chained".to_string()));
984        assert_eq!(builder.priority_queue_capacity, Some(20000));
985        assert_eq!(builder.dispatch_buffer_capacity, Some(3000));
986        assert_eq!(builder.storage_backends.len(), 1);
987        assert_eq!(builder.query_configs.len(), 1);
988    }
989
990    #[tokio::test]
991    async fn test_builder_default_id_when_none_set() {
992        let core = DrasiLibBuilder::new().build().await.unwrap();
993        assert_eq!(core.get_config().id, "drasi-lib");
994    }
995
996    #[tokio::test]
997    async fn test_builder_with_storage_backend_builds_ok() {
998        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
999
1000        let backend = StorageBackendConfig {
1001            id: "test-mem".to_string(),
1002            spec: StorageBackendSpec::Memory {
1003                enable_archive: false,
1004            },
1005        };
1006        let core = DrasiLibBuilder::new()
1007            .add_storage_backend(backend)
1008            .build()
1009            .await;
1010        assert!(core.is_ok(), "Builder with storage backend should succeed");
1011    }
1012
1013    // ==========================================================================
1014    // Query Builder Unit Tests
1015    // ==========================================================================
1016
1017    #[test]
1018    fn test_query_cypher_sets_id_and_language() {
1019        let q = Query::cypher("cypher-q");
1020        assert_eq!(q.id, "cypher-q");
1021        assert_eq!(q.query_language, QueryLanguage::Cypher);
1022    }
1023
1024    #[test]
1025    fn test_query_gql_sets_id_and_language() {
1026        let q = Query::gql("gql-q");
1027        assert_eq!(q.id, "gql-q");
1028        assert_eq!(q.query_language, QueryLanguage::GQL);
1029    }
1030
1031    #[test]
1032    fn test_query_from_source_adds_source() {
1033        let q = Query::cypher("q").from_source("src1");
1034        assert_eq!(q.sources.len(), 1);
1035        assert_eq!(q.sources[0].source_id, "src1");
1036    }
1037
1038    #[test]
1039    fn test_query_from_source_chaining() {
1040        let q = Query::cypher("q")
1041            .from_source("src1")
1042            .from_source("src2")
1043            .from_source("src3");
1044        assert_eq!(q.sources.len(), 3);
1045        assert_eq!(q.sources[0].source_id, "src1");
1046        assert_eq!(q.sources[1].source_id, "src2");
1047        assert_eq!(q.sources[2].source_id, "src3");
1048    }
1049
1050    #[test]
1051    fn test_query_auto_start_default_true() {
1052        let q = Query::cypher("q");
1053        assert!(q.auto_start);
1054    }
1055
1056    #[test]
1057    fn test_query_auto_start_false() {
1058        let q = Query::cypher("q").auto_start(false);
1059        assert!(!q.auto_start);
1060    }
1061
1062    #[test]
1063    fn test_query_enable_bootstrap_default_true() {
1064        let q = Query::cypher("q");
1065        assert!(q.enable_bootstrap);
1066    }
1067
1068    #[test]
1069    fn test_query_enable_bootstrap_false() {
1070        let q = Query::cypher("q").enable_bootstrap(false);
1071        assert!(!q.enable_bootstrap);
1072    }
1073
1074    #[test]
1075    fn test_query_bootstrap_buffer_size_default() {
1076        let q = Query::cypher("q");
1077        assert_eq!(q.bootstrap_buffer_size, 10000);
1078    }
1079
1080    #[test]
1081    fn test_query_with_bootstrap_buffer_size() {
1082        let q = Query::cypher("q").with_bootstrap_buffer_size(5000);
1083        assert_eq!(q.bootstrap_buffer_size, 5000);
1084    }
1085
1086    #[test]
1087    fn test_query_with_dispatch_mode_broadcast() {
1088        let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
1089        assert_eq!(q.dispatch_mode, Some(DispatchMode::Broadcast));
1090    }
1091
1092    #[test]
1093    fn test_query_with_dispatch_mode_channel() {
1094        let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
1095        assert_eq!(q.dispatch_mode, Some(DispatchMode::Channel));
1096    }
1097
1098    #[test]
1099    fn test_query_dispatch_mode_default_none() {
1100        let q = Query::cypher("q");
1101        assert_eq!(q.dispatch_mode, None);
1102    }
1103
1104    #[test]
1105    fn test_query_with_priority_queue_capacity() {
1106        let q = Query::cypher("q").with_priority_queue_capacity(50000);
1107        assert_eq!(q.priority_queue_capacity, Some(50000));
1108    }
1109
1110    #[test]
1111    fn test_query_priority_queue_capacity_default_none() {
1112        let q = Query::cypher("q");
1113        assert_eq!(q.priority_queue_capacity, None);
1114    }
1115
1116    #[test]
1117    fn test_query_with_dispatch_buffer_capacity() {
1118        let q = Query::cypher("q").with_dispatch_buffer_capacity(5000);
1119        assert_eq!(q.dispatch_buffer_capacity, Some(5000));
1120    }
1121
1122    #[test]
1123    fn test_query_dispatch_buffer_capacity_default_none() {
1124        let q = Query::cypher("q");
1125        assert_eq!(q.dispatch_buffer_capacity, None);
1126    }
1127
1128    #[test]
1129    fn test_query_build_propagates_all_fields() {
1130        let config = Query::cypher("full-query")
1131            .query("MATCH (n:Person) RETURN n.name")
1132            .from_source("source-a")
1133            .from_source("source-b")
1134            .auto_start(false)
1135            .enable_bootstrap(false)
1136            .with_bootstrap_buffer_size(5000)
1137            .with_priority_queue_capacity(50000)
1138            .with_dispatch_buffer_capacity(2500)
1139            .with_dispatch_mode(DispatchMode::Broadcast)
1140            .build();
1141
1142        assert_eq!(config.id, "full-query");
1143        assert_eq!(config.query, "MATCH (n:Person) RETURN n.name");
1144        assert_eq!(config.query_language, QueryLanguage::Cypher);
1145        assert_eq!(config.sources.len(), 2);
1146        assert_eq!(config.sources[0].source_id, "source-a");
1147        assert_eq!(config.sources[1].source_id, "source-b");
1148        assert!(!config.auto_start);
1149        assert!(!config.enable_bootstrap);
1150        assert_eq!(config.bootstrap_buffer_size, 5000);
1151        assert_eq!(config.priority_queue_capacity, Some(50000));
1152        assert_eq!(config.dispatch_buffer_capacity, Some(2500));
1153        assert_eq!(config.dispatch_mode, Some(DispatchMode::Broadcast));
1154        assert!(config.joins.is_none());
1155        assert!(config.middleware.is_empty());
1156        assert!(config.storage_backend.is_none());
1157    }
1158
1159    #[test]
1160    fn test_query_build_gql_propagates_language() {
1161        let config = Query::gql("gql-full")
1162            .query("MATCH (n) RETURN n")
1163            .from_source("src")
1164            .build();
1165
1166        assert_eq!(config.id, "gql-full");
1167        assert_eq!(config.query_language, QueryLanguage::GQL);
1168        assert_eq!(config.query, "MATCH (n) RETURN n");
1169        assert_eq!(config.sources.len(), 1);
1170        // Verify defaults are preserved through build
1171        assert!(config.auto_start);
1172        assert!(config.enable_bootstrap);
1173        assert_eq!(config.bootstrap_buffer_size, 10000);
1174    }
1175
1176    #[test]
1177    fn test_query_build_defaults() {
1178        let config = Query::cypher("defaults-only").build();
1179
1180        assert_eq!(config.id, "defaults-only");
1181        assert_eq!(config.query, "");
1182        assert_eq!(config.query_language, QueryLanguage::Cypher);
1183        assert!(config.sources.is_empty());
1184        assert!(config.middleware.is_empty());
1185        assert!(config.auto_start);
1186        assert!(config.joins.is_none());
1187        assert!(config.enable_bootstrap);
1188        assert_eq!(config.bootstrap_buffer_size, 10000);
1189        assert_eq!(config.priority_queue_capacity, None);
1190        assert_eq!(config.dispatch_buffer_capacity, None);
1191        assert_eq!(config.dispatch_mode, None);
1192        assert!(config.storage_backend.is_none());
1193    }
1194}