freenet/node/
mod.rs

1//! The main node data type which encapsulates all the behaviour for maintaining a connection
2//! and performing operations within the network.
3//!
4//! This module contains the primary event loop (`NodeP2P::run_node`) that orchestrates
5//! interactions between different components like the network, operations, contracts, and clients.
6//! It receives events and dispatches actions via channels.
7//!
8//! # Implementations
9//! Node comes with different underlying implementations that can be used upon construction.
10//! Those implementations are:
11//! - libp2p: all the connection is handled by libp2p.
12//! - in-memory: a simplifying node used for emulation purposes mainly.
13//! - inter-process: similar to in-memory, but can be rana cross multiple processes, closer to the real p2p impl
14//!
15//! The main node data structure and execution loop.
16//! See [`../../architecture.md`](../../architecture.md) for a high-level overview of the node's role and the event loop interactions.
17
18use anyhow::Context;
19use either::Either;
20use freenet_stdlib::{
21    client_api::{ClientRequest, ErrorKind},
22    prelude::ContractKey,
23};
24use std::{
25    borrow::Cow,
26    fmt::Display,
27    fs::File,
28    hash::Hash,
29    io::Read,
30    net::{IpAddr, SocketAddr, ToSocketAddrs},
31    sync::Arc,
32    time::Duration,
33};
34use std::{collections::HashSet, convert::Infallible};
35
36use self::p2p_impl::NodeP2P;
37use crate::{
38    client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
39    config::{Address, GatewayConfig, WebsocketApiConfig},
40    contract::{
41        Callback, ClientResponsesSender, ExecutorError, ExecutorToEventLoopChannel,
42        NetworkContractHandler, WaitingTransaction,
43    },
44    local_node::Executor,
45    message::{InnerMessage, NetMessage, Transaction, TransactionType},
46    operations::{
47        connect::{self, ConnectOp},
48        get, put, subscribe, update, OpEnum, OpError, OpOutcome,
49    },
50    ring::{Location, PeerKeyLocation},
51    router::{RouteEvent, RouteOutcome},
52    tracing::{EventRegister, NetEventLog, NetEventRegister},
53};
54use crate::{
55    config::Config,
56    message::{MessageStats, NetMessageV1},
57};
58use freenet_stdlib::client_api::DelegateRequest;
59use rsa::pkcs8::DecodePublicKey;
60use serde::{Deserialize, Serialize};
61use tracing::Instrument;
62
63use crate::operations::handle_op_request;
64pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge};
65
66use crate::topology::rate::Rate;
67use crate::transport::{TransportKeypair, TransportPublicKey};
68pub(crate) use op_state_manager::{OpManager, OpNotAvailable};
69
70mod message_processor;
71mod network_bridge;
72mod op_state_manager;
73mod p2p_impl;
74mod request_router;
75pub(crate) mod testing_impl;
76
77pub use message_processor::MessageProcessor;
78pub use request_router::{DeduplicatedRequest, RequestRouter};
79
80pub struct Node(NodeP2P);
81
82impl Node {
83    pub fn update_location(&mut self, location: Location) {
84        self.0
85            .op_manager
86            .ring
87            .connection_manager
88            .update_location(Some(location));
89    }
90
91    pub async fn run(self) -> anyhow::Result<Infallible> {
92        self.0.run_node().await
93    }
94}
95
96/// When instancing a node you can either join an existing network or bootstrap a new network with a listener
97/// which will act as the initial provider. This initial peer will be listening at the provided port and assigned IP.
98/// If those are not free the instancing process will return an error.
99///
100/// In order to bootstrap a new network the following arguments are required to be provided to the builder:
101/// - ip: IP associated to the initial node.
102/// - port: listening port of the initial node.
103///
104/// If both are provided but also additional peers are added via the [`Self::add_gateway()`] method, this node will
105/// be listening but also try to connect to an existing peer.
106#[derive(Serialize, Deserialize, Clone, Debug)]
107#[non_exhaustive] // avoid directly instantiating this struct
108pub struct NodeConfig {
109    /// Determines if an initial connection should be attempted.
110    /// Only true for an initial gateway/node. If false, the gateway will be disconnected unless other peers connect through it.
111    pub should_connect: bool,
112    pub is_gateway: bool,
113    /// If not specified, a key is generated and used when creating the node.
114    pub key_pair: TransportKeypair,
115    // optional local info, in case this is an initial bootstrap node
116    /// IP to bind to the network listener.
117    pub network_listener_ip: IpAddr,
118    /// socket port to bind to the network listener.
119    pub network_listener_port: u16,
120    pub(crate) peer_id: Option<PeerId>,
121    pub(crate) config: Arc<Config>,
122    /// At least one gateway is required for joining the network.
123    /// Not necessary if this is an initial node.
124    pub(crate) gateways: Vec<InitPeerNode>,
125    /// the location of this node, used for gateways.
126    pub(crate) location: Option<Location>,
127    pub(crate) max_hops_to_live: Option<usize>,
128    pub(crate) rnd_if_htl_above: Option<usize>,
129    pub(crate) max_number_conn: Option<usize>,
130    pub(crate) min_number_conn: Option<usize>,
131    pub(crate) max_upstream_bandwidth: Option<Rate>,
132    pub(crate) max_downstream_bandwidth: Option<Rate>,
133    pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
134}
135
136impl NodeConfig {
137    pub async fn new(config: Config) -> anyhow::Result<NodeConfig> {
138        tracing::info!("Loading node configuration for mode {}", config.mode);
139
140        // Get our own public key to filter out self-connections
141        let own_pub_key = config.transport_keypair().public();
142
143        let mut gateways = Vec::with_capacity(config.gateways.len());
144        for gw in &config.gateways {
145            let GatewayConfig {
146                address,
147                public_key_path,
148                location,
149            } = gw;
150
151            let mut key_file = File::open(public_key_path).with_context(|| {
152                format!("failed loading gateway pubkey from {public_key_path:?}")
153            })?;
154            let mut buf = String::new();
155            key_file.read_to_string(&mut buf)?;
156
157            let pub_key = rsa::RsaPublicKey::from_public_key_pem(&buf)?;
158            let transport_pub_key = TransportPublicKey::from(pub_key);
159
160            // Skip if this gateway's public key matches our own
161            if &transport_pub_key == own_pub_key {
162                tracing::warn!(
163                    "Skipping gateway with same public key as self: {:?}",
164                    public_key_path
165                );
166                continue;
167            }
168
169            let address = Self::parse_socket_addr(address).await?;
170            let peer_id = PeerId::new(address, transport_pub_key);
171            let location = location
172                .map(Location::new)
173                .unwrap_or_else(|| Location::from_address(&address));
174            gateways.push(InitPeerNode::new(peer_id, location));
175        }
176        tracing::info!(
177            "Node will be listening at {}:{} internal address",
178            config.network_api.address,
179            config.network_api.port
180        );
181        if let Some(peer_id) = &config.peer_id {
182            tracing::info!("Node external address: {}", peer_id.addr);
183        }
184        Ok(NodeConfig {
185            should_connect: true,
186            is_gateway: config.is_gateway,
187            key_pair: config.transport_keypair().clone(),
188            gateways,
189            peer_id: config.peer_id.clone(),
190            network_listener_ip: config.network_api.address,
191            network_listener_port: config.network_api.port,
192            location: config.location.map(Location::new),
193            config: Arc::new(config.clone()),
194            max_hops_to_live: None,
195            rnd_if_htl_above: None,
196            max_number_conn: None,
197            min_number_conn: None,
198            max_upstream_bandwidth: None,
199            max_downstream_bandwidth: None,
200            blocked_addresses: config.network_api.blocked_addresses.clone(),
201        })
202    }
203
204    pub(crate) async fn parse_socket_addr(address: &Address) -> anyhow::Result<SocketAddr> {
205        let (hostname, port) = match address {
206            crate::config::Address::Hostname(hostname) => {
207                match hostname.rsplit_once(':') {
208                    None => {
209                        // no port found, use default
210                        let hostname_with_port =
211                            format!("{}:{}", hostname, crate::config::default_network_api_port());
212
213                        if let Ok(mut addrs) = hostname_with_port.to_socket_addrs() {
214                            if let Some(addr) = addrs.next() {
215                                return Ok(addr);
216                            }
217                        }
218
219                        (Cow::Borrowed(hostname.as_str()), None)
220                    }
221                    Some((host, port)) => match port.parse::<u16>() {
222                        Ok(port) => {
223                            if let Ok(mut addrs) = hostname.to_socket_addrs() {
224                                if let Some(addr) = addrs.next() {
225                                    return Ok(addr);
226                                }
227                            }
228
229                            (Cow::Borrowed(host), Some(port))
230                        }
231                        Err(_) => return Err(anyhow::anyhow!("Invalid port number: {port}")),
232                    },
233                }
234            }
235            Address::HostAddress(addr) => return Ok(*addr),
236        };
237
238        let (conf, opts) = hickory_resolver::system_conf::read_system_conf()?;
239        let resolver = hickory_resolver::TokioAsyncResolver::new(
240            conf,
241            opts,
242            hickory_resolver::name_server::GenericConnector::new(
243                hickory_resolver::name_server::TokioRuntimeProvider::new(),
244            ),
245        );
246
247        // only issue one query with .
248        let hostname = if hostname.ends_with('.') {
249            hostname
250        } else {
251            Cow::Owned(format!("{hostname}."))
252        };
253
254        let ips = resolver.lookup_ip(hostname.as_ref()).await?;
255        match ips.into_iter().next() {
256            Some(ip) => Ok(SocketAddr::new(
257                ip,
258                port.unwrap_or_else(crate::config::default_network_api_port),
259            )),
260            None => Err(anyhow::anyhow!("Fail to resolve IP address of {hostname}")),
261        }
262    }
263
264    pub fn config(&self) -> &Config {
265        &self.config
266    }
267
268    pub fn is_gateway(&mut self) -> &mut Self {
269        self.is_gateway = true;
270        self
271    }
272
273    pub fn first_gateway(&mut self) {
274        self.should_connect = false;
275    }
276
277    pub fn with_should_connect(&mut self, should_connect: bool) -> &mut Self {
278        self.should_connect = should_connect;
279        self
280    }
281
282    pub fn max_hops_to_live(&mut self, num_hops: usize) -> &mut Self {
283        self.max_hops_to_live = Some(num_hops);
284        self
285    }
286
287    pub fn rnd_if_htl_above(&mut self, num_hops: usize) -> &mut Self {
288        self.rnd_if_htl_above = Some(num_hops);
289        self
290    }
291
292    pub fn max_number_of_connections(&mut self, num: usize) -> &mut Self {
293        self.max_number_conn = Some(num);
294        self
295    }
296
297    pub fn min_number_of_connections(&mut self, num: usize) -> &mut Self {
298        self.min_number_conn = Some(num);
299        self
300    }
301
302    pub fn with_peer_id(&mut self, peer_id: PeerId) -> &mut Self {
303        self.peer_id = Some(peer_id);
304        self
305    }
306
307    pub fn with_location(&mut self, loc: Location) -> &mut Self {
308        self.location = Some(loc);
309        self
310    }
311
312    /// Connection info for an already existing peer. Required in case this is not a gateway node.
313    pub fn add_gateway(&mut self, peer: InitPeerNode) -> &mut Self {
314        self.gateways.push(peer);
315        self
316    }
317
318    /// Builds a node using the default backend connection manager.
319    pub async fn build<const CLIENTS: usize>(
320        self,
321        clients: [BoxedClient; CLIENTS],
322    ) -> anyhow::Result<Node> {
323        let event_register = {
324            #[cfg(feature = "trace-ot")]
325            {
326                use super::tracing::{CombinedRegister, OTEventRegister};
327                CombinedRegister::new([
328                    Box::new(EventRegister::new(self.config.event_log())),
329                    Box::new(OTEventRegister::new()),
330                ])
331            }
332            #[cfg(not(feature = "trace-ot"))]
333            {
334                EventRegister::new(self.config.event_log())
335            }
336        };
337        let cfg = self.config.clone();
338        let node = NodeP2P::build::<NetworkContractHandler, CLIENTS, _>(
339            self,
340            clients,
341            event_register,
342            cfg,
343        )
344        .await?;
345        Ok(Node(node))
346    }
347
348    pub fn get_peer_id(&self) -> Option<PeerId> {
349        self.peer_id.clone()
350    }
351
352    /// Returns all specified gateways for this peer. Returns an error if the peer is not a gateway
353    /// and no gateways are specified.
354    fn get_gateways(&self) -> anyhow::Result<Vec<PeerKeyLocation>> {
355        let gateways: Vec<PeerKeyLocation> = self
356            .gateways
357            .iter()
358            .map(|node| PeerKeyLocation {
359                peer: node.peer_id.clone(),
360                location: Some(node.location),
361            })
362            .collect();
363
364        if !self.is_gateway && gateways.is_empty() {
365            anyhow::bail!(
366            "At least one remote gateway is required to join an existing network for non-gateway nodes."
367        )
368        } else {
369            Ok(gateways)
370        }
371    }
372}
373
374/// Gateway node to use for joining the network.
375#[derive(Clone, Serialize, Deserialize, Debug)]
376pub struct InitPeerNode {
377    peer_id: PeerId,
378    location: Location,
379}
380
381impl InitPeerNode {
382    pub fn new(peer_id: PeerId, location: Location) -> Self {
383        Self { peer_id, location }
384    }
385}
386
387async fn report_result(
388    tx: Option<Transaction>,
389    op_result: Result<Option<OpEnum>, OpError>,
390    op_manager: &OpManager,
391    executor_callback: Option<ExecutorToEventLoopChannel<Callback>>,
392    client_req_handler_callback: Option<(Vec<ClientId>, ClientResponsesSender)>,
393    event_listener: &mut dyn NetEventRegister,
394) {
395    // Add UPDATE-specific debug logging at the start
396    if let Some(tx_id) = tx {
397        if matches!(tx_id.transaction_type(), TransactionType::Update) {
398            tracing::debug!("report_result called for UPDATE transaction {}", tx_id);
399        }
400    }
401
402    match op_result {
403        Ok(Some(op_res)) => {
404            // Log specifically for UPDATE operations
405            if let crate::operations::OpEnum::Update(ref update_op) = op_res {
406                tracing::debug!(
407                    "UPDATE operation {} completed, finalized: {}",
408                    update_op.id,
409                    update_op.finalized()
410                );
411            }
412
413            // NEW: Send to result router if feature flag is enabled and transaction exists
414            // (independent of legacy callback presence)
415            if let (Some(transaction), Some(router_tx)) = (tx, &op_manager.result_router_tx) {
416                let host_result = op_res.to_host_result();
417                let router_tx_clone = router_tx.clone();
418
419                // Spawn fire-and-forget task to avoid blocking report_result()
420                // while still guaranteeing message delivery
421                tokio::spawn(async move {
422                    if let Err(e) = router_tx_clone.send((transaction, host_result)).await {
423                        tracing::error!(
424                            "CRITICAL: Result router channel closed - dual-path delivery broken. \
425                             Router or session actor has crashed. Transaction: {}. Error: {}. \
426                             Consider restarting node or disabling FREENET_ACTOR_CLIENTS flag.",
427                            transaction,
428                            e
429                        );
430                        // TODO: Consider implementing circuit breaker or automatic recovery
431                    }
432                });
433            }
434
435            // EXISTING: Legacy client delivery (only when actor_clients is disabled)
436            // When actor_clients is enabled, SessionActor handles all client communication
437            if !op_manager.actor_clients {
438                if let Some((client_ids, cb)) = client_req_handler_callback {
439                    for client_id in client_ids {
440                        // Enhanced logging for UPDATE operations
441                        if let crate::operations::OpEnum::Update(ref update_op) = op_res {
442                            tracing::debug!(
443                                "Sending UPDATE response to client {} for transaction {}",
444                                client_id,
445                                update_op.id
446                            );
447
448                            // Log the result being sent
449                            let host_result = op_res.to_host_result();
450                            match &host_result {
451                                Ok(response) => {
452                                    tracing::debug!(
453                                    "Client {} callback found, sending successful UPDATE response: {:?}",
454                                    client_id,
455                                    response
456                                );
457                                }
458                                Err(error) => {
459                                    tracing::error!(
460                                        "Client {} callback found, sending UPDATE error: {:?}",
461                                        client_id,
462                                        error
463                                    );
464                                }
465                            }
466                        } else {
467                            tracing::debug!(?tx, %client_id,  "Sending response to client");
468                        }
469                        // Legacy delivery needs a RequestId - generate one for backward compatibility
470                        use crate::client_events::RequestId;
471                        let _ = cb.send((client_id, RequestId::new(), op_res.to_host_result()));
472                    }
473                } else {
474                    // Log when no client callback is found for UPDATE operations
475                    if let crate::operations::OpEnum::Update(ref update_op) = op_res {
476                        tracing::debug!(
477                        "No client callback found for UPDATE transaction {} - this may indicate a missing client subscription",
478                        update_op.id
479                    );
480                    }
481                }
482            } // End skip_legacy_delivery check
483
484            // check operations.rs:handle_op_result to see what's the meaning of each state
485            // in case more cases want to be handled when feeding information to the OpManager
486
487            match op_res.outcome() {
488                OpOutcome::ContractOpSuccess {
489                    target_peer,
490                    contract_location,
491                    first_response_time,
492                    payload_size,
493                    payload_transfer_time,
494                } => {
495                    let event = RouteEvent {
496                        peer: target_peer.clone(),
497                        contract_location,
498                        outcome: RouteOutcome::Success {
499                            time_to_response_start: first_response_time,
500                            payload_size,
501                            payload_transfer_time,
502                        },
503                    };
504                    event_listener
505                        .register_events(Either::Left(NetEventLog::route_event(
506                            op_res.id(),
507                            &op_manager.ring,
508                            &event,
509                        )))
510                        .await;
511                    op_manager.ring.routing_finished(event);
512                }
513                // todo: handle failures, need to track timeouts and other potential failures
514                // OpOutcome::ContractOpFailure {
515                //     target_peer: Some(target_peer),
516                //     contract_location,
517                // } => {
518                //     op_manager.ring.routing_finished(RouteEvent {
519                //         peer: *target_peer,
520                //         contract_location,
521                //         outcome: RouteOutcome::Failure,
522                //     });
523                // }
524                OpOutcome::Incomplete | OpOutcome::Irrelevant => {}
525            }
526            if let Some(mut cb) = executor_callback {
527                cb.response(op_res).await;
528            }
529        }
530        Ok(None) => {
531            tracing::debug!(?tx, "No operation result found");
532        }
533        Err(err) => {
534            // just mark the operation as completed so no redundant messages are processed for this transaction anymore
535            if let Some(tx) = tx {
536                op_manager.completed(tx);
537            }
538            #[cfg(any(debug_assertions, test))]
539            {
540                use std::io::Write;
541                #[cfg(debug_assertions)]
542                let OpError::InvalidStateTransition { tx, state, trace } = err
543                else {
544                    tracing::error!("Finished transaction with error: {err}");
545                    return;
546                };
547                #[cfg(not(debug_assertions))]
548                let OpError::InvalidStateTransition { tx } = err
549                else {
550                    tracing::error!("Finished transaction with error: {err}");
551                    return;
552                };
553                // todo: this can be improved once std::backtrace::Backtrace::frames is stabilized
554                #[cfg(debug_assertions)]
555                let trace = format!("{trace}");
556                #[cfg(debug_assertions)]
557                {
558                    let mut tr_lines = trace.lines();
559                    let trace = tr_lines
560                        .nth(2)
561                        .map(|second_trace| {
562                            let second_trace_lines =
563                                [second_trace, tr_lines.next().unwrap_or_default()];
564                            second_trace_lines.join("\n")
565                        })
566                        .unwrap_or_default();
567                    let peer = &op_manager
568                        .ring
569                        .connection_manager
570                        .get_peer_key()
571                        .expect("Peer key not found");
572                    let log = format!(
573                        "Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n"
574                    );
575                    std::io::stderr().write_all(log.as_bytes()).unwrap();
576                }
577                #[cfg(not(debug_assertions))]
578                {
579                    let peer = &op_manager
580                        .ring
581                        .connection_manager
582                        .get_peer_key()
583                        .expect("Peer key not found");
584                    let log = format!("Transaction ({tx} @ {peer}) error\n");
585                    std::io::stderr().write_all(log.as_bytes()).unwrap();
586                }
587            }
588            #[cfg(not(any(debug_assertions, test)))]
589            {
590                tracing::debug!("Finished transaction with error: {err}");
591            }
592        }
593    }
594}
595
596macro_rules! handle_op_not_available {
597    ($op_result:ident) => {
598        if let Err(OpError::OpNotAvailable(state)) = &$op_result {
599            match state {
600                OpNotAvailable::Running => {
601                    tracing::debug!("Operation still running");
602                    // TODO: do exponential backoff
603                    tokio::time::sleep(Duration::from_micros(1_000)).await;
604                    continue;
605                }
606                OpNotAvailable::Completed => {
607                    tracing::debug!("Operation already completed");
608                    return;
609                }
610            }
611        }
612    };
613}
614
615#[allow(clippy::too_many_arguments)]
616async fn process_message<CB>(
617    msg: NetMessage,
618    op_manager: Arc<OpManager>,
619    conn_manager: CB,
620    event_listener: Box<dyn NetEventRegister>,
621    executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
622    client_req_handler_callback: Option<ClientResponsesSender>,
623    client_ids: Option<Vec<ClientId>>,
624    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
625) where
626    CB: NetworkBridge,
627{
628    let tx = Some(*msg.id());
629    match msg {
630        NetMessage::V1(msg_v1) => {
631            process_message_v1(
632                tx,
633                msg_v1,
634                op_manager,
635                conn_manager,
636                event_listener,
637                executor_callback,
638                client_req_handler_callback,
639                client_ids,
640                pending_op_result,
641            )
642            .await
643        }
644    }
645}
646
647/// NEW: Pure network message processing using MessageProcessor for client handling
648#[allow(clippy::too_many_arguments)]
649pub(crate) async fn process_message_decoupled<CB>(
650    msg: NetMessage,
651    op_manager: Arc<OpManager>,
652    conn_manager: CB,
653    event_listener: Box<dyn NetEventRegister>,
654    executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
655    message_processor: std::sync::Arc<MessageProcessor>,
656    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
657) where
658    CB: NetworkBridge,
659{
660    let tx = *msg.id();
661
662    // Pure network message processing - no client types involved
663    let op_result = handle_pure_network_message(
664        msg,
665        op_manager.clone(),
666        conn_manager,
667        event_listener,
668        executor_callback,
669        pending_op_result,
670    )
671    .await;
672
673    // Delegate to MessageProcessor - it handles all client concerns internally
674    if let Err(e) = message_processor.handle_network_result(tx, op_result).await {
675        tracing::error!(
676            "Failed to handle network result for transaction {}: {}",
677            tx,
678            e
679        );
680    }
681}
682
683/// Pure network message handling (no client concerns)
684#[allow(clippy::too_many_arguments)]
685async fn handle_pure_network_message<CB>(
686    msg: NetMessage,
687    op_manager: Arc<OpManager>,
688    conn_manager: CB,
689    event_listener: Box<dyn NetEventRegister>,
690    executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
691    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
692) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
693where
694    CB: NetworkBridge,
695{
696    match msg {
697        NetMessage::V1(msg_v1) => {
698            handle_pure_network_message_v1(
699                msg_v1,
700                op_manager,
701                conn_manager,
702                event_listener,
703                executor_callback,
704                pending_op_result,
705            )
706            .await
707        }
708    }
709}
710
711#[allow(clippy::too_many_arguments)]
712async fn process_message_v1<CB>(
713    tx: Option<Transaction>,
714    msg: NetMessageV1,
715    op_manager: Arc<OpManager>,
716    mut conn_manager: CB,
717    mut event_listener: Box<dyn NetEventRegister>,
718    executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
719    client_req_handler_callback: Option<ClientResponsesSender>,
720    client_id: Option<Vec<ClientId>>,
721    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
722) where
723    CB: NetworkBridge,
724{
725    let cli_req = client_id.zip(client_req_handler_callback);
726    event_listener
727        .register_events(NetEventLog::from_inbound_msg_v1(&msg, &op_manager))
728        .await;
729
730    const MAX_RETRIES: usize = 10usize;
731    for i in 0..MAX_RETRIES {
732        tracing::debug!(?tx, "Processing operation, iteration: {i}");
733        match msg {
734            NetMessageV1::Connect(ref op) => {
735                let parent_span = tracing::Span::current();
736                let span = tracing::info_span!(
737                    parent: parent_span,
738                    "handle_connect_op_request",
739                    transaction = %msg.id(),
740                    tx_type = %msg.id().transaction_type()
741                );
742                let op_result =
743                    handle_op_request::<connect::ConnectOp, _>(&op_manager, &mut conn_manager, op)
744                        .instrument(span)
745                        .await;
746
747                handle_op_not_available!(op_result);
748                return report_result(
749                    tx,
750                    op_result,
751                    &op_manager,
752                    executor_callback,
753                    cli_req,
754                    &mut *event_listener,
755                )
756                .await;
757            }
758            NetMessageV1::Put(ref op) => {
759                let op_result =
760                    handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op).await;
761
762                if is_operation_completed(&op_result) {
763                    if let Some(ref op_execution_callback) = pending_op_result {
764                        let tx_id = *op.id();
765                        let _ = op_execution_callback
766                            .send(NetMessage::V1(NetMessageV1::Put((*op).clone())))
767                            .await
768                            .inspect_err(
769                                |err| tracing::error!(%err, %tx_id, "Failed to send message to client"),
770                            );
771                    }
772                }
773
774                handle_op_not_available!(op_result);
775                return report_result(
776                    tx,
777                    op_result,
778                    &op_manager,
779                    executor_callback,
780                    cli_req,
781                    &mut *event_listener,
782                )
783                .await;
784            }
785            NetMessageV1::Get(ref op) => {
786                let op_result =
787                    handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op).await;
788                if is_operation_completed(&op_result) {
789                    if let Some(ref op_execution_callback) = pending_op_result {
790                        let tx_id = *op.id();
791                        let _ = op_execution_callback
792                            .send(NetMessage::V1(NetMessageV1::Get((*op).clone()))).await.inspect_err(|err|
793                                tracing::error!(%err, %tx_id, "Failed to send message to client")
794                            );
795                    }
796                }
797                handle_op_not_available!(op_result);
798                return report_result(
799                    tx,
800                    op_result,
801                    &op_manager,
802                    executor_callback,
803                    cli_req,
804                    &mut *event_listener,
805                )
806                .await;
807            }
808            NetMessageV1::Subscribe(ref op) => {
809                let op_result = handle_op_request::<subscribe::SubscribeOp, _>(
810                    &op_manager,
811                    &mut conn_manager,
812                    op,
813                )
814                .await;
815                if is_operation_completed(&op_result) {
816                    if let Some(ref op_execution_callback) = pending_op_result {
817                        let tx_id = *op.id();
818                        let _ = op_execution_callback
819                            .send(NetMessage::V1(NetMessageV1::Subscribe((*op).clone()))).await.inspect_err(|err|
820                                tracing::error!(%err, %tx_id, "Failed to send message to client")
821                            );
822                    }
823                }
824                handle_op_not_available!(op_result);
825                return report_result(
826                    tx,
827                    op_result,
828                    &op_manager,
829                    executor_callback,
830                    cli_req,
831                    &mut *event_listener,
832                )
833                .await;
834            }
835            NetMessageV1::Update(ref op) => {
836                let op_result =
837                    handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
838                        .await;
839                if is_operation_completed(&op_result) {
840                    if let Some(ref op_execution_callback) = pending_op_result {
841                        let tx_id = *op.id();
842                        let _ = op_execution_callback
843                                .send(NetMessage::V1(NetMessageV1::Update((*op).clone()))).await.inspect_err(|err|
844                                    tracing::error!(%err, %tx_id, "Failed to send message to client")
845                                );
846                    }
847                }
848                handle_op_not_available!(op_result);
849                return report_result(
850                    tx,
851                    op_result,
852                    &op_manager,
853                    executor_callback,
854                    cli_req,
855                    &mut *event_listener,
856                )
857                .await;
858            }
859            NetMessageV1::Unsubscribed { ref key, .. } => {
860                if let Err(error) = subscribe(op_manager, *key, None).await {
861                    tracing::error!(%error, "Failed to subscribe to contract");
862                }
863                break;
864            }
865            _ => break, // Exit the loop if no applicable message type is found
866        }
867    }
868}
869
870/// Pure network message processing for V1 messages (no client concerns)
871#[allow(clippy::too_many_arguments)]
872async fn handle_pure_network_message_v1<CB>(
873    msg: NetMessageV1,
874    op_manager: Arc<OpManager>,
875    mut conn_manager: CB,
876    mut event_listener: Box<dyn NetEventRegister>,
877    executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
878    pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
879) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
880where
881    CB: NetworkBridge,
882{
883    // Register network events (pure network concern)
884    event_listener
885        .register_events(NetEventLog::from_inbound_msg_v1(&msg, &op_manager))
886        .await;
887
888    const MAX_RETRIES: usize = 10usize;
889    for i in 0..MAX_RETRIES {
890        let tx = Some(*msg.id());
891        tracing::debug!(?tx, "Processing pure network operation, iteration: {i}");
892
893        match msg {
894            NetMessageV1::Connect(ref op) => {
895                let parent_span = tracing::Span::current();
896                let span = tracing::info_span!(
897                    parent: parent_span,
898                    "handle_connect_op_request",
899                    transaction = %msg.id(),
900                    tx_type = %msg.id().transaction_type()
901                );
902                let op_result =
903                    handle_op_request::<connect::ConnectOp, _>(&op_manager, &mut conn_manager, op)
904                        .instrument(span)
905                        .await;
906
907                if let Err(OpError::OpNotAvailable(state)) = &op_result {
908                    match state {
909                        OpNotAvailable::Running => {
910                            tracing::debug!("Pure network: Operation still running");
911                            tokio::time::sleep(Duration::from_micros(1_000)).await;
912                            continue;
913                        }
914                        OpNotAvailable::Completed => {
915                            tracing::debug!("Pure network: Operation already completed");
916                            return Ok(None);
917                        }
918                    }
919                }
920
921                // Pure network result processing - no client handling
922                return handle_pure_network_result(
923                    tx,
924                    op_result,
925                    &op_manager,
926                    executor_callback,
927                    &mut *event_listener,
928                )
929                .await;
930            }
931            NetMessageV1::Put(ref op) => {
932                let op_result =
933                    handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op).await;
934
935                // Handle pending operation results (network concern)
936                if is_operation_completed(&op_result) {
937                    if let Some(ref op_execution_callback) = pending_op_result {
938                        let tx_id = *op.id();
939                        let _ = op_execution_callback
940                            .send(NetMessage::V1(NetMessageV1::Put((*op).clone())))
941                            .await
942                            .inspect_err(|err| tracing::error!(%err, %tx_id, "Failed to send message to executor"));
943                    }
944                }
945
946                if let Err(OpError::OpNotAvailable(state)) = &op_result {
947                    match state {
948                        OpNotAvailable::Running => {
949                            tracing::debug!("Pure network: Operation still running");
950                            tokio::time::sleep(Duration::from_micros(1_000)).await;
951                            continue;
952                        }
953                        OpNotAvailable::Completed => {
954                            tracing::debug!("Pure network: Operation already completed");
955                            return Ok(None);
956                        }
957                    }
958                }
959
960                return handle_pure_network_result(
961                    tx,
962                    op_result,
963                    &op_manager,
964                    executor_callback,
965                    &mut *event_listener,
966                )
967                .await;
968            }
969            NetMessageV1::Get(ref op) => {
970                let op_result =
971                    handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op).await;
972
973                // Handle pending operation results (network concern)
974                if is_operation_completed(&op_result) {
975                    if let Some(ref op_execution_callback) = pending_op_result {
976                        let tx_id = *op.id();
977                        let _ = op_execution_callback
978                            .send(NetMessage::V1(NetMessageV1::Get((*op).clone())))
979                            .await
980                            .inspect_err(|err| tracing::error!(%err, %tx_id, "Failed to send message to executor"));
981                    }
982                }
983
984                if let Err(OpError::OpNotAvailable(state)) = &op_result {
985                    match state {
986                        OpNotAvailable::Running => {
987                            tracing::debug!("Pure network: Operation still running");
988                            tokio::time::sleep(Duration::from_micros(1_000)).await;
989                            continue;
990                        }
991                        OpNotAvailable::Completed => {
992                            tracing::debug!("Pure network: Operation already completed");
993                            return Ok(None);
994                        }
995                    }
996                }
997
998                return handle_pure_network_result(
999                    tx,
1000                    op_result,
1001                    &op_manager,
1002                    executor_callback,
1003                    &mut *event_listener,
1004                )
1005                .await;
1006            }
1007            NetMessageV1::Update(ref op) => {
1008                let op_result =
1009                    handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
1010                        .await;
1011
1012                if let Err(OpError::OpNotAvailable(state)) = &op_result {
1013                    match state {
1014                        OpNotAvailable::Running => {
1015                            tracing::debug!("Pure network: Operation still running");
1016                            tokio::time::sleep(Duration::from_micros(1_000)).await;
1017                            continue;
1018                        }
1019                        OpNotAvailable::Completed => {
1020                            tracing::debug!("Pure network: Operation already completed");
1021                            return Ok(None);
1022                        }
1023                    }
1024                }
1025
1026                return handle_pure_network_result(
1027                    tx,
1028                    op_result,
1029                    &op_manager,
1030                    executor_callback,
1031                    &mut *event_listener,
1032                )
1033                .await;
1034            }
1035            NetMessageV1::Subscribe(ref op) => {
1036                let op_result = handle_op_request::<subscribe::SubscribeOp, _>(
1037                    &op_manager,
1038                    &mut conn_manager,
1039                    op,
1040                )
1041                .await;
1042
1043                if let Err(OpError::OpNotAvailable(state)) = &op_result {
1044                    match state {
1045                        OpNotAvailable::Running => {
1046                            tracing::debug!("Pure network: Operation still running");
1047                            tokio::time::sleep(Duration::from_micros(1_000)).await;
1048                            continue;
1049                        }
1050                        OpNotAvailable::Completed => {
1051                            tracing::debug!("Pure network: Operation already completed");
1052                            return Ok(None);
1053                        }
1054                    }
1055                }
1056
1057                return handle_pure_network_result(
1058                    tx,
1059                    op_result,
1060                    &op_manager,
1061                    executor_callback,
1062                    &mut *event_listener,
1063                )
1064                .await;
1065            }
1066            NetMessageV1::Unsubscribed { ref key, .. } => {
1067                if let Err(error) = subscribe(op_manager, *key, None).await {
1068                    tracing::error!(%error, "Failed to subscribe to contract");
1069                }
1070                break;
1071            }
1072            _ => break, // Exit the loop if no applicable message type is found
1073        }
1074    }
1075
1076    // If we reach here, no operation was processed
1077    Ok(None)
1078}
1079
1080/// Pure network result handling - no client notification logic
1081async fn handle_pure_network_result(
1082    tx: Option<Transaction>,
1083    op_result: Result<Option<crate::operations::OpEnum>, OpError>,
1084    _op_manager: &Arc<OpManager>,
1085    _executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
1086    _event_listener: &mut dyn NetEventRegister,
1087) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError> {
1088    tracing::debug!("Pure network result handling for transaction: {:?}", tx);
1089
1090    match &op_result {
1091        Ok(Some(_op_res)) => {
1092            // Log network operation completion
1093            tracing::debug!(
1094                "Network operation completed successfully for transaction: {:?}",
1095                tx
1096            );
1097
1098            // Register completion event (pure network concern)
1099            if let Some(tx_id) = tx {
1100                // TODO: Register completion event properly
1101                tracing::debug!("Network operation completed for transaction: {}", tx_id);
1102            }
1103
1104            // TODO: Handle executor callbacks (network concern)
1105            // Executor callback functionality needs to be restored with proper types
1106            if let Some(_callback) = _executor_callback {
1107                tracing::debug!("Executor callback available for transaction {:?} but not implemented in pure network processing", tx);
1108            }
1109        }
1110        Ok(None) => {
1111            tracing::debug!("Network operation returned no result");
1112        }
1113        Err(e) => {
1114            tracing::error!("Network operation failed: {}", e);
1115            // TODO: Register error event properly
1116            if let Some(tx_id) = tx {
1117                tracing::debug!(
1118                    "Network operation failed for transaction: {} with error: {}",
1119                    tx_id,
1120                    e
1121                );
1122            }
1123        }
1124    }
1125
1126    op_result
1127}
1128
1129/// Attempts to subscribe to a contract
1130pub async fn subscribe(
1131    op_manager: Arc<OpManager>,
1132    key: ContractKey,
1133    client_id: Option<ClientId>,
1134) -> Result<Transaction, OpError> {
1135    subscribe_with_id(op_manager, key, client_id, None).await
1136}
1137
1138/// Attempts to subscribe to a contract with a specific transaction ID (for deduplication)
1139pub async fn subscribe_with_id(
1140    op_manager: Arc<OpManager>,
1141    key: ContractKey,
1142    client_id: Option<ClientId>,
1143    transaction_id: Option<Transaction>,
1144) -> Result<Transaction, OpError> {
1145    let op = match transaction_id {
1146        Some(id) => subscribe::start_op_with_id(key, id),
1147        None => subscribe::start_op(key),
1148    };
1149    let id = op.id;
1150    if let Some(client_id) = client_id {
1151        use crate::client_events::RequestId;
1152        // Generate a default RequestId for internal subscription operations
1153        let request_id = RequestId::new();
1154        let _ = op_manager
1155            .ch_outbound
1156            .waiting_for_transaction_result(
1157                WaitingTransaction::Subscription {
1158                    contract_key: *key.id(),
1159                },
1160                client_id,
1161                request_id,
1162            )
1163            .await;
1164    }
1165    // Initialize a subscribe op.
1166    match subscribe::request_subscribe(&op_manager, op).await {
1167        Err(err) => {
1168            tracing::error!("{}", err);
1169            Err(err)
1170        }
1171        Ok(()) => Ok(id),
1172    }
1173}
1174
1175async fn handle_aborted_op(
1176    tx: Transaction,
1177    op_manager: &OpManager,
1178    gateways: &[PeerKeyLocation],
1179) -> Result<(), OpError> {
1180    use crate::util::IterExt;
1181    if let TransactionType::Connect = tx.transaction_type() {
1182        // attempt to establish a connection failed, this could be a fatal error since the node
1183        // is useless without connecting to the network, we will retry with exponential backoff
1184        // if necessary
1185        match op_manager.pop(&tx) {
1186            // only keep attempting to connect if the node hasn't got enough connections yet
1187            Ok(Some(OpEnum::Connect(op)))
1188                if op.has_backoff()
1189                    && op_manager.ring.open_connections()
1190                        < op_manager.ring.connection_manager.min_connections =>
1191            {
1192                let ConnectOp {
1193                    gateway, backoff, ..
1194                } = *op;
1195                if let Some(gateway) = gateway {
1196                    tracing::warn!("Retry connecting to gateway {}", gateway.peer);
1197                    connect::join_ring_request(backoff, &gateway, op_manager).await?;
1198                }
1199            }
1200            Ok(Some(OpEnum::Connect(_))) => {
1201                // if no connections were achieved just fail
1202                if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
1203                    tracing::warn!("Retrying joining the ring with an other gateway");
1204                    if let Some(gateway) = gateways.iter().shuffle().next() {
1205                        connect::join_ring_request(None, gateway, op_manager).await?
1206                    }
1207                }
1208            }
1209            _ => {}
1210        }
1211    }
1212    Ok(())
1213}
1214
1215/// The identifier of a peer in the network is composed of its address and public key.
1216///
1217/// A regular peer will have its `PeerId` set when it connects to a gateway as it get's
1218/// its external address from the gateway.
1219///
1220/// A gateway will have its `PeerId` set when it is created since it will know its own address
1221/// from the start.
1222#[derive(Serialize, Deserialize, Eq, Clone)]
1223pub struct PeerId {
1224    pub addr: SocketAddr,
1225    pub pub_key: TransportPublicKey,
1226}
1227
1228impl Hash for PeerId {
1229    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1230        self.addr.hash(state);
1231    }
1232}
1233
1234impl PartialEq<PeerId> for PeerId {
1235    fn eq(&self, other: &PeerId) -> bool {
1236        self.addr == other.addr
1237    }
1238}
1239
1240impl Ord for PeerId {
1241    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1242        self.addr.cmp(&other.addr)
1243    }
1244}
1245
1246impl PartialOrd for PeerId {
1247    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1248        Some(self.cmp(other))
1249    }
1250}
1251
1252impl PeerId {
1253    pub fn new(addr: SocketAddr, pub_key: TransportPublicKey) -> Self {
1254        Self { addr, pub_key }
1255    }
1256}
1257
1258thread_local! {
1259    static PEER_ID: std::cell::RefCell<Option<TransportPublicKey>> = const { std::cell::RefCell::new(None) };
1260}
1261
1262#[cfg(test)]
1263impl<'a> arbitrary::Arbitrary<'a> for PeerId {
1264    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1265        let addr: ([u8; 4], u16) = u.arbitrary()?;
1266
1267        let pub_key = PEER_ID.with(|peer_id| {
1268            let mut peer_id = peer_id.borrow_mut();
1269            match &*peer_id {
1270                Some(k) => k.clone(),
1271                None => {
1272                    let key = TransportKeypair::new().public().clone();
1273                    peer_id.replace(key.clone());
1274                    key
1275                }
1276            }
1277        });
1278
1279        Ok(Self {
1280            addr: addr.into(),
1281            pub_key,
1282        })
1283    }
1284}
1285
1286impl PeerId {
1287    pub fn random() -> Self {
1288        use rand::Rng;
1289        let mut addr = [0; 4];
1290        rand::rng().fill(&mut addr[..]);
1291        let port = crate::util::get_free_port().unwrap();
1292
1293        let pub_key = PEER_ID.with(|peer_id| {
1294            let mut peer_id = peer_id.borrow_mut();
1295            match &*peer_id {
1296                Some(k) => k.clone(),
1297                None => {
1298                    let key = TransportKeypair::new().public().clone();
1299                    peer_id.replace(key.clone());
1300                    key
1301                }
1302            }
1303        });
1304
1305        Self {
1306            addr: (addr, port).into(),
1307            pub_key,
1308        }
1309    }
1310
1311    #[cfg(test)]
1312    pub fn to_bytes(self) -> Vec<u8> {
1313        bincode::serialize(&self).unwrap()
1314    }
1315}
1316
1317impl std::fmt::Debug for PeerId {
1318    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1319        <Self as Display>::fmt(self, f)
1320    }
1321}
1322
1323impl Display for PeerId {
1324    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1325        write!(f, "{:?}", self.pub_key)
1326    }
1327}
1328
1329pub async fn run_local_node(
1330    mut executor: Executor,
1331    socket: WebsocketApiConfig,
1332) -> anyhow::Result<()> {
1333    match socket.address {
1334        IpAddr::V4(ip) if !ip.is_loopback() => {
1335            anyhow::bail!("invalid ip: {ip}, expecting localhost")
1336        }
1337        IpAddr::V6(ip) if !ip.is_loopback() => {
1338            anyhow::bail!("invalid ip: {ip}, expecting localhost")
1339        }
1340        _ => {}
1341    }
1342
1343    let (mut gw, mut ws_proxy) = crate::server::serve_gateway_in(socket).await;
1344
1345    // TODO: use combinator instead
1346    // let mut all_clients =
1347    //    ClientEventsCombinator::new([Box::new(ws_handle), Box::new(http_handle)]);
1348    enum Receiver {
1349        Ws,
1350        Gw,
1351    }
1352    let mut receiver;
1353    loop {
1354        let req = tokio::select! {
1355            req = ws_proxy.recv() => {
1356                receiver = Receiver::Ws;
1357                req?
1358            }
1359            req = gw.recv() => {
1360                receiver = Receiver::Gw;
1361                req?
1362            }
1363        };
1364        let OpenRequest {
1365            client_id: id,
1366            request,
1367            notification_channel,
1368            token,
1369            ..
1370        } = req;
1371        tracing::debug!(client_id = %id, ?token, "Received OpenRequest -> {request}");
1372
1373        let res = match *request {
1374            ClientRequest::ContractOp(op) => {
1375                executor
1376                    .contract_requests(op, id, notification_channel)
1377                    .await
1378            }
1379            ClientRequest::DelegateOp(op) => {
1380                let attested_contract = token.and_then(|token| {
1381                    gw.attested_contracts
1382                        .read()
1383                        .ok()
1384                        .and_then(|guard| guard.get(&token).map(|(t, _)| *t))
1385                });
1386                let op_name = match op {
1387                    DelegateRequest::RegisterDelegate { .. } => "RegisterDelegate",
1388                    DelegateRequest::ApplicationMessages { .. } => "ApplicationMessages",
1389                    DelegateRequest::GetSecretRequest { .. } => "GetSecretRequest",
1390                    DelegateRequest::UnregisterDelegate(_) => "UnregisterDelegate",
1391                    _ => "Unknown",
1392                };
1393                tracing::debug!(
1394                    op_name = ?op_name,
1395                    ?attested_contract,
1396                    "Handling ClientRequest::DelegateOp"
1397                );
1398                executor.delegate_request(op, attested_contract.as_ref())
1399            }
1400            ClientRequest::Disconnect { cause } => {
1401                if let Some(cause) = cause {
1402                    tracing::info!("disconnecting cause: {cause}");
1403                }
1404                // FIXME: We're not removing tokens on disconnect to allow WebSocket connections
1405                // to use them for authentication. We should implement a proper token expiration
1406                // mechanism instead of keeping them forever or removing them immediately.
1407                // if let Ok(mut guard) = gw.attested_contracts.write() {
1408                //     if let Some(rm_token) = guard
1409                //         .iter()
1410                //         .find_map(|(k, (_, eid))| (eid == &id).then(|| k.clone()))
1411                //     {
1412                //         guard.remove(&rm_token);
1413                //     }
1414                // }
1415                continue;
1416            }
1417            _ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
1418        };
1419
1420        match res {
1421            Ok(res) => {
1422                match receiver {
1423                    Receiver::Ws => ws_proxy.send(id, Ok(res)).await?,
1424                    Receiver::Gw => gw.send(id, Ok(res)).await?,
1425                };
1426            }
1427            Err(err) if err.is_request() => {
1428                let err = ErrorKind::RequestError(err.unwrap_request());
1429                match receiver {
1430                    Receiver::Ws => {
1431                        ws_proxy.send(id, Err(err.into())).await?;
1432                    }
1433                    Receiver::Gw => {
1434                        gw.send(id, Err(err.into())).await?;
1435                    }
1436                };
1437            }
1438            Err(err) => {
1439                tracing::error!("{err}");
1440                let err = Err(ErrorKind::Unhandled {
1441                    cause: format!("{err}").into(),
1442                }
1443                .into());
1444                match receiver {
1445                    Receiver::Ws => {
1446                        ws_proxy.send(id, err).await?;
1447                    }
1448                    Receiver::Gw => {
1449                        gw.send(id, err).await?;
1450                    }
1451                };
1452            }
1453        }
1454    }
1455}
1456
1457pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
1458    tracing::info!("Starting node");
1459
1460    let is_gateway = node.0.is_gateway;
1461    let location = if let Some(loc) = node.0.location {
1462        Some(loc)
1463    } else {
1464        is_gateway
1465            .then(|| {
1466                node.0
1467                    .peer_id
1468                    .clone()
1469                    .map(|id| Location::from_address(&id.addr))
1470            })
1471            .flatten()
1472    };
1473
1474    if let Some(location) = location {
1475        tracing::info!("Setting initial location: {location}");
1476        node.update_location(location);
1477    }
1478
1479    match node.run().await {
1480        Ok(_) => {
1481            if is_gateway {
1482                tracing::info!("Gateway finished");
1483            } else {
1484                tracing::info!("Node finished");
1485            }
1486
1487            Ok(())
1488        }
1489        Err(e) => {
1490            tracing::error!("{e}");
1491            Err(e)
1492        }
1493    }
1494}
1495
1496/// Trait to determine if an operation has completed, regardless of its specific type.
1497pub trait IsOperationCompleted {
1498    /// Returns true if the operation has completed (successfully or with error)
1499    fn is_completed(&self) -> bool;
1500}
1501
1502impl IsOperationCompleted for OpEnum {
1503    fn is_completed(&self) -> bool {
1504        match self {
1505            OpEnum::Connect(op) => op.is_completed(),
1506            OpEnum::Put(op) => op.is_completed(),
1507            OpEnum::Get(op) => op.is_completed(),
1508            OpEnum::Subscribe(op) => op.is_completed(),
1509            OpEnum::Update(op) => op.is_completed(),
1510        }
1511    }
1512}
1513
1514/// Check if an operation result indicates completion
1515pub fn is_operation_completed(op_result: &Result<Option<OpEnum>, OpError>) -> bool {
1516    match op_result {
1517        // If we got an OpEnum, check its specific completion status using the trait
1518        Ok(Some(op)) => op.is_completed(),
1519        _ => false,
1520    }
1521}
1522
1523#[cfg(test)]
1524mod tests {
1525    use std::net::{Ipv4Addr, Ipv6Addr};
1526
1527    use super::*;
1528
1529    #[tokio::test]
1530    async fn test_hostname_resolution() {
1531        let addr = Address::Hostname("localhost".to_string());
1532        let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
1533        assert!(
1534            socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1535                || socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
1536        );
1537        // Port should be in valid range
1538        assert!(socket_addr.port() > 1024); // Ensure we're using unprivileged ports
1539
1540        let addr = Address::Hostname("google.com".to_string());
1541        let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
1542        // Port should be in valid range
1543        assert!(socket_addr.port() > 1024); // Ensure we're using unprivileged ports
1544
1545        let addr = Address::Hostname("google.com:8080".to_string());
1546        let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
1547        assert_eq!(socket_addr.port(), 8080);
1548    }
1549}