use alloc::string::String;
use alloc::vec::Vec;
use mqtt_proto::{QoS, TopicName};
use crate::error::ConfigError;
mod common;
mod protocol;
pub use common::*;
pub use protocol::*;
/// A will message: topic, payload and delivery flags bundled together.
///
/// Stored in [`ClientConfig::will_message`] when a will is configured.
#[derive(Clone, Debug)]
pub struct WillMessage {
    /// Topic name carried by the will message.
    pub topic: TopicName,
    /// Raw payload bytes of the will message.
    pub payload: Vec<u8>,
    /// Quality-of-service level attached to the will message.
    pub qos: QoS,
    /// Retain flag attached to the will message.
    pub retain: bool,
}
impl WillMessage {
pub fn new(topic: TopicName, payload: Vec<u8>, qos: QoS, retain: bool) -> Self {
Self {
topic,
payload,
qos,
retain,
}
}
pub fn with_topic_str(
topic: &str,
payload: Vec<u8>,
qos: QoS,
retain: bool,
) -> Result<Self, mqtt_proto::Error> {
let topic = TopicName::try_from(topic)?;
Ok(Self::new(topic, payload, qos, retain))
}
pub fn text(topic: &str, message: &str, qos: QoS) -> Result<Self, mqtt_proto::Error> {
Self::with_topic_str(topic, message.as_bytes().to_vec(), qos, false)
}
}
/// Per-publish options: QoS, retain flag, optional timeout and DUP handling.
#[derive(Clone, Debug)]
pub struct PublishConfig {
    /// Quality-of-service level for the publish.
    pub qos: QoS,
    /// Retain flag for the publish.
    pub retain: bool,
    /// Optional timeout (milliseconds, going by the field name); `None` means
    /// no timeout value is configured.
    pub timeout_ms: Option<u32>,
    /// Whether the DUP flag is enabled; how it is applied is up to the code
    /// that consumes this configuration.
    pub enable_dup_flag: bool,
}
impl Default for PublishConfig {
fn default() -> Self {
Self::qos0()
}
}
impl PublishConfig {
pub fn qos0() -> Self {
Self {
qos: QoS::Level0,
retain: false,
timeout_ms: None,
enable_dup_flag: false,
}
}
pub fn qos1() -> Self {
Self {
qos: QoS::Level1,
retain: false,
timeout_ms: Some(5_000),
enable_dup_flag: true,
}
}
pub fn qos2() -> Self {
Self {
qos: QoS::Level2,
retain: false,
timeout_ms: Some(10_000),
enable_dup_flag: true,
}
}
pub fn retained(qos: QoS) -> Self {
Self {
qos,
retain: true,
timeout_ms: if matches!(qos, QoS::Level0) {
None
} else {
Some(5_000)
},
enable_dup_flag: !matches!(qos, QoS::Level0),
}
}
pub fn with_qos(mut self, qos: QoS) -> Self {
self.qos = qos;
self
}
pub fn with_retain(mut self, retain: bool) -> Self {
self.retain = retain;
self
}
pub fn with_timeout(mut self, timeout_ms: u32) -> Self {
self.timeout_ms = Some(timeout_ms);
self
}
pub fn with_dup_flag(mut self, enable: bool) -> Self {
self.enable_dup_flag = enable;
self
}
}
/// Top-level client configuration: identity, credentials, session options,
/// limits, and nested reconnect/transport settings.
///
/// Build one with [`ClientConfig::new`] or one of the presets, adjust it with
/// the `with_*` methods, and check it with [`ClientConfig::validate`].
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Client identifier; `validate` requires it to be non-empty and at most
    /// 23 bytes long.
    pub client_id: String,
    /// Optional user name.
    pub username: Option<String>,
    /// Optional password, kept as raw bytes.
    pub password: Option<Vec<u8>>,
    /// Keep-alive value (presumably seconds, as in the MQTT CONNECT packet —
    /// confirm against the code that consumes it).
    pub keep_alive: u16,
    /// Clean-session flag.
    pub clean_session: bool,
    /// Optional will message.
    pub will_message: Option<WillMessage>,
    /// Connect timeout (milliseconds, going by the field name).
    pub connect_timeout_ms: u32,
    /// Reconnect settings.
    pub reconnect: ReconnectConfig,
    /// Transport settings.
    pub transport: TransportConfig,
    /// Upper bound on subscriptions; `validate` rejects zero.
    pub max_subscriptions: usize,
    /// Upper bound on in-flight messages; `validate` rejects zero.
    pub max_inflight_messages: usize,
    /// Whether automatic pinging is enabled.
    pub enable_auto_ping: bool,
    /// Maximum packet size; `validate` compares the will payload length
    /// against it.
    pub max_packet_size: u32,
    /// Whether packet-ID validation is enabled.
    pub enable_packet_id_validation: bool,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
client_id: "masaka_client".into(),
username: None,
password: None,
keep_alive: 60,
clean_session: true,
will_message: None,
connect_timeout_ms: 30_000,
reconnect: ReconnectConfig::default(),
transport: TransportConfig::default(),
max_subscriptions: 32,
max_inflight_messages: 64,
enable_auto_ping: true,
max_packet_size: 268_435_456, enable_packet_id_validation: true,
}
}
}
impl ClientConfig {
    /// Creates a configuration with the given client ID and default values
    /// for every other field.
    pub fn new(client_id: impl Into<String>) -> Self {
        Self {
            client_id: client_id.into(),
            ..Default::default()
        }
    }

    /// Sets the user name and a password given as raw bytes.
    pub fn with_credentials(
        mut self,
        username: impl Into<String>,
        password: impl Into<Vec<u8>>,
    ) -> Self {
        self.username = Some(username.into());
        self.password = Some(password.into());
        self
    }

    /// Sets the user name and a textual password; the password is stored as
    /// its UTF-8 bytes.
    pub fn with_string_password(
        mut self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        self.username = Some(username.into());
        self.password = Some(password.into().into_bytes());
        self
    }

    /// Sets the keep-alive value.
    pub fn with_keep_alive(mut self, keep_alive: u16) -> Self {
        self.keep_alive = keep_alive;
        self
    }

    /// Sets the clean-session flag.
    pub fn with_clean_session(mut self, clean_session: bool) -> Self {
        self.clean_session = clean_session;
        self
    }

    /// Sets the will message.
    pub fn with_will_message(mut self, will_message: WillMessage) -> Self {
        self.will_message = Some(will_message);
        self
    }

    /// Sets a non-retained textual will message built with
    /// [`WillMessage::text`].
    ///
    /// # Errors
    ///
    /// Returns the error from [`WillMessage::text`] if `topic` cannot be
    /// converted into a topic name.
    pub fn with_will_text(
        mut self,
        topic: &str,
        message: &str,
        qos: QoS,
    ) -> Result<Self, mqtt_proto::Error> {
        self.will_message = Some(WillMessage::text(topic, message, qos)?);
        Ok(self)
    }

    /// Sets the client-level connect timeout (`connect_timeout_ms`).
    pub fn with_connect_timeout(mut self, timeout_ms: u32) -> Self {
        self.connect_timeout_ms = timeout_ms;
        self
    }

    /// Replaces the reconnect settings.
    pub fn with_reconnect(mut self, reconnect: ReconnectConfig) -> Self {
        self.reconnect = reconnect;
        self
    }

    /// Replaces the transport settings.
    pub fn with_transport(mut self, transport: TransportConfig) -> Self {
        self.transport = transport;
        self
    }

    /// Sets the maximum number of subscriptions.
    pub fn with_max_subscriptions(mut self, max_subscriptions: usize) -> Self {
        self.max_subscriptions = max_subscriptions;
        self
    }

    /// Sets the maximum number of in-flight messages.
    pub fn with_max_inflight_messages(mut self, max_inflight_messages: usize) -> Self {
        self.max_inflight_messages = max_inflight_messages;
        self
    }

    /// Enables or disables automatic pinging.
    pub fn with_auto_ping(mut self, enable: bool) -> Self {
        self.enable_auto_ping = enable;
        self
    }

    /// Sets the maximum packet size.
    pub fn with_max_packet_size(mut self, max_packet_size: u32) -> Self {
        self.max_packet_size = max_packet_size;
        self
    }

    /// Enables or disables packet-ID validation.
    pub fn with_packet_id_validation(mut self, enable: bool) -> Self {
        self.enable_packet_id_validation = enable;
        self
    }

    /// Preset with small limits: keep-alive 30, at most 8 subscriptions and
    /// 16 in-flight messages, the low-latency transport preset, and an
    /// exponential-backoff reconnect policy.
    pub fn iot_device(client_id: impl Into<String>) -> Self {
        Self::new(client_id)
            .with_keep_alive(30)
            .with_max_subscriptions(8)
            .with_max_inflight_messages(16)
            .with_transport(TransportConfig::low_latency())
            .with_reconnect(ReconnectConfig::exponential_backoff(500, 30_000, 2.0, 10))
    }

    /// Preset with large limits: keep-alive 120, up to 256 subscriptions and
    /// 1024 in-flight messages, the high-throughput transport preset, and an
    /// exponential-backoff reconnect policy.
    pub fn server_application(client_id: impl Into<String>) -> Self {
        Self::new(client_id)
            .with_keep_alive(120)
            .with_max_subscriptions(256)
            .with_max_inflight_messages(1024)
            .with_transport(TransportConfig::high_throughput())
            .with_reconnect(ReconnectConfig::exponential_backoff(1000, 60_000, 1.5, 0))
    }

    /// Checks the configuration for values that cannot work.
    ///
    /// Checks run in a fixed order and the first failure is returned.
    ///
    /// # Errors
    ///
    /// * [`ConfigError::InvalidClientId`] — the client ID is empty or longer
    ///   than 23 bytes.
    /// * [`ConfigError::InvalidWillMessage`] — the will payload is larger
    ///   than `max_packet_size`, or larger than 65535 bytes.
    /// * [`ConfigError::InvalidLimit`] — `max_subscriptions` or
    ///   `max_inflight_messages` is zero.
    /// * [`ConfigError::InvalidTimeout`] — the client-level or the
    ///   transport-level connect timeout is zero.
    pub fn validate(&self) -> Result<(), ConfigError> {
        if self.client_id.is_empty() {
            return Err(ConfigError::InvalidClientId(
                "Client ID cannot be empty".into(),
            ));
        }
        // `String::len` counts UTF-8 bytes, so this is a 23-byte limit.
        if self.client_id.len() > 23 {
            return Err(ConfigError::InvalidClientId(
                "MQTT v3.1.1 client ID cannot exceed 23 characters".into(),
            ));
        }
        if let Some(will) = &self.will_message {
            // Checked first so configurations that were rejected before keep
            // getting the same error.
            if will.payload.len() > self.max_packet_size as usize {
                return Err(ConfigError::InvalidWillMessage(
                    "Will message payload exceeds maximum packet size".into(),
                ));
            }
            // The will payload is length-prefixed with two bytes inside the
            // CONNECT packet, so anything above 65535 bytes cannot be encoded
            // regardless of `max_packet_size`.
            if will.payload.len() > usize::from(u16::MAX) {
                return Err(ConfigError::InvalidWillMessage(
                    "Will message payload cannot exceed 65535 bytes".into(),
                ));
            }
        }
        if self.max_subscriptions == 0 {
            return Err(ConfigError::InvalidLimit(
                "Maximum subscriptions must be > 0".into(),
            ));
        }
        if self.max_inflight_messages == 0 {
            return Err(ConfigError::InvalidLimit(
                "Maximum in-flight messages must be > 0".into(),
            ));
        }
        // Both timeouts are validated: the client-level value set through
        // `with_connect_timeout` and the one nested in the transport settings.
        if self.connect_timeout_ms == 0 || self.transport.connect_timeout_ms == 0 {
            return Err(ConfigError::InvalidTimeout(
                "Connection timeout must be > 0".into(),
            ));
        }
        Ok(())
    }
}