1use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67 DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::identity::IdentityProvider;
71use crate::indexes::IndexBackendPlugin;
72use crate::indexes::StorageBackendConfig;
73use crate::lib_core::DrasiLib;
74use crate::reactions::Reaction as ReactionTrait;
75use crate::secret_store::SecretStoreProvider;
76use crate::sources::Source as SourceTrait;
77use crate::state_store::StateStoreProvider;
78use drasi_core::models::SourceMiddlewareConfig;
79
80pub struct DrasiLibBuilder {
121 server_id: Option<String>,
122 priority_queue_capacity: Option<usize>,
123 dispatch_buffer_capacity: Option<usize>,
124 storage_backends: Vec<StorageBackendConfig>,
125 query_configs: Vec<QueryConfig>,
126 source_instances: Vec<(
127 Box<dyn SourceTrait>,
128 std::collections::HashMap<String, String>,
129 )>,
130 reaction_instances: Vec<(
131 Box<dyn ReactionTrait>,
132 std::collections::HashMap<String, String>,
133 )>,
134 bootstrap_metadata: Vec<(
137 String,
138 String,
139 std::collections::HashMap<String, serde_json::Value>,
140 )>,
141 index_provider: Option<Arc<dyn IndexBackendPlugin>>,
142 state_store_provider: Option<Arc<dyn StateStoreProvider>>,
143 identity_provider: Option<Arc<dyn IdentityProvider>>,
144 secret_store_provider: Option<Arc<dyn SecretStoreProvider>>,
145 default_recovery_policy: Option<crate::recovery::RecoveryPolicy>,
146}
147
148impl Default for DrasiLibBuilder {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154impl DrasiLibBuilder {
155 pub fn new() -> Self {
157 Self {
158 server_id: None,
159 priority_queue_capacity: None,
160 dispatch_buffer_capacity: None,
161 storage_backends: Vec::new(),
162 query_configs: Vec::new(),
163 source_instances: Vec::new(),
164 reaction_instances: Vec::new(),
165 bootstrap_metadata: Vec::new(),
166 index_provider: None,
167 state_store_provider: None,
168 identity_provider: None,
169 secret_store_provider: None,
170 default_recovery_policy: None,
171 }
172 }
173
174 pub fn with_id(mut self, id: impl Into<String>) -> Self {
176 self.server_id = Some(id.into());
177 self
178 }
179
180 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
182 self.priority_queue_capacity = Some(capacity);
183 self
184 }
185
186 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
188 self.dispatch_buffer_capacity = Some(capacity);
189 self
190 }
191
192 pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
194 self.storage_backends.push(config);
195 self
196 }
197
198 pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
220 self.index_provider = Some(provider);
221 self
222 }
223
224 pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
244 self.state_store_provider = Some(provider);
245 self
246 }
247
248 pub fn with_identity_provider(mut self, provider: Arc<dyn IdentityProvider>) -> Self {
269 self.identity_provider = Some(provider);
270 self
271 }
272
273 pub fn with_secret_store_provider(mut self, provider: Arc<dyn SecretStoreProvider>) -> Self {
298 self.secret_store_provider = Some(provider);
299 self
300 }
301
302 pub fn with_default_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
307 self.default_recovery_policy = Some(policy);
308 self
309 }
310
311 pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
325 self.source_instances
326 .push((Box::new(source), std::collections::HashMap::new()));
327 self
328 }
329
330 pub fn with_source_metadata(
335 mut self,
336 source: impl SourceTrait + 'static,
337 extra_metadata: std::collections::HashMap<String, String>,
338 ) -> Self {
339 self.source_instances
340 .push((Box::new(source), extra_metadata));
341 self
342 }
343
344 pub fn with_query(mut self, config: QueryConfig) -> Self {
346 self.query_configs.push(config);
347 self
348 }
349
350 pub fn with_bootstrap_for_source(
369 mut self,
370 source_id: impl Into<String>,
371 kind: impl Into<String>,
372 properties: std::collections::HashMap<String, serde_json::Value>,
373 ) -> Self {
374 self.bootstrap_metadata
375 .push((source_id.into(), kind.into(), properties));
376 self
377 }
378
379 pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
393 self.reaction_instances
394 .push((Box::new(reaction), std::collections::HashMap::new()));
395 self
396 }
397
398 pub fn with_reaction_metadata(
403 mut self,
404 reaction: impl ReactionTrait + 'static,
405 extra_metadata: std::collections::HashMap<String, String>,
406 ) -> Self {
407 self.reaction_instances
408 .push((Box::new(reaction), extra_metadata));
409 self
410 }
411
412 pub async fn build(self) -> Result<DrasiLib> {
417 let config = DrasiLibConfig {
419 id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
420 priority_queue_capacity: self.priority_queue_capacity,
421 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
422 storage_backends: self.storage_backends,
423 queries: self.query_configs.clone(),
424 };
425
426 config
428 .validate()
429 .map_err(|e| DrasiError::validation(e.to_string()))?;
430
431 let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
433 config,
434 self.index_provider,
435 self.state_store_provider,
436 self.identity_provider,
437 self.secret_store_provider,
438 self.default_recovery_policy,
439 ));
440 let mut core = DrasiLib::new(runtime_config);
441
442 let state_store = core.config.state_store_provider.clone();
444 core.source_manager
445 .inject_state_store(state_store.clone())
446 .await;
447 core.reaction_manager.inject_state_store(state_store).await;
448
449 {
452 use crate::sources::component_graph_source::ComponentGraphSource;
453 let graph_source = ComponentGraphSource::new(
454 core.component_event_broadcast_tx.clone(),
455 core.config.id.clone(),
456 core.component_graph.clone(),
457 )
458 .map_err(|e| {
459 DrasiError::operation_failed(
460 "source",
461 "component-graph",
462 "add",
463 format!("Failed to create: {e}"),
464 )
465 })?;
466
467 let source_id = graph_source.id().to_string();
468 let source_type = graph_source.type_name().to_string();
469 {
470 let mut graph = core.component_graph.write().await;
471 let mut metadata = std::collections::HashMap::new();
472 metadata.insert("kind".to_string(), source_type);
473 metadata.insert(
474 "autoStart".to_string(),
475 graph_source.auto_start().to_string(),
476 );
477 graph.register_source(&source_id, metadata).map_err(|e| {
478 DrasiError::operation_failed(
479 "source",
480 &source_id,
481 "add",
482 format!("Failed to register: {e}"),
483 )
484 })?;
485 }
486 if let Err(e) = core.source_manager.provision_source(graph_source).await {
487 let mut graph = core.component_graph.write().await;
488 let _ = graph.deregister(&source_id);
489 return Err(DrasiError::operation_failed(
490 "source",
491 &source_id,
492 "add",
493 format!("Failed to provision: {e}"),
494 ));
495 }
496 }
497
498 for (source, extra_metadata) in self.source_instances {
501 let source_id = source.id().to_string();
502 let source_type = source.type_name().to_string();
503 let auto_start = source.auto_start();
504
505 {
506 let mut graph = core.component_graph.write().await;
507 let mut metadata = std::collections::HashMap::new();
508 metadata.insert("kind".to_string(), source_type);
509 metadata.insert("autoStart".to_string(), auto_start.to_string());
510 metadata.extend(extra_metadata);
511 graph.register_source(&source_id, metadata).map_err(|e| {
512 DrasiError::operation_failed(
513 "source",
514 &source_id,
515 "add",
516 format!("Failed to register: {e}"),
517 )
518 })?;
519 }
520 if let Err(e) = core.source_manager.provision_source(source).await {
521 let mut graph = core.component_graph.write().await;
522 let _ = graph.deregister(&source_id);
523 return Err(DrasiError::operation_failed(
524 "source",
525 &source_id,
526 "add",
527 format!("Failed to provision: {e}"),
528 ));
529 }
530 }
531
532 for (source_id, kind, properties) in self.bootstrap_metadata {
536 let bp_id = format!("{source_id}-bootstrap");
537 let mut metadata = std::collections::HashMap::new();
538 metadata.insert("kind".to_string(), kind);
539 for (key, value) in properties {
540 metadata.insert(key, serde_json::to_string(&value).unwrap_or_default());
541 }
542 let mut graph = core.component_graph.write().await;
543 if let Err(e) =
544 graph.register_bootstrap_provider(&bp_id, metadata, &[source_id.clone()])
545 {
546 log::warn!(
547 "Failed to register bootstrap provider metadata for source '{source_id}': {e}"
548 );
549 }
550 }
551
552 core.initialize().await?;
554
555 for (reaction, extra_metadata) in self.reaction_instances {
557 let reaction_id = reaction.id().to_string();
558 let reaction_type = reaction.type_name().to_string();
559 let query_ids = reaction.query_ids();
560
561 {
563 let mut graph = core.component_graph.write().await;
564 let mut metadata = std::collections::HashMap::new();
565 metadata.insert("kind".to_string(), reaction_type);
566 metadata.extend(extra_metadata);
567 graph
568 .register_reaction(&reaction_id, metadata, &query_ids)
569 .map_err(|e| {
570 DrasiError::operation_failed(
571 "reaction",
572 &reaction_id,
573 "add",
574 format!("Failed to register: {e}"),
575 )
576 })?;
577 }
578 if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
579 let mut graph = core.component_graph.write().await;
580 let _ = graph.deregister(&reaction_id);
581 return Err(DrasiError::operation_failed(
582 "reaction",
583 &reaction_id,
584 "add",
585 format!("Failed to provision: {e}"),
586 ));
587 }
588 }
589
590 if core.config.identity_provider.is_some() {
594 let mut graph = core.component_graph.write().await;
595 let component_ids: Vec<String> = graph
596 .list_by_kind(&crate::component_graph::ComponentKind::Source)
597 .into_iter()
598 .chain(graph.list_by_kind(&crate::component_graph::ComponentKind::Reaction))
599 .map(|(id, _)| id)
600 .collect();
601
602 let mut metadata = std::collections::HashMap::new();
603 metadata.insert("kind".to_string(), "identity_provider".to_string());
604 graph
605 .register_identity_provider("identity-provider", metadata, &component_ids)
606 .map_err(|e| {
607 DrasiError::operation_failed(
608 "identity_provider",
609 "identity-provider",
610 "add",
611 format!("Failed to register: {e}"),
612 )
613 })?;
614 }
615
616 Ok(core)
617 }
618}
619
620pub struct Query {
640 id: String,
641 query: String,
642 query_language: QueryLanguage,
643 sources: Vec<SourceSubscriptionConfig>,
644 middleware: Vec<SourceMiddlewareConfig>,
645 auto_start: bool,
646 joins: Option<Vec<QueryJoinConfig>>,
647 enable_bootstrap: bool,
648 bootstrap_buffer_size: usize,
649 priority_queue_capacity: Option<usize>,
650 dispatch_buffer_capacity: Option<usize>,
651 dispatch_mode: Option<DispatchMode>,
652 storage_backend: Option<crate::indexes::StorageBackendRef>,
653 recovery_policy: Option<crate::recovery::RecoveryPolicy>,
654 outbox_capacity: usize,
655 bootstrap_timeout_secs: u64,
656}
657
658impl Query {
659 pub fn cypher(id: impl Into<String>) -> Self {
661 Self {
662 id: id.into(),
663 query: String::new(),
664 query_language: QueryLanguage::Cypher,
665 sources: Vec::new(),
666 middleware: Vec::new(),
667 auto_start: true,
668 joins: None,
669 enable_bootstrap: true,
670 bootstrap_buffer_size: 10000,
671 priority_queue_capacity: None,
672 dispatch_buffer_capacity: None,
673 dispatch_mode: None,
674 storage_backend: None,
675 recovery_policy: None,
676 outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
677 bootstrap_timeout_secs: 300,
678 }
679 }
680
681 pub fn gql(id: impl Into<String>) -> Self {
683 Self {
684 id: id.into(),
685 query: String::new(),
686 query_language: QueryLanguage::GQL,
687 sources: Vec::new(),
688 middleware: Vec::new(),
689 auto_start: true,
690 joins: None,
691 enable_bootstrap: true,
692 bootstrap_buffer_size: 10000,
693 priority_queue_capacity: None,
694 dispatch_buffer_capacity: None,
695 dispatch_mode: None,
696 storage_backend: None,
697 recovery_policy: None,
698 outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
699 bootstrap_timeout_secs: 300,
700 }
701 }
702
703 pub fn query(mut self, query: impl Into<String>) -> Self {
705 self.query = query.into();
706 self
707 }
708
709 pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
711 self.sources.push(SourceSubscriptionConfig {
712 source_id: source_id.into(),
713 nodes: Vec::new(),
714 relations: Vec::new(),
715 pipeline: Vec::new(),
716 });
717 self
718 }
719
720 pub fn from_source_with_pipeline(
725 mut self,
726 source_id: impl Into<String>,
727 pipeline: Vec<String>,
728 ) -> Self {
729 self.sources.push(SourceSubscriptionConfig {
730 source_id: source_id.into(),
731 nodes: Vec::new(),
732 relations: Vec::new(),
733 pipeline,
734 });
735 self
736 }
737
738 pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
740 self.middleware.push(middleware);
741 self
742 }
743
744 pub fn auto_start(mut self, auto_start: bool) -> Self {
746 self.auto_start = auto_start;
747 self
748 }
749
750 pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
752 self.joins = Some(joins);
753 self
754 }
755
756 pub fn enable_bootstrap(mut self, enable: bool) -> Self {
758 self.enable_bootstrap = enable;
759 self
760 }
761
762 pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
764 self.bootstrap_buffer_size = size;
765 self
766 }
767
768 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
770 self.priority_queue_capacity = Some(capacity);
771 self
772 }
773
774 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
776 self.dispatch_buffer_capacity = Some(capacity);
777 self
778 }
779
780 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
782 self.dispatch_mode = Some(mode);
783 self
784 }
785
786 pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
788 self.storage_backend = Some(backend);
789 self
790 }
791
792 pub fn with_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
795 self.recovery_policy = Some(policy);
796 self
797 }
798
799 pub fn with_outbox_capacity(mut self, capacity: usize) -> Self {
802 self.outbox_capacity = capacity;
803 self
804 }
805
806 pub fn with_bootstrap_timeout_secs(mut self, secs: u64) -> Self {
811 self.bootstrap_timeout_secs = secs;
812 self
813 }
814
815 pub fn build(self) -> QueryConfig {
817 QueryConfig {
818 id: self.id,
819 query: self.query,
820 query_language: self.query_language,
821 sources: self.sources,
822 middleware: self.middleware,
823 auto_start: self.auto_start,
824 joins: self.joins,
825 enable_bootstrap: self.enable_bootstrap,
826 bootstrap_buffer_size: self.bootstrap_buffer_size,
827 priority_queue_capacity: self.priority_queue_capacity,
828 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
829 dispatch_mode: self.dispatch_mode,
830 storage_backend: self.storage_backend,
831 recovery_policy: self.recovery_policy,
832 outbox_capacity: self.outbox_capacity,
833 bootstrap_timeout_secs: self.bootstrap_timeout_secs,
834 }
835 }
836}
837
838#[cfg(test)]
843mod tests {
844 use super::*;
845 use crate::DrasiLib;
846
847 #[test]
852 fn test_query_builder_cypher() {
853 let config = Query::cypher("test-query")
854 .query("MATCH (n) RETURN n")
855 .from_source("source1")
856 .auto_start(false)
857 .build();
858
859 assert_eq!(config.id, "test-query");
860 assert_eq!(config.query, "MATCH (n) RETURN n");
861 assert_eq!(config.query_language, QueryLanguage::Cypher);
862 assert!(!config.auto_start);
863 assert_eq!(config.sources.len(), 1);
864 assert_eq!(config.sources[0].source_id, "source1");
865 }
866
867 #[test]
868 fn test_query_builder_gql() {
869 let config = Query::gql("test-query")
870 .query("MATCH (n:Person) RETURN n.name")
871 .from_source("source1")
872 .build();
873
874 assert_eq!(config.query_language, QueryLanguage::GQL);
875 }
876
877 #[test]
878 fn test_query_builder_multiple_sources() {
879 let config = Query::cypher("test-query")
880 .query("MATCH (n) RETURN n")
881 .from_source("source1")
882 .from_source("source2")
883 .build();
884
885 assert_eq!(config.sources.len(), 2);
886 }
887
888 #[tokio::test]
889 async fn test_drasi_lib_builder_empty() {
890 let core = DrasiLibBuilder::new().build().await.unwrap();
891
892 assert!(!core.is_running().await);
893 }
894
895 #[tokio::test]
896 async fn test_drasi_lib_builder_with_id() {
897 let core = DrasiLibBuilder::new()
898 .with_id("test-server")
899 .build()
900 .await
901 .unwrap();
902
903 assert_eq!(core.get_config().id, "test-server");
904 }
905
906 #[tokio::test]
907 async fn test_drasi_lib_builder_with_query_no_source() {
908 let core = DrasiLibBuilder::new()
911 .with_id("test-server")
912 .with_query(
913 Query::cypher("query1")
914 .query("MATCH (n) RETURN n")
915 .auto_start(false)
917 .build(),
918 )
919 .build()
920 .await
921 .unwrap();
922
923 let queries = core.list_queries().await.unwrap();
924 assert_eq!(queries.len(), 1);
925 }
926
927 #[tokio::test]
932 async fn test_builder_creates_initialized_server() {
933 let core = DrasiLib::builder().with_id("builder-test").build().await;
934
935 assert!(core.is_ok(), "Builder should create initialized server");
936 let core = core.unwrap();
937 assert!(
938 core.state_guard.is_initialized(),
939 "Server should be initialized"
940 );
941 }
942
943 #[tokio::test]
944 async fn test_builder_with_query() {
945 let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
949 let core = DrasiLib::builder()
950 .with_id("complex-server")
951 .with_source(source)
952 .with_query(
953 Query::cypher("query1")
954 .query("MATCH (n) RETURN n")
955 .from_source("source1")
956 .build(),
957 )
958 .build()
959 .await;
960
961 assert!(core.is_ok(), "Builder with query should succeed");
962 let core = core.unwrap();
963 assert!(core.state_guard.is_initialized());
964 assert_eq!(core.config.queries.len(), 1);
965 }
966
967 #[test]
972 fn test_builder_with_id_sets_id() {
973 let builder = DrasiLibBuilder::new().with_id("my-server");
974 assert_eq!(builder.server_id, Some("my-server".to_string()));
975 }
976
977 #[test]
978 fn test_builder_with_id_accepts_string() {
979 let builder = DrasiLibBuilder::new().with_id(String::from("owned-id"));
980 assert_eq!(builder.server_id, Some("owned-id".to_string()));
981 }
982
983 #[test]
984 fn test_builder_with_priority_queue_capacity() {
985 let builder = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
986 assert_eq!(builder.priority_queue_capacity, Some(50000));
987 }
988
989 #[test]
990 fn test_builder_with_dispatch_buffer_capacity() {
991 let builder = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
992 assert_eq!(builder.dispatch_buffer_capacity, Some(2000));
993 }
994
995 #[test]
996 fn test_builder_with_query_adds_to_list() {
997 let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
998 let builder = DrasiLibBuilder::new().with_query(q);
999 assert_eq!(builder.query_configs.len(), 1);
1000 assert_eq!(builder.query_configs[0].id, "q1");
1001 }
1002
1003 #[test]
1004 fn test_builder_with_multiple_queries() {
1005 let q1 = Query::cypher("q1").query("MATCH (a) RETURN a").build();
1006 let q2 = Query::gql("q2").query("MATCH (b) RETURN b").build();
1007 let builder = DrasiLibBuilder::new().with_query(q1).with_query(q2);
1008 assert_eq!(builder.query_configs.len(), 2);
1009 assert_eq!(builder.query_configs[0].id, "q1");
1010 assert_eq!(builder.query_configs[1].id, "q2");
1011 }
1012
1013 #[test]
1014 fn test_builder_add_storage_backend() {
1015 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
1016
1017 let backend = StorageBackendConfig {
1018 id: "mem1".to_string(),
1019 spec: StorageBackendSpec::Memory {
1020 enable_archive: false,
1021 },
1022 };
1023 let builder = DrasiLibBuilder::new().add_storage_backend(backend);
1024 assert_eq!(builder.storage_backends.len(), 1);
1025 assert_eq!(builder.storage_backends[0].id, "mem1");
1026 }
1027
1028 #[test]
1029 fn test_builder_add_multiple_storage_backends() {
1030 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
1031
1032 let b1 = StorageBackendConfig {
1033 id: "mem1".to_string(),
1034 spec: StorageBackendSpec::Memory {
1035 enable_archive: false,
1036 },
1037 };
1038 let b2 = StorageBackendConfig {
1039 id: "mem2".to_string(),
1040 spec: StorageBackendSpec::Memory {
1041 enable_archive: true,
1042 },
1043 };
1044 let builder = DrasiLibBuilder::new()
1045 .add_storage_backend(b1)
1046 .add_storage_backend(b2);
1047 assert_eq!(builder.storage_backends.len(), 2);
1048 assert_eq!(builder.storage_backends[0].id, "mem1");
1049 assert_eq!(builder.storage_backends[1].id, "mem2");
1050 }
1051
1052 #[test]
1053 fn test_builder_default_values() {
1054 let builder = DrasiLibBuilder::new();
1055 assert_eq!(builder.server_id, None);
1056 assert_eq!(builder.priority_queue_capacity, None);
1057 assert_eq!(builder.dispatch_buffer_capacity, None);
1058 assert!(builder.storage_backends.is_empty());
1059 assert!(builder.query_configs.is_empty());
1060 assert!(builder.source_instances.is_empty());
1061 assert!(builder.reaction_instances.is_empty());
1062 assert!(builder.index_provider.is_none());
1063 assert!(builder.state_store_provider.is_none());
1064 }
1065
1066 #[test]
1067 fn test_builder_fluent_chaining() {
1068 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
1069
1070 let backend = StorageBackendConfig {
1071 id: "mem".to_string(),
1072 spec: StorageBackendSpec::Memory {
1073 enable_archive: false,
1074 },
1075 };
1076 let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
1077
1078 let builder = DrasiLibBuilder::new()
1079 .with_id("chained")
1080 .with_priority_queue_capacity(20000)
1081 .with_dispatch_buffer_capacity(3000)
1082 .add_storage_backend(backend)
1083 .with_query(q);
1084
1085 assert_eq!(builder.server_id, Some("chained".to_string()));
1086 assert_eq!(builder.priority_queue_capacity, Some(20000));
1087 assert_eq!(builder.dispatch_buffer_capacity, Some(3000));
1088 assert_eq!(builder.storage_backends.len(), 1);
1089 assert_eq!(builder.query_configs.len(), 1);
1090 }
1091
1092 #[tokio::test]
1093 async fn test_builder_default_id_when_none_set() {
1094 let core = DrasiLibBuilder::new().build().await.unwrap();
1095 assert_eq!(core.get_config().id, "drasi-lib");
1096 }
1097
1098 #[tokio::test]
1099 async fn test_builder_with_storage_backend_builds_ok() {
1100 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
1101
1102 let backend = StorageBackendConfig {
1103 id: "test-mem".to_string(),
1104 spec: StorageBackendSpec::Memory {
1105 enable_archive: false,
1106 },
1107 };
1108 let core = DrasiLibBuilder::new()
1109 .add_storage_backend(backend)
1110 .build()
1111 .await;
1112 assert!(core.is_ok(), "Builder with storage backend should succeed");
1113 }
1114
1115 #[test]
1120 fn test_query_cypher_sets_id_and_language() {
1121 let q = Query::cypher("cypher-q");
1122 assert_eq!(q.id, "cypher-q");
1123 assert_eq!(q.query_language, QueryLanguage::Cypher);
1124 }
1125
1126 #[test]
1127 fn test_query_gql_sets_id_and_language() {
1128 let q = Query::gql("gql-q");
1129 assert_eq!(q.id, "gql-q");
1130 assert_eq!(q.query_language, QueryLanguage::GQL);
1131 }
1132
1133 #[test]
1134 fn test_query_from_source_adds_source() {
1135 let q = Query::cypher("q").from_source("src1");
1136 assert_eq!(q.sources.len(), 1);
1137 assert_eq!(q.sources[0].source_id, "src1");
1138 }
1139
1140 #[test]
1141 fn test_query_from_source_chaining() {
1142 let q = Query::cypher("q")
1143 .from_source("src1")
1144 .from_source("src2")
1145 .from_source("src3");
1146 assert_eq!(q.sources.len(), 3);
1147 assert_eq!(q.sources[0].source_id, "src1");
1148 assert_eq!(q.sources[1].source_id, "src2");
1149 assert_eq!(q.sources[2].source_id, "src3");
1150 }
1151
1152 #[test]
1153 fn test_query_auto_start_default_true() {
1154 let q = Query::cypher("q");
1155 assert!(q.auto_start);
1156 }
1157
1158 #[test]
1159 fn test_query_auto_start_false() {
1160 let q = Query::cypher("q").auto_start(false);
1161 assert!(!q.auto_start);
1162 }
1163
1164 #[test]
1165 fn test_query_enable_bootstrap_default_true() {
1166 let q = Query::cypher("q");
1167 assert!(q.enable_bootstrap);
1168 }
1169
1170 #[test]
1171 fn test_query_enable_bootstrap_false() {
1172 let q = Query::cypher("q").enable_bootstrap(false);
1173 assert!(!q.enable_bootstrap);
1174 }
1175
1176 #[test]
1177 fn test_query_bootstrap_buffer_size_default() {
1178 let q = Query::cypher("q");
1179 assert_eq!(q.bootstrap_buffer_size, 10000);
1180 }
1181
1182 #[test]
1183 fn test_query_with_bootstrap_buffer_size() {
1184 let q = Query::cypher("q").with_bootstrap_buffer_size(5000);
1185 assert_eq!(q.bootstrap_buffer_size, 5000);
1186 }
1187
1188 #[test]
1189 fn test_query_with_dispatch_mode_broadcast() {
1190 let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
1191 assert_eq!(q.dispatch_mode, Some(DispatchMode::Broadcast));
1192 }
1193
1194 #[test]
1195 fn test_query_with_dispatch_mode_channel() {
1196 let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
1197 assert_eq!(q.dispatch_mode, Some(DispatchMode::Channel));
1198 }
1199
1200 #[test]
1201 fn test_query_dispatch_mode_default_none() {
1202 let q = Query::cypher("q");
1203 assert_eq!(q.dispatch_mode, None);
1204 }
1205
1206 #[test]
1207 fn test_query_with_priority_queue_capacity() {
1208 let q = Query::cypher("q").with_priority_queue_capacity(50000);
1209 assert_eq!(q.priority_queue_capacity, Some(50000));
1210 }
1211
1212 #[test]
1213 fn test_query_priority_queue_capacity_default_none() {
1214 let q = Query::cypher("q");
1215 assert_eq!(q.priority_queue_capacity, None);
1216 }
1217
1218 #[test]
1219 fn test_query_with_dispatch_buffer_capacity() {
1220 let q = Query::cypher("q").with_dispatch_buffer_capacity(5000);
1221 assert_eq!(q.dispatch_buffer_capacity, Some(5000));
1222 }
1223
1224 #[test]
1225 fn test_query_dispatch_buffer_capacity_default_none() {
1226 let q = Query::cypher("q");
1227 assert_eq!(q.dispatch_buffer_capacity, None);
1228 }
1229
1230 #[test]
1231 fn test_query_build_propagates_all_fields() {
1232 let config = Query::cypher("full-query")
1233 .query("MATCH (n:Person) RETURN n.name")
1234 .from_source("source-a")
1235 .from_source("source-b")
1236 .auto_start(false)
1237 .enable_bootstrap(false)
1238 .with_bootstrap_buffer_size(5000)
1239 .with_priority_queue_capacity(50000)
1240 .with_dispatch_buffer_capacity(2500)
1241 .with_dispatch_mode(DispatchMode::Broadcast)
1242 .build();
1243
1244 assert_eq!(config.id, "full-query");
1245 assert_eq!(config.query, "MATCH (n:Person) RETURN n.name");
1246 assert_eq!(config.query_language, QueryLanguage::Cypher);
1247 assert_eq!(config.sources.len(), 2);
1248 assert_eq!(config.sources[0].source_id, "source-a");
1249 assert_eq!(config.sources[1].source_id, "source-b");
1250 assert!(!config.auto_start);
1251 assert!(!config.enable_bootstrap);
1252 assert_eq!(config.bootstrap_buffer_size, 5000);
1253 assert_eq!(config.priority_queue_capacity, Some(50000));
1254 assert_eq!(config.dispatch_buffer_capacity, Some(2500));
1255 assert_eq!(config.dispatch_mode, Some(DispatchMode::Broadcast));
1256 assert!(config.joins.is_none());
1257 assert!(config.middleware.is_empty());
1258 assert!(config.storage_backend.is_none());
1259 }
1260
1261 #[test]
1262 fn test_query_build_gql_propagates_language() {
1263 let config = Query::gql("gql-full")
1264 .query("MATCH (n) RETURN n")
1265 .from_source("src")
1266 .build();
1267
1268 assert_eq!(config.id, "gql-full");
1269 assert_eq!(config.query_language, QueryLanguage::GQL);
1270 assert_eq!(config.query, "MATCH (n) RETURN n");
1271 assert_eq!(config.sources.len(), 1);
1272 assert!(config.auto_start);
1274 assert!(config.enable_bootstrap);
1275 assert_eq!(config.bootstrap_buffer_size, 10000);
1276 }
1277
1278 #[test]
1279 fn test_query_build_defaults() {
1280 let config = Query::cypher("defaults-only").build();
1281
1282 assert_eq!(config.id, "defaults-only");
1283 assert_eq!(config.query, "");
1284 assert_eq!(config.query_language, QueryLanguage::Cypher);
1285 assert!(config.sources.is_empty());
1286 assert!(config.middleware.is_empty());
1287 assert!(config.auto_start);
1288 assert!(config.joins.is_none());
1289 assert!(config.enable_bootstrap);
1290 assert_eq!(config.bootstrap_buffer_size, 10000);
1291 assert_eq!(config.priority_queue_capacity, None);
1292 assert_eq!(config.dispatch_buffer_capacity, None);
1293 assert_eq!(config.dispatch_mode, None);
1294 assert!(config.storage_backend.is_none());
1295 }
1296}