Skip to main content

allsource_core/application/use_cases/
ingest_event.rs

1use crate::{
2    application::dto::{IngestEventRequest, IngestEventResponse},
3    domain::{entities::Event, repositories::EventRepository},
4    error::Result,
5};
6use std::sync::Arc;
7
8/// Use Case: Ingest Event
9///
10/// This use case handles the ingestion of a single event into the event store.
11/// It coordinates between the domain layer (Event entity) and the repository.
12///
13/// Responsibilities:
14/// - Validate input (DTO validation)
15/// - Create domain Event entity (with domain validation)
16/// - Persist via repository
17/// - Return response DTO
18pub struct IngestEventUseCase {
19    repository: Arc<dyn EventRepository>,
20}
21
22impl IngestEventUseCase {
23    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
24        Self { repository }
25    }
26
27    pub async fn execute(&self, request: IngestEventRequest) -> Result<IngestEventResponse> {
28        // Create domain event using from_strings (validates and converts to value objects)
29        let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
30
31        let event = Event::from_strings(
32            request.event_type,
33            request.entity_id,
34            tenant_id,
35            request.payload,
36            request.metadata,
37        )?;
38
39        // Persist via repository
40        self.repository.save(&event).await?;
41
42        // Return response
43        Ok(IngestEventResponse::from_event(&event))
44    }
45}
46
47/// Use Case: Batch Ingest Events
48///
49/// Optimized use case for ingesting multiple events at once.
50pub struct IngestEventsBatchUseCase {
51    repository: Arc<dyn EventRepository>,
52}
53
54impl IngestEventsBatchUseCase {
55    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
56        Self { repository }
57    }
58
59    pub async fn execute(
60        &self,
61        requests: Vec<IngestEventRequest>,
62    ) -> Result<Vec<IngestEventResponse>> {
63        // Create all domain events (validates and converts to value objects)
64        let mut events = Vec::with_capacity(requests.len());
65        let mut responses = Vec::with_capacity(requests.len());
66
67        for request in requests {
68            let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
69
70            let event = Event::from_strings(
71                request.event_type,
72                request.entity_id,
73                tenant_id,
74                request.payload,
75                request.metadata,
76            )?;
77
78            responses.push(IngestEventResponse::from_event(&event));
79            events.push(event);
80        }
81
82        // Batch persist
83        self.repository.save_batch(&events).await?;
84
85        Ok(responses)
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory stand-in for an [`EventRepository`].
    ///
    /// It records every saved event so the tests can inspect what a use case
    /// persisted. Only the write path and `health_check` are implemented.
    struct MockEventRepository {
        events: std::sync::Mutex<Vec<Event>>,
    }

    impl MockEventRepository {
        /// Creates a mock with no recorded events.
        fn new() -> Self {
            Self {
                events: std::sync::Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        /// Records a copy of `event`, rebuilt from its accessor values.
        async fn save(&self, event: &Event) -> Result<()> {
            let mut events = self.events.lock().unwrap();
            events.push(Event::reconstruct_from_strings(
                event.id(),
                event.event_type_str().to_string(),
                event.entity_id_str().to_string(),
                event.tenant_id_str().to_string(),
                event.payload().clone(),
                event.timestamp(),
                event.metadata().cloned(),
                event.version(),
            ));
            Ok(())
        }

        /// Records each event in order by delegating to `save`.
        async fn save_batch(&self, events: &[Event]) -> Result<()> {
            for event in events {
                self.save(event).await?;
            }
            Ok(())
        }

        // The read-side methods below are not exercised by these tests and
        // panic via `unimplemented!()` if called.

        async fn find_by_id(&self, _id: Uuid) -> Result<Option<Event>> {
            unimplemented!()
        }

        async fn find_by_entity(&self, _entity_id: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_type(&self, _event_type: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_time_range(
            &self,
            _tenant_id: &str,
            _start: chrono::DateTime<Utc>,
            _end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_entity_as_of(
            &self,
            _entity_id: &str,
            _tenant_id: &str,
            _as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn count(&self, _tenant_id: &str) -> Result<usize> {
            unimplemented!()
        }

        /// Always reports healthy.
        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// A request with an explicit tenant is accepted, stored exactly once, and
    /// keeps the tenant it was given.
    #[tokio::test]
    async fn test_ingest_event_use_case() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "user.created".to_string(),
            entity_id: "user-123".to_string(),
            tenant_id: Some("tenant-1".to_string()),
            payload: json!({"name": "Alice"}),
            metadata: None,
        };

        let response = use_case.execute(request).await;
        assert!(response.is_ok());

        // Inspect what reached the repository rather than binding an unused
        // unwrapped response.
        let events = repo.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].tenant_id_str(), "tenant-1");
    }

    /// A request without a tenant is stored under the "default" tenant.
    #[tokio::test]
    async fn test_ingest_event_with_default_tenant() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "order.placed".to_string(),
            entity_id: "order-456".to_string(),
            tenant_id: None, // Should default to "default"
            payload: json!({"amount": 100}),
            metadata: None,
        };

        let response = use_case.execute(request).await;
        assert!(response.is_ok());

        let events = repo.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].tenant_id_str(), "default");
    }

    /// A batch of two requests yields two responses and two stored events.
    #[tokio::test]
    async fn test_batch_ingest() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventsBatchUseCase::new(repo.clone());

        let requests = vec![
            IngestEventRequest {
                event_type: "event.1".to_string(),
                entity_id: "e1".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
            IngestEventRequest {
                event_type: "event.2".to_string(),
                entity_id: "e2".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
        ];

        let responses = use_case.execute(requests).await;
        assert!(responses.is_ok());
        assert_eq!(responses.unwrap().len(), 2);
        assert_eq!(repo.events.lock().unwrap().len(), 2);
    }
}