allsource_core/application/use_cases/
ingest_event.rs1use crate::{
2 application::dto::{IngestEventRequest, IngestEventResponse},
3 domain::{entities::Event, repositories::EventRepository},
4 error::Result,
5};
6use std::sync::Arc;
7
8pub struct IngestEventUseCase {
19 repository: Arc<dyn EventRepository>,
20}
21
22impl IngestEventUseCase {
23 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24 Self { repository }
25 }
26
27 pub async fn execute(&self, request: IngestEventRequest) -> Result<IngestEventResponse> {
28 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31 let event = Event::from_strings(
32 request.event_type,
33 request.entity_id,
34 tenant_id,
35 request.payload,
36 request.metadata,
37 )?;
38
39 self.repository.save(&event).await?;
41
42 Ok(IngestEventResponse::from_event(&event))
44 }
45}
46
47pub struct IngestEventsBatchUseCase {
51 repository: Arc<dyn EventRepository>,
52}
53
54impl IngestEventsBatchUseCase {
55 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
56 Self { repository }
57 }
58
59 pub async fn execute(
60 &self,
61 requests: Vec<IngestEventRequest>,
62 ) -> Result<Vec<IngestEventResponse>> {
63 let mut events = Vec::with_capacity(requests.len());
65 let mut responses = Vec::with_capacity(requests.len());
66
67 for request in requests {
68 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
69
70 let event = Event::from_strings(
71 request.event_type,
72 request.entity_id,
73 tenant_id,
74 request.payload,
75 request.metadata,
76 )?;
77
78 responses.push(IngestEventResponse::from_event(&event));
79 events.push(event);
80 }
81
82 self.repository.save_batch(&events).await?;
84
85 Ok(responses)
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory `EventRepository` stand-in that records every saved event.
    ///
    /// Only `save`, `save_batch` and `health_check` are implemented; the query
    /// methods panic via `unimplemented!()` because the use cases under test
    /// only call the save methods.
    struct MockEventRepository {
        /// Events recorded by `save`, in call order.
        events: std::sync::Mutex<Vec<Event>>,
    }

    impl MockEventRepository {
        /// Creates a mock with no recorded events.
        fn new() -> Self {
            Self {
                events: std::sync::Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        /// Records a copy of `event`, rebuilt field by field from its accessors.
        // NOTE(review): presumably `Event` is not `Clone`, hence the manual
        // reconstruction; confirm against the entity definition.
        async fn save(&self, event: &Event) -> Result<()> {
            let mut events = self.events.lock().unwrap();
            events.push(Event::reconstruct_from_strings(
                event.id(),
                event.event_type_str().to_string(),
                event.entity_id_str().to_string(),
                event.tenant_id_str().to_string(),
                event.payload().clone(),
                event.timestamp(),
                event.metadata().cloned(),
                event.version(),
            ));
            Ok(())
        }

        /// Records every event in `events` by delegating to `save` one at a time.
        async fn save_batch(&self, events: &[Event]) -> Result<()> {
            for event in events {
                self.save(event).await?;
            }
            Ok(())
        }

        async fn find_by_id(&self, _id: Uuid) -> Result<Option<Event>> {
            unimplemented!()
        }

        async fn find_by_entity(&self, _entity_id: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_type(&self, _event_type: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_time_range(
            &self,
            _tenant_id: &str,
            _start: chrono::DateTime<Utc>,
            _end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_entity_as_of(
            &self,
            _entity_id: &str,
            _tenant_id: &str,
            _as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn count(&self, _tenant_id: &str) -> Result<usize> {
            unimplemented!()
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// A request with an explicit tenant is ingested and saved exactly once.
    #[tokio::test]
    async fn test_ingest_event_use_case() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "user.created".to_string(),
            entity_id: "user-123".to_string(),
            tenant_id: Some("tenant-1".to_string()),
            payload: json!({"name": "Alice"}),
            metadata: None,
        };

        // `expect` asserts success and prints the error on failure, without
        // leaving an unused binding behind.
        use_case
            .execute(request)
            .await
            .expect("ingesting a valid event should succeed");

        assert_eq!(repo.events.lock().unwrap().len(), 1);
    }

    /// A request without a tenant id is saved under the "default" tenant.
    #[tokio::test]
    async fn test_ingest_event_with_default_tenant() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "order.placed".to_string(),
            entity_id: "order-456".to_string(),
            tenant_id: None,
            payload: json!({"amount": 100}),
            metadata: None,
        };

        use_case
            .execute(request)
            .await
            .expect("ingesting an event without a tenant should succeed");

        let events = repo.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].tenant_id_str(), "default");
    }

    /// A batch of two requests yields two responses and two saved events.
    #[tokio::test]
    async fn test_batch_ingest() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventsBatchUseCase::new(repo.clone());

        let requests = vec![
            IngestEventRequest {
                event_type: "event.1".to_string(),
                entity_id: "e1".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
            IngestEventRequest {
                event_type: "event.2".to_string(),
                entity_id: "e2".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
        ];

        let responses = use_case
            .execute(requests)
            .await
            .expect("ingesting a valid batch should succeed");

        assert_eq!(responses.len(), 2);
        assert_eq!(repo.events.lock().unwrap().len(), 2);
    }
}