1use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67 DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::identity::IdentityProvider;
71use crate::indexes::IndexBackendPlugin;
72use crate::indexes::StorageBackendConfig;
73use crate::lib_core::DrasiLib;
74use crate::reactions::Reaction as ReactionTrait;
75use crate::sources::Source as SourceTrait;
76use crate::state_store::StateStoreProvider;
77use drasi_core::models::SourceMiddlewareConfig;
78
79pub struct DrasiLibBuilder {
120 server_id: Option<String>,
121 priority_queue_capacity: Option<usize>,
122 dispatch_buffer_capacity: Option<usize>,
123 storage_backends: Vec<StorageBackendConfig>,
124 query_configs: Vec<QueryConfig>,
125 source_instances: Vec<(
126 Box<dyn SourceTrait>,
127 std::collections::HashMap<String, String>,
128 )>,
129 reaction_instances: Vec<(
130 Box<dyn ReactionTrait>,
131 std::collections::HashMap<String, String>,
132 )>,
133 index_provider: Option<Arc<dyn IndexBackendPlugin>>,
134 state_store_provider: Option<Arc<dyn StateStoreProvider>>,
135 identity_provider: Option<Arc<dyn IdentityProvider>>,
136}
137
138impl Default for DrasiLibBuilder {
139 fn default() -> Self {
140 Self::new()
141 }
142}
143
144impl DrasiLibBuilder {
145 pub fn new() -> Self {
147 Self {
148 server_id: None,
149 priority_queue_capacity: None,
150 dispatch_buffer_capacity: None,
151 storage_backends: Vec::new(),
152 query_configs: Vec::new(),
153 source_instances: Vec::new(),
154 reaction_instances: Vec::new(),
155 index_provider: None,
156 state_store_provider: None,
157 identity_provider: None,
158 }
159 }
160
161 pub fn with_id(mut self, id: impl Into<String>) -> Self {
163 self.server_id = Some(id.into());
164 self
165 }
166
167 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
169 self.priority_queue_capacity = Some(capacity);
170 self
171 }
172
173 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
175 self.dispatch_buffer_capacity = Some(capacity);
176 self
177 }
178
179 pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
181 self.storage_backends.push(config);
182 self
183 }
184
185 pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
207 self.index_provider = Some(provider);
208 self
209 }
210
211 pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
231 self.state_store_provider = Some(provider);
232 self
233 }
234
235 pub fn with_identity_provider(mut self, provider: Arc<dyn IdentityProvider>) -> Self {
256 self.identity_provider = Some(provider);
257 self
258 }
259
260 pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
274 self.source_instances
275 .push((Box::new(source), std::collections::HashMap::new()));
276 self
277 }
278
279 pub fn with_source_metadata(
284 mut self,
285 source: impl SourceTrait + 'static,
286 extra_metadata: std::collections::HashMap<String, String>,
287 ) -> Self {
288 self.source_instances
289 .push((Box::new(source), extra_metadata));
290 self
291 }
292
293 pub fn with_query(mut self, config: QueryConfig) -> Self {
295 self.query_configs.push(config);
296 self
297 }
298
299 pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
313 self.reaction_instances
314 .push((Box::new(reaction), std::collections::HashMap::new()));
315 self
316 }
317
318 pub fn with_reaction_metadata(
323 mut self,
324 reaction: impl ReactionTrait + 'static,
325 extra_metadata: std::collections::HashMap<String, String>,
326 ) -> Self {
327 self.reaction_instances
328 .push((Box::new(reaction), extra_metadata));
329 self
330 }
331
332 pub async fn build(self) -> Result<DrasiLib> {
337 let config = DrasiLibConfig {
339 id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
340 priority_queue_capacity: self.priority_queue_capacity,
341 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
342 storage_backends: self.storage_backends,
343 queries: self.query_configs.clone(),
344 };
345
346 config
348 .validate()
349 .map_err(|e| DrasiError::validation(e.to_string()))?;
350
351 let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
353 config,
354 self.index_provider,
355 self.state_store_provider,
356 self.identity_provider,
357 ));
358 let mut core = DrasiLib::new(runtime_config);
359
360 let state_store = core.config.state_store_provider.clone();
362 core.source_manager
363 .inject_state_store(state_store.clone())
364 .await;
365 core.reaction_manager.inject_state_store(state_store).await;
366
367 {
370 use crate::sources::component_graph_source::ComponentGraphSource;
371 let graph_source = ComponentGraphSource::new(
372 core.component_event_broadcast_tx.clone(),
373 core.config.id.clone(),
374 core.component_graph.clone(),
375 )
376 .map_err(|e| {
377 DrasiError::operation_failed(
378 "source",
379 "component-graph",
380 "add",
381 format!("Failed to create: {e}"),
382 )
383 })?;
384
385 let source_id = graph_source.id().to_string();
386 let source_type = graph_source.type_name().to_string();
387 {
388 let mut graph = core.component_graph.write().await;
389 let mut metadata = std::collections::HashMap::new();
390 metadata.insert("kind".to_string(), source_type);
391 metadata.insert(
392 "autoStart".to_string(),
393 graph_source.auto_start().to_string(),
394 );
395 graph.register_source(&source_id, metadata).map_err(|e| {
396 DrasiError::operation_failed(
397 "source",
398 &source_id,
399 "add",
400 format!("Failed to register: {e}"),
401 )
402 })?;
403 }
404 if let Err(e) = core.source_manager.provision_source(graph_source).await {
405 let mut graph = core.component_graph.write().await;
406 let _ = graph.deregister(&source_id);
407 return Err(DrasiError::operation_failed(
408 "source",
409 &source_id,
410 "add",
411 format!("Failed to provision: {e}"),
412 ));
413 }
414 }
415
416 for (source, extra_metadata) in self.source_instances {
419 let source_id = source.id().to_string();
420 let source_type = source.type_name().to_string();
421 let auto_start = source.auto_start();
422
423 {
424 let mut graph = core.component_graph.write().await;
425 let mut metadata = std::collections::HashMap::new();
426 metadata.insert("kind".to_string(), source_type);
427 metadata.insert("autoStart".to_string(), auto_start.to_string());
428 metadata.extend(extra_metadata);
429 graph.register_source(&source_id, metadata).map_err(|e| {
430 DrasiError::operation_failed(
431 "source",
432 &source_id,
433 "add",
434 format!("Failed to register: {e}"),
435 )
436 })?;
437 }
438 if let Err(e) = core.source_manager.provision_source(source).await {
439 let mut graph = core.component_graph.write().await;
440 let _ = graph.deregister(&source_id);
441 return Err(DrasiError::operation_failed(
442 "source",
443 &source_id,
444 "add",
445 format!("Failed to provision: {e}"),
446 ));
447 }
448 }
449
450 core.initialize().await?;
452
453 for (reaction, extra_metadata) in self.reaction_instances {
455 let reaction_id = reaction.id().to_string();
456 let reaction_type = reaction.type_name().to_string();
457 let query_ids = reaction.query_ids();
458
459 {
461 let mut graph = core.component_graph.write().await;
462 let mut metadata = std::collections::HashMap::new();
463 metadata.insert("kind".to_string(), reaction_type);
464 metadata.extend(extra_metadata);
465 graph
466 .register_reaction(&reaction_id, metadata, &query_ids)
467 .map_err(|e| {
468 DrasiError::operation_failed(
469 "reaction",
470 &reaction_id,
471 "add",
472 format!("Failed to register: {e}"),
473 )
474 })?;
475 }
476 if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
477 let mut graph = core.component_graph.write().await;
478 let _ = graph.deregister(&reaction_id);
479 return Err(DrasiError::operation_failed(
480 "reaction",
481 &reaction_id,
482 "add",
483 format!("Failed to provision: {e}"),
484 ));
485 }
486 }
487
488 if core.config.identity_provider.is_some() {
492 let mut graph = core.component_graph.write().await;
493 let component_ids: Vec<String> = graph
494 .list_by_kind(&crate::component_graph::ComponentKind::Source)
495 .into_iter()
496 .chain(graph.list_by_kind(&crate::component_graph::ComponentKind::Reaction))
497 .map(|(id, _)| id)
498 .collect();
499
500 let mut metadata = std::collections::HashMap::new();
501 metadata.insert("kind".to_string(), "identity_provider".to_string());
502 graph
503 .register_identity_provider("identity-provider", metadata, &component_ids)
504 .map_err(|e| {
505 DrasiError::operation_failed(
506 "identity_provider",
507 "identity-provider",
508 "add",
509 format!("Failed to register: {e}"),
510 )
511 })?;
512 }
513
514 Ok(core)
515 }
516}
517
518pub struct Query {
538 id: String,
539 query: String,
540 query_language: QueryLanguage,
541 sources: Vec<SourceSubscriptionConfig>,
542 middleware: Vec<SourceMiddlewareConfig>,
543 auto_start: bool,
544 joins: Option<Vec<QueryJoinConfig>>,
545 enable_bootstrap: bool,
546 bootstrap_buffer_size: usize,
547 priority_queue_capacity: Option<usize>,
548 dispatch_buffer_capacity: Option<usize>,
549 dispatch_mode: Option<DispatchMode>,
550 storage_backend: Option<crate::indexes::StorageBackendRef>,
551 recovery_policy: Option<crate::recovery::RecoveryPolicy>,
552}
553
554impl Query {
555 pub fn cypher(id: impl Into<String>) -> Self {
557 Self {
558 id: id.into(),
559 query: String::new(),
560 query_language: QueryLanguage::Cypher,
561 sources: Vec::new(),
562 middleware: Vec::new(),
563 auto_start: true,
564 joins: None,
565 enable_bootstrap: true,
566 bootstrap_buffer_size: 10000,
567 priority_queue_capacity: None,
568 dispatch_buffer_capacity: None,
569 dispatch_mode: None,
570 storage_backend: None,
571 recovery_policy: None,
572 }
573 }
574
575 pub fn gql(id: impl Into<String>) -> Self {
577 Self {
578 id: id.into(),
579 query: String::new(),
580 query_language: QueryLanguage::GQL,
581 sources: Vec::new(),
582 middleware: Vec::new(),
583 auto_start: true,
584 joins: None,
585 enable_bootstrap: true,
586 bootstrap_buffer_size: 10000,
587 priority_queue_capacity: None,
588 dispatch_buffer_capacity: None,
589 dispatch_mode: None,
590 storage_backend: None,
591 recovery_policy: None,
592 }
593 }
594
595 pub fn query(mut self, query: impl Into<String>) -> Self {
597 self.query = query.into();
598 self
599 }
600
601 pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
603 self.sources.push(SourceSubscriptionConfig {
604 source_id: source_id.into(),
605 nodes: Vec::new(),
606 relations: Vec::new(),
607 pipeline: Vec::new(),
608 });
609 self
610 }
611
612 pub fn from_source_with_pipeline(
617 mut self,
618 source_id: impl Into<String>,
619 pipeline: Vec<String>,
620 ) -> Self {
621 self.sources.push(SourceSubscriptionConfig {
622 source_id: source_id.into(),
623 nodes: Vec::new(),
624 relations: Vec::new(),
625 pipeline,
626 });
627 self
628 }
629
630 pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
632 self.middleware.push(middleware);
633 self
634 }
635
636 pub fn auto_start(mut self, auto_start: bool) -> Self {
638 self.auto_start = auto_start;
639 self
640 }
641
642 pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
644 self.joins = Some(joins);
645 self
646 }
647
648 pub fn enable_bootstrap(mut self, enable: bool) -> Self {
650 self.enable_bootstrap = enable;
651 self
652 }
653
654 pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
656 self.bootstrap_buffer_size = size;
657 self
658 }
659
660 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
662 self.priority_queue_capacity = Some(capacity);
663 self
664 }
665
666 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
668 self.dispatch_buffer_capacity = Some(capacity);
669 self
670 }
671
672 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
674 self.dispatch_mode = Some(mode);
675 self
676 }
677
678 pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
680 self.storage_backend = Some(backend);
681 self
682 }
683
684 pub fn with_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
687 self.recovery_policy = Some(policy);
688 self
689 }
690
691 pub fn build(self) -> QueryConfig {
693 QueryConfig {
694 id: self.id,
695 query: self.query,
696 query_language: self.query_language,
697 sources: self.sources,
698 middleware: self.middleware,
699 auto_start: self.auto_start,
700 joins: self.joins,
701 enable_bootstrap: self.enable_bootstrap,
702 bootstrap_buffer_size: self.bootstrap_buffer_size,
703 priority_queue_capacity: self.priority_queue_capacity,
704 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
705 dispatch_mode: self.dispatch_mode,
706 storage_backend: self.storage_backend,
707 recovery_policy: self.recovery_policy,
708 }
709 }
710}
711
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DrasiLib;

    /// Builds an in-memory storage backend definition for builder tests.
    fn memory_backend(id: &str, enable_archive: bool) -> crate::indexes::config::StorageBackendConfig {
        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

        StorageBackendConfig {
            id: id.to_string(),
            spec: StorageBackendSpec::Memory { enable_archive },
        }
    }

    // ------------------------------------------------------------------
    // Query builder: end-to-end `build()` smoke tests
    // ------------------------------------------------------------------

    #[test]
    fn test_query_builder_cypher() {
        let config = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .auto_start(false)
            .build();

        assert_eq!(config.id, "test-query");
        assert_eq!(config.query, "MATCH (n) RETURN n");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert!(!config.auto_start);
        assert_eq!(config.sources.len(), 1);
        assert_eq!(config.sources[0].source_id, "source1");
    }

    #[test]
    fn test_query_builder_gql() {
        let config = Query::gql("test-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source1")
            .build();

        assert_eq!(config.query_language, QueryLanguage::GQL);
    }

    #[test]
    fn test_query_builder_multiple_sources() {
        let config = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .from_source("source2")
            .build();

        assert_eq!(config.sources.len(), 2);
    }

    // ------------------------------------------------------------------
    // DrasiLibBuilder: async `build()` tests
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_drasi_lib_builder_empty() {
        let core = DrasiLibBuilder::new().build().await.unwrap();

        assert!(!core.is_running().await);
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_with_id() {
        let core = DrasiLibBuilder::new()
            .with_id("test-server")
            .build()
            .await
            .unwrap();

        assert_eq!(core.get_config().id, "test-server");
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_with_query_no_source() {
        // The query is not auto-started and subscribes to no source.
        let core = DrasiLibBuilder::new()
            .with_id("test-server")
            .with_query(
                Query::cypher("query1")
                    .query("MATCH (n) RETURN n")
                    .auto_start(false)
                    .build(),
            )
            .build()
            .await
            .unwrap();

        let queries = core.list_queries().await.unwrap();
        assert_eq!(queries.len(), 1);
    }

    #[tokio::test]
    async fn test_builder_creates_initialized_server() {
        let core = DrasiLib::builder()
            .with_id("builder-test")
            .build()
            .await
            .expect("Builder should create initialized server");

        assert!(
            core.state_guard.is_initialized(),
            "Server should be initialized"
        );
    }

    #[tokio::test]
    async fn test_builder_with_query() {
        let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
        let core = DrasiLib::builder()
            .with_id("complex-server")
            .with_source(source)
            .with_query(
                Query::cypher("query1")
                    .query("MATCH (n) RETURN n")
                    .from_source("source1")
                    .build(),
            )
            .build()
            .await
            .expect("Builder with query should succeed");

        assert!(core.state_guard.is_initialized());
        assert_eq!(core.config.queries.len(), 1);
    }

    // ------------------------------------------------------------------
    // DrasiLibBuilder: field-level setter tests
    // ------------------------------------------------------------------

    #[test]
    fn test_builder_with_id_sets_id() {
        let builder = DrasiLibBuilder::new().with_id("my-server");
        assert_eq!(builder.server_id, Some("my-server".to_string()));
    }

    #[test]
    fn test_builder_with_id_accepts_string() {
        let builder = DrasiLibBuilder::new().with_id(String::from("owned-id"));
        assert_eq!(builder.server_id, Some("owned-id".to_string()));
    }

    #[test]
    fn test_builder_with_priority_queue_capacity() {
        let builder = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
        assert_eq!(builder.priority_queue_capacity, Some(50000));
    }

    #[test]
    fn test_builder_with_dispatch_buffer_capacity() {
        let builder = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
        assert_eq!(builder.dispatch_buffer_capacity, Some(2000));
    }

    #[test]
    fn test_builder_with_query_adds_to_list() {
        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
        let builder = DrasiLibBuilder::new().with_query(q);
        assert_eq!(builder.query_configs.len(), 1);
        assert_eq!(builder.query_configs[0].id, "q1");
    }

    #[test]
    fn test_builder_with_multiple_queries() {
        let q1 = Query::cypher("q1").query("MATCH (a) RETURN a").build();
        let q2 = Query::gql("q2").query("MATCH (b) RETURN b").build();
        let builder = DrasiLibBuilder::new().with_query(q1).with_query(q2);
        assert_eq!(builder.query_configs.len(), 2);
        assert_eq!(builder.query_configs[0].id, "q1");
        assert_eq!(builder.query_configs[1].id, "q2");
    }

    #[test]
    fn test_builder_add_storage_backend() {
        let builder = DrasiLibBuilder::new().add_storage_backend(memory_backend("mem1", false));
        assert_eq!(builder.storage_backends.len(), 1);
        assert_eq!(builder.storage_backends[0].id, "mem1");
    }

    #[test]
    fn test_builder_add_multiple_storage_backends() {
        let builder = DrasiLibBuilder::new()
            .add_storage_backend(memory_backend("mem1", false))
            .add_storage_backend(memory_backend("mem2", true));
        assert_eq!(builder.storage_backends.len(), 2);
        assert_eq!(builder.storage_backends[0].id, "mem1");
        assert_eq!(builder.storage_backends[1].id, "mem2");
    }

    #[test]
    fn test_builder_default_values() {
        let builder = DrasiLibBuilder::new();
        assert_eq!(builder.server_id, None);
        assert_eq!(builder.priority_queue_capacity, None);
        assert_eq!(builder.dispatch_buffer_capacity, None);
        assert!(builder.storage_backends.is_empty());
        assert!(builder.query_configs.is_empty());
        assert!(builder.source_instances.is_empty());
        assert!(builder.reaction_instances.is_empty());
        assert!(builder.index_provider.is_none());
        assert!(builder.state_store_provider.is_none());
        assert!(builder.identity_provider.is_none());
    }

    #[test]
    fn test_builder_fluent_chaining() {
        let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();

        let builder = DrasiLibBuilder::new()
            .with_id("chained")
            .with_priority_queue_capacity(20000)
            .with_dispatch_buffer_capacity(3000)
            .add_storage_backend(memory_backend("mem", false))
            .with_query(q);

        assert_eq!(builder.server_id, Some("chained".to_string()));
        assert_eq!(builder.priority_queue_capacity, Some(20000));
        assert_eq!(builder.dispatch_buffer_capacity, Some(3000));
        assert_eq!(builder.storage_backends.len(), 1);
        assert_eq!(builder.query_configs.len(), 1);
    }

    #[tokio::test]
    async fn test_builder_default_id_when_none_set() {
        let core = DrasiLibBuilder::new().build().await.unwrap();
        assert_eq!(core.get_config().id, "drasi-lib");
    }

    #[tokio::test]
    async fn test_builder_with_storage_backend_builds_ok() {
        let core = DrasiLibBuilder::new()
            .add_storage_backend(memory_backend("test-mem", false))
            .build()
            .await;
        assert!(core.is_ok(), "Builder with storage backend should succeed");
    }

    // ------------------------------------------------------------------
    // Query builder: field-level setter tests
    // ------------------------------------------------------------------

    #[test]
    fn test_query_cypher_sets_id_and_language() {
        let q = Query::cypher("cypher-q");
        assert_eq!(q.id, "cypher-q");
        assert_eq!(q.query_language, QueryLanguage::Cypher);
    }

    #[test]
    fn test_query_gql_sets_id_and_language() {
        let q = Query::gql("gql-q");
        assert_eq!(q.id, "gql-q");
        assert_eq!(q.query_language, QueryLanguage::GQL);
    }

    #[test]
    fn test_query_from_source_adds_source() {
        let q = Query::cypher("q").from_source("src1");
        assert_eq!(q.sources.len(), 1);
        assert_eq!(q.sources[0].source_id, "src1");
        assert!(q.sources[0].nodes.is_empty());
        assert!(q.sources[0].relations.is_empty());
        assert!(q.sources[0].pipeline.is_empty());
    }

    #[test]
    fn test_query_from_source_chaining() {
        let q = Query::cypher("q")
            .from_source("src1")
            .from_source("src2")
            .from_source("src3");
        assert_eq!(q.sources.len(), 3);
        assert_eq!(q.sources[0].source_id, "src1");
        assert_eq!(q.sources[1].source_id, "src2");
        assert_eq!(q.sources[2].source_id, "src3");
    }

    #[test]
    fn test_query_from_source_with_pipeline() {
        let q = Query::cypher("q")
            .from_source_with_pipeline("src1", vec!["step-a".to_string(), "step-b".to_string()]);
        assert_eq!(q.sources.len(), 1);
        assert_eq!(q.sources[0].source_id, "src1");
        assert_eq!(
            q.sources[0].pipeline,
            vec!["step-a".to_string(), "step-b".to_string()]
        );
        assert!(q.sources[0].nodes.is_empty());
        assert!(q.sources[0].relations.is_empty());
    }

    #[test]
    fn test_query_joins_default_none_and_with_joins_sets_some() {
        assert!(Query::cypher("q").joins.is_none());

        let q = Query::cypher("q").with_joins(Vec::new());
        assert!(matches!(q.joins.as_deref(), Some([])));
    }

    #[test]
    fn test_query_auto_start_default_true() {
        let q = Query::cypher("q");
        assert!(q.auto_start);
    }

    #[test]
    fn test_query_auto_start_false() {
        let q = Query::cypher("q").auto_start(false);
        assert!(!q.auto_start);
    }

    #[test]
    fn test_query_enable_bootstrap_default_true() {
        let q = Query::cypher("q");
        assert!(q.enable_bootstrap);
    }

    #[test]
    fn test_query_enable_bootstrap_false() {
        let q = Query::cypher("q").enable_bootstrap(false);
        assert!(!q.enable_bootstrap);
    }

    #[test]
    fn test_query_bootstrap_buffer_size_default() {
        let q = Query::cypher("q");
        assert_eq!(q.bootstrap_buffer_size, 10000);
    }

    #[test]
    fn test_query_with_bootstrap_buffer_size() {
        let q = Query::cypher("q").with_bootstrap_buffer_size(5000);
        assert_eq!(q.bootstrap_buffer_size, 5000);
    }

    #[test]
    fn test_query_with_dispatch_mode_broadcast() {
        let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
        assert_eq!(q.dispatch_mode, Some(DispatchMode::Broadcast));
    }

    #[test]
    fn test_query_with_dispatch_mode_channel() {
        let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
        assert_eq!(q.dispatch_mode, Some(DispatchMode::Channel));
    }

    #[test]
    fn test_query_dispatch_mode_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.dispatch_mode, None);
    }

    #[test]
    fn test_query_with_priority_queue_capacity() {
        let q = Query::cypher("q").with_priority_queue_capacity(50000);
        assert_eq!(q.priority_queue_capacity, Some(50000));
    }

    #[test]
    fn test_query_priority_queue_capacity_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.priority_queue_capacity, None);
    }

    #[test]
    fn test_query_with_dispatch_buffer_capacity() {
        let q = Query::cypher("q").with_dispatch_buffer_capacity(5000);
        assert_eq!(q.dispatch_buffer_capacity, Some(5000));
    }

    #[test]
    fn test_query_dispatch_buffer_capacity_default_none() {
        let q = Query::cypher("q");
        assert_eq!(q.dispatch_buffer_capacity, None);
    }

    // ------------------------------------------------------------------
    // Query builder: `build()` propagation tests
    // ------------------------------------------------------------------

    #[test]
    fn test_query_build_propagates_all_fields() {
        let config = Query::cypher("full-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source-a")
            .from_source("source-b")
            .auto_start(false)
            .enable_bootstrap(false)
            .with_bootstrap_buffer_size(5000)
            .with_priority_queue_capacity(50000)
            .with_dispatch_buffer_capacity(2500)
            .with_dispatch_mode(DispatchMode::Broadcast)
            .build();

        assert_eq!(config.id, "full-query");
        assert_eq!(config.query, "MATCH (n:Person) RETURN n.name");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert_eq!(config.sources.len(), 2);
        assert_eq!(config.sources[0].source_id, "source-a");
        assert_eq!(config.sources[1].source_id, "source-b");
        assert!(!config.auto_start);
        assert!(!config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 5000);
        assert_eq!(config.priority_queue_capacity, Some(50000));
        assert_eq!(config.dispatch_buffer_capacity, Some(2500));
        assert_eq!(config.dispatch_mode, Some(DispatchMode::Broadcast));
        // Fields that were never set must come through untouched.
        assert!(config.joins.is_none());
        assert!(config.middleware.is_empty());
        assert!(config.storage_backend.is_none());
        assert!(config.recovery_policy.is_none());
    }

    #[test]
    fn test_query_build_gql_propagates_language() {
        let config = Query::gql("gql-full")
            .query("MATCH (n) RETURN n")
            .from_source("src")
            .build();

        assert_eq!(config.id, "gql-full");
        assert_eq!(config.query_language, QueryLanguage::GQL);
        assert_eq!(config.query, "MATCH (n) RETURN n");
        assert_eq!(config.sources.len(), 1);
        // Defaults shared with the Cypher constructor.
        assert!(config.auto_start);
        assert!(config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 10000);
    }

    #[test]
    fn test_query_build_defaults() {
        let config = Query::cypher("defaults-only").build();

        assert_eq!(config.id, "defaults-only");
        assert_eq!(config.query, "");
        assert_eq!(config.query_language, QueryLanguage::Cypher);
        assert!(config.sources.is_empty());
        assert!(config.middleware.is_empty());
        assert!(config.auto_start);
        assert!(config.joins.is_none());
        assert!(config.enable_bootstrap);
        assert_eq!(config.bootstrap_buffer_size, 10000);
        assert_eq!(config.priority_queue_capacity, None);
        assert_eq!(config.dispatch_buffer_capacity, None);
        assert_eq!(config.dispatch_mode, None);
        assert!(config.storage_backend.is_none());
        assert!(config.recovery_policy.is_none());
    }
}