wlambda/stdlib/
mqtt.rs

1// Copyright (c) 2020-2022 Weird Constructor <weirdconstructor@gmail.com>
2// This is a part of WLambda. See README.md and COPYING for details.
3
4#[cfg(feature="rumqttd")]
5use crate::first_addr;
6
7#[cfg(feature="rumqttc")]
8use crate::VVal;
9#[allow(unused_imports)]
10use crate::{Env, StackAction};
11#[allow(unused_imports)]
12use crate::vval::{VValFun, VValUserData};
13#[cfg(feature="rumqttc")]
14use rumqttc::{MqttOptions, Client, QoS, Event, Packet};
15#[cfg(feature="rumqttd")]
16use librumqttd::{Broker, Config};
17use crate::compiler::*;
18
19#[cfg(feature="rumqttc")]
20use std::sync::{Arc, Mutex};
21
/// State shared between a `DetachedMQTTClient` handle and its connection
/// thread.
#[cfg(feature="rumqttc")]
struct ThreadClientHandle {
    /// The client of the current connection attempt; `None` while there
    /// is no client available.
    client: Option<Client>,
    /// Topics that are subscribed again each time a new client is created.
    subscribe: Vec<String>,
}
27
#[cfg(feature="rumqttc")]
impl ThreadClientHandle {
    /// Runs `fun` with the currently stored client.
    ///
    /// Returns `DetClientError::NotConnected` if no client is stored, and
    /// wraps an error returned by `fun` into `DetClientError::ClientError`.
    fn with_client<F: FnMut(&mut Client) -> Result<(), rumqttc::ClientError>>(&mut self, mut fun: F) -> Result<(), DetClientError> {
        let client = match self.client.as_mut() {
            Some(client) => client,
            None => return Err(DetClientError::NotConnected),
        };

        fun(client).map_err(DetClientError::ClientError)
    }
}
41
/// Errors reported by the operations of the detached MQTT client.
#[cfg(feature="rumqttc")]
#[derive(Debug)]
pub enum DetClientError {
    /// No client is currently available (also used when the shared state
    /// could not be locked).
    NotConnected,
    /// The underlying `rumqttc` client returned an error.
    ClientError(rumqttc::ClientError),
}
48
#[cfg(feature="rumqttc")]
impl std::fmt::Display for DetClientError {
    /// Writes the human readable form that ends up in WLambda error values.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::NotConnected => f.write_str("Not Connected"),
            Self::ClientError(err) => write!(f, "MQTT Client Error: {}", err),
        }
    }
}
59
/// Handle to an MQTT client whose connection is serviced by a background
/// thread (see `DetachedMQTTClient::start`).
///
/// Clones share the same `ThreadClientHandle` through the `Arc`.
#[cfg(feature="rumqttc")]
#[derive(Clone)]
struct DetachedMQTTClient {
    /// Connection options used for every (re)connection attempt.
    options: MqttOptions,
    /// Channel that receives incoming messages and status events.
    chan: crate::threads::AValChannel,
    /// Client state shared with the connection thread.
    client: Arc<Mutex<ThreadClientHandle>>,
}
67
#[cfg(feature="rumqttc")]
impl DetachedMQTTClient {
    /// Creates a client handle for the broker at `host`:`port` using the
    /// client id `id`. The keep alive interval is set to 5 seconds.
    ///
    /// Nothing is connected here, the connection thread is spawned by
    /// `start`. Received messages and status events are sent to `chan`.
    pub fn new(chan: crate::threads::AValChannel, id: &str, host: &str, port: u16) -> Self {
        let mut options = MqttOptions::new(id, host, port);
        options.set_keep_alive(std::time::Duration::from_secs(5));
        Self {
            options,
            chan,
            client: Arc::new(Mutex::new(ThreadClientHandle {
                client:    None,
                subscribe: vec![],
            })),
        }
    }

    /// Publishes `payload` on `topic` with `QoS::AtLeastOnce` and without
    /// the retain flag.
    ///
    /// Returns `DetClientError::NotConnected` if no client is available or
    /// the shared state could not be locked.
    pub fn publish(&self, topic: &str, payload: &[u8]) -> Result<(), DetClientError> {
        if let Ok(mut hdl) = self.client.lock() {
            hdl.with_client(|cl| {
                cl.publish(topic, QoS::AtLeastOnce, false, payload)
            })

        } else {
            Err(DetClientError::NotConnected)
        }
    }

    /// Remembers `topic` for (re)subscription and subscribes to it on the
    /// current client with `QoS::AtLeastOnce`.
    ///
    /// The topic is remembered even if no client is available at the moment.
    /// In that case `DetClientError::NotConnected` is returned, but the
    /// connection thread subscribes the topic once it creates a new client.
    pub fn subscribe(&self, topic: &str) -> Result<(), DetClientError> {
        if let Ok(mut hdl) = self.client.lock() {
            // Remember each topic only once, otherwise repeated calls would
            // make every reconnect subscribe the same topic multiple times.
            if !hdl.subscribe.iter().any(|t| t.as_str() == topic) {
                hdl.subscribe.push(topic.to_string());
            }
            hdl.with_client(|cl| cl.subscribe(topic, QoS::AtLeastOnce))

        } else {
            Err(DetClientError::NotConnected)
        }
    }

    /// Spawns the connection thread.
    ///
    /// The thread loops forever: it creates a new client, subscribes all
    /// remembered topics, forwards events to the channel until the
    /// connection reports an error, then waits 5 seconds and starts over.
    ///
    /// The following pairs are sent to the channel:
    ///
    /// - `($WL/connected, none)` after the client was set up.
    /// - `(topic, payload bytes)` for every incoming publish packet.
    /// - `($WL/error, message)` when the connection reports an error.
    /// - `($WL/error/subscribe, message)` when a remembered topic could not
    ///   be subscribed.
    pub fn start(&mut self) {
        let chan    = self.chan.clone();
        let client  = self.client.clone();
        let options = self.options.clone();

        std::thread::spawn(move || {
            loop {
                let mut con = None;

                if let Ok(mut hdl) = client.lock() {
                    let (mut new_client, connection) =
                        Client::new(options.clone(), 25);

                    let mut subscribed = true;
                    for topic in hdl.subscribe.iter() {
                        // Use the same QoS as `subscribe`, so that a
                        // reconnect does not silently change the
                        // subscription.
                        if let Err(e) =
                            new_client.subscribe(topic, QoS::AtLeastOnce)
                        {
                            chan.send(&VVal::pair(
                                VVal::new_sym("$WL/error/subscribe"),
                                VVal::new_str_mv(format!("{}", e))));
                            subscribed = false;
                            break;
                        }
                    }

                    // On a subscription failure the new client is dropped and
                    // the loop retries after the delay below, instead of
                    // terminating the connection thread for good.
                    if subscribed {
                        hdl.client = Some(new_client);
                        con = Some(connection);
                    }
                }

                if let Some(mut connection) = con {
                    chan.send(&VVal::pair(
                        VVal::new_sym("$WL/connected"), VVal::None));

                    for noti in connection.iter() {
                        match noti {
                            Ok(Event::Incoming(Packet::Publish(pubpkt))) => {
                                chan.send(&VVal::pair(
                                    VVal::new_str_mv(pubpkt.topic),
                                    VVal::new_byt(
                                        pubpkt.payload.as_ref().to_vec())));
                            },
                            // All other events are of no interest here.
                            Ok(_) => { },
                            Err(e) => {
                                chan.send(&VVal::pair(
                                    VVal::new_sym("$WL/error"),
                                    VVal::new_str_mv(format!("{}", e))));
                                break;
                            },
                        }
                    }
                }

                // The connection is gone, make `publish` and `subscribe`
                // report `NotConnected` until the next attempt.
                if let Ok(mut hdl) = client.lock() {
                    hdl.client = None;
                }
                std::thread::sleep(std::time::Duration::from_secs(5));
            }
        });
    }
}
183
#[cfg(feature="rumqttc")]
impl VValUserData for DetachedMQTTClient {
    /// String representation of the handle.
    fn s(&self) -> String {
        String::from("$<DetachedMQTTClient>")
    }
    fn as_any(&mut self) -> &mut dyn std::any::Any { self }
    fn clone_ud(&self) -> Box<dyn VValUserData> {
        Box::new(self.clone())
    }

    /// Dispatches the methods callable on the handle:
    ///
    /// - `subscribe topic` - see `DetachedMQTTClient::subscribe`.
    /// - `publish topic payload` - see `DetachedMQTTClient::publish`.
    ///
    /// Both return `$true` on success and an error value if the operation
    /// failed. A wrong argument count or an unknown method name results in
    /// a panic `StackAction`.
    fn call_method(&self, key: &str, env: &mut Env) -> Result<VVal, StackAction> {
        let argv = env.argv_ref();
        match key {
            "subscribe" => {
                if argv.len() != 1 {
                    return
                        Err(StackAction::panic_str(
                            "subscribe method expects 1 argument".to_string(),
                            None,
                            env.argv()))
                }

                let ret = argv[0].with_s_ref(|topic| self.subscribe(topic));
                match ret {
                    Ok(_)  => Ok(VVal::Bol(true)),
                    Err(e) => Ok(env.new_err(format!("subscribe error: {}", e))),
                }
            },
            "publish" => {
                if argv.len() != 2 {
                    return
                        Err(StackAction::panic_str(
                            "publish method expects 2 arguments: (topic, payload)".to_string(),
                            None,
                            env.argv()))
                }

                let ret =
                    argv[0].with_s_ref(|topic|
                        argv[1].with_bv_ref(|payload|
                            self.publish(topic, payload)));
                match ret {
                    Ok(_)  => Ok(VVal::Bol(true)),
                    Err(e) => Ok(env.new_err(format!("publish error: {}", e))),
                }
            },
            _ => {
                Err(StackAction::panic_str(
                    format!("unknown method called: {}", key),
                    None,
                    env.argv()))
            },
        }
    }

    /// The handle only holds the options, the channel and the shared client
    /// state, so a clone of it is handed out as thread safe representation.
    fn as_thread_safe_usr(&mut self) -> Option<Box<dyn crate::threads::ThreadSafeUsr>> {
        Some(Box::new(self.clone()))
    }
}
243
#[cfg(feature="rumqttc")]
impl crate::threads::ThreadSafeUsr for DetachedMQTTClient {
    /// Wraps a clone of this handle into a user data `VVal`.
    fn to_vval(&self) -> VVal {
        let handle = self.clone();
        VVal::Usr(Box::new(handle))
    }
}
250
/// Handle to an embedded MQTT broker, holding the sending side of the
/// local broker link that is used by the `publish` method.
///
/// Clones share the same link through the `Arc`.
#[cfg(feature="rumqttd")]
#[derive(Clone)]
struct MQTTBroker {
    // `Arc` and `Mutex` are spelled out, because the `use` of them at the
    // top of this file is only enabled by the "rumqttc" feature.
    link_tx: std::sync::Arc<std::sync::Mutex<librumqttd::LinkTx>>,
}
256
/// Builds the `librumqttd` configuration from the WLambda map `cfg`.
///
/// The following keys are read:
///
/// - `listen` - address of the MQTT server (resolved with `first_addr!`).
/// - `console_listen` - address of the broker console (resolved with
///   `first_addr!`).
/// - `id` - the broker id, must not be negative.
///
/// Exactly one server is configured, stored under the name `"1"`, with fixed
/// connection settings. An error value is returned if one of the addresses
/// can't be resolved or if the id is out of range.
#[cfg(feature="rumqttd")]
fn cfg2broker_config(env: &mut Env, cfg: crate::VVal) -> Result<Config, crate::VVal>  {
    use std::convert::TryFrom;

    let listen = first_addr!(cfg.v_k("listen"), env)?;
    let srv = librumqttd::ServerSettings {
        listen,
        next_connection_delay_ms: 1,
        connections: librumqttd::ConnectionSettings {
            connection_timeout_ms: 100,
            max_client_id_len:     256,
            throttle_delay_ms:     0,
            max_payload_size:      10240,
            max_inflight_count:    500,
            max_inflight_size:     10240,
            login_credentials: None,
        },
        cert: None,
    };

    let mut servers = std::collections::HashMap::new();
    servers.insert(String::from("1"), srv);

    let cons_listen = first_addr!(cfg.v_k("console_listen"), env)?;

    // A plain `as usize` cast would silently turn a negative id into a huge
    // positive one, so out of range values are rejected instead.
    let raw_id = cfg.v_ik("id");
    let id =
        match usize::try_from(raw_id) {
            Ok(id) => id,
            Err(_) => {
                return Err(env.new_err(format!(
                    "mqtt:broker:setup: config.id must be a non negative integer that fits into usize, got: {}",
                    raw_id)));
            },
        };

    Ok(Config {
        id,
        servers,
        cluster: None,
        replicator: None,
        console: librumqttd::ConsoleSettings {
            listen: cons_listen,
        },
        router: Default::default(),
    })
}
300
#[cfg(feature="rumqttd")]
#[allow(clippy::collapsible_else_if)]
impl MQTTBroker {
    /// Starts an embedded broker configured by `cfg` and returns a handle
    /// that can publish through a local link.
    ///
    /// Apart from the keys read by `cfg2broker_config`, the map `cfg.link`
    /// is evaluated:
    ///
    /// - `client_id` - id of the local link, defaults to `"wl_local"`.
    /// - `recv` - optional channel handle. If given, every message received
    ///   by the local link is sent to it as `(topic, payload bytes)` pair.
    ///   Before the first message a `($WL/connected, none)` pair is sent,
    ///   and a `($WL/error, message)` pair is sent when receiving fails.
    /// - `topics` - optional list of topics the local link subscribes to if
    ///   `recv` is given. Without it `"#"` is subscribed.
    ///
    /// Without `recv` the messages of the local link are received and
    /// discarded by a background thread.
    ///
    /// On failure an error value is returned as `Err`.
    pub fn setup(env: &mut Env, cfg: crate::VVal) -> Result<Self, crate::VVal> {
        // `VVal` is only imported for the "rumqttc" feature at the top of
        // this file, so it is brought into scope here.
        use crate::VVal;

        let link_cfg = cfg.v_k("link");
        let config   = cfg2broker_config(env, cfg)?;

        let mut broker = Broker::new(config);

        let client_id =
            if link_cfg.v_k("client_id").is_some() {
                link_cfg.v_s_rawk("client_id")
            } else {
                "wl_local".to_string()
            };

        let mut link =
            match broker.link(&client_id) {
                Ok(link) => link,
                Err(e) => {
                    return Err(env.new_err(format!(
                        "mqtt:broker:setup: Could not create local client link: {}",
                        e)));
                }
            };

        // The receiver channel is checked before the broker thread is
        // spawned, so an invalid handle does not leave a running broker
        // behind.
        let chan =
            if link_cfg.v_k("recv").is_some() {
                let mut recv = link_cfg.v_k("recv");
                let forked =
                    recv.with_usr_ref(|chan: &mut crate::threads::AValChannel| {
                        chan.fork_sender_direct()
                    });

                match forked {
                    Some(Ok(chan)) => Some(chan),
                    Some(Err(err)) => {
                        return
                            Err(VVal::err_msg(
                                &format!("Failed to fork sender, can't get lock: {}", err)));
                    },
                    None => {
                        return
                            Err(env.new_err(format!(
                                "mqtt:broker:setup: config.link.recv not a std:sync:mpsc handle! {}",
                                link_cfg.v_k("recv").s())));
                    },
                }
            } else {
                None
            };

        std::thread::spawn(move || {
            // TODO: Log errors?! Until then a failing broker panics this
            //       thread.
            broker.start().unwrap();
        });

        let mut link_rx =
            match link.connect(100) {
                Ok(link_rx) => link_rx,
                Err(e) => {
                    return
                        Err(env.new_err(format!(
                            "mqtt:broker:setup: config.link.recv could not setup a receiver link: {}",
                            e)));
                },
            };

        if let Some(chan) = chan {
            if link_cfg.v_k("topics").is_some() {
                if let Some(err) = link_cfg.v_k("topics").with_iter(|it| {
                        for (v, _) in it {
                            let topic = v.s_raw();
                            if let Err(e) = link.subscribe(&topic) {
                                // Name the topic that actually failed.
                                return
                                    Some(env.new_err(format!(
                                        "mqtt:broker:setup: config.link.topics could not subscribe to '{}': {}",
                                        topic, e)));
                            }
                        }
                        None
                    })
                {
                    return Err(err);
                }

            } else {
                if let Err(e) = link.subscribe("#") {
                    return
                        Err(env.new_err(format!(
                            "mqtt:broker:setup: config.link.topics could not subscribe to '#': {}",
                            e)));
                }
            }

            std::thread::spawn(move || {
                // Announce the link once, not before every single message.
                chan.send(&VVal::pair(
                    VVal::new_sym("$WL/connected"), VVal::None));

                loop {
                    match link_rx.recv() {
                        Ok(Some(msg)) => {
                            let topic = VVal::new_str_mv(msg.topic);
                            for payl in msg.payload {
                                chan.send(&VVal::pair(
                                    topic.clone(),
                                    VVal::new_byt(payl.as_ref().to_vec())));
                            }
                        },
                        Ok(None) => (),
                        Err(e) => {
                            chan.send(&VVal::pair(
                                VVal::new_sym("$WL/error"),
                                VVal::new_str_mv(format!("{}", e))));
                            break;
                        },
                    }
                }
            });
        } else {
            // Nobody is interested in the messages, receive and drop them
            // until the link reports an error.
            std::thread::spawn(move || {
                while link_rx.recv().is_ok() { }
            });
        }

        Ok(Self {
            link_tx: std::sync::Arc::new(std::sync::Mutex::new(link)),
        })
    }
}
437
#[cfg(feature="rumqttd")]
impl VValUserData for MQTTBroker {
    /// String representation of the broker handle.
    fn s(&self) -> String {
        String::from("$<MQTTBroker>")
    }
    fn as_any(&mut self) -> &mut dyn std::any::Any { self }
    fn clone_ud(&self) -> Box<dyn VValUserData> {
        Box::new(self.clone())
    }
    fn as_thread_safe_usr(&mut self) -> Option<Box<dyn crate::threads::ThreadSafeUsr>> {
        Some(Box::new(self.clone()))
    }

    /// Dispatches the methods callable on the broker handle.
    ///
    /// Only `publish topic payload` is available. It publishes through the
    /// local link and returns `$true` on success or an error value if
    /// publishing failed or the link could not be locked. A wrong argument
    /// count or an unknown method name results in a panic `StackAction`.
    fn call_method(&self, key: &str, env: &mut Env) -> Result<crate::VVal, StackAction> {
        use crate::VVal;

        let argv = env.argv_ref();
        match key {
            "publish" => {
                if argv.len() != 2 {
                    return Err(StackAction::panic_str(
                        "publish method expects 2 arguments: (topic, payload)".to_string(),
                        None,
                        env.argv()));
                }

                let mut link =
                    match self.link_tx.lock() {
                        Ok(link) => link,
                        Err(_) => {
                            return Ok(env.new_err(
                                "publish error: can't lock mutex!".to_string()));
                        },
                    };

                let published =
                    env.arg(0).with_s_ref(|topic|
                        env.arg(1).with_bv_ref(|payload|
                            link.publish(topic, false, payload)));

                match published {
                    Ok(_)  => Ok(VVal::Bol(true)),
                    Err(e) => Ok(env.new_err(format!("publish error: {}", e))),
                }
            },
            _ => {
                Err(StackAction::panic_str(
                    format!("unknown method called: {}", key),
                    None,
                    env.argv()))
            },
        }
    }
}
485
#[cfg(feature="rumqttd")]
impl crate::threads::ThreadSafeUsr for MQTTBroker {
    /// Wraps a handle that shares the same broker link into a user data
    /// `VVal`.
    fn to_vval(&self) -> crate::VVal {
        let handle = MQTTBroker { link_tx: self.link_tx.clone() };
        crate::VVal::Usr(Box::new(handle))
    }
}
494
495#[allow(unused_variables)]
496pub fn add_to_symtable(st: &mut SymbolTable) {
497    #[cfg(feature="rumqttc")]
498    st.fun("mqtt:client:new", |env: &mut Env, _argc: usize| {
499        let mut chan = env.arg(0);
500        let chan =
501            chan.with_usr_ref(|chan: &mut crate::threads::AValChannel| {
502                chan.fork_sender_direct()
503            });
504
505        let chan =
506            if let Some(chan) = chan {
507               match chan {
508                    Ok(chan) => Some(chan),
509                    Err(err) => {
510                        return
511                            Ok(VVal::err_msg(
512                                &format!("Failed to fork sender, can't get lock: {}", err)));
513                    },
514               }
515            } else {
516                return
517                    Ok(env.new_err(format!(
518                        "mqtt:client:detached:new: First argument not a std:sync:mpsc handle! {}",
519                        env.arg(0).s())));
520            };
521
522        let mut cl =
523            DetachedMQTTClient::new(
524                chan.unwrap(),
525                &env.arg(1).s_raw(),
526                &env.arg(2).s_raw(),
527                env.arg(3).i() as u16);
528        cl.start();
529        Ok(VVal::new_usr(cl))
530    }, Some(4), Some(4), false);
531
532    #[cfg(feature="rumqttd")]
533    st.fun("mqtt:broker:new", |env: &mut Env, _argc: usize| {
534        let config = env.arg(0);
535
536        match MQTTBroker::setup(env, config) {
537            Ok(broker) => Ok(VVal::new_usr(broker)),
538            Err(ev)    => Ok(ev),
539        }
540    }, Some(1), Some(1), false);
541}
542
543/*
544
545
546!chan = std:sync:mpsc:new[];
547
548!broker = std:mqtt:broker:new ${ ... config here ..., link = "client_id" };
549!link = broker.get_link[];
550link.subscribe "test/me";
551link.publish "test/me" $b"payload";
552
553while $t {
554    std:displayln chan.recv[];
555}
556
557
558*/