gmqtt_client/client/
builder.rs

1use mqttbytes::{v5::SubscribeFilter, QoS};
2use rand::{distributions::Alphanumeric, thread_rng, Rng};
3use url::Url;
4
5use super::{
6    mqtt::MqttWorker, Message, MqttClient, MqttClientConfig, OnConnectedCallback, OnMessageCallback,
7};
8use crate::client::config::OnMessageOwnedCallback;
9use std::time::Instant;
10
11pub struct MqttClientBuilder {
12    mqtt_broker: Url,
13    client_id: Option<String>,
14    clean_session: bool,
15
16    subscribe_filters: Vec<SubscribeFilter>,
17    on_message_callback: Option<Box<OnMessageCallback>>,
18    on_message_owned_callback: Option<Box<OnMessageOwnedCallback>>,
19    on_connected_callback: Option<Box<OnConnectedCallback>>,
20}
21
22impl MqttClientBuilder {
23    pub fn new(url: Url) -> Self {
24        Self {
25            mqtt_broker: url,
26            client_id: None,
27            clean_session: false,
28            subscribe_filters: vec![],
29            on_message_callback: None,
30            on_message_owned_callback: None,
31            on_connected_callback: None,
32        }
33    }
34
35    pub fn client_id<S: Into<String>>(mut self, client_id: S) -> Self {
36        self.client_id = Some(client_id.into());
37        self
38    }
39
40    /// When the clean session flag is set to true, the client does not want a persistent session.
41    /// If the client disconnects for any reason, all information and messages that are queued from a previous persistent session are lost.
42    ///
43    /// When the clean session flag is set to false, the broker creates a persistent session for the client.
44    /// All information and messages are preserved until the next time that the client requests a clean session.
45    /// If the clean session flag is set to false and the broker already has a session available for the client,
46    /// it uses the existing session and delivers previously queued messages to the client.
47    pub fn clean_session(mut self, clean_session: bool) -> Self {
48        self.clean_session = clean_session;
49        self
50    }
51
52    pub fn subscribe<S: Into<String>>(mut self, filter: S, qos: QoS) -> Self {
53        if qos == QoS::ExactlyOnce {
54            // TODO: Add QoS 2 support for incoming messages
55            panic!("Quantity of Service 2 (Exactly Once) is not supported");
56        }
57
58        let sf = SubscribeFilter::new(filter.into(), qos);
59        self.subscribe_filters.push(sf);
60        self
61    }
62
63    pub fn on_message_callback<F>(mut self, callback: F) -> Self
64    where
65        F: Fn(&Message) + Send + Sync + 'static,
66    {
67        self.on_message_callback = Some(Box::new(callback));
68        self
69    }
70
71    pub fn on_connected_callback<F>(mut self, callback: F) -> Self
72    where
73        F: Fn() + Send + Sync + 'static,
74    {
75        self.on_connected_callback = Some(Box::new(callback));
76        self
77    }
78
79    pub fn on_message_owned_callback<F>(mut self, callback: F) -> Self
80    where
81        F: Fn(Message, Instant) + Send + 'static,
82    {
83        self.on_message_owned_callback = Some(Box::new(callback));
84        self
85    }
86
87    pub fn build(self) -> (MqttClient, MqttWorker) {
88        let url = self.mqtt_broker;
89
90        let client_id = self.client_id.unwrap_or_else(|| {
91            if !url.username().is_empty() {
92                format!("{}-{}", url.username(), random_string(8))
93            } else {
94                random_string(16)
95            }
96        });
97
98        let config = MqttClientConfig {
99            client_id,
100            subscribe_filters: self.subscribe_filters,
101            on_message_callback: self.on_message_callback,
102            on_connected_callback: self.on_connected_callback,
103            clean_session: self.clean_session,
104        };
105
106        MqttClient::new(url, config, self.on_message_owned_callback)
107    }
108}
109
110fn random_string(length: usize) -> String {
111    thread_rng()
112        .sample_iter(&Alphanumeric)
113        .take(length)
114        .map(char::from)
115        .collect()
116}