1use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67 DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::identity::IdentityProvider;
71use crate::indexes::IndexBackendPlugin;
72use crate::indexes::StorageBackendConfig;
73use crate::lib_core::DrasiLib;
74use crate::reactions::Reaction as ReactionTrait;
75use crate::sources::Source as SourceTrait;
76use crate::state_store::StateStoreProvider;
77use drasi_core::models::SourceMiddlewareConfig;
78
79pub struct DrasiLibBuilder {
120 server_id: Option<String>,
121 priority_queue_capacity: Option<usize>,
122 dispatch_buffer_capacity: Option<usize>,
123 storage_backends: Vec<StorageBackendConfig>,
124 query_configs: Vec<QueryConfig>,
125 source_instances: Vec<(
126 Box<dyn SourceTrait>,
127 std::collections::HashMap<String, String>,
128 )>,
129 reaction_instances: Vec<(
130 Box<dyn ReactionTrait>,
131 std::collections::HashMap<String, String>,
132 )>,
133 index_provider: Option<Arc<dyn IndexBackendPlugin>>,
134 state_store_provider: Option<Arc<dyn StateStoreProvider>>,
135 identity_provider: Option<Arc<dyn IdentityProvider>>,
136}
137
138impl Default for DrasiLibBuilder {
139 fn default() -> Self {
140 Self::new()
141 }
142}
143
144impl DrasiLibBuilder {
145 pub fn new() -> Self {
147 Self {
148 server_id: None,
149 priority_queue_capacity: None,
150 dispatch_buffer_capacity: None,
151 storage_backends: Vec::new(),
152 query_configs: Vec::new(),
153 source_instances: Vec::new(),
154 reaction_instances: Vec::new(),
155 index_provider: None,
156 state_store_provider: None,
157 identity_provider: None,
158 }
159 }
160
161 pub fn with_id(mut self, id: impl Into<String>) -> Self {
163 self.server_id = Some(id.into());
164 self
165 }
166
167 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
169 self.priority_queue_capacity = Some(capacity);
170 self
171 }
172
173 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
175 self.dispatch_buffer_capacity = Some(capacity);
176 self
177 }
178
179 pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
181 self.storage_backends.push(config);
182 self
183 }
184
185 pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
207 self.index_provider = Some(provider);
208 self
209 }
210
211 pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
231 self.state_store_provider = Some(provider);
232 self
233 }
234
235 pub fn with_identity_provider(mut self, provider: Arc<dyn IdentityProvider>) -> Self {
256 self.identity_provider = Some(provider);
257 self
258 }
259
260 pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
274 self.source_instances
275 .push((Box::new(source), std::collections::HashMap::new()));
276 self
277 }
278
279 pub fn with_source_metadata(
284 mut self,
285 source: impl SourceTrait + 'static,
286 extra_metadata: std::collections::HashMap<String, String>,
287 ) -> Self {
288 self.source_instances
289 .push((Box::new(source), extra_metadata));
290 self
291 }
292
293 pub fn with_query(mut self, config: QueryConfig) -> Self {
295 self.query_configs.push(config);
296 self
297 }
298
299 pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
313 self.reaction_instances
314 .push((Box::new(reaction), std::collections::HashMap::new()));
315 self
316 }
317
318 pub fn with_reaction_metadata(
323 mut self,
324 reaction: impl ReactionTrait + 'static,
325 extra_metadata: std::collections::HashMap<String, String>,
326 ) -> Self {
327 self.reaction_instances
328 .push((Box::new(reaction), extra_metadata));
329 self
330 }
331
332 pub async fn build(self) -> Result<DrasiLib> {
337 let config = DrasiLibConfig {
339 id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
340 priority_queue_capacity: self.priority_queue_capacity,
341 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
342 storage_backends: self.storage_backends,
343 queries: self.query_configs.clone(),
344 };
345
346 config
348 .validate()
349 .map_err(|e| DrasiError::validation(e.to_string()))?;
350
351 let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
353 config,
354 self.index_provider,
355 self.state_store_provider,
356 self.identity_provider,
357 ));
358 let mut core = DrasiLib::new(runtime_config);
359
360 let state_store = core.config.state_store_provider.clone();
362 core.source_manager
363 .inject_state_store(state_store.clone())
364 .await;
365 core.reaction_manager.inject_state_store(state_store).await;
366
367 {
370 use crate::sources::component_graph_source::ComponentGraphSource;
371 let graph_source = ComponentGraphSource::new(
372 core.component_event_broadcast_tx.clone(),
373 core.config.id.clone(),
374 core.component_graph.clone(),
375 )
376 .map_err(|e| {
377 DrasiError::operation_failed(
378 "source",
379 "component-graph",
380 "add",
381 format!("Failed to create: {e}"),
382 )
383 })?;
384
385 let source_id = graph_source.id().to_string();
386 let source_type = graph_source.type_name().to_string();
387 {
388 let mut graph = core.component_graph.write().await;
389 let mut metadata = std::collections::HashMap::new();
390 metadata.insert("kind".to_string(), source_type);
391 metadata.insert(
392 "autoStart".to_string(),
393 graph_source.auto_start().to_string(),
394 );
395 graph.register_source(&source_id, metadata).map_err(|e| {
396 DrasiError::operation_failed(
397 "source",
398 &source_id,
399 "add",
400 format!("Failed to register: {e}"),
401 )
402 })?;
403 }
404 if let Err(e) = core.source_manager.provision_source(graph_source).await {
405 let mut graph = core.component_graph.write().await;
406 let _ = graph.deregister(&source_id);
407 return Err(DrasiError::operation_failed(
408 "source",
409 &source_id,
410 "add",
411 format!("Failed to provision: {e}"),
412 ));
413 }
414 }
415
416 for (source, extra_metadata) in self.source_instances {
419 let source_id = source.id().to_string();
420 let source_type = source.type_name().to_string();
421 let auto_start = source.auto_start();
422
423 {
424 let mut graph = core.component_graph.write().await;
425 let mut metadata = std::collections::HashMap::new();
426 metadata.insert("kind".to_string(), source_type);
427 metadata.insert("autoStart".to_string(), auto_start.to_string());
428 metadata.extend(extra_metadata);
429 graph.register_source(&source_id, metadata).map_err(|e| {
430 DrasiError::operation_failed(
431 "source",
432 &source_id,
433 "add",
434 format!("Failed to register: {e}"),
435 )
436 })?;
437 }
438 if let Err(e) = core.source_manager.provision_source(source).await {
439 let mut graph = core.component_graph.write().await;
440 let _ = graph.deregister(&source_id);
441 return Err(DrasiError::operation_failed(
442 "source",
443 &source_id,
444 "add",
445 format!("Failed to provision: {e}"),
446 ));
447 }
448 }
449
450 core.initialize().await?;
452
453 for (reaction, extra_metadata) in self.reaction_instances {
455 let reaction_id = reaction.id().to_string();
456 let reaction_type = reaction.type_name().to_string();
457 let query_ids = reaction.query_ids();
458
459 {
461 let mut graph = core.component_graph.write().await;
462 let mut metadata = std::collections::HashMap::new();
463 metadata.insert("kind".to_string(), reaction_type);
464 metadata.extend(extra_metadata);
465 graph
466 .register_reaction(&reaction_id, metadata, &query_ids)
467 .map_err(|e| {
468 DrasiError::operation_failed(
469 "reaction",
470 &reaction_id,
471 "add",
472 format!("Failed to register: {e}"),
473 )
474 })?;
475 }
476 if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
477 let mut graph = core.component_graph.write().await;
478 let _ = graph.deregister(&reaction_id);
479 return Err(DrasiError::operation_failed(
480 "reaction",
481 &reaction_id,
482 "add",
483 format!("Failed to provision: {e}"),
484 ));
485 }
486 }
487
488 if core.config.identity_provider.is_some() {
492 let mut graph = core.component_graph.write().await;
493 let component_ids: Vec<String> = graph
494 .list_by_kind(&crate::component_graph::ComponentKind::Source)
495 .into_iter()
496 .chain(graph.list_by_kind(&crate::component_graph::ComponentKind::Reaction))
497 .map(|(id, _)| id)
498 .collect();
499
500 let mut metadata = std::collections::HashMap::new();
501 metadata.insert("kind".to_string(), "identity_provider".to_string());
502 graph
503 .register_identity_provider("identity-provider", metadata, &component_ids)
504 .map_err(|e| {
505 DrasiError::operation_failed(
506 "identity_provider",
507 "identity-provider",
508 "add",
509 format!("Failed to register: {e}"),
510 )
511 })?;
512 }
513
514 Ok(core)
515 }
516}
517
518pub struct Query {
538 id: String,
539 query: String,
540 query_language: QueryLanguage,
541 sources: Vec<SourceSubscriptionConfig>,
542 middleware: Vec<SourceMiddlewareConfig>,
543 auto_start: bool,
544 joins: Option<Vec<QueryJoinConfig>>,
545 enable_bootstrap: bool,
546 bootstrap_buffer_size: usize,
547 priority_queue_capacity: Option<usize>,
548 dispatch_buffer_capacity: Option<usize>,
549 dispatch_mode: Option<DispatchMode>,
550 storage_backend: Option<crate::indexes::StorageBackendRef>,
551 recovery_policy: Option<crate::recovery::RecoveryPolicy>,
552 outbox_capacity: usize,
553 bootstrap_timeout_secs: u64,
554}
555
556impl Query {
557 pub fn cypher(id: impl Into<String>) -> Self {
559 Self {
560 id: id.into(),
561 query: String::new(),
562 query_language: QueryLanguage::Cypher,
563 sources: Vec::new(),
564 middleware: Vec::new(),
565 auto_start: true,
566 joins: None,
567 enable_bootstrap: true,
568 bootstrap_buffer_size: 10000,
569 priority_queue_capacity: None,
570 dispatch_buffer_capacity: None,
571 dispatch_mode: None,
572 storage_backend: None,
573 recovery_policy: None,
574 outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
575 bootstrap_timeout_secs: 300,
576 }
577 }
578
579 pub fn gql(id: impl Into<String>) -> Self {
581 Self {
582 id: id.into(),
583 query: String::new(),
584 query_language: QueryLanguage::GQL,
585 sources: Vec::new(),
586 middleware: Vec::new(),
587 auto_start: true,
588 joins: None,
589 enable_bootstrap: true,
590 bootstrap_buffer_size: 10000,
591 priority_queue_capacity: None,
592 dispatch_buffer_capacity: None,
593 dispatch_mode: None,
594 storage_backend: None,
595 recovery_policy: None,
596 outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
597 bootstrap_timeout_secs: 300,
598 }
599 }
600
601 pub fn query(mut self, query: impl Into<String>) -> Self {
603 self.query = query.into();
604 self
605 }
606
607 pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
609 self.sources.push(SourceSubscriptionConfig {
610 source_id: source_id.into(),
611 nodes: Vec::new(),
612 relations: Vec::new(),
613 pipeline: Vec::new(),
614 });
615 self
616 }
617
618 pub fn from_source_with_pipeline(
623 mut self,
624 source_id: impl Into<String>,
625 pipeline: Vec<String>,
626 ) -> Self {
627 self.sources.push(SourceSubscriptionConfig {
628 source_id: source_id.into(),
629 nodes: Vec::new(),
630 relations: Vec::new(),
631 pipeline,
632 });
633 self
634 }
635
636 pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
638 self.middleware.push(middleware);
639 self
640 }
641
642 pub fn auto_start(mut self, auto_start: bool) -> Self {
644 self.auto_start = auto_start;
645 self
646 }
647
648 pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
650 self.joins = Some(joins);
651 self
652 }
653
654 pub fn enable_bootstrap(mut self, enable: bool) -> Self {
656 self.enable_bootstrap = enable;
657 self
658 }
659
660 pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
662 self.bootstrap_buffer_size = size;
663 self
664 }
665
666 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
668 self.priority_queue_capacity = Some(capacity);
669 self
670 }
671
672 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
674 self.dispatch_buffer_capacity = Some(capacity);
675 self
676 }
677
678 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
680 self.dispatch_mode = Some(mode);
681 self
682 }
683
684 pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
686 self.storage_backend = Some(backend);
687 self
688 }
689
690 pub fn with_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
693 self.recovery_policy = Some(policy);
694 self
695 }
696
697 pub fn with_outbox_capacity(mut self, capacity: usize) -> Self {
700 self.outbox_capacity = capacity;
701 self
702 }
703
704 pub fn with_bootstrap_timeout_secs(mut self, secs: u64) -> Self {
709 self.bootstrap_timeout_secs = secs;
710 self
711 }
712
713 pub fn build(self) -> QueryConfig {
715 QueryConfig {
716 id: self.id,
717 query: self.query,
718 query_language: self.query_language,
719 sources: self.sources,
720 middleware: self.middleware,
721 auto_start: self.auto_start,
722 joins: self.joins,
723 enable_bootstrap: self.enable_bootstrap,
724 bootstrap_buffer_size: self.bootstrap_buffer_size,
725 priority_queue_capacity: self.priority_queue_capacity,
726 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
727 dispatch_mode: self.dispatch_mode,
728 storage_backend: self.storage_backend,
729 recovery_policy: self.recovery_policy,
730 outbox_capacity: self.outbox_capacity,
731 bootstrap_timeout_secs: self.bootstrap_timeout_secs,
732 }
733 }
734}
735
736#[cfg(test)]
741mod tests {
742 use super::*;
743 use crate::DrasiLib;
744
745 #[test]
750 fn test_query_builder_cypher() {
751 let config = Query::cypher("test-query")
752 .query("MATCH (n) RETURN n")
753 .from_source("source1")
754 .auto_start(false)
755 .build();
756
757 assert_eq!(config.id, "test-query");
758 assert_eq!(config.query, "MATCH (n) RETURN n");
759 assert_eq!(config.query_language, QueryLanguage::Cypher);
760 assert!(!config.auto_start);
761 assert_eq!(config.sources.len(), 1);
762 assert_eq!(config.sources[0].source_id, "source1");
763 }
764
765 #[test]
766 fn test_query_builder_gql() {
767 let config = Query::gql("test-query")
768 .query("MATCH (n:Person) RETURN n.name")
769 .from_source("source1")
770 .build();
771
772 assert_eq!(config.query_language, QueryLanguage::GQL);
773 }
774
775 #[test]
776 fn test_query_builder_multiple_sources() {
777 let config = Query::cypher("test-query")
778 .query("MATCH (n) RETURN n")
779 .from_source("source1")
780 .from_source("source2")
781 .build();
782
783 assert_eq!(config.sources.len(), 2);
784 }
785
786 #[tokio::test]
787 async fn test_drasi_lib_builder_empty() {
788 let core = DrasiLibBuilder::new().build().await.unwrap();
789
790 assert!(!core.is_running().await);
791 }
792
793 #[tokio::test]
794 async fn test_drasi_lib_builder_with_id() {
795 let core = DrasiLibBuilder::new()
796 .with_id("test-server")
797 .build()
798 .await
799 .unwrap();
800
801 assert_eq!(core.get_config().id, "test-server");
802 }
803
804 #[tokio::test]
805 async fn test_drasi_lib_builder_with_query_no_source() {
806 let core = DrasiLibBuilder::new()
809 .with_id("test-server")
810 .with_query(
811 Query::cypher("query1")
812 .query("MATCH (n) RETURN n")
813 .auto_start(false)
815 .build(),
816 )
817 .build()
818 .await
819 .unwrap();
820
821 let queries = core.list_queries().await.unwrap();
822 assert_eq!(queries.len(), 1);
823 }
824
825 #[tokio::test]
830 async fn test_builder_creates_initialized_server() {
831 let core = DrasiLib::builder().with_id("builder-test").build().await;
832
833 assert!(core.is_ok(), "Builder should create initialized server");
834 let core = core.unwrap();
835 assert!(
836 core.state_guard.is_initialized(),
837 "Server should be initialized"
838 );
839 }
840
841 #[tokio::test]
842 async fn test_builder_with_query() {
843 let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
847 let core = DrasiLib::builder()
848 .with_id("complex-server")
849 .with_source(source)
850 .with_query(
851 Query::cypher("query1")
852 .query("MATCH (n) RETURN n")
853 .from_source("source1")
854 .build(),
855 )
856 .build()
857 .await;
858
859 assert!(core.is_ok(), "Builder with query should succeed");
860 let core = core.unwrap();
861 assert!(core.state_guard.is_initialized());
862 assert_eq!(core.config.queries.len(), 1);
863 }
864
865 #[test]
870 fn test_builder_with_id_sets_id() {
871 let builder = DrasiLibBuilder::new().with_id("my-server");
872 assert_eq!(builder.server_id, Some("my-server".to_string()));
873 }
874
875 #[test]
876 fn test_builder_with_id_accepts_string() {
877 let builder = DrasiLibBuilder::new().with_id(String::from("owned-id"));
878 assert_eq!(builder.server_id, Some("owned-id".to_string()));
879 }
880
881 #[test]
882 fn test_builder_with_priority_queue_capacity() {
883 let builder = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
884 assert_eq!(builder.priority_queue_capacity, Some(50000));
885 }
886
887 #[test]
888 fn test_builder_with_dispatch_buffer_capacity() {
889 let builder = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
890 assert_eq!(builder.dispatch_buffer_capacity, Some(2000));
891 }
892
893 #[test]
894 fn test_builder_with_query_adds_to_list() {
895 let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
896 let builder = DrasiLibBuilder::new().with_query(q);
897 assert_eq!(builder.query_configs.len(), 1);
898 assert_eq!(builder.query_configs[0].id, "q1");
899 }
900
901 #[test]
902 fn test_builder_with_multiple_queries() {
903 let q1 = Query::cypher("q1").query("MATCH (a) RETURN a").build();
904 let q2 = Query::gql("q2").query("MATCH (b) RETURN b").build();
905 let builder = DrasiLibBuilder::new().with_query(q1).with_query(q2);
906 assert_eq!(builder.query_configs.len(), 2);
907 assert_eq!(builder.query_configs[0].id, "q1");
908 assert_eq!(builder.query_configs[1].id, "q2");
909 }
910
911 #[test]
912 fn test_builder_add_storage_backend() {
913 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
914
915 let backend = StorageBackendConfig {
916 id: "mem1".to_string(),
917 spec: StorageBackendSpec::Memory {
918 enable_archive: false,
919 },
920 };
921 let builder = DrasiLibBuilder::new().add_storage_backend(backend);
922 assert_eq!(builder.storage_backends.len(), 1);
923 assert_eq!(builder.storage_backends[0].id, "mem1");
924 }
925
926 #[test]
927 fn test_builder_add_multiple_storage_backends() {
928 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
929
930 let b1 = StorageBackendConfig {
931 id: "mem1".to_string(),
932 spec: StorageBackendSpec::Memory {
933 enable_archive: false,
934 },
935 };
936 let b2 = StorageBackendConfig {
937 id: "mem2".to_string(),
938 spec: StorageBackendSpec::Memory {
939 enable_archive: true,
940 },
941 };
942 let builder = DrasiLibBuilder::new()
943 .add_storage_backend(b1)
944 .add_storage_backend(b2);
945 assert_eq!(builder.storage_backends.len(), 2);
946 assert_eq!(builder.storage_backends[0].id, "mem1");
947 assert_eq!(builder.storage_backends[1].id, "mem2");
948 }
949
950 #[test]
951 fn test_builder_default_values() {
952 let builder = DrasiLibBuilder::new();
953 assert_eq!(builder.server_id, None);
954 assert_eq!(builder.priority_queue_capacity, None);
955 assert_eq!(builder.dispatch_buffer_capacity, None);
956 assert!(builder.storage_backends.is_empty());
957 assert!(builder.query_configs.is_empty());
958 assert!(builder.source_instances.is_empty());
959 assert!(builder.reaction_instances.is_empty());
960 assert!(builder.index_provider.is_none());
961 assert!(builder.state_store_provider.is_none());
962 }
963
964 #[test]
965 fn test_builder_fluent_chaining() {
966 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
967
968 let backend = StorageBackendConfig {
969 id: "mem".to_string(),
970 spec: StorageBackendSpec::Memory {
971 enable_archive: false,
972 },
973 };
974 let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
975
976 let builder = DrasiLibBuilder::new()
977 .with_id("chained")
978 .with_priority_queue_capacity(20000)
979 .with_dispatch_buffer_capacity(3000)
980 .add_storage_backend(backend)
981 .with_query(q);
982
983 assert_eq!(builder.server_id, Some("chained".to_string()));
984 assert_eq!(builder.priority_queue_capacity, Some(20000));
985 assert_eq!(builder.dispatch_buffer_capacity, Some(3000));
986 assert_eq!(builder.storage_backends.len(), 1);
987 assert_eq!(builder.query_configs.len(), 1);
988 }
989
990 #[tokio::test]
991 async fn test_builder_default_id_when_none_set() {
992 let core = DrasiLibBuilder::new().build().await.unwrap();
993 assert_eq!(core.get_config().id, "drasi-lib");
994 }
995
996 #[tokio::test]
997 async fn test_builder_with_storage_backend_builds_ok() {
998 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
999
1000 let backend = StorageBackendConfig {
1001 id: "test-mem".to_string(),
1002 spec: StorageBackendSpec::Memory {
1003 enable_archive: false,
1004 },
1005 };
1006 let core = DrasiLibBuilder::new()
1007 .add_storage_backend(backend)
1008 .build()
1009 .await;
1010 assert!(core.is_ok(), "Builder with storage backend should succeed");
1011 }
1012
1013 #[test]
1018 fn test_query_cypher_sets_id_and_language() {
1019 let q = Query::cypher("cypher-q");
1020 assert_eq!(q.id, "cypher-q");
1021 assert_eq!(q.query_language, QueryLanguage::Cypher);
1022 }
1023
1024 #[test]
1025 fn test_query_gql_sets_id_and_language() {
1026 let q = Query::gql("gql-q");
1027 assert_eq!(q.id, "gql-q");
1028 assert_eq!(q.query_language, QueryLanguage::GQL);
1029 }
1030
1031 #[test]
1032 fn test_query_from_source_adds_source() {
1033 let q = Query::cypher("q").from_source("src1");
1034 assert_eq!(q.sources.len(), 1);
1035 assert_eq!(q.sources[0].source_id, "src1");
1036 }
1037
1038 #[test]
1039 fn test_query_from_source_chaining() {
1040 let q = Query::cypher("q")
1041 .from_source("src1")
1042 .from_source("src2")
1043 .from_source("src3");
1044 assert_eq!(q.sources.len(), 3);
1045 assert_eq!(q.sources[0].source_id, "src1");
1046 assert_eq!(q.sources[1].source_id, "src2");
1047 assert_eq!(q.sources[2].source_id, "src3");
1048 }
1049
1050 #[test]
1051 fn test_query_auto_start_default_true() {
1052 let q = Query::cypher("q");
1053 assert!(q.auto_start);
1054 }
1055
1056 #[test]
1057 fn test_query_auto_start_false() {
1058 let q = Query::cypher("q").auto_start(false);
1059 assert!(!q.auto_start);
1060 }
1061
1062 #[test]
1063 fn test_query_enable_bootstrap_default_true() {
1064 let q = Query::cypher("q");
1065 assert!(q.enable_bootstrap);
1066 }
1067
1068 #[test]
1069 fn test_query_enable_bootstrap_false() {
1070 let q = Query::cypher("q").enable_bootstrap(false);
1071 assert!(!q.enable_bootstrap);
1072 }
1073
1074 #[test]
1075 fn test_query_bootstrap_buffer_size_default() {
1076 let q = Query::cypher("q");
1077 assert_eq!(q.bootstrap_buffer_size, 10000);
1078 }
1079
1080 #[test]
1081 fn test_query_with_bootstrap_buffer_size() {
1082 let q = Query::cypher("q").with_bootstrap_buffer_size(5000);
1083 assert_eq!(q.bootstrap_buffer_size, 5000);
1084 }
1085
1086 #[test]
1087 fn test_query_with_dispatch_mode_broadcast() {
1088 let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
1089 assert_eq!(q.dispatch_mode, Some(DispatchMode::Broadcast));
1090 }
1091
1092 #[test]
1093 fn test_query_with_dispatch_mode_channel() {
1094 let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
1095 assert_eq!(q.dispatch_mode, Some(DispatchMode::Channel));
1096 }
1097
1098 #[test]
1099 fn test_query_dispatch_mode_default_none() {
1100 let q = Query::cypher("q");
1101 assert_eq!(q.dispatch_mode, None);
1102 }
1103
1104 #[test]
1105 fn test_query_with_priority_queue_capacity() {
1106 let q = Query::cypher("q").with_priority_queue_capacity(50000);
1107 assert_eq!(q.priority_queue_capacity, Some(50000));
1108 }
1109
1110 #[test]
1111 fn test_query_priority_queue_capacity_default_none() {
1112 let q = Query::cypher("q");
1113 assert_eq!(q.priority_queue_capacity, None);
1114 }
1115
1116 #[test]
1117 fn test_query_with_dispatch_buffer_capacity() {
1118 let q = Query::cypher("q").with_dispatch_buffer_capacity(5000);
1119 assert_eq!(q.dispatch_buffer_capacity, Some(5000));
1120 }
1121
1122 #[test]
1123 fn test_query_dispatch_buffer_capacity_default_none() {
1124 let q = Query::cypher("q");
1125 assert_eq!(q.dispatch_buffer_capacity, None);
1126 }
1127
1128 #[test]
1129 fn test_query_build_propagates_all_fields() {
1130 let config = Query::cypher("full-query")
1131 .query("MATCH (n:Person) RETURN n.name")
1132 .from_source("source-a")
1133 .from_source("source-b")
1134 .auto_start(false)
1135 .enable_bootstrap(false)
1136 .with_bootstrap_buffer_size(5000)
1137 .with_priority_queue_capacity(50000)
1138 .with_dispatch_buffer_capacity(2500)
1139 .with_dispatch_mode(DispatchMode::Broadcast)
1140 .build();
1141
1142 assert_eq!(config.id, "full-query");
1143 assert_eq!(config.query, "MATCH (n:Person) RETURN n.name");
1144 assert_eq!(config.query_language, QueryLanguage::Cypher);
1145 assert_eq!(config.sources.len(), 2);
1146 assert_eq!(config.sources[0].source_id, "source-a");
1147 assert_eq!(config.sources[1].source_id, "source-b");
1148 assert!(!config.auto_start);
1149 assert!(!config.enable_bootstrap);
1150 assert_eq!(config.bootstrap_buffer_size, 5000);
1151 assert_eq!(config.priority_queue_capacity, Some(50000));
1152 assert_eq!(config.dispatch_buffer_capacity, Some(2500));
1153 assert_eq!(config.dispatch_mode, Some(DispatchMode::Broadcast));
1154 assert!(config.joins.is_none());
1155 assert!(config.middleware.is_empty());
1156 assert!(config.storage_backend.is_none());
1157 }
1158
1159 #[test]
1160 fn test_query_build_gql_propagates_language() {
1161 let config = Query::gql("gql-full")
1162 .query("MATCH (n) RETURN n")
1163 .from_source("src")
1164 .build();
1165
1166 assert_eq!(config.id, "gql-full");
1167 assert_eq!(config.query_language, QueryLanguage::GQL);
1168 assert_eq!(config.query, "MATCH (n) RETURN n");
1169 assert_eq!(config.sources.len(), 1);
1170 assert!(config.auto_start);
1172 assert!(config.enable_bootstrap);
1173 assert_eq!(config.bootstrap_buffer_size, 10000);
1174 }
1175
1176 #[test]
1177 fn test_query_build_defaults() {
1178 let config = Query::cypher("defaults-only").build();
1179
1180 assert_eq!(config.id, "defaults-only");
1181 assert_eq!(config.query, "");
1182 assert_eq!(config.query_language, QueryLanguage::Cypher);
1183 assert!(config.sources.is_empty());
1184 assert!(config.middleware.is_empty());
1185 assert!(config.auto_start);
1186 assert!(config.joins.is_none());
1187 assert!(config.enable_bootstrap);
1188 assert_eq!(config.bootstrap_buffer_size, 10000);
1189 assert_eq!(config.priority_queue_capacity, None);
1190 assert_eq!(config.dispatch_buffer_capacity, None);
1191 assert_eq!(config.dispatch_mode, None);
1192 assert!(config.storage_backend.is_none());
1193 }
1194}