Skip to main content

allsource_core/application/use_cases/
ingest_event.rs

1use crate::{
2    application::dto::{IngestEventRequest, IngestEventResponse},
3    domain::{entities::Event, repositories::EventRepository},
4    error::Result,
5};
6use std::sync::Arc;
7
8/// Use Case: Ingest Event
9///
10/// This use case handles the ingestion of a single event into the event store.
11/// It coordinates between the domain layer (Event entity) and the repository.
12///
13/// Responsibilities:
14/// - Validate input (DTO validation)
15/// - Create domain Event entity (with domain validation)
16/// - Persist via repository
17/// - Return response DTO
18pub struct IngestEventUseCase {
19    repository: Arc<dyn EventRepository>,
20}
21
22impl IngestEventUseCase {
23    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24        Self { repository }
25    }
26
27    pub async fn execute(&self, request: IngestEventRequest) -> Result<IngestEventResponse> {
28        // Create domain event using from_strings (validates and converts to value objects)
29        let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31        let event = Event::from_strings(
32            request.event_type,
33            request.entity_id,
34            tenant_id,
35            request.payload,
36            request.metadata,
37        )?;
38
39        // Persist via repository
40        self.repository.save(&event).await?;
41
42        // Return response
43        Ok(IngestEventResponse::from_event(&event))
44    }
45}
46
47/// Use Case: Batch Ingest Events
48///
49/// Optimized use case for ingesting multiple events at once.
50pub struct IngestEventsBatchUseCase {
51    repository: Arc<dyn EventRepository>,
52}
53
54impl IngestEventsBatchUseCase {
55    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
56        Self { repository }
57    }
58
59    pub async fn execute(
60        &self,
61        requests: Vec<IngestEventRequest>,
62    ) -> Result<Vec<IngestEventResponse>> {
63        // Create all domain events (validates and converts to value objects)
64        let mut events = Vec::with_capacity(requests.len());
65        let mut responses = Vec::with_capacity(requests.len());
66
67        for request in requests {
68            let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
69
70            let event = Event::from_strings(
71                request.event_type,
72                request.entity_id,
73                tenant_id,
74                request.payload,
75                request.metadata,
76            )?;
77
78            responses.push(IngestEventResponse::from_event(&event));
79            events.push(event);
80        }
81
82        // Batch persist
83        self.repository.save_batch(&events).await?;
84
85        Ok(responses)
86    }
87}
88
89#[cfg(test)]
90mod tests {
91    use super::*;
92    use async_trait::async_trait;
93    use chrono::Utc;
94    use serde_json::json;
95    use uuid::Uuid;
96
97    // Mock repository for testing
98    struct MockEventRepository {
99        events: std::sync::Mutex<Vec<Event>>,
100    }
101
102    impl MockEventRepository {
103        fn new() -> Self {
104            Self {
105                events: std::sync::Mutex::new(Vec::new()),
106            }
107        }
108    }
109
110    #[async_trait]
111    impl EventRepository for MockEventRepository {
112        async fn save(&self, event: &Event) -> Result<()> {
113            let mut events = self.events.lock().unwrap();
114            events.push(Event::reconstruct_from_strings(
115                event.id(),
116                event.event_type_str().to_string(),
117                event.entity_id_str().to_string(),
118                event.tenant_id_str().to_string(),
119                event.payload().clone(),
120                event.timestamp(),
121                event.metadata().cloned(),
122                event.version(),
123            ));
124            Ok(())
125        }
126
127        async fn save_batch(&self, events: &[Event]) -> Result<()> {
128            for event in events {
129                self.save(event).await?;
130            }
131            Ok(())
132        }
133
134        async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
135            let events = self.events.lock().unwrap();
136            Ok(events.iter().find(|e| e.id() == id).cloned())
137        }
138
139        async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
140            let events = self.events.lock().unwrap();
141            Ok(events
142                .iter()
143                .filter(|e| e.entity_id_str() == entity_id && e.tenant_id_str() == tenant_id)
144                .cloned()
145                .collect())
146        }
147
148        async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
149            let events = self.events.lock().unwrap();
150            Ok(events
151                .iter()
152                .filter(|e| e.event_type_str() == event_type && e.tenant_id_str() == tenant_id)
153                .cloned()
154                .collect())
155        }
156
157        async fn find_by_time_range(
158            &self,
159            tenant_id: &str,
160            start: chrono::DateTime<Utc>,
161            end: chrono::DateTime<Utc>,
162        ) -> Result<Vec<Event>> {
163            let events = self.events.lock().unwrap();
164            Ok(events
165                .iter()
166                .filter(|e| e.tenant_id_str() == tenant_id && e.occurred_between(start, end))
167                .cloned()
168                .collect())
169        }
170
171        async fn find_by_entity_as_of(
172            &self,
173            entity_id: &str,
174            tenant_id: &str,
175            as_of: chrono::DateTime<Utc>,
176        ) -> Result<Vec<Event>> {
177            let events = self.events.lock().unwrap();
178            Ok(events
179                .iter()
180                .filter(|e| {
181                    e.entity_id_str() == entity_id
182                        && e.tenant_id_str() == tenant_id
183                        && e.occurred_before(as_of)
184                })
185                .cloned()
186                .collect())
187        }
188
189        async fn count(&self, tenant_id: &str) -> Result<usize> {
190            let events = self.events.lock().unwrap();
191            Ok(events
192                .iter()
193                .filter(|e| e.tenant_id_str() == tenant_id)
194                .count())
195        }
196
197        async fn health_check(&self) -> Result<()> {
198            Ok(())
199        }
200    }
201
202    #[tokio::test]
203    async fn test_ingest_event_use_case() {
204        let repo = Arc::new(MockEventRepository::new());
205        let use_case = IngestEventUseCase::new(repo.clone());
206
207        let request = IngestEventRequest {
208            event_type: "user.created".to_string(),
209            entity_id: "user-123".to_string(),
210            tenant_id: Some("tenant-1".to_string()),
211            payload: json!({"name": "Alice"}),
212            metadata: None,
213            expected_version: None,
214        };
215
216        let response = use_case.execute(request).await;
217        assert!(response.is_ok());
218
219        let response = response.unwrap();
220        assert_eq!(repo.events.lock().unwrap().len(), 1);
221    }
222
223    #[tokio::test]
224    async fn test_ingest_event_with_default_tenant() {
225        let repo = Arc::new(MockEventRepository::new());
226        let use_case = IngestEventUseCase::new(repo.clone());
227
228        let request = IngestEventRequest {
229            event_type: "order.placed".to_string(),
230            entity_id: "order-456".to_string(),
231            tenant_id: None, // Should default to "default"
232            payload: json!({"amount": 100}),
233            metadata: None,
234            expected_version: None,
235        };
236
237        let response = use_case.execute(request).await;
238        assert!(response.is_ok());
239
240        let events = repo.events.lock().unwrap();
241        assert_eq!(events.len(), 1);
242        assert_eq!(events[0].tenant_id_str(), "default");
243    }
244
245    #[tokio::test]
246    async fn test_batch_ingest() {
247        let repo = Arc::new(MockEventRepository::new());
248        let use_case = IngestEventsBatchUseCase::new(repo.clone());
249
250        let requests = vec![
251            IngestEventRequest {
252                event_type: "event.1".to_string(),
253                entity_id: "e1".to_string(),
254                tenant_id: Some("t1".to_string()),
255                payload: json!({}),
256                metadata: None,
257                expected_version: None,
258            },
259            IngestEventRequest {
260                event_type: "event.2".to_string(),
261                entity_id: "e2".to_string(),
262                tenant_id: Some("t1".to_string()),
263                payload: json!({}),
264                metadata: None,
265                expected_version: None,
266            },
267        ];
268
269        let responses = use_case.execute(requests).await;
270        assert!(responses.is_ok());
271        assert_eq!(responses.unwrap().len(), 2);
272        assert_eq!(repo.events.lock().unwrap().len(), 2);
273    }
274}