1
2use crate::{
5 application::{
6 ApplicationResult,
7 commands::*,
8 handlers::{CommandHandler, QueryHandler},
9 queries::*,
10 },
11 domain::{
12 aggregates::{StreamSession, stream_session::SessionHealth},
13 value_objects::{SessionId, StreamId},
14 },
15};
16use std::sync::Arc;
17
/// Application-level facade that combines session and stream commands and
/// queries into multi-step workflows.
///
/// `CH` is the command-side handler type and `QH` the query-side handler type.
/// Both are stored behind [`Arc`], so the same handler instances can be shared
/// with other owners.
///
/// The struct itself places no trait bounds on `CH` / `QH`: the handler traits
/// are only required by the `impl` block that actually dispatches commands and
/// queries. Keeping the bounds off the data type means the derived `Debug`
/// impl, and any code that merely names or stores the type, does not have to
/// repeat them.
#[derive(Debug)]
pub struct SessionService<CH, QH> {
    /// Shared handler used for every state-changing command.
    command_handler: Arc<CH>,
    /// Shared handler used for every read-only query.
    query_handler: Arc<QH>,
}
34
35impl<CH, QH> SessionService<CH, QH>
36where
37 CH: CommandHandler<CreateSessionCommand, SessionId>
38 + CommandHandler<CreateStreamCommand, StreamId>
39 + CommandHandler<StartStreamCommand, ()>
40 + CommandHandler<CompleteStreamCommand, ()>
41 + CommandHandler<CloseSessionCommand, ()>
42 + Send
43 + Sync,
44 QH: QueryHandler<GetSessionQuery, SessionResponse>
45 + QueryHandler<GetSessionHealthQuery, HealthResponse>
46 + QueryHandler<GetActiveSessionsQuery, SessionsResponse>
47 + Send
48 + Sync,
49{
50 pub fn new(command_handler: Arc<CH>, query_handler: Arc<QH>) -> Self {
51 Self {
52 command_handler,
53 query_handler,
54 }
55 }
56
57 pub async fn create_and_activate_session(
59 &self,
60 config: crate::domain::aggregates::stream_session::SessionConfig,
61 client_info: Option<String>,
62 user_agent: Option<String>,
63 ip_address: Option<String>,
64 ) -> ApplicationResult<SessionId> {
65 let create_command = CreateSessionCommand {
66 config,
67 client_info,
68 user_agent,
69 ip_address,
70 };
71
72 self.command_handler.handle(create_command).await
74 }
75
76 pub async fn create_and_start_stream(
78 &self,
79 session_id: SessionId,
80 source_data: serde_json::Value,
81 config: Option<crate::domain::entities::stream::StreamConfig>,
82 ) -> ApplicationResult<StreamId> {
83 let create_command = CreateStreamCommand {
85 session_id: session_id.into(),
86 source_data,
87 config,
88 };
89
90 let stream_id = self.command_handler.handle(create_command).await?;
91
92 let start_command = StartStreamCommand {
94 session_id: session_id.into(),
95 stream_id: stream_id.into(),
96 };
97
98 self.command_handler.handle(start_command).await?;
99
100 Ok(stream_id)
101 }
102
103 pub async fn get_session_with_health(
105 &self,
106 session_id: SessionId,
107 ) -> ApplicationResult<SessionWithHealth> {
108 let session_query = GetSessionQuery { session_id: session_id.into() };
110 let session_response = self.query_handler.handle(session_query).await?;
111
112 let health_query = GetSessionHealthQuery { session_id: session_id.into() };
114 let health_response = self.query_handler.handle(health_query).await?;
115
116 Ok(SessionWithHealth {
117 session: session_response.session,
118 health: health_response.health,
119 })
120 }
121
122 pub async fn complete_stream_and_maybe_close_session(
124 &self,
125 session_id: SessionId,
126 stream_id: StreamId,
127 ) -> ApplicationResult<SessionCompletionResult> {
128 let complete_command = CompleteStreamCommand {
130 session_id: session_id.into(),
131 stream_id: stream_id.into(),
132 checksum: None,
133 };
134
135 self.command_handler.handle(complete_command).await?;
136
137 let session_query = GetSessionQuery { session_id: session_id.into() };
139 let session_response = self.query_handler.handle(session_query).await?;
140
141 let active_streams = session_response
142 .session
143 .streams()
144 .values()
145 .filter(|s| s.is_active())
146 .count();
147
148 let session_closed = if active_streams == 0 {
149 let close_command = CloseSessionCommand { session_id: session_id.into() };
151 self.command_handler.handle(close_command).await?;
152 true
153 } else {
154 false
155 };
156
157 Ok(SessionCompletionResult {
158 stream_id,
159 session_closed,
160 remaining_active_streams: active_streams,
161 })
162 }
163
164 pub async fn get_sessions_overview(
166 &self,
167 limit: Option<usize>,
168 ) -> ApplicationResult<SessionsOverview> {
169 let query = GetActiveSessionsQuery {
170 limit,
171 offset: None,
172 };
173
174 let response = self.query_handler.handle(query).await?;
175
176 let mut total_streams = 0u64;
178 let mut total_frames = 0u64;
179 let mut total_bytes = 0u64;
180 let mut healthy_sessions = 0usize;
181
182 for session in &response.sessions {
183 let stats = session.stats();
184 total_streams += stats.total_streams;
185 total_frames += stats.total_frames;
186 total_bytes += stats.total_bytes;
187
188 if session.health_check().is_healthy {
189 healthy_sessions += 1;
190 }
191 }
192
193 Ok(SessionsOverview {
194 sessions: response.sessions,
195 total_count: response.total_count,
196 healthy_count: healthy_sessions,
197 total_streams,
198 total_frames,
199 total_bytes,
200 })
201 }
202
203 pub async fn graceful_shutdown_session(
205 &self,
206 session_id: SessionId,
207 ) -> ApplicationResult<SessionShutdownResult> {
208 let session_query = GetSessionQuery { session_id: session_id.into() };
210 let session_response = self.query_handler.handle(session_query).await?;
211
212 let active_stream_ids: Vec<StreamId> = session_response
213 .session
214 .streams()
215 .iter()
216 .filter(|(_, stream)| stream.is_active())
217 .map(|(id, _)| *id)
218 .collect();
219
220 let mut completed_streams = Vec::new();
222 let mut failed_streams = Vec::new();
223
224 for stream_id in &active_stream_ids {
225 let complete_command = CompleteStreamCommand {
226 session_id: session_id.into(),
227 stream_id: (*stream_id).into(),
228 checksum: None,
229 };
230
231 match self.command_handler.handle(complete_command).await {
232 Ok(_) => completed_streams.push(*stream_id),
233 Err(_) => failed_streams.push(*stream_id),
234 }
235 }
236
237 let close_command = CloseSessionCommand { session_id: session_id.into() };
239 let session_closed = self.command_handler.handle(close_command).await.is_ok();
240
241 Ok(SessionShutdownResult {
242 session_id,
243 session_closed,
244 completed_streams,
245 failed_streams,
246 })
247 }
248}
249
250#[derive(Debug, Clone)]
252pub struct SessionWithHealth {
253 pub session: StreamSession,
254 pub health: SessionHealth,
255}
256
257#[derive(Debug, Clone)]
259pub struct SessionCompletionResult {
260 pub stream_id: StreamId,
261 pub session_closed: bool,
262 pub remaining_active_streams: usize,
263}
264
265#[derive(Debug, Clone)]
267pub struct SessionsOverview {
268 pub sessions: Vec<StreamSession>,
269 pub total_count: usize,
270 pub healthy_count: usize,
271 pub total_streams: u64,
272 pub total_frames: u64,
273 pub total_bytes: u64,
274}
275
276#[derive(Debug, Clone)]
278pub struct SessionShutdownResult {
279 pub session_id: SessionId,
280 pub session_closed: bool,
281 pub completed_streams: Vec<StreamId>,
282 pub failed_streams: Vec<StreamId>,
283}
284
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        application::{ApplicationError, ApplicationResult, dto::priority_dto::FromDto},
        domain::aggregates::stream_session::SessionConfig,
    };
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::{Mutex, MutexGuard};

    /// In-memory session store used by both mock handlers.
    type SessionMap = HashMap<SessionId, StreamSession>;

    /// The service under test, wired to the in-memory mocks.
    type MockService = SessionService<MockCommandHandler, MockQueryHandler>;

    /// Locks a mock store, mapping a poisoned lock to a concurrency error.
    fn lock_sessions(store: &Mutex<SessionMap>) -> ApplicationResult<MutexGuard<'_, SessionMap>> {
        store
            .lock()
            .map_err(|_| ApplicationError::Concurrency("Sessions lock poisoned".to_string()))
    }

    /// Error returned by the mocks when a session id is unknown.
    fn session_not_found() -> ApplicationError {
        ApplicationError::NotFound("Session not found".to_string())
    }

    /// Builds a service backed by fresh, empty mocks.
    fn new_service() -> MockService {
        SessionService::new(
            Arc::new(MockCommandHandler::new()),
            Arc::new(MockQueryHandler::new()),
        )
    }

    /// Creates a session with the default configuration and no client metadata.
    async fn create_session(service: &MockService) -> SessionId {
        service
            .create_and_activate_session(SessionConfig::default(), None, None, None)
            .await
            .unwrap()
    }

    /// Copies the command-side state into the query-side mock.
    ///
    /// The two mocks keep separate stores, so the query side only sees what
    /// was present at the time of the last sync.
    fn sync_query_side(service: &MockService) {
        let snapshot = service.command_handler.sessions.lock().unwrap().clone();
        service.query_handler.sync_sessions(&snapshot).unwrap();
    }

    /// Command-side mock that applies commands to an in-memory session map.
    struct MockCommandHandler {
        sessions: Mutex<SessionMap>,
    }

    impl MockCommandHandler {
        fn new() -> Self {
            Self {
                sessions: Mutex::new(HashMap::new()),
            }
        }
    }

    #[async_trait]
    impl CommandHandler<CreateSessionCommand, SessionId> for MockCommandHandler {
        async fn handle(&self, command: CreateSessionCommand) -> ApplicationResult<SessionId> {
            let mut session = StreamSession::new(command.config);
            // Activation failures are deliberately ignored by this mock.
            let _ = session.activate();
            let session_id = session.id();
            lock_sessions(&self.sessions)?.insert(session_id, session);
            Ok(session_id)
        }
    }

    #[async_trait]
    impl CommandHandler<CreateStreamCommand, StreamId> for MockCommandHandler {
        async fn handle(&self, command: CreateStreamCommand) -> ApplicationResult<StreamId> {
            let mut sessions = lock_sessions(&self.sessions)?;
            let session_id =
                SessionId::from_dto(command.session_id).map_err(ApplicationError::Domain)?;
            let session = sessions
                .get_mut(&session_id)
                .ok_or_else(session_not_found)?;
            session
                .create_stream(command.source_data)
                .map_err(ApplicationError::Domain)
        }
    }

    #[async_trait]
    impl CommandHandler<StartStreamCommand, ()> for MockCommandHandler {
        async fn handle(&self, command: StartStreamCommand) -> ApplicationResult<()> {
            let mut sessions = lock_sessions(&self.sessions)?;
            let session_id =
                SessionId::from_dto(command.session_id).map_err(ApplicationError::Domain)?;
            let session = sessions
                .get_mut(&session_id)
                .ok_or_else(session_not_found)?;
            let stream_id =
                StreamId::from_dto(command.stream_id).map_err(ApplicationError::Domain)?;
            session
                .start_stream(stream_id)
                .map_err(ApplicationError::Domain)?;
            Ok(())
        }
    }

    #[async_trait]
    impl CommandHandler<CompleteStreamCommand, ()> for MockCommandHandler {
        async fn handle(&self, command: CompleteStreamCommand) -> ApplicationResult<()> {
            let mut sessions = lock_sessions(&self.sessions)?;
            let session_id =
                SessionId::from_dto(command.session_id).map_err(ApplicationError::Domain)?;
            let session = sessions
                .get_mut(&session_id)
                .ok_or_else(session_not_found)?;
            let stream_id =
                StreamId::from_dto(command.stream_id).map_err(ApplicationError::Domain)?;
            session
                .complete_stream(stream_id)
                .map_err(ApplicationError::Domain)?;
            Ok(())
        }
    }

    #[async_trait]
    impl CommandHandler<CloseSessionCommand, ()> for MockCommandHandler {
        async fn handle(&self, command: CloseSessionCommand) -> ApplicationResult<()> {
            let mut sessions = lock_sessions(&self.sessions)?;
            let session_id =
                SessionId::from_dto(command.session_id).map_err(ApplicationError::Domain)?;
            let session = sessions
                .get_mut(&session_id)
                .ok_or_else(session_not_found)?;
            session.close().map_err(ApplicationError::Domain)?;
            Ok(())
        }
    }

    /// Query-side mock that answers from its own copy of the session map.
    struct MockQueryHandler {
        sessions: Mutex<SessionMap>,
    }

    impl MockQueryHandler {
        fn new() -> Self {
            Self {
                sessions: Mutex::new(HashMap::new()),
            }
        }

        /// Replaces the query-side store with a copy of `sessions`.
        fn sync_sessions(&self, sessions: &SessionMap) -> ApplicationResult<()> {
            *lock_sessions(&self.sessions)? = sessions.clone();
            Ok(())
        }
    }

    #[async_trait]
    impl QueryHandler<GetSessionQuery, SessionResponse> for MockQueryHandler {
        async fn handle(&self, query: GetSessionQuery) -> ApplicationResult<SessionResponse> {
            let sessions = lock_sessions(&self.sessions)?;
            let session_id =
                SessionId::from_dto(query.session_id).map_err(ApplicationError::Domain)?;
            let session = sessions.get(&session_id).ok_or_else(session_not_found)?;
            Ok(SessionResponse {
                session: session.clone(),
            })
        }
    }

    #[async_trait]
    impl QueryHandler<GetSessionHealthQuery, HealthResponse> for MockQueryHandler {
        async fn handle(&self, query: GetSessionHealthQuery) -> ApplicationResult<HealthResponse> {
            let sessions = lock_sessions(&self.sessions)?;
            let session_id =
                SessionId::from_dto(query.session_id).map_err(ApplicationError::Domain)?;
            let session = sessions.get(&session_id).ok_or_else(session_not_found)?;
            Ok(HealthResponse {
                health: session.health_check(),
            })
        }
    }

    #[async_trait]
    impl QueryHandler<GetActiveSessionsQuery, SessionsResponse> for MockQueryHandler {
        async fn handle(
            &self,
            query: GetActiveSessionsQuery,
        ) -> ApplicationResult<SessionsResponse> {
            // `offset` is ignored by this mock; only `limit` is honoured.
            let sessions: Vec<StreamSession> = {
                let store = lock_sessions(&self.sessions)?;
                store
                    .values()
                    .take(query.limit.unwrap_or(usize::MAX))
                    .cloned()
                    .collect()
            };
            // The mock reports the size of the (possibly limited) page.
            let total_count = sessions.len();

            Ok(SessionsResponse {
                sessions,
                total_count,
            })
        }
    }

    #[tokio::test]
    async fn test_create_and_activate_session() {
        let service = new_service();

        let result = service
            .create_and_activate_session(
                SessionConfig::default(),
                Some("test-client".to_string()),
                None,
                None,
            )
            .await;

        assert!(result.is_ok());
    }

    #[test]
    fn test_session_service_creation() {
        let command_handler = Arc::new(MockCommandHandler::new());
        let query_handler = Arc::new(MockQueryHandler::new());
        let service = SessionService::new(command_handler.clone(), query_handler.clone());

        // The service must hold the very same handler instances it was given.
        assert!(Arc::ptr_eq(&service.command_handler, &command_handler));
        assert!(Arc::ptr_eq(&service.query_handler, &query_handler));
    }

    #[tokio::test]
    async fn test_create_and_start_stream() {
        let service = new_service();
        let session_id = create_session(&service).await;
        sync_query_side(&service);

        let result = service
            .create_and_start_stream(session_id, serde_json::json!({"test": "data"}), None)
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_get_session_with_health() {
        let service = new_service();
        let session_id = create_session(&service).await;
        sync_query_side(&service);

        let result = service.get_session_with_health(session_id).await;

        assert!(result.is_ok());
        let session_with_health = result.unwrap();
        assert_eq!(session_with_health.session.id(), session_id);
        assert!(session_with_health.health.is_healthy);
    }

    #[tokio::test]
    async fn test_get_session_with_health_not_found() {
        let service = new_service();

        let non_existent_session_id = SessionId::new();
        let result = service
            .get_session_with_health(non_existent_session_id)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.err().unwrap(),
            ApplicationError::NotFound(_)
        ));
    }

    #[tokio::test]
    async fn test_get_sessions_overview() {
        let service = new_service();

        let mut session_ids = Vec::new();
        for _ in 0..3 {
            session_ids.push(create_session(&service).await);
        }
        sync_query_side(&service);

        let result = service.get_sessions_overview(None).await;

        assert!(result.is_ok());
        let overview = result.unwrap();
        assert_eq!(overview.sessions.len(), 3);
        assert_eq!(overview.total_count, 3);
        assert_eq!(overview.healthy_count, 3);
        // Every reported session must be one of those created above.
        assert!(overview
            .sessions
            .iter()
            .all(|session| session_ids.contains(&session.id())));
    }

    #[tokio::test]
    async fn test_get_sessions_overview_with_limit() {
        let service = new_service();

        for _ in 0..5 {
            let _ = create_session(&service).await;
        }
        sync_query_side(&service);

        let result = service.get_sessions_overview(Some(2)).await;

        assert!(result.is_ok());
        let overview = result.unwrap();
        assert_eq!(overview.sessions.len(), 2);
    }

    #[tokio::test]
    async fn test_complete_stream_and_maybe_close_session() {
        let service = new_service();
        let session_id = create_session(&service).await;
        sync_query_side(&service);

        let stream_id = service
            .create_and_start_stream(session_id, serde_json::json!({"test": "data"}), None)
            .await
            .unwrap();
        sync_query_side(&service);

        let result = service
            .complete_stream_and_maybe_close_session(session_id, stream_id)
            .await;

        // The query mock answers from the snapshot taken before completion, so
        // only the echoed stream id is asserted here.
        assert!(result.is_ok());
        let completion_result = result.unwrap();
        assert_eq!(completion_result.stream_id, stream_id);
    }

    #[tokio::test]
    async fn test_graceful_shutdown_session() {
        let service = new_service();
        let session_id = create_session(&service).await;
        sync_query_side(&service);

        let stream1_id = service
            .create_and_start_stream(session_id, serde_json::json!({"stream": 1}), None)
            .await
            .unwrap();

        let stream2_id = service
            .create_and_start_stream(session_id, serde_json::json!({"stream": 2}), None)
            .await
            .unwrap();
        sync_query_side(&service);

        let result = service.graceful_shutdown_session(session_id).await;

        assert!(result.is_ok());
        let shutdown_result = result.unwrap();
        assert_eq!(shutdown_result.session_id, session_id);
        assert!(shutdown_result.session_closed);
        assert_eq!(shutdown_result.completed_streams.len(), 2);
        assert_eq!(shutdown_result.failed_streams.len(), 0);
        assert!(shutdown_result.completed_streams.contains(&stream1_id));
        assert!(shutdown_result.completed_streams.contains(&stream2_id));
    }

    #[tokio::test]
    async fn test_graceful_shutdown_session_not_found() {
        let service = new_service();

        let non_existent_session_id = SessionId::new();
        let result = service
            .graceful_shutdown_session(non_existent_session_id)
            .await;

        assert!(result.is_err());
        assert!(matches!(
            result.err().unwrap(),
            ApplicationError::NotFound(_)
        ));
    }

    #[test]
    fn test_session_with_health_structure() {
        let session = StreamSession::new(SessionConfig::default());
        let health = session.health_check();

        let session_with_health = SessionWithHealth {
            session: session.clone(),
            health: health.clone(),
        };

        assert_eq!(session_with_health.session.id(), session.id());
        assert_eq!(session_with_health.health.is_healthy, health.is_healthy);
    }

    #[test]
    fn test_session_completion_result_structure() {
        let stream_id = StreamId::new();
        let result = SessionCompletionResult {
            stream_id,
            session_closed: true,
            remaining_active_streams: 0,
        };

        assert_eq!(result.stream_id, stream_id);
        assert!(result.session_closed);
        assert_eq!(result.remaining_active_streams, 0);
    }

    #[test]
    fn test_sessions_overview_structure() {
        let sessions = vec![StreamSession::new(SessionConfig::default())];
        let overview = SessionsOverview {
            sessions,
            total_count: 1,
            healthy_count: 1,
            total_streams: 0,
            total_frames: 0,
            total_bytes: 0,
        };

        assert_eq!(overview.sessions.len(), 1);
        assert_eq!(overview.total_count, 1);
        assert_eq!(overview.healthy_count, 1);
        assert_eq!(overview.total_streams, 0);
    }

    #[test]
    fn test_session_shutdown_result_structure() {
        let session_id = SessionId::new();
        let stream_id = StreamId::new();
        let result = SessionShutdownResult {
            session_id,
            session_closed: true,
            completed_streams: vec![stream_id],
            failed_streams: vec![],
        };

        assert_eq!(result.session_id, session_id);
        assert!(result.session_closed);
        assert_eq!(result.completed_streams.len(), 1);
        assert_eq!(result.failed_streams.len(), 0);
    }
}