1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
pub mod bus0;
pub mod pair0;
pub mod pair1;
pub mod pub0;
pub mod pull0;
pub mod push0;
pub mod rep0;
pub mod req0;
pub mod sub0;
pub use self::bus0::*;
pub use self::pair0::*;
pub use self::pair1::*;
pub use self::pub0::*;
pub use self::pull0::*;
pub use self::push0::*;
pub use self::rep0::*;
pub use self::req0::*;
pub use self::sub0::*;
use crate::*;
use runng_derive::{NngGetOpts, NngSetOpts};
use runng_sys::*;
/// Adding and removing topic subscriptions.
///
/// Implementors supply the byte-slice methods; the `_str` variants are
/// provided and forward the UTF-8 bytes of the string unchanged.
pub trait Subscribe {
    /// Subscribes to `topic`, given as raw bytes.
    fn subscribe(&self, topic: &[u8]) -> Result<()>;

    /// Subscribes to `topic`, passing its UTF-8 bytes to [`Subscribe::subscribe`].
    fn subscribe_str(&self, topic: &str) -> Result<()> {
        let bytes = topic.as_bytes();
        self.subscribe(bytes)
    }

    /// Removes the subscription for `topic`, given as raw bytes.
    fn unsubscribe(&self, topic: &[u8]) -> Result<()>;

    /// Removes the subscription for `topic`, passing its UTF-8 bytes to
    /// [`Subscribe::unsubscribe`].
    fn unsubscribe_str(&self, topic: &str) -> Result<()> {
        let bytes = topic.as_bytes();
        self.unsubscribe(bytes)
    }
}
fn nng_open<T, O, S>(open_func: O, socket_create_func: S) -> Result<T>
where
O: Fn(&mut nng_socket) -> i32,
S: Fn(NngSocket) -> T,
{
let mut socket = nng_socket::default();
let res = open_func(&mut socket);
Error::zero_map(res, || {
let socket = NngSocket::new(socket);
socket_create_func(socket)
})
}
/// Sets the `NNG_OPT_SUB_SUBSCRIBE` option on `socket`, using the bytes of
/// `topic` as the option value.
///
/// # Errors
///
/// Returns whatever `nng_int_to_result` produces for the status code reported
/// by `nng_socket_set`.
pub(crate) fn subscribe(socket: nng_socket, topic: &[u8]) -> Result<()> {
    let option_name = NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const ::std::os::raw::c_char;
    let value = topic.as_ptr() as *const ::std::os::raw::c_void;
    // A `[u8]` occupies exactly one byte per element, so its length is its size.
    let value_len = topic.len();

    // SAFETY: `value` and `value_len` are taken from the same borrowed slice,
    // which stays alive for the whole call. NOTE(review): this also relies on
    // `NNG_OPT_SUB_SUBSCRIBE` being a NUL-terminated C string and on NNG not
    // keeping `value` past the call — confirm against the bindings/NNG docs.
    let status = unsafe { nng_socket_set(socket, option_name, value, value_len) };
    nng_int_to_result(status)
}
/// Sets the `NNG_OPT_SUB_UNSUBSCRIBE` option on `socket`, using the bytes of
/// `topic` as the option value.
///
/// # Errors
///
/// Returns whatever `nng_int_to_result` produces for the status code reported
/// by `nng_socket_set`.
pub(crate) fn unsubscribe(socket: nng_socket, topic: &[u8]) -> Result<()> {
    let option_name = NNG_OPT_SUB_UNSUBSCRIBE.as_ptr() as *const ::std::os::raw::c_char;
    let value = topic.as_ptr() as *const ::std::os::raw::c_void;
    // A `[u8]` occupies exactly one byte per element, so its length is its size.
    let value_len = topic.len();

    // SAFETY: `value` and `value_len` are taken from the same borrowed slice,
    // which stays alive for the whole call. NOTE(review): this also relies on
    // `NNG_OPT_SUB_UNSUBSCRIBE` being a NUL-terminated C string and on NNG not
    // keeping `value` past the call — confirm against the bindings/NNG docs.
    let status = unsafe { nng_socket_set(socket, option_name, value, value_len) };
    nng_int_to_result(status)
}