// allsource_core/application/use_cases/query_events.rs
use crate::{
    application::dto::{EventDto, QueryEventsRequest, QueryEventsResponse},
    domain::repositories::EventRepository,
    error::Result,
};
use std::sync::Arc;
7
8pub struct QueryEventsUseCase {
19 repository: Arc<dyn EventRepository>,
20}
21
22impl QueryEventsUseCase {
23 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24 Self { repository }
25 }
26
27 pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
28 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31 let mut events = if let Some(entity_id) = request.entity_id {
33 if let Some(as_of) = request.as_of {
35 self.repository
37 .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
38 .await?
39 } else {
40 self.repository
41 .find_by_entity(&entity_id, &tenant_id)
42 .await?
43 }
44 } else if let Some(event_type) = request.event_type {
45 self.repository
47 .find_by_type(&event_type, &tenant_id)
48 .await?
49 } else if let (Some(since), Some(until)) = (request.since, request.until) {
50 self.repository
52 .find_by_time_range(&tenant_id, since, until)
53 .await?
54 } else {
55 return Err(crate::error::Error::InvalidInput(
58 "Query requires at least one filter (entity_id, event_type, or time range)"
59 .to_string(),
60 ));
61 };
62
63 if let Some(since) = request.since {
65 events.retain(|e| e.occurred_after(since));
66 }
67 if let Some(until) = request.until {
68 events.retain(|e| e.occurred_before(until));
69 }
70
71 let total_count = events.len();
73
74 if let Some(limit) = request.limit {
76 events.truncate(limit);
77 }
78
79 let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
81 let count = event_dtos.len();
82 let has_more = count < total_count;
83
84 Ok(QueryEventsResponse {
85 events: event_dtos,
86 count,
87 total_count,
88 has_more,
89 entity_version: None,
90 })
91 }
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory `EventRepository` that answers every lookup by scanning a
    /// fixed list of events; writes are accepted and discarded.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }

        /// Returns clones of the stored events accepted by `keep`.
        fn select(&self, keep: impl Fn(&Event) -> bool) -> Vec<Event> {
            self.events.iter().filter(|&e| keep(e)).cloned().collect()
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        async fn save(&self, _event: &Event) -> Result<()> {
            Ok(())
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            Ok(())
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id))
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id))
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: chrono::DateTime<Utc>,
            end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end)))
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.select(|e| {
                e.entity_id_str() == entity_id
                    && e.tenant_id_str() == tenant_id
                    && e.occurred_before(as_of)
            }))
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Three `tenant-1` events: two `user.created` (user-1, user-2) and one
    /// `order.placed` (order-1).
    fn create_test_events() -> Vec<Event> {
        vec![
            Event::from_strings(
                "user.created".to_string(),
                "user-1".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Alice"}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "user.created".to_string(),
                "user-2".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Bob"}),
                None,
            )
            .unwrap(),
            Event::from_strings(
                "order.placed".to_string(),
                "order-1".to_string(),
                "tenant-1".to_string(),
                json!({"amount": 100}),
                None,
            )
            .unwrap(),
        ]
    }

    /// Builds a use case whose repository contains exactly `events`.
    fn use_case_for(events: Vec<Event>) -> QueryEventsUseCase {
        let repo = Arc::new(MockEventRepository::with_events(events));
        QueryEventsUseCase::new(repo)
    }

    /// Request scoped to `tenant-1` with every filter unset. Tests override
    /// only the fields they care about via struct-update syntax.
    fn unfiltered_request() -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: Some("tenant-1".to_string()),
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: None,
            payload_filter: None,
        }
    }

    #[tokio::test]
    async fn test_query_by_entity() {
        let events = create_test_events();
        let entity_id = events[0].entity_id().to_string();
        let use_case = use_case_for(events);

        let request = QueryEventsRequest {
            entity_id: Some(entity_id),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("entity query should succeed");
        assert_eq!(response.count, 1);
        assert_eq!(response.total_count, 1);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_by_type() {
        let use_case = use_case_for(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("type query should succeed");
        assert_eq!(response.count, 2);
        assert_eq!(response.total_count, 2);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_with_limit() {
        let use_case = use_case_for(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            limit: Some(1),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");
        // One of the two matching events is returned; the other is reported
        // through `total_count` / `has_more`.
        assert_eq!(response.count, 1);
        assert_eq!(response.total_count, 2);
        assert!(response.has_more);
    }

    #[tokio::test]
    async fn test_query_with_limit_above_result_size() {
        let use_case = use_case_for(create_test_events());

        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            limit: Some(10),
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");
        assert_eq!(response.count, 2);
        assert_eq!(response.total_count, 2);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_without_tenant_uses_default_tenant() {
        let use_case = use_case_for(create_test_events());

        // All fixtures belong to `tenant-1`, so the fallback tenant sees none.
        let request = QueryEventsRequest {
            event_type: Some("user.created".to_string()),
            tenant_id: None,
            ..unfiltered_request()
        };

        let response = use_case
            .execute(request)
            .await
            .expect("query without tenant should succeed");
        assert_eq!(response.count, 0);
        assert_eq!(response.total_count, 0);
        assert!(!response.has_more);
    }

    #[tokio::test]
    async fn test_query_requires_filter() {
        let use_case = use_case_for(create_test_events());

        let response = use_case.execute(unfiltered_request()).await;
        assert!(response.is_err());
    }

    #[tokio::test]
    async fn test_query_with_one_sided_time_range_is_rejected() {
        let use_case = use_case_for(create_test_events());

        // `since` without `until` (and no entity/type filter) cannot be served
        // by the time-range lookup.
        let request = QueryEventsRequest {
            since: Some(Utc::now()),
            ..unfiltered_request()
        };

        let response = use_case.execute(request).await;
        assert!(response.is_err());
    }
}