1#![allow(unsafe_op_in_unsafe_fn)]
2
3use crate::anchorserver::AnchorServer;
4use crate::error::{Error, check};
5use crate::ffi;
6use crate::sock_compat::{AF_INET, AF_INET6, sa_family_t, sockaddr_in, sockaddr_in6};
7use crate::keypair::Keypair;
8use crate::logserver::LogServer;
9use crate::msg::CRequest;
10use crate::role::ServerRole;
11use crate::types::{
12 DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_STREAMS, DEFAULT_TIMEOUT, Header, Identity, NodeId,
13 SECONDS,
14};
15use std::collections::{HashMap, HashSet};
16use std::ffi::CString;
17use std::net::{SocketAddr, UdpSocket};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, mpsc};
20use std::time::Duration;
21
22const UDP_BUF_SIZE: usize = 65536;
23
24pub struct Settings {
28 pub max_streams: u32,
30 pub max_message_size: u32,
32 pub timeout_ms: u32,
36 pub compression: String,
38 pub role: ServerRole,
40}
41
42impl Default for Settings {
43 fn default() -> Self {
44 Settings {
45 max_streams: DEFAULT_MAX_STREAMS,
46 max_message_size: DEFAULT_MAX_MESSAGE_SIZE as u32,
47 timeout_ms: (DEFAULT_TIMEOUT / (SECONDS / 1000)) as u32,
48 compression: String::new(),
49 role: ServerRole::Regular,
50 }
51 }
52}
53
54pub struct NotifyOptions {
59 pub headers: Vec<Header>,
61 pub notify_id: Option<[u8; 16]>,
66}
67
68#[derive(Clone, Debug)]
73pub struct ConnInfo {
74 pub node_id: NodeId,
76 pub peer_identity: Identity,
78 pub role: ServerRole,
80}
81
82enum ServerEvent {
83 Packet {
84 data: Vec<u8>,
85 local: SocketAddr,
86 remote: SocketAddr,
87 },
88 TimerExpiry,
89 Notify {
90 peer_node_id: NodeId,
91 event: String,
92 path: String,
93 body: Vec<u8>,
94 options: Option<NotifyOptions>,
95 },
96 NotifyAll {
97 event: String,
98 path: String,
99 body: Vec<u8>,
100 },
101 CloseConn {
102 peer_node_id: NodeId,
103 error: i32,
104 },
105 Shutdown,
106}
107
108pub struct Request {
113 pub method: String,
115 pub path: String,
117 pub body: Vec<u8>,
119 pub request_id: [u8; 16],
121 pub trace_id: [u8; 16],
123 pub headers: Vec<Header>,
125 pub conn: ConnInfo,
127}
128
129impl Request {
130 pub fn header(&self, name: &str) -> Option<&str> {
132 self.headers
133 .iter()
134 .find(|h| h.name == name)
135 .map(|h| h.value.as_str())
136 }
137}
138
139pub trait Handler: Send + 'static {
145 fn serve(&self, w: &mut ResponseWriter, r: &Request);
150}
151
152impl<F: Fn(&mut ResponseWriter, &Request) + Send + 'static> Handler for F {
153 fn serve(&self, w: &mut ResponseWriter, r: &Request) {
154 self(w, r)
155 }
156}
157
158pub struct ResponseWriter {
170 pub(crate) stream: *mut ffi::nwep_stream,
171 pub(crate) status: String,
172 pub(crate) headers: Vec<Header>,
173 pub(crate) sent: bool,
174}
175
176impl ResponseWriter {
177 pub fn set_status(&mut self, status: &str) {
181 self.status = status.to_string();
182 }
183
184 pub fn set_header(&mut self, name: &str, value: &str) {
186 self.headers.push(Header::new(name, value));
187 }
188
189 pub fn write(&mut self, body: &[u8]) -> Result<(), Error> {
200 if self.sent {
201 return Ok(());
202 }
203 self.sent = true;
204 let status = CString::new(self.status.as_str()).unwrap_or_default();
205 let c_headers: Vec<ffi::nwep_header> = self
206 .headers
207 .iter()
208 .map(|h| ffi::nwep_header {
209 name: h.name.as_ptr(),
210 name_len: h.name.len(),
211 value: h.value.as_ptr(),
212 value_len: h.value.len(),
213 })
214 .collect();
215 let resp = ffi::nwep_response {
216 status: status.as_ptr(),
217 status_len: self.status.len(),
218 status_details: std::ptr::null(),
219 status_details_len: 0,
220 headers: if c_headers.is_empty() {
221 std::ptr::null()
222 } else {
223 c_headers.as_ptr()
224 },
225 header_count: c_headers.len(),
226 body: if body.is_empty() {
227 std::ptr::null()
228 } else {
229 body.as_ptr()
230 },
231 body_len: body.len(),
232 };
233 check(unsafe { ffi::nwep_stream_respond(self.stream, &resp) })?;
234 check(unsafe { ffi::nwep_stream_end(self.stream) })?;
235 Ok(())
236 }
237
238 pub fn respond(&mut self, status: &str, body: &[u8]) -> Result<(), Error> {
247 self.set_status(status);
248 self.write(body)
249 }
250
251 pub fn stream_write(&mut self, data: &[u8]) -> Result<isize, Error> {
261 let n = unsafe { ffi::nwep_stream_write(self.stream, data.as_ptr(), data.len()) };
262 if n < 0 {
263 Err(Error::from_code(n as i32))
264 } else {
265 Ok(n)
266 }
267 }
268
269 pub fn stream_end(&mut self) -> Result<(), Error> {
275 check(unsafe { ffi::nwep_stream_end(self.stream) })
276 }
277
278 pub fn stream_close(&mut self, err: i32) {
284 unsafe { ffi::nwep_stream_close(self.stream, err) }
285 }
286
287 pub fn stream_id(&self) -> i64 {
291 unsafe { ffi::nwep_stream_get_id(self.stream) }
292 }
293
294 pub fn is_server_initiated(&self) -> bool {
299 unsafe { ffi::nwep_stream_is_server_initiated(self.stream) != 0 }
300 }
301}
302
303struct CallbackData {
306 handler: Box<dyn Handler>,
307 on_connect: Option<Box<dyn Fn(ConnInfo) + Send>>,
308 on_disconnect: Option<Box<dyn Fn(ConnInfo, i32) + Send>>,
309 conns: HashMap<NodeId, *mut ffi::nwep_conn>,
312 conn_to_node_id: HashMap<usize, NodeId>,
315 closing_conns: HashSet<usize>,
319 peers: Arc<Mutex<Vec<NodeId>>>,
321 log_server: Option<Arc<Mutex<LogServer>>>,
324 anchor_server: Option<Arc<Mutex<AnchorServer>>>,
325}
326
327unsafe extern "C" fn server_on_connect(
328 conn: *mut ffi::nwep_conn,
329 peer: *const ffi::nwep_identity,
330 user_data: *mut std::ffi::c_void,
331) -> std::ffi::c_int {
332 let cb = &mut *(user_data as *mut CallbackData);
333 let mut identity = if peer.is_null() {
336 Identity::default()
337 } else {
338 Identity::from(*peer)
339 };
340 if identity.node_id.0 == [0u8; 32] && identity.pubkey != [0u8; 32] {
341 let mut nid = ffi::nwep_nodeid { data: [0u8; 32] };
342 if ffi::nwep_nodeid_from_pubkey(&mut nid, identity.pubkey.as_ptr()) == 0 {
343 identity.node_id = NodeId(nid.data);
344 }
345 }
346 let node_id = NodeId(identity.node_id.0);
347 let role = if conn.is_null() {
348 ServerRole::Regular
349 } else {
350 let role_ptr = ffi::nwep_conn_get_role(conn);
351 if role_ptr.is_null() {
352 ServerRole::Regular
353 } else {
354 ServerRole::from_str(
355 std::ffi::CStr::from_ptr(role_ptr)
356 .to_str()
357 .unwrap_or("regular"),
358 )
359 }
360 };
361
362 if !conn.is_null() {
364 cb.conns.insert(node_id, conn);
365 cb.conn_to_node_id.insert(conn as usize, node_id);
366 if let Ok(mut peers) = cb.peers.lock() {
367 peers.push(node_id);
368 }
369 }
370
371 if let Some(cb_fn) = &cb.on_connect {
372 cb_fn(ConnInfo {
373 node_id,
374 peer_identity: identity,
375 role,
376 });
377 }
378 0
379}
380
381unsafe extern "C" fn server_on_disconnect(
382 conn: *mut ffi::nwep_conn,
383 error: std::ffi::c_int,
384 user_data: *mut std::ffi::c_void,
385) {
386 let cb = &mut *(user_data as *mut CallbackData);
387 let node_id = match cb.conn_to_node_id.remove(&(conn as usize)) {
390 Some(nid) => nid,
391 None => {
392 cb.closing_conns.remove(&(conn as usize));
394 return;
395 }
396 };
397 cb.closing_conns.remove(&(conn as usize));
398 cb.conns.remove(&node_id);
399 if let Ok(mut peers) = cb.peers.lock() {
400 peers.retain(|&n| n != node_id);
401 }
402
403 if let Some(cb_fn) = &cb.on_disconnect {
404 let peer = if conn.is_null() {
406 std::ptr::null()
407 } else {
408 ffi::nwep_conn_get_peer_identity(conn)
409 };
410 let identity = if peer.is_null() {
411 Identity {
412 pubkey: [0u8; 32],
413 node_id,
414 }
415 } else {
416 let mut id = Identity::from(*peer);
417 if id.node_id.0 == [0u8; 32] {
418 id.node_id = node_id;
419 }
420 id
421 };
422 cb_fn(
423 ConnInfo {
424 node_id,
425 peer_identity: identity,
426 role: ServerRole::Regular,
427 },
428 error,
429 );
430 }
431}
432
433unsafe extern "C" fn server_on_request(
434 conn: *mut ffi::nwep_conn,
435 stream: *mut ffi::nwep_stream,
436 req: *const ffi::nwep_request,
437 user_data: *mut std::ffi::c_void,
438) -> std::ffi::c_int {
439 let cb = &mut *(user_data as *mut CallbackData);
440 if req.is_null() || stream.is_null() {
441 return 0;
442 }
443 if !conn.is_null() && cb.closing_conns.contains(&(conn as usize)) {
446 ffi::nwep_stream_close(stream, 0);
447 return 0;
448 }
449 let c_req = CRequest::from_ffi(&*req);
450
451 if cb.log_server.is_some() || cb.anchor_server.is_some() {
454 if let Some(ref ls_arc) = cb.log_server {
455 if c_req.path == "/log" || c_req.path.starts_with("/log/") {
456 if let Ok(mut ls) = ls_arc.lock() {
457 if ls.handle_request(stream, req).is_err() {
458 sub_server_error_respond(stream, crate::protocol::STATUS_INTERNAL_ERROR);
459 }
460 }
461 ffi::nwep_stream_end(stream);
462 return 0;
463 }
464 }
465 if let Some(ref as_arc) = cb.anchor_server {
466 if c_req.path == "/checkpoint" || c_req.path.starts_with("/checkpoint/") {
467 if let Ok(mut as_) = as_arc.lock() {
468 if as_.handle_request(stream, req).is_err() {
469 sub_server_error_respond(stream, crate::protocol::STATUS_INTERNAL_ERROR);
470 }
471 }
472 ffi::nwep_stream_end(stream);
473 return 0;
474 }
475 }
476 }
477
478 let peer = if conn.is_null() {
479 std::ptr::null()
480 } else {
481 ffi::nwep_conn_get_peer_identity(conn)
482 };
483 let mut identity = if peer.is_null() {
484 Identity::default()
485 } else {
486 Identity::from(*peer)
487 };
488 if identity.node_id.0 == [0u8; 32] && identity.pubkey != [0u8; 32] {
489 let mut nid = ffi::nwep_nodeid { data: [0u8; 32] };
490 if ffi::nwep_nodeid_from_pubkey(&mut nid, identity.pubkey.as_ptr()) == 0 {
491 identity.node_id = NodeId(nid.data);
492 }
493 }
494 let role = if conn.is_null() {
495 ServerRole::Regular
496 } else {
497 let role_ptr = ffi::nwep_conn_get_role(conn);
498 if role_ptr.is_null() {
499 ServerRole::Regular
500 } else {
501 ServerRole::from_str(
502 std::ffi::CStr::from_ptr(role_ptr)
503 .to_str()
504 .unwrap_or("regular"),
505 )
506 }
507 };
508 let conn_info = ConnInfo {
509 node_id: NodeId(identity.node_id.0),
510 peer_identity: identity,
511 role,
512 };
513 let request = Request {
514 method: c_req.method,
515 path: c_req.path,
516 body: c_req.body,
517 request_id: c_req.request_id,
518 trace_id: c_req.trace_id,
519 headers: c_req.headers,
520 conn: conn_info,
521 };
522 let mut writer = ResponseWriter {
523 stream,
524 status: crate::protocol::STATUS_OK.to_string(),
525 headers: Vec::new(),
526 sent: false,
527 };
528 cb.handler.serve(&mut writer, &request);
529 if !writer.sent {
530 let _ = writer.write(b"");
531 }
532 0
533}
534
535unsafe extern "C" fn server_rand(
536 dest: *mut u8,
537 len: usize,
538 _user_data: *mut std::ffi::c_void,
539) -> std::ffi::c_int {
540 let slice = std::slice::from_raw_parts_mut(dest, len);
541 match crate::crypto::random_bytes(slice) {
542 Ok(()) => 0,
543 Err(_) => -1,
544 }
545}
546
547unsafe fn sub_server_error_respond(stream: *mut ffi::nwep_stream, status: &str) {
549 let status_c = CString::new(status).unwrap_or_default();
550 let resp = ffi::nwep_response {
551 status: status_c.as_ptr(),
552 status_len: status.len(),
553 status_details: std::ptr::null(),
554 status_details_len: 0,
555 headers: std::ptr::null(),
556 header_count: 0,
557 body: std::ptr::null(),
558 body_len: 0,
559 };
560 let _ = ffi::nwep_stream_respond(stream, &resp);
561}
562
563unsafe fn do_conn_notify(
566 c_server: *mut ffi::nwep_server,
567 conn: *mut ffi::nwep_conn,
568 notify: &ffi::nwep_notify,
569 socket: &UdpSocket,
570 write_buf: &mut Vec<u8>,
571 ts: u64,
572) {
573 let mut stream: *mut ffi::nwep_stream = std::ptr::null_mut();
574 let rc = ffi::nwep_conn_notify(conn, notify, &mut stream);
575 if rc == 0 && !stream.is_null() {
576 ffi::nwep_stream_end(stream);
577 }
578 drain_writes(c_server, socket, write_buf, ts);
579}
580
581pub struct Server {
587 event_tx: mpsc::SyncSender<ServerEvent>,
588 node_id: NodeId,
589 addr: SocketAddr,
590 shutdown_flag: Arc<AtomicBool>,
591 peers: Arc<Mutex<Vec<NodeId>>>,
593 log_server: Option<Arc<Mutex<LogServer>>>,
595 anchor_server: Option<Arc<Mutex<AnchorServer>>>,
596}
597
598impl Server {
599 pub fn node_id(&self) -> NodeId {
601 self.node_id
602 }
603
604 pub fn addr(&self) -> SocketAddr {
606 self.addr
607 }
608
609 pub fn url(&self, path: &str) -> String {
621 use std::net::{IpAddr, Ipv4Addr};
622 let ip = match self.addr.ip() {
624 ip if ip.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST),
625 ip => ip,
626 };
627 let nwep_addr = match ip {
628 IpAddr::V4(v4) => crate::addr::Addr::new_ipv4(v4, self.node_id, self.addr.port()),
629 IpAddr::V6(v6) => crate::addr::Addr::new_ipv6(v6, self.node_id, self.addr.port()),
630 };
631 let url = crate::addr::Url {
632 addr: nwep_addr,
633 path: path.to_string(),
634 };
635 url.format()
636 .unwrap_or_else(|_| format!("web://[{}]:{}{}", ip, self.addr.port(), path))
637 }
638
639 pub fn connection_count(&self) -> usize {
644 self.peers.lock().map(|p| p.len()).unwrap_or(0)
645 }
646
647 pub fn connected_peers(&self) -> Vec<NodeId> {
649 self.peers.lock().map(|p| p.clone()).unwrap_or_default()
650 }
651
652 pub fn shutdown(&self) {
657 self.shutdown_flag.store(true, Ordering::SeqCst);
658 let _ = self.event_tx.send(ServerEvent::Shutdown);
659 }
660
661 pub fn notify(
671 &self,
672 peer_node_id: NodeId,
673 event: &str,
674 path: &str,
675 body: Vec<u8>,
676 ) -> Result<(), Error> {
677 self.event_tx
678 .send(ServerEvent::Notify {
679 peer_node_id,
680 event: event.to_string(),
681 path: path.to_string(),
682 body,
683 options: None,
684 })
685 .map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
686 }
687
688 pub fn notify_with_options(
697 &self,
698 peer_node_id: NodeId,
699 event: &str,
700 path: &str,
701 body: Vec<u8>,
702 options: NotifyOptions,
703 ) -> Result<(), Error> {
704 self.event_tx
705 .send(ServerEvent::Notify {
706 peer_node_id,
707 event: event.to_string(),
708 path: path.to_string(),
709 body,
710 options: Some(options),
711 })
712 .map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
713 }
714
715 pub fn notify_all(&self, event: &str, path: &str, body: Vec<u8>) -> Result<(), Error> {
725 self.event_tx
726 .send(ServerEvent::NotifyAll {
727 event: event.to_string(),
728 path: path.to_string(),
729 body,
730 })
731 .map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
732 }
733
734 pub fn close_connection(&self, peer_node_id: NodeId, error: i32) -> Result<(), Error> {
745 self.event_tx
746 .send(ServerEvent::CloseConn {
747 peer_node_id,
748 error,
749 })
750 .map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
751 }
752
753 pub fn log_server(&self) -> Option<Arc<Mutex<LogServer>>> {
758 self.log_server.clone()
759 }
760
761 pub fn anchor_server(&self) -> Option<Arc<Mutex<AnchorServer>>> {
766 self.anchor_server.clone()
767 }
768}
769
770pub struct ServerBuilder {
775 addr: String,
776 keypair: Keypair,
777 settings: Settings,
778 on_connect: Option<Box<dyn Fn(ConnInfo) + Send>>,
779 on_disconnect: Option<Box<dyn Fn(ConnInfo, i32) + Send>>,
780 log_server: Option<Arc<Mutex<LogServer>>>,
781 anchor_server: Option<Arc<Mutex<AnchorServer>>>,
782}
783
784impl ServerBuilder {
785 pub fn new(addr: impl Into<String>, keypair: Keypair) -> Self {
790 ServerBuilder {
791 addr: addr.into(),
792 keypair,
793 settings: Settings::default(),
794 on_connect: None,
795 on_disconnect: None,
796 log_server: None,
797 anchor_server: None,
798 }
799 }
800
801 pub fn settings(mut self, s: Settings) -> Self {
803 self.settings = s;
804 self
805 }
806
807 pub fn on_connect<F: Fn(ConnInfo) + Send + 'static>(mut self, f: F) -> Self {
812 self.on_connect = Some(Box::new(f));
813 self
814 }
815
816 pub fn on_disconnect<F: Fn(ConnInfo, i32) + Send + 'static>(mut self, f: F) -> Self {
821 self.on_disconnect = Some(Box::new(f));
822 self
823 }
824
825 pub fn log_server(mut self, ls: Arc<Mutex<LogServer>>) -> Self {
829 self.log_server = Some(ls);
830 self
831 }
832
833 pub fn anchor_server(mut self, as_: Arc<Mutex<AnchorServer>>) -> Self {
835 self.anchor_server = Some(as_);
836 self
837 }
838
839 pub fn build<H: Handler>(mut self, handler: H) -> Result<(Server, EventLoop), Error> {
840 let node_id = self.keypair.node_id()?;
841 let socket = UdpSocket::bind(&self.addr)
846 .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
847 let local_addr = socket
848 .local_addr()
849 .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
850
851 let compression = CString::new(self.settings.compression.as_str()).unwrap_or_default();
852 let role_str = CString::new(self.settings.role.as_str()).unwrap_or_default();
853 let ffi_settings = ffi::nwep_settings {
854 max_streams: self.settings.max_streams,
855 max_message_size: self.settings.max_message_size,
856 timeout_ms: self.settings.timeout_ms,
857 compression: compression.as_ptr(),
858 role: role_str.as_ptr(),
859 };
860
861 let peers = Arc::new(Mutex::new(Vec::<NodeId>::new()));
863
864 let cb_data = Box::new(CallbackData {
865 handler: Box::new(handler),
866 on_connect: self.on_connect,
867 on_disconnect: self.on_disconnect,
868 conns: HashMap::new(),
869 conn_to_node_id: HashMap::new(),
870 closing_conns: HashSet::new(),
871 peers: Arc::clone(&peers),
872 log_server: self.log_server.clone(),
873 anchor_server: self.anchor_server.clone(),
874 });
875 let cb_data_ptr = Box::into_raw(cb_data) as *mut std::ffi::c_void;
876
877 let callbacks = ffi::nwep_callbacks {
878 on_connect: Some(server_on_connect),
879 on_disconnect: Some(server_on_disconnect),
880 on_request: Some(server_on_request),
881 on_response: None,
882 on_notify: None,
883 on_stream_data: None,
884 on_stream_end: None,
885 rand: Some(server_rand),
886 log: None,
887 };
888
889 let mut c_server: *mut ffi::nwep_server = std::ptr::null_mut();
890 check(unsafe {
891 ffi::nwep_server_new(
892 &mut c_server,
893 &ffi_settings,
894 &callbacks,
895 self.keypair.as_ffi_mut(),
896 cb_data_ptr,
897 )
898 })?;
899
900 let (event_tx, event_rx) = mpsc::sync_channel::<ServerEvent>(1024);
901 let shutdown_flag = Arc::new(AtomicBool::new(false));
902
903 let server = Server {
904 event_tx: event_tx.clone(),
905 node_id,
906 addr: local_addr,
907 shutdown_flag: shutdown_flag.clone(),
908 peers,
909 log_server: self.log_server,
910 anchor_server: self.anchor_server,
911 };
912
913 let event_loop = EventLoop {
914 c_server,
915 socket,
916 local_addr,
917 event_rx,
918 event_tx,
919 shutdown_flag,
920 cb_data_ptr,
921 _keypair: self.keypair,
922 };
923
924 Ok((server, event_loop))
925 }
926}
927
928pub struct EventLoop {
929 c_server: *mut ffi::nwep_server,
930 socket: UdpSocket,
931 local_addr: SocketAddr,
932 event_rx: mpsc::Receiver<ServerEvent>,
933 event_tx: mpsc::SyncSender<ServerEvent>,
934 shutdown_flag: Arc<AtomicBool>,
935 cb_data_ptr: *mut std::ffi::c_void,
936 _keypair: crate::keypair::Keypair,
939}
940
941unsafe impl Send for EventLoop {}
942
943impl EventLoop {
944 pub fn run(self) -> Result<(), Error> {
945 let socket_recv = self
947 .socket
948 .try_clone()
949 .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
950 let event_tx_recv = self.event_tx.clone();
951 let local_addr = self.local_addr;
952 let shutdown_flag_recv = self.shutdown_flag.clone();
953 std::thread::spawn(move || {
954 let mut buf = vec![0u8; UDP_BUF_SIZE];
955 loop {
956 if shutdown_flag_recv.load(Ordering::SeqCst) {
957 break;
958 }
959 socket_recv
960 .set_read_timeout(Some(Duration::from_millis(100)))
961 .ok();
962 match socket_recv.recv_from(&mut buf) {
963 Ok((n, remote)) => {
964 let _ = event_tx_recv.send(ServerEvent::Packet {
965 data: buf[..n].to_vec(),
966 local: local_addr,
967 remote,
968 });
969 }
970 Err(e)
971 if e.kind() == std::io::ErrorKind::WouldBlock
972 || e.kind() == std::io::ErrorKind::TimedOut => {}
973 Err(_) => {
974 if shutdown_flag_recv.load(Ordering::SeqCst) {
975 break;
976 }
977 }
978 }
979 }
980 });
981
982 let event_tx_timer = self.event_tx.clone();
984 let shutdown_flag_timer = self.shutdown_flag.clone();
985 std::thread::spawn(move || {
986 loop {
987 std::thread::sleep(Duration::from_millis(10));
988 if shutdown_flag_timer.load(Ordering::SeqCst) {
989 break;
990 }
991 let _ = event_tx_timer.send(ServerEvent::TimerExpiry);
992 }
993 });
994
995 let mut write_buf = vec![0u8; UDP_BUF_SIZE];
996 let socket_send = self
997 .socket
998 .try_clone()
999 .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
1000
1001 loop {
1002 let event = match self.event_rx.recv_timeout(Duration::from_millis(100)) {
1003 Ok(e) => e,
1004 Err(mpsc::RecvTimeoutError::Timeout) => {
1005 if self.shutdown_flag.load(Ordering::SeqCst) {
1006 break;
1007 }
1008 continue;
1009 }
1010 Err(mpsc::RecvTimeoutError::Disconnected) => break,
1011 };
1012
1013 let ts = now_ns();
1014
1015 match event {
1016 ServerEvent::Shutdown => break,
1017
1018 ServerEvent::Packet {
1019 data,
1020 local,
1021 remote,
1022 } => {
1023 let path = make_path(&local, &remote);
1024 unsafe {
1025 ffi::nwep_server_read(self.c_server, &path, data.as_ptr(), data.len(), ts);
1026 }
1027 drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
1028 }
1029
1030 ServerEvent::TimerExpiry => {
1031 let expiry = unsafe { ffi::nwep_server_get_expiry(self.c_server) };
1032 if expiry != u64::MAX && ts >= expiry {
1033 unsafe {
1034 ffi::nwep_server_handle_expiry(self.c_server, ts);
1035 }
1036 drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
1037 }
1038 }
1039
1040 ServerEvent::Notify {
1041 peer_node_id,
1042 event,
1043 path,
1044 body,
1045 options,
1046 } => {
1047 let cb = unsafe { &*(self.cb_data_ptr as *const CallbackData) };
1048 if let Some(&conn) = cb.conns.get(&peer_node_id) {
1049 let ev = CString::new(event.as_str()).unwrap_or_default();
1050 let pth = CString::new(path.as_str()).unwrap_or_default();
1051 let c_headers: Vec<ffi::nwep_header> = options
1053 .as_ref()
1054 .map(|o| {
1055 o.headers
1056 .iter()
1057 .map(|h| ffi::nwep_header {
1058 name: h.name.as_ptr(),
1059 name_len: h.name.len(),
1060 value: h.value.as_ptr(),
1061 value_len: h.value.len(),
1062 })
1063 .collect()
1064 })
1065 .unwrap_or_default();
1066 let notify = ffi::nwep_notify {
1067 event: ev.as_ptr(),
1068 event_len: event.len(),
1069 path: pth.as_ptr(),
1070 path_len: path.len(),
1071 notify_id: options
1072 .as_ref()
1073 .and_then(|o| o.notify_id)
1074 .unwrap_or([0u8; 16]),
1075 has_notify_id: options.as_ref().and_then(|o| o.notify_id).is_some()
1076 as i32,
1077 headers: if c_headers.is_empty() {
1078 std::ptr::null()
1079 } else {
1080 c_headers.as_ptr()
1081 },
1082 header_count: c_headers.len(),
1083 body: if body.is_empty() {
1084 std::ptr::null()
1085 } else {
1086 body.as_ptr()
1087 },
1088 body_len: body.len(),
1089 };
1090 unsafe {
1091 do_conn_notify(
1092 self.c_server,
1093 conn,
1094 ¬ify,
1095 &socket_send,
1096 &mut write_buf,
1097 ts,
1098 );
1099 }
1100 }
1101 }
1102
1103 ServerEvent::NotifyAll { event, path, body } => {
1104 let cb = unsafe { &*(self.cb_data_ptr as *const CallbackData) };
1105 let conns: Vec<*mut ffi::nwep_conn> = cb.conns.values().copied().collect();
1107 if !conns.is_empty() {
1108 let ev = CString::new(event.as_str()).unwrap_or_default();
1109 let pth = CString::new(path.as_str()).unwrap_or_default();
1110 let notify = ffi::nwep_notify {
1111 event: ev.as_ptr(),
1112 event_len: event.len(),
1113 path: pth.as_ptr(),
1114 path_len: path.len(),
1115 notify_id: [0u8; 16],
1116 has_notify_id: 0,
1117 headers: std::ptr::null(),
1118 header_count: 0,
1119 body: if body.is_empty() {
1120 std::ptr::null()
1121 } else {
1122 body.as_ptr()
1123 },
1124 body_len: body.len(),
1125 };
1126 for conn in conns {
1127 unsafe {
1128 do_conn_notify(
1129 self.c_server,
1130 conn,
1131 ¬ify,
1132 &socket_send,
1133 &mut write_buf,
1134 ts,
1135 );
1136 }
1137 }
1138 }
1139 }
1140
1141 ServerEvent::CloseConn {
1142 peer_node_id,
1143 error,
1144 } => {
1145 let cb = unsafe { &mut *(self.cb_data_ptr as *mut CallbackData) };
1146 if let Some(&conn) = cb.conns.get(&peer_node_id) {
1147 let conn_key = conn as usize;
1148 let node_id = cb.conn_to_node_id.remove(&conn_key).unwrap_or(peer_node_id);
1151 cb.conns.remove(&peer_node_id);
1152 if let Ok(mut peers) = cb.peers.lock() {
1153 peers.retain(|&n| n != peer_node_id);
1154 }
1155 cb.closing_conns.insert(conn_key);
1157 if let Some(cb_fn) = &cb.on_disconnect {
1161 let peer = unsafe {
1162 if conn.is_null() {
1163 std::ptr::null()
1164 } else {
1165 ffi::nwep_conn_get_peer_identity(conn)
1166 }
1167 };
1168 let identity = if peer.is_null() {
1169 Identity {
1170 pubkey: [0u8; 32],
1171 node_id,
1172 }
1173 } else {
1174 let mut id = unsafe { Identity::from(*peer) };
1175 if id.node_id.0 == [0u8; 32] {
1176 id.node_id = node_id;
1177 }
1178 id
1179 };
1180 cb_fn(
1181 ConnInfo {
1182 node_id,
1183 peer_identity: identity,
1184 role: ServerRole::Regular,
1185 },
1186 error,
1187 );
1188 }
1189 unsafe {
1190 ffi::nwep_conn_close(conn, error);
1191 }
1192 drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
1193 }
1194 }
1195 }
1196 }
1197
1198 unsafe {
1199 ffi::nwep_server_close(self.c_server);
1200 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
1203 loop {
1204 let ts = now_ns();
1205 ffi::nwep_server_handle_expiry(self.c_server, ts);
1206 drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
1207 let expiry = ffi::nwep_server_get_expiry(self.c_server);
1208 if expiry == u64::MAX || std::time::Instant::now() >= deadline {
1209 break;
1210 }
1211 std::thread::sleep(std::time::Duration::from_millis(10));
1212 }
1213 ffi::nwep_server_free(self.c_server);
1214 }
1215
1216 Ok(())
1217 }
1218}
1219
1220impl Drop for EventLoop {
1221 fn drop(&mut self) {
1222 if !self.cb_data_ptr.is_null() {
1223 unsafe {
1224 drop(Box::from_raw(self.cb_data_ptr as *mut CallbackData));
1225 }
1226 self.cb_data_ptr = std::ptr::null_mut();
1227 }
1228 }
1229}
1230
1231fn now_ns() -> u64 {
1232 use std::time::{SystemTime, UNIX_EPOCH};
1233 SystemTime::now()
1234 .duration_since(UNIX_EPOCH)
1235 .unwrap_or_default()
1236 .as_nanos() as u64
1237}
1238
1239fn drain_writes(server: *mut ffi::nwep_server, socket: &UdpSocket, buf: &mut Vec<u8>, ts: u64) {
1240 loop {
1241 let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
1242 let n =
1243 unsafe { ffi::nwep_server_write(server, &mut path, buf.as_mut_ptr(), buf.len(), ts) };
1244 if n <= 0 {
1245 break;
1246 }
1247 if let Some(addr) = sockaddr_to_socketaddr(&path.remote_addr, path.remote_addrlen) {
1248 socket.send_to(&buf[..n as usize], addr).ok();
1249 }
1250 }
1251}
1252
1253fn make_path(local: &SocketAddr, remote: &SocketAddr) -> ffi::nwep_path {
1254 let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
1255 let (local_sa, local_len) = socketaddr_to_sockaddr(local);
1256 let (remote_sa, remote_len) = socketaddr_to_sockaddr(remote);
1257 path.local_addr = local_sa;
1258 path.local_addrlen = local_len;
1259 path.remote_addr = remote_sa;
1260 path.remote_addrlen = remote_len;
1261 path
1262}
1263
1264fn socketaddr_to_sockaddr(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
1265 let mut storage = unsafe { std::mem::zeroed::<ffi::sockaddr_storage>() };
1266 let (is_v4, v4_octets, port) = match addr {
1270 SocketAddr::V4(v4) => (true, Some(v4.ip().octets()), v4.port()),
1271 SocketAddr::V6(v6) => {
1272 let octets = v6.ip().octets();
1273 let is_mapped =
1274 octets[..10].iter().all(|&b| b == 0) && octets[10] == 0xff && octets[11] == 0xff;
1275 if is_mapped {
1276 (
1277 true,
1278 Some([octets[12], octets[13], octets[14], octets[15]]),
1279 v6.port(),
1280 )
1281 } else {
1282 (false, None, v6.port())
1283 }
1284 }
1285 };
1286 if is_v4 {
1287 let sin: &mut sockaddr_in = unsafe { &mut *((&mut storage) as *mut _ as *mut _) };
1288 sin.sin_family = AF_INET as sa_family_t;
1289 sin.sin_port = port.to_be();
1290 sin.sin_addr.s_addr = u32::from_ne_bytes(v4_octets.unwrap());
1291 (storage, std::mem::size_of::<sockaddr_in>())
1292 } else {
1293 let sin6: &mut sockaddr_in6 = unsafe { &mut *((&mut storage) as *mut _ as *mut _) };
1294 sin6.sin6_family = AF_INET6 as sa_family_t;
1295 sin6.sin6_port = port.to_be();
1296 sin6.sin6_addr.s6_addr = match addr {
1297 SocketAddr::V6(v6) => v6.ip().octets(),
1298 SocketAddr::V4(_) => unreachable!(),
1299 };
1300 (storage, std::mem::size_of::<sockaddr_in6>())
1301 }
1302}
1303
1304fn sockaddr_to_socketaddr(storage: &ffi::sockaddr_storage, _len: usize) -> Option<SocketAddr> {
1305 let family = storage.ss_family as i32;
1306 match family {
1307 AF_INET => {
1308 let sin: &sockaddr_in = unsafe { &*(storage as *const _ as *const _) };
1310 let v4 = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
1311 let port = u16::from_be(sin.sin_port);
1312 let mapped = std::net::Ipv6Addr::from(v4.to_ipv6_mapped().octets());
1313 Some(SocketAddr::new(mapped.into(), port))
1314 }
1315 AF_INET6 => {
1316 let sin6: &sockaddr_in6 = unsafe { &*(storage as *const _ as *const _) };
1317 let ip = std::net::Ipv6Addr::from(sin6.sin6_addr.s6_addr);
1318 let port = u16::from_be(sin6.sin6_port);
1319 Some(SocketAddr::new(ip.into(), port))
1320 }
1321 _ => None,
1322 }
1323}
1324
1325pub struct Router {
1326 routes: Vec<(String, Box<dyn Handler>)>,
1327 prefix_routes: Vec<(String, Box<dyn Handler>)>,
1328}
1329
1330impl Router {
1331 pub fn new() -> Self {
1332 Router {
1333 routes: Vec::new(),
1334 prefix_routes: Vec::new(),
1335 }
1336 }
1337
1338 pub fn handle(&mut self, path: &str, h: impl Handler) {
1339 self.routes.push((path.to_string(), Box::new(h)));
1340 }
1341
1342 pub fn handle_func<F: Fn(&mut ResponseWriter, &Request) + Send + 'static>(
1343 &mut self,
1344 path: &str,
1345 f: F,
1346 ) {
1347 self.routes.push((path.to_string(), Box::new(f)));
1348 }
1349
1350 pub fn handle_prefix(&mut self, prefix: &str, h: impl Handler) {
1351 self.prefix_routes.push((prefix.to_string(), Box::new(h)));
1352 }
1353}
1354
1355impl Handler for Router {
1356 fn serve(&self, w: &mut ResponseWriter, r: &Request) {
1357 for (path, h) in &self.routes {
1358 if path == &r.path {
1359 h.serve(w, r);
1360 return;
1361 }
1362 }
1363 for (prefix, h) in &self.prefix_routes {
1364 if r.path.starts_with(prefix.as_str()) {
1365 h.serve(w, r);
1366 return;
1367 }
1368 }
1369 let _ = w.respond(crate::protocol::STATUS_NOT_FOUND, b"");
1370 }
1371}
1372
1373impl Default for Router {
1374 fn default() -> Self {
1375 Self::new()
1376 }
1377}