use embassy_time::Instant;
use embedded_io_async::Error as _;
use crate::packets::{DisconnectReq, PublishHeader, Subscribe, Unsubscribe};
use crate::types::{Properties, TopicFilter};
use crate::{Error, Op, Property, PubError, QoS, ResourceError, debug, info, warn};
use super::super::outbound::write_packet;
use super::{Io, Session};
impl<'buf, IO> Session<'buf, IO>
where
IO: Io,
{
pub async fn disconnect(&mut self) -> Result<(), Error<IO::Error>> {
let Some(connection) = self.connection.as_mut() else {
return Ok(());
};
info!("Graceful disconnect requested");
let mut buffer = [0u8; 9];
let result = write_packet(&mut buffer, connection, &DisconnectReq).await;
self.handle_disconnect();
result
}
pub async fn subscribe(
&mut self,
topics: &[TopicFilter<'_>],
properties: &[Property<'_>],
) -> Result<Op, Error<IO::Error>> {
if self.connection.is_none() {
return Err(Error::Disconnected);
}
if topics.is_empty() {
return Err(Error::InvalidRequest);
}
self.flush_outbound().await?;
self.require_retained_slot()?;
let packet_id = self.data.next_packet_id();
let (offset, len) = self.data.outbound.encode_packet(&Subscribe {
packet_id,
dup: false,
properties: Properties::Slice(properties),
topics,
})?;
self.runtime.require_packet_size(len)?;
self.data.outbound.retain_packet(packet_id, offset, len)?;
debug!(
"Enqueued SUBSCRIBE packet_id={=u16} len={=usize} tx_used={=usize}",
packet_id,
len,
self.data.outbound.used()
);
self.flush_outbound().await?;
Ok(Op::new(
super::super::OpKind::Subscribe,
packet_id,
self.data.generation(),
))
}
pub async fn unsubscribe(
&mut self,
topics: &[&str],
properties: &[Property<'_>],
) -> Result<Op, Error<IO::Error>> {
if self.connection.is_none() {
return Err(Error::Disconnected);
}
if topics.is_empty() {
return Err(Error::InvalidRequest);
}
self.flush_outbound().await?;
self.require_retained_slot()?;
let packet_id = self.data.next_packet_id();
let (offset, len) = self.data.outbound.encode_packet(&Unsubscribe {
packet_id,
dup: false,
properties: Properties::Slice(properties),
topics,
})?;
self.runtime.require_packet_size(len)?;
self.data.outbound.retain_packet(packet_id, offset, len)?;
debug!(
"Enqueued UNSUBSCRIBE packet_id={=u16} len={=usize} tx_used={=usize}",
packet_id,
len,
self.data.outbound.used()
);
self.flush_outbound().await?;
Ok(Op::new(
super::super::OpKind::Unsubscribe,
packet_id,
self.data.generation(),
))
}
pub async fn publish<P>(
&mut self,
publication: crate::publication::Publication<'_, P>,
) -> Result<Option<Op>, PubError<P::Error, IO::Error>>
where
P: crate::publication::ToPayload,
{
if self.connection.is_none() {
return Err(Error::Disconnected.into());
}
self.flush_outbound().await?;
let crate::publication::Publication {
topic,
properties,
qos,
payload,
retain,
} = publication;
let qos = match self.runtime.max_qos {
Some(max_qos) if self.downgrade_qos && qos > max_qos => max_qos,
_ => qos,
};
let packet_id = (qos > QoS::AtMostOnce).then(|| self.data.next_packet_id());
let header = PublishHeader {
topic: crate::types::Utf8String(topic),
packet_id,
properties,
retain,
qos,
dup: false,
};
if packet_id.is_some() {
self.require_retained_slot()?;
}
if !self.can_publish(qos) {
return Err(Error::NotReady.into());
}
if let Some(packet_id) = packet_id {
let (offset, len) = self.data.outbound.encode_publish(&header, payload)?;
self.runtime.require_packet_size(len)?;
self.data.outbound.retain_packet(packet_id, offset, len)?;
self.runtime.send_quota = self.runtime.send_quota.saturating_sub(1);
debug!(
"Enqueued PUBLISH packet_id={=u16} qos={} len={=usize} send_quota={=u16}/{=u16} tx_used={=usize}",
packet_id,
qos,
len,
self.runtime.send_quota,
self.runtime.max_send_quota,
self.data.outbound.used()
);
self.flush_outbound().await?;
let kind = if qos == QoS::ExactlyOnce {
super::super::OpKind::PublishExactlyOnce
} else {
super::super::OpKind::PublishAtLeastOnce
};
return Ok(Some(Op::new(kind, packet_id, self.data.generation())));
}
let packet = crate::ser::MqttSerializer::pub_to_buffer(
self.data.outbound.scratch_space(),
&header,
payload,
)?;
self.runtime.require_packet_size(packet.len())?;
debug!("Sending QoS0 PUBLISH len={=usize}", packet.len());
let connection = self.connection.as_mut().ok_or(Error::Disconnected)?;
if let Err(err) = crate::mqtt_client::outbound::write_all(connection, packet).await {
if matches!(err, Error::WriteZero) {
return Err(err.into());
}
warn!("QoS0 PUBLISH write failed");
self.handle_disconnect();
return Err(err.into());
}
if let Err(err) = connection.flush().await {
warn!("QoS0 PUBLISH flush failed: {}", err.kind());
self.handle_disconnect();
return Err(Error::Transport(err).into());
}
self.runtime.note_outbound_activity(Instant::now());
Ok(None)
}
pub(super) fn require_retained_slot(&self) -> Result<(), Error<IO::Error>> {
if self.data.outbound.retained_full() {
return Err(Error::Resource(ResourceError::InflightExhausted));
}
Ok(())
}
}