allsource_core/application/use_cases/
query_events.rs1use std::sync::Arc;
2use crate::domain::repositories::EventRepository;
3use crate::application::dto::{QueryEventsRequest, QueryEventsResponse, EventDto};
4use crate::error::Result;
5
6pub struct QueryEventsUseCase {
17 repository: Arc<dyn EventRepository>,
18}
19
20impl QueryEventsUseCase {
21 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
22 Self { repository }
23 }
24
25 pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
26 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
28
29 let mut events = if let Some(entity_id) = request.entity_id {
31 if let Some(as_of) = request.as_of {
33 self.repository
35 .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
36 .await?
37 } else {
38 self.repository.find_by_entity(&entity_id, &tenant_id).await?
39 }
40 } else if let Some(event_type) = request.event_type {
41 self.repository.find_by_type(&event_type, &tenant_id).await?
43 } else if let (Some(since), Some(until)) = (request.since, request.until) {
44 self.repository
46 .find_by_time_range(&tenant_id, since, until)
47 .await?
48 } else {
49 return Err(crate::error::Error::InvalidInput(
52 "Query requires at least one filter (entity_id, event_type, or time range)"
53 .to_string(),
54 ));
55 };
56
57 if let Some(since) = request.since {
59 events.retain(|e| e.occurred_after(since));
60 }
61 if let Some(until) = request.until {
62 events.retain(|e| e.occurred_before(until));
63 }
64
65 if let Some(limit) = request.limit {
67 events.truncate(limit);
68 }
69
70 let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
72 let count = event_dtos.len();
73
74 Ok(QueryEventsResponse {
75 events: event_dtos,
76 count,
77 })
78 }
79}
80
#[cfg(test)]
mod tests {
    //! Unit tests for [`QueryEventsUseCase`], run against an in-memory
    //! repository so no real storage is needed.

    use super::*;
    use crate::domain::entities::Event;
    use async_trait::async_trait;
    use chrono::{DateTime, Utc};
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory `EventRepository` that serves a fixed list of events.
    struct MockEventRepository {
        events: Vec<Event>,
    }

    impl MockEventRepository {
        /// Builds a mock that answers every query from `events`.
        fn with_events(events: Vec<Event>) -> Self {
            Self { events }
        }

        /// Returns clones of the stored events accepted by `keep`, in
        /// insertion order.
        fn select(&self, keep: impl Fn(&Event) -> bool) -> Vec<Event> {
            self.events.iter().filter(|&e| keep(e)).cloned().collect()
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        // Write paths are never reached by the query use case.
        async fn save(&self, _event: &Event) -> Result<()> {
            unimplemented!()
        }

        async fn save_batch(&self, _events: &[Event]) -> Result<()> {
            unimplemented!()
        }

        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            Ok(self.events.iter().find(|e| e.id() == id).cloned())
        }

        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id))
        }

        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id))
        }

        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: DateTime<Utc>,
            end: DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.select(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end)))
        }

        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            Ok(self.select(|e| {
                e.entity_id_str() == entity_id
                    && e.tenant_id_str() == tenant_id
                    && e.occurred_before(as_of)
            }))
        }

        async fn count(&self, tenant_id: &str) -> Result<usize> {
            Ok(self
                .events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Fixture: two `user.created` events and one `order.placed` event, all
    /// belonging to `tenant-1`.
    fn create_test_events() -> Vec<Event> {
        vec![
            Event::from_strings(
                "user.created".to_string(),
                "user-1".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Alice"}),
                None,
            )
            .expect("fixture event for user-1 should be valid"),
            Event::from_strings(
                "user.created".to_string(),
                "user-2".to_string(),
                "tenant-1".to_string(),
                json!({"name": "Bob"}),
                None,
            )
            .expect("fixture event for user-2 should be valid"),
            Event::from_strings(
                "order.placed".to_string(),
                "order-1".to_string(),
                "tenant-1".to_string(),
                json!({"amount": 100}),
                None,
            )
            .expect("fixture event for order-1 should be valid"),
        ]
    }

    /// Wraps `events` in a mock repository and returns a use case over it.
    fn use_case_over(events: Vec<Event>) -> QueryEventsUseCase {
        QueryEventsUseCase::new(Arc::new(MockEventRepository::with_events(events)))
    }

    /// Builds a request with no time constraints (`as_of`, `since`, `until`
    /// are all `None`).
    fn build_request(
        entity_id: Option<String>,
        event_type: Option<&str>,
        tenant_id: Option<&str>,
        limit: Option<usize>,
    ) -> QueryEventsRequest {
        QueryEventsRequest {
            entity_id,
            event_type: event_type.map(str::to_string),
            tenant_id: tenant_id.map(str::to_string),
            as_of: None,
            since: None,
            until: None,
            limit,
        }
    }

    #[tokio::test]
    async fn test_query_by_entity() {
        let events = create_test_events();
        let entity_id = events[0].entity_id().to_string();
        let use_case = use_case_over(events);

        let request = build_request(Some(entity_id), None, Some("tenant-1"), None);
        let response = use_case
            .execute(request)
            .await
            .expect("query by entity should succeed");

        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
    }

    #[tokio::test]
    async fn test_query_by_type() {
        let use_case = use_case_over(create_test_events());

        let request = build_request(None, Some("user.created"), Some("tenant-1"), None);
        let response = use_case
            .execute(request)
            .await
            .expect("query by type should succeed");

        assert_eq!(response.count, 2);
        assert_eq!(response.events.len(), 2);
    }

    #[tokio::test]
    async fn test_query_with_limit() {
        let use_case = use_case_over(create_test_events());

        let request = build_request(None, Some("user.created"), Some("tenant-1"), Some(1));
        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");

        assert_eq!(response.count, 1);
        assert_eq!(response.events.len(), 1);
    }

    #[tokio::test]
    async fn test_query_with_limit_larger_than_result_set() {
        let use_case = use_case_over(create_test_events());

        // A limit above the number of matches must leave the result intact.
        let request = build_request(None, Some("user.created"), Some("tenant-1"), Some(10));
        let response = use_case
            .execute(request)
            .await
            .expect("limited query should succeed");

        assert_eq!(response.count, 2);
    }

    #[tokio::test]
    async fn test_query_without_tenant_uses_default_tenant() {
        let use_case = use_case_over(create_test_events());

        // Every fixture event belongs to "tenant-1", so the fallback tenant
        // "default" must see none of them.
        let request = build_request(None, Some("user.created"), None, None);
        let response = use_case
            .execute(request)
            .await
            .expect("query for the default tenant should succeed");

        assert_eq!(response.count, 0);
        assert!(response.events.is_empty());
    }

    #[tokio::test]
    async fn test_query_requires_filter() {
        let use_case = use_case_over(create_test_events());

        let request = build_request(None, None, Some("tenant-1"), None);
        let result = use_case.execute(request).await;

        // Pin the error variant so an unrelated failure cannot pass the test.
        assert!(matches!(
            result,
            Err(crate::error::Error::InvalidInput(_))
        ));
    }
}