allsource_core/application/use_cases/
ingest_event.rs1use crate::application::dto::{IngestEventRequest, IngestEventResponse};
2use crate::domain::entities::Event;
3use crate::domain::repositories::EventRepository;
4use crate::error::Result;
5use std::sync::Arc;
6
7pub struct IngestEventUseCase {
18 repository: Arc<dyn EventRepository>,
19}
20
21impl IngestEventUseCase {
22 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
23 Self { repository }
24 }
25
26 pub async fn execute(&self, request: IngestEventRequest) -> Result<IngestEventResponse> {
27 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
29
30 let event = Event::from_strings(
31 request.event_type,
32 request.entity_id,
33 tenant_id,
34 request.payload,
35 request.metadata,
36 )?;
37
38 self.repository.save(&event).await?;
40
41 Ok(IngestEventResponse::from_event(&event))
43 }
44}
45
46pub struct IngestEventsBatchUseCase {
50 repository: Arc<dyn EventRepository>,
51}
52
53impl IngestEventsBatchUseCase {
54 pub fn new(repository: Arc<dyn EventRepository>) -> Self {
55 Self { repository }
56 }
57
58 pub async fn execute(
59 &self,
60 requests: Vec<IngestEventRequest>,
61 ) -> Result<Vec<IngestEventResponse>> {
62 let mut events = Vec::with_capacity(requests.len());
64 let mut responses = Vec::with_capacity(requests.len());
65
66 for request in requests {
67 let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
68
69 let event = Event::from_strings(
70 request.event_type,
71 request.entity_id,
72 tenant_id,
73 request.payload,
74 request.metadata,
75 )?;
76
77 responses.push(IngestEventResponse::from_event(&event));
78 events.push(event);
79 }
80
81 self.repository.save_batch(&events).await?;
83
84 Ok(responses)
85 }
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory repository double that records every saved event.
    ///
    /// Only `save`, `save_batch` and `health_check` are implemented; the query
    /// methods panic via `unimplemented!()` because these tests never call them.
    struct MockEventRepository {
        /// Events captured by `save`, in call order.
        events: std::sync::Mutex<Vec<Event>>,
    }

    impl MockEventRepository {
        /// Creates a mock with no recorded events.
        fn new() -> Self {
            Self {
                events: std::sync::Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        /// Records a copy of `event`, rebuilt field by field from its accessors.
        async fn save(&self, event: &Event) -> Result<()> {
            // NOTE(review): rebuilt through `reconstruct_from_strings` rather than cloned,
            // presumably because `Event` is not `Clone` — confirm.
            let mut events = self.events.lock().unwrap();
            events.push(Event::reconstruct_from_strings(
                event.id(),
                event.event_type_str().to_string(),
                event.entity_id_str().to_string(),
                event.tenant_id_str().to_string(),
                event.payload().clone(),
                event.timestamp(),
                event.metadata().cloned(),
                event.version(),
            ));
            Ok(())
        }

        /// Records every event of the batch by delegating to `save` one by one.
        async fn save_batch(&self, events: &[Event]) -> Result<()> {
            for event in events {
                self.save(event).await?;
            }
            Ok(())
        }

        async fn find_by_id(&self, _id: Uuid) -> Result<Option<Event>> {
            unimplemented!()
        }

        async fn find_by_entity(&self, _entity_id: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_type(&self, _event_type: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_time_range(
            &self,
            _tenant_id: &str,
            _start: chrono::DateTime<Utc>,
            _end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_entity_as_of(
            &self,
            _entity_id: &str,
            _tenant_id: &str,
            _as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn count(&self, _tenant_id: &str) -> Result<usize> {
            unimplemented!()
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// A request with an explicit tenant is saved exactly once under that tenant.
    #[tokio::test]
    async fn test_ingest_event_use_case() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "user.created".to_string(),
            entity_id: "user-123".to_string(),
            tenant_id: Some("tenant-1".to_string()),
            payload: json!({"name": "Alice"}),
            metadata: None,
        };

        // `expect` fails the test with the error's debug output; no unused binding is left.
        use_case
            .execute(request)
            .await
            .expect("ingesting a valid request should succeed");

        let events = repo.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].tenant_id_str(), "tenant-1");
    }

    /// A request without a tenant is saved under the `"default"` tenant.
    #[tokio::test]
    async fn test_ingest_event_with_default_tenant() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "order.placed".to_string(),
            entity_id: "order-456".to_string(),
            tenant_id: None,
            payload: json!({"amount": 100}),
            metadata: None,
        };

        let response = use_case.execute(request).await;
        assert!(response.is_ok());

        let events = repo.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].tenant_id_str(), "default");
    }

    /// A two-request batch yields two responses and two saved events.
    #[tokio::test]
    async fn test_batch_ingest() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventsBatchUseCase::new(repo.clone());

        let requests = vec![
            IngestEventRequest {
                event_type: "event.1".to_string(),
                entity_id: "e1".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
            IngestEventRequest {
                event_type: "event.2".to_string(),
                entity_id: "e2".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
        ];

        let responses = use_case.execute(requests).await;
        assert!(responses.is_ok());
        assert_eq!(responses.unwrap().len(), 2);
        assert_eq!(repo.events.lock().unwrap().len(), 2);
    }
}