Skip to main content

allsource_core/application/use_cases/
query_events.rs

1use crate::{
2    application::dto::{EventDto, QueryEventsRequest, QueryEventsResponse},
3    domain::repositories::EventRepository,
4    error::Result,
5};
6use std::sync::Arc;
7
8/// Use Case: Query Events
9///
10/// This use case handles querying events from the event store with various filters.
11///
12/// Responsibilities:
13/// - Validate query parameters
14/// - Determine query strategy (by entity, by type, by time range, etc.)
15/// - Execute query via repository
16/// - Transform domain events to DTOs
17/// - Apply limits and pagination
18pub struct QueryEventsUseCase {
19    repository: Arc<dyn EventRepository>,
20}
21
22impl QueryEventsUseCase {
23    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24        Self { repository }
25    }
26
27    pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
28        // Determine tenant_id (default to "default" if not provided)
29        let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31        // Determine query strategy based on filters
32        let mut events = if let Some(entity_id) = request.entity_id {
33            // Query by entity (most specific)
34            if let Some(as_of) = request.as_of {
35                // Time-travel query
36                self.repository
37                    .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
38                    .await?
39            } else {
40                self.repository
41                    .find_by_entity(&entity_id, &tenant_id)
42                    .await?
43            }
44        } else if let Some(event_type) = request.event_type {
45            // Query by type
46            self.repository
47                .find_by_type(&event_type, &tenant_id)
48                .await?
49        } else if let (Some(since), Some(until)) = (request.since, request.until) {
50            // Query by time range
51            self.repository
52                .find_by_time_range(&tenant_id, since, until)
53                .await?
54        } else {
55            // No specific filter - this could be expensive!
56            // In production, you might want to require at least one filter
57            return Err(crate::error::Error::InvalidInput(
58                "Query requires at least one filter (entity_id, event_type, or time range)"
59                    .to_string(),
60            ));
61        };
62
63        // Apply time filters if provided (for non-time-range queries)
64        if let Some(since) = request.since {
65            events.retain(|e| e.occurred_after(since));
66        }
67        if let Some(until) = request.until {
68            events.retain(|e| e.occurred_before(until));
69        }
70
71        // Capture total before applying limit
72        let total_count = events.len();
73
74        // Apply limit
75        if let Some(limit) = request.limit {
76            events.truncate(limit);
77        }
78
79        // Convert to DTOs
80        let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
81        let count = event_dtos.len();
82        let has_more = count < total_count;
83
84        Ok(QueryEventsResponse {
85            events: event_dtos,
86            count,
87            total_count,
88            has_more,
89            entity_version: None,
90        })
91    }
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory repository double: every lookup is a linear scan over `events`.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        // Writes are accepted and discarded; these tests only exercise reads.
        async fn save(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            Ok(())
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: chrono::DateTime<Utc>,
            end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
                .cloned()
                .collect())
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| {
                    e.entity_id_str() == entity_id
                        && e.tenant_id_str() == tenant_id
                        && e.occurred_before(as_of)
                })
                .cloned()
                .collect())
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Fixture: two `user.created` events and one `order.placed` event, all in `tenant-1`.
    fn create_test_events() -> Vec<Event> {
        vec![
            Event::from_strings(
                "user.created".to_string(),
                "user-1".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Alice"}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "user.created".to_string(),
                "user-2".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Bob"}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "order.placed".to_string(),
                "order-1".to_string(),
                "tenant-1".to_string(),
                json!({"amount": 100}),
                None,
            )
            .unwrap(),
        ]
    }

    /// Builds a use case backed by a mock repository holding `events`.
    fn use_case_over(events: Vec<Event>) -> QueryEventsUseCase {
        QueryEventsUseCase::new(Arc::new(MockEventRepository::with_events(events)))
    }

    /// Request scoped to `tenant-1` with every filter unset; individual tests
    /// override the fields they care about via struct update syntax.
    fn unfiltered_request() -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: Some("tenant-1".to_string()),
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        }
    }

    #[tokio::test]
    async fn test_query_by_entity() {
        let events = create_test_events();
        let entity_id = events[0].entity_id().to_string();
        let use_case = use_case_over(events);

        let request = QueryEventsRequest {
            entity_id: Some(entity_id),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("entity query should succeed");

        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
        assert_eq!(response.total_count, 1);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_by_type() {
        let use_case = use_case_over(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("type query should succeed");

        assert_eq!(response.count, 2);
        assert_eq!(response.events.len(), 2);
        assert_eq!(response.total_count, 2);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_with_limit() {
        let use_case = use_case_over(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            limit: Some(1),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");

        // One event is returned, but the response must still report both matches.
        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
        assert_eq!(response.total_count, 2);
        assert!(response.has_more);
    }

    #[tokio::test]
    async fn test_query_with_limit_above_match_count() {
        let use_case = use_case_over(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            limit: Some(10),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");

        // A limit that is not reached must not flag further results.
        assert_eq!(response.count, 2);
        assert_eq!(response.total_count, 2);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_requires_filter() {
        let use_case = use_case_over(create_test_events());

        let response = use_case.execute(unfiltered_request()).await;

        assert!(matches!(
            response,
            Err(crate::error::Error::InvalidInput(_))
        ));
    }

    #[tokio::test]
    async fn test_query_rejects_partial_time_range() {
        let use_case = use_case_over(create_test_events());

        // `since` without `until` is not a complete time range, so no lookup
        // strategy applies.
        let request = QueryEventsRequest {
            since: Some(Utc::now()),
            ..unfiltered_request()
        };

        let response = use_case.execute(request).await;

        assert!(matches!(
            response,
            Err(crate::error::Error::InvalidInput(_))
        ));
    }

    #[tokio::test]
    async fn test_query_falls_back_to_default_tenant() {
        let use_case = use_case_over(create_test_events());

        // Every fixture event belongs to "tenant-1", so a query resolved to the
        // "default" tenant must come back empty.
        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            tenant_id: None,
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query without tenant should succeed");

        assert_eq!(response.count, 0);
        assert_eq!(response.total_count, 0);
        assert!(!response.has_more);
    }
}