mod drive;
mod handshake;
mod inbound;
mod operations;
mod state;
#[cfg(test)]
mod tests;
use crate::de::PacketReader;
use crate::ser::MAX_FIXED_HEADER_SIZE;
use crate::types::Auth;
use crate::{ConfigBuilder, Op, OpStatus, QoS, Will};
use heapless::String;
use super::Io;
use state::{RuntimeState, SessionData};
pub struct Session<'buf, IO> {
connection: Option<IO>,
client_id: String<64>,
packet_reader: PacketReader<'buf>,
data: SessionData<'buf>,
runtime: RuntimeState,
will: Option<Will<'buf>>,
auth: Option<Auth<'buf>>,
session_expiry_interval: u32,
downgrade_qos: bool,
}
impl<'buf, IO> Session<'buf, IO>
where
IO: Io,
{
pub fn new(config: ConfigBuilder<'buf>) -> Self {
let (
buffers,
will,
client_id,
keepalive_interval,
session_expiry_interval,
downgrade_qos,
auth,
) = config.into_parts();
let (rx, tx) = buffers.into_parts();
Self {
connection: None,
client_id,
packet_reader: PacketReader::new(rx),
data: SessionData::new(tx),
runtime: RuntimeState::new(keepalive_interval),
will,
auth,
session_expiry_interval,
downgrade_qos,
}
}
pub fn is_connected(&self) -> bool {
self.connection.is_some()
}
pub fn can_publish(&mut self, qos: QoS) -> bool {
self.connection.is_some()
&& if qos == QoS::AtMostOnce {
self.data.outbound.scratch_space().len() >= MAX_FIXED_HEADER_SIZE
} else {
self.runtime.send_quota != 0 && self.data.outbound.can_retain()
}
}
pub fn is_publish_quiescent(&self) -> bool {
self.data.outbound.is_quiescent()
}
pub fn status(&self, op: &Op) -> OpStatus {
if op.generation != self.data.generation() {
return OpStatus::Invalidated;
}
let pending = match op.kind {
super::OpKind::PublishAtLeastOnce
| super::OpKind::Subscribe
| super::OpKind::Unsubscribe => self.data.outbound.has_retained(op.packet_id),
super::OpKind::PublishExactlyOnce => {
self.data.outbound.has_retained(op.packet_id)
|| self.data.outbound.has_pending_release(op.packet_id)
}
};
if pending {
OpStatus::Pending
} else {
OpStatus::Complete
}
}
}