1use crate::{
4 application::{ApplicationError, ApplicationResult, commands::*, handlers::CommandHandler},
5 domain::{
6 aggregates::StreamSession,
7 entities::Frame,
8 ports::{EventPublisher, StreamRepository},
9 value_objects::{SessionId, StreamId},
10 },
11};
12use async_trait::async_trait;
13use std::sync::Arc;
14
15#[derive(Debug)]
17pub struct SessionCommandHandler<R, P>
18where
19 R: StreamRepository,
20 P: EventPublisher,
21{
22 repository: Arc<R>,
23 event_publisher: Arc<P>,
24}
25
26impl<R, P> SessionCommandHandler<R, P>
27where
28 R: StreamRepository,
29 P: EventPublisher,
30{
31 pub fn new(repository: Arc<R>, event_publisher: Arc<P>) -> Self {
32 Self {
33 repository,
34 event_publisher,
35 }
36 }
37}
38
39#[async_trait]
40impl<R, P> CommandHandler<CreateSessionCommand, SessionId> for SessionCommandHandler<R, P>
41where
42 R: StreamRepository + Send + Sync,
43 P: EventPublisher + Send + Sync,
44{
45 async fn handle(&self, command: CreateSessionCommand) -> ApplicationResult<SessionId> {
46 let mut session = StreamSession::new(command.config);
48
49 if let (Some(client_info), user_agent, ip_address) =
51 (command.client_info, command.user_agent, command.ip_address)
52 {
53 session.set_client_info(client_info, user_agent, ip_address);
54 }
55
56 session.activate().map_err(ApplicationError::Domain)?;
58
59 let session_id = session.id();
60
61 self.repository
63 .save_session(session.clone())
64 .await
65 .map_err(ApplicationError::Domain)?;
66
67 let events = session.take_events();
69 for event in events {
70 self.event_publisher
71 .publish(event)
72 .await
73 .map_err(ApplicationError::Domain)?;
74 }
75
76 Ok(session_id)
77 }
78}
79
80#[async_trait]
81impl<R, P> CommandHandler<CreateStreamCommand, StreamId> for SessionCommandHandler<R, P>
82where
83 R: StreamRepository + Send + Sync,
84 P: EventPublisher + Send + Sync,
85{
86 async fn handle(&self, command: CreateStreamCommand) -> ApplicationResult<StreamId> {
87 let mut session = self
89 .repository
90 .find_session(command.session_id.into())
91 .await
92 .map_err(ApplicationError::Domain)?
93 .ok_or_else(|| {
94 ApplicationError::NotFound(format!("Session {} not found", command.session_id))
95 })?;
96
97 let stream_id = session
99 .create_stream(command.source_data)
100 .map_err(ApplicationError::Domain)?;
101
102 if let Some(config) = command.config
104 && let Some(stream) = session.get_stream_mut(stream_id) {
105 stream
106 .update_config(config)
107 .map_err(ApplicationError::Domain)?;
108 }
109
110 self.repository
112 .save_session(session.clone())
113 .await
114 .map_err(ApplicationError::Domain)?;
115
116 let events = session.take_events();
118 for event in events {
119 self.event_publisher
120 .publish(event)
121 .await
122 .map_err(ApplicationError::Domain)?;
123 }
124
125 Ok(stream_id)
126 }
127}
128
129#[async_trait]
130impl<R, P> CommandHandler<StartStreamCommand, ()> for SessionCommandHandler<R, P>
131where
132 R: StreamRepository + Send + Sync,
133 P: EventPublisher + Send + Sync,
134{
135 async fn handle(&self, command: StartStreamCommand) -> ApplicationResult<()> {
136 let mut session = self
138 .repository
139 .find_session(command.session_id.into())
140 .await
141 .map_err(ApplicationError::Domain)?
142 .ok_or_else(|| {
143 ApplicationError::NotFound(format!("Session {} not found", command.session_id))
144 })?;
145
146 session
148 .start_stream(command.stream_id.into())
149 .map_err(ApplicationError::Domain)?;
150
151 self.repository
153 .save_session(session.clone())
154 .await
155 .map_err(ApplicationError::Domain)?;
156
157 let events = session.take_events();
159 for event in events {
160 self.event_publisher
161 .publish(event)
162 .await
163 .map_err(ApplicationError::Domain)?;
164 }
165
166 Ok(())
167 }
168}
169
170#[async_trait]
171impl<R, P> CommandHandler<CompleteStreamCommand, ()> for SessionCommandHandler<R, P>
172where
173 R: StreamRepository + Send + Sync,
174 P: EventPublisher + Send + Sync,
175{
176 async fn handle(&self, command: CompleteStreamCommand) -> ApplicationResult<()> {
177 let mut session = self
179 .repository
180 .find_session(command.session_id.into())
181 .await
182 .map_err(ApplicationError::Domain)?
183 .ok_or_else(|| {
184 ApplicationError::NotFound(format!("Session {} not found", command.session_id))
185 })?;
186
187 session
189 .complete_stream(command.stream_id.into())
190 .map_err(ApplicationError::Domain)?;
191
192 self.repository
194 .save_session(session.clone())
195 .await
196 .map_err(ApplicationError::Domain)?;
197
198 let events = session.take_events();
200 for event in events {
201 self.event_publisher
202 .publish(event)
203 .await
204 .map_err(ApplicationError::Domain)?;
205 }
206
207 Ok(())
208 }
209}
210
211#[async_trait]
212impl<R, P> CommandHandler<GenerateFramesCommand, Vec<Frame>> for SessionCommandHandler<R, P>
213where
214 R: StreamRepository + Send + Sync,
215 P: EventPublisher + Send + Sync,
216{
217 async fn handle(&self, command: GenerateFramesCommand) -> ApplicationResult<Vec<Frame>> {
218 let mut session = self
220 .repository
221 .find_session(command.session_id.into())
222 .await
223 .map_err(ApplicationError::Domain)?
224 .ok_or_else(|| {
225 ApplicationError::NotFound(format!("Session {} not found", command.session_id))
226 })?;
227
228 let stream = session.get_stream_mut(command.stream_id.into()).ok_or_else(|| {
230 ApplicationError::NotFound(format!("Stream {} not found", command.stream_id))
231 })?;
232
233 let priority = command.priority_threshold.try_into()
235 .map_err(ApplicationError::Domain)?;
236 let frames = stream
237 .create_patch_frames(priority, command.max_frames)
238 .map_err(ApplicationError::Domain)?;
239
240 self.repository
242 .save_session(session.clone())
243 .await
244 .map_err(ApplicationError::Domain)?;
245
246 let events = session.take_events();
248 for event in events {
249 self.event_publisher
250 .publish(event)
251 .await
252 .map_err(ApplicationError::Domain)?;
253 }
254
255 Ok(frames)
256 }
257}
258
259#[async_trait]
260impl<R, P> CommandHandler<BatchGenerateFramesCommand, Vec<Frame>> for SessionCommandHandler<R, P>
261where
262 R: StreamRepository + Send + Sync,
263 P: EventPublisher + Send + Sync,
264{
265 async fn handle(&self, command: BatchGenerateFramesCommand) -> ApplicationResult<Vec<Frame>> {
266 let mut session = self
268 .repository
269 .find_session(command.session_id.into())
270 .await
271 .map_err(ApplicationError::Domain)?
272 .ok_or_else(|| {
273 ApplicationError::NotFound(format!("Session {} not found", command.session_id))
274 })?;
275
276 let frames = session
278 .create_priority_frames(command.max_frames)
279 .map_err(ApplicationError::Domain)?;
280
281 self.repository
283 .save_session(session.clone())
284 .await
285 .map_err(ApplicationError::Domain)?;
286
287 let events = session.take_events();
289 for event in events {
290 self.event_publisher
291 .publish(event)
292 .await
293 .map_err(ApplicationError::Domain)?;
294 }
295
296 Ok(frames)
297 }
298}
299
300#[async_trait]
301impl<R, P> CommandHandler<CloseSessionCommand, ()> for SessionCommandHandler<R, P>
302where
303 R: StreamRepository + Send + Sync,
304 P: EventPublisher + Send + Sync,
305{
306 async fn handle(&self, command: CloseSessionCommand) -> ApplicationResult<()> {
307 let mut session = self
309 .repository
310 .find_session(command.session_id.into())
311 .await
312 .map_err(ApplicationError::Domain)?
313 .ok_or_else(|| {
314 ApplicationError::NotFound(format!("Session {} not found", command.session_id))
315 })?;
316
317 session.close().map_err(ApplicationError::Domain)?;
319
320 self.repository
322 .save_session(session.clone())
323 .await
324 .map_err(ApplicationError::Domain)?;
325
326 let events = session.take_events();
328 for event in events {
329 self.event_publisher
330 .publish(event)
331 .await
332 .map_err(ApplicationError::Domain)?;
333 }
334
335 Ok(())
336 }
337}
338
339pub struct CommandValidator;
341
342impl CommandValidator {
343 pub fn validate_create_session(command: &CreateSessionCommand) -> Result<(), Vec<String>> {
345 let mut errors = Vec::new();
346
347 if command.config.max_concurrent_streams == 0 {
348 errors.push("max_concurrent_streams must be greater than 0".to_string());
349 }
350
351 if command.config.session_timeout_seconds == 0 {
352 errors.push("session_timeout_seconds must be greater than 0".to_string());
353 }
354
355 if errors.is_empty() {
356 Ok(())
357 } else {
358 Err(errors)
359 }
360 }
361
362 pub fn validate_create_stream(command: &CreateStreamCommand) -> Result<(), Vec<String>> {
364 let mut errors = Vec::new();
365
366 if command.source_data.is_null() {
367 errors.push("source_data cannot be null".to_string());
368 }
369
370 if errors.is_empty() {
371 Ok(())
372 } else {
373 Err(errors)
374 }
375 }
376
377 pub fn validate_generate_frames(command: &GenerateFramesCommand) -> Result<(), Vec<String>> {
379 let mut errors = Vec::new();
380
381 if command.max_frames == 0 {
382 errors.push("max_frames must be greater than 0".to_string());
383 }
384
385 if command.max_frames > 1000 {
386 errors.push("max_frames cannot exceed 1000".to_string());
387 }
388
389 if errors.is_empty() {
390 Ok(())
391 } else {
392 Err(errors)
393 }
394 }
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::{
        aggregates::stream_session::SessionConfig,
        events::DomainEvent,
        ports::{EventPublisher, StreamRepository},
    };
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// Handler wired to the in-memory test doubles below.
    type TestHandler = SessionCommandHandler<MockRepository, MockEventPublisher>;

    /// In-memory repository that keeps sessions in a mutex-guarded map.
    struct MockRepository {
        sessions: Mutex<HashMap<SessionId, StreamSession>>,
    }

    impl MockRepository {
        fn new() -> Self {
            Self {
                sessions: Mutex::new(HashMap::new()),
            }
        }
    }

    #[async_trait]
    impl StreamRepository for MockRepository {
        async fn find_session(
            &self,
            session_id: SessionId,
        ) -> crate::domain::DomainResult<Option<StreamSession>> {
            Ok(self.sessions.lock().unwrap().get(&session_id).cloned())
        }

        async fn save_session(&self, session: StreamSession) -> crate::domain::DomainResult<()> {
            self.sessions.lock().unwrap().insert(session.id(), session);
            Ok(())
        }

        async fn remove_session(&self, session_id: SessionId) -> crate::domain::DomainResult<()> {
            self.sessions.lock().unwrap().remove(&session_id);
            Ok(())
        }

        // Returns every stored session; the mock does not filter by state.
        async fn find_active_sessions(&self) -> crate::domain::DomainResult<Vec<StreamSession>> {
            Ok(self.sessions.lock().unwrap().values().cloned().collect())
        }
    }

    /// Publisher that accepts and discards every event.
    struct MockEventPublisher;

    #[async_trait]
    impl EventPublisher for MockEventPublisher {
        async fn publish(&self, _event: DomainEvent) -> crate::domain::DomainResult<()> {
            Ok(())
        }

        async fn publish_batch(
            &self,
            _events: Vec<DomainEvent>,
        ) -> crate::domain::DomainResult<()> {
            Ok(())
        }
    }

    /// Builds a handler plus a second handle to its repository for inspection.
    fn new_handler() -> (Arc<MockRepository>, TestHandler) {
        let repository = Arc::new(MockRepository::new());
        let handler =
            SessionCommandHandler::new(Arc::clone(&repository), Arc::new(MockEventPublisher));
        (repository, handler)
    }

    /// Create-session command with default config and no client details.
    fn anonymous_session_command() -> CreateSessionCommand {
        CreateSessionCommand {
            config: SessionConfig::default(),
            client_info: None,
            user_agent: None,
            ip_address: None,
        }
    }

    /// Create-stream command with a small JSON payload.
    fn stream_command(session_id: SessionId) -> CreateStreamCommand {
        CreateStreamCommand {
            session_id: session_id.into(),
            source_data: serde_json::json!({"test": "data"}),
            config: None,
        }
    }

    /// Creates a session through the handler and returns its id.
    async fn open_session(handler: &TestHandler) -> SessionId {
        handler
            .handle(anonymous_session_command())
            .await
            .expect("creating a session should succeed")
    }

    /// Creates a stream in `session_id` through the handler and returns its id.
    async fn open_stream(handler: &TestHandler, session_id: SessionId) -> StreamId {
        handler
            .handle(stream_command(session_id))
            .await
            .expect("creating a stream should succeed")
    }

    /// Creates a session with one stream and starts that stream.
    async fn open_started_stream(handler: &TestHandler) -> (SessionId, StreamId) {
        let session_id = open_session(handler).await;
        let stream_id = open_stream(handler, session_id).await;
        handler
            .handle(StartStreamCommand {
                session_id: session_id.into(),
                stream_id: stream_id.into(),
            })
            .await
            .expect("starting the stream should succeed");
        (session_id, stream_id)
    }

    #[tokio::test]
    async fn test_create_session_command() {
        let (repository, handler) = new_handler();

        let command = CreateSessionCommand {
            client_info: Some("test-client".to_string()),
            ..anonymous_session_command()
        };

        let result = handler.handle(command).await;
        assert!(result.is_ok());

        let session_id = result.unwrap();

        let saved_session = repository.find_session(session_id).await.unwrap();
        assert!(saved_session.is_some());
    }

    // No awaits here, so a plain synchronous test is enough.
    #[test]
    fn test_session_command_handler_creation() {
        let repository = Arc::new(MockRepository::new());
        let event_publisher = Arc::new(MockEventPublisher);
        let handler =
            SessionCommandHandler::new(Arc::clone(&repository), Arc::clone(&event_publisher));

        assert!(Arc::ptr_eq(&handler.repository, &repository));
        assert!(Arc::ptr_eq(&handler.event_publisher, &event_publisher));
    }

    #[tokio::test]
    async fn test_create_session_with_full_client_info() {
        let (repository, handler) = new_handler();

        let command = CreateSessionCommand {
            config: SessionConfig::default(),
            client_info: Some("test-client".to_string()),
            user_agent: Some("Mozilla/5.0".to_string()),
            ip_address: Some("192.168.1.1".to_string()),
        };

        let result = handler.handle(command).await;
        assert!(result.is_ok());

        let session_id = result.unwrap();
        let saved_session = repository.find_session(session_id).await.unwrap();
        assert!(saved_session.is_some());
    }

    #[tokio::test]
    async fn test_create_session_without_client_info() {
        let (_repository, handler) = new_handler();

        let result = handler.handle(anonymous_session_command()).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_create_stream_command() {
        let (repository, handler) = new_handler();
        let session_id = open_session(&handler).await;

        let result = handler.handle(stream_command(session_id)).await;
        assert!(result.is_ok());

        let stream_id = result.unwrap();

        // Comparing against a freshly generated id can never fail, so verify
        // instead that the persisted session actually contains the stream.
        let mut saved_session = repository
            .find_session(session_id)
            .await
            .unwrap()
            .expect("session should still be stored");
        assert!(saved_session.get_stream_mut(stream_id).is_some());
    }

    #[tokio::test]
    async fn test_create_stream_with_config() {
        let (_repository, handler) = new_handler();
        let session_id = open_session(&handler).await;

        let command = CreateStreamCommand {
            config: Some(crate::domain::entities::stream::StreamConfig::default()),
            ..stream_command(session_id)
        };

        let result = handler.handle(command).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_create_stream_session_not_found() {
        let (_repository, handler) = new_handler();

        let result = handler.handle(stream_command(SessionId::new())).await;
        assert!(matches!(result, Err(ApplicationError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_start_stream_command() {
        let (_repository, handler) = new_handler();
        let session_id = open_session(&handler).await;
        let stream_id = open_stream(&handler, session_id).await;

        let result = handler
            .handle(StartStreamCommand {
                session_id: session_id.into(),
                stream_id: stream_id.into(),
            })
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_start_stream_session_not_found() {
        let (_repository, handler) = new_handler();

        let result = handler
            .handle(StartStreamCommand {
                session_id: SessionId::new().into(),
                stream_id: StreamId::new().into(),
            })
            .await;
        assert!(matches!(result, Err(ApplicationError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_complete_stream_command() {
        let (_repository, handler) = new_handler();
        let (session_id, stream_id) = open_started_stream(&handler).await;

        let result = handler
            .handle(CompleteStreamCommand {
                session_id: session_id.into(),
                stream_id: stream_id.into(),
                checksum: Some("abc123".to_string()),
            })
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_complete_stream_without_checksum() {
        let (_repository, handler) = new_handler();
        let (session_id, stream_id) = open_started_stream(&handler).await;

        let result = handler
            .handle(CompleteStreamCommand {
                session_id: session_id.into(),
                stream_id: stream_id.into(),
                checksum: None,
            })
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_close_session_command() {
        let (_repository, handler) = new_handler();
        let session_id = open_session(&handler).await;

        let result = handler
            .handle(CloseSessionCommand {
                session_id: session_id.into(),
            })
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_close_session_not_found() {
        let (_repository, handler) = new_handler();

        let result = handler
            .handle(CloseSessionCommand {
                session_id: SessionId::new().into(),
            })
            .await;
        assert!(matches!(result, Err(ApplicationError::NotFound(_))));
    }
}