#![allow(unsafe_op_in_unsafe_fn)]
use crate::anchorserver::AnchorServer;
use crate::error::{Error, check};
use crate::ffi;
use crate::sock_compat::{AF_INET, AF_INET6, sa_family_t, sockaddr_in, sockaddr_in6};
use crate::keypair::Keypair;
use crate::logserver::LogServer;
use crate::msg::CRequest;
use crate::role::ServerRole;
use crate::types::{
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_STREAMS, DEFAULT_TIMEOUT, Header, Identity, NodeId,
SECONDS,
};
use std::collections::{HashMap, HashSet};
use std::ffi::CString;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, mpsc};
use std::time::Duration;
/// Size in bytes of the buffers used to receive and to send UDP datagrams.
const UDP_BUF_SIZE: usize = 65536;
/// Tunable server parameters; `ServerBuilder::build` copies each field into `ffi::nwep_settings`.
pub struct Settings {
/// Copied to `ffi::nwep_settings::max_streams`.
pub max_streams: u32,
/// Copied to `ffi::nwep_settings::max_message_size`.
pub max_message_size: u32,
/// Copied to `ffi::nwep_settings::timeout_ms`; presumably milliseconds, as the name suggests.
pub timeout_ms: u32,
/// Handed to the C library as a C string; the default is the empty string.
pub compression: String,
/// Role handed to the C library in string form; the default is `ServerRole::Regular`.
pub role: ServerRole,
}
impl Default for Settings {
    /// Builds the baseline configuration from the crate-wide default constants, with no
    /// compression string and the `Regular` role.
    fn default() -> Self {
        // The timeout constant is divided by (SECONDS / 1000) before being stored in
        // `timeout_ms`.
        let timeout_ms = (DEFAULT_TIMEOUT / (SECONDS / 1000)) as u32;
        let max_message_size = DEFAULT_MAX_MESSAGE_SIZE as u32;
        Self {
            max_streams: DEFAULT_MAX_STREAMS,
            max_message_size,
            timeout_ms,
            compression: String::new(),
            role: ServerRole::Regular,
        }
    }
}
/// Optional extras for `Server::notify_with_options`.
pub struct NotifyOptions {
/// Headers attached to the notification.
pub headers: Vec<Header>,
/// When `Some`, sent as the notification id with `has_notify_id` set; when `None`, a zeroed id is
/// sent with the flag cleared.
pub notify_id: Option<[u8; 16]>,
}
/// Description of a peer connection handed to connect/disconnect callbacks and attached to requests.
#[derive(Clone, Debug)]
pub struct ConnInfo {
/// Node id of the peer.
pub node_id: NodeId,
/// Identity of the peer (public key and node id).
pub peer_identity: Identity,
/// Role reported for the connection. The disconnect callbacks always report
/// `ServerRole::Regular`.
pub role: ServerRole,
}
/// Messages delivered over the event channel to the loop in `EventLoop::run`.
enum ServerEvent {
/// A UDP datagram read by the receive thread, with the local and remote addresses it arrived on.
Packet {
data: Vec<u8>,
local: SocketAddr,
remote: SocketAddr,
},
/// Periodic tick from the timer thread; the loop checks the server's expiry when it arrives.
TimerExpiry,
/// Request to send a notification to the connection registered for `peer_node_id`.
Notify {
peer_node_id: NodeId,
event: String,
path: String,
body: Vec<u8>,
options: Option<NotifyOptions>,
},
/// Request to send a notification to every registered connection.
NotifyAll {
event: String,
path: String,
body: Vec<u8>,
},
/// Request to close the connection registered for `peer_node_id` with the given error code.
CloseConn {
peer_node_id: NodeId,
error: i32,
},
/// Ends the event loop.
Shutdown,
}
/// An incoming request as handed to a `Handler`; the request callback fills it from the decoded
/// FFI request.
pub struct Request {
/// Method string of the request.
pub method: String,
/// Path string of the request.
pub path: String,
/// Request body bytes.
pub body: Vec<u8>,
/// 16-byte request id carried by the request.
pub request_id: [u8; 16],
/// 16-byte trace id carried by the request.
pub trace_id: [u8; 16],
/// Headers carried by the request.
pub headers: Vec<Header>,
/// Information about the connection the request arrived on.
pub conn: ConnInfo,
}
impl Request {
    /// Looks up a header by exact (case-sensitive) name.
    ///
    /// Returns the value of the first matching header, or `None` when no header has that name.
    pub fn header(&self, name: &str) -> Option<&str> {
        for candidate in &self.headers {
            if candidate.name == name {
                return Some(candidate.value.as_str());
            }
        }
        None
    }
}
/// Application entry point for requests: the server calls `serve` for each request it dispatches.
pub trait Handler: Send + 'static {
/// Handles `r`, replying through `w`. If nothing has been sent when this returns, the server
/// sends an empty-bodied response with whatever status is set on `w`.
fn serve(&self, w: &mut ResponseWriter, r: &Request);
}
/// Lets any sendable `'static` closure or function with the right signature act as a `Handler`.
impl<F> Handler for F
where
    F: Fn(&mut ResponseWriter, &Request) + Send + 'static,
{
    /// Forwards the call to the wrapped function.
    fn serve(&self, w: &mut ResponseWriter, r: &Request) {
        (self)(w, r);
    }
}
/// Handle a `Handler` uses to answer one request on a single FFI stream.
pub struct ResponseWriter {
/// Raw stream pointer passed to the `ffi::nwep_stream_*` calls.
pub(crate) stream: *mut ffi::nwep_stream,
/// Status string sent by `write`.
pub(crate) status: String,
/// Headers sent by `write`.
pub(crate) headers: Vec<Header>,
/// Set to `true` by `write`, so the response is only sent once.
pub(crate) sent: bool,
}
impl ResponseWriter {
    /// Replaces the status that `write` will send.
    pub fn set_status(&mut self, status: &str) {
        self.status = status.to_string();
    }

    /// Appends a header to the response; an existing header with the same name is kept.
    pub fn set_header(&mut self, name: &str, value: &str) {
        self.headers.push(Header::new(name, value));
    }

    /// Sends the response (status, headers, `body`) and then ends the stream.
    ///
    /// Only the first call does anything; later calls return `Ok(())` without touching the
    /// stream. The writer is marked as sent before the FFI calls are made, so a failed send is
    /// not retried by a second call.
    ///
    /// # Errors
    /// Returns the error produced by `check` when `nwep_stream_respond` or `nwep_stream_end`
    /// reports a failure. The stream is not ended if responding fails.
    pub fn write(&mut self, body: &[u8]) -> Result<(), Error> {
        if self.sent {
            return Ok(());
        }
        self.sent = true;

        // A status containing an interior NUL cannot become a CString; it degrades to "".
        let status = CString::new(self.status.as_str()).unwrap_or_default();
        let c_headers: Vec<ffi::nwep_header> = self
            .headers
            .iter()
            .map(|h| ffi::nwep_header {
                name: h.name.as_ptr(),
                name_len: h.name.len(),
                value: h.value.as_ptr(),
                value_len: h.value.len(),
            })
            .collect();

        let resp = ffi::nwep_response {
            status: status.as_ptr(),
            // The length must describe the buffer behind `status.as_ptr()`, not the original
            // string: the two differ when the fallback empty CString is in use.
            status_len: status.as_bytes().len(),
            status_details: std::ptr::null(),
            status_details_len: 0,
            headers: if c_headers.is_empty() {
                std::ptr::null()
            } else {
                c_headers.as_ptr()
            },
            header_count: c_headers.len(),
            body: if body.is_empty() {
                std::ptr::null()
            } else {
                body.as_ptr()
            },
            body_len: body.len(),
        };

        // `status`, `c_headers`, `self.headers` and `body` all outlive both calls below, so
        // every pointer stored in `resp` stays valid while the C library reads it.
        check(unsafe { ffi::nwep_stream_respond(self.stream, &resp) })?;
        check(unsafe { ffi::nwep_stream_end(self.stream) })?;
        Ok(())
    }

    /// Sets the status and sends the response in one step; see [`ResponseWriter::write`].
    pub fn respond(&mut self, status: &str, body: &[u8]) -> Result<(), Error> {
        self.set_status(status);
        self.write(body)
    }

    /// Writes raw bytes to the stream via `nwep_stream_write`.
    ///
    /// Returns the non-negative value reported by the C call, or an error built from the
    /// negative return code.
    pub fn stream_write(&mut self, data: &[u8]) -> Result<isize, Error> {
        let n = unsafe { ffi::nwep_stream_write(self.stream, data.as_ptr(), data.len()) };
        if n < 0 {
            Err(Error::from_code(n as i32))
        } else {
            Ok(n)
        }
    }

    /// Ends the stream via `nwep_stream_end`.
    pub fn stream_end(&mut self) -> Result<(), Error> {
        check(unsafe { ffi::nwep_stream_end(self.stream) })
    }

    /// Closes the stream with the given error code via `nwep_stream_close`.
    pub fn stream_close(&mut self, err: i32) {
        unsafe { ffi::nwep_stream_close(self.stream, err) }
    }

    /// Returns the id reported by `nwep_stream_get_id`.
    pub fn stream_id(&self) -> i64 {
        unsafe { ffi::nwep_stream_get_id(self.stream) }
    }

    /// Returns `true` when `nwep_stream_is_server_initiated` reports a non-zero value.
    pub fn is_server_initiated(&self) -> bool {
        unsafe { ffi::nwep_stream_is_server_initiated(self.stream) != 0 }
    }
}
/// State shared between the C callbacks and the event loop; a raw pointer to it is handed to the
/// C library as `user_data`.
struct CallbackData {
/// Application handler invoked for requests that no sub-server claims.
handler: Box<dyn Handler>,
/// Optional callback run when a connection is established.
on_connect: Option<Box<dyn Fn(ConnInfo) + Send>>,
/// Optional callback run when a connection goes away; receives the error code.
on_disconnect: Option<Box<dyn Fn(ConnInfo, i32) + Send>>,
/// Connection pointer registered for each peer node id.
conns: HashMap<NodeId, *mut ffi::nwep_conn>,
/// Reverse lookup: connection pointer (as an integer) to peer node id.
conn_to_node_id: HashMap<usize, NodeId>,
/// Connections being closed on request; new requests on them are rejected.
closing_conns: HashSet<usize>,
/// Connected peer list, shared with `Server`.
peers: Arc<Mutex<Vec<NodeId>>>,
/// Optional sub-server that answers `/log` paths.
log_server: Option<Arc<Mutex<LogServer>>>,
/// Optional sub-server that answers `/checkpoint` paths.
anchor_server: Option<Arc<Mutex<AnchorServer>>>,
}
/// C callback invoked when a peer connection is established.
///
/// Resolves the peer's identity (deriving the node id from the public key when the identity
/// carries a key but an all-zero node id), registers a non-null `conn` in the shared maps and
/// peer list, and then runs the user's `on_connect` callback. Always returns 0.
///
/// # Safety
/// `user_data` must point to the live `CallbackData` registered with the server, and `peer`,
/// when non-null, must point to a valid `nwep_identity`.
unsafe extern "C" fn server_on_connect(
    conn: *mut ffi::nwep_conn,
    peer: *const ffi::nwep_identity,
    user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
    let state = &mut *(user_data as *mut CallbackData);

    let mut identity = match peer.is_null() {
        true => Identity::default(),
        false => Identity::from(*peer),
    };
    let node_id_missing = identity.node_id.0 == [0u8; 32];
    let has_pubkey = identity.pubkey != [0u8; 32];
    if node_id_missing && has_pubkey {
        let mut derived = ffi::nwep_nodeid { data: [0u8; 32] };
        if ffi::nwep_nodeid_from_pubkey(&mut derived, identity.pubkey.as_ptr()) == 0 {
            identity.node_id = NodeId(derived.data);
        }
    }
    let node_id = NodeId(identity.node_id.0);

    // Without a connection (or a role string) the role falls back to `Regular`.
    let mut role = ServerRole::Regular;
    if !conn.is_null() {
        let role_ptr = ffi::nwep_conn_get_role(conn);
        if !role_ptr.is_null() {
            let role_name = std::ffi::CStr::from_ptr(role_ptr)
                .to_str()
                .unwrap_or("regular");
            role = ServerRole::from_str(role_name);
        }

        state.conns.insert(node_id, conn);
        state.conn_to_node_id.insert(conn as usize, node_id);
        if let Ok(mut peers) = state.peers.lock() {
            peers.push(node_id);
        }
    }

    if let Some(notify_user) = state.on_connect.as_ref() {
        let info = ConnInfo {
            node_id,
            peer_identity: identity,
            role,
        };
        notify_user(info);
    }
    0
}
/// C callback invoked when a connection goes away.
///
/// Removes the connection from every bookkeeping structure. If the connection was still
/// registered under a node id, the user's `on_disconnect` callback is run with the peer's
/// identity and the error code; connections that were already unregistered (for example by an
/// explicit close request) produce no callback here.
///
/// # Safety
/// `user_data` must point to the live `CallbackData` registered with the server.
unsafe extern "C" fn server_on_disconnect(
    conn: *mut ffi::nwep_conn,
    error: std::ffi::c_int,
    user_data: *mut std::ffi::c_void,
) {
    let state = &mut *(user_data as *mut CallbackData);
    let conn_key = conn as usize;

    let registered = state.conn_to_node_id.remove(&conn_key);
    // The "closing" marker is cleared whether or not the connection was still registered.
    state.closing_conns.remove(&conn_key);
    let node_id = match registered {
        Some(id) => id,
        None => return,
    };

    state.conns.remove(&node_id);
    if let Ok(mut peers) = state.peers.lock() {
        peers.retain(|known| *known != node_id);
    }

    let notify_user = match state.on_disconnect.as_ref() {
        Some(f) => f,
        None => return,
    };

    let peer = if conn.is_null() {
        std::ptr::null()
    } else {
        ffi::nwep_conn_get_peer_identity(conn)
    };
    let mut identity = if peer.is_null() {
        Identity {
            pubkey: [0u8; 32],
            node_id,
        }
    } else {
        Identity::from(*peer)
    };
    // Fall back to the registered node id when the identity carries none.
    if identity.node_id.0 == [0u8; 32] {
        identity.node_id = node_id;
    }

    let info = ConnInfo {
        node_id,
        peer_identity: identity,
        role: ServerRole::Regular,
    };
    notify_user(info, error);
}
unsafe extern "C" fn server_on_request(
conn: *mut ffi::nwep_conn,
stream: *mut ffi::nwep_stream,
req: *const ffi::nwep_request,
user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
let cb = &mut *(user_data as *mut CallbackData);
if req.is_null() || stream.is_null() {
return 0;
}
if !conn.is_null() && cb.closing_conns.contains(&(conn as usize)) {
ffi::nwep_stream_close(stream, 0);
return 0;
}
let c_req = CRequest::from_ffi(&*req);
if cb.log_server.is_some() || cb.anchor_server.is_some() {
if let Some(ref ls_arc) = cb.log_server {
if c_req.path == "/log" || c_req.path.starts_with("/log/") {
if let Ok(mut ls) = ls_arc.lock() {
if ls.handle_request(stream, req).is_err() {
sub_server_error_respond(stream, crate::protocol::STATUS_INTERNAL_ERROR);
}
}
ffi::nwep_stream_end(stream);
return 0;
}
}
if let Some(ref as_arc) = cb.anchor_server {
if c_req.path == "/checkpoint" || c_req.path.starts_with("/checkpoint/") {
if let Ok(mut as_) = as_arc.lock() {
if as_.handle_request(stream, req).is_err() {
sub_server_error_respond(stream, crate::protocol::STATUS_INTERNAL_ERROR);
}
}
ffi::nwep_stream_end(stream);
return 0;
}
}
}
let peer = if conn.is_null() {
std::ptr::null()
} else {
ffi::nwep_conn_get_peer_identity(conn)
};
let mut identity = if peer.is_null() {
Identity::default()
} else {
Identity::from(*peer)
};
if identity.node_id.0 == [0u8; 32] && identity.pubkey != [0u8; 32] {
let mut nid = ffi::nwep_nodeid { data: [0u8; 32] };
if ffi::nwep_nodeid_from_pubkey(&mut nid, identity.pubkey.as_ptr()) == 0 {
identity.node_id = NodeId(nid.data);
}
}
let role = if conn.is_null() {
ServerRole::Regular
} else {
let role_ptr = ffi::nwep_conn_get_role(conn);
if role_ptr.is_null() {
ServerRole::Regular
} else {
ServerRole::from_str(
std::ffi::CStr::from_ptr(role_ptr)
.to_str()
.unwrap_or("regular"),
)
}
};
let conn_info = ConnInfo {
node_id: NodeId(identity.node_id.0),
peer_identity: identity,
role,
};
let request = Request {
method: c_req.method,
path: c_req.path,
body: c_req.body,
request_id: c_req.request_id,
trace_id: c_req.trace_id,
headers: c_req.headers,
conn: conn_info,
};
let mut writer = ResponseWriter {
stream,
status: crate::protocol::STATUS_OK.to_string(),
headers: Vec::new(),
sent: false,
};
cb.handler.serve(&mut writer, &request);
if !writer.sent {
let _ = writer.write(b"");
}
0
}
/// C callback that fills `dest[..len]` with random bytes from `crate::crypto::random_bytes`.
///
/// Returns 0 on success and -1 on failure. A zero-length request succeeds without touching
/// `dest`; a null `dest` with a non-zero length is reported as a failure.
///
/// # Safety
/// When `dest` is non-null it must be valid for writes of `len` bytes.
unsafe extern "C" fn server_rand(
    dest: *mut u8,
    len: usize,
    _user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
    if len == 0 {
        return 0;
    }
    // `slice::from_raw_parts_mut` requires a non-null pointer, so reject null before building
    // the slice instead of invoking undefined behaviour.
    if dest.is_null() {
        return -1;
    }
    let slice = std::slice::from_raw_parts_mut(dest, len);
    match crate::crypto::random_bytes(slice) {
        Ok(()) => 0,
        Err(_) => -1,
    }
}
/// Sends a response consisting only of `status` (no details, headers or body) on `stream`.
///
/// The result of `nwep_stream_respond` is ignored; this is a best-effort error reply and the
/// caller ends the stream afterwards.
///
/// # Safety
/// `stream` must be a valid stream pointer for the C library.
unsafe fn sub_server_error_respond(stream: *mut ffi::nwep_stream, status: &str) {
    // A status with an interior NUL cannot become a CString and degrades to "".
    let status_c = CString::new(status).unwrap_or_default();
    let resp = ffi::nwep_response {
        status: status_c.as_ptr(),
        // Take the length from the CString so it always matches the buffer behind the pointer,
        // including when the empty fallback is in use.
        status_len: status_c.as_bytes().len(),
        status_details: std::ptr::null(),
        status_details_len: 0,
        headers: std::ptr::null(),
        header_count: 0,
        body: std::ptr::null(),
        body_len: 0,
    };
    let _ = ffi::nwep_stream_respond(stream, &resp);
}
/// Sends `notify` on `conn`, ends the stream the C library opened for it (if any), and then
/// flushes pending outgoing datagrams to `socket`.
///
/// The flush happens whether or not the notification call succeeded.
///
/// # Safety
/// `c_server` and `conn` must be valid pointers for the C library, and every pointer inside
/// `notify` must stay valid for the duration of the call.
unsafe fn do_conn_notify(
    c_server: *mut ffi::nwep_server,
    conn: *mut ffi::nwep_conn,
    notify: &ffi::nwep_notify,
    socket: &UdpSocket,
    write_buf: &mut Vec<u8>,
    ts: u64,
) {
    let mut opened: *mut ffi::nwep_stream = std::ptr::null_mut();
    let status = ffi::nwep_conn_notify(conn, notify, &mut opened);
    let stream_opened = status == 0 && !opened.is_null();
    if stream_opened {
        ffi::nwep_stream_end(opened);
    }
    drain_writes(c_server, socket, write_buf, ts);
}
/// Control handle for a running server; its methods queue events for the event loop.
pub struct Server {
/// Sending side of the event channel consumed by `EventLoop::run`.
event_tx: mpsc::SyncSender<ServerEvent>,
/// Node id derived from the server's keypair.
node_id: NodeId,
/// Local address the UDP socket is bound to.
addr: SocketAddr,
/// Set by `shutdown` to tell the loop and its worker threads to stop.
shutdown_flag: Arc<AtomicBool>,
/// Connected peer list, shared with the connection callbacks.
peers: Arc<Mutex<Vec<NodeId>>>,
/// Optional log sub-server given to the builder.
log_server: Option<Arc<Mutex<LogServer>>>,
/// Optional anchor sub-server given to the builder.
anchor_server: Option<Arc<Mutex<AnchorServer>>>,
}
impl Server {
pub fn node_id(&self) -> NodeId {
self.node_id
}
pub fn addr(&self) -> SocketAddr {
self.addr
}
pub fn url(&self, path: &str) -> String {
use std::net::{IpAddr, Ipv4Addr};
let ip = match self.addr.ip() {
ip if ip.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST),
ip => ip,
};
let nwep_addr = match ip {
IpAddr::V4(v4) => crate::addr::Addr::new_ipv4(v4, self.node_id, self.addr.port()),
IpAddr::V6(v6) => crate::addr::Addr::new_ipv6(v6, self.node_id, self.addr.port()),
};
let url = crate::addr::Url {
addr: nwep_addr,
path: path.to_string(),
};
url.format()
.unwrap_or_else(|_| format!("web://[{}]:{}{}", ip, self.addr.port(), path))
}
pub fn connection_count(&self) -> usize {
self.peers.lock().map(|p| p.len()).unwrap_or(0)
}
pub fn connected_peers(&self) -> Vec<NodeId> {
self.peers.lock().map(|p| p.clone()).unwrap_or_default()
}
pub fn shutdown(&self) {
self.shutdown_flag.store(true, Ordering::SeqCst);
let _ = self.event_tx.send(ServerEvent::Shutdown);
}
pub fn notify(
&self,
peer_node_id: NodeId,
event: &str,
path: &str,
body: Vec<u8>,
) -> Result<(), Error> {
self.event_tx
.send(ServerEvent::Notify {
peer_node_id,
event: event.to_string(),
path: path.to_string(),
body,
options: None,
})
.map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
}
pub fn notify_with_options(
&self,
peer_node_id: NodeId,
event: &str,
path: &str,
body: Vec<u8>,
options: NotifyOptions,
) -> Result<(), Error> {
self.event_tx
.send(ServerEvent::Notify {
peer_node_id,
event: event.to_string(),
path: path.to_string(),
body,
options: Some(options),
})
.map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
}
pub fn notify_all(&self, event: &str, path: &str, body: Vec<u8>) -> Result<(), Error> {
self.event_tx
.send(ServerEvent::NotifyAll {
event: event.to_string(),
path: path.to_string(),
body,
})
.map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
}
pub fn close_connection(&self, peer_node_id: NodeId, error: i32) -> Result<(), Error> {
self.event_tx
.send(ServerEvent::CloseConn {
peer_node_id,
error,
})
.map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
}
pub fn log_server(&self) -> Option<Arc<Mutex<LogServer>>> {
self.log_server.clone()
}
pub fn anchor_server(&self) -> Option<Arc<Mutex<AnchorServer>>> {
self.anchor_server.clone()
}
}
/// Collects everything needed to create a `Server` and its `EventLoop`.
pub struct ServerBuilder {
/// Address string passed to `UdpSocket::bind`.
addr: String,
/// Keypair handed to the C library and used to derive the server's node id.
keypair: Keypair,
/// Settings copied into the FFI settings struct.
settings: Settings,
/// Optional callback run when a connection is established.
on_connect: Option<Box<dyn Fn(ConnInfo) + Send>>,
/// Optional callback run when a connection goes away.
on_disconnect: Option<Box<dyn Fn(ConnInfo, i32) + Send>>,
/// Optional log sub-server.
log_server: Option<Arc<Mutex<LogServer>>>,
/// Optional anchor sub-server.
anchor_server: Option<Arc<Mutex<AnchorServer>>>,
}
impl ServerBuilder {
pub fn new(addr: impl Into<String>, keypair: Keypair) -> Self {
ServerBuilder {
addr: addr.into(),
keypair,
settings: Settings::default(),
on_connect: None,
on_disconnect: None,
log_server: None,
anchor_server: None,
}
}
pub fn settings(mut self, s: Settings) -> Self {
self.settings = s;
self
}
pub fn on_connect<F: Fn(ConnInfo) + Send + 'static>(mut self, f: F) -> Self {
self.on_connect = Some(Box::new(f));
self
}
pub fn on_disconnect<F: Fn(ConnInfo, i32) + Send + 'static>(mut self, f: F) -> Self {
self.on_disconnect = Some(Box::new(f));
self
}
pub fn log_server(mut self, ls: Arc<Mutex<LogServer>>) -> Self {
self.log_server = Some(ls);
self
}
pub fn anchor_server(mut self, as_: Arc<Mutex<AnchorServer>>) -> Self {
self.anchor_server = Some(as_);
self
}
pub fn build<H: Handler>(mut self, handler: H) -> Result<(Server, EventLoop), Error> {
let node_id = self.keypair.node_id()?;
let socket = UdpSocket::bind(&self.addr)
.map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
let local_addr = socket
.local_addr()
.map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
let compression = CString::new(self.settings.compression.as_str()).unwrap_or_default();
let role_str = CString::new(self.settings.role.as_str()).unwrap_or_default();
let ffi_settings = ffi::nwep_settings {
max_streams: self.settings.max_streams,
max_message_size: self.settings.max_message_size,
timeout_ms: self.settings.timeout_ms,
compression: compression.as_ptr(),
role: role_str.as_ptr(),
};
let peers = Arc::new(Mutex::new(Vec::<NodeId>::new()));
let cb_data = Box::new(CallbackData {
handler: Box::new(handler),
on_connect: self.on_connect,
on_disconnect: self.on_disconnect,
conns: HashMap::new(),
conn_to_node_id: HashMap::new(),
closing_conns: HashSet::new(),
peers: Arc::clone(&peers),
log_server: self.log_server.clone(),
anchor_server: self.anchor_server.clone(),
});
let cb_data_ptr = Box::into_raw(cb_data) as *mut std::ffi::c_void;
let callbacks = ffi::nwep_callbacks {
on_connect: Some(server_on_connect),
on_disconnect: Some(server_on_disconnect),
on_request: Some(server_on_request),
on_response: None,
on_notify: None,
on_stream_data: None,
on_stream_end: None,
rand: Some(server_rand),
log: None,
};
let mut c_server: *mut ffi::nwep_server = std::ptr::null_mut();
check(unsafe {
ffi::nwep_server_new(
&mut c_server,
&ffi_settings,
&callbacks,
self.keypair.as_ffi_mut(),
cb_data_ptr,
)
})?;
let (event_tx, event_rx) = mpsc::sync_channel::<ServerEvent>(1024);
let shutdown_flag = Arc::new(AtomicBool::new(false));
let server = Server {
event_tx: event_tx.clone(),
node_id,
addr: local_addr,
shutdown_flag: shutdown_flag.clone(),
peers,
log_server: self.log_server,
anchor_server: self.anchor_server,
};
let event_loop = EventLoop {
c_server,
socket,
local_addr,
event_rx,
event_tx,
shutdown_flag,
cb_data_ptr,
_keypair: self.keypair,
};
Ok((server, event_loop))
}
}
/// Owns the C server object and the UDP socket; `run` drives both until shutdown.
pub struct EventLoop {
/// Raw handle to the C server; freed at the end of `run`.
c_server: *mut ffi::nwep_server,
/// Bound UDP socket; cloned for the receive thread and for sending.
socket: UdpSocket,
/// Local address of `socket`, recorded for every received packet.
local_addr: SocketAddr,
/// Receiving side of the event channel.
event_rx: mpsc::Receiver<ServerEvent>,
/// Sending side of the event channel, cloned for the worker threads.
event_tx: mpsc::SyncSender<ServerEvent>,
/// Shared flag that tells the loop and its worker threads to stop.
shutdown_flag: Arc<AtomicBool>,
/// Raw pointer to the boxed `CallbackData`; freed in `Drop`.
cb_data_ptr: *mut std::ffi::c_void,
/// Keeps the keypair alive for the lifetime of the loop.
_keypair: crate::keypair::Keypair,
}
// NOTE(review): `EventLoop` holds raw pointers (`c_server`, `cb_data_ptr`), so it is not `Send`
// automatically; this impl asserts that moving it to another thread is sound — confirm against
// the C library's threading requirements.
unsafe impl Send for EventLoop {}
impl EventLoop {
    /// Runs the server until a shutdown is requested, then closes and frees the C server.
    ///
    /// Two helper threads feed the event channel: one reads datagrams from the socket, one
    /// emits a timer tick every 10 ms. The calling thread consumes the events and performs
    /// every call into the C library.
    ///
    /// # Errors
    /// `ERR_NETWORK_SOCKET` when the socket cannot be cloned. Both clones are made before any
    /// thread is started, so a failure leaves no worker thread behind.
    pub fn run(self) -> Result<(), Error> {
        let socket_recv = self
            .socket
            .try_clone()
            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
        let socket_send = self
            .socket
            .try_clone()
            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;

        // Receive thread: socket -> Packet events.
        let event_tx_recv = self.event_tx.clone();
        let local_addr = self.local_addr;
        let shutdown_flag_recv = self.shutdown_flag.clone();
        std::thread::spawn(move || {
            let mut buf = vec![0u8; UDP_BUF_SIZE];
            // The timeout lets the loop re-check the shutdown flag; it only needs setting once.
            socket_recv
                .set_read_timeout(Some(Duration::from_millis(100)))
                .ok();
            while !shutdown_flag_recv.load(Ordering::SeqCst) {
                match socket_recv.recv_from(&mut buf) {
                    Ok((n, remote)) => {
                        let packet = ServerEvent::Packet {
                            data: buf[..n].to_vec(),
                            local: local_addr,
                            remote,
                        };
                        // A closed channel means the event loop is gone; stop reading.
                        if event_tx_recv.send(packet).is_err() {
                            break;
                        }
                    }
                    // Timeouts and other errors alike fall through to the flag check above.
                    Err(_) => {}
                }
            }
        });

        // Timer thread: one TimerExpiry event every 10 ms.
        let event_tx_timer = self.event_tx.clone();
        let shutdown_flag_timer = self.shutdown_flag.clone();
        std::thread::spawn(move || loop {
            std::thread::sleep(Duration::from_millis(10));
            if shutdown_flag_timer.load(Ordering::SeqCst) {
                break;
            }
            if event_tx_timer.send(ServerEvent::TimerExpiry).is_err() {
                break;
            }
        });

        let mut write_buf = vec![0u8; UDP_BUF_SIZE];
        loop {
            let event = match self.event_rx.recv_timeout(Duration::from_millis(100)) {
                Ok(e) => e,
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    if self.shutdown_flag.load(Ordering::SeqCst) {
                        break;
                    }
                    continue;
                }
                Err(mpsc::RecvTimeoutError::Disconnected) => break,
            };
            let ts = now_ns();
            match event {
                ServerEvent::Shutdown => break,
                ServerEvent::Packet {
                    data,
                    local,
                    remote,
                } => {
                    let path = make_path(&local, &remote);
                    unsafe {
                        ffi::nwep_server_read(self.c_server, &path, data.as_ptr(), data.len(), ts);
                    }
                    drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
                }
                ServerEvent::TimerExpiry => {
                    let expiry = unsafe { ffi::nwep_server_get_expiry(self.c_server) };
                    if expiry != u64::MAX && ts >= expiry {
                        unsafe {
                            ffi::nwep_server_handle_expiry(self.c_server, ts);
                        }
                        drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
                    }
                }
                ServerEvent::Notify {
                    peer_node_id,
                    event,
                    path,
                    body,
                    options,
                } => {
                    let cb = unsafe { &*(self.cb_data_ptr as *const CallbackData) };
                    // Notifications for peers without a registered connection are dropped.
                    if let Some(&conn) = cb.conns.get(&peer_node_id) {
                        let ev = CString::new(event.as_str()).unwrap_or_default();
                        let pth = CString::new(path.as_str()).unwrap_or_default();
                        let notify_id = options.as_ref().and_then(|o| o.notify_id);
                        let c_headers: Vec<ffi::nwep_header> = match options.as_ref() {
                            Some(o) => o
                                .headers
                                .iter()
                                .map(|h| ffi::nwep_header {
                                    name: h.name.as_ptr(),
                                    name_len: h.name.len(),
                                    value: h.value.as_ptr(),
                                    value_len: h.value.len(),
                                })
                                .collect(),
                            None => Vec::new(),
                        };
                        let notify = ffi::nwep_notify {
                            event: ev.as_ptr(),
                            // Lengths come from the CStrings so they always match the buffers
                            // behind the pointers, even when a string with an interior NUL
                            // fell back to "".
                            event_len: ev.as_bytes().len(),
                            path: pth.as_ptr(),
                            path_len: pth.as_bytes().len(),
                            notify_id: notify_id.unwrap_or([0u8; 16]),
                            has_notify_id: notify_id.is_some() as i32,
                            headers: if c_headers.is_empty() {
                                std::ptr::null()
                            } else {
                                c_headers.as_ptr()
                            },
                            header_count: c_headers.len(),
                            body: if body.is_empty() {
                                std::ptr::null()
                            } else {
                                body.as_ptr()
                            },
                            body_len: body.len(),
                        };
                        unsafe {
                            do_conn_notify(
                                self.c_server,
                                conn,
                                &notify,
                                &socket_send,
                                &mut write_buf,
                                ts,
                            );
                        }
                    }
                }
                ServerEvent::NotifyAll { event, path, body } => {
                    let cb = unsafe { &*(self.cb_data_ptr as *const CallbackData) };
                    // Snapshot the pointers so the map is not borrowed during the FFI calls.
                    let conns: Vec<*mut ffi::nwep_conn> = cb.conns.values().copied().collect();
                    if !conns.is_empty() {
                        let ev = CString::new(event.as_str()).unwrap_or_default();
                        let pth = CString::new(path.as_str()).unwrap_or_default();
                        let notify = ffi::nwep_notify {
                            event: ev.as_ptr(),
                            event_len: ev.as_bytes().len(),
                            path: pth.as_ptr(),
                            path_len: pth.as_bytes().len(),
                            notify_id: [0u8; 16],
                            has_notify_id: 0,
                            headers: std::ptr::null(),
                            header_count: 0,
                            body: if body.is_empty() {
                                std::ptr::null()
                            } else {
                                body.as_ptr()
                            },
                            body_len: body.len(),
                        };
                        for conn in conns {
                            unsafe {
                                do_conn_notify(
                                    self.c_server,
                                    conn,
                                    &notify,
                                    &socket_send,
                                    &mut write_buf,
                                    ts,
                                );
                            }
                        }
                    }
                }
                ServerEvent::CloseConn {
                    peer_node_id,
                    error,
                } => {
                    let cb = unsafe { &mut *(self.cb_data_ptr as *mut CallbackData) };
                    if let Some(&conn) = cb.conns.get(&peer_node_id) {
                        let conn_key = conn as usize;
                        // Unregister first: the later C disconnect callback then finds no
                        // node id and does not report the disconnect a second time.
                        let node_id = cb.conn_to_node_id.remove(&conn_key).unwrap_or(peer_node_id);
                        cb.conns.remove(&peer_node_id);
                        if let Ok(mut peers) = cb.peers.lock() {
                            peers.retain(|&n| n != peer_node_id);
                        }
                        cb.closing_conns.insert(conn_key);
                        if let Some(cb_fn) = &cb.on_disconnect {
                            let peer = unsafe {
                                if conn.is_null() {
                                    std::ptr::null()
                                } else {
                                    ffi::nwep_conn_get_peer_identity(conn)
                                }
                            };
                            let identity = if peer.is_null() {
                                Identity {
                                    pubkey: [0u8; 32],
                                    node_id,
                                }
                            } else {
                                let mut id = unsafe { Identity::from(*peer) };
                                if id.node_id.0 == [0u8; 32] {
                                    id.node_id = node_id;
                                }
                                id
                            };
                            cb_fn(
                                ConnInfo {
                                    node_id,
                                    peer_identity: identity,
                                    role: ServerRole::Regular,
                                },
                                error,
                            );
                        }
                        unsafe {
                            ffi::nwep_conn_close(conn, error);
                        }
                        drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
                    }
                }
            }
        }

        // Shutdown: close the server, keep servicing its timers and pending writes until it
        // reports no further expiry or 500 ms have passed, then free it.
        unsafe {
            ffi::nwep_server_close(self.c_server);
            let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
            loop {
                let ts = now_ns();
                ffi::nwep_server_handle_expiry(self.c_server, ts);
                drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
                let expiry = ffi::nwep_server_get_expiry(self.c_server);
                if expiry == u64::MAX || std::time::Instant::now() >= deadline {
                    break;
                }
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            ffi::nwep_server_free(self.c_server);
        }
        Ok(())
    }
}
impl Drop for EventLoop {
fn drop(&mut self) {
if !self.cb_data_ptr.is_null() {
unsafe {
drop(Box::from_raw(self.cb_data_ptr as *mut CallbackData));
}
self.cb_data_ptr = std::ptr::null_mut();
}
}
}
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
fn drain_writes(server: *mut ffi::nwep_server, socket: &UdpSocket, buf: &mut Vec<u8>, ts: u64) {
loop {
let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
let n =
unsafe { ffi::nwep_server_write(server, &mut path, buf.as_mut_ptr(), buf.len(), ts) };
if n <= 0 {
break;
}
if let Some(addr) = sockaddr_to_socketaddr(&path.remote_addr, path.remote_addrlen) {
socket.send_to(&buf[..n as usize], addr).ok();
}
}
}
/// Builds an `ffi::nwep_path` describing a datagram's local and remote endpoints.
///
/// The path starts zeroed; only the four address fields are filled in.
fn make_path(local: &SocketAddr, remote: &SocketAddr) -> ffi::nwep_path {
    let (local_addr, local_addrlen) = socketaddr_to_sockaddr(local);
    let (remote_addr, remote_addrlen) = socketaddr_to_sockaddr(remote);

    let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
    path.local_addr = local_addr;
    path.local_addrlen = local_addrlen;
    path.remote_addr = remote_addr;
    path.remote_addrlen = remote_addrlen;
    path
}
/// Converts a `SocketAddr` into a zero-initialised `sockaddr_storage` plus the length of the
/// concrete structure written into it.
///
/// IPv4 addresses and IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are both encoded as
/// `AF_INET`; every other IPv6 address is encoded as `AF_INET6`. Port and address bytes are
/// stored in network byte order.
fn socketaddr_to_sockaddr(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
    let mut storage = unsafe { std::mem::zeroed::<ffi::sockaddr_storage>() };
    let port = addr.port();

    // `Some` when the address can be expressed as plain IPv4.
    let as_ipv4 = match addr {
        SocketAddr::V4(v4) => Some(*v4.ip()),
        SocketAddr::V6(v6) => v6.ip().to_ipv4_mapped(),
    };

    if let Some(ip) = as_ipv4 {
        let sin: &mut sockaddr_in =
            unsafe { &mut *(&mut storage as *mut ffi::sockaddr_storage as *mut sockaddr_in) };
        sin.sin_family = AF_INET as sa_family_t;
        sin.sin_port = port.to_be();
        // The octets are already in network order; keep their in-memory layout as is.
        sin.sin_addr.s_addr = u32::from_ne_bytes(ip.octets());
        return (storage, std::mem::size_of::<sockaddr_in>());
    }

    let v6_octets = match addr {
        SocketAddr::V6(v6) => v6.ip().octets(),
        // Every IPv4 address was handled by the branch above.
        SocketAddr::V4(_) => unreachable!(),
    };
    let sin6: &mut sockaddr_in6 =
        unsafe { &mut *(&mut storage as *mut ffi::sockaddr_storage as *mut sockaddr_in6) };
    sin6.sin6_family = AF_INET6 as sa_family_t;
    sin6.sin6_port = port.to_be();
    sin6.sin6_addr.s6_addr = v6_octets;
    (storage, std::mem::size_of::<sockaddr_in6>())
}
fn sockaddr_to_socketaddr(storage: &ffi::sockaddr_storage, _len: usize) -> Option<SocketAddr> {
let family = storage.ss_family as i32;
match family {
AF_INET => {
let sin: &sockaddr_in = unsafe { &*(storage as *const _ as *const _) };
let v4 = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
let port = u16::from_be(sin.sin_port);
let mapped = std::net::Ipv6Addr::from(v4.to_ipv6_mapped().octets());
Some(SocketAddr::new(mapped.into(), port))
}
AF_INET6 => {
let sin6: &sockaddr_in6 = unsafe { &*(storage as *const _ as *const _) };
let ip = std::net::Ipv6Addr::from(sin6.sin6_addr.s6_addr);
let port = u16::from_be(sin6.sin6_port);
Some(SocketAddr::new(ip.into(), port))
}
_ => None,
}
}
/// A `Handler` that dispatches requests to other handlers by path.
pub struct Router {
/// Handlers matched against the full request path, in registration order.
routes: Vec<(String, Box<dyn Handler>)>,
/// Handlers matched against a path prefix, in registration order; consulted only when no exact
/// route matches.
prefix_routes: Vec<(String, Box<dyn Handler>)>,
}
impl Router {
    /// Creates a router with no routes.
    pub fn new() -> Self {
        Self {
            routes: Vec::new(),
            prefix_routes: Vec::new(),
        }
    }

    /// Registers `h` for requests whose path equals `path` exactly.
    ///
    /// Routes are tried in registration order, so an earlier route for the same path wins.
    pub fn handle(&mut self, path: &str, h: impl Handler) {
        let entry: (String, Box<dyn Handler>) = (path.to_string(), Box::new(h));
        self.routes.push(entry);
    }

    /// Same as [`Router::handle`], for a plain function or closure.
    pub fn handle_func<F: Fn(&mut ResponseWriter, &Request) + Send + 'static>(
        &mut self,
        path: &str,
        f: F,
    ) {
        // Closures with this signature already implement `Handler`.
        self.handle(path, f);
    }

    /// Registers `h` for requests whose path starts with `prefix`.
    ///
    /// Prefix routes are tried in registration order, after all exact routes.
    pub fn handle_prefix(&mut self, prefix: &str, h: impl Handler) {
        let entry: (String, Box<dyn Handler>) = (prefix.to_string(), Box::new(h));
        self.prefix_routes.push(entry);
    }
}
impl Handler for Router {
    /// Dispatches `r` to the first exact route matching its path, otherwise to the first
    /// prefix route matching it, otherwise answers with the not-found status and an empty body.
    fn serve(&self, w: &mut ResponseWriter, r: &Request) {
        let exact = self.routes.iter().find(|(path, _)| *path == r.path);
        let matched = exact.or_else(|| {
            self.prefix_routes
                .iter()
                .find(|(prefix, _)| r.path.starts_with(prefix.as_str()))
        });

        match matched {
            Some((_, handler)) => handler.serve(w, r),
            None => {
                let _ = w.respond(crate::protocol::STATUS_NOT_FOUND, b"");
            }
        }
    }
}
impl Default for Router {
fn default() -> Self {
Self::new()
}
}