Skip to main content

slim_bindings/
session.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use slim_session::CompletionHandle as SlimCompletionHandle;
5use slim_session::session_controller::SessionController;
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10use futures_timer::Delay;
11use slim_datapath::api::ProtoSessionType;
12use slim_datapath::messages::Name as SlimName;
13use slim_datapath::messages::utils::{PUBLISH_TO, SlimHeaderFlags, TRUE_VAL};
14use slim_session::SessionConfig as SlimSessionConfig;
15use slim_session::SessionError;
16use slim_session::context::SessionContext as SlimSession;
17
18use crate::message_context::MessageContext;
19use crate::{CompletionHandle, Name, ReceivedMessage, SlimError};
20
21/// Session type enum
22#[derive(Debug, Clone, PartialEq, uniffi::Enum)]
23pub enum SessionType {
24    PointToPoint,
25    Group,
26}
27
28impl From<SessionType> for ProtoSessionType {
29    fn from(session_type: SessionType) -> Self {
30        match session_type {
31            SessionType::PointToPoint => ProtoSessionType::PointToPoint,
32            SessionType::Group => ProtoSessionType::Multicast,
33        }
34    }
35}
36
37impl From<ProtoSessionType> for SessionType {
38    fn from(session_type: ProtoSessionType) -> Self {
39        match session_type {
40            ProtoSessionType::PointToPoint => SessionType::PointToPoint,
41            ProtoSessionType::Multicast => SessionType::Group,
42            ProtoSessionType::Unspecified => SessionType::PointToPoint, // Default to PointToPoint
43        }
44    }
45}
46
47/// Session configuration
48#[derive(uniffi::Record)]
49pub struct SessionConfig {
50    /// Session type (PointToPoint or Group)
51    pub session_type: SessionType,
52
53    /// Enable MLS encryption for this session
54    pub enable_mls: bool,
55
56    /// Maximum number of retries for message transmission (None = use default)
57    pub max_retries: Option<u32>,
58
59    /// Interval between retries in milliseconds (None = use default)
60    pub interval: Option<std::time::Duration>,
61
62    /// Custom metadata key-value pairs for the session
63    pub metadata: std::collections::HashMap<String, String>,
64}
65
66impl From<SessionConfig> for SlimSessionConfig {
67    fn from(config: SessionConfig) -> Self {
68        SlimSessionConfig {
69            session_type: config.session_type.into(),
70            max_retries: config.max_retries,
71            interval: config.interval,
72            mls_enabled: config.enable_mls,
73            initiator: true,
74            metadata: config.metadata,
75        }
76    }
77}
78
79impl From<SlimSessionConfig> for SessionConfig {
80    fn from(config: SlimSessionConfig) -> Self {
81        SessionConfig {
82            session_type: config.session_type.into(),
83            enable_mls: config.mls_enabled,
84            max_retries: config.max_retries,
85            interval: config.interval,
86            metadata: config.metadata,
87        }
88    }
89}
90
91/// Session context for language bindings (UniFFI-compatible)
92///
93/// Wraps the session context with proper async access patterns for message reception.
94/// Provides both synchronous (blocking) and asynchronous methods for FFI compatibility.
95#[derive(uniffi::Object)]
96pub struct Session {
97    /// Weak reference to the underlying session
98    pub session: std::sync::Weak<SessionController>,
99    /// Message receiver wrapped in RwLock for concurrent access
100    pub rx: RwLock<slim_session::AppChannelReceiver>,
101}
102
103impl Session {
104    /// Create a new Session from a Session and runtime
105    pub fn new(ctx: SlimSession) -> Self {
106        let (session, rx) = ctx.into_parts();
107        Self {
108            session,
109            rx: RwLock::new(rx),
110        }
111    }
112
113    /// Get the runtime (for internal use)
114    pub fn runtime(&self) -> &'static tokio::runtime::Runtime {
115        crate::config::get_runtime()
116    }
117}
118
119// ============================================================================
120// Internal async methods (used by both FFI and Python bindings)
121// ============================================================================
122
123impl Session {
124    /// Publish a message through this session (internal API for language bindings)
125    ///
126    /// This is the low-level publish method that takes SlimName directly.
127    /// Use `publish()` or `publish_with_params()` for FFI-compatible APIs.
128    pub async fn publish_internal(
129        &self,
130        name: &SlimName,
131        fanout: u32,
132        blob: Vec<u8>,
133        conn_out: Option<u64>,
134        payload_type: Option<String>,
135        metadata: Option<HashMap<String, String>>,
136    ) -> Result<SlimCompletionHandle, SessionError> {
137        let session = self.session.upgrade().ok_or(SessionError::SessionClosed)?;
138
139        let flags = SlimHeaderFlags::new(fanout, None, conn_out, None, None);
140
141        let ret = session
142            .publish_with_flags(name, flags, blob, payload_type, metadata)
143            .await?;
144
145        Ok(ret)
146    }
147
148    /// Publish a message as a reply (internal API for language bindings)
149    ///
150    /// This is the low-level publish_to method that takes MessageContext reference.
151    /// Use `publish_to()` for FFI-compatible API.
152    pub async fn publish_to_internal(
153        &self,
154        message_ctx: &MessageContext,
155        blob: Vec<u8>,
156        payload_type: Option<String>,
157        metadata: Option<HashMap<String, String>>,
158    ) -> Result<SlimCompletionHandle, SessionError> {
159        let session = self.session.upgrade().ok_or(SessionError::SessionClosed)?;
160
161        let flags = SlimHeaderFlags::new(
162            1, // fanout = 1 for reply semantics
163            None,
164            Some(message_ctx.input_connection), // reply to the same connection
165            None,
166            None,
167        );
168
169        let mut final_metadata = metadata.unwrap_or_default();
170        final_metadata.insert(PUBLISH_TO.to_string(), TRUE_VAL.to_string());
171
172        // Convert FFI Name to SlimName for the datapath layer
173        let source_name = message_ctx.source_as_slim_name();
174
175        let ret = session
176            .publish_with_flags(
177                &source_name, // reply to the original source
178                flags,
179                blob,
180                payload_type,
181                Some(final_metadata),
182            )
183            .await?;
184
185        Ok(ret)
186    }
187
188    /// Invite a peer to join this session (internal API for language bindings)
189    ///
190    /// This is the low-level invite method that takes SlimName reference.
191    /// Use `invite()` for FFI-compatible API with auto-wait.
192    pub async fn invite_internal(
193        &self,
194        destination: &SlimName,
195    ) -> Result<SlimCompletionHandle, SessionError> {
196        let session = self.session.upgrade().ok_or(SessionError::SessionClosed)?;
197
198        session.invite_participant(destination).await
199    }
200
201    /// Remove a peer from this session (internal API for language bindings)
202    ///
203    /// This is the low-level remove method that takes SlimName reference.
204    /// Use `remove()` for FFI-compatible API with auto-wait.
205    pub async fn remove_internal(
206        &self,
207        destination: &SlimName,
208    ) -> Result<SlimCompletionHandle, SessionError> {
209        let session = self.session.upgrade().ok_or(SessionError::SessionClosed)?;
210
211        session.remove_participant(destination).await
212    }
213
214    /// Receive a message from this session with optional timeout
215    pub async fn get_session_message(
216        &self,
217        timeout: Option<std::time::Duration>,
218    ) -> Result<(MessageContext, Vec<u8>), SessionError> {
219        let mut rx = self.rx.write().await;
220
221        let recv_future = async {
222            let msg = rx.recv().await.ok_or(SessionError::SessionClosed)??;
223
224            MessageContext::from_proto_message(msg)
225        };
226
227        if let Some(timeout_duration) = timeout {
228            // Runtime-agnostic timeout using futures-timer
229            futures::pin_mut!(recv_future);
230            let delay = Delay::new(timeout_duration);
231            futures::pin_mut!(delay);
232
233            match futures::future::select(recv_future, delay).await {
234                futures::future::Either::Left((result, _)) => result,
235                futures::future::Either::Right(_) => Err(SessionError::ReceiveTimeout),
236            }
237        } else {
238            recv_future.await
239        }
240    }
241}
242
243// ============================================================================
244// FFI-exported methods (UniFFI)
245// ============================================================================
246
247#[uniffi::export]
248impl Session {
249    /// Publish a message to the session's destination (blocking version)
250    ///
251    /// Returns a completion handle that can be awaited to ensure the message was delivered.
252    ///
253    /// # Arguments
254    /// * `data` - The message payload bytes
255    /// * `payload_type` - Optional content type identifier
256    /// * `metadata` - Optional key-value metadata pairs
257    ///
258    /// # Returns
259    /// * `Ok(CompletionHandle)` - Handle to await delivery confirmation
260    /// * `Err(SlimError)` - If publishing fails
261    ///
262    /// # Example
263    /// ```ignore
264    /// let completion = session.publish(data, None, None)?;
265    /// completion.wait()?; // Blocks until message is delivered
266    /// ```
267    pub fn publish(
268        &self,
269        data: Vec<u8>,
270        payload_type: Option<String>,
271        metadata: Option<HashMap<String, String>>,
272    ) -> Result<Arc<CompletionHandle>, SlimError> {
273        crate::config::get_runtime()
274            .block_on(async { self.publish_async(data, payload_type, metadata).await })
275    }
276
277    /// Publish a message to the session's destination (async version)
278    ///
279    /// Returns a completion handle that can be awaited to ensure the message was delivered.
280    pub async fn publish_async(
281        &self,
282        data: Vec<u8>,
283        payload_type: Option<String>,
284        metadata: Option<HashMap<String, String>>,
285    ) -> Result<Arc<CompletionHandle>, SlimError> {
286        let session = self
287            .session
288            .upgrade()
289            .ok_or_else(|| SlimError::SessionError {
290                message: "Session already closed or dropped".to_string(),
291            })?;
292
293        let destination = session.dst();
294
295        let completion = self
296            .publish_internal(
297                destination,
298                1, // fanout = 1 for normal publish
299                data,
300                None, // connection_out
301                payload_type,
302                metadata,
303            )
304            .await?;
305
306        Ok(Arc::new(CompletionHandle::from(completion)))
307    }
308
309    /// Publish a message and wait for completion (blocking version)
310    ///
311    /// This method publishes a message and blocks until the delivery completes.
312    pub fn publish_and_wait(
313        &self,
314        data: Vec<u8>,
315        payload_type: Option<String>,
316        metadata: Option<HashMap<String, String>>,
317    ) -> Result<(), SlimError> {
318        crate::config::get_runtime().block_on(async {
319            self.publish_and_wait_async(data, payload_type, metadata)
320                .await
321        })
322    }
323
324    /// Publish a message and wait for completion (async version)
325    ///
326    /// This method publishes a message and waits until the delivery completes.
327    pub async fn publish_and_wait_async(
328        &self,
329        data: Vec<u8>,
330        payload_type: Option<String>,
331        metadata: Option<HashMap<String, String>>,
332    ) -> Result<(), SlimError> {
333        let completion_handle = self.publish_async(data, payload_type, metadata).await?;
334        completion_handle.wait_async().await
335    }
336
337    /// Publish a reply message to the originator of a received message (blocking version for FFI)
338    ///
339    /// This method uses the routing information from a previously received message
340    /// to send a reply back to the sender. This is the preferred way to implement
341    /// request/reply patterns.
342    ///
343    /// Returns a completion handle that can be awaited to ensure the message was delivered.
344    ///
345    /// # Arguments
346    /// * `message_context` - Context from a message received via `get_message()`
347    /// * `data` - The reply payload bytes
348    /// * `payload_type` - Optional content type identifier
349    /// * `metadata` - Optional key-value metadata pairs
350    ///
351    /// # Returns
352    /// * `Ok(CompletionHandle)` - Handle to await delivery confirmation
353    /// * `Err(SlimError)` - If publishing fails
354    pub fn publish_to(
355        &self,
356        message_context: MessageContext,
357        data: Vec<u8>,
358        payload_type: Option<String>,
359        metadata: Option<HashMap<String, String>>,
360    ) -> Result<Arc<CompletionHandle>, SlimError> {
361        crate::config::get_runtime().block_on(async {
362            self.publish_to_async(message_context, data, payload_type, metadata)
363                .await
364        })
365    }
366
367    /// Publish a reply message (async version)
368    ///
369    /// Returns a completion handle that can be awaited to ensure the message was delivered.
370    pub async fn publish_to_async(
371        &self,
372        message_context: MessageContext,
373        data: Vec<u8>,
374        payload_type: Option<String>,
375        metadata: Option<HashMap<String, String>>,
376    ) -> Result<Arc<CompletionHandle>, SlimError> {
377        let completion = self
378            .publish_to_internal(&message_context, data, payload_type, metadata)
379            .await?;
380
381        Ok(Arc::new(CompletionHandle::from(completion)))
382    }
383
384    /// Publish a reply message and wait for completion (blocking version)
385    ///
386    /// This method publishes a reply to a received message and blocks until the delivery completes.
387    pub fn publish_to_and_wait(
388        &self,
389        message_context: MessageContext,
390        data: Vec<u8>,
391        payload_type: Option<String>,
392        metadata: Option<HashMap<String, String>>,
393    ) -> Result<(), SlimError> {
394        crate::config::get_runtime().block_on(async {
395            self.publish_to_and_wait_async(message_context, data, payload_type, metadata)
396                .await
397        })
398    }
399
400    /// Publish a reply message and wait for completion (async version)
401    ///
402    /// This method publishes a reply to a received message and waits until the delivery completes.
403    pub async fn publish_to_and_wait_async(
404        &self,
405        message_context: MessageContext,
406        data: Vec<u8>,
407        payload_type: Option<String>,
408        metadata: Option<HashMap<String, String>>,
409    ) -> Result<(), SlimError> {
410        let completion_handle = self
411            .publish_to_async(message_context, data, payload_type, metadata)
412            .await?;
413        completion_handle.wait_async().await
414    }
415
416    /// Low-level publish with full control over all parameters (blocking version for FFI)
417    ///
418    /// This is an advanced method that provides complete control over routing and delivery.
419    /// Most users should use `publish()` or `publish_to()` instead.
420    ///
421    /// # Arguments
422    /// * `destination` - Target name to send to
423    /// * `fanout` - Number of copies to send (for multicast)
424    /// * `data` - The message payload bytes
425    /// * `connection_out` - Optional specific connection ID to use
426    /// * `payload_type` - Optional content type identifier
427    /// * `metadata` - Optional key-value metadata pairs
428    pub fn publish_with_params(
429        &self,
430        destination: Arc<Name>,
431        fanout: u32,
432        data: Vec<u8>,
433        connection_out: Option<u64>,
434        payload_type: Option<String>,
435        metadata: Option<HashMap<String, String>>,
436    ) -> Result<(), SlimError> {
437        crate::config::get_runtime().block_on(async {
438            self.publish_with_params_async(
439                destination,
440                fanout,
441                data,
442                connection_out,
443                payload_type,
444                metadata,
445            )
446            .await
447        })
448    }
449
450    /// Low-level publish with full control (async version)
451    pub async fn publish_with_params_async(
452        &self,
453        destination: Arc<Name>,
454        fanout: u32,
455        data: Vec<u8>,
456        connection_out: Option<u64>,
457        payload_type: Option<String>,
458        metadata: Option<HashMap<String, String>>,
459    ) -> Result<(), SlimError> {
460        let slim_dest: SlimName = destination.as_ref().into();
461
462        self.publish_internal(
463            &slim_dest,
464            fanout,
465            data,
466            connection_out,
467            payload_type,
468            metadata,
469        )
470        .await
471        .map(|_| ())?;
472
473        Ok(())
474    }
475
476    /// Receive a message from the session (blocking version for FFI)
477    ///
478    /// # Arguments
479    /// * `timeout` - Optional timeout duration
480    ///
481    /// # Returns
482    /// * `Ok(ReceivedMessage)` - Message with context and payload bytes
483    /// * `Err(SlimError)` - If the receive fails or times out
484    pub fn get_message(
485        &self,
486        timeout: Option<std::time::Duration>,
487    ) -> Result<ReceivedMessage, SlimError> {
488        crate::config::get_runtime().block_on(async { self.get_message_async(timeout).await })
489    }
490
491    /// Receive a message from the session (async version)
492    pub async fn get_message_async(
493        &self,
494        timeout: Option<std::time::Duration>,
495    ) -> Result<ReceivedMessage, SlimError> {
496        let (ctx, payload) = self.get_session_message(timeout).await?;
497
498        Ok(ReceivedMessage {
499            context: ctx,
500            payload,
501        })
502    }
503
504    /// Invite a participant to the session (blocking version)
505    ///
506    /// Returns a completion handle that can be awaited to ensure the invitation completes.
507    pub fn invite(&self, participant: Arc<Name>) -> Result<Arc<CompletionHandle>, SlimError> {
508        crate::config::get_runtime().block_on(async { self.invite_async(participant).await })
509    }
510
511    /// Invite a participant to the session (async version)
512    ///
513    /// Returns a completion handle that can be awaited to ensure the invitation completes.
514    pub async fn invite_async(
515        &self,
516        participant: Arc<Name>,
517    ) -> Result<Arc<CompletionHandle>, SlimError> {
518        let slim_name: SlimName = participant.as_ref().into();
519
520        let completion = self.invite_internal(&slim_name).await?;
521
522        // Return completion handle for caller to wait on
523        Ok(Arc::new(CompletionHandle::from(completion)))
524    }
525
526    /// Invite a participant and wait for completion (blocking version)
527    ///
528    /// This method invites a participant and blocks until the invitation completes.
529    pub fn invite_and_wait(&self, participant: Arc<Name>) -> Result<(), SlimError> {
530        crate::config::get_runtime()
531            .block_on(async { self.invite_and_wait_async(participant).await })
532    }
533
534    /// Invite a participant and wait for completion (async version)
535    ///
536    /// This method invites a participant and waits until the invitation completes.
537    pub async fn invite_and_wait_async(&self, participant: Arc<Name>) -> Result<(), SlimError> {
538        let completion_handle = self.invite_async(participant).await?;
539        completion_handle.wait_async().await
540    }
541
542    /// Remove a participant from the session (blocking version)
543    ///
544    /// Returns a completion handle that can be awaited to ensure the removal completes.
545    pub fn remove(&self, participant: Arc<Name>) -> Result<Arc<CompletionHandle>, SlimError> {
546        crate::config::get_runtime().block_on(async { self.remove_async(participant).await })
547    }
548
549    /// Remove a participant from the session (async version)
550    ///
551    /// Returns a completion handle that can be awaited to ensure the removal completes.
552    pub async fn remove_async(
553        &self,
554        participant: Arc<Name>,
555    ) -> Result<Arc<CompletionHandle>, SlimError> {
556        let slim_name: SlimName = participant.as_ref().into();
557
558        let completion = self.remove_internal(&slim_name).await?;
559
560        // Return completion handle for caller to wait on
561        Ok(Arc::new(CompletionHandle::from(completion)))
562    }
563
564    /// Remove a participant and wait for completion (blocking version)
565    ///
566    /// This method removes a participant and blocks until the removal completes.
567    pub fn remove_and_wait(&self, participant: Arc<Name>) -> Result<(), SlimError> {
568        crate::config::get_runtime()
569            .block_on(async { self.remove_and_wait_async(participant).await })
570    }
571
572    /// Remove a participant and wait for completion (async version)
573    ///
574    /// This method removes a participant and waits until the removal completes.
575    pub async fn remove_and_wait_async(&self, participant: Arc<Name>) -> Result<(), SlimError> {
576        let completion_handle = self.remove_async(participant).await?;
577        completion_handle.wait_async().await
578    }
579
580    /// Get the destination name for this session
581    pub fn destination(&self) -> Result<Name, SlimError> {
582        let session = self
583            .session
584            .upgrade()
585            .ok_or_else(|| SlimError::SessionError {
586                message: "Session already closed or dropped".to_string(),
587            })?;
588
589        Ok(Name::from(session.dst()))
590    }
591
592    /// Get the source name for this session
593    pub fn source(&self) -> Result<Name, SlimError> {
594        let session = self
595            .session
596            .upgrade()
597            .ok_or_else(|| SlimError::SessionError {
598                message: "Session already closed or dropped".to_string(),
599            })?;
600
601        Ok(Name::from(session.source()))
602    }
603
604    /// Get the session ID
605    pub fn session_id(&self) -> Result<u32, SlimError> {
606        let session = self
607            .session
608            .upgrade()
609            .ok_or_else(|| SlimError::SessionError {
610                message: "Session already closed or dropped".to_string(),
611            })?;
612
613        Ok(session.id())
614    }
615
616    /// Get the session type (PointToPoint or Group)
617    pub fn session_type(&self) -> Result<SessionType, SlimError> {
618        let session = self
619            .session
620            .upgrade()
621            .ok_or_else(|| SlimError::SessionError {
622                message: "Session already closed or dropped".to_string(),
623            })?;
624
625        match session.session_type() {
626            ProtoSessionType::PointToPoint => Ok(SessionType::PointToPoint),
627            ProtoSessionType::Multicast => Ok(SessionType::Group),
628            ProtoSessionType::Unspecified => Err(SlimError::InvalidArgument {
629                message: "Session has unspecified type".to_string(),
630            }),
631        }
632    }
633
634    /// Check if this session is the initiator
635    pub fn is_initiator(&self) -> Result<bool, SlimError> {
636        let session = self
637            .session
638            .upgrade()
639            .ok_or_else(|| SlimError::SessionError {
640                message: "Session already closed or dropped".to_string(),
641            })?;
642
643        Ok(session.is_initiator())
644    }
645
646    /// Get the session metadata
647    pub fn metadata(&self) -> Result<HashMap<String, String>, SlimError> {
648        let session = self
649            .session
650            .upgrade()
651            .ok_or_else(|| SlimError::SessionError {
652                message: "Session already closed or dropped".to_string(),
653            })?;
654        Ok(session.metadata())
655    }
656
657    /// Get the session configuration
658    pub fn config(&self) -> Result<SessionConfig, SlimError> {
659        let session = self
660            .session
661            .upgrade()
662            .ok_or_else(|| SlimError::SessionError {
663                message: "Session already closed or dropped".to_string(),
664            })?;
665
666        Ok(session.session_config().into())
667    }
668
669    /// Get list of participants in the session
670    pub async fn participants_list_async(&self) -> Result<Vec<Arc<Name>>, SlimError> {
671        let session = self
672            .session
673            .upgrade()
674            .ok_or_else(|| SlimError::SessionError {
675                message: "Session already closed or dropped".to_string(),
676            })?;
677
678        let list = session.participants_list().await?;
679        Ok(list.into_iter().map(|n| Arc::new(Name::from(n))).collect())
680    }
681
682    /// Get list of participants in the session (blocking version for FFI)
683    pub fn participants_list(&self) -> Result<Vec<Arc<Name>>, SlimError> {
684        crate::config::get_runtime().block_on(async { self.participants_list_async().await })
685    }
686}
687
688#[cfg(test)]
689mod tests {
690    use super::*;
691    use crate::Name as FfiName;
692    use crate::errors::SlimError;
693    use slim_datapath::api::{
694        ApplicationPayload, ProtoMessage, ProtoPublish, ProtoPublishType, SessionHeader, SlimHeader,
695    };
696    use slim_session::SessionError;
697    use std::time::Duration;
698    use tokio::sync::mpsc;
699
700    /// Helper to create SlimName for proto message construction
701    fn make_slim_name(parts: [&str; 3]) -> SlimName {
702        SlimName::from_strings(parts).with_id(0)
703    }
704
705    /// Helper to create FFI Name for MessageContext construction
706    fn make_ffi_name(parts: [&str; 3]) -> FfiName {
707        FfiName::new(
708            parts[0].to_string(),
709            parts[1].to_string(),
710            parts[2].to_string(),
711        )
712    }
713
714    fn make_context() -> (
715        Session,
716        mpsc::UnboundedSender<Result<ProtoMessage, SessionError>>,
717    ) {
718        let (tx, rx) = mpsc::unbounded_channel::<Result<ProtoMessage, SessionError>>();
719        let ctx = Session {
720            session: std::sync::Weak::new(),
721            rx: RwLock::new(rx),
722        };
723        (ctx, tx)
724    }
725
726    /// Helper to create a valid ProtoMessage for testing message reception
727    fn create_test_proto_message(
728        source: SlimName,
729        dest: SlimName,
730        connection_id: u64,
731        payload: Vec<u8>,
732        content_type: &str,
733        metadata: HashMap<String, String>,
734    ) -> ProtoMessage {
735        let content = ApplicationPayload::new(content_type, payload).as_content();
736
737        let mut slim_header = SlimHeader::default();
738        slim_header.set_source(&source);
739        slim_header.set_destination(&dest);
740
741        let publish = ProtoPublish {
742            header: Some(slim_header),
743            session: Some(SessionHeader::default()),
744            msg: Some(content),
745        };
746
747        let mut proto_msg = ProtoMessage {
748            message_type: Some(ProtoPublishType(publish)),
749            metadata,
750        };
751
752        proto_msg.set_incoming_conn(Some(connection_id));
753        proto_msg
754    }
755
756    // ==================== Message Reception Tests ====================
757
758    #[tokio::test]
759    async fn test_get_session_message_success() {
760        let (ctx, tx) = make_context();
761
762        let source = make_slim_name(["org", "sender", "app"]);
763        let dest = make_slim_name(["org", "receiver", "app"]);
764        let payload = b"hello world".to_vec();
765        let mut metadata = HashMap::new();
766        metadata.insert("key".to_string(), "value".to_string());
767
768        let msg = create_test_proto_message(
769            source.clone(),
770            dest,
771            42,
772            payload.clone(),
773            "text/plain",
774            metadata.clone(),
775        );
776
777        tx.send(Ok(msg)).expect("send should succeed");
778
779        let result = ctx
780            .get_session_message(Some(Duration::from_millis(100)))
781            .await;
782        assert!(result.is_ok(), "should receive message successfully");
783
784        let (msg_ctx, received_payload) = result.unwrap();
785        assert_eq!(received_payload, payload);
786        assert_eq!(msg_ctx.payload_type, "text/plain");
787        assert_eq!(msg_ctx.input_connection, 42);
788        assert_eq!(msg_ctx.metadata.get("key"), Some(&"value".to_string()));
789    }
790
791    #[tokio::test]
792    async fn test_get_session_message_timeout() {
793        let (ctx, _tx) = make_context();
794        let result = ctx
795            .get_session_message(Some(Duration::from_millis(50)))
796            .await;
797        assert!(result.is_err());
798        assert!(result.unwrap_err().to_string().contains("timeout"));
799    }
800
801    #[tokio::test]
802    async fn test_get_session_message_channel_closed() {
803        let (ctx, tx) = make_context();
804        drop(tx); // close sender so receiver sees channel closed
805        let result = ctx
806            .get_session_message(Some(Duration::from_millis(50)))
807            .await;
808        assert!(result.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
809    }
810
811    #[tokio::test]
812    async fn test_get_session_message_decode_error() {
813        let (ctx, tx) = make_context();
814
815        // Send an error through the channel (simulates decode failure)
816        tx.send(Err(SessionError::SlimMessageSendFailed)).unwrap();
817
818        let result = ctx
819            .get_session_message(Some(Duration::from_millis(100)))
820            .await;
821        assert!(result.is_err_and(|e| matches!(e, SessionError::SlimMessageSendFailed)));
822    }
823
824    #[tokio::test]
825    async fn test_get_session_message_no_timeout() {
826        let (ctx, tx) = make_context();
827
828        // Spawn a task that sends a message after a short delay
829        tokio::spawn(async move {
830            tokio::time::sleep(Duration::from_millis(20)).await;
831            let msg = create_test_proto_message(
832                make_slim_name(["org", "sender", "app"]),
833                make_slim_name(["org", "receiver", "app"]),
834                1,
835                b"delayed".to_vec(),
836                "text/plain",
837                HashMap::new(),
838            );
839            let _ = tx.send(Ok(msg));
840        });
841
842        // Call without timeout - should block until message arrives
843        let result = ctx.get_session_message(None).await;
844        assert!(result.is_ok());
845        let (_, payload) = result.unwrap();
846        assert_eq!(payload, b"delayed".to_vec());
847    }
848
849    #[tokio::test]
850    async fn test_get_session_message_various_timeouts() {
851        let (ctx, _tx) = make_context();
852
853        // Very short timeout
854        let start = std::time::Instant::now();
855        let result = ctx
856            .get_session_message(Some(Duration::from_millis(10)))
857            .await;
858        assert!(result.is_err());
859        assert!(start.elapsed() >= Duration::from_millis(10));
860
861        // Zero timeout should return immediately
862        let start = std::time::Instant::now();
863        let result = ctx.get_session_message(Some(Duration::ZERO)).await;
864        assert!(result.is_err());
865        assert!(start.elapsed() < Duration::from_millis(50));
866
867        // Longer timeout
868        let start = std::time::Instant::now();
869        let result = ctx
870            .get_session_message(Some(Duration::from_millis(100)))
871            .await;
872        assert!(result.is_err());
873        let elapsed = start.elapsed();
874        assert!(elapsed >= Duration::from_millis(90)); // Allow variance
875        assert!(elapsed < Duration::from_millis(200));
876    }
877
878    #[tokio::test]
879    async fn test_get_session_message_multiple_messages() {
880        let (ctx, tx) = make_context();
881
882        // Send multiple messages
883        for i in 0..3 {
884            let msg = create_test_proto_message(
885                make_slim_name(["org", "sender", "app"]),
886                make_slim_name(["org", "receiver", "app"]),
887                i as u64,
888                format!("message {}", i).into_bytes(),
889                "text/plain",
890                HashMap::new(),
891            );
892            tx.send(Ok(msg)).expect("send should succeed");
893        }
894
895        // Receive them in order
896        for i in 0..3 {
897            let result = ctx
898                .get_session_message(Some(Duration::from_millis(100)))
899                .await;
900            assert!(result.is_ok());
901            let (msg_ctx, payload) = result.unwrap();
902            assert_eq!(payload, format!("message {}", i).into_bytes());
903            assert_eq!(msg_ctx.input_connection, i as u64);
904        }
905    }
906
907    // ==================== Publish Tests (Session Missing) ====================
908
909    #[tokio::test]
910    async fn test_publish_internal_errors_when_session_missing() {
911        let (ctx, _tx) = make_context();
912        let res = ctx
913            .publish_internal(
914                &make_slim_name(["dest", "app", "v1"]),
915                1,
916                b"payload".to_vec(),
917                None,
918                Some("text/plain".to_string()),
919                None,
920            )
921            .await;
922        assert!(res.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
923    }
924
925    #[tokio::test]
926    async fn test_publish_internal_with_all_params_errors_when_session_missing() {
927        let (ctx, _tx) = make_context();
928        let mut metadata = HashMap::new();
929        metadata.insert("key".to_string(), "value".to_string());
930
931        let res = ctx
932            .publish_internal(
933                &make_slim_name(["dest", "app", "v1"]),
934                3,                                    // fanout
935                b"payload".to_vec(),                  // blob
936                Some(123),                            // conn_out
937                Some("application/json".to_string()), // payload_type
938                Some(metadata),                       // metadata
939            )
940            .await;
941        assert!(res.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
942    }
943
944    #[tokio::test]
945    async fn test_publish_to_internal_errors_when_session_missing() {
946        let (ctx, _tx) = make_context();
947        let message_ctx = MessageContext::new(
948            make_ffi_name(["sender", "org", "service"]),
949            Some(make_ffi_name(["receiver", "org", "service"])),
950            "application/json".to_string(),
951            HashMap::new(),
952            42,
953            "unique".to_string(),
954        );
955
956        let res = ctx
957            .publish_to_internal(
958                &message_ctx,
959                b"reply".to_vec(),
960                Some("json".to_string()),
961                None,
962            )
963            .await;
964        assert!(res.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
965    }
966
967    // ==================== Invite/Remove Tests (Session Missing) ====================
968
969    #[tokio::test]
970    async fn test_invite_internal_errors_when_session_missing() {
971        let (ctx, _tx) = make_context();
972        let res = ctx
973            .invite_internal(&make_slim_name(["org", "peer", "app"]))
974            .await;
975        assert!(res.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
976    }
977
978    #[tokio::test]
979    async fn test_remove_internal_errors_when_session_missing() {
980        let (ctx, _tx) = make_context();
981        let err = ctx
982            .remove_internal(&make_slim_name(["org", "peer", "app"]))
983            .await;
984        assert!(err.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
985    }
986
987    // ==================== Context Creation Tests ====================
988
989    #[tokio::test]
990    async fn test_bindings_session_context_weak_ref() {
991        let (ctx, _tx) = make_context();
992        // With no actual SessionController, the weak ref should be None
993        assert!(ctx.session.upgrade().is_none());
994    }
995
996    // ==================== FFI Method Tests (Session Missing) ====================
997
998    #[tokio::test]
999    async fn test_publish_async_session_missing() {
1000        let (ctx, _tx) = make_context();
1001        let result = ctx.publish_async(b"test".to_vec(), None, None).await;
1002        assert!(result.is_err());
1003        match result.unwrap_err() {
1004            SlimError::SessionError { message } => {
1005                assert!(message.contains("closed") || message.contains("dropped"));
1006            }
1007            _ => panic!("Expected SessionError"),
1008        }
1009    }
1010
1011    #[tokio::test]
1012    async fn test_publish_to_async_session_missing() {
1013        let (ctx, _tx) = make_context();
1014        let message_ctx = MessageContext::new(
1015            make_ffi_name(["sender", "org", "service"]),
1016            Some(make_ffi_name(["receiver", "org", "service"])),
1017            "application/json".to_string(),
1018            HashMap::new(),
1019            42,
1020            "identity".to_string(),
1021        );
1022
1023        let result = ctx
1024            .publish_to_async(message_ctx, b"reply".to_vec(), None, None)
1025            .await;
1026        assert!(result.is_err_and(|e| {
1027            if let SlimError::SessionError { message } = e {
1028                message.contains("closed")
1029            } else {
1030                false
1031            }
1032        }));
1033    }
1034
1035    #[tokio::test]
1036    async fn test_publish_with_params_async_session_missing() {
1037        let (ctx, _tx) = make_context();
1038        let dest = Arc::new(FfiName::new(
1039            "org".to_string(),
1040            "ns".to_string(),
1041            "dest".to_string(),
1042        ));
1043
1044        let result = ctx
1045            .publish_with_params_async(dest, 1, b"test".to_vec(), None, None, None)
1046            .await;
1047        assert!(result.is_err_and(|e| {
1048            if let SlimError::SessionError { message } = e {
1049                message.contains("closed")
1050            } else {
1051                false
1052            }
1053        }));
1054    }
1055
1056    #[tokio::test]
1057    async fn test_publish_with_params_async_with_all_options() {
1058        let (ctx, _tx) = make_context();
1059        let dest = Arc::new(FfiName::new_with_id(
1060            "org".to_string(),
1061            "ns".to_string(),
1062            "dest".to_string(),
1063            123,
1064        ));
1065
1066        let mut metadata = HashMap::new();
1067        metadata.insert("key".to_string(), "value".to_string());
1068
1069        let result = ctx
1070            .publish_with_params_async(
1071                dest,
1072                3, // fanout
1073                b"test payload".to_vec(),
1074                Some(456),                            // connection_out
1075                Some("application/json".to_string()), // payload_type
1076                Some(metadata),                       // metadata
1077            )
1078            .await;
1079
1080        // Should fail because session is missing, but this tests the parameter passing
1081        assert!(result.is_err());
1082    }
1083
1084    // ==================== Get Message FFI Tests ====================
1085
1086    #[tokio::test]
1087    async fn test_get_message_async_success() {
1088        let (ctx, tx) = make_context();
1089
1090        let msg = create_test_proto_message(
1091            make_slim_name(["org", "sender", "app"]),
1092            make_slim_name(["org", "receiver", "app"]),
1093            42,
1094            b"hello".to_vec(),
1095            "text/plain",
1096            HashMap::new(),
1097        );
1098
1099        tx.send(Ok(msg)).expect("send should succeed");
1100
1101        let result = ctx
1102            .get_message_async(Some(std::time::Duration::from_millis(100)))
1103            .await;
1104        assert!(result.is_ok());
1105
1106        let received = result.unwrap();
1107        assert_eq!(received.payload, b"hello");
1108        assert_eq!(received.context.payload_type, "text/plain");
1109        assert_eq!(received.context.input_connection, 42);
1110    }
1111
1112    #[tokio::test]
1113    async fn test_get_message_async_timeout() {
1114        let (ctx, _tx) = make_context();
1115
1116        let result = ctx
1117            .get_message_async(Some(std::time::Duration::from_millis(50)))
1118            .await;
1119        assert!(result.is_err_and(|e| {
1120            if let SlimError::SessionError { message } = e {
1121                message.contains("receive timeout")
1122            } else {
1123                false
1124            }
1125        }));
1126    }
1127
1128    #[tokio::test]
1129    async fn test_get_message_async_channel_closed() {
1130        let (ctx, tx) = make_context();
1131        drop(tx);
1132
1133        let result = ctx
1134            .get_message_async(Some(std::time::Duration::from_millis(100)))
1135            .await;
1136        assert!(result.is_err_and(|e| matches!(e, SlimError::SessionError { .. })));
1137    }
1138
1139    // ==================== Invite/Remove FFI Tests ====================
1140
1141    #[tokio::test]
1142    async fn test_invite_async_session_missing() {
1143        let (ctx, _tx) = make_context();
1144        let participant = Arc::new(FfiName::new(
1145            "org".to_string(),
1146            "ns".to_string(),
1147            "peer".to_string(),
1148        ));
1149
1150        let result = ctx.invite_async(participant).await;
1151        assert!(result.is_err_and(|e| {
1152            if let SlimError::SessionError { message } = e {
1153                message.contains("closed")
1154            } else {
1155                false
1156            }
1157        }));
1158    }
1159
1160    #[tokio::test]
1161    async fn test_remove_async_session_missing() {
1162        let (ctx, _tx) = make_context();
1163        let participant = Arc::new(FfiName::new(
1164            "org".to_string(),
1165            "ns".to_string(),
1166            "peer".to_string(),
1167        ));
1168
1169        let result = ctx.remove_async(participant).await;
1170        assert!(result.is_err_and(|e| {
1171            if let SlimError::SessionError { message } = e {
1172                message.contains("closed")
1173            } else {
1174                false
1175            }
1176        }));
1177    }
1178
1179    // ==================== Session Info Accessor Tests ====================
1180
1181    #[tokio::test]
1182    async fn test_destination_session_missing() {
1183        let (ctx, _tx) = make_context();
1184        let result = ctx.destination();
1185        assert!(result.is_err());
1186        match result.unwrap_err() {
1187            SlimError::SessionError { message } => {
1188                assert!(message.contains("closed") || message.contains("dropped"));
1189            }
1190            _ => panic!("Expected SessionError"),
1191        }
1192    }
1193
1194    #[tokio::test]
1195    async fn test_source_session_missing() {
1196        let (ctx, _tx) = make_context();
1197        let result = ctx.source();
1198        assert!(result.is_err());
1199        match result.unwrap_err() {
1200            SlimError::SessionError { message } => {
1201                assert!(message.contains("closed") || message.contains("dropped"));
1202            }
1203            _ => panic!("Expected SessionError"),
1204        }
1205    }
1206
1207    #[tokio::test]
1208    async fn test_session_id_session_missing() {
1209        let (ctx, _tx) = make_context();
1210        let result = ctx.session_id();
1211        assert!(result.is_err());
1212        match result.unwrap_err() {
1213            SlimError::SessionError { message } => {
1214                assert!(message.contains("closed") || message.contains("dropped"));
1215            }
1216            _ => panic!("Expected SessionError"),
1217        }
1218    }
1219
1220    #[tokio::test]
1221    async fn test_session_type_session_missing() {
1222        let (ctx, _tx) = make_context();
1223        let result = ctx.session_type();
1224        assert!(result.is_err());
1225        match result.unwrap_err() {
1226            SlimError::SessionError { message } => {
1227                assert!(message.contains("closed") || message.contains("dropped"));
1228            }
1229            _ => panic!("Expected SessionError"),
1230        }
1231    }
1232
1233    #[tokio::test]
1234    async fn test_is_initiator_session_missing() {
1235        let (ctx, _tx) = make_context();
1236        let result = ctx.is_initiator();
1237        assert!(result.is_err());
1238        match result.unwrap_err() {
1239            SlimError::SessionError { message } => {
1240                assert!(message.contains("closed") || message.contains("dropped"));
1241            }
1242            _ => panic!("Expected SessionError"),
1243        }
1244    }
1245
1246    #[tokio::test]
1247    async fn test_participants_list_session_missing() {
1248        let (ctx, _tx) = make_context();
1249        let result = ctx.participants_list_async().await;
1250        assert!(result.is_err());
1251        match result.unwrap_err() {
1252            SlimError::SessionError { message } => {
1253                assert!(message.contains("closed") || message.contains("dropped"));
1254            }
1255            _ => panic!("Expected SessionError"),
1256        }
1257    }
1258
1259    // ==================== Publish Internal with Metadata Tests ====================
1260
1261    #[tokio::test]
1262    async fn test_publish_to_internal_adds_publish_to_metadata() {
1263        // This test verifies the metadata manipulation in publish_to_internal
1264        // Even though session is missing, we can verify the code path is exercised
1265        let (ctx, _tx) = make_context();
1266        let message_ctx = MessageContext::new(
1267            make_ffi_name(["sender", "org", "service"]),
1268            Some(make_ffi_name(["receiver", "org", "service"])),
1269            "application/json".to_string(),
1270            HashMap::new(),
1271            42,
1272            "identity".to_string(),
1273        );
1274
1275        let mut metadata = HashMap::new();
1276        metadata.insert("custom_key".to_string(), "custom_value".to_string());
1277
1278        // This should fail but exercises the metadata handling code
1279        let result = ctx
1280            .publish_to_internal(
1281                &message_ctx,
1282                b"reply".to_vec(),
1283                Some("text/plain".to_string()),
1284                Some(metadata),
1285            )
1286            .await;
1287
1288        assert!(result.is_err_and(|e| matches!(e, SessionError::SessionClosed)));
1289    }
1290
1291    // ==================== ReceivedMessage Construction Test ====================
1292
1293    #[tokio::test]
1294    async fn test_received_message_construction() {
1295        let (ctx, tx) = make_context();
1296
1297        let mut metadata = HashMap::new();
1298        metadata.insert("trace_id".to_string(), "abc123".to_string());
1299        metadata.insert("user".to_string(), "test_user".to_string());
1300
1301        let msg = create_test_proto_message(
1302            make_slim_name(["org", "sender", "service"]),
1303            make_slim_name(["org", "receiver", "service"]),
1304            999,
1305            b"complex payload with special chars: \x00\xFF".to_vec(),
1306            "application/octet-stream",
1307            metadata,
1308        );
1309
1310        tx.send(Ok(msg)).expect("send should succeed");
1311
1312        let result = ctx
1313            .get_message_async(Some(std::time::Duration::from_millis(100)))
1314            .await;
1315        assert!(result.is_ok());
1316
1317        let received = result.unwrap();
1318        assert_eq!(
1319            received.payload,
1320            b"complex payload with special chars: \x00\xFF"
1321        );
1322        assert_eq!(received.context.payload_type, "application/octet-stream");
1323        assert_eq!(received.context.input_connection, 999);
1324        assert_eq!(
1325            received.context.metadata.get("trace_id"),
1326            Some(&"abc123".to_string())
1327        );
1328        assert_eq!(
1329            received.context.metadata.get("user"),
1330            Some(&"test_user".to_string())
1331        );
1332    }
1333
1334    // ==================== Message Context Conversion Test ====================
1335
1336    #[tokio::test]
1337    async fn test_message_context_source_as_slim_name() {
1338        let ffi_name = make_ffi_name(["org", "namespace", "app"]);
1339        let message_ctx = MessageContext::new(
1340            ffi_name,
1341            None,
1342            "text/plain".to_string(),
1343            HashMap::new(),
1344            1,
1345            "id".to_string(),
1346        );
1347
1348        let slim_name = message_ctx.source_as_slim_name();
1349        let components = slim_name.components_strings();
1350        assert_eq!(components[0], "org");
1351        assert_eq!(components[1], "namespace");
1352        assert_eq!(components[2], "app");
1353    }
1354
1355    // ==================== Empty/Edge Case Tests ====================
1356
1357    #[tokio::test]
1358    async fn test_get_message_with_empty_payload() {
1359        let (ctx, tx) = make_context();
1360
1361        let msg = create_test_proto_message(
1362            make_slim_name(["org", "sender", "app"]),
1363            make_slim_name(["org", "receiver", "app"]),
1364            1,
1365            vec![], // empty payload
1366            "text/plain",
1367            HashMap::new(),
1368        );
1369
1370        tx.send(Ok(msg)).expect("send should succeed");
1371
1372        let result = ctx
1373            .get_message_async(Some(std::time::Duration::from_millis(100)))
1374            .await;
1375        assert!(result.is_ok());
1376        let received = result.unwrap();
1377        assert!(received.payload.is_empty());
1378    }
1379
1380    #[tokio::test]
1381    async fn test_get_message_with_large_payload() {
1382        let (ctx, tx) = make_context();
1383
1384        // 1MB payload
1385        let large_payload = vec![0xAB; 1024 * 1024];
1386
1387        let msg = create_test_proto_message(
1388            make_slim_name(["org", "sender", "app"]),
1389            make_slim_name(["org", "receiver", "app"]),
1390            1,
1391            large_payload.clone(),
1392            "application/octet-stream",
1393            HashMap::new(),
1394        );
1395
1396        tx.send(Ok(msg)).expect("send should succeed");
1397
1398        let result = ctx
1399            .get_message_async(Some(std::time::Duration::from_millis(100)))
1400            .await;
1401        assert!(result.is_ok());
1402        let received = result.unwrap();
1403        assert_eq!(received.payload.len(), 1024 * 1024);
1404        assert_eq!(received.payload, large_payload);
1405    }
1406
1407    #[tokio::test]
1408    async fn test_publish_async_with_metadata() {
1409        let (ctx, _tx) = make_context();
1410
1411        let mut metadata = HashMap::new();
1412        metadata.insert("key1".to_string(), "value1".to_string());
1413        metadata.insert("key2".to_string(), "value2".to_string());
1414
1415        let result = ctx
1416            .publish_async(
1417                b"test".to_vec(),
1418                Some("application/json".to_string()),
1419                Some(metadata),
1420            )
1421            .await;
1422
1423        // Should fail because session is missing
1424        assert!(result.is_err());
1425    }
1426
1427    #[tokio::test]
1428    async fn test_publish_async_with_options() {
1429        let (ctx, _tx) = make_context();
1430
1431        let mut metadata = HashMap::new();
1432        metadata.insert("trace".to_string(), "123".to_string());
1433
1434        let result = ctx
1435            .publish_async(
1436                b"important message".to_vec(),
1437                Some("text/plain".to_string()),
1438                Some(metadata),
1439            )
1440            .await;
1441
1442        // Should fail because session is missing, but returns CompletionHandle
1443        assert!(result.is_err());
1444    }
1445
1446    // ==================== get_message Duration Tests ====================
1447
1448    /// Test get_message blocking version with Duration timeout
1449    #[test]
1450    fn test_get_message_blocking_with_duration() {
1451        let (ctx, tx) = make_context();
1452
1453        let msg = create_test_proto_message(
1454            make_slim_name(["org", "sender", "app"]),
1455            make_slim_name(["org", "receiver", "app"]),
1456            1,
1457            b"test message".to_vec(),
1458            "text/plain",
1459            HashMap::new(),
1460        );
1461
1462        tx.send(Ok(msg)).expect("send should succeed");
1463
1464        // Test blocking version with Duration parameter
1465        let result = ctx.get_message(Some(std::time::Duration::from_millis(100)));
1466        assert!(result.is_ok());
1467        let received = result.unwrap();
1468        assert_eq!(received.payload, b"test message");
1469    }
1470
1471    /// Test get_message_async with Duration parameter instead of milliseconds
1472    #[tokio::test]
1473    async fn test_get_message_async_with_duration_parameter() {
1474        let (ctx, tx) = make_context();
1475
1476        let msg = create_test_proto_message(
1477            make_slim_name(["org", "sender", "app"]),
1478            make_slim_name(["org", "receiver", "app"]),
1479            1,
1480            b"duration test".to_vec(),
1481            "text/plain",
1482            HashMap::new(),
1483        );
1484
1485        tx.send(Ok(msg)).expect("send should succeed");
1486
1487        // New signature takes Duration directly
1488        let result = ctx
1489            .get_message_async(Some(std::time::Duration::from_millis(200)))
1490            .await;
1491        assert!(result.is_ok());
1492        let received = result.unwrap();
1493        assert_eq!(received.payload, b"duration test");
1494    }
1495
1496    /// Test get_message with no timeout (None)
1497    #[tokio::test]
1498    async fn test_get_message_with_no_timeout() {
1499        let (ctx, tx) = make_context();
1500
1501        let msg = create_test_proto_message(
1502            make_slim_name(["org", "sender", "app"]),
1503            make_slim_name(["org", "receiver", "app"]),
1504            1,
1505            b"no timeout".to_vec(),
1506            "text/plain",
1507            HashMap::new(),
1508        );
1509
1510        // Send immediately so it doesn't block forever
1511        tx.send(Ok(msg)).expect("send should succeed");
1512
1513        let result = ctx.get_message_async(None).await;
1514        assert!(result.is_ok());
1515        let received = result.unwrap();
1516        assert_eq!(received.payload, b"no timeout");
1517    }
1518
1519    /// Test futures-timer timeout behavior in get_session_message
1520    #[tokio::test]
1521    async fn test_get_session_message_futures_timer_timeout() {
1522        let (ctx, _tx) = make_context();
1523
1524        // Test with short timeout using futures-timer
1525        let result = ctx
1526            .get_session_message(Some(std::time::Duration::from_millis(50)))
1527            .await;
1528
1529        // Should timeout because no message is sent
1530        assert!(result.is_err());
1531        if let Err(e) = result {
1532            assert!(
1533                matches!(e, SessionError::ReceiveTimeout),
1534                "Should be ReceiveTimeout error"
1535            );
1536        }
1537    }
1538
1539    /// Test futures-timer select between message and timeout
1540    #[tokio::test]
1541    async fn test_get_message_futures_timer_race_condition() {
1542        let (ctx, tx) = make_context();
1543
1544        // Spawn a task to send message after a delay
1545        let tx_clone = tx.clone();
1546        tokio::spawn(async move {
1547            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
1548            let msg = create_test_proto_message(
1549                make_slim_name(["org", "sender", "app"]),
1550                make_slim_name(["org", "receiver", "app"]),
1551                1,
1552                b"delayed message".to_vec(),
1553                "text/plain",
1554                HashMap::new(),
1555            );
1556            let _ = tx_clone.send(Ok(msg));
1557        });
1558
1559        // Wait with sufficient timeout - message should arrive first
1560        let result = ctx
1561            .get_message_async(Some(std::time::Duration::from_millis(100)))
1562            .await;
1563
1564        assert!(result.is_ok(), "Message should arrive before timeout");
1565        let received = result.unwrap();
1566        assert_eq!(received.payload, b"delayed message");
1567    }
1568
1569    /// Test blocking get_message with timeout
1570    #[test]
1571    fn test_get_message_blocking_timeout() {
1572        let (ctx, _tx) = make_context();
1573
1574        // Blocking version with timeout should fail when no message arrives
1575        let result = ctx.get_message(Some(std::time::Duration::from_millis(50)));
1576
1577        assert!(result.is_err(), "Should timeout when no message arrives");
1578    }
1579
1580    /// Test get_message_async consistency with different timeout values
1581    #[tokio::test]
1582    async fn test_get_message_various_timeout_durations() {
1583        // Test that different Duration values work correctly
1584        let (ctx, _tx) = make_context();
1585
1586        // Very short timeout
1587        let result1 = ctx
1588            .get_message_async(Some(std::time::Duration::from_millis(1)))
1589            .await;
1590        assert!(result1.is_err());
1591
1592        // Medium timeout
1593        let result2 = ctx
1594            .get_message_async(Some(std::time::Duration::from_millis(50)))
1595            .await;
1596        assert!(result2.is_err());
1597
1598        // Longer timeout (but still short for test speed)
1599        let result3 = ctx
1600            .get_message_async(Some(std::time::Duration::from_millis(100)))
1601            .await;
1602        assert!(result3.is_err());
1603    }
1604
1605    // ========================================================================
1606    // SessionConfig Conversion Tests
1607    // ========================================================================
1608
1609    /// Test SessionConfig to SlimSessionConfig conversion for PointToPoint
1610    #[test]
1611    fn test_session_config_point_to_point() {
1612        let config = SessionConfig {
1613            session_type: SessionType::PointToPoint,
1614            enable_mls: true,
1615            max_retries: Some(5),
1616            interval: Some(std::time::Duration::from_millis(200)),
1617            metadata: std::collections::HashMap::from([
1618                ("key1".to_string(), "value1".to_string()),
1619                ("key2".to_string(), "value2".to_string()),
1620            ]),
1621        };
1622
1623        let slim_config: SlimSessionConfig = config.into();
1624
1625        assert_eq!(slim_config.session_type, ProtoSessionType::PointToPoint);
1626        assert!(slim_config.mls_enabled);
1627        assert_eq!(slim_config.max_retries, Some(5));
1628        assert_eq!(
1629            slim_config.interval,
1630            Some(std::time::Duration::from_millis(200))
1631        );
1632        assert_eq!(
1633            slim_config.metadata.get("key1"),
1634            Some(&"value1".to_string())
1635        );
1636    }
1637
1638    /// Test SessionConfig to SlimSessionConfig conversion for Group
1639    #[test]
1640    fn test_session_config_group() {
1641        let config = SessionConfig {
1642            session_type: SessionType::Group,
1643            enable_mls: false,
1644            max_retries: None,
1645            interval: None,
1646            metadata: std::collections::HashMap::new(),
1647        };
1648
1649        let slim_config: SlimSessionConfig = config.into();
1650
1651        assert_eq!(slim_config.session_type, ProtoSessionType::Multicast);
1652        assert!(!slim_config.mls_enabled);
1653        assert!(slim_config.max_retries.is_none());
1654        assert!(slim_config.interval.is_none());
1655    }
1656}