1use crate::application::dto::EventDto;
2use crate::application::services::{SemanticSearchRequest, VectorSearchService};
3use crate::domain::repositories::EventRepository;
4use crate::domain::value_objects::EmbeddingVector;
5use crate::error::{AllSourceError, Result};
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use uuid::Uuid;
9
10pub struct SemanticSearchUseCase {
20 vector_service: Arc<VectorSearchService>,
21 event_repository: Arc<dyn EventRepository>,
22}
23
24impl SemanticSearchUseCase {
25 pub fn new(
26 vector_service: Arc<VectorSearchService>,
27 event_repository: Arc<dyn EventRepository>,
28 ) -> Self {
29 Self {
30 vector_service,
31 event_repository,
32 }
33 }
34
35 pub async fn execute(
37 &self,
38 request: SemanticSearchUseCaseRequest,
39 ) -> Result<SemanticSearchUseCaseResponse> {
40 let embedding = request
42 .query_embedding
43 .ok_or_else(|| AllSourceError::InvalidInput("query_embedding is required".to_string()))?;
44
45 if embedding.is_empty() {
46 return Err(AllSourceError::InvalidInput(
47 "query_embedding cannot be empty".to_string(),
48 ));
49 }
50
51 let k = request.k.unwrap_or(10);
53 if k == 0 {
54 return Err(AllSourceError::InvalidInput(
55 "k must be greater than 0".to_string(),
56 ));
57 }
58 if k > 1000 {
59 return Err(AllSourceError::InvalidInput(
60 "k cannot exceed 1000".to_string(),
61 ));
62 }
63
64 let search_request = SemanticSearchRequest {
66 query_embedding: Some(embedding),
67 k: Some(k),
68 tenant_id: request.tenant_id.clone(),
69 event_type: request.event_type.clone(),
70 min_similarity: request.min_similarity,
71 max_distance: request.max_distance,
72 metric: request.metric.clone(),
73 include_events: request.include_events.unwrap_or(false),
74 };
75
76 let search_response = self.vector_service.search(search_request).await?;
78
79 let events = if request.include_events.unwrap_or(false) {
81 let mut events = Vec::with_capacity(search_response.results.len());
82 for result in &search_response.results {
83 if let Some(event) = self.event_repository.find_by_id(result.event_id).await? {
84 events.push(EventDto::from(&event));
85 }
86 }
87 Some(events)
88 } else {
89 None
90 };
91
92 Ok(SemanticSearchUseCaseResponse {
93 results: search_response
94 .results
95 .into_iter()
96 .map(|r| SemanticSearchResultDto {
97 event_id: r.event_id,
98 score: r.score,
99 source_text: r.source_text,
100 })
101 .collect(),
102 events,
103 count: search_response.count,
104 metric: search_response.metric,
105 vectors_searched: search_response.stats.vectors_searched,
106 search_time_us: search_response.stats.search_time_us,
107 })
108 }
109
110 pub async fn find_similar(
112 &self,
113 event_id: Uuid,
114 k: usize,
115 tenant_id: Option<String>,
116 ) -> Result<SemanticSearchUseCaseResponse> {
117 let entry = self
119 .vector_service
120 .get_embedding(event_id)
121 .await?
122 .ok_or_else(|| {
123 AllSourceError::EventNotFound(format!("No embedding found for event {}", event_id))
124 })?;
125
126 let search_request = SemanticSearchRequest {
128 query_embedding: Some(entry.embedding.values().to_vec()),
129 k: Some(k + 1), tenant_id,
131 event_type: None,
132 min_similarity: None,
133 max_distance: None,
134 metric: None,
135 include_events: false,
136 };
137
138 let mut response = self.vector_service.search(search_request).await?;
139
140 response.results.retain(|r| r.event_id != event_id);
142 response.results.truncate(k);
143 response.count = response.results.len();
144
145 Ok(SemanticSearchUseCaseResponse {
146 results: response
147 .results
148 .into_iter()
149 .map(|r| SemanticSearchResultDto {
150 event_id: r.event_id,
151 score: r.score,
152 source_text: r.source_text,
153 })
154 .collect(),
155 events: None,
156 count: response.count,
157 metric: response.metric,
158 vectors_searched: response.stats.vectors_searched,
159 search_time_us: response.stats.search_time_us,
160 })
161 }
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct SemanticSearchUseCaseRequest {
167 pub query_embedding: Option<Vec<f32>>,
169 pub k: Option<usize>,
171 pub tenant_id: Option<String>,
173 pub event_type: Option<String>,
175 pub min_similarity: Option<f32>,
177 pub max_distance: Option<f32>,
179 pub metric: Option<String>,
181 pub include_events: Option<bool>,
183}
184
185impl Default for SemanticSearchUseCaseRequest {
186 fn default() -> Self {
187 Self {
188 query_embedding: None,
189 k: Some(10),
190 tenant_id: None,
191 event_type: None,
192 min_similarity: None,
193 max_distance: None,
194 metric: None,
195 include_events: None,
196 }
197 }
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct SemanticSearchResultDto {
203 pub event_id: Uuid,
204 pub score: f32,
205 pub source_text: Option<String>,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct SemanticSearchUseCaseResponse {
211 pub results: Vec<SemanticSearchResultDto>,
213 pub events: Option<Vec<EventDto>>,
215 pub count: usize,
217 pub metric: String,
219 pub vectors_searched: usize,
221 pub search_time_us: u64,
223}
224
225pub struct IndexEventEmbeddingUseCase {
229 vector_service: Arc<VectorSearchService>,
230}
231
232impl IndexEventEmbeddingUseCase {
233 pub fn new(vector_service: Arc<VectorSearchService>) -> Self {
234 Self { vector_service }
235 }
236
237 pub async fn execute(&self, request: IndexEventEmbeddingRequest) -> Result<()> {
239 let embedding = EmbeddingVector::new(request.embedding)?;
241
242 self.vector_service
244 .index_event(crate::application::services::IndexEventRequest {
245 event_id: request.event_id,
246 tenant_id: request.tenant_id,
247 embedding,
248 source_text: request.source_text,
249 })
250 .await
251 }
252
253 pub async fn execute_batch(
255 &self,
256 requests: Vec<IndexEventEmbeddingRequest>,
257 ) -> Result<BatchIndexResponse> {
258 let mut indexed = 0;
259 let mut failed = 0;
260 let mut errors = Vec::new();
261
262 for request in requests {
263 match EmbeddingVector::new(request.embedding) {
264 Ok(embedding) => {
265 match self
266 .vector_service
267 .index_event(crate::application::services::IndexEventRequest {
268 event_id: request.event_id,
269 tenant_id: request.tenant_id,
270 embedding,
271 source_text: request.source_text,
272 })
273 .await
274 {
275 Ok(_) => indexed += 1,
276 Err(e) => {
277 failed += 1;
278 errors.push(format!("Event {}: {}", request.event_id, e));
279 }
280 }
281 }
282 Err(e) => {
283 failed += 1;
284 errors.push(format!("Event {}: {}", request.event_id, e));
285 }
286 }
287 }
288
289 Ok(BatchIndexResponse {
290 indexed,
291 failed,
292 errors,
293 })
294 }
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize)]
299pub struct IndexEventEmbeddingRequest {
300 pub event_id: Uuid,
301 pub tenant_id: String,
302 pub embedding: Vec<f32>,
303 pub source_text: Option<String>,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct BatchIndexResponse {
309 pub indexed: usize,
310 pub failed: usize,
311 pub errors: Vec<String>,
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use crate::infrastructure::repositories::InMemoryVectorSearchRepository;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;

    /// `EventRepository` stub backed by a fixed, in-memory list of events.
    ///
    /// Only `find_by_id` and `health_check` are implemented. Every other method
    /// panics via `unimplemented!()`, so the tests must not reach them.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        /// Builds a mock that serves exactly `events`.
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        async fn save(&self, _event: &Event) -> Result<()> {
            unimplemented!()
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            unimplemented!()
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            // Linear scan over the stored events; returns a clone of the first match.
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, _entity_id: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_type(&self, _event_type: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_time_range(
            &self,
            _tenant_id: &str,
            _start: chrono::DateTime<Utc>,
            _end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_entity_as_of(
            &self,
            _entity_id: &str,
            _tenant_id: &str,
            _as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn count(&self, _tenant_id: &str) -> Result<usize> {
            unimplemented!()
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Builds a `SemanticSearchUseCase` over a fresh `InMemoryVectorSearchRepository`
    /// and a mock event repository holding one `user.created` event for `tenant-1`.
    ///
    /// The vector service is returned as well, so tests can index embeddings directly.
    fn create_test_use_case() -> (SemanticSearchUseCase, Arc<VectorSearchService>) {
        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo));

        let events = vec![Event::from_strings(
            "user.created".to_string(),
            "user-1".to_string(),
            "tenant-1".to_string(),
            json!({"name": "Test"}),
            None,
        )
        .unwrap()];

        let event_repo = Arc::new(MockEventRepository::with_events(events));

        (
            SemanticSearchUseCase::new(vector_service.clone(), event_repo),
            vector_service,
        )
    }

    /// Querying with a vector identical to id1's embedding ranks id1 first with a score of ~1.0.
    #[tokio::test]
    async fn test_semantic_search() {
        let (use_case, vector_service) = create_test_use_case();

        // Two orthogonal embeddings in the same tenant.
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id1,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![1.0, 0.0, 0.0]).unwrap(),
                source_text: Some("first document".to_string()),
            })
            .await
            .unwrap();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id2,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![0.0, 1.0, 0.0]).unwrap(),
                source_text: Some("second document".to_string()),
            })
            .await
            .unwrap();

        // The query equals id1's embedding; k = 2 should return both indexed events.
        let response = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(2),
                tenant_id: Some("tenant-1".to_string()),
                ..Default::default()
            })
            .await
            .unwrap();

        assert_eq!(response.count, 2);
        assert_eq!(response.results[0].event_id, id1);
        assert!((response.results[0].score - 1.0).abs() < 1e-6);
    }

    /// `find_similar` excludes the source event and returns its closest neighbour first.
    #[tokio::test]
    async fn test_find_similar() {
        let (use_case, vector_service) = create_test_use_case();

        // id2 ([0.9, 0.1, 0.0]) lies much closer to id1 ([1.0, 0.0, 0.0]) than id3 ([0.0, 1.0, 0.0]) does.
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        let id3 = Uuid::new_v4();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id1,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![1.0, 0.0, 0.0]).unwrap(),
                source_text: None,
            })
            .await
            .unwrap();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id2,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![0.9, 0.1, 0.0]).unwrap(),
                source_text: None,
            })
            .await
            .unwrap();

        vector_service
            .index_event(crate::application::services::IndexEventRequest {
                event_id: id3,
                tenant_id: "tenant-1".to_string(),
                embedding: EmbeddingVector::new(vec![0.0, 1.0, 0.0]).unwrap(),
                source_text: None,
            })
            .await
            .unwrap();

        let response = use_case
            .find_similar(id1, 2, Some("tenant-1".to_string()))
            .await
            .unwrap();

        // The source event must never appear among its own neighbours.
        assert!(!response.results.iter().any(|r| r.event_id == id1));
        assert!(response.results.len() <= 2);

        // The nearest remaining neighbour is id2.
        assert_eq!(response.results[0].event_id, id2);
    }

    /// `execute` rejects a missing or empty embedding and an out-of-range `k`.
    #[tokio::test]
    async fn test_validation_errors() {
        let (use_case, _) = create_test_use_case();

        // Missing query embedding.
        let result = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: None,
                ..Default::default()
            })
            .await;
        assert!(result.is_err());

        // Empty query embedding.
        let result = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![]),
                ..Default::default()
            })
            .await;
        assert!(result.is_err());

        // k = 0 is below the allowed range.
        let result = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(0),
                ..Default::default()
            })
            .await;
        assert!(result.is_err());

        // k = 2000 exceeds the 1000 cap enforced by `execute`.
        let result = use_case
            .execute(SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(2000),
                ..Default::default()
            })
            .await;
        assert!(result.is_err());
    }

    /// A single `execute` call stores exactly one vector in the repository.
    #[tokio::test]
    async fn test_index_use_case() {
        use crate::domain::repositories::VectorSearchRepository;

        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo.clone()));
        let use_case = IndexEventEmbeddingUseCase::new(vector_service);

        let event_id = Uuid::new_v4();
        use_case
            .execute(IndexEventEmbeddingRequest {
                event_id,
                tenant_id: "tenant-1".to_string(),
                embedding: vec![1.0, 0.0, 0.0],
                source_text: Some("test content".to_string()),
            })
            .await
            .unwrap();

        // `count` is called through the `VectorSearchRepository` trait path.
        // NOTE(review): presumably to avoid ambiguity with another `count` in scope; confirm.
        assert_eq!(VectorSearchRepository::count(&*vector_repo, None).await.unwrap(), 1);
    }

    /// Batch indexing of five valid requests reports five successes and no failures.
    #[tokio::test]
    async fn test_batch_index_use_case() {
        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo.clone()));
        let use_case = IndexEventEmbeddingUseCase::new(vector_service);

        // Embeddings are [i, 0, 0] for i in 0..5. This includes the all-zero vector at i = 0,
        // which the assertions below expect to be accepted.
        let requests: Vec<_> = (0..5)
            .map(|i| IndexEventEmbeddingRequest {
                event_id: Uuid::new_v4(),
                tenant_id: "tenant-1".to_string(),
                embedding: vec![i as f32, 0.0, 0.0],
                source_text: None,
            })
            .collect();

        let response = use_case.execute_batch(requests).await.unwrap();
        assert_eq!(response.indexed, 5);
        assert_eq!(response.failed, 0);
    }
}