solace_rs/message/
outbound.rs

1use super::destination::MessageDestination;
2use super::{ClassOfService, DeliveryMode, Message};
3use crate::SolClientReturnCode;
4use solace_rs_sys as ffi;
5use std::ffi::{c_void, CString, NulError};
6use std::ptr;
7use std::time::SystemTime;
8use thiserror::Error;
9use tracing::warn;
10
11#[derive(Error, Debug)]
12pub enum MessageBuilderError {
13    #[error("builder recieved invalid args")]
14    InvalidArgs(#[from] NulError),
15    #[error("{0} arg need to be set")]
16    MissingRequiredArgs(String),
17    #[error("{0} size need to be less than {1} found {2}")]
18    SizeErrorArgs(String, usize, usize),
19    #[error("timestamp needs to be greater than UNIX_EPOCH")]
20    TimestampError,
21    #[error("solClient message aloc failed")]
22    MessageAlocFailure,
23}
24
25type Result<T> = std::result::Result<T, MessageBuilderError>;
26
27pub struct OutboundMessage {
28    _msg_ptr: ffi::solClient_opaqueMsg_pt,
29}
30
31unsafe impl Send for OutboundMessage {}
32
33impl Drop for OutboundMessage {
34    fn drop(&mut self) {
35        debug_assert!(!self._msg_ptr.is_null());
36        let msg_free_result = unsafe { ffi::solClient_msg_free(&mut self._msg_ptr) };
37
38        let rc = SolClientReturnCode::from_raw(msg_free_result);
39
40        if !rc.is_ok() {
41            warn!("message was not dropped properly");
42        }
43    }
44}
45
46impl<'a> Message<'a> for OutboundMessage {
47    unsafe fn get_raw_message_ptr(&self) -> ffi::solClient_opaqueMsg_pt {
48        self._msg_ptr
49    }
50}
51
52#[derive(Default)]
53pub struct OutboundMessageBuilder {
54    delivery_mode: Option<DeliveryMode>,
55    destination: Option<MessageDestination>,
56    message: Option<Vec<u8>>,
57    correlation_id: Option<Vec<u8>>,
58    class_of_service: Option<ClassOfService>,
59    seq_number: Option<u64>,
60    priority: Option<u8>,
61    application_id: Option<Vec<u8>>,
62    application_msg_type: Option<Vec<u8>>,
63    user_data: Option<Vec<u8>>,
64    sender_ts: Option<SystemTime>,
65    eliding_eligible: Option<()>,
66    is_reply: Option<()>,
67}
68
69impl OutboundMessageBuilder {
70    /// Creates a new [`OutboundMessageBuilder`].
71    pub fn new() -> Self {
72        Self::default()
73    }
74    pub fn delivery_mode(mut self, mode: DeliveryMode) -> Self {
75        self.delivery_mode = Some(mode);
76        self
77    }
78
79    pub fn application_id<M>(mut self, application_id: M) -> Self
80    where
81        M: Into<Vec<u8>>,
82    {
83        self.application_id = Some(application_id.into());
84        self
85    }
86
87    pub fn application_msg_type<M>(mut self, message_type: M) -> Self
88    where
89        M: Into<Vec<u8>>,
90    {
91        self.application_msg_type = Some(message_type.into());
92        self
93    }
94
95    pub fn destination(mut self, destination: MessageDestination) -> Self {
96        self.destination = Some(destination);
97        self
98    }
99
100    pub fn class_of_service(mut self, cos: ClassOfService) -> Self {
101        self.class_of_service = Some(cos);
102        self
103    }
104
105    pub fn seq_number(mut self, seq_num: u64) -> Self {
106        self.seq_number = Some(seq_num);
107        self
108    }
109
110    pub fn sender_timestamp(mut self, ts: SystemTime) -> Self {
111        self.sender_ts = Some(ts);
112        self
113    }
114
115    pub fn priority(mut self, priority: u8) -> Self {
116        self.priority = Some(priority);
117        self
118    }
119
120    pub fn is_reply(mut self, is_reply: bool) -> Self {
121        if is_reply {
122            self.is_reply = Some(());
123        } else {
124            self.is_reply = None
125        }
126        self
127    }
128
129    pub fn user_data<D>(mut self, data: D) -> Self
130    where
131        D: Into<Vec<u8>>,
132    {
133        self.user_data = Some(data.into());
134
135        self
136    }
137
138    pub fn payload<M>(mut self, message: M) -> Self
139    where
140        M: Into<Vec<u8>>,
141    {
142        // for attaching the message to the ptr, we have a couple of options
143        // based on those options, we can create a couple of interfaces
144        //
145        // solClient_msg_setBinaryAttachmentPtr (solClient_opaqueMsg_pt msg_p, void *buf_p, solClient_uint32_t size)
146        // Given a msg_p, set the contents of a Binary Attachment Part to the given pointer and size.
147        //
148        // solClient_msg_setBinaryAttachment (solClient_opaqueMsg_pt msg_p, const void *buf_p, solClient_uint32_t size)
149        // Given a msg_p, set the contents of the binary attachment part by copying in from the given pointer and size.
150        //
151        // solClient_msg_setBinaryAttachmentString (solClient_opaqueMsg_pt msg_p, const char *buf_p)
152        // Given a msg_p, set the contents of the binary attachment part to a UTF-8 or ASCII string by copying in from the given pointer until null-terminated.
153        //
154        // we will only use the binary ptr methods
155        self.message = Some(message.into());
156
157        self
158    }
159
160    pub fn correlation_id<M>(mut self, id: M) -> Self
161    where
162        M: Into<Vec<u8>>,
163    {
164        self.correlation_id = Some(id.into());
165        self
166    }
167
168    pub fn eliding_eligible(mut self, eliding_eligible: bool) -> Self {
169        if eliding_eligible {
170            self.eliding_eligible = Some(());
171        } else {
172            self.eliding_eligible = None;
173        }
174        self
175    }
176
177    pub fn build(self) -> Result<OutboundMessage> {
178        // message allocation
179        let mut msg_ptr: ffi::solClient_opaqueMsg_pt = ptr::null_mut();
180        let rc = unsafe { ffi::solClient_msg_alloc(&mut msg_ptr) };
181
182        let rc = SolClientReturnCode::from_raw(rc);
183
184        if !rc.is_ok() {
185            return Err(MessageBuilderError::MessageAlocFailure);
186        };
187
188        // OutboundMessage is responsible for dropping the message in-case of any errors
189        let msg = OutboundMessage { _msg_ptr: msg_ptr };
190
191        // We do not check the return code for many of the setter functions since they only fail
192        // on invalid msg_ptr. We validated the message ptr above, so no need to double check.
193
194        // delivery_mode
195        let Some(delivery_mode) = self.delivery_mode else {
196            return Err(MessageBuilderError::MissingRequiredArgs(
197                "delivery_mode".to_owned(),
198            ));
199        };
200        unsafe { ffi::solClient_msg_setDeliveryMode(msg_ptr, delivery_mode as u32) };
201
202        // destination
203        let Some(destination) = self.destination else {
204            return Err(MessageBuilderError::MissingRequiredArgs(
205                "destination".to_owned(),
206            ));
207        };
208        // destination is being copied by solClient_msg_setDestination
209        // so it is fine to create a ptr for the destination.dest
210        let mut destination: ffi::solClient_destination = ffi::solClient_destination {
211            destType: destination.dest_type.to_i32(),
212            dest: destination.dest.as_ptr(),
213        };
214        unsafe {
215            ffi::solClient_msg_setDestination(
216                msg_ptr,
217                &mut destination,
218                std::mem::size_of::<ffi::solClient_destination>(),
219            )
220        };
221
222        if let Some(user_data) = self.user_data {
223            if user_data.len()
224                > ffi::SOLCLIENT_BUFINFO_MAX_USER_DATA_SIZE
225                    .try_into()
226                    .unwrap()
227            {
228                return Err(MessageBuilderError::SizeErrorArgs(
229                    "user_data".to_owned(),
230                    user_data.len(),
231                    ffi::SOLCLIENT_BUFINFO_MAX_USER_DATA_SIZE
232                        .try_into()
233                        .unwrap(),
234                ));
235            }
236            // We pass the ptr which is then copied over
237            unsafe {
238                ffi::solClient_msg_setUserData(
239                    msg_ptr,
240                    user_data.as_ptr() as *const c_void,
241                    user_data.len() as u32,
242                )
243            };
244        }
245
246        // binary attachment
247        // We pass the ptr which is then copied over
248        let Some(message) = self.message else {
249            return Err(MessageBuilderError::MissingRequiredArgs(
250                "message".to_owned(),
251            ));
252        };
253        unsafe {
254            ffi::solClient_msg_setBinaryAttachment(
255                msg_ptr,
256                message.as_ptr() as *const c_void,
257                message.len() as u32,
258            )
259        };
260
261        // correlation_id
262        if let Some(id) = self.correlation_id {
263            // correlation_id is copied over
264            let c_id = CString::new(id)?;
265            unsafe { ffi::solClient_msg_setCorrelationId(msg_ptr, c_id.as_ptr()) };
266        }
267
268        // Class of Service
269        if let Some(cos) = self.class_of_service {
270            unsafe { ffi::solClient_msg_setClassOfService(msg_ptr, cos.into()) };
271        }
272
273        // Sequence Number
274        if let Some(seq_number) = self.seq_number {
275            unsafe { ffi::solClient_msg_setSequenceNumber(msg_ptr, seq_number) };
276        }
277
278        // Priority
279        if let Some(priority) = self.priority {
280            unsafe { ffi::solClient_msg_setPriority(msg_ptr, priority.into()) };
281        }
282
283        // Sender timestamp
284        if let Some(ts) = self.sender_ts {
285            let ts = ts
286                .duration_since(SystemTime::UNIX_EPOCH)
287                .map_err(|_| MessageBuilderError::TimestampError)?;
288            let ts: i64 = ts
289                .as_millis()
290                .try_into()
291                .map_err(|_| MessageBuilderError::TimestampError)?;
292
293            unsafe { ffi::solClient_msg_setSenderTimestamp(msg_ptr, ts) };
294        }
295
296        // Application ID
297        if let Some(id) = self.application_id {
298            // application id is copied over
299            let c_id = CString::new(id)?;
300            unsafe { ffi::solClient_msg_setApplicationMessageId(msg_ptr, c_id.as_ptr()) };
301        }
302
303        // Application Message Type
304        if let Some(message_type) = self.application_msg_type {
305            // application msg type is copied over
306            let c_type = CString::new(message_type)?;
307            unsafe { ffi::solClient_msg_setApplicationMsgType(msg_ptr, c_type.as_ptr()) };
308        }
309
310        if self.eliding_eligible.is_some() {
311            unsafe { ffi::solClient_msg_setElidingEligible(msg_ptr, true.into()) };
312        }
313
314        if self.is_reply.is_some() {
315            unsafe { ffi::solClient_msg_setAsReplyMsg(msg_ptr, true.into()) };
316        }
317
318        Ok(msg)
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{DestinationType, MessageDestination};

    /// Topic destination shared by every test.
    fn test_destination() -> MessageDestination {
        MessageDestination::new(DestinationType::Topic, "test_topic").unwrap()
    }

    /// Builder with the three required fields (delivery mode, destination,
    /// payload) already set, so each test only adds what it exercises.
    fn base_builder() -> OutboundMessageBuilder {
        OutboundMessageBuilder::new()
            .delivery_mode(DeliveryMode::Direct)
            .destination(test_destination())
            .payload("Hello")
    }

    #[test]
    fn it_should_build_message() {
        let _ = base_builder().build().unwrap();
    }

    #[test]
    fn it_should_fail_without_delivery_mode() {
        let result = OutboundMessageBuilder::new()
            .destination(test_destination())
            .payload("Hello")
            .build();

        assert!(matches!(
            result,
            Err(MessageBuilderError::MissingRequiredArgs(_))
        ));
    }

    #[test]
    fn it_should_fail_without_destination() {
        let result = OutboundMessageBuilder::new()
            .delivery_mode(DeliveryMode::Direct)
            .payload("Hello")
            .build();

        assert!(matches!(
            result,
            Err(MessageBuilderError::MissingRequiredArgs(_))
        ));
    }

    #[test]
    fn it_should_fail_without_payload() {
        let result = OutboundMessageBuilder::new()
            .delivery_mode(DeliveryMode::Direct)
            .destination(test_destination())
            .build();

        assert!(matches!(
            result,
            Err(MessageBuilderError::MissingRequiredArgs(_))
        ));
    }

    #[test]
    fn it_should_fail_on_oversized_user_data() {
        let oversized = vec![0_u8; ffi::SOLCLIENT_BUFINFO_MAX_USER_DATA_SIZE as usize + 1];
        let result = base_builder().user_data(oversized).build();

        assert!(matches!(
            result,
            Err(MessageBuilderError::SizeErrorArgs(..))
        ));
    }

    #[test]
    fn it_should_fail_on_interior_nul_in_correlation_id() {
        let result = base_builder().correlation_id("bad\0id").build();

        assert!(matches!(result, Err(MessageBuilderError::InvalidArgs(_))));
    }

    #[test]
    fn it_should_build_with_eliding_eligible() {
        let non_elided_msg = base_builder().eliding_eligible(false).build().unwrap();
        assert!(!non_elided_msg.is_eliding_eligible());

        let elided_msg = base_builder().eliding_eligible(true).build().unwrap();
        assert!(elided_msg.is_eliding_eligible());
    }

    #[test]
    fn it_should_build_with_is_reply() {
        let non_reply_msg = base_builder().is_reply(false).build().unwrap();
        assert!(!non_reply_msg.is_reply());

        let reply_msg = base_builder().is_reply(true).build().unwrap();
        assert!(reply_msg.is_reply());
    }

    #[test]
    fn it_should_build_with_same_topic() {
        let message = base_builder().build().unwrap();
        let message_destination = message.get_destination().unwrap().unwrap();

        assert_eq!("test_topic", message_destination.dest.to_string_lossy());
    }

    #[test]
    fn it_should_build_with_same_correlation_id() {
        let message = base_builder()
            .correlation_id("test_correlation")
            .build()
            .unwrap();

        let correlation_id = message.get_correlation_id().unwrap().unwrap();

        assert_eq!("test_correlation", correlation_id);
    }

    #[test]
    fn it_should_build_have_valid_exp() {
        let message = base_builder().build().unwrap();

        assert_eq!(0, message.get_expiration());
    }

    #[test]
    fn it_should_build_with_same_cos() {
        let message = base_builder()
            .class_of_service(ClassOfService::Two)
            .build()
            .unwrap();

        // Plain `assert!` on purpose: it does not require `ClassOfService: Debug`.
        assert!(ClassOfService::Two == message.get_class_of_service().unwrap());
    }

    #[test]
    fn it_should_build_with_same_seq_num() {
        let message = base_builder().seq_number(45).build().unwrap();

        assert_eq!(45, message.get_sequence_number().unwrap().unwrap());
    }

    #[test]
    fn it_should_build_with_same_priority() {
        let message = base_builder().priority(3).build().unwrap();
        assert_eq!(3, message.get_priority().unwrap().unwrap());

        // Priority stays unset when the builder never receives one.
        let message = base_builder().build().unwrap();
        assert!(message.get_priority().unwrap().is_none());
    }

    #[test]
    fn it_should_build_with_same_application_id() {
        let message = base_builder().application_id("test_id").build().unwrap();
        assert_eq!(Some("test_id"), message.get_application_message_id());

        let message = base_builder().build().unwrap();
        assert!(message.get_application_message_id().is_none());
    }

    #[test]
    fn it_should_build_with_same_application_msg_type() {
        let message = base_builder()
            .application_msg_type("test_id")
            .build()
            .unwrap();
        assert_eq!(Some("test_id"), message.get_application_msg_type());

        let message = base_builder().build().unwrap();
        assert!(message.get_application_msg_type().is_none());
    }

    #[test]
    fn it_should_build_with_same_string_payload() {
        let message = base_builder().build().unwrap();

        let raw_payload = message.get_payload().unwrap().unwrap();

        assert_eq!(b"Hello", raw_payload);
    }

    #[test]
    fn it_should_build_with_same_user_data() {
        let message = base_builder()
            .user_data(32_u32.to_be_bytes())
            .build()
            .unwrap();

        let raw_user_data = message.get_user_data().unwrap().unwrap();

        assert_eq!(32_u32.to_be_bytes(), raw_user_data);
    }

    #[test]
    fn it_should_build_with_same_sender_timestamp() {
        let now = SystemTime::now();
        let message = base_builder().sender_timestamp(now).build().unwrap();

        let ts = message.get_sender_timestamp().unwrap().unwrap();

        // The message stores millisecond precision, so compare at that granularity.
        let expected_millis = now
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis();
        let actual_millis = ts
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        assert_eq!(expected_millis, actual_millis);
    }
}