use crate::common::{unixnano_to_ms, sockaddr_to_rust, SockAddrIn, VmaOptions};
use std::ffi::{c_void, CString};
use std::mem;
use std::net::SocketAddr;
use std::os::raw::{c_char, c_int, c_ulonglong};
// Native entry points of the C TCP socket layer.
//
// Every function works on the `#[repr(C)]` structs declared in this file, passed by raw pointer.
// Except for `tcp_socket_is_connected`, each returns a `c_int` status which the safe wrappers in
// this file compare against `TcpResult::TcpSuccess`.
extern "C" {
    /// Initialises `*sock` from `*options`; used by `TcpSocketWrapper::new`.
    fn tcp_socket_init(sock: *mut TcpSocket, options: *const VmaOptions) -> c_int;

    /// Closes the socket; used by the `Drop` impl of `TcpSocketWrapper`.
    fn tcp_socket_close(sock: *mut TcpSocket) -> c_int;

    /// Binds to `ip:port`; the wrappers pass `ip` as a `CString` pointer.
    fn tcp_socket_bind(sock: *mut TcpSocket, ip: *const c_char, port: u16) -> c_int;

    /// Starts listening with the given backlog.
    fn tcp_socket_listen(sock: *mut TcpSocket, backlog: c_int) -> c_int;

    /// Accepts one connection into `*client`.
    fn tcp_socket_accept(
        sock: *mut TcpSocket,
        client: *mut TcpClient,
        timeout_ms: c_int,
    ) -> c_int;

    /// Connects to `ip:port`; the wrappers pass `ip` as a `CString` pointer.
    fn tcp_socket_connect(
        sock: *mut TcpSocket,
        ip: *const c_char,
        port: u16,
        timeout_ms: c_int,
    ) -> c_int;

    /// Re-establishes a connection for `*sock`.
    fn tcp_socket_reconnect(sock: *mut TcpSocket, timeout_ms: c_int) -> c_int;

    /// Reports whether `*sock` is currently connected.
    fn tcp_socket_is_connected(sock: *mut TcpSocket) -> bool;

    /// Sends `len` bytes starting at `data`; the count is written to `*bytes_sent`.
    fn tcp_socket_send(
        sock: *mut TcpSocket,
        data: *const c_void,
        len: usize,
        bytes_sent: *mut usize,
    ) -> c_int;

    /// Sends `len` bytes starting at `data` to an accepted client; the count is written to
    /// `*bytes_sent`.
    fn tcp_socket_send_to_client(
        client: *mut TcpClient,
        data: *const c_void,
        len: usize,
        bytes_sent: *mut usize,
    ) -> c_int;

    /// Receives into `buf` (capacity `buf_len`); the count is written to `*bytes_received`.
    fn tcp_socket_recv(
        sock: *mut TcpSocket,
        buf: *mut c_void,
        buf_len: usize,
        timeout_ms: c_int,
        bytes_received: *mut usize,
    ) -> c_int;

    /// Receives from an accepted client into `buf` (capacity `buf_len`); the count is written to
    /// `*bytes_received`.
    fn tcp_socket_recv_from_client(
        client: *mut TcpClient,
        buf: *mut c_void,
        buf_len: usize,
        timeout_ms: c_int,
        bytes_received: *mut usize,
    ) -> c_int;

    /// Closes an accepted client; used by `Client::close` and by the `Drop` impl of `Client`.
    fn tcp_socket_close_client(client: *mut TcpClient) -> c_int;

    /// Writes the four traffic counters of `*sock` through the out-pointers.
    fn tcp_socket_get_stats(
        sock: *mut TcpSocket,
        rx_packets: *mut c_ulonglong,
        tx_packets: *mut c_ulonglong,
        rx_bytes: *mut c_ulonglong,
        tx_bytes: *mut c_ulonglong,
    ) -> c_int;
}
/// Connection state kept in the `state` field of `TcpSocket`.
///
/// The enum is `#[repr(C)]` with explicit discriminants because it is embedded in a struct that
/// is handed to the native library by pointer.
/// NOTE(review): the numeric values are assumed to mirror the C header - confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(C)]
pub enum TcpConnectionState {
    Disconnected = 0,
    Connecting = 1,
    Connected = 2,
    Listening = 3,
}
/// Native socket record shared with the C library.
///
/// Instances are zero-initialised in `TcpSocketWrapper::new`, filled in by `tcp_socket_init`, and
/// then passed by pointer to every other `tcp_socket_*` call. Field order and types are part of
/// the ABI and must not be rearranged.
/// NOTE(review): the per-field notes below are inferred from names - verify against the C header.
#[derive(Clone, Debug)]
#[repr(C)]
pub struct TcpSocket {
    /// Raw descriptor (presumably owned by the native side).
    pub socket_fd: c_int,
    /// Options the socket was initialised with.
    pub vma_options: VmaOptions,
    /// Local endpoint.
    pub local_addr: SockAddrIn,
    /// Remote endpoint.
    pub remote_addr: SockAddrIn,
    /// Whether a bind has taken place.
    pub is_bound: bool,
    /// Current connection state.
    pub state: TcpConnectionState,
    /// Traffic counters; the same four quantities are exposed by `tcp_socket_get_stats`.
    pub rx_packets: c_ulonglong,
    pub tx_packets: c_ulonglong,
    pub rx_bytes: c_ulonglong,
    pub tx_bytes: c_ulonglong,
    /// Listen backlog.
    pub backlog: c_int,
}
/// Native record for one accepted connection, filled in by `tcp_socket_accept`.
///
/// Passed by pointer to the `tcp_socket_*_client` functions, so field order and types are part of
/// the ABI and must not be rearranged.
/// NOTE(review): the per-field notes below are inferred from names - verify against the C header.
#[derive(Clone, Debug)]
#[repr(C)]
pub struct TcpClient {
    /// Raw descriptor of the accepted connection; `Drop for Client` only closes it when `>= 0`.
    pub socket_fd: c_int,
    /// Peer address; converted with `sockaddr_to_rust` when a `Client` is built.
    pub addr: SockAddrIn,
    /// Byte counters for this connection.
    pub rx_bytes: c_ulonglong,
    pub tx_bytes: c_ulonglong,
}
/// Status codes returned by the native `tcp_socket_*` functions.
///
/// Zero means success; every failure is a distinct negative value. The short descriptions below
/// are the messages used when a code is converted into a `std::io::Error`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(C)]
pub enum TcpResult {
    /// The call succeeded.
    TcpSuccess = 0,
    /// Socket creation failed.
    TcpErrorSocketCreate = -1,
    /// Socket option error.
    TcpErrorSocketOption = -2,
    /// Bind failed.
    TcpErrorBind = -3,
    /// Listen failed.
    TcpErrorListen = -4,
    /// Accept failed.
    TcpErrorAccept = -5,
    /// Connect failed.
    TcpErrorConnect = -6,
    /// Reconnect failed.
    TcpErrorReconnect = -7,
    /// Send failed.
    TcpErrorSend = -8,
    /// Receive failed.
    TcpErrorRecv = -9,
    /// Operation timed out.
    TcpErrorTimeout = -10,
    /// Invalid parameter.
    TcpErrorInvalidParam = -11,
    /// Not initialized.
    TcpErrorNotInitialized = -12,
    /// Connection closed.
    TcpErrorClosed = -13,
    /// Would block.
    TcpErrorWouldBlock = -14,
    /// Already connected.
    TcpErrorAlreadyConnected = -15,
}
use std::io::{Error, ErrorKind};
impl From<TcpResult> for std::io::Error {
fn from(tcp_result: TcpResult) -> Self {
match tcp_result {
TcpResult::TcpSuccess => Error::new(ErrorKind::Other, "Unexpected success"),
TcpResult::TcpErrorSocketCreate => Error::new(ErrorKind::ConnectionRefused, "Socket creation failed"),
TcpResult::TcpErrorSocketOption => Error::new(ErrorKind::InvalidInput, "Socket option error"),
TcpResult::TcpErrorBind => Error::new(ErrorKind::AddrInUse, "Bind failed"),
TcpResult::TcpErrorListen => Error::new(ErrorKind::ConnectionRefused, "Listen failed"),
TcpResult::TcpErrorAccept => Error::new(ErrorKind::ConnectionRefused, "Accept failed"),
TcpResult::TcpErrorConnect => Error::new(ErrorKind::ConnectionRefused, "Connect failed"),
TcpResult::TcpErrorReconnect => Error::new(ErrorKind::ConnectionRefused, "Reconnect failed"),
TcpResult::TcpErrorSend => Error::new(ErrorKind::BrokenPipe, "Send failed"),
TcpResult::TcpErrorRecv => Error::new(ErrorKind::ConnectionReset, "Receive failed"),
TcpResult::TcpErrorTimeout => Error::new(ErrorKind::TimedOut, "Operation timed out"),
TcpResult::TcpErrorInvalidParam => Error::new(ErrorKind::InvalidInput, "Invalid parameter"),
TcpResult::TcpErrorNotInitialized => Error::new(ErrorKind::NotConnected, "Not initialized"),
TcpResult::TcpErrorClosed => Error::new(ErrorKind::ConnectionAborted, "Connection closed"),
TcpResult::TcpErrorWouldBlock => Error::new(ErrorKind::WouldBlock, "Would block"),
TcpResult::TcpErrorAlreadyConnected => Error::new(ErrorKind::AlreadyExists, "Already connected"),
}
}
}
/// Safe handle for one connection returned by `TcpSocketWrapper::accept`.
///
/// NOTE(review): `Clone` copies the raw descriptor stored in `inner`, while `Drop` closes it, so
/// dropping both a clone and its original may close the same descriptor twice. Confirm whether
/// any caller actually needs to clone a `Client`.
#[derive(Clone, Debug)]
pub struct Client {
    /// Native client record handed by pointer to the `tcp_socket_*_client` functions.
    inner: TcpClient,
    /// Peer address, computed from `inner.addr` when the handle is created.
    pub address: SocketAddr,
}
/// Operations on an accepted connection.
impl Client {
    /// Builds the safe handle around a native client record, caching the peer address that
    /// `sockaddr_to_rust` derives from `client.addr`.
    fn new(client: TcpClient) -> Self {
        let address = sockaddr_to_rust(&client.addr);
        Client {
            inner: client,
            address,
        }
    }

    /// Turns a native status code into a `Result` without `mem::transmute`.
    ///
    /// Transmuting an integer that is not a declared discriminant of `TcpResult` is undefined
    /// behaviour, so the code is looked up among the known error variants instead. A non-zero
    /// code that matches none of them is reported as `fallback`.
    fn check(code: c_int, fallback: TcpResult) -> Result<(), TcpResult> {
        const KNOWN_ERRORS: [TcpResult; 15] = [
            TcpResult::TcpErrorSocketCreate,
            TcpResult::TcpErrorSocketOption,
            TcpResult::TcpErrorBind,
            TcpResult::TcpErrorListen,
            TcpResult::TcpErrorAccept,
            TcpResult::TcpErrorConnect,
            TcpResult::TcpErrorReconnect,
            TcpResult::TcpErrorSend,
            TcpResult::TcpErrorRecv,
            TcpResult::TcpErrorTimeout,
            TcpResult::TcpErrorInvalidParam,
            TcpResult::TcpErrorNotInitialized,
            TcpResult::TcpErrorClosed,
            TcpResult::TcpErrorWouldBlock,
            TcpResult::TcpErrorAlreadyConnected,
        ];
        if code == TcpResult::TcpSuccess as c_int {
            return Ok(());
        }
        Err(KNOWN_ERRORS
            .iter()
            .copied()
            .find(|known| *known as c_int == code)
            .unwrap_or(fallback))
    }

    /// Sends `data` to the peer.
    ///
    /// # Returns
    /// The byte count reported by the native call.
    ///
    /// # Errors
    /// The `TcpResult` matching the native status code, or `TcpErrorSend` when the code is not
    /// one of the declared variants.
    pub fn send(&mut self, data: &[u8]) -> Result<usize, TcpResult> {
        let mut bytes_sent: usize = 0;
        // SAFETY: `data` is a live slice, so the pointer/length pair is valid for reads, and
        // `self.inner` and `bytes_sent` both outlive the call.
        let code = unsafe {
            tcp_socket_send_to_client(
                &mut self.inner,
                data.as_ptr() as *const c_void,
                data.len(),
                &mut bytes_sent,
            )
        };
        Self::check(code, TcpResult::TcpErrorSend)?;
        Ok(bytes_sent)
    }

    /// Receives into `buffer`; `timeout_nano` is converted by `unixnano_to_ms` before the call.
    ///
    /// # Returns
    /// The byte count reported by the native call.
    ///
    /// # Errors
    /// The `TcpResult` matching the native status code, or `TcpErrorRecv` when the code is not
    /// one of the declared variants.
    pub fn recv(&mut self, buffer: &mut [u8], timeout_nano: Option<u64>) -> Result<usize, TcpResult> {
        let mut bytes_received: usize = 0;
        let timeout_ms = unixnano_to_ms(timeout_nano);
        // SAFETY: `buffer` is a live mutable slice, so the pointer/length pair is valid for
        // writes, and `self.inner` and `bytes_received` both outlive the call.
        let code = unsafe {
            tcp_socket_recv_from_client(
                &mut self.inner,
                buffer.as_mut_ptr() as *mut c_void,
                buffer.len(),
                timeout_ms,
                &mut bytes_received,
            )
        };
        Self::check(code, TcpResult::TcpErrorRecv)?;
        Ok(bytes_received)
    }

    /// Closes the connection explicitly.
    ///
    /// After a successful close the stored descriptor is set to `-1`, so the `Drop` impl, which
    /// only closes descriptors `>= 0`, does not close it a second time.
    ///
    /// # Errors
    /// The `TcpResult` matching the native status code, or `TcpErrorInvalidParam` when the code
    /// is not one of the declared variants.
    pub fn close(&mut self) -> Result<(), TcpResult> {
        // SAFETY: `self.inner` is a valid, exclusively borrowed record for the whole call.
        let code = unsafe { tcp_socket_close_client(&mut self.inner) };
        Self::check(code, TcpResult::TcpErrorInvalidParam)?;
        // Harmless if the native side already reset the field.
        self.inner.socket_fd = -1;
        Ok(())
    }
}
/// Closes the connection when the handle goes out of scope.
impl Drop for Client {
    fn drop(&mut self) {
        // A negative descriptor means there is nothing to close.
        if self.inner.socket_fd < 0 {
            return;
        }
        // Best effort: there is no way to report a failure from `drop`, so the status is ignored.
        let _ = unsafe { tcp_socket_close_client(&mut self.inner) };
    }
}
/// Thin safe wrapper that owns one native `TcpSocket` record and reports failures as `TcpResult`.
///
/// NOTE(review): `Clone` copies the native record (including its descriptor), while `Drop` calls
/// `tcp_socket_close`, so a clone and its original may both close the same socket. Confirm
/// whether cloning is really needed.
#[derive(Clone, Debug)]
pub struct TcpSocketWrapper {
    /// Native record passed by pointer to every `tcp_socket_*` call.
    socket: TcpSocket,
}
impl TcpSocketWrapper {
pub fn new(options: Option<VmaOptions>) -> Result<Self, TcpResult> {
let mut socket = unsafe { mem::zeroed::<TcpSocket>() };
let c_options = options.unwrap_or_default();
let result = unsafe {
println!("Initializing TCP socket with options: use_socketxtreme={}, optimize_for_latency={}, ring_count={}",
c_options.use_socketxtreme, c_options.optimize_for_latency, c_options.ring_count);
tcp_socket_init(&mut socket, &c_options)
};
if result != TcpResult::TcpSuccess as i32 {
println!("TCP socket initialization failed with code: {}", result);
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok(TcpSocketWrapper { socket })
}
pub fn bind<A: Into<String>>(&mut self, addr: A, port: u16) -> Result<(), TcpResult> {
let c_addr = CString::new(addr.into()).unwrap();
let result = unsafe { tcp_socket_bind(&mut self.socket, c_addr.as_ptr(), port) };
if result != TcpResult::TcpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok(())
}
pub fn listen(&mut self, backlog: i32) -> Result<(), TcpResult> {
let result = unsafe { tcp_socket_listen(&mut self.socket, backlog) };
if result != TcpResult::TcpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok(())
}
pub fn accept(&mut self, timeout_nano: Option<u64>) -> Result<Client, TcpResult> {
let mut client = unsafe { mem::zeroed::<TcpClient>() };
let timeout_ms = unixnano_to_ms(timeout_nano);
let result = unsafe { tcp_socket_accept(&mut self.socket, &mut client, timeout_ms) };
if result != TcpResult::TcpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok(Client::new(client))
}
pub fn connect<A: Into<String>>(&mut self, addr: A, port: u16, timeout_nano: Option<u64>) -> Result<(), TcpResult> {
let c_addr = CString::new(addr.into()).unwrap();
let timeout_ms = unixnano_to_ms(timeout_nano);
let result = unsafe { tcp_socket_connect(&mut self.socket, c_addr.as_ptr(), port, timeout_ms) };
if result != TcpResult::TcpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok(())
}
pub fn reconnect(&mut self, timeout: Option<u64>) -> Result<(), TcpResult> {
let timeout_ms = unixnano_to_ms(timeout);
let result = unsafe { tcp_socket_reconnect(&mut self.socket, timeout_ms) };
if result != TcpResult::TcpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok(())
}
pub fn is_connected(&mut self) -> bool {
unsafe { tcp_socket_is_connected(&mut self.socket) }
}
pub fn send(&mut self, data: &[u8]) -> Result<usize, TcpResult> {
let mut bytes_sent: usize = 0;
let result = unsafe {
tcp_socket_send(
&mut self.socket,
data.as_ptr() as *const c_void,
data.len(),
&mut bytes_sent,
)
};
if result != TcpResult::TcpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok(bytes_sent)
}
pub fn recv(&mut self, buffer: &mut [u8], timeout_nano: Option<u64>) -> Result<usize, TcpResult> {
let mut bytes_received: usize = 0;
let timeout_ms = unixnano_to_ms(timeout_nano);
let result = unsafe {
tcp_socket_recv(
&mut self.socket,
buffer.as_mut_ptr() as *mut c_void,
buffer.len(),
timeout_ms,
&mut bytes_received,
)
};
if result != TcpResult::TcpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok(bytes_received)
}
pub fn get_stats(&mut self) -> Result<(u64, u64, u64, u64), TcpResult> {
let mut rx_packets: c_ulonglong = 0;
let mut tx_packets: c_ulonglong = 0;
let mut rx_bytes: c_ulonglong = 0;
let mut tx_bytes: c_ulonglong = 0;
let result = unsafe {
tcp_socket_get_stats(
&mut self.socket as *mut _,
&mut rx_packets,
&mut tx_packets,
&mut rx_bytes,
&mut tx_bytes,
)
};
if result != TcpResult::TcpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, TcpResult>(result) });
}
Ok((rx_packets, tx_packets, rx_bytes, tx_bytes))
}
}
/// Closes the native socket when the wrapper goes out of scope.
impl Drop for TcpSocketWrapper {
    fn drop(&mut self) {
        // Best effort: there is no way to report a failure from `drop`, so the status is ignored.
        let _ = unsafe { tcp_socket_close(&mut self.socket) };
    }
}
/// High-level socket that delegates to a `TcpSocketWrapper` and reports failures as
/// `std::io::Error` instead of raw `TcpResult` codes.
pub struct VmaTcpSocket {
    /// Wrapper that performs the native calls.
    inner: TcpSocketWrapper,
}
/// `std::io`-flavoured API: each method forwards to the wrapped `TcpSocketWrapper`, turns
/// selected status codes into ordinary return values and converts the rest into
/// `std::io::Error`.
impl VmaTcpSocket {
    /// Creates a socket with default options.
    pub fn new() -> Result<Self, std::io::Error> {
        let inner = TcpSocketWrapper::new(None)?;
        Ok(VmaTcpSocket { inner })
    }

    /// Creates a socket with the given options.
    pub fn with_options(options: VmaOptions) -> Result<Self, std::io::Error> {
        let inner = TcpSocketWrapper::new(Some(options))?;
        Ok(VmaTcpSocket { inner })
    }

    /// Binds the socket to `addr:port`.
    pub fn bind<A: Into<String>>(&mut self, addr: A, port: u16) -> Result<(), std::io::Error> {
        self.inner.bind(addr, port).map_err(std::io::Error::from)
    }

    /// Starts listening with the given backlog.
    pub fn listen(&mut self, backlog: i32) -> Result<(), std::io::Error> {
        self.inner.listen(backlog).map_err(std::io::Error::from)
    }

    /// Accepts one connection; a timeout yields `Ok(None)` rather than an error.
    pub fn accept(&mut self, timeout_nano: Option<u64>) -> Result<Option<Client>, std::io::Error> {
        self.inner
            .accept(timeout_nano)
            .map(Some)
            .or_else(|err| match err {
                TcpResult::TcpErrorTimeout => Ok(None),
                other => Err(std::io::Error::from(other)),
            })
    }

    /// Connects to `addr:port`; returns `Ok(false)` on timeout and `Ok(true)` on success.
    pub fn connect<A: Into<String>>(&mut self, addr: A, port: u16, timeout: Option<u64>) -> Result<bool, std::io::Error> {
        match self.inner.connect(addr, port, timeout) {
            Ok(()) => Ok(true),
            Err(TcpResult::TcpErrorTimeout) => Ok(false),
            Err(other) => Err(std::io::Error::from(other)),
        }
    }

    /// Attempts to reconnect; a timeout or a failed reconnect yields `Ok(false)`.
    pub fn try_reconnect(&mut self, timeout: Option<u64>) -> Result<bool, std::io::Error> {
        match self.inner.reconnect(timeout) {
            Ok(()) => Ok(true),
            Err(TcpResult::TcpErrorTimeout) | Err(TcpResult::TcpErrorReconnect) => Ok(false),
            Err(other) => Err(std::io::Error::from(other)),
        }
    }

    /// Returns whether the wrapped socket reports itself as connected.
    pub fn is_connected(&mut self) -> bool {
        self.inner.is_connected()
    }

    /// Sends `data`; a would-block condition is reported as `Ok(0)`.
    pub fn send(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
        self.inner.send(data).or_else(|err| match err {
            TcpResult::TcpErrorWouldBlock => Ok(0),
            other => Err(std::io::Error::from(other)),
        })
    }

    /// Receives into `buffer`.
    ///
    /// Both a timeout and a closed connection are reported as `Ok(0)`, so callers cannot tell
    /// the two apart from the return value alone.
    pub fn recv(&mut self, buffer: &mut [u8], timeout: Option<u64>) -> Result<usize, std::io::Error> {
        self.inner.recv(buffer, timeout).or_else(|err| match err {
            TcpResult::TcpErrorTimeout | TcpResult::TcpErrorClosed => Ok(0),
            other => Err(std::io::Error::from(other)),
        })
    }

    /// Returns `(rx_packets, tx_packets, rx_bytes, tx_bytes)` from the wrapped socket.
    pub fn get_stats(&mut self) -> Result<(u64, u64, u64, u64), std::io::Error> {
        self.inner.get_stats().map_err(std::io::Error::from)
    }
}