solace_rs/
session.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
pub mod builder;
pub mod event;

pub use builder::{SessionBuilder, SessionBuilderError};
pub use event::SessionEvent;

use crate::cache_session::CacheSession;
use crate::context::Context;
use crate::message::{InboundMessage, Message, OutboundMessage};
use crate::util::get_last_error_info;
use crate::SessionError;
use crate::SolClientReturnCode;
use solace_rs_sys::{self as ffi, solClient_opaqueMsg_pt};
use std::ffi::CString;
use std::marker::PhantomData;
use std::num::NonZeroU32;
use tracing::warn;

type Result<T> = std::result::Result<T, SessionError>;

pub struct Session<
    'session,
    M: FnMut(InboundMessage) + Send + 'session,
    E: FnMut(SessionEvent) + Send + 'session,
> {
    pub(crate) lifetime: PhantomData<&'session ()>,

    // Pointer to session
    // This pointer must never be allowed to leave the struct
    pub(crate) _session_ptr: ffi::solClient_opaqueSession_pt,
    // The `context` field is never accessed, but implicitly does
    // reference counting via the `Drop` trait.
    #[allow(dead_code)]
    pub(crate) context: Context,

    // These fields are used to store the fn callback. The mutable reference to this fn is passed to the FFI library,
    #[allow(dead_code, clippy::redundant_allocation)]
    _msg_fn_ptr: Option<Box<Box<M>>>,
    #[allow(dead_code, clippy::redundant_allocation)]
    _event_fn_ptr: Option<Box<Box<E>>>,
}

unsafe impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Send
    for Session<'_, M, E>
{
}

impl<'session, M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send>
    Session<'session, M, E>
{
    pub fn publish(&self, message: OutboundMessage) -> Result<()> {
        let send_message_raw_rc = unsafe {
            ffi::solClient_session_sendMsg(self._session_ptr, message.get_raw_message_ptr())
        };

        let rc = SolClientReturnCode::from_raw(send_message_raw_rc);
        if !rc.is_ok() {
            let subcode = get_last_error_info();
            return Err(SessionError::PublishError(rc, subcode));
        }

        Ok(())
    }

    pub fn subscribe<T>(&self, topic: T) -> Result<()>
    where
        T: Into<Vec<u8>>,
    {
        let c_topic = CString::new(topic)?;
        let subscription_raw_rc =
            unsafe { ffi::solClient_session_topicSubscribe(self._session_ptr, c_topic.as_ptr()) };

        let rc = SolClientReturnCode::from_raw(subscription_raw_rc);

        if !rc.is_ok() {
            let subcode = get_last_error_info();
            return Err(SessionError::SubscriptionFailure(
                c_topic.to_string_lossy().into_owned(),
                rc,
                subcode,
            ));
        }
        Ok(())
    }

    pub fn unsubscribe<T>(&self, topic: T) -> Result<()>
    where
        T: Into<Vec<u8>>,
    {
        let c_topic = CString::new(topic)?;
        let subscription_raw_rc =
            unsafe { ffi::solClient_session_topicUnsubscribe(self._session_ptr, c_topic.as_ptr()) };

        let rc = SolClientReturnCode::from_raw(subscription_raw_rc);

        if !rc.is_ok() {
            let subcode = get_last_error_info();
            return Err(SessionError::UnsubscriptionFailure(
                c_topic.to_string_lossy().into_owned(),
                rc,
                subcode,
            ));
        }
        Ok(())
    }

    pub fn request(
        &self,
        message: OutboundMessage,
        timeout_ms: NonZeroU32,
    ) -> Result<InboundMessage> {
        let mut reply_ptr: solClient_opaqueMsg_pt = std::ptr::null_mut();

        let rc = unsafe {
            ffi::solClient_session_sendRequest(
                self._session_ptr,
                message.get_raw_message_ptr(),
                &mut reply_ptr,
                timeout_ms.into(),
            )
        };

        let rc = SolClientReturnCode::from_raw(rc);

        if !rc.is_ok() {
            // reply_ptr is always set to null if rc is not Ok
            // https://docs.solace.com/API-Developer-Online-Ref-Documentation/c/sol_client_8h.html#ac00adf1a9301ebe67fd0790523d5a44b
            debug_assert!(reply_ptr.is_null());

            let subcode = get_last_error_info();
            return Err(SessionError::RequestError(rc, subcode));
        }

        debug_assert!(!reply_ptr.is_null());

        let reply = InboundMessage::from(reply_ptr);

        Ok(reply)
    }

    pub fn cache_session<N>(
        self,
        cache_name: N,
        max_message: Option<u64>,
        max_age: Option<u64>,
        timeout_ms: Option<u64>,
    ) -> Result<CacheSession<'session, M, E>>
    where
        N: Into<Vec<u8>>,
    {
        CacheSession::new(self, cache_name, max_message, max_age, timeout_ms)
    }

    pub fn disconnect(self) -> Result<()> {
        let rc = unsafe { ffi::solClient_session_disconnect(self._session_ptr) };

        let rc = SolClientReturnCode::from_raw(rc);

        if !rc.is_ok() {
            let subcode = get_last_error_info();
            return Err(SessionError::DisconnectError(rc, subcode));
        }
        Ok(())
    }
}

impl<M: FnMut(InboundMessage) + Send, E: FnMut(SessionEvent) + Send> Drop for Session<'_, M, E> {
    fn drop(&mut self) {
        let session_free_result = unsafe { ffi::solClient_session_destroy(&mut self._session_ptr) };
        let rc = SolClientReturnCode::from_raw(session_free_result);

        if !rc.is_ok() {
            warn!("session was not dropped properly. {rc}");
        }
    }
}