1use crate::{
2 application::{
3 dto::EventDto,
4 services::{SemanticSearchRequest, VectorSearchService},
5 },
6 domain::{repositories::EventRepository, value_objects::EmbeddingVector},
7 error::{AllSourceError, Result},
8};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use uuid::Uuid;
12
13pub struct SemanticSearchUseCase {
23 vector_service: Arc<VectorSearchService>,
24 event_repository: Arc<dyn EventRepository>,
25}
26
27impl SemanticSearchUseCase {
28 pub fn new(
29 vector_service: Arc<VectorSearchService>,
30 event_repository: Arc<dyn EventRepository>,
31 ) -> Self {
32 Self {
33 vector_service,
34 event_repository,
35 }
36 }
37
38 pub async fn execute(
40 &self,
41 request: SemanticSearchUseCaseRequest,
42 ) -> Result<SemanticSearchUseCaseResponse> {
43 let embedding = request.query_embedding.ok_or_else(|| {
45 AllSourceError::InvalidInput("query_embedding is required".to_string())
46 })?;
47
48 if embedding.is_empty() {
49 return Err(AllSourceError::InvalidInput(
50 "query_embedding cannot be empty".to_string(),
51 ));
52 }
53
54 let k = request.k.unwrap_or(10);
56 if k == 0 {
57 return Err(AllSourceError::InvalidInput(
58 "k must be greater than 0".to_string(),
59 ));
60 }
61 if k > 1000 {
62 return Err(AllSourceError::InvalidInput(
63 "k cannot exceed 1000".to_string(),
64 ));
65 }
66
67 let search_request = SemanticSearchRequest {
69 query_embedding: Some(embedding),
70 k: Some(k),
71 tenant_id: request.tenant_id.clone(),
72 event_type: request.event_type.clone(),
73 min_similarity: request.min_similarity,
74 max_distance: request.max_distance,
75 metric: request.metric.clone(),
76 include_events: request.include_events.unwrap_or(false),
77 };
78
79 let search_response = self.vector_service.search(search_request).await?;
81
82 let events = if request.include_events.unwrap_or(false) {
84 let mut events = Vec::with_capacity(search_response.results.len());
85 for result in &search_response.results {
86 if let Some(event) = self.event_repository.find_by_id(result.event_id).await? {
87 events.push(EventDto::from(&event));
88 }
89 }
90 Some(events)
91 } else {
92 None
93 };
94
95 Ok(SemanticSearchUseCaseResponse {
96 results: search_response
97 .results
98 .into_iter()
99 .map(|r| SemanticSearchResultDto {
100 event_id: r.event_id,
101 score: r.score,
102 source_text: r.source_text,
103 })
104 .collect(),
105 events,
106 count: search_response.count,
107 metric: search_response.metric,
108 vectors_searched: search_response.stats.vectors_searched,
109 search_time_us: search_response.stats.search_time_us,
110 })
111 }
112
113 pub async fn find_similar(
115 &self,
116 event_id: Uuid,
117 k: usize,
118 tenant_id: Option<String>,
119 ) -> Result<SemanticSearchUseCaseResponse> {
120 let entry = self
122 .vector_service
123 .get_embedding(event_id)
124 .await?
125 .ok_or_else(|| {
126 AllSourceError::EventNotFound(format!("No embedding found for event {event_id}"))
127 })?;
128
129 let search_request = SemanticSearchRequest {
131 query_embedding: Some(entry.embedding.values().to_vec()),
132 k: Some(k + 1), tenant_id,
134 event_type: None,
135 min_similarity: None,
136 max_distance: None,
137 metric: None,
138 include_events: false,
139 };
140
141 let mut response = self.vector_service.search(search_request).await?;
142
143 response.results.retain(|r| r.event_id != event_id);
145 response.results.truncate(k);
146 response.count = response.results.len();
147
148 Ok(SemanticSearchUseCaseResponse {
149 results: response
150 .results
151 .into_iter()
152 .map(|r| SemanticSearchResultDto {
153 event_id: r.event_id,
154 score: r.score,
155 source_text: r.source_text,
156 })
157 .collect(),
158 events: None,
159 count: response.count,
160 metric: response.metric,
161 vectors_searched: response.stats.vectors_searched,
162 search_time_us: response.stats.search_time_us,
163 })
164 }
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct SemanticSearchUseCaseRequest {
170 pub query_embedding: Option<Vec<f32>>,
172 pub k: Option<usize>,
174 pub tenant_id: Option<String>,
176 pub event_type: Option<String>,
178 pub min_similarity: Option<f32>,
180 pub max_distance: Option<f32>,
182 pub metric: Option<String>,
184 pub include_events: Option<bool>,
186}
187
188impl Default for SemanticSearchUseCaseRequest {
189 fn default() -> Self {
190 Self {
191 query_embedding: None,
192 k: Some(10),
193 tenant_id: None,
194 event_type: None,
195 min_similarity: None,
196 max_distance: None,
197 metric: None,
198 include_events: None,
199 }
200 }
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct SemanticSearchResultDto {
206 pub event_id: Uuid,
207 pub score: f32,
208 pub source_text: Option<String>,
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct SemanticSearchUseCaseResponse {
214 pub results: Vec<SemanticSearchResultDto>,
216 pub events: Option<Vec<EventDto>>,
218 pub count: usize,
220 pub metric: String,
222 pub vectors_searched: usize,
224 pub search_time_us: u64,
226}
227
228pub struct IndexEventEmbeddingUseCase {
232 vector_service: Arc<VectorSearchService>,
233}
234
235impl IndexEventEmbeddingUseCase {
236 pub fn new(vector_service: Arc<VectorSearchService>) -> Self {
237 Self { vector_service }
238 }
239
240 pub async fn execute(&self, request: IndexEventEmbeddingRequest) -> Result<()> {
242 let embedding = EmbeddingVector::new(request.embedding)?;
244
245 self.vector_service
247 .index_event(crate::application::services::IndexEventRequest {
248 event_id: request.event_id,
249 tenant_id: request.tenant_id,
250 embedding,
251 source_text: request.source_text,
252 })
253 .await
254 }
255
256 pub async fn execute_batch(
258 &self,
259 requests: Vec<IndexEventEmbeddingRequest>,
260 ) -> Result<BatchIndexResponse> {
261 let mut indexed = 0;
262 let mut failed = 0;
263 let mut errors = Vec::new();
264
265 for request in requests {
266 match EmbeddingVector::new(request.embedding) {
267 Ok(embedding) => {
268 match self
269 .vector_service
270 .index_event(crate::application::services::IndexEventRequest {
271 event_id: request.event_id,
272 tenant_id: request.tenant_id,
273 embedding,
274 source_text: request.source_text,
275 })
276 .await
277 {
278 Ok(()) => indexed += 1,
279 Err(e) => {
280 failed += 1;
281 errors.push(format!("Event {}: {}", request.event_id, e));
282 }
283 }
284 }
285 Err(e) => {
286 failed += 1;
287 errors.push(format!("Event {}: {}", request.event_id, e));
288 }
289 }
290 }
291
292 Ok(BatchIndexResponse {
293 indexed,
294 failed,
295 errors,
296 })
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct IndexEventEmbeddingRequest {
303 pub event_id: Uuid,
304 pub tenant_id: String,
305 pub embedding: Vec<f32>,
306 pub source_text: Option<String>,
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize)]
311pub struct BatchIndexResponse {
312 pub indexed: usize,
313 pub failed: usize,
314 pub errors: Vec<String>,
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        domain::entities::Event, infrastructure::repositories::InMemoryVectorSearchRepository,
    };
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;

    /// Read-only in-memory `EventRepository` over a fixed list of events.
    ///
    /// `save` / `save_batch` return `Ok(())` and store nothing; every query
    /// does a linear scan of `events`.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        async fn save(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            Ok(())
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: chrono::DateTime<Utc>,
            end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
                .cloned()
                .collect())
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| {
                    e.entity_id_str() == entity_id
                        && e.tenant_id_str() == tenant_id
                        && e.occurred_before(as_of)
                })
                .cloned()
                .collect())
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Builds a `SemanticSearchUseCase` over an empty in-memory vector index
    /// and a mock repository seeded with one `user.created` event.
    ///
    /// Also returns the shared `VectorSearchService`, so tests can index
    /// embeddings directly. The seeded event gets a fresh id that no test
    /// indexes.
    fn create_test_use_case() -> (SemanticSearchUseCase, Arc<VectorSearchService>) {
        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo));

        let events = vec![
            Event::from_strings(
                "user.created".to_string(),
                "user-1".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Test"}),
                None,
            )
            .unwrap(),
        ];

        let event_repo = Arc::new(MockEventRepository::with_events(events));

        (
            SemanticSearchUseCase::new(vector_service.clone(), event_repo),
            vector_service,
        )
    }

    /// A query identical to one indexed vector ranks that vector first, with
    /// a score of ~1.0 (consistent with a cosine-style default metric — TODO confirm).
    #[tokio::test]
    async fn test_semantic_search() {
        let (use_case, vector_service) = create_test_use_case();

        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id1,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![1.0, 0.0, 0.0]).unwrap(),
                source_text: Some("first document".to_string()),
            })
            .await
            .unwrap();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id2,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![0.0, 1.0, 0.0]).unwrap(),
                source_text: Some("second document".to_string()),
            })
            .await
            .unwrap();

        let response = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(2),
                tenant_id: Some("tenant-1".to_string()),
                ..Default::default()
            })
            .await
            .unwrap();

        assert_eq!(response.count, 2);
        assert_eq!(response.results[0].event_id, id1);
        assert!((response.results[0].score - 1.0).abs() < 1e-6);
    }

    /// `find_similar` excludes the source event, returns at most `k` hits, and
    /// ranks the nearly parallel vector (`id2`) ahead of the orthogonal one (`id3`).
    #[tokio::test]
    async fn test_find_similar() {
        let (use_case, vector_service) = create_test_use_case();

        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        let id3 = Uuid::new_v4();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id1,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![1.0, 0.0, 0.0]).unwrap(),
                source_text: None,
            })
            .await
            .unwrap();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id2,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![0.9, 0.1, 0.0]).unwrap(),
                source_text: None,
            })
            .await
            .unwrap();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id3,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![0.0, 1.0, 0.0]).unwrap(),
                source_text: None,
            })
            .await
            .unwrap();

        let response = use_case
            .find_similar(id1, 2, Some("tenant-1".to_string()))
            .await
            .unwrap();

        // The source event must never appear among its own neighbours.
        assert!(!response.results.iter().any(|r| r.event_id == id1));
        assert!(response.results.len() <= 2);

        assert_eq!(response.results[0].event_id, id2);
    }

    /// `execute` rejects each invalid input with an error:
    /// a missing embedding, an empty embedding, `k = 0`, and `k` above 1000.
    #[tokio::test]
    async fn test_validation_errors() {
        let (use_case, _) = create_test_use_case();

        let result = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: None,
                ..Default::default()
            })
            .await;
        assert!(result.is_err());

        let result = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![]),
                ..Default::default()
            })
            .await;
        assert!(result.is_err());

        let result = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(0),
                ..Default::default()
            })
            .await;
        assert!(result.is_err());

        let result = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(2000),
                ..Default::default()
            })
            .await;
        assert!(result.is_err());
    }

    /// A single `IndexEventEmbeddingUseCase::execute` call stores exactly one
    /// vector in the backing repository.
    #[tokio::test]
    async fn test_index_use_case() {
        use crate::domain::repositories::VectorSearchRepository;

        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo.clone()));
        let use_case = IndexEventEmbeddingUseCase::new(vector_service);

        let event_id = Uuid::new_v4();
        use_case
            .execute(IndexEventEmbeddingRequest {
                event_id,
                tenant_id: "tenant-1".to_string(),
                embedding: vec![1.0, 0.0, 0.0],
                source_text: Some("test content".to_string()),
            })
            .await
            .unwrap();

        // Trait-qualified call; presumably this disambiguates `count` from
        // another method with the same name on the repository — confirm.
        assert_eq!(
            VectorSearchRepository::count(&*vector_repo, None)
                .await
                .unwrap(),
            1
        );
    }

    /// A batch of five valid requests is fully indexed with no failures.
    ///
    /// NOTE(review): the first request has the all-zero vector `[0.0, 0.0, 0.0]`
    /// (i = 0). This test therefore also asserts that `EmbeddingVector::new`
    /// accepts a zero vector.
    #[tokio::test]
    async fn test_batch_index_use_case() {
        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo.clone()));
        let use_case = IndexEventEmbeddingUseCase::new(vector_service);

        let requests: Vec<_> = (0..5)
            .map(|i| IndexEventEmbeddingRequest {
                event_id: Uuid::new_v4(),
                tenant_id: "tenant-1".to_string(),
                embedding: vec![i as f32, 0.0, 0.0],
                source_text: None,
            })
            .collect();

        let response = use_case.execute_batch(requests).await.unwrap();
        assert_eq!(response.indexed, 5);
        assert_eq!(response.failed, 0);
    }

    /// End-to-end test: ingest three events, index an embedding for each,
    /// then search with `include_events` and check that hits and hydrated
    /// events come back in similarity order.
    #[tokio::test]
    async fn test_ingest_embed_search_integration() {
        use crate::application::{
            dto::IngestEventRequest, use_cases::ingest_event::IngestEventUseCase,
        };
        use std::sync::Mutex;

        /// Writable in-memory `EventRepository`. Ingest and search share it,
        /// so events saved by the ingest use case are visible to the search.
        struct SharedEventRepository {
            events: Mutex<Vec<Event>>,
        }

        impl SharedEventRepository {
            fn new() -> Self {
                Self {
                    events: Mutex::new(Vec::new()),
                }
            }
        }

        // Every method takes the std `Mutex` and drops the guard before it
        // returns, so no guard is held across an `.await` (`save_batch` awaits
        // `save` and holds no lock itself).
        #[async_trait]
        impl EventRepository for SharedEventRepository {
            async fn save(&self, event: &Event) -> Result<()> {
                let mut events = self.events.lock().unwrap();
                // NOTE(review): this rebuilds the event field by field. `Event`
                // is `Clone` (see `find_by_id` below), so `event.clone()` might
                // be equivalent — confirm whether the reconstruction is deliberate.
                events.push(Event::reconstruct_from_strings(
                    event.id(),
                    event.event_type_str().to_string(),
                    event.entity_id_str().to_string(),
                    event.tenant_id_str().to_string(),
                    event.payload().clone(),
                    event.timestamp(),
                    event.metadata().cloned(),
                    event.version(),
                ));
                Ok(())
            }

            async fn save_batch(&self, events: &[Event]) -> Result<()> {
                for event in events {
                    self.save(event).await?;
                }
                Ok(())
            }

            async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
                let events = self.events.lock().unwrap();
                Ok(events.iter().find(|e| e.id() == id).cloned())
            }

            async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
                let events = self.events.lock().unwrap();
                Ok(events
                    .iter()
                    .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
                    .cloned()
                    .collect())
            }

            async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
                let events = self.events.lock().unwrap();
                Ok(events
                    .iter()
                    .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
                    .cloned()
                    .collect())
            }

            async fn find_by_time_range(
                &self,
                tenant_id: &str,
                start: chrono::DateTime<Utc>,
                end: chrono::DateTime<Utc>,
            ) -> Result<Vec<Event>> {
                let events = self.events.lock().unwrap();
                Ok(events
                    .iter()
                    .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
                    .cloned()
                    .collect())
            }

            async fn find_by_entity_as_of(
                &self,
                entity_id: &str,
                tenant_id: &str,
                as_of: chrono::DateTime<Utc>,
            ) -> Result<Vec<Event>> {
                let events = self.events.lock().unwrap();
                Ok(events
                    .iter()
                    .filter(|e| {
                        e.entity_id_str() == entity_id
                            && e.tenant_id_str() == tenant_id
                            && e.occurred_before(as_of)
                    })
                    .cloned()
                    .collect())
            }

            async fn count(&self, tenant_id: &str) -> Result<usize> {
                let events = self.events.lock().unwrap();
                Ok(events
                    .iter()
                    .filter(|e| e.tenant_id_str() == tenant_id)
                    .count())
            }

            async fn health_check(&self) -> Result<()> {
                Ok(())
            }
        }

        let event_repo = Arc::new(SharedEventRepository::new());
        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo));

        let ingest_use_case = IngestEventUseCase::new(event_repo.clone());

        // Step 1: ingest three events for one tenant.
        let response1 = ingest_use_case
            .execute(IngestEventRequest {
                event_type: "user.created".to_string(),
                entity_id: "user-1".to_string(),
                tenant_id: Some("tenant-1".to_string()),
                payload: json!({"name": "Alice", "role": "admin"}),
                metadata: None,
                expected_version: None,
            })
            .await
            .unwrap();

        let response2 = ingest_use_case
            .execute(IngestEventRequest {
                event_type: "order.placed".to_string(),
                entity_id: "order-1".to_string(),
                tenant_id: Some("tenant-1".to_string()),
                payload: json!({"amount": 99.99, "item": "widget"}),
                metadata: None,
                expected_version: None,
            })
            .await
            .unwrap();

        let response3 = ingest_use_case
            .execute(IngestEventRequest {
                event_type: "user.updated".to_string(),
                entity_id: "user-1".to_string(),
                tenant_id: Some("tenant-1".to_string()),
                payload: json!({"name": "Alice", "role": "superadmin"}),
                metadata: None,
                expected_version: None,
            })
            .await
            .unwrap();

        assert_eq!(event_repo.events.lock().unwrap().len(), 3);

        // Step 2: index an embedding per event. The two user events point
        // mostly along the x axis; the order event points mostly along y.
        let index_use_case = IndexEventEmbeddingUseCase::new(vector_service.clone());

        index_use_case
            .execute(IndexEventEmbeddingRequest {
                event_id: response1.event_id,
                tenant_id: "tenant-1".to_string(),
                embedding: vec![0.9, 0.1, 0.0],
                source_text: Some("user created Alice admin".to_string()),
            })
            .await
            .unwrap();

        index_use_case
            .execute(IndexEventEmbeddingRequest {
                event_id: response2.event_id,
                tenant_id: "tenant-1".to_string(),
                embedding: vec![0.1, 0.9, 0.0],
                source_text: Some("order placed widget".to_string()),
            })
            .await
            .unwrap();

        index_use_case
            .execute(IndexEventEmbeddingRequest {
                event_id: response3.event_id,
                tenant_id: "tenant-1".to_string(),
                embedding: vec![0.85, 0.15, 0.0],
                source_text: Some("user updated Alice superadmin".to_string()),
            })
            .await
            .unwrap();

        // Step 3: search along the x axis and ask for the full events.
        let search_use_case =
            SemanticSearchUseCase::new(vector_service.clone(), event_repo.clone());

        let search_response = search_use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(3),
                tenant_id: Some("tenant-1".to_string()),
                include_events: Some(true),
                ..Default::default()
            })
            .await
            .unwrap();

        assert_eq!(search_response.count, 3);

        // Expected ranking against [1, 0, 0]: user.created, then user.updated,
        // then order.placed. This assumes a cosine-style default metric — TODO confirm.
        assert_eq!(search_response.results[0].event_id, response1.event_id);
        assert_eq!(search_response.results[1].event_id, response3.event_id);
        assert_eq!(search_response.results[2].event_id, response2.event_id);

        assert!(search_response.results[0].score >= search_response.results[1].score);
        assert!(search_response.results[1].score >= search_response.results[2].score);

        // Hydrated events follow result order, because `execute` looks them
        // up one by one while walking the results.
        let events = search_response.events.expect("events should be included");
        assert_eq!(events.len(), 3);

        assert_eq!(events[0].event_type, "user.created");
        assert_eq!(events[1].event_type, "user.updated");
        assert_eq!(events[2].event_type, "order.placed");
    }
}