solace-rs 0.8.0

The Unofficial Solace PubSub+ Rust Client Library. It can be used to access the services of a Solace PubSub+ Event Broker. This is a wrapper around the Solace C library. Use under your own risk! The C library is not included.
Documentation
use super::{CacheStatus, Message, MessageError, Result};
use crate::SolClientReturnCode;
use enum_primitive::*;
use solace_rs_sys as ffi;
use std::convert::From;
use std::ffi::CStr;
use std::time::{Duration, SystemTime};
use std::{fmt, ptr};
use tracing::warn;

pub struct InboundMessage {
    _msg_ptr: ffi::solClient_opaqueMsg_pt,
}

impl fmt::Debug for InboundMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("InboundMessage");
        if self.get_receive_timestamp().is_ok_and(|v| v.is_some()) {
            f.field(
                "receive_timestamp",
                &format_args!("{:?}", self.get_receive_timestamp().unwrap().unwrap()),
            );
        }
        if self.get_sender_id().is_ok_and(|v| v.is_some()) {
            f.field(
                "sender_id",
                &format_args!("{}", self.get_sender_id().unwrap().unwrap()),
            );
        }
        if self.get_sender_timestamp().is_ok_and(|v| v.is_some()) {
            f.field(
                "sender_timestamp",
                &format_args!("{:?}", self.get_sender_timestamp().unwrap().unwrap()),
            );
        }
        if self.get_sequence_number().is_ok_and(|v| v.is_some()) {
            f.field(
                "sequence_number",
                &format_args!("{}", self.get_sequence_number().unwrap().unwrap()),
            );
        }
        if self.get_correlation_id().is_ok_and(|v| v.is_some()) {
            f.field(
                "correlation_id",
                &format_args!("{}", self.get_correlation_id().unwrap().unwrap()),
            );
        }
        if self.get_priority().is_ok_and(|v| v.is_some()) {
            f.field(
                "priority",
                &format_args!("{}", self.get_priority().unwrap().unwrap()),
            );
        }
        if self.is_discard_indication() {
            f.field(
                "is_discard_indication",
                &format_args!("{}", self.is_discard_indication()),
            );
        }
        if self.get_application_message_id().is_some() {
            f.field(
                "application_message_id",
                &format_args!("{}", &self.get_application_message_id().unwrap()),
            );
        }
        if self.get_user_data().is_ok_and(|v| v.is_some()) {
            if let Ok(v) = std::str::from_utf8(self.get_user_data().unwrap().unwrap()) {
                f.field("user_data", &v);
            }
        }
        if self.get_destination().is_ok_and(|v| v.is_some()) {
            f.field("destination", &self.get_destination().unwrap().unwrap());
        }

        f.field("is_reply", &self.is_reply());

        if self.get_reply_to().is_ok_and(|v| v.is_some()) {
            f.field("reply_to", &self.get_reply_to().unwrap().unwrap());
        }

        f.field("is_cache_msg", &self.is_cache_msg());

        if self.get_cache_request_id().is_ok_and(|v| v.is_some()) {
            f.field(
                "cache_request_id",
                &self.get_cache_request_id().unwrap().unwrap(),
            );
        }

        if self.get_payload().is_ok_and(|v| v.is_some()) {
            if let Ok(v) = std::str::from_utf8(self.get_payload().unwrap().unwrap()) {
                f.field("payload", &v);
            }
        }
        f.finish()
    }
}

unsafe impl Send for InboundMessage {}

impl Drop for InboundMessage {
    fn drop(&mut self) {
        let rc = unsafe { ffi::solClient_msg_free(&mut self._msg_ptr) };

        let rc = SolClientReturnCode::from_raw(rc);
        if !rc.is_ok() {
            warn!("warning: message was not dropped properly");
        }
    }
}

impl From<ffi::solClient_opaqueMsg_pt> for InboundMessage {
    /// .
    ///
    /// # Safety
    ///
    /// From a valid owned pointer.
    /// No other alias should exist for this pointer
    /// InboundMessage will try to free the ptr when it is destroyed
    ///
    /// .
    fn from(ptr: ffi::solClient_opaqueMsg_pt) -> Self {
        Self { _msg_ptr: ptr }
    }
}

impl<'a> Message<'a> for InboundMessage {
    unsafe fn get_raw_message_ptr(&self) -> ffi::solClient_opaqueMsg_pt {
        self._msg_ptr
    }
}

impl InboundMessage {
    pub fn get_receive_timestamp(&self) -> Result<Option<SystemTime>> {
        let mut ts: i64 = 0;
        let rc = unsafe { ffi::solClient_msg_getRcvTimestamp(self.get_raw_message_ptr(), &mut ts) };

        let rc = SolClientReturnCode::from_raw(rc);
        match rc {
            SolClientReturnCode::NotFound => Ok(None),
            SolClientReturnCode::Ok => Ok(Some(
                SystemTime::UNIX_EPOCH + Duration::from_millis(ts.try_into().unwrap()),
            )),
            _ => Err(MessageError::FieldError("receive_timestamp", rc)),
        }
    }

    pub fn get_sender_id(&self) -> Result<Option<&str>> {
        let mut buffer = ptr::null();

        let rc = unsafe { ffi::solClient_msg_getSenderId(self.get_raw_message_ptr(), &mut buffer) };

        let rc = SolClientReturnCode::from_raw(rc);
        match rc {
            SolClientReturnCode::Ok => (),
            SolClientReturnCode::NotFound => return Ok(None),
            _ => return Err(MessageError::FieldError("sender_id", rc)),
        }

        let c_str = unsafe { CStr::from_ptr(buffer) };

        let str = c_str
            .to_str()
            .map_err(|_| MessageError::FieldConvertionError("sender_id"))?;

        Ok(Some(str))
    }

    pub fn is_discard_indication(&self) -> bool {
        let discard_indication =
            unsafe { ffi::solClient_msg_isDiscardIndication(self.get_raw_message_ptr()) };

        if discard_indication == 0 {
            return false;
        }

        true
    }

    pub fn get_cache_request_id(&self) -> Result<Option<u64>> {
        let mut id: u64 = 0;

        let rc =
            unsafe { ffi::solClient_msg_getCacheRequestId(self.get_raw_message_ptr(), &mut id) };

        let rc = SolClientReturnCode::from_raw(rc);
        match rc {
            SolClientReturnCode::Ok => Ok(Some(id)),
            SolClientReturnCode::NotFound => Ok(None),
            _ => Err(MessageError::FieldError("cache_request_id", rc)),
        }
    }

    pub fn is_cache_msg(&self) -> CacheStatus {
        let raw = unsafe { ffi::solClient_msg_isCacheMsg(self.get_raw_message_ptr()) };
        CacheStatus::from_i32(raw).unwrap_or(CacheStatus::InvalidMessage)
    }
}