pjson_rs/application/services/
event_service.rs

1//! Event service for handling domain events with DTO serialization
2//!
3//! This service provides a clean boundary between domain events and their
4//! serialized representations, maintaining Clean Architecture principles.
5
6use crate::{
7    application::{
8        ApplicationResult,
9        dto::{DomainEventDto, FromDto, ToDto},
10    },
11    domain::{
12        events::{DomainEvent, EventStore},
13        value_objects::{SessionId, StreamId},
14    },
15};
16use chrono::{DateTime, Utc};
17use std::sync::{Arc, Mutex};
18use tracing;
19
20/// Service for managing domain events with DTO conversion
21/// Uses compile-time polymorphism for zero-cost abstractions
22pub struct EventService<S, H>
23where
24    S: EventStore,
25    H: EventHandler,
26{
27    event_store: Arc<Mutex<S>>,
28    event_handler: H,
29}
30
31/// Zero-cost event handler trait with stack allocation
32pub trait EventHandler {
33    type HandleFuture<'a>: std::future::Future<Output = ApplicationResult<()>> + Send + 'a
34    where
35        Self: 'a;
36
37    fn handle_event(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
38}
39
40/// No-op handler for when events don't need processing
41pub struct NoOpEventHandler;
42
43impl EventHandler for NoOpEventHandler {
44    type HandleFuture<'a>
45        = impl std::future::Future<Output = ApplicationResult<()>> + Send + 'a
46    where
47        Self: 'a;
48
49    fn handle_event(&self, _event: &DomainEvent) -> Self::HandleFuture<'_> {
50        async move { Ok(()) }
51    }
52}
53
54/// Logging handler with zero allocations
55pub struct LoggingEventHandler;
56
57impl EventHandler for LoggingEventHandler {
58    type HandleFuture<'a>
59        = impl std::future::Future<Output = ApplicationResult<()>> + Send + 'a
60    where
61        Self: 'a;
62
63    fn handle_event(&self, event: &DomainEvent) -> Self::HandleFuture<'_> {
64        let event_type = event.event_type();
65        let session_id = event.session_id();
66        let stream_id = event.stream_id();
67
68        async move {
69            // Stack-allocated logging without heap allocation
70            match event_type {
71                "stream_completed" => {
72                    if let Some(stream) = stream_id {
73                        tracing::info!(
74                            "Stream completed: session={}, stream={}",
75                            session_id,
76                            stream
77                        );
78                    }
79                }
80                "session_activated" => {
81                    tracing::info!("Session activated: {}", session_id);
82                }
83                _ => {
84                    tracing::debug!("Domain event processed: {}", event_type);
85                }
86            }
87            Ok(())
88        }
89    }
90}
91
92impl<S, H> EventService<S, H>
93where
94    S: EventStore,
95    H: EventHandler,
96{
97    /// Create new event service with event store and handler
98    pub fn new(event_store: Arc<Mutex<S>>, event_handler: H) -> Self {
99        Self {
100            event_store,
101            event_handler,
102        }
103    }
104
105    /// Publish domain event with zero-cost handling
106    pub async fn publish_event(&self, event: DomainEvent) -> ApplicationResult<()> {
107        // Handle event with zero-cost abstraction
108        self.event_handler.handle_event(&event).await?;
109
110        // Store event
111        self.event_store
112            .lock()
113            .map_err(|_| {
114                crate::application::ApplicationError::Logic(
115                    "Failed to acquire event store lock".to_string(),
116                )
117            })?
118            .append_events(vec![event])
119            .map_err(crate::application::ApplicationError::Logic)?;
120
121        Ok(())
122    }
123
124    /// Publish multiple domain events
125    pub async fn publish_events(&self, events: Vec<DomainEvent>) -> ApplicationResult<()> {
126        // Handle events with zero-cost abstraction
127        for event in &events {
128            self.event_handler.handle_event(event).await?;
129        }
130
131        // Store all events
132        self.event_store
133            .lock()
134            .map_err(|_| {
135                crate::application::ApplicationError::Logic(
136                    "Failed to acquire event store lock".to_string(),
137                )
138            })?
139            .append_events(events)
140            .map_err(crate::application::ApplicationError::Logic)?;
141
142        Ok(())
143    }
144
145    /// Get events for session as DTOs (for API responses)
146    pub fn get_session_events_dto(
147        &self,
148        session_id: SessionId,
149    ) -> ApplicationResult<Vec<DomainEventDto>> {
150        let events = self
151            .event_store
152            .lock()
153            .map_err(|_| {
154                crate::application::ApplicationError::Logic(
155                    "Failed to acquire event store lock".to_string(),
156                )
157            })?
158            .get_events_for_session(session_id)
159            .map_err(crate::application::ApplicationError::Logic)?;
160
161        Ok(events.into_iter().map(|e| e.to_dto()).collect())
162    }
163
164    /// Get events for stream as DTOs (for API responses)
165    pub fn get_stream_events_dto(
166        &self,
167        stream_id: StreamId,
168    ) -> ApplicationResult<Vec<DomainEventDto>> {
169        let events = self
170            .event_store
171            .lock()
172            .map_err(|_| {
173                crate::application::ApplicationError::Logic(
174                    "Failed to acquire event store lock".to_string(),
175                )
176            })?
177            .get_events_for_stream(stream_id)
178            .map_err(crate::application::ApplicationError::Logic)?;
179
180        Ok(events.into_iter().map(|e| e.to_dto()).collect())
181    }
182
183    /// Get events since timestamp as DTOs (for API responses)
184    pub fn get_events_since_dto(
185        &self,
186        since: DateTime<Utc>,
187    ) -> ApplicationResult<Vec<DomainEventDto>> {
188        let events = self
189            .event_store
190            .lock()
191            .map_err(|_| {
192                crate::application::ApplicationError::Logic(
193                    "Failed to acquire event store lock".to_string(),
194                )
195            })?
196            .get_events_since(since)
197            .map_err(crate::application::ApplicationError::Logic)?;
198
199        Ok(events.into_iter().map(|e| e.to_dto()).collect())
200    }
201
202    /// Get events for session (domain objects, for internal use)
203    pub fn get_session_events(&self, session_id: SessionId) -> ApplicationResult<Vec<DomainEvent>> {
204        self.event_store
205            .lock()
206            .map_err(|_| {
207                crate::application::ApplicationError::Logic(
208                    "Failed to acquire event store lock".to_string(),
209                )
210            })?
211            .get_events_for_session(session_id)
212            .map_err(crate::application::ApplicationError::Logic)
213    }
214
215    /// Get events for stream (domain objects, for internal use)
216    pub fn get_stream_events(&self, stream_id: StreamId) -> ApplicationResult<Vec<DomainEvent>> {
217        self.event_store
218            .lock()
219            .map_err(|_| {
220                crate::application::ApplicationError::Logic(
221                    "Failed to acquire event store lock".to_string(),
222                )
223            })?
224            .get_events_for_stream(stream_id)
225            .map_err(crate::application::ApplicationError::Logic)
226    }
227
228    /// Replay events from DTOs (for event sourcing reconstruction)
229    pub fn replay_from_dtos(
230        &self,
231        event_dtos: Vec<DomainEventDto>,
232    ) -> ApplicationResult<Vec<DomainEvent>> {
233        let mut events = Vec::new();
234
235        for dto in event_dtos {
236            let event =
237                DomainEvent::from_dto(dto).map_err(crate::application::ApplicationError::Domain)?;
238            events.push(event);
239        }
240
241        Ok(events)
242    }
243}
244
245/// Convenience constructors for common configurations
246impl<S> EventService<S, NoOpEventHandler>
247where
248    S: EventStore,
249{
250    /// Create event service with no-op handler (maximum performance)
251    pub fn with_noop_handler(event_store: Arc<Mutex<S>>) -> Self {
252        Self::new(event_store, NoOpEventHandler)
253    }
254}
255
256impl<S> EventService<S, LoggingEventHandler>
257where
258    S: EventStore,
259{
260    /// Create event service with logging handler (zero-allocation logging)
261    pub fn with_logging_handler(event_store: Arc<Mutex<S>>) -> Self {
262        Self::new(event_store, LoggingEventHandler)
263    }
264}
265
266/// Event publishing convenience methods
267impl<S, H> EventService<S, H>
268where
269    S: EventStore,
270    H: EventHandler,
271{
272    /// Publish session activated event  
273    pub async fn publish_session_activated(&self, session_id: SessionId) -> ApplicationResult<()> {
274        let event = DomainEvent::SessionActivated {
275            session_id,
276            timestamp: Utc::now(),
277        };
278        self.publish_event(event).await
279    }
280
281    /// Publish session closed event
282    pub async fn publish_session_closed(&self, session_id: SessionId) -> ApplicationResult<()> {
283        let event = DomainEvent::SessionClosed {
284            session_id,
285            timestamp: Utc::now(),
286        };
287        self.publish_event(event).await
288    }
289
290    /// Publish stream created event
291    pub async fn publish_stream_created(
292        &self,
293        session_id: SessionId,
294        stream_id: StreamId,
295    ) -> ApplicationResult<()> {
296        let event = DomainEvent::StreamCreated {
297            session_id,
298            stream_id,
299            timestamp: Utc::now(),
300        };
301        self.publish_event(event).await
302    }
303
304    /// Publish stream completed event
305    pub async fn publish_stream_completed(
306        &self,
307        session_id: SessionId,
308        stream_id: StreamId,
309    ) -> ApplicationResult<()> {
310        let event = DomainEvent::StreamCompleted {
311            session_id,
312            stream_id,
313            timestamp: Utc::now(),
314        };
315        self.publish_event(event).await
316    }
317
318    /// Publish stream failed event
319    pub async fn publish_stream_failed(
320        &self,
321        session_id: SessionId,
322        stream_id: StreamId,
323        error: String,
324    ) -> ApplicationResult<()> {
325        let event = DomainEvent::StreamFailed {
326            session_id,
327            stream_id,
328            error,
329            timestamp: Utc::now(),
330        };
331        self.publish_event(event).await
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::{
        events::InMemoryEventStore,
        value_objects::{SessionId, StreamId},
    };

    /// Handler that records every event it receives, so tests can assert that
    /// the service actually invoked it.
    struct RecordingHandler {
        received_events: std::sync::Mutex<Vec<DomainEvent>>,
    }

    impl RecordingHandler {
        fn new() -> Self {
            Self {
                received_events: std::sync::Mutex::new(Vec::new()),
            }
        }

        /// Number of events handled so far (0 if the lock is poisoned).
        fn event_count(&self) -> usize {
            self.received_events
                .lock()
                .map(|events| events.len())
                .unwrap_or(0)
        }
    }

    impl EventHandler for RecordingHandler {
        type HandleFuture<'a>
            = impl std::future::Future<Output = ApplicationResult<()>> + Send + 'a
        where
            Self: 'a;

        fn handle_event(&self, event: &DomainEvent) -> Self::HandleFuture<'_> {
            // Clone up front: the returned future may not borrow `event`.
            let event = event.clone();
            async move {
                self.received_events
                    .lock()
                    .map_err(|_| {
                        crate::application::ApplicationError::Logic(
                            "Event handler lock poisoned".to_string(),
                        )
                    })?
                    .push(event);
                Ok(())
            }
        }
    }

    /// Fresh, empty in-memory store wrapped the way `EventService` expects it.
    fn new_store() -> Arc<std::sync::Mutex<InMemoryEventStore>> {
        Arc::new(std::sync::Mutex::new(InMemoryEventStore::new()))
    }

    #[tokio::test]
    async fn test_event_service_creation() {
        // Construction alone must succeed.
        let _service = EventService::with_noop_handler(new_store());
    }

    #[tokio::test]
    async fn test_publish_event() {
        let service = EventService::with_logging_handler(new_store());

        let session_id = SessionId::new();

        service.publish_session_activated(session_id).await.unwrap();

        // Verify event was stored
        let events = service.get_session_events(session_id).unwrap();
        assert_eq!(events.len(), 1);

        match &events[0] {
            DomainEvent::SessionActivated {
                session_id: stored_id,
                ..
            } => {
                assert_eq!(*stored_id, session_id);
            }
            _ => panic!("Expected SessionActivated event"),
        }
    }

    #[tokio::test]
    async fn test_event_handler() {
        let service = EventService::new(new_store(), RecordingHandler::new());

        let session_id = SessionId::new();
        service.publish_session_activated(session_id).await.unwrap();

        // Verify the handler was invoked exactly once...
        assert_eq!(service.event_handler.event_count(), 1);

        // ...and that the event was stored as well.
        let events = service.get_session_events(session_id).unwrap();
        assert_eq!(events.len(), 1);
    }

    #[tokio::test]
    async fn test_dto_conversion() {
        let service = EventService::with_logging_handler(new_store());

        let session_id = SessionId::new();
        let stream_id = StreamId::new();

        service
            .publish_stream_created(session_id, stream_id)
            .await
            .unwrap();

        // Get events as DTOs
        let event_dtos = service.get_session_events_dto(session_id).unwrap();
        assert_eq!(event_dtos.len(), 1);

        // Verify DTO structure
        match &event_dtos[0] {
            DomainEventDto::StreamCreated {
                session_id: dto_session_id,
                stream_id: dto_stream_id,
                ..
            } => {
                assert_eq!(dto_session_id.uuid(), session_id.as_uuid());
                assert_eq!(dto_stream_id.uuid(), stream_id.as_uuid());
            }
            _ => panic!("Expected StreamCreated DTO"),
        }

        // Test replay from DTOs
        let replayed = service.replay_from_dtos(event_dtos).unwrap();
        assert_eq!(replayed.len(), 1);
    }

    #[tokio::test]
    async fn test_multiple_events() {
        let service = EventService::with_logging_handler(new_store());

        let session_id = SessionId::new();
        let stream_id = StreamId::new();

        let events = vec![
            DomainEvent::SessionActivated {
                session_id,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamCreated {
                session_id,
                stream_id,
                timestamp: Utc::now(),
            },
            DomainEvent::StreamStarted {
                session_id,
                stream_id,
                timestamp: Utc::now(),
            },
        ];

        service.publish_events(events).await.unwrap();

        let stored_events = service.get_session_events(session_id).unwrap();
        assert_eq!(stored_events.len(), 3);
    }
}