#[cfg(any(feature = "fuzzy", test))]
use arbitrary::{Arbitrary, Error as ArbitraryError, Unstructured};
use std::{fmt, io, net, result};
use crate::v5;
use crate::{Blob, ClientID, PacketID, PacketType, Packetize, QoS, QueueStatus};
use crate::{ReasonCode, Result};
use crate::{Subscription, TopicName};
/// Shorthand for a [`QueueStatus`] whose payload type is [`QPacket`].
pub type QueuePkt = QueueStatus<QPacket>;
/// Version-tagged protocol value.
///
/// `Default` yields [`Protocol::None`]; the inherent methods of this type
/// reach `unreachable!()` (and therefore panic) when invoked on that variant.
#[derive(Clone, Eq, PartialEq)]
pub enum Protocol {
/// Wraps a [`v5::Protocol`].
V5(v5::Protocol),
/// Placeholder variant; this is what `Default::default()` returns.
None,
}
impl Default for Protocol {
fn default() -> Protocol {
Protocol::None
}
}
impl From<v5::Protocol> for Protocol {
    /// Wraps a v5 protocol value in the [`Protocol::V5`] variant.
    fn from(proto: v5::Protocol) -> Self {
        Self::V5(proto)
    }
}
impl Protocol {
#[inline]
pub fn is_listen(&self) -> bool {
match self {
Protocol::V5(proto) => proto.is_listen(),
Protocol::None => unreachable!(),
}
}
#[inline]
pub fn to_listen_address(&self) -> net::SocketAddr {
match self {
Protocol::V5(proto) => proto.to_listen_address(),
Protocol::None => unreachable!(),
}
}
#[inline]
pub fn to_listen_port(&self) -> u16 {
match self {
Protocol::V5(proto) => proto.to_listen_port(),
Protocol::None => unreachable!(),
}
}
#[inline]
pub fn maximum_qos(&self) -> QoS {
match self {
Protocol::V5(proto) => proto.maximum_qos(),
Protocol::None => unreachable!(),
}
}
#[inline]
pub fn retain_available(&self) -> bool {
match self {
Protocol::V5(proto) => proto.retain_available(),
Protocol::None => unreachable!(),
}
}
#[inline]
pub fn max_packet_size(&self) -> u32 {
match self {
Protocol::V5(proto) => proto.max_packet_size(),
Protocol::None => unreachable!(),
}
}
#[inline]
pub fn keep_alive(&self) -> Option<u16> {
match self {
Protocol::V5(proto) => proto.keep_alive(),
Protocol::None => unreachable!(),
}
}
#[inline]
pub fn keep_alive_factor(&self) -> f32 {
match self {
Protocol::V5(proto) => proto.keep_alive_factor(),
Protocol::None => unreachable!(),
}
}
#[inline]
pub fn topic_alias_max(&self) -> Option<u16> {
match self {
Protocol::V5(proto) => proto.topic_alias_max(),
Protocol::None => unreachable!(),
}
}
}
impl Protocol {
    /// Runs the v5 handshake on `conn` and wraps the resulting socket in
    /// [`Socket::V5`].
    ///
    /// `prefix` and `conn` are handed to the wrapped protocol's `handshake`
    /// unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the wrapped protocol's `handshake`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is [`Protocol::None`].
    #[inline]
    pub fn handshake(&self, prefix: &str, conn: mio::net::TcpStream) -> Result<Socket> {
        match self {
            Self::None => unreachable!(),
            Self::V5(inner) => {
                let sock = inner.handshake(prefix, conn)?;
                Ok(Socket::V5(sock))
            }
        }
    }
}
// Packet constructors. Each one unwraps the `QPacket` argument (if any) to its
// v5 packet and forwards to the same-named method on the wrapped v5 protocol.
// All of them panic (via `unreachable!()`) on `Protocol::None`.
impl Protocol {
    /// Forwards to `new_ping_resp` on the wrapped v5 protocol, passing the v5
    /// packet held by `ping_req`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is [`Protocol::None`].
    pub fn new_ping_resp(&self, ping_req: QPacket) -> QPacket {
        match self {
            Self::None => unreachable!(),
            Self::V5(inner) => {
                let QPacket::V5(req) = ping_req;
                inner.new_ping_resp(req)
            }
        }
    }

    /// Forwards to `new_pub_ack` on the wrapped v5 protocol with `packet_id`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is [`Protocol::None`].
    pub fn new_pub_ack(&self, packet_id: PacketID) -> QPacket {
        match self {
            Self::None => unreachable!(),
            Self::V5(inner) => inner.new_pub_ack(packet_id),
        }
    }

    /// Forwards to `new_sub_ack` on the wrapped v5 protocol, passing the v5
    /// packet borrowed from `sub` together with `rcodes`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is [`Protocol::None`].
    pub fn new_sub_ack(&self, sub: &QPacket, rcodes: Vec<ReasonCode>) -> QPacket {
        match self {
            Self::None => unreachable!(),
            Self::V5(inner) => {
                let QPacket::V5(req) = sub;
                inner.new_sub_ack(req, rcodes)
            }
        }
    }

    /// Forwards to `new_unsub_ack` on the wrapped v5 protocol, passing the v5
    /// packet borrowed from `unsub` together with `rcodes`.
    ///
    /// # Panics
    ///
    /// Panics if `self` is [`Protocol::None`].
    pub fn new_unsub_ack(&self, unsub: &QPacket, rcodes: Vec<ReasonCode>) -> QPacket {
        match self {
            Self::None => unreachable!(),
            Self::V5(inner) => {
                let QPacket::V5(req) = unsub;
                inner.new_unsub_ack(req, rcodes)
            }
        }
    }
}
/// Version-tagged socket wrapper.
pub enum Socket {
/// Wraps a [`v5::Socket`].
V5(v5::Socket),
}
// Event-source plumbing: every call is handed straight to the wrapped v5
// socket. The irrefutable `let` stops compiling as soon as `Socket` gains a
// second variant, so new variants cannot be silently ignored.
impl mio::event::Source for Socket {
    /// Forwards to `register` on the wrapped v5 socket.
    fn register(
        &mut self,
        registry: &mio::Registry,
        token: mio::Token,
        interests: mio::Interest,
    ) -> io::Result<()> {
        let Self::V5(inner) = self;
        inner.register(registry, token, interests)
    }

    /// Forwards to `reregister` on the wrapped v5 socket.
    fn reregister(
        &mut self,
        registry: &mio::Registry,
        token: mio::Token,
        interests: mio::Interest,
    ) -> io::Result<()> {
        let Self::V5(inner) = self;
        inner.reregister(registry, token, interests)
    }

    /// Forwards to `deregister` on the wrapped v5 socket.
    fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
        let Self::V5(inner) = self;
        inner.deregister(registry)
    }
}
// Setters, each forwarded to the wrapped v5 socket.
impl Socket {
    /// Forwards `token` to `set_mio_token` on the wrapped v5 socket.
    #[inline]
    pub fn set_mio_token(&mut self, token: mio::Token) {
        let Self::V5(inner) = self;
        inner.set_mio_token(token)
    }

    /// Forwards `shard_id` to `set_shard_id` on the wrapped v5 socket.
    #[inline]
    pub fn set_shard_id(&mut self, shard_id: u32) {
        let Self::V5(inner) = self;
        inner.set_shard_id(shard_id)
    }
}
// Read-only accessors, each forwarded to the same-named method on the wrapped
// v5 socket.
impl Socket {
    /// Forwards to `peer_addr` on the wrapped v5 socket.
    #[inline]
    pub fn peer_addr(&self) -> net::SocketAddr {
        let Self::V5(inner) = self;
        inner.peer_addr()
    }

    /// Forwards to `as_client_id` on the wrapped v5 socket.
    #[inline]
    pub fn as_client_id(&self) -> &ClientID {
        let Self::V5(inner) = self;
        inner.as_client_id()
    }

    /// Forwards to `to_mio_token` on the wrapped v5 socket.
    #[inline]
    pub fn to_mio_token(&self) -> mio::Token {
        let Self::V5(inner) = self;
        inner.to_mio_token()
    }

    /// Calls `to_protocol` on the wrapped v5 socket and wraps the result in
    /// [`Protocol::V5`].
    #[inline]
    pub fn to_protocol(&self) -> Protocol {
        let Self::V5(inner) = self;
        Protocol::V5(inner.to_protocol())
    }

    /// Forwards to `client_keep_alive` on the wrapped v5 socket.
    #[inline]
    pub fn client_keep_alive(&self) -> u16 {
        let Self::V5(inner) = self;
        inner.client_keep_alive()
    }

    /// Forwards to `client_receive_maximum` on the wrapped v5 socket.
    #[inline]
    pub fn client_receive_maximum(&self) -> u16 {
        let Self::V5(inner) = self;
        inner.client_receive_maximum()
    }

    /// Forwards to `client_session_expiry_interval` on the wrapped v5 socket.
    #[inline]
    pub fn client_session_expiry_interval(&self) -> Option<u32> {
        let Self::V5(inner) = self;
        inner.client_session_expiry_interval()
    }

    /// Forwards to `is_clean_start` on the wrapped v5 socket.
    #[inline]
    pub fn is_clean_start(&self) -> bool {
        let Self::V5(inner) = self;
        inner.is_clean_start()
    }
}
// Packet I/O and connection control, each forwarded to the wrapped v5 socket.
impl Socket {
    /// Forwards to `read_packet` on the wrapped v5 socket.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped socket's `read_packet` returns.
    #[inline]
    pub fn read_packet(&mut self, prefix: &str) -> Result<QueuePkt> {
        let Self::V5(inner) = self;
        inner.read_packet(prefix)
    }

    /// Forwards `prefix` and `blob` to `write_packet` on the wrapped v5 socket.
    #[inline]
    pub fn write_packet(&mut self, prefix: &str, blob: Option<Blob>) -> QueuePkt {
        let Self::V5(inner) = self;
        inner.write_packet(prefix, blob)
    }

    /// Forwards `prefix` and `code` to `disconnect` on the wrapped v5 socket.
    #[inline]
    pub fn disconnect(&mut self, prefix: &str, code: ReasonCode) {
        let Self::V5(inner) = self;
        inner.disconnect(prefix, code)
    }

    /// Forwards `rcode` to `new_conn_ack` on the wrapped v5 socket.
    #[inline]
    pub fn new_conn_ack(&self, rcode: ReasonCode) -> QPacket {
        let Self::V5(inner) = self;
        inner.new_conn_ack(rcode)
    }
}
/// Version-tagged packet wrapper.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum QPacket {
/// Wraps a [`v5::Packet`].
V5(v5::Packet),
}
impl fmt::Display for QPacket {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
match self {
QPacket::V5(pkt) => write!(f, "{}", pkt),
}
}
}
#[cfg(any(feature = "fuzzy", test))]
use std::cmp;
#[cfg(any(feature = "fuzzy", test))]
impl PartialOrd for QPacket {
    /// Partial ordering for packets; always `Some`, since the type is totally
    /// ordered.
    ///
    /// Delegates to this type's [`Ord::cmp`] so the two orderings cannot
    /// drift apart, as the `PartialOrd` contract requires for `Ord` types.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
#[cfg(any(feature = "fuzzy", test))]
impl Ord for QPacket {
    /// Orders packets by the value returned from `as_topic_name`; no other
    /// part of the packet takes part in the comparison.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        let lhs = self.as_topic_name();
        let rhs = other.as_topic_name();
        lhs.cmp(rhs)
    }
}
impl From<QPacket> for v5::Packet {
    /// Unwraps the v5 packet held by `val`.
    #[inline]
    fn from(val: QPacket) -> Self {
        // Irrefutable while `QPacket` has a single variant; adding another
        // variant turns this into a compile error.
        let QPacket::V5(inner) = val;
        inner
    }
}
impl From<v5::Packet> for QPacket {
    /// Wraps a v5 packet in the [`QPacket::V5`] variant.
    #[inline]
    fn from(val: v5::Packet) -> Self {
        Self::V5(val)
    }
}
#[cfg(any(feature = "fuzzy", test))]
impl<'a> Arbitrary<'a> for QPacket {
    /// Draws an arbitrary v5 packet from `uns` and wraps it in
    /// [`QPacket::V5`].
    ///
    /// # Errors
    ///
    /// Propagates the error if the v5 packet cannot be generated.
    #[inline]
    fn arbitrary(uns: &mut Unstructured<'a>) -> result::Result<Self, ArbitraryError> {
        let inner: v5::Packet = uns.arbitrary()?;
        Ok(Self::V5(inner))
    }
}
impl Packetize for QPacket {
    /// Not implemented for the version-tagged wrapper.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    #[inline]
    fn decode<T>(_stream: T) -> Result<(Self, usize)>
    where
        T: AsRef<[u8]>,
    {
        unimplemented!()
    }

    /// Encodes the wrapped v5 packet by forwarding to its `encode`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped packet's `encode` returns.
    #[inline]
    fn encode(&self) -> Result<Blob> {
        let Self::V5(inner) = self;
        inner.encode()
    }
}
impl QPacket {
#[inline]
pub fn set_packet_id(&mut self, packet_id: u16) {
match self {
QPacket::V5(pkt) => pkt.set_packet_id(packet_id),
}
}
#[inline]
pub fn set_retain(&mut self, retain: bool) {
match self {
QPacket::V5(pkt) => pkt.set_retain(retain),
}
}
#[inline]
pub fn set_fixed_header(&mut self, retain: bool, qos: QoS, dup: bool) {
match self {
QPacket::V5(pkt) => pkt.set_fixed_header(retain, qos, dup),
}
}
#[inline]
pub fn set_subscription_ids(&mut self, ids: Vec<u32>) {
match self {
QPacket::V5(pkt) => pkt.set_subscription_ids(ids),
}
}
#[inline]
pub fn set_session_present(&mut self, session_present: bool) {
match self {
QPacket::V5(pkt) => pkt.set_session_present(session_present),
}
}
}
impl QPacket {
#[inline]
pub fn to_packet_type(&self) -> PacketType {
match self {
QPacket::V5(pkt) => pkt.to_packet_type(),
}
}
#[inline]
pub fn to_packet_id(&self) -> Option<u16> {
match self {
QPacket::V5(pkt) => pkt.to_packet_id(),
}
}
#[inline]
pub fn to_qos(&self) -> QoS {
match self {
QPacket::V5(pkt) => pkt.to_qos(),
}
}
pub fn to_subscription_id(&self) -> Option<u32> {
match self {
QPacket::V5(pkt) => pkt.to_subscription_id(),
}
}
pub fn to_subscriptions(&self) -> Vec<Subscription> {
match self {
QPacket::V5(pkt) => pkt.to_subscriptions(),
}
}
pub fn to_unsubscriptions(&self, client_id: ClientID) -> Vec<Subscription> {
match self {
QPacket::V5(pkt) => pkt.to_unsubscriptions(client_id),
}
}
pub fn to_topic_alias(&self) -> Option<u16> {
match self {
QPacket::V5(pkt) => pkt.to_topic_alias(),
}
}
pub fn to_disconnect_code(&self) -> ReasonCode {
match self {
QPacket::V5(pkt) => pkt.to_disconnect_code(),
}
}
pub fn to_reason_string(&self) -> Option<String> {
match self {
QPacket::V5(pkt) => pkt.to_reason_string(),
}
}
pub fn as_topic_name(&self) -> &TopicName {
match self {
QPacket::V5(pkt) => pkt.as_topic_name(),
}
}
#[inline]
pub fn is_qos0(&self) -> bool {
match self {
QPacket::V5(pkt) => pkt.is_qos0(),
}
}
#[inline]
pub fn is_qos12(&self) -> bool {
match self {
QPacket::V5(pkt) => pkt.is_qos12(),
}
}
#[inline]
pub fn is_retain(&self) -> bool {
match self {
QPacket::V5(pkt) => pkt.is_retain(),
}
}
#[inline]
pub fn message_expiry_interval(&self) -> Option<u32> {
match self {
QPacket::V5(pkt) => pkt.message_expiry_interval(),
}
}
}