use super::destination::MessageDestination;
use super::{ClassOfService, DeliveryMode, Message};
use crate::SolClientReturnCode;
use num_traits::FromPrimitive;
use solace_rs_sys as ffi;
use std::ffi::{c_void, CString, NulError};
use std::ptr;
use thiserror::Error;
use tracing::warn;
#[derive(Error, Debug)]
pub enum MessageBuilderError {
#[error("builder recieved invalid args")]
InvalidArgs(#[from] NulError),
#[error("{0} arg need to be set")]
MissingRequiredArgs(String),
#[error("solClient returned not ok code")]
SolClientError,
#[error("solClient message aloc failed")]
MessageAlocFailure,
}
/// File-local shorthand for results whose error type is [`MessageBuilderError`].
type Result<T> = std::result::Result<T, MessageBuilderError>;
/// A message wrapping a raw solClient message pointer.
///
/// The wrapped pointer is handed to `solClient_msg_free` when this value is
/// dropped. Instances are produced by `OutboundMessageBuilder::build`.
pub struct OutboundMessage {
// Raw message pointer; `OutboundMessageBuilder::build` fills it via `solClient_msg_alloc`.
msg_ptr: ffi::solClient_opaqueMsg_pt,
}
impl Drop for OutboundMessage {
    /// Frees the wrapped message through `solClient_msg_free`.
    ///
    /// A failed free cannot be propagated out of `drop`, so it is only logged.
    fn drop(&mut self) {
        // SAFETY: `&mut self` gives exclusive access to `msg_ptr` for the whole call.
        // NOTE(review): assumes `msg_ptr` still holds the handle obtained from
        // `solClient_msg_alloc` and was not freed elsewhere — confirm for other constructors.
        let rc = unsafe { ffi::solClient_msg_free(&mut self.msg_ptr) };
        let freed = matches!(
            SolClientReturnCode::from_i32(rc),
            Some(SolClientReturnCode::Ok)
        );
        if !freed {
            warn!("message was not dropped properly");
        }
    }
}
impl Message<'_> for OutboundMessage {
    /// Returns the raw solClient message pointer held by this message.
    ///
    /// # Safety
    ///
    /// The pointer is passed to `solClient_msg_free` when this value is
    /// dropped, so the caller must not free it and must not use it after the
    /// message has been dropped.
    unsafe fn get_raw_message_ptr(&self) -> ffi::solClient_opaqueMsg_pt {
        self.msg_ptr
    }
}
/// Builder for [`OutboundMessage`].
///
/// Every field starts unset (`None`). The setter methods consume and return
/// the builder, and `build` requires `delivery_mode`, `destination` and
/// `message` (set through `payload`) to be present.
#[derive(Default)]
pub struct OutboundMessageBuilder {
// Required: `build` returns `MissingRequiredArgs` when this is `None`.
delivery_mode: Option<DeliveryMode>,
// Required: `build` returns `MissingRequiredArgs` when this is `None`.
destination: Option<MessageDestination>,
// Required payload bytes; `build` passes them to `solClient_msg_setBinaryAttachment`.
message: Option<Vec<u8>>,
// Optional; converted with `CString::new` in `build`, so interior NUL bytes are rejected there.
correlation_id: Option<Vec<u8>>,
// Optional; forwarded to `solClient_msg_setClassOfService` when set.
class_of_service: Option<ClassOfService>,
// Optional; forwarded to `solClient_msg_setSequenceNumber` when set.
seq_number: Option<u64>,
// Optional; forwarded to `solClient_msg_setPriority` when set.
priority: Option<u8>,
// Optional; converted with `CString::new` in `build`, so interior NUL bytes are rejected there.
application_id: Option<Vec<u8>>,
// Optional; converted with `CString::new` in `build`, so interior NUL bytes are rejected there.
application_msg_type: Option<Vec<u8>>,
}
impl OutboundMessageBuilder {
    /// Creates a builder with every field unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the delivery mode. Required by [`build`](Self::build).
    pub fn delivery_mode(mut self, mode: DeliveryMode) -> Self {
        self.delivery_mode = Some(mode);
        self
    }

    /// Sets the application message id.
    ///
    /// The bytes must not contain a NUL byte, otherwise [`build`](Self::build)
    /// fails with [`MessageBuilderError::InvalidArgs`].
    pub fn application_id<M>(mut self, application_id: M) -> Self
    where
        M: Into<Vec<u8>>,
    {
        self.application_id = Some(application_id.into());
        self
    }

    /// Sets the application message type.
    ///
    /// The bytes must not contain a NUL byte, otherwise [`build`](Self::build)
    /// fails with [`MessageBuilderError::InvalidArgs`].
    pub fn application_msg_type<M>(mut self, message_type: M) -> Self
    where
        M: Into<Vec<u8>>,
    {
        self.application_msg_type = Some(message_type.into());
        self
    }

    /// Sets the destination. Required by [`build`](Self::build).
    pub fn destination(mut self, destination: MessageDestination) -> Self {
        self.destination = Some(destination);
        self
    }

    /// Sets the class of service.
    pub fn class_of_service(mut self, cos: ClassOfService) -> Self {
        self.class_of_service = Some(cos);
        self
    }

    /// Sets the sequence number.
    pub fn seq_number(mut self, seq_num: u64) -> Self {
        self.seq_number = Some(seq_num);
        self
    }

    /// Sets the message priority.
    pub fn priority(mut self, priority: u8) -> Self {
        self.priority = Some(priority);
        self
    }

    /// Sets the payload, sent as the binary attachment. Required by
    /// [`build`](Self::build).
    pub fn payload<M>(mut self, message: M) -> Self
    where
        M: Into<Vec<u8>>,
    {
        self.message = Some(message.into());
        self
    }

    /// Sets the correlation id.
    ///
    /// The bytes must not contain a NUL byte, otherwise [`build`](Self::build)
    /// fails with [`MessageBuilderError::InvalidArgs`].
    pub fn correlation_id<M>(mut self, id: M) -> Self
    where
        M: Into<Vec<u8>>,
    {
        self.correlation_id = Some(id.into());
        self
    }

    /// Maps a solClient return code to `Ok(())` when it is OK and to
    /// [`MessageBuilderError::SolClientError`] otherwise.
    fn check_rc(rc: i32) -> Result<()> {
        if SolClientReturnCode::from_i32(rc) == Some(SolClientReturnCode::Ok) {
            Ok(())
        } else {
            Err(MessageBuilderError::SolClientError)
        }
    }

    /// Allocates a solClient message and copies the configured fields into it.
    ///
    /// # Errors
    ///
    /// * [`MessageBuilderError::MessageAlocFailure`] if `solClient_msg_alloc`
    ///   does not return OK.
    /// * [`MessageBuilderError::MissingRequiredArgs`] if the delivery mode,
    ///   destination or payload was never set.
    /// * [`MessageBuilderError::InvalidArgs`] if the correlation id,
    ///   application id or application message type contains a NUL byte.
    /// * [`MessageBuilderError::SolClientError`] if any solClient setter
    ///   returns a non-OK code, or if the payload is longer than `u32::MAX`
    ///   bytes and so cannot be described to the C API.
    pub fn build(self) -> Result<OutboundMessage> {
        let mut msg_ptr: ffi::solClient_opaqueMsg_pt = ptr::null_mut();
        // SAFETY: `msg_ptr` is a live local the callee may write the new handle into.
        let msg_alloc_result = unsafe { ffi::solClient_msg_alloc(&mut msg_ptr) };
        if Some(SolClientReturnCode::Ok) != SolClientReturnCode::from_i32(msg_alloc_result) {
            return Err(MessageBuilderError::MessageAlocFailure);
        }
        // Wrap the handle immediately: from here on every early return drops
        // `msg`, whose `Drop` impl frees the allocation instead of leaking it.
        let msg = OutboundMessage { msg_ptr };

        // NOTE(review): the setter calls below assume solClient copies the
        // pointed-to data during the call (the buffers are dropped when this
        // function returns) — confirm against the C API documentation.

        let Some(delivery_mode) = self.delivery_mode else {
            return Err(MessageBuilderError::MissingRequiredArgs(
                "delivery_mode".to_owned(),
            ));
        };
        // SAFETY: `msg_ptr` is the handle allocated above and still owned by `msg`.
        Self::check_rc(unsafe {
            ffi::solClient_msg_setDeliveryMode(msg_ptr, delivery_mode as u32)
        })?;

        let Some(destination) = self.destination else {
            return Err(MessageBuilderError::MissingRequiredArgs(
                "destination".to_owned(),
            ));
        };
        // `destination` stays alive until the end of this function, so the
        // string pointer stored in the raw struct is valid during the call.
        let mut raw_destination = ffi::solClient_destination {
            destType: destination.dest_type.to_i32(),
            dest: destination.dest.as_ptr(),
        };
        // SAFETY: `raw_destination` is a live local and the size passed is
        // exactly the size of that struct.
        Self::check_rc(unsafe {
            ffi::solClient_msg_setDestination(
                msg_ptr,
                &mut raw_destination,
                std::mem::size_of::<ffi::solClient_destination>(),
            )
        })?;

        let Some(message) = self.message else {
            return Err(MessageBuilderError::MissingRequiredArgs(
                "message".to_owned(),
            ));
        };
        // The C API takes a 32-bit length; reject oversized payloads rather
        // than silently truncating the length with an `as` cast.
        let payload_len = <u32 as std::convert::TryFrom<usize>>::try_from(message.len())
            .map_err(|_| MessageBuilderError::SolClientError)?;
        // SAFETY: the pointer/length pair describes the live `message` buffer.
        Self::check_rc(unsafe {
            ffi::solClient_msg_setBinaryAttachment(
                msg_ptr,
                message.as_ptr() as *const c_void,
                payload_len,
            )
        })?;

        if let Some(id) = self.correlation_id {
            let id = CString::new(id)?;
            // SAFETY: `id` is a NUL-terminated string that outlives the call.
            Self::check_rc(unsafe { ffi::solClient_msg_setCorrelationId(msg_ptr, id.as_ptr()) })?;
        }
        if let Some(cos) = self.class_of_service {
            // SAFETY: `msg_ptr` is the handle allocated above and still owned by `msg`.
            Self::check_rc(unsafe { ffi::solClient_msg_setClassOfService(msg_ptr, cos.into()) })?;
        }
        if let Some(seq_number) = self.seq_number {
            // SAFETY: `msg_ptr` is the handle allocated above and still owned by `msg`.
            Self::check_rc(unsafe { ffi::solClient_msg_setSequenceNumber(msg_ptr, seq_number) })?;
        }
        if let Some(priority) = self.priority {
            // SAFETY: `msg_ptr` is the handle allocated above and still owned by `msg`.
            Self::check_rc(unsafe { ffi::solClient_msg_setPriority(msg_ptr, priority.into()) })?;
        }
        if let Some(id) = self.application_id {
            let id = CString::new(id)?;
            // SAFETY: `id` is a NUL-terminated string that outlives the call.
            Self::check_rc(unsafe {
                ffi::solClient_msg_setApplicationMessageId(msg_ptr, id.as_ptr())
            })?;
        }
        if let Some(message_type) = self.application_msg_type {
            let message_type = CString::new(message_type)?;
            // SAFETY: `message_type` is a NUL-terminated string that outlives the call.
            Self::check_rc(unsafe {
                ffi::solClient_msg_setApplicationMsgType(msg_ptr, message_type.as_ptr())
            })?;
        }
        Ok(msg)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{DestinationType, MessageDestination};

    /// Builder pre-populated with what every test needs: direct delivery,
    /// the `test_topic` topic destination and a `"Hello"` payload.
    fn base_builder() -> OutboundMessageBuilder {
        let topic = MessageDestination::new(DestinationType::Topic, "test_topic").unwrap();
        OutboundMessageBuilder::new()
            .delivery_mode(DeliveryMode::Direct)
            .destination(topic)
            .payload("Hello")
    }

    #[test]
    fn it_should_build_message() {
        let _ = base_builder().build().unwrap();
    }

    #[test]
    fn it_should_build_with_same_topic() {
        let built = base_builder().build().unwrap();
        let message_destination = built.get_destination().unwrap().unwrap();
        assert!("test_topic" == message_destination.dest.to_string_lossy());
    }

    #[test]
    fn it_should_build_with_same_corralation_id() {
        let built = base_builder()
            .correlation_id("test_correlation")
            .build()
            .unwrap();
        let correlation_id = built.get_correlation_id().unwrap();
        assert!("test_correlation" == correlation_id);
    }

    #[test]
    fn it_should_build_have_valid_exp() {
        let built = base_builder().build().unwrap();
        assert!(0 == built.get_expiration());
    }

    #[test]
    fn it_should_build_with_same_cos() {
        let built = base_builder()
            .class_of_service(ClassOfService::Two)
            .build()
            .unwrap();
        assert!(ClassOfService::Two == built.get_class_of_service().unwrap());
    }

    #[test]
    fn it_should_build_with_same_seq_num() {
        let built = base_builder().seq_number(45).build().unwrap();
        assert!(45 == built.get_sequence_number().unwrap().unwrap());
    }

    #[test]
    fn it_should_build_with_same_priority() {
        // Explicit priority is read back unchanged.
        let with_priority = base_builder().priority(3).build().unwrap();
        assert!(3 == with_priority.get_priority().unwrap().unwrap());

        // Without a priority the getter reports `None`.
        let without_priority = base_builder().build().unwrap();
        assert!(without_priority.get_priority().unwrap().is_none());
    }

    #[test]
    fn it_should_build_with_same_application_id() {
        // Explicit application id is read back unchanged.
        let with_id = base_builder().application_id("test_id").build().unwrap();
        assert!(Some("test_id") == with_id.get_application_message_id());

        // Without an application id the getter reports `None`.
        let without_id = base_builder().build().unwrap();
        assert!(without_id.get_application_message_id().is_none());
    }

    #[test]
    fn it_should_build_with_same_application_msg_type() {
        // Explicit application message type is read back unchanged.
        let with_type = base_builder()
            .application_msg_type("test_id")
            .build()
            .unwrap();
        assert!(Some("test_id") == with_type.get_application_msg_type());

        // Without an application message type the getter reports `None`.
        let without_type = base_builder().build().unwrap();
        assert!(without_type.get_application_msg_type().is_none());
    }

    #[test]
    fn it_should_build_with_same_string_payload() {
        let built = base_builder().build().unwrap();
        let raw_payload = built.get_payload().unwrap();
        assert!(b"Hello" == raw_payload);
    }
}