use crate::{
application::dto::{IngestEventRequest, IngestEventResponse},
domain::{entities::Event, repositories::EventRepository},
error::Result,
};
use std::sync::Arc;
/// Use case that ingests a single event: it builds an [`Event`] from an
/// [`IngestEventRequest`] and saves it through the configured repository.
pub struct IngestEventUseCase {
// Shared handle to the storage backend that `execute` saves events to.
repository: Arc<dyn EventRepository>,
}
impl IngestEventUseCase {
pub fn new(repository: Arc<dyn EventRepository>) -> Self {
Self { repository }
}
pub async fn execute(&self, request: IngestEventRequest) -> Result<IngestEventResponse> {
let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
let event = Event::from_strings(
request.event_type,
request.entity_id,
tenant_id,
request.payload,
request.metadata,
)?;
self.repository.save(&event).await?;
Ok(IngestEventResponse::from_event(&event))
}
}
/// Use case that ingests several events at once: every
/// [`IngestEventRequest`] is turned into an [`Event`] and the whole set is
/// handed to the repository in a single `save_batch` call.
pub struct IngestEventsBatchUseCase {
// Shared handle to the storage backend that `execute` saves the batch to.
repository: Arc<dyn EventRepository>,
}
impl IngestEventsBatchUseCase {
pub fn new(repository: Arc<dyn EventRepository>) -> Self {
Self { repository }
}
pub async fn execute(
&self,
requests: Vec<IngestEventRequest>,
) -> Result<Vec<IngestEventResponse>> {
let mut events = Vec::with_capacity(requests.len());
let mut responses = Vec::with_capacity(requests.len());
for request in requests {
let tenant_id = request.tenant_id.unwrap_or_else(|| "default".to_string());
let event = Event::from_strings(
request.event_type,
request.entity_id,
tenant_id,
request.payload,
request.metadata,
)?;
responses.push(IngestEventResponse::from_event(&event));
events.push(event);
}
self.repository.save_batch(&events).await?;
Ok(responses)
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use chrono::Utc;
use serde_json::json;
use uuid::Uuid;
/// In-memory `EventRepository` used by the tests in this module.
struct MockEventRepository {
// Every event passed to `save`, in insertion order; tests inspect this list directly.
events: std::sync::Mutex<Vec<Event>>,
}
impl MockEventRepository {
    /// Creates a mock repository that holds no events yet.
    fn new() -> Self {
        // The default `Mutex<Vec<_>>` wraps an empty vector.
        let events = std::sync::Mutex::default();
        Self { events }
    }
}
#[async_trait]
impl EventRepository for MockEventRepository {
    /// Appends a copy of `event`, rebuilt from its accessor values, to the
    /// in-memory list.
    async fn save(&self, event: &Event) -> Result<()> {
        let copy = Event::reconstruct_from_strings(
            event.id(),
            event.event_type_str().to_string(),
            event.entity_id_str().to_string(),
            event.tenant_id_str().to_string(),
            event.payload().clone(),
            event.timestamp(),
            event.metadata().cloned(),
            event.version(),
        );
        self.events.lock().unwrap().push(copy);
        Ok(())
    }

    /// Saves each event in order via `save`, stopping at the first error.
    async fn save_batch(&self, events: &[Event]) -> Result<()> {
        for item in events.iter() {
            self.save(item).await?;
        }
        Ok(())
    }

    /// Returns the first stored event whose id equals `id`, if any.
    async fn find_by_id(&self, id: Uuid) -> Result<Option<Event>> {
        let stored = self.events.lock().unwrap();
        let found = stored.iter().find(|candidate| candidate.id() == id).cloned();
        Ok(found)
    }

    /// Returns the stored events matching both `entity_id` and `tenant_id`.
    async fn find_by_entity(&self, entity_id: &str, tenant_id: &str) -> Result<Vec<Event>> {
        let stored = self.events.lock().unwrap();
        let matching: Vec<Event> = stored
            .iter()
            .filter(|candidate| {
                candidate.entity_id_str() == entity_id && candidate.tenant_id_str() == tenant_id
            })
            .cloned()
            .collect();
        Ok(matching)
    }

    /// Returns the stored events matching both `event_type` and `tenant_id`.
    async fn find_by_type(&self, event_type: &str, tenant_id: &str) -> Result<Vec<Event>> {
        let stored = self.events.lock().unwrap();
        let matching: Vec<Event> = stored
            .iter()
            .filter(|candidate| {
                candidate.event_type_str() == event_type && candidate.tenant_id_str() == tenant_id
            })
            .cloned()
            .collect();
        Ok(matching)
    }

    /// Returns the tenant's stored events for which
    /// `occurred_between(start, end)` holds.
    async fn find_by_time_range(
        &self,
        tenant_id: &str,
        start: chrono::DateTime<Utc>,
        end: chrono::DateTime<Utc>,
    ) -> Result<Vec<Event>> {
        let stored = self.events.lock().unwrap();
        let matching: Vec<Event> = stored
            .iter()
            .filter(|candidate| {
                candidate.tenant_id_str() == tenant_id && candidate.occurred_between(start, end)
            })
            .cloned()
            .collect();
        Ok(matching)
    }

    /// Returns the stored events for the entity and tenant for which
    /// `occurred_before(as_of)` holds.
    async fn find_by_entity_as_of(
        &self,
        entity_id: &str,
        tenant_id: &str,
        as_of: chrono::DateTime<Utc>,
    ) -> Result<Vec<Event>> {
        let stored = self.events.lock().unwrap();
        let matching: Vec<Event> = stored
            .iter()
            .filter(|candidate| {
                candidate.entity_id_str() == entity_id
                    && candidate.tenant_id_str() == tenant_id
                    && candidate.occurred_before(as_of)
            })
            .cloned()
            .collect();
        Ok(matching)
    }

    /// Counts the stored events belonging to `tenant_id`.
    async fn count(&self, tenant_id: &str) -> Result<usize> {
        let stored = self.events.lock().unwrap();
        let total = stored
            .iter()
            .filter(|candidate| candidate.tenant_id_str() == tenant_id)
            .count();
        Ok(total)
    }

    /// Always reports healthy; the mock has no external dependency to probe.
    async fn health_check(&self) -> Result<()> {
        Ok(())
    }
}
/// Ingesting a request with an explicit tenant succeeds and stores exactly
/// one event under that tenant.
#[tokio::test]
async fn test_ingest_event_use_case() {
    let repo = Arc::new(MockEventRepository::new());
    let use_case = IngestEventUseCase::new(repo.clone());
    let request = IngestEventRequest {
        event_type: "user.created".to_string(),
        entity_id: "user-123".to_string(),
        tenant_id: Some("tenant-1".to_string()),
        payload: json!({"name": "Alice"}),
        metadata: None,
        expected_version: None,
    };

    let response = use_case.execute(request).await;
    assert!(response.is_ok());

    // Check what actually reached the repository, mirroring the
    // default-tenant test: one event, attributed to the requested tenant.
    let events = repo.events.lock().unwrap();
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].tenant_id_str(), "tenant-1");
}
/// A request without a tenant id is stored under the `"default"` tenant.
#[tokio::test]
async fn test_ingest_event_with_default_tenant() {
    // Arrange: the request deliberately leaves the tenant unset.
    let repo = Arc::new(MockEventRepository::new());
    let use_case = IngestEventUseCase::new(repo.clone());
    let request = IngestEventRequest {
        event_type: String::from("order.placed"),
        entity_id: String::from("order-456"),
        tenant_id: None,
        payload: json!({"amount": 100}),
        metadata: None,
        expected_version: None,
    };

    // Act
    let outcome = use_case.execute(request).await;

    // Assert: ingestion succeeded and the single stored event got the fallback tenant.
    assert!(outcome.is_ok());
    let stored = repo.events.lock().unwrap();
    assert_eq!(stored.len(), 1);
    assert_eq!(stored[0].tenant_id_str(), "default");
}
/// A batch of two requests yields two responses and two stored events.
#[tokio::test]
async fn test_batch_ingest() {
    let repo = Arc::new(MockEventRepository::new());
    let use_case = IngestEventsBatchUseCase::new(repo.clone());

    // Both requests differ only in event type and entity id.
    let request_for = |event_type: &str, entity_id: &str| IngestEventRequest {
        event_type: event_type.to_string(),
        entity_id: entity_id.to_string(),
        tenant_id: Some("t1".to_string()),
        payload: json!({}),
        metadata: None,
        expected_version: None,
    };
    let requests = vec![request_for("event.1", "e1"), request_for("event.2", "e2")];

    let outcome = use_case.execute(requests).await;
    assert!(outcome.is_ok());

    let responses = outcome.unwrap();
    assert_eq!(responses.len(), 2);
    assert_eq!(repo.events.lock().unwrap().len(), 2);
}
}