Skip to main content

allsource_core/application/use_cases/
query_events.rs

1use crate::{
2    application::dto::{EventDto, QueryEventsRequest, QueryEventsResponse},
3    domain::repositories::EventRepository,
4    error::Result,
5};
6use std::sync::Arc;
7
8/// Use Case: Query Events
9///
10/// This use case handles querying events from the event store with various filters.
11///
12/// Responsibilities:
13/// - Validate query parameters
14/// - Determine query strategy (by entity, by type, by time range, etc.)
15/// - Execute query via repository
16/// - Transform domain events to DTOs
17/// - Apply limits and pagination
18pub struct QueryEventsUseCase {
19    repository: Arc<dyn EventRepository>,
20}
21
22impl QueryEventsUseCase {
23    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24        Self { repository }
25    }
26
27    pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
28        // Determine tenant_id (default to "default" if not provided)
29        let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31        // Determine query strategy based on filters
32        let mut events = if let Some(entity_id) = request.entity_id {
33            // Query by entity (most specific)
34            if let Some(as_of) = request.as_of {
35                // Time-travel query
36                self.repository
37                    .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
38                    .await?
39            } else {
40                self.repository
41                    .find_by_entity(&entity_id, &tenant_id)
42                    .await?
43            }
44        } else if let Some(event_type) = request.event_type {
45            // Query by type
46            self.repository
47                .find_by_type(&event_type, &tenant_id)
48                .await?
49        } else if let (Some(since), Some(until)) = (request.since, request.until) {
50            // Query by time range
51            self.repository
52                .find_by_time_range(&tenant_id, since, until)
53                .await?
54        } else {
55            // No specific filter - this could be expensive!
56            // In production, you might want to require at least one filter
57            return Err(crate::error::Error::InvalidInput(
58                "Query requires at least one filter (entity_id, event_type, or time range)"
59                    .to_string(),
60            ));
61        };
62
63        // Apply time filters if provided (for non-time-range queries)
64        if let Some(since) = request.since {
65            events.retain(|e| e.occurred_after(since));
66        }
67        if let Some(until) = request.until {
68            events.retain(|e| e.occurred_before(until));
69        }
70
71        // Capture total before applying limit
72        let total_count = events.len();
73
74        // Apply limit
75        if let Some(limit) = request.limit {
76            events.truncate(limit);
77        }
78
79        // Convert to DTOs
80        let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
81        let count = event_dtos.len();
82        let has_more = count < total_count;
83
84        Ok(QueryEventsResponse {
85            events: event_dtos,
86            count,
87            total_count,
88            has_more,
89        })
90    }
91}
92
93#[cfg(test)]
94mod tests {
95    use super::*;
96    use crate::domain::entities::Event;
97    use async_trait::async_trait;
98    use chrono::Utc;
99    use serde_json::json;
100    use uuid::Uuid;
101
102    // Mock repository for testing
103    struct MockEventRepository {
104        events: Vec<Event>,
105    }
106
107    impl MockEventRepository {
108        fn with_events(events: Vec<Event>) -> Self {
109            Self { events }
110        }
111    }
112
113    #[async_trait]
114    impl EventRepository for MockEventRepository {
115        async fn save(&self, _event: &Event) -> Result<()> {
116            Ok(())
117        }
118
119        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
120            Ok(())
121        }
122
123        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
124            Ok(self.events.iter().find(|e| e.id() == id).cloned())
125        }
126
127        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
128            Ok(self
129                .events
130                .iter()
131                .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
132                .cloned()
133                .collect())
134        }
135
136        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
137            Ok(self
138                .events
139                .iter()
140                .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
141                .cloned()
142                .collect())
143        }
144
145        async fn find_by_time_range(
146            &self,
147            tenant_id: &str,
148            start: chrono::DateTime<Utc>,
149            end: chrono::DateTime<Utc>,
150        ) -> Result<Vec<Event>> {
151            Ok(self
152                .events
153                .iter()
154                .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
155                .cloned()
156                .collect())
157        }
158
159        async fn find_by_entity_as_of(
160            &self,
161            entity_id: &str,
162            tenant_id: &str,
163            as_of: chrono::DateTime<Utc>,
164        ) -> Result<Vec<Event>> {
165            Ok(self
166                .events
167                .iter()
168                .filter(|e| {
169                    e.entity_id_str() == entity_id
170                        && e.tenant_id_str() == tenant_id
171                        && e.occurred_before(as_of)
172                })
173                .cloned()
174                .collect())
175        }
176
177        async fn count(&self, tenant_id: &str) -> Result<usize> {
178            Ok(self
179                .events
180                .iter()
181                .filter(|e| e.tenant_id_str() == tenant_id)
182                .count())
183        }
184
185        async fn health_check(&self) -> Result<()> {
186            Ok(())
187        }
188    }
189
190    fn create_test_events() -> Vec<Event> {
191        vec![
192            Event::from_strings(
193                "user.created".to_string(),
194                "user-1".to_string(),
195                "tenant-1".to_string(),
196                json!({"name": "Alice"}),
197                None,
198            )
199            .unwrap(),
200            Event::from_strings(
201                "user.created".to_string(),
202                "user-2".to_string(),
203                "tenant-1".to_string(),
204                json!({"name": "Bob"}),
205                None,
206            )
207            .unwrap(),
208            Event::from_strings(
209                "order.placed".to_string(),
210                "order-1".to_string(),
211                "tenant-1".to_string(),
212                json!({"amount": 100}),
213                None,
214            )
215            .unwrap(),
216        ]
217    }
218
219    #[tokio::test]
220    async fn test_query_by_entity() {
221        let events = create_test_events();
222        let entity_id = events[0].entity_id().to_string();
223        let repo = Arc::new(MockEventRepository::with_events(events));
224        let use_case = QueryEventsUseCase::new(repo);
225
226        let request = QueryEventsRequest {
227            entity_id: Some(entity_id),
228            event_type: None,
229            tenant_id: Some("tenant-1".to_string()),
230            as_of: None,
231            since: None,
232            until: None,
233            limit: None,
234            event_type_prefix: None,
235            payload_filter: None,
236        };
237
238        let response = use_case.execute(request).await;
239        assert!(response.is_ok());
240
241        let response = response.unwrap();
242        assert_eq!(response.count, 1);
243    }
244
245    #[tokio::test]
246    async fn test_query_by_type() {
247        let events = create_test_events();
248        let repo = Arc::new(MockEventRepository::with_events(events));
249        let use_case = QueryEventsUseCase::new(repo);
250
251        let request = QueryEventsRequest {
252            entity_id: None,
253            event_type: Some("user.created".to_string()),
254            tenant_id: Some("tenant-1".to_string()),
255            as_of: None,
256            since: None,
257            until: None,
258            limit: None,
259            event_type_prefix: None,
260            payload_filter: None,
261        };
262
263        let response = use_case.execute(request).await;
264        assert!(response.is_ok());
265
266        let response = response.unwrap();
267        assert_eq!(response.count, 2);
268    }
269
270    #[tokio::test]
271    async fn test_query_with_limit() {
272        let events = create_test_events();
273        let repo = Arc::new(MockEventRepository::with_events(events));
274        let use_case = QueryEventsUseCase::new(repo);
275
276        let request = QueryEventsRequest {
277            entity_id: None,
278            event_type: Some("user.created".to_string()),
279            tenant_id: Some("tenant-1".to_string()),
280            as_of: None,
281            since: None,
282            until: None,
283            limit: Some(1),
284            event_type_prefix: None,
285            payload_filter: None,
286        };
287
288        let response = use_case.execute(request).await;
289        assert!(response.is_ok());
290
291        let response = response.unwrap();
292        assert_eq!(response.count, 1);
293    }
294
295    #[tokio::test]
296    async fn test_query_requires_filter() {
297        let events = create_test_events();
298        let repo = Arc::new(MockEventRepository::with_events(events));
299        let use_case = QueryEventsUseCase::new(repo);
300
301        let request = QueryEventsRequest {
302            entity_id: None,
303            event_type: None,
304            tenant_id: Some("tenant-1".to_string()),
305            as_of: None,
306            since: None,
307            until: None,
308            limit: None,
309            event_type_prefix: None,
310            payload_filter: None,
311        };
312
313        let response = use_case.execute(request).await;
314        assert!(response.is_err());
315    }
316}