gmqtt_client/client/
builder.rs1use mqttbytes::{v5::SubscribeFilter, QoS};
2use rand::{distributions::Alphanumeric, thread_rng, Rng};
3use url::Url;
4
5use super::{
6 mqtt::MqttWorker, Message, MqttClient, MqttClientConfig, OnConnectedCallback, OnMessageCallback,
7};
8use crate::client::config::OnMessageOwnedCallback;
9use std::time::Instant;
10
11pub struct MqttClientBuilder {
12 mqtt_broker: Url,
13 client_id: Option<String>,
14 clean_session: bool,
15
16 subscribe_filters: Vec<SubscribeFilter>,
17 on_message_callback: Option<Box<OnMessageCallback>>,
18 on_message_owned_callback: Option<Box<OnMessageOwnedCallback>>,
19 on_connected_callback: Option<Box<OnConnectedCallback>>,
20}
21
22impl MqttClientBuilder {
23 pub fn new(url: Url) -> Self {
24 Self {
25 mqtt_broker: url,
26 client_id: None,
27 clean_session: false,
28 subscribe_filters: vec![],
29 on_message_callback: None,
30 on_message_owned_callback: None,
31 on_connected_callback: None,
32 }
33 }
34
35 pub fn client_id<S: Into<String>>(mut self, client_id: S) -> Self {
36 self.client_id = Some(client_id.into());
37 self
38 }
39
40 pub fn clean_session(mut self, clean_session: bool) -> Self {
48 self.clean_session = clean_session;
49 self
50 }
51
52 pub fn subscribe<S: Into<String>>(mut self, filter: S, qos: QoS) -> Self {
53 if qos == QoS::ExactlyOnce {
54 panic!("Quantity of Service 2 (Exactly Once) is not supported");
56 }
57
58 let sf = SubscribeFilter::new(filter.into(), qos);
59 self.subscribe_filters.push(sf);
60 self
61 }
62
63 pub fn on_message_callback<F>(mut self, callback: F) -> Self
64 where
65 F: Fn(&Message) + Send + Sync + 'static,
66 {
67 self.on_message_callback = Some(Box::new(callback));
68 self
69 }
70
71 pub fn on_connected_callback<F>(mut self, callback: F) -> Self
72 where
73 F: Fn() + Send + Sync + 'static,
74 {
75 self.on_connected_callback = Some(Box::new(callback));
76 self
77 }
78
79 pub fn on_message_owned_callback<F>(mut self, callback: F) -> Self
80 where
81 F: Fn(Message, Instant) + Send + 'static,
82 {
83 self.on_message_owned_callback = Some(Box::new(callback));
84 self
85 }
86
87 pub fn build(self) -> (MqttClient, MqttWorker) {
88 let url = self.mqtt_broker;
89
90 let client_id = self.client_id.unwrap_or_else(|| {
91 if !url.username().is_empty() {
92 format!("{}-{}", url.username(), random_string(8))
93 } else {
94 random_string(16)
95 }
96 });
97
98 let config = MqttClientConfig {
99 client_id,
100 subscribe_filters: self.subscribe_filters,
101 on_message_callback: self.on_message_callback,
102 on_connected_callback: self.on_connected_callback,
103 clean_session: self.clean_session,
104 };
105
106 MqttClient::new(url, config, self.on_message_owned_callback)
107 }
108}
109
110fn random_string(length: usize) -> String {
111 thread_rng()
112 .sample_iter(&Alphanumeric)
113 .take(length)
114 .map(char::from)
115 .collect()
116}