1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//! NNG protocols.  See [Section 7](https://nng.nanomsg.org/man/v1.2.2/index.html#_section_7_protocols_and_transports).

pub mod bus0;
pub mod pair0;
pub mod pair1;
pub mod pub0;
pub mod pull0;
pub mod push0;
pub mod rep0;
pub mod req0;
pub mod sub0;

pub use self::bus0::*;
pub use self::pair0::*;
pub use self::pair1::*;
pub use self::pub0::*;
pub use self::pull0::*;
pub use self::push0::*;
pub use self::rep0::*;
pub use self::req0::*;
pub use self::sub0::*;

use crate::*;
use runng_derive::{NngGetOpts, NngSetOpts};
use runng_sys::*;

/// Type of subscribe half in publish/subscribe pattern.
pub trait Subscribe {
    /// Subscribe to a topic.
    fn subscribe(&self, topic: &[u8]) -> Result<()>;
    /// Subscribe to a topic.
    fn subscribe_str(&self, topic: &str) -> Result<()> {
        self.subscribe(topic.as_bytes())
    }
    /// Unsubscribe from a topic.
    fn unsubscribe(&self, topic: &[u8]) -> Result<()>;
    /// Unsubscribe from a topic.
    fn unsubscribe_str(&self, topic: &str) -> Result<()> {
        self.unsubscribe(topic.as_bytes())
    }
}

fn nng_open<T, O, S>(open_func: O, socket_create_func: S) -> Result<T>
where
    O: Fn(&mut nng_socket) -> i32,
    S: Fn(NngSocket) -> T,
{
    let mut socket = nng_socket::default();
    let res = open_func(&mut socket);
    Error::zero_map(res, || {
        let socket = NngSocket::new(socket);
        socket_create_func(socket)
    })
}

pub(crate) fn subscribe(socket: nng_socket, topic: &[u8]) -> Result<()> {
    unsafe {
        let opt = NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const ::std::os::raw::c_char;
        let topic_ptr = topic.as_ptr() as *const ::std::os::raw::c_void;
        let topic_size = std::mem::size_of_val(topic);
        let res = nng_socket_set(socket, opt, topic_ptr, topic_size);
        nng_int_to_result(res)
    }
}

pub(crate) fn unsubscribe(socket: nng_socket, topic: &[u8]) -> Result<()> {
    unsafe {
        let opt = NNG_OPT_SUB_UNSUBSCRIBE.as_ptr() as *const ::std::os::raw::c_char;
        let topic_ptr = topic.as_ptr() as *const ::std::os::raw::c_void;
        let topic_size = std::mem::size_of_val(topic);
        let res = nng_socket_set(socket, opt, topic_ptr, topic_size);
        nng_int_to_result(res)
    }
}