1#[cfg(feature="rumqttd")]
5use crate::first_addr;
6
7#[cfg(feature="rumqttc")]
8use crate::VVal;
9#[allow(unused_imports)]
10use crate::{Env, StackAction};
11#[allow(unused_imports)]
12use crate::vval::{VValFun, VValUserData};
13#[cfg(feature="rumqttc")]
14use rumqttc::{MqttOptions, Client, QoS, Event, Packet};
15#[cfg(feature="rumqttd")]
16use librumqttd::{Broker, Config};
17use crate::compiler::*;
18
19#[cfg(feature="rumqttc")]
20use std::sync::{Arc, Mutex};
21
22#[cfg(feature="rumqttc")]
23struct ThreadClientHandle {
24 client: Option<Client>,
25 subscribe: Vec<String>,
26}
27
28#[cfg(feature="rumqttc")]
29impl ThreadClientHandle {
30 fn with_client<F: FnMut(&mut Client) -> Result<(), rumqttc::ClientError>>(&mut self, mut fun: F) -> Result<(), DetClientError> {
31 if let Some(client) = self.client.as_mut() {
32 match fun(client) {
33 Ok(()) => Ok(()),
34 Err(e) => Err(DetClientError::ClientError(e)),
35 }
36 } else {
37 Err(DetClientError::NotConnected)
38 }
39 }
40}
41
42#[cfg(feature="rumqttc")]
43#[derive(Debug)]
44pub enum DetClientError {
45 NotConnected,
46 ClientError(rumqttc::ClientError),
47}
48
49#[cfg(feature="rumqttc")]
50impl std::fmt::Display for DetClientError {
51 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
52 match self {
53 DetClientError::NotConnected => write!(f, "Not Connected"),
54 DetClientError::ClientError(err) =>
55 write!(f, "MQTT Client Error: {}", err),
56 }
57 }
58}
59
60#[cfg(feature="rumqttc")]
61#[derive(Clone)]
62struct DetachedMQTTClient {
63 options: MqttOptions,
64 chan: crate::threads::AValChannel,
65 client: Arc<Mutex<ThreadClientHandle>>,
66}
67
68#[cfg(feature="rumqttc")]
69impl DetachedMQTTClient {
70 pub fn new(chan: crate::threads::AValChannel, id: &str, host: &str, port: u16) -> Self {
71 let mut options = MqttOptions::new(id, host, port);
72 options.set_keep_alive(std::time::Duration::from_secs(5));
73 Self {
74 options,
75 chan,
76 client: Arc::new(Mutex::new(ThreadClientHandle {
77 client: None,
78 subscribe: vec![],
79 })),
80 }
81 }
82
83 pub fn publish(&self, topic: &str, payload: &[u8]) -> Result<(), DetClientError> {
84 if let Ok(mut hdl) = self.client.lock() {
85 hdl.with_client(|cl| {
86 cl.publish(topic, QoS::AtLeastOnce, false, payload)
87 })
88
89 } else {
90 Err(DetClientError::NotConnected)
91 }
92 }
93
94 pub fn subscribe(&self, topic: &str) -> Result<(), DetClientError> {
95 if let Ok(mut hdl) = self.client.lock() {
96 hdl.subscribe.push(topic.to_string());
97 hdl.with_client(|cl| cl.subscribe(topic, QoS::AtLeastOnce))
98
99 } else {
100 Err(DetClientError::NotConnected)
101 }
102 }
103
104 pub fn start(&mut self) {
105 let chan = self.chan.clone();
106 let client = self.client.clone();
107 let options = self.options.clone();
108
109 std::thread::spawn(move || {
110 loop {
111 let mut con = None;
112
113 if let Ok(mut hdl) = client.lock() {
114 let (client, connection) = Client::new(options.clone(), 25);
115 hdl.client = Some(client);
116
117 let mut retry = false;
118 let topics = hdl.subscribe.clone();
119 for topic in topics.iter() {
120 if let Err(e) =
121 hdl.client
122 .as_mut()
123 .unwrap()
124 .subscribe(topic, QoS::AtMostOnce)
125 {
126 chan.send(&VVal::pair(
127 VVal::new_sym("$WL/error/subscribe"),
128 VVal::new_str_mv(format!("{}", e))));
129 retry = true;
130 break;
131 }
132 }
133
134 if retry {
135 hdl.client = None;
136 break;
137 }
138
139 con = Some(connection);
140 }
141
142 if let Some(mut connection) = con {
143 chan.send(&VVal::pair(
144 VVal::new_sym("$WL/connected"), VVal::None));
145
146 for noti in connection.iter() {
147 let noti =
148 match noti {
149 Err(e) => {
150 chan.send(&VVal::pair(
151 VVal::new_sym("$WL/error"),
152 VVal::new_str_mv(format!("{}", e))));
153 break;
154 },
155 Ok(noti) => noti,
156 };
157
158 match noti {
159 Event::Incoming(inc) => {
160 match inc {
161 Packet::Publish(pubpkt) => {
162 chan.send(&VVal::pair(
163 VVal::new_str_mv(pubpkt.topic),
164 VVal::new_byt(
165 pubpkt.payload.as_ref().to_vec())));
166 },
167 _ => { },
168 }
169 },
170 _ => { }
171 }
172 }
173 }
174
175 if let Ok(mut hdl) = client.lock() {
176 hdl.client = None;
177 }
178 std::thread::sleep(std::time::Duration::from_secs(5));
179 }
180 });
181 }
182}
183
184#[cfg(feature="rumqttc")]
185impl VValUserData for DetachedMQTTClient {
186 fn s(&self) -> String {
187 format!("$<DetachedMQTTClient>")
188 }
189 fn as_any(&mut self) -> &mut dyn std::any::Any { self }
190 fn clone_ud(&self) -> Box<dyn VValUserData> {
191 Box::new(self.clone())
192 }
193
194 fn call_method(&self, key: &str, env: &mut Env) -> Result<VVal, StackAction> {
195 let argv = env.argv_ref();
196 match key {
197 "subscribe" => {
198 if argv.len() != 1 {
199 return
200 Err(StackAction::panic_str(
201 "subscribe method expects 1 argument".to_string(),
202 None,
203 env.argv()))
204 }
205
206 let ret = argv[0].with_s_ref(|s| self.subscribe(s));
207 match ret {
208 Ok(_) => Ok(VVal::Bol(true)),
209 Err(e) => Ok(env.new_err(format!("subscribe error: {}", e)))
210 }
211 },
212 "publish" => {
213 if argv.len() != 2 {
214 return
215 Err(StackAction::panic_str(
216 "publish method expects 2 argument".to_string(),
217 None,
218 env.argv()))
219 }
220
221 let ret =
222 argv[0].with_s_ref(|topic|
223 argv[1].with_bv_ref(|payload|
224 self.publish(topic, payload)));
225 match ret {
226 Ok(_) => Ok(VVal::Bol(true)),
227 Err(e) => Ok(env.new_err(format!("publish error: {}", e)))
228 }
229 },
230 _ => {
231 Err(StackAction::panic_str(
232 format!("unknown method called: {}", key),
233 None,
234 env.argv()))
235 },
236 }
237 }
238
239 fn as_thread_safe_usr(&mut self) -> Option<Box<dyn crate::threads::ThreadSafeUsr>> {
240 Some(Box::new(self.clone()))
241 }
242}
243
244#[cfg(feature="rumqttc")]
245impl crate::threads::ThreadSafeUsr for DetachedMQTTClient {
246 fn to_vval(&self) -> VVal {
247 VVal::Usr(Box::new(self.clone()))
248 }
249}
250
251#[cfg(feature="rumqttd")]
252#[derive(Clone)]
253struct MQTTBroker {
254 link_tx: Arc<Mutex<librumqttd::LinkTx>>,
255}
256
257#[cfg(feature="rumqttd")]
258fn cfg2broker_config(env: &mut Env, cfg: VVal) -> Result<Config, VVal> {
259let mut servers = std::collections::HashMap::new();
267 let listen = first_addr!(cfg.v_k("listen"), env)?;
268 let srv = librumqttd::ServerSettings {
269 listen,
270 next_connection_delay_ms: 1,
271 connections: librumqttd::ConnectionSettings {
272 connection_timeout_ms: 100,
273 max_client_id_len: 256,
274 throttle_delay_ms: 0,
275 max_payload_size: 10240,
276 max_inflight_count: 500,
277 max_inflight_size: 10240,
278 login_credentials: None,
279 },
280 cert: None,
281 };
282
283 servers.insert(format!("{}", 1), srv);
284
285 let cons_listen = first_addr!(cfg.v_k("console_listen"), env)?;
286
287 let config = Config {
288 id: cfg.v_ik("id") as usize,
289 servers,
290 cluster: None,
291 replicator: None,
292 console: librumqttd::ConsoleSettings {
293 listen: cons_listen,
294 },
295 router: Default::default(),
296 };
297
298 Ok(config)
299}
300
301#[cfg(feature="rumqttd")]
302#[allow(clippy::collapsible_else_if)]
303impl MQTTBroker {
304 pub fn setup(env: &mut Env, cfg: VVal) -> Result<Self, VVal> {
305 let link_cfg = cfg.v_k("link");
306 let config = cfg2broker_config(env, cfg)?;
307
308 let mut broker = Broker::new(config);
309
310 let client_id =
311 if link_cfg.v_k("client_id").is_some() {
312 link_cfg.v_s_rawk("client_id")
313 } else {
314 "wl_local".to_string()
315 };
316
317 let mut link =
318 match broker.link(&client_id) {
319 Ok(link) => link,
320 Err(e) => {
321 return Err(env.new_err(format!(
322 "mqtt:broker:setup: Could not create local client link: {}",
323 e)));
324 }
325 };
326
327 std::thread::spawn(move || {
328 broker.start().unwrap();
329 });
331
332 let chan =
333 if link_cfg.v_k("recv").is_some() {
334 let mut chan = link_cfg.v_k("recv");
335 let chan =
336 chan.with_usr_ref(|chan: &mut crate::threads::AValChannel| {
337 chan.fork_sender_direct()
338 });
339
340 if let Some(chan) = chan {
341 match chan {
342 Ok(chan) => Some(chan),
343 Err(err) => {
344 return
345 Err(VVal::err_msg(
346 &format!("Failed to fork sender, can't get lock: {}", err)));
347 }
348 }
349 } else {
350 return
351 Err(env.new_err(format!(
352 "mqtt:broker:setup: config.link.recv not a std:sync:mpsc handle! {}",
353 link_cfg.v_k("recv").s())));
354 }
355 } else {
356 None
357 };
358
359 let mut link_rx =
360 match link.connect(100) {
361 Ok(link_rx) => link_rx,
362 Err(e) => {
363 return
364 Err(env.new_err(format!(
365 "mqtt:broker:setup: config.link.recv could not setup a receiver link: {}",
366 e)));
367 },
368 };
369
370 if let Some(chan) = chan {
371 if link_cfg.v_k("topics").is_some() {
372 if let Some(err) = link_cfg.v_k("topics").with_iter(|it| {
373 for (v, _) in it {
374 if let Err(e) = link.subscribe(&v.s_raw()) {
375 return
376 Some(env.new_err(format!(
377 "mqtt:broker:setup: config.link.topics could not subscribe to '#': {}",
378 e)));
379 }
380 }
381 None
382 })
383 {
384 return Err(err);
385 }
386
387 } else {
388 if let Err(e) = link.subscribe("#") {
389 return
390 Err(env.new_err(format!(
391 "mqtt:broker:setup: config.link.topics could not subscribe to '#': {}",
392 e)));
393 }
394 }
395
396 std::thread::spawn(move || {
397 loop {
398 chan.send(&VVal::pair(
399 VVal::new_sym("$WL/connected"), VVal::None));
400
401 match link_rx.recv() {
402 Ok(Some(msg)) => {
403 let topic = VVal::new_str_mv(msg.topic);
404 for payl in msg.payload {
405 chan.send(&VVal::pair(
406 topic.clone(),
407 VVal::new_byt(payl.as_ref().to_vec())));
408 }
409
410 },
411 Ok(None) => (),
412 Err(e) => {
413 chan.send(&VVal::pair(
414 VVal::new_sym("$WL/error"),
415 VVal::new_str_mv(format!("{}", e))));
416 break;
417 },
418 }
419 }
420 });
421 } else {
422 std::thread::spawn(move || {
423 loop {
424 match link_rx.recv() {
425 Ok(_) => (),
426 Err(_) => { break; },
427 }
428 }
429 });
430 }
431
432 Ok(Self {
433 link_tx: Arc::new(Mutex::new(link)),
434 })
435 }
436}
437
438#[cfg(feature="rumqttd")]
439impl VValUserData for MQTTBroker {
440 fn s(&self) -> String {
441 format!("$<MQTTBroker>")
442 }
443 fn as_any(&mut self) -> &mut dyn std::any::Any { self }
444 fn clone_ud(&self) -> Box<dyn VValUserData> {
445 Box::new(self.clone())
446 }
447 fn as_thread_safe_usr(&mut self) -> Option<Box<dyn crate::threads::ThreadSafeUsr>> {
448 Some(Box::new(self.clone()))
449 }
450
451 fn call_method(&self, key: &str, env: &mut Env) -> Result<VVal, StackAction> {
452 let argv = env.argv_ref();
453 match key {
454 "publish" => {
455 if argv.len() != 2 {
456 return
457 Err(StackAction::panic_str(
458 "publish method expects 2 arguments: (topic, payload)".to_string(),
459 None,
460 env.argv()))
461 }
462
463 if let Ok(mut link) = self.link_tx.lock() {
464 let ret =
465 env.arg(0).with_s_ref(|topic|
466 env.arg(1).with_bv_ref(|payload|
467 link.publish(topic, false, payload)));
468 match ret {
469 Ok(_) => Ok(VVal::Bol(true)),
470 Err(e) => Ok(env.new_err(format!("publish error: {}", e))),
471 }
472 } else {
473 Ok(env.new_err(format!("publish error: can't lock mutex!")))
474 }
475 },
476 _ => {
477 Err(StackAction::panic_str(
478 format!("unknown method called: {}", key),
479 None,
480 env.argv()))
481 },
482 }
483 }
484}
485
486#[cfg(feature="rumqttd")]
487impl crate::threads::ThreadSafeUsr for MQTTBroker {
488 fn to_vval(&self) -> VVal {
489 VVal::Usr(Box::new(MQTTBroker {
490 link_tx: self.link_tx.clone()
491 }))
492 }
493}
494
495#[allow(unused_variables)]
496pub fn add_to_symtable(st: &mut SymbolTable) {
497 #[cfg(feature="rumqttc")]
498 st.fun("mqtt:client:new", |env: &mut Env, _argc: usize| {
499 let mut chan = env.arg(0);
500 let chan =
501 chan.with_usr_ref(|chan: &mut crate::threads::AValChannel| {
502 chan.fork_sender_direct()
503 });
504
505 let chan =
506 if let Some(chan) = chan {
507 match chan {
508 Ok(chan) => Some(chan),
509 Err(err) => {
510 return
511 Ok(VVal::err_msg(
512 &format!("Failed to fork sender, can't get lock: {}", err)));
513 },
514 }
515 } else {
516 return
517 Ok(env.new_err(format!(
518 "mqtt:client:detached:new: First argument not a std:sync:mpsc handle! {}",
519 env.arg(0).s())));
520 };
521
522 let mut cl =
523 DetachedMQTTClient::new(
524 chan.unwrap(),
525 &env.arg(1).s_raw(),
526 &env.arg(2).s_raw(),
527 env.arg(3).i() as u16);
528 cl.start();
529 Ok(VVal::new_usr(cl))
530 }, Some(4), Some(4), false);
531
532 #[cfg(feature="rumqttd")]
533 st.fun("mqtt:broker:new", |env: &mut Env, _argc: usize| {
534 let config = env.arg(0);
535
536 match MQTTBroker::setup(env, config) {
537 Ok(broker) => Ok(VVal::new_usr(broker)),
538 Err(ev) => Ok(ev),
539 }
540 }, Some(1), Some(1), false);
541}
542
543