allsource_core/application/use_cases/
ingest_event.rs1use crate::{
2 application::dto::{IngestEventRequest, IngestEventResponse},
3 domain::{entities::Event, repositories::EventRepository},
4 error::Result,
5};
6use std::sync::Arc;
7
8pub struct IngestEventUseCase {
19 repository: Arc<dyn EventRepository>,
20}
21
22impl IngestEventUseCase {
23 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24 Self { repository }
25 }
26
27 pub async fn execute(&self, request: IngestEventRequest) -> Result<IngestEventResponse> {
28 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31 let event = Event::from_strings(
32 request.event_type,
33 request.entity_id,
34 tenant_id,
35 request.payload,
36 request.metadata,
37 )?;
38
39 self.repository.save(&event).await?;
41
42 Ok(IngestEventResponse::from_event(&event))
44 }
45}
46
47pub struct IngestEventsBatchUseCase {
51 repository: Arc<dyn EventRepository>,
52}
53
54impl IngestEventsBatchUseCase {
55 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
56 Self { repository }
57 }
58
59 pub async fn execute(
60 &self,
61 requests: Vec<IngestEventRequest>,
62 ) -> Result<Vec<IngestEventResponse>> {
63 let mut events = Vec::with_capacity(requests.len());
65 let mut responses = Vec::with_capacity(requests.len());
66
67 for request in requests {
68 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
69
70 let event = Event::from_strings(
71 request.event_type,
72 request.entity_id,
73 tenant_id,
74 request.payload,
75 request.metadata,
76 )?;
77
78 responses.push(IngestEventResponse::from_event(&event));
79 events.push(event);
80 }
81
82 self.repository.save_batch(&events).await?;
84
85 Ok(responses)
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use chrono::{DateTime, Utc};
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory [`EventRepository`] for the tests below; saved events are kept in a `Vec`.
    struct MockEventRepository {
        /// Every event handed to `save`, in insertion order.
        events: std::sync::Mutex<Vec<Event>>,
    }

    impl MockEventRepository {
        /// Creates a mock repository with no stored events.
        fn new() -> Self {
            Self {
                events: std::sync::Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        /// Appends a copy of `event` to the in-memory list.
        async fn save(&self, event: &Event) -> Result<()> {
            // `Event` is `Clone` (the finders below rely on `.cloned()`), so a plain clone
            // replaces the former field-by-field reconstruction.
            self.events.lock().unwrap().push(event.clone());
            Ok(())
        }

        /// Saves each event in order, stopping at the first failure.
        async fn save_batch(&self, events: &[Event]) -> Result<()> {
            for event in events {
                self.save(event).await?;
            }
            Ok(())
        }

        /// Returns the stored event with the given id, if any.
        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
            let events = self.events.lock().unwrap();
            Ok(events.iter().find(|e| e.id() == id).cloned())
        }

        /// Returns all stored events for `entity_id` within `tenant_id`.
        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
            let events = self.events.lock().unwrap();
            Ok(events
                .iter()
                .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        /// Returns all stored events of `event_type` within `tenant_id`.
        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
            let events = self.events.lock().unwrap();
            Ok(events
                .iter()
                .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
                .cloned()
                .collect())
        }

        /// Returns the tenant's events for which `occurred_between(start, end)` holds.
        async fn find_by_time_range(
            &self,
            tenant_id: &str,
            start: DateTime<Utc>,
            end: DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            let events = self.events.lock().unwrap();
            Ok(events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
                .cloned()
                .collect())
        }

        /// Returns the entity's events for which `occurred_before(as_of)` holds.
        async fn find_by_entity_as_of(
            &self,
            entity_id: &str,
            tenant_id: &str,
            as_of: DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            let events = self.events.lock().unwrap();
            Ok(events
                .iter()
                .filter(|e| {
                    e.entity_id_str() == entity_id
                        && e.tenant_id_str() == tenant_id
                        && e.occurred_before(as_of)
                })
                .cloned()
                .collect())
        }

        /// Counts the stored events that belong to `tenant_id`.
        async fn count(&self, tenant_id: &str) -> Result<usize> {
            let events = self.events.lock().unwrap();
            Ok(events
                .iter()
                .filter(|e| e.tenant_id_str() == tenant_id)
                .count())
        }

        /// Always succeeds; the mock has nothing that could be unhealthy.
        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// A valid request with an explicit tenant is stored exactly once, as submitted.
    #[tokio::test]
    async fn test_ingest_event_use_case() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "user.created".to_string(),
            entity_id: "user-123".to_string(),
            tenant_id: Some("tenant-1".to_string()),
            payload: json!({"name": "Alice"}),
            metadata: None,
        };

        use_case
            .execute(request)
            .await
            .expect("ingesting a valid event should succeed");

        let events = repo.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type_str(), "user.created");
        assert_eq!(events[0].entity_id_str(), "user-123");
        assert_eq!(events[0].tenant_id_str(), "tenant-1");
    }

    /// A request without a tenant is stored under the `"default"` tenant.
    #[tokio::test]
    async fn test_ingest_event_with_default_tenant() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "order.placed".to_string(),
            entity_id: "order-456".to_string(),
            tenant_id: None,
            payload: json!({"amount": 100}),
            metadata: None,
        };

        let response = use_case.execute(request).await;
        assert!(response.is_ok());

        let events = repo.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].tenant_id_str(), "default");
    }

    /// A batch of two valid requests yields two responses and two stored events.
    #[tokio::test]
    async fn test_batch_ingest() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventsBatchUseCase::new(repo.clone());

        let requests = vec![
            IngestEventRequest {
                event_type: "event.1".to_string(),
                entity_id: "e1".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
            IngestEventRequest {
                event_type: "event.2".to_string(),
                entity_id: "e2".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
        ];

        let responses = use_case.execute(requests).await;
        assert!(responses.is_ok());
        assert_eq!(responses.unwrap().len(), 2);
        assert_eq!(repo.events.lock().unwrap().len(), 2);
    }
}