use libmqm_default as default;
use libmqm_sys as mq;
use super::option;
use crate::{
Object,
connection::AsConnection,
constants,
handle::{ObjectHandle, SubscriptionHandle},
prelude::*,
result::{ResultComp, ResultCompErr},
structs,
types::{MQCO, MQLONG},
};
/// A subscription handle paired with the connection it belongs to.
///
/// The handle is closed with `mqclose` either explicitly through `close`, or
/// when the value is dropped while the handle still reports itself as closeable.
#[derive(Debug)]
#[must_use]
pub struct Subscription<C: AsConnection> {
/// Handle passed to `mqsubrq` and `mqclose`.
handle: SubscriptionHandle,
/// Value that yields the connection used for every call made through this subscription.
connection: C,
/// Options handed to `mqclose` when the subscription is closed or dropped.
close_options: MQCO,
}
impl<C: AsConnection> Subscription<C> {
    /// Consumes the subscription and closes its handle, returning the outcome of `mqclose`.
    ///
    /// The stored close options are passed along with the handle.
    pub fn close(mut self) -> ResultComp<()> {
        let connection = self.connection.as_connection();
        connection
            .mq
            .mqclose(connection.handle, &mut self.handle, self.close_options)
    }

    /// Issues `mqsubrq` for this subscription and returns the `NumPubs` field of the MQSRO.
    ///
    /// The request starts from `MQSRO_DEFAULT` with the action `MQSR_ACTION_PUBLICATION`;
    /// `request_options` may then adjust both before the call is made.
    ///
    /// # Panics
    ///
    /// Panics if the applied options leave the MQSRO `Version` above `MQSRO_CURRENT_VERSION`.
    pub fn request_retained(&self, request_options: &impl option::SubscribeRequestOption) -> ResultComp<MQLONG> {
        let sro = structs::MQSRO::new(default::MQSRO_DEFAULT);
        let mut srp = option::SubscribeRequestParam {
            sro,
            sr: constants::MQSR_ACTION_PUBLICATION,
        };
        request_options.apply_param(&mut srp);
        assert!(srp.sro.Version <= mq::MQSRO_CURRENT_VERSION);

        let connection = self.connection.as_connection();
        let outcome = connection
            .mq
            .mqsubrq(connection.handle, &self.handle, srp.sr, Some(&mut srp.sro));
        // The MQSRO is read only after the call has had the chance to update it.
        outcome.map_completion(|()| srp.sro.NumPubs)
    }
}
impl<C: AsConnection> Drop for Subscription<C> {
    /// Closes the handle with the stored close options if it is still closeable.
    fn drop(&mut self) {
        if !self.handle.is_closeable() {
            return;
        }
        let connection = self.connection.as_connection();
        // The outcome is deliberately discarded: `drop` has no way to report it.
        let _ = connection
            .mq
            .mqclose(connection.handle, &mut self.handle, self.close_options);
    }
}
impl<C: AsConnection + Clone> Subscription<C> {
    /// Creates a subscription on `connection` configured by `subscribe_option`.
    pub fn subscribe<'so>(connection: C, subscribe_option: &impl option::SubscribeOption<'so>) -> ResultComp<Self> {
        Self::subscribe_as::<Self>(connection, subscribe_option)
    }

    /// Creates a subscription and additionally returns the attribute value `A`.
    pub fn subscribe_with<'so, A>(connection: C, subscribe_option: &impl option::SubscribeOption<'so>) -> ResultComp<(Self, A)>
    where
        A: option::SubscribeAttr<C>,
    {
        Self::subscribe_as::<(Self, A)>(connection, subscribe_option)
    }

    /// Creates a subscription with `MQSO_MANAGED` added to the supplied options, returning
    /// the subscription, the managed queue object and the attribute value `A`.
    ///
    /// # Panics
    ///
    /// Panics if no queue object is produced by the subscribe call.
    pub fn subscribe_managed_with<'so, A>(
        connection: C,
        subscribe_option: impl option::SubscribeOption<'so>,
    ) -> ResultComp<(Self, Object<C>, A)>
    where
        A: option::SubscribeAttr<C>,
    {
        let managed_option = (constants::MQSO_MANAGED, subscribe_option);
        Self::subscribe_as::<(Self, Option<Object<C>>, A)>(connection, &managed_option).map_completion(
            |(subscription, managed_queue, attr)| {
                let queue = managed_queue.expect("managed queue should always be returned with MQSO_MANAGED option");
                (subscription, queue, attr)
            },
        )
    }

    /// Creates a subscription with `MQSO_MANAGED` added to the supplied options, returning
    /// the subscription and the managed queue object.
    pub fn subscribe_managed<'so>(
        connection: C,
        subscribe_option: impl option::SubscribeOption<'so>,
    ) -> ResultComp<(Self, Object<C>)> {
        Self::subscribe_managed_with::<()>(connection, subscribe_option)
            .map_completion(|(subscription, queue, ())| (subscription, queue))
    }

    /// Shared implementation behind the `subscribe*` constructors; `R` selects what is
    /// produced from the subscribe call.
    ///
    /// The parameter starts from `MQSD_DEFAULT`, default close options and no provided object
    /// handle, and is then adjusted by `subscribe_option`. `R::subscribe_consume` is handed the
    /// parameter together with a closure that performs `mqsub`.
    ///
    /// An [`Object`] is only created when the object handle after the call is neither
    /// `MQHO_NONE` nor equal to the handle that was provided through the options.
    ///
    /// # Panics
    ///
    /// Panics if the applied options leave the MQSD `Version` above `MQSD_CURRENT_VERSION`.
    pub(super) fn subscribe_as<'so, R>(
        connection: C,
        subscribe_option: &impl option::SubscribeOption<'so>,
    ) -> ResultCompErr<R, <R as option::SubscribeValue<C>>::Error>
    where
        R: option::SubscribeValue<C>,
    {
        use libmqm_sys::MQHO_NONE;

        let mut so = option::SubscribeParam {
            close_options: MQCO::default(),
            sd: structs::MQSD::new(default::MQSD_DEFAULT),
            provided_object: MQHO_NONE,
        };
        subscribe_option.apply_param(&mut so);
        assert!(so.sd.Version <= mq::MQSD_CURRENT_VERSION);

        R::subscribe_consume(&mut so, |sp| {
            let mut queue_handle = ObjectHandle::from(sp.provided_object);
            let conn = connection.as_connection();
            // SAFETY (review): the requirements of `mqsub` are not visible in this file;
            // presumably valid exclusive borrows of the MQSD and object handle suffice — TODO confirm.
            let outcome = unsafe { conn.mq.mqsub(conn.handle, &mut sp.sd, &mut queue_handle) };
            outcome.map_completion(|handle| {
                let returned = queue_handle.raw_handle();
                // Only wrap a handle that is usable and differs from the one supplied by the caller.
                let object = if returned == MQHO_NONE || sp.provided_object == returned {
                    None
                } else {
                    Some(Object::from_parts(connection.clone(), ObjectHandle::from(returned)))
                };
                let subscription = Self {
                    handle,
                    connection,
                    close_options: sp.close_options,
                };
                option::SubscribeState { subscription, object }
            })
        })
    }
}
#[cfg(all(test, feature = "mock"))]
#[cfg_attr(coverage_nightly, coverage(off))]
mod test {
    use super::*;
    use crate::test::mock;

    /// `request_retained` should return the `NumPubs` value that MQSUBRQ writes into the MQSRO.
    #[test]
    pub fn test_request_retained() -> Result<(), Box<dyn std::error::Error>> {
        let qm = mock::connect_ok(|mock_library| {
            // MQSUBRQ is expected exactly once and reports five publications through `NumPubs`.
            mock_library
                .expect_MQSUBRQ()
                .returning(|_, _, _, sro, cc, rc| {
                    let mqsro = sro.expect("MQSRO should never be a null pointer");
                    mqsro.NumPubs = 5;
                    mock::mqi_outcome_ok(cc, rc);
                })
                .once();
            // MQCLOSE is expected exactly once for handle 1; presumably issued when `sub`
            // is dropped at the end of the test.
            mock_library
                .expect_MQCLOSE()
                .withf(|_, &hobj, _, _, _| 1 == hobj)
                .returning(|_, _, _, cc, rc| {
                    mock::mqi_outcome_ok(cc, rc);
                })
                .once();
        });

        // Build the subscription directly so that no MQSUB call is needed.
        let sub = Subscription {
            handle: 1.into(),
            connection: qm,
            close_options: MQCO::default(),
        };

        assert_eq!(sub.request_retained(&()).warn_as_error()?, 5);

        Ok(())
    }
}