pjson_rs/application/handlers/
command_handlers.rs

1//! Command handlers implementing business use cases
2
3use crate::{
4    application::{ApplicationError, ApplicationResult, commands::*, handlers::CommandHandler},
5    domain::{
6        aggregates::StreamSession,
7        entities::Frame,
8        ports::{EventPublisher, StreamRepository},
9        value_objects::{SessionId, StreamId},
10    },
11};
12use async_trait::async_trait;
13use std::sync::Arc;
14
15/// Handler for session management commands
16#[derive(Debug)]
17pub struct SessionCommandHandler<R, P>
18where
19    R: StreamRepository,
20    P: EventPublisher,
21{
22    repository: Arc<R>,
23    event_publisher: Arc<P>,
24}
25
26impl<R, P> SessionCommandHandler<R, P>
27where
28    R: StreamRepository,
29    P: EventPublisher,
30{
31    pub fn new(repository: Arc<R>, event_publisher: Arc<P>) -> Self {
32        Self {
33            repository,
34            event_publisher,
35        }
36    }
37}
38
39#[async_trait]
40impl<R, P> CommandHandler<CreateSessionCommand, SessionId> for SessionCommandHandler<R, P>
41where
42    R: StreamRepository + Send + Sync,
43    P: EventPublisher + Send + Sync,
44{
45    async fn handle(&self, command: CreateSessionCommand) -> ApplicationResult<SessionId> {
46        // Create new session
47        let mut session = StreamSession::new(command.config);
48
49        // Set client information
50        if let (Some(client_info), user_agent, ip_address) =
51            (command.client_info, command.user_agent, command.ip_address)
52        {
53            session.set_client_info(client_info, user_agent, ip_address);
54        }
55
56        // Activate session
57        session.activate().map_err(ApplicationError::Domain)?;
58
59        let session_id = session.id();
60
61        // Save to repository
62        self.repository
63            .save_session(session.clone())
64            .await
65            .map_err(ApplicationError::Domain)?;
66
67        // Publish events
68        let events = session.take_events();
69        for event in events {
70            self.event_publisher
71                .publish(event)
72                .await
73                .map_err(ApplicationError::Domain)?;
74        }
75
76        Ok(session_id)
77    }
78}
79
80#[async_trait]
81impl<R, P> CommandHandler<CreateStreamCommand, StreamId> for SessionCommandHandler<R, P>
82where
83    R: StreamRepository + Send + Sync,
84    P: EventPublisher + Send + Sync,
85{
86    async fn handle(&self, command: CreateStreamCommand) -> ApplicationResult<StreamId> {
87        // Load session
88        let mut session = self
89            .repository
90            .find_session(command.session_id.into())
91            .await
92            .map_err(ApplicationError::Domain)?
93            .ok_or_else(|| {
94                ApplicationError::NotFound(format!("Session {} not found", command.session_id))
95            })?;
96
97        // Create stream in session
98        let stream_id = session
99            .create_stream(command.source_data)
100            .map_err(ApplicationError::Domain)?;
101
102        // Update stream configuration if provided
103        if let Some(config) = command.config
104            && let Some(stream) = session.get_stream_mut(stream_id) {
105                stream
106                    .update_config(config)
107                    .map_err(ApplicationError::Domain)?;
108            }
109
110        // Save updated session
111        self.repository
112            .save_session(session.clone())
113            .await
114            .map_err(ApplicationError::Domain)?;
115
116        // Publish events
117        let events = session.take_events();
118        for event in events {
119            self.event_publisher
120                .publish(event)
121                .await
122                .map_err(ApplicationError::Domain)?;
123        }
124
125        Ok(stream_id)
126    }
127}
128
129#[async_trait]
130impl<R, P> CommandHandler<StartStreamCommand, ()> for SessionCommandHandler<R, P>
131where
132    R: StreamRepository + Send + Sync,
133    P: EventPublisher + Send + Sync,
134{
135    async fn handle(&self, command: StartStreamCommand) -> ApplicationResult<()> {
136        // Load session
137        let mut session = self
138            .repository
139            .find_session(command.session_id.into())
140            .await
141            .map_err(ApplicationError::Domain)?
142            .ok_or_else(|| {
143                ApplicationError::NotFound(format!("Session {} not found", command.session_id))
144            })?;
145
146        // Start stream
147        session
148            .start_stream(command.stream_id.into())
149            .map_err(ApplicationError::Domain)?;
150
151        // Save updated session
152        self.repository
153            .save_session(session.clone())
154            .await
155            .map_err(ApplicationError::Domain)?;
156
157        // Publish events
158        let events = session.take_events();
159        for event in events {
160            self.event_publisher
161                .publish(event)
162                .await
163                .map_err(ApplicationError::Domain)?;
164        }
165
166        Ok(())
167    }
168}
169
170#[async_trait]
171impl<R, P> CommandHandler<CompleteStreamCommand, ()> for SessionCommandHandler<R, P>
172where
173    R: StreamRepository + Send + Sync,
174    P: EventPublisher + Send + Sync,
175{
176    async fn handle(&self, command: CompleteStreamCommand) -> ApplicationResult<()> {
177        // Load session
178        let mut session = self
179            .repository
180            .find_session(command.session_id.into())
181            .await
182            .map_err(ApplicationError::Domain)?
183            .ok_or_else(|| {
184                ApplicationError::NotFound(format!("Session {} not found", command.session_id))
185            })?;
186
187        // Complete stream
188        session
189            .complete_stream(command.stream_id.into())
190            .map_err(ApplicationError::Domain)?;
191
192        // Save updated session
193        self.repository
194            .save_session(session.clone())
195            .await
196            .map_err(ApplicationError::Domain)?;
197
198        // Publish events
199        let events = session.take_events();
200        for event in events {
201            self.event_publisher
202                .publish(event)
203                .await
204                .map_err(ApplicationError::Domain)?;
205        }
206
207        Ok(())
208    }
209}
210
211#[async_trait]
212impl<R, P> CommandHandler<GenerateFramesCommand, Vec<Frame>> for SessionCommandHandler<R, P>
213where
214    R: StreamRepository + Send + Sync,
215    P: EventPublisher + Send + Sync,
216{
217    async fn handle(&self, command: GenerateFramesCommand) -> ApplicationResult<Vec<Frame>> {
218        // Load session
219        let mut session = self
220            .repository
221            .find_session(command.session_id.into())
222            .await
223            .map_err(ApplicationError::Domain)?
224            .ok_or_else(|| {
225                ApplicationError::NotFound(format!("Session {} not found", command.session_id))
226            })?;
227
228        // Get stream
229        let stream = session.get_stream_mut(command.stream_id.into()).ok_or_else(|| {
230            ApplicationError::NotFound(format!("Stream {} not found", command.stream_id))
231        })?;
232
233        // Generate frames
234        let priority = command.priority_threshold.try_into()
235            .map_err(ApplicationError::Domain)?;
236        let frames = stream
237            .create_patch_frames(priority, command.max_frames)
238            .map_err(ApplicationError::Domain)?;
239
240        // Save updated session
241        self.repository
242            .save_session(session.clone())
243            .await
244            .map_err(ApplicationError::Domain)?;
245
246        // Publish events
247        let events = session.take_events();
248        for event in events {
249            self.event_publisher
250                .publish(event)
251                .await
252                .map_err(ApplicationError::Domain)?;
253        }
254
255        Ok(frames)
256    }
257}
258
259#[async_trait]
260impl<R, P> CommandHandler<BatchGenerateFramesCommand, Vec<Frame>> for SessionCommandHandler<R, P>
261where
262    R: StreamRepository + Send + Sync,
263    P: EventPublisher + Send + Sync,
264{
265    async fn handle(&self, command: BatchGenerateFramesCommand) -> ApplicationResult<Vec<Frame>> {
266        // Load session
267        let mut session = self
268            .repository
269            .find_session(command.session_id.into())
270            .await
271            .map_err(ApplicationError::Domain)?
272            .ok_or_else(|| {
273                ApplicationError::NotFound(format!("Session {} not found", command.session_id))
274            })?;
275
276        // Generate priority frames across all streams
277        let frames = session
278            .create_priority_frames(command.max_frames)
279            .map_err(ApplicationError::Domain)?;
280
281        // Save updated session
282        self.repository
283            .save_session(session.clone())
284            .await
285            .map_err(ApplicationError::Domain)?;
286
287        // Publish events
288        let events = session.take_events();
289        for event in events {
290            self.event_publisher
291                .publish(event)
292                .await
293                .map_err(ApplicationError::Domain)?;
294        }
295
296        Ok(frames)
297    }
298}
299
300#[async_trait]
301impl<R, P> CommandHandler<CloseSessionCommand, ()> for SessionCommandHandler<R, P>
302where
303    R: StreamRepository + Send + Sync,
304    P: EventPublisher + Send + Sync,
305{
306    async fn handle(&self, command: CloseSessionCommand) -> ApplicationResult<()> {
307        // Load session
308        let mut session = self
309            .repository
310            .find_session(command.session_id.into())
311            .await
312            .map_err(ApplicationError::Domain)?
313            .ok_or_else(|| {
314                ApplicationError::NotFound(format!("Session {} not found", command.session_id))
315            })?;
316
317        // Close session
318        session.close().map_err(ApplicationError::Domain)?;
319
320        // Save updated session
321        self.repository
322            .save_session(session.clone())
323            .await
324            .map_err(ApplicationError::Domain)?;
325
326        // Publish events
327        let events = session.take_events();
328        for event in events {
329            self.event_publisher
330                .publish(event)
331                .await
332                .map_err(ApplicationError::Domain)?;
333        }
334
335        Ok(())
336    }
337}
338
339/// Validation helper for commands
340pub struct CommandValidator;
341
342impl CommandValidator {
343    /// Validate CreateSessionCommand
344    pub fn validate_create_session(command: &CreateSessionCommand) -> Result<(), Vec<String>> {
345        let mut errors = Vec::new();
346
347        if command.config.max_concurrent_streams == 0 {
348            errors.push("max_concurrent_streams must be greater than 0".to_string());
349        }
350
351        if command.config.session_timeout_seconds == 0 {
352            errors.push("session_timeout_seconds must be greater than 0".to_string());
353        }
354
355        if errors.is_empty() {
356            Ok(())
357        } else {
358            Err(errors)
359        }
360    }
361
362    /// Validate CreateStreamCommand
363    pub fn validate_create_stream(command: &CreateStreamCommand) -> Result<(), Vec<String>> {
364        let mut errors = Vec::new();
365
366        if command.source_data.is_null() {
367            errors.push("source_data cannot be null".to_string());
368        }
369
370        if errors.is_empty() {
371            Ok(())
372        } else {
373            Err(errors)
374        }
375    }
376
377    /// Validate GenerateFramesCommand
378    pub fn validate_generate_frames(command: &GenerateFramesCommand) -> Result<(), Vec<String>> {
379        let mut errors = Vec::new();
380
381        if command.max_frames == 0 {
382            errors.push("max_frames must be greater than 0".to_string());
383        }
384
385        if command.max_frames > 1000 {
386            errors.push("max_frames cannot exceed 1000".to_string());
387        }
388
389        if errors.is_empty() {
390            Ok(())
391        } else {
392            Err(errors)
393        }
394    }
395}
396
397#[cfg(test)]
398mod tests {
399    use super::*;
400    use crate::domain::{
401        aggregates::stream_session::SessionConfig,
402        events::DomainEvent,
403        ports::{EventPublisher, StreamRepository},
404    };
405    use std::collections::HashMap;
406
407    // Mock implementations for testing
408    struct MockRepository {
409        sessions: std::sync::Mutex<HashMap<SessionId, StreamSession>>,
410    }
411
412    impl MockRepository {
413        fn new() -> Self {
414            Self {
415                sessions: std::sync::Mutex::new(HashMap::new()),
416            }
417        }
418    }
419
420    #[async_trait]
421    impl StreamRepository for MockRepository {
422        async fn find_session(
423            &self,
424            session_id: SessionId,
425        ) -> crate::domain::DomainResult<Option<StreamSession>> {
426            // TODO: Handle unwrap() - add proper error handling for mutex poisoning
427            Ok(self.sessions.lock().unwrap().get(&session_id).cloned())
428        }
429
430        async fn save_session(&self, session: StreamSession) -> crate::domain::DomainResult<()> {
431            // TODO: Handle unwrap() - add proper error handling for mutex poisoning
432            self.sessions.lock().unwrap().insert(session.id(), session);
433            Ok(())
434        }
435
436        async fn remove_session(&self, session_id: SessionId) -> crate::domain::DomainResult<()> {
437            // TODO: Handle unwrap() - add proper error handling for mutex poisoning
438            self.sessions.lock().unwrap().remove(&session_id);
439            Ok(())
440        }
441
442        async fn find_active_sessions(&self) -> crate::domain::DomainResult<Vec<StreamSession>> {
443            // TODO: Handle unwrap() - add proper error handling for mutex poisoning
444            Ok(self.sessions.lock().unwrap().values().cloned().collect())
445        }
446    }
447
448    struct MockEventPublisher;
449
450    #[async_trait]
451    impl EventPublisher for MockEventPublisher {
452        async fn publish(&self, _event: DomainEvent) -> crate::domain::DomainResult<()> {
453            Ok(())
454        }
455
456        async fn publish_batch(
457            &self,
458            _events: Vec<DomainEvent>,
459        ) -> crate::domain::DomainResult<()> {
460            Ok(())
461        }
462    }
463
464    #[tokio::test]
465    async fn test_create_session_command() {
466        let repository = Arc::new(MockRepository::new());
467        let event_publisher = Arc::new(MockEventPublisher);
468        let handler = SessionCommandHandler::new(repository.clone(), event_publisher);
469
470        let command = CreateSessionCommand {
471            config: SessionConfig::default(),
472            client_info: Some("test-client".to_string()),
473            user_agent: None,
474            ip_address: None,
475        };
476
477        let result = handler.handle(command).await;
478        assert!(result.is_ok());
479
480        let session_id = result.unwrap();
481
482        // Verify session was saved
483        let saved_session = repository.find_session(session_id).await.unwrap();
484        assert!(saved_session.is_some());
485    }
486
487    #[tokio::test]
488    async fn test_session_command_handler_creation() {
489        let repository = Arc::new(MockRepository::new());
490        let event_publisher = Arc::new(MockEventPublisher);
491        let handler = SessionCommandHandler::new(repository.clone(), event_publisher.clone());
492
493        assert!(std::ptr::eq(handler.repository.as_ref(), repository.as_ref()));
494        assert!(std::ptr::eq(handler.event_publisher.as_ref(), event_publisher.as_ref()));
495    }
496
497    #[tokio::test]
498    async fn test_create_session_with_full_client_info() {
499        let repository = Arc::new(MockRepository::new());
500        let event_publisher = Arc::new(MockEventPublisher);
501        let handler = SessionCommandHandler::new(repository.clone(), event_publisher);
502
503        let command = CreateSessionCommand {
504            config: SessionConfig::default(),
505            client_info: Some("test-client".to_string()),
506            user_agent: Some("Mozilla/5.0".to_string()),
507            ip_address: Some("192.168.1.1".to_string()),
508        };
509
510        let result = handler.handle(command).await;
511        assert!(result.is_ok());
512
513        let session_id = result.unwrap();
514        let saved_session = repository.find_session(session_id).await.unwrap();
515        assert!(saved_session.is_some());
516    }
517
518    #[tokio::test]
519    async fn test_create_session_without_client_info() {
520        let repository = Arc::new(MockRepository::new());
521        let event_publisher = Arc::new(MockEventPublisher);
522        let handler = SessionCommandHandler::new(repository, event_publisher);
523
524        let command = CreateSessionCommand {
525            config: SessionConfig::default(),
526            client_info: None,
527            user_agent: None,
528            ip_address: None,
529        };
530
531        let result = handler.handle(command).await;
532        assert!(result.is_ok());
533    }
534
535    #[tokio::test]
536    async fn test_create_stream_command() {
537        let repository = Arc::new(MockRepository::new());
538        let event_publisher = Arc::new(MockEventPublisher);
539        let handler = SessionCommandHandler::new(repository.clone(), event_publisher);
540
541        // First create a session
542        let create_session_cmd = CreateSessionCommand {
543            config: SessionConfig::default(),
544            client_info: None,
545            user_agent: None,
546            ip_address: None,
547        };
548
549        let session_id = handler.handle(create_session_cmd).await.unwrap();
550
551        // Then create a stream
552        let create_stream_cmd = CreateStreamCommand {
553            session_id: session_id.into(),
554            source_data: serde_json::json!({"test": "data"}),
555            config: None,
556        };
557
558        let result = handler.handle(create_stream_cmd).await;
559        assert!(result.is_ok());
560
561        let stream_id = result.unwrap();
562        assert_ne!(stream_id, StreamId::new()); // Should be a valid stream ID
563    }
564
565    #[tokio::test]
566    async fn test_create_stream_with_config() {
567        let repository = Arc::new(MockRepository::new());
568        let event_publisher = Arc::new(MockEventPublisher);
569        let handler = SessionCommandHandler::new(repository.clone(), event_publisher);
570
571        // Create session first
572        let session_id = handler.handle(CreateSessionCommand {
573            config: SessionConfig::default(),
574            client_info: None,
575            user_agent: None,
576            ip_address: None,
577        }).await.unwrap();
578
579        // Create stream with config
580        let stream_config = crate::domain::entities::stream::StreamConfig::default();
581        let create_stream_cmd = CreateStreamCommand {
582            session_id: session_id.into(),
583            source_data: serde_json::json!({"test": "data"}),
584            config: Some(stream_config),
585        };
586
587        let result = handler.handle(create_stream_cmd).await;
588        assert!(result.is_ok());
589    }
590
591    #[tokio::test]
592    async fn test_create_stream_session_not_found() {
593        let repository = Arc::new(MockRepository::new());
594        let event_publisher = Arc::new(MockEventPublisher);
595        let handler = SessionCommandHandler::new(repository, event_publisher);
596
597        let non_existent_session_id = SessionId::new();
598        let create_stream_cmd = CreateStreamCommand {
599            session_id: non_existent_session_id.into(),
600            source_data: serde_json::json!({"test": "data"}),
601            config: None,
602        };
603
604        let result = handler.handle(create_stream_cmd).await;
605        assert!(result.is_err());
606        assert!(matches!(result.err().unwrap(), ApplicationError::NotFound(_)));
607    }
608
609    #[tokio::test]
610    async fn test_start_stream_command() {
611        let repository = Arc::new(MockRepository::new());
612        let event_publisher = Arc::new(MockEventPublisher);
613        let handler = SessionCommandHandler::new(repository.clone(), event_publisher);
614
615        // Create session and stream first
616        let session_id = handler.handle(CreateSessionCommand {
617            config: SessionConfig::default(),
618            client_info: None,
619            user_agent: None,
620            ip_address: None,
621        }).await.unwrap();
622
623        let stream_id = handler.handle(CreateStreamCommand {
624            session_id: session_id.into(),
625            source_data: serde_json::json!({"test": "data"}),
626            config: None,
627        }).await.unwrap();
628
629        // Start the stream
630        let start_stream_cmd = StartStreamCommand {
631            session_id: session_id.into(),
632            stream_id: stream_id.into(),
633        };
634
635        let result = handler.handle(start_stream_cmd).await;
636        assert!(result.is_ok());
637    }
638
639    #[tokio::test]
640    async fn test_start_stream_session_not_found() {
641        let repository = Arc::new(MockRepository::new());
642        let event_publisher = Arc::new(MockEventPublisher);
643        let handler = SessionCommandHandler::new(repository, event_publisher);
644
645        let start_stream_cmd = StartStreamCommand {
646            session_id: SessionId::new().into(),
647            stream_id: StreamId::new().into(),
648        };
649
650        let result = handler.handle(start_stream_cmd).await;
651        assert!(result.is_err());
652        assert!(matches!(result.err().unwrap(), ApplicationError::NotFound(_)));
653    }
654
655    #[tokio::test]
656    async fn test_complete_stream_command() {
657        let repository = Arc::new(MockRepository::new());
658        let event_publisher = Arc::new(MockEventPublisher);
659        let handler = SessionCommandHandler::new(repository.clone(), event_publisher);
660
661        // Create session, stream, and start it
662        let session_id = handler.handle(CreateSessionCommand {
663            config: SessionConfig::default(),
664            client_info: None,
665            user_agent: None,
666            ip_address: None,
667        }).await.unwrap();
668
669        let stream_id = handler.handle(CreateStreamCommand {
670            session_id: session_id.into(),
671            source_data: serde_json::json!({"test": "data"}),
672            config: None,
673        }).await.unwrap();
674
675        handler.handle(StartStreamCommand {
676            session_id: session_id.into(),
677            stream_id: stream_id.into(),
678        }).await.unwrap();
679
680        // Complete the stream
681        let complete_stream_cmd = CompleteStreamCommand {
682            session_id: session_id.into(),
683            stream_id: stream_id.into(),
684            checksum: Some("abc123".to_string()),
685        };
686
687        let result = handler.handle(complete_stream_cmd).await;
688        assert!(result.is_ok());
689    }
690
691    #[tokio::test]
692    async fn test_complete_stream_without_checksum() {
693        let repository = Arc::new(MockRepository::new());
694        let event_publisher = Arc::new(MockEventPublisher);
695        let handler = SessionCommandHandler::new(repository.clone(), event_publisher);
696
697        // Create and start stream
698        let session_id = handler.handle(CreateSessionCommand {
699            config: SessionConfig::default(),
700            client_info: None,
701            user_agent: None,
702            ip_address: None,
703        }).await.unwrap();
704
705        let stream_id = handler.handle(CreateStreamCommand {
706            session_id: session_id.into(),
707            source_data: serde_json::json!({"test": "data"}),
708            config: None,
709        }).await.unwrap();
710
711        handler.handle(StartStreamCommand {
712            session_id: session_id.into(),
713            stream_id: stream_id.into(),
714        }).await.unwrap();
715
716        // Complete without checksum
717        let complete_stream_cmd = CompleteStreamCommand {
718            session_id: session_id.into(),
719            stream_id: stream_id.into(),
720            checksum: None,
721        };
722
723        let result = handler.handle(complete_stream_cmd).await;
724        assert!(result.is_ok());
725    }
726
727    #[tokio::test]
728    async fn test_close_session_command() {
729        let repository = Arc::new(MockRepository::new());
730        let event_publisher = Arc::new(MockEventPublisher);
731        let handler = SessionCommandHandler::new(repository.clone(), event_publisher);
732
733        // Create session first
734        let session_id = handler.handle(CreateSessionCommand {
735            config: SessionConfig::default(),
736            client_info: None,
737            user_agent: None,
738            ip_address: None,
739        }).await.unwrap();
740
741        // Close the session
742        let close_session_cmd = CloseSessionCommand {
743            session_id: session_id.into(),
744        };
745
746        let result = handler.handle(close_session_cmd).await;
747        assert!(result.is_ok());
748    }
749
750    #[tokio::test]
751    async fn test_close_session_not_found() {
752        let repository = Arc::new(MockRepository::new());
753        let event_publisher = Arc::new(MockEventPublisher);
754        let handler = SessionCommandHandler::new(repository, event_publisher);
755
756        let close_session_cmd = CloseSessionCommand {
757            session_id: SessionId::new().into(),
758        };
759
760        let result = handler.handle(close_session_cmd).await;
761        assert!(result.is_err());
762        assert!(matches!(result.err().unwrap(), ApplicationError::NotFound(_)));
763    }
764}