use ntex_bytes::ByteString;
use std::{io, marker::PhantomData, num::NonZeroU16, ptr};
use crate::{error, types::QoS, v3::codec};
/// Top-level control message, generic over the error type `E` carried by
/// `CtlReason::Error`.
///
/// Any value can be turned into a [`ControlAck`] with `Control::ack`.
#[derive(Debug)]
pub enum Control<E> {
/// Protocol frame (publish-release, subscribe, unsubscribe, disconnect).
Protocol(CtlFrame),
/// Flow notification (ping or write back-pressure change).
Flow(CtlFlow),
/// Stop reason (error of type `E`, protocol error, or peer gone).
Stop(CtlReason<E>),
/// Shutdown notification; acknowledged with `ControlAckKind::Closed`.
Shutdown(Shutdown),
}
/// Protocol frames wrapped by `Control::Protocol`.
#[derive(Debug)]
pub enum CtlFrame {
/// Publish-release message; built by `Control::pubrel` from a packet id.
PublishRelease(PublishRelease),
/// Subscribe message; built by `Control::subscribe`.
Subscribe(Subscribe),
/// Unsubscribe message; built by `Control::unsubscribe`.
Unsubscribe(Unsubscribe),
/// Disconnect message; built by `Control::remote_disconnect`.
Disconnect(Disconnect),
}
/// Flow notifications wrapped by `Control::Flow`.
#[derive(Debug)]
pub enum CtlFlow {
/// Ping message; built by `Control::ping`.
Ping(Ping),
/// Write back-pressure flag change; built by `Control::wr_backpressure`.
WrBackpressure(WrBackpressure),
}
/// Stop reasons wrapped by `Control::Stop`.
#[derive(Debug)]
pub enum CtlReason<E> {
/// Error of the caller-chosen type `E`; built by `Control::error`.
Error(Error<E>),
/// Protocol error; built by `Control::spec` and `Control::proto_error`.
ProtocolError(ProtocolError),
/// Peer is gone, optionally with the `io::Error` passed to `Control::peer_gone`.
PeerGone(PeerGone),
}
/// Acknowledgement returned by the `ack` methods of the control messages.
#[derive(Debug)]
pub struct ControlAck {
// Crate-private description of what is being acknowledged.
pub(crate) result: ControlAckKind,
}
/// Crate-private payload of a `ControlAck`, naming the acknowledgement kind.
#[derive(Debug)]
pub(crate) enum ControlAckKind {
/// Produced by `WrBackpressure::ack` and `PeerGone::ack`.
Nothing,
/// NOTE(review): never constructed in this file; presumably carries the packet id
/// of an acknowledged publish — confirm against the code that builds it.
PublishAck(NonZeroU16),
/// Produced by `PublishRelease::ack`; holds that message's `packet_id`.
PublishRelease(NonZeroU16),
/// Produced by `Ping::ack`.
Ping,
/// Produced by `Disconnect::ack`, `Error::ack`, `ProtocolError::ack`,
/// `Control::disconnect`, and by `Control::ack` for subscribe/unsubscribe frames.
Disconnect,
/// Produced by `Subscribe::ack`; holds the per-topic return codes and the packet id.
Subscribe(SubscribeResult),
/// Produced by `Unsubscribe::ack`; holds the packet id.
Unsubscribe(UnsubscribeResult),
/// Produced by `Shutdown::ack`.
Closed,
}
impl<E> Control<E> {
pub(crate) fn pubrel(packet_id: NonZeroU16) -> Self {
Control::Protocol(CtlFrame::PublishRelease(PublishRelease { packet_id }))
}
#[doc(hidden)]
pub fn ping() -> Self {
Control::Flow(CtlFlow::Ping(Ping))
}
#[doc(hidden)]
pub fn subscribe(pkt: Subscribe) -> Self {
Control::Protocol(CtlFrame::Subscribe(pkt))
}
#[doc(hidden)]
pub fn unsubscribe(pkt: Unsubscribe) -> Self {
Control::Protocol(CtlFrame::Unsubscribe(pkt))
}
#[doc(hidden)]
pub fn remote_disconnect() -> Self {
Control::Protocol(CtlFrame::Disconnect(Disconnect))
}
pub(super) const fn shutdown() -> Self {
Control::Shutdown(Shutdown)
}
pub(super) const fn wr_backpressure(enabled: bool) -> Self {
Control::Flow(CtlFlow::WrBackpressure(WrBackpressure(enabled)))
}
pub(super) fn error(err: E) -> Self {
Control::Stop(CtlReason::Error(Error::new(err)))
}
pub(super) fn spec(err: error::SpecViolation) -> Self {
Control::Stop(CtlReason::ProtocolError(ProtocolError::new(error::ProtocolError::spec(
err,
))))
}
pub(super) fn proto_error(err: error::ProtocolError) -> Self {
Control::Stop(CtlReason::ProtocolError(ProtocolError::new(err)))
}
pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
Control::Stop(CtlReason::PeerGone(PeerGone(err)))
}
#[inline]
pub fn disconnect(&self) -> ControlAck {
ControlAck { result: ControlAckKind::Disconnect }
}
#[inline]
pub fn ack(self) -> ControlAck {
match self {
Control::Protocol(CtlFrame::PublishRelease(msg)) => msg.ack(),
Control::Protocol(CtlFrame::Disconnect(msg)) => msg.ack(),
Control::Protocol(CtlFrame::Subscribe(_)) => {
log::warn!("Subscribe is not supported");
ControlAck { result: ControlAckKind::Disconnect }
}
Control::Protocol(CtlFrame::Unsubscribe(_)) => {
log::warn!("Unsubscribe is not supported");
ControlAck { result: ControlAckKind::Disconnect }
}
Control::Flow(CtlFlow::Ping(msg)) => msg.ack(),
Control::Flow(CtlFlow::WrBackpressure(msg)) => msg.ack(),
Control::Stop(CtlReason::Error(msg)) => msg.ack(),
Control::Stop(CtlReason::ProtocolError(msg)) => msg.ack(),
Control::Stop(CtlReason::PeerGone(msg)) => msg.ack(),
Control::Shutdown(msg) => msg.ack(),
}
}
}
impl CtlFlow {
pub fn ack(self) -> ControlAck {
match self {
CtlFlow::Ping(msg) => msg.ack(),
CtlFlow::WrBackpressure(msg) => msg.ack(),
}
}
}
/// Publish-release control message.
#[derive(Copy, Clone, Debug)]
pub struct PublishRelease {
    /// Packet id this message was created with.
    pub packet_id: NonZeroU16,
}

impl PublishRelease {
    /// Returns the packet id of this message.
    #[inline]
    pub fn id(self) -> NonZeroU16 {
        let Self { packet_id } = self;
        packet_id
    }

    /// Returns a publish-release acknowledgement carrying this message's packet id.
    #[inline]
    pub fn ack(self) -> ControlAck {
        let result = ControlAckKind::PublishRelease(self.id());
        ControlAck { result }
    }
}
/// Ping control message.
#[derive(Copy, Clone, Debug)]
pub struct Ping;

impl Ping {
    /// Returns the ping acknowledgement.
    #[inline]
    pub fn ack(self) -> ControlAck {
        let result = ControlAckKind::Ping;
        ControlAck { result }
    }
}
/// Disconnect control message.
#[derive(Copy, Clone, Debug)]
pub struct Disconnect;

impl Disconnect {
    /// Returns the disconnect acknowledgement.
    #[inline]
    pub fn ack(self) -> ControlAck {
        let result = ControlAckKind::Disconnect;
        ControlAck { result }
    }
}
/// Control message wrapping an error of the caller-chosen type `E`.
#[derive(Debug)]
pub struct Error<E> {
    err: E,
}

impl<E> Error<E> {
    /// Wraps `err` into a control message.
    #[inline]
    pub fn new(err: E) -> Self {
        Error { err }
    }

    /// Borrows the wrapped error.
    #[inline]
    pub fn get_ref(&self) -> &E {
        let Self { err } = self;
        err
    }

    /// Returns a disconnect acknowledgement, discarding the wrapped error.
    #[inline]
    pub fn ack(self) -> ControlAck {
        let (ack, _discarded) = self.ack_and_error();
        ack
    }

    /// Returns a disconnect acknowledgement together with the wrapped error.
    #[inline]
    pub fn ack_and_error(self) -> (ControlAck, E) {
        let Self { err } = self;
        let ack = ControlAck { result: ControlAckKind::Disconnect };
        (ack, err)
    }
}
/// Control message wrapping a protocol error.
#[derive(Debug, Clone)]
pub struct ProtocolError {
    err: error::ProtocolError,
}

impl ProtocolError {
    /// Wraps `err` into a control message.
    #[inline]
    pub fn new(err: error::ProtocolError) -> Self {
        ProtocolError { err }
    }

    /// Borrows the wrapped protocol error.
    #[inline]
    pub fn get_ref(&self) -> &error::ProtocolError {
        let Self { err } = self;
        err
    }

    /// Returns a disconnect acknowledgement, discarding the wrapped error.
    #[inline]
    pub fn ack(self) -> ControlAck {
        let (ack, _discarded) = self.ack_and_error();
        ack
    }

    /// Returns a disconnect acknowledgement together with the wrapped error.
    #[inline]
    pub fn ack_and_error(self) -> (ControlAck, error::ProtocolError) {
        let Self { err } = self;
        let ack = ControlAck { result: ControlAckKind::Disconnect };
        (ack, err)
    }
}
/// Subscribe control message: a list of topics with one return code per topic.
#[derive(Debug, Clone)]
pub struct Subscribe {
// Packet id; copied into `SubscribeResult` by `ack`.
packet_id: NonZeroU16,
// Value passed to `new` and returned by `packet_size()`.
// NOTE(review): presumably the encoded size of the packet — confirm with the caller.
packet_size: u32,
// Topics paired with a QoS, in the order given to `new`.
topics: Vec<(ByteString, QoS)>,
// One return code per topic, same order as `topics`; `new` initialises every
// entry to `Failure` and `Subscription` handles overwrite them.
codes: Vec<codec::SubscribeReturnCode>,
}
/// Crate-private outcome of a subscribe message, built by `Subscribe::ack`.
#[derive(Debug, Clone)]
pub(crate) struct SubscribeResult {
// Return codes taken from the `Subscribe` message, one per topic.
pub(crate) codes: Vec<codec::SubscribeReturnCode>,
// Packet id taken from the `Subscribe` message.
pub(crate) packet_id: NonZeroU16,
}
impl Subscribe {
    /// Creates a subscribe message for `topics`.
    ///
    /// Every topic starts with the return code `Failure`; use [`Subscribe::iter_mut`]
    /// to change the codes before calling [`Subscribe::ack`].
    #[inline]
    #[doc(hidden)]
    pub fn new(
        packet_id: NonZeroU16,
        packet_size: u32,
        topics: Vec<(ByteString, QoS)>,
    ) -> Self {
        // One return code per topic, in topic order.
        let codes: Vec<_> =
            topics.iter().map(|_| codec::SubscribeReturnCode::Failure).collect();
        Self { packet_id, packet_size, topics, codes }
    }

    /// Returns the packet size this message was created with.
    #[inline]
    pub fn packet_size(&self) -> u32 {
        self.packet_size
    }

    /// Returns an iterator yielding one [`Subscription`] per topic.
    ///
    /// Each item exposes the topic, the QoS stored with it, and write access to the
    /// topic's return code.
    #[inline]
    pub fn iter_mut(&mut self) -> SubscribeIter<'_> {
        // The iterator writes return codes through this pointer, so it must be
        // derived from the exclusive borrow. A pointer obtained through a shared
        // reborrow (`ptr::from_ref(..).cast_mut()`) is read-only, and writing
        // through it is undefined behaviour.
        SubscribeIter { subs: ptr::from_mut(self), entry: 0, lt: PhantomData }
    }

    /// Consumes the message and returns a subscribe acknowledgement carrying the
    /// current return codes and the packet id.
    #[inline]
    pub fn ack(self) -> ControlAck {
        ControlAck {
            result: ControlAckKind::Subscribe(SubscribeResult {
                codes: self.codes,
                packet_id: self.packet_id,
            }),
        }
    }
}
impl<'a> IntoIterator for &'a mut Subscribe {
type Item = Subscription<'a>;
type IntoIter = SubscribeIter<'a>;
fn into_iter(self) -> SubscribeIter<'a> {
self.iter_mut()
}
}
/// Iterator over the topics of a `Subscribe` message, yielding `Subscription` items.
///
/// Created by `Subscribe::iter_mut`.
pub struct SubscribeIter<'a> {
// Raw pointer to the message being iterated; dereferenced in `next_unsafe`.
subs: *mut Subscribe,
// Index of the next topic to yield.
entry: usize,
// Ties the iterator to the `'a` mutable borrow of the message.
lt: PhantomData<&'a mut Subscribe>,
}
impl<'a> SubscribeIter<'a> {
    /// Yields the [`Subscription`] for the next topic, or `None` once every topic has
    /// been visited.
    ///
    /// # Panics
    ///
    /// Panics if the message holds fewer return codes than topics.
    fn next_unsafe(&mut self) -> Option<Subscription<'a>> {
        // SAFETY: relies on `self.subs` pointing at a `Subscribe` that stays
        // exclusively borrowed for `'a` (see the `lt` marker) and that is valid for
        // reads and writes through this pointer, as handed over by
        // `Subscribe::iter_mut`.
        let subs = unsafe { &mut *self.subs };
        let idx = self.entry;
        let (topic, qos) = subs.topics.get(idx)?;

        // The slot must exist before doing pointer arithmetic on the codes buffer.
        assert!(idx < subs.codes.len(), "subscribe return codes out of sync with topics");

        // SAFETY: `idx` is in bounds (checked above). The element reference is derived
        // from `Vec::as_mut_ptr`, which does not materialise a reference to the whole
        // buffer, so `&mut` code references held by previously yielded items stay
        // valid. `entry` only ever increases, so each slot is handed out at most once
        // per iterator and no two yielded `&mut` references alias.
        let code = unsafe { &mut *subs.codes.as_mut_ptr().add(idx) };

        self.entry = idx + 1;
        Some(Subscription { topic, qos: *qos, code })
    }
}
impl<'a> Iterator for SubscribeIter<'a> {
    type Item = Subscription<'a>;

    /// Advances to the next topic by delegating to `SubscribeIter::next_unsafe`.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        Self::next_unsafe(self)
    }
}
/// One topic of a `Subscribe` message, with write access to its return code.
///
/// Yielded by `SubscribeIter`.
#[derive(Debug)]
pub struct Subscription<'a> {
// Topic string borrowed from the message.
topic: &'a ByteString,
// QoS stored alongside the topic in the message.
qos: QoS,
// Return code slot for this topic; written by `fail` and `confirm`.
code: &'a mut codec::SubscribeReturnCode,
}
impl<'a> Subscription<'a> {
    /// Returns the topic of this subscription.
    #[inline]
    pub fn topic(&self) -> &'a ByteString {
        let topic: &'a ByteString = self.topic;
        topic
    }

    /// Returns the QoS stored with this topic.
    #[inline]
    pub fn qos(&self) -> QoS {
        self.qos
    }

    /// Sets this topic's return code to `Failure`.
    #[inline]
    pub fn fail(&mut self) {
        let rejected = codec::SubscribeReturnCode::Failure;
        *self.code = rejected;
    }

    /// Sets this topic's return code to `Success` with the given `qos`.
    #[inline]
    pub fn confirm(&mut self, qos: QoS) {
        let granted = codec::SubscribeReturnCode::Success(qos);
        *self.code = granted;
    }

    /// Same as [`Subscription::confirm`].
    #[inline]
    #[doc(hidden)]
    pub fn subscribe(&mut self, qos: QoS) {
        Self::confirm(self, qos);
    }
}
/// Unsubscribe control message: a list of topics.
#[derive(Debug, Clone)]
pub struct Unsubscribe {
// Packet id; copied into `UnsubscribeResult` by `ack`.
packet_id: NonZeroU16,
// Value passed to `new` and returned by `packet_size()`.
// NOTE(review): presumably the encoded size of the packet — confirm with the caller.
packet_size: u32,
// Topics in the order given to `new`.
topics: Vec<ByteString>,
}
/// Crate-private outcome of an unsubscribe message, built by `Unsubscribe::ack`.
#[derive(Debug, Copy, Clone)]
pub(crate) struct UnsubscribeResult {
// Packet id taken from the `Unsubscribe` message.
pub(crate) packet_id: NonZeroU16,
}
impl Unsubscribe {
#[inline]
#[doc(hidden)]
pub fn new(packet_id: NonZeroU16, packet_size: u32, topics: Vec<ByteString>) -> Self {
Self { packet_id, packet_size, topics }
}
#[inline]
pub fn packet_size(&self) -> u32 {
self.packet_size
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &ByteString> {
self.topics.iter()
}
#[inline]
pub fn ack(self) -> ControlAck {
ControlAck {
result: ControlAckKind::Unsubscribe(UnsubscribeResult {
packet_id: self.packet_id,
}),
}
}
}
/// Write back-pressure notification carrying a boolean flag.
#[derive(Debug, Copy, Clone)]
pub struct WrBackpressure(bool);

impl WrBackpressure {
    /// Returns the flag this notification was created with.
    #[inline]
    pub fn enabled(&self) -> bool {
        let Self(flag) = *self;
        flag
    }

    /// Returns an acknowledgement of kind `Nothing`.
    #[inline]
    pub fn ack(self) -> ControlAck {
        let result = ControlAckKind::Nothing;
        ControlAck { result }
    }
}
/// Shutdown notification.
#[derive(Debug, Copy, Clone)]
pub struct Shutdown;

impl Shutdown {
    /// Returns an acknowledgement of kind `Closed`.
    #[inline]
    pub fn ack(self) -> ControlAck {
        let result = ControlAckKind::Closed;
        ControlAck { result }
    }
}
/// Peer-gone notification with an optional I/O error.
#[derive(Debug)]
pub struct PeerGone(pub(super) Option<io::Error>);

impl PeerGone {
    /// Borrows the I/O error, if one is present.
    #[inline]
    pub fn err(&self) -> Option<&io::Error> {
        let Self(cause) = self;
        cause.as_ref()
    }

    /// Takes the I/O error out of the notification, leaving `None` behind.
    #[inline]
    pub fn take(&mut self) -> Option<io::Error> {
        let Self(cause) = self;
        cause.take()
    }

    /// Returns an acknowledgement of kind `Nothing`.
    #[inline]
    pub fn ack(self) -> ControlAck {
        let result = ControlAckKind::Nothing;
        ControlAck { result }
    }
}