use bytes::BytesMut;
use crate::{
client::{
builder::ClientBuilder,
value_types::{
KeepAlive,
Publish,
ReadResult,
Subscribe,
SubscribeResult,
Unsubscribe,
},
},
Error,
Result,
util::{
AsyncStream,
FreePidList,
TokioRuntime,
}
};
use futures_util::{
future::{
FutureExt,
pending,
},
select,
};
use log::{debug, error, info, trace};
use mqttrs::{
ConnectReturnCode,
Packet,
Pid,
QoS,
QosPid,
self,
SubscribeTopic,
};
#[cfg(feature = "tls")]
use rustls;
use std::{
cmp::min,
collections::BTreeMap,
fmt,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
Mutex,
},
};
use tokio::{
io::{
AsyncReadExt,
AsyncWriteExt,
},
net::TcpStream,
sync::{
mpsc,
oneshot,
},
time::{
sleep,
sleep_until,
Duration,
error::Elapsed,
Instant,
timeout,
},
};
#[cfg(feature = "tls")]
use tokio_rustls::{
self,
TlsConnector,
webpki::DNSNameRef,
};
pub struct Client {
options: ClientOptions,
io_task_handle: Option<IoTaskHandle>,
free_write_pids: Mutex<FreePidList>,
}
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.field("options", &self.options)
.finish()
}
}
#[derive(Clone)]
pub(crate) struct ClientOptions {
pub(crate) host: String,
pub(crate) port: u16,
pub(crate) username: Option<String>,
pub(crate) password: Option<Vec<u8>>,
pub(crate) keep_alive: KeepAlive,
pub(crate) runtime: TokioRuntime,
pub(crate) client_id: Option<String>,
pub(crate) packet_buffer_len: usize,
pub(crate) max_packet_len: usize,
pub(crate) operation_timeout: Duration,
#[cfg(feature = "tls")]
pub(crate) tls_client_config: Option<Arc<rustls::ClientConfig>>,
pub(crate) automatic_connect: bool,
pub(crate) connect_retry_delay: Duration,
}
impl fmt::Debug for ClientOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ClientOptions")
.field("host", &self.host)
.field("port", &self.port)
.field("username", &self.username)
.field("keep_alive", &self.keep_alive)
.field("client_id", &self.client_id)
.field("packet_buffer_len", &self.packet_buffer_len)
.field("max_packet_len", &self.max_packet_len)
.field("operation_timeout", &self.operation_timeout)
.field("automatic_connect", &self.automatic_connect)
.field("connect_retry_delay", &self.connect_retry_delay)
.finish()
}
}
struct IoTaskHandle {
tx_io_requests: mpsc::Sender<IoRequest>,
rx_recv_published: mpsc::Receiver<Packet>,
halt: Arc<AtomicBool>,
}
struct IoTask {
options: ClientOptions,
rx_io_requests: mpsc::Receiver<IoRequest>,
tx_recv_published: mpsc::Sender<Packet>,
state: IoTaskState,
subscriptions: BTreeMap<String, QoS>,
halt: Arc<AtomicBool>,
}
enum IoTaskState {
Halted,
Disconnected,
Connected(IoTaskConnected),
}
struct IoTaskConnected {
stream: AsyncStream,
read_buf: BytesMut,
read_bufn: usize,
last_write_time: Instant,
last_pingreq_time: Instant,
last_pingresp_time: Instant,
pid_response_map: BTreeMap<Pid, IoRequest>,
}
#[derive(Debug)]
struct IoRequest {
tx_result: Option<oneshot::Sender<IoResult>>,
io_type: IoType,
}
#[derive(Debug)]
enum IoType {
WriteOnly { packet: Packet },
WriteAndResponse { packet: Packet, response_pid: Pid },
ShutdownConnection,
}
#[derive(Debug)]
struct IoResult {
result: Result<Option<Packet>>,
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
pub(crate) fn new(opts: ClientOptions) -> Result<Client> {
Ok(Client {
options: opts,
io_task_handle: None,
free_write_pids: Mutex::new(FreePidList::new()),
})
}
pub async fn connect(&mut self) -> Result<()> {
self.spawn_io_task()?;
Ok(())
}
fn spawn_io_task(&mut self) -> Result<()> {
self.check_no_io_task()?;
let (tx_io_requests, rx_io_requests) =
mpsc::channel::<IoRequest>(self.options.packet_buffer_len);
let (tx_recv_published, rx_recv_published) =
mpsc::channel::<Packet>(self.options.packet_buffer_len);
let halt = Arc::new(AtomicBool::new(false));
self.io_task_handle = Some(IoTaskHandle {
tx_io_requests,
rx_recv_published,
halt: halt.clone(),
});
let io = IoTask {
options: self.options.clone(),
rx_io_requests,
tx_recv_published,
state: IoTaskState::Disconnected,
subscriptions: BTreeMap::new(),
halt: halt,
};
self.options.runtime.spawn(io.run());
Ok(())
}
pub async fn publish(&self, p: &Publish) -> Result<()> {
let qos = p.qos();
if qos == QoS::ExactlyOnce {
return Err("QoS::ExactlyOnce is not supported".into());
}
let p2 = Packet::Publish(mqttrs::Publish {
dup: false,
qospid: match qos {
QoS::AtMostOnce => QosPid::AtMostOnce,
QoS::AtLeastOnce => QosPid::AtLeastOnce(self.alloc_write_pid()?),
QoS::ExactlyOnce => panic!("Not reached"),
},
retain: p.retain(),
topic_name: p.topic().to_owned(),
payload: p.payload().to_owned(),
});
match qos {
QoS::AtMostOnce => {
let res = timeout(self.options.operation_timeout,
self.write_only_packet(&p2)).await;
if let Err(Elapsed { .. }) = res {
return Err(format!("Timeout writing publish after {}ms",
self.options.operation_timeout.as_millis()).into());
}
res.expect("No timeout")?;
}
QoS::AtLeastOnce => {
let res = timeout(self.options.operation_timeout,
self.write_response_packet(&p2)).await;
if let Err(Elapsed { .. }) = res {
return Err(format!("Timeout waiting for Puback after {}ms",
self.options.operation_timeout.as_millis()).into());
}
let res = res.expect("No timeout")?;
match res {
Packet::Puback(pid) => self.free_write_pid(pid)?,
_ => error!("Bad packet response for publish: {:#?}", res),
}
},
QoS::ExactlyOnce => panic!("Not reached"),
};
Ok(())
}
pub async fn subscribe(&mut self, s: Subscribe) -> Result<SubscribeResult> {
let pid = self.alloc_write_pid()?;
if s.topics().iter().any(|t| t.qos == QoS::ExactlyOnce) {
return Err("Qos::ExactlyOnce is not supported right now".into())
}
let p = Packet::Subscribe(mqttrs::Subscribe {
pid: pid,
topics: s.topics().to_owned(),
});
let res = timeout(self.options.operation_timeout, self.write_response_packet(&p)).await;
if let Err(Elapsed { .. }) = res {
return Err(format!("Timeout waiting for Suback after {}ms",
self.options.operation_timeout.as_millis()).into());
}
let res = res.expect("No timeout")?;
match res {
Packet::Suback(mqttrs::Suback {
pid: suback_pid,
return_codes: rcs,
}) if suback_pid == pid => {
self.free_write_pid(pid)?;
Ok(SubscribeResult {
return_codes: rcs
})
},
_ => {
return Err(format!("Unexpected packet waiting for Suback(Pid={:?}): {:#?}",
pid, res)
.into());
}
}
}
pub async fn unsubscribe(&mut self, u: Unsubscribe) -> Result<()> {
let pid = self.alloc_write_pid()?;
let p = Packet::Unsubscribe(mqttrs::Unsubscribe {
pid: pid,
topics: u.topics().iter().map(|ut| ut.topic_name().to_owned())
.collect::<Vec<String>>(),
});
let res = timeout(self.options.operation_timeout, self.write_response_packet(&p)).await;
if let Err(Elapsed { .. }) = res {
return Err(format!("Timeout waiting for Unsuback after {}ms",
self.options.operation_timeout.as_millis()).into());
}
let res = res.expect("No timeout")?;
match res {
Packet::Unsuback(ack_pid)
if ack_pid == pid => {
self.free_write_pid(pid)?;
Ok(())
},
_ => {
return Err(format!("Unexpected packet waiting for Unsuback(Pid={:?}): {:#?}",
pid, res)
.into());
}
}
}
pub async fn read_subscriptions(&mut self) -> Result<ReadResult> {
let h = self.check_io_task_mut()?;
let r = match h.rx_recv_published.recv().await {
Some(r) => r,
None => {
self.io_task_handle = None;
return Err(Error::Disconnected);
}
};
match r {
Packet::Publish(p) => {
match p.qospid {
QosPid::AtMostOnce => (),
QosPid::AtLeastOnce(pid) => {
self.write_only_packet(&Packet::Puback(pid)).await?;
},
QosPid::ExactlyOnce(_) => {
error!("Received publish with unimplemented QoS: ExactlyOnce");
}
}
let rr = ReadResult {
topic: p.topic_name,
payload: p.payload,
};
Ok(rr)
},
_ => {
return Err(format!("Unexpected packet waiting for read: {:#?}", r).into());
}
}
}
pub async fn disconnect(&mut self) -> Result<()> {
self.check_io_task()?;
debug!("Disconnecting");
let p = Packet::Disconnect;
let res = timeout(self.options.operation_timeout,
self.write_only_packet(&p)).await;
if let Err(Elapsed { .. }) = res {
return Err(format!("Timeout waiting for Disconnect to send after {}ms",
self.options.operation_timeout.as_millis()).into());
}
res.expect("No timeout")?;
self.shutdown().await?;
Ok(())
}
fn alloc_write_pid(&self) -> Result<Pid> {
match self.free_write_pids.lock().expect("not poisoned").alloc() {
Some(pid) => Ok(Pid::try_from(pid).expect("Non-zero Pid")),
None => Err(Error::from("No free Pids")),
}
}
fn free_write_pid(&self, p: Pid) -> Result<()> {
match self.free_write_pids.lock().expect("not poisoned").free(p.get()) {
true => Err(Error::from("Pid was already free")),
false => Ok(())
}
}
async fn shutdown(&mut self) -> Result <()> {
let c = self.check_io_task()?;
c.halt.store(true, Ordering::SeqCst);
self.write_request(IoType::ShutdownConnection).await?;
self.io_task_handle = None;
Ok(())
}
async fn write_only_packet(&self, p: &Packet) -> Result<()> {
self.write_request(IoType::WriteOnly { packet: p.clone(), })
.await.map(|_v| ())
}
async fn write_response_packet(&self, p: &Packet) -> Result<Packet> {
let io_type = IoType::WriteAndResponse {
packet: p.clone(),
response_pid: packet_pid(p).expect("packet_pid"),
};
self.write_request(io_type)
.await.map(|v| v.expect("return packet"))
}
async fn write_request(&self, io_type: IoType) -> Result<Option<Packet>> {
let c = self.check_io_task()?;
let (tx, rx) = oneshot::channel::<IoResult>();
let req = IoRequest {
tx_result: Some(tx),
io_type: io_type,
};
c.tx_io_requests.clone().send(req).await
.map_err(|e| Error::from_std_err(e))?;
let res = rx.await
.map_err(|e| Error::from_std_err(e))?;
res.result
}
fn check_io_task_mut(&mut self) -> Result<&mut IoTaskHandle> {
match self.io_task_handle {
Some(ref mut h) => Ok(h),
None => Err("No IO task, did you call connect?".into()),
}
}
fn check_io_task(&self) -> Result<&IoTaskHandle> {
match self.io_task_handle {
Some(ref h) => Ok(h),
None => Err("No IO task, did you call connect?".into()),
}
}
fn check_no_io_task(&self) -> Result<()> {
match self.io_task_handle {
Some(_) => Err("Already spawned IO task".into()),
None => Ok(()),
}
}
}
async fn connect_stream(opts: &ClientOptions) -> Result<AsyncStream> {
debug!("Connecting to {}:{}", opts.host, opts.port);
#[cfg(feature = "tls")]
match opts.tls_client_config {
Some(ref c) => {
let connector = TlsConnector::from(c.clone());
let domain = DNSNameRef::try_from_ascii_str(&*opts.host)
.map_err(|e| Error::from_std_err(e))?;
let tcp = TcpStream::connect((&*opts.host, opts.port)).await?;
let conn = connector.connect(domain, tcp).await?;
Ok(AsyncStream::TlsStream(conn))
},
None => {
let tcp = TcpStream::connect((&*opts.host, opts.port)).await?;
Ok(AsyncStream::TcpStream(tcp))
}
}
#[cfg(not(feature = "tls"))]
{
let tcp = TcpStream::connect((&*opts.host, opts.port)).await?;
Ok(AsyncStream::TcpStream(tcp))
}
}
fn connect_packet(opts: &ClientOptions) -> Result<Packet> {
Ok(Packet::Connect(mqttrs::Connect {
protocol: mqttrs::Protocol::MQTT311,
keep_alive: match opts.keep_alive {
KeepAlive::Disabled => 0,
KeepAlive::Enabled { secs } => secs,
},
client_id: match &opts.client_id {
None => "".to_owned(),
Some(cid) => cid.to_owned(),
},
clean_session: true,
last_will: None,
username: opts.username.clone(),
password: opts.password.clone(),
}))
}
fn packet_pid(p: &Packet) -> Option<Pid> {
match p {
Packet::Connect(_) => None,
Packet::Connack(_) => None,
Packet::Publish(publish) => publish.qospid.pid(),
Packet::Puback(pid) => Some(pid.to_owned()),
Packet::Pubrec(pid) => Some(pid.to_owned()),
Packet::Pubrel(pid) => Some(pid.to_owned()),
Packet::Pubcomp(pid) => Some(pid.to_owned()),
Packet::Subscribe(sub) => Some(sub.pid),
Packet::Suback(suback) => Some(suback.pid),
Packet::Unsubscribe(unsub) => Some(unsub.pid),
Packet::Unsuback(pid) => Some(pid.to_owned()),
Packet::Pingreq => None,
Packet::Pingresp => None,
Packet::Disconnect => None,
}
}
enum SelectResult {
IoReq(Option<IoRequest>),
Read(Result<Packet>),
Ping,
PingrespExpected,
}
impl IoTask {
async fn run(mut self) {
loop {
if self.halt.load(Ordering::SeqCst) {
self.shutdown_conn().await;
debug!("IoTask: halting by request.");
self.state = IoTaskState::Halted;
return;
}
match self.state {
IoTaskState::Halted => return,
IoTaskState::Disconnected =>
match Self::try_connect(&mut self).await {
Err(e) => {
error!("IoTask: Error connecting: {}", e);
if self.options.automatic_connect {
sleep(self.options.connect_retry_delay).await;
} else {
info!("IoTask: halting due to connection failure, auto connect is off.");
self.state = IoTaskState::Halted;
return;
}
},
Ok(()) => {
if let Err(e) = Self::replay_subscriptions(&mut self).await {
error!("IoTask: Error replaying subscriptions on reconnect: {}",
e);
}
},
},
IoTaskState::Connected(_) =>
match Self::run_once_connected(&mut self).await {
Err(Error::Disconnected) => {
info!("IoTask: Disconnected, resetting state");
self.state = IoTaskState::Disconnected;
},
Err(e) => {
error!("IoTask: Quitting run loop due to error: {}", e);
return;
},
_ => {},
},
}
}
}
async fn try_connect(&mut self) -> Result<()> {
let stream = connect_stream(&self.options).await?;
self.state = IoTaskState::Connected(IoTaskConnected {
stream: stream,
read_buf: BytesMut::with_capacity(self.options.max_packet_len),
read_bufn: 0,
last_write_time: Instant::now(),
last_pingreq_time: Instant::now(),
last_pingresp_time: Instant::now(),
pid_response_map: BTreeMap::new(),
});
let c = match self.state {
IoTaskState::Connected(ref mut c) => c,
_ => panic!("Not reached"),
};
let conn = connect_packet(&self.options)?;
debug!("IoTask: Sending connect packet");
Self::write_packet(&self.options, c, &conn).await?;
let read = Self::read_packet(&mut c.stream,
&mut c.read_buf,
&mut c.read_bufn,
self.options.max_packet_len);
let res = match timeout(self.options.operation_timeout,
read).await {
Err(Elapsed { .. }) =>
Err(format!("Timeout waiting for Connack after {}ms",
self.options.operation_timeout.as_millis()).into()),
Ok(Err(e)) => Err(e),
Ok(Ok(Packet::Connack(ca))) => {
match ca.code {
ConnectReturnCode::Accepted => {
debug!("IoTask: connack with code=Accepted.");
Ok(())
},
_ => Err(format!("Bad connect return code: {:?}", ca.code).into()),
}
},
Ok(Ok(p)) =>
Err(format!("Received packet not CONNACK after connect: {:?}", p).into()),
};
match res {
Ok(()) => Ok(()),
Err(e) => {
self.shutdown_conn().await;
Err(e)
},
}
}
async fn shutdown_conn(&mut self) {
debug!("IoTask: shutdown_conn");
let c = match self.state {
IoTaskState::Disconnected |
IoTaskState::Halted => return,
IoTaskState::Connected(ref mut c) => c,
};
if let Err(e) = c.stream.shutdown().await {
if e.kind() != std::io::ErrorKind::NotConnected {
error!("IoTask: Error on stream shutdown in shutdown_conn: {:?}", e);
}
}
self.state = IoTaskState::Disconnected;
}
async fn replay_subscriptions(&mut self) -> Result<()> {
let subs = self.subscriptions.clone();
for (t, qos) in subs.iter() {
trace!("Replaying subscription topic='{}' qos={:?}", t, qos);
let pid = Pid::try_from(65535).expect("non-zero pid");
let p = Packet::Subscribe(mqttrs::Subscribe {
pid,
topics: vec![SubscribeTopic { topic_path: t.to_owned(), qos: qos.to_owned() }]
});
let req = IoRequest {
io_type: IoType::WriteAndResponse { packet: p, response_pid: pid },
tx_result: None,
};
self.handle_io_req(req).await?;
}
Ok(())
}
async fn run_once_connected(&mut self) -> Result<()> {
let c = match self.state {
IoTaskState::Connected(ref mut c) => c,
_ => panic!("Not reached"),
};
let pingreq_next = self.options.keep_alive.as_duration()
.map(|dur| c.last_write_time + dur);
let pingresp_expected_by =
if self.options.keep_alive.is_enabled() &&
c.last_pingreq_time > c.last_pingresp_time
{
let ka = self.options.keep_alive.as_duration().expect("enabled");
Some(c.last_pingreq_time + min(self.options.operation_timeout, ka))
} else {
None
};
let sel_res: SelectResult = {
let mut req_fut = Box::pin(self.rx_io_requests.recv().fuse());
let mut read_fut = Box::pin(
Self::read_packet(&mut c.stream, &mut c.read_buf, &mut c.read_bufn,
self.options.max_packet_len).fuse());
let mut ping_fut = match pingreq_next {
Some(t) => Box::pin(sleep_until(t).boxed().fuse()),
None => Box::pin(pending().boxed().fuse()),
};
let mut pingresp_expected_fut = match pingresp_expected_by {
Some(t) => Box::pin(sleep_until(t).boxed().fuse()),
None => Box::pin(pending().boxed().fuse()),
};
select! {
req = req_fut => SelectResult::IoReq(req),
read = read_fut => SelectResult::Read(read),
_ = ping_fut => SelectResult::Ping,
_ = pingresp_expected_fut => SelectResult::PingrespExpected,
}
};
match sel_res {
SelectResult::Read(read) => return self.handle_read(read).await,
SelectResult::IoReq(req) => match req {
None => {
debug!("IoTask: Req stream closed, shutting down.");
self.shutdown_conn().await;
return Err(Error::Disconnected);
},
Some(req) => return self.handle_io_req(req).await,
},
SelectResult::Ping => return self.send_ping().await,
SelectResult::PingrespExpected => {
debug!("IoTask: Timed out waiting for Pingresp, shutting down.");
self.shutdown_conn().await;
return Err(Error::Disconnected);
}
}
}
async fn handle_read(&mut self, read: Result<Packet>) -> Result<()> {
let c = match self.state {
IoTaskState::Connected(ref mut c) => c,
_ => panic!("Not reached"),
};
match read {
Err(Error::Disconnected) => {
return Err(Error::Disconnected);
}
Err(e) => {
error!("IoTask: Failed to read packet: {:?}", e);
},
Ok(p) => {
match p {
Packet::Pingresp => {
debug!("IoTask: Received Pingresp");
c.last_pingresp_time = Instant::now();
},
Packet::Publish(_) => {
if let Err(e) = self.tx_recv_published.send(p).await {
error!("IoTask: Failed to send Packet: {:?}", e);
}
},
Packet::Connack(_) => {
error!("IoTask: Unexpected CONNACK in handle_read(): {:?}", p);
self.shutdown_conn().await;
return Err(Error::Disconnected);
}
_ => {
let pid = packet_pid(&p);
if let Some(pid) = pid {
let pid_response = c.pid_response_map.remove(&pid);
match pid_response {
None => error!("Unknown PID: {:?}", pid),
Some(req) => {
trace!("Sending response PID={:?} p={:?}",
pid, p);
let res = IoResult { result: Ok(Some(p)) };
Self::send_io_result(req, res)?;
},
}
}
},
}
},
}
Ok(())
}
async fn handle_io_req(&mut self, req: IoRequest) -> Result<()> {
let c = match self.state {
IoTaskState::Connected(ref mut c) => c,
_ => panic!("Not reached"),
};
let packet = req.io_type.packet();
if let Some(p) = packet {
c.last_write_time = Instant::now();
let res = Self::write_packet(&self.options, c, &p).await;
if let Err(e) = res {
error!("IoTask: Error writing packet: {:?}", e);
let res = IoResult { result: Err(e) };
Self::send_io_result(req, res)?;
return Ok(())
}
match p {
Packet::Subscribe(s) => {
for st in s.topics.iter() {
trace!("Tracking subscription topic='{}', qos={:?}",
st.topic_path, st.qos);
let _ = self.subscriptions.insert(st.topic_path.clone(), st.qos);
}
},
Packet::Unsubscribe(u) => {
for t in u.topics.iter() {
trace!("Tracking unsubscription topic='{}'", t);
let _ = self.subscriptions.remove(t);
}
},
_ => {},
}
match req.io_type {
IoType::WriteOnly { .. } => {
let res = IoResult { result: res.map(|_| None) };
Self::send_io_result(req, res)?;
},
IoType::WriteAndResponse { response_pid, .. } => {
c.pid_response_map.insert(response_pid, req);
},
IoType::ShutdownConnection => {
panic!("Not reached because ShutdownConnection has no packet")
},
}
} else {
match req.io_type {
IoType::ShutdownConnection => {
debug!("IoTask: IoType::ShutdownConnection.");
self.shutdown_conn().await;
let res = IoResult { result: Ok(None) };
Self::send_io_result(req, res)?;
return Err(Error::Disconnected);
}
_ => (),
}
}
Ok(())
}
fn send_io_result(req: IoRequest, res: IoResult) -> Result<()> {
match req.tx_result {
Some(tx) => {
if let Err(e) = tx.send(res) {
error!("IoTask: Failed to send IoResult={:?}", e);
}
},
None => {
debug!("IoTask: Ignored IoResult: {:?}", res);
},
}
Ok(())
}
async fn send_ping(&mut self) -> Result<()> {
let c = match self.state {
IoTaskState::Connected(ref mut c) => c,
_ => panic!("Not reached"),
};
debug!("IoTask: Writing Pingreq");
c.last_write_time = Instant::now();
c.last_pingreq_time = Instant::now();
let p = Packet::Pingreq;
if let Err(e) = Self::write_packet(&self.options, c, &p).await {
error!("IoTask: Failed to write ping: {:?}", e);
}
Ok(())
}
async fn write_packet(
opts: &ClientOptions,
c: &mut IoTaskConnected,
p: &Packet,
) -> Result<()> {
if cfg!(feature = "unsafe-logging") {
trace!("write_packet p={:#?}", p);
}
let mut bytes = BytesMut::with_capacity(opts.max_packet_len);
mqttrs::encode(&p, &mut bytes)?;
if cfg!(feature = "unsafe-logging") {
trace!("write_packet bytes p={:?}", &*bytes);
}
c.stream.write_all(&*bytes).await?;
Ok(())
}
async fn read_packet(
stream: &mut AsyncStream,
read_buf: &mut BytesMut,
read_bufn: &mut usize,
max_packet_len: usize
) -> Result<Packet> {
loop {
if cfg!(feature = "unsafe-logging") {
trace!("read_packet Decoding buf={:?}", &read_buf[0..*read_bufn]);
}
if *read_bufn > 0 {
read_buf.split_off(*read_bufn);
let old_len = read_buf.len();
let decoded = mqttrs::decode(read_buf)?;
if cfg!(feature = "unsafe-logging") {
trace!("read_packet decoded={:#?}", decoded);
}
if let Some(p) = decoded {
let new_len = read_buf.len();
trace!("read_packet old_len={} new_len={} read_bufn={}",
old_len, new_len, *read_bufn);
*read_bufn -= old_len - new_len;
if cfg!(feature = "unsafe-logging") {
trace!("read_packet Remaining buf={:?}", &read_buf[0..*read_bufn]);
}
return Ok(p);
}
}
read_buf.resize(max_packet_len, 0u8);
let readlen = read_buf.len();
trace!("read_packet read read_bufn={} readlen={}", *read_bufn, readlen);
let nread = stream.read(&mut read_buf[*read_bufn..readlen]).await?;
*read_bufn += nread;
if nread == 0 {
error!("IoTask: Socket disconnected");
return Err(Error::Disconnected);
}
}
}
}
impl IoType {
fn packet(&self) -> Option<&Packet> {
match self {
IoType::ShutdownConnection => None,
IoType::WriteOnly { packet } => Some(&packet),
IoType::WriteAndResponse { packet, .. } => Some(&packet),
}
}
}
#[cfg(test)]
mod test {
use super::Client;
#[test]
fn client_is_send() {
let c = Client::builder().set_host("localhost".to_owned()).build().unwrap();
let _s: &dyn Send = &c;
}
}