pjson_rs/application/services/
event_service.rs1use crate::{
7 application::{
8 ApplicationResult,
9 dto::{DomainEventDto, FromDto, ToDto},
10 },
11 domain::{
12 events::{DomainEvent, EventStore},
13 value_objects::{SessionId, StreamId},
14 },
15};
16use chrono::{DateTime, Utc};
17use std::sync::{Arc, Mutex};
18use tracing;
19
20pub struct EventService<S, H>
23where
24 S: EventStore,
25 H: EventHandler,
26{
27 event_store: Arc<Mutex<S>>,
28 event_handler: H,
29}
30
31pub trait EventHandler {
33 type HandleFuture<'a>: std::future::Future<Output = ApplicationResult<()>> + Send + 'a
34 where
35 Self: 'a;
36
37 fn handle_event(&self, event: &DomainEvent) -> Self::HandleFuture<'_>;
38}
39
40pub struct NoOpEventHandler;
42
43impl EventHandler for NoOpEventHandler {
44 type HandleFuture<'a>
45 = impl std::future::Future<Output = ApplicationResult<()>> + Send + 'a
46 where
47 Self: 'a;
48
49 fn handle_event(&self, _event: &DomainEvent) -> Self::HandleFuture<'_> {
50 async move { Ok(()) }
51 }
52}
53
54pub struct LoggingEventHandler;
56
57impl EventHandler for LoggingEventHandler {
58 type HandleFuture<'a>
59 = impl std::future::Future<Output = ApplicationResult<()>> + Send + 'a
60 where
61 Self: 'a;
62
63 fn handle_event(&self, event: &DomainEvent) -> Self::HandleFuture<'_> {
64 let event_type = event.event_type();
65 let session_id = event.session_id();
66 let stream_id = event.stream_id();
67
68 async move {
69 match event_type {
71 "stream_completed" => {
72 if let Some(stream) = stream_id {
73 tracing::info!(
74 "Stream completed: session={}, stream={}",
75 session_id,
76 stream
77 );
78 }
79 }
80 "session_activated" => {
81 tracing::info!("Session activated: {}", session_id);
82 }
83 _ => {
84 tracing::debug!("Domain event processed: {}", event_type);
85 }
86 }
87 Ok(())
88 }
89 }
90}
91
92impl<S, H> EventService<S, H>
93where
94 S: EventStore,
95 H: EventHandler,
96{
97 pub fn new(event_store: Arc<Mutex<S>>, event_handler: H) -> Self {
99 Self {
100 event_store,
101 event_handler,
102 }
103 }
104
105 pub async fn publish_event(&self, event: DomainEvent) -> ApplicationResult<()> {
107 self.event_handler.handle_event(&event).await?;
109
110 self.event_store
112 .lock()
113 .map_err(|_| {
114 crate::application::ApplicationError::Logic(
115 "Failed to acquire event store lock".to_string(),
116 )
117 })?
118 .append_events(vec![event])
119 .map_err(crate::application::ApplicationError::Logic)?;
120
121 Ok(())
122 }
123
124 pub async fn publish_events(&self, events: Vec<DomainEvent>) -> ApplicationResult<()> {
126 for event in &events {
128 self.event_handler.handle_event(event).await?;
129 }
130
131 self.event_store
133 .lock()
134 .map_err(|_| {
135 crate::application::ApplicationError::Logic(
136 "Failed to acquire event store lock".to_string(),
137 )
138 })?
139 .append_events(events)
140 .map_err(crate::application::ApplicationError::Logic)?;
141
142 Ok(())
143 }
144
145 pub fn get_session_events_dto(
147 &self,
148 session_id: SessionId,
149 ) -> ApplicationResult<Vec<DomainEventDto>> {
150 let events = self
151 .event_store
152 .lock()
153 .map_err(|_| {
154 crate::application::ApplicationError::Logic(
155 "Failed to acquire event store lock".to_string(),
156 )
157 })?
158 .get_events_for_session(session_id)
159 .map_err(crate::application::ApplicationError::Logic)?;
160
161 Ok(events.into_iter().map(|e| e.to_dto()).collect())
162 }
163
164 pub fn get_stream_events_dto(
166 &self,
167 stream_id: StreamId,
168 ) -> ApplicationResult<Vec<DomainEventDto>> {
169 let events = self
170 .event_store
171 .lock()
172 .map_err(|_| {
173 crate::application::ApplicationError::Logic(
174 "Failed to acquire event store lock".to_string(),
175 )
176 })?
177 .get_events_for_stream(stream_id)
178 .map_err(crate::application::ApplicationError::Logic)?;
179
180 Ok(events.into_iter().map(|e| e.to_dto()).collect())
181 }
182
183 pub fn get_events_since_dto(
185 &self,
186 since: DateTime<Utc>,
187 ) -> ApplicationResult<Vec<DomainEventDto>> {
188 let events = self
189 .event_store
190 .lock()
191 .map_err(|_| {
192 crate::application::ApplicationError::Logic(
193 "Failed to acquire event store lock".to_string(),
194 )
195 })?
196 .get_events_since(since)
197 .map_err(crate::application::ApplicationError::Logic)?;
198
199 Ok(events.into_iter().map(|e| e.to_dto()).collect())
200 }
201
202 pub fn get_session_events(&self, session_id: SessionId) -> ApplicationResult<Vec<DomainEvent>> {
204 self.event_store
205 .lock()
206 .map_err(|_| {
207 crate::application::ApplicationError::Logic(
208 "Failed to acquire event store lock".to_string(),
209 )
210 })?
211 .get_events_for_session(session_id)
212 .map_err(crate::application::ApplicationError::Logic)
213 }
214
215 pub fn get_stream_events(&self, stream_id: StreamId) -> ApplicationResult<Vec<DomainEvent>> {
217 self.event_store
218 .lock()
219 .map_err(|_| {
220 crate::application::ApplicationError::Logic(
221 "Failed to acquire event store lock".to_string(),
222 )
223 })?
224 .get_events_for_stream(stream_id)
225 .map_err(crate::application::ApplicationError::Logic)
226 }
227
228 pub fn replay_from_dtos(
230 &self,
231 event_dtos: Vec<DomainEventDto>,
232 ) -> ApplicationResult<Vec<DomainEvent>> {
233 let mut events = Vec::new();
234
235 for dto in event_dtos {
236 let event =
237 DomainEvent::from_dto(dto).map_err(crate::application::ApplicationError::Domain)?;
238 events.push(event);
239 }
240
241 Ok(events)
242 }
243}
244
245impl<S> EventService<S, NoOpEventHandler>
247where
248 S: EventStore,
249{
250 pub fn with_noop_handler(event_store: Arc<Mutex<S>>) -> Self {
252 Self::new(event_store, NoOpEventHandler)
253 }
254}
255
256impl<S> EventService<S, LoggingEventHandler>
257where
258 S: EventStore,
259{
260 pub fn with_logging_handler(event_store: Arc<Mutex<S>>) -> Self {
262 Self::new(event_store, LoggingEventHandler)
263 }
264}
265
266impl<S, H> EventService<S, H>
268where
269 S: EventStore,
270 H: EventHandler,
271{
272 pub async fn publish_session_activated(&self, session_id: SessionId) -> ApplicationResult<()> {
274 let event = DomainEvent::SessionActivated {
275 session_id,
276 timestamp: Utc::now(),
277 };
278 self.publish_event(event).await
279 }
280
281 pub async fn publish_session_closed(&self, session_id: SessionId) -> ApplicationResult<()> {
283 let event = DomainEvent::SessionClosed {
284 session_id,
285 timestamp: Utc::now(),
286 };
287 self.publish_event(event).await
288 }
289
290 pub async fn publish_stream_created(
292 &self,
293 session_id: SessionId,
294 stream_id: StreamId,
295 ) -> ApplicationResult<()> {
296 let event = DomainEvent::StreamCreated {
297 session_id,
298 stream_id,
299 timestamp: Utc::now(),
300 };
301 self.publish_event(event).await
302 }
303
304 pub async fn publish_stream_completed(
306 &self,
307 session_id: SessionId,
308 stream_id: StreamId,
309 ) -> ApplicationResult<()> {
310 let event = DomainEvent::StreamCompleted {
311 session_id,
312 stream_id,
313 timestamp: Utc::now(),
314 };
315 self.publish_event(event).await
316 }
317
318 pub async fn publish_stream_failed(
320 &self,
321 session_id: SessionId,
322 stream_id: StreamId,
323 error: String,
324 ) -> ApplicationResult<()> {
325 let event = DomainEvent::StreamFailed {
326 session_id,
327 stream_id,
328 error,
329 timestamp: Utc::now(),
330 };
331 self.publish_event(event).await
332 }
333}
334
335#[cfg(test)]
336mod tests {
337 use super::*;
338 use crate::domain::{
339 events::{EventSubscriber, InMemoryEventStore},
340 value_objects::{SessionId, StreamId},
341 };
342
343 struct MockSubscriber {
345 received_events: std::sync::Mutex<Vec<DomainEvent>>,
346 }
347
348 impl MockSubscriber {
349 fn new() -> Self {
350 Self {
351 received_events: std::sync::Mutex::new(Vec::new()),
352 }
353 }
354
355 fn event_count(&self) -> usize {
356 self.received_events
357 .lock()
358 .map(|events| events.len())
359 .unwrap_or(0)
360 }
361 }
362
363 impl EventSubscriber for MockSubscriber {
364 type HandleFuture<'a>
365 = impl std::future::Future<Output = crate::domain::DomainResult<()>> + Send + 'a
366 where
367 Self: 'a;
368
369 fn handle(&self, event: &DomainEvent) -> Self::HandleFuture<'_> {
370 let event = event.clone();
371 async move {
372 self.received_events
373 .lock()
374 .map_err(|_| {
375 crate::domain::DomainError::Logic(
376 "Event subscriber lock poisoned".to_string(),
377 )
378 })?
379 .push(event);
380 Ok(())
381 }
382 }
383 }
384
385 #[tokio::test]
386 async fn test_event_service_creation() {
387 let store = Arc::new(std::sync::Mutex::new(InMemoryEventStore::new()));
388 let _service = EventService::with_noop_handler(store);
389
390 }
392
393 #[tokio::test]
394 async fn test_publish_event() {
395 let store = Arc::new(std::sync::Mutex::new(InMemoryEventStore::new()));
396 let service = EventService::with_logging_handler(store.clone());
397
398 let session_id = SessionId::new();
399
400 service.publish_session_activated(session_id).await.unwrap();
401
402 let events = service.get_session_events(session_id).unwrap();
404 assert_eq!(events.len(), 1);
405
406 match &events[0] {
407 DomainEvent::SessionActivated {
408 session_id: stored_id,
409 ..
410 } => {
411 assert_eq!(*stored_id, session_id);
412 }
413 _ => panic!("Expected SessionActivated event"),
414 }
415 }
416
417 #[tokio::test]
418 async fn test_event_handler() {
419 let store = Arc::new(std::sync::Mutex::new(InMemoryEventStore::new()));
420 let service = EventService::with_logging_handler(store);
421
422 let session_id = SessionId::new();
423 service.publish_session_activated(session_id).await.unwrap();
424
425 let events = service.get_session_events(session_id).unwrap();
427 assert_eq!(events.len(), 1);
428 }
429
430 #[tokio::test]
431 async fn test_dto_conversion() {
432 let store = Arc::new(std::sync::Mutex::new(InMemoryEventStore::new()));
433 let service = EventService::with_logging_handler(store);
434
435 let session_id = SessionId::new();
436 let stream_id = StreamId::new();
437
438 service
439 .publish_stream_created(session_id, stream_id)
440 .await
441 .unwrap();
442
443 let event_dtos = service.get_session_events_dto(session_id).unwrap();
445 assert_eq!(event_dtos.len(), 1);
446
447 match &event_dtos[0] {
449 DomainEventDto::StreamCreated {
450 session_id: dto_session_id,
451 stream_id: dto_stream_id,
452 ..
453 } => {
454 assert_eq!(dto_session_id.uuid(), session_id.as_uuid());
455 assert_eq!(dto_stream_id.uuid(), stream_id.as_uuid());
456 }
457 _ => panic!("Expected StreamCreated DTO"),
458 }
459
460 let replayed = service.replay_from_dtos(event_dtos).unwrap();
462 assert_eq!(replayed.len(), 1);
463 }
464
465 #[tokio::test]
466 async fn test_multiple_events() {
467 let store = Arc::new(std::sync::Mutex::new(InMemoryEventStore::new()));
468 let service = EventService::with_logging_handler(store);
469
470 let session_id = SessionId::new();
471 let stream_id = StreamId::new();
472
473 let events = vec![
474 DomainEvent::SessionActivated {
475 session_id,
476 timestamp: Utc::now(),
477 },
478 DomainEvent::StreamCreated {
479 session_id,
480 stream_id,
481 timestamp: Utc::now(),
482 },
483 DomainEvent::StreamStarted {
484 session_id,
485 stream_id,
486 timestamp: Utc::now(),
487 },
488 ];
489
490 service.publish_events(events).await.unwrap();
491
492 let stored_events = service.get_session_events(session_id).unwrap();
493 assert_eq!(stored_events.len(), 3);
494 }
495}