1pub mod builder;
2pub mod event;
3
4pub use builder::{SessionBuilder, SessionBuilderError};
5pub use event::SessionEvent;
6
7use crate::cache_session::CacheSession;
8use crate::context::Context;
9use crate::message::{InboundMessage, Message, OutboundMessage};
10use crate::util::get_last_error_info;
11use crate::SessionError;
12use crate::SolClientReturnCode;
13use solace_rs_sys::{self as ffi, solClient_opaqueMsg_pt};
14use std::ffi::CString;
15use std::marker::PhantomData;
16use std::num::NonZeroU32;
17use tracing::warn;
18
19type Result<T> = std::result::Result<T, SessionError>;
20
21pub struct Session<
22 'session,
23 M: FnMut(InboundMessage) + Send + 'session,
24 E: FnMut(SessionEvent) + Send + 'session,
25> {
26 pub(crate) lifetime: PhantomData<&'session ()>,
27
28 pub(crate) _session_ptr: ffi::solClient_opaqueSession_pt,
31 #[allow(dead_code)]
34 pub(crate) context: Context,
35
36 #[allow(dead_code, clippy::redundant_allocation)]
38 _msg_fn_ptr: Option<Box<Box<M>>>,
39 #[allow(dead_code, clippy::redundant_allocation)]
40 _event_fn_ptr: Option<Box<Box<E>>>,
41}
42
43unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Send
44 for Session<'_, M, E>
45{
46}
47
48impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send>
49 Session<'session, M, E>
50{
51 pub fn publish(&self, message: OutboundMessage) -> Result<()> {
52 let send_message_raw_rc = unsafe {
53 ffi::solClient_session_sendMsg(self._session_ptr, message.get_raw_message_ptr())
54 };
55
56 let rc = SolClientReturnCode::from_raw(send_message_raw_rc);
57 if !rc.is_ok() {
58 let subcode = get_last_error_info();
59 return Err(SessionError::PublishError(rc, subcode));
60 }
61
62 Ok(())
63 }
64
65 pub fn subscribe<T>(&self, topic: T) -> Result<()>
66 where
67 T: Into<Vec<u8>>,
68 {
69 let c_topic = CString::new(topic)?;
70 let subscription_raw_rc =
71 unsafe { ffi::solClient_session_topicSubscribe(self._session_ptr, c_topic.as_ptr()) };
72
73 let rc = SolClientReturnCode::from_raw(subscription_raw_rc);
74
75 if !rc.is_ok() {
76 let subcode = get_last_error_info();
77 return Err(SessionError::SubscriptionFailure(
78 c_topic.to_string_lossy().into_owned(),
79 rc,
80 subcode,
81 ));
82 }
83 Ok(())
84 }
85
86 pub fn unsubscribe<T>(&self, topic: T) -> Result<()>
87 where
88 T: Into<Vec<u8>>,
89 {
90 let c_topic = CString::new(topic)?;
91 let subscription_raw_rc =
92 unsafe { ffi::solClient_session_topicUnsubscribe(self._session_ptr, c_topic.as_ptr()) };
93
94 let rc = SolClientReturnCode::from_raw(subscription_raw_rc);
95
96 if !rc.is_ok() {
97 let subcode = get_last_error_info();
98 return Err(SessionError::UnsubscriptionFailure(
99 c_topic.to_string_lossy().into_owned(),
100 rc,
101 subcode,
102 ));
103 }
104 Ok(())
105 }
106
107 pub fn request(
108 &self,
109 message: OutboundMessage,
110 timeout_ms: NonZeroU32,
111 ) -> Result<InboundMessage> {
112 let mut reply_ptr: solClient_opaqueMsg_pt = std::ptr::null_mut();
113
114 let rc = unsafe {
115 ffi::solClient_session_sendRequest(
116 self._session_ptr,
117 message.get_raw_message_ptr(),
118 &mut reply_ptr,
119 timeout_ms.into(),
120 )
121 };
122
123 let rc = SolClientReturnCode::from_raw(rc);
124
125 if !rc.is_ok() {
126 debug_assert!(reply_ptr.is_null());
129
130 let subcode = get_last_error_info();
131 return Err(SessionError::RequestError(rc, subcode));
132 }
133
134 debug_assert!(!reply_ptr.is_null());
135
136 let reply = InboundMessage::from(reply_ptr);
137
138 Ok(reply)
139 }
140
141 pub fn cache_session<N>(
142 self,
143 cache_name: N,
144 max_message: Option<u64>,
145 max_age: Option<u64>,
146 timeout_ms: Option<u64>,
147 ) -> Result<CacheSession<'session, M, E>>
148 where
149 N: Into<Vec<u8>>,
150 {
151 CacheSession::new(self, cache_name, max_message, max_age, timeout_ms)
152 }
153
154 pub fn disconnect(self) -> Result<()> {
155 let rc = unsafe { ffi::solClient_session_disconnect(self._session_ptr) };
156
157 let rc = SolClientReturnCode::from_raw(rc);
158
159 if !rc.is_ok() {
160 let subcode = get_last_error_info();
161 return Err(SessionError::DisconnectError(rc, subcode));
162 }
163 Ok(())
164 }
165}
166
167impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Drop for Session<'_, M, E> {
168 fn drop(&mut self) {
169 let session_free_result = unsafe { ffi::solClient_session_destroy(&mut self._session_ptr) };
170 let rc = SolClientReturnCode::from_raw(session_free_result);
171
172 if !rc.is_ok() {
173 warn!("session was not dropped properly. {rc}");
174 }
175 }
176}