iris/
lib.rs

1mod tipc_include;
2mod iris_error_wrapper;
3
4use tipc_include::*;
5use std::os::raw::{c_int, c_void, c_uint};
6use iris_error_wrapper::IrisErrorWrapper;
7use std::result::{Result};
8use std::vec::Vec;
9use std::fmt;
10
11pub type IrisResult<T> = Result<T, IrisErrorWrapper>;
12
13pub const TOPO_SERVICE: u32 = TIPC_TOP_SRV;
14
/// Socket kinds that can be requested when a [`Connection`] is created.
///
/// The discriminant of a variant is the integer that `Connection::new` casts
/// to `i32` and hands to `tipc_socket`, so the numeric values are part of the
/// contract and are spelled out explicitly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SockType {
    Stream = 1,
    Datagram = 2,
    Raw = 3,
    Rdm = 4,
    Sequential = 5,
}
23
/// Visibility scope chosen by the caller of `bind` and `connect`.
///
/// Within this file the variant is only matched on: `Cluster` is translated
/// to node `0` and `Node` to the id of the local node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScopeType {
    Cluster = 2,
    Node = 3,
}
29
/// Address triple exchanged with the TIPC bindings.
///
/// The three fields are copied one-to-one to and from the `type_`,
/// `instance` and `node` members of the binding's `tipc_addr` struct.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct IrisAddressType {
    /// Counterpart of `tipc_addr::type_`.
    pub addr_type: u32,
    /// Counterpart of `tipc_addr::instance`.
    pub instance: u32,
    /// Counterpart of `tipc_addr::node`.
    pub node: u32,
}

impl IrisAddressType {
    /// Creates an address whose three fields are all zero.
    ///
    /// Equivalent to [`IrisAddressType::default`].
    pub fn new() -> Self {
        Self::default()
    }
}

impl fmt::Display for IrisAddressType {
    /// Writes the address as `(addr_type, instance, node)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "({}, {}, {})", self.addr_type, self.instance, self.node)
    }
}
52
/// Socket mode selector.
///
/// NOTE(review): nothing in this file consumes this enum; blocking behaviour
/// is changed through `Connection::make_non_blocking` instead — confirm
/// whether external callers use it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SockOption {
    Standard,
    NonBlocking,
}
58
/// Describes a service a caller wants to subscribe to or wait for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Subscription {
    /// Service type forwarded to the bindings.
    pub service: u32,
    /// Service instance; `Connection::subscribe` passes it as both the lower
    /// and the upper bound of the subscribed range.
    pub instance: u32,
    /// Expiry value forwarded unchanged to `tipc_srv_subscr` / `tipc_srv_wait`.
    /// NOTE(review): unit (presumably milliseconds) is defined by the
    /// bindings — confirm.
    pub timeout: i32,
}
65
/// A TIPC socket together with the id of the node it was created on.
///
/// NOTE(review): no `Drop` implementation is visible in this file, so the
/// descriptor does not appear to be closed when a `Connection` goes out of
/// scope — confirm whether that is handled elsewhere.
#[derive(Debug)]
pub struct Connection {
    /// Descriptor returned by `tipc_socket`.
    socket: c_int,
    /// Value returned by `tipc_own_node` at construction time.
    own_node: c_uint,
}
71
72impl Connection
73{
74    pub fn new(sock_type: SockType) -> Connection
75    {
76        let sock = unsafe { tipc_socket(sock_type as i32) };
77        let own_node = unsafe { tipc_own_node() };
78        Connection{socket: sock, own_node: own_node}
79    }
80
81    pub fn get_socket(&self) -> i32
82    {
83        self.socket
84    }
85
86    fn get_node_for_scope(&self, scope: ScopeType) -> u32
87    {
88        match scope
89        {
90            ScopeType::Cluster => 0,
91            ScopeType::Node => self.own_node,
92        }
93    }
94
95    pub fn bind(&self, service_type: u32, lower: u32, upper: u32, scope: ScopeType) -> IrisResult<()>
96    {
97        let node = self.get_node_for_scope(scope);
98
99        let res = unsafe { tipc_bind(self.socket, service_type, lower, upper, node) };
100
101        match res
102        {
103            0 => Ok(()),
104            -1 => Err(IrisErrorWrapper::new_with_code("Bind failed due to scope error!", -1)),
105            _ => Err(IrisErrorWrapper::new("Bind failed!")),
106        }
107    }
108
109    pub fn recv_from(&self, sock: &mut IrisAddressType, member: &mut IrisAddressType) -> IrisResult<Vec<u8>>
110    {
111        let buf: [u8; TIPC_MAX_USER_MSG_SIZE as usize] = [0; TIPC_MAX_USER_MSG_SIZE as usize];
112        let mut sock_id = tipc_addr { type_ : 0, instance: 0, node: 0 };
113        let mut client_id = tipc_addr { type_ : 0, instance: 0, node: 0 };
114        let mut err: i32 = 0;
115
116        let res = unsafe { tipc_recvfrom(self.socket,
117                                         buf.as_ptr() as *mut c_void,
118                                         TIPC_MAX_USER_MSG_SIZE as u64,
119                                         &mut sock_id,
120                                         &mut client_id,
121                                         &mut err) };
122        match res
123        {
124            val if val >= 0 => {
125                // store socket information
126                sock.addr_type = sock_id.type_;
127                sock.instance = sock_id.instance;
128                sock.node = sock_id.node;
129
130                //store member id
131                member.addr_type = client_id.type_;
132                member.instance = client_id.instance;
133                member.node = client_id.node;
134
135                Ok(buf[0..res as usize].to_vec())
136            },
137            _ => Err(IrisErrorWrapper::new("No data was received!")),
138        }
139    }
140
141    pub fn recv(&self) -> IrisResult<Vec<u8>>
142    {
143        let buf: [u8; TIPC_MAX_USER_MSG_SIZE as usize] = [0; TIPC_MAX_USER_MSG_SIZE as usize];
144        let res = unsafe { tipc_recv(self.socket,
145                                     buf.as_ptr() as *mut c_void,
146                                     TIPC_MAX_USER_MSG_SIZE as u64,
147                                     true) };
148        match res
149        {
150            val if val > 0 => Ok(buf[0..res as usize].to_vec()),
151            0 => Err(IrisErrorWrapper::new("No data was received! Could be a event!")),
152            _ => Err(IrisErrorWrapper::new("Error during recv!")),
153        }
154    }
155
156    pub fn connect(&self, service_type: u32, instance: u32, scope: ScopeType) -> IrisResult<()>
157    {
158        let node = self.get_node_for_scope(scope);
159        let addr = tipc_addr { type_: service_type, instance: instance, node: node };
160
161        let res = unsafe { tipc_connect(self.socket, &addr) };
162
163        match res
164        {
165            0 => Ok(()),
166            _ => Err(IrisErrorWrapper::new("Connect failed!")),
167        }
168    }
169
170    fn get_sock_type(sock_type: u32) -> SockType
171    {
172        match sock_type
173        {
174            1 => SockType::Stream,
175            2 => SockType::Datagram,
176            3 => SockType::Raw,
177            4 => SockType::Rdm,
178            5 => SockType::Sequential,
179            _ => SockType::Rdm,
180        }
181    }
182
183    pub fn accept(&self) -> IrisResult<IrisAddressType>
184    {
185        let mut addr = tipc_addr { type_: 0, instance: 0, node: 0 };
186        let res = unsafe { tipc_accept(self.socket, &mut addr) };
187
188        match res
189        {
190            0 => Ok(IrisAddressType{ addr_type: addr.type_, instance: addr.instance, node: addr.node }),
191            _ => Err(IrisErrorWrapper::new("Accept failed!")),
192        }
193    }
194
195    pub fn listen(&self) -> IrisResult<()>
196    {
197        let res = unsafe{ tipc_listen(self.socket, 0) };
198
199        match res{
200            0 => Ok(()),
201            _ => Err(IrisErrorWrapper::new("Listen failed!")),
202        }
203    }
204
205    pub fn send(&self, buf: &Vec<u8>) -> IrisResult<i32>
206    {
207        let res = unsafe { tipc_send(self.socket, buf.as_ptr() as *const c_void, buf.len() as u64) };
208
209        match res
210        {
211            _ if buf.len() == (res as usize) => Ok(res),
212            _ if buf.len() > (res as usize) => Err(IrisErrorWrapper::new(format!("Data only send partially: {}", res).as_str())),
213            _ => Err(IrisErrorWrapper::new("Send failed!")),
214        }
215    }
216
217    pub fn sendto(&self, buf: &Vec<u8>, addr: &IrisAddressType) -> IrisResult<i32>
218    {
219        let dest = Connection::make_tipc_addr(addr);
220        let res = unsafe { tipc_sendto(self.socket, buf.as_ptr() as *const c_void, buf.len() as u64, &dest) };
221
222        match res
223        {
224            _ if buf.len() == (res as usize) => Ok(res),
225            _ if buf.len() > (res as usize) => Err(IrisErrorWrapper::new(format!("Data only send partially: {}", res).as_str())),
226            _ => Err(IrisErrorWrapper::new("Sendto failed!")),
227        }
228    }
229
230    pub fn multicast(&self, buf: &Vec<u8>, addr: &IrisAddressType) -> IrisResult<i32>
231    {
232        let dest = Connection::make_tipc_addr(addr);
233        let res = unsafe { tipc_mcast(self.socket, buf.as_ptr() as *const c_void, buf.len() as u64, &dest) };
234
235        match res
236        {
237            val if buf.len() == (val as usize) => Ok(val),
238            val if buf.len() > (val as usize) => Err(IrisErrorWrapper::new(format!("Data only send partially: {}", val).as_str())),
239            _ => Err(IrisErrorWrapper::new("Multicast failed!")),
240        }
241    }
242
243    pub fn subscribe(&self, subscription: &Subscription) -> IrisResult<()>
244    {
245        let res = unsafe { tipc_srv_subscr(self.socket,
246                                           subscription.service,
247                                           subscription.instance,
248                                           subscription.instance,
249                                           true,
250                                           subscription.timeout) };
251        
252        match res
253        {
254            0 => Ok(()),
255            _ => Err(IrisErrorWrapper::new("Subscription failed!")),
256        }
257    }
258
259    pub fn wait_for_service(srv: &Subscription) -> IrisResult<bool>
260    {
261        let sub = tipc_addr{ type_: srv.service,
262                             instance: srv.instance,
263                             node: 0 };
264        let res = unsafe { tipc_srv_wait(&sub, srv.timeout) };
265
266        match res
267        {
268            true => Ok(res),
269            false => Err(IrisErrorWrapper::new("Service not available in time")),
270        }
271    }
272
273    pub fn make_non_blocking(&self) -> IrisResult<()>
274    {
275        let res = unsafe { tipc_sock_non_block(self.socket)};
276        
277        match res
278        {
279            val if val == self.socket => Ok(()),
280            _ => Err(IrisErrorWrapper::new("Making sockiet non-blocking failed!")),
281        }
282    }
283
284    fn make_tipc_addr(addr: &IrisAddressType) -> tipc_addr
285    {
286        tipc_addr{type_: addr.addr_type, instance: addr.instance, node: addr.node}
287    }
288
289    pub fn join(&self, addr: &IrisAddressType, events: bool, loopback: bool) -> IrisResult<()>
290    {
291        let mut group = Connection::make_tipc_addr(addr);
292        let res = unsafe{tipc_join(self.socket, &mut group, events, loopback)};
293
294        match res
295        {
296            0 => Ok(()),
297            _ => Err(IrisErrorWrapper::new("Group join failed!")),
298        }
299    }
300
301    pub fn leave(&self) -> IrisResult<()>
302    {
303        let res = unsafe{tipc_leave(self.socket)};
304
305        match res
306        {
307            0 => Ok(()),
308            _ => Err(IrisErrorWrapper::new("Group leave failed!")),
309        }
310    }
311}