allsource_core/application/use_cases/
query_events.rs

1use std::sync::Arc;
2use crate::domain::repositories::EventRepository;
3use crate::application::dto::{QueryEventsRequest, QueryEventsResponse, EventDto};
4use crate::error::Result;
5
6/// Use Case: Query Events
7///
8/// This use case handles querying events from the event store with various filters.
9///
10/// Responsibilities:
11/// - Validate query parameters
12/// - Determine query strategy (by entity, by type, by time range, etc.)
13/// - Execute query via repository
14/// - Transform domain events to DTOs
15/// - Apply limits and pagination
16pub struct QueryEventsUseCase {
17    repository: Arc<dyn EventRepository>,
18}
19
20impl QueryEventsUseCase {
21    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
22        Self { repository }
23    }
24
25    pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
26        // Determine tenant_id (default to "default" if not provided)
27        let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
28
29        // Determine query strategy based on filters
30        let mut events = if let Some(entity_id) = request.entity_id {
31            // Query by entity (most specific)
32            if let Some(as_of) = request.as_of {
33                // Time-travel query
34                self.repository
35                    .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
36                    .await?
37            } else {
38                self.repository.find_by_entity(&entity_id, &tenant_id).await?
39            }
40        } else if let Some(event_type) = request.event_type {
41            // Query by type
42            self.repository.find_by_type(&event_type, &tenant_id).await?
43        } else if let (Some(since), Some(until)) = (request.since, request.until) {
44            // Query by time range
45            self.repository
46                .find_by_time_range(&tenant_id, since, until)
47                .await?
48        } else {
49            // No specific filter - this could be expensive!
50            // In production, you might want to require at least one filter
51            return Err(crate::error::Error::InvalidInput(
52                "Query requires at least one filter (entity_id, event_type, or time range)"
53                    .to_string(),
54            ));
55        };
56
57        // Apply time filters if provided (for non-time-range queries)
58        if let Some(since) = request.since {
59            events.retain(|e| e.occurred_after(since));
60        }
61        if let Some(until) = request.until {
62            events.retain(|e| e.occurred_before(until));
63        }
64
65        // Apply limit
66        if let Some(limit) = request.limit {
67            events.truncate(limit);
68        }
69
70        // Convert to DTOs
71        let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
72        let count = event_dtos.len();
73
74        Ok(QueryEventsResponse {
75            events: event_dtos,
76            count,
77        })
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use crate::domain::entities::Event;
    use serde_json::json;
    use chrono::{Utc, Duration};
    use uuid::Uuid;

    /// In-memory `EventRepository` serving a fixed list of events.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }

        /// Returns clones of the stored events accepted by `keep`, in stored order.
        fn matching(&self, keep: impl Fn(&Event) -> bool) -> Vec<Event> {
            self.events.iter().filter(|&e| keep(e)).cloned().collect()
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        // The write side is never exercised by the query use case.
        async fn save(&self, _event: &Event) -> Result<()> {
            unimplemented!()
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            unimplemented!()
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.matching(|e| {
                e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id
            }))
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.matching(|e| {
                e.event_type_str() == event_type && e.tenant_id_str() == tenant_id
            }))
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: chrono::DateTime<Utc>,
            end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.matching(|e| {
                e.tenant_id_str() == tenant_id && e.occurred_between(start, end)
            }))
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.matching(|e| {
                e.entity_id_str() == entity_id
                    && e.tenant_id_str() == tenant_id
                    && e.occurred_before(as_of)
            }))
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Three events for `tenant-1`: two `user.created` and one `order.placed`.
    fn create_test_events() -> Vec<Event> {
        vec![
            ("user.created", "user-1", json!({"name": "Alice"})),
            ("user.created", "user-2", json!({"name": "Bob"})),
            ("order.placed", "order-1", json!({"amount": 100})),
        ]
        .into_iter()
        .map(|(event_type, entity_id, payload)| {
            Event::from_strings(
                event_type.to_string(),
                entity_id.to_string(),
                "tenant-1".to_string(),
                payload,
                None,
            )
            .unwrap()
        })
        .collect()
    }

    /// Baseline request: scoped to `tenant-1`, with every filter unset.
    /// Tests override individual fields with struct-update syntax.
    fn tenant_one_request() -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: Some("tenant-1".to_string()),
            as_of: None,
            since: None,
            until: None,
            limit: None,
        }
    }

    /// Builds the use case on top of a mock repository holding `events`.
    fn use_case_with(events: Vec<Event>) -> QueryEventsUseCase {
        QueryEventsUseCase::new(Arc::new(MockEventRepository::with_events(events)))
    }

    #[tokio::test]
    async fn test_query_by_entity() {
        let events = create_test_events();
        let entity_id = events[0].entity_id().to_string();
        let use_case = use_case_with(events);

        let request = QueryEventsRequest {
            entity_id: Some(entity_id),
            ..tenant_one_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query by entity should succeed");
        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
    }

    #[tokio::test]
    async fn test_query_by_type() {
        let use_case = use_case_with(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            ..tenant_one_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query by type should succeed");
        assert_eq!(response.count, 2);
        assert_eq!(response.events.len(), 2);
    }

    #[tokio::test]
    async fn test_query_with_limit() {
        let use_case = use_case_with(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            limit: Some(1),
            ..tenant_one_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");
        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
    }

    #[tokio::test]
    async fn test_query_is_scoped_to_tenant() {
        let use_case = use_case_with(create_test_events());

        // All fixtures belong to `tenant-1`, so another tenant must see nothing.
        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            tenant_id: Some("tenant-2".to_string()),
            ..tenant_one_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query for another tenant should succeed");
        assert_eq!(response.count, 0);
        assert!(response.events.is_empty());
    }

    #[tokio::test]
    async fn test_query_by_time_range() {
        let use_case = use_case_with(create_test_events());

        // NOTE(review): assumes `Event::from_strings` stamps events with roughly
        // the current time, so a window of +/- one hour contains every fixture.
        // TODO confirm against the `Event` constructor.
        let now = Utc::now();
        let request = QueryEventsRequest {
            since: Some(now - Duration::hours(1)),
            until: Some(now + Duration::hours(1)),
            ..tenant_one_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("time-range query should succeed");
        assert_eq!(response.count, 3);
        assert_eq!(response.events.len(), 3);
    }

    #[tokio::test]
    async fn test_query_requires_filter() {
        let use_case = use_case_with(create_test_events());

        // A tenant alone is not a filter; the use case must reject the request.
        let outcome = use_case.execute(tenant_one_request()).await;
        assert!(matches!(
            outcome,
            Err(crate::error::Error::InvalidInput(_))
        ));
    }
}