1extern crate getrandom;
38#[allow(unused_imports)]
41use std::{
42 collections::{
43 HashMap,
44 HashSet
45 },
46 ffi::{
47 c_char,
48 c_int,
49 c_long,
50 c_longlong,
51 c_short,
52 c_uchar,
53 c_void,
54 CString,
55 },
56 fs,
57 ptr,
58 time::{
59 SystemTime,
60 UNIX_EPOCH
61 }
62};
63
64const ZMQ_PAIR: c_int = 0;
67const ZMQ_PUB: c_int = 1;
68const ZMQ_REP: c_int = 4;
69const ZMQ_SUB: c_int = 2;
70const ZMQ_BLOCKY: c_int = 70;
72const ZMQ_CURVE_PUBLICKEY: c_int = 48;
73const ZMQ_CURVE_SECRETKEY: c_int = 49;
74const ZMQ_CURVE_SERVER: c_int = 47;
75const ZMQ_CURVE_SERVERKEY: c_int = 50;
76const ZMQ_EVENTS: c_int = 15;
77const ZMQ_IPV6: c_int = 42;
78const ZMQ_ROUTING_ID: c_int = 5;
79const ZMQ_SOCKS_PROXY: c_int = 68;
80const ZMQ_SUBSCRIBE: c_int = 6;
81const ZMQ_UNSUBSCRIBE: c_int = 7;
82const ZMQ_ZAP_DOMAIN: c_int = 55;
83const ZMQ_MORE: c_int = 1;
85const ZMQ_SNDMORE: c_int = 2;
87const ZMQ_EVENT_ACCEPTED: c_int = 0x0020;
89const ZMQ_EVENT_DISCONNECTED: c_int = 0x0200;
90const ZMQ_EVENT_HANDSHAKE_SUCCEEDED: c_int = 0x1000;
91const ZMQ_EVENT_ALL: c_int = 0xFFFF;
92const ZMQ_POLLIN: c_int = 1;
94
95const C_EINTR: c_int = 4;
97
98#[link(name = "zmq")]
99extern "C" {
100 fn zmq_bind(socket: *mut c_void, endpoint: *const c_char) -> c_int;
101 fn zmq_close(socket: *mut c_void) -> c_int;
102 fn zmq_connect(socket: *mut c_void, endpoint: *const c_char) -> c_int;
103 fn zmq_ctx_new() -> *mut c_void;
104 fn zmq_ctx_set(context: *mut c_void, option_name: c_int, option_value: c_int) -> c_int;
105 fn zmq_ctx_shutdown(context: *mut c_void) -> c_int;
106 fn zmq_ctx_term(context: *mut c_void) -> c_int;
107 fn zmq_curve_public(z85_public_key: *mut c_uchar, z85_secret_key: *mut c_uchar) -> c_int;
108 fn zmq_errno() -> c_int;
109 fn zmq_getsockopt(socket: *mut c_void, option_name: c_int, option_value: *mut c_void, option_len: *mut usize) -> c_int;
110 fn zmq_msg_close(msg: *mut zmq_msg_t) -> c_int;
111 fn zmq_msg_data(msg: *mut zmq_msg_t) -> *mut c_void;
112 fn zmq_msg_get(message: *mut zmq_msg_t, property: c_int) -> c_int;
113 fn zmq_msg_init(msg: *mut zmq_msg_t) -> c_int;
114 fn zmq_msg_init_size(msg: *mut zmq_msg_t, size: usize) -> c_int;
115 fn zmq_msg_recv(msg: *mut zmq_msg_t, socket: *mut c_void, flags: c_int) -> c_int;
116 fn zmq_msg_send(msg: *mut zmq_msg_t, socket: *mut c_void, flags: c_int) -> c_int;
117 fn zmq_msg_size(msg: *mut zmq_msg_t) -> usize;
118 fn zmq_setsockopt(socket: *mut c_void, option_name: c_int, option_value: *const c_void, option_len: usize) -> c_int;
119 fn zmq_socket(context: *mut c_void, stype: c_int) -> *mut c_void;
120 fn zmq_socket_monitor(socket: *mut c_void, addr: *const c_char, events: c_int) -> c_int;
121 fn zmq_z85_encode(dest: *mut c_uchar, data: *const u8, size: usize) -> *mut c_char;
122}
123
124pub const LIB_VERSION: &str = env!("CARGO_PKG_VERSION");
125pub const LIB_DATE: &str = "2024.06.05";
126
127pub const DEF_PUBSUB_PORT: u16 = 0xEDAF; pub const DEF_TOR_PROXY_PORT: u16 = 9050; pub const DEF_TOR_PROXY_HOST: &str = "127.0.0.1"; const KEY_Z85_LEN: usize = 40;
133const KEY_Z85_CSTR_LEN: usize = KEY_Z85_LEN + 1;
134const KEY_BIN_LEN: usize = 32;
135
136const DEF_IPV6_STATUS: c_int = 1;
137
138const CURVE_MECHANISM_ID: &str = "CURVE"; const ZAP_DOMAIN: &str = "emyz";
140
141const ZAP_SESSION_ID_LEN: usize = 32;
142
143const ERR_ALREADY_PRESENT: &str = "already present";
144const ERR_ALREADY_ABSENT: &str = "already absent";
145const ERR_ALREADY_PAUSED: &str = "already paused";
146const ERR_ALREADY_RESUMED: &str = "already resumed";
147const ERR_ABSENT: &str = "absent";
148
149#[repr(C)]
150#[allow(non_camel_case_types)]
151struct zmq_msg_t { _d: [c_uchar; 64]
153}
154
155pub struct Etale {
156 paused: bool,
157 parts: Vec<Vec<u8>>,
158 t_out: i64,
159 t_in: i64
160}
161
162pub struct Ehypha {
163 subsock: *mut c_void,
164 etales: HashMap<String, Etale>
165}
166
167pub struct Efunguz {
168 secretkey: String,
169 publickey: String,
170 whitelist_publickeys: HashSet<String>,
171 torproxy_port: u16,
172 torproxy_host: String,
173 ehyphae: HashMap<String, Ehypha>,
174 context: *mut c_void,
175 zapsock: *mut c_void,
176 zap_session_id: Vec<u8>,
177 pubsock: *mut c_void,
178 monsock: *mut c_void,
179 in_accepted_num: u64,
180 in_handshake_succeeded_num: u64,
181 in_disconnected_num: u64,
182 t_last_accepted: i64,
183 t_last_handshake_succeeded: i64,
184 t_last_disconnected: i64
185}
186
187fn time_musec() -> i64 {
188 match SystemTime::now().duration_since(UNIX_EPOCH) {
189 Ok(d) => d.as_micros() as i64,
190 Err(_) => 0
191 }
192}
193
194fn cut_pad_key_str(s: &str) -> String {
195 let mut s = String::from(s);
196 if s.len() < KEY_Z85_LEN {
197 s.extend(vec![' '; KEY_Z85_LEN - s.len()]);
198 } else {
199 s.truncate(KEY_Z85_LEN);
200 }
201 s
202}
203
204fn zmqe_setsockopt_int(socket: *mut c_void, option_name: c_int, option_value: c_int) -> c_int {
205 unsafe {
206 zmq_setsockopt(socket, option_name, (&option_value) as *const c_int as *const c_void, std::mem::size_of::<c_int>())
207 }
208}
209
210fn zmqe_setsockopt_str(socket: *mut c_void, option_name: c_int, option_value: &str) -> c_int {
211 let cstr = CString::new(option_value).unwrap_or_default();
212 unsafe {
213 zmq_setsockopt(socket, option_name, cstr.as_ptr() as *const c_char as *const c_void, option_value.len() + 1)
214 }
215}
216
217fn zmqe_setsockopt_vec(socket: *mut c_void, option_name: c_int, option_value: &Vec<u8>) -> c_int {
218 unsafe {
219 zmq_setsockopt(socket, option_name, option_value.as_ptr() as *const c_void, option_value.len())
220 }
221}
222
223fn zmqe_getsockopt_events(socket: *mut c_void) -> c_int {
224 let mut option_value: c_int = 0;
225 let mut option_len: usize = std::mem::size_of::<c_int>();
226 unsafe {
227 zmq_getsockopt(socket, ZMQ_EVENTS, (&mut option_value) as *mut c_int as *mut c_void, (&mut option_len) as *mut usize);
228 }
229 option_value
230}
231
232fn zmqe_bind(socket: *mut c_void, endpoint: &str) -> c_int {
233 let cstr = CString::new(endpoint).unwrap_or_default();
234 unsafe {
235 zmq_bind(socket, cstr.as_ptr())
236 }
237}
238
239fn zmqe_connect(socket: *mut c_void, endpoint: &str) -> c_int {
240 let cstr = CString::new(endpoint).unwrap_or_default();
241 unsafe {
242 zmq_connect(socket, cstr.as_ptr())
243 }
244}
245
246fn zmqe_socket_monitor_all(socket: *mut c_void, addr: &str) -> c_int {
247 let cstr = CString::new(addr).unwrap_or_default();
248 unsafe {
249 zmq_socket_monitor(socket, cstr.as_ptr(), ZMQ_EVENT_ALL)
250 }
251}
252
253fn zmqe_curve_public(z85_secret_key: &str) -> (String, c_int) {
254 let mut sec_bufn = [0u8; KEY_Z85_CSTR_LEN];
255 unsafe { (z85_secret_key.as_ptr() as *const u8).copy_to(sec_bufn.as_mut_ptr(), KEY_Z85_LEN);
257 }
258 let mut pub_bufn = [0u8; KEY_Z85_CSTR_LEN];
259 let r = unsafe {
260 zmq_curve_public(pub_bufn.as_mut_ptr(), sec_bufn.as_mut_ptr())
261 };
262 match r {
263 0 => (String::from_utf8(pub_bufn[..KEY_Z85_LEN].to_vec()).unwrap_or_default(), 0),
264 _ => (String::new(), r)
265 }
266}
267
268impl zmq_msg_t {
269 fn new_default() -> Self {
270 Self {
271 _d: [0 as c_uchar; 64]
272 }
273 }
274}
275
276fn zmqe_send(socket: *mut c_void, parts: & Vec<Vec<u8>>) {
277 let mut msg = zmq_msg_t::new_default();
278 for i in 0..parts.len() {
279 unsafe {
280 let size = parts[i].len();
281 zmq_msg_init_size((&mut msg) as *mut zmq_msg_t, size);
282 let data = zmq_msg_data((&mut msg) as *mut zmq_msg_t);
283 (parts[i].as_ptr() as *const c_void).copy_to(data, size);
284 if zmq_msg_send((&mut msg) as *mut zmq_msg_t, socket, if (i + 1) < parts.len() {ZMQ_SNDMORE} else {0}) < 0 {
285 zmq_msg_close((&mut msg) as *mut zmq_msg_t);
286 }
287 }
288 }
289}
290
291fn zmqe_recv(socket: *mut c_void) -> Vec<Vec<u8>> {
292 let mut parts = Vec::new();
293 let mut msg = zmq_msg_t::new_default();
294 let mut more: c_int;
295 loop {
296 unsafe {
297 zmq_msg_init((&mut msg) as *mut zmq_msg_t);
298 zmq_msg_recv((&mut msg) as *mut zmq_msg_t, socket, 0);
299 let size = zmq_msg_size((&mut msg) as *mut zmq_msg_t);
300 let mut part = vec![0u8; size];
301 let data = zmq_msg_data((&mut msg) as *mut zmq_msg_t);
302 (data as *const u8).copy_to(part.as_mut_ptr(), size);
303 parts.push(part);
304 more = zmq_msg_get((&mut msg) as *mut zmq_msg_t, ZMQ_MORE);
305 zmq_msg_close((&mut msg) as *mut zmq_msg_t);
306 }
307 if more == 0 {
308 break;
309 }
310 }
311 parts
312}
313
314impl Etale {
315
316 fn new_default() -> Self {
317 Self {
318 paused: false,
319 parts: Vec::new(),
320 t_out: -1,
321 t_in: -1
322 }
323 }
324
325 pub fn parts(&self) -> & Vec<Vec<u8>> {
326 & self.parts
327 }
328
329 pub fn t_out(&self) -> i64 {
330 self.t_out
331 }
332
333 pub fn t_in(&self) -> i64 {
334 self.t_in
335 }
336
337}
338
339impl Ehypha {
340
341 fn new(context: *mut c_void, secretkey: &str, publickey: &str, serverkey: &str, onion: &str, port: u16, torproxy_port: u16, torproxy_host: &str) -> Self {
342 let subsock = unsafe {
343 zmq_socket(context, ZMQ_SUB)
344 };
345 zmqe_setsockopt_str(subsock, ZMQ_CURVE_SECRETKEY, secretkey);
346 zmqe_setsockopt_str(subsock, ZMQ_CURVE_PUBLICKEY, publickey);
347 zmqe_setsockopt_str(subsock, ZMQ_CURVE_SERVERKEY, serverkey);
348 zmqe_setsockopt_str(subsock, ZMQ_SOCKS_PROXY, & format!("{}:{}", torproxy_host, torproxy_port));
349 let endpoint = format!("tcp://{}.onion:{}", onion, port);
350 zmqe_connect(subsock, &endpoint);
351 Self {
352 subsock,
353 etales: HashMap::new()
354 }
355 }
356
357 pub fn add_etale(&mut self, title: &str) -> Result<&Etale, String> {
358 match self.etales.insert(String::from(title), Etale::new_default()) {
359 None => {
360 zmqe_setsockopt_str(self.subsock, ZMQ_SUBSCRIBE, title);
361 match self.etales.get(title) {
362 Some(et) => Ok(et),
363 None => Err(String::from(ERR_ABSENT))
364 }
365
366 },
367 Some(_) => Err(String::from(ERR_ALREADY_PRESENT))
368 }
369 }
370
371 pub fn get_etale(&self, title: &str) -> Option<&Etale> {
372 self.etales.get(title)
373 }
374
375 pub fn del_etale(&mut self, title: &str) -> Result<(), String> {
376 match self.etales.remove(title) {
377 Some(_) => {
378 zmqe_setsockopt_str(self.subsock, ZMQ_UNSUBSCRIBE, title);
379 Ok(())
380 },
381 None => Err(String::from(ERR_ALREADY_ABSENT))
382 }
383 }
384
385 pub fn pause_etale(&mut self, title: &str) -> Result<(), String> {
386 match self.etales.get_mut(title) {
387 Some(etale) => {
388 if ! etale.paused {
389 zmqe_setsockopt_str(self.subsock, ZMQ_UNSUBSCRIBE, title);
390 etale.paused = true;
391 Ok(())
392 } else {
393 Err(String::from(ERR_ALREADY_PAUSED))
394 }
395 },
396 None => {
397 Err(String::from(ERR_ABSENT))
398 }
399 }
400 }
401
402 pub fn resume_etale(&mut self, title: &str) -> Result<(), String> {
403 match self.etales.get_mut(title) {
404 Some(etale) => {
405 if etale.paused {
406 zmqe_setsockopt_str(self.subsock, ZMQ_SUBSCRIBE, title);
407 etale.paused = false;
408 Ok(())
409 } else {
410 Err(String::from(ERR_ALREADY_RESUMED))
411 }
412 },
413 None => {
414 Err(String::from(ERR_ABSENT))
415 }
416 }
417 }
418
419 pub fn pause_etales(&mut self) {
420 for (title, etale) in &mut self.etales {
421 if ! etale.paused {
422 zmqe_setsockopt_str(self.subsock, ZMQ_UNSUBSCRIBE, title);
423 etale.paused = true;
424 }
425 }
426 }
427
428 pub fn resume_etales(&mut self) {
429 for (title, etale) in &mut self.etales {
430 if etale.paused {
431 zmqe_setsockopt_str(self.subsock, ZMQ_SUBSCRIBE, title);
432 etale.paused = false;
433 }
434 }
435 }
436
437 fn update(&mut self) {
438 let t = time_musec();
439 while zmqe_getsockopt_events(self.subsock) & ZMQ_POLLIN != 0 {
440 let msg_parts = zmqe_recv(self.subsock);
441 if msg_parts.len() >= 2 {
443 let topic = & msg_parts[0];
445 let l = topic.len();
446 if (l > 0) && (topic[l - 1] == 0) {
447 let title = String::from_utf8(topic[..(l - 1)].to_vec()).unwrap_or_default();
448 if let Some(etale) = self.etales.get_mut(&title) {
449 if ! etale.paused {
450 if msg_parts[1].len() == 8 { etale.parts.clear();
452 etale.parts.extend_from_slice(& msg_parts[2..]);
453 let mut buf = [0u8; 8];
454 buf.copy_from_slice(& msg_parts[1]);
455 etale.t_out = i64::from_le_bytes(buf);
456 etale.t_in = t;
457 }
458 }
459 }
460 }
461 }
462 }
463 }
464
465}
466
467impl Drop for Ehypha {
468 fn drop(&mut self) {
469 unsafe {
470 zmq_close(self.subsock);
471 }
472 }
473}
474
475impl Efunguz {
476
477 pub fn new(secretkey: &str, whitelist_publickeys: & HashSet<String>, pub_port: u16, torproxy_port: u16, torproxy_host: &str) -> Self {
478 let secretkey = cut_pad_key_str(secretkey);
479 let (publickey, _) = zmqe_curve_public(&secretkey);
480
481 let mut cp_whitelist_publickeys = HashSet::new();
482 for k in whitelist_publickeys {
483 cp_whitelist_publickeys.insert(cut_pad_key_str(k));
484 }
485
486 let torproxy_host = String::from(torproxy_host);
487
488 let ehyphae = HashMap::new();
489
490 let context = unsafe {
491 zmq_ctx_new()
492 };
493
494 unsafe {
495 zmq_ctx_set(context, ZMQ_IPV6, DEF_IPV6_STATUS);
496 zmq_ctx_set(context, ZMQ_BLOCKY, 0);
497 }
498
499 let zapsock = unsafe {
501 zmq_socket(context, ZMQ_REP)
502 };
503
504 zmqe_bind(zapsock, "inproc://zeromq.zap.01");
505
506 let mut zap_session_id = vec![0u8; ZAP_SESSION_ID_LEN];
507 getrandom::getrandom(&mut zap_session_id).unwrap_or(());
509 let pubsock = unsafe {
514 zmq_socket(context, ZMQ_PUB)
515 };
516
517 zmqe_setsockopt_int(pubsock, ZMQ_CURVE_SERVER, 1);
518 zmqe_setsockopt_str(pubsock, ZMQ_CURVE_SECRETKEY, &secretkey);
519 zmqe_setsockopt_vec(pubsock, ZMQ_ZAP_DOMAIN, & ZAP_DOMAIN.as_bytes().to_vec()); zmqe_setsockopt_vec(pubsock, ZMQ_ROUTING_ID, & zap_session_id); zmqe_socket_monitor_all(pubsock, "inproc://monitor-pub");
524 let monsock = unsafe {
525 zmq_socket(context, ZMQ_PAIR)
526 };
527 zmqe_connect(monsock, "inproc://monitor-pub");
528
529 zmqe_bind(pubsock, & format!("tcp://*:{}", pub_port));
530
531 let in_accepted_num: u64 = 0;
532 let in_handshake_succeeded_num: u64 = 0;
533 let in_disconnected_num: u64 = 0;
534
535 let t_last_accepted: i64 = -1;
536 let t_last_handshake_succeeded: i64 = -1;
537 let t_last_disconnected: i64 = -1;
538
539 Self {
540 secretkey,
541 publickey,
542 whitelist_publickeys: cp_whitelist_publickeys,
543 torproxy_port,
544 torproxy_host,
545 ehyphae,
546 context,
547 zapsock,
548 zap_session_id,
549 pubsock,
550 monsock,
551 in_accepted_num,
552 in_handshake_succeeded_num,
553 in_disconnected_num,
554 t_last_accepted,
555 t_last_handshake_succeeded,
556 t_last_disconnected
557 }
558 }
559
560 pub fn add_whitelist_publickeys(&mut self, publickeys: & HashSet<String>) {
561 for k in publickeys {
562 self.whitelist_publickeys.insert(cut_pad_key_str(k));
563 }
564 }
565
566 pub fn del_whitelist_publickeys(&mut self, publickeys: & HashSet<String>) {
567 for k in publickeys {
568 self.whitelist_publickeys.remove(& cut_pad_key_str(k));
569 }
570 }
571
572 pub fn clear_whitelist_publickeys(&mut self) {
573 self.whitelist_publickeys.clear();
574 }
575
576 pub fn read_whitelist_publickeys(&mut self, filepath: &str) {
577 for line in fs::read_to_string(filepath).unwrap_or_default().lines() {
578 if line.len() >= KEY_Z85_LEN {
579 let mut cp_line = String::from(line);
580 cp_line.truncate(KEY_Z85_LEN);
581 self.whitelist_publickeys.insert(cp_line);
582 }
583 }
584 }
585
586 pub fn add_ehypha(&mut self, publickey: &str, onion: &str, port: u16) -> Result<&mut Ehypha, String> {
587 let cp_publickey = cut_pad_key_str(publickey);
588 match self.ehyphae.insert(
589 cp_publickey.clone(),
590 Ehypha::new(self.context, & self.secretkey, & self.publickey, &cp_publickey, onion, port, self.torproxy_port, & self.torproxy_host)
591 ) {
592 None => match self.ehyphae.get_mut(&cp_publickey) {
593 Some(eh) => Ok(eh),
594 None => Err(String::from(ERR_ABSENT))
595 },
596 Some(_) => Err(String::from(ERR_ALREADY_PRESENT))
597 }
598 }
599
600 pub fn get_ehypha(&self, publickey: &str) -> Option<&Ehypha> {
601 let cp_publickey = cut_pad_key_str(publickey);
602 self.ehyphae.get(&cp_publickey)
603 }
604
605 pub fn get_mut_ehypha(&mut self, publickey: &str) -> Option<&mut Ehypha> {
606 let cp_publickey = cut_pad_key_str(publickey);
607 self.ehyphae.get_mut(&cp_publickey)
608 }
609
610 pub fn del_ehypha(&mut self, publickey: &str) -> Result<(), String> {
611 let cp_publickey = cut_pad_key_str(publickey);
612 match self.ehyphae.remove(&cp_publickey) {
613 Some(_) => Ok(()),
614 None => Err(String::from(ERR_ALREADY_ABSENT))
615 }
616 }
617
618 pub fn emit_etale(&mut self, title: &str, parts: & Vec<Vec<u8>>) {
619 let mut msg_parts: Vec<Vec<u8>> = Vec::new();
620
621 let mut topic = String::from(title).as_bytes().to_vec();
622 topic.push(0);
623 msg_parts.push(topic);
624
625 let t_out = time_musec();
626 msg_parts.push(t_out.to_le_bytes().to_vec());
627
628 msg_parts.extend_from_slice(parts);
629
630 zmqe_send(self.pubsock, &msg_parts);
631 }
632
633 pub fn update(&mut self) {
634 while zmqe_getsockopt_events(self.zapsock) & ZMQ_POLLIN != 0 {
635 let request = zmqe_recv(self.zapsock);
636 let mut reply: Vec<Vec<u8>> = Vec::new();
637
638 let version = request[0].clone();
639 let sequence = request[1].clone();
640 let identity = request[4].clone();
643 let mechanism = request[5].clone();
644 let mut key_bin = request[6].clone();
645
646 key_bin.truncate(KEY_BIN_LEN);
647 let mut key_bufn = [0u8; KEY_Z85_CSTR_LEN];
648 unsafe {
649 zmq_z85_encode(key_bufn.as_mut_ptr(), key_bin.as_ptr(), KEY_BIN_LEN);
650 }
651 let key_z85 = String::from_utf8(key_bufn[..KEY_Z85_LEN].to_vec()).unwrap_or_default();
652
653 reply.push(version);
654 reply.push(sequence);
655
656 if (identity == self.zap_session_id) && (mechanism == CURVE_MECHANISM_ID.as_bytes().to_vec()) && (self.whitelist_publickeys.is_empty() || self.whitelist_publickeys.contains(&key_z85)) {
657 reply.push("200".as_bytes().to_vec());
660 reply.push("OK".as_bytes().to_vec());
661 reply.push(key_z85.as_bytes().to_vec());
662 reply.push("".as_bytes().to_vec());
663 } else {
664 reply.push("400".as_bytes().to_vec());
666 reply.push("FAILED".as_bytes().to_vec());
667 reply.push("".as_bytes().to_vec());
668 reply.push("".as_bytes().to_vec());
669 }
670
671 zmqe_send(self.zapsock, &reply);
672 }
673
674 for (_, eh) in &mut self.ehyphae {
675 eh.update();
676 }
677
678 let t = time_musec();
679
680 while zmqe_getsockopt_events(self.monsock) & ZMQ_POLLIN != 0 {
681 let event_msg = zmqe_recv(self.monsock);
682 if event_msg.len() > 0 {
683 if event_msg[0].len() >= 2 {
684 let event_num = u16::from_le_bytes([event_msg[0][0], event_msg[0][1]]);
685 if ((event_num as c_int) & ZMQ_EVENT_ACCEPTED) != 0 {
686 self.in_accepted_num += 1;
687 self.t_last_accepted = t;
688 }
689 if ((event_num as c_int) & ZMQ_EVENT_HANDSHAKE_SUCCEEDED) != 0 {
690 self.in_handshake_succeeded_num += 1;
691 self.t_last_handshake_succeeded = t;
692 }
693 if ((event_num as c_int) & ZMQ_EVENT_DISCONNECTED) != 0 {
694 self.in_disconnected_num += 1;
695 self.t_last_disconnected = t;
696 }
697 }
698 }
699
700 }
701 }
702
703 pub fn in_attempted_num(&self) -> u64 {
704 self.in_accepted_num
705 }
706
707 pub fn in_permitted_num(&self) -> u64 {
708 self.in_handshake_succeeded_num
709 }
710
711 pub fn in_absorbing_num(&self) -> u64 { if self.in_accepted_num >= self.in_disconnected_num {
713 self.in_accepted_num - self.in_disconnected_num
714 } else {
715 0
716 }
717 }
718
719 pub fn t_last_attempt(&self) -> i64 {
720 self.t_last_accepted
721 }
722
723 pub fn t_last_permit(&self) -> i64 {
724 self.t_last_handshake_succeeded
725 }
726
727 pub fn t_last_disconnect(&self) -> i64 {
728 self.t_last_disconnected
729 }
730
731}
732
733impl Drop for Efunguz {
734 fn drop(&mut self) {
735 self.ehyphae.clear(); unsafe {
738 zmq_close(self.monsock);
739 zmq_close(self.pubsock);
740 zmq_close(self.zapsock);
741
742 zmq_ctx_shutdown(self.context);
743 while zmq_ctx_term(self.context) == -1 {
744 if zmq_errno() == C_EINTR {
745 continue;
746 } else {
747 break;
748 }
749 }
750 }
751 }
752}