mosquitto_plugin/
lib.rs

1pub mod mosquitto_calls;
2pub mod mosquitto_dev;
3
4pub use mosquitto_dev::*;
5
6use std::collections::HashMap;
7use std::convert::From;
8use std::ffi::CString;
9use std::fmt;
10
11pub mod dynlib;
12
13pub use dynlib::*;
14pub use libc;
15use std::net::IpAddr;
16use std::str::FromStr;
17
18pub type MosquittoOpt<'a> = HashMap<&'a str, &'a str>;
19
20// parses the pointers given by mosquitto into a rust native structure
21pub fn __from_ptr_and_size<'a>(opts: *mut mosquitto_opt, count: usize) -> MosquittoOpt<'a> {
22    let mut map = HashMap::new();
23    // Yep, raw pointer values
24    let optsval = opts as usize;
25    for i in 0..count {
26        // manually increment the pointers according to the coun value
27        let opt = unsafe {
28            let opt = optsval + i as usize * std::mem::size_of::<mosquitto_opt>();
29            (opt as *mut mosquitto_opt)
30                .as_ref()
31                .expect("Failed to extract from ptr and size")
32        };
33
34        // get a reference, and then use this to parse the values into owned strings
35        let key: &str = unsafe {
36            let c_str = std::ffi::CStr::from_ptr(opt.key);
37            c_str.to_str().expect("Failed to create string from CStr")
38        };
39        let value: &str = unsafe {
40            let c_str = std::ffi::CStr::from_ptr(opt.value);
41            c_str.to_str().expect("Failed to create string from CStr")
42        };
43        map.insert(key, value);
44    }
45
46    map
47}
48
49#[repr(C)]
50#[derive(Debug, Copy, Clone, Eq, PartialEq)]
51pub enum AccessLevel {
52    None = 0,
53    Read = 1,
54    Write = 2,
55    Subscribe = 4,
56    Unsubscribe = 8,
57    Unknown,
58}
59
60impl From<i32> for AccessLevel {
61    fn from(level: i32) -> AccessLevel {
62        match level {
63            0 => AccessLevel::None,
64            1 => AccessLevel::Read,
65            2 => AccessLevel::Write,
66            4 => AccessLevel::Subscribe,
67            8 => AccessLevel::Unsubscribe,
68            _ => AccessLevel::Unknown,
69        }
70    }
71}
72
73impl std::fmt::Display for AccessLevel {
74    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75        write!(f, "{:?}", self)
76    }
77}
78
79#[repr(C)]
80#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
81pub enum AclCheckAccessLevel {
82    Read = 1,
83    Write = 2,
84    Subscribe = 4,
85    Unsubscribe = 8,
86}
87
88impl std::fmt::Display for AclCheckAccessLevel {
89    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90        write!(f, "{:?}", self)
91    }
92}
93
94impl From<AccessLevel> for Option<AclCheckAccessLevel> {
95    fn from(level: AccessLevel) -> Option<AclCheckAccessLevel> {
96        match level {
97            AccessLevel::Read => Some(AclCheckAccessLevel::Read),
98            AccessLevel::Write => Some(AclCheckAccessLevel::Write),
99            AccessLevel::Subscribe => Some(AclCheckAccessLevel::Subscribe),
100            AccessLevel::Unsubscribe => Some(AclCheckAccessLevel::Unsubscribe),
101            _ => None,
102        }
103    }
104}
105
106#[repr(C)]
107#[derive(Debug, Clone, PartialEq, Eq)]
108pub enum Error {
109    AuthContinue(Vec<u8>),
110    NoSubscriber,
111    SubExists,
112    ConnPending,
113    NoMem,
114    Protocol,
115    Inval,
116    NoConn,
117    ConnRefused,
118    NotFound,
119    ConnLost,
120    Tls,
121    PayloadSize,
122    NotSupported,
123    Auth,
124    AclDenied,
125    Unknown,
126    Errno,
127    Eai,
128    Proxy,
129    PluginDefer,
130    MalformedUtf8,
131    Keepalive,
132    Lookup,
133    MalformedPacket,
134    DuplicateProperty,
135    TlsHandshake,
136    QosNotSupported,
137    OversizePacket,
138    OCSP,
139    Timeout,
140    RetainNotSupported,
141    TopicAliasInvalid,
142    AdministrativeAction,
143    AlreadyExists,
144}
145
146impl From<Error> for i32 {
147    fn from(e: Error) -> i32 {
148        match e {
149            Error::AuthContinue(_) => -4,
150            Error::NoSubscriber => -3,
151            Error::SubExists => -2,
152            Error::ConnPending => -1,
153            Error::NoMem => 1,
154            Error::Protocol => 2,
155            Error::Inval => 3,
156            Error::NoConn => 4,
157            Error::ConnRefused => 5,
158            Error::NotFound => 6,
159            Error::ConnLost => 7,
160            Error::Tls => 8,
161            Error::PayloadSize => 9,
162            Error::NotSupported => 10,
163            Error::Auth => 11,
164            Error::AclDenied => 12,
165            Error::Unknown => 13,
166            Error::Errno => 14,
167            Error::Eai => 15,
168            Error::Proxy => 16,
169            Error::PluginDefer => 17,
170            Error::MalformedUtf8 => 18,
171            Error::Keepalive => 19,
172            Error::Lookup => 20,
173            Error::MalformedPacket => 21,
174            Error::DuplicateProperty => 22,
175            Error::TlsHandshake => 23,
176            Error::QosNotSupported => 24,
177            Error::OversizePacket => 25,
178            Error::OCSP => 26,
179            Error::Timeout => 27,
180            Error::RetainNotSupported => 28,
181            Error::TopicAliasInvalid => 29,
182            Error::AdministrativeAction => 30,
183            Error::AlreadyExists => 31,
184        }
185    }
186}
187
188impl From<i32> for Error {
189    fn from(error: i32) -> Self {
190        match error {
191            -4 => Error::AuthContinue(Vec::with_capacity(0)),
192            -3 => Error::NoSubscriber,
193            -2 => Error::SubExists,
194            -1 => Error::ConnPending,
195            1 => Error::NoMem,
196            2 => Error::Protocol,
197            3 => Error::Inval,
198            4 => Error::NoConn,
199            5 => Error::ConnRefused,
200            6 => Error::NotFound,
201            7 => Error::ConnLost,
202            8 => Error::Tls,
203            9 => Error::PayloadSize,
204            10 => Error::NotSupported,
205            11 => Error::Auth,
206            12 => Error::AclDenied,
207            13 => Error::Unknown,
208            14 => Error::Errno,
209            15 => Error::Eai,
210            16 => Error::Proxy,
211            17 => Error::PluginDefer,
212            18 => Error::MalformedUtf8,
213            19 => Error::Keepalive,
214            20 => Error::Lookup,
215            21 => Error::MalformedPacket,
216            22 => Error::DuplicateProperty,
217            23 => Error::TlsHandshake,
218            24 => Error::QosNotSupported,
219            25 => Error::OversizePacket,
220            26 => Error::OCSP,
221            27 => Error::Timeout,
222            28 => Error::RetainNotSupported,
223            29 => Error::TopicAliasInvalid,
224            30 => Error::AdministrativeAction,
225            31 => Error::AlreadyExists,
226            _ => Error::Unknown,
227        }
228    }
229}
230
231#[derive(Debug, Copy, Clone, PartialEq, Eq)]
232pub struct Success;
233
234impl From<Success> for i32 {
235    fn from(_: Success) -> i32 {
236        0
237    }
238}
239
240// #[repr(C)]
241// #[derive(Debug)]
242// pub enum QoS {
243//     AtMostOnce = 0,
244//     AtLeastOnce = 1,
245//     ExactlyOnce = 2,
246// }
247
248// impl QoS {
249//     pub fn from_num(n: i32) -> Self {
250//         match n {
251//             2 => QoS::ExactlyOnce,
252//             1 => QoS::AtLeastOnce,
253//             _ => QoS::AtMostOnce,
254//         }
255//     }
256// }
257
258#[derive(Debug)]
259pub struct MosquittoMessage<'a> {
260    pub topic: &'a str,
261    pub payload: &'a [u8],
262    pub qos: i32,
263    pub retain: bool,
264}
265
266pub enum QOS {
267    AtMostOnce,
268    AtLeastOnce,
269    ExactlyOnce,
270}
271
272impl QOS {
273    pub fn to_i32(&self) -> i32 {
274        match self {
275            QOS::AtMostOnce => 0,
276            QOS::AtLeastOnce => 1,
277            QOS::ExactlyOnce => 2,
278        }
279    }
280}
281
282pub enum MosquittoClientProtocol {
283    Mqtt,
284    MqttSn,
285    Websockets,
286}
287
288pub enum MosquittoClientProtocolVersion {
289    V3,
290    V4,
291    V5,
292}
293
294pub trait MosquittoClientContext {
295    /// Binding to mosquitto_client_address
296
297    /// NOTE: stored sessions might be disconnected upon a restart, and then the client being
298    /// disconnected will have no IP address, the address will then be of type None
299    fn get_address(&self) -> Option<std::net::IpAddr>;
300    /// Binding to mosquitto_client_clean_session
301    fn is_clean_session(&self) -> bool;
302    /// Binding to mosquitto_client_id
303    fn get_id(&self) -> Option<String>;
304    /// Binding to mosquitto_client_keepalive
305    fn get_keepalive(&self) -> i32;
306    /// Binding to mosquitto_client_certificate
307    fn get_certificate(&self) -> Option<&[u8]>;
308    // TODO replace with a reasonable return type from another lib. openssl or x509_parser maybe?
309    /// Binding to mosquitto_client_protocol
310    fn get_protocol(&self) -> MosquittoClientProtocol;
311    /// Binding to mosquitto_client_protocol_version
312    fn get_protocol_version(&self) -> MosquittoClientProtocolVersion;
313    /// Binding to mosquitto_client_sub_count
314    fn get_sub_count(&self) -> i32;
315    /// Binding to mosquitto_client_username
316    fn get_username(&self) -> String;
317    /// Binding to mosquitto_set_username
318    /// Error is either NoMem or Inval
319    fn set_username(&self, username: String) -> Result<Success, Error>;
320}
321
322pub struct MosquittoClient {
323    pub client: *mut mosquitto,
324}
325
326impl MosquittoClientContext for MosquittoClient {
327    fn get_address(&self) -> Option<IpAddr> {
328        unsafe {
329            debug_assert!(!self.client.is_null(), "get_address: self client is null");
330            let address = mosquitto_client_address(self.client);
331            if address.is_null() {
332                None
333            } else {
334                let c_str = std::ffi::CStr::from_ptr(address);
335                let str = c_str.to_str().expect("Couldn't convert CStr to &str"); // TODO should we avoid expect here and instead return Option<String>?
336                Some(IpAddr::from_str(str).expect("Couldn't parse ip"))
337            }
338        }
339    }
340
341    fn is_clean_session(&self) -> bool {
342        debug_assert!(
343            !self.client.is_null(),
344            "is_clean_session: self client is null"
345        );
346        unsafe { mosquitto_client_clean_session(self.client) }
347    }
348
349    fn get_id(&self) -> Option<String> {
350        debug_assert!(!self.client.is_null(), "get_id: self client is null");
351        unsafe {
352            let client_id = mosquitto_client_id(self.client);
353
354            if client_id.is_null() {
355                None
356            } else {
357                let c_str = std::ffi::CStr::from_ptr(client_id);
358                let r_string = c_str
359                    .to_str()
360                    .expect("Couldn't convert CStr to &str")
361                    .to_string(); // TODO should we avoid expect here and instead return Option<String>?
362                Some(r_string)
363            }
364        }
365    }
366
367    fn get_keepalive(&self) -> i32 {
368        debug_assert!(!self.client.is_null(), "get_keepalive: self client is null");
369        unsafe { mosquitto_client_keepalive(self.client) }
370    }
371
372    fn get_certificate(&self) -> Option<&[u8]> {
373        unimplemented!()
374    }
375
376    fn get_protocol(&self) -> MosquittoClientProtocol {
377        debug_assert!(!self.client.is_null(), "get_protocol: self client is null");
378        unsafe {
379            let protocol = mosquitto_client_protocol(self.client) as u32;
380            if protocol == mosquitto_protocol_mp_mqtt {
381                MosquittoClientProtocol::Mqtt
382            } else if protocol == mosquitto_protocol_mp_mqttsn {
383                MosquittoClientProtocol::MqttSn
384            } else if protocol == mosquitto_protocol_mp_websockets {
385                MosquittoClientProtocol::Websockets
386            } else {
387                // TODO either we panic here. Or we need to return a result/option
388                // The benefit of returning the result/option would be to let library-user-space
389                // gracefully shutdown. Which would be preferable.
390
391                panic!(
392                    "mosquitto_client_protocol returned invalid protocol {}",
393                    protocol
394                );
395            }
396        }
397    }
398
399    fn get_protocol_version(&self) -> MosquittoClientProtocolVersion {
400        debug_assert!(
401            !self.client.is_null(),
402            "get_protocol_version: self client is null"
403        );
404        unsafe {
405            let protocol_version = mosquitto_client_protocol_version(self.client);
406            match protocol_version {
407                3 => MosquittoClientProtocolVersion::V3,
408                4 => MosquittoClientProtocolVersion::V4,
409                5 => MosquittoClientProtocolVersion::V5,
410                _ => panic!(
411                    "invalid mosquitto client protocol version returned from mosquitto_client_protocol_version. {}",
412                    protocol_version
413                ),
414            }
415        }
416    }
417
418    fn get_sub_count(&self) -> i32 {
419        debug_assert!(!self.client.is_null(), "get_sub_count: self client is null");
420        unsafe { mosquitto_client_sub_count(self.client) as i32 }
421    }
422
423    fn get_username(&self) -> String {
424        debug_assert!(!self.client.is_null(), "get_username: self client is null");
425        unsafe {
426            let username = mosquitto_client_username(self.client);
427            let c_str = std::ffi::CStr::from_ptr(username);
428            c_str
429                .to_str()
430                .expect("Couldn't convert CStr to &str")
431                .to_string() // TODO should we avoid expect here and instead return Option<String>?
432        }
433    }
434
435    fn set_username(&self, username: String) -> Result<Success, Error> {
436        debug_assert!(!self.client.is_null(), "set_username: self client is null");
437        unsafe {
438            let c_string = &CString::new(username).expect("no cstring for u");
439            let res = mosquitto_set_username(self.client, c_string.as_c_str().as_ptr());
440            match res {
441                0 => Ok(Success),
442                1 => Err(Error::NoMem),
443                3 => Err(Error::Inval),
444                _ => Err(Error::Unknown), // Any other number returned from set_username is undefined behaviour
445            }
446        }
447    }
448}
449
450#[derive(Debug)]
451pub enum MosquittoPluginEvent {
452    MosqEvtReload = 1,
453    MosqEvtAclCheck = 2,
454    MosqEvtBasicAuth = 3,
455    MosqEvtExtAuthStart = 4,
456    MosqEvtExtAuthContinue = 5,
457    MosqEvtControl = 6,
458    MosqEvtMessage = 7,
459    MosqEvtPskKey = 8,
460    MosqEvtTick = 9,
461    MosqEvtDisconnect = 10,
462    Unknown = -1,
463}
464
465impl From<MosquittoPluginEvent> for i32 {
466    fn from(it: MosquittoPluginEvent) -> i32 {
467        match it {
468            MosquittoPluginEvent::MosqEvtReload => 1,
469            MosquittoPluginEvent::MosqEvtAclCheck => 2,
470            MosquittoPluginEvent::MosqEvtBasicAuth => 3,
471            MosquittoPluginEvent::MosqEvtExtAuthStart => 4,
472            MosquittoPluginEvent::MosqEvtExtAuthContinue => 5,
473            MosquittoPluginEvent::MosqEvtControl => 6,
474            MosquittoPluginEvent::MosqEvtMessage => 7,
475            MosquittoPluginEvent::MosqEvtPskKey => 8,
476            MosquittoPluginEvent::MosqEvtTick => 9,
477            MosquittoPluginEvent::MosqEvtDisconnect => 10,
478            MosquittoPluginEvent::Unknown => -1,
479        }
480    }
481}
482
483pub trait MosquittoPlugin {
484    /// This will be run once on every startup, or load, and will allocate the structure, to be
485    /// reconstructed in other calls to the plugin.
486    ///
487    /// This requires unsafe usage due to nature of C calls
488    fn init(opts: MosquittoOpt) -> Self;
489
490    /// Called when SIGHUP is sent to the broker PID
491    #[allow(unused)]
492    fn on_reload(&mut self, opts: MosquittoOpt) {}
493
494    /// Access level checks, default implementation always returns success
495    /// If all acl checks from all plugins returns defer the action should be allowed.
496    /// However that doesn't happen right now, if this returns Err(PluginDefer) for a write the message is not let through.
497    #[allow(unused)]
498    fn acl_check(
499        &mut self,
500        client: &dyn MosquittoClientContext,
501        acl: AclCheckAccessLevel,
502        msg: MosquittoMessage,
503    ) -> Result<Success, Error> {
504        Ok(Success)
505    }
506    #[allow(unused)]
507    /// Username and password checks, default implementation always returns success
508    fn username_password(
509        &mut self,
510        client: &dyn MosquittoClientContext,
511        username: Option<&str>,
512        password: Option<&str>,
513    ) -> Result<Success, Error> {
514        Ok(Success)
515    }
516
517    /// Authentication start. Default implementation always returns success. Return Err(Error::AuthContinue(_))
518    /// to send auth data to the client.
519    fn on_auth_start(
520        &mut self,
521        _client: &dyn MosquittoClientContext,
522        _method: Option<&str>,
523        _data: Option<&[u8]>,
524    ) -> Result<Success, Error> {
525        Ok(Success)
526    }
527
528    /// Authentication continue. Default implementation always returns success. Return `Err(Error::AuthContinue(_))`
529    /// to send auth data to the client or `Ok(Success)`.
530    fn on_auth_continue(
531        &mut self,
532        _client: &dyn MosquittoClientContext,
533        _method: Option<&str>,
534        _data: Option<&[u8]>,
535    ) -> Result<Success, Error> {
536        Ok(Success)
537    }
538
539    /// Tested unsuccessfully. Haven't gotten this to work yet.
540    /// Suspect it has something to do with how the mosquitto_callback_register is called with the event_data parameter
541    #[allow(unused)]
542    fn on_control(&mut self, client: &dyn MosquittoClientContext, message: MosquittoMessage) {}
543
544    /// Called when a message is sent on the broker.
545    /// The message has to pass the ACL check otherwise this callback will not be called.
546    #[allow(unused)]
547    fn on_message(&mut self, client: &dyn MosquittoClientContext, message: MosquittoMessage) {}
548
549    /// Untested
550    #[allow(unused)]
551    fn on_psk(
552        &mut self,
553        client: &dyn MosquittoClientContext,
554        hint: &str,
555        identity: &str,
556        key: &str,
557        max_key_len: i32,
558    ) -> i32 {
559        0
560    }
561
562    /// Called every 100 ms
563    /// All now_ns, next_ns, now_s, next_s parameters are always zero right now.
564    /// I'm not sure if it's a bug on this library's part or of mosquitto.
565    /// If you want to keep time you'll have to measure it yourself right now.
566    #[allow(unused)]
567    fn on_tick(&mut self, now_ns: i64, next_ns: i64, now_s: i32, next_s: i32) {}
568
569    #[allow(unused)]
570    fn on_disconnect(&mut self, client: &dyn MosquittoClientContext, reason: i32) {}
571}
572
573// #[derive(Debug)]
574// pub struct Test {
575//     i: i32,
576//     s: String,
577// }
578
579// impl MosquittoPlugin for Test {
580//     fn init(opts: MosquittoOpt) -> Self {
581//         Test {
582//             i: 32,
583//             s: "I am a struct string, wants to be recreated".into(),
584//         }
585//     }
586
587//     fn username_password(&mut self, u: String, p: String) -> Result<Success, Error> {
588//         self.s = "Well, i changed, whatyagonnadoaboutit".into();
589
590//         if u == "hej" && p == "nope" {
591//             Ok(Success)
592//         } else {
593//             Err(Error::Auth)
594//         }
595//     }
596// }
597// create_dynamic_library!(Test);
598
599// #[no_mangle]
600// pub extern "C" fn mosquitto_auth_psk_key_get(
601//     user_data: *mut Void,
602//     client: *mut mosquitto,
603//     hint: *const Char,
604//     identity: *const Char,
605//     key: *mut Char,
606//     max_key_len: Int,
607// ) -> Int {
608//     0
609// }
610// #[no_mangle]
611// pub extern "C" fn mosquitto_auth_start(
612//     user_data: *mut Void,
613//     client: *mut mosquitto,
614//     method: *const Char,
615//     reauth: bool,
616//     data_in: *const Void,
617//     data_in_len: u16,
618//     data_out: *mut *mut Void,
619//     data_out_len: *mut u16,
620// ) -> Int {
621//     0
622// }
623// #[no_mangle]
624// pub extern "C" fn mosquitto_auth_continue(
625//     user_data: *mut Void,
626//     client: *mut mosquitto,
627//     method: *const Char,
628//     data_in: *const Void,
629//     data_in_len: u16,
630//     data_out: *mut *mut Void,
631//     data_out_len: *mut u16,
632// ) -> Int {
633//     0
634// }
635
636#[cfg(test)]
637mod tests {
638    #[test]
639    fn it_works() {
640        debug_assert_eq!(2 + 2, 4);
641    }
642}