solace_rs/
cache_session.rs1use std::{
2 ffi::CString,
3 ops::{Deref, DerefMut},
4 ptr,
5};
6
7use solace_rs_sys as ffi;
8use tracing::warn;
9
10use crate::{
11 message::InboundMessage, session::SessionEvent, util::get_last_error_info, Session,
12 SessionError, SolClientReturnCode,
13};
14
15pub struct CacheSession<
16 'session,
17 M: FnMut(InboundMessage) + Send + 'session,
18 E: FnMut(SessionEvent) + Send + 'session,
19> {
20 pub(crate) _cache_session_pt: ffi::solClient_opaqueCacheSession_pt,
23 pub(crate) session: Session<'session, M, E>,
24}
25
26unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Send
27 for CacheSession<'_, M, E>
28{
29}
30unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Sync
31 for CacheSession<'_, M, E>
32{
33}
34
35impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Deref
36 for CacheSession<'session, M, E>
37{
38 type Target = Session<'session, M, E>;
39
40 fn deref(&self) -> &Self::Target {
41 &self.session
42 }
43}
44
45impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Drop
46 for CacheSession<'_, M, E>
47{
48 fn drop(&mut self) {
49 let session_free_result =
50 unsafe { ffi::solClient_cacheSession_destroy(&mut self._cache_session_pt) };
51 let rc = SolClientReturnCode::from_raw(session_free_result);
52
53 if !rc.is_ok() {
54 warn!("cache session was not dropped properly. {rc}");
55 }
56 }
57}
58
59impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> DerefMut
60 for CacheSession<'_, M, E>
61{
62 fn deref_mut(&mut self) -> &mut Self::Target {
63 &mut self.session
64 }
65}
66
67impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send>
68 CacheSession<'session, M, E>
69{
70 pub(crate) fn new<N>(
71 session: Session<'session, M, E>,
72 cache_name: N,
73 max_message: Option<u64>,
74 max_age: Option<u64>,
75 timeout_ms: Option<u64>,
76 ) -> Result<Self, SessionError>
77 where
78 N: Into<Vec<u8>>,
79 {
80 let c_cache_name = CString::new(cache_name)?;
81 let c_max_message = CString::new(max_message.unwrap_or(1).to_string())?;
82 let c_max_age = CString::new(max_age.unwrap_or(0).to_string())?;
83 let c_timeout_ms = CString::new(timeout_ms.unwrap_or(10000).to_string())?;
84
85 let mut cache_session_props = [
87 ffi::SOLCLIENT_CACHESESSION_PROP_CACHE_NAME.as_ptr() as *const i8,
88 c_cache_name.as_ptr(),
89 ffi::SOLCLIENT_CACHESESSION_PROP_DEFAULT_MAX_MSGS.as_ptr() as *const i8,
90 c_max_message.as_ptr(),
91 ffi::SOLCLIENT_CACHESESSION_PROP_MAX_AGE.as_ptr() as *const i8,
92 c_max_age.as_ptr(),
93 ffi::SOLCLIENT_CACHESESSION_PROP_REQUESTREPLY_TIMEOUT_MS.as_ptr() as *const i8,
94 c_timeout_ms.as_ptr(),
95 ptr::null(),
96 ];
97
98 let mut cache_session_pt: ffi::solClient_opaqueCacheSession_pt = ptr::null_mut();
99
100 let cache_create_raw_result = unsafe {
101 ffi::solClient_session_createCacheSession(
102 cache_session_props.as_mut_ptr(),
103 session._session_ptr,
104 &mut cache_session_pt,
105 )
106 };
107
108 let rc = SolClientReturnCode::from_raw(cache_create_raw_result);
109
110 if !rc.is_ok() {
111 let subcode = get_last_error_info();
112 return Err(SessionError::InitializationFailure(rc, subcode));
113 }
114
115 Ok(CacheSession {
116 session,
117 _cache_session_pt: cache_session_pt,
118 })
119 }
120
121 pub fn blocking_cache_request<T>(
122 &self,
123 topic: T,
124 request_id: u64,
125 subscribe: bool,
126 ) -> Result<(), SessionError>
127 where
128 T: Into<Vec<u8>>,
129 {
130 let c_topic = CString::new(topic)?;
131
132 let flags = if subscribe {
133 ffi::SOLCLIENT_CACHEREQUEST_FLAGS_LIVEDATA_FLOWTHRU
134 & ffi::SOLCLIENT_CACHEREQUEST_FLAGS_NO_SUBSCRIBE
135 } else {
136 ffi::SOLCLIENT_CACHEREQUEST_FLAGS_LIVEDATA_FLOWTHRU
137 };
138
139 let rc = unsafe {
140 ffi::solClient_cacheSession_sendCacheRequest(
141 self._cache_session_pt,
142 c_topic.as_ptr(),
143 request_id,
144 None,
145 ptr::null_mut(),
146 flags,
147 0,
148 )
149 };
150
151 let rc = SolClientReturnCode::from_raw(rc);
152 if !rc.is_ok() {
153 let subcode = get_last_error_info();
154 return Err(SessionError::CacheRequestFailure(rc, subcode));
155 }
156
157 Ok(())
158 }
159}