Skip to main content

nwep/
server.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2
3use crate::anchorserver::AnchorServer;
4use crate::error::{Error, check};
5use crate::ffi;
6use crate::sock_compat::{AF_INET, AF_INET6, sa_family_t, sockaddr_in, sockaddr_in6};
7use crate::keypair::Keypair;
8use crate::logserver::LogServer;
9use crate::msg::CRequest;
10use crate::role::ServerRole;
11use crate::types::{
12    DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_STREAMS, DEFAULT_TIMEOUT, Header, Identity, NodeId,
13    SECONDS,
14};
15use std::collections::{HashMap, HashSet};
16use std::ffi::CString;
17use std::net::{SocketAddr, UdpSocket};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, mpsc};
20use std::time::Duration;
21
/// Size, in bytes, of the buffers used for UDP datagram I/O (64 KiB).
const UDP_BUF_SIZE: usize = 64 * 1024;
23
24/// `Settings` controls transport-level parameters for a NWEP server.
25///
26/// All fields have sensible defaults via [`Settings::default`]; only override what you need.
27pub struct Settings {
28    /// Maximum number of concurrent QUIC streams per connection.
29    pub max_streams: u32,
30    /// Maximum allowed size of a single NWEP message body, in bytes.
31    pub max_message_size: u32,
32    /// QUIC idle timeout in milliseconds.
33    ///
34    /// After this period of inactivity the connection is dropped by the transport layer.
35    pub timeout_ms: u32,
36    /// Optional compression algorithm name (e.g. `"zstd"`).  Empty string means no compression.
37    pub compression: String,
38    /// Role advertised to connecting peers (e.g. `ServerRole::Regular` or `ServerRole::Bootstrap`).
39    pub role: ServerRole,
40}
41
42impl Default for Settings {
43    fn default() -> Self {
44        Settings {
45            max_streams: DEFAULT_MAX_STREAMS,
46            max_message_size: DEFAULT_MAX_MESSAGE_SIZE as u32,
47            timeout_ms: (DEFAULT_TIMEOUT / (SECONDS / 1000)) as u32,
48            compression: String::new(),
49            role: ServerRole::Regular,
50        }
51    }
52}
53
54/// `NotifyOptions` carries optional metadata for a server-initiated push notification.
55///
56/// Pass this to [`Server::notify_with_options`] when you need to attach custom headers or
57/// a stable notification identifier to the push message.
58pub struct NotifyOptions {
59    /// Custom headers to include in the notification message.
60    pub headers: Vec<Header>,
61    /// Optional 16-byte notification identifier.
62    ///
63    /// When `Some`, the C library sets `has_notify_id = 1` in the wire message so the
64    /// peer can deduplicate retransmitted notifications.
65    pub notify_id: Option<[u8; 16]>,
66}
67
68/// `ConnInfo` describes a peer that has connected to (or disconnected from) the server.
69///
70/// It is delivered to the `on_connect` and `on_disconnect` callbacks registered on
71/// [`ServerBuilder`].
72#[derive(Clone, Debug)]
73pub struct ConnInfo {
74    /// The 32-byte node identifier of the peer, derived from its Ed25519 public key.
75    pub node_id: NodeId,
76    /// The full Ed25519 identity of the peer (public key + node ID).
77    pub peer_identity: Identity,
78    /// The role the peer advertised during the handshake.
79    pub role: ServerRole,
80}
81
82enum ServerEvent {
83    Packet {
84        data: Vec<u8>,
85        local: SocketAddr,
86        remote: SocketAddr,
87    },
88    TimerExpiry,
89    Notify {
90        peer_node_id: NodeId,
91        event: String,
92        path: String,
93        body: Vec<u8>,
94        options: Option<NotifyOptions>,
95    },
96    NotifyAll {
97        event: String,
98        path: String,
99        body: Vec<u8>,
100    },
101    CloseConn {
102        peer_node_id: NodeId,
103        error: i32,
104    },
105    Shutdown,
106}
107
108/// `Request` represents an inbound NWEP request delivered to a [`Handler`].
109///
110/// The request is constructed by the event loop from the C library's `nwep_request` struct and
111/// passed (together with a [`ResponseWriter`]) to [`Handler::serve`].
112pub struct Request {
113    /// NWEP method string (e.g. `"read"` or `"write"`).
114    pub method: String,
115    /// Request path (e.g. `"/hello"`).
116    pub path: String,
117    /// Request body bytes.
118    pub body: Vec<u8>,
119    /// 16-byte opaque request identifier generated by the client.
120    pub request_id: [u8; 16],
121    /// 16-byte opaque trace identifier for distributed tracing.
122    pub trace_id: [u8; 16],
123    /// Custom headers sent by the client.
124    pub headers: Vec<Header>,
125    /// Identity and role of the peer that sent this request.
126    pub conn: ConnInfo,
127}
128
129impl Request {
130    /// `header` looks up a request header by name, returning its value if present.
131    pub fn header(&self, name: &str) -> Option<&str> {
132        self.headers
133            .iter()
134            .find(|h| h.name == name)
135            .map(|h| h.value.as_str())
136    }
137}
138
139/// `Handler` is implemented by any type that can process an inbound NWEP request.
140///
141/// The event loop invokes [`Handler::serve`] once per request, on the event loop thread.
142/// Closures of type `Fn(&mut ResponseWriter, &Request) + Send + 'static` implement this
143/// trait automatically, so you can pass a closure wherever a `Handler` is expected.
144pub trait Handler: Send + 'static {
145    /// `serve` processes a single request and writes the response through `w`.
146    ///
147    /// If `serve` returns without having called any write method on `w`, the event loop
148    /// automatically sends an empty `"ok"` response so the stream is always closed cleanly.
149    fn serve(&self, w: &mut ResponseWriter, r: &Request);
150}
151
152impl<F: Fn(&mut ResponseWriter, &Request) + Send + 'static> Handler for F {
153    fn serve(&self, w: &mut ResponseWriter, r: &Request) {
154        self(w, r)
155    }
156}
157
158/// `ResponseWriter` is the handle through which a [`Handler`] writes its response.
159///
160/// Two response modes are supported:
161///
162/// - **Buffered**: call [`set_status`](ResponseWriter::set_status) / [`set_header`](ResponseWriter::set_header)
163///   then [`write`](ResponseWriter::write) (or the shorthand [`respond`](ResponseWriter::respond)).
164///   This sends headers + body atomically and closes the stream.
165/// - **Streaming**: call [`stream_write`](ResponseWriter::stream_write) one or more times, then
166///   [`stream_end`](ResponseWriter::stream_end) (or [`stream_close`](ResponseWriter::stream_close)
167///   on error).  Response headers must have been sent by a prior buffered call in this case, or
168///   the stream is left headerless.
169pub struct ResponseWriter {
170    pub(crate) stream: *mut ffi::nwep_stream,
171    pub(crate) status: String,
172    pub(crate) headers: Vec<Header>,
173    pub(crate) sent: bool,
174}
175
176impl ResponseWriter {
177    /// `set_status` sets the NWEP status string for the pending buffered response.
178    ///
179    /// The default status is `"ok"`.  Must be called before [`write`](ResponseWriter::write).
180    pub fn set_status(&mut self, status: &str) {
181        self.status = status.to_string();
182    }
183
184    /// `set_header` appends a response header to the pending buffered response.
185    pub fn set_header(&mut self, name: &str, value: &str) {
186        self.headers.push(Header::new(name, value));
187    }
188
189    /// `write` sends the buffered response headers and body, then closes the stream.
190    ///
191    /// If called more than once the subsequent calls are no-ops; only the first call
192    /// transmits data.  Use [`stream_write`](ResponseWriter::stream_write) /
193    /// [`stream_end`](ResponseWriter::stream_end) for incremental streaming.
194    ///
195    /// # Errors
196    ///
197    /// Returns an [`Error`] if the underlying `nwep_stream_respond` or `nwep_stream_end`
198    /// C call fails.
199    pub fn write(&mut self, body: &[u8]) -> Result<(), Error> {
200        if self.sent {
201            return Ok(());
202        }
203        self.sent = true;
204        let status = CString::new(self.status.as_str()).unwrap_or_default();
205        let c_headers: Vec<ffi::nwep_header> = self
206            .headers
207            .iter()
208            .map(|h| ffi::nwep_header {
209                name: h.name.as_ptr(),
210                name_len: h.name.len(),
211                value: h.value.as_ptr(),
212                value_len: h.value.len(),
213            })
214            .collect();
215        let resp = ffi::nwep_response {
216            status: status.as_ptr(),
217            status_len: self.status.len(),
218            status_details: std::ptr::null(),
219            status_details_len: 0,
220            headers: if c_headers.is_empty() {
221                std::ptr::null()
222            } else {
223                c_headers.as_ptr()
224            },
225            header_count: c_headers.len(),
226            body: if body.is_empty() {
227                std::ptr::null()
228            } else {
229                body.as_ptr()
230            },
231            body_len: body.len(),
232        };
233        check(unsafe { ffi::nwep_stream_respond(self.stream, &resp) })?;
234        check(unsafe { ffi::nwep_stream_end(self.stream) })?;
235        Ok(())
236    }
237
238    /// `respond` sets `status` and writes `body` in a single call.
239    ///
240    /// Equivalent to calling [`set_status`](ResponseWriter::set_status) followed by
241    /// [`write`](ResponseWriter::write).
242    ///
243    /// # Errors
244    ///
245    /// Returns an [`Error`] if the underlying send fails.
246    pub fn respond(&mut self, status: &str, body: &[u8]) -> Result<(), Error> {
247        self.set_status(status);
248        self.write(body)
249    }
250
251    /// `stream_write` writes a chunk of body data directly to the open QUIC stream.
252    ///
253    /// This is the low-level building block for streaming responses.  Call
254    /// [`stream_end`](ResponseWriter::stream_end) when all chunks have been written.
255    /// Returns the number of bytes written.
256    ///
257    /// # Errors
258    ///
259    /// Returns an [`Error`] if `nwep_stream_write` returns a negative result code.
260    pub fn stream_write(&mut self, data: &[u8]) -> Result<isize, Error> {
261        let n = unsafe { ffi::nwep_stream_write(self.stream, data.as_ptr(), data.len()) };
262        if n < 0 {
263            Err(Error::from_code(n as i32))
264        } else {
265            Ok(n)
266        }
267    }
268
269    /// `stream_end` signals the end of the response body and closes the QUIC stream cleanly.
270    ///
271    /// # Errors
272    ///
273    /// Returns an [`Error`] if `nwep_stream_end` fails.
274    pub fn stream_end(&mut self) -> Result<(), Error> {
275        check(unsafe { ffi::nwep_stream_end(self.stream) })
276    }
277
278    /// `stream_close` aborts the stream with an application error code.
279    ///
280    /// Use this to signal an error mid-stream without sending a complete response.
281    /// Unlike [`stream_end`](ResponseWriter::stream_end), this does not return a result
282    /// because the close is best-effort.
283    pub fn stream_close(&mut self, err: i32) {
284        unsafe { ffi::nwep_stream_close(self.stream, err) }
285    }
286
287    /// `stream_id` returns the QUIC stream identifier for this response stream.
288    ///
289    /// Useful for correlating log output or for advanced stream management.
290    pub fn stream_id(&self) -> i64 {
291        unsafe { ffi::nwep_stream_get_id(self.stream) }
292    }
293
294    /// `is_server_initiated` returns `true` if this stream was opened by the server.
295    ///
296    /// Server-initiated streams correspond to notify messages; client-initiated streams
297    /// correspond to ordinary requests.
298    pub fn is_server_initiated(&self) -> bool {
299        unsafe { ffi::nwep_stream_is_server_initiated(self.stream) != 0 }
300    }
301}
302
303// All fields are owned directly; no double-indirection via raw pointers.
304// This is exclusively accessed from the event loop thread (callbacks fire there too).
305struct CallbackData {
306    handler: Box<dyn Handler>,
307    on_connect: Option<Box<dyn Fn(ConnInfo) + Send>>,
308    on_disconnect: Option<Box<dyn Fn(ConnInfo, i32) + Send>>,
309    // Keyed by NodeId; conn pointers are only valid while the peer is connected.
310    // Maintained by server_on_connect / server_on_disconnect.
311    conns: HashMap<NodeId, *mut ffi::nwep_conn>,
312    // Reverse map: conn pointer (as usize) → NodeId.  Used in server_on_disconnect to
313    // look up the correct NodeId even when nwep_conn_get_peer_identity returns zeroed data.
314    conn_to_node_id: HashMap<usize, NodeId>,
315    // Connections closed via close_connection() that haven't yet been freed by the C library.
316    // server_on_request aborts streams on these connections so no application response is sent
317    // before CONNECTION_CLOSE reaches the client.
318    closing_conns: HashSet<usize>,
319    // Shared with Server so connection_count() / connected_peers() work from any thread.
320    peers: Arc<Mutex<Vec<NodeId>>>,
321    // Optional sub-servers; if set, intercept /log and /checkpoint requests respectively.
322    // Shared with Server so callers can access them after build().
323    log_server: Option<Arc<Mutex<LogServer>>>,
324    anchor_server: Option<Arc<Mutex<AnchorServer>>>,
325}
326
327unsafe extern "C" fn server_on_connect(
328    conn: *mut ffi::nwep_conn,
329    peer: *const ffi::nwep_identity,
330    user_data: *mut std::ffi::c_void,
331) -> std::ffi::c_int {
332    let cb = &mut *(user_data as *mut CallbackData);
333    // The C library populates peer.pubkey but may leave peer.nodeid zeroed.
334    // Derive the node_id from the pubkey when that happens.
335    let mut identity = if peer.is_null() {
336        Identity::default()
337    } else {
338        Identity::from(*peer)
339    };
340    if identity.node_id.0 == [0u8; 32] && identity.pubkey != [0u8; 32] {
341        let mut nid = ffi::nwep_nodeid { data: [0u8; 32] };
342        if ffi::nwep_nodeid_from_pubkey(&mut nid, identity.pubkey.as_ptr()) == 0 {
343            identity.node_id = NodeId(nid.data);
344        }
345    }
346    let node_id = NodeId(identity.node_id.0);
347    let role = if conn.is_null() {
348        ServerRole::Regular
349    } else {
350        let role_ptr = ffi::nwep_conn_get_role(conn);
351        if role_ptr.is_null() {
352            ServerRole::Regular
353        } else {
354            ServerRole::from_str(
355                std::ffi::CStr::from_ptr(role_ptr)
356                    .to_str()
357                    .unwrap_or("regular"),
358            )
359        }
360    };
361
362    // Register conn for Notify and update the shared peer list.
363    if !conn.is_null() {
364        cb.conns.insert(node_id, conn);
365        cb.conn_to_node_id.insert(conn as usize, node_id);
366        if let Ok(mut peers) = cb.peers.lock() {
367            peers.push(node_id);
368        }
369    }
370
371    if let Some(cb_fn) = &cb.on_connect {
372        cb_fn(ConnInfo {
373            node_id,
374            peer_identity: identity,
375            role,
376        });
377    }
378    0
379}
380
381unsafe extern "C" fn server_on_disconnect(
382    conn: *mut ffi::nwep_conn,
383    error: std::ffi::c_int,
384    user_data: *mut std::ffi::c_void,
385) {
386    let cb = &mut *(user_data as *mut CallbackData);
387    // Use the reverse map for a reliable node_id lookup: nwep_conn_get_peer_identity(conn)
388    // may return null or zeroed data when called from a CLOSING/DRAINING connection.
389    let node_id = match cb.conn_to_node_id.remove(&(conn as usize)) {
390        Some(nid) => nid,
391        None => {
392            // Already handled by close_connection(); clean up closing_conns and skip.
393            cb.closing_conns.remove(&(conn as usize));
394            return;
395        }
396    };
397    cb.closing_conns.remove(&(conn as usize));
398    cb.conns.remove(&node_id);
399    if let Ok(mut peers) = cb.peers.lock() {
400        peers.retain(|&n| n != node_id);
401    }
402
403    if let Some(cb_fn) = &cb.on_disconnect {
404        // Build the best available identity, falling back to node_id-only if necessary.
405        let peer = if conn.is_null() {
406            std::ptr::null()
407        } else {
408            ffi::nwep_conn_get_peer_identity(conn)
409        };
410        let identity = if peer.is_null() {
411            Identity {
412                pubkey: [0u8; 32],
413                node_id,
414            }
415        } else {
416            let mut id = Identity::from(*peer);
417            if id.node_id.0 == [0u8; 32] {
418                id.node_id = node_id;
419            }
420            id
421        };
422        cb_fn(
423            ConnInfo {
424                node_id,
425                peer_identity: identity,
426                role: ServerRole::Regular,
427            },
428            error,
429        );
430    }
431}
432
433unsafe extern "C" fn server_on_request(
434    conn: *mut ffi::nwep_conn,
435    stream: *mut ffi::nwep_stream,
436    req: *const ffi::nwep_request,
437    user_data: *mut std::ffi::c_void,
438) -> std::ffi::c_int {
439    let cb = &mut *(user_data as *mut CallbackData);
440    if req.is_null() || stream.is_null() {
441        return 0;
442    }
443    // If this connection is being closed, abort the stream without responding.
444    // The QUIC CLOSING state will send CONNECTION_CLOSE to the client.
445    if !conn.is_null() && cb.closing_conns.contains(&(conn as usize)) {
446        ffi::nwep_stream_close(stream, 0);
447        return 0;
448    }
449    let c_req = CRequest::from_ffi(&*req);
450
451    // Intercept requests for registered sub-servers (matching Go routing).
452    // The C handle_request functions call nwep_stream_respond but NOT nwep_stream_end.
453    if cb.log_server.is_some() || cb.anchor_server.is_some() {
454        if let Some(ref ls_arc) = cb.log_server {
455            if c_req.path == "/log" || c_req.path.starts_with("/log/") {
456                if let Ok(mut ls) = ls_arc.lock() {
457                    if ls.handle_request(stream, req).is_err() {
458                        sub_server_error_respond(stream, crate::protocol::STATUS_INTERNAL_ERROR);
459                    }
460                }
461                ffi::nwep_stream_end(stream);
462                return 0;
463            }
464        }
465        if let Some(ref as_arc) = cb.anchor_server {
466            if c_req.path == "/checkpoint" || c_req.path.starts_with("/checkpoint/") {
467                if let Ok(mut as_) = as_arc.lock() {
468                    if as_.handle_request(stream, req).is_err() {
469                        sub_server_error_respond(stream, crate::protocol::STATUS_INTERNAL_ERROR);
470                    }
471                }
472                ffi::nwep_stream_end(stream);
473                return 0;
474            }
475        }
476    }
477
478    let peer = if conn.is_null() {
479        std::ptr::null()
480    } else {
481        ffi::nwep_conn_get_peer_identity(conn)
482    };
483    let mut identity = if peer.is_null() {
484        Identity::default()
485    } else {
486        Identity::from(*peer)
487    };
488    if identity.node_id.0 == [0u8; 32] && identity.pubkey != [0u8; 32] {
489        let mut nid = ffi::nwep_nodeid { data: [0u8; 32] };
490        if ffi::nwep_nodeid_from_pubkey(&mut nid, identity.pubkey.as_ptr()) == 0 {
491            identity.node_id = NodeId(nid.data);
492        }
493    }
494    let role = if conn.is_null() {
495        ServerRole::Regular
496    } else {
497        let role_ptr = ffi::nwep_conn_get_role(conn);
498        if role_ptr.is_null() {
499            ServerRole::Regular
500        } else {
501            ServerRole::from_str(
502                std::ffi::CStr::from_ptr(role_ptr)
503                    .to_str()
504                    .unwrap_or("regular"),
505            )
506        }
507    };
508    let conn_info = ConnInfo {
509        node_id: NodeId(identity.node_id.0),
510        peer_identity: identity,
511        role,
512    };
513    let request = Request {
514        method: c_req.method,
515        path: c_req.path,
516        body: c_req.body,
517        request_id: c_req.request_id,
518        trace_id: c_req.trace_id,
519        headers: c_req.headers,
520        conn: conn_info,
521    };
522    let mut writer = ResponseWriter {
523        stream,
524        status: crate::protocol::STATUS_OK.to_string(),
525        headers: Vec::new(),
526        sent: false,
527    };
528    cb.handler.serve(&mut writer, &request);
529    if !writer.sent {
530        let _ = writer.write(b"");
531    }
532    0
533}
534
535unsafe extern "C" fn server_rand(
536    dest: *mut u8,
537    len: usize,
538    _user_data: *mut std::ffi::c_void,
539) -> std::ffi::c_int {
540    let slice = std::slice::from_raw_parts_mut(dest, len);
541    match crate::crypto::random_bytes(slice) {
542        Ok(()) => 0,
543        Err(_) => -1,
544    }
545}
546
547/// Sends an error response on `stream` for a failed sub-server request.
548unsafe fn sub_server_error_respond(stream: *mut ffi::nwep_stream, status: &str) {
549    let status_c = CString::new(status).unwrap_or_default();
550    let resp = ffi::nwep_response {
551        status: status_c.as_ptr(),
552        status_len: status.len(),
553        status_details: std::ptr::null(),
554        status_details_len: 0,
555        headers: std::ptr::null(),
556        header_count: 0,
557        body: std::ptr::null(),
558        body_len: 0,
559    };
560    let _ = ffi::nwep_stream_respond(stream, &resp);
561}
562
563/// Calls `nwep_conn_notify` for the given conn and drains writes.
564/// All pointers must remain valid for the duration of this call.
565unsafe fn do_conn_notify(
566    c_server: *mut ffi::nwep_server,
567    conn: *mut ffi::nwep_conn,
568    notify: &ffi::nwep_notify,
569    socket: &UdpSocket,
570    write_buf: &mut Vec<u8>,
571    ts: u64,
572) {
573    let mut stream: *mut ffi::nwep_stream = std::ptr::null_mut();
574    let rc = ffi::nwep_conn_notify(conn, notify, &mut stream);
575    if rc == 0 && !stream.is_null() {
576        ffi::nwep_stream_end(stream);
577    }
578    drain_writes(c_server, socket, write_buf, ts);
579}
580
581/// `Server` is the thread-safe handle to a running NWEP server.
582///
583/// `Server` is obtained from [`ServerBuilder::build`] alongside an [`EventLoop`].  It can be
584/// cloned and sent across threads freely; all mutating operations are communicated to the event
585/// loop via an internal channel.
586pub struct Server {
587    event_tx: mpsc::SyncSender<ServerEvent>,
588    node_id: NodeId,
589    addr: SocketAddr,
590    shutdown_flag: Arc<AtomicBool>,
591    /// Shared peer list; updated by the event loop's connect/disconnect callbacks.
592    peers: Arc<Mutex<Vec<NodeId>>>,
593    /// Shared with CallbackData so callers can access sub-servers after build().
594    log_server: Option<Arc<Mutex<LogServer>>>,
595    anchor_server: Option<Arc<Mutex<AnchorServer>>>,
596}
597
598impl Server {
599    /// `node_id` returns the server's own 32-byte NWEP node identifier.
600    pub fn node_id(&self) -> NodeId {
601        self.node_id
602    }
603
604    /// `addr` returns the local UDP socket address the server is listening on.
605    pub fn addr(&self) -> SocketAddr {
606        self.addr
607    }
608
609    /// `url` formats a `web://` URL for the given path on this server.
610    ///
611    /// If the server was bound to an unspecified address (`0.0.0.0` or `::`), the URL uses
612    /// the loopback address instead, matching Go's `server.URL()` behaviour.
613    ///
614    /// # Example
615    ///
616    /// ```no_run
617    /// # let server: nwep::server::Server = unimplemented!();
618    /// println!("{}", server.url("/hello")); // e.g. "web://127.0.0.1:<node_id>:8080/hello"
619    /// ```
620    pub fn url(&self, path: &str) -> String {
621        use std::net::{IpAddr, Ipv4Addr};
622        // Replace unspecified (0.0.0.0 / ::) with loopback, matching Go's server.URL() behaviour.
623        let ip = match self.addr.ip() {
624            ip if ip.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST),
625            ip => ip,
626        };
627        let nwep_addr = match ip {
628            IpAddr::V4(v4) => crate::addr::Addr::new_ipv4(v4, self.node_id, self.addr.port()),
629            IpAddr::V6(v6) => crate::addr::Addr::new_ipv6(v6, self.node_id, self.addr.port()),
630        };
631        let url = crate::addr::Url {
632            addr: nwep_addr,
633            path: path.to_string(),
634        };
635        url.format()
636            .unwrap_or_else(|_| format!("web://[{}]:{}{}", ip, self.addr.port(), path))
637    }
638
639    /// `connection_count` returns the number of currently authenticated, connected peers.
640    ///
641    /// The count is updated synchronously on the event loop thread when a peer connects or
642    /// disconnects, so it may lag slightly behind the network state.
643    pub fn connection_count(&self) -> usize {
644        self.peers.lock().map(|p| p.len()).unwrap_or(0)
645    }
646
647    /// `connected_peers` returns a snapshot of the node IDs of all currently connected peers.
648    pub fn connected_peers(&self) -> Vec<NodeId> {
649        self.peers.lock().map(|p| p.clone()).unwrap_or_default()
650    }
651
652    /// `shutdown` signals the event loop to stop accepting new connections and exit.
653    ///
654    /// This call returns immediately; wait for [`EventLoop::run`] to return to confirm the
655    /// server has fully stopped.
656    pub fn shutdown(&self) {
657        self.shutdown_flag.store(true, Ordering::SeqCst);
658        let _ = self.event_tx.send(ServerEvent::Shutdown);
659    }
660
661    /// `notify` sends a server-initiated push message to a single connected peer.
662    ///
663    /// The peer receives the notification in its `on_notify` callback.  `event` is a
664    /// free-form event type string; `path` is the resource path; `body` is the payload.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if the internal channel has been closed (i.e. the event loop has
669    /// already stopped).
670    pub fn notify(
671        &self,
672        peer_node_id: NodeId,
673        event: &str,
674        path: &str,
675        body: Vec<u8>,
676    ) -> Result<(), Error> {
677        self.event_tx
678            .send(ServerEvent::Notify {
679                peer_node_id,
680                event: event.to_string(),
681                path: path.to_string(),
682                body,
683                options: None,
684            })
685            .map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
686    }
687
688    /// `notify_with_options` sends a server-initiated push message with additional metadata.
689    ///
690    /// Like [`notify`](Server::notify) but also accepts a [`NotifyOptions`] to attach custom
691    /// headers or a stable notification identifier to the wire message.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if the internal channel has been closed.
696    pub fn notify_with_options(
697        &self,
698        peer_node_id: NodeId,
699        event: &str,
700        path: &str,
701        body: Vec<u8>,
702        options: NotifyOptions,
703    ) -> Result<(), Error> {
704        self.event_tx
705            .send(ServerEvent::Notify {
706                peer_node_id,
707                event: event.to_string(),
708                path: path.to_string(),
709                body,
710                options: Some(options),
711            })
712            .map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
713    }
714
715    /// `notify_all` broadcasts a push notification to every currently connected peer.
716    ///
717    /// The message is sent to the snapshot of connections held by the event loop at the
718    /// time it processes the event; peers that disconnect between the call and processing
719    /// are silently skipped.
720    ///
721    /// # Errors
722    ///
723    /// Returns an error if the internal channel has been closed.
724    pub fn notify_all(&self, event: &str, path: &str, body: Vec<u8>) -> Result<(), Error> {
725        self.event_tx
726            .send(ServerEvent::NotifyAll {
727                event: event.to_string(),
728                path: path.to_string(),
729                body,
730            })
731            .map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
732    }
733
734    /// `close_connection` forcefully closes the connection to the identified peer.
735    ///
736    /// The peer is removed from internal state immediately and the `on_disconnect` callback
737    /// fires before the QUIC `CONNECTION_CLOSE` is sent.  This is intentional: the C
738    /// library's QUIC CLOSING state is asynchronous and the disconnect callback would
739    /// otherwise be delayed until the peer sends a packet.
740    ///
741    /// # Errors
742    ///
743    /// Returns an error if the internal channel has been closed.
744    pub fn close_connection(&self, peer_node_id: NodeId, error: i32) -> Result<(), Error> {
745        self.event_tx
746            .send(ServerEvent::CloseConn {
747                peer_node_id,
748                error,
749            })
750            .map_err(|_| Error::from_code(crate::error::ERR_INTERNAL_INVALID_STATE))
751    }
752
753    /// `log_server` returns the shared [`LogServer`] registered during build, if any.
754    ///
755    /// The returned `Arc<Mutex<LogServer>>` is the same instance intercepting `/log/*`
756    /// requests, so callers can inspect or modify log state without additional coordination.
757    pub fn log_server(&self) -> Option<Arc<Mutex<LogServer>>> {
758        self.log_server.clone()
759    }
760
761    /// `anchor_server` returns the shared [`AnchorServer`] registered during build, if any.
762    ///
763    /// The returned `Arc<Mutex<AnchorServer>>` is the same instance intercepting
764    /// `/checkpoint/*` requests.
765    pub fn anchor_server(&self) -> Option<Arc<Mutex<AnchorServer>>> {
766        self.anchor_server.clone()
767    }
768}
769
770/// `ServerBuilder` constructs and configures a NWEP server before binding.
771///
772/// Use the builder pattern to attach callbacks, override [`Settings`], and register optional
773/// sub-servers.  Call [`build`](ServerBuilder::build) when configuration is complete.
774pub struct ServerBuilder {
775    addr: String,
776    keypair: Keypair,
777    settings: Settings,
778    on_connect: Option<Box<dyn Fn(ConnInfo) + Send>>,
779    on_disconnect: Option<Box<dyn Fn(ConnInfo, i32) + Send>>,
780    log_server: Option<Arc<Mutex<LogServer>>>,
781    anchor_server: Option<Arc<Mutex<AnchorServer>>>,
782}
783
784impl ServerBuilder {
785    /// `new` creates a builder that will bind the server to `addr` and authenticate with `keypair`.
786    ///
787    /// `addr` is any string accepted by [`std::net::UdpSocket::bind`] (e.g. `"0.0.0.0:4433"`).
788    /// The `keypair` provides the Ed25519 identity used in the mutual-authentication handshake.
789    pub fn new(addr: impl Into<String>, keypair: Keypair) -> Self {
790        ServerBuilder {
791            addr: addr.into(),
792            keypair,
793            settings: Settings::default(),
794            on_connect: None,
795            on_disconnect: None,
796            log_server: None,
797            anchor_server: None,
798        }
799    }
800
801    /// `settings` overrides the default transport settings for this server.
802    pub fn settings(mut self, s: Settings) -> Self {
803        self.settings = s;
804        self
805    }
806
807    /// `on_connect` registers a callback invoked each time a peer completes the handshake.
808    ///
809    /// The callback receives a [`ConnInfo`] describing the new peer.  It runs on the event
810    /// loop thread, so it should be fast and non-blocking.
811    pub fn on_connect<F: Fn(ConnInfo) + Send + 'static>(mut self, f: F) -> Self {
812        self.on_connect = Some(Box::new(f));
813        self
814    }
815
816    /// `on_disconnect` registers a callback invoked each time a peer disconnects.
817    ///
818    /// The second argument is an error code: `0` means a clean shutdown, non-zero indicates
819    /// a transport error.  The callback runs on the event loop thread.
820    pub fn on_disconnect<F: Fn(ConnInfo, i32) + Send + 'static>(mut self, f: F) -> Self {
821        self.on_disconnect = Some(Box::new(f));
822        self
823    }
824
825    /// Register a `LogServer` to automatically handle requests to `/log` and `/log/*`.
826    /// The `Arc<Mutex<LogServer>>` is shared with the returned `Server` so callers
827    /// can still access it via `Server::log_server()` after starting.
828    pub fn log_server(mut self, ls: Arc<Mutex<LogServer>>) -> Self {
829        self.log_server = Some(ls);
830        self
831    }
832
833    /// Register an `AnchorServer` to automatically handle requests to `/checkpoint` and `/checkpoint/*`.
834    pub fn anchor_server(mut self, as_: Arc<Mutex<AnchorServer>>) -> Self {
835        self.anchor_server = Some(as_);
836        self
837    }
838
839    pub fn build<H: Handler>(mut self, handler: H) -> Result<(Server, EventLoop), Error> {
840        let node_id = self.keypair.node_id()?;
841        // Safety: `nwep_server_new` stores the keypair pointer for use during
842        // TLS signing. We must keep the Keypair alive for the server's lifetime.
843        // The keypair will be moved into the EventLoop below.
844
845        let socket = UdpSocket::bind(&self.addr)
846            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
847        let local_addr = socket
848            .local_addr()
849            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
850
851        let compression = CString::new(self.settings.compression.as_str()).unwrap_or_default();
852        let role_str = CString::new(self.settings.role.as_str()).unwrap_or_default();
853        let ffi_settings = ffi::nwep_settings {
854            max_streams: self.settings.max_streams,
855            max_message_size: self.settings.max_message_size,
856            timeout_ms: self.settings.timeout_ms,
857            compression: compression.as_ptr(),
858            role: role_str.as_ptr(),
859        };
860
861        // Shared peer list between CallbackData (event loop thread) and Server (any thread).
862        let peers = Arc::new(Mutex::new(Vec::<NodeId>::new()));
863
864        let cb_data = Box::new(CallbackData {
865            handler: Box::new(handler),
866            on_connect: self.on_connect,
867            on_disconnect: self.on_disconnect,
868            conns: HashMap::new(),
869            conn_to_node_id: HashMap::new(),
870            closing_conns: HashSet::new(),
871            peers: Arc::clone(&peers),
872            log_server: self.log_server.clone(),
873            anchor_server: self.anchor_server.clone(),
874        });
875        let cb_data_ptr = Box::into_raw(cb_data) as *mut std::ffi::c_void;
876
877        let callbacks = ffi::nwep_callbacks {
878            on_connect: Some(server_on_connect),
879            on_disconnect: Some(server_on_disconnect),
880            on_request: Some(server_on_request),
881            on_response: None,
882            on_notify: None,
883            on_stream_data: None,
884            on_stream_end: None,
885            rand: Some(server_rand),
886            log: None,
887        };
888
889        let mut c_server: *mut ffi::nwep_server = std::ptr::null_mut();
890        check(unsafe {
891            ffi::nwep_server_new(
892                &mut c_server,
893                &ffi_settings,
894                &callbacks,
895                self.keypair.as_ffi_mut(),
896                cb_data_ptr,
897            )
898        })?;
899
900        let (event_tx, event_rx) = mpsc::sync_channel::<ServerEvent>(1024);
901        let shutdown_flag = Arc::new(AtomicBool::new(false));
902
903        let server = Server {
904            event_tx: event_tx.clone(),
905            node_id,
906            addr: local_addr,
907            shutdown_flag: shutdown_flag.clone(),
908            peers,
909            log_server: self.log_server,
910            anchor_server: self.anchor_server,
911        };
912
913        let event_loop = EventLoop {
914            c_server,
915            socket,
916            local_addr,
917            event_rx,
918            event_tx,
919            shutdown_flag,
920            cb_data_ptr,
921            _keypair: self.keypair,
922        };
923
924        Ok((server, event_loop))
925    }
926}
927
928pub struct EventLoop {
929    c_server: *mut ffi::nwep_server,
930    socket: UdpSocket,
931    local_addr: SocketAddr,
932    event_rx: mpsc::Receiver<ServerEvent>,
933    event_tx: mpsc::SyncSender<ServerEvent>,
934    shutdown_flag: Arc<AtomicBool>,
935    cb_data_ptr: *mut std::ffi::c_void,
936    // Keep keypair alive so the C library's stored pointer remains valid
937    // for signing during TLS handshakes.
938    _keypair: crate::keypair::Keypair,
939}
940
941unsafe impl Send for EventLoop {}
942
943impl EventLoop {
944    pub fn run(self) -> Result<(), Error> {
945        // Spawn UDP reader thread
946        let socket_recv = self
947            .socket
948            .try_clone()
949            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
950        let event_tx_recv = self.event_tx.clone();
951        let local_addr = self.local_addr;
952        let shutdown_flag_recv = self.shutdown_flag.clone();
953        std::thread::spawn(move || {
954            let mut buf = vec![0u8; UDP_BUF_SIZE];
955            loop {
956                if shutdown_flag_recv.load(Ordering::SeqCst) {
957                    break;
958                }
959                socket_recv
960                    .set_read_timeout(Some(Duration::from_millis(100)))
961                    .ok();
962                match socket_recv.recv_from(&mut buf) {
963                    Ok((n, remote)) => {
964                        let _ = event_tx_recv.send(ServerEvent::Packet {
965                            data: buf[..n].to_vec(),
966                            local: local_addr,
967                            remote,
968                        });
969                    }
970                    Err(e)
971                        if e.kind() == std::io::ErrorKind::WouldBlock
972                            || e.kind() == std::io::ErrorKind::TimedOut => {}
973                    Err(_) => {
974                        if shutdown_flag_recv.load(Ordering::SeqCst) {
975                            break;
976                        }
977                    }
978                }
979            }
980        });
981
982        // Timer thread
983        let event_tx_timer = self.event_tx.clone();
984        let shutdown_flag_timer = self.shutdown_flag.clone();
985        std::thread::spawn(move || {
986            loop {
987                std::thread::sleep(Duration::from_millis(10));
988                if shutdown_flag_timer.load(Ordering::SeqCst) {
989                    break;
990                }
991                let _ = event_tx_timer.send(ServerEvent::TimerExpiry);
992            }
993        });
994
995        let mut write_buf = vec![0u8; UDP_BUF_SIZE];
996        let socket_send = self
997            .socket
998            .try_clone()
999            .map_err(|_| Error::from_code(crate::error::ERR_NETWORK_SOCKET))?;
1000
1001        loop {
1002            let event = match self.event_rx.recv_timeout(Duration::from_millis(100)) {
1003                Ok(e) => e,
1004                Err(mpsc::RecvTimeoutError::Timeout) => {
1005                    if self.shutdown_flag.load(Ordering::SeqCst) {
1006                        break;
1007                    }
1008                    continue;
1009                }
1010                Err(mpsc::RecvTimeoutError::Disconnected) => break,
1011            };
1012
1013            let ts = now_ns();
1014
1015            match event {
1016                ServerEvent::Shutdown => break,
1017
1018                ServerEvent::Packet {
1019                    data,
1020                    local,
1021                    remote,
1022                } => {
1023                    let path = make_path(&local, &remote);
1024                    unsafe {
1025                        ffi::nwep_server_read(self.c_server, &path, data.as_ptr(), data.len(), ts);
1026                    }
1027                    drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
1028                }
1029
1030                ServerEvent::TimerExpiry => {
1031                    let expiry = unsafe { ffi::nwep_server_get_expiry(self.c_server) };
1032                    if expiry != u64::MAX && ts >= expiry {
1033                        unsafe {
1034                            ffi::nwep_server_handle_expiry(self.c_server, ts);
1035                        }
1036                        drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
1037                    }
1038                }
1039
1040                ServerEvent::Notify {
1041                    peer_node_id,
1042                    event,
1043                    path,
1044                    body,
1045                    options,
1046                } => {
1047                    let cb = unsafe { &*(self.cb_data_ptr as *const CallbackData) };
1048                    if let Some(&conn) = cb.conns.get(&peer_node_id) {
1049                        let ev = CString::new(event.as_str()).unwrap_or_default();
1050                        let pth = CString::new(path.as_str()).unwrap_or_default();
1051                        // Keep header strings alive across the FFI call.
1052                        let c_headers: Vec<ffi::nwep_header> = options
1053                            .as_ref()
1054                            .map(|o| {
1055                                o.headers
1056                                    .iter()
1057                                    .map(|h| ffi::nwep_header {
1058                                        name: h.name.as_ptr(),
1059                                        name_len: h.name.len(),
1060                                        value: h.value.as_ptr(),
1061                                        value_len: h.value.len(),
1062                                    })
1063                                    .collect()
1064                            })
1065                            .unwrap_or_default();
1066                        let notify = ffi::nwep_notify {
1067                            event: ev.as_ptr(),
1068                            event_len: event.len(),
1069                            path: pth.as_ptr(),
1070                            path_len: path.len(),
1071                            notify_id: options
1072                                .as_ref()
1073                                .and_then(|o| o.notify_id)
1074                                .unwrap_or([0u8; 16]),
1075                            has_notify_id: options.as_ref().and_then(|o| o.notify_id).is_some()
1076                                as i32,
1077                            headers: if c_headers.is_empty() {
1078                                std::ptr::null()
1079                            } else {
1080                                c_headers.as_ptr()
1081                            },
1082                            header_count: c_headers.len(),
1083                            body: if body.is_empty() {
1084                                std::ptr::null()
1085                            } else {
1086                                body.as_ptr()
1087                            },
1088                            body_len: body.len(),
1089                        };
1090                        unsafe {
1091                            do_conn_notify(
1092                                self.c_server,
1093                                conn,
1094                                &notify,
1095                                &socket_send,
1096                                &mut write_buf,
1097                                ts,
1098                            );
1099                        }
1100                    }
1101                }
1102
1103                ServerEvent::NotifyAll { event, path, body } => {
1104                    let cb = unsafe { &*(self.cb_data_ptr as *const CallbackData) };
1105                    // Snapshot conn list so we don't hold cb borrow across async ops.
1106                    let conns: Vec<*mut ffi::nwep_conn> = cb.conns.values().copied().collect();
1107                    if !conns.is_empty() {
1108                        let ev = CString::new(event.as_str()).unwrap_or_default();
1109                        let pth = CString::new(path.as_str()).unwrap_or_default();
1110                        let notify = ffi::nwep_notify {
1111                            event: ev.as_ptr(),
1112                            event_len: event.len(),
1113                            path: pth.as_ptr(),
1114                            path_len: path.len(),
1115                            notify_id: [0u8; 16],
1116                            has_notify_id: 0,
1117                            headers: std::ptr::null(),
1118                            header_count: 0,
1119                            body: if body.is_empty() {
1120                                std::ptr::null()
1121                            } else {
1122                                body.as_ptr()
1123                            },
1124                            body_len: body.len(),
1125                        };
1126                        for conn in conns {
1127                            unsafe {
1128                                do_conn_notify(
1129                                    self.c_server,
1130                                    conn,
1131                                    &notify,
1132                                    &socket_send,
1133                                    &mut write_buf,
1134                                    ts,
1135                                );
1136                            }
1137                        }
1138                    }
1139                }
1140
1141                ServerEvent::CloseConn {
1142                    peer_node_id,
1143                    error,
1144                } => {
1145                    let cb = unsafe { &mut *(self.cb_data_ptr as *mut CallbackData) };
1146                    if let Some(&conn) = cb.conns.get(&peer_node_id) {
1147                        let conn_key = conn as usize;
1148                        // Remove from active maps immediately so connection_count() and
1149                        // connected_peers() reflect the close without waiting for the C library.
1150                        let node_id = cb.conn_to_node_id.remove(&conn_key).unwrap_or(peer_node_id);
1151                        cb.conns.remove(&peer_node_id);
1152                        if let Ok(mut peers) = cb.peers.lock() {
1153                            peers.retain(|&n| n != peer_node_id);
1154                        }
1155                        // Track as closing so server_on_request aborts any incoming streams.
1156                        cb.closing_conns.insert(conn_key);
1157                        // Manually fire the Rust disconnect callback.  The C library won't do
1158                        // it promptly because nwep_conn_close only enters QUIC CLOSING state;
1159                        // it sends CONNECTION_CLOSE only in response to incoming client packets.
1160                        if let Some(cb_fn) = &cb.on_disconnect {
1161                            let peer = unsafe {
1162                                if conn.is_null() {
1163                                    std::ptr::null()
1164                                } else {
1165                                    ffi::nwep_conn_get_peer_identity(conn)
1166                                }
1167                            };
1168                            let identity = if peer.is_null() {
1169                                Identity {
1170                                    pubkey: [0u8; 32],
1171                                    node_id,
1172                                }
1173                            } else {
1174                                let mut id = unsafe { Identity::from(*peer) };
1175                                if id.node_id.0 == [0u8; 32] {
1176                                    id.node_id = node_id;
1177                                }
1178                                id
1179                            };
1180                            cb_fn(
1181                                ConnInfo {
1182                                    node_id,
1183                                    peer_identity: identity,
1184                                    role: ServerRole::Regular,
1185                                },
1186                                error,
1187                            );
1188                        }
1189                        unsafe {
1190                            ffi::nwep_conn_close(conn, error);
1191                        }
1192                        drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
1193                    }
1194                }
1195            }
1196        }
1197
1198        unsafe {
1199            ffi::nwep_server_close(self.c_server);
1200            // Pump timer ticks so CONNECTION_CLOSE packets are sent to all connected clients.
1201            // nwep_server_close schedules the close; handle_expiry calls generate the packets.
1202            let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
1203            loop {
1204                let ts = now_ns();
1205                ffi::nwep_server_handle_expiry(self.c_server, ts);
1206                drain_writes(self.c_server, &socket_send, &mut write_buf, ts);
1207                let expiry = ffi::nwep_server_get_expiry(self.c_server);
1208                if expiry == u64::MAX || std::time::Instant::now() >= deadline {
1209                    break;
1210                }
1211                std::thread::sleep(std::time::Duration::from_millis(10));
1212            }
1213            ffi::nwep_server_free(self.c_server);
1214        }
1215
1216        Ok(())
1217    }
1218}
1219
1220impl Drop for EventLoop {
1221    fn drop(&mut self) {
1222        if !self.cb_data_ptr.is_null() {
1223            unsafe {
1224                drop(Box::from_raw(self.cb_data_ptr as *mut CallbackData));
1225            }
1226            self.cb_data_ptr = std::ptr::null_mut();
1227        }
1228    }
1229}
1230
/// `now_ns` returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`.  The nanosecond count is narrowed
/// from `u128` to `u64` with a plain cast.
fn now_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
1238
1239fn drain_writes(server: *mut ffi::nwep_server, socket: &UdpSocket, buf: &mut Vec<u8>, ts: u64) {
1240    loop {
1241        let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
1242        let n =
1243            unsafe { ffi::nwep_server_write(server, &mut path, buf.as_mut_ptr(), buf.len(), ts) };
1244        if n <= 0 {
1245            break;
1246        }
1247        if let Some(addr) = sockaddr_to_socketaddr(&path.remote_addr, path.remote_addrlen) {
1248            socket.send_to(&buf[..n as usize], addr).ok();
1249        }
1250    }
1251}
1252
1253fn make_path(local: &SocketAddr, remote: &SocketAddr) -> ffi::nwep_path {
1254    let mut path = unsafe { std::mem::zeroed::<ffi::nwep_path>() };
1255    let (local_sa, local_len) = socketaddr_to_sockaddr(local);
1256    let (remote_sa, remote_len) = socketaddr_to_sockaddr(remote);
1257    path.local_addr = local_sa;
1258    path.local_addrlen = local_len;
1259    path.remote_addr = remote_sa;
1260    path.remote_addrlen = remote_len;
1261    path
1262}
1263
1264fn socketaddr_to_sockaddr(addr: &SocketAddr) -> (ffi::sockaddr_storage, usize) {
1265    let mut storage = unsafe { std::mem::zeroed::<ffi::sockaddr_storage>() };
1266    // Matching Go's fillServerSockaddr: use AF_INET for IPv4 addresses (including
1267    // IPv4-mapped IPv6 ::ffff:x.x.x.x received from a dual-stack socket), and
1268    // AF_INET6 for pure IPv6.
1269    let (is_v4, v4_octets, port) = match addr {
1270        SocketAddr::V4(v4) => (true, Some(v4.ip().octets()), v4.port()),
1271        SocketAddr::V6(v6) => {
1272            let octets = v6.ip().octets();
1273            let is_mapped =
1274                octets[..10].iter().all(|&b| b == 0) && octets[10] == 0xff && octets[11] == 0xff;
1275            if is_mapped {
1276                (
1277                    true,
1278                    Some([octets[12], octets[13], octets[14], octets[15]]),
1279                    v6.port(),
1280                )
1281            } else {
1282                (false, None, v6.port())
1283            }
1284        }
1285    };
1286    if is_v4 {
1287        let sin: &mut sockaddr_in = unsafe { &mut *((&mut storage) as *mut _ as *mut _) };
1288        sin.sin_family = AF_INET as sa_family_t;
1289        sin.sin_port = port.to_be();
1290        sin.sin_addr.s_addr = u32::from_ne_bytes(v4_octets.unwrap());
1291        (storage, std::mem::size_of::<sockaddr_in>())
1292    } else {
1293        let sin6: &mut sockaddr_in6 = unsafe { &mut *((&mut storage) as *mut _ as *mut _) };
1294        sin6.sin6_family = AF_INET6 as sa_family_t;
1295        sin6.sin6_port = port.to_be();
1296        sin6.sin6_addr.s6_addr = match addr {
1297            SocketAddr::V6(v6) => v6.ip().octets(),
1298            SocketAddr::V4(_) => unreachable!(),
1299        };
1300        (storage, std::mem::size_of::<sockaddr_in6>())
1301    }
1302}
1303
1304fn sockaddr_to_socketaddr(storage: &ffi::sockaddr_storage, _len: usize) -> Option<SocketAddr> {
1305    let family = storage.ss_family as i32;
1306    match family {
1307        AF_INET => {
1308            // Promote to IPv4-mapped IPv6 so send_to works on the AF_INET6 server socket.
1309            let sin: &sockaddr_in = unsafe { &*(storage as *const _ as *const _) };
1310            let v4 = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
1311            let port = u16::from_be(sin.sin_port);
1312            let mapped = std::net::Ipv6Addr::from(v4.to_ipv6_mapped().octets());
1313            Some(SocketAddr::new(mapped.into(), port))
1314        }
1315        AF_INET6 => {
1316            let sin6: &sockaddr_in6 = unsafe { &*(storage as *const _ as *const _) };
1317            let ip = std::net::Ipv6Addr::from(sin6.sin6_addr.s6_addr);
1318            let port = u16::from_be(sin6.sin6_port);
1319            Some(SocketAddr::new(ip.into(), port))
1320        }
1321        _ => None,
1322    }
1323}
1324
1325pub struct Router {
1326    routes: Vec<(String, Box<dyn Handler>)>,
1327    prefix_routes: Vec<(String, Box<dyn Handler>)>,
1328}
1329
1330impl Router {
1331    pub fn new() -> Self {
1332        Router {
1333            routes: Vec::new(),
1334            prefix_routes: Vec::new(),
1335        }
1336    }
1337
1338    pub fn handle(&mut self, path: &str, h: impl Handler) {
1339        self.routes.push((path.to_string(), Box::new(h)));
1340    }
1341
1342    pub fn handle_func<F: Fn(&mut ResponseWriter, &Request) + Send + 'static>(
1343        &mut self,
1344        path: &str,
1345        f: F,
1346    ) {
1347        self.routes.push((path.to_string(), Box::new(f)));
1348    }
1349
1350    pub fn handle_prefix(&mut self, prefix: &str, h: impl Handler) {
1351        self.prefix_routes.push((prefix.to_string(), Box::new(h)));
1352    }
1353}
1354
1355impl Handler for Router {
1356    fn serve(&self, w: &mut ResponseWriter, r: &Request) {
1357        for (path, h) in &self.routes {
1358            if path == &r.path {
1359                h.serve(w, r);
1360                return;
1361            }
1362        }
1363        for (prefix, h) in &self.prefix_routes {
1364            if r.path.starts_with(prefix.as_str()) {
1365                h.serve(w, r);
1366                return;
1367            }
1368        }
1369        let _ = w.respond(crate::protocol::STATUS_NOT_FOUND, b"");
1370    }
1371}
1372
1373impl Default for Router {
1374    fn default() -> Self {
1375        Self::new()
1376    }
1377}