#[cfg(not(target_arch = "wasm32"))]
pub mod file_backend;
pub mod memory_backend;
pub mod queue;
pub mod retained;
pub mod sessions;
#[cfg(not(target_arch = "wasm32"))]
pub use file_backend::FileBackend;
pub use memory_backend::MemoryBackend;
pub use queue::MessageQueue;
pub use retained::RetainedMessages;
pub use sessions::SessionManager;
#[cfg(test)]
mod tests;
use crate::error::Result;
use crate::packet::publish::PublishPacket;
use crate::time::{Duration, SystemTime};
use crate::QoS;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::warn;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetainedMessage {
pub topic: String,
pub payload: Vec<u8>,
pub qos: QoS,
pub retain: bool,
pub stored_at_secs: u64,
pub message_expiry_interval: Option<u32>,
#[serde(skip)]
pub expires_at: Option<SystemTime>,
}
const CHANGE_ONLY_MAX_TOPICS: usize = 10_000;
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChangeOnlyState {
pub last_payload_hashes: HashMap<String, u64>,
}
impl ChangeOnlyState {
#[must_use]
pub fn compute_hash(payload: &[u8]) -> u64 {
let mut hasher = DefaultHasher::new();
payload.hash(&mut hasher);
hasher.finish()
}
#[must_use]
pub fn should_deliver(&self, topic: &str, payload: &[u8]) -> bool {
let hash = Self::compute_hash(payload);
self.last_payload_hashes.get(topic) != Some(&hash)
}
pub fn update_hash(&mut self, topic: &str, payload: &[u8]) {
let hash = Self::compute_hash(payload);
if self.last_payload_hashes.len() >= CHANGE_ONLY_MAX_TOPICS
&& !self.last_payload_hashes.contains_key(topic)
{
if let Some(key) = self.last_payload_hashes.keys().next().cloned() {
self.last_payload_hashes.remove(&key);
}
}
self.last_payload_hashes.insert(topic.to_string(), hash);
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredSubscription {
pub qos: QoS,
pub no_local: bool,
pub retain_as_published: bool,
#[serde(default)]
pub retain_handling: u8,
pub subscription_id: Option<u32>,
#[serde(default = "default_protocol_version")]
pub protocol_version: u8,
#[serde(default)]
pub change_only: bool,
#[serde(default)]
pub flow_id: Option<u64>,
}
fn default_protocol_version() -> u8 {
5
}
impl StoredSubscription {
#[must_use]
pub fn new(qos: QoS) -> Self {
Self {
qos,
no_local: false,
retain_as_published: false,
retain_handling: 0,
subscription_id: None,
protocol_version: 5,
change_only: false,
flow_id: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientSession {
pub client_id: String,
pub persistent: bool,
pub expiry_interval: Option<u32>,
pub subscriptions: HashMap<String, StoredSubscription>,
#[serde(skip, default = "SystemTime::now")]
pub created_at: SystemTime,
#[serde(skip, default = "SystemTime::now")]
pub last_seen: SystemTime,
pub will_message: Option<crate::types::WillMessage>,
pub will_delay_interval: Option<u32>,
#[serde(default = "default_receive_maximum")]
pub receive_maximum: u16,
#[serde(default)]
pub change_only_state: ChangeOnlyState,
#[serde(default)]
pub user_id: Option<String>,
}
fn default_receive_maximum() -> u16 {
65535
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedMessage {
pub topic: String,
pub payload: Vec<u8>,
pub client_id: String,
pub qos: QoS,
pub queued_at_secs: u64,
pub message_expiry_interval: Option<u32>,
#[serde(skip)]
pub expires_at: Option<SystemTime>,
pub packet_id: Option<u16>,
}
pub trait StorageBackend: Send + Sync {
fn store_retained_message(
&self,
topic: &str,
message: RetainedMessage,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn get_retained_message(
&self,
topic: &str,
) -> impl std::future::Future<Output = Result<Option<RetainedMessage>>> + Send;
fn remove_retained_message(
&self,
topic: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn get_retained_messages(
&self,
topic_filter: &str,
) -> impl std::future::Future<Output = Result<Vec<(String, RetainedMessage)>>> + Send;
fn store_session(
&self,
session: ClientSession,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn get_session(
&self,
client_id: &str,
) -> impl std::future::Future<Output = Result<Option<ClientSession>>> + Send;
fn remove_session(
&self,
client_id: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn queue_message(
&self,
message: QueuedMessage,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn get_queued_messages(
&self,
client_id: &str,
) -> impl std::future::Future<Output = Result<Vec<QueuedMessage>>> + Send;
fn remove_queued_messages(
&self,
client_id: &str,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn store_inflight_message(
&self,
_message: InflightMessage,
) -> impl std::future::Future<Output = Result<()>> + Send {
async { Ok(()) }
}
fn get_inflight_messages(
&self,
_client_id: &str,
) -> impl std::future::Future<Output = Result<Vec<InflightMessage>>> + Send {
async { Ok(Vec::new()) }
}
fn remove_inflight_message(
&self,
_client_id: &str,
_packet_id: u16,
_direction: InflightDirection,
) -> impl std::future::Future<Output = Result<()>> + Send {
async { Ok(()) }
}
fn remove_all_inflight_messages(
&self,
_client_id: &str,
) -> impl std::future::Future<Output = Result<()>> + Send {
async { Ok(()) }
}
fn cleanup_expired(&self) -> impl std::future::Future<Output = Result<()>> + Send;
fn flush_sessions(&self) -> impl std::future::Future<Output = Result<()>> + Send {
async { Ok(()) }
}
}
pub struct Storage<B: StorageBackend> {
backend: Arc<B>,
retained_cache: Arc<RwLock<HashMap<String, RetainedMessage>>>,
sessions_cache: Arc<RwLock<HashMap<String, ClientSession>>>,
dirty_sessions: Arc<RwLock<HashSet<String>>>,
shutdown: Arc<AtomicBool>,
}
impl<B: StorageBackend + 'static> Storage<B> {
pub fn new(backend: B) -> Self {
Self {
backend: Arc::new(backend),
retained_cache: Arc::new(RwLock::new(HashMap::new())),
sessions_cache: Arc::new(RwLock::new(HashMap::new())),
dirty_sessions: Arc::new(RwLock::new(HashSet::new())),
shutdown: Arc::new(AtomicBool::new(false)),
}
}
pub async fn initialize(&self) -> Result<()> {
let retained_messages = self.backend.get_retained_messages("#").await?;
let mut cache = self.retained_cache.write().await;
for (topic, msg) in retained_messages {
cache.insert(topic, msg);
}
Ok(())
}
pub async fn store_retained(&self, topic: &str, message: RetainedMessage) -> Result<()> {
self.retained_cache
.write()
.await
.insert(topic.to_string(), message.clone());
self.backend.store_retained_message(topic, message).await
}
pub async fn get_retained(&self, topic: &str) -> Option<RetainedMessage> {
let cache_result = self.retained_cache.read().await.get(topic).cloned();
if let Some(msg) = cache_result {
if msg.is_expired() {
if let Err(e) = self.remove_retained(topic).await {
warn!("Failed to remove expired retained message for {topic}: {e}");
}
return None;
}
return Some(msg);
}
if let Ok(Some(msg)) = self.backend.get_retained_message(topic).await {
self.retained_cache
.write()
.await
.insert(topic.to_string(), msg.clone());
Some(msg)
} else {
None
}
}
pub async fn remove_retained(&self, topic: &str) -> Result<()> {
self.retained_cache.write().await.remove(topic);
self.backend.remove_retained_message(topic).await
}
pub async fn get_retained_matching(
&self,
topic_filter: &str,
) -> Vec<(String, RetainedMessage)> {
use crate::validation::topic_matches_filter;
self.retained_cache
.read()
.await
.iter()
.filter(|(topic, _)| topic_matches_filter(topic, topic_filter))
.map(|(topic, msg)| (topic.clone(), msg.clone()))
.collect()
}
pub async fn store_session(&self, session: ClientSession) {
let client_id = session.client_id.clone();
self.sessions_cache
.write()
.await
.insert(client_id.clone(), session);
self.dirty_sessions.write().await.insert(client_id);
}
pub async fn flush_sessions(&self) -> Result<()> {
let to_flush: Vec<String> = self.dirty_sessions.read().await.iter().cloned().collect();
if !to_flush.is_empty() {
let cache = self.sessions_cache.read().await;
let mut failed = Vec::new();
for client_id in to_flush {
if let Some(session) = cache.get(&client_id) {
if let Err(e) = self.backend.store_session(session.clone()).await {
tracing::warn!("failed to persist session {}: {}", client_id, e);
failed.push(client_id);
} else {
self.dirty_sessions.write().await.remove(&client_id);
}
} else {
self.dirty_sessions.write().await.remove(&client_id);
}
}
if !failed.is_empty() {
return Err(crate::error::MqttError::Io(format!(
"failed to persist {} sessions",
failed.len()
)));
}
}
self.backend.flush_sessions().await
}
pub fn start_flush_task(self: &Arc<Self>, flush_interval: Duration) {
let storage = Arc::clone(self);
tokio::spawn(async move {
let mut interval = tokio::time::interval(flush_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
if storage.shutdown.load(Ordering::Relaxed) {
if let Err(e) = storage.flush_sessions().await {
tracing::warn!("failed to flush sessions on shutdown: {e}");
}
break;
}
if let Err(e) = storage.flush_sessions().await {
tracing::warn!("failed to flush sessions: {e}");
}
}
});
}
pub async fn shutdown(&self) -> Result<()> {
self.shutdown.store(true, Ordering::Relaxed);
self.flush_sessions().await
}
pub async fn get_session(&self, client_id: &str) -> Option<ClientSession> {
let cache_result = self.sessions_cache.read().await.get(client_id).cloned();
if let Some(session) = cache_result {
if session.is_expired() {
if let Err(e) = self.remove_session(client_id).await {
warn!("Failed to remove expired session for {client_id}: {e}");
}
return None;
}
return Some(session);
}
if let Ok(Some(session)) = self.backend.get_session(client_id).await {
self.sessions_cache
.write()
.await
.insert(client_id.to_string(), session.clone());
Some(session)
} else {
None
}
}
pub async fn remove_session(&self, client_id: &str) -> Result<()> {
self.sessions_cache.write().await.remove(client_id);
self.dirty_sessions.write().await.remove(client_id);
self.backend.remove_session(client_id).await
}
pub async fn queue_message(&self, message: QueuedMessage) -> Result<()> {
self.backend.queue_message(message).await
}
pub async fn get_queued_messages(&self, client_id: &str) -> Result<Vec<QueuedMessage>> {
self.backend.get_queued_messages(client_id).await
}
pub async fn remove_queued_messages(&self, client_id: &str) -> Result<()> {
self.backend.remove_queued_messages(client_id).await
}
pub async fn cleanup_expired(&self) -> Result<()> {
let now = SystemTime::now();
{
let mut cache = self.retained_cache.write().await;
cache.retain(|_, msg| msg.expires_at.is_none_or(|expiry| expiry > now));
}
{
let mut cache = self.sessions_cache.write().await;
cache.retain(|_, session| {
if let Some(expiry_interval) = session.expiry_interval {
let expiry_time =
session.last_seen + Duration::from_secs(u64::from(expiry_interval));
expiry_time > now
} else {
true
}
});
}
self.backend.cleanup_expired().await
}
}
impl RetainedMessage {
#[must_use]
pub fn new(packet: PublishPacket) -> Self {
let now = SystemTime::now();
let stored_at_secs = now
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let message_expiry_interval = packet.properties.get_message_expiry_interval();
let expires_at =
message_expiry_interval.map(|interval| now + Duration::from_secs(u64::from(interval)));
Self {
topic: packet.topic_name,
payload: packet.payload.to_vec(),
qos: packet.qos,
retain: packet.retain,
stored_at_secs,
message_expiry_interval,
expires_at,
}
}
pub fn recompute_expiry(&mut self) {
if let Some(interval) = self.message_expiry_interval {
let stored_at = SystemTime::UNIX_EPOCH + Duration::from_secs(self.stored_at_secs);
self.expires_at = Some(stored_at + Duration::from_secs(u64::from(interval)));
}
}
#[must_use]
pub fn to_publish_packet(&self) -> PublishPacket {
let mut packet = PublishPacket::new(&self.topic, self.payload.clone(), self.qos)
.with_retain(self.retain);
if let Some(remaining) = self.remaining_expiry_interval() {
packet.properties.set_message_expiry_interval(remaining);
}
packet
}
#[must_use]
pub fn remaining_expiry_interval(&self) -> Option<u32> {
self.expires_at.and_then(|expiry| {
expiry
.duration_since(SystemTime::now())
.ok()
.map(|d| u32::try_from(d.as_secs()).unwrap_or(u32::MAX))
})
}
#[must_use]
pub fn is_expired(&self) -> bool {
self.expires_at
.is_some_and(|expiry| SystemTime::now() > expiry)
}
}
impl ClientSession {
#[must_use]
pub fn new(
client_id: impl Into<String>,
persistent: bool,
expiry_interval: Option<u32>,
) -> Self {
let now = SystemTime::now();
Self {
client_id: client_id.into(),
persistent,
expiry_interval,
subscriptions: HashMap::new(),
created_at: now,
last_seen: now,
will_message: None,
will_delay_interval: None,
receive_maximum: 65535,
change_only_state: ChangeOnlyState::default(),
user_id: None,
}
}
#[must_use]
pub fn new_with_will(
client_id: impl Into<String>,
persistent: bool,
expiry_interval: Option<u32>,
will_message: Option<crate::types::WillMessage>,
) -> Self {
let now = SystemTime::now();
let will_delay = will_message
.as_ref()
.and_then(|w| w.properties.will_delay_interval);
Self {
client_id: client_id.into(),
persistent,
expiry_interval,
subscriptions: HashMap::new(),
created_at: now,
last_seen: now,
will_message,
will_delay_interval: will_delay,
receive_maximum: 65535,
change_only_state: ChangeOnlyState::default(),
user_id: None,
}
}
pub fn touch(&mut self) {
self.last_seen = SystemTime::now();
}
pub fn add_subscription(
&mut self,
topic_filter: impl Into<String>,
subscription: StoredSubscription,
) {
self.subscriptions.insert(topic_filter.into(), subscription);
}
pub fn remove_subscription(&mut self, topic_filter: &str) {
self.subscriptions.remove(topic_filter);
}
#[must_use]
pub fn is_expired(&self) -> bool {
if let Some(expiry_interval) = self.expiry_interval {
let expiry_time = self.last_seen + Duration::from_secs(u64::from(expiry_interval));
SystemTime::now() > expiry_time
} else {
false
}
}
}
impl QueuedMessage {
#[must_use]
pub fn new(packet: PublishPacket, client_id: String, qos: QoS, packet_id: Option<u16>) -> Self {
let now = SystemTime::now();
let queued_at_secs = now
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let message_expiry_interval = packet.properties.get_message_expiry_interval();
let expires_at =
message_expiry_interval.map(|interval| now + Duration::from_secs(u64::from(interval)));
Self {
topic: packet.topic_name,
payload: packet.payload.to_vec(),
client_id,
qos,
queued_at_secs,
message_expiry_interval,
expires_at,
packet_id,
}
}
pub fn recompute_expiry(&mut self) {
if let Some(interval) = self.message_expiry_interval {
let queued_at = SystemTime::UNIX_EPOCH + Duration::from_secs(self.queued_at_secs);
self.expires_at = Some(queued_at + Duration::from_secs(u64::from(interval)));
}
}
#[must_use]
pub fn to_publish_packet(&self) -> PublishPacket {
let mut packet = PublishPacket::new(&self.topic, self.payload.clone(), self.qos);
packet.packet_id = self.packet_id;
if let Some(remaining) = self.remaining_expiry_interval() {
packet.properties.set_message_expiry_interval(remaining);
}
packet
}
#[must_use]
pub fn remaining_expiry_interval(&self) -> Option<u32> {
self.expires_at.and_then(|expiry| {
expiry
.duration_since(SystemTime::now())
.ok()
.map(|d| u32::try_from(d.as_secs()).unwrap_or(u32::MAX))
})
}
#[must_use]
pub fn is_expired(&self) -> bool {
self.expires_at
.is_some_and(|expiry| SystemTime::now() > expiry)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum InflightDirection {
Inbound,
Outbound,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum InflightPhase {
AwaitingPubrec,
AwaitingPubrel,
AwaitingPubcomp,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InflightMessage {
pub client_id: String,
pub packet_id: u16,
pub direction: InflightDirection,
pub phase: InflightPhase,
pub topic: String,
pub payload: Vec<u8>,
pub qos: QoS,
pub retain: bool,
pub stored_at_secs: u64,
pub message_expiry_interval: Option<u32>,
#[serde(default)]
pub expires_at_secs: Option<u64>,
#[serde(skip)]
expires_at_cache: Option<SystemTime>,
#[serde(default)]
pub user_properties: Vec<(String, String)>,
pub content_type: Option<String>,
pub response_topic: Option<String>,
pub correlation_data: Option<Vec<u8>>,
pub payload_format_indicator: Option<bool>,
}
impl InflightMessage {
#[must_use]
pub fn from_publish(
packet: &PublishPacket,
client_id: String,
direction: InflightDirection,
phase: InflightPhase,
) -> Self {
use crate::protocol::v5::properties::{PropertyId, PropertyValue};
let now = SystemTime::now();
let stored_at_secs = now
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let message_expiry_interval = packet.properties.get_message_expiry_interval();
let expires_at_secs =
message_expiry_interval.map(|interval| stored_at_secs + u64::from(interval));
let expires_at_cache =
message_expiry_interval.map(|interval| now + Duration::from_secs(u64::from(interval)));
let user_properties = packet
.properties
.get_all(PropertyId::UserProperty)
.map(|values| {
values
.iter()
.filter_map(|v| {
if let PropertyValue::Utf8StringPair(k, val) = v {
Some((k.clone(), val.clone()))
} else {
None
}
})
.collect()
})
.unwrap_or_default();
let content_type = packet.properties.get_content_type();
let response_topic = packet
.properties
.get(PropertyId::ResponseTopic)
.and_then(|v| {
if let PropertyValue::Utf8String(s) = v {
Some(s.clone())
} else {
None
}
});
let correlation_data = packet
.properties
.get(PropertyId::CorrelationData)
.and_then(|v| {
if let PropertyValue::BinaryData(b) = v {
Some(b.to_vec())
} else {
None
}
});
let payload_format_indicator = packet
.properties
.get(PropertyId::PayloadFormatIndicator)
.and_then(|v| {
if let PropertyValue::Byte(b) = v {
Some(*b != 0)
} else {
None
}
});
Self {
client_id,
packet_id: packet.packet_id.unwrap_or(0),
direction,
phase,
topic: packet.topic_name.clone(),
payload: packet.payload.to_vec(),
qos: packet.qos,
retain: packet.retain,
stored_at_secs,
message_expiry_interval,
expires_at_secs,
expires_at_cache,
user_properties,
content_type,
response_topic,
correlation_data,
payload_format_indicator,
}
}
#[must_use]
pub fn to_publish_packet(&self) -> PublishPacket {
let mut packet = PublishPacket::new(&self.topic, self.payload.clone(), self.qos)
.with_retain(self.retain);
packet.packet_id = Some(self.packet_id);
if let Some(remaining) = self.remaining_expiry_interval() {
packet.properties.set_message_expiry_interval(remaining);
}
for (key, value) in &self.user_properties {
packet
.properties
.add_user_property(key.clone(), value.clone());
}
if let Some(ref ct) = self.content_type {
packet.properties.set_content_type(ct.clone());
}
if let Some(ref rt) = self.response_topic {
packet.properties.set_response_topic(rt.clone());
}
if let Some(ref cd) = self.correlation_data {
packet.properties.set_correlation_data(cd.clone().into());
}
if let Some(pfi) = self.payload_format_indicator {
packet.properties.set_payload_format_indicator(pfi);
}
packet
}
fn resolved_expires_at(&self) -> Option<SystemTime> {
self.expires_at_cache.or_else(|| {
self.expires_at_secs
.map(|secs| SystemTime::UNIX_EPOCH + Duration::from_secs(secs))
})
}
#[must_use]
pub fn remaining_expiry_interval(&self) -> Option<u32> {
self.resolved_expires_at().and_then(|expiry| {
expiry
.duration_since(SystemTime::now())
.ok()
.map(|d| u32::try_from(d.as_secs()).unwrap_or(u32::MAX))
})
}
#[must_use]
pub fn is_expired(&self) -> bool {
self.resolved_expires_at()
.is_some_and(|expiry| SystemTime::now() > expiry)
}
}
pub enum DynamicStorage {
#[cfg(not(target_arch = "wasm32"))]
File(FileBackend),
Memory(MemoryBackend),
}
impl StorageBackend for DynamicStorage {
async fn store_retained_message(&self, topic: &str, message: RetainedMessage) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.store_retained_message(topic, message).await,
Self::Memory(backend) => backend.store_retained_message(topic, message).await,
}
}
async fn get_retained_message(&self, topic: &str) -> Result<Option<RetainedMessage>> {
match self {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.get_retained_message(topic).await,
Self::Memory(backend) => backend.get_retained_message(topic).await,
}
}
async fn remove_retained_message(&self, topic: &str) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.remove_retained_message(topic).await,
Self::Memory(backend) => backend.remove_retained_message(topic).await,
}
}
async fn get_retained_messages(
&self,
topic_filter: &str,
) -> Result<Vec<(String, RetainedMessage)>> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.get_retained_messages(topic_filter).await,
Self::Memory(backend) => backend.get_retained_messages(topic_filter).await,
}
}
async fn store_session(&self, session: ClientSession) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.store_session(session).await,
Self::Memory(backend) => backend.store_session(session).await,
}
}
async fn get_session(&self, client_id: &str) -> Result<Option<ClientSession>> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.get_session(client_id).await,
Self::Memory(backend) => backend.get_session(client_id).await,
}
}
async fn remove_session(&self, client_id: &str) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.remove_session(client_id).await,
Self::Memory(backend) => backend.remove_session(client_id).await,
}
}
async fn queue_message(&self, message: QueuedMessage) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.queue_message(message).await,
Self::Memory(backend) => backend.queue_message(message).await,
}
}
async fn get_queued_messages(&self, client_id: &str) -> Result<Vec<QueuedMessage>> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.get_queued_messages(client_id).await,
Self::Memory(backend) => backend.get_queued_messages(client_id).await,
}
}
async fn remove_queued_messages(&self, client_id: &str) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.remove_queued_messages(client_id).await,
Self::Memory(backend) => backend.remove_queued_messages(client_id).await,
}
}
async fn store_inflight_message(&self, message: InflightMessage) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.store_inflight_message(message).await,
Self::Memory(backend) => backend.store_inflight_message(message).await,
}
}
async fn get_inflight_messages(&self, client_id: &str) -> Result<Vec<InflightMessage>> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.get_inflight_messages(client_id).await,
Self::Memory(backend) => backend.get_inflight_messages(client_id).await,
}
}
async fn remove_inflight_message(
&self,
client_id: &str,
packet_id: u16,
direction: InflightDirection,
) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => {
backend
.remove_inflight_message(client_id, packet_id, direction)
.await
}
Self::Memory(backend) => {
backend
.remove_inflight_message(client_id, packet_id, direction)
.await
}
}
}
async fn remove_all_inflight_messages(&self, client_id: &str) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.remove_all_inflight_messages(client_id).await,
Self::Memory(backend) => backend.remove_all_inflight_messages(client_id).await,
}
}
async fn cleanup_expired(&self) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.cleanup_expired().await,
Self::Memory(backend) => backend.cleanup_expired().await,
}
}
}
impl DynamicStorage {
#[cfg_attr(target_arch = "wasm32", allow(clippy::unused_async))]
pub async fn flush_sessions(&self) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.flush_sessions().await,
Self::Memory(_) => Ok(()),
}
}
#[cfg_attr(target_arch = "wasm32", allow(clippy::unused_async))]
pub async fn shutdown(&self) -> Result<()> {
match self {
#[cfg(not(target_arch = "wasm32"))]
Self::File(backend) => backend.shutdown().await,
Self::Memory(_) => Ok(()),
}
}
}