1use anyhow::Context;
19use either::Either;
20use freenet_stdlib::{
21 client_api::{ClientRequest, ErrorKind},
22 prelude::ContractKey,
23};
24use std::{
25 borrow::Cow,
26 fmt::Display,
27 fs::File,
28 hash::Hash,
29 io::Read,
30 net::{IpAddr, SocketAddr, ToSocketAddrs},
31 sync::Arc,
32 time::Duration,
33};
34use std::{collections::HashSet, convert::Infallible};
35
36use self::p2p_impl::NodeP2P;
37use crate::{
38 client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
39 config::{Address, GatewayConfig, WebsocketApiConfig},
40 contract::{
41 Callback, ClientResponsesSender, ExecutorError, ExecutorToEventLoopChannel,
42 NetworkContractHandler, WaitingTransaction,
43 },
44 local_node::Executor,
45 message::{InnerMessage, NetMessage, Transaction, TransactionType},
46 operations::{
47 connect::{self, ConnectOp},
48 get, put, subscribe, update, OpEnum, OpError, OpOutcome,
49 },
50 ring::{Location, PeerKeyLocation},
51 router::{RouteEvent, RouteOutcome},
52 tracing::{EventRegister, NetEventLog, NetEventRegister},
53};
54use crate::{
55 config::Config,
56 message::{MessageStats, NetMessageV1},
57};
58use freenet_stdlib::client_api::DelegateRequest;
59use rsa::pkcs8::DecodePublicKey;
60use serde::{Deserialize, Serialize};
61use tracing::Instrument;
62
63use crate::operations::handle_op_request;
64pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge};
65
66use crate::topology::rate::Rate;
67use crate::transport::{TransportKeypair, TransportPublicKey};
68pub(crate) use op_state_manager::{OpManager, OpNotAvailable};
69
70mod message_processor;
71mod network_bridge;
72mod op_state_manager;
73mod p2p_impl;
74mod request_router;
75pub(crate) mod testing_impl;
76
77pub use message_processor::MessageProcessor;
78pub use request_router::{DeduplicatedRequest, RequestRouter};
79
80pub struct Node(NodeP2P);
81
82impl Node {
83 pub fn update_location(&mut self, location: Location) {
84 self.0
85 .op_manager
86 .ring
87 .connection_manager
88 .update_location(Some(location));
89 }
90
91 pub async fn run(self) -> anyhow::Result<Infallible> {
92 self.0.run_node().await
93 }
94}
95
96#[derive(Serialize, Deserialize, Clone, Debug)]
107#[non_exhaustive] pub struct NodeConfig {
109 pub should_connect: bool,
112 pub is_gateway: bool,
113 pub key_pair: TransportKeypair,
115 pub network_listener_ip: IpAddr,
118 pub network_listener_port: u16,
120 pub(crate) peer_id: Option<PeerId>,
121 pub(crate) config: Arc<Config>,
122 pub(crate) gateways: Vec<InitPeerNode>,
125 pub(crate) location: Option<Location>,
127 pub(crate) max_hops_to_live: Option<usize>,
128 pub(crate) rnd_if_htl_above: Option<usize>,
129 pub(crate) max_number_conn: Option<usize>,
130 pub(crate) min_number_conn: Option<usize>,
131 pub(crate) max_upstream_bandwidth: Option<Rate>,
132 pub(crate) max_downstream_bandwidth: Option<Rate>,
133 pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
134}
135
136impl NodeConfig {
137 pub async fn new(config: Config) -> anyhow::Result<NodeConfig> {
138 tracing::info!("Loading node configuration for mode {}", config.mode);
139
140 let own_pub_key = config.transport_keypair().public();
142
143 let mut gateways = Vec::with_capacity(config.gateways.len());
144 for gw in &config.gateways {
145 let GatewayConfig {
146 address,
147 public_key_path,
148 location,
149 } = gw;
150
151 let mut key_file = File::open(public_key_path).with_context(|| {
152 format!("failed loading gateway pubkey from {public_key_path:?}")
153 })?;
154 let mut buf = String::new();
155 key_file.read_to_string(&mut buf)?;
156
157 let pub_key = rsa::RsaPublicKey::from_public_key_pem(&buf)?;
158 let transport_pub_key = TransportPublicKey::from(pub_key);
159
160 if &transport_pub_key == own_pub_key {
162 tracing::warn!(
163 "Skipping gateway with same public key as self: {:?}",
164 public_key_path
165 );
166 continue;
167 }
168
169 let address = Self::parse_socket_addr(address).await?;
170 let peer_id = PeerId::new(address, transport_pub_key);
171 let location = location
172 .map(Location::new)
173 .unwrap_or_else(|| Location::from_address(&address));
174 gateways.push(InitPeerNode::new(peer_id, location));
175 }
176 tracing::info!(
177 "Node will be listening at {}:{} internal address",
178 config.network_api.address,
179 config.network_api.port
180 );
181 if let Some(peer_id) = &config.peer_id {
182 tracing::info!("Node external address: {}", peer_id.addr);
183 }
184 Ok(NodeConfig {
185 should_connect: true,
186 is_gateway: config.is_gateway,
187 key_pair: config.transport_keypair().clone(),
188 gateways,
189 peer_id: config.peer_id.clone(),
190 network_listener_ip: config.network_api.address,
191 network_listener_port: config.network_api.port,
192 location: config.location.map(Location::new),
193 config: Arc::new(config.clone()),
194 max_hops_to_live: None,
195 rnd_if_htl_above: None,
196 max_number_conn: None,
197 min_number_conn: None,
198 max_upstream_bandwidth: None,
199 max_downstream_bandwidth: None,
200 blocked_addresses: config.network_api.blocked_addresses.clone(),
201 })
202 }
203
204 pub(crate) async fn parse_socket_addr(address: &Address) -> anyhow::Result<SocketAddr> {
205 let (hostname, port) = match address {
206 crate::config::Address::Hostname(hostname) => {
207 match hostname.rsplit_once(':') {
208 None => {
209 let hostname_with_port =
211 format!("{}:{}", hostname, crate::config::default_network_api_port());
212
213 if let Ok(mut addrs) = hostname_with_port.to_socket_addrs() {
214 if let Some(addr) = addrs.next() {
215 return Ok(addr);
216 }
217 }
218
219 (Cow::Borrowed(hostname.as_str()), None)
220 }
221 Some((host, port)) => match port.parse::<u16>() {
222 Ok(port) => {
223 if let Ok(mut addrs) = hostname.to_socket_addrs() {
224 if let Some(addr) = addrs.next() {
225 return Ok(addr);
226 }
227 }
228
229 (Cow::Borrowed(host), Some(port))
230 }
231 Err(_) => return Err(anyhow::anyhow!("Invalid port number: {port}")),
232 },
233 }
234 }
235 Address::HostAddress(addr) => return Ok(*addr),
236 };
237
238 let (conf, opts) = hickory_resolver::system_conf::read_system_conf()?;
239 let resolver = hickory_resolver::TokioAsyncResolver::new(
240 conf,
241 opts,
242 hickory_resolver::name_server::GenericConnector::new(
243 hickory_resolver::name_server::TokioRuntimeProvider::new(),
244 ),
245 );
246
247 let hostname = if hostname.ends_with('.') {
249 hostname
250 } else {
251 Cow::Owned(format!("{hostname}."))
252 };
253
254 let ips = resolver.lookup_ip(hostname.as_ref()).await?;
255 match ips.into_iter().next() {
256 Some(ip) => Ok(SocketAddr::new(
257 ip,
258 port.unwrap_or_else(crate::config::default_network_api_port),
259 )),
260 None => Err(anyhow::anyhow!("Fail to resolve IP address of {hostname}")),
261 }
262 }
263
264 pub fn config(&self) -> &Config {
265 &self.config
266 }
267
268 pub fn is_gateway(&mut self) -> &mut Self {
269 self.is_gateway = true;
270 self
271 }
272
273 pub fn first_gateway(&mut self) {
274 self.should_connect = false;
275 }
276
277 pub fn with_should_connect(&mut self, should_connect: bool) -> &mut Self {
278 self.should_connect = should_connect;
279 self
280 }
281
282 pub fn max_hops_to_live(&mut self, num_hops: usize) -> &mut Self {
283 self.max_hops_to_live = Some(num_hops);
284 self
285 }
286
287 pub fn rnd_if_htl_above(&mut self, num_hops: usize) -> &mut Self {
288 self.rnd_if_htl_above = Some(num_hops);
289 self
290 }
291
292 pub fn max_number_of_connections(&mut self, num: usize) -> &mut Self {
293 self.max_number_conn = Some(num);
294 self
295 }
296
297 pub fn min_number_of_connections(&mut self, num: usize) -> &mut Self {
298 self.min_number_conn = Some(num);
299 self
300 }
301
302 pub fn with_peer_id(&mut self, peer_id: PeerId) -> &mut Self {
303 self.peer_id = Some(peer_id);
304 self
305 }
306
307 pub fn with_location(&mut self, loc: Location) -> &mut Self {
308 self.location = Some(loc);
309 self
310 }
311
312 pub fn add_gateway(&mut self, peer: InitPeerNode) -> &mut Self {
314 self.gateways.push(peer);
315 self
316 }
317
318 pub async fn build<const CLIENTS: usize>(
320 self,
321 clients: [BoxedClient; CLIENTS],
322 ) -> anyhow::Result<Node> {
323 let event_register = {
324 #[cfg(feature = "trace-ot")]
325 {
326 use super::tracing::{CombinedRegister, OTEventRegister};
327 CombinedRegister::new([
328 Box::new(EventRegister::new(self.config.event_log())),
329 Box::new(OTEventRegister::new()),
330 ])
331 }
332 #[cfg(not(feature = "trace-ot"))]
333 {
334 EventRegister::new(self.config.event_log())
335 }
336 };
337 let cfg = self.config.clone();
338 let node = NodeP2P::build::<NetworkContractHandler, CLIENTS, _>(
339 self,
340 clients,
341 event_register,
342 cfg,
343 )
344 .await?;
345 Ok(Node(node))
346 }
347
348 pub fn get_peer_id(&self) -> Option<PeerId> {
349 self.peer_id.clone()
350 }
351
352 fn get_gateways(&self) -> anyhow::Result<Vec<PeerKeyLocation>> {
355 let gateways: Vec<PeerKeyLocation> = self
356 .gateways
357 .iter()
358 .map(|node| PeerKeyLocation {
359 peer: node.peer_id.clone(),
360 location: Some(node.location),
361 })
362 .collect();
363
364 if !self.is_gateway && gateways.is_empty() {
365 anyhow::bail!(
366 "At least one remote gateway is required to join an existing network for non-gateway nodes."
367 )
368 } else {
369 Ok(gateways)
370 }
371 }
372}
373
374#[derive(Clone, Serialize, Deserialize, Debug)]
376pub struct InitPeerNode {
377 peer_id: PeerId,
378 location: Location,
379}
380
381impl InitPeerNode {
382 pub fn new(peer_id: PeerId, location: Location) -> Self {
383 Self { peer_id, location }
384 }
385}
386
387async fn report_result(
388 tx: Option<Transaction>,
389 op_result: Result<Option<OpEnum>, OpError>,
390 op_manager: &OpManager,
391 executor_callback: Option<ExecutorToEventLoopChannel<Callback>>,
392 client_req_handler_callback: Option<(Vec<ClientId>, ClientResponsesSender)>,
393 event_listener: &mut dyn NetEventRegister,
394) {
395 if let Some(tx_id) = tx {
397 if matches!(tx_id.transaction_type(), TransactionType::Update) {
398 tracing::debug!("report_result called for UPDATE transaction {}", tx_id);
399 }
400 }
401
402 match op_result {
403 Ok(Some(op_res)) => {
404 if let crate::operations::OpEnum::Update(ref update_op) = op_res {
406 tracing::debug!(
407 "UPDATE operation {} completed, finalized: {}",
408 update_op.id,
409 update_op.finalized()
410 );
411 }
412
413 if let (Some(transaction), Some(router_tx)) = (tx, &op_manager.result_router_tx) {
416 let host_result = op_res.to_host_result();
417 let router_tx_clone = router_tx.clone();
418
419 tokio::spawn(async move {
422 if let Err(e) = router_tx_clone.send((transaction, host_result)).await {
423 tracing::error!(
424 "CRITICAL: Result router channel closed - dual-path delivery broken. \
425 Router or session actor has crashed. Transaction: {}. Error: {}. \
426 Consider restarting node or disabling FREENET_ACTOR_CLIENTS flag.",
427 transaction,
428 e
429 );
430 }
432 });
433 }
434
435 if !op_manager.actor_clients {
438 if let Some((client_ids, cb)) = client_req_handler_callback {
439 for client_id in client_ids {
440 if let crate::operations::OpEnum::Update(ref update_op) = op_res {
442 tracing::debug!(
443 "Sending UPDATE response to client {} for transaction {}",
444 client_id,
445 update_op.id
446 );
447
448 let host_result = op_res.to_host_result();
450 match &host_result {
451 Ok(response) => {
452 tracing::debug!(
453 "Client {} callback found, sending successful UPDATE response: {:?}",
454 client_id,
455 response
456 );
457 }
458 Err(error) => {
459 tracing::error!(
460 "Client {} callback found, sending UPDATE error: {:?}",
461 client_id,
462 error
463 );
464 }
465 }
466 } else {
467 tracing::debug!(?tx, %client_id, "Sending response to client");
468 }
469 use crate::client_events::RequestId;
471 let _ = cb.send((client_id, RequestId::new(), op_res.to_host_result()));
472 }
473 } else {
474 if let crate::operations::OpEnum::Update(ref update_op) = op_res {
476 tracing::debug!(
477 "No client callback found for UPDATE transaction {} - this may indicate a missing client subscription",
478 update_op.id
479 );
480 }
481 }
482 } match op_res.outcome() {
488 OpOutcome::ContractOpSuccess {
489 target_peer,
490 contract_location,
491 first_response_time,
492 payload_size,
493 payload_transfer_time,
494 } => {
495 let event = RouteEvent {
496 peer: target_peer.clone(),
497 contract_location,
498 outcome: RouteOutcome::Success {
499 time_to_response_start: first_response_time,
500 payload_size,
501 payload_transfer_time,
502 },
503 };
504 event_listener
505 .register_events(Either::Left(NetEventLog::route_event(
506 op_res.id(),
507 &op_manager.ring,
508 &event,
509 )))
510 .await;
511 op_manager.ring.routing_finished(event);
512 }
513 OpOutcome::Incomplete | OpOutcome::Irrelevant => {}
525 }
526 if let Some(mut cb) = executor_callback {
527 cb.response(op_res).await;
528 }
529 }
530 Ok(None) => {
531 tracing::debug!(?tx, "No operation result found");
532 }
533 Err(err) => {
534 if let Some(tx) = tx {
536 op_manager.completed(tx);
537 }
538 #[cfg(any(debug_assertions, test))]
539 {
540 use std::io::Write;
541 #[cfg(debug_assertions)]
542 let OpError::InvalidStateTransition { tx, state, trace } = err
543 else {
544 tracing::error!("Finished transaction with error: {err}");
545 return;
546 };
547 #[cfg(not(debug_assertions))]
548 let OpError::InvalidStateTransition { tx } = err
549 else {
550 tracing::error!("Finished transaction with error: {err}");
551 return;
552 };
553 #[cfg(debug_assertions)]
555 let trace = format!("{trace}");
556 #[cfg(debug_assertions)]
557 {
558 let mut tr_lines = trace.lines();
559 let trace = tr_lines
560 .nth(2)
561 .map(|second_trace| {
562 let second_trace_lines =
563 [second_trace, tr_lines.next().unwrap_or_default()];
564 second_trace_lines.join("\n")
565 })
566 .unwrap_or_default();
567 let peer = &op_manager
568 .ring
569 .connection_manager
570 .get_peer_key()
571 .expect("Peer key not found");
572 let log = format!(
573 "Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n"
574 );
575 std::io::stderr().write_all(log.as_bytes()).unwrap();
576 }
577 #[cfg(not(debug_assertions))]
578 {
579 let peer = &op_manager
580 .ring
581 .connection_manager
582 .get_peer_key()
583 .expect("Peer key not found");
584 let log = format!("Transaction ({tx} @ {peer}) error\n");
585 std::io::stderr().write_all(log.as_bytes()).unwrap();
586 }
587 }
588 #[cfg(not(any(debug_assertions, test)))]
589 {
590 tracing::debug!("Finished transaction with error: {err}");
591 }
592 }
593 }
594}
595
// Reacts to an `OpError::OpNotAvailable` result inside a retry loop of a function
// returning `()`:
// * `Running`   - sleeps for 1000 microseconds and `continue`s the enclosing loop;
// * `Completed` - `return`s from the enclosing function.
// Any other result falls through untouched.
macro_rules! handle_op_not_available {
    ($op_result:ident) => {
        match &$op_result {
            Err(OpError::OpNotAvailable(OpNotAvailable::Running)) => {
                tracing::debug!("Operation still running");
                tokio::time::sleep(Duration::from_micros(1_000)).await;
                continue;
            }
            Err(OpError::OpNotAvailable(OpNotAvailable::Completed)) => {
                tracing::debug!("Operation already completed");
                return;
            }
            _ => {}
        }
    };
}
614
615#[allow(clippy::too_many_arguments)]
616async fn process_message<CB>(
617 msg: NetMessage,
618 op_manager: Arc<OpManager>,
619 conn_manager: CB,
620 event_listener: Box<dyn NetEventRegister>,
621 executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
622 client_req_handler_callback: Option<ClientResponsesSender>,
623 client_ids: Option<Vec<ClientId>>,
624 pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
625) where
626 CB: NetworkBridge,
627{
628 let tx = Some(*msg.id());
629 match msg {
630 NetMessage::V1(msg_v1) => {
631 process_message_v1(
632 tx,
633 msg_v1,
634 op_manager,
635 conn_manager,
636 event_listener,
637 executor_callback,
638 client_req_handler_callback,
639 client_ids,
640 pending_op_result,
641 )
642 .await
643 }
644 }
645}
646
647#[allow(clippy::too_many_arguments)]
649pub(crate) async fn process_message_decoupled<CB>(
650 msg: NetMessage,
651 op_manager: Arc<OpManager>,
652 conn_manager: CB,
653 event_listener: Box<dyn NetEventRegister>,
654 executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
655 message_processor: std::sync::Arc<MessageProcessor>,
656 pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
657) where
658 CB: NetworkBridge,
659{
660 let tx = *msg.id();
661
662 let op_result = handle_pure_network_message(
664 msg,
665 op_manager.clone(),
666 conn_manager,
667 event_listener,
668 executor_callback,
669 pending_op_result,
670 )
671 .await;
672
673 if let Err(e) = message_processor.handle_network_result(tx, op_result).await {
675 tracing::error!(
676 "Failed to handle network result for transaction {}: {}",
677 tx,
678 e
679 );
680 }
681}
682
683#[allow(clippy::too_many_arguments)]
685async fn handle_pure_network_message<CB>(
686 msg: NetMessage,
687 op_manager: Arc<OpManager>,
688 conn_manager: CB,
689 event_listener: Box<dyn NetEventRegister>,
690 executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
691 pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
692) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
693where
694 CB: NetworkBridge,
695{
696 match msg {
697 NetMessage::V1(msg_v1) => {
698 handle_pure_network_message_v1(
699 msg_v1,
700 op_manager,
701 conn_manager,
702 event_listener,
703 executor_callback,
704 pending_op_result,
705 )
706 .await
707 }
708 }
709}
710
711#[allow(clippy::too_many_arguments)]
712async fn process_message_v1<CB>(
713 tx: Option<Transaction>,
714 msg: NetMessageV1,
715 op_manager: Arc<OpManager>,
716 mut conn_manager: CB,
717 mut event_listener: Box<dyn NetEventRegister>,
718 executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
719 client_req_handler_callback: Option<ClientResponsesSender>,
720 client_id: Option<Vec<ClientId>>,
721 pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
722) where
723 CB: NetworkBridge,
724{
725 let cli_req = client_id.zip(client_req_handler_callback);
726 event_listener
727 .register_events(NetEventLog::from_inbound_msg_v1(&msg, &op_manager))
728 .await;
729
730 const MAX_RETRIES: usize = 10usize;
731 for i in 0..MAX_RETRIES {
732 tracing::debug!(?tx, "Processing operation, iteration: {i}");
733 match msg {
734 NetMessageV1::Connect(ref op) => {
735 let parent_span = tracing::Span::current();
736 let span = tracing::info_span!(
737 parent: parent_span,
738 "handle_connect_op_request",
739 transaction = %msg.id(),
740 tx_type = %msg.id().transaction_type()
741 );
742 let op_result =
743 handle_op_request::<connect::ConnectOp, _>(&op_manager, &mut conn_manager, op)
744 .instrument(span)
745 .await;
746
747 handle_op_not_available!(op_result);
748 return report_result(
749 tx,
750 op_result,
751 &op_manager,
752 executor_callback,
753 cli_req,
754 &mut *event_listener,
755 )
756 .await;
757 }
758 NetMessageV1::Put(ref op) => {
759 let op_result =
760 handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op).await;
761
762 if is_operation_completed(&op_result) {
763 if let Some(ref op_execution_callback) = pending_op_result {
764 let tx_id = *op.id();
765 let _ = op_execution_callback
766 .send(NetMessage::V1(NetMessageV1::Put((*op).clone())))
767 .await
768 .inspect_err(
769 |err| tracing::error!(%err, %tx_id, "Failed to send message to client"),
770 );
771 }
772 }
773
774 handle_op_not_available!(op_result);
775 return report_result(
776 tx,
777 op_result,
778 &op_manager,
779 executor_callback,
780 cli_req,
781 &mut *event_listener,
782 )
783 .await;
784 }
785 NetMessageV1::Get(ref op) => {
786 let op_result =
787 handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op).await;
788 if is_operation_completed(&op_result) {
789 if let Some(ref op_execution_callback) = pending_op_result {
790 let tx_id = *op.id();
791 let _ = op_execution_callback
792 .send(NetMessage::V1(NetMessageV1::Get((*op).clone()))).await.inspect_err(|err|
793 tracing::error!(%err, %tx_id, "Failed to send message to client")
794 );
795 }
796 }
797 handle_op_not_available!(op_result);
798 return report_result(
799 tx,
800 op_result,
801 &op_manager,
802 executor_callback,
803 cli_req,
804 &mut *event_listener,
805 )
806 .await;
807 }
808 NetMessageV1::Subscribe(ref op) => {
809 let op_result = handle_op_request::<subscribe::SubscribeOp, _>(
810 &op_manager,
811 &mut conn_manager,
812 op,
813 )
814 .await;
815 if is_operation_completed(&op_result) {
816 if let Some(ref op_execution_callback) = pending_op_result {
817 let tx_id = *op.id();
818 let _ = op_execution_callback
819 .send(NetMessage::V1(NetMessageV1::Subscribe((*op).clone()))).await.inspect_err(|err|
820 tracing::error!(%err, %tx_id, "Failed to send message to client")
821 );
822 }
823 }
824 handle_op_not_available!(op_result);
825 return report_result(
826 tx,
827 op_result,
828 &op_manager,
829 executor_callback,
830 cli_req,
831 &mut *event_listener,
832 )
833 .await;
834 }
835 NetMessageV1::Update(ref op) => {
836 let op_result =
837 handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
838 .await;
839 if is_operation_completed(&op_result) {
840 if let Some(ref op_execution_callback) = pending_op_result {
841 let tx_id = *op.id();
842 let _ = op_execution_callback
843 .send(NetMessage::V1(NetMessageV1::Update((*op).clone()))).await.inspect_err(|err|
844 tracing::error!(%err, %tx_id, "Failed to send message to client")
845 );
846 }
847 }
848 handle_op_not_available!(op_result);
849 return report_result(
850 tx,
851 op_result,
852 &op_manager,
853 executor_callback,
854 cli_req,
855 &mut *event_listener,
856 )
857 .await;
858 }
859 NetMessageV1::Unsubscribed { ref key, .. } => {
860 if let Err(error) = subscribe(op_manager, *key, None).await {
861 tracing::error!(%error, "Failed to subscribe to contract");
862 }
863 break;
864 }
865 _ => break, }
867 }
868}
869
870#[allow(clippy::too_many_arguments)]
872async fn handle_pure_network_message_v1<CB>(
873 msg: NetMessageV1,
874 op_manager: Arc<OpManager>,
875 mut conn_manager: CB,
876 mut event_listener: Box<dyn NetEventRegister>,
877 executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
878 pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
879) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
880where
881 CB: NetworkBridge,
882{
883 event_listener
885 .register_events(NetEventLog::from_inbound_msg_v1(&msg, &op_manager))
886 .await;
887
888 const MAX_RETRIES: usize = 10usize;
889 for i in 0..MAX_RETRIES {
890 let tx = Some(*msg.id());
891 tracing::debug!(?tx, "Processing pure network operation, iteration: {i}");
892
893 match msg {
894 NetMessageV1::Connect(ref op) => {
895 let parent_span = tracing::Span::current();
896 let span = tracing::info_span!(
897 parent: parent_span,
898 "handle_connect_op_request",
899 transaction = %msg.id(),
900 tx_type = %msg.id().transaction_type()
901 );
902 let op_result =
903 handle_op_request::<connect::ConnectOp, _>(&op_manager, &mut conn_manager, op)
904 .instrument(span)
905 .await;
906
907 if let Err(OpError::OpNotAvailable(state)) = &op_result {
908 match state {
909 OpNotAvailable::Running => {
910 tracing::debug!("Pure network: Operation still running");
911 tokio::time::sleep(Duration::from_micros(1_000)).await;
912 continue;
913 }
914 OpNotAvailable::Completed => {
915 tracing::debug!("Pure network: Operation already completed");
916 return Ok(None);
917 }
918 }
919 }
920
921 return handle_pure_network_result(
923 tx,
924 op_result,
925 &op_manager,
926 executor_callback,
927 &mut *event_listener,
928 )
929 .await;
930 }
931 NetMessageV1::Put(ref op) => {
932 let op_result =
933 handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op).await;
934
935 if is_operation_completed(&op_result) {
937 if let Some(ref op_execution_callback) = pending_op_result {
938 let tx_id = *op.id();
939 let _ = op_execution_callback
940 .send(NetMessage::V1(NetMessageV1::Put((*op).clone())))
941 .await
942 .inspect_err(|err| tracing::error!(%err, %tx_id, "Failed to send message to executor"));
943 }
944 }
945
946 if let Err(OpError::OpNotAvailable(state)) = &op_result {
947 match state {
948 OpNotAvailable::Running => {
949 tracing::debug!("Pure network: Operation still running");
950 tokio::time::sleep(Duration::from_micros(1_000)).await;
951 continue;
952 }
953 OpNotAvailable::Completed => {
954 tracing::debug!("Pure network: Operation already completed");
955 return Ok(None);
956 }
957 }
958 }
959
960 return handle_pure_network_result(
961 tx,
962 op_result,
963 &op_manager,
964 executor_callback,
965 &mut *event_listener,
966 )
967 .await;
968 }
969 NetMessageV1::Get(ref op) => {
970 let op_result =
971 handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op).await;
972
973 if is_operation_completed(&op_result) {
975 if let Some(ref op_execution_callback) = pending_op_result {
976 let tx_id = *op.id();
977 let _ = op_execution_callback
978 .send(NetMessage::V1(NetMessageV1::Get((*op).clone())))
979 .await
980 .inspect_err(|err| tracing::error!(%err, %tx_id, "Failed to send message to executor"));
981 }
982 }
983
984 if let Err(OpError::OpNotAvailable(state)) = &op_result {
985 match state {
986 OpNotAvailable::Running => {
987 tracing::debug!("Pure network: Operation still running");
988 tokio::time::sleep(Duration::from_micros(1_000)).await;
989 continue;
990 }
991 OpNotAvailable::Completed => {
992 tracing::debug!("Pure network: Operation already completed");
993 return Ok(None);
994 }
995 }
996 }
997
998 return handle_pure_network_result(
999 tx,
1000 op_result,
1001 &op_manager,
1002 executor_callback,
1003 &mut *event_listener,
1004 )
1005 .await;
1006 }
1007 NetMessageV1::Update(ref op) => {
1008 let op_result =
1009 handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
1010 .await;
1011
1012 if let Err(OpError::OpNotAvailable(state)) = &op_result {
1013 match state {
1014 OpNotAvailable::Running => {
1015 tracing::debug!("Pure network: Operation still running");
1016 tokio::time::sleep(Duration::from_micros(1_000)).await;
1017 continue;
1018 }
1019 OpNotAvailable::Completed => {
1020 tracing::debug!("Pure network: Operation already completed");
1021 return Ok(None);
1022 }
1023 }
1024 }
1025
1026 return handle_pure_network_result(
1027 tx,
1028 op_result,
1029 &op_manager,
1030 executor_callback,
1031 &mut *event_listener,
1032 )
1033 .await;
1034 }
1035 NetMessageV1::Subscribe(ref op) => {
1036 let op_result = handle_op_request::<subscribe::SubscribeOp, _>(
1037 &op_manager,
1038 &mut conn_manager,
1039 op,
1040 )
1041 .await;
1042
1043 if let Err(OpError::OpNotAvailable(state)) = &op_result {
1044 match state {
1045 OpNotAvailable::Running => {
1046 tracing::debug!("Pure network: Operation still running");
1047 tokio::time::sleep(Duration::from_micros(1_000)).await;
1048 continue;
1049 }
1050 OpNotAvailable::Completed => {
1051 tracing::debug!("Pure network: Operation already completed");
1052 return Ok(None);
1053 }
1054 }
1055 }
1056
1057 return handle_pure_network_result(
1058 tx,
1059 op_result,
1060 &op_manager,
1061 executor_callback,
1062 &mut *event_listener,
1063 )
1064 .await;
1065 }
1066 NetMessageV1::Unsubscribed { ref key, .. } => {
1067 if let Err(error) = subscribe(op_manager, *key, None).await {
1068 tracing::error!(%error, "Failed to subscribe to contract");
1069 }
1070 break;
1071 }
1072 _ => break, }
1074 }
1075
1076 Ok(None)
1078}
1079
1080async fn handle_pure_network_result(
1082 tx: Option<Transaction>,
1083 op_result: Result<Option<crate::operations::OpEnum>, OpError>,
1084 _op_manager: &Arc<OpManager>,
1085 _executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
1086 _event_listener: &mut dyn NetEventRegister,
1087) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError> {
1088 tracing::debug!("Pure network result handling for transaction: {:?}", tx);
1089
1090 match &op_result {
1091 Ok(Some(_op_res)) => {
1092 tracing::debug!(
1094 "Network operation completed successfully for transaction: {:?}",
1095 tx
1096 );
1097
1098 if let Some(tx_id) = tx {
1100 tracing::debug!("Network operation completed for transaction: {}", tx_id);
1102 }
1103
1104 if let Some(_callback) = _executor_callback {
1107 tracing::debug!("Executor callback available for transaction {:?} but not implemented in pure network processing", tx);
1108 }
1109 }
1110 Ok(None) => {
1111 tracing::debug!("Network operation returned no result");
1112 }
1113 Err(e) => {
1114 tracing::error!("Network operation failed: {}", e);
1115 if let Some(tx_id) = tx {
1117 tracing::debug!(
1118 "Network operation failed for transaction: {} with error: {}",
1119 tx_id,
1120 e
1121 );
1122 }
1123 }
1124 }
1125
1126 op_result
1127}
1128
1129pub async fn subscribe(
1131 op_manager: Arc<OpManager>,
1132 key: ContractKey,
1133 client_id: Option<ClientId>,
1134) -> Result<Transaction, OpError> {
1135 subscribe_with_id(op_manager, key, client_id, None).await
1136}
1137
1138pub async fn subscribe_with_id(
1140 op_manager: Arc<OpManager>,
1141 key: ContractKey,
1142 client_id: Option<ClientId>,
1143 transaction_id: Option<Transaction>,
1144) -> Result<Transaction, OpError> {
1145 let op = match transaction_id {
1146 Some(id) => subscribe::start_op_with_id(key, id),
1147 None => subscribe::start_op(key),
1148 };
1149 let id = op.id;
1150 if let Some(client_id) = client_id {
1151 use crate::client_events::RequestId;
1152 let request_id = RequestId::new();
1154 let _ = op_manager
1155 .ch_outbound
1156 .waiting_for_transaction_result(
1157 WaitingTransaction::Subscription {
1158 contract_key: *key.id(),
1159 },
1160 client_id,
1161 request_id,
1162 )
1163 .await;
1164 }
1165 match subscribe::request_subscribe(&op_manager, op).await {
1167 Err(err) => {
1168 tracing::error!("{}", err);
1169 Err(err)
1170 }
1171 Ok(()) => Ok(id),
1172 }
1173}
1174
1175async fn handle_aborted_op(
1176 tx: Transaction,
1177 op_manager: &OpManager,
1178 gateways: &[PeerKeyLocation],
1179) -> Result<(), OpError> {
1180 use crate::util::IterExt;
1181 if let TransactionType::Connect = tx.transaction_type() {
1182 match op_manager.pop(&tx) {
1186 Ok(Some(OpEnum::Connect(op)))
1188 if op.has_backoff()
1189 && op_manager.ring.open_connections()
1190 < op_manager.ring.connection_manager.min_connections =>
1191 {
1192 let ConnectOp {
1193 gateway, backoff, ..
1194 } = *op;
1195 if let Some(gateway) = gateway {
1196 tracing::warn!("Retry connecting to gateway {}", gateway.peer);
1197 connect::join_ring_request(backoff, &gateway, op_manager).await?;
1198 }
1199 }
1200 Ok(Some(OpEnum::Connect(_))) => {
1201 if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
1203 tracing::warn!("Retrying joining the ring with an other gateway");
1204 if let Some(gateway) = gateways.iter().shuffle().next() {
1205 connect::join_ring_request(None, gateway, op_manager).await?
1206 }
1207 }
1208 }
1209 _ => {}
1210 }
1211 }
1212 Ok(())
1213}
1214
1215#[derive(Serialize, Deserialize, Eq, Clone)]
1223pub struct PeerId {
1224 pub addr: SocketAddr,
1225 pub pub_key: TransportPublicKey,
1226}
1227
1228impl Hash for PeerId {
1229 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1230 self.addr.hash(state);
1231 }
1232}
1233
1234impl PartialEq<PeerId> for PeerId {
1235 fn eq(&self, other: &PeerId) -> bool {
1236 self.addr == other.addr
1237 }
1238}
1239
1240impl Ord for PeerId {
1241 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1242 self.addr.cmp(&other.addr)
1243 }
1244}
1245
1246impl PartialOrd for PeerId {
1247 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1248 Some(self.cmp(other))
1249 }
1250}
1251
1252impl PeerId {
1253 pub fn new(addr: SocketAddr, pub_key: TransportPublicKey) -> Self {
1254 Self { addr, pub_key }
1255 }
1256}
1257
1258thread_local! {
1259 static PEER_ID: std::cell::RefCell<Option<TransportPublicKey>> = const { std::cell::RefCell::new(None) };
1260}
1261
// Test-only generator: the address is arbitrary (IPv4 octets plus port), while the public
// key is the per-thread cached key, generated on first use.
#[cfg(test)]
impl<'a> arbitrary::Arbitrary<'a> for PeerId {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let (ip_octets, port): ([u8; 4], u16) = u.arbitrary()?;

        let pub_key = PEER_ID.with(|cached| {
            cached
                .borrow_mut()
                .get_or_insert_with(|| TransportKeypair::new().public().clone())
                .clone()
        });

        Ok(Self {
            addr: (ip_octets, port).into(),
            pub_key,
        })
    }
}
1285
1286impl PeerId {
1287 pub fn random() -> Self {
1288 use rand::Rng;
1289 let mut addr = [0; 4];
1290 rand::rng().fill(&mut addr[..]);
1291 let port = crate::util::get_free_port().unwrap();
1292
1293 let pub_key = PEER_ID.with(|peer_id| {
1294 let mut peer_id = peer_id.borrow_mut();
1295 match &*peer_id {
1296 Some(k) => k.clone(),
1297 None => {
1298 let key = TransportKeypair::new().public().clone();
1299 peer_id.replace(key.clone());
1300 key
1301 }
1302 }
1303 });
1304
1305 Self {
1306 addr: (addr, port).into(),
1307 pub_key,
1308 }
1309 }
1310
1311 #[cfg(test)]
1312 pub fn to_bytes(self) -> Vec<u8> {
1313 bincode::serialize(&self).unwrap()
1314 }
1315}
1316
1317impl std::fmt::Debug for PeerId {
1318 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1319 <Self as Display>::fmt(self, f)
1320 }
1321}
1322
1323impl Display for PeerId {
1324 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1325 write!(f, "{:?}", self.pub_key)
1326 }
1327}
1328
1329pub async fn run_local_node(
1330 mut executor: Executor,
1331 socket: WebsocketApiConfig,
1332) -> anyhow::Result<()> {
1333 match socket.address {
1334 IpAddr::V4(ip) if !ip.is_loopback() => {
1335 anyhow::bail!("invalid ip: {ip}, expecting localhost")
1336 }
1337 IpAddr::V6(ip) if !ip.is_loopback() => {
1338 anyhow::bail!("invalid ip: {ip}, expecting localhost")
1339 }
1340 _ => {}
1341 }
1342
1343 let (mut gw, mut ws_proxy) = crate::server::serve_gateway_in(socket).await;
1344
1345 enum Receiver {
1349 Ws,
1350 Gw,
1351 }
1352 let mut receiver;
1353 loop {
1354 let req = tokio::select! {
1355 req = ws_proxy.recv() => {
1356 receiver = Receiver::Ws;
1357 req?
1358 }
1359 req = gw.recv() => {
1360 receiver = Receiver::Gw;
1361 req?
1362 }
1363 };
1364 let OpenRequest {
1365 client_id: id,
1366 request,
1367 notification_channel,
1368 token,
1369 ..
1370 } = req;
1371 tracing::debug!(client_id = %id, ?token, "Received OpenRequest -> {request}");
1372
1373 let res = match *request {
1374 ClientRequest::ContractOp(op) => {
1375 executor
1376 .contract_requests(op, id, notification_channel)
1377 .await
1378 }
1379 ClientRequest::DelegateOp(op) => {
1380 let attested_contract = token.and_then(|token| {
1381 gw.attested_contracts
1382 .read()
1383 .ok()
1384 .and_then(|guard| guard.get(&token).map(|(t, _)| *t))
1385 });
1386 let op_name = match op {
1387 DelegateRequest::RegisterDelegate { .. } => "RegisterDelegate",
1388 DelegateRequest::ApplicationMessages { .. } => "ApplicationMessages",
1389 DelegateRequest::GetSecretRequest { .. } => "GetSecretRequest",
1390 DelegateRequest::UnregisterDelegate(_) => "UnregisterDelegate",
1391 _ => "Unknown",
1392 };
1393 tracing::debug!(
1394 op_name = ?op_name,
1395 ?attested_contract,
1396 "Handling ClientRequest::DelegateOp"
1397 );
1398 executor.delegate_request(op, attested_contract.as_ref())
1399 }
1400 ClientRequest::Disconnect { cause } => {
1401 if let Some(cause) = cause {
1402 tracing::info!("disconnecting cause: {cause}");
1403 }
1404 continue;
1416 }
1417 _ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
1418 };
1419
1420 match res {
1421 Ok(res) => {
1422 match receiver {
1423 Receiver::Ws => ws_proxy.send(id, Ok(res)).await?,
1424 Receiver::Gw => gw.send(id, Ok(res)).await?,
1425 };
1426 }
1427 Err(err) if err.is_request() => {
1428 let err = ErrorKind::RequestError(err.unwrap_request());
1429 match receiver {
1430 Receiver::Ws => {
1431 ws_proxy.send(id, Err(err.into())).await?;
1432 }
1433 Receiver::Gw => {
1434 gw.send(id, Err(err.into())).await?;
1435 }
1436 };
1437 }
1438 Err(err) => {
1439 tracing::error!("{err}");
1440 let err = Err(ErrorKind::Unhandled {
1441 cause: format!("{err}").into(),
1442 }
1443 .into());
1444 match receiver {
1445 Receiver::Ws => {
1446 ws_proxy.send(id, err).await?;
1447 }
1448 Receiver::Gw => {
1449 gw.send(id, err).await?;
1450 }
1451 };
1452 }
1453 }
1454 }
1455}
1456
1457pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
1458 tracing::info!("Starting node");
1459
1460 let is_gateway = node.0.is_gateway;
1461 let location = if let Some(loc) = node.0.location {
1462 Some(loc)
1463 } else {
1464 is_gateway
1465 .then(|| {
1466 node.0
1467 .peer_id
1468 .clone()
1469 .map(|id| Location::from_address(&id.addr))
1470 })
1471 .flatten()
1472 };
1473
1474 if let Some(location) = location {
1475 tracing::info!("Setting initial location: {location}");
1476 node.update_location(location);
1477 }
1478
1479 match node.run().await {
1480 Ok(_) => {
1481 if is_gateway {
1482 tracing::info!("Gateway finished");
1483 } else {
1484 tracing::info!("Node finished");
1485 }
1486
1487 Ok(())
1488 }
1489 Err(e) => {
1490 tracing::error!("{e}");
1491 Err(e)
1492 }
1493 }
1494}
1495
/// Implemented by operation types that can report whether they have completed.
pub trait IsOperationCompleted {
    /// Returns `true` if the operation has completed.
    fn is_completed(&self) -> bool;
}
1501
1502impl IsOperationCompleted for OpEnum {
1503 fn is_completed(&self) -> bool {
1504 match self {
1505 OpEnum::Connect(op) => op.is_completed(),
1506 OpEnum::Put(op) => op.is_completed(),
1507 OpEnum::Get(op) => op.is_completed(),
1508 OpEnum::Subscribe(op) => op.is_completed(),
1509 OpEnum::Update(op) => op.is_completed(),
1510 }
1511 }
1512}
1513
1514pub fn is_operation_completed(op_result: &Result<Option<OpEnum>, OpError>) -> bool {
1516 match op_result {
1517 Ok(Some(op)) => op.is_completed(),
1519 _ => false,
1520 }
1521}
1522
#[cfg(test)]
mod tests {
    use std::net::{Ipv4Addr, Ipv6Addr};

    use super::*;

    /// Exercises `NodeConfig::parse_socket_addr` with hostnames with and without a port.
    // NOTE(review): the `google.com` cases presumably need working DNS / network access.
    #[tokio::test]
    async fn test_hostname_resolution() {
        // Local hostname without a port: must resolve to the v4 or v6 loopback address.
        let localhost = Address::Hostname("localhost".to_string());
        let resolved = NodeConfig::parse_socket_addr(&localhost).await.unwrap();
        let ip = resolved.ip();
        assert!(ip == IpAddr::V4(Ipv4Addr::LOCALHOST) || ip == IpAddr::V6(Ipv6Addr::LOCALHOST));
        // NOTE(review): presumably a port is chosen when the hostname carries none — confirm.
        assert!(resolved.port() > 1024);

        // External hostname without a port: same expectation on the port.
        let external = Address::Hostname("google.com".to_string());
        let resolved = NodeConfig::parse_socket_addr(&external).await.unwrap();
        assert!(resolved.port() > 1024);

        // External hostname with an explicit port: the port must be kept.
        let external_with_port = Address::Hostname("google.com:8080".to_string());
        let resolved = NodeConfig::parse_socket_addr(&external_with_port)
            .await
            .unwrap();
        assert_eq!(resolved.port(), 8080);
    }
}