1use std::sync::Arc;
64
65use crate::channels::DispatchMode;
66use crate::config::{
67 DrasiLibConfig, QueryConfig, QueryJoinConfig, QueryLanguage, SourceSubscriptionConfig,
68};
69use crate::error::{DrasiError, Result};
70use crate::identity::IdentityProvider;
71use crate::indexes::IndexBackendPlugin;
72use crate::indexes::StorageBackendConfig;
73use crate::lib_core::DrasiLib;
74use crate::reactions::Reaction as ReactionTrait;
75use crate::sources::Source as SourceTrait;
76use crate::state_store::StateStoreProvider;
77use drasi_core::models::SourceMiddlewareConfig;
78
79pub struct DrasiLibBuilder {
120 server_id: Option<String>,
121 priority_queue_capacity: Option<usize>,
122 dispatch_buffer_capacity: Option<usize>,
123 storage_backends: Vec<StorageBackendConfig>,
124 query_configs: Vec<QueryConfig>,
125 source_instances: Vec<(
126 Box<dyn SourceTrait>,
127 std::collections::HashMap<String, String>,
128 )>,
129 reaction_instances: Vec<(
130 Box<dyn ReactionTrait>,
131 std::collections::HashMap<String, String>,
132 )>,
133 bootstrap_metadata: Vec<(
136 String,
137 String,
138 std::collections::HashMap<String, serde_json::Value>,
139 )>,
140 index_provider: Option<Arc<dyn IndexBackendPlugin>>,
141 state_store_provider: Option<Arc<dyn StateStoreProvider>>,
142 identity_provider: Option<Arc<dyn IdentityProvider>>,
143 default_recovery_policy: Option<crate::recovery::RecoveryPolicy>,
144}
145
146impl Default for DrasiLibBuilder {
147 fn default() -> Self {
148 Self::new()
149 }
150}
151
152impl DrasiLibBuilder {
153 pub fn new() -> Self {
155 Self {
156 server_id: None,
157 priority_queue_capacity: None,
158 dispatch_buffer_capacity: None,
159 storage_backends: Vec::new(),
160 query_configs: Vec::new(),
161 source_instances: Vec::new(),
162 reaction_instances: Vec::new(),
163 bootstrap_metadata: Vec::new(),
164 index_provider: None,
165 state_store_provider: None,
166 identity_provider: None,
167 default_recovery_policy: None,
168 }
169 }
170
171 pub fn with_id(mut self, id: impl Into<String>) -> Self {
173 self.server_id = Some(id.into());
174 self
175 }
176
177 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
179 self.priority_queue_capacity = Some(capacity);
180 self
181 }
182
183 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
185 self.dispatch_buffer_capacity = Some(capacity);
186 self
187 }
188
189 pub fn add_storage_backend(mut self, config: StorageBackendConfig) -> Self {
191 self.storage_backends.push(config);
192 self
193 }
194
195 pub fn with_index_provider(mut self, provider: Arc<dyn IndexBackendPlugin>) -> Self {
217 self.index_provider = Some(provider);
218 self
219 }
220
221 pub fn with_state_store_provider(mut self, provider: Arc<dyn StateStoreProvider>) -> Self {
241 self.state_store_provider = Some(provider);
242 self
243 }
244
245 pub fn with_identity_provider(mut self, provider: Arc<dyn IdentityProvider>) -> Self {
266 self.identity_provider = Some(provider);
267 self
268 }
269
270 pub fn with_default_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
275 self.default_recovery_policy = Some(policy);
276 self
277 }
278
279 pub fn with_source(mut self, source: impl SourceTrait + 'static) -> Self {
293 self.source_instances
294 .push((Box::new(source), std::collections::HashMap::new()));
295 self
296 }
297
298 pub fn with_source_metadata(
303 mut self,
304 source: impl SourceTrait + 'static,
305 extra_metadata: std::collections::HashMap<String, String>,
306 ) -> Self {
307 self.source_instances
308 .push((Box::new(source), extra_metadata));
309 self
310 }
311
312 pub fn with_query(mut self, config: QueryConfig) -> Self {
314 self.query_configs.push(config);
315 self
316 }
317
318 pub fn with_bootstrap_for_source(
337 mut self,
338 source_id: impl Into<String>,
339 kind: impl Into<String>,
340 properties: std::collections::HashMap<String, serde_json::Value>,
341 ) -> Self {
342 self.bootstrap_metadata
343 .push((source_id.into(), kind.into(), properties));
344 self
345 }
346
347 pub fn with_reaction(mut self, reaction: impl ReactionTrait + 'static) -> Self {
361 self.reaction_instances
362 .push((Box::new(reaction), std::collections::HashMap::new()));
363 self
364 }
365
366 pub fn with_reaction_metadata(
371 mut self,
372 reaction: impl ReactionTrait + 'static,
373 extra_metadata: std::collections::HashMap<String, String>,
374 ) -> Self {
375 self.reaction_instances
376 .push((Box::new(reaction), extra_metadata));
377 self
378 }
379
380 pub async fn build(self) -> Result<DrasiLib> {
385 let config = DrasiLibConfig {
387 id: self.server_id.unwrap_or_else(|| "drasi-lib".to_string()),
388 priority_queue_capacity: self.priority_queue_capacity,
389 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
390 storage_backends: self.storage_backends,
391 queries: self.query_configs.clone(),
392 };
393
394 config
396 .validate()
397 .map_err(|e| DrasiError::validation(e.to_string()))?;
398
399 let runtime_config = Arc::new(crate::config::RuntimeConfig::new(
401 config,
402 self.index_provider,
403 self.state_store_provider,
404 self.identity_provider,
405 self.default_recovery_policy,
406 ));
407 let mut core = DrasiLib::new(runtime_config);
408
409 let state_store = core.config.state_store_provider.clone();
411 core.source_manager
412 .inject_state_store(state_store.clone())
413 .await;
414 core.reaction_manager.inject_state_store(state_store).await;
415
416 {
419 use crate::sources::component_graph_source::ComponentGraphSource;
420 let graph_source = ComponentGraphSource::new(
421 core.component_event_broadcast_tx.clone(),
422 core.config.id.clone(),
423 core.component_graph.clone(),
424 )
425 .map_err(|e| {
426 DrasiError::operation_failed(
427 "source",
428 "component-graph",
429 "add",
430 format!("Failed to create: {e}"),
431 )
432 })?;
433
434 let source_id = graph_source.id().to_string();
435 let source_type = graph_source.type_name().to_string();
436 {
437 let mut graph = core.component_graph.write().await;
438 let mut metadata = std::collections::HashMap::new();
439 metadata.insert("kind".to_string(), source_type);
440 metadata.insert(
441 "autoStart".to_string(),
442 graph_source.auto_start().to_string(),
443 );
444 graph.register_source(&source_id, metadata).map_err(|e| {
445 DrasiError::operation_failed(
446 "source",
447 &source_id,
448 "add",
449 format!("Failed to register: {e}"),
450 )
451 })?;
452 }
453 if let Err(e) = core.source_manager.provision_source(graph_source).await {
454 let mut graph = core.component_graph.write().await;
455 let _ = graph.deregister(&source_id);
456 return Err(DrasiError::operation_failed(
457 "source",
458 &source_id,
459 "add",
460 format!("Failed to provision: {e}"),
461 ));
462 }
463 }
464
465 for (source, extra_metadata) in self.source_instances {
468 let source_id = source.id().to_string();
469 let source_type = source.type_name().to_string();
470 let auto_start = source.auto_start();
471
472 {
473 let mut graph = core.component_graph.write().await;
474 let mut metadata = std::collections::HashMap::new();
475 metadata.insert("kind".to_string(), source_type);
476 metadata.insert("autoStart".to_string(), auto_start.to_string());
477 metadata.extend(extra_metadata);
478 graph.register_source(&source_id, metadata).map_err(|e| {
479 DrasiError::operation_failed(
480 "source",
481 &source_id,
482 "add",
483 format!("Failed to register: {e}"),
484 )
485 })?;
486 }
487 if let Err(e) = core.source_manager.provision_source(source).await {
488 let mut graph = core.component_graph.write().await;
489 let _ = graph.deregister(&source_id);
490 return Err(DrasiError::operation_failed(
491 "source",
492 &source_id,
493 "add",
494 format!("Failed to provision: {e}"),
495 ));
496 }
497 }
498
499 for (source_id, kind, properties) in self.bootstrap_metadata {
503 let bp_id = format!("{source_id}-bootstrap");
504 let mut metadata = std::collections::HashMap::new();
505 metadata.insert("kind".to_string(), kind);
506 for (key, value) in properties {
507 metadata.insert(key, serde_json::to_string(&value).unwrap_or_default());
508 }
509 let mut graph = core.component_graph.write().await;
510 if let Err(e) =
511 graph.register_bootstrap_provider(&bp_id, metadata, &[source_id.clone()])
512 {
513 log::warn!(
514 "Failed to register bootstrap provider metadata for source '{source_id}': {e}"
515 );
516 }
517 }
518
519 core.initialize().await?;
521
522 for (reaction, extra_metadata) in self.reaction_instances {
524 let reaction_id = reaction.id().to_string();
525 let reaction_type = reaction.type_name().to_string();
526 let query_ids = reaction.query_ids();
527
528 {
530 let mut graph = core.component_graph.write().await;
531 let mut metadata = std::collections::HashMap::new();
532 metadata.insert("kind".to_string(), reaction_type);
533 metadata.extend(extra_metadata);
534 graph
535 .register_reaction(&reaction_id, metadata, &query_ids)
536 .map_err(|e| {
537 DrasiError::operation_failed(
538 "reaction",
539 &reaction_id,
540 "add",
541 format!("Failed to register: {e}"),
542 )
543 })?;
544 }
545 if let Err(e) = core.reaction_manager.provision_reaction(reaction).await {
546 let mut graph = core.component_graph.write().await;
547 let _ = graph.deregister(&reaction_id);
548 return Err(DrasiError::operation_failed(
549 "reaction",
550 &reaction_id,
551 "add",
552 format!("Failed to provision: {e}"),
553 ));
554 }
555 }
556
557 if core.config.identity_provider.is_some() {
561 let mut graph = core.component_graph.write().await;
562 let component_ids: Vec<String> = graph
563 .list_by_kind(&crate::component_graph::ComponentKind::Source)
564 .into_iter()
565 .chain(graph.list_by_kind(&crate::component_graph::ComponentKind::Reaction))
566 .map(|(id, _)| id)
567 .collect();
568
569 let mut metadata = std::collections::HashMap::new();
570 metadata.insert("kind".to_string(), "identity_provider".to_string());
571 graph
572 .register_identity_provider("identity-provider", metadata, &component_ids)
573 .map_err(|e| {
574 DrasiError::operation_failed(
575 "identity_provider",
576 "identity-provider",
577 "add",
578 format!("Failed to register: {e}"),
579 )
580 })?;
581 }
582
583 Ok(core)
584 }
585}
586
587pub struct Query {
607 id: String,
608 query: String,
609 query_language: QueryLanguage,
610 sources: Vec<SourceSubscriptionConfig>,
611 middleware: Vec<SourceMiddlewareConfig>,
612 auto_start: bool,
613 joins: Option<Vec<QueryJoinConfig>>,
614 enable_bootstrap: bool,
615 bootstrap_buffer_size: usize,
616 priority_queue_capacity: Option<usize>,
617 dispatch_buffer_capacity: Option<usize>,
618 dispatch_mode: Option<DispatchMode>,
619 storage_backend: Option<crate::indexes::StorageBackendRef>,
620 recovery_policy: Option<crate::recovery::RecoveryPolicy>,
621 outbox_capacity: usize,
622 bootstrap_timeout_secs: u64,
623}
624
625impl Query {
626 pub fn cypher(id: impl Into<String>) -> Self {
628 Self {
629 id: id.into(),
630 query: String::new(),
631 query_language: QueryLanguage::Cypher,
632 sources: Vec::new(),
633 middleware: Vec::new(),
634 auto_start: true,
635 joins: None,
636 enable_bootstrap: true,
637 bootstrap_buffer_size: 10000,
638 priority_queue_capacity: None,
639 dispatch_buffer_capacity: None,
640 dispatch_mode: None,
641 storage_backend: None,
642 recovery_policy: None,
643 outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
644 bootstrap_timeout_secs: 300,
645 }
646 }
647
648 pub fn gql(id: impl Into<String>) -> Self {
650 Self {
651 id: id.into(),
652 query: String::new(),
653 query_language: QueryLanguage::GQL,
654 sources: Vec::new(),
655 middleware: Vec::new(),
656 auto_start: true,
657 joins: None,
658 enable_bootstrap: true,
659 bootstrap_buffer_size: 10000,
660 priority_queue_capacity: None,
661 dispatch_buffer_capacity: None,
662 dispatch_mode: None,
663 storage_backend: None,
664 recovery_policy: None,
665 outbox_capacity: crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY,
666 bootstrap_timeout_secs: 300,
667 }
668 }
669
670 pub fn query(mut self, query: impl Into<String>) -> Self {
672 self.query = query.into();
673 self
674 }
675
676 pub fn from_source(mut self, source_id: impl Into<String>) -> Self {
678 self.sources.push(SourceSubscriptionConfig {
679 source_id: source_id.into(),
680 nodes: Vec::new(),
681 relations: Vec::new(),
682 pipeline: Vec::new(),
683 });
684 self
685 }
686
687 pub fn from_source_with_pipeline(
692 mut self,
693 source_id: impl Into<String>,
694 pipeline: Vec<String>,
695 ) -> Self {
696 self.sources.push(SourceSubscriptionConfig {
697 source_id: source_id.into(),
698 nodes: Vec::new(),
699 relations: Vec::new(),
700 pipeline,
701 });
702 self
703 }
704
705 pub fn with_middleware(mut self, middleware: SourceMiddlewareConfig) -> Self {
707 self.middleware.push(middleware);
708 self
709 }
710
711 pub fn auto_start(mut self, auto_start: bool) -> Self {
713 self.auto_start = auto_start;
714 self
715 }
716
717 pub fn with_joins(mut self, joins: Vec<QueryJoinConfig>) -> Self {
719 self.joins = Some(joins);
720 self
721 }
722
723 pub fn enable_bootstrap(mut self, enable: bool) -> Self {
725 self.enable_bootstrap = enable;
726 self
727 }
728
729 pub fn with_bootstrap_buffer_size(mut self, size: usize) -> Self {
731 self.bootstrap_buffer_size = size;
732 self
733 }
734
735 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
737 self.priority_queue_capacity = Some(capacity);
738 self
739 }
740
741 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
743 self.dispatch_buffer_capacity = Some(capacity);
744 self
745 }
746
747 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
749 self.dispatch_mode = Some(mode);
750 self
751 }
752
753 pub fn with_storage_backend(mut self, backend: crate::indexes::StorageBackendRef) -> Self {
755 self.storage_backend = Some(backend);
756 self
757 }
758
759 pub fn with_recovery_policy(mut self, policy: crate::recovery::RecoveryPolicy) -> Self {
762 self.recovery_policy = Some(policy);
763 self
764 }
765
766 pub fn with_outbox_capacity(mut self, capacity: usize) -> Self {
769 self.outbox_capacity = capacity;
770 self
771 }
772
773 pub fn with_bootstrap_timeout_secs(mut self, secs: u64) -> Self {
778 self.bootstrap_timeout_secs = secs;
779 self
780 }
781
782 pub fn build(self) -> QueryConfig {
784 QueryConfig {
785 id: self.id,
786 query: self.query,
787 query_language: self.query_language,
788 sources: self.sources,
789 middleware: self.middleware,
790 auto_start: self.auto_start,
791 joins: self.joins,
792 enable_bootstrap: self.enable_bootstrap,
793 bootstrap_buffer_size: self.bootstrap_buffer_size,
794 priority_queue_capacity: self.priority_queue_capacity,
795 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
796 dispatch_mode: self.dispatch_mode,
797 storage_backend: self.storage_backend,
798 recovery_policy: self.recovery_policy,
799 outbox_capacity: self.outbox_capacity,
800 bootstrap_timeout_secs: self.bootstrap_timeout_secs,
801 }
802 }
803}
804
805#[cfg(test)]
810mod tests {
811 use super::*;
812 use crate::DrasiLib;
813
814 #[test]
819 fn test_query_builder_cypher() {
820 let config = Query::cypher("test-query")
821 .query("MATCH (n) RETURN n")
822 .from_source("source1")
823 .auto_start(false)
824 .build();
825
826 assert_eq!(config.id, "test-query");
827 assert_eq!(config.query, "MATCH (n) RETURN n");
828 assert_eq!(config.query_language, QueryLanguage::Cypher);
829 assert!(!config.auto_start);
830 assert_eq!(config.sources.len(), 1);
831 assert_eq!(config.sources[0].source_id, "source1");
832 }
833
834 #[test]
835 fn test_query_builder_gql() {
836 let config = Query::gql("test-query")
837 .query("MATCH (n:Person) RETURN n.name")
838 .from_source("source1")
839 .build();
840
841 assert_eq!(config.query_language, QueryLanguage::GQL);
842 }
843
844 #[test]
845 fn test_query_builder_multiple_sources() {
846 let config = Query::cypher("test-query")
847 .query("MATCH (n) RETURN n")
848 .from_source("source1")
849 .from_source("source2")
850 .build();
851
852 assert_eq!(config.sources.len(), 2);
853 }
854
855 #[tokio::test]
856 async fn test_drasi_lib_builder_empty() {
857 let core = DrasiLibBuilder::new().build().await.unwrap();
858
859 assert!(!core.is_running().await);
860 }
861
862 #[tokio::test]
863 async fn test_drasi_lib_builder_with_id() {
864 let core = DrasiLibBuilder::new()
865 .with_id("test-server")
866 .build()
867 .await
868 .unwrap();
869
870 assert_eq!(core.get_config().id, "test-server");
871 }
872
873 #[tokio::test]
874 async fn test_drasi_lib_builder_with_query_no_source() {
875 let core = DrasiLibBuilder::new()
878 .with_id("test-server")
879 .with_query(
880 Query::cypher("query1")
881 .query("MATCH (n) RETURN n")
882 .auto_start(false)
884 .build(),
885 )
886 .build()
887 .await
888 .unwrap();
889
890 let queries = core.list_queries().await.unwrap();
891 assert_eq!(queries.len(), 1);
892 }
893
894 #[tokio::test]
899 async fn test_builder_creates_initialized_server() {
900 let core = DrasiLib::builder().with_id("builder-test").build().await;
901
902 assert!(core.is_ok(), "Builder should create initialized server");
903 let core = core.unwrap();
904 assert!(
905 core.state_guard.is_initialized(),
906 "Server should be initialized"
907 );
908 }
909
910 #[tokio::test]
911 async fn test_builder_with_query() {
912 let source = crate::sources::tests::TestMockSource::new("source1".to_string()).unwrap();
916 let core = DrasiLib::builder()
917 .with_id("complex-server")
918 .with_source(source)
919 .with_query(
920 Query::cypher("query1")
921 .query("MATCH (n) RETURN n")
922 .from_source("source1")
923 .build(),
924 )
925 .build()
926 .await;
927
928 assert!(core.is_ok(), "Builder with query should succeed");
929 let core = core.unwrap();
930 assert!(core.state_guard.is_initialized());
931 assert_eq!(core.config.queries.len(), 1);
932 }
933
934 #[test]
939 fn test_builder_with_id_sets_id() {
940 let builder = DrasiLibBuilder::new().with_id("my-server");
941 assert_eq!(builder.server_id, Some("my-server".to_string()));
942 }
943
944 #[test]
945 fn test_builder_with_id_accepts_string() {
946 let builder = DrasiLibBuilder::new().with_id(String::from("owned-id"));
947 assert_eq!(builder.server_id, Some("owned-id".to_string()));
948 }
949
950 #[test]
951 fn test_builder_with_priority_queue_capacity() {
952 let builder = DrasiLibBuilder::new().with_priority_queue_capacity(50000);
953 assert_eq!(builder.priority_queue_capacity, Some(50000));
954 }
955
956 #[test]
957 fn test_builder_with_dispatch_buffer_capacity() {
958 let builder = DrasiLibBuilder::new().with_dispatch_buffer_capacity(2000);
959 assert_eq!(builder.dispatch_buffer_capacity, Some(2000));
960 }
961
962 #[test]
963 fn test_builder_with_query_adds_to_list() {
964 let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
965 let builder = DrasiLibBuilder::new().with_query(q);
966 assert_eq!(builder.query_configs.len(), 1);
967 assert_eq!(builder.query_configs[0].id, "q1");
968 }
969
970 #[test]
971 fn test_builder_with_multiple_queries() {
972 let q1 = Query::cypher("q1").query("MATCH (a) RETURN a").build();
973 let q2 = Query::gql("q2").query("MATCH (b) RETURN b").build();
974 let builder = DrasiLibBuilder::new().with_query(q1).with_query(q2);
975 assert_eq!(builder.query_configs.len(), 2);
976 assert_eq!(builder.query_configs[0].id, "q1");
977 assert_eq!(builder.query_configs[1].id, "q2");
978 }
979
980 #[test]
981 fn test_builder_add_storage_backend() {
982 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
983
984 let backend = StorageBackendConfig {
985 id: "mem1".to_string(),
986 spec: StorageBackendSpec::Memory {
987 enable_archive: false,
988 },
989 };
990 let builder = DrasiLibBuilder::new().add_storage_backend(backend);
991 assert_eq!(builder.storage_backends.len(), 1);
992 assert_eq!(builder.storage_backends[0].id, "mem1");
993 }
994
995 #[test]
996 fn test_builder_add_multiple_storage_backends() {
997 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
998
999 let b1 = StorageBackendConfig {
1000 id: "mem1".to_string(),
1001 spec: StorageBackendSpec::Memory {
1002 enable_archive: false,
1003 },
1004 };
1005 let b2 = StorageBackendConfig {
1006 id: "mem2".to_string(),
1007 spec: StorageBackendSpec::Memory {
1008 enable_archive: true,
1009 },
1010 };
1011 let builder = DrasiLibBuilder::new()
1012 .add_storage_backend(b1)
1013 .add_storage_backend(b2);
1014 assert_eq!(builder.storage_backends.len(), 2);
1015 assert_eq!(builder.storage_backends[0].id, "mem1");
1016 assert_eq!(builder.storage_backends[1].id, "mem2");
1017 }
1018
1019 #[test]
1020 fn test_builder_default_values() {
1021 let builder = DrasiLibBuilder::new();
1022 assert_eq!(builder.server_id, None);
1023 assert_eq!(builder.priority_queue_capacity, None);
1024 assert_eq!(builder.dispatch_buffer_capacity, None);
1025 assert!(builder.storage_backends.is_empty());
1026 assert!(builder.query_configs.is_empty());
1027 assert!(builder.source_instances.is_empty());
1028 assert!(builder.reaction_instances.is_empty());
1029 assert!(builder.index_provider.is_none());
1030 assert!(builder.state_store_provider.is_none());
1031 }
1032
1033 #[test]
1034 fn test_builder_fluent_chaining() {
1035 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
1036
1037 let backend = StorageBackendConfig {
1038 id: "mem".to_string(),
1039 spec: StorageBackendSpec::Memory {
1040 enable_archive: false,
1041 },
1042 };
1043 let q = Query::cypher("q1").query("MATCH (n) RETURN n").build();
1044
1045 let builder = DrasiLibBuilder::new()
1046 .with_id("chained")
1047 .with_priority_queue_capacity(20000)
1048 .with_dispatch_buffer_capacity(3000)
1049 .add_storage_backend(backend)
1050 .with_query(q);
1051
1052 assert_eq!(builder.server_id, Some("chained".to_string()));
1053 assert_eq!(builder.priority_queue_capacity, Some(20000));
1054 assert_eq!(builder.dispatch_buffer_capacity, Some(3000));
1055 assert_eq!(builder.storage_backends.len(), 1);
1056 assert_eq!(builder.query_configs.len(), 1);
1057 }
1058
1059 #[tokio::test]
1060 async fn test_builder_default_id_when_none_set() {
1061 let core = DrasiLibBuilder::new().build().await.unwrap();
1062 assert_eq!(core.get_config().id, "drasi-lib");
1063 }
1064
1065 #[tokio::test]
1066 async fn test_builder_with_storage_backend_builds_ok() {
1067 use crate::indexes::config::{StorageBackendConfig, StorageBackendSpec};
1068
1069 let backend = StorageBackendConfig {
1070 id: "test-mem".to_string(),
1071 spec: StorageBackendSpec::Memory {
1072 enable_archive: false,
1073 },
1074 };
1075 let core = DrasiLibBuilder::new()
1076 .add_storage_backend(backend)
1077 .build()
1078 .await;
1079 assert!(core.is_ok(), "Builder with storage backend should succeed");
1080 }
1081
1082 #[test]
1087 fn test_query_cypher_sets_id_and_language() {
1088 let q = Query::cypher("cypher-q");
1089 assert_eq!(q.id, "cypher-q");
1090 assert_eq!(q.query_language, QueryLanguage::Cypher);
1091 }
1092
1093 #[test]
1094 fn test_query_gql_sets_id_and_language() {
1095 let q = Query::gql("gql-q");
1096 assert_eq!(q.id, "gql-q");
1097 assert_eq!(q.query_language, QueryLanguage::GQL);
1098 }
1099
1100 #[test]
1101 fn test_query_from_source_adds_source() {
1102 let q = Query::cypher("q").from_source("src1");
1103 assert_eq!(q.sources.len(), 1);
1104 assert_eq!(q.sources[0].source_id, "src1");
1105 }
1106
1107 #[test]
1108 fn test_query_from_source_chaining() {
1109 let q = Query::cypher("q")
1110 .from_source("src1")
1111 .from_source("src2")
1112 .from_source("src3");
1113 assert_eq!(q.sources.len(), 3);
1114 assert_eq!(q.sources[0].source_id, "src1");
1115 assert_eq!(q.sources[1].source_id, "src2");
1116 assert_eq!(q.sources[2].source_id, "src3");
1117 }
1118
1119 #[test]
1120 fn test_query_auto_start_default_true() {
1121 let q = Query::cypher("q");
1122 assert!(q.auto_start);
1123 }
1124
1125 #[test]
1126 fn test_query_auto_start_false() {
1127 let q = Query::cypher("q").auto_start(false);
1128 assert!(!q.auto_start);
1129 }
1130
1131 #[test]
1132 fn test_query_enable_bootstrap_default_true() {
1133 let q = Query::cypher("q");
1134 assert!(q.enable_bootstrap);
1135 }
1136
1137 #[test]
1138 fn test_query_enable_bootstrap_false() {
1139 let q = Query::cypher("q").enable_bootstrap(false);
1140 assert!(!q.enable_bootstrap);
1141 }
1142
1143 #[test]
1144 fn test_query_bootstrap_buffer_size_default() {
1145 let q = Query::cypher("q");
1146 assert_eq!(q.bootstrap_buffer_size, 10000);
1147 }
1148
1149 #[test]
1150 fn test_query_with_bootstrap_buffer_size() {
1151 let q = Query::cypher("q").with_bootstrap_buffer_size(5000);
1152 assert_eq!(q.bootstrap_buffer_size, 5000);
1153 }
1154
1155 #[test]
1156 fn test_query_with_dispatch_mode_broadcast() {
1157 let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Broadcast);
1158 assert_eq!(q.dispatch_mode, Some(DispatchMode::Broadcast));
1159 }
1160
1161 #[test]
1162 fn test_query_with_dispatch_mode_channel() {
1163 let q = Query::cypher("q").with_dispatch_mode(DispatchMode::Channel);
1164 assert_eq!(q.dispatch_mode, Some(DispatchMode::Channel));
1165 }
1166
1167 #[test]
1168 fn test_query_dispatch_mode_default_none() {
1169 let q = Query::cypher("q");
1170 assert_eq!(q.dispatch_mode, None);
1171 }
1172
1173 #[test]
1174 fn test_query_with_priority_queue_capacity() {
1175 let q = Query::cypher("q").with_priority_queue_capacity(50000);
1176 assert_eq!(q.priority_queue_capacity, Some(50000));
1177 }
1178
1179 #[test]
1180 fn test_query_priority_queue_capacity_default_none() {
1181 let q = Query::cypher("q");
1182 assert_eq!(q.priority_queue_capacity, None);
1183 }
1184
1185 #[test]
1186 fn test_query_with_dispatch_buffer_capacity() {
1187 let q = Query::cypher("q").with_dispatch_buffer_capacity(5000);
1188 assert_eq!(q.dispatch_buffer_capacity, Some(5000));
1189 }
1190
1191 #[test]
1192 fn test_query_dispatch_buffer_capacity_default_none() {
1193 let q = Query::cypher("q");
1194 assert_eq!(q.dispatch_buffer_capacity, None);
1195 }
1196
1197 #[test]
1198 fn test_query_build_propagates_all_fields() {
1199 let config = Query::cypher("full-query")
1200 .query("MATCH (n:Person) RETURN n.name")
1201 .from_source("source-a")
1202 .from_source("source-b")
1203 .auto_start(false)
1204 .enable_bootstrap(false)
1205 .with_bootstrap_buffer_size(5000)
1206 .with_priority_queue_capacity(50000)
1207 .with_dispatch_buffer_capacity(2500)
1208 .with_dispatch_mode(DispatchMode::Broadcast)
1209 .build();
1210
1211 assert_eq!(config.id, "full-query");
1212 assert_eq!(config.query, "MATCH (n:Person) RETURN n.name");
1213 assert_eq!(config.query_language, QueryLanguage::Cypher);
1214 assert_eq!(config.sources.len(), 2);
1215 assert_eq!(config.sources[0].source_id, "source-a");
1216 assert_eq!(config.sources[1].source_id, "source-b");
1217 assert!(!config.auto_start);
1218 assert!(!config.enable_bootstrap);
1219 assert_eq!(config.bootstrap_buffer_size, 5000);
1220 assert_eq!(config.priority_queue_capacity, Some(50000));
1221 assert_eq!(config.dispatch_buffer_capacity, Some(2500));
1222 assert_eq!(config.dispatch_mode, Some(DispatchMode::Broadcast));
1223 assert!(config.joins.is_none());
1224 assert!(config.middleware.is_empty());
1225 assert!(config.storage_backend.is_none());
1226 }
1227
1228 #[test]
1229 fn test_query_build_gql_propagates_language() {
1230 let config = Query::gql("gql-full")
1231 .query("MATCH (n) RETURN n")
1232 .from_source("src")
1233 .build();
1234
1235 assert_eq!(config.id, "gql-full");
1236 assert_eq!(config.query_language, QueryLanguage::GQL);
1237 assert_eq!(config.query, "MATCH (n) RETURN n");
1238 assert_eq!(config.sources.len(), 1);
1239 assert!(config.auto_start);
1241 assert!(config.enable_bootstrap);
1242 assert_eq!(config.bootstrap_buffer_size, 10000);
1243 }
1244
1245 #[test]
1246 fn test_query_build_defaults() {
1247 let config = Query::cypher("defaults-only").build();
1248
1249 assert_eq!(config.id, "defaults-only");
1250 assert_eq!(config.query, "");
1251 assert_eq!(config.query_language, QueryLanguage::Cypher);
1252 assert!(config.sources.is_empty());
1253 assert!(config.middleware.is_empty());
1254 assert!(config.auto_start);
1255 assert!(config.joins.is_none());
1256 assert!(config.enable_bootstrap);
1257 assert_eq!(config.bootstrap_buffer_size, 10000);
1258 assert_eq!(config.priority_queue_capacity, None);
1259 assert_eq!(config.dispatch_buffer_capacity, None);
1260 assert_eq!(config.dispatch_mode, None);
1261 assert!(config.storage_backend.is_none());
1262 }
1263}