// Source: allsource_core/application/use_cases/semantic_search.rs

1use crate::application::dto::EventDto;
2use crate::application::services::{SemanticSearchRequest, VectorSearchService};
3use crate::domain::repositories::EventRepository;
4use crate::domain::value_objects::EmbeddingVector;
5use crate::error::{AllSourceError, Result};
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use uuid::Uuid;
9
10/// Use Case: Semantic Search
11///
12/// This use case handles semantic (vector-based) search operations.
13///
14/// Responsibilities:
15/// - Validate search parameters
16/// - Execute vector similarity search
17/// - Optionally enrich results with full event data
18/// - Apply filters and pagination
19pub struct SemanticSearchUseCase {
20    vector_service: Arc<VectorSearchService>,
21    event_repository: Arc<dyn EventRepository>,
22}
23
24impl SemanticSearchUseCase {
25    pub fn new(
26        vector_service: Arc<VectorSearchService>,
27        event_repository: Arc<dyn EventRepository>,
28    ) -> Self {
29        Self {
30            vector_service,
31            event_repository,
32        }
33    }
34
35    /// Execute semantic search and return results
36    pub async fn execute(
37        &self,
38        request: SemanticSearchUseCaseRequest,
39    ) -> Result<SemanticSearchUseCaseResponse> {
40        // Validate query
41        let embedding = request
42            .query_embedding
43            .ok_or_else(|| AllSourceError::InvalidInput("query_embedding is required".to_string()))?;
44
45        if embedding.is_empty() {
46            return Err(AllSourceError::InvalidInput(
47                "query_embedding cannot be empty".to_string(),
48            ));
49        }
50
51        // Validate k
52        let k = request.k.unwrap_or(10);
53        if k == 0 {
54            return Err(AllSourceError::InvalidInput(
55                "k must be greater than 0".to_string(),
56            ));
57        }
58        if k > 1000 {
59            return Err(AllSourceError::InvalidInput(
60                "k cannot exceed 1000".to_string(),
61            ));
62        }
63
64        // Build search request
65        let search_request = SemanticSearchRequest {
66            query_embedding: Some(embedding),
67            k: Some(k),
68            tenant_id: request.tenant_id.clone(),
69            event_type: request.event_type.clone(),
70            min_similarity: request.min_similarity,
71            max_distance: request.max_distance,
72            metric: request.metric.clone(),
73            include_events: request.include_events.unwrap_or(false),
74        };
75
76        // Execute search
77        let search_response = self.vector_service.search(search_request).await?;
78
79        // If we need full events, fetch them
80        let events = if request.include_events.unwrap_or(false) {
81            let mut events = Vec::with_capacity(search_response.results.len());
82            for result in &search_response.results {
83                if let Some(event) = self.event_repository.find_by_id(result.event_id).await? {
84                    events.push(EventDto::from(&event));
85                }
86            }
87            Some(events)
88        } else {
89            None
90        };
91
92        Ok(SemanticSearchUseCaseResponse {
93            results: search_response
94                .results
95                .into_iter()
96                .map(|r| SemanticSearchResultDto {
97                    event_id: r.event_id,
98                    score: r.score,
99                    source_text: r.source_text,
100                })
101                .collect(),
102            events,
103            count: search_response.count,
104            metric: search_response.metric,
105            vectors_searched: search_response.stats.vectors_searched,
106            search_time_us: search_response.stats.search_time_us,
107        })
108    }
109
110    /// Find similar events to a given event
111    pub async fn find_similar(
112        &self,
113        event_id: Uuid,
114        k: usize,
115        tenant_id: Option<String>,
116    ) -> Result<SemanticSearchUseCaseResponse> {
117        // Get the embedding for the source event
118        let entry = self
119            .vector_service
120            .get_embedding(event_id)
121            .await?
122            .ok_or_else(|| {
123                AllSourceError::EventNotFound(format!("No embedding found for event {}", event_id))
124            })?;
125
126        // Search for similar events (excluding the source event)
127        let search_request = SemanticSearchRequest {
128            query_embedding: Some(entry.embedding.values().to_vec()),
129            k: Some(k + 1), // Get one extra to exclude the source
130            tenant_id,
131            event_type: None,
132            min_similarity: None,
133            max_distance: None,
134            metric: None,
135            include_events: false,
136        };
137
138        let mut response = self.vector_service.search(search_request).await?;
139
140        // Filter out the source event and limit to k results
141        response.results.retain(|r| r.event_id != event_id);
142        response.results.truncate(k);
143        response.count = response.results.len();
144
145        Ok(SemanticSearchUseCaseResponse {
146            results: response
147                .results
148                .into_iter()
149                .map(|r| SemanticSearchResultDto {
150                    event_id: r.event_id,
151                    score: r.score,
152                    source_text: r.source_text,
153                })
154                .collect(),
155            events: None,
156            count: response.count,
157            metric: response.metric,
158            vectors_searched: response.stats.vectors_searched,
159            search_time_us: response.stats.search_time_us,
160        })
161    }
162}
163
164/// Request for semantic search use case
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct SemanticSearchUseCaseRequest {
167    /// The query embedding vector
168    pub query_embedding: Option<Vec<f32>>,
169    /// Number of results to return (default: 10, max: 1000)
170    pub k: Option<usize>,
171    /// Filter by tenant
172    pub tenant_id: Option<String>,
173    /// Filter by event type
174    pub event_type: Option<String>,
175    /// Minimum similarity threshold
176    pub min_similarity: Option<f32>,
177    /// Maximum distance threshold
178    pub max_distance: Option<f32>,
179    /// Distance metric ("cosine", "euclidean", "dot_product")
180    pub metric: Option<String>,
181    /// Whether to include full event data
182    pub include_events: Option<bool>,
183}
184
185impl Default for SemanticSearchUseCaseRequest {
186    fn default() -> Self {
187        Self {
188            query_embedding: None,
189            k: Some(10),
190            tenant_id: None,
191            event_type: None,
192            min_similarity: None,
193            max_distance: None,
194            metric: None,
195            include_events: None,
196        }
197    }
198}
199
200/// A single result from semantic search
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct SemanticSearchResultDto {
203    pub event_id: Uuid,
204    pub score: f32,
205    pub source_text: Option<String>,
206}
207
208/// Response from semantic search use case
209#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct SemanticSearchUseCaseResponse {
211    /// Search results
212    pub results: Vec<SemanticSearchResultDto>,
213    /// Full event data (if requested)
214    pub events: Option<Vec<EventDto>>,
215    /// Number of results
216    pub count: usize,
217    /// Metric used for scoring
218    pub metric: String,
219    /// Number of vectors searched
220    pub vectors_searched: usize,
221    /// Search time in microseconds
222    pub search_time_us: u64,
223}
224
225/// Use Case: Index Event Embedding
226///
227/// Handles indexing of event embeddings for semantic search.
228pub struct IndexEventEmbeddingUseCase {
229    vector_service: Arc<VectorSearchService>,
230}
231
232impl IndexEventEmbeddingUseCase {
233    pub fn new(vector_service: Arc<VectorSearchService>) -> Self {
234        Self { vector_service }
235    }
236
237    /// Index a single event embedding
238    pub async fn execute(&self, request: IndexEventEmbeddingRequest) -> Result<()> {
239        // Validate embedding
240        let embedding = EmbeddingVector::new(request.embedding)?;
241
242        // Index the embedding
243        self.vector_service
244            .index_event(crate::application::services::IndexEventRequest {
245                event_id: request.event_id,
246                tenant_id: request.tenant_id,
247                embedding,
248                source_text: request.source_text,
249            })
250            .await
251    }
252
253    /// Index multiple embeddings in batch
254    pub async fn execute_batch(
255        &self,
256        requests: Vec<IndexEventEmbeddingRequest>,
257    ) -> Result<BatchIndexResponse> {
258        let mut indexed = 0;
259        let mut failed = 0;
260        let mut errors = Vec::new();
261
262        for request in requests {
263            match EmbeddingVector::new(request.embedding) {
264                Ok(embedding) => {
265                    match self
266                        .vector_service
267                        .index_event(crate::application::services::IndexEventRequest {
268                            event_id: request.event_id,
269                            tenant_id: request.tenant_id,
270                            embedding,
271                            source_text: request.source_text,
272                        })
273                        .await
274                    {
275                        Ok(_) => indexed += 1,
276                        Err(e) => {
277                            failed += 1;
278                            errors.push(format!("Event {}: {}", request.event_id, e));
279                        }
280                    }
281                }
282                Err(e) => {
283                    failed += 1;
284                    errors.push(format!("Event {}: {}", request.event_id, e));
285                }
286            }
287        }
288
289        Ok(BatchIndexResponse {
290            indexed,
291            failed,
292            errors,
293        })
294    }
295}
296
297/// Request to index an event embedding
298#[derive(Debug, Clone, Serialize, Deserialize)]
299pub struct IndexEventEmbeddingRequest {
300    pub event_id: Uuid,
301    pub tenant_id: String,
302    pub embedding: Vec<f32>,
303    pub source_text: Option<String>,
304}
305
306/// Response from batch indexing
307#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct BatchIndexResponse {
309    pub indexed: usize,
310    pub failed: usize,
311    pub errors: Vec<String>,
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::services::IndexEventRequest;
    use crate::domain::entities::Event;
    use crate::infrastructure::repositories::InMemoryVectorSearchRepository;
    use async_trait::async_trait;
    use chrono::{DateTime, Utc};
    use serde_json::json;

    /// Tenant used by every fixture in this module.
    const TENANT: &str = "tenant-1";

    /// Event repository stub backed by a fixed list of events.
    ///
    /// Only `find_by_id` and `health_check` are implemented; every other
    /// method panics via `unimplemented!()`.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        async fn save(&self, _event: &Event) -> Result<()> {
            unimplemented!()
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            unimplemented!()
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            let found = self.events.iter().find(|event| event.id() == id);
            Ok(found.cloned())
        }

        async fn find_by_entity(&self, _entity_id: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_type(&self, _event_type: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_time_range(
            &self,
            _tenant_id: &str,
            _start: DateTime<Utc>,
            _end: DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_entity_as_of(
            &self,
            _entity_id: &str,
            _tenant_id: &str,
            _as_of: DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn count(&self, _tenant_id: &str) -> Result<usize> {
            unimplemented!()
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Builds a search use case over an empty in-memory vector index and a
    /// mock repository holding one event; also returns the vector service so
    /// tests can index embeddings directly.
    fn create_test_use_case() -> (SemanticSearchUseCase, Arc<VectorSearchService>) {
        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo));

        let seed_event = Event::from_strings(
            "user.created".to_string(),
            "user-1".to_string(),
            TENANT.to_string(),
            json!({"name": "Test"}),
            None,
        )
        .unwrap();
        let event_repo = Arc::new(MockEventRepository::with_events(vec![seed_event]));

        let use_case = SemanticSearchUseCase::new(vector_service.clone(), event_repo);
        (use_case, vector_service)
    }

    /// Indexes `values` for `event_id` under the test tenant, panicking on failure.
    async fn index_embedding(
        service: &VectorSearchService,
        event_id: Uuid,
        values: Vec<f32>,
        source_text: Option<&str>,
    ) {
        service
            .index_event(IndexEventRequest {
                event_id,
                tenant_id: TENANT.to_string(),
                embedding: EmbeddingVector::new(values).unwrap(),
                source_text: source_text.map(str::to_string),
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_semantic_search() {
        let (use_case, vector_service) = create_test_use_case();

        // Two orthogonal embeddings; the query matches the first exactly.
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        index_embedding(&vector_service, id1, vec![1.0, 0.0, 0.0], Some("first document")).await;
        index_embedding(&vector_service, id2, vec![0.0, 1.0, 0.0], Some("second document")).await;

        let request = SemanticSearchUseCaseRequest {
            query_embedding: Some(vec![1.0, 0.0, 0.0]),
            k: Some(2),
            tenant_id: Some(TENANT.to_string()),
            ..Default::default()
        };
        let response = use_case.execute(request).await.unwrap();

        assert_eq!(response.count, 2);
        assert_eq!(response.results[0].event_id, id1);
        assert!((response.results[0].score - 1.0).abs() < 1e-6);
    }

    #[tokio::test]
    async fn test_find_similar() {
        let (use_case, vector_service) = create_test_use_case();

        // id2 is nearly parallel to id1, id3 is orthogonal to it.
        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        let id3 = Uuid::new_v4();
        index_embedding(&vector_service, id1, vec![1.0, 0.0, 0.0], None).await;
        index_embedding(&vector_service, id2, vec![0.9, 0.1, 0.0], None).await;
        index_embedding(&vector_service, id3, vec![0.0, 1.0, 0.0], None).await;

        let response = use_case
            .find_similar(id1, 2, Some(TENANT.to_string()))
            .await
            .unwrap();

        // The source event must never appear among its own neighbours.
        assert!(!response.results.iter().any(|r| r.event_id == id1));
        assert!(response.results.len() <= 2);

        // The closest neighbour of id1 is id2.
        assert_eq!(response.results[0].event_id, id2);
    }

    #[tokio::test]
    async fn test_validation_errors() {
        let (use_case, _) = create_test_use_case();

        let invalid_requests = vec![
            // Missing embedding
            SemanticSearchUseCaseRequest {
                query_embedding: None,
                ..Default::default()
            },
            // Empty embedding
            SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![]),
                ..Default::default()
            },
            // k = 0
            SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(0),
                ..Default::default()
            },
            // k too large
            SemanticSearchUseCaseRequest {
                query_embedding: Some(vec![1.0, 0.0, 0.0]),
                k: Some(2000),
                ..Default::default()
            },
        ];

        for request in invalid_requests {
            let result = use_case.execute(request).await;
            assert!(result.is_err());
        }
    }

    #[tokio::test]
    async fn test_index_use_case() {
        use crate::domain::repositories::VectorSearchRepository;

        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo.clone()));
        let use_case = IndexEventEmbeddingUseCase::new(vector_service);

        let request = IndexEventEmbeddingRequest {
            event_id: Uuid::new_v4(),
            tenant_id: TENANT.to_string(),
            embedding: vec![1.0, 0.0, 0.0],
            source_text: Some("test content".to_string()),
        };
        use_case.execute(request).await.unwrap();

        // Exactly one vector should now be stored in the repository.
        assert_eq!(VectorSearchRepository::count(&*vector_repo, None).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_batch_index_use_case() {
        let vector_repo = Arc::new(InMemoryVectorSearchRepository::new());
        let vector_service = Arc::new(VectorSearchService::new(vector_repo.clone()));
        let use_case = IndexEventEmbeddingUseCase::new(vector_service);

        let mut requests = Vec::with_capacity(5);
        for i in 0..5 {
            requests.push(IndexEventEmbeddingRequest {
                event_id: Uuid::new_v4(),
                tenant_id: TENANT.to_string(),
                embedding: vec![i as f32, 0.0, 0.0],
                source_text: None,
            });
        }

        let response = use_case.execute_batch(requests).await.unwrap();
        assert_eq!(response.indexed, 5);
        assert_eq!(response.failed, 0);
    }
}