allsource_core/application/use_cases/
query_events.rs

1use crate::application::dto::{EventDto, QueryEventsRequest, QueryEventsResponse};
2use crate::domain::repositories::EventRepository;
3use crate::error::Result;
4use std::sync::Arc;
5
6/// Use Case: Query Events
7///
8/// This use case handles querying events from the event store with various filters.
9///
10/// Responsibilities:
11/// - Validate query parameters
12/// - Determine query strategy (by entity, by type, by time range, etc.)
13/// - Execute query via repository
14/// - Transform domain events to DTOs
15/// - Apply limits and pagination
16pub struct QueryEventsUseCase {
17    repository: Arc<dyn EventRepository>,
18}
19
20impl QueryEventsUseCase {
21    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
22        Self { repository }
23    }
24
25    pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
26        // Determine tenant_id (default to "default" if not provided)
27        let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
28
29        // Determine query strategy based on filters
30        let mut events = if let Some(entity_id) = request.entity_id {
31            // Query by entity (most specific)
32            if let Some(as_of) = request.as_of {
33                // Time-travel query
34                self.repository
35                    .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
36                    .await?
37            } else {
38                self.repository
39                    .find_by_entity(&entity_id, &tenant_id)
40                    .await?
41            }
42        } else if let Some(event_type) = request.event_type {
43            // Query by type
44            self.repository
45                .find_by_type(&event_type, &tenant_id)
46                .await?
47        } else if let (Some(since), Some(until)) = (request.since, request.until) {
48            // Query by time range
49            self.repository
50                .find_by_time_range(&tenant_id, since, until)
51                .await?
52        } else {
53            // No specific filter - this could be expensive!
54            // In production, you might want to require at least one filter
55            return Err(crate::error::Error::InvalidInput(
56                "Query requires at least one filter (entity_id, event_type, or time range)"
57                    .to_string(),
58            ));
59        };
60
61        // Apply time filters if provided (for non-time-range queries)
62        if let Some(since) = request.since {
63            events.retain(|e| e.occurred_after(since));
64        }
65        if let Some(until) = request.until {
66            events.retain(|e| e.occurred_before(until));
67        }
68
69        // Apply limit
70        if let Some(limit) = request.limit {
71            events.truncate(limit);
72        }
73
74        // Convert to DTOs
75        let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
76        let count = event_dtos.len();
77
78        Ok(QueryEventsResponse {
79            events: event_dtos,
80            count,
81        })
82    }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// Tenant that owns every fixture event.
    const TENANT: &str = "tenant-1";

    /// In-memory repository that answers queries by scanning a fixed list of events.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        /// Creates a mock that serves exactly the given events.
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        // Writes are never exercised by the query use case.
        async fn save(&self, _event: &Event) -> Result<()> {
            unimplemented!()
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            unimplemented!()
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: chrono::DateTime<Utc>,
            end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
                .cloned()
                .collect())
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| {
                    e.entity_id_str() == entity_id
                        && e.tenant_id_str() == tenant_id
                        && e.occurred_before(as_of)
                })
                .cloned()
                .collect())
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Builds one fixture event owned by [`TENANT`].
    fn fixture_event(event_type: &str, entity_id: &str, payload: serde_json::Value) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            TENANT.to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// Fixture: two `user.created` events and one `order.placed` event, all for [`TENANT`].
    fn create_test_events() -> Vec<Event> {
        vec![
            fixture_event("user.created", "user-1", json!({"name": "Alice"})),
            fixture_event("user.created", "user-2", json!({"name": "Bob"})),
            fixture_event("order.placed", "order-1", json!({"amount": 100})),
        ]
    }

    /// Wires a use case to a mock repository that serves `events`.
    fn use_case_with(events: Vec<Event>) -> QueryEventsUseCase {
        let repo = Arc::new(MockEventRepository::with_events(events));
        QueryEventsUseCase::new(repo)
    }

    /// Builds a request for [`TENANT`] with only the given filters set; all time fields are `None`.
    fn request_with(
        entity_id: Option<String>,
        event_type: Option<&str>,
        limit: Option<usize>,
    ) -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id,
            event_type: event_type.map(str::to_string),
            tenant_id: Some(TENANT.to_string()),
            as_of: None,
            since: None,
            until: None,
            limit,
        }
    }

    #[tokio::test]
    async fn test_query_by_entity() {
        let events = create_test_events();
        let entity_id = events[0].entity_id().to_string();
        let use_case = use_case_with(events);

        let response = use_case
            .execute(request_with(Some(entity_id), None, None))
            .await
            .expect("query by entity should succeed");

        assert_eq!(response.count, 1);
    }

    #[tokio::test]
    async fn test_query_by_type() {
        let use_case = use_case_with(create_test_events());

        let response = use_case
            .execute(request_with(None, Some("user.created"), None))
            .await
            .expect("query by type should succeed");

        assert_eq!(response.count, 2);
    }

    #[tokio::test]
    async fn test_query_with_limit() {
        let use_case = use_case_with(create_test_events());

        let response = use_case
            .execute(request_with(None, Some("user.created"), Some(1)))
            .await
            .expect("limited query should succeed");

        assert_eq!(response.count, 1);
    }

    #[tokio::test]
    async fn test_query_requires_filter() {
        let use_case = use_case_with(create_test_events());

        let response = use_case.execute(request_with(None, None, None)).await;

        assert!(response.is_err());
    }

    #[tokio::test]
    async fn test_query_defaults_tenant_when_missing() {
        let use_case = use_case_with(create_test_events());

        // Without a tenant the use case queries "default", which owns none of the fixtures.
        let request = QueryEventsRequest {
            tenant_id: None,
            ..request_with(None, Some("user.created"), None)
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query without tenant should succeed");

        assert_eq!(response.count, 0);
        assert!(response.events.is_empty());
    }
}