allsource_core/application/use_cases/
query_events.rs1use crate::application::dto::{EventDto, QueryEventsRequest, QueryEventsResponse};
2use crate::domain::repositories::EventRepository;
3use crate::error::Result;
4use std::sync::Arc;
5
6pub struct QueryEventsUseCase {
17 repository: Arc<dyn EventRepository>,
18}
19
20impl QueryEventsUseCase {
21 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
22 Self { repository }
23 }
24
25 pub async fn execute(&self, request: QueryEventsRequest) -> Result<QueryEventsResponse> {
26 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
28
29 let mut events = if let Some(entity_id) = request.entity_id {
31 if let Some(as_of) = request.as_of {
33 self.repository
35 .find_by_entity_as_of(&entity_id, &tenant_id, as_of)
36 .await?
37 } else {
38 self.repository
39 .find_by_entity(&entity_id, &tenant_id)
40 .await?
41 }
42 } else if let Some(event_type) = request.event_type {
43 self.repository
45 .find_by_type(&event_type, &tenant_id)
46 .await?
47 } else if let (Some(since), Some(until)) = (request.since, request.until) {
48 self.repository
50 .find_by_time_range(&tenant_id, since, until)
51 .await?
52 } else {
53 return Err(crate::error::Error::InvalidInput(
56 "Query requires at least one filter (entity_id, event_type, or time range)"
57 .to_string(),
58 ));
59 };
60
61 if let Some(since) = request.since {
63 events.retain(|e| e.occurred_after(since));
64 }
65 if let Some(until) = request.until {
66 events.retain(|e| e.occurred_before(until));
67 }
68
69 if let Some(limit) = request.limit {
71 events.truncate(limit);
72 }
73
74 let event_dtos: Vec<EventDto> = events.iter().map(EventDto::from).collect();
76 let count = event_dtos.len();
77
78 Ok(QueryEventsResponse {
79 events: event_dtos,
80 count,
81 })
82 }
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use crate::domain::entities::Event;
89 use async_trait::async_trait;
90 use chrono::{Duration, Utc};
91 use serde_json::json;
92 use uuid::Uuid;
93
94 struct MockEventRepository {
96 events: Vec<Event>,
97 }
98
99 impl MockEventRepository {
100 fn with_events(events: Vec<Event>) -> Self {
101 Self { events }
102 }
103 }
104
105 #[async_trait]
106 impl EventRepository for MockEventRepository {
107 async fn save(&self, _event: &Event) -> Result<()> {
108 unimplemented!()
109 }
110
111 async fn save_batch(&self, _events: &[Event]) -> Result<()> {
112 unimplemented!()
113 }
114
115 async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
116 Ok(self.events.iter().find(|e| e.id() == id).cloned())
117 }
118
119 async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
120 Ok(self
121 .events
122 .iter()
123 .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
124 .cloned()
125 .collect())
126 }
127
128 async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
129 Ok(self
130 .events
131 .iter()
132 .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
133 .cloned()
134 .collect())
135 }
136
137 async fn find_by_time_range(
138 &self,
139 tenant_id: &str,
140 start: chrono::DateTime<Utc>,
141 end: chrono::DateTime<Utc>,
142 ) -> Result<Vec<Event>> {
143 Ok(self
144 .events
145 .iter()
146 .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
147 .cloned()
148 .collect())
149 }
150
151 async fn find_by_entity_as_of(
152 &self,
153 entity_id: &str,
154 tenant_id: &str,
155 as_of: chrono::DateTime<Utc>,
156 ) -> Result<Vec<Event>> {
157 Ok(self
158 .events
159 .iter()
160 .filter(|e| {
161 e.entity_id_str() == entity_id
162 && e.tenant_id_str() == tenant_id
163 && e.occurred_before(as_of)
164 })
165 .cloned()
166 .collect())
167 }
168
169 async fn count(&self, tenant_id: &str) -> Result<usize> {
170 Ok(self
171 .events
172 .iter()
173 .filter(|e| e.tenant_id_str() == tenant_id)
174 .count())
175 }
176
177 async fn health_check(&self) -> Result<()> {
178 Ok(())
179 }
180 }
181
182 fn create_test_events() -> Vec<Event> {
183 vec![
184 Event::from_strings(
185 "user.created".to_string(),
186 "user-1".to_string(),
187 "tenant-1".to_string(),
188 json!({"name": "Alice"}),
189 None,
190 )
191 .unwrap(),
192 Event::from_strings(
193 "user.created".to_string(),
194 "user-2".to_string(),
195 "tenant-1".to_string(),
196 json!({"name": "Bob"}),
197 None,
198 )
199 .unwrap(),
200 Event::from_strings(
201 "order.placed".to_string(),
202 "order-1".to_string(),
203 "tenant-1".to_string(),
204 json!({"amount": 100}),
205 None,
206 )
207 .unwrap(),
208 ]
209 }
210
211 #[tokio::test]
212 async fn test_query_by_entity() {
213 let events = create_test_events();
214 let entity_id = events[0].entity_id().to_string();
215 let repo = Arc::new(MockEventRepository::with_events(events));
216 let use_case = QueryEventsUseCase::new(repo);
217
218 let request = QueryEventsRequest {
219 entity_id: Some(entity_id),
220 event_type: None,
221 tenant_id: Some("tenant-1".to_string()),
222 as_of: None,
223 since: None,
224 until: None,
225 limit: None,
226 };
227
228 let response = use_case.execute(request).await;
229 assert!(response.is_ok());
230
231 let response = response.unwrap();
232 assert_eq!(response.count, 1);
233 }
234
235 #[tokio::test]
236 async fn test_query_by_type() {
237 let events = create_test_events();
238 let repo = Arc::new(MockEventRepository::with_events(events));
239 let use_case = QueryEventsUseCase::new(repo);
240
241 let request = QueryEventsRequest {
242 entity_id: None,
243 event_type: Some("user.created".to_string()),
244 tenant_id: Some("tenant-1".to_string()),
245 as_of: None,
246 since: None,
247 until: None,
248 limit: None,
249 };
250
251 let response = use_case.execute(request).await;
252 assert!(response.is_ok());
253
254 let response = response.unwrap();
255 assert_eq!(response.count, 2);
256 }
257
258 #[tokio::test]
259 async fn test_query_with_limit() {
260 let events = create_test_events();
261 let repo = Arc::new(MockEventRepository::with_events(events));
262 let use_case = QueryEventsUseCase::new(repo);
263
264 let request = QueryEventsRequest {
265 entity_id: None,
266 event_type: Some("user.created".to_string()),
267 tenant_id: Some("tenant-1".to_string()),
268 as_of: None,
269 since: None,
270 until: None,
271 limit: Some(1),
272 };
273
274 let response = use_case.execute(request).await;
275 assert!(response.is_ok());
276
277 let response = response.unwrap();
278 assert_eq!(response.count, 1);
279 }
280
281 #[tokio::test]
282 async fn test_query_requires_filter() {
283 let events = create_test_events();
284 let repo = Arc::new(MockEventRepository::with_events(events));
285 let use_case = QueryEventsUseCase::new(repo);
286
287 let request = QueryEventsRequest {
288 entity_id: None,
289 event_type: None,
290 tenant_id: Some("tenant-1".to_string()),
291 as_of: None,
292 since: None,
293 until: None,
294 limit: None,
295 };
296
297 let response = use_case.execute(request).await;
298 assert!(response.is_err());
299 }
300}