emyzelium/
lib.rs

1/*
2 * Emyzelium (Rust)
3 *
4 * is another wrapper around ZeroMQ's Publish-Subscribe messaging pattern
5 * with mandatory Curve security and optional ZAP authentication filter,
6 * over Tor, through Tor SOCKS proxy,
7 * for distributed artificial elife, decision making etc. systems where
8 * each peer, identified by its public key, onion address, and port,
9 * publishes and updates vectors of vectors of bytes of data
10 * under unique topics that other peers can subscribe to
11 * and receive the respective data.
12 * 
13 * https://github.com/emyzelium/emyzelium-rs
14 * 
15 * emyzelium@protonmail.com
16 * 
17 * Copyright (c) 2023-2024 Emyzelium caretakers
18 * 
19 * This program is free software: you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation, either version 3 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
27 * See the GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program. If not, see <https://www.gnu.org/licenses/>.
31 */
32
33/*
34 * Library
35 */
36
37extern crate getrandom;
38// use rand::prelude::*;
39
40#[allow(unused_imports)]
41use std::{
42    collections::{
43        HashMap,
44        HashSet
45    },
46    ffi::{
47        c_char,
48        c_int,
49        c_long,
50        c_longlong,
51        c_short,
52        c_uchar,
53        c_void,
54        CString,
55    },
56    fs,
57    ptr,
58    time::{
59        SystemTime,
60        UNIX_EPOCH
61    }
62};
63
// Copied from zmq.h; each group is listed in ascending numeric order.

// Socket types
const ZMQ_PAIR: c_int = 0;
const ZMQ_PUB: c_int = 1;
const ZMQ_SUB: c_int = 2;
const ZMQ_REP: c_int = 4;

// Socket options
const ZMQ_ROUTING_ID: c_int = 5;
const ZMQ_SUBSCRIBE: c_int = 6;
const ZMQ_UNSUBSCRIBE: c_int = 7;
const ZMQ_EVENTS: c_int = 15;
const ZMQ_CURVE_SERVER: c_int = 47;
const ZMQ_CURVE_PUBLICKEY: c_int = 48;
const ZMQ_CURVE_SECRETKEY: c_int = 49;
const ZMQ_CURVE_SERVERKEY: c_int = 50;
const ZMQ_ZAP_DOMAIN: c_int = 55;
const ZMQ_SOCKS_PROXY: c_int = 68;

// Options that this file passes to zmq_ctx_set()
const ZMQ_IPV6: c_int = 42;
const ZMQ_BLOCKY: c_int = 70;

// Message options
const ZMQ_MORE: c_int = 1;

// Send/recv options
const ZMQ_SNDMORE: c_int = 2;

// Socket transport events (bit flags)
const ZMQ_EVENT_ACCEPTED: c_int = 0x0020;
const ZMQ_EVENT_DISCONNECTED: c_int = 0x0200;
const ZMQ_EVENT_HANDSHAKE_SUCCEEDED: c_int = 0x1000;
const ZMQ_EVENT_ALL: c_int = 0xFFFF;

// Poll events
const ZMQ_POLLIN: c_int = 1;

// Copied from errno-base.h
const C_EINTR: c_int = 4;
97
// Raw declarations of the libzmq C functions used by this file, linked from the "zmq" library.
// All of them are foreign functions and therefore unsafe to call; pointer arguments are passed
// through unchanged, so their validity is the responsibility of each call site.
#[link(name = "zmq")]
extern "C" {
    // Sockets: endpoints and lifetime
    fn zmq_bind(socket: *mut c_void, endpoint: *const c_char) -> c_int;
    fn zmq_close(socket: *mut c_void) -> c_int;
    fn zmq_connect(socket: *mut c_void, endpoint: *const c_char) -> c_int;
    // Context lifetime and options
    fn zmq_ctx_new() -> *mut c_void;
    fn zmq_ctx_set(context: *mut c_void, option_name: c_int, option_value: c_int) -> c_int;
    fn zmq_ctx_shutdown(context: *mut c_void) -> c_int;
    fn zmq_ctx_term(context: *mut c_void) -> c_int;
    // Both key buffers are used in this file as NUL-terminated Z85 strings of KEY_Z85_CSTR_LEN bytes
    fn zmq_curve_public(z85_public_key: *mut c_uchar, z85_secret_key: *mut c_uchar) -> c_int;
    fn zmq_errno() -> c_int;
    fn zmq_getsockopt(socket: *mut c_void, option_name: c_int, option_value: *mut c_void, option_len: *mut usize) -> c_int;
    // Message frames
    fn zmq_msg_close(msg: *mut zmq_msg_t) -> c_int;
    fn zmq_msg_data(msg: *mut zmq_msg_t) -> *mut c_void;
    fn zmq_msg_get(message: *mut zmq_msg_t, property: c_int) -> c_int;
    fn zmq_msg_init(msg: *mut zmq_msg_t) -> c_int;
    fn zmq_msg_init_size(msg: *mut zmq_msg_t, size: usize) -> c_int;
    fn zmq_msg_recv(msg: *mut zmq_msg_t, socket: *mut c_void, flags: c_int) -> c_int;
    fn zmq_msg_send(msg: *mut zmq_msg_t, socket: *mut c_void, flags: c_int) -> c_int;
    fn zmq_msg_size(msg: *mut zmq_msg_t) -> usize;
    fn zmq_setsockopt(socket: *mut c_void, option_name: c_int, option_value: *const c_void, option_len: usize) -> c_int;
    fn zmq_socket(context: *mut c_void, stype: c_int) -> *mut c_void;
    fn zmq_socket_monitor(socket: *mut c_void, addr: *const c_char, events: c_int) -> c_int;
    // Used in this file to turn a KEY_BIN_LEN-byte binary key into its Z85 text form
    fn zmq_z85_encode(dest: *mut c_uchar, data: *const u8, size: usize) -> *mut c_char;
}
123
/// Library version, taken at compile time from the Cargo package version.
pub const LIB_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Date string for this library revision (presumably YYYY.MM.DD of the release; maintained by hand).
pub const LIB_DATE: &str = "2024.06.05";
126
// Default port for the publishing socket (not referenced elsewhere in this file; offered to callers)
pub const DEF_PUBSUB_PORT: u16 = 0xEDAF; // 60847

// Tor SOCKS proxy defaults from /etc/tor/torrc
pub const DEF_TOR_PROXY_HOST: &str = "127.0.0.1";
pub const DEF_TOR_PROXY_PORT: u16 = 9050;

// Key sizes: a binary key of 32 bytes is 40 characters in Z85 (5 characters per 4 bytes),
// plus one byte for the terminating NUL when handed to C
const KEY_BIN_LEN: usize = 32;
const KEY_Z85_LEN: usize = KEY_BIN_LEN / 4 * 5;
const KEY_Z85_CSTR_LEN: usize = KEY_Z85_LEN + 1;

// Value given to the context option ZMQ_IPV6
const DEF_IPV6_STATUS: c_int = 1;

// ZAP authentication
const CURVE_MECHANISM_ID: &str = "CURVE"; // See https://rfc.zeromq.org/spec/27/
const ZAP_DOMAIN: &str = "emyz";
const ZAP_SESSION_ID_LEN: usize = 32;

// Error texts returned inside Result<_, String>
const ERR_ABSENT: &str = "absent";
const ERR_ALREADY_ABSENT: &str = "already absent";
const ERR_ALREADY_PRESENT: &str = "already present";
const ERR_ALREADY_PAUSED: &str = "already paused";
const ERR_ALREADY_RESUMED: &str = "already resumed";
148
/// Opaque storage for one libzmq message frame, mirroring `zmq_msg_t` from zmq.h.
///
/// zmq.h declares the 64-byte buffer with pointer alignment on the common
/// compilers, whereas a bare byte array is only 1-aligned in Rust, so libzmq
/// could be handed a misaligned message. `align(8)` satisfies pointer alignment
/// on both 32- and 64-bit targets and leaves the size at 64 bytes.
#[repr(C, align(8))]
#[allow(non_camel_case_types)]
struct zmq_msg_t { // from zmq.h
    _d: [c_uchar; 64]
}
154
/// Last known state of one topic received from a remote peer.
pub struct Etale {
    /// True while the topic is unsubscribed via the pause methods of `Ehypha`.
    paused: bool,
    /// Data parts of the most recent message accepted for this topic.
    parts: Vec<Vec<u8>>,
    /// Time carried in the most recent message (second frame, little-endian i64); -1 until one arrives.
    t_out: i64,
    /// Local time in microseconds since the Unix epoch when that message was processed; -1 until one arrives.
    t_in: i64,
}
161
162pub struct Ehypha {
163    subsock: *mut c_void,
164    etales: HashMap<String, Etale>
165}
166
167pub struct Efunguz {
168    secretkey: String,
169    publickey: String,
170    whitelist_publickeys: HashSet<String>,
171    torproxy_port: u16,
172    torproxy_host: String,
173    ehyphae: HashMap<String, Ehypha>,
174    context: *mut c_void,
175    zapsock: *mut c_void,
176    zap_session_id: Vec<u8>,
177    pubsock: *mut c_void,
178    monsock: *mut c_void,
179    in_accepted_num: u64,
180    in_handshake_succeeded_num: u64,
181    in_disconnected_num: u64,
182    t_last_accepted: i64,
183    t_last_handshake_succeeded: i64,
184    t_last_disconnected: i64
185}
186
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn time_musec() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_micros() as i64)
}
193
194fn cut_pad_key_str(s: &str) -> String {
195    let mut s = String::from(s);
196    if s.len() < KEY_Z85_LEN {
197        s.extend(vec![' '; KEY_Z85_LEN - s.len()]);
198    } else {
199        s.truncate(KEY_Z85_LEN);
200    }
201    s
202}
203
204fn zmqe_setsockopt_int(socket: *mut c_void, option_name: c_int, option_value: c_int) -> c_int {
205    unsafe {
206        zmq_setsockopt(socket, option_name, (&option_value) as *const c_int as *const c_void, std::mem::size_of::<c_int>())
207    }
208}
209
210fn zmqe_setsockopt_str(socket: *mut c_void, option_name: c_int, option_value: &str) -> c_int {
211    let cstr = CString::new(option_value).unwrap_or_default();
212    unsafe {
213        zmq_setsockopt(socket, option_name, cstr.as_ptr() as *const c_char as *const c_void, option_value.len() + 1)
214    }
215}
216
217fn zmqe_setsockopt_vec(socket: *mut c_void, option_name: c_int, option_value: &Vec<u8>) -> c_int {
218    unsafe {
219        zmq_setsockopt(socket, option_name, option_value.as_ptr() as *const c_void, option_value.len())
220    }
221}
222
223fn zmqe_getsockopt_events(socket: *mut c_void) -> c_int {
224    let mut option_value: c_int = 0;
225    let mut option_len: usize = std::mem::size_of::<c_int>();
226    unsafe {
227        zmq_getsockopt(socket, ZMQ_EVENTS, (&mut option_value) as *mut c_int as *mut c_void, (&mut option_len) as *mut usize);
228    }
229    option_value
230}
231
232fn zmqe_bind(socket: *mut c_void, endpoint: &str) -> c_int {
233    let cstr = CString::new(endpoint).unwrap_or_default();
234    unsafe {
235        zmq_bind(socket, cstr.as_ptr())
236    }
237}
238
239fn zmqe_connect(socket: *mut c_void, endpoint: &str) -> c_int {
240    let cstr = CString::new(endpoint).unwrap_or_default();
241    unsafe {
242        zmq_connect(socket, cstr.as_ptr())
243    }
244}
245
246fn zmqe_socket_monitor_all(socket: *mut c_void, addr: &str) -> c_int {
247    let cstr = CString::new(addr).unwrap_or_default();
248    unsafe {
249        zmq_socket_monitor(socket, cstr.as_ptr(), ZMQ_EVENT_ALL)
250    }
251}
252
253fn zmqe_curve_public(z85_secret_key: &str) -> (String, c_int) {
254    let mut sec_bufn = [0u8; KEY_Z85_CSTR_LEN];
255    unsafe { // "safe" if z85_secret_key is not shorter than KEY_Z85_LEN... see cut_pad_key_str()
256        (z85_secret_key.as_ptr() as *const u8).copy_to(sec_bufn.as_mut_ptr(), KEY_Z85_LEN);
257    }
258    let mut pub_bufn = [0u8; KEY_Z85_CSTR_LEN];
259    let r = unsafe {
260        zmq_curve_public(pub_bufn.as_mut_ptr(), sec_bufn.as_mut_ptr())
261    };
262    match r {
263        0 => (String::from_utf8(pub_bufn[..KEY_Z85_LEN].to_vec()).unwrap_or_default(), 0),
264        _ => (String::new(), r)
265    }    
266}
267
268impl zmq_msg_t {
269    fn new_default() -> Self {
270        Self {
271            _d: [0 as c_uchar; 64]
272        }
273    }
274}
275
276fn zmqe_send(socket: *mut c_void, parts: & Vec<Vec<u8>>) {
277    let mut msg = zmq_msg_t::new_default();
278    for i in 0..parts.len() {
279        unsafe {
280            let size = parts[i].len();
281            zmq_msg_init_size((&mut msg) as *mut zmq_msg_t, size);
282            let data = zmq_msg_data((&mut msg) as *mut zmq_msg_t);
283            (parts[i].as_ptr() as *const c_void).copy_to(data, size);
284            if zmq_msg_send((&mut msg) as *mut zmq_msg_t, socket, if (i + 1) < parts.len() {ZMQ_SNDMORE} else {0}) < 0 {
285                zmq_msg_close((&mut msg) as *mut zmq_msg_t);
286            }
287        }        
288    }
289}
290
291fn zmqe_recv(socket: *mut c_void) -> Vec<Vec<u8>> {
292    let mut parts = Vec::new();
293    let mut msg = zmq_msg_t::new_default();
294    let mut more: c_int;
295    loop {
296        unsafe {
297            zmq_msg_init((&mut msg) as *mut zmq_msg_t);
298            zmq_msg_recv((&mut msg) as *mut zmq_msg_t, socket, 0);
299            let size = zmq_msg_size((&mut msg) as *mut zmq_msg_t);
300            let mut part = vec![0u8; size];
301            let data = zmq_msg_data((&mut msg) as *mut zmq_msg_t);
302            (data as *const u8).copy_to(part.as_mut_ptr(), size);
303            parts.push(part);
304            more = zmq_msg_get((&mut msg) as *mut zmq_msg_t, ZMQ_MORE);
305            zmq_msg_close((&mut msg) as *mut zmq_msg_t);
306        }
307        if more == 0 {
308            break;
309        }
310    }
311    parts
312}
313
314impl Etale {
315
316    fn new_default() -> Self {
317        Self {
318            paused: false,
319            parts: Vec::new(),
320            t_out: -1,
321            t_in: -1
322        }
323    }
324
325    pub fn parts(&self) -> & Vec<Vec<u8>> {
326        & self.parts
327    }
328
329    pub fn t_out(&self) -> i64 {
330        self.t_out
331    }
332
333    pub fn t_in(&self) -> i64 {
334        self.t_in
335    }
336
337}
338
339impl Ehypha {
340
341    fn new(context: *mut c_void, secretkey: &str, publickey: &str, serverkey: &str, onion: &str, port: u16, torproxy_port: u16, torproxy_host: &str) -> Self {
342        let subsock = unsafe {
343            zmq_socket(context, ZMQ_SUB)
344        };
345        zmqe_setsockopt_str(subsock, ZMQ_CURVE_SECRETKEY, secretkey);
346        zmqe_setsockopt_str(subsock, ZMQ_CURVE_PUBLICKEY, publickey);
347        zmqe_setsockopt_str(subsock, ZMQ_CURVE_SERVERKEY, serverkey);
348        zmqe_setsockopt_str(subsock, ZMQ_SOCKS_PROXY, & format!("{}:{}", torproxy_host, torproxy_port));
349        let endpoint = format!("tcp://{}.onion:{}", onion, port);
350        zmqe_connect(subsock, &endpoint);
351        Self {
352            subsock,
353            etales: HashMap::new()
354        }
355    }
356
357    pub fn add_etale(&mut self, title: &str) -> Result<&Etale, String> {
358        match self.etales.insert(String::from(title), Etale::new_default()) {
359            None => {
360                zmqe_setsockopt_str(self.subsock, ZMQ_SUBSCRIBE, title);
361                match self.etales.get(title) {
362                    Some(et) => Ok(et),
363                    None => Err(String::from(ERR_ABSENT))
364                }
365                
366            },
367            Some(_) => Err(String::from(ERR_ALREADY_PRESENT))
368        }
369    }
370
371    pub fn get_etale(&self, title: &str) -> Option<&Etale> {
372        self.etales.get(title)
373    }
374
375    pub fn del_etale(&mut self, title: &str) -> Result<(), String> {
376        match self.etales.remove(title) {
377            Some(_) => {
378                zmqe_setsockopt_str(self.subsock, ZMQ_UNSUBSCRIBE, title);
379                Ok(())
380            },
381            None => Err(String::from(ERR_ALREADY_ABSENT))
382        }
383    }
384
385    pub fn pause_etale(&mut self, title: &str) -> Result<(), String> {
386        match self.etales.get_mut(title) {
387            Some(etale) => {
388                if ! etale.paused {
389                    zmqe_setsockopt_str(self.subsock, ZMQ_UNSUBSCRIBE, title);
390                    etale.paused = true;
391                    Ok(())
392                } else {
393                    Err(String::from(ERR_ALREADY_PAUSED))
394                }
395            },
396            None => {
397                Err(String::from(ERR_ABSENT))
398            }
399        }
400    }
401
402    pub fn resume_etale(&mut self, title: &str) -> Result<(), String> {
403        match self.etales.get_mut(title) {
404            Some(etale) => {
405                if etale.paused {
406                    zmqe_setsockopt_str(self.subsock, ZMQ_SUBSCRIBE, title);
407                    etale.paused = false;
408                    Ok(())
409                } else {
410                    Err(String::from(ERR_ALREADY_RESUMED))
411                }
412            },
413            None => {
414                Err(String::from(ERR_ABSENT))
415            }
416        }
417    }
418
419    pub fn pause_etales(&mut self) {
420        for (title, etale) in &mut self.etales {
421            if ! etale.paused {
422                zmqe_setsockopt_str(self.subsock, ZMQ_UNSUBSCRIBE, title);
423                etale.paused = true;
424            }
425        }
426    }
427
428    pub fn resume_etales(&mut self) {
429        for (title, etale) in &mut self.etales {
430            if etale.paused {
431                zmqe_setsockopt_str(self.subsock, ZMQ_SUBSCRIBE, title);
432                etale.paused = false;
433            }
434        }
435    }
436
437    fn update(&mut self) {
438        let t = time_musec();
439        while zmqe_getsockopt_events(self.subsock) & ZMQ_POLLIN != 0 {
440            let msg_parts = zmqe_recv(self.subsock);
441            // Sanity checks...
442            if msg_parts.len() >= 2 {
443                // 0th is topic, 1st is remote time, rest (optional) is data
444                let topic = & msg_parts[0];
445                let l = topic.len();
446                if (l > 0) && (topic[l - 1] == 0) {
447                    let title = String::from_utf8(topic[..(l - 1)].to_vec()).unwrap_or_default();
448                    if let Some(etale) = self.etales.get_mut(&title) {
449                        if ! etale.paused {
450                            if msg_parts[1].len() == 8 { // i64
451                                etale.parts.clear();
452                                etale.parts.extend_from_slice(& msg_parts[2..]);
453                                let mut buf = [0u8; 8];
454                                buf.copy_from_slice(& msg_parts[1]);
455                                etale.t_out = i64::from_le_bytes(buf);
456                                etale.t_in = t;
457                            }
458                        }
459                    }
460                }
461            }
462        }
463    }
464
465}
466
467impl Drop for Ehypha {
468    fn drop(&mut self) {
469        unsafe {
470            zmq_close(self.subsock);
471        }
472    }
473}
474
475impl Efunguz {
476
477    pub fn new(secretkey: &str, whitelist_publickeys: & HashSet<String>, pub_port: u16, torproxy_port: u16, torproxy_host: &str) -> Self {
478        let secretkey = cut_pad_key_str(secretkey);
479        let (publickey, _) = zmqe_curve_public(&secretkey);
480
481        let mut cp_whitelist_publickeys = HashSet::new();
482        for k in whitelist_publickeys {
483            cp_whitelist_publickeys.insert(cut_pad_key_str(k));
484        }
485
486        let torproxy_host = String::from(torproxy_host);
487
488        let ehyphae = HashMap::new();
489
490        let context = unsafe {
491            zmq_ctx_new()
492        };
493
494        unsafe {
495            zmq_ctx_set(context, ZMQ_IPV6, DEF_IPV6_STATUS);
496            zmq_ctx_set(context, ZMQ_BLOCKY, 0);
497        }
498
499        // At first, REP socket for ZAP auth...
500        let zapsock = unsafe {
501            zmq_socket(context, ZMQ_REP)
502        };
503
504        zmqe_bind(zapsock, "inproc://zeromq.zap.01");
505
506        let mut zap_session_id = vec![0u8; ZAP_SESSION_ID_LEN];
507         // Must be cryptographically random...
508        getrandom::getrandom(&mut zap_session_id).unwrap_or(());
509        // thread_rng().fill_bytes(&mut zap_session_id);
510        // ...is it?
511
512        // ..and only then, PUB socket
513        let pubsock = unsafe {
514            zmq_socket(context, ZMQ_PUB)
515        };
516
517        zmqe_setsockopt_int(pubsock, ZMQ_CURVE_SERVER, 1);
518        zmqe_setsockopt_str(pubsock, ZMQ_CURVE_SECRETKEY, &secretkey);
519        zmqe_setsockopt_vec(pubsock, ZMQ_ZAP_DOMAIN, & ZAP_DOMAIN.as_bytes().to_vec()); // to enable auth, must be non-empty due to ZMQ RFC 27
520        zmqe_setsockopt_vec(pubsock, ZMQ_ROUTING_ID, & zap_session_id); // to make sure only this pubsock can pass auth through zapsock; see update()
521        
522        // Before binding, attach monitor
523        zmqe_socket_monitor_all(pubsock, "inproc://monitor-pub");
524        let monsock = unsafe {
525            zmq_socket(context, ZMQ_PAIR)
526        };
527        zmqe_connect(monsock, "inproc://monitor-pub");
528
529        zmqe_bind(pubsock, & format!("tcp://*:{}", pub_port));
530
531        let in_accepted_num: u64 = 0;
532        let in_handshake_succeeded_num: u64 = 0;
533        let in_disconnected_num: u64 = 0;
534
535        let t_last_accepted: i64 = -1;
536        let t_last_handshake_succeeded: i64 = -1;
537        let t_last_disconnected: i64 = -1;
538
539        Self {
540            secretkey,
541            publickey,
542            whitelist_publickeys: cp_whitelist_publickeys,
543            torproxy_port,
544            torproxy_host,
545            ehyphae,
546            context,
547            zapsock,
548            zap_session_id,
549            pubsock,
550            monsock,
551            in_accepted_num,
552            in_handshake_succeeded_num,
553            in_disconnected_num,
554            t_last_accepted,
555            t_last_handshake_succeeded,
556            t_last_disconnected
557        }
558    }
559
560    pub fn add_whitelist_publickeys(&mut self, publickeys: & HashSet<String>) {
561        for k in publickeys {
562            self.whitelist_publickeys.insert(cut_pad_key_str(k));
563        }
564    }
565
566    pub fn del_whitelist_publickeys(&mut self, publickeys: & HashSet<String>) {
567        for k in publickeys {
568            self.whitelist_publickeys.remove(& cut_pad_key_str(k));
569        }
570    }
571
572    pub fn clear_whitelist_publickeys(&mut self) {
573        self.whitelist_publickeys.clear();
574    }
575
576    pub fn read_whitelist_publickeys(&mut self, filepath: &str) {
577        for line in fs::read_to_string(filepath).unwrap_or_default().lines() {
578            if line.len() >= KEY_Z85_LEN {
579                let mut cp_line = String::from(line);
580                cp_line.truncate(KEY_Z85_LEN);
581                self.whitelist_publickeys.insert(cp_line);
582            }
583        }
584    }
585
586    pub fn add_ehypha(&mut self, publickey: &str, onion: &str, port: u16) -> Result<&mut Ehypha, String> {
587        let cp_publickey = cut_pad_key_str(publickey);
588        match self.ehyphae.insert(
589            cp_publickey.clone(),
590            Ehypha::new(self.context, & self.secretkey, & self.publickey, &cp_publickey, onion, port, self.torproxy_port, & self.torproxy_host)
591        ) {
592            None => match self.ehyphae.get_mut(&cp_publickey) {
593                Some(eh) => Ok(eh),
594                None => Err(String::from(ERR_ABSENT))
595            },
596            Some(_) => Err(String::from(ERR_ALREADY_PRESENT))
597        }
598    }
599
600    pub fn get_ehypha(&self, publickey: &str) -> Option<&Ehypha> {
601        let cp_publickey = cut_pad_key_str(publickey);
602        self.ehyphae.get(&cp_publickey)
603    }
604
605    pub fn get_mut_ehypha(&mut self, publickey: &str) -> Option<&mut Ehypha> {
606        let cp_publickey = cut_pad_key_str(publickey);
607        self.ehyphae.get_mut(&cp_publickey)
608    }
609
610    pub fn del_ehypha(&mut self, publickey: &str) -> Result<(), String> {
611        let cp_publickey = cut_pad_key_str(publickey);
612        match self.ehyphae.remove(&cp_publickey) {
613            Some(_) => Ok(()),
614            None => Err(String::from(ERR_ALREADY_ABSENT))
615        }
616    }
617
618    pub fn emit_etale(&mut self, title: &str, parts: & Vec<Vec<u8>>) {
619        let mut msg_parts: Vec<Vec<u8>> = Vec::new();
620
621        let mut topic = String::from(title).as_bytes().to_vec();
622        topic.push(0);
623        msg_parts.push(topic);
624
625        let t_out = time_musec();
626        msg_parts.push(t_out.to_le_bytes().to_vec());
627
628        msg_parts.extend_from_slice(parts);
629
630        zmqe_send(self.pubsock, &msg_parts);
631    }
632
633    pub fn update(&mut self) {
634        while zmqe_getsockopt_events(self.zapsock) & ZMQ_POLLIN != 0 {
635            let request = zmqe_recv(self.zapsock);
636            let mut reply: Vec<Vec<u8>> = Vec::new();
637
638            let version = request[0].clone();
639            let sequence = request[1].clone();
640            // let domain = request[2].clone();
641            // let address = request[3].clone();
642            let identity = request[4].clone();
643            let mechanism = request[5].clone();
644            let mut key_bin = request[6].clone();
645
646            key_bin.truncate(KEY_BIN_LEN);
647            let mut key_bufn = [0u8; KEY_Z85_CSTR_LEN];
648            unsafe {
649                zmq_z85_encode(key_bufn.as_mut_ptr(), key_bin.as_ptr(), KEY_BIN_LEN);
650            }
651            let key_z85 = String::from_utf8(key_bufn[..KEY_Z85_LEN].to_vec()).unwrap_or_default();
652
653            reply.push(version);
654            reply.push(sequence);
655
656            if (identity == self.zap_session_id) && (mechanism == CURVE_MECHANISM_ID.as_bytes().to_vec()) && (self.whitelist_publickeys.is_empty() || self.whitelist_publickeys.contains(&key_z85)) {
657                // Auth passed
658                // Though needless (yet), set user-id to client's public key
659                reply.push("200".as_bytes().to_vec());
660                reply.push("OK".as_bytes().to_vec());
661                reply.push(key_z85.as_bytes().to_vec());
662                reply.push("".as_bytes().to_vec());
663            } else {
664                // Auth failed
665                reply.push("400".as_bytes().to_vec());
666                reply.push("FAILED".as_bytes().to_vec());
667                reply.push("".as_bytes().to_vec());
668                reply.push("".as_bytes().to_vec());
669            }
670
671            zmqe_send(self.zapsock, &reply);
672        }
673
674        for (_, eh) in &mut self.ehyphae {
675            eh.update();
676        }
677
678        let t = time_musec();
679
680        while zmqe_getsockopt_events(self.monsock) & ZMQ_POLLIN != 0 {
681            let event_msg = zmqe_recv(self.monsock);
682            if event_msg.len() > 0 {
683                if event_msg[0].len() >= 2 {
684                    let event_num = u16::from_le_bytes([event_msg[0][0], event_msg[0][1]]);
685                    if ((event_num as c_int) & ZMQ_EVENT_ACCEPTED) != 0 {
686                        self.in_accepted_num += 1;
687                        self.t_last_accepted = t;
688                    }
689                    if ((event_num as c_int) & ZMQ_EVENT_HANDSHAKE_SUCCEEDED) != 0 {
690                        self.in_handshake_succeeded_num += 1;
691                        self.t_last_handshake_succeeded = t;
692                    }
693                    if ((event_num as c_int) & ZMQ_EVENT_DISCONNECTED) != 0 {
694                        self.in_disconnected_num += 1;
695                        self.t_last_disconnected = t;
696                    } 
697                }
698            }
699
700        }
701    }
702
703    pub fn in_attempted_num(&self) -> u64 {
704        self.in_accepted_num
705    }
706
707    pub fn in_permitted_num(&self) -> u64 {
708        self.in_handshake_succeeded_num
709    }
710
711    pub fn in_absorbing_num(&self) -> u64 { // may temporarily exceed number of actually subscribed peers, until disconnection due to failed auth
712        if self.in_accepted_num >= self.in_disconnected_num {
713            self.in_accepted_num - self.in_disconnected_num
714        } else {
715            0
716        }
717    }
718
719    pub fn t_last_attempt(&self) -> i64 {
720        self.t_last_accepted
721    }
722
723    pub fn t_last_permit(&self) -> i64 {
724        self.t_last_handshake_succeeded
725    }
726
727    pub fn t_last_disconnect(&self) -> i64 {
728        self.t_last_disconnected
729    }
730
731}
732
733impl Drop for Efunguz {
734    fn drop(&mut self) {
735        self.ehyphae.clear(); // to close subsock of each ehypha in its dropper before terminating context, to which those sockets belong; freezes otherwise
736
737        unsafe {
738            zmq_close(self.monsock);
739            zmq_close(self.pubsock);
740            zmq_close(self.zapsock);
741
742            zmq_ctx_shutdown(self.context);
743            while zmq_ctx_term(self.context) == -1 {
744                if zmq_errno() == C_EINTR {
745                    continue;
746                } else {
747                    break;
748                }
749            }
750        }
751    }
752}