1use crate::error::Result;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use uuid::Uuid;
15
16use super::{
17 KeywordQuery, KeywordSearchEngine, KeywordSearchResult, SimilarityQuery, SimilarityResult,
18 VectorSearchEngine,
19};
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct HybridSearchEngineConfig {
24 pub semantic_weight: f32,
26 pub keyword_weight: f32,
28 pub rrf_k: f32,
30 pub default_limit: usize,
32 pub min_score_threshold: f32,
34 pub over_fetch_multiplier: usize,
36}
37
38impl Default for HybridSearchEngineConfig {
39 fn default() -> Self {
40 Self {
41 semantic_weight: 0.5,
42 keyword_weight: 0.5,
43 rrf_k: 60.0,
44 default_limit: 10,
45 min_score_threshold: 0.0,
46 over_fetch_multiplier: 3,
47 }
48 }
49}
50
/// Selects which retrieval backends a [`SearchQuery`] uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum SearchMode {
    /// Vector similarity only; result scores are the vector engine's similarity.
    SemanticOnly,
    /// Keyword search only; result scores are divided by the best keyword score.
    KeywordOnly,
    /// Both backends, fused with weighted reciprocal rank fusion. This is the default.
    #[default]
    Hybrid,
}
62
63#[derive(Debug, Clone, Default, Serialize, Deserialize)]
65pub struct MetadataFilters {
66 pub event_type: Option<String>,
68 pub entity_id: Option<String>,
70 pub time_from: Option<DateTime<Utc>>,
72 pub time_to: Option<DateTime<Utc>>,
74}
75
76impl MetadataFilters {
77 pub fn new() -> Self {
78 Self::default()
79 }
80
81 pub fn with_event_type(mut self, event_type: impl Into<String>) -> Self {
82 self.event_type = Some(event_type.into());
83 self
84 }
85
86 pub fn with_entity_id(mut self, entity_id: impl Into<String>) -> Self {
87 self.entity_id = Some(entity_id.into());
88 self
89 }
90
91 pub fn with_time_range(mut self, from: DateTime<Utc>, to: DateTime<Utc>) -> Self {
92 self.time_from = Some(from);
93 self.time_to = Some(to);
94 self
95 }
96
97 pub fn with_time_from(mut self, from: DateTime<Utc>) -> Self {
98 self.time_from = Some(from);
99 self
100 }
101
102 pub fn with_time_to(mut self, to: DateTime<Utc>) -> Self {
103 self.time_to = Some(to);
104 self
105 }
106
107 pub fn has_filters(&self) -> bool {
109 self.event_type.is_some()
110 || self.entity_id.is_some()
111 || self.time_from.is_some()
112 || self.time_to.is_some()
113 }
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct SearchQuery {
119 pub query: String,
121 pub limit: usize,
123 pub tenant_id: Option<String>,
125 pub mode: SearchMode,
127 pub filters: MetadataFilters,
129 pub min_similarity: Option<f32>,
131}
132
133impl SearchQuery {
134 pub fn new(query: impl Into<String>) -> Self {
135 Self {
136 query: query.into(),
137 limit: 10,
138 tenant_id: None,
139 mode: SearchMode::Hybrid,
140 filters: MetadataFilters::default(),
141 min_similarity: None,
142 }
143 }
144
145 pub fn with_limit(mut self, limit: usize) -> Self {
146 self.limit = limit;
147 self
148 }
149
150 pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
151 self.tenant_id = Some(tenant_id.into());
152 self
153 }
154
155 pub fn with_mode(mut self, mode: SearchMode) -> Self {
156 self.mode = mode;
157 self
158 }
159
160 pub fn with_filters(mut self, filters: MetadataFilters) -> Self {
161 self.filters = filters;
162 self
163 }
164
165 pub fn with_event_type(mut self, event_type: impl Into<String>) -> Self {
166 self.filters.event_type = Some(event_type.into());
167 self
168 }
169
170 pub fn with_entity_id(mut self, entity_id: impl Into<String>) -> Self {
171 self.filters.entity_id = Some(entity_id.into());
172 self
173 }
174
175 pub fn with_time_range(mut self, from: DateTime<Utc>, to: DateTime<Utc>) -> Self {
176 self.filters.time_from = Some(from);
177 self.filters.time_to = Some(to);
178 self
179 }
180
181 pub fn with_min_similarity(mut self, threshold: f32) -> Self {
182 self.min_similarity = Some(threshold);
183 self
184 }
185
186 pub fn semantic(query: impl Into<String>) -> Self {
188 Self::new(query).with_mode(SearchMode::SemanticOnly)
189 }
190
191 pub fn keyword(query: impl Into<String>) -> Self {
193 Self::new(query).with_mode(SearchMode::KeywordOnly)
194 }
195}
196
/// One ranked hit returned by [`HybridSearchEngine::search`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HybridSearchResult {
    /// Id of the matching event.
    pub event_id: Uuid,
    /// Final ranking score; its scale depends on the [`SearchMode`] that produced it.
    pub score: f32,
    /// Similarity reported by the vector engine, when the event was a semantic hit.
    pub semantic_score: Option<f32>,
    /// Un-normalized score reported by the keyword engine, when the event was a keyword hit.
    pub keyword_score: Option<f32>,
    /// Event type taken from the keyword hit or, failing that, the metadata cache.
    pub event_type: Option<String>,
    /// Entity id taken from the keyword hit or the metadata cache.
    pub entity_id: Option<String>,
    /// Text returned by the vector engine; `None` for hits found only by keyword search.
    pub source_text: Option<String>,
}
215
/// Per-event metadata cached by [`HybridSearchEngine`] to label results and
/// evaluate metadata filters.
#[derive(Debug, Clone, Default)]
pub struct EventMetadata {
    /// Event type used for labeling and `event_type` filtering.
    pub event_type: Option<String>,
    /// Entity id used for labeling and `entity_id` filtering.
    pub entity_id: Option<String>,
    /// Event time; time-range filters read it only from this cache.
    pub timestamp: Option<DateTime<Utc>>,
}
223
/// Search front-end that combines a vector (semantic) engine and a keyword engine,
/// fusing their rankings with weighted reciprocal rank fusion in hybrid mode.
pub struct HybridSearchEngine {
    /// Fusion weights, limits, and score threshold.
    config: HybridSearchEngineConfig,
    /// Semantic backend: embeds text and runs similarity search.
    vector_engine: Arc<VectorSearchEngine>,
    /// Keyword backend.
    keyword_engine: Arc<KeywordSearchEngine>,
    /// Event metadata keyed by event id. Used to label semantic hits and to evaluate
    /// filters; it is the only source of timestamps for time-range filtering.
    metadata_cache: parking_lot::RwLock<HashMap<Uuid, EventMetadata>>,
    /// Per-mode search counters.
    stats: parking_lot::RwLock<EngineStats>,
}
240
/// Counters incremented on every `HybridSearchEngine::search` call.
#[derive(Debug, Default, Clone)]
struct EngineStats {
    /// All searches, regardless of mode.
    total_searches: u64,
    /// Searches run in `SearchMode::SemanticOnly`.
    semantic_searches: u64,
    /// Searches run in `SearchMode::KeywordOnly`.
    keyword_searches: u64,
    /// Searches run in `SearchMode::Hybrid`.
    hybrid_searches: u64,
}
248
249impl HybridSearchEngine {
250 pub fn new(
252 vector_engine: Arc<VectorSearchEngine>,
253 keyword_engine: Arc<KeywordSearchEngine>,
254 ) -> Self {
255 Self::with_config(
256 vector_engine,
257 keyword_engine,
258 HybridSearchEngineConfig::default(),
259 )
260 }
261
262 pub fn with_config(
264 vector_engine: Arc<VectorSearchEngine>,
265 keyword_engine: Arc<KeywordSearchEngine>,
266 config: HybridSearchEngineConfig,
267 ) -> Self {
268 Self {
269 config,
270 vector_engine,
271 keyword_engine,
272 metadata_cache: parking_lot::RwLock::new(HashMap::new()),
273 stats: parking_lot::RwLock::new(EngineStats::default()),
274 }
275 }
276
277 pub fn config(&self) -> &HybridSearchEngineConfig {
279 &self.config
280 }
281
282 pub fn store_metadata(&self, event_id: Uuid, metadata: EventMetadata) {
284 let mut cache = self.metadata_cache.write();
285 cache.insert(event_id, metadata);
286 }
287
288 #[cfg(any(feature = "vector-search", feature = "keyword-search"))]
293 pub async fn index_event(
294 &self,
295 event_id: Uuid,
296 tenant_id: &str,
297 event_type: &str,
298 entity_id: Option<&str>,
299 payload: &serde_json::Value,
300 timestamp: DateTime<Utc>,
301 ) -> Result<()> {
302 self.store_metadata(
304 event_id,
305 EventMetadata {
306 event_type: Some(event_type.to_string()),
307 entity_id: entity_id.map(|s| s.to_string()),
308 timestamp: Some(timestamp),
309 },
310 );
311
312 #[cfg(feature = "keyword-search")]
314 self.keyword_engine
315 .index_event(event_id, tenant_id, event_type, entity_id, payload)?;
316
317 #[cfg(feature = "vector-search")]
319 {
320 let embedding = self.vector_engine.embed_event(payload)?;
321 let source_text = extract_source_text(payload);
322 self.vector_engine
323 .index_event(event_id, tenant_id, embedding, source_text)
324 .await?;
325 }
326
327 Ok(())
328 }
329
330 #[cfg(not(any(feature = "vector-search", feature = "keyword-search")))]
332 pub async fn index_event(
333 &self,
334 event_id: Uuid,
335 _tenant_id: &str,
336 event_type: &str,
337 entity_id: Option<&str>,
338 _payload: &serde_json::Value,
339 timestamp: DateTime<Utc>,
340 ) -> Result<()> {
341 self.store_metadata(
343 event_id,
344 EventMetadata {
345 event_type: Some(event_type.to_string()),
346 entity_id: entity_id.map(|s| s.to_string()),
347 timestamp: Some(timestamp),
348 },
349 );
350 Ok(())
351 }
352
353 #[cfg(feature = "keyword-search")]
355 pub fn commit(&self) -> Result<()> {
356 self.keyword_engine.commit()
357 }
358
359 #[cfg(not(feature = "keyword-search"))]
360 pub fn commit(&self) -> Result<()> {
361 Ok(())
362 }
363
364 pub fn search(&self, query: &SearchQuery) -> Result<Vec<HybridSearchResult>> {
373 {
375 let mut stats = self.stats.write();
376 stats.total_searches += 1;
377 match query.mode {
378 SearchMode::SemanticOnly => stats.semantic_searches += 1,
379 SearchMode::KeywordOnly => stats.keyword_searches += 1,
380 SearchMode::Hybrid => stats.hybrid_searches += 1,
381 }
382 }
383
384 let fetch_limit = if query.filters.has_filters() {
386 query.limit * self.config.over_fetch_multiplier
387 } else {
388 query.limit
389 };
390
391 match query.mode {
392 SearchMode::SemanticOnly => self.search_semantic_only(query, fetch_limit),
393 SearchMode::KeywordOnly => self.search_keyword_only(query, fetch_limit),
394 SearchMode::Hybrid => self.search_hybrid(query, fetch_limit),
395 }
396 }
397
398 fn search_semantic_only(
400 &self,
401 query: &SearchQuery,
402 fetch_limit: usize,
403 ) -> Result<Vec<HybridSearchResult>> {
404 let semantic_results = self.run_semantic_search(query, fetch_limit)?;
405
406 let mut results: Vec<HybridSearchResult> = semantic_results
407 .into_iter()
408 .map(|r| HybridSearchResult {
409 event_id: r.event_id,
410 score: r.score,
411 semantic_score: Some(r.score),
412 keyword_score: None,
413 event_type: self.get_cached_event_type(r.event_id),
414 entity_id: self.get_cached_entity_id(r.event_id),
415 source_text: r.source_text,
416 })
417 .collect();
418
419 self.apply_filters(&mut results, &query.filters);
421
422 results.retain(|r| r.score >= self.config.min_score_threshold);
424
425 results.truncate(query.limit);
427
428 Ok(results)
429 }
430
431 fn search_keyword_only(
433 &self,
434 query: &SearchQuery,
435 fetch_limit: usize,
436 ) -> Result<Vec<HybridSearchResult>> {
437 let keyword_results = self.run_keyword_search(query, fetch_limit)?;
438
439 let max_score = keyword_results
441 .iter()
442 .map(|r| r.score)
443 .fold(0.0_f32, f32::max);
444
445 let mut results: Vec<HybridSearchResult> = keyword_results
446 .into_iter()
447 .map(|r| {
448 let normalized_score = if max_score > 0.0 {
449 r.score / max_score
450 } else {
451 0.0
452 };
453 HybridSearchResult {
454 event_id: r.event_id,
455 score: normalized_score,
456 semantic_score: None,
457 keyword_score: Some(r.score),
458 event_type: Some(r.event_type),
459 entity_id: r.entity_id,
460 source_text: None,
461 }
462 })
463 .collect();
464
465 self.apply_filters(&mut results, &query.filters);
467
468 results.retain(|r| r.score >= self.config.min_score_threshold);
470
471 results.truncate(query.limit);
473
474 Ok(results)
475 }
476
477 fn search_hybrid(
479 &self,
480 query: &SearchQuery,
481 fetch_limit: usize,
482 ) -> Result<Vec<HybridSearchResult>> {
483 let semantic_results = self.run_semantic_search(query, fetch_limit)?;
485 let keyword_results = self.run_keyword_search(query, fetch_limit)?;
486
487 let mut combined_scores: HashMap<Uuid, HybridSearchResult> = HashMap::new();
489
490 for (rank, result) in semantic_results.into_iter().enumerate() {
492 let rrf_score = 1.0 / (self.config.rrf_k + rank as f32 + 1.0);
493 let weighted_score = rrf_score * self.config.semantic_weight;
494
495 combined_scores.insert(
496 result.event_id,
497 HybridSearchResult {
498 event_id: result.event_id,
499 score: weighted_score,
500 semantic_score: Some(result.score),
501 keyword_score: None,
502 event_type: self.get_cached_event_type(result.event_id),
503 entity_id: self.get_cached_entity_id(result.event_id),
504 source_text: result.source_text,
505 },
506 );
507 }
508
509 for (rank, result) in keyword_results.into_iter().enumerate() {
511 let rrf_score = 1.0 / (self.config.rrf_k + rank as f32 + 1.0);
512 let weighted_score = rrf_score * self.config.keyword_weight;
513
514 if let Some(existing) = combined_scores.get_mut(&result.event_id) {
515 existing.score += weighted_score;
517 existing.keyword_score = Some(result.score);
518 existing.event_type = Some(result.event_type);
520 existing.entity_id = result.entity_id;
521 } else {
522 combined_scores.insert(
523 result.event_id,
524 HybridSearchResult {
525 event_id: result.event_id,
526 score: weighted_score,
527 semantic_score: None,
528 keyword_score: Some(result.score),
529 event_type: Some(result.event_type),
530 entity_id: result.entity_id,
531 source_text: None,
532 },
533 );
534 }
535 }
536
537 let mut results: Vec<HybridSearchResult> = combined_scores.into_values().collect();
539 results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
540
541 self.apply_filters(&mut results, &query.filters);
543
544 results.retain(|r| r.score >= self.config.min_score_threshold);
546
547 results.truncate(query.limit);
549
550 Ok(results)
551 }
552
553 fn run_semantic_search(
555 &self,
556 query: &SearchQuery,
557 limit: usize,
558 ) -> Result<Vec<SimilarityResult>> {
559 let query_embedding = self.vector_engine.embed_text(&query.query)?;
561
562 let similarity_query = SimilarityQuery::new(query_embedding, limit)
563 .with_min_similarity(query.min_similarity.unwrap_or(0.0));
564
565 let similarity_query = if let Some(ref tenant_id) = query.tenant_id {
566 similarity_query.with_tenant(tenant_id.clone())
567 } else {
568 similarity_query
569 };
570
571 self.vector_engine.search_similar(&similarity_query)
572 }
573
574 fn run_keyword_search(
576 &self,
577 query: &SearchQuery,
578 limit: usize,
579 ) -> Result<Vec<KeywordSearchResult>> {
580 let keyword_query = KeywordQuery::new(&query.query).with_limit(limit);
581
582 let keyword_query = if let Some(ref tenant_id) = query.tenant_id {
583 keyword_query.with_tenant(tenant_id)
584 } else {
585 keyword_query
586 };
587
588 self.keyword_engine.search_keywords(&keyword_query)
589 }
590
591 fn apply_filters(&self, results: &mut Vec<HybridSearchResult>, filters: &MetadataFilters) {
593 if !filters.has_filters() {
594 return;
595 }
596
597 let metadata_cache = self.metadata_cache.read();
598
599 results.retain(|result| {
600 let cached = metadata_cache.get(&result.event_id);
602
603 if let Some(ref filter_type) = filters.event_type {
605 let event_type = result
606 .event_type
607 .as_ref()
608 .or_else(|| cached.and_then(|m| m.event_type.as_ref()));
609 match event_type {
610 Some(t) if t == filter_type => {}
611 _ => return false,
612 }
613 }
614
615 if let Some(ref filter_entity) = filters.entity_id {
617 let entity_id = result
618 .entity_id
619 .as_ref()
620 .or_else(|| cached.and_then(|m| m.entity_id.as_ref()));
621 match entity_id {
622 Some(e) if e == filter_entity => {}
623 _ => return false,
624 }
625 }
626
627 if filters.time_from.is_some() || filters.time_to.is_some() {
629 if let Some(timestamp) = cached.and_then(|m| m.timestamp) {
630 if let Some(from) = filters.time_from {
631 if timestamp < from {
632 return false;
633 }
634 }
635 if let Some(to) = filters.time_to {
636 if timestamp > to {
637 return false;
638 }
639 }
640 } else {
641 return false;
643 }
644 }
645
646 true
647 });
648 }
649
650 fn get_cached_event_type(&self, event_id: Uuid) -> Option<String> {
652 self.metadata_cache
653 .read()
654 .get(&event_id)
655 .and_then(|m| m.event_type.clone())
656 }
657
658 fn get_cached_entity_id(&self, event_id: Uuid) -> Option<String> {
660 self.metadata_cache
661 .read()
662 .get(&event_id)
663 .and_then(|m| m.entity_id.clone())
664 }
665
666 pub fn stats(&self) -> (u64, u64, u64, u64) {
668 let stats = self.stats.read();
669 (
670 stats.total_searches,
671 stats.semantic_searches,
672 stats.keyword_searches,
673 stats.hybrid_searches,
674 )
675 }
676
677 pub fn health_check(&self) -> Result<()> {
679 self.vector_engine.health_check()?;
680 self.keyword_engine.health_check()?;
681 Ok(())
682 }
683
684 pub fn cached_metadata_count(&self) -> usize {
686 self.metadata_cache.read().len()
687 }
688
689 pub fn clear_metadata_cache(&self) {
691 self.metadata_cache.write().clear();
692 }
693}
694
/// Chooses the text stored alongside an event's embedding.
///
/// For a JSON object, returns the first non-empty string among `content`, `text`,
/// `body`, `message`, `description`, `title`, and `name` (checked in that order).
/// Otherwise, or if none match, returns the compact JSON serialization of the
/// whole payload. The result is therefore always `Some`.
#[cfg(feature = "vector-search")]
fn extract_source_text(payload: &serde_json::Value) -> Option<String> {
    const PRIORITY_FIELDS: [&str; 7] = [
        "content",
        "text",
        "body",
        "message",
        "description",
        "title",
        "name",
    ];

    let preferred = payload.as_object().and_then(|fields| {
        PRIORITY_FIELDS
            .iter()
            .find_map(|key| match fields.get(*key) {
                Some(serde_json::Value::String(text)) if !text.is_empty() => Some(text.clone()),
                _ => None,
            })
    });

    Some(preferred.unwrap_or_else(|| payload.to_string()))
}
713
#[cfg(test)]
mod tests {
    use super::*;

    /// Explicit configuration mirroring the defaults, for tests that need a config value.
    fn create_test_config() -> HybridSearchEngineConfig {
        HybridSearchEngineConfig {
            semantic_weight: 0.5,
            keyword_weight: 0.5,
            rrf_k: 60.0,
            default_limit: 10,
            min_score_threshold: 0.0,
            over_fetch_multiplier: 3,
        }
    }

    #[test]
    fn test_config_default() {
        let config = HybridSearchEngineConfig::default();
        assert_eq!(config.semantic_weight, 0.5);
        assert_eq!(config.keyword_weight, 0.5);
        assert_eq!(config.rrf_k, 60.0);
        assert_eq!(config.default_limit, 10);
        assert_eq!(config.min_score_threshold, 0.0);
        assert_eq!(config.over_fetch_multiplier, 3);
    }

    #[test]
    fn test_search_query_defaults() {
        let query = SearchQuery::new("plain");
        assert_eq!(query.query, "plain");
        assert_eq!(query.limit, 10);
        assert!(query.tenant_id.is_none());
        assert_eq!(query.mode, SearchMode::Hybrid);
        assert!(!query.filters.has_filters());
        assert!(query.min_similarity.is_none());
    }

    #[test]
    fn test_search_query_builder() {
        let query = SearchQuery::new("test query")
            .with_limit(20)
            .with_tenant("tenant-1")
            .with_mode(SearchMode::Hybrid)
            .with_event_type("UserCreated")
            .with_entity_id("user-123")
            .with_min_similarity(0.7);

        assert_eq!(query.query, "test query");
        assert_eq!(query.limit, 20);
        assert_eq!(query.tenant_id, Some("tenant-1".to_string()));
        assert_eq!(query.mode, SearchMode::Hybrid);
        assert_eq!(query.filters.event_type, Some("UserCreated".to_string()));
        assert_eq!(query.filters.entity_id, Some("user-123".to_string()));
        assert_eq!(query.min_similarity, Some(0.7));
    }

    #[test]
    fn test_search_query_semantic_constructor() {
        let query = SearchQuery::semantic("natural language query");
        assert_eq!(query.mode, SearchMode::SemanticOnly);
        assert_eq!(query.query, "natural language query");
    }

    #[test]
    fn test_search_query_keyword_constructor() {
        let query = SearchQuery::keyword("keyword search");
        assert_eq!(query.mode, SearchMode::KeywordOnly);
        assert_eq!(query.query, "keyword search");
    }

    #[test]
    fn test_metadata_filters_builder() {
        let now = Utc::now();
        let past = now - chrono::Duration::hours(1);

        let filters = MetadataFilters::new()
            .with_event_type("OrderPlaced")
            .with_entity_id("order-456")
            .with_time_range(past, now);

        assert_eq!(filters.event_type, Some("OrderPlaced".to_string()));
        assert_eq!(filters.entity_id, Some("order-456".to_string()));
        assert_eq!(filters.time_from, Some(past));
        assert_eq!(filters.time_to, Some(now));
        assert!(filters.has_filters());
    }

    #[test]
    fn test_metadata_filters_has_filters() {
        assert!(!MetadataFilters::new().has_filters());
        assert!(MetadataFilters::new().with_event_type("Test").has_filters());
        assert!(MetadataFilters::new().with_entity_id("e1").has_filters());
        assert!(MetadataFilters::new().with_time_from(Utc::now()).has_filters());
        assert!(MetadataFilters::new().with_time_to(Utc::now()).has_filters());
    }

    #[test]
    fn test_search_mode_default() {
        let mode: SearchMode = Default::default();
        assert_eq!(mode, SearchMode::Hybrid);
    }

    #[test]
    fn test_hybrid_search_result_structure() {
        let result = HybridSearchResult {
            event_id: Uuid::new_v4(),
            score: 0.85,
            semantic_score: Some(0.9),
            keyword_score: Some(0.8),
            event_type: Some("UserCreated".to_string()),
            entity_id: Some("user-123".to_string()),
            source_text: Some("Test content".to_string()),
        };

        assert!(result.score > 0.0);
        assert!(result.semantic_score.is_some());
        assert!(result.keyword_score.is_some());
    }

    #[test]
    fn test_event_metadata_default() {
        let metadata = EventMetadata::default();
        assert!(metadata.event_type.is_none());
        assert!(metadata.entity_id.is_none());
        assert!(metadata.timestamp.is_none());
    }

    #[test]
    fn test_rrf_score_calculation() {
        let k = 60.0_f32;

        // 1 / (k + rank + 1) for zero-based ranks 0 and 1.
        let score_rank_0 = 1.0 / (k + 0.0 + 1.0);
        assert!((score_rank_0 - 0.01639344).abs() < 0.0001);

        let score_rank_1 = 1.0 / (k + 1.0 + 1.0);
        assert!((score_rank_1 - 0.01612903).abs() < 0.0001);

        assert!(score_rank_0 > score_rank_1);
    }

    #[test]
    fn test_config_weights_validation() {
        let config = create_test_config();
        assert!((config.semantic_weight + config.keyword_weight - 1.0).abs() < f32::EPSILON);
    }

    #[test]
    fn test_search_query_with_time_range() {
        let now = Utc::now();
        let past = now - chrono::Duration::days(7);

        let query = SearchQuery::new("test").with_time_range(past, now);

        assert_eq!(query.filters.time_from, Some(past));
        assert_eq!(query.filters.time_to, Some(now));
    }

    #[test]
    fn test_search_query_serialization() {
        let query = SearchQuery::new("test query")
            .with_limit(5)
            .with_tenant("tenant-1")
            .with_mode(SearchMode::Hybrid)
            .with_event_type("UserCreated");

        let json = serde_json::to_string(&query).expect("SearchQuery should serialize");
        let deserialized: SearchQuery =
            serde_json::from_str(&json).expect("SearchQuery should deserialize");

        assert_eq!(deserialized.query, "test query");
        assert_eq!(deserialized.limit, 5);
        assert_eq!(deserialized.tenant_id.as_deref(), Some("tenant-1"));
        assert_eq!(deserialized.mode, SearchMode::Hybrid);
        assert_eq!(deserialized.filters.event_type.as_deref(), Some("UserCreated"));
    }

    #[test]
    fn test_hybrid_search_result_serialization() {
        let result = HybridSearchResult {
            event_id: Uuid::new_v4(),
            score: 0.85,
            semantic_score: Some(0.9),
            keyword_score: Some(0.8),
            event_type: Some("Test".to_string()),
            entity_id: None,
            source_text: None,
        };

        let json = serde_json::to_string(&result).expect("HybridSearchResult should serialize");
        let back: HybridSearchResult =
            serde_json::from_str(&json).expect("HybridSearchResult should deserialize");

        assert_eq!(back.event_id, result.event_id);
        assert_eq!(back.event_type, result.event_type);
        assert!(back.entity_id.is_none());
        assert!(back.source_text.is_none());
    }
}
905
#[cfg(test)]
#[cfg(all(feature = "vector-search", feature = "keyword-search"))]
mod integration_tests {
    use super::*;
    use crate::infrastructure::search::{
        KeywordSearchEngineConfig, VectorSearchEngineConfig,
    };

    /// Builds a hybrid engine over fresh backends.
    ///
    /// The vector engine has a default similarity threshold of 0.0, and the
    /// keyword engine has `auto_commit` enabled.
    fn new_hybrid_engine() -> HybridSearchEngine {
        let vector = VectorSearchEngine::with_config(VectorSearchEngineConfig {
            default_similarity_threshold: 0.0,
            ..Default::default()
        })
        .unwrap();

        let keyword = KeywordSearchEngine::with_config(KeywordSearchEngineConfig {
            auto_commit: true,
            ..Default::default()
        })
        .unwrap();

        HybridSearchEngine::new(Arc::new(vector), Arc::new(keyword))
    }

    /// Indexes one event under `tenant-1`, panicking if indexing fails.
    async fn index(
        engine: &HybridSearchEngine,
        id: Uuid,
        event_type: &str,
        entity_id: Option<&str>,
        payload: serde_json::Value,
        timestamp: DateTime<Utc>,
    ) {
        engine
            .index_event(id, "tenant-1", event_type, entity_id, &payload, timestamp)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_hybrid_search_integration() {
        let engine = new_hybrid_engine();
        let alice_id = Uuid::new_v4();
        let order_id = Uuid::new_v4();

        index(
            &engine,
            alice_id,
            "UserCreated",
            Some("user-123"),
            serde_json::json!({"name": "Alice", "email": "alice@example.com"}),
            Utc::now(),
        )
        .await;
        index(
            &engine,
            order_id,
            "OrderPlaced",
            Some("order-456"),
            serde_json::json!({"product": "Widget", "quantity": 5}),
            Utc::now(),
        )
        .await;

        let hits = engine
            .search(&SearchQuery::new("Alice user").with_tenant("tenant-1"))
            .unwrap();

        assert!(!hits.is_empty());
        assert_eq!(hits[0].event_id, alice_id);
    }

    #[tokio::test]
    async fn test_search_with_event_type_filter() {
        let engine = new_hybrid_engine();
        let user_id = Uuid::new_v4();
        let order_id = Uuid::new_v4();

        index(
            &engine,
            user_id,
            "UserCreated",
            None,
            serde_json::json!({"data": "test user"}),
            Utc::now(),
        )
        .await;
        index(
            &engine,
            order_id,
            "OrderPlaced",
            None,
            serde_json::json!({"data": "test order"}),
            Utc::now(),
        )
        .await;

        let query = SearchQuery::new("test")
            .with_tenant("tenant-1")
            .with_event_type("UserCreated");
        let hits = engine.search(&query).unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].event_type, Some("UserCreated".to_string()));
    }

    #[tokio::test]
    async fn test_search_with_time_range_filter() {
        let engine = new_hybrid_engine();

        let now = Utc::now();
        let two_hours_ago = now - chrono::Duration::hours(2);
        let half_hour_ago = now - chrono::Duration::minutes(30);

        let old_id = Uuid::new_v4();
        let recent_id = Uuid::new_v4();

        index(
            &engine,
            old_id,
            "Event",
            None,
            serde_json::json!({"data": "old event"}),
            two_hours_ago,
        )
        .await;
        index(
            &engine,
            recent_id,
            "Event",
            None,
            serde_json::json!({"data": "recent event"}),
            half_hour_ago,
        )
        .await;

        // Only the event from 30 minutes ago falls inside the last hour.
        let one_hour_ago = now - chrono::Duration::hours(1);
        let query = SearchQuery::new("event")
            .with_tenant("tenant-1")
            .with_time_range(one_hour_ago, now);
        let hits = engine.search(&query).unwrap();

        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].event_id, recent_id);
    }

    #[test]
    fn test_health_check() {
        assert!(new_hybrid_engine().health_check().is_ok());
    }

    #[test]
    fn test_stats() {
        let engine = new_hybrid_engine();
        assert_eq!(engine.stats(), (0, 0, 0, 0));
    }
}