// Source file: freenet/node/mod.rs
1//! The main node data type which encapsulates all the behaviour for maintaining a connection
2//! and performing operations within the network.
3//!
4//! This module contains the primary event loop (`NodeP2P::run_node`) that orchestrates
5//! interactions between different components like the network, operations, contracts, and clients.
6//! It receives events and dispatches actions via channels.
7//!
8//! # Implementations
9//! Node comes with different underlying implementations that can be used upon construction.
10//! Those implementations are:
//! - libp2p: all connections are handled by libp2p.
//! - in-memory: a simplified node used mainly for emulation purposes.
//! - inter-process: similar to in-memory, but can be run across multiple processes, closer to the real p2p impl
14//!
15//! The main node data structure and execution loop.
16//! See [`../../architecture.md`](../../architecture.md) for a high-level overview of the node's role and the event loop interactions.
17
18use anyhow::Context;
19use either::Either;
20use freenet_stdlib::{
21    client_api::{ClientRequest, ErrorKind},
22    prelude::ContractInstanceId,
23};
24use std::{
25    borrow::Cow,
26    fmt::Display,
27    fs::File,
28    hash::Hash,
29    io::Read,
30    net::{IpAddr, SocketAddr, ToSocketAddrs},
31    sync::Arc,
32    time::Duration,
33};
34use std::{collections::HashSet, convert::Infallible};
35
36use self::p2p_impl::NodeP2P;
37use crate::{
38    client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
39    config::{Address, GatewayConfig, GlobalRng, WebsocketApiConfig},
40    contract::{Callback, ExecutorError, ExecutorToEventLoopChannel, NetworkContractHandler},
41    local_node::Executor,
42    message::{InnerMessage, NetMessage, NodeEvent, Transaction, TransactionType},
43    operations::{
44        connect::{self, ConnectOp},
45        get, put, subscribe, update, OpEnum, OpError, OpOutcome,
46    },
47    ring::{Location, PeerKeyLocation},
48    router::{RouteEvent, RouteOutcome},
49    tracing::{EventRegister, NetEventLog, NetEventRegister},
50};
51use crate::{
52    config::Config,
53    message::{MessageStats, NetMessageV1},
54};
55use freenet_stdlib::client_api::DelegateRequest;
56use serde::{Deserialize, Serialize};
57use tracing::Instrument;
58
59use crate::operations::handle_op_request;
60pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge};
61// Re-export types for dev_tool and testing
62pub use network_bridge::{reset_channel_id_counter, EventLoopExitReason, NetworkStats};
63
64use crate::topology::rate::Rate;
65use crate::transport::{TransportKeypair, TransportPublicKey};
66pub(crate) use op_state_manager::{OpManager, OpNotAvailable};
67
68mod network_bridge;
69
70// Re-export fault injection types for test infrastructure.
71// No cfg gate: underlying items are unconditionally compiled and integration
72// tests compile the lib without cfg(test).
73pub use network_bridge::in_memory::{get_fault_injector, set_fault_injector, FaultInjectorState};
74pub(crate) mod background_task_monitor;
75pub(crate) mod network_status;
76mod op_state_manager;
77mod p2p_impl;
78pub(crate) mod proximity_cache;
79mod request_router;
80pub(crate) mod testing_impl;
81
82pub use request_router::{DeduplicatedRequest, RequestRouter};
83
/// Handle to trigger graceful shutdown of the node.
#[derive(Clone)]
pub struct ShutdownHandle {
    // Sender half of the node's event channel; shutdown is signalled by
    // sending a `NodeEvent::Disconnect` through it.
    tx: tokio::sync::mpsc::Sender<NodeEvent>,
}
89
90impl ShutdownHandle {
91    /// Trigger a graceful shutdown of the node.
92    ///
93    /// This will:
94    /// 1. Close all peer connections gracefully
95    /// 2. Stop accepting new connections
96    /// 3. Exit the event loop
97    pub async fn shutdown(&self) {
98        if let Err(err) = self
99            .tx
100            .send(NodeEvent::Disconnect {
101                cause: Some("graceful shutdown".into()),
102            })
103            .await
104        {
105            tracing::debug!(
106                error = %err,
107                "failed to send graceful shutdown signal; shutdown channel may already be closed"
108            );
109        }
110    }
111}
112
/// A running node: wraps the p2p implementation together with a handle that
/// can request a graceful shutdown.
pub struct Node {
    // Underlying p2p node implementation that drives the event loop.
    inner: NodeP2P,
    // Cloneable handle used to signal graceful shutdown via the event channel.
    shutdown_handle: ShutdownHandle,
}
117
impl Node {
    /// Update this node's ring location (e.g. once its external address is known).
    pub fn update_location(&mut self, location: Location) {
        self.inner
            .op_manager
            .ring
            .connection_manager
            .update_location(Some(location));
    }

    /// Get a handle that can be used to trigger graceful shutdown.
    pub fn shutdown_handle(&self) -> ShutdownHandle {
        self.shutdown_handle.clone()
    }

    /// Run the node's event loop. The `Infallible` success type means a
    /// normal return is impossible: this only returns on error.
    pub async fn run(self) -> anyhow::Result<Infallible> {
        self.inner.run_node().await
    }
}
136
/// When instancing a node you can either join an existing network or bootstrap a new network with a listener
/// which will act as the initial provider. This initial peer will be listening at the provided port and assigned IP.
/// If those are not free the instancing process will return an error.
///
/// In order to bootstrap a new network the following arguments are required to be provided to the builder:
/// - ip: IP associated to the initial node.
/// - port: listening port of the initial node.
///
/// If both are provided but also additional peers are added via the [`Self::add_gateway()`] method, this node will
/// be listening but also try to connect to an existing peer.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[non_exhaustive] // avoid directly instantiating this struct
pub struct NodeConfig {
    /// Determines if an initial connection should be attempted.
    /// Only true for an initial gateway/node. If false, the gateway will be disconnected unless other peers connect through it.
    pub should_connect: bool,
    /// Whether this node acts as a gateway that other peers can join through.
    pub is_gateway: bool,
    /// If not specified, a key is generated and used when creating the node.
    pub key_pair: TransportKeypair,
    // optional local info, in case this is an initial bootstrap node
    /// IP to bind to the network listener.
    pub network_listener_ip: IpAddr,
    /// socket port to bind to the network listener.
    pub network_listener_port: u16,
    /// Our own external socket address, if known (set for gateways, learned for peers).
    pub(crate) own_addr: Option<SocketAddr>,
    // Full node configuration, shared across components via Arc.
    pub(crate) config: Arc<Config>,
    /// At least one gateway is required for joining the network.
    /// Not necessary if this is an initial node.
    pub(crate) gateways: Vec<InitPeerNode>,
    /// the location of this node, used for gateways.
    pub(crate) location: Option<Location>,
    // Optional override for the maximum hops-to-live of operations.
    pub(crate) max_hops_to_live: Option<usize>,
    // Randomize routing when the HTL is above this threshold, if set.
    pub(crate) rnd_if_htl_above: Option<usize>,
    // Upper bound on the number of ring connections, if set.
    pub(crate) max_number_conn: Option<usize>,
    // Lower bound on the number of ring connections, if set.
    pub(crate) min_number_conn: Option<usize>,
    // Bandwidth caps, if set.
    pub(crate) max_upstream_bandwidth: Option<Rate>,
    pub(crate) max_downstream_bandwidth: Option<Rate>,
    // Socket addresses this node refuses to connect to, if any.
    pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
    // Budget and TTL for transient (short-lived) connections.
    pub(crate) transient_budget: usize,
    pub(crate) transient_ttl: Duration,
    /// Minimum ring connections before this peer advertises readiness
    /// to accept non-CONNECT operations. `None` or `Some(0)` disables the gate.
    /// Default: `Some(3)` in production.
    #[serde(default)]
    pub(crate) relay_ready_connections: Option<usize>,
}
184
185impl NodeConfig {
    /// Builds a [`NodeConfig`] from the loaded [`Config`].
    ///
    /// For every configured gateway this reads its public key file from disk
    /// (retrying while the file still holds a legacy RSA PEM key that the
    /// gateway may be converting to X25519 hex), resolves the gateway's
    /// socket address, and skips any gateway whose public key matches our own
    /// (to avoid self-connections).
    ///
    /// # Errors
    /// Fails if a gateway key file cannot be opened/read, contains invalid
    /// hex or a key of the wrong length, or if a gateway address cannot be
    /// resolved.
    pub async fn new(config: Config) -> anyhow::Result<NodeConfig> {
        tracing::info!("Loading node configuration for mode {}", config.mode);

        // Get our own public key to filter out self-connections
        let own_pub_key = config.transport_keypair().public();

        let mut gateways = Vec::with_capacity(config.gateways.len());
        for gw in &config.gateways {
            let GatewayConfig {
                address,
                public_key_path,
                location,
            } = gw;

            // Wait for the public key file to be in X25519 hex format.
            // The gateway may still be initializing and converting from RSA PEM.
            let mut key_bytes = None;
            for attempt in 0..10 {
                let mut key_file = File::open(public_key_path).with_context(|| {
                    format!("failed loading gateway pubkey from {public_key_path:?}")
                })?;
                let mut buf = String::new();
                key_file.read_to_string(&mut buf)?;
                let buf = buf.trim();

                // Check for legacy RSA PEM format - gateway may still be initializing
                if buf.starts_with("-----BEGIN") {
                    if attempt < 9 {
                        tracing::debug!(
                            public_key_path = ?public_key_path,
                            attempt = attempt + 1,
                            "Gateway public key is still RSA PEM format, waiting for X25519 conversion..."
                        );
                        // Up to 9 sleeps of 500ms (~4.5s total) before giving up.
                        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                        continue;
                    } else {
                        tracing::warn!(
                            public_key_path = ?public_key_path,
                            "Gateway public key still in RSA PEM format after 5s. Skipping this gateway."
                        );
                        break;
                    }
                }

                match hex::decode(buf) {
                    Ok(bytes) if bytes.len() == 32 => {
                        key_bytes = Some(bytes);
                        break;
                    }
                    Ok(bytes) => {
                        anyhow::bail!(
                            "invalid gateway pubkey length {} (expected 32) from {public_key_path:?}",
                            bytes.len()
                        );
                    }
                    Err(e) => {
                        anyhow::bail!(
                            "failed to decode gateway pubkey hex from {public_key_path:?}: {e}"
                        );
                    }
                }
            }

            // None here means the key never left RSA PEM format; drop the gateway.
            let key_bytes = match key_bytes {
                Some(bytes) => bytes,
                None => continue, // Skip this gateway
            };
            let mut key_arr = [0u8; 32];
            key_arr.copy_from_slice(&key_bytes);
            let transport_pub_key = TransportPublicKey::from_bytes(key_arr);

            // Skip if this gateway's public key matches our own
            if &transport_pub_key == own_pub_key {
                tracing::warn!(
                    "Skipping gateway with same public key as self: {:?}",
                    public_key_path
                );
                continue;
            }

            let address = Self::parse_socket_addr(address).await?;
            let peer_key_location = PeerKeyLocation::new(transport_pub_key, address);
            // An explicit location from config wins; otherwise derive it from the address.
            let location = location
                .map(Location::new)
                .unwrap_or_else(|| Location::from_address(&address));
            gateways.push(InitPeerNode::new(peer_key_location, location));
        }
        tracing::info!(
            "Node will be listening at {}:{} internal address",
            config.network_api.address,
            config.network_api.port
        );
        if let Some(own_addr) = &config.peer_id {
            tracing::info!("Node external address: {}", own_addr.addr);
        }
        Ok(NodeConfig {
            should_connect: true,
            is_gateway: config.is_gateway,
            key_pair: config.transport_keypair().clone(),
            gateways,
            own_addr: config.peer_id.clone().map(|p| p.addr),
            network_listener_ip: config.network_api.address,
            network_listener_port: config.network_api.port,
            location: config.location.map(Location::new),
            config: Arc::new(config.clone()),
            max_hops_to_live: None,
            rnd_if_htl_above: None,
            max_number_conn: Some(config.network_api.max_connections),
            min_number_conn: Some(config.network_api.min_connections),
            max_upstream_bandwidth: None,
            max_downstream_bandwidth: None,
            blocked_addresses: config.network_api.blocked_addresses.clone(),
            transient_budget: config.network_api.transient_budget,
            transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs),
            relay_ready_connections: if config.network_api.skip_load_from_network {
                Some(0) // Local/test networks: disable relay gate
            } else {
                Some(3) // Production: require 3 relay-ready upstream peers
            },
        })
    }
307
    /// Resolves an [`Address`] into a concrete [`SocketAddr`].
    ///
    /// `Address::HostAddress` is returned as-is. For `Address::Hostname` the
    /// port (if any) is split off the end of the string, system resolution via
    /// `ToSocketAddrs` is attempted first, and failing that an async DNS
    /// lookup is performed using the system resolver configuration.
    ///
    /// NOTE(review): splitting on the last ':' means a bare IPv6 literal in
    /// `Address::Hostname` (e.g. "::1") would be mis-parsed here — confirm
    /// such values always arrive as `Address::HostAddress`.
    ///
    /// # Errors
    /// Fails on an invalid port number, on resolver configuration or lookup
    /// errors, or when the hostname resolves to no IP addresses.
    pub(crate) async fn parse_socket_addr(address: &Address) -> anyhow::Result<SocketAddr> {
        let (hostname, port) = match address {
            crate::config::Address::Hostname(hostname) => {
                match hostname.rsplit_once(':') {
                    None => {
                        // no port found, use default
                        let hostname_with_port =
                            format!("{}:{}", hostname, crate::config::default_network_api_port());

                        // NOTE(review): `to_socket_addrs` performs blocking DNS
                        // resolution inside an async fn — consider spawn_blocking.
                        if let Ok(mut addrs) = hostname_with_port.to_socket_addrs() {
                            if let Some(addr) = addrs.next() {
                                return Ok(addr);
                            }
                        }

                        (Cow::Borrowed(hostname.as_str()), None)
                    }
                    Some((host, port)) => match port.parse::<u16>() {
                        Ok(port) => {
                            if let Ok(mut addrs) = hostname.to_socket_addrs() {
                                if let Some(addr) = addrs.next() {
                                    return Ok(addr);
                                }
                            }

                            (Cow::Borrowed(host), Some(port))
                        }
                        Err(_) => return Err(anyhow::anyhow!("Invalid port number: {port}")),
                    },
                }
            }
            Address::HostAddress(addr) => return Ok(*addr),
        };

        // Fall back to an async DNS lookup using the system resolver settings.
        let (conf, opts) = hickory_resolver::system_conf::read_system_conf()?;
        let resolver = hickory_resolver::TokioAsyncResolver::new(
            conf,
            opts,
            hickory_resolver::name_server::GenericConnector::new(
                hickory_resolver::name_server::TokioRuntimeProvider::new(),
            ),
        );

        // only issue one query: append a trailing '.' so the name is fully qualified
        let hostname = if hostname.ends_with('.') {
            hostname
        } else {
            Cow::Owned(format!("{hostname}."))
        };

        let ips = resolver.lookup_ip(hostname.as_ref()).await?;
        match ips.into_iter().next() {
            Some(ip) => Ok(SocketAddr::new(
                ip,
                port.unwrap_or_else(crate::config::default_network_api_port),
            )),
            None => Err(anyhow::anyhow!("Fail to resolve IP address of {hostname}")),
        }
    }
367
    /// Borrow the full node [`Config`] backing this builder.
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Mark this node as a gateway (builder-style, returns `self`).
    pub fn is_gateway(&mut self) -> &mut Self {
        self.is_gateway = true;
        self
    }

    /// Mark this node as the first gateway of a new network: no initial
    /// connection to other peers will be attempted.
    pub fn first_gateway(&mut self) {
        self.should_connect = false;
    }

    /// Set whether an initial connection should be attempted.
    pub fn with_should_connect(&mut self, should_connect: bool) -> &mut Self {
        self.should_connect = should_connect;
        self
    }

    /// Override the maximum hops-to-live for operations.
    pub fn max_hops_to_live(&mut self, num_hops: usize) -> &mut Self {
        self.max_hops_to_live = Some(num_hops);
        self
    }

    /// Randomize routing when the hops-to-live is above this threshold.
    pub fn rnd_if_htl_above(&mut self, num_hops: usize) -> &mut Self {
        self.rnd_if_htl_above = Some(num_hops);
        self
    }

    /// Set the maximum number of ring connections.
    pub fn max_number_of_connections(&mut self, num: usize) -> &mut Self {
        self.max_number_conn = Some(num);
        self
    }

    /// Set the minimum number of ring connections.
    pub fn min_number_of_connections(&mut self, num: usize) -> &mut Self {
        self.min_number_conn = Some(num);
        self
    }

    /// Set the relay-readiness gate; `None` or `Some(0)` disables it.
    pub fn relay_ready_connections(&mut self, num: Option<usize>) -> &mut Self {
        self.relay_ready_connections = num;
        self
    }

    /// Set this node's own external socket address.
    pub fn with_own_addr(&mut self, addr: SocketAddr) -> &mut Self {
        self.own_addr = Some(addr);
        self
    }

    /// Set this node's ring location.
    pub fn with_location(&mut self, loc: Location) -> &mut Self {
        self.location = Some(loc);
        self
    }

    /// Connection info for an already existing peer. Required in case this is not a gateway node.
    pub fn add_gateway(&mut self, peer: InitPeerNode) -> &mut Self {
        self.gateways.push(peer);
        self
    }
426
    /// Builds a node using the default backend connection manager.
    ///
    /// The event-flush handle returned by `build_with_flush_handle` is
    /// dropped here; call that method directly if you need it (e.g. tests).
    pub async fn build<const CLIENTS: usize>(
        self,
        clients: [BoxedClient; CLIENTS],
    ) -> anyhow::Result<Node> {
        let (node, _flush_handle) = self.build_with_flush_handle(clients).await?;
        Ok(node)
    }
435
    /// Builds a node and returns flush handle for event aggregation (for testing).
    ///
    /// Assembles the event registers — the file-based event log, an optional
    /// OpenTelemetry register behind the `trace-ot` feature, and an optional
    /// telemetry reporter from config — then hands them to the p2p node
    /// builder together with the client proxies.
    pub async fn build_with_flush_handle<const CLIENTS: usize>(
        self,
        clients: [BoxedClient; CLIENTS],
    ) -> anyhow::Result<(Node, crate::tracing::EventFlushHandle)> {
        let (event_register, flush_handle) = {
            use super::tracing::{DynamicRegister, TelemetryReporter};

            let event_reg = EventRegister::new(self.config.event_log());
            let flush_handle = event_reg.flush_handle();

            let mut registers: Vec<Box<dyn NetEventRegister>> = vec![Box::new(event_reg)];

            // Add OpenTelemetry register if feature enabled
            #[cfg(feature = "trace-ot")]
            {
                use super::tracing::OTEventRegister;
                registers.push(Box::new(OTEventRegister::new()));
            }

            // Add telemetry reporter if enabled in config
            if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry) {
                registers.push(Box::new(telemetry));
            }

            (DynamicRegister::new(registers), flush_handle)
        };
        // Clone the config Arc before `self` is moved into the builder below.
        let cfg = self.config.clone();
        let (node_inner, shutdown_tx) = NodeP2P::build::<NetworkContractHandler, CLIENTS, _>(
            self,
            clients,
            event_register,
            cfg,
        )
        .await?;
        let shutdown_handle = ShutdownHandle { tx: shutdown_tx };
        Ok((
            Node {
                inner: node_inner,
                shutdown_handle,
            },
            flush_handle,
        ))
    }
480
    /// Returns this node's own external socket address, if known.
    pub fn get_own_addr(&self) -> Option<SocketAddr> {
        self.own_addr
    }
484
485    /// Returns all specified gateways for this peer. Returns an error if the peer is not a gateway
486    /// and no gateways are specified.
487    fn get_gateways(&self) -> anyhow::Result<Vec<PeerKeyLocation>> {
488        let gateways: Vec<PeerKeyLocation> = self
489            .gateways
490            .iter()
491            .map(|node| node.peer_key_location.clone())
492            .collect();
493
494        if !self.is_gateway && gateways.is_empty() {
495            anyhow::bail!(
496            "At least one remote gateway is required to join an existing network for non-gateway nodes."
497        )
498        } else {
499            Ok(gateways)
500        }
501    }
502}
503
/// Gateway node to use for joining the network.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct InitPeerNode {
    // Public key plus socket address identifying the gateway peer.
    peer_key_location: PeerKeyLocation,
    // Ring location of the gateway.
    location: Location,
}
510
511impl InitPeerNode {
512    pub fn new(peer_key_location: PeerKeyLocation, location: Location) -> Self {
513        Self {
514            peer_key_location,
515            location,
516        }
517    }
518}
519
/// Centralized completion handling for a finished (or failed) operation.
///
/// On success, forwards the host result to the requesting client — unless the
/// transaction is a sub-operation or a node-internal subscription renewal —
/// then records dashboard stats, feeds routing outcomes back to the ring's
/// router, and answers any pending executor callback.
/// On error, notifies waiting clients (again skipping sub-operations) and
/// marks the transaction as completed.
async fn report_result(
    tx: Option<Transaction>,
    op_result: Result<Option<OpEnum>, OpError>,
    op_manager: &OpManager,
    executor_callback: Option<ExecutorToEventLoopChannel<Callback>>,
    event_listener: &mut dyn NetEventRegister,
) {
    // Add UPDATE-specific debug logging at the start
    if let Some(tx_id) = tx {
        if matches!(tx_id.transaction_type(), TransactionType::Update) {
            tracing::debug!("report_result called for UPDATE transaction {}", tx_id);
        }
    }

    match op_result {
        Ok(Some(op_res)) => {
            // Log specifically for UPDATE operations
            if let crate::operations::OpEnum::Update(ref update_op) = op_res {
                tracing::debug!(
                    "UPDATE operation {} completed, finalized: {}",
                    update_op.id,
                    update_op.finalized()
                );
            }

            // Send to result router (skip for sub-operations and subscription renewals)
            if let Some(transaction) = tx {
                // Sub-operations (e.g., Subscribe spawned by PUT) don't notify clients directly;
                // the parent operation handles the client response.
                if op_manager.is_sub_operation(transaction) {
                    tracing::debug!(
                        tx = %transaction,
                        "Skipping client notification for sub-operation"
                    );
                } else if op_res.is_subscription_renewal() {
                    // Subscription renewals are node-internal operations spawned by the
                    // renewal manager (ring/mod.rs). No client registered a transaction
                    // for these, so sending to the session actor would just produce
                    // "registered transactions: 0" noise. See #2891.
                    tracing::debug!(
                        tx = %transaction,
                        "Skipping client notification for subscription renewal"
                    );
                } else {
                    let host_result = op_res.to_host_result();
                    // Await result delivery to ensure the client receives the response
                    // before the operation is considered complete. This prevents timeout
                    // issues where the operation completes but the response hasn't been
                    // delivered through the channel chain yet.
                    op_manager
                        .send_client_result(transaction, host_result)
                        .await;
                }
            }

            // check operations.rs:handle_op_result to see what's the meaning of each state
            // in case more cases want to be handled when feeding information to the OpManager

            // Record operation result for dashboard stats
            {
                use crate::node::network_status;
                let (op_type, success) =
                    classify_op_outcome(op_res.id().transaction_type(), op_res.outcome());
                if let Some(op_type) = op_type {
                    network_status::record_op_result(op_type, success);
                }
            }

            // Translate the operation outcome into a routing event for the
            // router; Incomplete/Irrelevant outcomes produce none.
            let route_event = match op_res.outcome() {
                OpOutcome::ContractOpSuccess {
                    target_peer,
                    contract_location,
                    first_response_time,
                    payload_size,
                    payload_transfer_time,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::Success {
                        time_to_response_start: first_response_time,
                        payload_size,
                        payload_transfer_time,
                    },
                }),
                OpOutcome::ContractOpSuccessUntimed {
                    target_peer,
                    contract_location,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::SuccessUntimed,
                }),
                OpOutcome::ContractOpFailure {
                    target_peer,
                    contract_location,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::Failure,
                }),
                OpOutcome::Incomplete | OpOutcome::Irrelevant => None,
            };
            if let Some(event) = route_event {
                if let Some(log_event) =
                    NetEventLog::route_event(op_res.id(), &op_manager.ring, &event)
                {
                    event_listener
                        .register_events(Either::Left(log_event))
                        .await;
                }
                op_manager.ring.routing_finished(event);
            }
            if let Some(mut cb) = executor_callback {
                cb.response(op_res).await;
            }
        }
        Ok(None) => {
            tracing::debug!(?tx, "No operation result found");
        }
        Err(err) => {
            // Mark operation as completed and notify waiting clients of the error
            if let Some(tx) = tx {
                // Sub-operations (e.g., Subscribe spawned by PUT) have no client
                // registered — sending errors for them would pollute the
                // SessionActor's pending_results cache.
                if !op_manager.is_sub_operation(tx) {
                    let client_error = freenet_stdlib::client_api::ClientError::from(
                        freenet_stdlib::client_api::ErrorKind::OperationError {
                            cause: err.to_string().into(),
                        },
                    );
                    op_manager.send_client_result(tx, Err(client_error)).await;
                }

                op_manager.completed(tx);
            }
            // In debug/test builds, print a short state-transition trace to
            // stderr for InvalidStateTransition errors; other errors just log.
            #[cfg(any(debug_assertions, test))]
            {
                use std::io::Write;
                #[cfg(debug_assertions)]
                let OpError::InvalidStateTransition { tx, state, trace } = err
                else {
                    tracing::error!("Finished transaction with error: {err}");
                    return;
                };
                #[cfg(not(debug_assertions))]
                let OpError::InvalidStateTransition { tx } = err
                else {
                    tracing::error!("Finished transaction with error: {err}");
                    return;
                };
                // todo: this can be improved once std::backtrace::Backtrace::frames is stabilized
                #[cfg(debug_assertions)]
                let trace = format!("{trace}");
                #[cfg(debug_assertions)]
                {
                    let mut tr_lines = trace.lines();
                    let trace = tr_lines
                        .nth(2)
                        .map(|second_trace| {
                            let second_trace_lines =
                                [second_trace, tr_lines.next().unwrap_or_default()];
                            second_trace_lines.join("\n")
                        })
                        .unwrap_or_default();
                    let peer = op_manager.ring.connection_manager.own_location();
                    let log = format!(
                        "Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n"
                    );
                    std::io::stderr().write_all(log.as_bytes()).unwrap();
                }
                #[cfg(not(debug_assertions))]
                {
                    let peer = op_manager.ring.connection_manager.own_location();
                    let log = format!("Transaction ({tx} @ {peer}) error\n");
                    std::io::stderr().write_all(log.as_bytes()).unwrap();
                }
            }
            #[cfg(not(any(debug_assertions, test)))]
            {
                tracing::debug!("Finished transaction with error: {err}");
            }
        }
    }
}
705
706/// Process a network message and deliver results to clients via the canonical
707/// path: report_result → send_client_result → ResultRouter → SessionActor.
708pub(crate) async fn process_message_decoupled<CB>(
709    msg: NetMessage,
710    source_addr: Option<std::net::SocketAddr>,
711    op_manager: Arc<OpManager>,
712    conn_manager: CB,
713    mut event_listener: Box<dyn NetEventRegister>,
714    executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
715    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
716) where
717    CB: NetworkBridge,
718{
719    let tx = *msg.id();
720
721    let op_result = handle_pure_network_message(
722        msg,
723        source_addr,
724        op_manager.clone(),
725        conn_manager,
726        event_listener.as_mut(),
727        pending_op_result,
728    )
729    .await;
730
731    // Report result and deliver to clients via the single canonical path:
732    // report_result → send_client_result → ResultRouter → SessionActor
733    report_result(
734        Some(tx),
735        op_result,
736        &op_manager,
737        executor_callback,
738        &mut *event_listener,
739    )
740    .await;
741}
742
/// Pure network message handling (no client concerns).
///
/// Dispatches on the wire-format version of the message; only `V1` exists in
/// this match, so it simply unwraps and forwards to the V1 handler.
#[allow(clippy::too_many_arguments)]
async fn handle_pure_network_message<CB>(
    msg: NetMessage,
    source_addr: Option<std::net::SocketAddr>,
    op_manager: Arc<OpManager>,
    conn_manager: CB,
    event_listener: &mut dyn NetEventRegister,
    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
where
    CB: NetworkBridge,
{
    match msg {
        NetMessage::V1(msg_v1) => {
            handle_pure_network_message_v1(
                msg_v1,
                source_addr,
                op_manager,
                conn_manager,
                event_listener,
                pending_op_result,
            )
            .await
        }
    }
}
770
/// Returns the exponential backoff delay for the given retry attempt.
///
/// Starts at 5ms and doubles each attempt, capped at 1000ms.
fn op_retry_backoff(attempt: usize) -> Duration {
    const BASE_MS: u64 = 5;
    const MAX_MS: u64 = 1_000;
    // Clamp the shift so the doubling stops growing past attempt 8.
    const MAX_SHIFT: usize = 8;

    let delay_ms = BASE_MS << attempt.min(MAX_SHIFT);
    Duration::from_millis(delay_ms.min(MAX_MS))
}
777
/// Pure network message processing for V1 messages (no client concerns)
///
/// Flow:
/// 1. Register the inbound message with the event listener (telemetry).
/// 2. Dispatch on the message variant:
///    - Operation messages (Connect/Put/Get/Update/Subscribe) go through
///      `handle_op_request` inside a retry loop. If the operation's state
///      machine is busy (`OpNotAvailable::Running`) we back off
///      exponentially (`op_retry_backoff`) and retry, up to `MAX_RETRIES`
///      attempts; if it already finished (`OpNotAvailable::Completed`) we
///      return `Ok(None)`.
///    - Non-transactional messages (ProximityCache / InterestSync /
///      ReadyState / Aborted) are processed exactly once and return
///      `Ok(None)` without entering the retry path.
///
/// For Put and Get, a completed operation is additionally forwarded over
/// `pending_op_result` so the executor side can observe the result.
///
/// Returns the operation result (if any) for the caller to report; when
/// retries are exhausted the message is dropped with a warning and the
/// function yields `Ok(None)`.
#[allow(clippy::too_many_arguments)]
async fn handle_pure_network_message_v1<CB>(
    msg: NetMessageV1,
    source_addr: Option<std::net::SocketAddr>,
    op_manager: Arc<OpManager>,
    mut conn_manager: CB,
    event_listener: &mut dyn NetEventRegister,
    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
where
    CB: NetworkBridge,
{
    // Register network events (pure network concern). Done once, before the
    // retry loop, so a retried message is only logged a single time.
    event_listener
        .register_events(NetEventLog::from_inbound_msg_v1(
            &msg,
            &op_manager,
            source_addr,
        ))
        .await;

    const MAX_RETRIES: usize = 15usize;
    for i in 0..MAX_RETRIES {
        let tx = Some(*msg.id());
        tracing::debug!(?tx, "Processing pure network operation, iteration: {i}");

        match msg {
            NetMessageV1::Connect(ref op) => {
                // Connect gets an explicit tracing span so handshake steps
                // can be correlated across log lines.
                let parent_span = tracing::Span::current();
                let span = tracing::info_span!(
                    parent: parent_span,
                    "handle_connect_op_request",
                    transaction = %msg.id(),
                    tx_type = %msg.id().transaction_type()
                );
                let op_result = handle_op_request::<ConnectOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .instrument(span)
                .await;

                // Busy operation: back off and retry; already-finished
                // operation: nothing left to do for this message.
                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!(
                                tx = %msg.id(),
                                tx_type = ?msg.id().transaction_type(),
                                "Pure network: Operation already completed"
                            );
                            return Ok(None);
                        }
                    }
                }

                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::Put(ref op) => {
                tracing::debug!(
                    tx = %op.id(),
                    "handle_pure_network_message_v1: Processing PUT message"
                );
                let op_result = handle_op_request::<put::PutOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .await;
                tracing::debug!(
                    tx = %op.id(),
                    op_result_ok = op_result.is_ok(),
                    "handle_pure_network_message_v1: PUT handle_op_request completed"
                );

                // Handle pending operation results (network concern): forward
                // a clone of the completed PUT to the executor channel, if one
                // was provided. A send failure is logged but not propagated.
                if is_operation_completed(&op_result) {
                    if let Some(ref op_execution_callback) = pending_op_result {
                        let tx_id = *op.id();
                        if let Err(err) = op_execution_callback
                            .send(NetMessage::V1(NetMessageV1::Put((*op).clone())))
                            .await
                        {
                            tracing::error!(%err, %tx_id, "Failed to send message to executor");
                        }
                    }
                }

                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!("Pure network: Operation already completed");
                            return Ok(None);
                        }
                    }
                }

                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::Get(ref op) => {
                let op_result = handle_op_request::<get::GetOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .await;

                // Handle pending operation results (network concern): same
                // executor-forwarding behavior as the PUT arm above.
                if is_operation_completed(&op_result) {
                    if let Some(ref op_execution_callback) = pending_op_result {
                        let tx_id = *op.id();
                        if let Err(err) = op_execution_callback
                            .send(NetMessage::V1(NetMessageV1::Get((*op).clone())))
                            .await
                        {
                            tracing::error!(%err, %tx_id, "Failed to send message to executor");
                        }
                    }
                }

                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!("Pure network: Operation already completed");
                            return Ok(None);
                        }
                    }
                }

                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::Update(ref op) => {
                // Update results are not forwarded over `pending_op_result`;
                // only the retry/completion handling applies.
                let op_result = handle_op_request::<update::UpdateOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .await;

                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!("Pure network: Operation already completed");
                            return Ok(None);
                        }
                    }
                }

                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            NetMessageV1::Subscribe(ref op) => {
                // Same shape as the Update arm: no executor forwarding.
                let op_result = handle_op_request::<subscribe::SubscribeOp, _>(
                    &op_manager,
                    &mut conn_manager,
                    op,
                    source_addr,
                )
                .await;

                if let Err(OpError::OpNotAvailable(state)) = &op_result {
                    match state {
                        OpNotAvailable::Running => {
                            let delay = op_retry_backoff(i);
                            tracing::debug!(
                                delay_ms = delay.as_millis() as u64,
                                attempt = i,
                                "Pure network: Operation still running, backing off"
                            );
                            tokio::time::sleep(delay).await;
                            continue;
                        }
                        OpNotAvailable::Completed => {
                            tracing::debug!("Pure network: Operation already completed");
                            return Ok(None);
                        }
                    }
                }

                return handle_pure_network_result(
                    tx,
                    op_result,
                    &op_manager,
                    &mut *event_listener,
                )
                .await;
            }
            // Non-transactional message types: process once and return immediately.
            // These must NOT fall through to the post-loop "Dropping message" warning,
            // which is only meant for operation retry exhaustion.
            NetMessageV1::ProximityCache { ref message } => {
                // A source address is required both to resolve the sender's
                // key and to send any response; without it we can only drop.
                let Some(source) = source_addr else {
                    tracing::warn!(
                        "Received ProximityCache message without source address (pure network)"
                    );
                    return Ok(None);
                };
                tracing::debug!(
                    from = %source,
                    "Processing ProximityCache message (pure network)"
                );

                // Note: In the simplified architecture (2026-01 refactor), we no longer
                // attempt to establish subscriptions based on CacheAnnounce messages.
                // Update propagation uses the proximity cache directly, and subscriptions
                // are lease-based with automatic expiry.

                // Resolve source SocketAddr to TransportPublicKey for proximity cache
                let source_pub_key = op_manager
                    .ring
                    .connection_manager
                    .get_peer_by_addr(source)
                    .map(|pkl| pkl.pub_key().clone());
                let Some(source_pub_key) = source_pub_key else {
                    tracing::debug!(
                        %source,
                        "ProximityCache: could not resolve source addr to pub_key, skipping"
                    );
                    return Ok(None);
                };
                let result = op_manager
                    .proximity_cache
                    .handle_message(&source_pub_key, message.clone());
                if let Some(response) = result.response {
                    // Send response back to sender
                    let response_msg =
                        NetMessage::V1(NetMessageV1::ProximityCache { message: response });
                    if let Err(err) = conn_manager.send(source, response_msg).await {
                        tracing::error!(%err, %source, "Failed to send ProximityCache response");
                    }
                }
                // Proactive state sync: broadcast our state for shared contracts
                // so the neighbor gets current state if they're stale after restart.
                // Only sync contracts we're actively interested in (receiving updates
                // or have downstream subscribers) — skip cached-only contracts.
                for instance_id in result.overlapping_contracts {
                    if let Some((key, state)) =
                        get_contract_state_by_id(&op_manager, &instance_id).await
                    {
                        if !op_manager.ring.is_receiving_updates(&key)
                            && !op_manager.ring.has_downstream_subscribers(&key)
                        {
                            continue;
                        }
                        tracing::debug!(
                            contract = %key,
                            peer = %source_pub_key,
                            "Proximity cache overlap — broadcasting state to ensure neighbor is current"
                        );
                        // Emit via the node-event channel; failures are logged
                        // but do not abort processing of remaining contracts.
                        if let Err(e) = op_manager
                            .notify_node_event(NodeEvent::BroadcastStateChange {
                                key,
                                new_state: state,
                            })
                            .await
                        {
                            tracing::warn!(
                                contract = %instance_id,
                                error = %e,
                                "Failed to emit BroadcastStateChange for proximity sync"
                            );
                        }
                    }
                }
                return Ok(None);
            }
            NetMessageV1::InterestSync { ref message } => {
                let Some(source) = source_addr else {
                    tracing::warn!("Received InterestSync message without source address");
                    return Ok(None);
                };
                tracing::debug!(
                    from = %source,
                    "Processing InterestSync message"
                );

                // Handle interest synchronization for delta-based updates;
                // some message kinds produce a reply to send back to `source`.
                if let Some(response) =
                    handle_interest_sync_message(&op_manager, source, message.clone()).await
                {
                    let response_msg =
                        NetMessage::V1(NetMessageV1::InterestSync { message: response });
                    if let Err(err) = conn_manager.send(source, response_msg).await {
                        tracing::error!(%err, %source, "Failed to send InterestSync response");
                    }
                }
                return Ok(None);
            }
            NetMessageV1::ReadyState { ready } => {
                let Some(source) = source_addr else {
                    tracing::warn!("Received ReadyState message without source address");
                    return Ok(None);
                };
                // Flip the peer's readiness flag in the connection manager
                // according to the advertised state.
                if ready {
                    op_manager.ring.connection_manager.mark_peer_ready(source);
                } else {
                    op_manager
                        .ring
                        .connection_manager
                        .mark_peer_not_ready(source);
                }
                tracing::debug!(
                    from = %source,
                    ready,
                    "Processed ReadyState from peer"
                );
                return Ok(None);
            }
            NetMessageV1::Aborted(tx) => {
                tracing::debug!(
                    %tx,
                    tx_type = ?tx.transaction_type(),
                    "Received Aborted message, delegating to handle_aborted_op"
                );
                // Empty gateways: Aborted messages arrive over p2p connections, not
                // from the gateway join path. The gateways list is only used by
                // Connect retries; other operation types ignore it entirely.
                if let Err(err) = handle_aborted_op(tx, &op_manager, &[]).await {
                    // StatePushed is an expected control-flow signal, not an
                    // error worth surfacing.
                    if !matches!(err, OpError::StatePushed) {
                        tracing::error!(
                            %tx,
                            error = %err,
                            "Error handling aborted operation"
                        );
                    }
                }
                return Ok(None);
            }
        }
    }

    // If we reach here, retries were exhausted waiting for a concurrent operation to finish
    tracing::warn!(
        tx = %msg.id(),
        tx_type = ?msg.id().transaction_type(),
        "Dropping message after {MAX_RETRIES} retry attempts (operation busy)"
    );
    Ok(None)
}
1185
1186/// Pure network result handling - no client notification logic
1187async fn handle_pure_network_result(
1188    tx: Option<Transaction>,
1189    op_result: Result<Option<crate::operations::OpEnum>, OpError>,
1190    _op_manager: &Arc<OpManager>,
1191    _event_listener: &mut dyn NetEventRegister,
1192) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError> {
1193    tracing::debug!("Pure network result handling for transaction: {:?}", tx);
1194
1195    match &op_result {
1196        Ok(Some(_op_res)) => {
1197            // Log network operation completion
1198            tracing::debug!(
1199                "Network operation completed successfully for transaction: {:?}",
1200                tx
1201            );
1202
1203            // Register completion event (pure network concern)
1204            if let Some(tx_id) = tx {
1205                // TODO: Register completion event properly
1206                tracing::debug!("Network operation completed for transaction: {}", tx_id);
1207            }
1208
1209            // TODO: Handle executor callbacks (network concern)
1210        }
1211        Ok(None) => {
1212            tracing::debug!("Network operation returned no result");
1213        }
1214        Err(OpError::StatePushed) => {
1215            return Ok(None);
1216        }
1217        Err(OpError::OpNotPresent(tx_id)) => {
1218            // OpNotPresent means a response arrived for an operation that no longer exists.
1219            // This is benign - it happens when:
1220            // 1. An operation timed out before the response arrived
1221            // 2. A late response arrives after a peer restart
1222            // 3. The operation was already completed via another path
1223            //
1224            // We log at debug level and return Ok(None) to avoid propagating
1225            // confusing "op not present" errors to clients.
1226            tracing::debug!(
1227                tx = %tx_id,
1228                "Network response arrived for non-existent operation (likely timed out or already completed)"
1229            );
1230            return Ok(None);
1231        }
1232        Err(e) => {
1233            tracing::error!("Network operation failed: {}", e);
1234            // TODO: Register error event properly
1235            if let Some(tx_id) = tx {
1236                tracing::debug!(
1237                    "Network operation failed for transaction: {} with error: {}",
1238                    tx_id,
1239                    e
1240                );
1241            }
1242        }
1243    }
1244
1245    op_result
1246}
1247
1248/// Handle incoming InterestSync messages for delta-based state synchronization.
1249///
1250/// This function processes the interest exchange protocol:
1251/// - `Interests`: Connection-time discovery of shared contract interests
1252/// - `Summaries`: State summaries for shared contracts
1253/// - `ChangeInterests`: Incremental interest changes
1254/// - `ResyncRequest`: Request full state when delta application fails
1255async fn handle_interest_sync_message(
1256    op_manager: &Arc<OpManager>,
1257    source: std::net::SocketAddr,
1258    message: crate::message::InterestMessage,
1259) -> Option<crate::message::InterestMessage> {
1260    use crate::message::{InterestMessage, NodeEvent, SummaryEntry};
1261    use crate::ring::interest::contract_hash;
1262
1263    match message {
1264        InterestMessage::Interests { hashes } => {
1265            tracing::debug!(
1266                from = %source,
1267                hash_count = hashes.len(),
1268                "Received Interests message"
1269            );
1270
1271            let peer_key = get_peer_key_from_addr(op_manager, source);
1272
1273            // Full-replace semantics: the incoming hashes represent the peer's
1274            // complete interest set. Remove entries for contracts whose hash is
1275            // NOT in the incoming set, then register/refresh the rest.
1276            if let Some(ref pk) = peer_key {
1277                let incoming_hashes: std::collections::HashSet<u32> =
1278                    hashes.iter().copied().collect();
1279                let current_contracts = op_manager.interest_manager.get_contracts_for_peer(pk);
1280
1281                // Hash collisions (FNV-1a u32) can cause a stale entry to
1282                // survive if its hash collides with a live one. This is the
1283                // safe direction — false negatives on removal, not false
1284                // positives — and extremely rare in practice.
1285                let mut removed = 0usize;
1286                for contract in &current_contracts {
1287                    let h = contract_hash(contract);
1288                    if !incoming_hashes.contains(&h) {
1289                        op_manager
1290                            .interest_manager
1291                            .remove_peer_interest(contract, pk);
1292                        removed += 1;
1293                    }
1294                }
1295                if removed > 0 {
1296                    tracing::debug!(
1297                        from = %source,
1298                        removed,
1299                        "Full-replace: removed stale interest entries"
1300                    );
1301                }
1302            }
1303
1304            // Find contracts we share interest in
1305            let matching = op_manager.interest_manager.get_matching_contracts(&hashes);
1306
1307            // Build summaries for shared contracts and register/refresh peer interest
1308            let mut entries = Vec::with_capacity(matching.len());
1309            for contract in matching {
1310                let hash = contract_hash(&contract);
1311                let summary = get_contract_summary(op_manager, &contract).await;
1312                entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
1313
1314                if let Some(ref pk) = peer_key {
1315                    // Refresh TTL for existing entries (preserves cached summary).
1316                    // Only register new interest if this is a genuinely new entry;
1317                    // otherwise register_peer_interest would overwrite the cached
1318                    // summary with None, defeating delta optimization.
1319                    if op_manager
1320                        .interest_manager
1321                        .get_peer_interest(&contract, pk)
1322                        .is_some()
1323                    {
1324                        op_manager
1325                            .interest_manager
1326                            .refresh_peer_interest(&contract, pk);
1327                    } else {
1328                        op_manager.interest_manager.register_peer_interest(
1329                            &contract,
1330                            pk.clone(),
1331                            None, // New entry; summary arrives in their Summaries response
1332                            false,
1333                        );
1334                    }
1335                }
1336            }
1337
1338            if entries.is_empty() {
1339                None
1340            } else {
1341                Some(InterestMessage::Summaries { entries })
1342            }
1343        }
1344
1345        InterestMessage::Summaries { entries } => {
1346            tracing::debug!(
1347                from = %source,
1348                entry_count = entries.len(),
1349                "Received Summaries message"
1350            );
1351
1352            // Update peer summaries and detect stale peers (#3221).
1353            //
1354            // Compare each peer summary with our own before storing it. If they
1355            // differ, the peer missed an earlier broadcast. Emitting
1356            // BroadcastStateChange triggers the existing broadcast path which
1357            // computes deltas and sends state only to peers with stale summaries.
1358            //
1359            // Both sides may detect the same mismatch (A sees B is stale, B sees
1360            // A is stale). This is safe: the contract's merge semantics (CRDTs
1361            // etc.) ensure the newer/correct state wins regardless of push order.
1362            //
1363            // When either summary is None, we skip the comparison. A peer with
1364            // no summary has no state yet and should receive it via the normal
1365            // subscription/GET flow, not via broadcast.
1366            let peer_key = get_peer_key_from_addr(op_manager, source);
1367            let mut stale_contracts = Vec::new();
1368
1369            if let Some(pk) = peer_key {
1370                for entry in entries {
1371                    for contract in op_manager.interest_manager.lookup_by_hash(entry.hash) {
1372                        if !op_manager.interest_manager.has_local_interest(&contract) {
1373                            continue;
1374                        }
1375
1376                        let their_summary = entry.to_summary();
1377                        let our_summary = get_contract_summary(op_manager, &contract).await;
1378
1379                        let is_stale = our_summary
1380                            .as_ref()
1381                            .zip(their_summary.as_ref())
1382                            .is_some_and(|(ours, theirs)| ours.as_ref() != theirs.as_ref());
1383
1384                        op_manager.interest_manager.update_peer_summary(
1385                            &contract,
1386                            &pk,
1387                            their_summary,
1388                        );
1389
1390                        if is_stale && !stale_contracts.contains(&contract) {
1391                            stale_contracts.push(contract);
1392                        }
1393                    }
1394                }
1395            }
1396
1397            // Push current state to stale peers via the existing broadcast path
1398            for contract in stale_contracts {
1399                let Some(state) = get_contract_state(op_manager, &contract).await else {
1400                    tracing::trace!(
1401                        contract = %contract,
1402                        "Skipping stale-peer broadcast — no local state available"
1403                    );
1404                    continue;
1405                };
1406                tracing::info!(
1407                    contract = %contract,
1408                    detected_via = %source,
1409                    "Summary mismatch in interest sync — broadcasting to all stale peers"
1410                );
1411                if let Err(e) = op_manager
1412                    .notify_node_event(NodeEvent::BroadcastStateChange {
1413                        key: contract,
1414                        new_state: state,
1415                    })
1416                    .await
1417                {
1418                    tracing::warn!(
1419                        contract = %contract,
1420                        error = %e,
1421                        "Failed to emit BroadcastStateChange for stale peer correction"
1422                    );
1423                }
1424            }
1425
1426            // No response needed for Summaries
1427            None
1428        }
1429
1430        InterestMessage::ChangeInterests { added, removed } => {
1431            tracing::debug!(
1432                from = %source,
1433                added_count = added.len(),
1434                removed_count = removed.len(),
1435                "Received ChangeInterests message"
1436            );
1437
1438            let peer_key = get_peer_key_from_addr(op_manager, source);
1439
1440            // Handle removals
1441            if let Some(ref pk) = peer_key {
1442                for hash in removed {
1443                    // Handle hash collisions - remove interest from all matching contracts
1444                    for contract in op_manager.interest_manager.lookup_by_hash(hash) {
1445                        op_manager
1446                            .interest_manager
1447                            .remove_peer_interest(&contract, pk);
1448                    }
1449                }
1450            }
1451
1452            // Handle additions - respond with summaries for newly shared contracts
1453            let mut entries = Vec::new();
1454            if let Some(ref pk) = peer_key {
1455                for hash in added {
1456                    // Handle hash collisions - process all matching contracts
1457                    for contract in op_manager.interest_manager.lookup_by_hash(hash) {
1458                        // Only process if we have local interest in this contract
1459                        if !op_manager.interest_manager.has_local_interest(&contract) {
1460                            continue;
1461                        }
1462
1463                        // Register their interest
1464                        op_manager.interest_manager.register_peer_interest(
1465                            &contract,
1466                            pk.clone(),
1467                            None,
1468                            false,
1469                        );
1470
1471                        // Get our summary to send back
1472                        let summary = get_contract_summary(op_manager, &contract).await;
1473                        entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
1474                    }
1475                }
1476            }
1477
1478            if entries.is_empty() {
1479                None
1480            } else {
1481                Some(InterestMessage::Summaries { entries })
1482            }
1483        }
1484
1485        InterestMessage::ResyncRequest { key } => {
1486            tracing::info!(
1487                from = %source,
1488                contract = %key,
1489                event = "resync_request_received",
1490                "Received ResyncRequest - peer needs full state"
1491            );
1492
1493            // Track this for testing - high counts indicate incorrect summary caching (PR #2763)
1494            op_manager.interest_manager.record_resync_request_received();
1495            crate::config::GlobalTestMetrics::record_resync_request();
1496
1497            // Clear cached summary for this peer
1498            let peer_key = get_peer_key_from_addr(op_manager, source);
1499            if let Some(ref pk) = peer_key {
1500                op_manager
1501                    .interest_manager
1502                    .update_peer_summary(&key, pk, None);
1503            }
1504
1505            // Get PeerKeyLocation for telemetry
1506            let from_peer = op_manager.ring.connection_manager.get_peer_by_addr(source);
1507
1508            // Emit telemetry for ResyncRequest received
1509            if let Some(ref from_pkl) = from_peer {
1510                if let Some(event) = crate::tracing::NetEventLog::resync_request_received(
1511                    &op_manager.ring,
1512                    key,
1513                    from_pkl.clone(),
1514                ) {
1515                    op_manager
1516                        .ring
1517                        .register_events(either::Either::Left(event))
1518                        .await;
1519                }
1520            } else {
1521                tracing::debug!(
1522                    contract = %key,
1523                    source = %source,
1524                    "ResyncRequest telemetry skipped: peer lookup failed"
1525                );
1526            }
1527
1528            // Fetch current state from store
1529            let state = get_contract_state(op_manager, &key).await;
1530            let Some(state) = state else {
1531                tracing::warn!(
1532                    contract = %key,
1533                    "ResyncRequest for contract we don't have state for"
1534                );
1535                return None;
1536            };
1537
1538            // Fetch our summary
1539            let summary = get_contract_summary(op_manager, &key).await;
1540            let Some(summary) = summary else {
1541                tracing::warn!(
1542                    contract = %key,
1543                    "ResyncRequest for contract we can't compute summary for"
1544                );
1545                return None;
1546            };
1547
1548            tracing::info!(
1549                to = %source,
1550                contract = %key,
1551                state_size = state.as_ref().len(),
1552                summary_size = summary.as_ref().len(),
1553                event = "resync_response_sent",
1554                "Sending ResyncResponse with full state"
1555            );
1556
1557            // Emit telemetry for ResyncResponse sent
1558            if let Some(ref to_pkl) = from_peer {
1559                if let Some(event) = crate::tracing::NetEventLog::resync_response_sent(
1560                    &op_manager.ring,
1561                    key,
1562                    to_pkl.clone(),
1563                    state.as_ref().len(),
1564                ) {
1565                    op_manager
1566                        .ring
1567                        .register_events(either::Either::Left(event))
1568                        .await;
1569                }
1570            }
1571
1572            Some(InterestMessage::ResyncResponse {
1573                key,
1574                state_bytes: state.as_ref().to_vec(),
1575                summary_bytes: summary.as_ref().to_vec(),
1576            })
1577        }
1578
1579        InterestMessage::ResyncResponse {
1580            key,
1581            state_bytes,
1582            summary_bytes,
1583        } => {
1584            tracing::info!(
1585                from = %source,
1586                contract = %key,
1587                state_size = state_bytes.len(),
1588                event = "resync_response_received",
1589                "Received ResyncResponse with full state"
1590            );
1591
1592            // Apply the full state using an update
1593            let state = freenet_stdlib::prelude::State::from(state_bytes.clone());
1594            let update_data = freenet_stdlib::prelude::UpdateData::State(state);
1595
1596            // Send to contract handler
1597            use crate::contract::ContractHandlerEvent;
1598            match op_manager
1599                .notify_contract_handler(ContractHandlerEvent::UpdateQuery {
1600                    key,
1601                    data: update_data,
1602                    related_contracts: Default::default(),
1603                })
1604                .await
1605            {
1606                Ok(ContractHandlerEvent::UpdateResponse {
1607                    new_value: Ok(_), ..
1608                }) => {
1609                    tracing::info!(
1610                        from = %source,
1611                        contract = %key,
1612                        event = "resync_applied",
1613                        changed = true,
1614                        "ResyncResponse state applied successfully"
1615                    );
1616                }
1617                Ok(ContractHandlerEvent::UpdateNoChange { .. }) => {
1618                    tracing::info!(
1619                        from = %source,
1620                        contract = %key,
1621                        event = "resync_applied",
1622                        changed = false,
1623                        "ResyncResponse state unchanged (already had this state)"
1624                    );
1625                }
1626                Ok(other) => {
1627                    tracing::warn!(
1628                        from = %source,
1629                        contract = %key,
1630                        event = "resync_failed",
1631                        response = ?other,
1632                        "Unexpected response to resync update"
1633                    );
1634                }
1635                Err(e) => {
1636                    tracing::error!(
1637                        from = %source,
1638                        contract = %key,
1639                        event = "resync_failed",
1640                        error = %e,
1641                        "Failed to apply resync state"
1642                    );
1643                }
1644            }
1645
1646            // Update the peer's summary in our interest tracker
1647            let peer_key = get_peer_key_from_addr(op_manager, source);
1648            if let Some(pk) = peer_key {
1649                let summary = freenet_stdlib::prelude::StateSummary::from(summary_bytes);
1650                op_manager
1651                    .interest_manager
1652                    .update_peer_summary(&key, &pk, Some(summary));
1653            }
1654
1655            // No response needed
1656            None
1657        }
1658    }
1659}
1660
1661/// Get the contract state from the state store.
1662async fn get_contract_state(
1663    op_manager: &Arc<OpManager>,
1664    key: &freenet_stdlib::prelude::ContractKey,
1665) -> Option<freenet_stdlib::prelude::WrappedState> {
1666    get_contract_state_by_id(op_manager, key.id())
1667        .await
1668        .map(|(_, state)| state)
1669}
1670
1671/// Get the contract state by instance ID, returning both the full `ContractKey` and state.
1672///
1673/// Used for proactive state sync when proximity cache discovers overlapping contracts,
1674/// where we only have a `ContractInstanceId` (not a full `ContractKey`).
1675async fn get_contract_state_by_id(
1676    op_manager: &Arc<OpManager>,
1677    instance_id: &freenet_stdlib::prelude::ContractInstanceId,
1678) -> Option<(
1679    freenet_stdlib::prelude::ContractKey,
1680    freenet_stdlib::prelude::WrappedState,
1681)> {
1682    use crate::contract::ContractHandlerEvent;
1683
1684    match op_manager
1685        .notify_contract_handler(ContractHandlerEvent::GetQuery {
1686            instance_id: *instance_id,
1687            return_contract_code: false,
1688        })
1689        .await
1690    {
1691        Ok(ContractHandlerEvent::GetResponse {
1692            key: Some(key),
1693            response: Ok(store_response),
1694        }) => store_response.state.map(|state| (key, state)),
1695        Ok(ContractHandlerEvent::GetResponse {
1696            response: Err(e), ..
1697        }) => {
1698            tracing::warn!(
1699                contract = %instance_id,
1700                error = %e,
1701                "Failed to get contract state by instance id"
1702            );
1703            None
1704        }
1705        _ => None,
1706    }
1707}
1708
1709/// Get the contract state summary using the contract's summarize_state method.
1710async fn get_contract_summary(
1711    op_manager: &Arc<OpManager>,
1712    key: &freenet_stdlib::prelude::ContractKey,
1713) -> Option<freenet_stdlib::prelude::StateSummary<'static>> {
1714    use crate::contract::ContractHandlerEvent;
1715
1716    match op_manager
1717        .notify_contract_handler(ContractHandlerEvent::GetSummaryQuery { key: *key })
1718        .await
1719    {
1720        Ok(ContractHandlerEvent::GetSummaryResponse {
1721            summary: Ok(summary),
1722            ..
1723        }) => Some(summary),
1724        Ok(ContractHandlerEvent::GetSummaryResponse {
1725            summary: Err(e), ..
1726        }) => {
1727            tracing::warn!(
1728                contract = %key,
1729                error = %e,
1730                "Failed to get contract summary"
1731            );
1732            None
1733        }
1734        _ => None,
1735    }
1736}
1737
1738/// Get the PeerKey for a socket address.
1739fn get_peer_key_from_addr(
1740    op_manager: &Arc<OpManager>,
1741    addr: std::net::SocketAddr,
1742) -> Option<crate::ring::interest::PeerKey> {
1743    op_manager
1744        .ring
1745        .connection_manager
1746        .get_peer_by_addr(addr)
1747        .map(|pkl| crate::ring::interest::PeerKey::from(pkl.pub_key.clone()))
1748}
1749
1750/// Attempts to subscribe to a contract
1751#[allow(dead_code)]
1752pub async fn subscribe(
1753    op_manager: Arc<OpManager>,
1754    instance_id: ContractInstanceId,
1755    client_id: Option<ClientId>,
1756) -> Result<Transaction, OpError> {
1757    // Client-initiated subscriptions are never renewals
1758    subscribe_with_id(op_manager, instance_id, client_id, None, false).await
1759}
1760
1761/// Attempts to subscribe to a contract with a specific transaction ID (for deduplication)
1762///
1763/// `is_renewal` indicates whether this is a renewal (requester already has the contract).
1764/// If true, the responder will skip sending state to save bandwidth.
1765pub async fn subscribe_with_id(
1766    op_manager: Arc<OpManager>,
1767    instance_id: ContractInstanceId,
1768    client_id: Option<ClientId>,
1769    transaction_id: Option<Transaction>,
1770    is_renewal: bool,
1771) -> Result<Transaction, OpError> {
1772    let op = match transaction_id {
1773        Some(id) => subscribe::start_op_with_id(instance_id, id, is_renewal),
1774        None => subscribe::start_op(instance_id, is_renewal),
1775    };
1776    let id = op.id;
1777    if let Some(client_id) = client_id {
1778        use crate::client_events::RequestId;
1779        // Generate a default RequestId for internal subscription operations
1780        let request_id = RequestId::new();
1781        if let Err(e) = op_manager
1782            .ch_outbound
1783            .waiting_for_subscription_result(id, instance_id, client_id, request_id)
1784            .await
1785        {
1786            tracing::warn!(tx = %id, error = %e, "failed to register subscription result waiter");
1787        }
1788    }
1789    // Initialize a subscribe op.
1790    match subscribe::request_subscribe(&op_manager, op).await {
1791        Err(err) => {
1792            tracing::error!("{}", err);
1793            Err(err)
1794        }
1795        Ok(()) => Ok(id),
1796    }
1797}
1798
/// Clean-up/retry hook invoked when a transaction `tx` is aborted.
///
/// For CONNECT aborts this may schedule a (backed-off) reconnection attempt to a
/// gateway; for contract operations (GET/PUT/SUBSCRIBE/UPDATE) it delegates to
/// the operation's own abort handler. Ops popped under the wrong type are
/// pushed back untouched.
async fn handle_aborted_op(
    tx: Transaction,
    op_manager: &OpManager,
    gateways: &[PeerKeyLocation],
) -> Result<(), OpError> {
    use crate::util::IterExt;
    match tx.transaction_type() {
        TransactionType::Connect => {
            // attempt to establish a connection failed, this could be a fatal error since the node
            // is useless without connecting to the network, we will retry with exponential backoff
            match op_manager.pop(&tx) {
                // Still under-connected: retry the same gateway after a backoff wait.
                Ok(Some(OpEnum::Connect(op)))
                    if op_manager.ring.open_connections()
                        < op_manager.ring.connection_manager.min_connections =>
                {
                    let gateway = op.gateway().cloned();
                    if let Some(gateway) = gateway {
                        // Clean up phantom location_for_peer entry left by should_accept's
                        // record_pending_location (#3088). Without this, the gateway appears
                        // permanently connected and initial_join_procedure never retries it.
                        if let Some(peer_addr) = gateway.peer_addr.as_known() {
                            op_manager
                                .ring
                                .connection_manager
                                .prune_in_transit_connection(*peer_addr);

                            // Record the failure and compute the remaining wait within a
                            // single short lock scope (the lock is not held across awaits).
                            let backoff_duration = {
                                let mut backoff = op_manager.gateway_backoff.lock();
                                backoff.record_failure(*peer_addr);
                                backoff.remaining_backoff(*peer_addr)
                            };

                            if let Some(duration) = backoff_duration {
                                // Cap the wait at GATEWAY_BACKOFF_POLL_CAP when the
                                // node already has ring connections, matching the
                                // policy in initial_join_procedure (issue #3304).
                                // ±20% jitter to prevent thundering herd.
                                let open_conns = op_manager.ring.open_connections();
                                let effective = if open_conns > 0 {
                                    let jitter_ms = crate::config::GlobalRng::random_range(
                                        0u64..(connect::GATEWAY_BACKOFF_POLL_CAP.as_millis() / 5)
                                            as u64,
                                    );
                                    let cap = connect::GATEWAY_BACKOFF_POLL_CAP.mul_f64(0.8)
                                        + Duration::from_millis(jitter_ms);
                                    duration.min(cap)
                                } else {
                                    duration
                                };
                                tracing::info!(
                                    gateway = %gateway,
                                    backoff_secs = duration.as_secs(),
                                    effective_wait_secs = effective.as_secs(),
                                    open_connections = open_conns,
                                    "Gateway connection failed, waiting before retry"
                                );
                                // Use select! so suspend/isolation recovery can
                                // wake us immediately via gateway_backoff_cleared,
                                // matching the pattern in initial_join_procedure.
                                tokio::select! {
                                    _ = tokio::time::sleep(effective) => {},
                                    _ = op_manager.gateway_backoff_cleared.notified() => {
                                        tracing::info!(
                                            gateway = %gateway,
                                            "Gateway backoff cleared externally, retrying immediately"
                                        );
                                    },
                                }
                            }
                        }

                        tracing::debug!("Retrying connection to gateway {}", gateway);
                        connect::join_ring_request(&gateway, op_manager).await?;
                    }
                }
                // Connection count is already at/above minimum: clean up, and if a
                // gateway node ended up with no connections at all, retry via a
                // randomly chosen gateway.
                Ok(Some(OpEnum::Connect(op))) => {
                    // Clean up phantom location_for_peer entry (#3088)
                    if let Some(peer_addr) = op.get_next_hop_addr() {
                        op_manager
                            .ring
                            .connection_manager
                            .prune_in_transit_connection(peer_addr);
                    }
                    if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
                        tracing::warn!("Retrying joining the ring with an other gateway");
                        if let Some(gateway) = gateways.iter().shuffle().next() {
                            connect::join_ring_request(gateway, op_manager).await?
                        }
                    }
                }
                // A non-connect op was popped for this tx id: push it back untouched.
                Ok(Some(other)) => {
                    op_manager.push(tx, other).await?;
                }
                _ => {}
            }
        }
        // For contract ops, delegate to the op's abort handler. `StatePushed` means
        // the op re-queued itself and is not treated as an error here.
        TransactionType::Get => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Get(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Subscribe => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Subscribe(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Put => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Put(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Update => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Update(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
    }
    Ok(())
}
1950
/// The identifier of a peer in the network is composed of its address and public key.
///
/// A regular peer will have its `PeerId` set when it connects to a gateway as it gets
/// its external address from the gateway.
///
/// A gateway will have its `PeerId` set when it is created since it will know its own address
/// from the start.
///
/// NOTE: the manual `Hash`, `PartialEq` and `Ord` impls compare by `addr` only;
/// the public key does not participate in identity comparisons.
#[derive(Serialize, Deserialize, Eq, Clone)]
pub struct PeerId {
    // Externally visible socket address; sole basis for equality/ordering/hashing.
    pub addr: SocketAddr,
    // Transport-level public key of the peer.
    pub pub_key: TransportPublicKey,
}
1963
impl Hash for PeerId {
    // Hash by address only, mirroring the `PartialEq` impl so the
    // `Eq`/`Hash` consistency contract holds.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.addr.hash(state);
    }
}
1969
impl PartialEq<PeerId> for PeerId {
    // Two peers are considered the same when they share a socket address,
    // regardless of public key.
    fn eq(&self, other: &PeerId) -> bool {
        self.addr == other.addr
    }
}
1975
impl Ord for PeerId {
    // Total order by socket address, consistent with `PartialEq`/`Hash`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.addr.cmp(&other.addr)
    }
}
1981
impl PartialOrd for PeerId {
    // Delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1987
impl PeerId {
    /// Build a `PeerId` from a known socket address and transport public key.
    pub fn new(addr: SocketAddr, pub_key: TransportPublicKey) -> Self {
        Self { addr, pub_key }
    }
}
1993
// Per-thread cache of a generated transport public key so that the test helpers
// (`PeerId::random` and the `Arbitrary` impl) reuse a single key per thread
// instead of generating a fresh keypair on every call.
thread_local! {
    static PEER_ID: std::cell::RefCell<Option<TransportPublicKey>> = const { std::cell::RefCell::new(None) };
}
1997
#[cfg(test)]
impl<'a> arbitrary::Arbitrary<'a> for PeerId {
    /// Generate a `PeerId` with a fuzzer-chosen IPv4 address and port, sharing
    /// the thread-cached transport public key across all generated values.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Draw an arbitrary (octets, port) pair from the unstructured input.
        let addr: ([u8; 4], u16) = u.arbitrary()?;

        // Reuse the thread-local key, creating it lazily on first use.
        let pub_key = PEER_ID.with(|cell| {
            cell.borrow_mut()
                .get_or_insert_with(|| TransportKeypair::new().public().clone())
                .clone()
        });

        Ok(Self {
            addr: addr.into(),
            pub_key,
        })
    }
}
2021
2022impl PeerId {
2023    pub fn random() -> Self {
2024        let mut addr = [0; 4];
2025        GlobalRng::fill_bytes(&mut addr[..]);
2026        // Use random port instead of get_free_port() for speed - tests don't actually bind
2027        let port: u16 = GlobalRng::random_range(1024u16..65535u16);
2028
2029        let pub_key = PEER_ID.with(|peer_id| {
2030            let mut peer_id = peer_id.borrow_mut();
2031            match &*peer_id {
2032                Some(k) => k.clone(),
2033                None => {
2034                    let key = TransportKeypair::new().public().clone();
2035                    peer_id.replace(key.clone());
2036                    key
2037                }
2038            }
2039        });
2040
2041        Self {
2042            addr: (addr, port).into(),
2043            pub_key,
2044        }
2045    }
2046
2047    #[cfg(test)]
2048    pub fn to_bytes(self) -> Vec<u8> {
2049        bincode::serialize(&self).unwrap()
2050    }
2051}
2052
impl std::fmt::Debug for PeerId {
    // Debug output intentionally matches `Display` so logs render peers uniformly.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        <Self as Display>::fmt(self, f)
    }
}
2058
impl Display for PeerId {
    // Rendered as the debug form of the public key; the socket address is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self.pub_key)
    }
}
2064
/// Run a purely local (non-networked) node, serving client requests directly
/// against the [`Executor`].
///
/// The websocket API must be bound to a loopback address. Requests arriving
/// from either the HTTP gateway (`gw`) or the websocket proxy (`ws_proxy`)
/// are processed in one loop and answered on whichever channel they came from.
/// The loop runs until one of the channels yields an error.
pub async fn run_local_node(
    mut executor: Executor,
    socket: WebsocketApiConfig,
) -> anyhow::Result<()> {
    // Reject non-loopback bind addresses: a local node must not be reachable
    // from other hosts.
    match socket.address {
        IpAddr::V4(ip) if !ip.is_loopback() => {
            anyhow::bail!("invalid ip: {ip}, expecting localhost")
        }
        IpAddr::V6(ip) if !ip.is_loopback() => {
            anyhow::bail!("invalid ip: {ip}, expecting localhost")
        }
        IpAddr::V4(_) | IpAddr::V6(_) => {}
    }

    let (mut gw, mut ws_proxy) = crate::server::serve_client_api_in(socket).await?;

    // TODO: use combinator instead
    // let mut all_clients =
    //    ClientEventsCombinator::new([Box::new(ws_handle), Box::new(http_handle)]);
    // Tracks which transport a request arrived on so the response goes back
    // over the same one.
    enum Receiver {
        Ws,
        Gw,
    }
    let mut receiver;
    loop {
        let req = crate::deterministic_select! {
            req = ws_proxy.recv() => {
                receiver = Receiver::Ws;
                req?
            },
            req = gw.recv() => {
                receiver = Receiver::Gw;
                req?
            },
        };
        let OpenRequest {
            client_id: id,
            request,
            notification_channel,
            token,
            attested_contract,
            ..
        } = req;
        tracing::debug!(client_id = %id, ?token, "Received OpenRequest -> {request}");

        let res = match *request {
            ClientRequest::ContractOp(op) => {
                executor
                    .contract_requests(op, id, notification_channel)
                    .await
            }
            ClientRequest::DelegateOp(op) => {
                // Use the attested_contract already resolved by the WebSocket/HTTP client API
                // instead of re-looking up from gw.attested_contracts (which could fail
                // if the token expired between WebSocket connect and this request)
                let op_name = match op {
                    DelegateRequest::RegisterDelegate { .. } => "RegisterDelegate",
                    DelegateRequest::ApplicationMessages { .. } => "ApplicationMessages",
                    DelegateRequest::UnregisterDelegate(_) => "UnregisterDelegate",
                    _ => "Unknown",
                };
                tracing::debug!(
                    op_name = ?op_name,
                    ?attested_contract,
                    "Handling ClientRequest::DelegateOp"
                );
                executor.delegate_request(op, attested_contract.as_ref())
            }
            ClientRequest::Disconnect { cause } => {
                if let Some(cause) = cause {
                    tracing::info!("disconnecting cause: {cause}");
                }
                continue;
            }
            // Everything else (authentication, node queries, close, ...) is not
            // supported in local mode.
            ClientRequest::Authenticate { .. }
            | ClientRequest::NodeQueries(_)
            | ClientRequest::Close
            | _ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
        };

        // Route the result (or a classified error) back over the originating channel.
        match res {
            Ok(res) => {
                match receiver {
                    Receiver::Ws => ws_proxy.send(id, Ok(res)).await?,
                    Receiver::Gw => gw.send(id, Ok(res)).await?,
                };
            }
            // Client-caused failures are reported as request errors.
            Err(err) if err.is_request() => {
                let err = ErrorKind::RequestError(err.unwrap_request());
                match receiver {
                    Receiver::Ws => {
                        ws_proxy.send(id, Err(err.into())).await?;
                    }
                    Receiver::Gw => {
                        gw.send(id, Err(err.into())).await?;
                    }
                };
            }
            // Anything else is logged and surfaced as an unhandled error.
            Err(err) => {
                tracing::error!("{err}");
                let err = Err(ErrorKind::Unhandled {
                    cause: format!("{err}").into(),
                }
                .into());
                match receiver {
                    Receiver::Ws => {
                        ws_proxy.send(id, err).await?;
                    }
                    Receiver::Gw => {
                        gw.send(id, err).await?;
                    }
                };
            }
        }
    }
}
2181
2182pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
2183    tracing::info!("Starting node");
2184
2185    let is_gateway = node.inner.is_gateway;
2186    let location = if let Some(loc) = node.inner.location {
2187        Some(loc)
2188    } else {
2189        is_gateway
2190            .then(|| {
2191                node.inner
2192                    .peer_id
2193                    .as_ref()
2194                    .map(|id| Location::from_address(&id.addr))
2195            })
2196            .flatten()
2197    };
2198
2199    if let Some(location) = location {
2200        tracing::info!("Setting initial location: {location}");
2201        node.update_location(location);
2202    }
2203
2204    match node.run().await {
2205        Ok(_) => {
2206            if is_gateway {
2207                tracing::info!("Gateway finished");
2208            } else {
2209                tracing::info!("Node finished");
2210            }
2211
2212            Ok(())
2213        }
2214        Err(e) => {
2215            tracing::error!("{e}");
2216            Err(e)
2217        }
2218    }
2219}
2220
/// Trait to determine if an operation has completed, regardless of its specific type.
pub trait IsOperationCompleted {
    /// Returns true if the operation has completed (successfully or with error).
    /// An in-flight operation returns false.
    fn is_completed(&self) -> bool;
}
2226
impl IsOperationCompleted for OpEnum {
    // Dispatch to the concrete operation's own completion check.
    fn is_completed(&self) -> bool {
        match self {
            OpEnum::Connect(op) => op.is_completed(),
            OpEnum::Put(op) => op.is_completed(),
            OpEnum::Get(op) => op.is_completed(),
            OpEnum::Subscribe(op) => op.is_completed(),
            OpEnum::Update(op) => op.is_completed(),
        }
    }
}
2238
2239/// Classify a `(TransactionType, OpOutcome)` pair into an optional `OpType` and success flag
2240/// for dashboard recording. Returns `(None, false)` for CONNECT transactions (not contract ops).
2241///
2242/// - `Irrelevant`: operation completed but without routing stats for this peer → success
2243/// - `Incomplete`: operation never finalized → failure
2244fn classify_op_outcome(
2245    tx_type: TransactionType,
2246    outcome: OpOutcome<'_>,
2247) -> (Option<network_status::OpType>, bool) {
2248    use network_status::OpType;
2249    match (tx_type, outcome) {
2250        (
2251            TransactionType::Get,
2252            OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
2253        ) => (Some(OpType::Get), true),
2254        (TransactionType::Get, OpOutcome::ContractOpFailure { .. }) => (Some(OpType::Get), false),
2255        (
2256            TransactionType::Put,
2257            OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
2258        ) => (Some(OpType::Put), true),
2259        (TransactionType::Put, OpOutcome::ContractOpFailure { .. }) => (Some(OpType::Put), false),
2260        (
2261            TransactionType::Update,
2262            OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
2263        ) => (Some(OpType::Update), true),
2264        (TransactionType::Update, OpOutcome::ContractOpFailure { .. }) => {
2265            (Some(OpType::Update), false)
2266        }
2267        (
2268            TransactionType::Subscribe,
2269            OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
2270        ) => (Some(OpType::Subscribe), true),
2271        (TransactionType::Subscribe, OpOutcome::ContractOpFailure { .. }) => {
2272            (Some(OpType::Subscribe), false)
2273        }
2274        // Irrelevant = completed successfully but without routing stats
2275        // (e.g., UPDATE when stats.target is None, SUBSCRIBE when stats is None)
2276        (TransactionType::Get, OpOutcome::Irrelevant) => (Some(OpType::Get), true),
2277        (TransactionType::Put, OpOutcome::Irrelevant) => (Some(OpType::Put), true),
2278        (TransactionType::Update, OpOutcome::Irrelevant) => (Some(OpType::Update), true),
2279        (TransactionType::Subscribe, OpOutcome::Irrelevant) => (Some(OpType::Subscribe), true),
2280        // Incomplete = operation never finalized
2281        (TransactionType::Get, OpOutcome::Incomplete) => (Some(OpType::Get), false),
2282        (TransactionType::Put, OpOutcome::Incomplete) => (Some(OpType::Put), false),
2283        (TransactionType::Update, OpOutcome::Incomplete) => (Some(OpType::Update), false),
2284        (TransactionType::Subscribe, OpOutcome::Incomplete) => (Some(OpType::Subscribe), false),
2285        // CONNECT is not a contract operation
2286        _ => (None, false),
2287    }
2288}
2289
2290/// Check if an operation result indicates completion
2291pub fn is_operation_completed(op_result: &Result<Option<OpEnum>, OpError>) -> bool {
2292    match op_result {
2293        // If we got an OpEnum, check its specific completion status using the trait
2294        Ok(Some(op)) => op.is_completed(),
2295        _ => false,
2296    }
2297}
2298
#[cfg(test)]
mod tests {
    use std::net::{Ipv4Addr, Ipv6Addr};

    use super::*;
    use crate::operations::OpError;
    use rstest::rstest;

    // ---- Hostname resolution (NodeConfig::parse_socket_addr) ----

    /// "localhost" without an explicit port must resolve to a loopback
    /// address (v4 or v6) with an OS-assigned, non-privileged port.
    #[tokio::test]
    async fn test_hostname_resolution_localhost() {
        let addr = Address::Hostname("localhost".to_string());
        let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
        assert!(
            socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
                || socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
        );
        assert!(socket_addr.port() > 1024);
    }

    /// An explicit `host:port` pair must preserve the given port.
    ///
    /// NOTE(review): this resolves a real public hostname and therefore
    /// needs working external DNS; it is ignored by default so the suite
    /// stays green in offline/sandboxed CI. Run with
    /// `cargo test -- --ignored` to include it.
    #[tokio::test]
    #[ignore = "requires external DNS resolution (google.com)"]
    async fn test_hostname_resolution_with_port() {
        let addr = Address::Hostname("google.com:8080".to_string());
        let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
        assert_eq!(socket_addr.port(), 8080);
    }

    /// DNS names with a trailing dot (FQDN form) should be handled:
    /// either resolve to loopback or fail gracefully, but never panic.
    #[tokio::test]
    async fn test_hostname_resolution_with_trailing_dot() {
        // DNS names with trailing dot should be handled
        let addr = Address::Hostname("localhost.".to_string());
        let result = NodeConfig::parse_socket_addr(&addr).await;
        // This should either succeed or fail gracefully
        if let Ok(socket_addr) = result {
            assert!(
                socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
                    || socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
            );
        }
    }

    /// A pre-resolved `HostAddress` must be passed through unchanged
    /// (no DNS lookup, no rewriting of IP or port).
    #[tokio::test]
    async fn test_hostname_resolution_direct_socket_addr() {
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
        let addr = Address::HostAddress(socket);
        let resolved = NodeConfig::parse_socket_addr(&addr).await.unwrap();
        assert_eq!(resolved, socket);
    }

    /// A non-numeric port in a `host:port` string must be rejected.
    #[tokio::test]
    async fn test_hostname_resolution_invalid_port() {
        let addr = Address::Hostname("localhost:not_a_port".to_string());
        let result = NodeConfig::parse_socket_addr(&addr).await;
        assert!(result.is_err());
    }

    // ---- PeerId equality / ordering / hashing ----

    /// PeerId equality is address-based: same socket address compares equal
    /// even with different keys; different addresses compare unequal even
    /// with the same key material in play.
    #[rstest]
    #[case::same_addr_different_keys(8080, 8080, true)]
    #[case::different_addr_same_key(8080, 8081, false)]
    fn test_peer_id_equality(#[case] port1: u16, #[case] port2: u16, #[case] expected_equal: bool) {
        let keypair1 = TransportKeypair::new();
        let keypair2 = TransportKeypair::new();
        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port1);
        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port2);

        let peer1 = PeerId::new(addr1, keypair1.public().clone());
        let peer2 = PeerId::new(addr2, keypair2.public().clone());

        assert_eq!(peer1 == peer2, expected_equal);
    }

    /// PeerId ordering follows socket-address ordering: with equal IPs,
    /// the lower port sorts first.
    #[rstest]
    #[case::lower_port_first(8080, 8081)]
    #[case::high_port_diff(1024, 65535)]
    fn test_peer_id_ordering(#[case] lower_port: u16, #[case] higher_port: u16) {
        let keypair = TransportKeypair::new();
        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), lower_port);
        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), higher_port);

        let peer1 = PeerId::new(addr1, keypair.public().clone());
        let peer2 = PeerId::new(addr2, keypair.public().clone());

        assert!(peer1 < peer2);
        assert!(peer2 > peer1);
    }

    /// Hash must be consistent with address-based equality: two PeerIds
    /// with the same address (but different keys) hash identically.
    #[test]
    fn test_peer_id_hash_consistency() {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let keypair1 = TransportKeypair::new();
        let keypair2 = TransportKeypair::new();
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);

        let peer1 = PeerId::new(addr, keypair1.public().clone());
        let peer2 = PeerId::new(addr, keypair2.public().clone());

        let mut hasher1 = DefaultHasher::new();
        let mut hasher2 = DefaultHasher::new();
        peer1.hash(&mut hasher1);
        peer2.hash(&mut hasher2);

        // Same address should produce same hash
        assert_eq!(hasher1.finish(), hasher2.finish());
    }

    /// `PeerId::random()` should produce distinct addresses; a collision is
    /// possible in principle but astronomically unlikely.
    #[test]
    fn test_peer_id_random_produces_unique() {
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        // Random peers should have different addresses (with high probability)
        assert_ne!(peer1.addr, peer2.addr);
    }

    /// Round-trip: `to_bytes` output must be non-empty and deserialize back
    /// to a PeerId with the same address.
    #[test]
    fn test_peer_id_serialization() {
        let peer = PeerId::random();
        let bytes = peer.clone().to_bytes();
        assert!(!bytes.is_empty());

        // Should be deserializable
        let deserialized: PeerId = bincode::deserialize(&bytes).unwrap();
        assert_eq!(peer.addr, deserialized.addr);
    }

    /// Display and Debug are intentionally unified for PeerId, and neither
    /// may be empty.
    #[test]
    fn test_peer_id_display() {
        let peer = PeerId::random();
        let display = format!("{}", peer);
        let debug = format!("{:?}", peer);

        // Display and Debug should produce the same output
        assert_eq!(display, debug);
        // Should not be empty
        assert!(!display.is_empty());
    }

    // ---- InitPeerNode ----

    /// Constructor must store the key-location and ring location verbatim.
    #[test]
    fn test_init_peer_node_construction() {
        let keypair = TransportKeypair::new();
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
        let peer_key_location = PeerKeyLocation::new(keypair.public().clone(), addr);
        let location = Location::new(0.5);

        let init_peer = InitPeerNode::new(peer_key_location.clone(), location);

        assert_eq!(init_peer.peer_key_location, peer_key_location);
        assert_eq!(init_peer.location, location);
    }

    // ---- is_operation_completed ----

    /// Absent ops and in-flight/control errors must all count as incomplete.
    #[rstest]
    #[case::with_none(Ok(None), false)]
    #[case::with_running_error(Err(OpError::OpNotAvailable(super::OpNotAvailable::Running)), false)]
    #[case::with_state_pushed_error(Err(OpError::StatePushed), false)]
    fn test_is_operation_completed(
        #[case] result: Result<Option<OpEnum>, OpError>,
        #[case] expected: bool,
    ) {
        assert_eq!(is_operation_completed(&result), expected);
    }

    // ---- classify_op_outcome ----

    mod classify_op_outcome_tests {
        use super::super::{classify_op_outcome, network_status::OpType};
        use crate::message::TransactionType;
        use crate::operations::OpOutcome;

        /// `Irrelevant` means "completed, no routing stats" → success.
        #[test]
        fn irrelevant_counted_as_success() {
            let (op_type, success) =
                classify_op_outcome(TransactionType::Update, OpOutcome::Irrelevant);
            assert!(matches!(op_type, Some(OpType::Update)));
            assert!(success);
        }

        /// `Incomplete` means "never finalized" → failure.
        #[test]
        fn incomplete_counted_as_failure() {
            let (op_type, success) =
                classify_op_outcome(TransactionType::Get, OpOutcome::Incomplete);
            assert!(matches!(op_type, Some(OpType::Get)));
            assert!(!success);
        }

        /// CONNECT is not a contract op: no OpType regardless of outcome.
        #[test]
        fn connect_skipped() {
            let (op_type, _) = classify_op_outcome(TransactionType::Connect, OpOutcome::Irrelevant);
            assert!(op_type.is_none());

            let (op_type, _) = classify_op_outcome(TransactionType::Connect, OpOutcome::Incomplete);
            assert!(op_type.is_none());
        }

        /// SUBSCRIBE with no stats (`Irrelevant`) still counts as success.
        #[test]
        fn subscribe_irrelevant_is_success() {
            let (op_type, success) =
                classify_op_outcome(TransactionType::Subscribe, OpOutcome::Irrelevant);
            assert!(matches!(op_type, Some(OpType::Subscribe)));
            assert!(success);
        }

        /// PUT that never finalized counts as failure.
        #[test]
        fn put_incomplete_is_failure() {
            let (op_type, success) =
                classify_op_outcome(TransactionType::Put, OpOutcome::Incomplete);
            assert!(matches!(op_type, Some(OpType::Put)));
            assert!(!success);
        }
    }
}