1use slim_session::CompletionHandle as SlimCompletionHandle;
5use slim_session::session_controller::SessionController;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10use futures_timer::Delay;
11use slim_datapath::api::ProtoSessionType;
12use slim_datapath::messages::Name as SlimName;
13use slim_datapath::messages::utils::{PUBLISH_TO, SlimHeaderFlags, TRUE_VAL};
14use slim_session::SessionConfig as SlimSessionConfig;
15use slim_session::SessionError;
16use slim_session::context::SessionContext as SlimSession;
17
18use crate::message_context::MessageContext;
19use crate::{CompletionHandle, Name, ReceivedMessage, SlimError};
20
21#[derive(Debug, Clone, PartialEq, uniffi::Enum)]
23pub enum SessionType {
24 PointToPoint,
25 Group,
26}
27
28impl From<SessionType> for ProtoSessionType {
29 fn from(session_type: SessionType) -> Self {
30 match session_type {
31 SessionType::PointToPoint => ProtoSessionType::PointToPoint,
32 SessionType::Group => ProtoSessionType::Multicast,
33 }
34 }
35}
36
37impl From<ProtoSessionType> for SessionType {
38 fn from(session_type: ProtoSessionType) -> Self {
39 match session_type {
40 ProtoSessionType::PointToPoint => SessionType::PointToPoint,
41 ProtoSessionType::Multicast => SessionType::Group,
42 ProtoSessionType::Unspecified => SessionType::PointToPoint, }
44 }
45}
46
47#[derive(uniffi::Record)]
49pub struct SessionConfig {
50 pub session_type: SessionType,
52
53 pub enable_mls: bool,
55
56 pub max_retries: Option<u32>,
58
59 pub interval: Option<std::time::Duration>,
61
62 pub metadata: std::collections::HashMap<String, String>,
64}
65
66impl From<SessionConfig> for SlimSessionConfig {
67 fn from(config: SessionConfig) -> Self {
68 SlimSessionConfig {
69 session_type: config.session_type.into(),
70 max_retries: config.max_retries,
71 interval: config.interval,
72 mls_enabled: config.enable_mls,
73 initiator: true,
74 metadata: config.metadata,
75 }
76 }
77}
78
79impl From<SlimSessionConfig> for SessionConfig {
80 fn from(config: SlimSessionConfig) -> Self {
81 SessionConfig {
82 session_type: config.session_type.into(),
83 enable_mls: config.mls_enabled,
84 max_retries: config.max_retries,
85 interval: config.interval,
86 metadata: config.metadata,
87 }
88 }
89}
90
91#[derive(uniffi::Object)]
96pub struct Session {
97 pub session: std::sync::Weak<SessionController>,
99 pub rx: RwLock<slim_session::AppChannelReceiver>,
101}
102
103impl Session {
104 pub fn new(ctx: SlimSession) -> Self {
106 let (session, rx) = ctx.into_parts();
107 Self {
108 session,
109 rx: RwLock::new(rx),
110 }
111 }
112
113 pub fn runtime(&self) -> &'static tokio::runtime::Runtime {
115 crate::config::get_runtime()
116 }
117}
118
119impl Session {
124 pub async fn publish_internal(
129 &self,
130 name: &SlimName,
131 fanout: u32,
132 blob: Vec<u8>,
133 conn_out: Option<u64>,
134 payload_type: Option<String>,
135 metadata: Option<HashMap<String, String>>,
136 ) -> Result<SlimCompletionHandle, SessionError> {
137 let session = self.session.upgrade().ok_or(SessionError::SessionClosed)?;
138
139 let flags = SlimHeaderFlags::new(fanout, None, conn_out, None, None);
140
141 let ret = session
142 .publish_with_flags(name, flags, blob, payload_type, metadata)
143 .await?;
144
145 Ok(ret)
146 }
147
148 pub async fn publish_to_internal(
153 &self,
154 message_ctx: &MessageContext,
155 blob: Vec<u8>,
156 payload_type: Option<String>,
157 metadata: Option<HashMap<String, String>>,
158 ) -> Result<SlimCompletionHandle, SessionError> {
159 let session = self.session.upgrade().ok_or(SessionError::SessionClosed)?;
160
161 let flags = SlimHeaderFlags::new(
162 1, None,
164 Some(message_ctx.input_connection), None,
166 None,
167 );
168
169 let mut final_metadata = metadata.unwrap_or_default();
170 final_metadata.insert(PUBLISH_TO.to_string(), TRUE_VAL.to_string());
171
172 let source_name = message_ctx.source_as_slim_name();
174
175 let ret = session
176 .publish_with_flags(
177 &source_name, flags,
179 blob,
180 payload_type,
181 Some(final_metadata),
182 )
183 .await?;
184
185 Ok(ret)
186 }
187
188 pub async fn invite_internal(
193 &self,
194 destination: &SlimName,
195 ) -> Result<SlimCompletionHandle, SessionError> {
196 let session = self.session.upgrade().ok_or(SessionError::SessionClosed)?;
197
198 session.invite_participant(destination).await
199 }
200
201 pub async fn remove_internal(
206 &self,
207 destination: &SlimName,
208 ) -> Result<SlimCompletionHandle, SessionError> {
209 let session = self.session.upgrade().ok_or(SessionError::SessionClosed)?;
210
211 session.remove_participant(destination).await
212 }
213
214 pub async fn get_session_message(
216 &self,
217 timeout: Option<std::time::Duration>,
218 ) -> Result<(MessageContext, Vec<u8>), SessionError> {
219 let mut rx = self.rx.write().await;
220
221 let recv_future = async {
222 let msg = rx.recv().await.ok_or(SessionError::SessionClosed)??;
223
224 MessageContext::from_proto_message(msg)
225 };
226
227 if let Some(timeout_duration) = timeout {
228 futures::pin_mut!(recv_future);
230 let delay = Delay::new(timeout_duration);
231 futures::pin_mut!(delay);
232
233 match futures::future::select(recv_future, delay).await {
234 futures::future::Either::Left((result, _)) => result,
235 futures::future::Either::Right(_) => Err(SessionError::ReceiveTimeout),
236 }
237 } else {
238 recv_future.await
239 }
240 }
241}
242
243#[uniffi::export]
248impl Session {
249 pub fn publish(
268 &self,
269 data: Vec<u8>,
270 payload_type: Option<String>,
271 metadata: Option<HashMap<String, String>>,
272 ) -> Result<Arc<CompletionHandle>, SlimError> {
273 crate::config::get_runtime()
274 .block_on(async { self.publish_async(data, payload_type, metadata).await })
275 }
276
277 pub async fn publish_async(
281 &self,
282 data: Vec<u8>,
283 payload_type: Option<String>,
284 metadata: Option<HashMap<String, String>>,
285 ) -> Result<Arc<CompletionHandle>, SlimError> {
286 let session = self
287 .session
288 .upgrade()
289 .ok_or_else(|| SlimError::SessionError {
290 message: "Session already closed or dropped".to_string(),
291 })?;
292
293 let destination = session.dst();
294
295 let completion = self
296 .publish_internal(
297 destination,
298 1, data,
300 None, payload_type,
302 metadata,
303 )
304 .await?;
305
306 Ok(Arc::new(CompletionHandle::from(completion)))
307 }
308
309 pub fn publish_and_wait(
313 &self,
314 data: Vec<u8>,
315 payload_type: Option<String>,
316 metadata: Option<HashMap<String, String>>,
317 ) -> Result<(), SlimError> {
318 crate::config::get_runtime().block_on(async {
319 self.publish_and_wait_async(data, payload_type, metadata)
320 .await
321 })
322 }
323
324 pub async fn publish_and_wait_async(
328 &self,
329 data: Vec<u8>,
330 payload_type: Option<String>,
331 metadata: Option<HashMap<String, String>>,
332 ) -> Result<(), SlimError> {
333 let completion_handle = self.publish_async(data, payload_type, metadata).await?;
334 completion_handle.wait_async().await
335 }
336
337 pub fn publish_to(
355 &self,
356 message_context: MessageContext,
357 data: Vec<u8>,
358 payload_type: Option<String>,
359 metadata: Option<HashMap<String, String>>,
360 ) -> Result<Arc<CompletionHandle>, SlimError> {
361 crate::config::get_runtime().block_on(async {
362 self.publish_to_async(message_context, data, payload_type, metadata)
363 .await
364 })
365 }
366
367 pub async fn publish_to_async(
371 &self,
372 message_context: MessageContext,
373 data: Vec<u8>,
374 payload_type: Option<String>,
375 metadata: Option<HashMap<String, String>>,
376 ) -> Result<Arc<CompletionHandle>, SlimError> {
377 let completion = self
378 .publish_to_internal(&message_context, data, payload_type, metadata)
379 .await?;
380
381 Ok(Arc::new(CompletionHandle::from(completion)))
382 }
383
384 pub fn publish_to_and_wait(
388 &self,
389 message_context: MessageContext,
390 data: Vec<u8>,
391 payload_type: Option<String>,
392 metadata: Option<HashMap<String, String>>,
393 ) -> Result<(), SlimError> {
394 crate::config::get_runtime().block_on(async {
395 self.publish_to_and_wait_async(message_context, data, payload_type, metadata)
396 .await
397 })
398 }
399
400 pub async fn publish_to_and_wait_async(
404 &self,
405 message_context: MessageContext,
406 data: Vec<u8>,
407 payload_type: Option<String>,
408 metadata: Option<HashMap<String, String>>,
409 ) -> Result<(), SlimError> {
410 let completion_handle = self
411 .publish_to_async(message_context, data, payload_type, metadata)
412 .await?;
413 completion_handle.wait_async().await
414 }
415
416 pub fn publish_with_params(
429 &self,
430 destination: Arc<Name>,
431 fanout: u32,
432 data: Vec<u8>,
433 connection_out: Option<u64>,
434 payload_type: Option<String>,
435 metadata: Option<HashMap<String, String>>,
436 ) -> Result<(), SlimError> {
437 crate::config::get_runtime().block_on(async {
438 self.publish_with_params_async(
439 destination,
440 fanout,
441 data,
442 connection_out,
443 payload_type,
444 metadata,
445 )
446 .await
447 })
448 }
449
450 pub async fn publish_with_params_async(
452 &self,
453 destination: Arc<Name>,
454 fanout: u32,
455 data: Vec<u8>,
456 connection_out: Option<u64>,
457 payload_type: Option<String>,
458 metadata: Option<HashMap<String, String>>,
459 ) -> Result<(), SlimError> {
460 let slim_dest: SlimName = destination.as_ref().into();
461
462 self.publish_internal(
463 &slim_dest,
464 fanout,
465 data,
466 connection_out,
467 payload_type,
468 metadata,
469 )
470 .await
471 .map(|_| ())?;
472
473 Ok(())
474 }
475
476 pub fn get_message(
485 &self,
486 timeout: Option<std::time::Duration>,
487 ) -> Result<ReceivedMessage, SlimError> {
488 crate::config::get_runtime().block_on(async { self.get_message_async(timeout).await })
489 }
490
491 pub async fn get_message_async(
493 &self,
494 timeout: Option<std::time::Duration>,
495 ) -> Result<ReceivedMessage, SlimError> {
496 let (ctx, payload) = self.get_session_message(timeout).await?;
497
498 Ok(ReceivedMessage {
499 context: ctx,
500 payload,
501 })
502 }
503
504 pub fn invite(&self, participant: Arc<Name>) -> Result<Arc<CompletionHandle>, SlimError> {
508 crate::config::get_runtime().block_on(async { self.invite_async(participant).await })
509 }
510
511 pub async fn invite_async(
515 &self,
516 participant: Arc<Name>,
517 ) -> Result<Arc<CompletionHandle>, SlimError> {
518 let slim_name: SlimName = participant.as_ref().into();
519
520 let completion = self.invite_internal(&slim_name).await?;
521
522 Ok(Arc::new(CompletionHandle::from(completion)))
524 }
525
526 pub fn invite_and_wait(&self, participant: Arc<Name>) -> Result<(), SlimError> {
530 crate::config::get_runtime()
531 .block_on(async { self.invite_and_wait_async(participant).await })
532 }
533
534 pub async fn invite_and_wait_async(&self, participant: Arc<Name>) -> Result<(), SlimError> {
538 let completion_handle = self.invite_async(participant).await?;
539 completion_handle.wait_async().await
540 }
541
542 pub fn remove(&self, participant: Arc<Name>) -> Result<Arc<CompletionHandle>, SlimError> {
546 crate::config::get_runtime().block_on(async { self.remove_async(participant).await })
547 }
548
549 pub async fn remove_async(
553 &self,
554 participant: Arc<Name>,
555 ) -> Result<Arc<CompletionHandle>, SlimError> {
556 let slim_name: SlimName = participant.as_ref().into();
557
558 let completion = self.remove_internal(&slim_name).await?;
559
560 Ok(Arc::new(CompletionHandle::from(completion)))
562 }
563
564 pub fn remove_and_wait(&self, participant: Arc<Name>) -> Result<(), SlimError> {
568 crate::config::get_runtime()
569 .block_on(async { self.remove_and_wait_async(participant).await })
570 }
571
572 pub async fn remove_and_wait_async(&self, participant: Arc<Name>) -> Result<(), SlimError> {
576 let completion_handle = self.remove_async(participant).await?;
577 completion_handle.wait_async().await
578 }
579
580 pub fn destination(&self) -> Result<Name, SlimError> {
582 let session = self
583 .session
584 .upgrade()
585 .ok_or_else(|| SlimError::SessionError {
586 message: "Session already closed or dropped".to_string(),
587 })?;
588
589 Ok(Name::from(session.dst()))
590 }
591
592 pub fn source(&self) -> Result<Name, SlimError> {
594 let session = self
595 .session
596 .upgrade()
597 .ok_or_else(|| SlimError::SessionError {
598 message: "Session already closed or dropped".to_string(),
599 })?;
600
601 Ok(Name::from(session.source()))
602 }
603
604 pub fn session_id(&self) -> Result<u32, SlimError> {
606 let session = self
607 .session
608 .upgrade()
609 .ok_or_else(|| SlimError::SessionError {
610 message: "Session already closed or dropped".to_string(),
611 })?;
612
613 Ok(session.id())
614 }
615
616 pub fn session_type(&self) -> Result<SessionType, SlimError> {
618 let session = self
619 .session
620 .upgrade()
621 .ok_or_else(|| SlimError::SessionError {
622 message: "Session already closed or dropped".to_string(),
623 })?;
624
625 match session.session_type() {
626 ProtoSessionType::PointToPoint => Ok(SessionType::PointToPoint),
627 ProtoSessionType::Multicast => Ok(SessionType::Group),
628 ProtoSessionType::Unspecified => Err(SlimError::InvalidArgument {
629 message: "Session has unspecified type".to_string(),
630 }),
631 }
632 }
633
634 pub fn is_initiator(&self) -> Result<bool, SlimError> {
636 let session = self
637 .session
638 .upgrade()
639 .ok_or_else(|| SlimError::SessionError {
640 message: "Session already closed or dropped".to_string(),
641 })?;
642
643 Ok(session.is_initiator())
644 }
645
646 pub fn metadata(&self) -> Result<HashMap<String, String>, SlimError> {
648 let session = self
649 .session
650 .upgrade()
651 .ok_or_else(|| SlimError::SessionError {
652 message: "Session already closed or dropped".to_string(),
653 })?;
654 Ok(session.metadata())
655 }
656
657 pub fn config(&self) -> Result<SessionConfig, SlimError> {
659 let session = self
660 .session
661 .upgrade()
662 .ok_or_else(|| SlimError::SessionError {
663 message: "Session already closed or dropped".to_string(),
664 })?;
665
666 Ok(session.session_config().into())
667 }
668
669 pub async fn participants_list_async(&self) -> Result<Vec<Arc<Name>>, SlimError> {
671 let session = self
672 .session
673 .upgrade()
674 .ok_or_else(|| SlimError::SessionError {
675 message: "Session already closed or dropped".to_string(),
676 })?;
677
678 let list = session.participants_list().await?;
679 Ok(list.into_iter().map(|n| Arc::new(Name::from(n))).collect())
680 }
681
682 pub fn participants_list(&self) -> Result<Vec<Arc<Name>>, SlimError> {
684 crate::config::get_runtime().block_on(async { self.participants_list_async().await })
685 }
686}
687
688#[cfg(test)]
689mod tests {
690 use super::*;
691 use crate::Name as FfiName;
692 use crate::errors::SlimError;
693 use slim_datapath::api::{
694 ApplicationPayload, ProtoMessage, ProtoPublish, ProtoPublishType, SessionHeader, SlimHeader,
695 };
696 use slim_session::SessionError;
697 use std::time::Duration;
698 use tokio::sync::mpsc;
699
700 fn make_slim_name(parts: [&str; 3]) -> SlimName {
702 SlimName::from_strings(parts).with_id(0)
703 }
704
705 fn make_ffi_name(parts: [&str; 3]) -> FfiName {
707 FfiName::new(
708 parts[0].to_string(),
709 parts[1].to_string(),
710 parts[2].to_string(),
711 )
712 }
713
714 fn make_context() -> (
715 Session,
716 mpsc::UnboundedSender<Result<ProtoMessage, SessionError>>,
717 ) {
718 let (tx, rx) = mpsc::unbounded_channel::<Result<ProtoMessage, SessionError>>();
719 let ctx = Session {
720 session: std::sync::Weak::new(),
721 rx: RwLock::new(rx),
722 };
723 (ctx, tx)
724 }
725
726 fn create_test_proto_message(
728 source: SlimName,
729 dest: SlimName,
730 connection_id: u64,
731 payload: Vec<u8>,
732 content_type: &str,
733 metadata: HashMap<String, String>,
734 ) -> ProtoMessage {
735 let content = ApplicationPayload::new(content_type, payload).as_content();
736
737 let mut slim_header = SlimHeader::default();
738 slim_header.set_source(&source);
739 slim_header.set_destination(&dest);
740
741 let publish = ProtoPublish {
742 header: Some(slim_header),
743 session: Some(SessionHeader::default()),
744 msg: Some(content),
745 };
746
747 let mut proto_msg = ProtoMessage {
748 message_type: Some(ProtoPublishType(publish)),
749 metadata,
750 };
751
752 proto_msg.set_incoming_conn(Some(connection_id));
753 proto_msg
754 }
755
756 #[tokio::test]
759 async fn test_get_session_message_success() {
760 let (ctx, tx) = make_context();
761
762 let source = make_slim_name(["org", "sender", "app"]);
763 let dest = make_slim_name(["org", "receiver", "app"]);
764 let payload = b"hello world".to_vec();
765 let mut metadata = HashMap::new();
766 metadata.insert("key".to_string(), "value".to_string());
767
768 let msg = create_test_proto_message(
769 source.clone(),
770 dest,
771 42,
772 payload.clone(),
773 "text/plain",
774 metadata.clone(),
775 );
776
777 tx.send(Ok(msg)).expect("send should succeed");
778
779 let result = ctx
780 .get_session_message(Some(Duration::from_millis(100)))
781 .await;
782 assert!(result.is_ok(), "should receive message successfully");
783
784 let (msg_ctx, received_payload) = result.unwrap();
785 assert_eq!(received_payload, payload);
786 assert_eq!(msg_ctx.payload_type, "text/plain");
787 assert_eq!(msg_ctx.input_connection, 42);
788 assert_eq!(msg_ctx.metadata.get("key"), Some(&"value".to_string()));
789 }
790
791 #[tokio::test]
792 async fn test_get_session_message_timeout() {
793 let (ctx, _tx) = make_context();
794 let result = ctx
795 .get_session_message(Some(Duration::from_millis(50)))
796 .await;
797 assert!(result.is_err());
798 assert!(result.unwrap_err().to_string().contains("timeout"));
799 }
800
801 #[tokio::test]
802 async fn test_get_session_message_channel_closed() {
803 let (ctx, tx) = make_context();
804 drop(tx); let result = ctx
806 .get_session_message(Some(Duration::from_millis(50)))
807 .await;
808 assert!(result.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
809 }
810
811 #[tokio::test]
812 async fn test_get_session_message_decode_error() {
813 let (ctx, tx) = make_context();
814
815 tx.send(Err(SessionError::SlimMessageSendFailed)).unwrap();
817
818 let result = ctx
819 .get_session_message(Some(Duration::from_millis(100)))
820 .await;
821 assert!(result.is_err_and(|e| matches!(e, SessionError::SlimMessageSendFailed)));
822 }
823
824 #[tokio::test]
825 async fn test_get_session_message_no_timeout() {
826 let (ctx, tx) = make_context();
827
828 tokio::spawn(async move {
830 tokio::time::sleep(Duration::from_millis(20)).await;
831 let msg = create_test_proto_message(
832 make_slim_name(["org", "sender", "app"]),
833 make_slim_name(["org", "receiver", "app"]),
834 1,
835 b"delayed".to_vec(),
836 "text/plain",
837 HashMap::new(),
838 );
839 let _ = tx.send(Ok(msg));
840 });
841
842 let result = ctx.get_session_message(None).await;
844 assert!(result.is_ok());
845 let (_, payload) = result.unwrap();
846 assert_eq!(payload, b"delayed".to_vec());
847 }
848
849 #[tokio::test]
850 async fn test_get_session_message_various_timeouts() {
851 let (ctx, _tx) = make_context();
852
853 let start = std::time::Instant::now();
855 let result = ctx
856 .get_session_message(Some(Duration::from_millis(10)))
857 .await;
858 assert!(result.is_err());
859 assert!(start.elapsed() >= Duration::from_millis(10));
860
861 let start = std::time::Instant::now();
863 let result = ctx.get_session_message(Some(Duration::ZERO)).await;
864 assert!(result.is_err());
865 assert!(start.elapsed() < Duration::from_millis(50));
866
867 let start = std::time::Instant::now();
869 let result = ctx
870 .get_session_message(Some(Duration::from_millis(100)))
871 .await;
872 assert!(result.is_err());
873 let elapsed = start.elapsed();
874 assert!(elapsed >= Duration::from_millis(90)); assert!(elapsed < Duration::from_millis(200));
876 }
877
878 #[tokio::test]
879 async fn test_get_session_message_multiple_messages() {
880 let (ctx, tx) = make_context();
881
882 for i in 0..3 {
884 let msg = create_test_proto_message(
885 make_slim_name(["org", "sender", "app"]),
886 make_slim_name(["org", "receiver", "app"]),
887 i as u64,
888 format!("message {}", i).into_bytes(),
889 "text/plain",
890 HashMap::new(),
891 );
892 tx.send(Ok(msg)).expect("send should succeed");
893 }
894
895 for i in 0..3 {
897 let result = ctx
898 .get_session_message(Some(Duration::from_millis(100)))
899 .await;
900 assert!(result.is_ok());
901 let (msg_ctx, payload) = result.unwrap();
902 assert_eq!(payload, format!("message {}", i).into_bytes());
903 assert_eq!(msg_ctx.input_connection, i as u64);
904 }
905 }
906
907 #[tokio::test]
910 async fn test_publish_internal_errors_when_session_missing() {
911 let (ctx, _tx) = make_context();
912 let res = ctx
913 .publish_internal(
914 &make_slim_name(["dest", "app", "v1"]),
915 1,
916 b"payload".to_vec(),
917 None,
918 Some("text/plain".to_string()),
919 None,
920 )
921 .await;
922 assert!(res.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
923 }
924
925 #[tokio::test]
926 async fn test_publish_internal_with_all_params_errors_when_session_missing() {
927 let (ctx, _tx) = make_context();
928 let mut metadata = HashMap::new();
929 metadata.insert("key".to_string(), "value".to_string());
930
931 let res = ctx
932 .publish_internal(
933 &make_slim_name(["dest", "app", "v1"]),
934 3, b"payload".to_vec(), Some(123), Some("application/json".to_string()), Some(metadata), )
940 .await;
941 assert!(res.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
942 }
943
944 #[tokio::test]
945 async fn test_publish_to_internal_errors_when_session_missing() {
946 let (ctx, _tx) = make_context();
947 let message_ctx = MessageContext::new(
948 make_ffi_name(["sender", "org", "service"]),
949 Some(make_ffi_name(["receiver", "org", "service"])),
950 "application/json".to_string(),
951 HashMap::new(),
952 42,
953 "unique".to_string(),
954 );
955
956 let res = ctx
957 .publish_to_internal(
958 &message_ctx,
959 b"reply".to_vec(),
960 Some("json".to_string()),
961 None,
962 )
963 .await;
964 assert!(res.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
965 }
966
967 #[tokio::test]
970 async fn test_invite_internal_errors_when_session_missing() {
971 let (ctx, _tx) = make_context();
972 let res = ctx
973 .invite_internal(&make_slim_name(["org", "peer", "app"]))
974 .await;
975 assert!(res.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
976 }
977
978 #[tokio::test]
979 async fn test_remove_internal_errors_when_session_missing() {
980 let (ctx, _tx) = make_context();
981 let err = ctx
982 .remove_internal(&make_slim_name(["org", "peer", "app"]))
983 .await;
984 assert!(err.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
985 }
986
987 #[tokio::test]
990 async fn test_bindings_session_context_weak_ref() {
991 let (ctx, _tx) = make_context();
992 assert!(ctx.session.upgrade().is_none());
994 }
995
996 #[tokio::test]
999 async fn test_publish_async_session_missing() {
1000 let (ctx, _tx) = make_context();
1001 let result = ctx.publish_async(b"test".to_vec(), None, None).await;
1002 assert!(result.is_err());
1003 match result.unwrap_err() {
1004 SlimError::SessionError { message } => {
1005 assert!(message.contains("closed") || message.contains("dropped"));
1006 }
1007 _ => panic!("Expected SessionError"),
1008 }
1009 }
1010
1011 #[tokio::test]
1012 async fn test_publish_to_async_session_missing() {
1013 let (ctx, _tx) = make_context();
1014 let message_ctx = MessageContext::new(
1015 make_ffi_name(["sender", "org", "service"]),
1016 Some(make_ffi_name(["receiver", "org", "service"])),
1017 "application/json".to_string(),
1018 HashMap::new(),
1019 42,
1020 "identity".to_string(),
1021 );
1022
1023 let result = ctx
1024 .publish_to_async(message_ctx, b"reply".to_vec(), None, None)
1025 .await;
1026 assert!(result.is_err_and(|e| {
1027 if let SlimError::SessionError { message } = e {
1028 message.contains("closed")
1029 } else {
1030 false
1031 }
1032 }));
1033 }
1034
1035 #[tokio::test]
1036 async fn test_publish_with_params_async_session_missing() {
1037 let (ctx, _tx) = make_context();
1038 let dest = Arc::new(FfiName::new(
1039 "org".to_string(),
1040 "ns".to_string(),
1041 "dest".to_string(),
1042 ));
1043
1044 let result = ctx
1045 .publish_with_params_async(dest, 1, b"test".to_vec(), None, None, None)
1046 .await;
1047 assert!(result.is_err_and(|e| {
1048 if let SlimError::SessionError { message } = e {
1049 message.contains("closed")
1050 } else {
1051 false
1052 }
1053 }));
1054 }
1055
1056 #[tokio::test]
1057 async fn test_publish_with_params_async_with_all_options() {
1058 let (ctx, _tx) = make_context();
1059 let dest = Arc::new(FfiName::new_with_id(
1060 "org".to_string(),
1061 "ns".to_string(),
1062 "dest".to_string(),
1063 123,
1064 ));
1065
1066 let mut metadata = HashMap::new();
1067 metadata.insert("key".to_string(), "value".to_string());
1068
1069 let result = ctx
1070 .publish_with_params_async(
1071 dest,
1072 3, b"test payload".to_vec(),
1074 Some(456), Some("application/json".to_string()), Some(metadata), )
1078 .await;
1079
1080 assert!(result.is_err());
1082 }
1083
1084 #[tokio::test]
1087 async fn test_get_message_async_success() {
1088 let (ctx, tx) = make_context();
1089
1090 let msg = create_test_proto_message(
1091 make_slim_name(["org", "sender", "app"]),
1092 make_slim_name(["org", "receiver", "app"]),
1093 42,
1094 b"hello".to_vec(),
1095 "text/plain",
1096 HashMap::new(),
1097 );
1098
1099 tx.send(Ok(msg)).expect("send should succeed");
1100
1101 let result = ctx
1102 .get_message_async(Some(std::time::Duration::from_millis(100)))
1103 .await;
1104 assert!(result.is_ok());
1105
1106 let received = result.unwrap();
1107 assert_eq!(received.payload, b"hello");
1108 assert_eq!(received.context.payload_type, "text/plain");
1109 assert_eq!(received.context.input_connection, 42);
1110 }
1111
1112 #[tokio::test]
1113 async fn test_get_message_async_timeout() {
1114 let (ctx, _tx) = make_context();
1115
1116 let result = ctx
1117 .get_message_async(Some(std::time::Duration::from_millis(50)))
1118 .await;
1119 assert!(result.is_err_and(|e| {
1120 if let SlimError::SessionError { message } = e {
1121 message.contains("receive timeout")
1122 } else {
1123 false
1124 }
1125 }));
1126 }
1127
1128 #[tokio::test]
1129 async fn test_get_message_async_channel_closed() {
1130 let (ctx, tx) = make_context();
1131 drop(tx);
1132
1133 let result = ctx
1134 .get_message_async(Some(std::time::Duration::from_millis(100)))
1135 .await;
1136 assert!(result.is_err_and(|e| matches!(e, SlimError::SessionError { .. })));
1137 }
1138
1139 #[tokio::test]
1142 async fn test_invite_async_session_missing() {
1143 let (ctx, _tx) = make_context();
1144 let participant = Arc::new(FfiName::new(
1145 "org".to_string(),
1146 "ns".to_string(),
1147 "peer".to_string(),
1148 ));
1149
1150 let result = ctx.invite_async(participant).await;
1151 assert!(result.is_err_and(|e| {
1152 if let SlimError::SessionError { message } = e {
1153 message.contains("closed")
1154 } else {
1155 false
1156 }
1157 }));
1158 }
1159
1160 #[tokio::test]
1161 async fn test_remove_async_session_missing() {
1162 let (ctx, _tx) = make_context();
1163 let participant = Arc::new(FfiName::new(
1164 "org".to_string(),
1165 "ns".to_string(),
1166 "peer".to_string(),
1167 ));
1168
1169 let result = ctx.remove_async(participant).await;
1170 assert!(result.is_err_and(|e| {
1171 if let SlimError::SessionError { message } = e {
1172 message.contains("closed")
1173 } else {
1174 false
1175 }
1176 }));
1177 }
1178
1179 #[tokio::test]
1182 async fn test_destination_session_missing() {
1183 let (ctx, _tx) = make_context();
1184 let result = ctx.destination();
1185 assert!(result.is_err());
1186 match result.unwrap_err() {
1187 SlimError::SessionError { message } => {
1188 assert!(message.contains("closed") || message.contains("dropped"));
1189 }
1190 _ => panic!("Expected SessionError"),
1191 }
1192 }
1193
1194 #[tokio::test]
1195 async fn test_source_session_missing() {
1196 let (ctx, _tx) = make_context();
1197 let result = ctx.source();
1198 assert!(result.is_err());
1199 match result.unwrap_err() {
1200 SlimError::SessionError { message } => {
1201 assert!(message.contains("closed") || message.contains("dropped"));
1202 }
1203 _ => panic!("Expected SessionError"),
1204 }
1205 }
1206
1207 #[tokio::test]
1208 async fn test_session_id_session_missing() {
1209 let (ctx, _tx) = make_context();
1210 let result = ctx.session_id();
1211 assert!(result.is_err());
1212 match result.unwrap_err() {
1213 SlimError::SessionError { message } => {
1214 assert!(message.contains("closed") || message.contains("dropped"));
1215 }
1216 _ => panic!("Expected SessionError"),
1217 }
1218 }
1219
1220 #[tokio::test]
1221 async fn test_session_type_session_missing() {
1222 let (ctx, _tx) = make_context();
1223 let result = ctx.session_type();
1224 assert!(result.is_err());
1225 match result.unwrap_err() {
1226 SlimError::SessionError { message } => {
1227 assert!(message.contains("closed") || message.contains("dropped"));
1228 }
1229 _ => panic!("Expected SessionError"),
1230 }
1231 }
1232
1233 #[tokio::test]
1234 async fn test_is_initiator_session_missing() {
1235 let (ctx, _tx) = make_context();
1236 let result = ctx.is_initiator();
1237 assert!(result.is_err());
1238 match result.unwrap_err() {
1239 SlimError::SessionError { message } => {
1240 assert!(message.contains("closed") || message.contains("dropped"));
1241 }
1242 _ => panic!("Expected SessionError"),
1243 }
1244 }
1245
1246 #[tokio::test]
1247 async fn test_participants_list_session_missing() {
1248 let (ctx, _tx) = make_context();
1249 let result = ctx.participants_list_async().await;
1250 assert!(result.is_err());
1251 match result.unwrap_err() {
1252 SlimError::SessionError { message } => {
1253 assert!(message.contains("closed") || message.contains("dropped"));
1254 }
1255 _ => panic!("Expected SessionError"),
1256 }
1257 }
1258
1259 #[tokio::test]
1262 async fn test_publish_to_internal_adds_publish_to_metadata() {
1263 let (ctx, _tx) = make_context();
1266 let message_ctx = MessageContext::new(
1267 make_ffi_name(["sender", "org", "service"]),
1268 Some(make_ffi_name(["receiver", "org", "service"])),
1269 "application/json".to_string(),
1270 HashMap::new(),
1271 42,
1272 "identity".to_string(),
1273 );
1274
1275 let mut metadata = HashMap::new();
1276 metadata.insert("custom_key".to_string(), "custom_value".to_string());
1277
1278 let result = ctx
1280 .publish_to_internal(
1281 &message_ctx,
1282 b"reply".to_vec(),
1283 Some("text/plain".to_string()),
1284 Some(metadata),
1285 )
1286 .await;
1287
1288 assert!(result.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
1289 }
1290
1291 #[tokio::test]
1294 async fn test_received_message_construction() {
1295 let (ctx, tx) = make_context();
1296
1297 let mut metadata = HashMap::new();
1298 metadata.insert("trace_id".to_string(), "abc123".to_string());
1299 metadata.insert("user".to_string(), "test_user".to_string());
1300
1301 let msg = create_test_proto_message(
1302 make_slim_name(["org", "sender", "service"]),
1303 make_slim_name(["org", "receiver", "service"]),
1304 999,
1305 b"complex payload with special chars: \x00\xFF".to_vec(),
1306 "application/octet-stream",
1307 metadata,
1308 );
1309
1310 tx.send(Ok(msg)).expect("send should succeed");
1311
1312 let result = ctx
1313 .get_message_async(Some(std::time::Duration::from_millis(100)))
1314 .await;
1315 assert!(result.is_ok());
1316
1317 let received = result.unwrap();
1318 assert_eq!(
1319 received.payload,
1320 b"complex payload with special chars: \x00\xFF"
1321 );
1322 assert_eq!(received.context.payload_type, "application/octet-stream");
1323 assert_eq!(received.context.input_connection, 999);
1324 assert_eq!(
1325 received.context.metadata.get("trace_id"),
1326 Some(&"abc123".to_string())
1327 );
1328 assert_eq!(
1329 received.context.metadata.get("user"),
1330 Some(&"test_user".to_string())
1331 );
1332 }
1333
1334 #[tokio::test]
1337 async fn test_message_context_source_as_slim_name() {
1338 let ffi_name = make_ffi_name(["org", "namespace", "app"]);
1339 let message_ctx = MessageContext::new(
1340 ffi_name,
1341 None,
1342 "text/plain".to_string(),
1343 HashMap::new(),
1344 1,
1345 "id".to_string(),
1346 );
1347
1348 let slim_name = message_ctx.source_as_slim_name();
1349 let components = slim_name.components_strings();
1350 assert_eq!(components[0], "org");
1351 assert_eq!(components[1], "namespace");
1352 assert_eq!(components[2], "app");
1353 }
1354
1355 #[tokio::test]
1358 async fn test_get_message_with_empty_payload() {
1359 let (ctx, tx) = make_context();
1360
1361 let msg = create_test_proto_message(
1362 make_slim_name(["org", "sender", "app"]),
1363 make_slim_name(["org", "receiver", "app"]),
1364 1,
1365 vec![], "text/plain",
1367 HashMap::new(),
1368 );
1369
1370 tx.send(Ok(msg)).expect("send should succeed");
1371
1372 let result = ctx
1373 .get_message_async(Some(std::time::Duration::from_millis(100)))
1374 .await;
1375 assert!(result.is_ok());
1376 let received = result.unwrap();
1377 assert!(received.payload.is_empty());
1378 }
1379
1380 #[tokio::test]
1381 async fn test_get_message_with_large_payload() {
1382 let (ctx, tx) = make_context();
1383
1384 let large_payload = vec![0xAB; 1024 * 1024];
1386
1387 let msg = create_test_proto_message(
1388 make_slim_name(["org", "sender", "app"]),
1389 make_slim_name(["org", "receiver", "app"]),
1390 1,
1391 large_payload.clone(),
1392 "application/octet-stream",
1393 HashMap::new(),
1394 );
1395
1396 tx.send(Ok(msg)).expect("send should succeed");
1397
1398 let result = ctx
1399 .get_message_async(Some(std::time::Duration::from_millis(100)))
1400 .await;
1401 assert!(result.is_ok());
1402 let received = result.unwrap();
1403 assert_eq!(received.payload.len(), 1024 * 1024);
1404 assert_eq!(received.payload, large_payload);
1405 }
1406
1407 #[tokio::test]
1408 async fn test_publish_async_with_metadata() {
1409 let (ctx, _tx) = make_context();
1410
1411 let mut metadata = HashMap::new();
1412 metadata.insert("key1".to_string(), "value1".to_string());
1413 metadata.insert("key2".to_string(), "value2".to_string());
1414
1415 let result = ctx
1416 .publish_async(
1417 b"test".to_vec(),
1418 Some("application/json".to_string()),
1419 Some(metadata),
1420 )
1421 .await;
1422
1423 assert!(result.is_err());
1425 }
1426
1427 #[tokio::test]
1428 async fn test_publish_async_with_options() {
1429 let (ctx, _tx) = make_context();
1430
1431 let mut metadata = HashMap::new();
1432 metadata.insert("trace".to_string(), "123".to_string());
1433
1434 let result = ctx
1435 .publish_async(
1436 b"important message".to_vec(),
1437 Some("text/plain".to_string()),
1438 Some(metadata),
1439 )
1440 .await;
1441
1442 assert!(result.is_err());
1444 }
1445
1446 #[test]
1450 fn test_get_message_blocking_with_duration() {
1451 let (ctx, tx) = make_context();
1452
1453 let msg = create_test_proto_message(
1454 make_slim_name(["org", "sender", "app"]),
1455 make_slim_name(["org", "receiver", "app"]),
1456 1,
1457 b"test message".to_vec(),
1458 "text/plain",
1459 HashMap::new(),
1460 );
1461
1462 tx.send(Ok(msg)).expect("send should succeed");
1463
1464 let result = ctx.get_message(Some(std::time::Duration::from_millis(100)));
1466 assert!(result.is_ok());
1467 let received = result.unwrap();
1468 assert_eq!(received.payload, b"test message");
1469 }
1470
1471 #[tokio::test]
1473 async fn test_get_message_async_with_duration_parameter() {
1474 let (ctx, tx) = make_context();
1475
1476 let msg = create_test_proto_message(
1477 make_slim_name(["org", "sender", "app"]),
1478 make_slim_name(["org", "receiver", "app"]),
1479 1,
1480 b"duration test".to_vec(),
1481 "text/plain",
1482 HashMap::new(),
1483 );
1484
1485 tx.send(Ok(msg)).expect("send should succeed");
1486
1487 let result = ctx
1489 .get_message_async(Some(std::time::Duration::from_millis(200)))
1490 .await;
1491 assert!(result.is_ok());
1492 let received = result.unwrap();
1493 assert_eq!(received.payload, b"duration test");
1494 }
1495
1496 #[tokio::test]
1498 async fn test_get_message_with_no_timeout() {
1499 let (ctx, tx) = make_context();
1500
1501 let msg = create_test_proto_message(
1502 make_slim_name(["org", "sender", "app"]),
1503 make_slim_name(["org", "receiver", "app"]),
1504 1,
1505 b"no timeout".to_vec(),
1506 "text/plain",
1507 HashMap::new(),
1508 );
1509
1510 tx.send(Ok(msg)).expect("send should succeed");
1512
1513 let result = ctx.get_message_async(None).await;
1514 assert!(result.is_ok());
1515 let received = result.unwrap();
1516 assert_eq!(received.payload, b"no timeout");
1517 }
1518
1519 #[tokio::test]
1521 async fn test_get_session_message_futures_timer_timeout() {
1522 let (ctx, _tx) = make_context();
1523
1524 let result = ctx
1526 .get_session_message(Some(std::time::Duration::from_millis(50)))
1527 .await;
1528
1529 assert!(result.is_err());
1531 if let Err(e) = result {
1532 assert!(
1533 matches!(e, SessionError::ReceiveTimeout),
1534 "Should be ReceiveTimeout error"
1535 );
1536 }
1537 }
1538
1539 #[tokio::test]
1541 async fn test_get_message_futures_timer_race_condition() {
1542 let (ctx, tx) = make_context();
1543
1544 let tx_clone = tx.clone();
1546 tokio::spawn(async move {
1547 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
1548 let msg = create_test_proto_message(
1549 make_slim_name(["org", "sender", "app"]),
1550 make_slim_name(["org", "receiver", "app"]),
1551 1,
1552 b"delayed message".to_vec(),
1553 "text/plain",
1554 HashMap::new(),
1555 );
1556 let _ = tx_clone.send(Ok(msg));
1557 });
1558
1559 let result = ctx
1561 .get_message_async(Some(std::time::Duration::from_millis(100)))
1562 .await;
1563
1564 assert!(result.is_ok(), "Message should arrive before timeout");
1565 let received = result.unwrap();
1566 assert_eq!(received.payload, b"delayed message");
1567 }
1568
1569 #[test]
1571 fn test_get_message_blocking_timeout() {
1572 let (ctx, _tx) = make_context();
1573
1574 let result = ctx.get_message(Some(std::time::Duration::from_millis(50)));
1576
1577 assert!(result.is_err(), "Should timeout when no message arrives");
1578 }
1579
1580 #[tokio::test]
1582 async fn test_get_message_various_timeout_durations() {
1583 let (ctx, _tx) = make_context();
1585
1586 let result1 = ctx
1588 .get_message_async(Some(std::time::Duration::from_millis(1)))
1589 .await;
1590 assert!(result1.is_err());
1591
1592 let result2 = ctx
1594 .get_message_async(Some(std::time::Duration::from_millis(50)))
1595 .await;
1596 assert!(result2.is_err());
1597
1598 let result3 = ctx
1600 .get_message_async(Some(std::time::Duration::from_millis(100)))
1601 .await;
1602 assert!(result3.is_err());
1603 }
1604
1605 #[test]
1611 fn test_session_config_point_to_point() {
1612 let config = SessionConfig {
1613 session_type: SessionType::PointToPoint,
1614 enable_mls: true,
1615 max_retries: Some(5),
1616 interval: Some(std::time::Duration::from_millis(200)),
1617 metadata: std::collections::HashMap::from([
1618 ("key1".to_string(), "value1".to_string()),
1619 ("key2".to_string(), "value2".to_string()),
1620 ]),
1621 };
1622
1623 let slim_config: SlimSessionConfig = config.into();
1624
1625 assert_eq!(slim_config.session_type, ProtoSessionType::PointToPoint);
1626 assert!(slim_config.mls_enabled);
1627 assert_eq!(slim_config.max_retries, Some(5));
1628 assert_eq!(
1629 slim_config.interval,
1630 Some(std::time::Duration::from_millis(200))
1631 );
1632 assert_eq!(
1633 slim_config.metadata.get("key1"),
1634 Some(&"value1".to_string())
1635 );
1636 }
1637
1638 #[test]
1640 fn test_session_config_group() {
1641 let config = SessionConfig {
1642 session_type: SessionType::Group,
1643 enable_mls: false,
1644 max_retries: None,
1645 interval: None,
1646 metadata: std::collections::HashMap::new(),
1647 };
1648
1649 let slim_config: SlimSessionConfig = config.into();
1650
1651 assert_eq!(slim_config.session_type, ProtoSessionType::Multicast);
1652 assert!(!slim_config.mls_enabled);
1653 assert!(slim_config.max_retries.is_none());
1654 assert!(slim_config.interval.is_none());
1655 }
1656}