solace_rs/
cache_session.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use std::{
    ffi::CString,
    ops::{Deref, DerefMut},
    ptr,
};

use solace_rs_sys as ffi;
use tracing::warn;

use crate::{
    message::InboundMessage, session::SessionEvent, util::get_last_error_info, Session,
    SessionError, SolClientReturnCode,
};

pub struct CacheSession<
    'session,
    M: FnMut(InboundMessage) + Send + 'session,
    E: FnMut(SessionEvent) + Send + 'session,
> {
    // Pointer to session
    // This pointer must never be allowed to leave the struct
    pub(crate) _cache_session_pt: ffi::solClient_opaqueCacheSession_pt,
    pub(crate) session: Session<'session, M, E>,
}

unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Send
    for CacheSession<'_, M, E>
{
}
unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Sync
    for CacheSession<'_, M, E>
{
}

impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Deref
    for CacheSession<'session, M, E>
{
    type Target = Session<'session, M, E>;

    fn deref(&self) -> &Self::Target {
        &self.session
    }
}

impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Drop
    for CacheSession<'_, M, E>
{
    fn drop(&mut self) {
        let session_free_result =
            unsafe { ffi::solClient_cacheSession_destroy(&mut self._cache_session_pt) };
        let rc = SolClientReturnCode::from_raw(session_free_result);

        if !rc.is_ok() {
            warn!("cache session was not dropped properly. {rc}");
        }
    }
}

impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> DerefMut
    for CacheSession<'_, M, E>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.session
    }
}

impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send>
    CacheSession<'session, M, E>
{
    pub(crate) fn new<N>(
        session: Session<'session, M, E>,
        cache_name: N,
        max_message: Option<u64>,
        max_age: Option<u64>,
        timeout_ms: Option<u64>,
    ) -> Result<Self, SessionError>
    where
        N: Into<Vec<u8>>,
    {
        let c_cache_name = CString::new(cache_name)?;
        let c_max_message = CString::new(max_message.unwrap_or(1).to_string())?;
        let c_max_age = CString::new(max_age.unwrap_or(0).to_string())?;
        let c_timeout_ms = CString::new(timeout_ms.unwrap_or(10000).to_string())?;

        // Note: Needs to live long enough for the values to be copied
        let mut cache_session_props = [
            ffi::SOLCLIENT_CACHESESSION_PROP_CACHE_NAME.as_ptr() as *const i8,
            c_cache_name.as_ptr(),
            ffi::SOLCLIENT_CACHESESSION_PROP_DEFAULT_MAX_MSGS.as_ptr() as *const i8,
            c_max_message.as_ptr(),
            ffi::SOLCLIENT_CACHESESSION_PROP_MAX_AGE.as_ptr() as *const i8,
            c_max_age.as_ptr(),
            ffi::SOLCLIENT_CACHESESSION_PROP_REQUESTREPLY_TIMEOUT_MS.as_ptr() as *const i8,
            c_timeout_ms.as_ptr(),
            ptr::null(),
        ];

        let mut cache_session_pt: ffi::solClient_opaqueCacheSession_pt = ptr::null_mut();

        let cache_create_raw_result = unsafe {
            ffi::solClient_session_createCacheSession(
                cache_session_props.as_mut_ptr(),
                session._session_ptr,
                &mut cache_session_pt,
            )
        };

        let rc = SolClientReturnCode::from_raw(cache_create_raw_result);

        if !rc.is_ok() {
            let subcode = get_last_error_info();
            return Err(SessionError::InitializationFailure(rc, subcode));
        }

        Ok(CacheSession {
            session,
            _cache_session_pt: cache_session_pt,
        })
    }

    pub fn blocking_cache_request<T>(
        &self,
        topic: T,
        request_id: u64,
        subscribe: bool,
    ) -> Result<(), SessionError>
    where
        T: Into<Vec<u8>>,
    {
        let c_topic = CString::new(topic)?;

        let flags = if subscribe {
            ffi::SOLCLIENT_CACHEREQUEST_FLAGS_LIVEDATA_FLOWTHRU
                & ffi::SOLCLIENT_CACHEREQUEST_FLAGS_NO_SUBSCRIBE
        } else {
            ffi::SOLCLIENT_CACHEREQUEST_FLAGS_LIVEDATA_FLOWTHRU
        };

        let rc = unsafe {
            ffi::solClient_cacheSession_sendCacheRequest(
                self._cache_session_pt,
                c_topic.as_ptr(),
                request_id,
                None,
                ptr::null_mut(),
                flags,
                0,
            )
        };

        let rc = SolClientReturnCode::from_raw(rc);
        if !rc.is_ok() {
            let subcode = get_last_error_info();
            return Err(SessionError::CacheRequestFailure(rc, subcode));
        }

        Ok(())
    }
}