1use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67 DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::indexes::IndexBackendPlugin;
71use crate::indexes::StorageBackendConfig;
72use crate::lib_core::DrasiLib;
73use crate::reactions::Reaction as ReactionTrait;
74use crate::sources::Source as SourceTrait;
75use crate::state_store::StateStoreProvider;
76use drasi_core::models::SourceMiddlewareConfig;
77
78pub struct DrasiLibBuilder {
119 server_id: Option<String>,
120 priority_queue_capacity: Option<usize>,
121 dispatch_buffer_capacity: Option<usize>,
122 storage_backends: Vec<StorageBackendConfig>,
123 query_configs: Vec<QueryConfig>,
124 source_instances: Vec<(
125 Box<dyn SourceTrait>,
126 std::collections::HashMap<String, String>,
127 )>,
128 reaction_instances: Vec<(
129 Box<dyn ReactionTrait>,
130 std::collections::HashMap<String, String>,
131 )>,
132 index_provider: Option<Arc<dyn IndexBackendPlugin>>,
133 state_store_provider: Option<Arc<dyn StateStoreProvider>>,
134}
135
136impl Default for DrasiLibBuilder {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl DrasiLibBuilder {
143 pub fn new() -> Self {
145 Self {
146 server_id: None,
147 priority_queue_capacity: None,
148 dispatch_buffer_capacity: None,
149 storage_backends: Vec::new(),
150 query_configs: Vec::new(),
151 source_instances: Vec::new(),
152 reaction_instances: Vec::new(),
153 index_provider: None,
154 state_store_provider: None,
155 }
156 }
157
158 pub fn with_id(mut self, id: impl Into<String>) -> Self {
160 self.server_id = Some(id.into());
161 self
162 }
163
164 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
166 self.priority_queue_capacity = Some(capacity);
167 self
168 }
169
170 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
172 self.dispatch_buffer_capacity = Some(capacity);
173 self
174 }
175
176 pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
178 self.storage_backends.push(config);
179 self
180 }
181
182 pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
204 self.index_provider = Some(provider);
205 self
206 }
207
208 pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
228 self.state_store_provider = Some(provider);
229 self
230 }
231
232 pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
246 self.source_instances
247 .push((Box::new(source), std::collections::HashMap::new()));
248 self
249 }
250
251 pub fn with_source_metadata(
256 mut self,
257 source: impl SourceTrait + 'static,
258 extra_metadata: std::collections::HashMap<String, String>,
259 ) -> Self {
260 self.source_instances
261 .push((Box::new(source), extra_metadata));
262 self
263 }
264
265 pub fn with_query(mut self, config: QueryConfig) -> Self {
267 self.query_configs.push(config);
268 self
269 }
270
271 pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
285 self.reaction_instances
286 .push((Box::new(reaction), std::collections::HashMap::new()));
287 self
288 }
289
290 pub fn with_reaction_metadata(
295 mut self,
296 reaction: impl ReactionTrait + 'static,
297 extra_metadata: std::collections::HashMap<String, String>,
298 ) -> Self {
299 self.reaction_instances
300 .push((Box::new(reaction), extra_metadata));
301 self
302 }
303
304 pub async fn build(self) -> Result<DrasiLib> {
309 let config = DrasiLibConfig {
311 id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
312 priority_queue_capacity: self.priority_queue_capacity,
313 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
314 storage_backends: self.storage_backends,
315 queries: self.query_configs.clone(),
316 };
317
318 config
320 .validate()
321 .map_err(|e| DrasiError::validation(e.to_string()))?;
322
323 let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
325 config,
326 self.index_provider,
327 self.state_store_provider,
328 ));
329 let mut core = DrasiLib::new(runtime_config);
330
331 let state_store = core.config.state_store_provider.clone();
333 core.source_manager
334 .inject_state_store(state_store.clone())
335 .await;
336 core.reaction_manager.inject_state_store(state_store).await;
337
338 {
341 use crate::sources::component_graph_source::ComponentGraphSource;
342 let graph_source = ComponentGraphSource::new(
343 core.component_event_broadcast_tx.clone(),
344 core.config.id.clone(),
345 core.component_graph.clone(),
346 )
347 .map_err(|e| {
348 DrasiError::operation_failed(
349 "source",
350 "component-graph",
351 "add",
352 format!("Failed to create: {e}"),
353 )
354 })?;
355
356 let source_id = graph_source.id().to_string();
357 let source_type = graph_source.type_name().to_string();
358 {
359 let mut graph = core.component_graph.write().await;
360 let mut metadata = std::collections::HashMap::new();
361 metadata.insert("kind".to_string(), source_type);
362 metadata.insert(
363 "autoStart".to_string(),
364 graph_source.auto_start().to_string(),
365 );
366 graph.register_source(&source_id, metadata).map_err(|e| {
367 DrasiError::operation_failed(
368 "source",
369 &source_id,
370 "add",
371 format!("Failed to register: {e}"),
372 )
373 })?;
374 }
375 if let Err(e) = core.source_manager.provision_source(graph_source).await {
376 let mut graph = core.component_graph.write().await;
377 let _ = graph.deregister(&source_id);
378 return Err(DrasiError::operation_failed(
379 "source",
380 &source_id,
381 "add",
382 format!("Failed to provision: {e}"),
383 ));
384 }
385 }
386
387 for (source, extra_metadata) in self.source_instances {
390 let source_id = source.id().to_string();
391 let source_type = source.type_name().to_string();
392 let auto_start = source.auto_start();
393
394 {
395 let mut graph = core.component_graph.write().await;
396 let mut metadata = std::collections::HashMap::new();
397 metadata.insert("kind".to_string(), source_type);
398 metadata.insert("autoStart".to_string(), auto_start.to_string());
399 metadata.extend(extra_metadata);
400 graph.register_source(&source_id, metadata).map_err(|e| {
401 DrasiError::operation_failed(
402 "source",
403 &source_id,
404 "add",
405 format!("Failed to register: {e}"),
406 )
407 })?;
408 }
409 if let Err(e) = core.source_manager.provision_source(source).await {
410 let mut graph = core.component_graph.write().await;
411 let _ = graph.deregister(&source_id);
412 return Err(DrasiError::operation_failed(
413 "source",
414 &source_id,
415 "add",
416 format!("Failed to provision: {e}"),
417 ));
418 }
419 }
420
421 core.initialize().await?;
423
424 for (reaction, extra_metadata) in self.reaction_instances {
426 let reaction_id = reaction.id().to_string();
427 let reaction_type = reaction.type_name().to_string();
428 let query_ids = reaction.query_ids();
429
430 {
432 let mut graph = core.component_graph.write().await;
433 let mut metadata = std::collections::HashMap::new();
434 metadata.insert("kind".to_string(), reaction_type);
435 metadata.extend(extra_metadata);
436 graph
437 .register_reaction(&reaction_id, metadata, &query_ids)
438 .map_err(|e| {
439 DrasiError::operation_failed(
440 "reaction",
441 &reaction_id,
442 "add",
443 format!("Failed to register: {e}"),
444 )
445 })?;
446 }
447 if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
448 let mut graph = core.component_graph.write().await;
449 let _ = graph.deregister(&reaction_id);
450 return Err(DrasiError::operation_failed(
451 "reaction",
452 &reaction_id,
453 "add",
454 format!("Failed to provision: {e}"),
455 ));
456 }
457 }
458
459 Ok(core)
460 }
461}
462
463pub struct Query {
483 id: String,
484 query: String,
485 query_language: QueryLanguage,
486 sources: Vec<SourceSubscriptionConfig>,
487 middleware: Vec<SourceMiddlewareConfig>,
488 auto_start: bool,
489 joins: Option<Vec<QueryJoinConfig>>,
490 enable_bootstrap: bool,
491 bootstrap_buffer_size: usize,
492 priority_queue_capacity: Option<usize>,
493 dispatch_buffer_capacity: Option<usize>,
494 dispatch_mode: Option<DispatchMode>,
495 storage_backend: Option<crate::indexes::StorageBackendRef>,
496}
497
498impl Query {
499 pub fn cypher(id: impl Into<String>) -> Self {
501 Self {
502 id: id.into(),
503 query: String::new(),
504 query_language: QueryLanguage::Cypher,
505 sources: Vec::new(),
506 middleware: Vec::new(),
507 auto_start: true,
508 joins: None,
509 enable_bootstrap: true,
510 bootstrap_buffer_size: 10000,
511 priority_queue_capacity: None,
512 dispatch_buffer_capacity: None,
513 dispatch_mode: None,
514 storage_backend: None,
515 }
516 }
517
518 pub fn gql(id: impl Into<String>) -> Self {
520 Self {
521 id: id.into(),
522 query: String::new(),
523 query_language: QueryLanguage::GQL,
524 sources: Vec::new(),
525 middleware: Vec::new(),
526 auto_start: true,
527 joins: None,
528 enable_bootstrap: true,
529 bootstrap_buffer_size: 10000,
530 priority_queue_capacity: None,
531 dispatch_buffer_capacity: None,
532 dispatch_mode: None,
533 storage_backend: None,
534 }
535 }
536
537 pub fn query(mut self, query: impl Into<String>) -> Self {
539 self.query = query.into();
540 self
541 }
542
543 pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
545 self.sources.push(SourceSubscriptionConfig {
546 source_id: source_id.into(),
547 nodes: Vec::new(),
548 relations: Vec::new(),
549 pipeline: Vec::new(),
550 });
551 self
552 }
553
554 pub fn from_source_with_pipeline(
559 mut self,
560 source_id: impl Into<String>,
561 pipeline: Vec<String>,
562 ) -> Self {
563 self.sources.push(SourceSubscriptionConfig {
564 source_id: source_id.into(),
565 nodes: Vec::new(),
566 relations: Vec::new(),
567 pipeline,
568 });
569 self
570 }
571
572 pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
574 self.middleware.push(middleware);
575 self
576 }
577
578 pub fn auto_start(mut self, auto_start: bool) -> Self {
580 self.auto_start = auto_start;
581 self
582 }
583
584 pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
586 self.joins = Some(joins);
587 self
588 }
589
590 pub fn enable_bootstrap(mut self, enable: bool) -> Self {
592 self.enable_bootstrap = enable;
593 self
594 }
595
596 pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
598 self.bootstrap_buffer_size = size;
599 self
600 }
601
602 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
604 self.priority_queue_capacity = Some(capacity);
605 self
606 }
607
608 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
610 self.dispatch_buffer_capacity = Some(capacity);
611 self
612 }
613
614 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
616 self.dispatch_mode = Some(mode);
617 self
618 }
619
620 pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
622 self.storage_backend = Some(backend);
623 self
624 }
625
626 pub fn build(self) -> QueryConfig {
628 QueryConfig {
629 id: self.id,
630 query: self.query,
631 query_language: self.query_language,
632 sources: self.sources,
633 middleware: self.middleware,
634 auto_start: self.auto_start,
635 joins: self.joins,
636 enable_bootstrap: self.enable_bootstrap,
637 bootstrap_buffer_size: self.bootstrap_buffer_size,
638 priority_queue_capacity: self.priority_queue_capacity,
639 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
640 dispatch_mode: self.dispatch_mode,
641 storage_backend: self.storage_backend,
642 }
643 }
644}
645
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DrasiLib;

    /// Builds an in-memory storage backend definition for the builder tests.
    fn memory_backend(
        id: &str,
        enable_archive: bool,
    ) -> crate::indexes::config::StorageBackendConfig {
        use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};

        StorageBackendConfig {
            id: id.to_string(),
            spec: StorageBackendSpec::Memory { enable_archive },
        }
    }

    // ---------------------------------------------------------------------
    // Query builder: end-to-end output of `build()`
    // ---------------------------------------------------------------------

    #[test]
    fn test_query_builder_cypher() {
        let built = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .auto_start(false)
            .build();

        assert_eq!(built.id, "test-query");
        assert_eq!(built.query, "MATCH (n) RETURN n");
        assert_eq!(built.query_language, QueryLanguage::Cypher);
        assert!(!built.auto_start);

        let source_ids: Vec<&str> = built
            .sources
            .iter()
            .map(|s| s.source_id.as_str())
            .collect();
        assert_eq!(source_ids, ["source1"]);
    }

    #[test]
    fn test_query_builder_gql() {
        let built = Query::gql("test-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source1")
            .build();

        assert_eq!(built.query_language, QueryLanguage::GQL);
    }

    #[test]
    fn test_query_builder_multiple_sources() {
        let built = Query::cypher("test-query")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .from_source("source2")
            .build();

        assert_eq!(built.sources.len(), 2);
    }

    // ---------------------------------------------------------------------
    // DrasiLibBuilder: building a full instance
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_drasi_lib_builder_empty() {
        let lib = DrasiLibBuilder::new().build().await.unwrap();

        let running = lib.is_running().await;
        assert!(!running);
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_with_id() {
        let lib = DrasiLibBuilder::new()
            .with_id("test-server")
            .build()
            .await
            .unwrap();

        assert_eq!(lib.get_config().id, "test-server");
    }

    #[tokio::test]
    async fn test_drasi_lib_builder_with_query_no_source() {
        let query = Query::cypher("query1")
            .query("MATCH (n) RETURN n")
            .auto_start(false)
            .build();

        let lib = DrasiLibBuilder::new()
            .with_id("test-server")
            .with_query(query)
            .build()
            .await
            .unwrap();

        let listed = lib.list_queries().await.unwrap();
        assert_eq!(listed.len(), 1);
    }

    #[tokio::test]
    async fn test_builder_creates_initialized_server() {
        let outcome = DrasiLib::builder().with_id("builder-test").build().await;

        assert!(outcome.is_ok(), "Builder should create initialized server");
        let lib = outcome.unwrap();
        assert!(
            lib.state_guard.is_initialized(),
            "Server should be initialized"
        );
    }

    #[tokio::test]
    async fn test_builder_with_query() {
        let mock_source =
            crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
        let query = Query::cypher("query1")
            .query("MATCH (n) RETURN n")
            .from_source("source1")
            .build();

        let outcome = DrasiLib::builder()
            .with_id("complex-server")
            .with_source(mock_source)
            .with_query(query)
            .build()
            .await;

        assert!(outcome.is_ok(), "Builder with query should succeed");
        let lib = outcome.unwrap();
        assert!(lib.state_guard.is_initialized());
        assert_eq!(lib.config.queries.len(), 1);
    }

    // ---------------------------------------------------------------------
    // DrasiLibBuilder: individual setters
    // ---------------------------------------------------------------------

    #[test]
    fn test_builder_with_id_sets_id() {
        let b = DrasiLibBuilder::new().with_id("my-server");
        assert_eq!(b.server_id.as_deref(), Some("my-server"));
    }

    #[test]
    fn test_builder_with_id_accepts_string() {
        let owned = String::from("owned-id");
        let b = DrasiLibBuilder::new().with_id(owned);
        assert_eq!(b.server_id.as_deref(), Some("owned-id"));
    }

    #[test]
    fn test_builder_with_priority_queue_capacity() {
        let b = DrasiLibBuilder::new().with_priority_queue_capacity(50_000);
        assert_eq!(b.priority_queue_capacity, Some(50_000));
    }

    #[test]
    fn test_builder_with_dispatch_buffer_capacity() {
        let b = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2_000);
        assert_eq!(b.dispatch_buffer_capacity, Some(2_000));
    }

    #[test]
    fn test_builder_with_query_adds_to_list() {
        let query = Query::cypher("q1").query("MATCH (n) RETURN n").build();
        let b = DrasiLibBuilder::new().with_query(query);

        let ids: Vec<&str> = b.query_configs.iter().map(|q| q.id.as_str()).collect();
        assert_eq!(ids, ["q1"]);
    }

    #[test]
    fn test_builder_with_multiple_queries() {
        let first = Query::cypher("q1").query("MATCH (a) RETURN a").build();
        let second = Query::gql("q2").query("MATCH (b) RETURN b").build();
        let b = DrasiLibBuilder::new().with_query(first).with_query(second);

        let ids: Vec<&str> = b.query_configs.iter().map(|q| q.id.as_str()).collect();
        assert_eq!(ids, ["q1", "q2"]);
    }

    #[test]
    fn test_builder_add_storage_backend() {
        let b = DrasiLibBuilder::new().add_storage_backend(memory_backend("mem1", false));

        let ids: Vec<&str> = b.storage_backends.iter().map(|s| s.id.as_str()).collect();
        assert_eq!(ids, ["mem1"]);
    }

    #[test]
    fn test_builder_add_multiple_storage_backends() {
        let b = DrasiLibBuilder::new()
            .add_storage_backend(memory_backend("mem1", false))
            .add_storage_backend(memory_backend("mem2", true));

        let ids: Vec<&str> = b.storage_backends.iter().map(|s| s.id.as_str()).collect();
        assert_eq!(ids, ["mem1", "mem2"]);
    }

    #[test]
    fn test_builder_default_values() {
        let b = DrasiLibBuilder::new();

        // Scalar settings start unset.
        assert!(b.server_id.is_none());
        assert!(b.priority_queue_capacity.is_none());
        assert!(b.dispatch_buffer_capacity.is_none());

        // Collections start empty.
        assert!(b.storage_backends.is_empty());
        assert!(b.query_configs.is_empty());
        assert!(b.source_instances.is_empty());
        assert!(b.reaction_instances.is_empty());

        // Providers start unset.
        assert!(b.index_provider.is_none());
        assert!(b.state_store_provider.is_none());
    }

    #[test]
    fn test_builder_fluent_chaining() {
        let query = Query::cypher("q1").query("MATCH (n) RETURN n").build();

        let b = DrasiLibBuilder::new()
            .with_id("chained")
            .with_priority_queue_capacity(20_000)
            .with_dispatch_buffer_capacity(3_000)
            .add_storage_backend(memory_backend("mem", false))
            .with_query(query);

        assert_eq!(b.server_id.as_deref(), Some("chained"));
        assert_eq!(b.priority_queue_capacity, Some(20_000));
        assert_eq!(b.dispatch_buffer_capacity, Some(3_000));
        assert_eq!(b.storage_backends.len(), 1);
        assert_eq!(b.query_configs.len(), 1);
    }

    #[tokio::test]
    async fn test_builder_default_id_when_none_set() {
        let lib = DrasiLibBuilder::new().build().await.unwrap();
        assert_eq!(lib.get_config().id, "drasi-lib");
    }

    #[tokio::test]
    async fn test_builder_with_storage_backend_builds_ok() {
        let outcome = DrasiLibBuilder::new()
            .add_storage_backend(memory_backend("test-mem", false))
            .build()
            .await;

        assert!(
            outcome.is_ok(),
            "Builder with storage backend should succeed"
        );
    }

    // ---------------------------------------------------------------------
    // Query builder: individual setters and initial values
    // ---------------------------------------------------------------------

    #[test]
    fn test_query_cypher_sets_id_and_language() {
        let pending = Query::cypher("cypher-q");
        assert_eq!(pending.id, "cypher-q");
        assert_eq!(pending.query_language, QueryLanguage::Cypher);
    }

    #[test]
    fn test_query_gql_sets_id_and_language() {
        let pending = Query::gql("gql-q");
        assert_eq!(pending.id, "gql-q");
        assert_eq!(pending.query_language, QueryLanguage::GQL);
    }

    #[test]
    fn test_query_from_source_adds_source() {
        let pending = Query::cypher("q").from_source("src1");

        let ids: Vec<&str> = pending
            .sources
            .iter()
            .map(|s| s.source_id.as_str())
            .collect();
        assert_eq!(ids, ["src1"]);
    }

    #[test]
    fn test_query_from_source_chaining() {
        let pending = Query::cypher("q")
            .from_source("src1")
            .from_source("src2")
            .from_source("src3");

        let ids: Vec<&str> = pending
            .sources
            .iter()
            .map(|s| s.source_id.as_str())
            .collect();
        assert_eq!(ids, ["src1", "src2", "src3"]);
    }

    #[test]
    fn test_query_auto_start_default_true() {
        assert!(Query::cypher("q").auto_start);
    }

    #[test]
    fn test_query_auto_start_false() {
        let pending = Query::cypher("q").auto_start(false);
        assert!(!pending.auto_start);
    }

    #[test]
    fn test_query_enable_bootstrap_default_true() {
        assert!(Query::cypher("q").enable_bootstrap);
    }

    #[test]
    fn test_query_enable_bootstrap_false() {
        let pending = Query::cypher("q").enable_bootstrap(false);
        assert!(!pending.enable_bootstrap);
    }

    #[test]
    fn test_query_bootstrap_buffer_size_default() {
        let pending = Query::cypher("q");
        assert_eq!(pending.bootstrap_buffer_size, 10_000);
    }

    #[test]
    fn test_query_with_bootstrap_buffer_size() {
        let pending = Query::cypher("q").with_bootstrap_buffer_size(5_000);
        assert_eq!(pending.bootstrap_buffer_size, 5_000);
    }

    #[test]
    fn test_query_with_dispatch_mode_broadcast() {
        let pending = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
        assert_eq!(pending.dispatch_mode, Some(DispatchMode::Broadcast));
    }

    #[test]
    fn test_query_with_dispatch_mode_channel() {
        let pending = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
        assert_eq!(pending.dispatch_mode, Some(DispatchMode::Channel));
    }

    #[test]
    fn test_query_dispatch_mode_default_none() {
        let pending = Query::cypher("q");
        assert!(pending.dispatch_mode.is_none());
    }

    #[test]
    fn test_query_with_priority_queue_capacity() {
        let pending = Query::cypher("q").with_priority_queue_capacity(50_000);
        assert_eq!(pending.priority_queue_capacity, Some(50_000));
    }

    #[test]
    fn test_query_priority_queue_capacity_default_none() {
        let pending = Query::cypher("q");
        assert!(pending.priority_queue_capacity.is_none());
    }

    #[test]
    fn test_query_with_dispatch_buffer_capacity() {
        let pending = Query::cypher("q").with_dispatch_buffer_capacity(5_000);
        assert_eq!(pending.dispatch_buffer_capacity, Some(5_000));
    }

    #[test]
    fn test_query_dispatch_buffer_capacity_default_none() {
        let pending = Query::cypher("q");
        assert!(pending.dispatch_buffer_capacity.is_none());
    }

    // ---------------------------------------------------------------------
    // Query builder: `build()` field propagation
    // ---------------------------------------------------------------------

    #[test]
    fn test_query_build_propagates_all_fields() {
        let built = Query::cypher("full-query")
            .query("MATCH (n:Person) RETURN n.name")
            .from_source("source-a")
            .from_source("source-b")
            .auto_start(false)
            .enable_bootstrap(false)
            .with_bootstrap_buffer_size(5_000)
            .with_priority_queue_capacity(50_000)
            .with_dispatch_buffer_capacity(2_500)
            .with_dispatch_mode(DispatchMode::Broadcast)
            .build();

        // Values that were set explicitly.
        assert_eq!(built.id, "full-query");
        assert_eq!(built.query, "MATCH (n:Person) RETURN n.name");
        assert_eq!(built.query_language, QueryLanguage::Cypher);
        let source_ids: Vec<&str> = built
            .sources
            .iter()
            .map(|s| s.source_id.as_str())
            .collect();
        assert_eq!(source_ids, ["source-a", "source-b"]);
        assert!(!built.auto_start);
        assert!(!built.enable_bootstrap);
        assert_eq!(built.bootstrap_buffer_size, 5_000);
        assert_eq!(built.priority_queue_capacity, Some(50_000));
        assert_eq!(built.dispatch_buffer_capacity, Some(2_500));
        assert_eq!(built.dispatch_mode, Some(DispatchMode::Broadcast));

        // Values that were left untouched.
        assert!(built.joins.is_none());
        assert!(built.middleware.is_empty());
        assert!(built.storage_backend.is_none());
    }

    #[test]
    fn test_query_build_gql_propagates_language() {
        let built = Query::gql("gql-full")
            .query("MATCH (n) RETURN n")
            .from_source("src")
            .build();

        assert_eq!(built.id, "gql-full");
        assert_eq!(built.query_language, QueryLanguage::GQL);
        assert_eq!(built.query, "MATCH (n) RETURN n");
        assert_eq!(built.sources.len(), 1);

        // Settings that were not overridden keep their initial values.
        assert!(built.auto_start);
        assert!(built.enable_bootstrap);
        assert_eq!(built.bootstrap_buffer_size, 10_000);
    }

    #[test]
    fn test_query_build_defaults() {
        let built = Query::cypher("defaults-only").build();

        assert_eq!(built.id, "defaults-only");
        assert!(built.query.is_empty());
        assert_eq!(built.query_language, QueryLanguage::Cypher);

        assert!(built.sources.is_empty());
        assert!(built.middleware.is_empty());
        assert!(built.joins.is_none());

        assert!(built.auto_start);
        assert!(built.enable_bootstrap);
        assert_eq!(built.bootstrap_buffer_size, 10_000);

        assert!(built.priority_queue_capacity.is_none());
        assert!(built.dispatch_buffer_capacity.is_none());
        assert!(built.dispatch_mode.is_none());
        assert!(built.storage_backend.is_none());
    }
}