1#![allow(unsafe_op_in_unsafe_fn)]
2
3
4use crate::ffi;
5use crate::sock_compat::{AF_INET, AF_INET6, sa_family_t, sockaddr_in, sockaddr_in6};
6use crate::error::{check, Error};
7use crate::keypair::Keypair;
8use crate::types::{Header, NodeId, DEFAULT_MAX_STREAMS, DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_TIMEOUT, SECONDS};
9use crate::msg::{CResponse, CNotify};
10use crate::addr::Url;
11use std::ffi::CString;
12use std::net::{UdpSocket, SocketAddr};
13use std::sync::{mpsc, Arc};
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::time::Duration;
16use std::collections::HashMap;
17
18const UDP_BUF_SIZE: usize = 65536;
19
20pub struct Settings {
24 pub max_streams: u32,
26 pub max_message_size: u32,
28 pub timeout_ms: u32,
33 pub compression: String,
35}
36
37impl Default for Settings {
38 fn default() -> Self {
39 Settings {
40 max_streams: DEFAULT_MAX_STREAMS,
41 max_message_size: DEFAULT_MAX_MESSAGE_SIZE as u32,
42 timeout_ms: (DEFAULT_TIMEOUT / (SECONDS / 1000)) as u32,
43 compression: String::new(),
44 }
45 }
46}
47
48#[derive(Clone, Debug)]
54pub struct Response {
55 pub status: String,
57 pub status_details: String,
59 pub headers: Vec<Header>,
61 pub body: Vec<u8>,
63}
64
65impl Response {
66 pub fn is_ok(&self) -> bool {
68 crate::protocol::status_is_success(&self.status)
69 }
70
71 pub fn header(&self, name: &str) -> Option<&str> {
73 self.headers.iter().find(|h| h.name == name).map(|h| h.value.as_str())
74 }
75}
76
77impl From<CResponse> for Response {
78 fn from(r: CResponse) -> Self {
79 Response { status: r.status, status_details: r.status_details, headers: r.headers, body: r.body }
80 }
81}
82
83#[derive(Clone, Debug)]
89pub struct Notification {
90 pub event: String,
92 pub path: String,
94 pub headers: Vec<Header>,
96 pub body: Vec<u8>,
98}
99
100impl From<CNotify> for Notification {
101 fn from(n: CNotify) -> Self {
102 Notification { event: n.event, path: n.path, headers: n.headers, body: n.body }
103 }
104}
105
106enum ClientEvent {
107 Packet { data: Vec<u8>, local: SocketAddr, remote: SocketAddr },
108 TimerExpiry,
109 Request { method: String, path: String, body: Vec<u8>, headers: Vec<Header>, resp_tx: mpsc::SyncSender<Result<Response, Error>> },
110 Shutdown,
111}
112
113struct PendingEntry {
116 resp_tx: mpsc::SyncSender<Result<Response, Error>>,
117 resp: Option<Response>,
119 body_buf: Vec<u8>,
121}
122
123struct ClientState {
124 c_client: *mut ffi::nwep_client,
125 pending: HashMap<i64, PendingEntry>,
126 on_notify: Option<Box<dyn Fn(Notification) + Send>>,
127 disconnected: bool,
131}
132
133unsafe impl Send for ClientState {}
134
135struct CallbackData {
136 state: *mut ClientState,
137 connected_tx: Option<mpsc::SyncSender<Result<crate::types::Identity, Error>>>,
140}
141
142unsafe extern "C" fn client_on_connect(
143 _conn: *mut ffi::nwep_conn,
144 peer: *const ffi::nwep_identity,
145 user_data: *mut std::ffi::c_void,
146) -> std::ffi::c_int {
147 let cb = &mut *(user_data as *mut CallbackData);
148 if let Some(tx) = cb.connected_tx.take() {
149 let identity = if peer.is_null() {
150 crate::types::Identity::default()
151 } else {
152 crate::types::Identity::from(*peer)
153 };
154 let _ = tx.send(Ok(identity));
155 }
156 0
157}
158
159unsafe extern "C" fn client_on_disconnect(
160 _conn: *mut ffi::nwep_conn,
161 error: std::ffi::c_int,
162 user_data: *mut std::ffi::c_void,
163) {
164 let cb = &mut *(user_data as *mut CallbackData);
165 if let Some(tx) = cb.connected_tx.take() {
167 let err = if error != 0 { Error::from_code(error) }
168 else { Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED) };
169 let _ = tx.send(Err(err));
170 }
171 let state = &mut *cb.state;
173 state.disconnected = true;
174 for (_, entry) in state.pending.drain() {
176 let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
177 }
178}
179
180unsafe extern "C" fn client_on_response(
181 _conn: *mut ffi::nwep_conn,
182 stream: *mut ffi::nwep_stream,
183 resp: *const ffi::nwep_response,
184 user_data: *mut std::ffi::c_void,
185) -> std::ffi::c_int {
186 if resp.is_null() || stream.is_null() { return 0; }
187 let cb = &*(user_data as *const CallbackData);
188 let state = &mut *cb.state;
189 let stream_id = ffi::nwep_stream_get_id(stream);
190 if let Some(entry) = state.pending.get_mut(&stream_id) {
191 let c_resp = CResponse::from_ffi(&*resp);
192 entry.resp = Some(Response::from(c_resp));
193 }
194 0
195}
196
197unsafe extern "C" fn client_on_notify(
198 _conn: *mut ffi::nwep_conn,
199 _stream: *mut ffi::nwep_stream,
200 notify: *const ffi::nwep_notify,
201 user_data: *mut std::ffi::c_void,
202) -> std::ffi::c_int {
203 let cb = &*(user_data as *const CallbackData);
204 let state = &mut *cb.state;
205 if let Some(cb_fn) = &state.on_notify {
206 if !notify.is_null() {
207 let n = CNotify::from_ffi(&*notify);
208 cb_fn(Notification::from(n));
209 }
210 }
211 0
212}
213
214unsafe extern "C" fn client_on_stream_data(
216 _conn: *mut ffi::nwep_conn,
217 stream: *mut ffi::nwep_stream,
218 data: *const u8,
219 len: usize,
220 user_data: *mut std::ffi::c_void,
221) -> std::ffi::c_int {
222 if stream.is_null() || data.is_null() { return 0; }
223 let cb = &*(user_data as *const CallbackData);
224 let state = &mut *cb.state;
225 let stream_id = ffi::nwep_stream_get_id(stream);
226 if let Some(entry) = state.pending.get_mut(&stream_id) {
227 let chunk = std::slice::from_raw_parts(data, len);
228 entry.body_buf.extend_from_slice(chunk);
229 }
230 0
231}
232
233unsafe extern "C" fn client_on_stream_end(
236 _conn: *mut ffi::nwep_conn,
237 stream: *mut ffi::nwep_stream,
238 user_data: *mut std::ffi::c_void,
239) -> std::ffi::c_int {
240 if stream.is_null() { return 0; }
241 let cb = &*(user_data as *const CallbackData);
242 let state = &mut *cb.state;
243 let stream_id = ffi::nwep_stream_get_id(stream);
244 if let Some(entry) = state.pending.remove(&stream_id) {
245 if let Some(mut resp) = entry.resp {
246 if !entry.body_buf.is_empty() {
247 resp.body.extend_from_slice(&entry.body_buf);
248 }
249 let _ = entry.resp_tx.send(Ok(resp));
250 } else {
251 let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE)));
252 }
253 }
254 0
255}
256
257unsafe extern "C" fn client_rand(
258 dest: *mut u8,
259 len: usize,
260 _user_data: *mut std::ffi::c_void,
261) -> std::ffi::c_int {
262 let slice = std::slice::from_raw_parts_mut(dest, len);
263 match crate::crypto::random_bytes(slice) {
264 Ok(()) => 0,
265 Err(_) => -1,
266 }
267}
268
269pub struct Client {
275 event_tx: mpsc::SyncSender<ClientEvent>,
276 node_id: NodeId,
277 peer_identity: crate::types::Identity,
278 shutdown_flag: Arc<AtomicBool>,
279 done_rx: mpsc::Receiver<()>,
281 request_timeout: Duration,
284}
285
286impl Client {
287 pub fn node_id(&self) -> NodeId {
289 self.node_id
290 }
291
292 pub fn peer_identity(&self) -> crate::types::Identity {
295 self.peer_identity.clone()
296 }
297
298 pub fn peer_node_id(&self) -> NodeId {
302 self.peer_identity.node_id
303 }
304
305 pub fn fetch(&self, method: &str, path: &str, body: &[u8]) -> Result<Response, Error> {
314 self.fetch_with_headers(method, path, body, &[])
315 }
316
317 pub fn fetch_with_headers(&self, method: &str, path: &str, body: &[u8], headers: &[Header]) -> Result<Response, Error> {
325 let (resp_tx, resp_rx) = mpsc::sync_channel(1);
326 self.event_tx.send(ClientEvent::Request {
327 method: method.to_string(),
328 path: path.to_string(),
329 body: body.to_vec(),
330 headers: headers.to_vec(),
331 resp_tx,
332 }).map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))?;
333 resp_rx.recv_timeout(self.request_timeout)
334 .map_err(|e| match e {
335 mpsc::RecvTimeoutError::Timeout => Error::from_code(crate::error::ERR_NETWORK_TIMEOUT),
336 mpsc::RecvTimeoutError::Disconnected => Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED),
337 })?
338 }
339
340 pub fn get(&self, path: &str) -> Result<Response, Error> {
345 self.fetch(crate::protocol::METHOD_READ, path, b"")
346 }
347
348 pub fn post(&self, path: &str, body: &[u8]) -> Result<Response, Error> {
353 self.fetch(crate::protocol::METHOD_WRITE, path, body)
354 }
355
356 pub fn close(&self) {
359 self.shutdown_flag.store(true, Ordering::SeqCst);
360 let _ = self.event_tx.send(ClientEvent::Shutdown);
361 let _ = self.done_rx.recv();
363 }
364}
365
366pub struct ClientBuilder {
372 settings: Settings,
373 on_notify: Option<Box<dyn Fn(Notification) + Send>>,
374}
375
376impl ClientBuilder {
377 pub fn new() -> Self {
379 ClientBuilder { settings: Settings::default(), on_notify: None }
380 }
381
382 pub fn settings(mut self, s: Settings) -> Self {
384 self.settings = s;
385 self
386 }
387
388 pub fn on_notify<F: Fn(Notification) + Send + 'static>(mut self, f: F) -> Self {
392 self.on_notify = Some(Box::new(f));
393 self
394 }
395
396 pub fn connect(self, mut keypair: Keypair, url: &str) -> Result<Client, Error> {
407 let node_id = keypair.node_id()?;
408 let parsed_url = Url::parse(url)?;
409
410 let socket = UdpSocket::bind("[::]:0")
413 .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
414 let local_addr = socket.local_addr()
415 .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
416
417 let compression = CString::new(self.settings.compression.as_str()).unwrap_or_default();
418 let ffi_settings = ffi::nwep_settings {
419 max_streams: self.settings.max_streams,
420 max_message_size: self.settings.max_message_size,
421 timeout_ms: self.settings.timeout_ms,
422 compression: compression.as_ptr(),
423 role: std::ptr::null(),
424 };
425
426 let state = Box::new(ClientState {
427 c_client: std::ptr::null_mut(),
428 pending: HashMap::new(),
429 on_notify: self.on_notify,
430 disconnected: false,
431 });
432 let state_ptr = Box::into_raw(state);
433
434 let (connected_tx, connected_rx) = mpsc::sync_channel::<Result<crate::types::Identity, Error>>(1);
437
438 let cb_data = Box::new(CallbackData {
439 state: state_ptr,
440 connected_tx: Some(connected_tx),
441 });
442 let cb_data_ptr = Box::into_raw(cb_data) as *mut std::ffi::c_void;
443
444 let callbacks = ffi::nwep_callbacks {
445 on_connect: Some(client_on_connect),
446 on_disconnect: Some(client_on_disconnect),
447 on_request: None,
448 on_response: Some(client_on_response),
449 on_notify: Some(client_on_notify),
450 on_stream_data: Some(client_on_stream_data),
451 on_stream_end: Some(client_on_stream_end),
452 rand: Some(client_rand),
453 log: None,
454 };
455
456 let mut c_client: *mut ffi::nwep_client = std::ptr::null_mut();
457 check(unsafe {
458 ffi::nwep_client_new(&mut c_client, &ffi_settings, &callbacks, keypair.as_ffi_mut(), cb_data_ptr)
459 })?;
460
461 unsafe { (*state_ptr).c_client = c_client; }
462
463 let ffi_url = parsed_url.to_ffi();
466 let ts = now_ns();
467 let (local_sa, local_sa_len) = socketaddr_to_sockaddr(&local_addr);
468 check(unsafe {
469 ffi::nwep_client_connect(
470 c_client,
471 &ffi_url,
472 &local_sa as *const ffi::sockaddr_storage as *const _,
473 local_sa_len,
474 ts,
475 )
476 })?;
477 let mut init_buf = vec![0u8; UDP_BUF_SIZE];
478 drain_writes(c_client, &socket, &mut init_buf, ts);
479
480 let (event_tx, event_rx) = mpsc::sync_channel::<ClientEvent>(1024);
481 let shutdown_flag = Arc::new(AtomicBool::new(false));
482
483 let (done_tx, done_rx) = mpsc::channel::<()>();
485
486 let socket_recv = socket.try_clone()
488 .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
489 let event_tx_recv = event_tx.clone();
490 let shutdown_recv = shutdown_flag.clone();
491 std::thread::spawn(move || {
492 let mut buf = vec![0u8; UDP_BUF_SIZE];
493 loop {
494 if shutdown_recv.load(Ordering::SeqCst) { break; }
495 socket_recv.set_read_timeout(Some(Duration::from_millis(50))).ok();
496 match socket_recv.recv_from(&mut buf) {
497 Ok((n, remote)) => {
498 let local = socket_recv.local_addr().unwrap();
499 let _ = event_tx_recv.send(ClientEvent::Packet {
500 data: buf[..n].to_vec(),
501 local,
502 remote,
503 });
504 }
505 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock ||
506 e.kind() == std::io::ErrorKind::TimedOut => {}
507 Err(_) => {
508 if shutdown_recv.load(Ordering::SeqCst) { break; }
509 }
510 }
511 }
512 });
513
514 let event_tx_timer = event_tx.clone();
516 let shutdown_timer = shutdown_flag.clone();
517 std::thread::spawn(move || {
518 loop {
519 std::thread::sleep(Duration::from_millis(10));
520 if shutdown_timer.load(Ordering::SeqCst) { break; }
521 let _ = event_tx_timer.send(ClientEvent::TimerExpiry);
522 }
523 });
524
525 let c_client_addr = c_client as usize;
530 let state_ptr_addr = state_ptr as usize;
531 let cb_data_ptr_addr = cb_data_ptr as usize;
532 let shutdown_loop = shutdown_flag.clone();
533 std::thread::spawn(move || {
536 let _done = done_tx;
538 let _keypair = keypair;
540
541 let c_client = c_client_addr as *mut ffi::nwep_client;
542 let state_ptr = state_ptr_addr as *mut ClientState;
543 let cb_data_ptr = cb_data_ptr_addr as *mut std::ffi::c_void;
544 let mut write_buf = vec![0u8; UDP_BUF_SIZE];
545
546 loop {
547 let event = match event_rx.recv_timeout(Duration::from_millis(100)) {
548 Ok(e) => e,
549 Err(mpsc::RecvTimeoutError::Timeout) => {
550 if shutdown_loop.load(Ordering::SeqCst) { break; }
551 continue;
552 }
553 Err(mpsc::RecvTimeoutError::Disconnected) => break,
554 };
555
556 let ts = now_ns();
557 let state = unsafe { &mut *state_ptr };
558
559 match event {
560 ClientEvent::Shutdown => break,
561
562 ClientEvent::Packet { data, local, remote } => {
563 let path = make_path(&local, &remote);
564 unsafe {
565 ffi::nwep_client_read(c_client, &path, data.as_ptr(), data.len(), ts);
566 }
567 drain_writes(c_client, &socket, &mut write_buf, ts);
568 }
569
570 ClientEvent::TimerExpiry => {
571 let expiry = unsafe { ffi::nwep_client_get_expiry(c_client) };
572 if expiry != u64::MAX && ts >= expiry {
573 unsafe { ffi::nwep_client_handle_expiry(c_client, ts); }
574 drain_writes(c_client, &socket, &mut write_buf, ts);
575 }
576 }
577
578 ClientEvent::Request { method, path, body, headers, resp_tx } => {
579 let conn = unsafe { ffi::nwep_client_get_conn(c_client) };
580 if conn.is_null() || state.disconnected {
581 let _ = resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
582 continue;
583 }
584 let meth = CString::new(method.as_str()).unwrap_or_default();
585 let pth = CString::new(path.as_str()).unwrap_or_default();
586
587 let mut request_id = [0u8; 16];
588 let mut trace_id = [0u8; 16];
589 let _ = crate::protocol::request_id_generate().map(|id| request_id = id);
590 let _ = crate::protocol::trace_id_generate().map(|id| trace_id = id);
591
592 let c_headers: Vec<ffi::nwep_header> = headers.iter().map(|h| ffi::nwep_header {
594 name: h.name.as_ptr(),
595 name_len: h.name.len(),
596 value: h.value.as_ptr(),
597 value_len: h.value.len(),
598 }).collect();
599
600 let req = ffi::nwep_request {
601 method: meth.as_ptr(),
602 method_len: method.len(),
603 path: pth.as_ptr(),
604 path_len: path.len(),
605 headers: if c_headers.is_empty() { std::ptr::null() } else { c_headers.as_ptr() },
606 header_count: c_headers.len(),
607 body: if body.is_empty() { std::ptr::null() } else { body.as_ptr() },
608 body_len: body.len(),
609 request_id,
610 trace_id,
611 };
612
613 let mut stream: *mut ffi::nwep_stream = std::ptr::null_mut();
614 let rc = unsafe { ffi::nwep_stream_request(conn, &req, &mut stream) };
615 if rc != 0 {
616 let _ = resp_tx.send(Err(Error::from_code(rc)));
617 continue;
618 }
619 let stream_id = unsafe { ffi::nwep_stream_get_id(stream) };
620 unsafe { ffi::nwep_stream_end(stream); }
621 state.pending.insert(stream_id, PendingEntry {
622 resp_tx,
623 resp: None,
624 body_buf: Vec::new(),
625 });
626 drain_writes(c_client, &socket, &mut write_buf, ts);
627 }
628 }
629 }
630
631 let state = unsafe { &mut *state_ptr };
633 for (_, entry) in state.pending.drain() {
634 let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
635 }
636
637 unsafe {
638 let conn = ffi::nwep_client_get_conn(c_client);
643 if !conn.is_null() {
644 ffi::nwep_conn_close(conn, 0);
645 }
646 ffi::nwep_client_close(c_client);
647 let ts = now_ns();
648 drain_writes(c_client, &socket, &mut write_buf, ts);
649 ffi::nwep_client_free(c_client);
650 drop(Box::from_raw(state_ptr));
651 drop(Box::from_raw(cb_data_ptr as *mut CallbackData));
652 }
653 });
655
656 let request_timeout = Duration::from_millis(self.settings.timeout_ms as u64);
658 match connected_rx.recv() {
659 Ok(Ok(peer_identity)) => {
660 Ok(Client { event_tx, node_id, peer_identity, shutdown_flag, done_rx, request_timeout })
661 }
662 Ok(Err(e)) => {
663 shutdown_flag.store(true, Ordering::SeqCst);
665 let _ = event_tx.send(ClientEvent::Shutdown);
666 Err(e)
667 }
668 Err(_) => {
669 Err(Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
671 }
672 }
673 }
674}
675
676impl Default for ClientBuilder {
677 fn default() -> Self {
678 Self::new()
679 }
680}
681
682fn now_ns() -> u64 {
683 use std::time::{SystemTime, UNIX_EPOCH};
684 SystemTime::now()
685 .duration_since(UNIX_EPOCH)
686 .unwrap_or_default()
687 .as_nanos() as u64
688}
689
690fn drain_writes(client: *mut ffi::nwep_client, socket: &UdpSocket, buf: &mut Vec<u8>, ts: u64) {
691 loop {
692 let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
693 let n = unsafe {
694 ffi::nwep_client_write(client, &mut path, buf.as_mut_ptr(), buf.len(), ts)
695 };
696 if n <= 0 { break; }
697 let remote = sockaddr_to_socketaddr(&path.remote_addr, path.remote_addrlen);
698 if let Some(addr) = remote {
699 socket.send_to(&buf[..n as usize], addr).ok();
700 }
701 }
702}
703
704fn make_path(local: &SocketAddr, remote: &SocketAddr) -> ffi::nwep_path {
705 let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
706 let (local_sa, local_len) = socketaddr_to_sockaddr_v6(local);
707 let (remote_sa, remote_len) = socketaddr_to_sockaddr_v6(remote);
708 path.local_addr = local_sa;
709 path.local_addrlen = local_len;
710 path.remote_addr = remote_sa;
711 path.remote_addrlen = remote_len;
712 path
713}
714
715fn socketaddr_to_sockaddr_v6(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
719 let mut storage = unsafe { std::mem::zeroed::<ffi::sockaddr_storage>() };
720 let sin6: &mut sockaddr_in6 = unsafe { &mut *((&mut storage) as *mut _ as *mut _) };
721 sin6.sin6_family = AF_INET6 as sa_family_t;
722 let (port, octets) = match addr {
723 SocketAddr::V4(v4) => {
724 let mut mapped = [0u8; 16];
725 mapped[10] = 0xff;
726 mapped[11] = 0xff;
727 mapped[12..].copy_from_slice(&v4.ip().octets());
728 (v4.port(), mapped)
729 }
730 SocketAddr::V6(v6) => (v6.port(), v6.ip().octets()),
731 };
732 sin6.sin6_port = port.to_be();
733 sin6.sin6_addr.s6_addr = octets;
734 (storage, std::mem::size_of::<sockaddr_in6>())
735}
736
737fn socketaddr_to_sockaddr(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
739 socketaddr_to_sockaddr_v6(addr)
740}
741
742fn sockaddr_to_socketaddr(storage: &ffi::sockaddr_storage, _len: usize) -> Option<SocketAddr> {
743 let family = storage.ss_family as i32;
744 match family {
745 AF_INET => {
746 let sin: &sockaddr_in = unsafe { &*(storage as *const _ as *const _) };
747 let ip = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
748 let port = u16::from_be(sin.sin_port);
749 Some(SocketAddr::new(ip.into(), port))
750 }
751 AF_INET6 => {
752 let sin6: &sockaddr_in6 = unsafe { &*(storage as *const _ as *const _) };
753 let octets = sin6.sin6_addr.s6_addr;
754 let port = u16::from_be(sin6.sin6_port);
755 let ip = std::net::Ipv6Addr::from(octets);
758 Some(SocketAddr::new(ip.into(), port))
759 }
760 _ => None,
761 }
762}