use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Connection settings for an MQTT client.
///
/// `Debug` is written by hand rather than derived so that the `password`
/// value never ends up in log or panic output; every other field is printed
/// the way a derived implementation would print it. Serialization is
/// unchanged and still includes the password.
#[derive(Clone, Serialize, Deserialize)]
pub struct MqttConfig {
    /// Broker URL; `Default` uses `tcp://localhost:1883`.
    pub broker_url: String,
    /// Client identifier; `Default` generates `oxirs-mqtt-<uuid v4>`.
    pub client_id: String,
    /// Clean-session flag (`true` by default).
    pub clean_session: bool,
    /// Keep-alive interval in seconds (60 by default).
    pub keep_alive_secs: u16,
    /// QoS level used by default (`QoS::AtLeastOnce` in `Default`).
    pub default_qos: QoS,
    /// Optional user name.
    pub username: Option<String>,
    /// Optional password; redacted in `Debug` output.
    pub password: Option<String>,
    /// Optional TLS settings.
    // NOTE(review): presumably `None` means a plain (non-TLS) connection — confirm in the client.
    pub tls: Option<MqttTlsConfig>,
    /// Reconnect/backoff parameters.
    pub reconnect: MqttReconnectConfig,
    /// Optional last-will message.
    pub last_will: Option<LastWillConfig>,
    /// Protocol version (`V311` by default).
    pub protocol_version: MqttProtocolVersion,
    /// Optional session expiry interval.
    // NOTE(review): presumably seconds, as in the MQTT 5 property of the same name — confirm.
    pub session_expiry_interval: Option<u32>,
    /// Optional maximum packet size.
    // NOTE(review): presumably bytes — confirm.
    pub max_packet_size: Option<u32>,
    /// Request-response-information flag (`false` by default).
    pub request_response_info: bool,
    /// Request-problem-information flag (`true` by default).
    pub request_problem_info: bool,
}

impl std::fmt::Debug for MqttConfig {
    /// Formats every field like a derived `Debug` would, except that a
    /// present password is shown as `Some("<redacted>")`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction (useful when diagnosing auth
        // problems) while dropping the secret itself.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("MqttConfig")
            .field("broker_url", &self.broker_url)
            .field("client_id", &self.client_id)
            .field("clean_session", &self.clean_session)
            .field("keep_alive_secs", &self.keep_alive_secs)
            .field("default_qos", &self.default_qos)
            .field("username", &self.username)
            .field("password", &password)
            .field("tls", &self.tls)
            .field("reconnect", &self.reconnect)
            .field("last_will", &self.last_will)
            .field("protocol_version", &self.protocol_version)
            .field("session_expiry_interval", &self.session_expiry_interval)
            .field("max_packet_size", &self.max_packet_size)
            .field("request_response_info", &self.request_response_info)
            .field("request_problem_info", &self.request_problem_info)
            .finish()
    }
}
impl Default for MqttConfig {
fn default() -> Self {
Self {
broker_url: "tcp://localhost:1883".to_string(),
client_id: format!("oxirs-mqtt-{}", uuid::Uuid::new_v4()),
clean_session: true,
keep_alive_secs: 60,
default_qos: QoS::AtLeastOnce,
username: None,
password: None,
tls: None,
reconnect: MqttReconnectConfig::default(),
last_will: None,
protocol_version: MqttProtocolVersion::V311,
session_expiry_interval: None,
max_packet_size: None,
request_response_info: false,
request_problem_info: true,
}
}
}
/// MQTT quality-of-service level.
///
/// The explicit discriminants (0, 1, 2) are the values `u8::from(QoS)`
/// returns and the only values `QoS::try_from(u8)` accepts.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum QoS {
/// Level 0.
AtMostOnce = 0,
/// Level 1.
AtLeastOnce = 1,
/// Level 2.
ExactlyOnce = 2,
}
impl From<QoS> for u8 {
fn from(qos: QoS) -> Self {
qos as u8
}
}
impl TryFrom<u8> for QoS {
    type Error = String;

    /// Maps 0, 1 and 2 to the matching QoS level.
    ///
    /// # Errors
    ///
    /// Any other value yields the message `Invalid QoS level: <value>`.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        let level = match value {
            0 => QoS::AtMostOnce,
            1 => QoS::AtLeastOnce,
            2 => QoS::ExactlyOnce,
            // Reject everything outside the three defined levels.
            other => return Err(format!("Invalid QoS level: {}", other)),
        };
        Ok(level)
    }
}
/// MQTT protocol version, as stored in `MqttConfig::protocol_version`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum MqttProtocolVersion {
/// Version 3.1.1 (by its name); this is the value `MqttConfig::default()` picks.
V311,
/// Version 5 (by its name).
V5,
}
/// TLS-related settings, as stored in `MqttConfig::tls`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttTlsConfig {
/// Path to a CA certificate, if any.
pub ca_cert_path: Option<String>,
/// Path to the client certificate, if any.
pub client_cert_path: Option<String>,
/// Path to the client private key, if any.
pub client_key_path: Option<String>,
/// `false` in `Default`.
// NOTE(review): by its name, `true` disables certificate verification — confirm where this is consumed.
pub insecure_skip_verify: bool,
/// ALPN protocol names; `Default` uses the single entry `"mqtt"`.
pub alpn_protocols: Vec<String>,
}
impl Default for MqttTlsConfig {
fn default() -> Self {
Self {
ca_cert_path: None,
client_cert_path: None,
client_key_path: None,
insecure_skip_verify: false,
alpn_protocols: vec!["mqtt".to_string()],
}
}
}
/// Reconnect/backoff parameters, as stored in `MqttConfig::reconnect`.
///
/// The values quoted below come from the `Default` impl; how the fields are
/// combined is decided by the reconnect logic, which is not part of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttReconnectConfig {
/// Initial delay in milliseconds (default 1000).
pub initial_delay_ms: u64,
/// Maximum delay in milliseconds (default 60000).
pub max_delay_ms: u64,
/// Backoff factor (default 2.0).
// NOTE(review): presumably the delay is multiplied by this after each failed attempt — confirm.
pub backoff_factor: f64,
/// Maximum number of attempts (default 0).
// NOTE(review): 0 presumably means "no limit" — confirm against the reconnect loop.
pub max_attempts: u32,
/// Jitter amount (default 0.1).
// NOTE(review): presumably a fraction of the delay — confirm.
pub jitter: f64,
}
impl Default for MqttReconnectConfig {
fn default() -> Self {
Self {
initial_delay_ms: 1000,
max_delay_ms: 60000,
backoff_factor: 2.0,
max_attempts: 0,
jitter: 0.1,
}
}
}
/// Last-will message settings, as stored in `MqttConfig::last_will`.
// NOTE(review): the optional fields below carry the names of MQTT 5 will properties — confirm how they are handled under 3.1.1.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LastWillConfig {
/// Topic of the will message.
pub topic: String,
/// Payload of the will message as raw bytes.
pub payload: Vec<u8>,
/// QoS level of the will message.
pub qos: QoS,
/// Retain flag of the will message.
pub retain: bool,
/// Optional will delay interval.
// NOTE(review): presumably seconds — confirm.
pub will_delay_interval: Option<u32>,
/// Optional message expiry interval.
// NOTE(review): presumably seconds — confirm.
pub message_expiry_interval: Option<u32>,
/// Optional content type string.
pub content_type: Option<String>,
/// Optional response topic.
pub response_topic: Option<String>,
/// User properties as key/value pairs; a map holds one value per key.
pub user_properties: HashMap<String, String>,
}
/// One subscription: a topic pattern plus the payload format and RDF mapping attached to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicSubscription {
/// Topic pattern.
// NOTE(review): presumably an MQTT topic filter (wildcards allowed) — confirm.
pub topic_pattern: String,
/// QoS level for this subscription.
pub qos: QoS,
/// Format of the payloads on this subscription.
pub payload_format: PayloadFormat,
/// Mapping from payload data to RDF.
pub rdf_mapping: TopicRdfMapping,
/// Optional extra subscription options.
pub options: Option<SubscriptionOptions>,
}
/// Payload format of a subscription.
///
/// Serialized with an internal tag (`#[serde(tag = "type")]`): the variant
/// name is stored under the `type` key alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum PayloadFormat {
/// JSON payload.
Json {
/// Optional schema.
// NOTE(review): whether this is inline text, a path or a URL is not defined here — confirm.
schema: Option<String>,
/// Optional root path.
// NOTE(review): path syntax is not defined here — confirm.
root_path: Option<String>,
},
/// Sparkplug B payload.
SparkplugB {
/// Namespace string.
namespace: String,
},
/// Protobuf payload.
Protobuf {
/// Schema.
schema: String,
/// Message type name.
message_type: String,
},
/// Avro payload.
Avro {
/// Schema.
schema: String,
},
/// CSV payload.
Csv {
/// Field delimiter character.
delimiter: char,
/// Column headers.
headers: Vec<String>,
/// Skip-header flag.
// NOTE(review): presumably skips the first line of the payload — confirm.
skip_header: bool,
},
/// Plain-text payload.
PlainText {
/// Datatype string.
// NOTE(review): presumably an RDF/XSD datatype for the literal — confirm.
datatype: String,
},
/// Raw payload with no extra parameters.
Raw,
}
/// Rules describing how data from a topic is mapped to RDF.
// NOTE(review): pattern/placeholder syntax for the `*_pattern` fields is not defined in this type — document it where the patterns are expanded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicRdfMapping {
/// Pattern for the subject.
pub subject_pattern: String,
/// String-to-string predicate map.
// NOTE(review): presumably payload field name -> predicate URI — confirm.
pub predicate_map: HashMap<String, String>,
/// Optional pattern for the graph.
pub graph_pattern: Option<String>,
/// Optional type URI.
pub type_uri: Option<String>,
/// Optional name of the timestamp field.
pub timestamp_field: Option<String>,
/// Optional predicate for the timestamp.
pub timestamp_predicate: Option<String>,
/// Value transformations.
// NOTE(review): presumably applied in list order — confirm.
pub transformations: Vec<ValueTransformation>,
}
/// Pairs a field name with the operation to apply to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValueTransformation {
/// Name of the field the operation targets.
pub field: String,
/// Operation to apply.
pub operation: TransformOperation,
}
/// Operation used by `ValueTransformation`.
///
/// Serialized with an internal tag (`#[serde(tag = "type")]`): the variant
/// name is stored under the `type` key alongside the variant's own fields.
// NOTE(review): the exact arithmetic/semantics of each variant live in the code that evaluates them — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum TransformOperation {
/// Scaling by `factor`.
Scale { factor: f64 },
/// Offset by `value`.
Offset { value: f64 },
/// Unit conversion between the units named `from` and `to`.
UnitConversion { from: String, to: String },
/// Formula given as an expression string.
Formula { expression: String },
/// String-to-string lookup table.
LookupTable { table: HashMap<String, String> },
/// Regex replacement: `pattern` and its `replacement`.
RegexReplace {
pattern: String,
replacement: String,
},
}
/// Extra per-subscription options, as stored in `TopicSubscription::options`.
// NOTE(review): the field names match the MQTT 5 subscription options — confirm behavior under 3.1.1.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionOptions {
/// No-local flag.
pub no_local: bool,
/// Retain-as-published flag.
pub retain_as_published: bool,
/// Retain handling mode.
pub retain_handling: RetainHandling,
}
/// Retain handling mode with explicit discriminants 0, 1 and 2.
// NOTE(review): the values appear to mirror the MQTT 5 "Retain Handling" subscription option — confirm.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum RetainHandling {
/// Value 0.
SendAtSubscribe = 0,
/// Value 1.
SendAtSubscribeNew = 1,
/// Value 2.
DontSend = 2,
}
/// Client statistics.
///
/// `Default` is derived, so every counter starts at 0 and both timestamps
/// start as `None`. This type only stores the values; updating them is up
/// to the code that owns it.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MqttStats {
/// Number of messages published.
pub messages_published: u64,
/// Number of messages received.
pub messages_received: u64,
/// Number of bytes sent.
pub bytes_sent: u64,
/// Number of bytes received.
pub bytes_received: u64,
/// Number of connections.
pub connection_count: u64,
/// Number of reconnections.
pub reconnection_count: u64,
/// UTC time of the last connect, if any.
pub last_connected_at: Option<chrono::DateTime<chrono::Utc>>,
/// UTC time of the last disconnect, if any.
pub last_disconnected_at: Option<chrono::DateTime<chrono::Utc>>,
/// QoS 0 message count.
pub qos0_count: u64,
/// QoS 1 message count.
pub qos1_count: u64,
/// QoS 2 message count.
pub qos2_count: u64,
/// Number of publish failures.
pub publish_failures: u64,
/// Number of subscribe failures.
pub subscribe_failures: u64,
}
/// An MQTT message: topic, QoS, retain flag, raw payload and optional properties.
///
/// Unlike the configuration types in this file, it derives only `Debug` and
/// `Clone` (no serde support).
#[derive(Debug, Clone)]
pub struct MqttMessage {
/// Topic of the message.
pub topic: String,
/// QoS level of the message.
pub qos: QoS,
/// Retain flag.
pub retain: bool,
/// Payload as raw bytes.
pub payload: Vec<u8>,
/// Optional message properties.
pub properties: Option<MqttMessageProperties>,
/// UTC timestamp.
// NOTE(review): presumably the time the client received the message — confirm where it is set.
pub received_at: chrono::DateTime<chrono::Utc>,
}
/// Optional per-message properties, as stored in `MqttMessage::properties`.
// NOTE(review): the field names match the MQTT 5 PUBLISH properties — confirm units and encoding against the spec where they are read.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttMessageProperties {
/// Optional payload format indicator.
pub payload_format_indicator: Option<u8>,
/// Optional message expiry interval.
// NOTE(review): presumably seconds — confirm.
pub message_expiry_interval: Option<u32>,
/// Optional topic alias.
pub topic_alias: Option<u16>,
/// Optional response topic.
pub response_topic: Option<String>,
/// Optional correlation data as raw bytes.
pub correlation_data: Option<Vec<u8>>,
/// User properties as key/value pairs; a map holds one value per key.
pub user_properties: HashMap<String, String>,
/// Optional subscription identifier.
pub subscription_identifier: Option<u32>,
/// Optional content type string.
pub content_type: Option<String>,
}
#[cfg(test)]
mod tests {
    use super::*;

    // Every QoS level converts to its numeric value and back; 3 is rejected.
    #[test]
    fn test_qos_conversion() {
        let pairs = [
            (QoS::AtMostOnce, 0u8),
            (QoS::AtLeastOnce, 1u8),
            (QoS::ExactlyOnce, 2u8),
        ];
        for (level, raw) in pairs {
            assert_eq!(u8::from(level), raw);
            assert_eq!(QoS::try_from(raw).unwrap(), level);
        }
        assert!(QoS::try_from(3u8).is_err());
    }

    // The default config targets a tcp:// broker with a non-empty client id,
    // QoS 1 and a 60 second keep-alive.
    #[test]
    fn test_default_config() {
        let MqttConfig {
            broker_url,
            client_id,
            default_qos,
            keep_alive_secs,
            ..
        } = MqttConfig::default();

        assert!(broker_url.starts_with("tcp://"));
        assert!(!client_id.is_empty());
        assert_eq!(default_qos, QoS::AtLeastOnce);
        assert_eq!(keep_alive_secs, 60);
    }

    // Freshly created statistics start with zeroed counters.
    #[test]
    fn test_default_stats() {
        let MqttStats {
            messages_published,
            messages_received,
            connection_count,
            ..
        } = MqttStats::default();

        assert_eq!(messages_published, 0);
        assert_eq!(messages_received, 0);
        assert_eq!(connection_count, 0);
    }
}