#![allow(clippy::let_underscore_future)]
use std::{fmt, marker::PhantomData, rc::Rc};
use ntex_io::IoBoxed;
use ntex_router::{IntoPattern, Router, RouterBuilder};
use ntex_service::{IntoService, Pipeline, Service, boxed, fn_service};
use ntex_util::future::{Either, Ready};
use ntex_util::time::{Millis, Seconds, sleep};
use crate::v3::{ProtocolMessageAck, Publish, codec, shared::MqttShared, sink::MqttSink};
use crate::v3::{Session, default::ControlService};
use crate::{control, error::MqttError, io::Dispatcher};
use super::{control::ProtocolMessage, dispatcher::create_dispatcher};
/// MQTT v3 client handle.
///
/// Holds the connection I/O object and the shared protocol state until one of
/// the `start*` methods (or `into_inner`) consumes it.
pub struct Client {
// Boxed I/O object; passed to `Dispatcher::new` by the `start*` methods or
// returned to the caller by `into_inner`.
io: IoBoxed,
// Reference-counted shared state; cloned into sinks, the dispatcher and the
// control service.
shared: Rc<MqttShared>,
// Keep-alive interval; when non-zero the `start*` methods spawn the
// `keepalive` ping task with it.
keepalive: Seconds,
// Value reported by `session_present()`.
// NOTE(review): presumably the session-present flag of the CONNACK packet — confirm.
session_present: bool,
// Forwarded unchanged to `create_dispatcher`.
// NOTE(review): presumably a limit on in-flight received publishes — confirm.
max_receive: usize,
// Forwarded unchanged to `create_dispatcher`.
// NOTE(review): presumably a byte-size limit for buffered publishes — confirm.
max_buffer_size: usize,
}
impl fmt::Debug for Client {
    /// Formats the client's plain configuration values.
    ///
    /// The `io` and `shared` fields are intentionally left out; the output is
    /// finished with `finish_non_exhaustive()` so the omission is visible
    /// (rendered as a trailing `..`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("v3::Client")
            .field("keepalive", &self.keepalive)
            .field("session_present", &self.session_present)
            .field("max_receive", &self.max_receive)
            .field("max_buffer_size", &self.max_buffer_size)
            .finish_non_exhaustive()
    }
}
impl Client {
    /// Assembles a `Client` from its parts.
    ///
    /// Every argument is stored as-is in the field of the same name; no
    /// validation or I/O happens here.
    pub(super) fn new(
        io: IoBoxed,
        shared: Rc<MqttShared>,
        session_present: bool,
        keepalive: Seconds,
        max_receive: usize,
        max_buffer_size: usize,
    ) -> Self {
        Self {
            io,
            shared,
            keepalive,
            session_present,
            max_receive,
            max_buffer_size,
        }
    }
}
impl Client {
#[inline]
pub fn sink(&self) -> MqttSink {
MqttSink::new(self.shared.clone())
}
#[inline]
pub fn session_present(&self) -> bool {
self.session_present
}
pub fn resource<T, F, U>(self, address: T, service: F) -> ClientRouter<U::Error, U::Error>
where
T: IntoPattern,
F: IntoService<U, Publish>,
U: Service<Publish, Response = ()> + 'static,
{
let mut builder = Router::build();
builder.path(address, 0);
let handlers = vec![Pipeline::new(boxed::service(service.into_service()))];
ClientRouter {
builder,
handlers,
io: self.io,
shared: self.shared,
keepalive: self.keepalive,
max_receive: self.max_receive,
max_buffer_size: self.max_buffer_size,
_t: PhantomData,
}
}
pub async fn start_default(self) {
if self.keepalive.non_zero() {
let _ =
ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
}
let dispatcher = create_dispatcher(
self.shared.clone(),
self.max_receive,
self.max_buffer_size,
fn_service(|pkt| Ready::Ok(Either::Right(pkt))),
fn_service(|_: ProtocolMessage| Ready::<_, ()>::Ok(ProtocolMessage::disconnect())),
);
let control = ControlService::new(
control::DefaultControlService::<Session<()>, (), codec::Encoded>::default(),
self.shared.clone(),
);
let _ = Dispatcher::new(self.io, self.shared, dispatcher, control).await;
}
pub async fn start<F, S, E>(self, service: F) -> Result<(), MqttError<E>>
where
E: fmt::Debug + 'static,
F: IntoService<S, ProtocolMessage> + 'static,
S: Service<ProtocolMessage, Response = ProtocolMessageAck, Error = E> + 'static,
{
if self.keepalive.non_zero() {
let _ =
ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
}
let dispatcher = create_dispatcher(
self.shared.clone(),
self.max_receive,
self.max_buffer_size,
fn_service(|pkt| Ready::Ok(Either::Right(pkt))),
service.into_service(),
);
let control = ControlService::new(
control::DefaultControlService::<Session<()>, E, codec::Encoded>::default(),
self.shared.clone(),
);
Dispatcher::new(self.io, self.shared, dispatcher, control).await
}
pub async fn start_with_control<F, S, E, C>(
self,
service: F,
control: C,
) -> Result<(), MqttError<C::Error>>
where
E: fmt::Debug + 'static,
F: IntoService<S, ProtocolMessage> + 'static,
S: Service<ProtocolMessage, Response = ProtocolMessageAck, Error = E> + 'static,
C: Service<control::Control<E>, Response = Option<codec::Encoded>> + 'static,
{
if self.keepalive.non_zero() {
let _ =
ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
}
let dispatcher = create_dispatcher(
self.shared.clone(),
self.max_receive,
self.max_buffer_size,
fn_service(|pkt| Ready::Ok(Either::Right(pkt))),
service.into_service(),
);
let control = ControlService::new(control, self.shared.clone());
Dispatcher::new(self.io, self.shared, dispatcher, control).await
}
pub fn into_inner(self) -> (IoBoxed, codec::Codec) {
(self.io, self.shared.codec.clone())
}
}
/// Boxed publish handler: a service that takes a `Publish`, responds with
/// `()` and fails with `E`.
type Handler<E> = boxed::BoxService<Publish, (), E>;
/// MQTT v3 client that routes incoming publishes to per-topic handlers.
///
/// Created by `Client::resource`; more handlers are added with `resource`,
/// and the connection is driven by `start_default` or `start`.
pub struct ClientRouter<Err, PErr> {
// Router under construction; maps each registered pattern to an index into
// `handlers`.
builder: RouterBuilder<usize>,
// Publish handlers, stored in registration order.
handlers: Vec<Pipeline<Handler<PErr>>>,
// Boxed I/O object; passed to `Dispatcher::new` when the router is started.
io: IoBoxed,
// Reference-counted shared state; cloned into the sink, the dispatcher and
// the control service.
shared: Rc<MqttShared>,
// Keep-alive interval; when non-zero the start methods spawn the
// `keepalive` ping task with it.
keepalive: Seconds,
// Forwarded unchanged to `create_dispatcher`.
// NOTE(review): presumably a limit on in-flight received publishes — confirm.
max_receive: usize,
// Forwarded unchanged to `create_dispatcher`.
// NOTE(review): presumably a byte-size limit for buffered publishes — confirm.
max_buffer_size: usize,
// Marker that lets the struct carry the `Err` type parameter without
// storing a value of that type.
_t: PhantomData<Err>,
}
impl<Err, PErr> fmt::Debug for ClientRouter<Err, PErr> {
    /// Formats the router's plain configuration values.
    ///
    /// The router builder, handlers, I/O object and shared state are
    /// intentionally left out; the output is finished with
    /// `finish_non_exhaustive()` so the omission is visible (rendered as a
    /// trailing `..`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("v3::ClientRouter")
            .field("keepalive", &self.keepalive)
            .field("max_receive", &self.max_receive)
            .field("max_buffer_size", &self.max_buffer_size)
            .finish_non_exhaustive()
    }
}
impl<Err, PErr> ClientRouter<Err, PErr>
where
Err: From<PErr> + fmt::Debug + 'static,
PErr: 'static,
{
#[must_use]
pub fn resource<T, F, S>(mut self, address: T, service: F) -> Self
where
T: IntoPattern,
F: IntoService<S, Publish>,
S: Service<Publish, Response = (), Error = PErr> + 'static,
{
self.builder.path(address, self.handlers.len());
self.handlers.push(Pipeline::new(boxed::service(service.into_service())));
self
}
pub async fn start_default(self) {
if self.keepalive.non_zero() {
let _ =
ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
}
let dispatcher = create_dispatcher(
self.shared.clone(),
self.max_receive,
self.max_buffer_size,
dispatch(self.builder.finish(), self.handlers),
fn_service(|_: ProtocolMessage| Ready::<_, Err>::Ok(ProtocolMessage::disconnect())),
);
let control = ControlService::new(
control::DefaultControlService::<Session<()>, Err, codec::Encoded>::default(),
self.shared.clone(),
);
let _ = Dispatcher::new(self.io, self.shared, dispatcher, control).await;
}
pub async fn start<F, S>(self, service: F) -> Result<(), MqttError<Err>>
where
F: IntoService<S, ProtocolMessage>,
S: Service<ProtocolMessage, Response = ProtocolMessageAck, Error = Err> + 'static,
{
if self.keepalive.non_zero() {
let _ =
ntex_util::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
}
let dispatcher = create_dispatcher(
self.shared.clone(),
self.max_receive,
self.max_buffer_size,
dispatch(self.builder.finish(), self.handlers),
service.into_service(),
);
let control = ControlService::new(
control::DefaultControlService::<Session<()>, Err, codec::Encoded>::default(),
self.shared.clone(),
);
Dispatcher::new(self.io, self.shared, dispatcher, control).await
}
}
/// Builds the publish service used by `ClientRouter`.
///
/// For each incoming `Publish` the topic is looked up in `router`:
/// * on a match, the handler stored at the matched index is called and its
///   outcome is converted by `call` (success becomes `Either::Left(())`,
///   handler errors are converted into `Err`);
/// * otherwise the publish is handed back as `Either::Right`.
fn dispatch<Err, PErr>(
    router: Router<usize>,
    handlers: Vec<Pipeline<Handler<PErr>>>,
) -> impl Service<Publish, Response = Either<(), Publish>, Error = Err>
where
    PErr: 'static,
    Err: From<PErr>,
{
    // Shared so every in-flight request future can hold the handler table.
    let table = Rc::new(handlers);

    fn_service(move |mut publish: Publish| {
        let matched = router.recognize(publish.topic_mut()).map(|(slot, _info)| *slot);
        match matched {
            Some(slot) => {
                let table = Rc::clone(&table);
                // Indices are registered together with their handlers, one per
                // handler, so `slot` addresses an existing entry.
                Either::Left(async move { call(publish, table[slot].clone()).await })
            }
            None => Either::Right(Ready::<_, Err>::Ok(Either::Right(publish))),
        }
    })
}
/// Invokes the publish handler `srv` with `req`.
///
/// A successful call is reported as `Either::Left(())`; a handler error is
/// converted into `Err` through its `From<PErr>` implementation.
async fn call<S, Err, PErr>(req: Publish, srv: Pipeline<S>) -> Result<Either<(), Publish>, Err>
where
    S: Service<Publish, Response = (), Error = PErr>,
    Err: From<PErr>,
{
    // `?` performs the `PErr -> Err` conversion via `From`.
    srv.call(req).await?;
    Ok(Either::Left(()))
}
/// Background keep-alive task.
///
/// Sleeps for `timeout` between rounds; after each sleep it checks that the
/// sink is still open and asks it to send a ping. The task ends as soon as
/// the sink reports closed or the ping is rejected.
async fn keepalive(sink: MqttSink, timeout: Seconds) {
    log::debug!("start mqtt client keep-alive task");

    let interval = Millis::from(timeout);
    loop {
        sleep(interval).await;

        // `ping` is only attempted while the sink reports open.
        if sink.is_open() && sink.ping() {
            continue;
        }

        log::debug!("mqtt client connection is closed, stopping keep-alive task");
        return;
    }
}