drasi_lib/builder.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Fluent builders for DrasiLib and its components.
16//!
17//! This module provides the builder pattern for constructing DrasiLib instances
18//! and their components in a type-safe, ergonomic way.
19//!
20//! # Overview
21//!
22//! - [`DrasiLibBuilder`] - Main builder for creating a DrasiLib instance
23//! - [`Query`] - Builder for query configurations
24//!
25//! # Plugin Architecture
26//!
27//! **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
28//! reactions are created externally as fully-configured instances implementing
29//! `Source` and `Reaction` traits, then passed to DrasiLibBuilder via
30//! `with_source()` and `with_reaction()`.
31//!
32//! # Examples
33//!
34//! ## Basic Usage with Pre-built Instances
35//!
36//! ```no_run
37//! use drasi_lib::{DrasiLib, Query};
38//!
39//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
40//! // Source and reaction instances are created externally by plugins
41//! // Ownership is transferred to DrasiLib when added
42//! // let my_source = my_source_plugin::create(...);
43//! // let my_reaction = my_reaction_plugin::create(...);
44//!
45//! let core = DrasiLib::builder()
46//! .with_id("my-server")
47//! // .with_source(my_source) // Ownership transferred
48//! // .with_reaction(my_reaction) // Ownership transferred
49//! .with_query(
50//! Query::cypher("my-query")
51//! .query("MATCH (n:Person) RETURN n")
52//! .from_source("events")
53//! .build()
54//! )
55//! .build()
56//! .await?;
57//!
58//! core.start().await?;
59//! # Ok(())
60//! # }
61//! ```
62
63use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67 DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::indexes::IndexBackendPlugin;
71use crate::indexes::StorageBackendConfig;
72use crate::lib_core::DrasiLib;
73use crate::reactions::Reaction as ReactionTrait;
74use crate::sources::Source as SourceTrait;
75use crate::state_store::StateStoreProvider;
76use drasi_core::models::SourceMiddlewareConfig;
77
78// ============================================================================
79// DrasiLibBuilder
80// ============================================================================
81
82/// Fluent builder for creating DrasiLib instances.
83///
84/// Use `DrasiLib::builder()` to get started.
85///
86/// # Plugin Architecture
87///
88/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
89/// reactions are created externally as fully-configured instances implementing
90/// `Source` and `Reaction` traits, then passed via `with_source()` and `with_reaction()`.
91///
92/// # Example
93///
94/// ```no_run
95/// use drasi_lib::{DrasiLib, Query};
96///
97/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
98/// // Source and reaction instances are created externally by plugins
99/// // Ownership is transferred to DrasiLib when added
100/// // let my_source = my_source_plugin::create(...);
101/// // let my_reaction = my_reaction_plugin::create(...);
102///
103/// let core = DrasiLib::builder()
104/// .with_id("my-server")
105/// // .with_source(my_source) // Ownership transferred
106/// // .with_reaction(my_reaction) // Ownership transferred
107/// .with_query(
108/// Query::cypher("my-query")
109/// .query("MATCH (n) RETURN n")
110/// .from_source("my-source")
111/// .build()
112/// )
113/// .build()
114/// .await?;
115/// # Ok(())
116/// # }
117/// ```
118pub struct DrasiLibBuilder {
119 server_id: Option<String>,
120 priority_queue_capacity: Option<usize>,
121 dispatch_buffer_capacity: Option<usize>,
122 storage_backends: Vec<StorageBackendConfig>,
123 query_configs: Vec<QueryConfig>,
124 source_instances: Vec<Box<dyn SourceTrait>>,
125 reaction_instances: Vec<Box<dyn ReactionTrait>>,
126 index_provider: Option<Arc<dyn IndexBackendPlugin>>,
127 state_store_provider: Option<Arc<dyn StateStoreProvider>>,
128}
129
130impl Default for DrasiLibBuilder {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136impl DrasiLibBuilder {
137 /// Create a new builder with default values.
138 pub fn new() -> Self {
139 Self {
140 server_id: None,
141 priority_queue_capacity: None,
142 dispatch_buffer_capacity: None,
143 storage_backends: Vec::new(),
144 query_configs: Vec::new(),
145 source_instances: Vec::new(),
146 reaction_instances: Vec::new(),
147 index_provider: None,
148 state_store_provider: None,
149 }
150 }
151
152 /// Set the server ID.
153 pub fn with_id(mut self, id: impl Into<String>) -> Self {
154 self.server_id = Some(id.into());
155 self
156 }
157
158 /// Set the default priority queue capacity for components.
159 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
160 self.priority_queue_capacity = Some(capacity);
161 self
162 }
163
164 /// Set the default dispatch buffer capacity for components.
165 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
166 self.dispatch_buffer_capacity = Some(capacity);
167 self
168 }
169
170 /// Add a storage backend configuration.
171 pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
172 self.storage_backends.push(config);
173 self
174 }
175
176 /// Set the index backend provider for persistent storage.
177 ///
178 /// When using RocksDB or Redis/Garnet storage backends, you must provide
179 /// an index provider that implements `IndexBackendPlugin`. The provider
180 /// is responsible for creating the actual index instances.
181 ///
182 /// If no index provider is set, only in-memory storage backends can be used.
183 /// Attempting to use RocksDB or Redis backends without a provider will result
184 /// in an error.
185 ///
186 /// # Example
187 /// ```ignore
188 /// use drasi_index_rocksdb::RocksDbIndexProvider;
189 /// use std::sync::Arc;
190 ///
191 /// let provider = RocksDbIndexProvider::new("/data/drasi", true, false);
192 /// let core = DrasiLib::builder()
193 /// .with_index_provider(Arc::new(provider))
194 /// .build()
195 /// .await?;
196 /// ```
197 pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
198 self.index_provider = Some(provider);
199 self
200 }
201
202 /// Set the state store provider for plugin state persistence.
203 ///
204 /// State store providers allow plugins (Sources, BootstrapProviders, and Reactions)
205 /// to store and retrieve runtime state that can persist across runs of DrasiLib.
206 ///
207 /// If no state store provider is set, the default in-memory provider will be used.
208 /// The in-memory provider does not persist state across restarts.
209 ///
210 /// # Example
211 /// ```ignore
212 /// use drasi_state_store_json::JsonStateStoreProvider;
213 /// use std::sync::Arc;
214 ///
215 /// let state_store = JsonStateStoreProvider::new("/data/state");
216 /// let core = DrasiLib::builder()
217 /// .with_state_store_provider(Arc::new(state_store))
218 /// .build()
219 /// .await?;
220 /// ```
221 pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
222 self.state_store_provider = Some(provider);
223 self
224 }
225
226 /// Add a source instance, taking ownership.
227 ///
228 /// Source instances are created externally by plugins with their own typed configurations.
229 /// drasi-lib only knows about the `Source` trait - it has no knowledge of which plugins exist.
230 ///
231 /// # Example
232 /// ```ignore
233 /// let source = MySource::new("my-source", config)?;
234 /// let core = DrasiLib::builder()
235 /// .with_source(source) // Ownership transferred
236 /// .build()
237 /// .await?;
238 /// ```
239 pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
240 self.source_instances.push(Box::new(source));
241 self
242 }
243
244 /// Add a query configuration.
245 pub fn with_query(mut self, config: QueryConfig) -> Self {
246 self.query_configs.push(config);
247 self
248 }
249
250 /// Add a reaction instance, taking ownership.
251 ///
252 /// Reaction instances are created externally by plugins with their own typed configurations.
253 /// drasi-lib only knows about the `Reaction` trait - it has no knowledge of which plugins exist.
254 ///
255 /// # Example
256 /// ```ignore
257 /// let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
258 /// let core = DrasiLib::builder()
259 /// .with_reaction(reaction) // Ownership transferred
260 /// .build()
261 /// .await?;
262 /// ```
263 pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
264 self.reaction_instances.push(Box::new(reaction));
265 self
266 }
267
268 /// Build the DrasiLib instance.
269 ///
270 /// This validates the configuration, creates all components, and initializes the server.
271 /// After building, you can call `start()` to begin processing.
272 pub async fn build(self) -> Result<DrasiLib> {
273 // Build the configuration
274 let config = DrasiLibConfig {
275 id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
276 priority_queue_capacity: self.priority_queue_capacity,
277 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
278 storage_backends: self.storage_backends,
279 queries: self.query_configs.clone(),
280 };
281
282 // Validate the configuration
283 config
284 .validate()
285 .map_err(|e| DrasiError::startup_validation(e.to_string()))?;
286
287 // Create runtime config and server with optional index and state store providers
288 let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
289 config,
290 self.index_provider,
291 self.state_store_provider,
292 ));
293 let mut core = DrasiLib::new(runtime_config);
294
295 // Initialize the server
296 core.initialize().await?;
297
298 // Inject pre-built source instances
299 for source in self.source_instances {
300 let source_id = source.id().to_string();
301 core.source_manager.add_source(source).await.map_err(|e| {
302 DrasiError::provisioning(format!(
303 "Failed to add source instance '{source_id}': {e}"
304 ))
305 })?;
306 }
307
308 // Inject pre-built reaction instances
309 for reaction in self.reaction_instances {
310 let reaction_id = reaction.id().to_string();
311 core.reaction_manager
312 .add_reaction(reaction)
313 .await
314 .map_err(|e| {
315 DrasiError::provisioning(format!(
316 "Failed to add reaction instance '{reaction_id}': {e}"
317 ))
318 })?;
319 }
320
321 Ok(core)
322 }
323}
324
325// ============================================================================
326// Query Builder
327// ============================================================================
328
329/// Fluent builder for query configurations.
330///
331/// Use `Query::cypher()` or `Query::gql()` to get started.
332///
333/// # Example
334///
335/// ```no_run
336/// use drasi_lib::Query;
337///
338/// let query_config = Query::cypher("my-query")
339/// .query("MATCH (n:Person) RETURN n.name, n.age")
340/// .from_source("my-source")
341/// .auto_start(true)
342/// .build();
343/// ```
344pub struct Query {
345 id: String,
346 query: String,
347 query_language: QueryLanguage,
348 sources: Vec<SourceSubscriptionConfig>,
349 middleware: Vec<SourceMiddlewareConfig>,
350 auto_start: bool,
351 joins: Option<Vec<QueryJoinConfig>>,
352 enable_bootstrap: bool,
353 bootstrap_buffer_size: usize,
354 priority_queue_capacity: Option<usize>,
355 dispatch_buffer_capacity: Option<usize>,
356 dispatch_mode: Option<DispatchMode>,
357 storage_backend: Option<crate::indexes::StorageBackendRef>,
358}
359
360impl Query {
361 /// Create a new Cypher query builder.
362 pub fn cypher(id: impl Into<String>) -> Self {
363 Self {
364 id: id.into(),
365 query: String::new(),
366 query_language: QueryLanguage::Cypher,
367 sources: Vec::new(),
368 middleware: Vec::new(),
369 auto_start: true,
370 joins: None,
371 enable_bootstrap: true,
372 bootstrap_buffer_size: 10000,
373 priority_queue_capacity: None,
374 dispatch_buffer_capacity: None,
375 dispatch_mode: None,
376 storage_backend: None,
377 }
378 }
379
380 /// Create a new GQL query builder.
381 pub fn gql(id: impl Into<String>) -> Self {
382 Self {
383 id: id.into(),
384 query: String::new(),
385 query_language: QueryLanguage::GQL,
386 sources: Vec::new(),
387 middleware: Vec::new(),
388 auto_start: true,
389 joins: None,
390 enable_bootstrap: true,
391 bootstrap_buffer_size: 10000,
392 priority_queue_capacity: None,
393 dispatch_buffer_capacity: None,
394 dispatch_mode: None,
395 storage_backend: None,
396 }
397 }
398
399 /// Set the query string.
400 pub fn query(mut self, query: impl Into<String>) -> Self {
401 self.query = query.into();
402 self
403 }
404
405 /// Subscribe to a source.
406 pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
407 self.sources.push(SourceSubscriptionConfig {
408 source_id: source_id.into(),
409 nodes: Vec::new(),
410 relations: Vec::new(),
411 pipeline: Vec::new(),
412 });
413 self
414 }
415
416 /// Subscribe to a source with a middleware pipeline.
417 ///
418 /// The pipeline is a list of middleware names (strings) that will be applied to
419 /// data from this source before it reaches the query.
420 pub fn from_source_with_pipeline(
421 mut self,
422 source_id: impl Into<String>,
423 pipeline: Vec<String>,
424 ) -> Self {
425 self.sources.push(SourceSubscriptionConfig {
426 source_id: source_id.into(),
427 nodes: Vec::new(),
428 relations: Vec::new(),
429 pipeline,
430 });
431 self
432 }
433
434 /// Add middleware to the query.
435 pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
436 self.middleware.push(middleware);
437 self
438 }
439
440 /// Set whether the query should auto-start.
441 pub fn auto_start(mut self, auto_start: bool) -> Self {
442 self.auto_start = auto_start;
443 self
444 }
445
446 /// Set the join configuration.
447 pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
448 self.joins = Some(joins);
449 self
450 }
451
452 /// Enable or disable bootstrap.
453 pub fn enable_bootstrap(mut self, enable: bool) -> Self {
454 self.enable_bootstrap = enable;
455 self
456 }
457
458 /// Set the bootstrap buffer size.
459 pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
460 self.bootstrap_buffer_size = size;
461 self
462 }
463
464 /// Set the priority queue capacity.
465 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
466 self.priority_queue_capacity = Some(capacity);
467 self
468 }
469
470 /// Set the dispatch buffer capacity.
471 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
472 self.dispatch_buffer_capacity = Some(capacity);
473 self
474 }
475
476 /// Set the dispatch mode.
477 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
478 self.dispatch_mode = Some(mode);
479 self
480 }
481
482 /// Set the storage backend reference.
483 pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
484 self.storage_backend = Some(backend);
485 self
486 }
487
488 /// Build the query configuration.
489 pub fn build(self) -> QueryConfig {
490 QueryConfig {
491 id: self.id,
492 query: self.query,
493 query_language: self.query_language,
494 sources: self.sources,
495 middleware: self.middleware,
496 auto_start: self.auto_start,
497 joins: self.joins,
498 enable_bootstrap: self.enable_bootstrap,
499 bootstrap_buffer_size: self.bootstrap_buffer_size,
500 priority_queue_capacity: self.priority_queue_capacity,
501 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
502 dispatch_mode: self.dispatch_mode,
503 storage_backend: self.storage_backend,
504 }
505 }
506}
507
508// ============================================================================
509// Tests
510// ============================================================================
511
512#[cfg(test)]
513mod tests {
514 use super::*;
515 use crate::DrasiLib;
516
517 // ==========================================================================
518 // Query Builder Tests
519 // ==========================================================================
520
521 #[test]
522 fn test_query_builder_cypher() {
523 let config = Query::cypher("test-query")
524 .query("MATCH (n) RETURN n")
525 .from_source("source1")
526 .auto_start(false)
527 .build();
528
529 assert_eq!(config.id, "test-query");
530 assert_eq!(config.query, "MATCH (n) RETURN n");
531 assert_eq!(config.query_language, QueryLanguage::Cypher);
532 assert!(!config.auto_start);
533 assert_eq!(config.sources.len(), 1);
534 assert_eq!(config.sources[0].source_id, "source1");
535 }
536
537 #[test]
538 fn test_query_builder_gql() {
539 let config = Query::gql("test-query")
540 .query("MATCH (n:Person) RETURN n.name")
541 .from_source("source1")
542 .build();
543
544 assert_eq!(config.query_language, QueryLanguage::GQL);
545 }
546
547 #[test]
548 fn test_query_builder_multiple_sources() {
549 let config = Query::cypher("test-query")
550 .query("MATCH (n) RETURN n")
551 .from_source("source1")
552 .from_source("source2")
553 .build();
554
555 assert_eq!(config.sources.len(), 2);
556 }
557
558 #[tokio::test]
559 async fn test_drasi_lib_builder_empty() {
560 let core = DrasiLibBuilder::new().build().await.unwrap();
561
562 assert!(!core.is_running().await);
563 }
564
565 #[tokio::test]
566 async fn test_drasi_lib_builder_with_id() {
567 let core = DrasiLibBuilder::new()
568 .with_id("test-server")
569 .build()
570 .await
571 .unwrap();
572
573 assert_eq!(core.get_config().id, "test-server");
574 }
575
576 #[tokio::test]
577 async fn test_drasi_lib_builder_with_query_no_source() {
578 // Test builder with query configuration that has no source subscriptions
579 // In the instance-based approach, sources are added after build()
580 let core = DrasiLibBuilder::new()
581 .with_id("test-server")
582 .with_query(
583 Query::cypher("query1")
584 .query("MATCH (n) RETURN n")
585 // No from_source() call - query has no source subscriptions
586 .auto_start(false)
587 .build(),
588 )
589 .build()
590 .await
591 .unwrap();
592
593 let queries = core.list_queries().await.unwrap();
594 assert_eq!(queries.len(), 1);
595 }
596
597 // ==========================================================================
598 // DrasiLib Builder Integration Tests (from builder_tests.rs)
599 // ==========================================================================
600
601 #[tokio::test]
602 async fn test_builder_creates_initialized_server() {
603 let core = DrasiLib::builder().with_id("builder-test").build().await;
604
605 assert!(core.is_ok(), "Builder should create initialized server");
606 let core = core.unwrap();
607 assert!(
608 core.state_guard.is_initialized().await,
609 "Server should be initialized"
610 );
611 }
612
613 #[tokio::test]
614 async fn test_builder_with_query() {
615 // In the instance-based approach, sources and reactions are added as instances
616 // after the builder creates the core. Here we just test query config addition.
617 let core = DrasiLib::builder()
618 .with_id("complex-server")
619 .with_query(
620 Query::cypher("query1")
621 .query("MATCH (n) RETURN n")
622 .from_source("source1")
623 .build(),
624 )
625 .build()
626 .await;
627
628 assert!(core.is_ok(), "Builder with query should succeed");
629 let core = core.unwrap();
630 assert!(core.state_guard.is_initialized().await);
631 assert_eq!(core.config.queries.len(), 1);
632 }
633}