Skip to main content

nwep/
client.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2
3
4use crate::ffi;
5use crate::sock_compat::{AF_INET, AF_INET6, sa_family_t, sockaddr_in, sockaddr_in6};
6use crate::error::{check, Error};
7use crate::keypair::Keypair;
8use crate::types::{Header, NodeId, DEFAULT_MAX_STREAMS, DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_TIMEOUT, SECONDS};
9use crate::msg::{CResponse, CNotify};
10use crate::addr::Url;
11use std::ffi::CString;
12use std::net::{UdpSocket, SocketAddr};
13use std::sync::{mpsc, Arc};
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::time::Duration;
16use std::collections::HashMap;
17
// Size, in bytes, of every scratch buffer this module allocates for reading
// and writing UDP datagrams.
const UDP_BUF_SIZE: usize = 65536;
19
/// `Settings` holds the transport-level tuning knobs of a NWEP client connection.
///
/// Start from [`Settings::default`] and override only the fields that matter to you.
pub struct Settings {
    /// Upper bound on the number of concurrently open QUIC streams.
    pub max_streams: u32,
    /// Largest NWEP message body that is accepted, in bytes.
    pub max_message_size: u32,
    /// QUIC idle timeout in milliseconds; the same value bounds how long a single
    /// request waits for its response.
    ///
    /// When no response arrives in time, [`fetch`](Client::fetch_with_headers)
    /// fails with [`ERR_NETWORK_TIMEOUT`](crate::error::ERR_NETWORK_TIMEOUT).
    pub timeout_ms: u32,
    /// Name of the compression algorithm to use (e.g. `"zstd"`). An empty string
    /// disables compression.
    pub compression: String,
}
36
37impl Default for Settings {
38    fn default() -> Self {
39        Settings {
40            max_streams: DEFAULT_MAX_STREAMS,
41            max_message_size: DEFAULT_MAX_MESSAGE_SIZE as u32,
42            timeout_ms: (DEFAULT_TIMEOUT / (SECONDS / 1000)) as u32,
43            compression: String::new(),
44        }
45    }
46}
47
48/// `Response` is the complete response received from a NWEP server.
49///
50/// `Response` is assembled from the response headers (received in `on_response`)
51/// and the body chunks (accumulated in `on_stream_data` and delivered in `on_stream_end`).
52/// Use [`is_ok`](Response::is_ok) to check whether the status indicates success.
53#[derive(Clone, Debug)]
54pub struct Response {
55    /// NWEP status string (e.g. `"ok"`, `"not-found"`, `"internal-error"`).
56    pub status: String,
57    /// Optional human-readable status detail string.
58    pub status_details: String,
59    /// Response headers.
60    pub headers: Vec<Header>,
61    /// Response body bytes.
62    pub body: Vec<u8>,
63}
64
65impl Response {
66    /// `is_ok` returns `true` if `status` indicates a successful response (e.g. `"ok"` or `"created"`).
67    pub fn is_ok(&self) -> bool {
68        crate::protocol::status_is_success(&self.status)
69    }
70
71    /// `header` looks up a response header by name, returning its value if present.
72    pub fn header(&self, name: &str) -> Option<&str> {
73        self.headers.iter().find(|h| h.name == name).map(|h| h.value.as_str())
74    }
75}
76
77impl From<CResponse> for Response {
78    fn from(r: CResponse) -> Self {
79        Response { status: r.status, status_details: r.status_details, headers: r.headers, body: r.body }
80    }
81}
82
83/// `Notification` is a server-push message delivered out-of-band on a NWEP connection.
84///
85/// Notifications are received via the callback registered with
86/// [`ClientBuilder::on_notify`] and carry an event name, a path, optional
87/// headers, and a body payload.
88#[derive(Clone, Debug)]
89pub struct Notification {
90    /// Event type string identifying the kind of notification (e.g. `"update"`, `"ping"`).
91    pub event: String,
92    /// Path associated with the notification, typically the resource that changed.
93    pub path: String,
94    /// Additional metadata sent with the notification.
95    pub headers: Vec<Header>,
96    /// Notification payload bytes.
97    pub body: Vec<u8>,
98}
99
100impl From<CNotify> for Notification {
101    fn from(n: CNotify) -> Self {
102        Notification { event: n.event, path: n.path, headers: n.headers, body: n.body }
103    }
104}
105
106enum ClientEvent {
107    Packet { data: Vec<u8>, local: SocketAddr, remote: SocketAddr },
108    TimerExpiry,
109    Request { method: String, path: String, body: Vec<u8>, headers: Vec<Header>, resp_tx: mpsc::SyncSender<Result<Response, Error>> },
110    Shutdown,
111}
112
113// Accumulates response data across the on_response / on_stream_data / on_stream_end callbacks.
114// The response is only delivered to the caller in on_stream_end, matching Go's behaviour.
115struct PendingEntry {
116    resp_tx: mpsc::SyncSender<Result<Response, Error>>,
117    // Populated by on_response; None until the response headers arrive.
118    resp: Option<Response>,
119    // Body chunks from on_stream_data, appended to resp.body in on_stream_end.
120    body_buf: Vec<u8>,
121}
122
123struct ClientState {
124    c_client: *mut ffi::nwep_client,
125    pending: HashMap<i64, PendingEntry>,
126    on_notify: Option<Box<dyn Fn(Notification) + Send>>,
127    // Set to true in on_disconnect so that subsequent fetch() calls fail immediately
128    // instead of hanging. The C library may not return null from nwep_client_get_conn
129    // after a remote-initiated close.
130    disconnected: bool,
131}
132
133unsafe impl Send for ClientState {}
134
135struct CallbackData {
136    state: *mut ClientState,
137    // Signaled once when the handshake completes (Ok(Identity)) or fails before connect (Err).
138    // Taken on first use so subsequent callbacks are no-ops on this field.
139    connected_tx: Option<mpsc::SyncSender<Result<crate::types::Identity, Error>>>,
140}
141
142unsafe extern "C" fn client_on_connect(
143    _conn: *mut ffi::nwep_conn,
144    peer: *const ffi::nwep_identity,
145    user_data: *mut std::ffi::c_void,
146) -> std::ffi::c_int {
147    let cb = &mut *(user_data as *mut CallbackData);
148    if let Some(tx) = cb.connected_tx.take() {
149        let identity = if peer.is_null() {
150            crate::types::Identity::default()
151        } else {
152            crate::types::Identity::from(*peer)
153        };
154        let _ = tx.send(Ok(identity));
155    }
156    0
157}
158
159unsafe extern "C" fn client_on_disconnect(
160    _conn: *mut ffi::nwep_conn,
161    error: std::ffi::c_int,
162    user_data: *mut std::ffi::c_void,
163) {
164    let cb = &mut *(user_data as *mut CallbackData);
165    // If the handshake hasn't completed yet, signal the error to connect().
166    if let Some(tx) = cb.connected_tx.take() {
167        let err = if error != 0 { Error::from_code(error) }
168                  else { Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED) };
169        let _ = tx.send(Err(err));
170    }
171    // Mark disconnected so subsequent requests fail immediately.
172    let state = &mut *cb.state;
173    state.disconnected = true;
174    // Fail all in-flight requests.
175    for (_, entry) in state.pending.drain() {
176        let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
177    }
178}
179
180unsafe extern "C" fn client_on_response(
181    _conn: *mut ffi::nwep_conn,
182    stream: *mut ffi::nwep_stream,
183    resp: *const ffi::nwep_response,
184    user_data: *mut std::ffi::c_void,
185) -> std::ffi::c_int {
186    if resp.is_null() || stream.is_null() { return 0; }
187    let cb = &*(user_data as *const CallbackData);
188    let state = &mut *cb.state;
189    let stream_id = ffi::nwep_stream_get_id(stream);
190    if let Some(entry) = state.pending.get_mut(&stream_id) {
191        let c_resp = CResponse::from_ffi(&*resp);
192        entry.resp = Some(Response::from(c_resp));
193    }
194    0
195}
196
197unsafe extern "C" fn client_on_notify(
198    _conn: *mut ffi::nwep_conn,
199    _stream: *mut ffi::nwep_stream,
200    notify: *const ffi::nwep_notify,
201    user_data: *mut std::ffi::c_void,
202) -> std::ffi::c_int {
203    let cb = &*(user_data as *const CallbackData);
204    let state = &mut *cb.state;
205    if let Some(cb_fn) = &state.on_notify {
206        if !notify.is_null() {
207            let n = CNotify::from_ffi(&*notify);
208            cb_fn(Notification::from(n));
209        }
210    }
211    0
212}
213
214// Accumulates streaming body chunks delivered before on_stream_end.
215unsafe extern "C" fn client_on_stream_data(
216    _conn: *mut ffi::nwep_conn,
217    stream: *mut ffi::nwep_stream,
218    data: *const u8,
219    len: usize,
220    user_data: *mut std::ffi::c_void,
221) -> std::ffi::c_int {
222    if stream.is_null() || data.is_null() { return 0; }
223    let cb = &*(user_data as *const CallbackData);
224    let state = &mut *cb.state;
225    let stream_id = ffi::nwep_stream_get_id(stream);
226    if let Some(entry) = state.pending.get_mut(&stream_id) {
227        let chunk = std::slice::from_raw_parts(data, len);
228        entry.body_buf.extend_from_slice(chunk);
229    }
230    0
231}
232
233// Delivers the complete response to the caller once the stream is fully received.
234// Any chunks accumulated via on_stream_data are appended to the response body here.
235unsafe extern "C" fn client_on_stream_end(
236    _conn: *mut ffi::nwep_conn,
237    stream: *mut ffi::nwep_stream,
238    user_data: *mut std::ffi::c_void,
239) -> std::ffi::c_int {
240    if stream.is_null() { return 0; }
241    let cb = &*(user_data as *const CallbackData);
242    let state = &mut *cb.state;
243    let stream_id = ffi::nwep_stream_get_id(stream);
244    if let Some(entry) = state.pending.remove(&stream_id) {
245        if let Some(mut resp) = entry.resp {
246            if !entry.body_buf.is_empty() {
247                resp.body.extend_from_slice(&entry.body_buf);
248            }
249            let _ = entry.resp_tx.send(Ok(resp));
250        } else {
251            let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE)));
252        }
253    }
254    0
255}
256
257unsafe extern "C" fn client_rand(
258    dest: *mut u8,
259    len: usize,
260    _user_data: *mut std::ffi::c_void,
261) -> std::ffi::c_int {
262    let slice = std::slice::from_raw_parts_mut(dest, len);
263    match crate::crypto::random_bytes(slice) {
264        Ok(()) => 0,
265        Err(_) => -1,
266    }
267}
268
269/// `Client` is a handle to an established NWEP connection.
270///
271/// `Client` is cheap to clone; all clones share the same underlying QUIC connection
272/// and event loop. Use [`fetch`](Client::fetch), [`get`](Client::get), or
273/// [`post`](Client::post) to make requests, and [`close`](Client::close) when done.
274pub struct Client {
275    event_tx: mpsc::SyncSender<ClientEvent>,
276    node_id: NodeId,
277    peer_identity: crate::types::Identity,
278    shutdown_flag: Arc<AtomicBool>,
279    // Becomes readable (returns Err(Disconnected)) when the event loop thread exits.
280    done_rx: mpsc::Receiver<()>,
281    // Maximum time to wait for a response before returning ERR_NETWORK_TIMEOUT.
282    // Prevents fetch() from blocking forever if the remote end closes without notice.
283    request_timeout: Duration,
284}
285
286impl Client {
287    /// `node_id` returns the local node's identifier derived from the connecting keypair.
288    pub fn node_id(&self) -> NodeId {
289        self.node_id
290    }
291
292    /// `peer_identity` returns the verified identity of the remote server as established
293    /// during the NWEP mutual-authentication handshake.
294    pub fn peer_identity(&self) -> crate::types::Identity {
295        self.peer_identity.clone()
296    }
297
298    /// `peer_node_id` returns the remote server's node identifier.
299    ///
300    /// This is a convenience shorthand for `client.peer_identity().node_id`.
301    pub fn peer_node_id(&self) -> NodeId {
302        self.peer_identity.node_id
303    }
304
305    /// `fetch` sends a request with the given method, path, and body, blocking until
306    /// the complete response is received or the request timeout elapses.
307    ///
308    /// # Errors
309    ///
310    /// Returns `Err` if the connection is closed, the timeout elapses, or the server
311    /// returns an error response. Use [`is_ok`](Response::is_ok) on the returned
312    /// `Response` to distinguish application-level errors from transport errors.
313    pub fn fetch(&self, method: &str, path: &str, body: &[u8]) -> Result<Response, Error> {
314        self.fetch_with_headers(method, path, body, &[])
315    }
316
317    /// `fetch_with_headers` sends a request with the given method, path, body, and
318    /// additional request headers, blocking until the complete response is received.
319    ///
320    /// # Errors
321    ///
322    /// Returns `Err` if the connection is closed, the timeout elapses, or the C layer
323    /// fails to open a stream.
324    pub fn fetch_with_headers(&self, method: &str, path: &str, body: &[u8], headers: &[Header]) -> Result<Response, Error> {
325        let (resp_tx, resp_rx) = mpsc::sync_channel(1);
326        self.event_tx.send(ClientEvent::Request {
327            method: method.to_string(),
328            path: path.to_string(),
329            body: body.to_vec(),
330            headers: headers.to_vec(),
331            resp_tx,
332        }).map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))?;
333        resp_rx.recv_timeout(self.request_timeout)
334            .map_err(|e| match e {
335                mpsc::RecvTimeoutError::Timeout => Error::from_code(crate::error::ERR_NETWORK_TIMEOUT),
336                mpsc::RecvTimeoutError::Disconnected => Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED),
337            })?
338    }
339
340    /// `get` sends a read request to `path` with no body.
341    ///
342    /// This is a convenience wrapper around [`fetch`](Client::fetch) using the
343    /// NWEP `read` method.
344    pub fn get(&self, path: &str) -> Result<Response, Error> {
345        self.fetch(crate::protocol::METHOD_READ, path, b"")
346    }
347
348    /// `post` sends a write request to `path` with the given body bytes.
349    ///
350    /// This is a convenience wrapper around [`fetch`](Client::fetch) using the
351    /// NWEP `write` method.
352    pub fn post(&self, path: &str, body: &[u8]) -> Result<Response, Error> {
353        self.fetch(crate::protocol::METHOD_WRITE, path, body)
354    }
355
356    /// Sends the shutdown signal and blocks until the event loop thread has exited
357    /// and all resources (C client, state, pending requests) have been freed.
358    pub fn close(&self) {
359        self.shutdown_flag.store(true, Ordering::SeqCst);
360        let _ = self.event_tx.send(ClientEvent::Shutdown);
361        // Block until the event loop drops done_tx on exit.
362        let _ = self.done_rx.recv();
363    }
364}
365
366/// `ClientBuilder` constructs and connects a NWEP [`Client`].
367///
368/// Use [`new`](ClientBuilder::new) to start with defaults, configure the connection
369/// with builder methods, then call [`connect`](ClientBuilder::connect) to establish
370/// the QUIC session.
371pub struct ClientBuilder {
372    settings: Settings,
373    on_notify: Option<Box<dyn Fn(Notification) + Send>>,
374}
375
376impl ClientBuilder {
377    /// `new` creates a `ClientBuilder` with default [`Settings`] and no notification handler.
378    pub fn new() -> Self {
379        ClientBuilder { settings: Settings::default(), on_notify: None }
380    }
381
382    /// `settings` overrides the transport-level settings for this connection.
383    pub fn settings(mut self, s: Settings) -> Self {
384        self.settings = s;
385        self
386    }
387
388    /// `on_notify` registers a callback to receive server-push [`Notification`] messages.
389    ///
390    /// The callback is invoked on the event loop thread; it must not block or panic.
391    pub fn on_notify<F: Fn(Notification) + Send + 'static>(mut self, f: F) -> Self {
392        self.on_notify = Some(Box::new(f));
393        self
394    }
395
396    /// `connect` initiates a connection to the given `web://` URL and blocks until the
397    /// QUIC + NWEP mutual-authentication handshake completes.
398    ///
399    /// On success returns a [`Client`] handle ready for requests. On failure the
400    /// event loop and socket threads are torn down automatically.
401    ///
402    /// # Errors
403    ///
404    /// Returns `Err` if the URL is invalid, the socket cannot be bound, the C layer
405    /// fails to initialise, or the handshake is rejected by the remote server.
406    pub fn connect(self, mut keypair: Keypair, url: &str) -> Result<Client, Error> {
407        let node_id = keypair.node_id()?;
408        let parsed_url = Url::parse(url)?;
409
410        // Bind to IPv6 dual-stack socket (matching Go's client behaviour which always uses
411        // AF_INET6 with IPv4-mapped addresses to keep address families consistent).
412        let socket = UdpSocket::bind("[::]:0")
413            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
414        let local_addr = socket.local_addr()
415            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
416
417        let compression = CString::new(self.settings.compression.as_str()).unwrap_or_default();
418        let ffi_settings = ffi::nwep_settings {
419            max_streams: self.settings.max_streams,
420            max_message_size: self.settings.max_message_size,
421            timeout_ms: self.settings.timeout_ms,
422            compression: compression.as_ptr(),
423            role: std::ptr::null(),
424        };
425
426        let state = Box::new(ClientState {
427            c_client: std::ptr::null_mut(),
428            pending: HashMap::new(),
429            on_notify: self.on_notify,
430            disconnected: false,
431        });
432        let state_ptr = Box::into_raw(state);
433
434        // Channel that on_connect / on_disconnect use to signal handshake completion.
435        // Sends Ok(peer_identity) on success, Err on failure.
436        let (connected_tx, connected_rx) = mpsc::sync_channel::<Result<crate::types::Identity, Error>>(1);
437
438        let cb_data = Box::new(CallbackData {
439            state: state_ptr,
440            connected_tx: Some(connected_tx),
441        });
442        let cb_data_ptr = Box::into_raw(cb_data) as *mut std::ffi::c_void;
443
444        let callbacks = ffi::nwep_callbacks {
445            on_connect: Some(client_on_connect),
446            on_disconnect: Some(client_on_disconnect),
447            on_request: None,
448            on_response: Some(client_on_response),
449            on_notify: Some(client_on_notify),
450            on_stream_data: Some(client_on_stream_data),
451            on_stream_end: Some(client_on_stream_end),
452            rand: Some(client_rand),
453            log: None,
454        };
455
456        let mut c_client: *mut ffi::nwep_client = std::ptr::null_mut();
457        check(unsafe {
458            ffi::nwep_client_new(&mut c_client, &ffi_settings, &callbacks, keypair.as_ffi_mut(), cb_data_ptr)
459        })?;
460
461        unsafe { (*state_ptr).c_client = c_client; }
462
463        // nwep_client_connect requires the local socket address so the QUIC library
464        // can establish the path correctly. Without it packets are routed incorrectly.
465        let ffi_url = parsed_url.to_ffi();
466        let ts = now_ns();
467        let (local_sa, local_sa_len) = socketaddr_to_sockaddr(&local_addr);
468        check(unsafe {
469            ffi::nwep_client_connect(
470                c_client,
471                &ffi_url,
472                &local_sa as *const ffi::sockaddr_storage as *const _,
473                local_sa_len,
474                ts,
475            )
476        })?;
477        let mut init_buf = vec![0u8; UDP_BUF_SIZE];
478        drain_writes(c_client, &socket, &mut init_buf, ts);
479
480        let (event_tx, event_rx) = mpsc::sync_channel::<ClientEvent>(1024);
481        let shutdown_flag = Arc::new(AtomicBool::new(false));
482
483        // Dropped by the event loop thread on exit; done_rx.recv() unblocks when this happens.
484        let (done_tx, done_rx) = mpsc::channel::<()>();
485
486        // UDP reader thread
487        let socket_recv = socket.try_clone()
488            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
489        let event_tx_recv = event_tx.clone();
490        let shutdown_recv = shutdown_flag.clone();
491        std::thread::spawn(move || {
492            let mut buf = vec![0u8; UDP_BUF_SIZE];
493            loop {
494                if shutdown_recv.load(Ordering::SeqCst) { break; }
495                socket_recv.set_read_timeout(Some(Duration::from_millis(50))).ok();
496                match socket_recv.recv_from(&mut buf) {
497                    Ok((n, remote)) => {
498                        let local = socket_recv.local_addr().unwrap();
499                        let _ = event_tx_recv.send(ClientEvent::Packet {
500                            data: buf[..n].to_vec(),
501                            local,
502                            remote,
503                        });
504                    }
505                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock ||
506                              e.kind() == std::io::ErrorKind::TimedOut => {}
507                    Err(_) => {
508                        if shutdown_recv.load(Ordering::SeqCst) { break; }
509                    }
510                }
511            }
512        });
513
514        // Timer thread
515        let event_tx_timer = event_tx.clone();
516        let shutdown_timer = shutdown_flag.clone();
517        std::thread::spawn(move || {
518            loop {
519                std::thread::sleep(Duration::from_millis(10));
520                if shutdown_timer.load(Ordering::SeqCst) { break; }
521                let _ = event_tx_timer.send(ClientEvent::TimerExpiry);
522            }
523        });
524
525        // Event loop thread
526        // Convert raw pointers to usize so they can cross thread boundaries.
527        // SAFETY: These pointers are exclusively owned by the event loop thread
528        // after this point and are freed before the thread exits.
529        let c_client_addr = c_client as usize;
530        let state_ptr_addr = state_ptr as usize;
531        let cb_data_ptr_addr = cb_data_ptr as usize;
532        let shutdown_loop = shutdown_flag.clone();
533        // Move the keypair into the thread so it stays alive for the full
534        // lifetime of the C client (nwep_client_new stores a pointer to it).
535        std::thread::spawn(move || {
536            // Held alive until thread exits; dropping it signals done_rx.
537            let _done = done_tx;
538            // Keep keypair alive until after nwep_client_free.
539            let _keypair = keypair;
540
541            let c_client = c_client_addr as *mut ffi::nwep_client;
542            let state_ptr = state_ptr_addr as *mut ClientState;
543            let cb_data_ptr = cb_data_ptr_addr as *mut std::ffi::c_void;
544            let mut write_buf = vec![0u8; UDP_BUF_SIZE];
545
546            loop {
547                let event = match event_rx.recv_timeout(Duration::from_millis(100)) {
548                    Ok(e) => e,
549                    Err(mpsc::RecvTimeoutError::Timeout) => {
550                        if shutdown_loop.load(Ordering::SeqCst) { break; }
551                        continue;
552                    }
553                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
554                };
555
556                let ts = now_ns();
557                let state = unsafe { &mut *state_ptr };
558
559                match event {
560                    ClientEvent::Shutdown => break,
561
562                    ClientEvent::Packet { data, local, remote } => {
563                        let path = make_path(&local, &remote);
564                        unsafe {
565                            ffi::nwep_client_read(c_client, &path, data.as_ptr(), data.len(), ts);
566                        }
567                        drain_writes(c_client, &socket, &mut write_buf, ts);
568                    }
569
570                    ClientEvent::TimerExpiry => {
571                        let expiry = unsafe { ffi::nwep_client_get_expiry(c_client) };
572                        if expiry != u64::MAX && ts >= expiry {
573                            unsafe { ffi::nwep_client_handle_expiry(c_client, ts); }
574                            drain_writes(c_client, &socket, &mut write_buf, ts);
575                        }
576                    }
577
578                    ClientEvent::Request { method, path, body, headers, resp_tx } => {
579                        let conn = unsafe { ffi::nwep_client_get_conn(c_client) };
580                        if conn.is_null() || state.disconnected {
581                            let _ = resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
582                            continue;
583                        }
584                        let meth = CString::new(method.as_str()).unwrap_or_default();
585                        let pth = CString::new(path.as_str()).unwrap_or_default();
586
587                        let mut request_id = [0u8; 16];
588                        let mut trace_id = [0u8; 16];
589                        let _ = crate::protocol::request_id_generate().map(|id| request_id = id);
590                        let _ = crate::protocol::trace_id_generate().map(|id| trace_id = id);
591
592                        // Build C header array; strings are valid for the lifetime of `headers`.
593                        let c_headers: Vec<ffi::nwep_header> = headers.iter().map(|h| ffi::nwep_header {
594                            name: h.name.as_ptr(),
595                            name_len: h.name.len(),
596                            value: h.value.as_ptr(),
597                            value_len: h.value.len(),
598                        }).collect();
599
600                        let req = ffi::nwep_request {
601                            method: meth.as_ptr(),
602                            method_len: method.len(),
603                            path: pth.as_ptr(),
604                            path_len: path.len(),
605                            headers: if c_headers.is_empty() { std::ptr::null() } else { c_headers.as_ptr() },
606                            header_count: c_headers.len(),
607                            body: if body.is_empty() { std::ptr::null() } else { body.as_ptr() },
608                            body_len: body.len(),
609                            request_id,
610                            trace_id,
611                        };
612
613                        let mut stream: *mut ffi::nwep_stream = std::ptr::null_mut();
614                        let rc = unsafe { ffi::nwep_stream_request(conn, &req, &mut stream) };
615                        if rc != 0 {
616                            let _ = resp_tx.send(Err(Error::from_code(rc)));
617                            continue;
618                        }
619                        let stream_id = unsafe { ffi::nwep_stream_get_id(stream) };
620                        unsafe { ffi::nwep_stream_end(stream); }
621                        state.pending.insert(stream_id, PendingEntry {
622                            resp_tx,
623                            resp: None,
624                            body_buf: Vec::new(),
625                        });
626                        drain_writes(c_client, &socket, &mut write_buf, ts);
627                    }
628                }
629            }
630
631            // Fail any requests that were still in-flight when we shut down.
632            let state = unsafe { &mut *state_ptr };
633            for (_, entry) in state.pending.drain() {
634                let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
635            }
636
637            unsafe {
638                // nwep_conn_close notifies the QUIC layer to send CONNECTION_CLOSE.
639                // On the client side this currently does not generate an immediate packet
640                // (the C library schedules it with expiry = u64::MAX), but we call it
641                // anyway so future library versions work correctly.
642                let conn = ffi::nwep_client_get_conn(c_client);
643                if !conn.is_null() {
644                    ffi::nwep_conn_close(conn, 0);
645                }
646                ffi::nwep_client_close(c_client);
647                let ts = now_ns();
648                drain_writes(c_client, &socket, &mut write_buf, ts);
649                ffi::nwep_client_free(c_client);
650                drop(Box::from_raw(state_ptr));
651                drop(Box::from_raw(cb_data_ptr as *mut CallbackData));
652            }
653            // _done is dropped here, unblocking any caller waiting on done_rx.
654        });
655
656        // Block until the handshake completes (on_connect) or fails (on_disconnect).
657        let request_timeout = Duration::from_millis(self.settings.timeout_ms as u64);
658        match connected_rx.recv() {
659            Ok(Ok(peer_identity)) => {
660                Ok(Client { event_tx, node_id, peer_identity, shutdown_flag, done_rx, request_timeout })
661            }
662            Ok(Err(e)) => {
663                // Handshake failed — signal the event loop to exit cleanly.
664                shutdown_flag.store(true, Ordering::SeqCst);
665                let _ = event_tx.send(ClientEvent::Shutdown);
666                Err(e)
667            }
668            Err(_) => {
669                // Channel disconnected before we got a signal — shouldn't happen.
670                Err(Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
671            }
672        }
673    }
674}
675
676impl Default for ClientBuilder {
677    fn default() -> Self {
678        Self::new()
679    }
680}
681
// Returns the current time in nanoseconds since the Unix epoch, guaranteed
// never to go backwards within this process.
//
// The value feeds the C library's timers, so it must not jump when the system
// clock is adjusted. The wall clock is therefore sampled only once, on first
// use; every later reading is that anchor plus monotonic elapsed time.
fn now_ns() -> u64 {
    use std::sync::OnceLock;
    use std::time::{Instant, SystemTime, UNIX_EPOCH};

    static ANCHOR: OnceLock<(Instant, u64)> = OnceLock::new();

    let (started, epoch_ns) = ANCHOR.get_or_init(|| {
        // A system clock set before the epoch yields 0 rather than an error.
        let wall = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        (Instant::now(), u64::try_from(wall.as_nanos()).unwrap_or(u64::MAX))
    });

    let elapsed_ns = u64::try_from(started.elapsed().as_nanos()).unwrap_or(u64::MAX);
    epoch_ns.saturating_add(elapsed_ns)
}
689
690fn drain_writes(client: *mut ffi::nwep_client, socket: &UdpSocket, buf: &mut Vec<u8>, ts: u64) {
691    loop {
692        let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
693        let n = unsafe {
694            ffi::nwep_client_write(client, &mut path, buf.as_mut_ptr(), buf.len(), ts)
695        };
696        if n <= 0 { break; }
697        let remote = sockaddr_to_socketaddr(&path.remote_addr, path.remote_addrlen);
698        if let Some(addr) = remote {
699            socket.send_to(&buf[..n as usize], addr).ok();
700        }
701    }
702}
703
704fn make_path(local: &SocketAddr, remote: &SocketAddr) -> ffi::nwep_path {
705    let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
706    let (local_sa, local_len) = socketaddr_to_sockaddr_v6(local);
707    let (remote_sa, remote_len) = socketaddr_to_sockaddr_v6(remote);
708    path.local_addr = local_sa;
709    path.local_addrlen = local_len;
710    path.remote_addr = remote_sa;
711    path.remote_addrlen = remote_len;
712    path
713}
714
715// Converts any SocketAddr to AF_INET6 using IPv4-mapped addresses for IPv4,
716// matching the Go client's fillClientSockaddr behaviour. The nwep C library
717// requires consistent AF_INET6 paths on the client side.
718fn socketaddr_to_sockaddr_v6(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
719    let mut storage = unsafe { std::mem::zeroed::<ffi::sockaddr_storage>() };
720    let sin6: &mut sockaddr_in6 = unsafe { &mut *((&mut storage) as *mut _ as *mut _) };
721    sin6.sin6_family = AF_INET6 as sa_family_t;
722    let (port, octets) = match addr {
723        SocketAddr::V4(v4) => {
724            let mut mapped = [0u8; 16];
725            mapped[10] = 0xff;
726            mapped[11] = 0xff;
727            mapped[12..].copy_from_slice(&v4.ip().octets());
728            (v4.port(), mapped)
729        }
730        SocketAddr::V6(v6) => (v6.port(), v6.ip().octets()),
731    };
732    sin6.sin6_port = port.to_be();
733    sin6.sin6_addr.s6_addr = octets;
734    (storage, std::mem::size_of::<sockaddr_in6>())
735}
736
737// Used for the local address in nwep_client_connect (always AF_INET6).
738fn socketaddr_to_sockaddr(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
739    socketaddr_to_sockaddr_v6(addr)
740}
741
742fn sockaddr_to_socketaddr(storage: &ffi::sockaddr_storage, _len: usize) -> Option<SocketAddr> {
743    let family = storage.ss_family as i32;
744    match family {
745        AF_INET => {
746            let sin: &sockaddr_in = unsafe { &*(storage as *const _ as *const _) };
747            let ip = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
748            let port = u16::from_be(sin.sin_port);
749            Some(SocketAddr::new(ip.into(), port))
750        }
751        AF_INET6 => {
752            let sin6: &sockaddr_in6 = unsafe { &*(storage as *const _ as *const _) };
753            let octets = sin6.sin6_addr.s6_addr;
754            let port = u16::from_be(sin6.sin6_port);
755            // Keep as SocketAddr::V6 (including IPv4-mapped ::ffff:x.x.x.x) so
756            // send_to works correctly on the IPv6 dual-stack socket.
757            let ip = std::net::Ipv6Addr::from(octets);
758            Some(SocketAddr::new(ip.into(), port))
759        }
760        _ => None,
761    }
762}