1use super::destination::MessageDestination;
2use super::{ClassOfService, DeliveryMode, Message};
3use crate::SolClientReturnCode;
4use solace_rs_sys as ffi;
5use std::ffi::{c_void, CString, NulError};
6use std::ptr;
7use std::time::SystemTime;
8use thiserror::Error;
9use tracing::warn;
10
11#[derive(Error, Debug)]
12pub enum MessageBuilderError {
13 #[error("builder recieved invalid args")]
14 InvalidArgs(#[from] NulError),
15 #[error("{0} arg need to be set")]
16 MissingRequiredArgs(String),
17 #[error("{0} size need to be less than {1} found {2}")]
18 SizeErrorArgs(String, usize, usize),
19 #[error("timestamp needs to be greater than UNIX_EPOCH")]
20 TimestampError,
21 #[error("solClient message aloc failed")]
22 MessageAlocFailure,
23}
24
25type Result<T> = std::result::Result<T, MessageBuilderError>;
26
27pub struct OutboundMessage {
28 _msg_ptr: ffi::solClient_opaqueMsg_pt,
29}
30
31unsafe impl Send for OutboundMessage {}
32
33impl Drop for OutboundMessage {
34 fn drop(&mut self) {
35 debug_assert!(!self._msg_ptr.is_null());
36 let msg_free_result = unsafe { ffi::solClient_msg_free(&mut self._msg_ptr) };
37
38 let rc = SolClientReturnCode::from_raw(msg_free_result);
39
40 if !rc.is_ok() {
41 warn!("message was not dropped properly");
42 }
43 }
44}
45
46impl<'a> Message<'a> for OutboundMessage {
47 unsafe fn get_raw_message_ptr(&self) -> ffi::solClient_opaqueMsg_pt {
48 self._msg_ptr
49 }
50}
51
52#[derive(Default)]
53pub struct OutboundMessageBuilder {
54 delivery_mode: Option<DeliveryMode>,
55 destination: Option<MessageDestination>,
56 message: Option<Vec<u8>>,
57 correlation_id: Option<Vec<u8>>,
58 class_of_service: Option<ClassOfService>,
59 seq_number: Option<u64>,
60 priority: Option<u8>,
61 application_id: Option<Vec<u8>>,
62 application_msg_type: Option<Vec<u8>>,
63 user_data: Option<Vec<u8>>,
64 sender_ts: Option<SystemTime>,
65 eliding_eligible: Option<()>,
66 is_reply: Option<()>,
67}
68
69impl OutboundMessageBuilder {
70 pub fn new() -> Self {
72 Self::default()
73 }
74 pub fn delivery_mode(mut self, mode: DeliveryMode) -> Self {
75 self.delivery_mode = Some(mode);
76 self
77 }
78
79 pub fn application_id<M>(mut self, application_id: M) -> Self
80 where
81 M: Into<Vec<u8>>,
82 {
83 self.application_id = Some(application_id.into());
84 self
85 }
86
87 pub fn application_msg_type<M>(mut self, message_type: M) -> Self
88 where
89 M: Into<Vec<u8>>,
90 {
91 self.application_msg_type = Some(message_type.into());
92 self
93 }
94
95 pub fn destination(mut self, destination: MessageDestination) -> Self {
96 self.destination = Some(destination);
97 self
98 }
99
100 pub fn class_of_service(mut self, cos: ClassOfService) -> Self {
101 self.class_of_service = Some(cos);
102 self
103 }
104
105 pub fn seq_number(mut self, seq_num: u64) -> Self {
106 self.seq_number = Some(seq_num);
107 self
108 }
109
110 pub fn sender_timestamp(mut self, ts: SystemTime) -> Self {
111 self.sender_ts = Some(ts);
112 self
113 }
114
115 pub fn priority(mut self, priority: u8) -> Self {
116 self.priority = Some(priority);
117 self
118 }
119
120 pub fn is_reply(mut self, is_reply: bool) -> Self {
121 if is_reply {
122 self.is_reply = Some(());
123 } else {
124 self.is_reply = None
125 }
126 self
127 }
128
129 pub fn user_data<D>(mut self, data: D) -> Self
130 where
131 D: Into<Vec<u8>>,
132 {
133 self.user_data = Some(data.into());
134
135 self
136 }
137
138 pub fn payload<M>(mut self, message: M) -> Self
139 where
140 M: Into<Vec<u8>>,
141 {
142 self.message = Some(message.into());
156
157 self
158 }
159
160 pub fn correlation_id<M>(mut self, id: M) -> Self
161 where
162 M: Into<Vec<u8>>,
163 {
164 self.correlation_id = Some(id.into());
165 self
166 }
167
168 pub fn eliding_eligible(mut self, eliding_eligible: bool) -> Self {
169 if eliding_eligible {
170 self.eliding_eligible = Some(());
171 } else {
172 self.eliding_eligible = None;
173 }
174 self
175 }
176
177 pub fn build(self) -> Result<OutboundMessage> {
178 let mut msg_ptr: ffi::solClient_opaqueMsg_pt = ptr::null_mut();
180 let rc = unsafe { ffi::solClient_msg_alloc(&mut msg_ptr) };
181
182 let rc = SolClientReturnCode::from_raw(rc);
183
184 if !rc.is_ok() {
185 return Err(MessageBuilderError::MessageAlocFailure);
186 };
187
188 let msg = OutboundMessage { _msg_ptr: msg_ptr };
190
191 let Some(delivery_mode) = self.delivery_mode else {
196 return Err(MessageBuilderError::MissingRequiredArgs(
197 "delivery_mode".to_owned(),
198 ));
199 };
200 unsafe { ffi::solClient_msg_setDeliveryMode(msg_ptr, delivery_mode as u32) };
201
202 let Some(destination) = self.destination else {
204 return Err(MessageBuilderError::MissingRequiredArgs(
205 "destination".to_owned(),
206 ));
207 };
208 let mut destination: ffi::solClient_destination = ffi::solClient_destination {
211 destType: destination.dest_type.to_i32(),
212 dest: destination.dest.as_ptr(),
213 };
214 unsafe {
215 ffi::solClient_msg_setDestination(
216 msg_ptr,
217 &mut destination,
218 std::mem::size_of::<ffi::solClient_destination>(),
219 )
220 };
221
222 if let Some(user_data) = self.user_data {
223 if user_data.len()
224 > ffi::SOLCLIENT_BUFINFO_MAX_USER_DATA_SIZE
225 .try_into()
226 .unwrap()
227 {
228 return Err(MessageBuilderError::SizeErrorArgs(
229 "user_data".to_owned(),
230 user_data.len(),
231 ffi::SOLCLIENT_BUFINFO_MAX_USER_DATA_SIZE
232 .try_into()
233 .unwrap(),
234 ));
235 }
236 unsafe {
238 ffi::solClient_msg_setUserData(
239 msg_ptr,
240 user_data.as_ptr() as *const c_void,
241 user_data.len() as u32,
242 )
243 };
244 }
245
246 let Some(message) = self.message else {
249 return Err(MessageBuilderError::MissingRequiredArgs(
250 "message".to_owned(),
251 ));
252 };
253 unsafe {
254 ffi::solClient_msg_setBinaryAttachment(
255 msg_ptr,
256 message.as_ptr() as *const c_void,
257 message.len() as u32,
258 )
259 };
260
261 if let Some(id) = self.correlation_id {
263 let c_id = CString::new(id)?;
265 unsafe { ffi::solClient_msg_setCorrelationId(msg_ptr, c_id.as_ptr()) };
266 }
267
268 if let Some(cos) = self.class_of_service {
270 unsafe { ffi::solClient_msg_setClassOfService(msg_ptr, cos.into()) };
271 }
272
273 if let Some(seq_number) = self.seq_number {
275 unsafe { ffi::solClient_msg_setSequenceNumber(msg_ptr, seq_number) };
276 }
277
278 if let Some(priority) = self.priority {
280 unsafe { ffi::solClient_msg_setPriority(msg_ptr, priority.into()) };
281 }
282
283 if let Some(ts) = self.sender_ts {
285 let ts = ts
286 .duration_since(SystemTime::UNIX_EPOCH)
287 .map_err(|_| MessageBuilderError::TimestampError)?;
288 let ts: i64 = ts
289 .as_millis()
290 .try_into()
291 .map_err(|_| MessageBuilderError::TimestampError)?;
292
293 unsafe { ffi::solClient_msg_setSenderTimestamp(msg_ptr, ts) };
294 }
295
296 if let Some(id) = self.application_id {
298 let c_id = CString::new(id)?;
300 unsafe { ffi::solClient_msg_setApplicationMessageId(msg_ptr, c_id.as_ptr()) };
301 }
302
303 if let Some(message_type) = self.application_msg_type {
305 let c_type = CString::new(message_type)?;
307 unsafe { ffi::solClient_msg_setApplicationMsgType(msg_ptr, c_type.as_ptr()) };
308 }
309
310 if self.eliding_eligible.is_some() {
311 unsafe { ffi::solClient_msg_setElidingEligible(msg_ptr, true.into()) };
312 }
313
314 if self.is_reply.is_some() {
315 unsafe { ffi::solClient_msg_setAsReplyMsg(msg_ptr, true.into()) };
316 }
317
318 Ok(msg)
319 }
320}
321
322#[cfg(test)]
323mod tests {
324 use super::*;
325 use crate::message::{DestinationType, MessageDestination};
326
327 #[test]
328 fn it_should_build_message() {
329 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
330 let _ = OutboundMessageBuilder::new()
331 .delivery_mode(DeliveryMode::Direct)
332 .destination(dest)
333 .payload("Hello")
334 .build()
335 .unwrap();
336 }
337
338 #[test]
339 fn it_should_build_with_eliding_eligible() {
340 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
341 let non_elided_msg = OutboundMessageBuilder::new()
342 .delivery_mode(DeliveryMode::Direct)
343 .destination(dest)
344 .payload("Hello")
345 .eliding_eligible(false)
346 .build()
347 .unwrap();
348
349 assert!(!non_elided_msg.is_eliding_eligible());
350
351 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
352 let elided_msg = OutboundMessageBuilder::new()
353 .delivery_mode(DeliveryMode::Direct)
354 .destination(dest)
355 .payload("Hello")
356 .eliding_eligible(true)
357 .build()
358 .unwrap();
359
360 assert!(elided_msg.is_eliding_eligible());
361 }
362
363 #[test]
364 fn it_should_build_with_is_reply() {
365 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
366 let non_reply_msg = OutboundMessageBuilder::new()
367 .delivery_mode(DeliveryMode::Direct)
368 .destination(dest)
369 .payload("Hello")
370 .is_reply(false)
371 .build()
372 .unwrap();
373
374 assert!(!non_reply_msg.is_reply());
375
376 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
377 let reply_msg = OutboundMessageBuilder::new()
378 .delivery_mode(DeliveryMode::Direct)
379 .destination(dest)
380 .payload("Hello")
381 .is_reply(true)
382 .build()
383 .unwrap();
384
385 assert!(reply_msg.is_reply());
386 }
387
388 #[test]
389 fn it_should_build_with_same_topic() {
390 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
391 let message = OutboundMessageBuilder::new()
392 .delivery_mode(DeliveryMode::Direct)
393 .destination(dest)
394 .payload("Hello")
395 .build()
396 .unwrap();
397 let message_destination = message.get_destination().unwrap().unwrap();
398
399 assert!("test_topic" == message_destination.dest.to_string_lossy());
400 }
401
402 #[test]
403 fn it_should_build_with_same_corralation_id() {
404 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
405 let message = OutboundMessageBuilder::new()
406 .delivery_mode(DeliveryMode::Direct)
407 .destination(dest)
408 .correlation_id("test_correlation")
409 .payload("Hello")
410 .build()
411 .unwrap();
412
413 let correlation_id = message.get_correlation_id().unwrap().unwrap();
414
415 assert!("test_correlation" == correlation_id);
416 }
417
418 #[test]
419 fn it_should_build_have_valid_exp() {
420 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
421 let message = OutboundMessageBuilder::new()
422 .delivery_mode(DeliveryMode::Direct)
423 .destination(dest)
424 .payload("Hello")
425 .build()
426 .unwrap();
427
428 assert!(0 == message.get_expiration());
429 }
430
431 #[test]
432 fn it_should_build_with_same_cos() {
433 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
434 let message = OutboundMessageBuilder::new()
435 .delivery_mode(DeliveryMode::Direct)
436 .destination(dest)
437 .class_of_service(ClassOfService::Two)
438 .payload("Hello")
439 .build()
440 .unwrap();
441
442 assert!(ClassOfService::Two == message.get_class_of_service().unwrap());
443 }
444
445 #[test]
446 fn it_should_build_with_same_seq_num() {
447 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
448 let message = OutboundMessageBuilder::new()
449 .delivery_mode(DeliveryMode::Direct)
450 .destination(dest)
451 .seq_number(45)
452 .payload("Hello")
453 .build()
454 .unwrap();
455
456 assert!(45 == message.get_sequence_number().unwrap().unwrap());
457 }
458
459 #[test]
460 fn it_should_build_with_same_priority() {
461 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
462 let message = OutboundMessageBuilder::new()
463 .delivery_mode(DeliveryMode::Direct)
464 .destination(dest)
465 .priority(3)
466 .payload("Hello")
467 .build()
468 .unwrap();
469
470 assert!(3 == message.get_priority().unwrap().unwrap());
471
472 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
473 let message = OutboundMessageBuilder::new()
474 .delivery_mode(DeliveryMode::Direct)
475 .destination(dest)
476 .payload("Hello")
477 .build()
478 .unwrap();
479
480 assert!(message.get_priority().unwrap().is_none());
481 }
482
483 #[test]
484 fn it_should_build_with_same_application_id() {
485 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
486 let message = OutboundMessageBuilder::new()
487 .delivery_mode(DeliveryMode::Direct)
488 .destination(dest)
489 .application_id("test_id")
490 .payload("Hello")
491 .build()
492 .unwrap();
493
494 assert!(Some("test_id") == message.get_application_message_id());
495
496 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
497 let message = OutboundMessageBuilder::new()
498 .delivery_mode(DeliveryMode::Direct)
499 .destination(dest)
500 .payload("Hello")
501 .build()
502 .unwrap();
503
504 assert!(message.get_application_message_id().is_none());
505 }
506
507 #[test]
508 fn it_should_build_with_same_application_msg_type() {
509 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
510 let message = OutboundMessageBuilder::new()
511 .delivery_mode(DeliveryMode::Direct)
512 .destination(dest)
513 .application_msg_type("test_id")
514 .payload("Hello")
515 .build()
516 .unwrap();
517
518 assert!(Some("test_id") == message.get_application_msg_type());
519
520 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
521 let message = OutboundMessageBuilder::new()
522 .delivery_mode(DeliveryMode::Direct)
523 .destination(dest)
524 .payload("Hello")
525 .build()
526 .unwrap();
527
528 assert!(message.get_application_msg_type().is_none());
529 }
530
531 #[test]
532 fn it_should_build_with_same_string_payload() {
533 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
534 let message = OutboundMessageBuilder::new()
535 .delivery_mode(DeliveryMode::Direct)
536 .destination(dest)
537 .payload("Hello")
538 .build()
539 .unwrap();
540
541 let raw_payload = message.get_payload().unwrap().unwrap();
542
543 assert!(b"Hello" == raw_payload);
544 }
545
546 #[test]
547 fn it_should_build_with_same_user_data() {
548 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
549 let message = OutboundMessageBuilder::new()
550 .delivery_mode(DeliveryMode::Direct)
551 .destination(dest)
552 .payload("Hello")
553 .user_data(32_u32.to_be_bytes())
554 .build()
555 .unwrap();
556
557 let raw_user_data = message.get_user_data().unwrap().unwrap();
558
559 assert!(32_u32.to_be_bytes() == raw_user_data);
560 }
561
562 #[test]
563 fn it_should_build_with_same_sender_timestamp() {
564 let dest = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
565 let now = SystemTime::now();
566 let message = OutboundMessageBuilder::new()
567 .delivery_mode(DeliveryMode::Direct)
568 .destination(dest)
569 .payload("Hello")
570 .sender_timestamp(now)
571 .build()
572 .unwrap();
573
574 let ts = message.get_sender_timestamp().unwrap().unwrap();
575
576 let now = now
577 .duration_since(SystemTime::UNIX_EPOCH)
578 .unwrap()
579 .as_millis();
580 let ts = ts
581 .duration_since(SystemTime::UNIX_EPOCH)
582 .unwrap()
583 .as_millis();
584
585 assert!(now == ts);
586 }
587}