Skip to main content

fips_core/
endpoint.rs

1//! Library-first endpoint API for embedding FIPS in applications.
2//!
3//! This module exposes a no-system-TUN runtime shape for apps that want to own
4//! peer admission and local routing policy while reusing FIPS connectivity.
5
6use crate::config::{EthernetConfig, NostrDiscoveryPolicy, TransportInstances, UdpConfig};
7use crate::node::{
8    EndpointCommandLane, NodeEndpointCommand, NodeEndpointEvent, NodeEndpointPeer,
9    NodeEndpointRelayStatus,
10};
11use crate::{
12    Config, FipsAddress, IdentityConfig, Node, NodeAddr, NodeDeliveredPacket, NodeError,
13    PeerIdentity,
14};
15use std::sync::Arc;
16use thiserror::Error;
17use tokio::sync::{Mutex, mpsc, oneshot};
18use tokio::task::JoinHandle;
19
/// Debug-build tracing helper: appends one timestamped line to
/// `nvpn-fips-endpoint-debug.log` inside the system temp directory.
///
/// Strictly best-effort — a failure to open or write the log is ignored.
#[cfg(debug_assertions)]
fn endpoint_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let Ok(mut log) = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path)
    else {
        return;
    };

    let timestamp = std::time::SystemTime::now();
    let _ = writeln!(log, "{:?} {}", timestamp, message.as_ref());
}
37
/// Release-build counterpart of the debug logger: accepts the same argument
/// and does nothing, so call sites need no `cfg` guards of their own.
#[cfg(not(debug_assertions))]
fn endpoint_debug_log(_message: impl AsRef<str>) {}
40
/// Errors returned by the endpoint API.
#[derive(Debug, Error)]
pub enum FipsEndpointError {
    /// An error reported by the underlying FIPS node.
    #[error("node error: {0}")]
    Node(#[from] NodeError),

    /// The spawned node task could not be joined.
    #[error("endpoint task failed: {0}")]
    TaskJoin(#[from] tokio::task::JoinError),

    /// A channel to or from the node task is closed, so the request could
    /// not be delivered or its reply could not be received.
    #[error("endpoint is closed")]
    Closed,

    /// The supplied remote npub could not be parsed into a peer identity.
    /// `reason` carries the parser's error text.
    #[error("invalid remote npub '{npub}': {reason}")]
    InvalidRemoteNpub { npub: String, reason: String },
}
56
/// Source-attributed endpoint data delivered to an embedded application.
///
/// Returned by [`FipsEndpoint::recv`], [`FipsEndpoint::blocking_recv`], and
/// [`FipsEndpoint::try_recv`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsEndpointMessage {
    /// FIPS node address that originated the endpoint data.
    pub source_node_addr: NodeAddr,
    /// Source Nostr public key when the node has learned it; `None` otherwise.
    pub source_npub: Option<String>,
    /// Application-owned payload bytes.
    pub data: Vec<u8>,
}
67
/// Reports what changed in response to [`FipsEndpoint::update_peers`].
///
/// This is the public mirror of `crate::node::UpdatePeersOutcome`; the four
/// counters are copied across unchanged by the `From` impl in this module.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct UpdatePeersOutcome {
    /// Number of npubs that were not previously in the runtime peer list
    /// and got an `initiate_peer_connection` call.
    pub added: usize,
    /// Number of npubs that were dropped from the runtime peer list. Their
    /// retry entries are gone; any active session stays up until the
    /// regular liveness timeout reaps it.
    pub removed: usize,
    /// Number of npubs that were already in the list but had a different
    /// `addresses`, `alias`, `connect_policy`, or `auto_reconnect` value.
    /// The new values are now in effect for retries and aliasing; refreshed
    /// direct addresses may also trigger a new direct dial for auto peers.
    pub updated: usize,
    /// Number of npubs that were in the list and identical to the new entry.
    pub unchanged: usize,
}
86
87impl From<crate::node::UpdatePeersOutcome> for UpdatePeersOutcome {
88    fn from(value: crate::node::UpdatePeersOutcome) -> Self {
89        Self {
90            added: value.added,
91            removed: value.removed,
92            updated: value.updated,
93            unchanged: value.unchanged,
94        }
95    }
96}
97
/// Authenticated FIPS peer state visible to an embedded application.
///
/// Instances are produced by [`FipsEndpoint::peers`], which converts each
/// `NodeEndpointPeer` in the node's snapshot field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsEndpointPeer {
    /// Peer Nostr public key.
    pub npub: String,
    /// Whether an authenticated link-layer peer is currently active.
    pub connected: bool,
    /// Current underlay transport address, when a link has authenticated.
    pub transport_addr: Option<String>,
    /// Current underlay transport kind, when known.
    pub transport_type: Option<String>,
    /// Authenticated link id.
    pub link_id: u64,
    /// Smoothed RTT in milliseconds, once measured by FIPS MMP.
    pub srtt_ms: Option<u64>,
    /// Link packets sent.
    pub packets_sent: u64,
    /// Link packets received.
    pub packets_recv: u64,
    /// Link bytes sent.
    pub bytes_sent: u64,
    /// Link bytes received.
    pub bytes_recv: u64,
    /// Whether direct UDP probing is queued while this peer may still be
    /// reachable through a fallback transport.
    pub direct_probe_pending: bool,
    /// Millisecond timestamp when the queued direct probe becomes eligible.
    pub direct_probe_after_ms: Option<u64>,
}
127
/// Live Nostr relay state visible to an embedded application.
///
/// Instances are produced by [`FipsEndpoint::relay_statuses`] from the node's
/// `NodeEndpointRelayStatus` snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FipsEndpointRelayStatus {
    /// Relay URL, copied verbatim from the node snapshot.
    pub url: String,
    /// Relay status text, copied verbatim from the node snapshot.
    /// NOTE(review): the set of possible values is defined by the node
    /// module and is not visible here.
    pub status: String,
}
134
/// Builder for an embedded FIPS endpoint.
///
/// Obtain one from [`FipsEndpoint::builder`], chain the setters, and finish
/// with [`FipsEndpointBuilder::bind`].
#[derive(Debug, Clone)]
pub struct FipsEndpointBuilder {
    /// Base FIPS config; the bind-time overrides are applied to a clone of it.
    config: Config,
    /// Optional identity secret; when set it replaces the config's identity.
    identity_nsec: Option<String>,
    /// Optional application-level discovery scope.
    discovery_scope: Option<String>,
    /// Interfaces to add as endpoint Ethernet transports, in insertion order.
    local_ethernet_interfaces: Vec<String>,
    /// When true, TUN, DNS, and system-file integration are switched off.
    disable_system_networking: bool,
    /// Capacity passed to both the packet I/O and endpoint data I/O attach calls.
    packet_channel_capacity: usize,
}
145
146impl Default for FipsEndpointBuilder {
147    fn default() -> Self {
148        Self {
149            config: Config::new(),
150            identity_nsec: None,
151            discovery_scope: None,
152            local_ethernet_interfaces: Vec::new(),
153            disable_system_networking: true,
154            packet_channel_capacity: 1024,
155        }
156    }
157}
158
159impl FipsEndpointBuilder {
160    /// Start from an explicit FIPS config.
161    pub fn config(mut self, config: Config) -> Self {
162        self.config = config;
163        self
164    }
165
166    /// Use an `nsec` or hex secret for the endpoint identity.
167    pub fn identity_nsec(mut self, nsec: impl Into<String>) -> Self {
168        self.identity_nsec = Some(nsec.into());
169        self
170    }
171
172    /// Set an application-level discovery scope.
173    ///
174    /// When the builder owns the default empty connectivity config, this also
175    /// enables scoped Nostr discovery, open same-scope peer discovery, local
176    /// LAN candidates, and a UDP NAT advert. If an explicit transport or
177    /// Nostr config was supplied, the explicit config is left in control and
178    /// the scope is retained as endpoint metadata.
179    pub fn discovery_scope(mut self, scope: impl Into<String>) -> Self {
180        self.discovery_scope = Some(scope.into());
181        self
182    }
183
184    /// Enable host-local Ethernet discovery on a private L2 interface.
185    ///
186    /// This is intended for veth/TAP interfaces attached to a per-host bridge
187    /// shared by FIPS-aware applications. The endpoint announces Ethernet
188    /// beacons, listens for matching peers, auto-connects to them, and accepts
189    /// inbound handshakes over the interface.
190    pub fn local_ethernet(mut self, interface: impl Into<String>) -> Self {
191        self.local_ethernet_interfaces.push(interface.into());
192        self
193    }
194
195    /// Disable FIPS-owned TUN and DNS system integration.
196    pub fn without_system_tun(mut self) -> Self {
197        self.disable_system_networking = true;
198        self
199    }
200
201    /// Set the app packet/data channel capacity.
202    pub fn packet_channel_capacity(mut self, capacity: usize) -> Self {
203        self.packet_channel_capacity = capacity.max(1);
204        self
205    }
206
207    fn prepared_config(&self) -> Config {
208        let mut config = self.config.clone();
209        if let Some(nsec) = &self.identity_nsec {
210            config.node.identity = IdentityConfig {
211                nsec: Some(nsec.clone()),
212                persistent: false,
213            };
214        }
215        if self.disable_system_networking {
216            config.tun.enabled = false;
217            config.dns.enabled = false;
218            config.node.system_files_enabled = false;
219        }
220        if let Some(scope) = self.discovery_scope.as_deref() {
221            config.node.discovery.lan.scope = Some(scope.to_string());
222            config.node.discovery.local.enabled = true;
223            apply_default_scoped_discovery(&mut config, scope);
224        }
225        for interface in &self.local_ethernet_interfaces {
226            add_endpoint_ethernet_transport(
227                &mut config,
228                interface,
229                self.discovery_scope.as_deref(),
230            );
231        }
232        config
233    }
234
    /// Bind and start the embedded endpoint.
    ///
    /// Builds the effective config with `prepared_config`, creates the
    /// [`Node`], attaches the external packet I/O and the endpoint data I/O
    /// (both using `packet_channel_capacity`), starts the node, and finally
    /// moves it into a spawned task that runs until shutdown is signalled.
    /// Each step is bracketed by debug log lines (no-ops in release builds).
    ///
    /// # Errors
    ///
    /// A failure from node construction, either attach call, or
    /// `node.start()` is propagated with `?`.
    pub async fn bind(self) -> Result<FipsEndpoint, FipsEndpointError> {
        endpoint_debug_log("FipsEndpointBuilder::bind begin");
        let config = self.prepared_config();
        endpoint_debug_log("FipsEndpointBuilder::bind config prepared");

        let mut node = Node::new(config)?;
        endpoint_debug_log("FipsEndpointBuilder::bind node created");
        // Read the identity details now: the node is moved into its task below.
        let npub = node.npub();
        let node_addr = *node.node_addr();
        let address = *node.identity().address();
        // Both attach calls happen before `start()`.
        let packet_io = node.attach_external_packet_io(self.packet_channel_capacity)?;
        endpoint_debug_log("FipsEndpointBuilder::bind packet io attached");
        let endpoint_data_io = node.attach_endpoint_data_io(self.packet_channel_capacity)?;
        endpoint_debug_log("FipsEndpointBuilder::bind endpoint data io attached");
        endpoint_debug_log("FipsEndpointBuilder::bind node.start begin");
        node.start().await?;
        endpoint_debug_log("FipsEndpointBuilder::bind node.start complete");

        // The node task owns the receiver; the endpoint keeps the sender so
        // that `shutdown()` can stop the task.
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let task = spawn_node_task(node, shutdown_rx);
        endpoint_debug_log("FipsEndpointBuilder::bind node task spawned");
        let endpoint_priority_commands = endpoint_data_io.priority_command_tx;
        let endpoint_commands = endpoint_data_io.command_tx;

        Ok(FipsEndpoint {
            npub,
            node_addr,
            address,
            discovery_scope: self.discovery_scope,
            outbound_packets: packet_io.outbound_tx,
            delivered_packets: Arc::new(Mutex::new(packet_io.inbound_rx)),
            endpoint_priority_commands,
            endpoint_commands,
            // Sender half of the event queue, kept for in-process loopback sends.
            inbound_endpoint_tx: endpoint_data_io.event_tx,
            inbound_endpoint_rx: Arc::new(Mutex::new(endpoint_data_io.event_rx)),
            peer_identity_cache: std::sync::Mutex::new(std::collections::HashMap::new()),
            shutdown_tx: Some(shutdown_tx),
            task,
        })
    }
276}
277
278fn apply_default_scoped_discovery(config: &mut Config, scope: &str) {
279    if config.node.discovery.nostr.enabled || !config.transports.is_empty() {
280        return;
281    }
282
283    config.node.discovery.nostr.enabled = true;
284    config.node.discovery.nostr.advertise = true;
285    config.node.discovery.nostr.policy = NostrDiscoveryPolicy::Open;
286    config.node.discovery.nostr.share_local_candidates = true;
287    config.node.discovery.nostr.app = scope.to_string();
288    config.node.discovery.lan.scope = Some(scope.to_string());
289    config.node.discovery.local.enabled = true;
290    config.transports.udp = TransportInstances::Single(UdpConfig {
291        bind_addr: Some("0.0.0.0:0".to_string()),
292        advertise_on_nostr: Some(true),
293        public: Some(false),
294        outbound_only: Some(false),
295        accept_connections: Some(true),
296        ..UdpConfig::default()
297    });
298}
299
300fn endpoint_ethernet_config(interface: &str, scope: Option<&str>) -> EthernetConfig {
301    EthernetConfig {
302        interface: interface.to_string(),
303        discovery: Some(true),
304        announce: Some(true),
305        auto_connect: Some(true),
306        accept_connections: Some(true),
307        discovery_scope: scope
308            .map(str::trim)
309            .filter(|s| !s.is_empty())
310            .map(str::to_string),
311        ..EthernetConfig::default()
312    }
313}
314
315fn add_endpoint_ethernet_transport(config: &mut Config, interface: &str, scope: Option<&str>) {
316    let eth = endpoint_ethernet_config(interface, scope);
317    if config.transports.ethernet.is_empty() {
318        config.transports.ethernet = TransportInstances::Single(eth);
319        return;
320    }
321
322    let existing = std::mem::take(&mut config.transports.ethernet);
323    let mut named = match existing {
324        TransportInstances::Single(config) => {
325            let mut map = std::collections::HashMap::new();
326            map.insert("default".to_string(), config);
327            map
328        }
329        TransportInstances::Named(map) => map,
330    };
331
332    let base_name = endpoint_ethernet_instance_name(interface);
333    let mut name = base_name.clone();
334    let mut suffix = 2usize;
335    while named.contains_key(&name) {
336        name = format!("{base_name}-{suffix}");
337        suffix += 1;
338    }
339    named.insert(name, eth);
340    config.transports.ethernet = TransportInstances::Named(named);
341}
342
/// Derive a transport instance name from an interface name.
///
/// ASCII letters and digits are kept (lower-cased); every other character
/// becomes `-`. Leading and trailing dashes are trimmed and the result is
/// prefixed with `local-ethernet-`. If nothing remains after trimming, the
/// bare name `local-ethernet` is returned.
fn endpoint_ethernet_instance_name(interface: &str) -> String {
    let mut sanitized = String::with_capacity(interface.len());
    for ch in interface.chars() {
        sanitized.push(if ch.is_ascii_alphanumeric() {
            ch.to_ascii_lowercase()
        } else {
            '-'
        });
    }

    match sanitized.trim_matches('-') {
        "" => "local-ethernet".to_string(),
        trimmed => format!("local-ethernet-{trimmed}"),
    }
}
361
/// Move `node` into a spawned Tokio task that runs its receive loop until the
/// loop finishes on its own or `shutdown_rx` resolves.
///
/// After the loop exits, the node is stopped if `node.state().can_stop()`
/// allows it. The task's result is the receive-loop error when there is one,
/// otherwise the result of stopping the node.
fn spawn_node_task(
    mut node: Node,
    shutdown_rx: oneshot::Receiver<()>,
) -> JoinHandle<Result<(), NodeError>> {
    tokio::spawn(async move {
        tokio::pin!(shutdown_rx);
        let loop_result = tokio::select! {
            result = node.run_rx_loop() => result,
            // The receiver's output is ignored, so this branch is taken for
            // an explicit shutdown signal and also when the sender is
            // dropped without sending.
            _ = &mut shutdown_rx => Ok(()),
        };
        // Stop the node before surfacing a loop error, so it is stopped on
        // both the success and the failure path.
        let stop_result = if node.state().can_stop() {
            node.stop().await
        } else {
            Ok(())
        };
        // A receive-loop error takes precedence over the stop result.
        loop_result?;
        stop_result
    })
}
381
/// A running embedded FIPS endpoint.
///
/// Created by [`FipsEndpointBuilder::bind`]. All interaction with the node
/// task goes through the channels held here.
pub struct FipsEndpoint {
    /// Local npub, read from the node at bind time.
    npub: String,
    /// Local FIPS node address, read from the node at bind time.
    node_addr: NodeAddr,
    /// Local FIPS address, read from the node identity at bind time.
    address: FipsAddress,
    /// Application-level discovery scope supplied to the builder, if any.
    discovery_scope: Option<String>,
    /// Sender for outbound IP packets (see `send_ip_packet`).
    outbound_packets: mpsc::Sender<Vec<u8>>,
    /// Receiver for IP packets delivered by the node (see `recv_ip_packet`).
    delivered_packets: Arc<Mutex<mpsc::Receiver<NodeDeliveredPacket>>>,
    /// Command lane used for control requests (peer/relay updates and
    /// snapshots), blocking sends, and payloads classified as priority.
    endpoint_priority_commands: mpsc::Sender<NodeEndpointCommand>,
    /// Command lane used for payloads classified as bulk.
    endpoint_commands: mpsc::Sender<NodeEndpointCommand>,
    /// In-process loopback sender — `send()` to our own npub injects an
    /// event into the same queue without going through the wire/encrypt
    /// path. The node's rx_loop also sends into this channel directly
    /// (it holds a clone of this sender) so there is no per-packet relay
    /// task between the node task and `recv()`.
    inbound_endpoint_tx: mpsc::UnboundedSender<NodeEndpointEvent>,
    /// Unbounded receiver. Was previously fed by a per-packet relay task
    /// that translated `NodeEndpointEvent::Data` into `FipsEndpointMessage`
    /// across an additional bounded mpsc; collapsed into a single channel
    /// — the translation happens inline in `recv()` and the second hop
    /// (with its scheduler wake per packet) is gone.
    inbound_endpoint_rx: Arc<Mutex<mpsc::UnboundedReceiver<NodeEndpointEvent>>>,
    /// Cache of resolved PeerIdentity by npub string. Avoids the per-packet
    /// secp256k1 EC point parse that `PeerIdentity::from_npub` performs;
    /// without this cache the bulk-data send hot path spends ~10–30% of CPU
    /// re-validating identity bytes the application has already configured.
    /// Entries are only ever added, never evicted.
    peer_identity_cache: std::sync::Mutex<std::collections::HashMap<String, PeerIdentity>>,
    /// Shutdown signal for the node task; taken (set to `None`) by `shutdown()`.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Handle of the spawned node task, awaited by `shutdown()`.
    task: JoinHandle<Result<(), NodeError>>,
}
412
413impl FipsEndpoint {
414    /// Create a builder for an embedded endpoint.
415    pub fn builder() -> FipsEndpointBuilder {
416        FipsEndpointBuilder::default()
417    }
418
419    /// Local endpoint npub.
420    pub fn npub(&self) -> &str {
421        &self.npub
422    }
423
424    /// Local FIPS node address.
425    pub fn node_addr(&self) -> &NodeAddr {
426        &self.node_addr
427    }
428
429    /// Local FIPS IPv6-compatible address.
430    pub fn address(&self) -> FipsAddress {
431        self.address
432    }
433
434    /// Application-level discovery scope, if configured.
435    pub fn discovery_scope(&self) -> Option<&str> {
436        self.discovery_scope.as_deref()
437    }
438
439    /// Send application-owned endpoint data to a remote npub.
440    ///
441    /// Fire-and-forget: enqueues the Send command on the node task and
442    /// returns once the command channel accepts it. The node task's send
443    /// result is discarded — TCP and the upper protocol handle loss
444    /// recovery, and the per-packet oneshot round-trip the previous design
445    /// used for error reporting added several hundred microseconds of
446    /// queueing latency under load (measured: 456ms avg ping under iperf3
447    /// saturation → 1ms after this change, 430× lower).
448    ///
449    /// PeerIdentity for `remote_npub` is cached after first resolution to
450    /// avoid the secp256k1 EC point parse on every packet.
451    pub async fn send(
452        &self,
453        remote_npub: impl Into<String>,
454        data: impl Into<Vec<u8>>,
455    ) -> Result<(), FipsEndpointError> {
456        let remote_npub = remote_npub.into();
457        let data = data.into();
458        if remote_npub == self.npub {
459            self.inbound_endpoint_tx
460                .send(NodeEndpointEvent::Data {
461                    source_node_addr: self.node_addr,
462                    source_npub: Some(self.npub.clone()),
463                    payload: data,
464                    queued_at: crate::perf_profile::stamp(),
465                })
466                .map_err(|_| FipsEndpointError::Closed)?;
467            return Ok(());
468        }
469
470        let remote = self.resolve_peer_identity(&remote_npub)?;
471
472        // Fire-and-forget: caller already drops the result, so skip
473        // the per-packet `oneshot::channel()` allocation entirely.
474        // The node task's `SendOneway` arm runs the same code path as
475        // `Send` but without writing the result into a oneshot.
476        let command_tx = match crate::node::endpoint_command_lane_for_payload(&data) {
477            EndpointCommandLane::Priority => &self.endpoint_priority_commands,
478            EndpointCommandLane::Bulk => &self.endpoint_commands,
479        };
480        command_tx
481            .send(NodeEndpointCommand::SendOneway {
482                remote,
483                payload: data,
484                queued_at: crate::perf_profile::stamp(),
485            })
486            .await
487            .map_err(|_| FipsEndpointError::Closed)?;
488        Ok(())
489    }
490
491    fn resolve_peer_identity(&self, remote_npub: &str) -> Result<PeerIdentity, FipsEndpointError> {
492        // Fast path: cached identity (PeerIdentity is Copy after eager
493        // pubkey_full precompute landed in b1e92af, so dereference is free).
494        if let Ok(cache) = self.peer_identity_cache.lock()
495            && let Some(remote) = cache.get(remote_npub)
496        {
497            return Ok(*remote);
498        }
499
500        let remote = PeerIdentity::from_npub(remote_npub).map_err(|error| {
501            FipsEndpointError::InvalidRemoteNpub {
502                npub: remote_npub.to_string(),
503                reason: error.to_string(),
504            }
505        })?;
506
507        if let Ok(mut cache) = self.peer_identity_cache.lock() {
508            cache.entry(remote_npub.to_string()).or_insert(remote);
509        }
510        Ok(remote)
511    }
512
513    /// Receive the next source-attributed endpoint data message.
514    ///
515    /// Translation from the internal `NodeEndpointEvent::Data` shape to
516    /// the public `FipsEndpointMessage` shape happens inline here — the
517    /// rx_loop pushes directly onto this channel, no relay task in
518    /// between, no extra cross-task hop per packet.
519    pub async fn recv(&self) -> Option<FipsEndpointMessage> {
520        let event = self.inbound_endpoint_rx.lock().await.recv().await?;
521        let NodeEndpointEvent::Data {
522            source_node_addr,
523            source_npub,
524            payload,
525            queued_at,
526        } = event;
527        crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
528        Some(FipsEndpointMessage {
529            source_node_addr,
530            source_npub,
531            data: payload,
532        })
533    }
534
535    /// Synchronous blocking send — parks the calling **OS thread** on
536    /// the FIPS endpoint command channel until the runtime accepts
537    /// the send. MUST be called only from a thread spawned via
538    /// `std::thread::spawn`, not from inside a tokio runtime.
539    ///
540    /// Companion to [`Self::blocking_recv`] for control-frame replies
541    /// (e.g. responding to a Ping with a Pong) issued from the
542    /// dedicated TUN-write thread. Failures are returned via
543    /// `FipsEndpointError::Closed` if the runtime has stopped.
544    pub fn blocking_send(
545        &self,
546        remote_npub: impl Into<String>,
547        data: impl Into<Vec<u8>>,
548    ) -> Result<(), FipsEndpointError> {
549        let remote_npub = remote_npub.into();
550        let data = data.into();
551        if remote_npub == self.npub {
552            self.inbound_endpoint_tx
553                .send(NodeEndpointEvent::Data {
554                    source_node_addr: self.node_addr,
555                    source_npub: Some(self.npub.clone()),
556                    payload: data,
557                    queued_at: crate::perf_profile::stamp(),
558                })
559                .map_err(|_| FipsEndpointError::Closed)?;
560            return Ok(());
561        }
562        let remote = self.resolve_peer_identity(&remote_npub)?;
563        let (response_tx, _response_rx) = oneshot::channel();
564        self.endpoint_priority_commands
565            .blocking_send(NodeEndpointCommand::Send {
566                remote,
567                payload: data,
568                queued_at: crate::perf_profile::stamp(),
569                response_tx,
570            })
571            .map_err(|_| FipsEndpointError::Closed)?;
572        Ok(())
573    }
574
575    /// Synchronous blocking receive — parks the calling **OS thread**
576    /// on the channel until an event arrives or the channel closes.
577    ///
578    /// MUST NOT be called from inside a tokio runtime; use this only
579    /// from a thread spawned via `std::thread::spawn` so the tokio
580    /// scheduler doesn't deadlock.
581    ///
582    /// The motivation is the bench's CLI receive task: when run as a
583    /// regular tokio task each `recv().await` is a full task-wake on
584    /// the runtime (~1–3 µs scheduler bookkeeping), and at 113 kpps
585    /// that's ~10–30% of one core spent in plumbing the wake-up
586    /// rather than writing the packet to TUN. A dedicated OS thread
587    /// blocked on the channel via `blocking_recv` parks on a futex
588    /// directly — the wake is a single futex_wake() with no scheduler
589    /// involvement, an order of magnitude cheaper.
590    pub fn blocking_recv(&self) -> Option<FipsEndpointMessage> {
591        let mut rx = self.inbound_endpoint_rx.blocking_lock();
592        let event = rx.blocking_recv()?;
593        let NodeEndpointEvent::Data {
594            source_node_addr,
595            source_npub,
596            payload,
597            queued_at,
598        } = event;
599        crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
600        Some(FipsEndpointMessage {
601            source_node_addr,
602            source_npub,
603            data: payload,
604        })
605    }
606
    /// Non-blocking receive — returns the next ready endpoint message
    /// if one is queued, otherwise `None`. Pair with `recv()` to drain
    /// follow-on packets without paying a scheduler wake per packet.
    /// The drain loop must sit *inside* the `recv()` loop so that it runs
    /// after every wake-up rather than only once the channel has closed:
    ///
    /// ```ignore
    /// // wake on the first packet, then drain everything already queued
    /// while let Some(msg) = endpoint.recv().await {
    ///     process(msg);
    ///     while let Some(msg) = endpoint.try_recv() {
    ///         process(msg);
    ///     }
    /// }
    /// ```
    ///
    /// On the bench's FIPS-tunnel receive path the kernel UDP socket
    /// delivers packets in `recvmmsg`-sized bursts, so after a `.recv()`
    /// await there are typically 5–30 packets queued waiting. Draining
    /// them inline with `try_recv` saves N-1 scheduler hops per burst
    /// at line rate, freeing the consumer task to spend its time on
    /// the TUN write syscall instead of cross-task plumbing.
    ///
    /// Returns `None` if the channel is empty, closed, or briefly
    /// contested by another consumer. A `None` therefore does not by
    /// itself mean the endpoint has shut down.
    pub fn try_recv(&self) -> Option<FipsEndpointMessage> {
        // A failed `try_lock` means another consumer holds the receiver.
        let mut rx = self.inbound_endpoint_rx.try_lock().ok()?;
        let event = rx.try_recv().ok()?;
        let NodeEndpointEvent::Data {
            source_node_addr,
            source_npub,
            payload,
            queued_at,
        } = event;
        crate::perf_profile::record_since(crate::perf_profile::Stage::EndpointEventWait, queued_at);
        Some(FipsEndpointMessage {
            source_node_addr,
            source_npub,
            data: payload,
        })
    }
642
643    /// Replace the runtime peer list. Newly added auto-connect peers get
644    /// dialed immediately using every known address (overlay-fresh first,
645    /// then operator/cache hints). Removed peers are dropped from the
646    /// retry queue but stay connected if they currently are — the regular
647    /// liveness timeout reaps idle sessions. Existing entries get their
648    /// `addresses` field refreshed so the next retry sees the latest hints.
649    ///
650    /// Pass an empty `addresses` vector for a peer if you want fips to
651    /// resolve them entirely from the Nostr advert at dial time.
652    pub async fn update_peers(
653        &self,
654        peers: Vec<crate::config::PeerConfig>,
655    ) -> Result<UpdatePeersOutcome, FipsEndpointError> {
656        let (response_tx, response_rx) = oneshot::channel();
657        self.endpoint_priority_commands
658            .send(NodeEndpointCommand::UpdatePeers { peers, response_tx })
659            .await
660            .map_err(|_| FipsEndpointError::Closed)?;
661
662        match response_rx.await.map_err(|_| FipsEndpointError::Closed)? {
663            Ok(outcome) => Ok(UpdatePeersOutcome::from(outcome)),
664            Err(error) => Err(FipsEndpointError::Node(error)),
665        }
666    }
667
668    /// Snapshot authenticated peers known by the endpoint.
669    pub async fn peers(&self) -> Result<Vec<FipsEndpointPeer>, FipsEndpointError> {
670        let (response_tx, response_rx) = oneshot::channel();
671        self.endpoint_priority_commands
672            .send(NodeEndpointCommand::PeerSnapshot { response_tx })
673            .await
674            .map_err(|_| FipsEndpointError::Closed)?;
675
676        response_rx
677            .await
678            .map(|peers| peers.into_iter().map(FipsEndpointPeer::from).collect())
679            .map_err(|_| FipsEndpointError::Closed)
680    }
681
682    /// Snapshot live Nostr relay states used by the embedded endpoint.
683    pub async fn relay_statuses(&self) -> Result<Vec<FipsEndpointRelayStatus>, FipsEndpointError> {
684        let (response_tx, response_rx) = oneshot::channel();
685        self.endpoint_priority_commands
686            .send(NodeEndpointCommand::RelaySnapshot { response_tx })
687            .await
688            .map_err(|_| FipsEndpointError::Closed)?;
689
690        response_rx
691            .await
692            .map(|relays| {
693                relays
694                    .into_iter()
695                    .map(FipsEndpointRelayStatus::from)
696                    .collect()
697            })
698            .map_err(|_| FipsEndpointError::Closed)
699    }
700
701    /// Replace Nostr discovery relays without rebuilding the endpoint.
702    pub async fn update_relays(
703        &self,
704        advert_relays: Vec<String>,
705        dm_relays: Vec<String>,
706    ) -> Result<(), FipsEndpointError> {
707        let (response_tx, response_rx) = oneshot::channel();
708        self.endpoint_priority_commands
709            .send(NodeEndpointCommand::UpdateRelays {
710                advert_relays,
711                dm_relays,
712                response_tx,
713            })
714            .await
715            .map_err(|_| FipsEndpointError::Closed)?;
716
717        response_rx
718            .await
719            .map_err(|_| FipsEndpointError::Closed)?
720            .map_err(FipsEndpointError::Node)
721    }
722
723    /// Send an outbound IPv6 packet into the FIPS session pipeline.
724    pub async fn send_ip_packet(
725        &self,
726        packet: impl Into<Vec<u8>>,
727    ) -> Result<(), FipsEndpointError> {
728        self.outbound_packets
729            .send(packet.into())
730            .await
731            .map_err(|_| FipsEndpointError::Closed)
732    }
733
734    /// Receive the next source-attributed IPv6 packet delivered by FIPS.
735    pub async fn recv_ip_packet(&self) -> Option<NodeDeliveredPacket> {
736        self.delivered_packets.lock().await.recv().await
737    }
738
739    /// Shut down the endpoint and wait for the node task to stop.
740    pub async fn shutdown(mut self) -> Result<(), FipsEndpointError> {
741        if let Some(shutdown_tx) = self.shutdown_tx.take() {
742            let _ = shutdown_tx.send(());
743        }
744        self.task.await??;
745        Ok(())
746    }
747}
748
749impl From<NodeEndpointPeer> for FipsEndpointPeer {
750    fn from(peer: NodeEndpointPeer) -> Self {
751        Self {
752            npub: peer.npub,
753            connected: peer.connected,
754            transport_addr: peer.transport_addr,
755            transport_type: peer.transport_type,
756            link_id: peer.link_id,
757            srtt_ms: peer.srtt_ms,
758            packets_sent: peer.packets_sent,
759            packets_recv: peer.packets_recv,
760            bytes_sent: peer.bytes_sent,
761            bytes_recv: peer.bytes_recv,
762            direct_probe_pending: peer.direct_probe_pending,
763            direct_probe_after_ms: peer.direct_probe_after_ms,
764        }
765    }
766}
767
768impl From<NodeEndpointRelayStatus> for FipsEndpointRelayStatus {
769    fn from(relay: NodeEndpointRelayStatus) -> Self {
770        Self {
771            url: relay.url,
772            status: relay.status,
773        }
774    }
775}
776
777#[cfg(test)]
778mod tests {
779    use super::*;
780    use std::time::Duration;
781
782    #[tokio::test]
783    async fn endpoint_starts_without_system_tun() {
784        let endpoint = FipsEndpoint::builder()
785            .without_system_tun()
786            .bind()
787            .await
788            .expect("endpoint should bind");
789
790        assert!(!endpoint.npub().is_empty());
791        assert!(endpoint.discovery_scope().is_none());
792        endpoint.shutdown().await.expect("shutdown should succeed");
793    }
794
795    #[tokio::test]
796    async fn loopback_endpoint_data_roundtrips() {
797        let endpoint = FipsEndpoint::builder()
798            .without_system_tun()
799            .bind()
800            .await
801            .expect("endpoint should bind");
802
803        endpoint
804            .send(endpoint.npub().to_string(), b"ping".to_vec())
805            .await
806            .expect("loopback send should succeed");
807        let message = tokio::time::timeout(Duration::from_secs(1), endpoint.recv())
808            .await
809            .expect("recv should not time out")
810            .expect("message should arrive");
811        assert_eq!(message.source_node_addr, *endpoint.node_addr());
812        assert_eq!(message.source_npub, Some(endpoint.npub().to_string()));
813        assert_eq!(message.data, b"ping");
814        assert!(endpoint.discovery_scope().is_none());
815
816        endpoint.shutdown().await.expect("shutdown should succeed");
817    }
818
/// Setting only a discovery scope enables scoped Nostr, LAN and local
/// discovery plus a single default UDP transport, while TUN, DNS and
/// system files stay disabled.
#[test]
fn discovery_scope_enables_default_scoped_udp_discovery() {
    let prepared = FipsEndpoint::builder()
        .discovery_scope("nostr-vpn:test")
        .prepared_config();

    // Host integration flags are all off.
    assert!(!prepared.tun.enabled);
    assert!(!prepared.dns.enabled);
    assert!(!prepared.node.system_files_enabled);

    // Discovery settings derived from the scope.
    let discovery = &prepared.node.discovery;
    assert!(discovery.nostr.enabled);
    assert!(discovery.nostr.advertise);
    assert_eq!(discovery.nostr.policy, NostrDiscoveryPolicy::Open);
    assert!(discovery.nostr.share_local_candidates);
    assert_eq!(discovery.nostr.app, "nostr-vpn:test");
    assert_eq!(discovery.lan.scope.as_deref(), Some("nostr-vpn:test"));
    assert!(discovery.local.enabled);

    // Exactly one UDP transport is expected.
    let TransportInstances::Single(udp) = prepared.transports.udp else {
        panic!("expected a default UDP transport");
    };
    assert_eq!(udp.bind_addr(), "0.0.0.0:0");
    assert!(udp.advertise_on_nostr());
    assert!(!udp.is_public());
    assert!(!udp.outbound_only());
    assert!(udp.accept_connections());
}
852
/// `local_ethernet` combined with a discovery scope yields one Ethernet
/// transport on the requested interface with discovery, announce,
/// auto-connect and accept enabled, carrying the same scope.
#[test]
fn local_ethernet_adds_scoped_discovery_transport() {
    let prepared = FipsEndpoint::builder()
        .discovery_scope("iris-chat:host")
        .local_ethernet("fips-app0")
        .prepared_config();

    let discovery = &prepared.node.discovery;
    assert!(discovery.nostr.enabled);
    assert_eq!(discovery.lan.scope.as_deref(), Some("iris-chat:host"));

    let TransportInstances::Single(eth) = prepared.transports.ethernet else {
        panic!("expected a single Ethernet transport");
    };
    assert_eq!(eth.interface, "fips-app0");
    assert!(eth.discovery());
    assert!(eth.announce());
    assert!(eth.auto_connect());
    assert!(eth.accept_connections());
    assert_eq!(eth.discovery_scope(), Some("iris-chat:host"));
}
877
/// When the supplied config already has an Ethernet transport,
/// `local_ethernet` keeps it under the `default` key and adds the local
/// transport as a separate named entry.
#[test]
fn local_ethernet_preserves_existing_ethernet_config() {
    // Pre-existing, explicitly configured Ethernet transport.
    let existing = EthernetConfig {
        interface: "br-existing".to_string(),
        announce: Some(false),
        ..EthernetConfig::default()
    };
    let mut explicit = Config::new();
    explicit.transports.ethernet = TransportInstances::Single(existing);

    let prepared = FipsEndpoint::builder()
        .config(explicit)
        .local_ethernet("fips-app0")
        .prepared_config();

    let named = match prepared.transports.ethernet {
        TransportInstances::Named(map) => map,
        TransportInstances::Single(_) => panic!("expected named Ethernet transports"),
    };
    assert!(named.contains_key("default"));

    let local = named
        .get("local-ethernet-fips-app0")
        .expect("local endpoint Ethernet transport");
    assert_eq!(local.interface, "fips-app0");
    assert!(local.announce());
    assert!(local.auto_connect());
    assert!(local.accept_connections());
}
904
/// Explicit Nostr and UDP settings in a supplied config survive
/// `discovery_scope`; only the LAN scope and local discovery are filled in.
#[test]
fn discovery_scope_preserves_explicit_connectivity_config() {
    let mut explicit = Config::new();

    // Caller-chosen Nostr discovery settings.
    let nostr = &mut explicit.node.discovery.nostr;
    nostr.enabled = true;
    nostr.app = "custom-app".to_string();
    nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
    nostr.share_local_candidates = false;

    // Caller-chosen UDP transport.
    let explicit_udp = UdpConfig {
        bind_addr: Some("127.0.0.1:34567".to_string()),
        advertise_on_nostr: Some(false),
        outbound_only: Some(true),
        ..UdpConfig::default()
    };
    explicit.transports.udp = TransportInstances::Single(explicit_udp);

    let prepared = FipsEndpoint::builder()
        .config(explicit)
        .discovery_scope("nostr-vpn:test")
        .prepared_config();

    let discovery = &prepared.node.discovery;
    assert_eq!(discovery.nostr.app, "custom-app");
    assert_eq!(
        discovery.nostr.policy,
        NostrDiscoveryPolicy::ConfiguredOnly
    );
    assert!(!discovery.nostr.share_local_candidates);
    assert_eq!(discovery.lan.scope.as_deref(), Some("nostr-vpn:test"));
    assert!(discovery.local.enabled);

    let TransportInstances::Single(udp) = prepared.transports.udp else {
        panic!("expected explicit UDP transport");
    };
    // The configured field is kept verbatim while the accessor reports
    // 0.0.0.0:0. NOTE(review): presumably because `outbound_only` is set;
    // confirm against `UdpConfig::bind_addr`.
    assert_eq!(udp.bind_addr.as_deref(), Some("127.0.0.1:34567"));
    assert_eq!(udp.bind_addr(), "0.0.0.0:0");
    assert!(!udp.advertise_on_nostr());
    assert!(udp.outbound_only());
}
944
945    #[tokio::test]
946    async fn invalid_remote_npub_is_rejected() {
947        let endpoint = FipsEndpoint::builder()
948            .without_system_tun()
949            .bind()
950            .await
951            .expect("endpoint should bind");
952
953        let error = endpoint
954            .send("not-an-npub", b"hello".to_vec())
955            .await
956            .expect_err("invalid npub should fail");
957        assert!(matches!(error, FipsEndpointError::InvalidRemoteNpub { .. }));
958
959        endpoint.shutdown().await.expect("shutdown should succeed");
960    }
961
962    #[tokio::test]
963    async fn endpoint_peer_snapshot_starts_empty() {
964        let endpoint = FipsEndpoint::builder()
965            .without_system_tun()
966            .bind()
967            .await
968            .expect("endpoint should bind");
969
970        let peers = endpoint.peers().await.expect("peer snapshot");
971        assert!(peers.is_empty());
972
973        endpoint.shutdown().await.expect("shutdown should succeed");
974    }
975}