allsource_core/application/use_cases/
query_events.rs1use crate::{
2 application::dto::{EventDto, QueryEventsRequest, QueryEventsResponse},
3 domain::repositories::EventRepository,
4 error::Result,
5};
6use std::sync::Arc;
7
8pub struct QueryEventsUseCase {
19 repository: Arc<dyn EventRepository>,
20}
21
22impl QueryEventsUseCase {
23 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24 Self { repository }
25 }
26
27 pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
28 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31 let mut events = if let Some(entity_id) = request.entity_id {
33 if let Some(as_of) = request.as_of {
35 self.repository
37 .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
38 .await?
39 } else {
40 self.repository
41 .find_by_entity(&entity_id, &tenant_id)
42 .await?
43 }
44 } else if let Some(event_type) = request.event_type {
45 self.repository
47 .find_by_type(&event_type, &tenant_id)
48 .await?
49 } else if let (Some(since), Some(until)) = (request.since, request.until) {
50 self.repository
52 .find_by_time_range(&tenant_id, since, until)
53 .await?
54 } else {
55 return Err(crate::error::Error::InvalidInput(
58 "Query requires at least one filter (entity_id, event_type, or time range)"
59 .to_string(),
60 ));
61 };
62
63 if let Some(since) = request.since {
65 events.retain(|e| e.occurred_after(since));
66 }
67 if let Some(until) = request.until {
68 events.retain(|e| e.occurred_before(until));
69 }
70
71 if let Some(limit) = request.limit {
73 events.truncate(limit);
74 }
75
76 let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
78 let count = event_dtos.len();
79
80 Ok(QueryEventsResponse {
81 events: event_dtos,
82 count,
83 })
84 }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory `EventRepository` used by the tests below.
    ///
    /// Reads are answered by filtering a fixed list of events; writes and the
    /// health check succeed without doing anything.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        /// Creates a mock that serves exactly `events`.
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }

        /// Clones every stored event for which `keep` returns `true`.
        fn select(&self, keep: impl Fn(&Event) -> bool) -> Vec<Event> {
            self.events
                .iter()
                .filter(|event| keep(*event))
                .cloned()
                .collect()
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        async fn save(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            Ok(())
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id))
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id))
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: chrono::DateTime<Utc>,
            end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end)))
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.select(|e| {
                e.entity_id_str() == entity_id
                    && e.tenant_id_str() == tenant_id
                    && e.occurred_before(as_of)
            }))
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Fixture: two `user.created` events and one `order.placed` event, all
    /// belonging to `tenant-1`.
    fn create_test_events() -> Vec<Event> {
        vec![
            Event::from_strings(
                "user.created".to_string(),
                "user-1".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Alice"}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "user.created".to_string(),
                "user-2".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Bob"}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "order.placed".to_string(),
                "order-1".to_string(),
                "tenant-1".to_string(),
                json!({"amount": 100}),
                None,
            )
            .unwrap(),
        ]
    }

    /// Wraps `events` in a mock repository and builds the use case on top of it.
    fn use_case_over(events: Vec<Event>) -> QueryEventsUseCase {
        QueryEventsUseCase::new(Arc::new(MockEventRepository::with_events(events)))
    }

    /// A request scoped to `tenant-1` with every filter unset; tests override
    /// the fields they care about with struct-update syntax.
    fn unfiltered_request() -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: Some("tenant-1".to_string()),
            as_of: None,
            since: None,
            until: None,
            limit: None,
        }
    }

    #[tokio::test]
    async fn test_query_by_entity() {
        let events = create_test_events();
        let entity_id = events[0].entity_id().to_string();
        let use_case = use_case_over(events);

        let request = QueryEventsRequest {
            entity_id: Some(entity_id),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query by entity should succeed");
        assert_eq!(response.count, 1);
    }

    #[tokio::test]
    async fn test_query_by_type() {
        let use_case = use_case_over(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query by type should succeed");
        assert_eq!(response.count, 2);
    }

    #[tokio::test]
    async fn test_query_with_limit() {
        let use_case = use_case_over(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            limit: Some(1),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");
        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
    }

    #[tokio::test]
    async fn test_query_requires_filter() {
        let use_case = use_case_over(create_test_events());

        let response = use_case.execute(unfiltered_request()).await;

        // Check the variant, not just "some error", so an unrelated failure
        // cannot make this test pass.
        assert!(matches!(
            response,
            Err(crate::error::Error::InvalidInput(_))
        ));
    }

    #[tokio::test]
    async fn test_query_rejects_half_open_time_range() {
        let use_case = use_case_over(create_test_events());

        // A single time bound with no entity/type filter does not select any
        // repository lookup, so the query is rejected.
        let request = QueryEventsRequest {
            since: Some(Utc::now()),
            ..unfiltered_request()
        };

        let response = use_case.execute(request).await;
        assert!(matches!(
            response,
            Err(crate::error::Error::InvalidInput(_))
        ));
    }

    #[tokio::test]
    async fn test_query_defaults_tenant_when_missing() {
        let use_case = use_case_over(create_test_events());

        // Without a tenant the use case queries tenant "default", which owns
        // none of the fixture events.
        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            tenant_id: None,
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query with default tenant should succeed");
        assert_eq!(response.count, 0);
    }
}