mqtt5 0.31.2

Complete MQTT v5.0 platform with high-performance async client and full-featured broker supporting TCP, TLS, WebSocket, authentication, bridging, and resource monitoring
Documentation
use crate::codec::CodecRegistry;
use crate::session::SessionConfig;
use mqtt5_protocol::time::Duration;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

pub use mqtt5_protocol::ReasonCode;
pub use mqtt5_protocol::ReconnectConfig;

/// Client-side connection options.
///
/// Wraps the protocol-level [`mqtt5_protocol::ConnectOptions`] and adds
/// settings that live in this crate (session, reconnect, keepalive tuning and
/// codec registry). The wrapped protocol options are also reachable through
/// the `Deref`/`DerefMut` implementations on this type.
#[derive(Clone, Default)]
pub struct ConnectOptions {
    /// Protocol-level connect options; most `with_*` builders forward here.
    pub protocol_options: mqtt5_protocol::ConnectOptions,
    /// Session configuration; `new` initialises it with `SessionConfig::default()`.
    pub session_config: SessionConfig,
    /// Reconnect settings (`enabled`, `initial_delay`, `max_delay`, `max_attempts`).
    pub reconnect_config: ReconnectConfig,
    /// Optional keepalive tuning; `None` unless set through a builder.
    pub keepalive_config: Option<mqtt5_protocol::KeepaliveConfig>,
    /// Optional shared codec registry; `None` unless set through a builder.
    pub codec_registry: Option<Arc<CodecRegistry>>,
}

impl ConnectOptions {
    /// Creates options for the given client identifier.
    ///
    /// Session and reconnect settings start at their `Default` values; no
    /// keepalive override and no codec registry are configured.
    #[must_use]
    pub fn new(client_id: impl Into<String>) -> Self {
        let protocol_options = mqtt5_protocol::ConnectOptions::new(client_id);
        Self {
            protocol_options,
            session_config: SessionConfig::default(),
            reconnect_config: ReconnectConfig::default(),
            keepalive_config: None,
            codec_registry: None,
        }
    }

    /// Forwards the keep-alive duration to the protocol options.
    #[must_use]
    pub fn with_keep_alive(self, duration: Duration) -> Self {
        Self {
            protocol_options: self.protocol_options.with_keep_alive(duration),
            ..self
        }
    }

    /// Forwards the clean-start flag to the protocol options.
    #[must_use]
    pub fn with_clean_start(self, clean: bool) -> Self {
        Self {
            protocol_options: self.protocol_options.with_clean_start(clean),
            ..self
        }
    }

    /// Forwards a username/password pair to the protocol options.
    #[must_use]
    pub fn with_credentials(
        self,
        username: impl Into<String>,
        password: impl AsRef<[u8]>,
    ) -> Self {
        Self {
            protocol_options: self.protocol_options.with_credentials(username, password),
            ..self
        }
    }

    /// Forwards the will message to the protocol options.
    #[must_use]
    pub fn with_will(self, will: mqtt5_protocol::WillMessage) -> Self {
        Self {
            protocol_options: self.protocol_options.with_will(will),
            ..self
        }
    }

    /// Forwards the session expiry interval to the protocol options.
    #[must_use]
    pub fn with_session_expiry_interval(self, interval: u32) -> Self {
        Self {
            protocol_options: self.protocol_options.with_session_expiry_interval(interval),
            ..self
        }
    }

    /// Forwards the receive-maximum value to the protocol options.
    #[must_use]
    pub fn with_receive_maximum(self, receive_maximum: u16) -> Self {
        Self {
            protocol_options: self.protocol_options.with_receive_maximum(receive_maximum),
            ..self
        }
    }

    /// Sets the `enabled` flag of the reconnect configuration.
    #[must_use]
    pub fn with_automatic_reconnect(self, enabled: bool) -> Self {
        let mut reconnect_config = self.reconnect_config;
        reconnect_config.enabled = enabled;
        Self {
            reconnect_config,
            ..self
        }
    }

    /// Sets the initial and maximum delay of the reconnect configuration.
    #[must_use]
    pub fn with_reconnect_delay(self, initial: Duration, max: Duration) -> Self {
        let mut reconnect_config = self.reconnect_config;
        reconnect_config.initial_delay = initial;
        reconnect_config.max_delay = max;
        Self {
            reconnect_config,
            ..self
        }
    }

    /// Stores `Some(attempts)` as the reconnect configuration's attempt limit.
    #[must_use]
    pub fn with_max_reconnect_attempts(self, attempts: u32) -> Self {
        let mut reconnect_config = self.reconnect_config;
        reconnect_config.max_attempts = Some(attempts);
        Self {
            reconnect_config,
            ..self
        }
    }

    /// Forwards the protocol version to the protocol options.
    #[must_use]
    pub fn with_protocol_version(self, version: mqtt5_protocol::ProtocolVersion) -> Self {
        Self {
            protocol_options: self.protocol_options.with_protocol_version(version),
            ..self
        }
    }

    /// Forwards the authentication method name to the protocol options.
    #[must_use]
    pub fn with_authentication_method(self, method: impl Into<String>) -> Self {
        Self {
            protocol_options: self.protocol_options.with_authentication_method(method),
            ..self
        }
    }

    /// Forwards the authentication data bytes to the protocol options.
    #[must_use]
    pub fn with_authentication_data(self, data: impl AsRef<[u8]>) -> Self {
        Self {
            protocol_options: self.protocol_options.with_authentication_data(data),
            ..self
        }
    }

    /// Replaces any existing keepalive configuration with `config`.
    #[must_use]
    pub fn with_keepalive_config(self, config: mqtt5_protocol::KeepaliveConfig) -> Self {
        Self {
            keepalive_config: Some(config),
            ..self
        }
    }

    /// Overrides only the keepalive timeout percentage.
    ///
    /// Starts from the current keepalive configuration, or from
    /// `KeepaliveConfig::default()` when none is set, so the ping interval and
    /// lock-retry settings are carried over unchanged.
    #[must_use]
    pub fn with_keepalive_timeout_percent(self, timeout_percent: u8) -> Self {
        let mut keepalive = self.keepalive_config.unwrap_or_default();
        keepalive.timeout_percent = timeout_percent;
        Self {
            keepalive_config: Some(keepalive),
            ..self
        }
    }

    /// Attaches a shared codec registry, replacing any previous one.
    #[must_use]
    pub fn with_codec_registry(self, registry: Arc<CodecRegistry>) -> Self {
        Self {
            codec_registry: Some(registry),
            ..self
        }
    }
}

impl std::fmt::Debug for ConnectOptions {
    /// Formats every field; the codec registry is shown only as a fixed
    /// `"CodecRegistry"` placeholder when present, never by its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Presence marker standing in for the registry itself.
        let registry_marker = self.codec_registry.as_ref().map(|_| "CodecRegistry");

        let mut out = f.debug_struct("ConnectOptions");
        out.field("protocol_options", &self.protocol_options);
        out.field("session_config", &self.session_config);
        out.field("reconnect_config", &self.reconnect_config);
        out.field("keepalive_config", &self.keepalive_config);
        out.field("codec_registry", &registry_marker);
        out.finish()
    }
}

impl Deref for ConnectOptions {
    type Target = mqtt5_protocol::ConnectOptions;

    /// Borrows the wrapped protocol-level options.
    fn deref(&self) -> &Self::Target {
        let Self {
            protocol_options, ..
        } = self;
        protocol_options
    }
}

impl DerefMut for ConnectOptions {
    /// Mutably borrows the wrapped protocol-level options.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self {
            protocol_options, ..
        } = self;
        protocol_options
    }
}

pub use mqtt5_protocol::{
    ConnectProperties, ConnectResult, KeepaliveConfig, Message, MessageProperties, ProtocolVersion,
    PublishOptions, PublishProperties, PublishResult, RetainHandling, SubscribeOptions,
    WillMessage, WillProperties,
};

/// Counters and timestamps describing a connection.
///
/// The derived `Default` yields zero for every counter and `None` for both
/// timestamps. How and when the fields are updated is not visible in this
/// file; the per-field notes below follow the field names only.
#[derive(Debug, Clone, Default)]
pub struct ConnectionStats {
    /// Number of messages sent (presumably since connecting; confirm at the update site).
    pub messages_sent: u64,
    /// Number of messages received (presumably since connecting; confirm at the update site).
    pub messages_received: u64,
    /// Number of bytes sent.
    pub bytes_sent: u64,
    /// Number of bytes received.
    pub bytes_received: u64,
    /// Instant associated with connecting, if recorded.
    pub connect_time: Option<crate::time::Instant>,
    /// Instant associated with the most recent message, if recorded.
    pub last_message_time: Option<crate::time::Instant>,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed `ConnectOptions` carries no keepalive override.
    #[test]
    fn test_connect_options_default_keepalive_config() {
        assert!(ConnectOptions::new("test-client")
            .keepalive_config
            .is_none());
    }

    /// An explicit keepalive configuration is stored as given.
    #[test]
    fn test_connect_options_with_keepalive_config() {
        let options =
            ConnectOptions::new("test-client").with_keepalive_config(KeepaliveConfig::new(50, 200));

        let KeepaliveConfig {
            ping_interval_percent,
            timeout_percent,
            ..
        } = options.keepalive_config.unwrap();
        assert_eq!(ping_interval_percent, 50);
        assert_eq!(timeout_percent, 200);
    }

    /// Setting only the timeout on fresh options falls back to the default
    /// ping interval.
    #[test]
    fn test_connect_options_with_keepalive_timeout_percent() {
        let KeepaliveConfig {
            ping_interval_percent,
            timeout_percent,
            ..
        } = ConnectOptions::new("test-client")
            .with_keepalive_timeout_percent(250)
            .keepalive_config
            .unwrap();

        assert_eq!(ping_interval_percent, 75);
        assert_eq!(timeout_percent, 250);
    }

    /// Overriding the timeout keeps a previously configured ping interval.
    #[test]
    fn test_connect_options_keepalive_timeout_preserves_ping_interval() {
        let base = KeepaliveConfig::new(60, 150);
        let KeepaliveConfig {
            ping_interval_percent,
            timeout_percent,
            ..
        } = ConnectOptions::new("test-client")
            .with_keepalive_config(base)
            .with_keepalive_timeout_percent(200)
            .keepalive_config
            .unwrap();

        assert_eq!(ping_interval_percent, 60);
        assert_eq!(timeout_percent, 200);
    }

    /// A 200% timeout on a 60 second keepalive is 120 seconds.
    #[test]
    fn test_keepalive_config_timeout_calculation() {
        let expected = Duration::from_secs(120);
        let actual = KeepaliveConfig::new(75, 200).timeout_duration(Duration::from_secs(60));
        assert_eq!(actual, expected);
    }

    /// Overriding the timeout keeps previously configured lock-retry settings.
    #[test]
    fn test_keepalive_timeout_percent_preserves_lock_retry() {
        let base = KeepaliveConfig::new(60, 150).with_lock_retry(50, 25);
        let KeepaliveConfig {
            lock_retry_attempts,
            lock_retry_delay_ms,
            ..
        } = ConnectOptions::new("test-client")
            .with_keepalive_config(base)
            .with_keepalive_timeout_percent(200)
            .keepalive_config
            .unwrap();

        assert_eq!(lock_retry_attempts, 50);
        assert_eq!(lock_retry_delay_ms, 25);
    }
}