allsource_core/application/use_cases/
query_events.rs1use crate::{
2 application::dto::{EventDto, QueryEventsRequest, QueryEventsResponse},
3 domain::repositories::EventRepository,
4 error::Result,
5};
6use std::sync::Arc;
7
8pub struct QueryEventsUseCase {
19 repository: Arc<dyn EventRepository>,
20}
21
22impl QueryEventsUseCase {
23 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24 Self { repository }
25 }
26
27 pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
28 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31 let mut events = if let Some(entity_id) = request.entity_id {
33 if let Some(as_of) = request.as_of {
35 self.repository
37 .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
38 .await?
39 } else {
40 self.repository
41 .find_by_entity(&entity_id, &tenant_id)
42 .await?
43 }
44 } else if let Some(event_type) = request.event_type {
45 self.repository
47 .find_by_type(&event_type, &tenant_id)
48 .await?
49 } else if let (Some(since), Some(until)) = (request.since, request.until) {
50 self.repository
52 .find_by_time_range(&tenant_id, since, until)
53 .await?
54 } else {
55 return Err(crate::error::Error::InvalidInput(
58 "Query requires at least one filter (entity_id, event_type, or time range)"
59 .to_string(),
60 ));
61 };
62
63 if let Some(since) = request.since {
65 events.retain(|e| e.occurred_after(since));
66 }
67 if let Some(until) = request.until {
68 events.retain(|e| e.occurred_before(until));
69 }
70
71 let total_count = events.len();
73
74 if let Some(limit) = request.limit {
76 events.truncate(limit);
77 }
78
79 let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
81 let count = event_dtos.len();
82 let has_more = count < total_count;
83
84 Ok(QueryEventsResponse {
85 events: event_dtos,
86 count,
87 total_count,
88 has_more,
89 })
90 }
91}
92
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use async_trait::async_trait;
    use chrono::{DateTime, Utc};
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory `EventRepository` that answers every query by scanning a
    /// fixed list of events. Writes are accepted and discarded.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        /// Builds a mock that serves exactly the given events.
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        async fn save(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            Ok(())
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: DateTime<Utc>,
            end: DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
                .cloned()
                .collect())
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self
                .events
                .iter()
                .filter(|e| {
                    e.entity_id_str() == entity_id
                        && e.tenant_id_str() == tenant_id
                        && e.occurred_before(as_of)
                })
                .cloned()
                .collect())
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Fixture: two `user.created` events and one `order.placed` event, all
    /// belonging to `tenant-1`.
    fn create_test_events() -> Vec<Event> {
        vec![
            Event::from_strings(
                "user.created".to_string(),
                "user-1".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Alice"}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "user.created".to_string(),
                "user-2".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Bob"}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "order.placed".to_string(),
                "order-1".to_string(),
                "tenant-1".to_string(),
                json!({"amount": 100}),
                None,
            )
            .unwrap(),
        ]
    }

    /// Wraps the given events in a mock repository and a use case over it.
    fn use_case_over(events: Vec<Event>) -> QueryEventsUseCase {
        QueryEventsUseCase::new(Arc::new(MockEventRepository::with_events(events)))
    }

    /// Request scoped to `tenant-1` with every filter unset; tests override
    /// only the fields they care about via struct-update syntax.
    fn tenant_request() -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: Some("tenant-1".to_string()),
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        }
    }

    #[tokio::test]
    async fn test_query_by_entity() {
        let events = create_test_events();
        let entity_id = events[0].entity_id().to_string();
        let use_case = use_case_over(events);

        let request = QueryEventsRequest {
            entity_id: Some(entity_id),
            ..tenant_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("entity query should succeed");

        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
        assert_eq!(response.total_count, 1);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_by_type() {
        let use_case = use_case_over(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            ..tenant_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("type query should succeed");

        assert_eq!(response.count, 2);
        assert_eq!(response.events.len(), 2);
        assert_eq!(response.total_count, 2);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_with_limit() {
        let use_case = use_case_over(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            limit: Some(1),
            ..tenant_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");

        // One of the two matching events is returned; the metadata must still
        // report the untruncated total and signal that more are available.
        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
        assert_eq!(response.total_count, 2);
        assert!(response.has_more);
    }

    #[tokio::test]
    async fn test_query_requires_filter() {
        let use_case = use_case_over(create_test_events());

        let response = use_case.execute(tenant_request()).await;

        // Pin the error variant so an unrelated failure cannot satisfy the test.
        assert!(matches!(
            response,
            Err(crate::error::Error::InvalidInput(_))
        ));
    }

    #[tokio::test]
    async fn test_query_uses_default_tenant_when_unset() {
        let use_case = use_case_over(create_test_events());

        // With no tenant the use case queries "default", which owns none of
        // the fixture events (they all belong to "tenant-1").
        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            tenant_id: None,
            ..tenant_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query without tenant should succeed");

        assert_eq!(response.count, 0);
        assert_eq!(response.total_count, 0);
        assert!(!response.has_more);
    }
}