Skip to main content

allsource_core/application/use_cases/
ingest_event.rs

1use crate::application::dto::{IngestEventRequest, IngestEventResponse};
2use crate::domain::entities::Event;
3use crate::domain::repositories::EventRepository;
4use crate::error::Result;
5use std::sync::Arc;
6
7/// Use Case: Ingest Event
8///
9/// This use case handles the ingestion of a single event into the event store.
10/// It coordinates between the domain layer (Event entity) and the repository.
11///
12/// Responsibilities:
13/// - Validate input (DTO validation)
14/// - Create domain Event entity (with domain validation)
15/// - Persist via repository
16/// - Return response DTO
17pub struct IngestEventUseCase {
18    repository: Arc<dyn EventRepository>,
19}
20
21impl IngestEventUseCase {
22    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
23        Self { repository }
24    }
25
26    pub async fn execute(&self, request: IngestEventRequest) -> Result<IngestEventResponse> {
27        // Create domain event using from_strings (validates and converts to value objects)
28        let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
29
30        let event = Event::from_strings(
31            request.event_type,
32            request.entity_id,
33            tenant_id,
34            request.payload,
35            request.metadata,
36        )?;
37
38        // Persist via repository
39        self.repository.save(&event).await?;
40
41        // Return response
42        Ok(IngestEventResponse::from_event(&event))
43    }
44}
45
46/// Use Case: Batch Ingest Events
47///
48/// Optimized use case for ingesting multiple events at once.
49pub struct IngestEventsBatchUseCase {
50    repository: Arc<dyn EventRepository>,
51}
52
53impl IngestEventsBatchUseCase {
54    pub fn new(repository: Arc<dyn EventRepository>) -> Self {
55        Self { repository }
56    }
57
58    pub async fn execute(
59        &self,
60        requests: Vec<IngestEventRequest>,
61    ) -> Result<Vec<IngestEventResponse>> {
62        // Create all domain events (validates and converts to value objects)
63        let mut events = Vec::with_capacity(requests.len());
64        let mut responses = Vec::with_capacity(requests.len());
65
66        for request in requests {
67            let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
68
69            let event = Event::from_strings(
70                request.event_type,
71                request.entity_id,
72                tenant_id,
73                request.payload,
74                request.metadata,
75            )?;
76
77            responses.push(IngestEventResponse::from_event(&event));
78            events.push(event);
79        }
80
81        // Batch persist
82        self.repository.save_batch(&events).await?;
83
84        Ok(responses)
85    }
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use chrono::Utc;
    use serde_json::json;
    use uuid::Uuid;

    /// In-memory stand-in for the event store.
    ///
    /// Only the write path (`save`, `save_batch`) and `health_check` are
    /// implemented; every query method panics via `unimplemented!()` because
    /// the use cases under test never call them.
    struct MockEventRepository {
        /// Every event passed to `save`, in call order.
        events: std::sync::Mutex<Vec<Event>>,
    }

    impl MockEventRepository {
        /// Creates a repository with no stored events.
        fn new() -> Self {
            Self {
                events: std::sync::Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl EventRepository for MockEventRepository {
        /// Stores a copy of `event`, rebuilt field by field from its accessors.
        async fn save(&self, event: &Event) -> Result<()> {
            // The guard is dropped at the end of this function and is never
            // held across an `.await`.
            let mut events = self.events.lock().unwrap();
            events.push(Event::reconstruct_from_strings(
                event.id(),
                event.event_type_str().to_string(),
                event.entity_id_str().to_string(),
                event.tenant_id_str().to_string(),
                event.payload().clone(),
                event.timestamp(),
                event.metadata().cloned(),
                event.version(),
            ));
            Ok(())
        }

        /// Stores each event of the batch through `save`, in order.
        async fn save_batch(&self, events: &[Event]) -> Result<()> {
            for event in events {
                self.save(event).await?;
            }
            Ok(())
        }

        async fn find_by_id(&self, _id: Uuid) -> Result<Option<Event>> {
            unimplemented!()
        }

        async fn find_by_entity(&self, _entity_id: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_type(&self, _event_type: &str, _tenant_id: &str) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_time_range(
            &self,
            _tenant_id: &str,
            _start: chrono::DateTime<Utc>,
            _end: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn find_by_entity_as_of(
            &self,
            _entity_id: &str,
            _tenant_id: &str,
            _as_of: chrono::DateTime<Utc>,
        ) -> Result<Vec<Event>> {
            unimplemented!()
        }

        async fn count(&self, _tenant_id: &str) -> Result<usize> {
            unimplemented!()
        }

        async fn health_check(&self) -> Result<()> {
            Ok(())
        }
    }

    /// A valid request with an explicit tenant is accepted and stored once.
    #[tokio::test]
    async fn test_ingest_event_use_case() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "user.created".to_string(),
            entity_id: "user-123".to_string(),
            tenant_id: Some("tenant-1".to_string()),
            payload: json!({"name": "Alice"}),
            metadata: None,
        };

        let response = use_case.execute(request).await;
        // Show the error itself when ingestion unexpectedly fails.
        assert!(
            response.is_ok(),
            "ingestion failed: {:?}",
            response.as_ref().err()
        );

        assert_eq!(repo.events.lock().unwrap().len(), 1);
    }

    /// A request without a tenant is stored under the "default" tenant.
    #[tokio::test]
    async fn test_ingest_event_with_default_tenant() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventUseCase::new(repo.clone());

        let request = IngestEventRequest {
            event_type: "order.placed".to_string(),
            entity_id: "order-456".to_string(),
            tenant_id: None, // Should default to "default"
            payload: json!({"amount": 100}),
            metadata: None,
        };

        let response = use_case.execute(request).await;
        assert!(
            response.is_ok(),
            "ingestion failed: {:?}",
            response.as_ref().err()
        );

        let events = repo.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].tenant_id_str(), "default");
    }

    /// A batch of two valid requests yields two responses and two stored events.
    #[tokio::test]
    async fn test_batch_ingest() {
        let repo = Arc::new(MockEventRepository::new());
        let use_case = IngestEventsBatchUseCase::new(repo.clone());

        let requests = vec![
            IngestEventRequest {
                event_type: "event.1".to_string(),
                entity_id: "e1".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
            IngestEventRequest {
                event_type: "event.2".to_string(),
                entity_id: "e2".to_string(),
                tenant_id: Some("t1".to_string()),
                payload: json!({}),
                metadata: None,
            },
        ];

        let responses = use_case.execute(requests).await;
        assert!(
            responses.is_ok(),
            "batch ingestion failed: {:?}",
            responses.as_ref().err()
        );
        assert_eq!(responses.unwrap().len(), 2);
        assert_eq!(repo.events.lock().unwrap().len(), 2);
    }
}