Skip to main content

allsource_core/application/use_cases/
query_events.rs

1use crate::{
2    application::dto::{EventDto, QueryEventsRequest, QueryEventsResponse},
3    domain::repositories::EventRepository,
4    error::Result,
5};
6use std::sync::Arc;
7
8/// Use Case: Query Events
9///
10/// This use case handles querying events from the event store with various filters.
11///
12/// Responsibilities:
13/// - Validate query parameters
14/// - Determine query strategy (by entity, by type, by time range, etc.)
15/// - Execute query via repository
16/// - Transform domain events to DTOs
17/// - Apply limits and pagination
18pub struct QueryEventsUseCase {
19    repository: Arc<dyn EventRepository>,
20}
21
22impl QueryEventsUseCase {
23    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24        Self { repository }
25    }
26
27    pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
28        // Determine tenant_id (default to "default" if not provided)
29        let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31        // Determine query strategy based on filters
32        let mut events = if let Some(entity_id) = request.entity_id {
33            // Query by entity (most specific)
34            if let Some(as_of) = request.as_of {
35                // Time-travel query
36                self.repository
37                    .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
38                    .await?
39            } else {
40                self.repository
41                    .find_by_entity(&entity_id, &tenant_id)
42                    .await?
43            }
44        } else if let Some(event_type) = request.event_type {
45            // Query by type
46            self.repository
47                .find_by_type(&event_type, &tenant_id)
48                .await?
49        } else if let (Some(since), Some(until)) = (request.since, request.until) {
50            // Query by time range
51            self.repository
52                .find_by_time_range(&tenant_id, since, until)
53                .await?
54        } else {
55            // No specific filter - this could be expensive!
56            // In production, you might want to require at least one filter
57            return Err(crate::error::Error::InvalidInput(
58                "Query requires at least one filter (entity_id, event_type, or time range)"
59                    .to_string(),
60            ));
61        };
62
63        // Apply time filters if provided (for non-time-range queries)
64        if let Some(since) = request.since {
65            events.retain(|e| e.occurred_after(since));
66        }
67        if let Some(until) = request.until {
68            events.retain(|e| e.occurred_before(until));
69        }
70
71        // Apply limit
72        if let Some(limit) = request.limit {
73            events.truncate(limit);
74        }
75
76        // Convert to DTOs
77        let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
78        let count = event_dtos.len();
79
80        Ok(QueryEventsResponse {
81            events: event_dtos,
82            count,
83        })
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory stand-in for the event store: every lookup is answered by
    /// scanning a fixed list of events.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }

        /// Clones every stored event accepted by `keep`, in insertion order.
        fn matching(&self, keep: impl Fn(&Event) -> bool) -> Vec<Event> {
            self.events.iter().filter(|&e| keep(e)).cloned().collect()
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        // Writes are accepted and discarded; the mock is read-only.
        async fn save(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            Ok(())
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            let found = self.events.iter().find(|e| e.id() == id);
            Ok(found.cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.matching(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id))
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .matching(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id))
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: chrono::DateTime<Utc>,
            end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.matching(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end)))
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.matching(|e| {
                e.entity_id_str() == entity_id
                    && e.tenant_id_str() == tenant_id
                    && e.occurred_before(as_of)
            }))
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            let total = self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count();
            Ok(total)
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Three events for "tenant-1": two `user.created` and one `order.placed`.
    fn create_test_events() -> Vec<Event> {
        vec![
            ("user.created", "user-1", json!({"name": "Alice"})),
            ("user.created", "user-2", json!({"name": "Bob"})),
            ("order.placed", "order-1", json!({"amount": 100})),
        ]
        .into_iter()
        .map(|(event_type, entity_id, payload)| {
            Event::from_strings(
                event_type.to_string(),
                entity_id.to_string(),
                "tenant-1".to_string(),
                payload,
                None,
            )
            .unwrap()
        })
        .collect()
    }

    /// Request scoped to "tenant-1" with every filter unset; tests override
    /// the fields they care about via struct-update syntax.
    fn tenant_one_request() -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: Some("tenant-1".to_string()),
            as_of: None,
            since: None,
            until: None,
            limit: None,
        }
    }

    /// Wires a use case to a mock repository holding `events`.
    fn use_case_over(events: Vec<Event>) -> QueryEventsUseCase {
        QueryEventsUseCase::new(Arc::new(MockEventRepository::with_events(events)))
    }

    #[tokio::test]
    async fn test_query_by_entity() {
        let fixtures = create_test_events();
        let first_entity = fixtures[0].entity_id().to_string();
        let use_case = use_case_over(fixtures);

        let outcome = use_case
            .execute(QueryEventsRequest {
                entity_id: Some(first_entity),
                ..tenant_one_request()
            })
            .await;
        assert!(outcome.is_ok());

        let body = outcome.unwrap();
        assert_eq!(body.count, 1);
    }

    #[tokio::test]
    async fn test_query_by_type() {
        let use_case = use_case_over(create_test_events());

        let outcome = use_case
            .execute(QueryEventsRequest {
                event_type: Some("user.created".to_string()),
                ..tenant_one_request()
            })
            .await;
        assert!(outcome.is_ok());

        let body = outcome.unwrap();
        assert_eq!(body.count, 2);
    }

    #[tokio::test]
    async fn test_query_with_limit() {
        let use_case = use_case_over(create_test_events());

        let outcome = use_case
            .execute(QueryEventsRequest {
                event_type: Some("user.created".to_string()),
                limit: Some(1),
                ..tenant_one_request()
            })
            .await;
        assert!(outcome.is_ok());

        let body = outcome.unwrap();
        assert_eq!(body.count, 1);
    }

    #[tokio::test]
    async fn test_query_requires_filter() {
        let use_case = use_case_over(create_test_events());

        // Only the tenant is set, so no lookup strategy applies.
        let outcome = use_case.execute(tenant_one_request()).await;
        assert!(outcome.is_err());
    }
}