#![allow(unsafe_op_in_unsafe_fn)]
use crate::ffi;
use crate::sock_compat::{AF_INET, AF_INET6, sa_family_t, sockaddr_in, sockaddr_in6};
use crate::error::{check, Error};
use crate::keypair::Keypair;
use crate::types::{Header, NodeId, DEFAULT_MAX_STREAMS, DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_TIMEOUT, SECONDS};
use crate::msg::{CResponse, CNotify};
use crate::addr::Url;
use std::ffi::CString;
use std::net::{UdpSocket, SocketAddr};
use std::sync::{mpsc, Arc};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::collections::HashMap;
const UDP_BUF_SIZE: usize = 65536;
pub struct Settings {
pub max_streams: u32,
pub max_message_size: u32,
pub timeout_ms: u32,
pub compression: String,
}
impl Default for Settings {
fn default() -> Self {
Settings {
max_streams: DEFAULT_MAX_STREAMS,
max_message_size: DEFAULT_MAX_MESSAGE_SIZE as u32,
timeout_ms: (DEFAULT_TIMEOUT / (SECONDS / 1000)) as u32,
compression: String::new(),
}
}
}
#[derive(Clone, Debug)]
pub struct Response {
pub status: String,
pub status_details: String,
pub headers: Vec<Header>,
pub body: Vec<u8>,
}
impl Response {
pub fn is_ok(&self) -> bool {
crate::protocol::status_is_success(&self.status)
}
pub fn header(&self, name: &str) -> Option<&str> {
self.headers.iter().find(|h| h.name == name).map(|h| h.value.as_str())
}
}
impl From<CResponse> for Response {
fn from(r: CResponse) -> Self {
Response { status: r.status, status_details: r.status_details, headers: r.headers, body: r.body }
}
}
#[derive(Clone, Debug)]
pub struct Notification {
pub event: String,
pub path: String,
pub headers: Vec<Header>,
pub body: Vec<u8>,
}
impl From<CNotify> for Notification {
fn from(n: CNotify) -> Self {
Notification { event: n.event, path: n.path, headers: n.headers, body: n.body }
}
}
enum ClientEvent {
Packet { data: Vec<u8>, local: SocketAddr, remote: SocketAddr },
TimerExpiry,
Request { method: String, path: String, body: Vec<u8>, headers: Vec<Header>, resp_tx: mpsc::SyncSender<Result<Response, Error>> },
Shutdown,
}
struct PendingEntry {
resp_tx: mpsc::SyncSender<Result<Response, Error>>,
resp: Option<Response>,
body_buf: Vec<u8>,
}
struct ClientState {
c_client: *mut ffi::nwep_client,
pending: HashMap<i64, PendingEntry>,
on_notify: Option<Box<dyn Fn(Notification) + Send>>,
disconnected: bool,
}
unsafe impl Send for ClientState {}
struct CallbackData {
state: *mut ClientState,
connected_tx: Option<mpsc::SyncSender<Result<crate::types::Identity, Error>>>,
}
unsafe extern "C" fn client_on_connect(
_conn: *mut ffi::nwep_conn,
peer: *const ffi::nwep_identity,
user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
let cb = &mut *(user_data as *mut CallbackData);
if let Some(tx) = cb.connected_tx.take() {
let identity = if peer.is_null() {
crate::types::Identity::default()
} else {
crate::types::Identity::from(*peer)
};
let _ = tx.send(Ok(identity));
}
0
}
unsafe extern "C" fn client_on_disconnect(
_conn: *mut ffi::nwep_conn,
error: std::ffi::c_int,
user_data: *mut std::ffi::c_void,
) {
let cb = &mut *(user_data as *mut CallbackData);
if let Some(tx) = cb.connected_tx.take() {
let err = if error != 0 { Error::from_code(error) }
else { Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED) };
let _ = tx.send(Err(err));
}
let state = &mut *cb.state;
state.disconnected = true;
for (_, entry) in state.pending.drain() {
let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
}
}
unsafe extern "C" fn client_on_response(
_conn: *mut ffi::nwep_conn,
stream: *mut ffi::nwep_stream,
resp: *const ffi::nwep_response,
user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
if resp.is_null() || stream.is_null() { return 0; }
let cb = &*(user_data as *const CallbackData);
let state = &mut *cb.state;
let stream_id = ffi::nwep_stream_get_id(stream);
if let Some(entry) = state.pending.get_mut(&stream_id) {
let c_resp = CResponse::from_ffi(&*resp);
entry.resp = Some(Response::from(c_resp));
}
0
}
unsafe extern "C" fn client_on_notify(
_conn: *mut ffi::nwep_conn,
_stream: *mut ffi::nwep_stream,
notify: *const ffi::nwep_notify,
user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
let cb = &*(user_data as *const CallbackData);
let state = &mut *cb.state;
if let Some(cb_fn) = &state.on_notify {
if !notify.is_null() {
let n = CNotify::from_ffi(&*notify);
cb_fn(Notification::from(n));
}
}
0
}
unsafe extern "C" fn client_on_stream_data(
_conn: *mut ffi::nwep_conn,
stream: *mut ffi::nwep_stream,
data: *const u8,
len: usize,
user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
if stream.is_null() || data.is_null() { return 0; }
let cb = &*(user_data as *const CallbackData);
let state = &mut *cb.state;
let stream_id = ffi::nwep_stream_get_id(stream);
if let Some(entry) = state.pending.get_mut(&stream_id) {
let chunk = std::slice::from_raw_parts(data, len);
entry.body_buf.extend_from_slice(chunk);
}
0
}
unsafe extern "C" fn client_on_stream_end(
_conn: *mut ffi::nwep_conn,
stream: *mut ffi::nwep_stream,
user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
if stream.is_null() { return 0; }
let cb = &*(user_data as *const CallbackData);
let state = &mut *cb.state;
let stream_id = ffi::nwep_stream_get_id(stream);
if let Some(entry) = state.pending.remove(&stream_id) {
if let Some(mut resp) = entry.resp {
if !entry.body_buf.is_empty() {
resp.body.extend_from_slice(&entry.body_buf);
}
let _ = entry.resp_tx.send(Ok(resp));
} else {
let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE)));
}
}
0
}
unsafe extern "C" fn client_rand(
dest: *mut u8,
len: usize,
_user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
let slice = std::slice::from_raw_parts_mut(dest, len);
match crate::crypto::random_bytes(slice) {
Ok(()) => 0,
Err(_) => -1,
}
}
pub struct Client {
event_tx: mpsc::SyncSender<ClientEvent>,
node_id: NodeId,
peer_identity: crate::types::Identity,
shutdown_flag: Arc<AtomicBool>,
done_rx: mpsc::Receiver<()>,
request_timeout: Duration,
}
impl Client {
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn peer_identity(&self) -> crate::types::Identity {
self.peer_identity.clone()
}
pub fn peer_node_id(&self) -> NodeId {
self.peer_identity.node_id
}
pub fn fetch(&self, method: &str, path: &str, body: &[u8]) -> Result<Response, Error> {
self.fetch_with_headers(method, path, body, &[])
}
pub fn fetch_with_headers(&self, method: &str, path: &str, body: &[u8], headers: &[Header]) -> Result<Response, Error> {
let (resp_tx, resp_rx) = mpsc::sync_channel(1);
self.event_tx.send(ClientEvent::Request {
method: method.to_string(),
path: path.to_string(),
body: body.to_vec(),
headers: headers.to_vec(),
resp_tx,
}).map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))?;
resp_rx.recv_timeout(self.request_timeout)
.map_err(|e| match e {
mpsc::RecvTimeoutError::Timeout => Error::from_code(crate::error::ERR_NETWORK_TIMEOUT),
mpsc::RecvTimeoutError::Disconnected => Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED),
})?
}
pub fn get(&self, path: &str) -> Result<Response, Error> {
self.fetch(crate::protocol::METHOD_READ, path, b"")
}
pub fn post(&self, path: &str, body: &[u8]) -> Result<Response, Error> {
self.fetch(crate::protocol::METHOD_WRITE, path, body)
}
pub fn close(&self) {
self.shutdown_flag.store(true, Ordering::SeqCst);
let _ = self.event_tx.send(ClientEvent::Shutdown);
let _ = self.done_rx.recv();
}
}
pub struct ClientBuilder {
settings: Settings,
on_notify: Option<Box<dyn Fn(Notification) + Send>>,
}
impl ClientBuilder {
pub fn new() -> Self {
ClientBuilder { settings: Settings::default(), on_notify: None }
}
pub fn settings(mut self, s: Settings) -> Self {
self.settings = s;
self
}
pub fn on_notify<F: Fn(Notification) + Send + 'static>(mut self, f: F) -> Self {
self.on_notify = Some(Box::new(f));
self
}
pub fn connect(self, mut keypair: Keypair, url: &str) -> Result<Client, Error> {
let node_id = keypair.node_id()?;
let parsed_url = Url::parse(url)?;
let socket = UdpSocket::bind("[::]:0")
.map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
let local_addr = socket.local_addr()
.map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
let compression = CString::new(self.settings.compression.as_str()).unwrap_or_default();
let ffi_settings = ffi::nwep_settings {
max_streams: self.settings.max_streams,
max_message_size: self.settings.max_message_size,
timeout_ms: self.settings.timeout_ms,
compression: compression.as_ptr(),
role: std::ptr::null(),
};
let state = Box::new(ClientState {
c_client: std::ptr::null_mut(),
pending: HashMap::new(),
on_notify: self.on_notify,
disconnected: false,
});
let state_ptr = Box::into_raw(state);
let (connected_tx, connected_rx) = mpsc::sync_channel::<Result<crate::types::Identity, Error>>(1);
let cb_data = Box::new(CallbackData {
state: state_ptr,
connected_tx: Some(connected_tx),
});
let cb_data_ptr = Box::into_raw(cb_data) as *mut std::ffi::c_void;
let callbacks = ffi::nwep_callbacks {
on_connect: Some(client_on_connect),
on_disconnect: Some(client_on_disconnect),
on_request: None,
on_response: Some(client_on_response),
on_notify: Some(client_on_notify),
on_stream_data: Some(client_on_stream_data),
on_stream_end: Some(client_on_stream_end),
rand: Some(client_rand),
log: None,
};
let mut c_client: *mut ffi::nwep_client = std::ptr::null_mut();
check(unsafe {
ffi::nwep_client_new(&mut c_client, &ffi_settings, &callbacks, keypair.as_ffi_mut(), cb_data_ptr)
})?;
unsafe { (*state_ptr).c_client = c_client; }
let ffi_url = parsed_url.to_ffi();
let ts = now_ns();
let (local_sa, local_sa_len) = socketaddr_to_sockaddr(&local_addr);
check(unsafe {
ffi::nwep_client_connect(
c_client,
&ffi_url,
&local_sa as *const ffi::sockaddr_storage as *const _,
local_sa_len,
ts,
)
})?;
let mut init_buf = vec![0u8; UDP_BUF_SIZE];
drain_writes(c_client, &socket, &mut init_buf, ts);
let (event_tx, event_rx) = mpsc::sync_channel::<ClientEvent>(1024);
let shutdown_flag = Arc::new(AtomicBool::new(false));
let (done_tx, done_rx) = mpsc::channel::<()>();
let socket_recv = socket.try_clone()
.map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
let event_tx_recv = event_tx.clone();
let shutdown_recv = shutdown_flag.clone();
std::thread::spawn(move || {
let mut buf = vec![0u8; UDP_BUF_SIZE];
loop {
if shutdown_recv.load(Ordering::SeqCst) { break; }
socket_recv.set_read_timeout(Some(Duration::from_millis(50))).ok();
match socket_recv.recv_from(&mut buf) {
Ok((n, remote)) => {
let local = socket_recv.local_addr().unwrap();
let _ = event_tx_recv.send(ClientEvent::Packet {
data: buf[..n].to_vec(),
local,
remote,
});
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock ||
e.kind() == std::io::ErrorKind::TimedOut => {}
Err(_) => {
if shutdown_recv.load(Ordering::SeqCst) { break; }
}
}
}
});
let event_tx_timer = event_tx.clone();
let shutdown_timer = shutdown_flag.clone();
std::thread::spawn(move || {
loop {
std::thread::sleep(Duration::from_millis(10));
if shutdown_timer.load(Ordering::SeqCst) { break; }
let _ = event_tx_timer.send(ClientEvent::TimerExpiry);
}
});
let c_client_addr = c_client as usize;
let state_ptr_addr = state_ptr as usize;
let cb_data_ptr_addr = cb_data_ptr as usize;
let shutdown_loop = shutdown_flag.clone();
std::thread::spawn(move || {
let _done = done_tx;
let _keypair = keypair;
let c_client = c_client_addr as *mut ffi::nwep_client;
let state_ptr = state_ptr_addr as *mut ClientState;
let cb_data_ptr = cb_data_ptr_addr as *mut std::ffi::c_void;
let mut write_buf = vec![0u8; UDP_BUF_SIZE];
loop {
let event = match event_rx.recv_timeout(Duration::from_millis(100)) {
Ok(e) => e,
Err(mpsc::RecvTimeoutError::Timeout) => {
if shutdown_loop.load(Ordering::SeqCst) { break; }
continue;
}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
};
let ts = now_ns();
let state = unsafe { &mut *state_ptr };
match event {
ClientEvent::Shutdown => break,
ClientEvent::Packet { data, local, remote } => {
let path = make_path(&local, &remote);
unsafe {
ffi::nwep_client_read(c_client, &path, data.as_ptr(), data.len(), ts);
}
drain_writes(c_client, &socket, &mut write_buf, ts);
}
ClientEvent::TimerExpiry => {
let expiry = unsafe { ffi::nwep_client_get_expiry(c_client) };
if expiry != u64::MAX && ts >= expiry {
unsafe { ffi::nwep_client_handle_expiry(c_client, ts); }
drain_writes(c_client, &socket, &mut write_buf, ts);
}
}
ClientEvent::Request { method, path, body, headers, resp_tx } => {
let conn = unsafe { ffi::nwep_client_get_conn(c_client) };
if conn.is_null() || state.disconnected {
let _ = resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
continue;
}
let meth = CString::new(method.as_str()).unwrap_or_default();
let pth = CString::new(path.as_str()).unwrap_or_default();
let mut request_id = [0u8; 16];
let mut trace_id = [0u8; 16];
let _ = crate::protocol::request_id_generate().map(|id| request_id = id);
let _ = crate::protocol::trace_id_generate().map(|id| trace_id = id);
let c_headers: Vec<ffi::nwep_header> = headers.iter().map(|h| ffi::nwep_header {
name: h.name.as_ptr(),
name_len: h.name.len(),
value: h.value.as_ptr(),
value_len: h.value.len(),
}).collect();
let req = ffi::nwep_request {
method: meth.as_ptr(),
method_len: method.len(),
path: pth.as_ptr(),
path_len: path.len(),
headers: if c_headers.is_empty() { std::ptr::null() } else { c_headers.as_ptr() },
header_count: c_headers.len(),
body: if body.is_empty() { std::ptr::null() } else { body.as_ptr() },
body_len: body.len(),
request_id,
trace_id,
};
let mut stream: *mut ffi::nwep_stream = std::ptr::null_mut();
let rc = unsafe { ffi::nwep_stream_request(conn, &req, &mut stream) };
if rc != 0 {
let _ = resp_tx.send(Err(Error::from_code(rc)));
continue;
}
let stream_id = unsafe { ffi::nwep_stream_get_id(stream) };
unsafe { ffi::nwep_stream_end(stream); }
state.pending.insert(stream_id, PendingEntry {
resp_tx,
resp: None,
body_buf: Vec::new(),
});
drain_writes(c_client, &socket, &mut write_buf, ts);
}
}
}
let state = unsafe { &mut *state_ptr };
for (_, entry) in state.pending.drain() {
let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
}
unsafe {
let conn = ffi::nwep_client_get_conn(c_client);
if !conn.is_null() {
ffi::nwep_conn_close(conn, 0);
}
ffi::nwep_client_close(c_client);
let ts = now_ns();
drain_writes(c_client, &socket, &mut write_buf, ts);
ffi::nwep_client_free(c_client);
drop(Box::from_raw(state_ptr));
drop(Box::from_raw(cb_data_ptr as *mut CallbackData));
}
});
let request_timeout = Duration::from_millis(self.settings.timeout_ms as u64);
match connected_rx.recv() {
Ok(Ok(peer_identity)) => {
Ok(Client { event_tx, node_id, peer_identity, shutdown_flag, done_rx, request_timeout })
}
Ok(Err(e)) => {
shutdown_flag.store(true, Ordering::SeqCst);
let _ = event_tx.send(ClientEvent::Shutdown);
Err(e)
}
Err(_) => {
Err(Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
}
}
}
}
impl Default for ClientBuilder {
fn default() -> Self {
Self::new()
}
}
fn now_ns() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64
}
fn drain_writes(client: *mut ffi::nwep_client, socket: &UdpSocket, buf: &mut Vec<u8>, ts: u64) {
loop {
let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
let n = unsafe {
ffi::nwep_client_write(client, &mut path, buf.as_mut_ptr(), buf.len(), ts)
};
if n <= 0 { break; }
let remote = sockaddr_to_socketaddr(&path.remote_addr, path.remote_addrlen);
if let Some(addr) = remote {
socket.send_to(&buf[..n as usize], addr).ok();
}
}
}
fn make_path(local: &SocketAddr, remote: &SocketAddr) -> ffi::nwep_path {
let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
let (local_sa, local_len) = socketaddr_to_sockaddr_v6(local);
let (remote_sa, remote_len) = socketaddr_to_sockaddr_v6(remote);
path.local_addr = local_sa;
path.local_addrlen = local_len;
path.remote_addr = remote_sa;
path.remote_addrlen = remote_len;
path
}
fn socketaddr_to_sockaddr_v6(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
let mut storage = unsafe { std::mem::zeroed::<ffi::sockaddr_storage>() };
let sin6: &mut sockaddr_in6 = unsafe { &mut *((&mut storage) as *mut _ as *mut _) };
sin6.sin6_family = AF_INET6 as sa_family_t;
let (port, octets) = match addr {
SocketAddr::V4(v4) => {
let mut mapped = [0u8; 16];
mapped[10] = 0xff;
mapped[11] = 0xff;
mapped[12..].copy_from_slice(&v4.ip().octets());
(v4.port(), mapped)
}
SocketAddr::V6(v6) => (v6.port(), v6.ip().octets()),
};
sin6.sin6_port = port.to_be();
sin6.sin6_addr.s6_addr = octets;
(storage, std::mem::size_of::<sockaddr_in6>())
}
fn socketaddr_to_sockaddr(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
socketaddr_to_sockaddr_v6(addr)
}
fn sockaddr_to_socketaddr(storage: &ffi::sockaddr_storage, _len: usize) -> Option<SocketAddr> {
let family = storage.ss_family as i32;
match family {
AF_INET => {
let sin: &sockaddr_in = unsafe { &*(storage as *const _ as *const _) };
let ip = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
let port = u16::from_be(sin.sin_port);
Some(SocketAddr::new(ip.into(), port))
}
AF_INET6 => {
let sin6: &sockaddr_in6 = unsafe { &*(storage as *const _ as *const _) };
let octets = sin6.sin6_addr.s6_addr;
let port = u16::from_be(sin6.sin6_port);
let ip = std::net::Ipv6Addr::from(octets);
Some(SocketAddr::new(ip.into(), port))
}
_ => None,
}
}