use crate::{
client::error::{AuthError, ConnectError},
codec::*,
core::{
base_types::{NonZero, QoS},
collections::UserProperties,
},
};
use futures::channel::mpsc::{self};
use std::{str, time::Duration};
use super::{
error::{PubackError, PubcompError, PubrecError},
stream::SubscribeStream,
};
/// Successful result of a connect request.
///
/// Wraps a received `ConnackRx` packet and exposes its contents through
/// read-only accessor methods.
pub struct ConnectRsp {
/// The received packet backing every accessor.
packet: ConnackRx,
}
impl TryFrom<ConnackRx> for ConnectRsp {
    type Error = ConnectError;

    /// Converts a received `ConnackRx` packet into a [`ConnectRsp`].
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectError`] built from the packet when its reason code,
    /// cast to `u8`, is `0x80` or greater.
    ///
    /// # Panics
    ///
    /// Panics when the packet reports that subscription identifiers are not
    /// available.
    fn try_from(packet: ConnackRx) -> Result<Self, Self::Error> {
        let rejected = packet.reason as u8 >= 0x80;
        if rejected {
            return Err(ConnectError::from(packet));
        }

        // The check runs only for accepted connections, after the reason code test.
        let sub_ids_supported = bool::from(packet.subscription_identifier_available);
        assert!(
            sub_ids_supported,
            "Subscription identifier support is required. Check your broker settings."
        );

        Ok(Self { packet })
    }
}
impl ConnectRsp {
    /// Returns the `session_present` flag stored in the packet.
    pub fn session_present(&self) -> bool {
        self.packet.session_present
    }

    /// Returns the reason code stored in the packet.
    pub fn reason(&self) -> ConnectReason {
        self.packet.reason
    }

    /// Returns the packet's `wildcard_subscription_available` value as a `bool`.
    pub fn wildcard_subscription_available(&self) -> bool {
        self.packet.wildcard_subscription_available.into()
    }

    /// Returns the packet's `subscription_identifier_available` value as a `bool`.
    pub fn subscription_identifier_available(&self) -> bool {
        self.packet.subscription_identifier_available.into()
    }

    /// Returns the packet's `shared_subscription_available` value as a `bool`.
    pub fn shared_subscription_available(&self) -> bool {
        self.packet.shared_subscription_available.into()
    }

    /// Returns the packet's `maximum_qos` value converted to [`QoS`].
    pub fn maximum_qos(&self) -> QoS {
        self.packet.maximum_qos.into()
    }

    /// Returns the packet's `retain_available` value as a `bool`.
    pub fn retain_available(&self) -> bool {
        self.packet.retain_available.into()
    }

    /// Returns the server keep alive, read as whole seconds, or `None` when
    /// the packet does not carry it.
    pub fn server_keep_alive(&self) -> Option<Duration> {
        self.packet
            .server_keep_alive
            .map(|secs| Duration::from_secs(u64::from(u16::from(secs))))
    }

    /// Returns the packet's `receive_maximum` value as a plain `u16`.
    pub fn receive_maximum(&self) -> u16 {
        let limit = NonZero::from(self.packet.receive_maximum);
        limit.get()
    }

    /// Returns the packet's `topic_alias_maximum` value as a plain `u16`.
    pub fn topic_alias_maximum(&self) -> u16 {
        self.packet.topic_alias_maximum.into()
    }

    /// Returns the session expiry interval, read as whole seconds, or `None`
    /// when the packet does not carry it.
    pub fn session_expiry_interval(&self) -> Option<Duration> {
        self.packet
            .session_expiry_interval
            .map(|secs| Duration::from_secs(u64::from(u32::from(secs))))
    }

    /// Returns the maximum packet size, or `None` when the packet does not
    /// carry it.
    pub fn maximum_packet_size(&self) -> Option<u32> {
        self.packet
            .maximum_packet_size
            .map(|size| NonZero::from(size).get())
    }

    /// Returns the assigned client identifier, or `None` when it is absent or
    /// is not valid UTF-8.
    pub fn assigned_client_identifier(&self) -> Option<&str> {
        let prop = self.packet.assigned_client_identifier.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the reason string, or `None` when it is absent or is not valid
    /// UTF-8.
    pub fn reason_string(&self) -> Option<&str> {
        let prop = self.packet.reason_string.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the response information, or `None` when it is absent or is
    /// not valid UTF-8.
    pub fn response_information(&self) -> Option<&str> {
        let prop = self.packet.response_information.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the server reference, or `None` when it is absent or is not
    /// valid UTF-8.
    pub fn server_reference(&self) -> Option<&str> {
        let prop = self.packet.server_reference.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the authentication method, or `None` when it is absent or is
    /// not valid UTF-8.
    pub fn authentication_method(&self) -> Option<&str> {
        let prop = self.packet.authentication_method.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the raw authentication data, or `None` when it is absent.
    pub fn authentication_data(&self) -> Option<&[u8]> {
        let prop = self.packet.authentication_data.as_ref()?;
        Some(prop.0.0.as_ref())
    }

    /// Returns the user properties attached to the packet.
    pub fn user_properties(&self) -> &UserProperties {
        &self.packet.user_property
    }
}
/// Successful result of an authentication exchange.
///
/// Wraps a received `AuthRx` packet and exposes its contents through
/// read-only accessor methods.
pub struct AuthRsp {
/// The received packet backing every accessor.
packet: AuthRx,
}
impl TryFrom<AuthRx> for AuthRsp {
    type Error = AuthError;

    /// Converts a received `AuthRx` packet into an [`AuthRsp`].
    ///
    /// # Errors
    ///
    /// Returns an [`AuthError`] built from the packet when its reason code,
    /// cast to `u8`, is `0x80` or greater.
    fn try_from(packet: AuthRx) -> Result<Self, Self::Error> {
        let rejected = packet.reason as u8 >= 0x80;
        if rejected {
            Err(AuthError::from(packet))
        } else {
            Ok(Self { packet })
        }
    }
}
impl AuthRsp {
    /// Returns the reason code stored in the packet.
    pub fn reason(&self) -> AuthReason {
        self.packet.reason
    }

    /// Returns the reason string, or `None` when it is absent or is not valid
    /// UTF-8.
    pub fn reason_string(&self) -> Option<&str> {
        let prop = self.packet.reason_string.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the authentication method, or `None` when it is absent or is
    /// not valid UTF-8.
    pub fn authentication_method(&self) -> Option<&str> {
        let prop = self.packet.authentication_method.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the raw authentication data, or `None` when it is absent.
    pub fn authentication_data(&self) -> Option<&[u8]> {
        let prop = self.packet.authentication_data.as_ref()?;
        Some(prop.0.0.as_ref())
    }

    /// Returns the user properties attached to the packet.
    pub fn user_properties(&self) -> &UserProperties {
        &self.packet.user_property
    }
}
/// Result of a subscribe request.
///
/// Holds the received `SubackRx` packet together with the channel receiver
/// that `stream()` moves into a `SubscribeStream`.
pub struct SubscribeRsp {
/// The received packet backing the accessors.
pub(crate) packet: SubackRx,
/// Receiving end of an unbounded channel of `RxPacket` values.
pub(crate) receiver: mpsc::UnboundedReceiver<RxPacket>,
}
impl SubscribeRsp {
    /// Consumes the response and returns a [`SubscribeStream`] that owns the
    /// packet receiver.
    pub fn stream(self) -> SubscribeStream {
        let Self { receiver, .. } = self;
        SubscribeStream { receiver }
    }

    /// Returns the reason string, or `None` when it is absent or is not valid
    /// UTF-8.
    pub fn reason_string(&self) -> Option<&str> {
        let prop = self.packet.reason_string.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the user properties attached to the packet.
    pub fn user_properties(&self) -> &UserProperties {
        &self.packet.user_property
    }

    /// Returns the reason codes carried in the packet payload.
    pub fn payload(&self) -> &[SubackReason] {
        &self.packet.payload
    }
}
/// Result of an unsubscribe request.
///
/// Wraps a received `UnsubackRx` packet and exposes its contents through
/// read-only accessor methods.
pub struct UnsubscribeRsp {
/// The received packet backing every accessor.
pub(crate) packet: UnsubackRx,
}
impl UnsubscribeRsp {
    /// Returns the reason string, or `None` when it is absent or is not valid
    /// UTF-8.
    pub fn reason_string(&self) -> Option<&str> {
        let prop = self.packet.reason_string.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the user properties attached to the packet.
    pub fn user_properties(&self) -> &UserProperties {
        &self.packet.user_property
    }

    /// Returns the reason codes carried in the packet payload.
    pub fn payload(&self) -> &[UnsubackReason] {
        &self.packet.payload
    }
}
/// Data of a received publish message.
///
/// Wraps a received `PublishRx` packet and exposes its contents through
/// read-only accessor methods.
pub struct PublishData {
/// The received packet backing every accessor.
packet: PublishRx,
}
impl From<PublishRx> for PublishData {
fn from(packet: PublishRx) -> Self {
Self { packet }
}
}
impl PublishData {
    /// Returns the `dup` flag stored in the packet.
    pub fn dup(&self) -> bool {
        self.packet.dup
    }

    /// Returns the `retain` flag stored in the packet.
    pub fn retain(&self) -> bool {
        self.packet.retain
    }

    /// Returns the quality of service stored in the packet.
    pub fn qos(&self) -> QoS {
        self.packet.qos
    }

    /// Returns the topic name as a string slice.
    ///
    /// # Panics
    ///
    /// Panics when the topic name bytes are not valid UTF-8.
    pub fn topic_name(&self) -> &str {
        let raw: &[u8] = self.packet.topic_name.0.as_ref();
        str::from_utf8(raw).unwrap()
    }

    /// Returns the payload format indicator as a `bool`, or `None` when the
    /// packet does not carry it.
    pub fn payload_format_indicator(&self) -> Option<bool> {
        let indicator = self.packet.payload_format_indicator;
        indicator.map(bool::from)
    }

    /// Returns the topic alias, or `None` when the packet does not carry it.
    pub fn topic_alias(&self) -> Option<u16> {
        self.packet
            .topic_alias
            .map(|alias| NonZero::from(alias).get())
    }

    /// Returns the message expiry interval, read as whole seconds, or `None`
    /// when the packet does not carry it.
    pub fn message_expiry_interval(&self) -> Option<Duration> {
        self.packet
            .message_expiry_interval
            .map(|secs| Duration::from_secs(u64::from(u32::from(secs))))
    }

    /// Returns the raw correlation data, or `None` when it is absent.
    pub fn correlation_data(&self) -> Option<&[u8]> {
        let prop = self.packet.correlation_data.as_ref()?;
        Some(prop.0.0.as_ref())
    }

    /// Returns the response topic, or `None` when it is absent or is not
    /// valid UTF-8.
    pub fn response_topic(&self) -> Option<&str> {
        let prop = self.packet.response_topic.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the content type, or `None` when it is absent or is not valid
    /// UTF-8.
    pub fn content_type(&self) -> Option<&str> {
        let prop = self.packet.content_type.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the raw message payload bytes.
    pub fn payload(&self) -> &[u8] {
        let body = &self.packet.payload;
        body.0.as_ref()
    }

    /// Returns the user properties attached to the packet.
    pub fn user_properties(&self) -> &UserProperties {
        &self.packet.user_property
    }

    /// Returns the subscription identifier as a plain `u32`, or `None` when
    /// the packet does not carry one.
    pub(crate) fn subscription_identifier(&self) -> Option<u32> {
        self.packet
            .subscription_identifier
            .map(|id| NonZero::from(id).get().value())
    }
}
/// Successful publish acknowledgement.
///
/// Wraps a received `PubackRx` packet and exposes its contents through
/// read-only accessor methods.
pub struct PubackRsp {
/// The received packet backing every accessor.
pub(crate) packet: PubackRx,
}
impl PubackRsp {
    /// Returns the reason code stored in the packet.
    pub fn reason(&self) -> PubackReason {
        self.packet.reason
    }

    /// Returns the reason string, or `None` when it is absent or is not valid
    /// UTF-8.
    pub fn reason_string(&self) -> Option<&str> {
        let prop = self.packet.reason_string.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the user properties attached to the packet.
    pub fn user_properties(&self) -> &UserProperties {
        &self.packet.user_property
    }
}
impl TryFrom<PubackRx> for PubackRsp {
    type Error = PubackError;

    /// Converts a received `PubackRx` packet into a [`PubackRsp`].
    ///
    /// # Errors
    ///
    /// Returns a [`PubackError`] built from the packet when its reason code,
    /// cast to `u8`, is `0x80` or greater.
    fn try_from(packet: PubackRx) -> Result<Self, Self::Error> {
        let rejected = packet.reason as u8 >= 0x80;
        if rejected {
            Err(PubackError::from(packet))
        } else {
            Ok(Self { packet })
        }
    }
}
/// Successful "publish received" response.
///
/// Wraps a received `PubrecRx` packet and exposes its contents through
/// read-only accessor methods.
pub struct PubrecRsp {
/// The received packet backing every accessor.
pub(crate) packet: PubrecRx,
}
impl PubrecRsp {
    /// Returns the reason code stored in the packet.
    pub fn reason(&self) -> PubrecReason {
        self.packet.reason
    }

    /// Returns the reason string, or `None` when it is absent or is not valid
    /// UTF-8.
    pub fn reason_string(&self) -> Option<&str> {
        let prop = self.packet.reason_string.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the user properties attached to the packet.
    pub fn user_properties(&self) -> &UserProperties {
        &self.packet.user_property
    }
}
impl TryFrom<PubrecRx> for PubrecRsp {
    type Error = PubrecError;

    /// Converts a received `PubrecRx` packet into a [`PubrecRsp`].
    ///
    /// # Errors
    ///
    /// Returns a [`PubrecError`] built from the packet when its reason code,
    /// cast to `u8`, is `0x80` or greater.
    fn try_from(packet: PubrecRx) -> Result<Self, Self::Error> {
        let rejected = packet.reason as u8 >= 0x80;
        if rejected {
            Err(PubrecError::from(packet))
        } else {
            Ok(Self { packet })
        }
    }
}
/// Successful "publish complete" response.
///
/// Wraps a received `PubcompRx` packet and exposes its contents through
/// read-only accessor methods.
pub struct PubcompRsp {
/// The received packet backing every accessor.
pub(crate) packet: PubcompRx,
}
impl PubcompRsp {
    /// Returns the reason code stored in the packet.
    pub fn reason(&self) -> PubcompReason {
        self.packet.reason
    }

    /// Returns the reason string, or `None` when it is absent or is not valid
    /// UTF-8.
    pub fn reason_string(&self) -> Option<&str> {
        let prop = self.packet.reason_string.as_ref()?;
        str::from_utf8(prop.0.0.as_ref()).ok()
    }

    /// Returns the user properties attached to the packet.
    pub fn user_properties(&self) -> &UserProperties {
        &self.packet.user_property
    }
}
impl TryFrom<PubcompRx> for PubcompRsp {
    type Error = PubcompError;

    /// Converts a received `PubcompRx` packet into a [`PubcompRsp`].
    ///
    /// # Errors
    ///
    /// Returns a [`PubcompError`] built from the packet when its reason code,
    /// cast to `u8`, is `0x80` or greater.
    fn try_from(packet: PubcompRx) -> Result<Self, Self::Error> {
        let rejected = packet.reason as u8 >= 0x80;
        if rejected {
            Err(PubcompError::from(packet))
        } else {
            Ok(Self { packet })
        }
    }
}