solace_rs/message/
inbound.rs

1use super::{CacheStatus, Message, MessageError, Result};
2use crate::SolClientReturnCode;
3use enum_primitive::*;
4use solace_rs_sys as ffi;
5use std::convert::From;
6use std::ffi::CStr;
7use std::time::{Duration, SystemTime};
8use std::{fmt, ptr};
9use tracing::warn;
10
/// Owning wrapper around a raw `solClient_opaqueMsg_pt` message handle.
///
/// The wrapped pointer is handed to `solClient_msg_free` when the value is
/// dropped, so each handle must be wrapped by at most one `InboundMessage`.
pub struct InboundMessage {
    // Raw handle passed to every `solClient_msg_*` call made by this type.
    _msg_ptr: ffi::solClient_opaqueMsg_pt,
}
14
15impl fmt::Debug for InboundMessage {
16    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17        let mut f = f.debug_struct("InboundMessage");
18        if self.get_receive_timestamp().is_ok_and(|v| v.is_some()) {
19            f.field(
20                "receive_timestamp",
21                &format_args!("{:?}", self.get_receive_timestamp().unwrap().unwrap()),
22            );
23        }
24        if self.get_sender_id().is_ok_and(|v| v.is_some()) {
25            f.field(
26                "sender_id",
27                &format_args!("{}", self.get_sender_id().unwrap().unwrap()),
28            );
29        }
30        if self.get_sender_timestamp().is_ok_and(|v| v.is_some()) {
31            f.field(
32                "sender_timestamp",
33                &format_args!("{:?}", self.get_sender_timestamp().unwrap().unwrap()),
34            );
35        }
36        if self.get_sequence_number().is_ok_and(|v| v.is_some()) {
37            f.field(
38                "sequence_number",
39                &format_args!("{}", self.get_sequence_number().unwrap().unwrap()),
40            );
41        }
42        if self.get_correlation_id().is_ok_and(|v| v.is_some()) {
43            f.field(
44                "correlation_id",
45                &format_args!("{}", self.get_correlation_id().unwrap().unwrap()),
46            );
47        }
48        if self.get_priority().is_ok_and(|v| v.is_some()) {
49            f.field(
50                "priority",
51                &format_args!("{}", self.get_priority().unwrap().unwrap()),
52            );
53        }
54        if self.is_discard_indication() {
55            f.field(
56                "is_discard_indication",
57                &format_args!("{}", self.is_discard_indication()),
58            );
59        }
60        if self.get_application_message_id().is_some() {
61            f.field(
62                "application_message_id",
63                &format_args!("{}", &self.get_application_message_id().unwrap()),
64            );
65        }
66        if self.get_user_data().is_ok_and(|v| v.is_some()) {
67            if let Ok(v) = std::str::from_utf8(self.get_user_data().unwrap().unwrap()) {
68                f.field("user_data", &v);
69            }
70        }
71        if self.get_destination().is_ok_and(|v| v.is_some()) {
72            f.field("destination", &self.get_destination().unwrap().unwrap());
73        }
74
75        f.field("is_reply", &self.is_reply());
76
77        if self.get_reply_to().is_ok_and(|v| v.is_some()) {
78            f.field("reply_to", &self.get_reply_to().unwrap().unwrap());
79        }
80
81        f.field("is_cache_msg", &self.is_cache_msg());
82
83        if self.get_cache_request_id().is_ok_and(|v| v.is_some()) {
84            f.field(
85                "cache_request_id",
86                &self.get_cache_request_id().unwrap().unwrap(),
87            );
88        }
89
90        if self.get_payload().is_ok_and(|v| v.is_some()) {
91            if let Ok(v) = std::str::from_utf8(self.get_payload().unwrap().unwrap()) {
92                f.field("payload", &v);
93            }
94        }
95        f.finish()
96    }
97}
98
// SAFETY: NOTE(review) — the struct's only field is a raw pointer, which is
// why `Send` has to be asserted by hand. This presumes the message handle may
// be used and freed from a thread other than the one that received it; confirm
// against the Solace C API threading rules.
unsafe impl Send for InboundMessage {}
100
101impl Drop for InboundMessage {
102    fn drop(&mut self) {
103        let rc = unsafe { ffi::solClient_msg_free(&mut self._msg_ptr) };
104
105        let rc = SolClientReturnCode::from_raw(rc);
106        if !rc.is_ok() {
107            warn!("warning: message was not dropped properly");
108        }
109    }
110}
111
112impl From<ffi::solClient_opaqueMsg_pt> for InboundMessage {
113    /// .
114    ///
115    /// # Safety
116    ///
117    /// From a valid owned pointer.
118    /// No other alias should exist for this pointer
119    /// InboundMessage will try to free the ptr when it is destroyed
120    ///
121    /// .
122    fn from(ptr: ffi::solClient_opaqueMsg_pt) -> Self {
123        Self { _msg_ptr: ptr }
124    }
125}
126
127impl<'a> Message<'a> for InboundMessage {
128    unsafe fn get_raw_message_ptr(&self) -> ffi::solClient_opaqueMsg_pt {
129        self._msg_ptr
130    }
131}
132
133impl InboundMessage {
134    pub fn get_receive_timestamp(&self) -> Result<Option<SystemTime>> {
135        let mut ts: i64 = 0;
136        let rc = unsafe { ffi::solClient_msg_getRcvTimestamp(self.get_raw_message_ptr(), &mut ts) };
137
138        let rc = SolClientReturnCode::from_raw(rc);
139        match rc {
140            SolClientReturnCode::NotFound => Ok(None),
141            SolClientReturnCode::Ok => Ok(Some(
142                SystemTime::UNIX_EPOCH + Duration::from_millis(ts.try_into().unwrap()),
143            )),
144            _ => Err(MessageError::FieldError("receive_timestamp", rc)),
145        }
146    }
147
148    pub fn get_sender_id(&self) -> Result<Option<&str>> {
149        let mut buffer = ptr::null();
150
151        let rc = unsafe { ffi::solClient_msg_getSenderId(self.get_raw_message_ptr(), &mut buffer) };
152
153        let rc = SolClientReturnCode::from_raw(rc);
154        match rc {
155            SolClientReturnCode::Ok => (),
156            SolClientReturnCode::NotFound => return Ok(None),
157            _ => return Err(MessageError::FieldError("sender_id", rc)),
158        }
159
160        let c_str = unsafe { CStr::from_ptr(buffer) };
161
162        let str = c_str
163            .to_str()
164            .map_err(|_| MessageError::FieldConvertionError("sender_id"))?;
165
166        Ok(Some(str))
167    }
168
169    pub fn is_discard_indication(&self) -> bool {
170        let discard_indication =
171            unsafe { ffi::solClient_msg_isDiscardIndication(self.get_raw_message_ptr()) };
172
173        if discard_indication == 0 {
174            return false;
175        }
176
177        true
178    }
179
180    pub fn get_cache_request_id(&self) -> Result<Option<u64>> {
181        let mut id: u64 = 0;
182
183        let rc =
184            unsafe { ffi::solClient_msg_getCacheRequestId(self.get_raw_message_ptr(), &mut id) };
185
186        let rc = SolClientReturnCode::from_raw(rc);
187        match rc {
188            SolClientReturnCode::Ok => Ok(Some(id)),
189            SolClientReturnCode::NotFound => Ok(None),
190            _ => Err(MessageError::FieldError("cache_request_id", rc)),
191        }
192    }
193
194    pub fn is_cache_msg(&self) -> CacheStatus {
195        let raw = unsafe { ffi::solClient_msg_isCacheMsg(self.get_raw_message_ptr()) };
196        CacheStatus::from_i32(raw).unwrap_or(CacheStatus::InvalidMessage)
197    }
198}