solace_rs/message/
inbound.rs1use super::{CacheStatus, Message, MessageError, Result};
2use crate::SolClientReturnCode;
3use enum_primitive::*;
4use solace_rs_sys as ffi;
5use std::convert::From;
6use std::ffi::CStr;
7use std::time::{Duration, SystemTime};
8use std::{fmt, ptr};
9use tracing::warn;
10
11pub struct InboundMessage {
12 _msg_ptr: ffi::solClient_opaqueMsg_pt,
13}
14
15impl fmt::Debug for InboundMessage {
16 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17 let mut f = f.debug_struct("InboundMessage");
18 if self.get_receive_timestamp().is_ok_and(|v| v.is_some()) {
19 f.field(
20 "receive_timestamp",
21 &format_args!("{:?}", self.get_receive_timestamp().unwrap().unwrap()),
22 );
23 }
24 if self.get_sender_id().is_ok_and(|v| v.is_some()) {
25 f.field(
26 "sender_id",
27 &format_args!("{}", self.get_sender_id().unwrap().unwrap()),
28 );
29 }
30 if self.get_sender_timestamp().is_ok_and(|v| v.is_some()) {
31 f.field(
32 "sender_timestamp",
33 &format_args!("{:?}", self.get_sender_timestamp().unwrap().unwrap()),
34 );
35 }
36 if self.get_sequence_number().is_ok_and(|v| v.is_some()) {
37 f.field(
38 "sequence_number",
39 &format_args!("{}", self.get_sequence_number().unwrap().unwrap()),
40 );
41 }
42 if self.get_correlation_id().is_ok_and(|v| v.is_some()) {
43 f.field(
44 "correlation_id",
45 &format_args!("{}", self.get_correlation_id().unwrap().unwrap()),
46 );
47 }
48 if self.get_priority().is_ok_and(|v| v.is_some()) {
49 f.field(
50 "priority",
51 &format_args!("{}", self.get_priority().unwrap().unwrap()),
52 );
53 }
54 if self.is_discard_indication() {
55 f.field(
56 "is_discard_indication",
57 &format_args!("{}", self.is_discard_indication()),
58 );
59 }
60 if self.get_application_message_id().is_some() {
61 f.field(
62 "application_message_id",
63 &format_args!("{}", &self.get_application_message_id().unwrap()),
64 );
65 }
66 if self.get_user_data().is_ok_and(|v| v.is_some()) {
67 if let Ok(v) = std::str::from_utf8(self.get_user_data().unwrap().unwrap()) {
68 f.field("user_data", &v);
69 }
70 }
71 if self.get_destination().is_ok_and(|v| v.is_some()) {
72 f.field("destination", &self.get_destination().unwrap().unwrap());
73 }
74
75 f.field("is_reply", &self.is_reply());
76
77 if self.get_reply_to().is_ok_and(|v| v.is_some()) {
78 f.field("reply_to", &self.get_reply_to().unwrap().unwrap());
79 }
80
81 f.field("is_cache_msg", &self.is_cache_msg());
82
83 if self.get_cache_request_id().is_ok_and(|v| v.is_some()) {
84 f.field(
85 "cache_request_id",
86 &self.get_cache_request_id().unwrap().unwrap(),
87 );
88 }
89
90 if self.get_payload().is_ok_and(|v| v.is_some()) {
91 if let Ok(v) = std::str::from_utf8(self.get_payload().unwrap().unwrap()) {
92 f.field("payload", &v);
93 }
94 }
95 f.finish()
96 }
97}
98
99unsafe impl Send for InboundMessage {}
100
101impl Drop for InboundMessage {
102 fn drop(&mut self) {
103 let rc = unsafe { ffi::solClient_msg_free(&mut self._msg_ptr) };
104
105 let rc = SolClientReturnCode::from_raw(rc);
106 if !rc.is_ok() {
107 warn!("warning: message was not dropped properly");
108 }
109 }
110}
111
112impl From<ffi::solClient_opaqueMsg_pt> for InboundMessage {
113 fn from(ptr: ffi::solClient_opaqueMsg_pt) -> Self {
123 Self { _msg_ptr: ptr }
124 }
125}
126
127impl<'a> Message<'a> for InboundMessage {
128 unsafe fn get_raw_message_ptr(&self) -> ffi::solClient_opaqueMsg_pt {
129 self._msg_ptr
130 }
131}
132
133impl InboundMessage {
134 pub fn get_receive_timestamp(&self) -> Result<Option<SystemTime>> {
135 let mut ts: i64 = 0;
136 let rc = unsafe { ffi::solClient_msg_getRcvTimestamp(self.get_raw_message_ptr(), &mut ts) };
137
138 let rc = SolClientReturnCode::from_raw(rc);
139 match rc {
140 SolClientReturnCode::NotFound => Ok(None),
141 SolClientReturnCode::Ok => Ok(Some(
142 SystemTime::UNIX_EPOCH + Duration::from_millis(ts.try_into().unwrap()),
143 )),
144 _ => Err(MessageError::FieldError("receive_timestamp", rc)),
145 }
146 }
147
148 pub fn get_sender_id(&self) -> Result<Option<&str>> {
149 let mut buffer = ptr::null();
150
151 let rc = unsafe { ffi::solClient_msg_getSenderId(self.get_raw_message_ptr(), &mut buffer) };
152
153 let rc = SolClientReturnCode::from_raw(rc);
154 match rc {
155 SolClientReturnCode::Ok => (),
156 SolClientReturnCode::NotFound => return Ok(None),
157 _ => return Err(MessageError::FieldError("sender_id", rc)),
158 }
159
160 let c_str = unsafe { CStr::from_ptr(buffer) };
161
162 let str = c_str
163 .to_str()
164 .map_err(|_| MessageError::FieldConvertionError("sender_id"))?;
165
166 Ok(Some(str))
167 }
168
169 pub fn is_discard_indication(&self) -> bool {
170 let discard_indication =
171 unsafe { ffi::solClient_msg_isDiscardIndication(self.get_raw_message_ptr()) };
172
173 if discard_indication == 0 {
174 return false;
175 }
176
177 true
178 }
179
180 pub fn get_cache_request_id(&self) -> Result<Option<u64>> {
181 let mut id: u64 = 0;
182
183 let rc =
184 unsafe { ffi::solClient_msg_getCacheRequestId(self.get_raw_message_ptr(), &mut id) };
185
186 let rc = SolClientReturnCode::from_raw(rc);
187 match rc {
188 SolClientReturnCode::Ok => Ok(Some(id)),
189 SolClientReturnCode::NotFound => Ok(None),
190 _ => Err(MessageError::FieldError("cache_request_id", rc)),
191 }
192 }
193
194 pub fn is_cache_msg(&self) -> CacheStatus {
195 let raw = unsafe { ffi::solClient_msg_isCacheMsg(self.get_raw_message_ptr()) };
196 CacheStatus::from_i32(raw).unwrap_or(CacheStatus::InvalidMessage)
197 }
198}