solace_rs/
cache_session.rs

1use std::{
2    ffi::CString,
3    ops::{Deref, DerefMut},
4    ptr,
5};
6
7use solace_rs_sys as ffi;
8use tracing::warn;
9
10use crate::{
11    message::InboundMessage, session::SessionEvent, util::get_last_error_info, Session,
12    SessionError, SolClientReturnCode,
13};
14
15pub struct CacheSession<
16    'session,
17    M: FnMut(InboundMessage) + Send + 'session,
18    E: FnMut(SessionEvent) + Send + 'session,
19> {
20    // Pointer to session
21    // This pointer must never be allowed to leave the struct
22    pub(crate) _cache_session_pt: ffi::solClient_opaqueCacheSession_pt,
23    pub(crate) session: Session<'session, M, E>,
24}
25
26unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Send
27    for CacheSession<'_, M, E>
28{
29}
30unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Sync
31    for CacheSession<'_, M, E>
32{
33}
34
35impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Deref
36    for CacheSession<'session, M, E>
37{
38    type Target = Session<'session, M, E>;
39
40    fn deref(&self) -> &Self::Target {
41        &self.session
42    }
43}
44
45impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Drop
46    for CacheSession<'_, M, E>
47{
48    fn drop(&mut self) {
49        let session_free_result =
50            unsafe { ffi::solClient_cacheSession_destroy(&mut self._cache_session_pt) };
51        let rc = SolClientReturnCode::from_raw(session_free_result);
52
53        if !rc.is_ok() {
54            warn!("cache session was not dropped properly. {rc}");
55        }
56    }
57}
58
59impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> DerefMut
60    for CacheSession<'_, M, E>
61{
62    fn deref_mut(&mut self) -> &mut Self::Target {
63        &mut self.session
64    }
65}
66
67impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send>
68    CacheSession<'session, M, E>
69{
70    pub(crate) fn new<N>(
71        session: Session<'session, M, E>,
72        cache_name: N,
73        max_message: Option<u64>,
74        max_age: Option<u64>,
75        timeout_ms: Option<u64>,
76    ) -> Result<Self, SessionError>
77    where
78        N: Into<Vec<u8>>,
79    {
80        let c_cache_name = CString::new(cache_name)?;
81        let c_max_message = CString::new(max_message.unwrap_or(1).to_string())?;
82        let c_max_age = CString::new(max_age.unwrap_or(0).to_string())?;
83        let c_timeout_ms = CString::new(timeout_ms.unwrap_or(10000).to_string())?;
84
85        // Note: Needs to live long enough for the values to be copied
86        let mut cache_session_props = [
87            ffi::SOLCLIENT_CACHESESSION_PROP_CACHE_NAME.as_ptr() as *const i8,
88            c_cache_name.as_ptr(),
89            ffi::SOLCLIENT_CACHESESSION_PROP_DEFAULT_MAX_MSGS.as_ptr() as *const i8,
90            c_max_message.as_ptr(),
91            ffi::SOLCLIENT_CACHESESSION_PROP_MAX_AGE.as_ptr() as *const i8,
92            c_max_age.as_ptr(),
93            ffi::SOLCLIENT_CACHESESSION_PROP_REQUESTREPLY_TIMEOUT_MS.as_ptr() as *const i8,
94            c_timeout_ms.as_ptr(),
95            ptr::null(),
96        ];
97
98        let mut cache_session_pt: ffi::solClient_opaqueCacheSession_pt = ptr::null_mut();
99
100        let cache_create_raw_result = unsafe {
101            ffi::solClient_session_createCacheSession(
102                cache_session_props.as_mut_ptr(),
103                session._session_ptr,
104                &mut cache_session_pt,
105            )
106        };
107
108        let rc = SolClientReturnCode::from_raw(cache_create_raw_result);
109
110        if !rc.is_ok() {
111            let subcode = get_last_error_info();
112            return Err(SessionError::InitializationFailure(rc, subcode));
113        }
114
115        Ok(CacheSession {
116            session,
117            _cache_session_pt: cache_session_pt,
118        })
119    }
120
121    pub fn blocking_cache_request<T>(
122        &self,
123        topic: T,
124        request_id: u64,
125        subscribe: bool,
126    ) -> Result<(), SessionError>
127    where
128        T: Into<Vec<u8>>,
129    {
130        let c_topic = CString::new(topic)?;
131
132        let flags = if subscribe {
133            ffi::SOLCLIENT_CACHEREQUEST_FLAGS_LIVEDATA_FLOWTHRU
134                & ffi::SOLCLIENT_CACHEREQUEST_FLAGS_NO_SUBSCRIBE
135        } else {
136            ffi::SOLCLIENT_CACHEREQUEST_FLAGS_LIVEDATA_FLOWTHRU
137        };
138
139        let rc = unsafe {
140            ffi::solClient_cacheSession_sendCacheRequest(
141                self._cache_session_pt,
142                c_topic.as_ptr(),
143                request_id,
144                None,
145                ptr::null_mut(),
146                flags,
147                0,
148            )
149        };
150
151        let rc = SolClientReturnCode::from_raw(rc);
152        if !rc.is_ok() {
153            let subcode = get_last_error_info();
154            return Err(SessionError::CacheRequestFailure(rc, subcode));
155        }
156
157        Ok(())
158    }
159}