use std::ffi::{c_void, CString};
use std::mem;
use std::net::SocketAddr;
use std::os::raw::{c_char, c_int, c_ulonglong};
use crate::common::{SockAddrIn, VmaOptions, unixnano_to_ms, sockaddr_to_rust};
/// C-layout socket state handed by pointer to the `udp_socket_*` FFI functions.
///
/// The struct is `#[repr(C)]`, so field order and types must stay in sync with
/// the matching C definition. The Rust wrappers in this file never read or
/// write the individual fields; they only pass a `*mut UdpSocket` to C.
///
/// NOTE(review): the derived `Clone` copies every field verbatim, including
/// `socket_fd`, so a clone refers to the same descriptor value as the original.
#[repr(C)]
#[derive(Debug, Clone)]
pub struct UdpSocket {
/// Socket descriptor; presumably the OS-level fd owned by the C layer — confirm against the C header.
pub socket_fd: c_int,
/// Options associated with this socket (`udp_socket_init` receives a `VmaOptions` pointer).
pub vma_options: VmaOptions,
/// Local address; presumably filled in by `udp_socket_bind` — TODO confirm.
pub local_addr: SockAddrIn,
/// Remote address; presumably filled in by `udp_socket_connect` — TODO confirm.
pub remote_addr: SockAddrIn,
/// Bound flag; presumably maintained by the C layer.
pub is_bound: bool,
/// Connected flag; presumably maintained by the C layer.
pub is_connected: bool,
/// Received-packet counter (see `udp_socket_get_stats` for the accessor).
pub rx_packets: c_ulonglong,
/// Sent-packet counter.
pub tx_packets: c_ulonglong,
/// Received-byte counter.
pub rx_bytes: c_ulonglong,
/// Sent-byte counter.
pub tx_bytes: c_ulonglong,
}
/// C-layout description of one received datagram, filled in by
/// `udp_socket_recvfrom`.
///
/// `recv_from` reads `length` bytes starting at `data` and copies them into an
/// owned [`Packet`]. The derived `Clone` copies the raw `data` pointer, not the
/// bytes it points to.
#[repr(C)]
#[derive(Debug, Clone)]
pub struct UdpPacket {
/// Raw pointer to the payload bytes.
/// NOTE(review): presumably points into the buffer passed to `udp_socket_recvfrom` — confirm.
pub data: *mut c_void,
/// Number of payload bytes readable at `data`.
pub length: usize,
/// Sender address in the C representation; converted with `sockaddr_to_rust`.
pub src_addr: SockAddrIn,
/// Timestamp reported by the C layer; units and epoch are not visible here — TODO confirm.
pub timestamp: c_ulonglong,
}
/// Status codes shared with the C UDP socket API.
///
/// The discriminants mirror the integer codes returned by the `udp_socket_*`
/// functions: the wrappers compare the returned `c_int` against
/// `UdpResult::UdpSuccess as i32`. Zero is success and every failure is a
/// distinct negative value. Keep the numeric values in sync with the C side.
#[repr(C)]
#[derive(Debug, PartialEq, Eq)]
pub enum UdpResult {
/// The call succeeded.
UdpSuccess = 0,
/// Socket creation failed.
UdpErrorSocketCreate = -1,
/// Setting a socket option failed.
UdpErrorSocketOption = -2,
/// Binding to the local address failed.
UdpErrorBind = -3,
/// Connecting to the remote address failed.
UdpErrorConnect = -4,
/// Sending failed.
UdpErrorSend = -5,
/// Receiving failed.
UdpErrorRecv = -6,
/// The operation timed out; `VmaUdpSocket::recv`/`recv_from` report this as `Ok(0)`/`Ok(None)`.
UdpErrorTimeout = -7,
/// An argument was rejected as invalid.
UdpErrorInvalidParam = -8,
/// The socket was used before being initialized.
UdpErrorNotInitialized = -9,
/// The socket has been closed.
UdpErrorClosed = -10,
}
use std::io::{Error, ErrorKind};
impl From<UdpResult> for std::io::Error {
fn from(udp_result: UdpResult) -> Self {
match udp_result {
UdpResult::UdpSuccess => Error::new(ErrorKind::Other, "Unexpected success"),
UdpResult::UdpErrorSocketCreate => Error::new(ErrorKind::ConnectionRefused, "Socket creation failed"),
UdpResult::UdpErrorSocketOption => Error::new(ErrorKind::InvalidInput, "Socket option error"),
UdpResult::UdpErrorBind => Error::new(ErrorKind::AddrInUse, "Bind failed"),
UdpResult::UdpErrorConnect => Error::new(ErrorKind::ConnectionRefused, "Connect failed"),
UdpResult::UdpErrorSend => Error::new(ErrorKind::BrokenPipe, "Send failed"),
UdpResult::UdpErrorRecv => Error::new(ErrorKind::ConnectionReset, "Receive failed"),
UdpResult::UdpErrorTimeout => Error::new(ErrorKind::TimedOut, "Operation timed out"),
UdpResult::UdpErrorInvalidParam => Error::new(ErrorKind::InvalidInput, "Invalid parameter"),
UdpResult::UdpErrorNotInitialized => Error::new(ErrorKind::NotConnected, "Not initialized"),
UdpResult::UdpErrorClosed => Error::new(ErrorKind::ConnectionAborted, "Socket closed"),
}
}
}
// Declarations of the C UDP socket API.
//
// Every function returns a `c_int` status that the Rust wrappers compare with
// `UdpResult::UdpSuccess` (0). All pointer arguments must be valid for the
// duration of the call; the wrappers pass pointers derived from live Rust
// references and NUL-terminated `CString`s.
extern "C" {
/// Initializes `socket` using the options pointed to by `options`.
fn udp_socket_init(socket: *mut UdpSocket, options: *const VmaOptions) -> c_int;
/// Closes `socket`; called from `Drop for UdpSocketWrapper`.
fn udp_socket_close(socket: *mut UdpSocket) -> c_int;
/// Binds `socket` to the address given as a NUL-terminated string `ip` and `port`.
fn udp_socket_bind(socket: *mut UdpSocket, ip: *const c_char, port: u16) -> c_int;
/// Connects `socket` to the address given as a NUL-terminated string `ip` and `port`.
fn udp_socket_connect(socket: *mut UdpSocket, ip: *const c_char, port: u16) -> c_int;
/// Sends `length` bytes starting at `data`; the sent byte count is written to `bytes_sent`.
fn udp_socket_send(socket: *mut UdpSocket, data: *const c_void, length: usize, bytes_sent: *mut usize) -> c_int;
/// Sends `length` bytes starting at `data` to `ip`:`port`; the sent byte
/// count is written to `bytes_sent`.
fn udp_socket_sendto(
socket: *mut UdpSocket,
data: *const c_void,
length: usize,
ip: *const c_char,
port: u16,
bytes_sent: *mut usize,
) -> c_int;
/// Receives into `buffer` (capacity `buffer_size` bytes) with a timeout of
/// `timeout_ms`; the received byte count is written to `bytes_received`.
fn udp_socket_recv(
socket: *mut UdpSocket,
buffer: *mut c_void,
buffer_size: usize,
timeout_ms: c_int,
bytes_received: *mut usize,
) -> c_int;
/// Receives one datagram using `buffer` (capacity `buffer_size` bytes) with a
/// timeout of `timeout_ms` and describes it in `packet`. On success the
/// wrapper reads `packet.data`, `packet.length`, `packet.src_addr` and
/// `packet.timestamp`.
fn udp_socket_recvfrom(
socket: *mut UdpSocket,
packet: *mut UdpPacket,
buffer: *mut c_void,
buffer_size: usize,
timeout_ms: c_int,
) -> c_int;
/// Writes the socket's packet and byte counters to the four output pointers.
fn udp_socket_get_stats(
socket: *mut UdpSocket,
rx_packets: *mut c_ulonglong,
tx_packets: *mut c_ulonglong,
rx_bytes: *mut c_ulonglong,
tx_bytes: *mut c_ulonglong,
) -> c_int;
}
/// Owned, Rust-side view of a received datagram, as returned by `recv_from`.
#[derive(Clone, Debug)]
pub struct Packet {
/// Payload bytes, copied out of the C-side `UdpPacket`.
pub data: Vec<u8>,
/// Sender address, converted from the C representation by `sockaddr_to_rust`.
pub src_addr: SocketAddr,
/// Timestamp copied from `UdpPacket::timestamp`; units are not visible here — TODO confirm.
pub timestamp: u64,
}
/// Owning wrapper around the C-side [`UdpSocket`] state.
///
/// Methods return the raw [`UdpResult`] status on failure. Dropping the
/// wrapper calls `udp_socket_close` on the contained socket.
///
/// NOTE(review): `Clone` is derived while `Drop` closes the socket, so every
/// clone also calls `udp_socket_close` on its own copy of the same socket
/// state. Confirm the C layer tolerates this, or avoid cloning.
#[derive(Debug, Clone)]
pub struct UdpSocketWrapper {
// C-layout state passed by pointer to every `udp_socket_*` call.
socket: UdpSocket,
}
impl UdpSocketWrapper {
pub fn new(options: Option<VmaOptions>) -> Result<Self, UdpResult> {
let mut socket = unsafe { mem::zeroed::<UdpSocket>() };
let c_options = options.unwrap_or_default();
let result = unsafe {
println!("Initializing UDP socket with options: use_socketxtreme={}, optimize_for_latency={}, ring_count={}",
c_options.use_socketxtreme, c_options.optimize_for_latency, c_options.ring_count);
udp_socket_init(&mut socket, &c_options)
};
if result != UdpResult::UdpSuccess as i32 {
println!("UDP socket initialization failed with code: {}", result);
return Err(unsafe { mem::transmute::<i32, UdpResult>(result) });
}
Ok(UdpSocketWrapper { socket })
}
pub fn bind<A: Into<String>>(&mut self, addr: A, port: u16) -> Result<(), UdpResult> {
let c_addr = CString::new(addr.into()).unwrap();
let result = unsafe { udp_socket_bind(&mut self.socket, c_addr.as_ptr(), port) };
if result != UdpResult::UdpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, UdpResult>(result) });
}
Ok(())
}
pub fn connect<A: Into<String>>(&mut self, addr: A, port: u16) -> Result<(), UdpResult> {
let c_addr = CString::new(addr.into()).unwrap();
let result = unsafe { udp_socket_connect(&mut self.socket, c_addr.as_ptr(), port) };
if result != UdpResult::UdpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, UdpResult>(result) });
}
Ok(())
}
pub fn send(&mut self, data: &[u8]) -> Result<usize, UdpResult> {
let mut bytes_sent: usize = 0;
let result = unsafe {
udp_socket_send(
&mut self.socket,
data.as_ptr() as *const c_void,
data.len(),
&mut bytes_sent,
)
};
if result != UdpResult::UdpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, UdpResult>(result) });
}
Ok(bytes_sent)
}
pub fn send_to<A: Into<String>>(&mut self, data: &[u8], addr: A, port: u16) -> Result<usize, UdpResult> {
let c_addr = CString::new(addr.into()).unwrap();
let mut bytes_sent: usize = 0;
let result = unsafe {
udp_socket_sendto(
&mut self.socket,
data.as_ptr() as *const c_void,
data.len(),
c_addr.as_ptr(),
port,
&mut bytes_sent,
)
};
if result != UdpResult::UdpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, UdpResult>(result) });
}
Ok(bytes_sent)
}
pub fn recv(&mut self, buffer: &mut [u8], timeout_nano: Option<u64>) -> Result<usize, UdpResult> {
let mut bytes_received: usize = 0;
let timeout_ms = unixnano_to_ms(timeout_nano);
let result = unsafe {
udp_socket_recv(
&mut self.socket,
buffer.as_mut_ptr() as *mut c_void,
buffer.len(),
timeout_ms,
&mut bytes_received,
)
};
if result != UdpResult::UdpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, UdpResult>(result) });
}
Ok(bytes_received)
}
pub fn recv_from(&mut self, buffer: &mut [u8], timeout_nano: Option<u64>) -> Result<Packet, UdpResult> {
let mut packet = unsafe { mem::zeroed::<UdpPacket>() };
let timeout_ms = unixnano_to_ms(timeout_nano);
let result = unsafe {
udp_socket_recvfrom(
&mut self.socket,
&mut packet,
buffer.as_mut_ptr() as *mut c_void,
buffer.len(),
timeout_ms,
)
};
if result != UdpResult::UdpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, UdpResult>(result) });
}
let src_addr = sockaddr_to_rust(&packet.src_addr);
let data = unsafe { std::slice::from_raw_parts(packet.data as *const u8, packet.length) }.to_vec();
Ok(Packet {
data,
src_addr,
timestamp: packet.timestamp,
})
}
pub fn get_stats(&mut self) -> Result<(u64, u64, u64, u64), UdpResult> {
let mut rx_packets: c_ulonglong = 0;
let mut tx_packets: c_ulonglong = 0;
let mut rx_bytes: c_ulonglong = 0;
let mut tx_bytes: c_ulonglong = 0;
let result = unsafe {
udp_socket_get_stats(
&mut self.socket as *mut _,
&mut rx_packets,
&mut tx_packets,
&mut rx_bytes,
&mut tx_bytes,
)
};
if result != UdpResult::UdpSuccess as i32 {
return Err(unsafe { mem::transmute::<i32, UdpResult>(result) });
}
Ok((rx_packets, tx_packets, rx_bytes, tx_bytes))
}
}
impl Drop for UdpSocketWrapper {
    /// Closes the underlying socket through `udp_socket_close`.
    ///
    /// The status returned by the C call is discarded, because `drop` has no
    /// way to report a failure.
    fn drop(&mut self) {
        // SAFETY: `self.socket` is a live field, so the pointer derived from
        // it is valid for the duration of the call.
        let _ = unsafe { udp_socket_close(&mut self.socket) };
    }
}
/// UDP socket facade over [`UdpSocketWrapper`] that reports failures as
/// `std::io::Error` instead of raw [`UdpResult`] codes.
///
/// Receive timeouts are not errors at this level: `recv` returns `Ok(0)` and
/// `recv_from` returns `Ok(None)` when the inner call reports
/// `UdpErrorTimeout`.
///
/// NOTE(review): the derived `Clone` clones the inner wrapper, whose `Drop`
/// closes the socket — see the caveat about cloning on `UdpSocketWrapper`.
#[derive(Debug, Clone)]
pub struct VmaUdpSocket {
// Wrapper that owns the C-side socket state and performs the FFI calls.
inner: UdpSocketWrapper,
}
impl VmaUdpSocket {
    /// Creates a socket with the default options (`UdpSocketWrapper::new(None)`).
    ///
    /// # Errors
    ///
    /// The inner `UdpResult`, converted to `std::io::Error`.
    pub fn new() -> Result<Self, std::io::Error> {
        let inner = UdpSocketWrapper::new(None)?;
        Ok(VmaUdpSocket { inner })
    }

    /// Creates a socket with the given `options`.
    ///
    /// # Errors
    ///
    /// The inner `UdpResult`, converted to `std::io::Error`.
    pub fn with_options(options: VmaOptions) -> Result<Self, std::io::Error> {
        let inner = UdpSocketWrapper::new(Some(options))?;
        Ok(VmaUdpSocket { inner })
    }

    /// Binds the socket to `addr`:`port`.
    ///
    /// # Errors
    ///
    /// The inner `UdpResult`, converted to `std::io::Error`.
    pub fn bind<A: Into<String>>(&mut self, addr: A, port: u16) -> Result<(), std::io::Error> {
        self.inner.bind(addr, port).map_err(std::io::Error::from)
    }

    /// Connects the socket to `addr`:`port`.
    ///
    /// # Errors
    ///
    /// The inner `UdpResult`, converted to `std::io::Error`.
    pub fn connect<A: Into<String>>(&mut self, addr: A, port: u16) -> Result<(), std::io::Error> {
        self.inner.connect(addr, port).map_err(std::io::Error::from)
    }

    /// Sends `data` and returns the number of bytes reported as sent.
    ///
    /// # Errors
    ///
    /// The inner `UdpResult`, converted to `std::io::Error`.
    pub fn send(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
        let sent = self.inner.send(data)?;
        Ok(sent)
    }

    /// Sends `data` to `addr`:`port` and returns the number of bytes reported
    /// as sent.
    ///
    /// # Errors
    ///
    /// The inner `UdpResult`, converted to `std::io::Error`.
    pub fn send_to<A: Into<String>>(&mut self, data: &[u8], addr: A, port: u16) -> Result<usize, std::io::Error> {
        let sent = self.inner.send_to(data, addr, port)?;
        Ok(sent)
    }

    /// Receives into `buffer` and returns the number of bytes received.
    ///
    /// A timeout is not an error here: it is reported as `Ok(0)`.
    ///
    /// # Errors
    ///
    /// Any inner `UdpResult` other than `UdpErrorTimeout`, converted to
    /// `std::io::Error`.
    pub fn recv(&mut self, buffer: &mut [u8], timeout_nano: Option<u64>) -> Result<usize, std::io::Error> {
        match self.inner.recv(buffer, timeout_nano) {
            Err(UdpResult::UdpErrorTimeout) => Ok(0),
            outcome => outcome.map_err(std::io::Error::from),
        }
    }

    /// Receives one datagram using `buffer`.
    ///
    /// Returns `Ok(Some(packet))` on success and `Ok(None)` on timeout.
    ///
    /// # Errors
    ///
    /// Any inner `UdpResult` other than `UdpErrorTimeout`, converted to
    /// `std::io::Error`.
    pub fn recv_from(&mut self, buffer: &mut [u8], timeout_nano: Option<u64>) -> Result<Option<Packet>, std::io::Error> {
        match self.inner.recv_from(buffer, timeout_nano) {
            Err(UdpResult::UdpErrorTimeout) => Ok(None),
            outcome => outcome.map(Some).map_err(std::io::Error::from),
        }
    }

    /// Returns the socket counters as
    /// `(rx_packets, tx_packets, rx_bytes, tx_bytes)`.
    ///
    /// # Errors
    ///
    /// The inner `UdpResult`, converted to `std::io::Error`.
    pub fn get_stats(&mut self) -> Result<(u64, u64, u64, u64), std::io::Error> {
        self.inner.get_stats().map_err(std::io::Error::from)
    }
}