#![cfg(target_os = "linux")]
#![doc(html_root_url = "https://docs.rs/tipc/0.1.2")]
use std::os::raw::{c_int, c_void};
mod bindings;
mod cmd;
mod error;
mod group;
use bindings::*;
pub use cmd::{attach_to_interface, set_host_addr};
pub use error::TipcError;
pub use group::{GroupMessage, Membership};
pub const MAX_MSG_SIZE: usize = TIPC_MAX_USER_MSG_SIZE as usize;
#[derive(Debug, Clone, Copy)]
pub enum TipcScope {
Cluster = 2,
Node = 3,
}
#[derive(Clone, Copy)]
pub enum SockType {
SockStream = __socket_type_SOCK_STREAM as isize,
SockDgram = __socket_type_SOCK_DGRAM as isize,
SockSeqpacket = __socket_type_SOCK_SEQPACKET as isize,
SockRdm = __socket_type_SOCK_RDM as isize,
}
type TipcResult<T> = Result<T, TipcError>;
#[derive(Debug)]
pub struct TipcConn {
socket: c_int,
socket_ref: u32,
node_ref: u32,
buf: Vec<u8>,
in_group: bool,
}
impl Drop for TipcConn {
fn drop(&mut self) {
self.close();
}
}
impl TipcConn {
pub fn socket_ref(&self) -> u32 {
self.socket_ref
}
pub fn node_ref(&self) -> u32 {
self.node_ref
}
fn close(&self) {
unsafe { tipc_close(self.socket) };
}
pub fn new(socktype: SockType) -> TipcResult<Self> {
let socket = unsafe { tipc_socket(socktype as i32) };
if socket < 0 {
return Err(TipcError::new("Unable to initialize socket"));
}
let (socket_ref, node_ref) = socket_and_node_refs(socket)?;
Ok(Self {
socket,
socket_ref,
node_ref,
buf: [0; MAX_MSG_SIZE].to_vec(),
in_group: false,
})
}
pub fn set_sock_non_block(&mut self) -> TipcResult<()> {
self.socket = unsafe { tipc_sock_non_block(self.socket) };
Ok(())
}
pub fn connect(
&self,
service_type: u32,
service_instance: u32,
node: u32,
scope: TipcScope,
) -> TipcResult<()> {
let addr = tipc_addr {
type_: service_type,
instance: service_instance,
node,
scope: scope as u32,
};
let r = unsafe { tipc_connect(self.socket, &addr) };
if r < 0 {
return Err(TipcError::new("Error connecting socket"));
}
Ok(())
}
pub fn listen(&self, backlog: i32) -> TipcResult<()> {
let r = unsafe { tipc_listen(self.socket, backlog) };
if r < 0 {
return Err(TipcError::new("Unable to listen for new connections"));
}
Ok(())
}
pub fn accept(&self) -> TipcResult<Self> {
let mut addr = tipc_addr {
type_: 0,
instance: 0,
node: 0,
scope: 0,
};
let socket = unsafe { tipc_accept(self.socket, &mut addr) };
if socket < 0 {
return Err(TipcError::new("Error accepting a connection"));
}
let (socket_ref, node_ref) = socket_and_node_refs(socket)?;
Ok(Self {
socket,
socket_ref,
node_ref,
buf: [0; MAX_MSG_SIZE].to_vec(),
in_group: self.in_group,
})
}
pub fn send(&self, data: &[u8]) -> TipcResult<i32> {
let r = unsafe {
tipc_send(
self.socket,
data.as_ptr() as *const c_void,
data.len() as u64,
)
};
if r < 0 {
return Err(TipcError::new("Send error"));
}
Ok(r)
}
pub fn broadcast(&self, data: &[u8]) -> TipcResult<i32> {
self.send(data)
}
pub fn anycast(
&self,
data: &[u8],
service_type: u32,
service_instance: u32,
scope: TipcScope,
) -> TipcResult<i32> {
let addr = tipc_addr {
type_: service_type,
instance: service_instance,
node: 0,
scope: scope as u32,
};
let bytes_sent = self.send_to(data, &addr);
if bytes_sent < 0 {
return Err(TipcError::new("Anycast error"));
}
Ok(bytes_sent)
}
pub fn unicast(
&self,
data: &[u8],
socket_ref: u32,
node_ref: u32,
scope: TipcScope,
) -> TipcResult<i32> {
let addr = tipc_addr {
type_: 0,
instance: socket_ref,
node: node_ref,
scope: scope as u32,
};
let bytes_sent = self.send_to(data, &addr);
if bytes_sent < 0 {
return Err(TipcError::new("Unicast error"));
}
Ok(bytes_sent)
}
pub fn multicast(
&self,
data: &[u8],
service_type: u32,
lower: u32,
upper: u32,
scope: TipcScope,
) -> TipcResult<i32> {
let node = if self.in_group { lower } else { upper };
let addr = tipc_addr {
type_: service_type,
instance: lower,
node,
scope: scope as u32,
};
let bytes_sent = unsafe {
tipc_mcast(
self.socket,
data.as_ptr() as *const c_void,
data.len() as size_t,
&addr,
)
};
if bytes_sent < 0 {
return Err(TipcError::new("Multicast error"));
}
Ok(bytes_sent)
}
pub fn join(&mut self, group_id: u32, member_id: u32, scope: TipcScope) -> TipcResult<()> {
let mut addr = tipc_addr {
type_: group_id,
instance: member_id,
node: 0,
scope: scope as u32,
};
let r = unsafe { tipc_join(self.socket, &mut addr, true, false) };
if r < 0 {
return Err(TipcError::new("Unable to join group"));
}
self.in_group = true;
Ok(())
}
pub fn leave(&mut self) -> TipcResult<()> {
let r = unsafe { tipc_leave(self.socket) };
if r < 0 {
return Err(TipcError::new("Leave error"));
}
self.in_group = false;
Ok(())
}
pub fn bind(
&self,
service_type: u32,
lower: u32,
upper: u32,
scope: TipcScope,
) -> TipcResult<()> {
let r = unsafe { tipc_bind(self.socket, service_type, lower, upper, scope as u32) };
if r < 0 {
return Err(TipcError::new("Error binding to socket address"));
}
Ok(())
}
pub fn recv(&self, buf: &mut [u8; MAX_MSG_SIZE]) -> TipcResult<i32> {
let msg_size = unsafe {
tipc_recv(
self.socket,
buf.as_ptr() as *mut c_void,
MAX_MSG_SIZE as size_t,
false,
)
};
if msg_size < 0 {
return Err(TipcError::new("Receive error"));
}
Ok(msg_size)
}
pub fn recvfrom(&self) -> TipcResult<GroupMessage> {
let mut socket_addr = tipc_addr {
type_: 0,
instance: 0,
node: 0,
scope: 0,
};
let mut member_addr = tipc_addr {
type_: 0,
instance: 0,
node: 0,
scope: 0,
};
let mut err = 0;
let msg_size = unsafe {
tipc_recvfrom(
self.socket,
self.buf.as_ptr() as *mut c_void,
self.buf.len() as u64,
&mut socket_addr,
&mut member_addr,
&mut err,
)
};
if msg_size < 0 {
return Err(TipcError::new("recvfrom error"));
}
let msg = if msg_size == 0 {
GroupMessage::MemberEvent(Membership {
socket_ref: socket_addr.instance,
node_ref: socket_addr.node,
service_address: member_addr.type_,
service_instance: member_addr.instance,
joined: if err == 0 { true } else { false },
})
} else {
let data = self.buf[0..msg_size as usize].to_vec();
GroupMessage::DataEvent(data)
};
Ok(msg)
}
fn send_to(&self, data: &[u8], addr: &tipc_addr) -> c_int {
unsafe {
tipc_sendto(
self.socket,
data.as_ptr() as *const c_void,
data.len() as size_t,
addr,
)
}
}
}
fn socket_and_node_refs(socket: c_int) -> TipcResult<(u32, u32)> {
let mut addr = tipc_addr {
type_: 0,
instance: 0,
node: 0,
scope: 0,
};
let r = unsafe { tipc_sockaddr(socket, &mut addr) };
if r < 0 {
return Err(TipcError::new("Unable to determine socket and node refs"));
}
Ok((addr.instance, addr.node))
}