1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/// MQTT Protocol Adapter
///
/// Bridges MQTT clients to the FreshBlu messaging system using rumqttd embedded broker.
/// MQTT topic format: <uuid>/<event_type>
/// Authentication: username=uuid, password=token
///
/// Meshblu-compatible: devices can connect over MQTT and receive/send messages.
use freshblu_core::message::{DeviceEvent, SendMessageParams};
use freshblu_core::permissions::PermissionChecker;
use freshblu_store::DynStore;
use rumqttd::{Broker, Config, ConnectionSettings, RouterConfig, ServerSettings};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::bus::DynBus;
pub struct MqttAdapter {
store: DynStore,
bus: DynBus,
port: u16,
}
impl MqttAdapter {
pub fn new(store: DynStore, bus: DynBus, port: u16) -> Self {
Self { store, bus, port }
}
/// Start the MQTT broker - runs in background task
pub async fn start(self) -> anyhow::Result<()> {
info!("MQTT adapter starting on port {}", self.port);
let store = self.store.clone();
let auth_handler: rumqttd::AuthHandler = Arc::new(move |_client_id, username, password| {
let uuid = match Uuid::parse_str(&username) {
Ok(u) => u,
Err(_) => return false,
};
// Wrap in spawn_blocking so bcrypt doesn't stall the event loop
let store = store.clone();
let rt = tokio::runtime::Handle::current();
rt.block_on(async move {
matches!(
tokio::task::spawn_blocking(move || {
let rt2 = tokio::runtime::Handle::current();
rt2.block_on(store.authenticate(&uuid, &password))
})
.await,
Ok(Ok(Some(_)))
)
})
});
let addr: SocketAddr = format!("0.0.0.0:{}", self.port).parse()?;
let mut server_settings = HashMap::new();
server_settings.insert(
"freshblu".to_string(),
ServerSettings {
name: "freshblu-mqtt".to_string(),
listen: addr,
tls: None,
next_connection_delay_ms: 0,
connections: ConnectionSettings {
connection_timeout_ms: 5000,
max_payload_size: 1_048_576, // 1MB
max_inflight_count: 100,
auth: None,
external_auth: Some(auth_handler),
dynamic_filters: true,
},
},
);
let config = Config {
id: 0,
router: RouterConfig {
max_connections: 10000,
max_outgoing_packet_count: 200,
max_segment_size: 100 * 1024,
max_segment_count: 10,
..Default::default()
},
v4: Some(server_settings),
..Default::default()
};
let mut broker = Broker::new(config);
// Get a programmatic link for bridging MQTT <-> MessageBus
let (mut link_tx, mut link_rx) = broker
.link("freshblu-bridge")
.map_err(|e| anyhow::anyhow!("failed to create broker link: {:?}", e))?;
// Subscribe to all topics via wildcard
let _ = link_tx.subscribe("#");
// Start the broker in a background thread (it blocks)
std::thread::spawn(move || {
if let Err(e) = broker.start() {
error!("MQTT broker error: {:?}", e);
}
});
let store = self.store.clone();
let bus = self.bus.clone();
// Bridge task: forward MQTT publishes to MessageBus
tokio::spawn(async move {
loop {
match link_rx.next().await {
Ok(Some(notification)) => {
if let rumqttd::Notification::Forward(forward) = notification {
let topic = forward.publish.topic.clone();
let payload = forward.publish.payload.clone();
let topic_str = String::from_utf8_lossy(&topic);
if let Some((target_uuid, event_type)) = parse_mqtt_topic(&topic_str) {
match event_type {
"message" => {
if let Ok(params) =
serde_json::from_slice::<SendMessageParams>(&payload)
{
let msg = freshblu_core::message::Message {
devices: params.devices.clone(),
from_uuid: Some(target_uuid),
topic: params.topic.clone(),
payload: params.payload.clone(),
metadata: None,
extra: params.extra.clone(),
};
for device_id in ¶ms.devices {
if device_id == "*" {
continue;
}
if let Ok(dest_uuid) = Uuid::parse_str(device_id) {
// Check can_message_from permission
let allowed = match store
.get_device(&dest_uuid)
.await
{
Ok(Some(dest_device)) => {
let checker = PermissionChecker::new(
&dest_device.meshblu.whitelists,
&target_uuid,
&dest_uuid,
);
checker.can_message_from()
}
_ => false,
};
if allowed {
let _ = bus
.publish(
&dest_uuid,
DeviceEvent::Message(msg.clone()),
)
.await;
} else {
warn!("MQTT: dropping unauthorized message from {} to {}", target_uuid, dest_uuid);
}
}
}
if params.is_broadcast() {
if let Ok(subs) = store.get_subscribers(
&target_uuid,
&freshblu_core::subscription::SubscriptionType::BroadcastSent,
).await {
for sub_uuid in subs {
let _ = bus.publish(&sub_uuid, DeviceEvent::Broadcast(msg.clone())).await;
}
}
}
}
}
"broadcast" => {
if let Ok(value) =
serde_json::from_slice::<serde_json::Value>(&payload)
{
let msg = freshblu_core::message::Message {
devices: vec!["*".to_string()],
from_uuid: Some(target_uuid),
topic: None,
payload: Some(value),
metadata: None,
extra: HashMap::new(),
};
if let Ok(subs) = store.get_subscribers(
&target_uuid,
&freshblu_core::subscription::SubscriptionType::BroadcastSent,
).await {
let event = DeviceEvent::Broadcast(msg);
for sub_uuid in subs {
let _ = bus.publish(&sub_uuid, event.clone()).await;
}
}
}
}
_ => {
warn!("Unknown MQTT event type: {}", event_type);
}
}
}
}
}
Ok(None) => continue,
Err(e) => {
error!("MQTT bridge link error: {:?}", e);
break;
}
}
}
});
info!("MQTT broker running at mqtt://0.0.0.0:{}", self.port);
info!("MQTT auth: username=device-uuid, password=device-token");
info!("MQTT topics: {{uuid}}/message, {{uuid}}/broadcast, {{uuid}}/config");
Ok(())
}
}
/// Parse a Meshblu MQTT topic
/// Format: <device_uuid>/<event_type>
pub fn parse_mqtt_topic(topic: &str) -> Option<(Uuid, &str)> {
let mut parts = topic.splitn(2, '/');
let uuid_str = parts.next()?;
let event_type = parts.next().unwrap_or("message");
let uuid = Uuid::parse_str(uuid_str).ok()?;
Some((uuid, event_type))
}
/// Map MQTT client ID to device UUID
/// Meshblu uses the device UUID as the MQTT client ID
pub fn mqtt_client_id_to_uuid(client_id: &str) -> Option<Uuid> {
Uuid::parse_str(client_id).ok()
}