nwep-rs 0.1.8

Rust bindings for the NWEP (WEB/1) protocol library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
#![allow(unsafe_op_in_unsafe_fn)]


use crate::ffi;
use crate::sock_compat::{AF_INET, AF_INET6, sa_family_t, sockaddr_in, sockaddr_in6};
use crate::error::{check, Error};
use crate::keypair::Keypair;
use crate::types::{Header, NodeId, DEFAULT_MAX_STREAMS, DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_TIMEOUT, SECONDS};
use crate::msg::{CResponse, CNotify};
use crate::addr::Url;
use std::ffi::CString;
use std::net::{UdpSocket, SocketAddr};
use std::sync::{mpsc, Arc};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::collections::HashMap;

const UDP_BUF_SIZE: usize = 65536;

/// `Settings` controls transport-level parameters for a NWEP client connection.
///
/// All fields have sensible defaults via [`Settings::default`]; only override what you need.
pub struct Settings {
    /// Maximum number of concurrent QUIC streams.
    pub max_streams: u32,
    /// Maximum allowed NWEP message body size, in bytes.
    pub max_message_size: u32,
    /// QUIC idle timeout in milliseconds, also used as the per-request receive timeout.
    ///
    /// If a response is not received within this duration, [`fetch`](Client::fetch_with_headers)
    /// returns [`ERR_NETWORK_TIMEOUT`](crate::error::ERR_NETWORK_TIMEOUT).
    pub timeout_ms: u32,
    /// Optional compression algorithm name (e.g. `"zstd"`). Empty string means no compression.
    pub compression: String,
}

impl Default for Settings {
    fn default() -> Self {
        Settings {
            max_streams: DEFAULT_MAX_STREAMS,
            max_message_size: DEFAULT_MAX_MESSAGE_SIZE as u32,
            timeout_ms: (DEFAULT_TIMEOUT / (SECONDS / 1000)) as u32,
            compression: String::new(),
        }
    }
}

/// `Response` is the complete response received from a NWEP server.
///
/// `Response` is assembled from the response headers (received in `on_response`)
/// and the body chunks (accumulated in `on_stream_data` and delivered in `on_stream_end`).
/// Use [`is_ok`](Response::is_ok) to check whether the status indicates success.
#[derive(Clone, Debug)]
pub struct Response {
    /// NWEP status string (e.g. `"ok"`, `"not-found"`, `"internal-error"`).
    pub status: String,
    /// Optional human-readable status detail string.
    pub status_details: String,
    /// Response headers.
    pub headers: Vec<Header>,
    /// Response body bytes.
    pub body: Vec<u8>,
}

impl Response {
    /// `is_ok` returns `true` if `status` indicates a successful response (e.g. `"ok"` or `"created"`).
    pub fn is_ok(&self) -> bool {
        crate::protocol::status_is_success(&self.status)
    }

    /// `header` looks up a response header by name, returning its value if present.
    pub fn header(&self, name: &str) -> Option<&str> {
        self.headers.iter().find(|h| h.name == name).map(|h| h.value.as_str())
    }
}

impl From<CResponse> for Response {
    fn from(r: CResponse) -> Self {
        Response { status: r.status, status_details: r.status_details, headers: r.headers, body: r.body }
    }
}

/// `Notification` is a server-push message delivered out-of-band on a NWEP connection.
///
/// Notifications are received via the callback registered with
/// [`ClientBuilder::on_notify`] and carry an event name, a path, optional
/// headers, and a body payload.
#[derive(Clone, Debug)]
pub struct Notification {
    /// Event type string identifying the kind of notification (e.g. `"update"`, `"ping"`).
    pub event: String,
    /// Path associated with the notification, typically the resource that changed.
    pub path: String,
    /// Additional metadata sent with the notification.
    pub headers: Vec<Header>,
    /// Notification payload bytes.
    pub body: Vec<u8>,
}

impl From<CNotify> for Notification {
    fn from(n: CNotify) -> Self {
        Notification { event: n.event, path: n.path, headers: n.headers, body: n.body }
    }
}

enum ClientEvent {
    Packet { data: Vec<u8>, local: SocketAddr, remote: SocketAddr },
    TimerExpiry,
    Request { method: String, path: String, body: Vec<u8>, headers: Vec<Header>, resp_tx: mpsc::SyncSender<Result<Response, Error>> },
    Shutdown,
}

// Accumulates response data across the on_response / on_stream_data / on_stream_end callbacks.
// The response is only delivered to the caller in on_stream_end, matching Go's behaviour.
struct PendingEntry {
    resp_tx: mpsc::SyncSender<Result<Response, Error>>,
    // Populated by on_response; None until the response headers arrive.
    resp: Option<Response>,
    // Body chunks from on_stream_data, appended to resp.body in on_stream_end.
    body_buf: Vec<u8>,
}

struct ClientState {
    c_client: *mut ffi::nwep_client,
    pending: HashMap<i64, PendingEntry>,
    on_notify: Option<Box<dyn Fn(Notification) + Send>>,
    // Set to true in on_disconnect so that subsequent fetch() calls fail immediately
    // instead of hanging. The C library may not return null from nwep_client_get_conn
    // after a remote-initiated close.
    disconnected: bool,
}

unsafe impl Send for ClientState {}

struct CallbackData {
    state: *mut ClientState,
    // Signaled once when the handshake completes (Ok(Identity)) or fails before connect (Err).
    // Taken on first use so subsequent callbacks are no-ops on this field.
    connected_tx: Option<mpsc::SyncSender<Result<crate::types::Identity, Error>>>,
}

unsafe extern "C" fn client_on_connect(
    _conn: *mut ffi::nwep_conn,
    peer: *const ffi::nwep_identity,
    user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
    let cb = &mut *(user_data as *mut CallbackData);
    if let Some(tx) = cb.connected_tx.take() {
        let identity = if peer.is_null() {
            crate::types::Identity::default()
        } else {
            crate::types::Identity::from(*peer)
        };
        let _ = tx.send(Ok(identity));
    }
    0
}

unsafe extern "C" fn client_on_disconnect(
    _conn: *mut ffi::nwep_conn,
    error: std::ffi::c_int,
    user_data: *mut std::ffi::c_void,
) {
    let cb = &mut *(user_data as *mut CallbackData);
    // If the handshake hasn't completed yet, signal the error to connect().
    if let Some(tx) = cb.connected_tx.take() {
        let err = if error != 0 { Error::from_code(error) }
                  else { Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED) };
        let _ = tx.send(Err(err));
    }
    // Mark disconnected so subsequent requests fail immediately.
    let state = &mut *cb.state;
    state.disconnected = true;
    // Fail all in-flight requests.
    for (_, entry) in state.pending.drain() {
        let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
    }
}

unsafe extern "C" fn client_on_response(
    _conn: *mut ffi::nwep_conn,
    stream: *mut ffi::nwep_stream,
    resp: *const ffi::nwep_response,
    user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
    if resp.is_null() || stream.is_null() { return 0; }
    let cb = &*(user_data as *const CallbackData);
    let state = &mut *cb.state;
    let stream_id = ffi::nwep_stream_get_id(stream);
    if let Some(entry) = state.pending.get_mut(&stream_id) {
        let c_resp = CResponse::from_ffi(&*resp);
        entry.resp = Some(Response::from(c_resp));
    }
    0
}

unsafe extern "C" fn client_on_notify(
    _conn: *mut ffi::nwep_conn,
    _stream: *mut ffi::nwep_stream,
    notify: *const ffi::nwep_notify,
    user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
    let cb = &*(user_data as *const CallbackData);
    let state = &mut *cb.state;
    if let Some(cb_fn) = &state.on_notify {
        if !notify.is_null() {
            let n = CNotify::from_ffi(&*notify);
            cb_fn(Notification::from(n));
        }
    }
    0
}

// Accumulates streaming body chunks delivered before on_stream_end.
unsafe extern "C" fn client_on_stream_data(
    _conn: *mut ffi::nwep_conn,
    stream: *mut ffi::nwep_stream,
    data: *const u8,
    len: usize,
    user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
    if stream.is_null() || data.is_null() { return 0; }
    let cb = &*(user_data as *const CallbackData);
    let state = &mut *cb.state;
    let stream_id = ffi::nwep_stream_get_id(stream);
    if let Some(entry) = state.pending.get_mut(&stream_id) {
        let chunk = std::slice::from_raw_parts(data, len);
        entry.body_buf.extend_from_slice(chunk);
    }
    0
}

// Delivers the complete response to the caller once the stream is fully received.
// Any chunks accumulated via on_stream_data are appended to the response body here.
unsafe extern "C" fn client_on_stream_end(
    _conn: *mut ffi::nwep_conn,
    stream: *mut ffi::nwep_stream,
    user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
    if stream.is_null() { return 0; }
    let cb = &*(user_data as *const CallbackData);
    let state = &mut *cb.state;
    let stream_id = ffi::nwep_stream_get_id(stream);
    if let Some(entry) = state.pending.remove(&stream_id) {
        if let Some(mut resp) = entry.resp {
            if !entry.body_buf.is_empty() {
                resp.body.extend_from_slice(&entry.body_buf);
            }
            let _ = entry.resp_tx.send(Ok(resp));
        } else {
            let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE)));
        }
    }
    0
}

unsafe extern "C" fn client_rand(
    dest: *mut u8,
    len: usize,
    _user_data: *mut std::ffi::c_void,
) -> std::ffi::c_int {
    let slice = std::slice::from_raw_parts_mut(dest, len);
    match crate::crypto::random_bytes(slice) {
        Ok(()) => 0,
        Err(_) => -1,
    }
}

/// `Client` is a handle to an established NWEP connection.
///
/// `Client` is cheap to clone; all clones share the same underlying QUIC connection
/// and event loop. Use [`fetch`](Client::fetch), [`get`](Client::get), or
/// [`post`](Client::post) to make requests, and [`close`](Client::close) when done.
pub struct Client {
    event_tx: mpsc::SyncSender<ClientEvent>,
    node_id: NodeId,
    peer_identity: crate::types::Identity,
    shutdown_flag: Arc<AtomicBool>,
    // Becomes readable (returns Err(Disconnected)) when the event loop thread exits.
    done_rx: mpsc::Receiver<()>,
    // Maximum time to wait for a response before returning ERR_NETWORK_TIMEOUT.
    // Prevents fetch() from blocking forever if the remote end closes without notice.
    request_timeout: Duration,
}

impl Client {
    /// `node_id` returns the local node's identifier derived from the connecting keypair.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// `peer_identity` returns the verified identity of the remote server as established
    /// during the NWEP mutual-authentication handshake.
    pub fn peer_identity(&self) -> crate::types::Identity {
        self.peer_identity.clone()
    }

    /// `peer_node_id` returns the remote server's node identifier.
    ///
    /// This is a convenience shorthand for `client.peer_identity().node_id`.
    pub fn peer_node_id(&self) -> NodeId {
        self.peer_identity.node_id
    }

    /// `fetch` sends a request with the given method, path, and body, blocking until
    /// the complete response is received or the request timeout elapses.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the connection is closed, the timeout elapses, or the server
    /// returns an error response. Use [`is_ok`](Response::is_ok) on the returned
    /// `Response` to distinguish application-level errors from transport errors.
    pub fn fetch(&self, method: &str, path: &str, body: &[u8]) -> Result<Response, Error> {
        self.fetch_with_headers(method, path, body, &[])
    }

    /// `fetch_with_headers` sends a request with the given method, path, body, and
    /// additional request headers, blocking until the complete response is received.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the connection is closed, the timeout elapses, or the C layer
    /// fails to open a stream.
    pub fn fetch_with_headers(&self, method: &str, path: &str, body: &[u8], headers: &[Header]) -> Result<Response, Error> {
        let (resp_tx, resp_rx) = mpsc::sync_channel(1);
        self.event_tx.send(ClientEvent::Request {
            method: method.to_string(),
            path: path.to_string(),
            body: body.to_vec(),
            headers: headers.to_vec(),
            resp_tx,
        }).map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))?;
        resp_rx.recv_timeout(self.request_timeout)
            .map_err(|e| match e {
                mpsc::RecvTimeoutError::Timeout => Error::from_code(crate::error::ERR_NETWORK_TIMEOUT),
                mpsc::RecvTimeoutError::Disconnected => Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED),
            })?
    }

    /// `get` sends a read request to `path` with no body.
    ///
    /// This is a convenience wrapper around [`fetch`](Client::fetch) using the
    /// NWEP `read` method.
    pub fn get(&self, path: &str) -> Result<Response, Error> {
        self.fetch(crate::protocol::METHOD_READ, path, b"")
    }

    /// `post` sends a write request to `path` with the given body bytes.
    ///
    /// This is a convenience wrapper around [`fetch`](Client::fetch) using the
    /// NWEP `write` method.
    pub fn post(&self, path: &str, body: &[u8]) -> Result<Response, Error> {
        self.fetch(crate::protocol::METHOD_WRITE, path, body)
    }

    /// Sends the shutdown signal and blocks until the event loop thread has exited
    /// and all resources (C client, state, pending requests) have been freed.
    pub fn close(&self) {
        self.shutdown_flag.store(true, Ordering::SeqCst);
        let _ = self.event_tx.send(ClientEvent::Shutdown);
        // Block until the event loop drops done_tx on exit.
        let _ = self.done_rx.recv();
    }
}

/// `ClientBuilder` constructs and connects a NWEP [`Client`].
///
/// Use [`new`](ClientBuilder::new) to start with defaults, configure the connection
/// with builder methods, then call [`connect`](ClientBuilder::connect) to establish
/// the QUIC session.
pub struct ClientBuilder {
    settings: Settings,
    on_notify: Option<Box<dyn Fn(Notification) + Send>>,
}

impl ClientBuilder {
    /// `new` creates a `ClientBuilder` with default [`Settings`] and no notification handler.
    pub fn new() -> Self {
        ClientBuilder { settings: Settings::default(), on_notify: None }
    }

    /// `settings` overrides the transport-level settings for this connection.
    pub fn settings(mut self, s: Settings) -> Self {
        self.settings = s;
        self
    }

    /// `on_notify` registers a callback to receive server-push [`Notification`] messages.
    ///
    /// The callback is invoked on the event loop thread; it must not block or panic.
    pub fn on_notify<F: Fn(Notification) + Send + 'static>(mut self, f: F) -> Self {
        self.on_notify = Some(Box::new(f));
        self
    }

    /// `connect` initiates a connection to the given `web://` URL and blocks until the
    /// QUIC + NWEP mutual-authentication handshake completes.
    ///
    /// On success returns a [`Client`] handle ready for requests. On failure the
    /// event loop and socket threads are torn down automatically.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the URL is invalid, the socket cannot be bound, the C layer
    /// fails to initialise, or the handshake is rejected by the remote server.
    pub fn connect(self, mut keypair: Keypair, url: &str) -> Result<Client, Error> {
        let node_id = keypair.node_id()?;
        let parsed_url = Url::parse(url)?;

        // Bind to IPv6 dual-stack socket (matching Go's client behaviour which always uses
        // AF_INET6 with IPv4-mapped addresses to keep address families consistent).
        let socket = UdpSocket::bind("[::]:0")
            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
        let local_addr = socket.local_addr()
            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;

        let compression = CString::new(self.settings.compression.as_str()).unwrap_or_default();
        let ffi_settings = ffi::nwep_settings {
            max_streams: self.settings.max_streams,
            max_message_size: self.settings.max_message_size,
            timeout_ms: self.settings.timeout_ms,
            compression: compression.as_ptr(),
            role: std::ptr::null(),
        };

        let state = Box::new(ClientState {
            c_client: std::ptr::null_mut(),
            pending: HashMap::new(),
            on_notify: self.on_notify,
            disconnected: false,
        });
        let state_ptr = Box::into_raw(state);

        // Channel that on_connect / on_disconnect use to signal handshake completion.
        // Sends Ok(peer_identity) on success, Err on failure.
        let (connected_tx, connected_rx) = mpsc::sync_channel::<Result<crate::types::Identity, Error>>(1);

        let cb_data = Box::new(CallbackData {
            state: state_ptr,
            connected_tx: Some(connected_tx),
        });
        let cb_data_ptr = Box::into_raw(cb_data) as *mut std::ffi::c_void;

        let callbacks = ffi::nwep_callbacks {
            on_connect: Some(client_on_connect),
            on_disconnect: Some(client_on_disconnect),
            on_request: None,
            on_response: Some(client_on_response),
            on_notify: Some(client_on_notify),
            on_stream_data: Some(client_on_stream_data),
            on_stream_end: Some(client_on_stream_end),
            rand: Some(client_rand),
            log: None,
        };

        let mut c_client: *mut ffi::nwep_client = std::ptr::null_mut();
        check(unsafe {
            ffi::nwep_client_new(&mut c_client, &ffi_settings, &callbacks, keypair.as_ffi_mut(), cb_data_ptr)
        })?;

        unsafe { (*state_ptr).c_client = c_client; }

        // nwep_client_connect requires the local socket address so the QUIC library
        // can establish the path correctly. Without it packets are routed incorrectly.
        let ffi_url = parsed_url.to_ffi();
        let ts = now_ns();
        let (local_sa, local_sa_len) = socketaddr_to_sockaddr(&local_addr);
        check(unsafe {
            ffi::nwep_client_connect(
                c_client,
                &ffi_url,
                &local_sa as *const ffi::sockaddr_storage as *const _,
                local_sa_len,
                ts,
            )
        })?;
        let mut init_buf = vec![0u8; UDP_BUF_SIZE];
        drain_writes(c_client, &socket, &mut init_buf, ts);

        let (event_tx, event_rx) = mpsc::sync_channel::<ClientEvent>(1024);
        let shutdown_flag = Arc::new(AtomicBool::new(false));

        // Dropped by the event loop thread on exit; done_rx.recv() unblocks when this happens.
        let (done_tx, done_rx) = mpsc::channel::<()>();

        // UDP reader thread
        let socket_recv = socket.try_clone()
            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
        let event_tx_recv = event_tx.clone();
        let shutdown_recv = shutdown_flag.clone();
        std::thread::spawn(move || {
            let mut buf = vec![0u8; UDP_BUF_SIZE];
            loop {
                if shutdown_recv.load(Ordering::SeqCst) { break; }
                socket_recv.set_read_timeout(Some(Duration::from_millis(50))).ok();
                match socket_recv.recv_from(&mut buf) {
                    Ok((n, remote)) => {
                        let local = socket_recv.local_addr().unwrap();
                        let _ = event_tx_recv.send(ClientEvent::Packet {
                            data: buf[..n].to_vec(),
                            local,
                            remote,
                        });
                    }
                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock ||
                              e.kind() == std::io::ErrorKind::TimedOut => {}
                    Err(_) => {
                        if shutdown_recv.load(Ordering::SeqCst) { break; }
                    }
                }
            }
        });

        // Timer thread
        let event_tx_timer = event_tx.clone();
        let shutdown_timer = shutdown_flag.clone();
        std::thread::spawn(move || {
            loop {
                std::thread::sleep(Duration::from_millis(10));
                if shutdown_timer.load(Ordering::SeqCst) { break; }
                let _ = event_tx_timer.send(ClientEvent::TimerExpiry);
            }
        });

        // Event loop thread
        // Convert raw pointers to usize so they can cross thread boundaries.
        // SAFETY: These pointers are exclusively owned by the event loop thread
        // after this point and are freed before the thread exits.
        let c_client_addr = c_client as usize;
        let state_ptr_addr = state_ptr as usize;
        let cb_data_ptr_addr = cb_data_ptr as usize;
        let shutdown_loop = shutdown_flag.clone();
        // Move the keypair into the thread so it stays alive for the full
        // lifetime of the C client (nwep_client_new stores a pointer to it).
        std::thread::spawn(move || {
            // Held alive until thread exits; dropping it signals done_rx.
            let _done = done_tx;
            // Keep keypair alive until after nwep_client_free.
            let _keypair = keypair;

            let c_client = c_client_addr as *mut ffi::nwep_client;
            let state_ptr = state_ptr_addr as *mut ClientState;
            let cb_data_ptr = cb_data_ptr_addr as *mut std::ffi::c_void;
            let mut write_buf = vec![0u8; UDP_BUF_SIZE];

            loop {
                let event = match event_rx.recv_timeout(Duration::from_millis(100)) {
                    Ok(e) => e,
                    Err(mpsc::RecvTimeoutError::Timeout) => {
                        if shutdown_loop.load(Ordering::SeqCst) { break; }
                        continue;
                    }
                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
                };

                let ts = now_ns();
                let state = unsafe { &mut *state_ptr };

                match event {
                    ClientEvent::Shutdown => break,

                    ClientEvent::Packet { data, local, remote } => {
                        let path = make_path(&local, &remote);
                        unsafe {
                            ffi::nwep_client_read(c_client, &path, data.as_ptr(), data.len(), ts);
                        }
                        drain_writes(c_client, &socket, &mut write_buf, ts);
                    }

                    ClientEvent::TimerExpiry => {
                        let expiry = unsafe { ffi::nwep_client_get_expiry(c_client) };
                        if expiry != u64::MAX && ts >= expiry {
                            unsafe { ffi::nwep_client_handle_expiry(c_client, ts); }
                            drain_writes(c_client, &socket, &mut write_buf, ts);
                        }
                    }

                    ClientEvent::Request { method, path, body, headers, resp_tx } => {
                        let conn = unsafe { ffi::nwep_client_get_conn(c_client) };
                        if conn.is_null() || state.disconnected {
                            let _ = resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
                            continue;
                        }
                        let meth = CString::new(method.as_str()).unwrap_or_default();
                        let pth = CString::new(path.as_str()).unwrap_or_default();

                        let mut request_id = [0u8; 16];
                        let mut trace_id = [0u8; 16];
                        let _ = crate::protocol::request_id_generate().map(|id| request_id = id);
                        let _ = crate::protocol::trace_id_generate().map(|id| trace_id = id);

                        // Build C header array; strings are valid for the lifetime of `headers`.
                        let c_headers: Vec<ffi::nwep_header> = headers.iter().map(|h| ffi::nwep_header {
                            name: h.name.as_ptr(),
                            name_len: h.name.len(),
                            value: h.value.as_ptr(),
                            value_len: h.value.len(),
                        }).collect();

                        let req = ffi::nwep_request {
                            method: meth.as_ptr(),
                            method_len: method.len(),
                            path: pth.as_ptr(),
                            path_len: path.len(),
                            headers: if c_headers.is_empty() { std::ptr::null() } else { c_headers.as_ptr() },
                            header_count: c_headers.len(),
                            body: if body.is_empty() { std::ptr::null() } else { body.as_ptr() },
                            body_len: body.len(),
                            request_id,
                            trace_id,
                        };

                        let mut stream: *mut ffi::nwep_stream = std::ptr::null_mut();
                        let rc = unsafe { ffi::nwep_stream_request(conn, &req, &mut stream) };
                        if rc != 0 {
                            let _ = resp_tx.send(Err(Error::from_code(rc)));
                            continue;
                        }
                        let stream_id = unsafe { ffi::nwep_stream_get_id(stream) };
                        unsafe { ffi::nwep_stream_end(stream); }
                        state.pending.insert(stream_id, PendingEntry {
                            resp_tx,
                            resp: None,
                            body_buf: Vec::new(),
                        });
                        drain_writes(c_client, &socket, &mut write_buf, ts);
                    }
                }
            }

            // Fail any requests that were still in-flight when we shut down.
            let state = unsafe { &mut *state_ptr };
            for (_, entry) in state.pending.drain() {
                let _ = entry.resp_tx.send(Err(Error::from_code(crate::error::ERR_NETWORK_CONN_CLOSED)));
            }

            unsafe {
                // nwep_conn_close notifies the QUIC layer to send CONNECTION_CLOSE.
                // On the client side this currently does not generate an immediate packet
                // (the C library schedules it with expiry = u64::MAX), but we call it
                // anyway so future library versions work correctly.
                let conn = ffi::nwep_client_get_conn(c_client);
                if !conn.is_null() {
                    ffi::nwep_conn_close(conn, 0);
                }
                ffi::nwep_client_close(c_client);
                let ts = now_ns();
                drain_writes(c_client, &socket, &mut write_buf, ts);
                ffi::nwep_client_free(c_client);
                drop(Box::from_raw(state_ptr));
                drop(Box::from_raw(cb_data_ptr as *mut CallbackData));
            }
            // _done is dropped here, unblocking any caller waiting on done_rx.
        });

        // Block until the handshake completes (on_connect) or fails (on_disconnect).
        let request_timeout = Duration::from_millis(self.settings.timeout_ms as u64);
        match connected_rx.recv() {
            Ok(Ok(peer_identity)) => {
                Ok(Client { event_tx, node_id, peer_identity, shutdown_flag, done_rx, request_timeout })
            }
            Ok(Err(e)) => {
                // Handshake failed — signal the event loop to exit cleanly.
                shutdown_flag.store(true, Ordering::SeqCst);
                let _ = event_tx.send(ClientEvent::Shutdown);
                Err(e)
            }
            Err(_) => {
                // Channel disconnected before we got a signal — shouldn't happen.
                Err(Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
            }
        }
    }
}

impl Default for ClientBuilder {
    fn default() -> Self {
        Self::new()
    }
}

fn now_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64
}

fn drain_writes(client: *mut ffi::nwep_client, socket: &UdpSocket, buf: &mut Vec<u8>, ts: u64) {
    loop {
        let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
        let n = unsafe {
            ffi::nwep_client_write(client, &mut path, buf.as_mut_ptr(), buf.len(), ts)
        };
        if n <= 0 { break; }
        let remote = sockaddr_to_socketaddr(&path.remote_addr, path.remote_addrlen);
        if let Some(addr) = remote {
            socket.send_to(&buf[..n as usize], addr).ok();
        }
    }
}

fn make_path(local: &SocketAddr, remote: &SocketAddr) -> ffi::nwep_path {
    let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
    let (local_sa, local_len) = socketaddr_to_sockaddr_v6(local);
    let (remote_sa, remote_len) = socketaddr_to_sockaddr_v6(remote);
    path.local_addr = local_sa;
    path.local_addrlen = local_len;
    path.remote_addr = remote_sa;
    path.remote_addrlen = remote_len;
    path
}

// Converts any SocketAddr to AF_INET6 using IPv4-mapped addresses for IPv4,
// matching the Go client's fillClientSockaddr behaviour. The nwep C library
// requires consistent AF_INET6 paths on the client side.
fn socketaddr_to_sockaddr_v6(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
    let mut storage = unsafe { std::mem::zeroed::<ffi::sockaddr_storage>() };
    let sin6: &mut sockaddr_in6 = unsafe { &mut *((&mut storage) as *mut _ as *mut _) };
    sin6.sin6_family = AF_INET6 as sa_family_t;
    let (port, octets) = match addr {
        SocketAddr::V4(v4) => {
            let mut mapped = [0u8; 16];
            mapped[10] = 0xff;
            mapped[11] = 0xff;
            mapped[12..].copy_from_slice(&v4.ip().octets());
            (v4.port(), mapped)
        }
        SocketAddr::V6(v6) => (v6.port(), v6.ip().octets()),
    };
    sin6.sin6_port = port.to_be();
    sin6.sin6_addr.s6_addr = octets;
    (storage, std::mem::size_of::<sockaddr_in6>())
}

// Used for the local address in nwep_client_connect (always AF_INET6).
fn socketaddr_to_sockaddr(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
    socketaddr_to_sockaddr_v6(addr)
}

fn sockaddr_to_socketaddr(storage: &ffi::sockaddr_storage, _len: usize) -> Option<SocketAddr> {
    let family = storage.ss_family as i32;
    match family {
        AF_INET => {
            let sin: &sockaddr_in = unsafe { &*(storage as *const _ as *const _) };
            let ip = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
            let port = u16::from_be(sin.sin_port);
            Some(SocketAddr::new(ip.into(), port))
        }
        AF_INET6 => {
            let sin6: &sockaddr_in6 = unsafe { &*(storage as *const _ as *const _) };
            let octets = sin6.sin6_addr.s6_addr;
            let port = u16::from_be(sin6.sin6_port);
            // Keep as SocketAddr::V6 (including IPv4-mapped ::ffff:x.x.x.x) so
            // send_to works correctly on the IPv6 dual-stack socket.
            let ip = std::net::Ipv6Addr::from(octets);
            Some(SocketAddr::new(ip.into(), port))
        }
        _ => None,
    }
}