solace_rs/
session.rs

1pub mod builder;
2pub mod event;
3
4pub use builder::{SessionBuilder, SessionBuilderError};
5pub use event::SessionEvent;
6
7use crate::cache_session::CacheSession;
8use crate::context::Context;
9use crate::message::{InboundMessage, Message, OutboundMessage};
10use crate::util::get_last_error_info;
11use crate::SessionError;
12use crate::SolClientReturnCode;
13use solace_rs_sys::{self as ffi, solClient_opaqueMsg_pt};
14use std::ffi::CString;
15use std::marker::PhantomData;
16use std::num::NonZeroU32;
17use tracing::warn;
18
/// Module-local shorthand for a `Result` whose error type is fixed to [`SessionError`].
type Result<T> = std::result::Result<T, SessionError>;
20
21pub struct Session<
22    'session,
23    M: FnMut(InboundMessage) + Send + 'session,
24    E: FnMut(SessionEvent) + Send + 'session,
25> {
26    pub(crate) lifetime: PhantomData<&'session ()>,
27
28    // Pointer to session
29    // This pointer must never be allowed to leave the struct
30    pub(crate) _session_ptr: ffi::solClient_opaqueSession_pt,
31    // The `context` field is never accessed, but implicitly does
32    // reference counting via the `Drop` trait.
33    #[allow(dead_code)]
34    pub(crate) context: Context,
35
36    // These fields are used to store the fn callback. The mutable reference to this fn is passed to the FFI library,
37    #[allow(dead_code, clippy::redundant_allocation)]
38    _msg_fn_ptr: Option<Box<Box<M>>>,
39    #[allow(dead_code, clippy::redundant_allocation)]
40    _event_fn_ptr: Option<Box<Box<E>>>,
41}
42
43unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Send
44    for Session<'_, M, E>
45{
46}
47
48impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send>
49    Session<'session, M, E>
50{
51    pub fn publish(&self, message: OutboundMessage) -> Result<()> {
52        let send_message_raw_rc = unsafe {
53            ffi::solClient_session_sendMsg(self._session_ptr, message.get_raw_message_ptr())
54        };
55
56        let rc = SolClientReturnCode::from_raw(send_message_raw_rc);
57        if !rc.is_ok() {
58            let subcode = get_last_error_info();
59            return Err(SessionError::PublishError(rc, subcode));
60        }
61
62        Ok(())
63    }
64
65    pub fn subscribe<T>(&self, topic: T) -> Result<()>
66    where
67        T: Into<Vec<u8>>,
68    {
69        let c_topic = CString::new(topic)?;
70        let subscription_raw_rc =
71            unsafe { ffi::solClient_session_topicSubscribe(self._session_ptr, c_topic.as_ptr()) };
72
73        let rc = SolClientReturnCode::from_raw(subscription_raw_rc);
74
75        if !rc.is_ok() {
76            let subcode = get_last_error_info();
77            return Err(SessionError::SubscriptionFailure(
78                c_topic.to_string_lossy().into_owned(),
79                rc,
80                subcode,
81            ));
82        }
83        Ok(())
84    }
85
86    pub fn unsubscribe<T>(&self, topic: T) -> Result<()>
87    where
88        T: Into<Vec<u8>>,
89    {
90        let c_topic = CString::new(topic)?;
91        let subscription_raw_rc =
92            unsafe { ffi::solClient_session_topicUnsubscribe(self._session_ptr, c_topic.as_ptr()) };
93
94        let rc = SolClientReturnCode::from_raw(subscription_raw_rc);
95
96        if !rc.is_ok() {
97            let subcode = get_last_error_info();
98            return Err(SessionError::UnsubscriptionFailure(
99                c_topic.to_string_lossy().into_owned(),
100                rc,
101                subcode,
102            ));
103        }
104        Ok(())
105    }
106
107    pub fn request(
108        &self,
109        message: OutboundMessage,
110        timeout_ms: NonZeroU32,
111    ) -> Result<InboundMessage> {
112        let mut reply_ptr: solClient_opaqueMsg_pt = std::ptr::null_mut();
113
114        let rc = unsafe {
115            ffi::solClient_session_sendRequest(
116                self._session_ptr,
117                message.get_raw_message_ptr(),
118                &mut reply_ptr,
119                timeout_ms.into(),
120            )
121        };
122
123        let rc = SolClientReturnCode::from_raw(rc);
124
125        if !rc.is_ok() {
126            // reply_ptr is always set to null if rc is not Ok
127            // https://docs.solace.com/API-Developer-Online-Ref-Documentation/c/sol_client_8h.html#ac00adf1a9301ebe67fd0790523d5a44b
128            debug_assert!(reply_ptr.is_null());
129
130            let subcode = get_last_error_info();
131            return Err(SessionError::RequestError(rc, subcode));
132        }
133
134        debug_assert!(!reply_ptr.is_null());
135
136        let reply = InboundMessage::from(reply_ptr);
137
138        Ok(reply)
139    }
140
141    pub fn cache_session<N>(
142        self,
143        cache_name: N,
144        max_message: Option<u64>,
145        max_age: Option<u64>,
146        timeout_ms: Option<u64>,
147    ) -> Result<CacheSession<'session, M, E>>
148    where
149        N: Into<Vec<u8>>,
150    {
151        CacheSession::new(self, cache_name, max_message, max_age, timeout_ms)
152    }
153
154    pub fn disconnect(self) -> Result<()> {
155        let rc = unsafe { ffi::solClient_session_disconnect(self._session_ptr) };
156
157        let rc = SolClientReturnCode::from_raw(rc);
158
159        if !rc.is_ok() {
160            let subcode = get_last_error_info();
161            return Err(SessionError::DisconnectError(rc, subcode));
162        }
163        Ok(())
164    }
165}
166
167impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Drop for Session<'_, M, E> {
168    fn drop(&mut self) {
169        let session_free_result = unsafe { ffi::solClient_session_destroy(&mut self._session_ptr) };
170        let rc = SolClientReturnCode::from_raw(session_free_result);
171
172        if !rc.is_ok() {
173            warn!("session was not dropped properly. {rc}");
174        }
175    }
176}