1use anyhow::Context;
19use either::Either;
20use freenet_stdlib::{
21 client_api::{ClientRequest, ErrorKind},
22 prelude::ContractInstanceId,
23};
24use std::{
25 borrow::Cow,
26 fmt::Display,
27 fs::File,
28 hash::Hash,
29 io::Read,
30 net::{IpAddr, SocketAddr, ToSocketAddrs},
31 sync::Arc,
32 time::Duration,
33};
34use std::{collections::HashSet, convert::Infallible};
35
36use self::p2p_impl::NodeP2P;
37use crate::{
38 client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
39 config::{Address, GatewayConfig, GlobalRng, WebsocketApiConfig},
40 contract::{Callback, ExecutorError, ExecutorToEventLoopChannel, NetworkContractHandler},
41 local_node::Executor,
42 message::{InnerMessage, NetMessage, NodeEvent, Transaction, TransactionType},
43 operations::{
44 connect::{self, ConnectOp},
45 get, put, subscribe, update, OpEnum, OpError, OpOutcome,
46 },
47 ring::{Location, PeerKeyLocation},
48 router::{RouteEvent, RouteOutcome},
49 tracing::{EventRegister, NetEventLog, NetEventRegister},
50};
51use crate::{
52 config::Config,
53 message::{MessageStats, NetMessageV1},
54};
55use freenet_stdlib::client_api::DelegateRequest;
56use serde::{Deserialize, Serialize};
57use tracing::Instrument;
58
59use crate::operations::handle_op_request;
60pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge};
61pub use network_bridge::{reset_channel_id_counter, EventLoopExitReason, NetworkStats};
63
64use crate::topology::rate::Rate;
65use crate::transport::{TransportKeypair, TransportPublicKey};
66pub(crate) use op_state_manager::{OpManager, OpNotAvailable};
67
68mod network_bridge;
69
70pub use network_bridge::in_memory::{get_fault_injector, set_fault_injector, FaultInjectorState};
74pub(crate) mod background_task_monitor;
75pub(crate) mod network_status;
76mod op_state_manager;
77mod p2p_impl;
78pub(crate) mod proximity_cache;
79mod request_router;
80pub(crate) mod testing_impl;
81
82pub use request_router::{DeduplicatedRequest, RequestRouter};
83
84#[derive(Clone)]
86pub struct ShutdownHandle {
87 tx: tokio::sync::mpsc::Sender<NodeEvent>,
88}
89
90impl ShutdownHandle {
91 pub async fn shutdown(&self) {
98 if let Err(err) = self
99 .tx
100 .send(NodeEvent::Disconnect {
101 cause: Some("graceful shutdown".into()),
102 })
103 .await
104 {
105 tracing::debug!(
106 error = %err,
107 "failed to send graceful shutdown signal; shutdown channel may already be closed"
108 );
109 }
110 }
111}
112
/// A built node: the inner [`NodeP2P`] together with the [`ShutdownHandle`] created for it.
pub struct Node {
    // The underlying P2P node; `run` delegates to it.
    inner: NodeP2P,
    // Handle that `shutdown_handle` hands out as clones.
    shutdown_handle: ShutdownHandle,
}

impl Node {
    /// Sets this node's location on the ring's connection manager.
    pub fn update_location(&mut self, location: Location) {
        self.inner
            .op_manager
            .ring
            .connection_manager
            .update_location(Some(location));
    }

    /// Returns a clone of the shutdown handle stored in this node.
    pub fn shutdown_handle(&self) -> ShutdownHandle {
        self.shutdown_handle.clone()
    }

    /// Consumes the node and runs it by delegating to `NodeP2P::run_node`.
    ///
    /// The success type is [`Infallible`], so a returned value is always an error.
    pub async fn run(self) -> anyhow::Result<Infallible> {
        self.inner.run_node().await
    }
}
136
/// Settings from which a [`Node`] is built (see `build` / `build_with_flush_handle`).
///
/// `NodeConfig::new` derives the initial values from a [`Config`]; several fields can be
/// changed afterwards through the builder-style setters on this type.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[non_exhaustive]
pub struct NodeConfig {
    /// `new` initialises this to `true`; `first_gateway` sets it to `false`.
    pub should_connect: bool,
    /// Whether the node is a gateway; `new` copies it from `Config::is_gateway`.
    pub is_gateway: bool,
    /// Transport key pair; `new` clones it from `Config::transport_keypair()`.
    pub key_pair: TransportKeypair,
    /// Listener IP; `new` copies it from `config.network_api.address`.
    pub network_listener_ip: IpAddr,
    /// Listener port; `new` copies it from `config.network_api.port`.
    pub network_listener_port: u16,
    /// Address taken from `config.peer_id` when present (logged by `new` as the external address).
    pub(crate) own_addr: Option<SocketAddr>,
    /// The full configuration this node configuration was created from.
    pub(crate) config: Arc<Config>,
    /// Gateways loaded by `new` from `config.gateways`, plus any added via `add_gateway`.
    pub(crate) gateways: Vec<InitPeerNode>,
    /// Optional location; `new` derives it from `config.location`.
    pub(crate) location: Option<Location>,
    /// `None` after `new`; set through `max_hops_to_live`.
    pub(crate) max_hops_to_live: Option<usize>,
    /// `None` after `new`; set through `rnd_if_htl_above`.
    pub(crate) rnd_if_htl_above: Option<usize>,
    /// `new` sets this from `config.network_api.max_connections`.
    pub(crate) max_number_conn: Option<usize>,
    /// `new` sets this from `config.network_api.min_connections`.
    pub(crate) min_number_conn: Option<usize>,
    /// `None` after `new`.
    pub(crate) max_upstream_bandwidth: Option<Rate>,
    /// `None` after `new`.
    pub(crate) max_downstream_bandwidth: Option<Rate>,
    /// `new` clones this from `config.network_api.blocked_addresses`.
    pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
    /// `new` copies this from `config.network_api.transient_budget`.
    pub(crate) transient_budget: usize,
    /// `new` builds this from `config.network_api.transient_ttl_secs` (seconds).
    pub(crate) transient_ttl: Duration,
    /// `new` uses `Some(0)` when `config.network_api.skip_load_from_network` is set and
    /// `Some(3)` otherwise; falls back to the serde default when absent on deserialization.
    #[serde(default)]
    pub(crate) relay_ready_connections: Option<usize>,
}
184
185impl NodeConfig {
186 pub async fn new(config: Config) -> anyhow::Result<NodeConfig> {
187 tracing::info!("Loading node configuration for mode {}", config.mode);
188
189 let own_pub_key = config.transport_keypair().public();
191
192 let mut gateways = Vec::with_capacity(config.gateways.len());
193 for gw in &config.gateways {
194 let GatewayConfig {
195 address,
196 public_key_path,
197 location,
198 } = gw;
199
200 let mut key_bytes = None;
203 for attempt in 0..10 {
204 let mut key_file = File::open(public_key_path).with_context(|| {
205 format!("failed loading gateway pubkey from {public_key_path:?}")
206 })?;
207 let mut buf = String::new();
208 key_file.read_to_string(&mut buf)?;
209 let buf = buf.trim();
210
211 if buf.starts_with("-----BEGIN") {
213 if attempt < 9 {
214 tracing::debug!(
215 public_key_path = ?public_key_path,
216 attempt = attempt + 1,
217 "Gateway public key is still RSA PEM format, waiting for X25519 conversion..."
218 );
219 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
220 continue;
221 } else {
222 tracing::warn!(
223 public_key_path = ?public_key_path,
224 "Gateway public key still in RSA PEM format after 5s. Skipping this gateway."
225 );
226 break;
227 }
228 }
229
230 match hex::decode(buf) {
231 Ok(bytes) if bytes.len() == 32 => {
232 key_bytes = Some(bytes);
233 break;
234 }
235 Ok(bytes) => {
236 anyhow::bail!(
237 "invalid gateway pubkey length {} (expected 32) from {public_key_path:?}",
238 bytes.len()
239 );
240 }
241 Err(e) => {
242 anyhow::bail!(
243 "failed to decode gateway pubkey hex from {public_key_path:?}: {e}"
244 );
245 }
246 }
247 }
248
249 let key_bytes = match key_bytes {
250 Some(bytes) => bytes,
251 None => continue, };
253 let mut key_arr = [0u8; 32];
254 key_arr.copy_from_slice(&key_bytes);
255 let transport_pub_key = TransportPublicKey::from_bytes(key_arr);
256
257 if &transport_pub_key == own_pub_key {
259 tracing::warn!(
260 "Skipping gateway with same public key as self: {:?}",
261 public_key_path
262 );
263 continue;
264 }
265
266 let address = Self::parse_socket_addr(address).await?;
267 let peer_key_location = PeerKeyLocation::new(transport_pub_key, address);
268 let location = location
269 .map(Location::new)
270 .unwrap_or_else(|| Location::from_address(&address));
271 gateways.push(InitPeerNode::new(peer_key_location, location));
272 }
273 tracing::info!(
274 "Node will be listening at {}:{} internal address",
275 config.network_api.address,
276 config.network_api.port
277 );
278 if let Some(own_addr) = &config.peer_id {
279 tracing::info!("Node external address: {}", own_addr.addr);
280 }
281 Ok(NodeConfig {
282 should_connect: true,
283 is_gateway: config.is_gateway,
284 key_pair: config.transport_keypair().clone(),
285 gateways,
286 own_addr: config.peer_id.clone().map(|p| p.addr),
287 network_listener_ip: config.network_api.address,
288 network_listener_port: config.network_api.port,
289 location: config.location.map(Location::new),
290 config: Arc::new(config.clone()),
291 max_hops_to_live: None,
292 rnd_if_htl_above: None,
293 max_number_conn: Some(config.network_api.max_connections),
294 min_number_conn: Some(config.network_api.min_connections),
295 max_upstream_bandwidth: None,
296 max_downstream_bandwidth: None,
297 blocked_addresses: config.network_api.blocked_addresses.clone(),
298 transient_budget: config.network_api.transient_budget,
299 transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs),
300 relay_ready_connections: if config.network_api.skip_load_from_network {
301 Some(0) } else {
303 Some(3) },
305 })
306 }
307
308 pub(crate) async fn parse_socket_addr(address: &Address) -> anyhow::Result<SocketAddr> {
309 let (hostname, port) = match address {
310 crate::config::Address::Hostname(hostname) => {
311 match hostname.rsplit_once(':') {
312 None => {
313 let hostname_with_port =
315 format!("{}:{}", hostname, crate::config::default_network_api_port());
316
317 if let Ok(mut addrs) = hostname_with_port.to_socket_addrs() {
318 if let Some(addr) = addrs.next() {
319 return Ok(addr);
320 }
321 }
322
323 (Cow::Borrowed(hostname.as_str()), None)
324 }
325 Some((host, port)) => match port.parse::<u16>() {
326 Ok(port) => {
327 if let Ok(mut addrs) = hostname.to_socket_addrs() {
328 if let Some(addr) = addrs.next() {
329 return Ok(addr);
330 }
331 }
332
333 (Cow::Borrowed(host), Some(port))
334 }
335 Err(_) => return Err(anyhow::anyhow!("Invalid port number: {port}")),
336 },
337 }
338 }
339 Address::HostAddress(addr) => return Ok(*addr),
340 };
341
342 let (conf, opts) = hickory_resolver::system_conf::read_system_conf()?;
343 let resolver = hickory_resolver::TokioAsyncResolver::new(
344 conf,
345 opts,
346 hickory_resolver::name_server::GenericConnector::new(
347 hickory_resolver::name_server::TokioRuntimeProvider::new(),
348 ),
349 );
350
351 let hostname = if hostname.ends_with('.') {
353 hostname
354 } else {
355 Cow::Owned(format!("{hostname}."))
356 };
357
358 let ips = resolver.lookup_ip(hostname.as_ref()).await?;
359 match ips.into_iter().next() {
360 Some(ip) => Ok(SocketAddr::new(
361 ip,
362 port.unwrap_or_else(crate::config::default_network_api_port),
363 )),
364 None => Err(anyhow::anyhow!("Fail to resolve IP address of {hostname}")),
365 }
366 }
367
    /// Borrows the [`Config`] this node configuration was created from.
    pub fn config(&self) -> &Config {
        &self.config
    }
371
    /// Marks this node as a gateway (sets `is_gateway` to `true`) and returns `self` for chaining.
    pub fn is_gateway(&mut self) -> &mut Self {
        self.is_gateway = true;
        self
    }
376
    /// Sets `should_connect` to `false`.
    ///
    /// NOTE(review): presumably meant for the first gateway of a network, which has no
    /// other peer to connect to — confirm against the callers.
    pub fn first_gateway(&mut self) {
        self.should_connect = false;
    }
380
    /// Sets `should_connect` to the given value and returns `self` for chaining.
    pub fn with_should_connect(&mut self, should_connect: bool) -> &mut Self {
        self.should_connect = should_connect;
        self
    }
385
    /// Stores `num_hops` as the `max_hops_to_live` setting and returns `self` for chaining.
    pub fn max_hops_to_live(&mut self, num_hops: usize) -> &mut Self {
        self.max_hops_to_live = Some(num_hops);
        self
    }
390
    /// Stores `num_hops` as the `rnd_if_htl_above` setting and returns `self` for chaining.
    pub fn rnd_if_htl_above(&mut self, num_hops: usize) -> &mut Self {
        self.rnd_if_htl_above = Some(num_hops);
        self
    }
395
    /// Overrides the maximum number of connections (`max_number_conn`) and returns `self`
    /// for chaining.
    pub fn max_number_of_connections(&mut self, num: usize) -> &mut Self {
        self.max_number_conn = Some(num);
        self
    }
400
    /// Overrides the minimum number of connections (`min_number_conn`) and returns `self`
    /// for chaining.
    pub fn min_number_of_connections(&mut self, num: usize) -> &mut Self {
        self.min_number_conn = Some(num);
        self
    }
405
    /// Replaces the `relay_ready_connections` setting (including clearing it with `None`)
    /// and returns `self` for chaining.
    pub fn relay_ready_connections(&mut self, num: Option<usize>) -> &mut Self {
        self.relay_ready_connections = num;
        self
    }
410
    /// Sets the node's own address (`own_addr`) and returns `self` for chaining.
    pub fn with_own_addr(&mut self, addr: SocketAddr) -> &mut Self {
        self.own_addr = Some(addr);
        self
    }
415
    /// Sets the node's location and returns `self` for chaining.
    pub fn with_location(&mut self, loc: Location) -> &mut Self {
        self.location = Some(loc);
        self
    }
420
    /// Appends `peer` to the list of gateways and returns `self` for chaining.
    pub fn add_gateway(&mut self, peer: InitPeerNode) -> &mut Self {
        self.gateways.push(peer);
        self
    }
426
    /// Builds a [`Node`] from this configuration and the given clients.
    ///
    /// Delegates to `build_with_flush_handle` and discards the returned flush handle.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `build_with_flush_handle`.
    pub async fn build<const CLIENTS: usize>(
        self,
        clients: [BoxedClient; CLIENTS],
    ) -> anyhow::Result<Node> {
        let (node, _flush_handle) = self.build_with_flush_handle(clients).await?;
        Ok(node)
    }
435
436 pub async fn build_with_flush_handle<const CLIENTS: usize>(
438 self,
439 clients: [BoxedClient; CLIENTS],
440 ) -> anyhow::Result<(Node, crate::tracing::EventFlushHandle)> {
441 let (event_register, flush_handle) = {
442 use super::tracing::{DynamicRegister, TelemetryReporter};
443
444 let event_reg = EventRegister::new(self.config.event_log());
445 let flush_handle = event_reg.flush_handle();
446
447 let mut registers: Vec<Box<dyn NetEventRegister>> = vec![Box::new(event_reg)];
448
449 #[cfg(feature = "trace-ot")]
451 {
452 use super::tracing::OTEventRegister;
453 registers.push(Box::new(OTEventRegister::new()));
454 }
455
456 if let Some(telemetry) = TelemetryReporter::new(&self.config.telemetry) {
458 registers.push(Box::new(telemetry));
459 }
460
461 (DynamicRegister::new(registers), flush_handle)
462 };
463 let cfg = self.config.clone();
464 let (node_inner, shutdown_tx) = NodeP2P::build::<NetworkContractHandler, CLIENTS, _>(
465 self,
466 clients,
467 event_register,
468 cfg,
469 )
470 .await?;
471 let shutdown_handle = ShutdownHandle { tx: shutdown_tx };
472 Ok((
473 Node {
474 inner: node_inner,
475 shutdown_handle,
476 },
477 flush_handle,
478 ))
479 }
480
    /// Returns the node's own address, if one has been set.
    pub fn get_own_addr(&self) -> Option<SocketAddr> {
        self.own_addr
    }
484
485 fn get_gateways(&self) -> anyhow::Result<Vec<PeerKeyLocation>> {
488 let gateways: Vec<PeerKeyLocation> = self
489 .gateways
490 .iter()
491 .map(|node| node.peer_key_location.clone())
492 .collect();
493
494 if !self.is_gateway && gateways.is_empty() {
495 anyhow::bail!(
496 "At least one remote gateway is required to join an existing network for non-gateway nodes."
497 )
498 } else {
499 Ok(gateways)
500 }
501 }
502}
503
504#[derive(Clone, Serialize, Deserialize, Debug)]
506pub struct InitPeerNode {
507 peer_key_location: PeerKeyLocation,
508 location: Location,
509}
510
511impl InitPeerNode {
512 pub fn new(peer_key_location: PeerKeyLocation, location: Location) -> Self {
513 Self {
514 peer_key_location,
515 location,
516 }
517 }
518}
519
/// Reports the outcome of processing an operation.
///
/// * `Ok(Some(op))`: sends the host result to the client (unless the transaction is a
///   sub-operation or the operation is a subscription renewal), records the outcome in
///   `network_status`, turns a contract-operation outcome into a `RouteEvent` that is logged
///   and passed to the ring, and finally hands the operation to `executor_callback` if present.
/// * `Ok(None)`: only logs.
/// * `Err(err)`: sends an `OperationError` to the client (unless the transaction is a
///   sub-operation), marks the transaction as completed, and logs the error. In debug/test
///   builds an `InvalidStateTransition` error additionally gets a trace written to stderr.
async fn report_result(
    tx: Option<Transaction>,
    op_result: Result<Option<OpEnum>, OpError>,
    op_manager: &OpManager,
    executor_callback: Option<ExecutorToEventLoopChannel<Callback>>,
    event_listener: &mut dyn NetEventRegister,
) {
    // Extra diagnostics for UPDATE transactions.
    if let Some(tx_id) = tx {
        if matches!(tx_id.transaction_type(), TransactionType::Update) {
            tracing::debug!("report_result called for UPDATE transaction {}", tx_id);
        }
    }

    match op_result {
        Ok(Some(op_res)) => {
            if let crate::operations::OpEnum::Update(ref update_op) = op_res {
                tracing::debug!(
                    "UPDATE operation {} completed, finalized: {}",
                    update_op.id,
                    update_op.finalized()
                );
            }

            // Client notification: skipped for sub-operations and subscription renewals.
            if let Some(transaction) = tx {
                if op_manager.is_sub_operation(transaction) {
                    tracing::debug!(
                        tx = %transaction,
                        "Skipping client notification for sub-operation"
                    );
                } else if op_res.is_subscription_renewal() {
                    tracing::debug!(
                        tx = %transaction,
                        "Skipping client notification for subscription renewal"
                    );
                } else {
                    let host_result = op_res.to_host_result();
                    op_manager
                        .send_client_result(transaction, host_result)
                        .await;
                }
            }

            // Record the outcome in the network status, when it classifies to an op type.
            {
                use crate::node::network_status;
                let (op_type, success) =
                    classify_op_outcome(op_res.id().transaction_type(), op_res.outcome());
                if let Some(op_type) = op_type {
                    network_status::record_op_result(op_type, success);
                }
            }

            // Only contract-operation outcomes produce a route event.
            let route_event = match op_res.outcome() {
                OpOutcome::ContractOpSuccess {
                    target_peer,
                    contract_location,
                    first_response_time,
                    payload_size,
                    payload_transfer_time,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::Success {
                        time_to_response_start: first_response_time,
                        payload_size,
                        payload_transfer_time,
                    },
                }),
                OpOutcome::ContractOpSuccessUntimed {
                    target_peer,
                    contract_location,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::SuccessUntimed,
                }),
                OpOutcome::ContractOpFailure {
                    target_peer,
                    contract_location,
                } => Some(RouteEvent {
                    peer: target_peer.clone(),
                    contract_location,
                    outcome: RouteOutcome::Failure,
                }),
                OpOutcome::Incomplete | OpOutcome::Irrelevant => None,
            };
            if let Some(event) = route_event {
                // Log the event (when a log entry can be built) before handing it to the ring.
                if let Some(log_event) =
                    NetEventLog::route_event(op_res.id(), &op_manager.ring, &event)
                {
                    event_listener
                        .register_events(Either::Left(log_event))
                        .await;
                }
                op_manager.ring.routing_finished(event);
            }
            if let Some(mut cb) = executor_callback {
                cb.response(op_res).await;
            }
        }
        Ok(None) => {
            tracing::debug!(?tx, "No operation result found");
        }
        Err(err) => {
            if let Some(tx) = tx {
                // Sub-operations do not notify the client.
                if !op_manager.is_sub_operation(tx) {
                    let client_error = freenet_stdlib::client_api::ClientError::from(
                        freenet_stdlib::client_api::ErrorKind::OperationError {
                            cause: err.to_string().into(),
                        },
                    );
                    op_manager.send_client_result(tx, Err(client_error)).await;
                }

                op_manager.completed(tx);
            }
            // Debug/test builds: anything other than an invalid state transition is logged at
            // error level and the function returns; an invalid state transition is written
            // to stderr.
            #[cfg(any(debug_assertions, test))]
            {
                use std::io::Write;
                #[cfg(debug_assertions)]
                let OpError::InvalidStateTransition { tx, state, trace } = err
                else {
                    tracing::error!("Finished transaction with error: {err}");
                    return;
                };
                #[cfg(not(debug_assertions))]
                let OpError::InvalidStateTransition { tx } = err
                else {
                    tracing::error!("Finished transaction with error: {err}");
                    return;
                };
                #[cfg(debug_assertions)]
                let trace = format!("{trace}");
                #[cfg(debug_assertions)]
                {
                    // Keep only the third and fourth lines of the formatted trace.
                    let mut tr_lines = trace.lines();
                    let trace = tr_lines
                        .nth(2)
                        .map(|second_trace| {
                            let second_trace_lines =
                                [second_trace, tr_lines.next().unwrap_or_default()];
                            second_trace_lines.join("\n")
                        })
                        .unwrap_or_default();
                    let peer = op_manager.ring.connection_manager.own_location();
                    let log = format!(
                        "Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n"
                    );
                    std::io::stderr().write_all(log.as_bytes()).unwrap();
                }
                #[cfg(not(debug_assertions))]
                {
                    let peer = op_manager.ring.connection_manager.own_location();
                    let log = format!("Transaction ({tx} @ {peer}) error\n");
                    std::io::stderr().write_all(log.as_bytes()).unwrap();
                }
            }
            // Other builds: the error is only logged at debug level.
            #[cfg(not(any(debug_assertions, test)))]
            {
                tracing::debug!("Finished transaction with error: {err}");
            }
        }
    }
}
705
706pub(crate) async fn process_message_decoupled<CB>(
709 msg: NetMessage,
710 source_addr: Option<std::net::SocketAddr>,
711 op_manager: Arc<OpManager>,
712 conn_manager: CB,
713 mut event_listener: Box<dyn NetEventRegister>,
714 executor_callback: Option<ExecutorToEventLoopChannel<crate::contract::Callback>>,
715 pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
716) where
717 CB: NetworkBridge,
718{
719 let tx = *msg.id();
720
721 let op_result = handle_pure_network_message(
722 msg,
723 source_addr,
724 op_manager.clone(),
725 conn_manager,
726 event_listener.as_mut(),
727 pending_op_result,
728 )
729 .await;
730
731 report_result(
734 Some(tx),
735 op_result,
736 &op_manager,
737 executor_callback,
738 &mut *event_listener,
739 )
740 .await;
741}
742
743#[allow(clippy::too_many_arguments)]
745async fn handle_pure_network_message<CB>(
746 msg: NetMessage,
747 source_addr: Option<std::net::SocketAddr>,
748 op_manager: Arc<OpManager>,
749 conn_manager: CB,
750 event_listener: &mut dyn NetEventRegister,
751 pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
752) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
753where
754 CB: NetworkBridge,
755{
756 match msg {
757 NetMessage::V1(msg_v1) => {
758 handle_pure_network_message_v1(
759 msg_v1,
760 source_addr,
761 op_manager,
762 conn_manager,
763 event_listener,
764 pending_op_result,
765 )
766 .await
767 }
768 }
769}
770
/// Delay to wait before retry number `attempt` of a busy operation.
///
/// The delay starts at 5 ms and doubles with every attempt; the exponent is limited to 8
/// and the resulting delay is capped at 1000 ms.
fn op_retry_backoff(attempt: usize) -> Duration {
    const BASE_DELAY_MS: u64 = 5;
    const MAX_DOUBLINGS: usize = 8;
    const MAX_DELAY_MS: u64 = 1_000;

    let doublings = attempt.min(MAX_DOUBLINGS);
    let delay_ms = (BASE_DELAY_MS << doublings).min(MAX_DELAY_MS);
    Duration::from_millis(delay_ms)
}
777
778#[allow(clippy::too_many_arguments)]
780async fn handle_pure_network_message_v1<CB>(
781 msg: NetMessageV1,
782 source_addr: Option<std::net::SocketAddr>,
783 op_manager: Arc<OpManager>,
784 mut conn_manager: CB,
785 event_listener: &mut dyn NetEventRegister,
786 pending_op_result: Option<tokio::sync::mpsc::Sender<NetMessage>>,
787) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError>
788where
789 CB: NetworkBridge,
790{
791 event_listener
793 .register_events(NetEventLog::from_inbound_msg_v1(
794 &msg,
795 &op_manager,
796 source_addr,
797 ))
798 .await;
799
800 const MAX_RETRIES: usize = 15usize;
801 for i in 0..MAX_RETRIES {
802 let tx = Some(*msg.id());
803 tracing::debug!(?tx, "Processing pure network operation, iteration: {i}");
804
805 match msg {
806 NetMessageV1::Connect(ref op) => {
807 let parent_span = tracing::Span::current();
808 let span = tracing::info_span!(
809 parent: parent_span,
810 "handle_connect_op_request",
811 transaction = %msg.id(),
812 tx_type = %msg.id().transaction_type()
813 );
814 let op_result = handle_op_request::<ConnectOp, _>(
815 &op_manager,
816 &mut conn_manager,
817 op,
818 source_addr,
819 )
820 .instrument(span)
821 .await;
822
823 if let Err(OpError::OpNotAvailable(state)) = &op_result {
824 match state {
825 OpNotAvailable::Running => {
826 let delay = op_retry_backoff(i);
827 tracing::debug!(
828 delay_ms = delay.as_millis() as u64,
829 attempt = i,
830 "Pure network: Operation still running, backing off"
831 );
832 tokio::time::sleep(delay).await;
833 continue;
834 }
835 OpNotAvailable::Completed => {
836 tracing::debug!(
837 tx = %msg.id(),
838 tx_type = ?msg.id().transaction_type(),
839 "Pure network: Operation already completed"
840 );
841 return Ok(None);
842 }
843 }
844 }
845
846 return handle_pure_network_result(
847 tx,
848 op_result,
849 &op_manager,
850 &mut *event_listener,
851 )
852 .await;
853 }
854 NetMessageV1::Put(ref op) => {
855 tracing::debug!(
856 tx = %op.id(),
857 "handle_pure_network_message_v1: Processing PUT message"
858 );
859 let op_result = handle_op_request::<put::PutOp, _>(
860 &op_manager,
861 &mut conn_manager,
862 op,
863 source_addr,
864 )
865 .await;
866 tracing::debug!(
867 tx = %op.id(),
868 op_result_ok = op_result.is_ok(),
869 "handle_pure_network_message_v1: PUT handle_op_request completed"
870 );
871
872 if is_operation_completed(&op_result) {
874 if let Some(ref op_execution_callback) = pending_op_result {
875 let tx_id = *op.id();
876 if let Err(err) = op_execution_callback
877 .send(NetMessage::V1(NetMessageV1::Put((*op).clone())))
878 .await
879 {
880 tracing::error!(%err, %tx_id, "Failed to send message to executor");
881 }
882 }
883 }
884
885 if let Err(OpError::OpNotAvailable(state)) = &op_result {
886 match state {
887 OpNotAvailable::Running => {
888 let delay = op_retry_backoff(i);
889 tracing::debug!(
890 delay_ms = delay.as_millis() as u64,
891 attempt = i,
892 "Pure network: Operation still running, backing off"
893 );
894 tokio::time::sleep(delay).await;
895 continue;
896 }
897 OpNotAvailable::Completed => {
898 tracing::debug!("Pure network: Operation already completed");
899 return Ok(None);
900 }
901 }
902 }
903
904 return handle_pure_network_result(
905 tx,
906 op_result,
907 &op_manager,
908 &mut *event_listener,
909 )
910 .await;
911 }
912 NetMessageV1::Get(ref op) => {
913 let op_result = handle_op_request::<get::GetOp, _>(
914 &op_manager,
915 &mut conn_manager,
916 op,
917 source_addr,
918 )
919 .await;
920
921 if is_operation_completed(&op_result) {
923 if let Some(ref op_execution_callback) = pending_op_result {
924 let tx_id = *op.id();
925 if let Err(err) = op_execution_callback
926 .send(NetMessage::V1(NetMessageV1::Get((*op).clone())))
927 .await
928 {
929 tracing::error!(%err, %tx_id, "Failed to send message to executor");
930 }
931 }
932 }
933
934 if let Err(OpError::OpNotAvailable(state)) = &op_result {
935 match state {
936 OpNotAvailable::Running => {
937 let delay = op_retry_backoff(i);
938 tracing::debug!(
939 delay_ms = delay.as_millis() as u64,
940 attempt = i,
941 "Pure network: Operation still running, backing off"
942 );
943 tokio::time::sleep(delay).await;
944 continue;
945 }
946 OpNotAvailable::Completed => {
947 tracing::debug!("Pure network: Operation already completed");
948 return Ok(None);
949 }
950 }
951 }
952
953 return handle_pure_network_result(
954 tx,
955 op_result,
956 &op_manager,
957 &mut *event_listener,
958 )
959 .await;
960 }
961 NetMessageV1::Update(ref op) => {
962 let op_result = handle_op_request::<update::UpdateOp, _>(
963 &op_manager,
964 &mut conn_manager,
965 op,
966 source_addr,
967 )
968 .await;
969
970 if let Err(OpError::OpNotAvailable(state)) = &op_result {
971 match state {
972 OpNotAvailable::Running => {
973 let delay = op_retry_backoff(i);
974 tracing::debug!(
975 delay_ms = delay.as_millis() as u64,
976 attempt = i,
977 "Pure network: Operation still running, backing off"
978 );
979 tokio::time::sleep(delay).await;
980 continue;
981 }
982 OpNotAvailable::Completed => {
983 tracing::debug!("Pure network: Operation already completed");
984 return Ok(None);
985 }
986 }
987 }
988
989 return handle_pure_network_result(
990 tx,
991 op_result,
992 &op_manager,
993 &mut *event_listener,
994 )
995 .await;
996 }
997 NetMessageV1::Subscribe(ref op) => {
998 let op_result = handle_op_request::<subscribe::SubscribeOp, _>(
999 &op_manager,
1000 &mut conn_manager,
1001 op,
1002 source_addr,
1003 )
1004 .await;
1005
1006 if let Err(OpError::OpNotAvailable(state)) = &op_result {
1007 match state {
1008 OpNotAvailable::Running => {
1009 let delay = op_retry_backoff(i);
1010 tracing::debug!(
1011 delay_ms = delay.as_millis() as u64,
1012 attempt = i,
1013 "Pure network: Operation still running, backing off"
1014 );
1015 tokio::time::sleep(delay).await;
1016 continue;
1017 }
1018 OpNotAvailable::Completed => {
1019 tracing::debug!("Pure network: Operation already completed");
1020 return Ok(None);
1021 }
1022 }
1023 }
1024
1025 return handle_pure_network_result(
1026 tx,
1027 op_result,
1028 &op_manager,
1029 &mut *event_listener,
1030 )
1031 .await;
1032 }
1033 NetMessageV1::ProximityCache { ref message } => {
1037 let Some(source) = source_addr else {
1038 tracing::warn!(
1039 "Received ProximityCache message without source address (pure network)"
1040 );
1041 return Ok(None);
1042 };
1043 tracing::debug!(
1044 from = %source,
1045 "Processing ProximityCache message (pure network)"
1046 );
1047
1048 let source_pub_key = op_manager
1055 .ring
1056 .connection_manager
1057 .get_peer_by_addr(source)
1058 .map(|pkl| pkl.pub_key().clone());
1059 let Some(source_pub_key) = source_pub_key else {
1060 tracing::debug!(
1061 %source,
1062 "ProximityCache: could not resolve source addr to pub_key, skipping"
1063 );
1064 return Ok(None);
1065 };
1066 let result = op_manager
1067 .proximity_cache
1068 .handle_message(&source_pub_key, message.clone());
1069 if let Some(response) = result.response {
1070 let response_msg =
1072 NetMessage::V1(NetMessageV1::ProximityCache { message: response });
1073 if let Err(err) = conn_manager.send(source, response_msg).await {
1074 tracing::error!(%err, %source, "Failed to send ProximityCache response");
1075 }
1076 }
1077 for instance_id in result.overlapping_contracts {
1082 if let Some((key, state)) =
1083 get_contract_state_by_id(&op_manager, &instance_id).await
1084 {
1085 if !op_manager.ring.is_receiving_updates(&key)
1086 && !op_manager.ring.has_downstream_subscribers(&key)
1087 {
1088 continue;
1089 }
1090 tracing::debug!(
1091 contract = %key,
1092 peer = %source_pub_key,
1093 "Proximity cache overlap — broadcasting state to ensure neighbor is current"
1094 );
1095 if let Err(e) = op_manager
1096 .notify_node_event(NodeEvent::BroadcastStateChange {
1097 key,
1098 new_state: state,
1099 })
1100 .await
1101 {
1102 tracing::warn!(
1103 contract = %instance_id,
1104 error = %e,
1105 "Failed to emit BroadcastStateChange for proximity sync"
1106 );
1107 }
1108 }
1109 }
1110 return Ok(None);
1111 }
1112 NetMessageV1::InterestSync { ref message } => {
1113 let Some(source) = source_addr else {
1114 tracing::warn!("Received InterestSync message without source address");
1115 return Ok(None);
1116 };
1117 tracing::debug!(
1118 from = %source,
1119 "Processing InterestSync message"
1120 );
1121
1122 if let Some(response) =
1124 handle_interest_sync_message(&op_manager, source, message.clone()).await
1125 {
1126 let response_msg =
1127 NetMessage::V1(NetMessageV1::InterestSync { message: response });
1128 if let Err(err) = conn_manager.send(source, response_msg).await {
1129 tracing::error!(%err, %source, "Failed to send InterestSync response");
1130 }
1131 }
1132 return Ok(None);
1133 }
1134 NetMessageV1::ReadyState { ready } => {
1135 let Some(source) = source_addr else {
1136 tracing::warn!("Received ReadyState message without source address");
1137 return Ok(None);
1138 };
1139 if ready {
1140 op_manager.ring.connection_manager.mark_peer_ready(source);
1141 } else {
1142 op_manager
1143 .ring
1144 .connection_manager
1145 .mark_peer_not_ready(source);
1146 }
1147 tracing::debug!(
1148 from = %source,
1149 ready,
1150 "Processed ReadyState from peer"
1151 );
1152 return Ok(None);
1153 }
1154 NetMessageV1::Aborted(tx) => {
1155 tracing::debug!(
1156 %tx,
1157 tx_type = ?tx.transaction_type(),
1158 "Received Aborted message, delegating to handle_aborted_op"
1159 );
1160 if let Err(err) = handle_aborted_op(tx, &op_manager, &[]).await {
1164 if !matches!(err, OpError::StatePushed) {
1165 tracing::error!(
1166 %tx,
1167 error = %err,
1168 "Error handling aborted operation"
1169 );
1170 }
1171 }
1172 return Ok(None);
1173 }
1174 }
1175 }
1176
1177 tracing::warn!(
1179 tx = %msg.id(),
1180 tx_type = ?msg.id().transaction_type(),
1181 "Dropping message after {MAX_RETRIES} retry attempts (operation busy)"
1182 );
1183 Ok(None)
1184}
1185
1186async fn handle_pure_network_result(
1188 tx: Option<Transaction>,
1189 op_result: Result<Option<crate::operations::OpEnum>, OpError>,
1190 _op_manager: &Arc<OpManager>,
1191 _event_listener: &mut dyn NetEventRegister,
1192) -> Result<Option<crate::operations::OpEnum>, crate::node::OpError> {
1193 tracing::debug!("Pure network result handling for transaction: {:?}", tx);
1194
1195 match &op_result {
1196 Ok(Some(_op_res)) => {
1197 tracing::debug!(
1199 "Network operation completed successfully for transaction: {:?}",
1200 tx
1201 );
1202
1203 if let Some(tx_id) = tx {
1205 tracing::debug!("Network operation completed for transaction: {}", tx_id);
1207 }
1208
1209 }
1211 Ok(None) => {
1212 tracing::debug!("Network operation returned no result");
1213 }
1214 Err(OpError::StatePushed) => {
1215 return Ok(None);
1216 }
1217 Err(OpError::OpNotPresent(tx_id)) => {
1218 tracing::debug!(
1227 tx = %tx_id,
1228 "Network response arrived for non-existent operation (likely timed out or already completed)"
1229 );
1230 return Ok(None);
1231 }
1232 Err(e) => {
1233 tracing::error!("Network operation failed: {}", e);
1234 if let Some(tx_id) = tx {
1236 tracing::debug!(
1237 "Network operation failed for transaction: {} with error: {}",
1238 tx_id,
1239 e
1240 );
1241 }
1242 }
1243 }
1244
1245 op_result
1246}
1247
1248async fn handle_interest_sync_message(
1256 op_manager: &Arc<OpManager>,
1257 source: std::net::SocketAddr,
1258 message: crate::message::InterestMessage,
1259) -> Option<crate::message::InterestMessage> {
1260 use crate::message::{InterestMessage, NodeEvent, SummaryEntry};
1261 use crate::ring::interest::contract_hash;
1262
1263 match message {
1264 InterestMessage::Interests { hashes } => {
1265 tracing::debug!(
1266 from = %source,
1267 hash_count = hashes.len(),
1268 "Received Interests message"
1269 );
1270
1271 let peer_key = get_peer_key_from_addr(op_manager, source);
1272
1273 if let Some(ref pk) = peer_key {
1277 let incoming_hashes: std::collections::HashSet<u32> =
1278 hashes.iter().copied().collect();
1279 let current_contracts = op_manager.interest_manager.get_contracts_for_peer(pk);
1280
1281 let mut removed = 0usize;
1286 for contract in ¤t_contracts {
1287 let h = contract_hash(contract);
1288 if !incoming_hashes.contains(&h) {
1289 op_manager
1290 .interest_manager
1291 .remove_peer_interest(contract, pk);
1292 removed += 1;
1293 }
1294 }
1295 if removed > 0 {
1296 tracing::debug!(
1297 from = %source,
1298 removed,
1299 "Full-replace: removed stale interest entries"
1300 );
1301 }
1302 }
1303
1304 let matching = op_manager.interest_manager.get_matching_contracts(&hashes);
1306
1307 let mut entries = Vec::with_capacity(matching.len());
1309 for contract in matching {
1310 let hash = contract_hash(&contract);
1311 let summary = get_contract_summary(op_manager, &contract).await;
1312 entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
1313
1314 if let Some(ref pk) = peer_key {
1315 if op_manager
1320 .interest_manager
1321 .get_peer_interest(&contract, pk)
1322 .is_some()
1323 {
1324 op_manager
1325 .interest_manager
1326 .refresh_peer_interest(&contract, pk);
1327 } else {
1328 op_manager.interest_manager.register_peer_interest(
1329 &contract,
1330 pk.clone(),
1331 None, false,
1333 );
1334 }
1335 }
1336 }
1337
1338 if entries.is_empty() {
1339 None
1340 } else {
1341 Some(InterestMessage::Summaries { entries })
1342 }
1343 }
1344
1345 InterestMessage::Summaries { entries } => {
1346 tracing::debug!(
1347 from = %source,
1348 entry_count = entries.len(),
1349 "Received Summaries message"
1350 );
1351
1352 let peer_key = get_peer_key_from_addr(op_manager, source);
1367 let mut stale_contracts = Vec::new();
1368
1369 if let Some(pk) = peer_key {
1370 for entry in entries {
1371 for contract in op_manager.interest_manager.lookup_by_hash(entry.hash) {
1372 if !op_manager.interest_manager.has_local_interest(&contract) {
1373 continue;
1374 }
1375
1376 let their_summary = entry.to_summary();
1377 let our_summary = get_contract_summary(op_manager, &contract).await;
1378
1379 let is_stale = our_summary
1380 .as_ref()
1381 .zip(their_summary.as_ref())
1382 .is_some_and(|(ours, theirs)| ours.as_ref() != theirs.as_ref());
1383
1384 op_manager.interest_manager.update_peer_summary(
1385 &contract,
1386 &pk,
1387 their_summary,
1388 );
1389
1390 if is_stale && !stale_contracts.contains(&contract) {
1391 stale_contracts.push(contract);
1392 }
1393 }
1394 }
1395 }
1396
1397 for contract in stale_contracts {
1399 let Some(state) = get_contract_state(op_manager, &contract).await else {
1400 tracing::trace!(
1401 contract = %contract,
1402 "Skipping stale-peer broadcast — no local state available"
1403 );
1404 continue;
1405 };
1406 tracing::info!(
1407 contract = %contract,
1408 detected_via = %source,
1409 "Summary mismatch in interest sync — broadcasting to all stale peers"
1410 );
1411 if let Err(e) = op_manager
1412 .notify_node_event(NodeEvent::BroadcastStateChange {
1413 key: contract,
1414 new_state: state,
1415 })
1416 .await
1417 {
1418 tracing::warn!(
1419 contract = %contract,
1420 error = %e,
1421 "Failed to emit BroadcastStateChange for stale peer correction"
1422 );
1423 }
1424 }
1425
1426 None
1428 }
1429
1430 InterestMessage::ChangeInterests { added, removed } => {
1431 tracing::debug!(
1432 from = %source,
1433 added_count = added.len(),
1434 removed_count = removed.len(),
1435 "Received ChangeInterests message"
1436 );
1437
1438 let peer_key = get_peer_key_from_addr(op_manager, source);
1439
1440 if let Some(ref pk) = peer_key {
1442 for hash in removed {
1443 for contract in op_manager.interest_manager.lookup_by_hash(hash) {
1445 op_manager
1446 .interest_manager
1447 .remove_peer_interest(&contract, pk);
1448 }
1449 }
1450 }
1451
1452 let mut entries = Vec::new();
1454 if let Some(ref pk) = peer_key {
1455 for hash in added {
1456 for contract in op_manager.interest_manager.lookup_by_hash(hash) {
1458 if !op_manager.interest_manager.has_local_interest(&contract) {
1460 continue;
1461 }
1462
1463 op_manager.interest_manager.register_peer_interest(
1465 &contract,
1466 pk.clone(),
1467 None,
1468 false,
1469 );
1470
1471 let summary = get_contract_summary(op_manager, &contract).await;
1473 entries.push(SummaryEntry::from_summary(hash, summary.as_ref()));
1474 }
1475 }
1476 }
1477
1478 if entries.is_empty() {
1479 None
1480 } else {
1481 Some(InterestMessage::Summaries { entries })
1482 }
1483 }
1484
1485 InterestMessage::ResyncRequest { key } => {
1486 tracing::info!(
1487 from = %source,
1488 contract = %key,
1489 event = "resync_request_received",
1490 "Received ResyncRequest - peer needs full state"
1491 );
1492
1493 op_manager.interest_manager.record_resync_request_received();
1495 crate::config::GlobalTestMetrics::record_resync_request();
1496
1497 let peer_key = get_peer_key_from_addr(op_manager, source);
1499 if let Some(ref pk) = peer_key {
1500 op_manager
1501 .interest_manager
1502 .update_peer_summary(&key, pk, None);
1503 }
1504
1505 let from_peer = op_manager.ring.connection_manager.get_peer_by_addr(source);
1507
1508 if let Some(ref from_pkl) = from_peer {
1510 if let Some(event) = crate::tracing::NetEventLog::resync_request_received(
1511 &op_manager.ring,
1512 key,
1513 from_pkl.clone(),
1514 ) {
1515 op_manager
1516 .ring
1517 .register_events(either::Either::Left(event))
1518 .await;
1519 }
1520 } else {
1521 tracing::debug!(
1522 contract = %key,
1523 source = %source,
1524 "ResyncRequest telemetry skipped: peer lookup failed"
1525 );
1526 }
1527
1528 let state = get_contract_state(op_manager, &key).await;
1530 let Some(state) = state else {
1531 tracing::warn!(
1532 contract = %key,
1533 "ResyncRequest for contract we don't have state for"
1534 );
1535 return None;
1536 };
1537
1538 let summary = get_contract_summary(op_manager, &key).await;
1540 let Some(summary) = summary else {
1541 tracing::warn!(
1542 contract = %key,
1543 "ResyncRequest for contract we can't compute summary for"
1544 );
1545 return None;
1546 };
1547
1548 tracing::info!(
1549 to = %source,
1550 contract = %key,
1551 state_size = state.as_ref().len(),
1552 summary_size = summary.as_ref().len(),
1553 event = "resync_response_sent",
1554 "Sending ResyncResponse with full state"
1555 );
1556
1557 if let Some(ref to_pkl) = from_peer {
1559 if let Some(event) = crate::tracing::NetEventLog::resync_response_sent(
1560 &op_manager.ring,
1561 key,
1562 to_pkl.clone(),
1563 state.as_ref().len(),
1564 ) {
1565 op_manager
1566 .ring
1567 .register_events(either::Either::Left(event))
1568 .await;
1569 }
1570 }
1571
1572 Some(InterestMessage::ResyncResponse {
1573 key,
1574 state_bytes: state.as_ref().to_vec(),
1575 summary_bytes: summary.as_ref().to_vec(),
1576 })
1577 }
1578
1579 InterestMessage::ResyncResponse {
1580 key,
1581 state_bytes,
1582 summary_bytes,
1583 } => {
1584 tracing::info!(
1585 from = %source,
1586 contract = %key,
1587 state_size = state_bytes.len(),
1588 event = "resync_response_received",
1589 "Received ResyncResponse with full state"
1590 );
1591
1592 let state = freenet_stdlib::prelude::State::from(state_bytes.clone());
1594 let update_data = freenet_stdlib::prelude::UpdateData::State(state);
1595
1596 use crate::contract::ContractHandlerEvent;
1598 match op_manager
1599 .notify_contract_handler(ContractHandlerEvent::UpdateQuery {
1600 key,
1601 data: update_data,
1602 related_contracts: Default::default(),
1603 })
1604 .await
1605 {
1606 Ok(ContractHandlerEvent::UpdateResponse {
1607 new_value: Ok(_), ..
1608 }) => {
1609 tracing::info!(
1610 from = %source,
1611 contract = %key,
1612 event = "resync_applied",
1613 changed = true,
1614 "ResyncResponse state applied successfully"
1615 );
1616 }
1617 Ok(ContractHandlerEvent::UpdateNoChange { .. }) => {
1618 tracing::info!(
1619 from = %source,
1620 contract = %key,
1621 event = "resync_applied",
1622 changed = false,
1623 "ResyncResponse state unchanged (already had this state)"
1624 );
1625 }
1626 Ok(other) => {
1627 tracing::warn!(
1628 from = %source,
1629 contract = %key,
1630 event = "resync_failed",
1631 response = ?other,
1632 "Unexpected response to resync update"
1633 );
1634 }
1635 Err(e) => {
1636 tracing::error!(
1637 from = %source,
1638 contract = %key,
1639 event = "resync_failed",
1640 error = %e,
1641 "Failed to apply resync state"
1642 );
1643 }
1644 }
1645
1646 let peer_key = get_peer_key_from_addr(op_manager, source);
1648 if let Some(pk) = peer_key {
1649 let summary = freenet_stdlib::prelude::StateSummary::from(summary_bytes);
1650 op_manager
1651 .interest_manager
1652 .update_peer_summary(&key, &pk, Some(summary));
1653 }
1654
1655 None
1657 }
1658 }
1659}
1660
1661async fn get_contract_state(
1663 op_manager: &Arc<OpManager>,
1664 key: &freenet_stdlib::prelude::ContractKey,
1665) -> Option<freenet_stdlib::prelude::WrappedState> {
1666 get_contract_state_by_id(op_manager, key.id())
1667 .await
1668 .map(|(_, state)| state)
1669}
1670
1671async fn get_contract_state_by_id(
1676 op_manager: &Arc<OpManager>,
1677 instance_id: &freenet_stdlib::prelude::ContractInstanceId,
1678) -> Option<(
1679 freenet_stdlib::prelude::ContractKey,
1680 freenet_stdlib::prelude::WrappedState,
1681)> {
1682 use crate::contract::ContractHandlerEvent;
1683
1684 match op_manager
1685 .notify_contract_handler(ContractHandlerEvent::GetQuery {
1686 instance_id: *instance_id,
1687 return_contract_code: false,
1688 })
1689 .await
1690 {
1691 Ok(ContractHandlerEvent::GetResponse {
1692 key: Some(key),
1693 response: Ok(store_response),
1694 }) => store_response.state.map(|state| (key, state)),
1695 Ok(ContractHandlerEvent::GetResponse {
1696 response: Err(e), ..
1697 }) => {
1698 tracing::warn!(
1699 contract = %instance_id,
1700 error = %e,
1701 "Failed to get contract state by instance id"
1702 );
1703 None
1704 }
1705 _ => None,
1706 }
1707}
1708
1709async fn get_contract_summary(
1711 op_manager: &Arc<OpManager>,
1712 key: &freenet_stdlib::prelude::ContractKey,
1713) -> Option<freenet_stdlib::prelude::StateSummary<'static>> {
1714 use crate::contract::ContractHandlerEvent;
1715
1716 match op_manager
1717 .notify_contract_handler(ContractHandlerEvent::GetSummaryQuery { key: *key })
1718 .await
1719 {
1720 Ok(ContractHandlerEvent::GetSummaryResponse {
1721 summary: Ok(summary),
1722 ..
1723 }) => Some(summary),
1724 Ok(ContractHandlerEvent::GetSummaryResponse {
1725 summary: Err(e), ..
1726 }) => {
1727 tracing::warn!(
1728 contract = %key,
1729 error = %e,
1730 "Failed to get contract summary"
1731 );
1732 None
1733 }
1734 _ => None,
1735 }
1736}
1737
1738fn get_peer_key_from_addr(
1740 op_manager: &Arc<OpManager>,
1741 addr: std::net::SocketAddr,
1742) -> Option<crate::ring::interest::PeerKey> {
1743 op_manager
1744 .ring
1745 .connection_manager
1746 .get_peer_by_addr(addr)
1747 .map(|pkl| crate::ring::interest::PeerKey::from(pkl.pub_key.clone()))
1748}
1749
1750#[allow(dead_code)]
1752pub async fn subscribe(
1753 op_manager: Arc<OpManager>,
1754 instance_id: ContractInstanceId,
1755 client_id: Option<ClientId>,
1756) -> Result<Transaction, OpError> {
1757 subscribe_with_id(op_manager, instance_id, client_id, None, false).await
1759}
1760
1761pub async fn subscribe_with_id(
1766 op_manager: Arc<OpManager>,
1767 instance_id: ContractInstanceId,
1768 client_id: Option<ClientId>,
1769 transaction_id: Option<Transaction>,
1770 is_renewal: bool,
1771) -> Result<Transaction, OpError> {
1772 let op = match transaction_id {
1773 Some(id) => subscribe::start_op_with_id(instance_id, id, is_renewal),
1774 None => subscribe::start_op(instance_id, is_renewal),
1775 };
1776 let id = op.id;
1777 if let Some(client_id) = client_id {
1778 use crate::client_events::RequestId;
1779 let request_id = RequestId::new();
1781 if let Err(e) = op_manager
1782 .ch_outbound
1783 .waiting_for_subscription_result(id, instance_id, client_id, request_id)
1784 .await
1785 {
1786 tracing::warn!(tx = %id, error = %e, "failed to register subscription result waiter");
1787 }
1788 }
1789 match subscribe::request_subscribe(&op_manager, op).await {
1791 Err(err) => {
1792 tracing::error!("{}", err);
1793 Err(err)
1794 }
1795 Ok(()) => Ok(id),
1796 }
1797}
1798
/// Cleans up after the aborted transaction `tx`, dispatching on its type.
///
/// * `Connect` – see the inline comments: depending on the connection count the
///   gateway connection is retried (after an optional backoff wait) or another
///   gateway from `gateways` is tried.
/// * `Get` / `Subscribe` / `Put` / `Update` – the operation is popped and its
///   `handle_abort` is invoked; `OpError::StatePushed` from it is ignored, any
///   other error is returned.
///
/// In every case, a popped operation whose variant does not match the
/// transaction type is pushed back unchanged, and a missing operation or a
/// failed pop is ignored.
///
/// # Errors
/// Propagates errors from `connect::join_ring_request`, `op_manager.push` and
/// `handle_abort` (other than `OpError::StatePushed`).
async fn handle_aborted_op(
    tx: Transaction,
    op_manager: &OpManager,
    gateways: &[PeerKeyLocation],
) -> Result<(), OpError> {
    use crate::util::IterExt;
    match tx.transaction_type() {
        TransactionType::Connect => {
            match op_manager.pop(&tx) {
                // Below the configured minimum number of connections: retry the
                // gateway this connect operation was using, if it has one.
                Ok(Some(OpEnum::Connect(op)))
                    if op_manager.ring.open_connections()
                        < op_manager.ring.connection_manager.min_connections =>
                {
                    let gateway = op.gateway().cloned();
                    if let Some(gateway) = gateway {
                        if let Some(peer_addr) = gateway.peer_addr.as_known() {
                            op_manager
                                .ring
                                .connection_manager
                                .prune_in_transit_connection(*peer_addr);

                            // The lock guard lives only inside this block, so it
                            // is released before the await points below.
                            let backoff_duration = {
                                let mut backoff = op_manager.gateway_backoff.lock();
                                backoff.record_failure(*peer_addr);
                                backoff.remaining_backoff(*peer_addr)
                            };

                            if let Some(duration) = backoff_duration {
                                let open_conns = op_manager.ring.open_connections();
                                // With at least one open connection the wait is
                                // capped at 80% of GATEWAY_BACKOFF_POLL_CAP plus a
                                // random jitter below 20% of it; with none, the
                                // full backoff is used.
                                let effective = if open_conns > 0 {
                                    let jitter_ms = crate::config::GlobalRng::random_range(
                                        0u64..(connect::GATEWAY_BACKOFF_POLL_CAP.as_millis() / 5)
                                            as u64,
                                    );
                                    let cap = connect::GATEWAY_BACKOFF_POLL_CAP.mul_f64(0.8)
                                        + Duration::from_millis(jitter_ms);
                                    duration.min(cap)
                                } else {
                                    duration
                                };
                                tracing::info!(
                                    gateway = %gateway,
                                    backoff_secs = duration.as_secs(),
                                    effective_wait_secs = effective.as_secs(),
                                    open_connections = open_conns,
                                    "Gateway connection failed, waiting before retry"
                                );
                                // Wait out the backoff, or stop waiting as soon as
                                // `gateway_backoff_cleared` is notified.
                                tokio::select! {
                                    _ = tokio::time::sleep(effective) => {},
                                    _ = op_manager.gateway_backoff_cleared.notified() => {
                                        tracing::info!(
                                            gateway = %gateway,
                                            "Gateway backoff cleared externally, retrying immediately"
                                        );
                                    },
                                }
                            }
                        }

                        // The retry happens whether or not the gateway address
                        // was known above.
                        tracing::debug!("Retrying connection to gateway {}", gateway);
                        connect::join_ring_request(&gateway, op_manager).await?;
                    }
                }
                // Enough connections (guard above failed): drop the in-transit
                // entry for the next hop, if the operation has one.
                Ok(Some(OpEnum::Connect(op))) => {
                    if let Some(peer_addr) = op.get_next_hop_addr() {
                        op_manager
                            .ring
                            .connection_manager
                            .prune_in_transit_connection(peer_addr);
                    }
                    // Only when this node has no open connections and
                    // `is_gateway()` is true: try one gateway taken from the
                    // shuffled `gateways` list.
                    if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
                        tracing::warn!("Retrying joining the ring with an other gateway");
                        if let Some(gateway) = gateways.iter().shuffle().next() {
                            connect::join_ring_request(gateway, op_manager).await?
                        }
                    }
                }
                // Not a connect operation: put it back untouched.
                Ok(Some(other)) => {
                    op_manager.push(tx, other).await?;
                }
                _ => {}
            }
        }
        TransactionType::Get => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Get(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    // `StatePushed` is not treated as a failure here.
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Subscribe => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Subscribe(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    // `StatePushed` is not treated as a failure here.
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Put => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Put(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    // `StatePushed` is not treated as a failure here.
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
        TransactionType::Update => match op_manager.pop(&tx) {
            Ok(Some(OpEnum::Update(op))) => {
                if let Err(err) = op.handle_abort(op_manager).await {
                    // `StatePushed` is not treated as a failure here.
                    if !matches!(err, OpError::StatePushed) {
                        return Err(err);
                    }
                }
            }
            Ok(Some(other)) => {
                op_manager.push(tx, other).await?;
            }
            _ => {}
        },
    }
    Ok(())
}
1950
/// Identifies a peer by its socket address and transport public key.
///
/// The `Hash`, `PartialEq` and `Ord` implementations in this file look only at
/// `addr`; `pub_key` takes no part in hashing, equality or ordering.
#[derive(Serialize, Deserialize, Eq, Clone)]
pub struct PeerId {
    /// Socket address of the peer; the sole input to hashing, equality and ordering.
    pub addr: SocketAddr,
    /// Transport public key of the peer; this is what `Display`/`Debug` print.
    pub pub_key: TransportPublicKey,
}
1963
1964impl Hash for PeerId {
1965 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1966 self.addr.hash(state);
1967 }
1968}
1969
1970impl PartialEq<PeerId> for PeerId {
1971 fn eq(&self, other: &PeerId) -> bool {
1972 self.addr == other.addr
1973 }
1974}
1975
1976impl Ord for PeerId {
1977 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1978 self.addr.cmp(&other.addr)
1979 }
1980}
1981
1982impl PartialOrd for PeerId {
1983 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1984 Some(self.cmp(other))
1985 }
1986}
1987
1988impl PeerId {
1989 pub fn new(addr: SocketAddr, pub_key: TransportPublicKey) -> Self {
1990 Self { addr, pub_key }
1991 }
1992}
1993
// Per-thread cache of a transport public key. `PeerId::random` and the
// test-only `Arbitrary` impl generate a keypair only while this is empty and
// reuse the stored key afterwards, so every `PeerId` they produce on one thread
// carries the same public key.
thread_local! {
    static PEER_ID: std::cell::RefCell<Option<TransportPublicKey>> = const { std::cell::RefCell::new(None) };
}
1997
/// Test-only generator: the address comes from the unstructured input, the
/// public key from the per-thread `PEER_ID` cache.
#[cfg(test)]
impl<'a> arbitrary::Arbitrary<'a> for PeerId {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        let (octets, port): ([u8; 4], u16) = u.arbitrary()?;

        // Generate the key once per thread, then keep handing out clones of it.
        let pub_key = PEER_ID.with(|slot| {
            slot.borrow_mut()
                .get_or_insert_with(|| TransportKeypair::new().public().clone())
                .clone()
        });

        Ok(Self {
            addr: SocketAddr::from((octets, port)),
            pub_key,
        })
    }
}
2021
2022impl PeerId {
2023 pub fn random() -> Self {
2024 let mut addr = [0; 4];
2025 GlobalRng::fill_bytes(&mut addr[..]);
2026 let port: u16 = GlobalRng::random_range(1024u16..65535u16);
2028
2029 let pub_key = PEER_ID.with(|peer_id| {
2030 let mut peer_id = peer_id.borrow_mut();
2031 match &*peer_id {
2032 Some(k) => k.clone(),
2033 None => {
2034 let key = TransportKeypair::new().public().clone();
2035 peer_id.replace(key.clone());
2036 key
2037 }
2038 }
2039 });
2040
2041 Self {
2042 addr: (addr, port).into(),
2043 pub_key,
2044 }
2045 }
2046
2047 #[cfg(test)]
2048 pub fn to_bytes(self) -> Vec<u8> {
2049 bincode::serialize(&self).unwrap()
2050 }
2051}
2052
2053impl std::fmt::Debug for PeerId {
2054 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2055 <Self as Display>::fmt(self, f)
2056 }
2057}
2058
2059impl Display for PeerId {
2060 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2061 write!(f, "{:?}", self.pub_key)
2062 }
2063}
2064
2065pub async fn run_local_node(
2066 mut executor: Executor,
2067 socket: WebsocketApiConfig,
2068) -> anyhow::Result<()> {
2069 match socket.address {
2070 IpAddr::V4(ip) if !ip.is_loopback() => {
2071 anyhow::bail!("invalid ip: {ip}, expecting localhost")
2072 }
2073 IpAddr::V6(ip) if !ip.is_loopback() => {
2074 anyhow::bail!("invalid ip: {ip}, expecting localhost")
2075 }
2076 IpAddr::V4(_) | IpAddr::V6(_) => {}
2077 }
2078
2079 let (mut gw, mut ws_proxy) = crate::server::serve_client_api_in(socket).await?;
2080
2081 enum Receiver {
2085 Ws,
2086 Gw,
2087 }
2088 let mut receiver;
2089 loop {
2090 let req = crate::deterministic_select! {
2091 req = ws_proxy.recv() => {
2092 receiver = Receiver::Ws;
2093 req?
2094 },
2095 req = gw.recv() => {
2096 receiver = Receiver::Gw;
2097 req?
2098 },
2099 };
2100 let OpenRequest {
2101 client_id: id,
2102 request,
2103 notification_channel,
2104 token,
2105 attested_contract,
2106 ..
2107 } = req;
2108 tracing::debug!(client_id = %id, ?token, "Received OpenRequest -> {request}");
2109
2110 let res = match *request {
2111 ClientRequest::ContractOp(op) => {
2112 executor
2113 .contract_requests(op, id, notification_channel)
2114 .await
2115 }
2116 ClientRequest::DelegateOp(op) => {
2117 let op_name = match op {
2121 DelegateRequest::RegisterDelegate { .. } => "RegisterDelegate",
2122 DelegateRequest::ApplicationMessages { .. } => "ApplicationMessages",
2123 DelegateRequest::UnregisterDelegate(_) => "UnregisterDelegate",
2124 _ => "Unknown",
2125 };
2126 tracing::debug!(
2127 op_name = ?op_name,
2128 ?attested_contract,
2129 "Handling ClientRequest::DelegateOp"
2130 );
2131 executor.delegate_request(op, attested_contract.as_ref())
2132 }
2133 ClientRequest::Disconnect { cause } => {
2134 if let Some(cause) = cause {
2135 tracing::info!("disconnecting cause: {cause}");
2136 }
2137 continue;
2138 }
2139 ClientRequest::Authenticate { .. }
2140 | ClientRequest::NodeQueries(_)
2141 | ClientRequest::Close
2142 | _ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
2143 };
2144
2145 match res {
2146 Ok(res) => {
2147 match receiver {
2148 Receiver::Ws => ws_proxy.send(id, Ok(res)).await?,
2149 Receiver::Gw => gw.send(id, Ok(res)).await?,
2150 };
2151 }
2152 Err(err) if err.is_request() => {
2153 let err = ErrorKind::RequestError(err.unwrap_request());
2154 match receiver {
2155 Receiver::Ws => {
2156 ws_proxy.send(id, Err(err.into())).await?;
2157 }
2158 Receiver::Gw => {
2159 gw.send(id, Err(err.into())).await?;
2160 }
2161 };
2162 }
2163 Err(err) => {
2164 tracing::error!("{err}");
2165 let err = Err(ErrorKind::Unhandled {
2166 cause: format!("{err}").into(),
2167 }
2168 .into());
2169 match receiver {
2170 Receiver::Ws => {
2171 ws_proxy.send(id, err).await?;
2172 }
2173 Receiver::Gw => {
2174 gw.send(id, err).await?;
2175 }
2176 };
2177 }
2178 }
2179 }
2180}
2181
2182pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
2183 tracing::info!("Starting node");
2184
2185 let is_gateway = node.inner.is_gateway;
2186 let location = if let Some(loc) = node.inner.location {
2187 Some(loc)
2188 } else {
2189 is_gateway
2190 .then(|| {
2191 node.inner
2192 .peer_id
2193 .as_ref()
2194 .map(|id| Location::from_address(&id.addr))
2195 })
2196 .flatten()
2197 };
2198
2199 if let Some(location) = location {
2200 tracing::info!("Setting initial location: {location}");
2201 node.update_location(location);
2202 }
2203
2204 match node.run().await {
2205 Ok(_) => {
2206 if is_gateway {
2207 tracing::info!("Gateway finished");
2208 } else {
2209 tracing::info!("Node finished");
2210 }
2211
2212 Ok(())
2213 }
2214 Err(e) => {
2215 tracing::error!("{e}");
2216 Err(e)
2217 }
2218 }
2219}
2220
/// Implemented by operation types that can report whether they have completed.
pub trait IsOperationCompleted {
    /// Returns `true` when the operation is completed.
    fn is_completed(&self) -> bool;
}
2226
2227impl IsOperationCompleted for OpEnum {
2228 fn is_completed(&self) -> bool {
2229 match self {
2230 OpEnum::Connect(op) => op.is_completed(),
2231 OpEnum::Put(op) => op.is_completed(),
2232 OpEnum::Get(op) => op.is_completed(),
2233 OpEnum::Subscribe(op) => op.is_completed(),
2234 OpEnum::Update(op) => op.is_completed(),
2235 }
2236 }
2237}
2238
2239fn classify_op_outcome(
2245 tx_type: TransactionType,
2246 outcome: OpOutcome<'_>,
2247) -> (Option<network_status::OpType>, bool) {
2248 use network_status::OpType;
2249 match (tx_type, outcome) {
2250 (
2251 TransactionType::Get,
2252 OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
2253 ) => (Some(OpType::Get), true),
2254 (TransactionType::Get, OpOutcome::ContractOpFailure { .. }) => (Some(OpType::Get), false),
2255 (
2256 TransactionType::Put,
2257 OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
2258 ) => (Some(OpType::Put), true),
2259 (TransactionType::Put, OpOutcome::ContractOpFailure { .. }) => (Some(OpType::Put), false),
2260 (
2261 TransactionType::Update,
2262 OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
2263 ) => (Some(OpType::Update), true),
2264 (TransactionType::Update, OpOutcome::ContractOpFailure { .. }) => {
2265 (Some(OpType::Update), false)
2266 }
2267 (
2268 TransactionType::Subscribe,
2269 OpOutcome::ContractOpSuccess { .. } | OpOutcome::ContractOpSuccessUntimed { .. },
2270 ) => (Some(OpType::Subscribe), true),
2271 (TransactionType::Subscribe, OpOutcome::ContractOpFailure { .. }) => {
2272 (Some(OpType::Subscribe), false)
2273 }
2274 (TransactionType::Get, OpOutcome::Irrelevant) => (Some(OpType::Get), true),
2277 (TransactionType::Put, OpOutcome::Irrelevant) => (Some(OpType::Put), true),
2278 (TransactionType::Update, OpOutcome::Irrelevant) => (Some(OpType::Update), true),
2279 (TransactionType::Subscribe, OpOutcome::Irrelevant) => (Some(OpType::Subscribe), true),
2280 (TransactionType::Get, OpOutcome::Incomplete) => (Some(OpType::Get), false),
2282 (TransactionType::Put, OpOutcome::Incomplete) => (Some(OpType::Put), false),
2283 (TransactionType::Update, OpOutcome::Incomplete) => (Some(OpType::Update), false),
2284 (TransactionType::Subscribe, OpOutcome::Incomplete) => (Some(OpType::Subscribe), false),
2285 _ => (None, false),
2287 }
2288}
2289
2290pub fn is_operation_completed(op_result: &Result<Option<OpEnum>, OpError>) -> bool {
2292 match op_result {
2293 Ok(Some(op)) => op.is_completed(),
2295 _ => false,
2296 }
2297}
2298
// Unit tests for address parsing, `PeerId` semantics, `InitPeerNode`
// construction, `is_operation_completed` and `classify_op_outcome`.
#[cfg(test)]
mod tests {
    use std::net::{Ipv4Addr, Ipv6Addr};

    use super::*;
    use crate::operations::OpError;
    use rstest::rstest;

    // A bare "localhost" must resolve to a loopback address, and the resulting
    // port must be above 1024.
    #[tokio::test]
    async fn test_hostname_resolution_localhost() {
        let addr = Address::Hostname("localhost".to_string());
        let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
        assert!(
            socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
                || socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
        );
        assert!(socket_addr.port() > 1024);
    }

    // An explicit ":port" suffix must be preserved.
    // NOTE(review): resolving this hostname presumably needs working DNS, so
    // the test may fail offline — confirm.
    #[tokio::test]
    async fn test_hostname_resolution_with_port() {
        let addr = Address::Hostname("google.com:8080".to_string());
        let socket_addr = NodeConfig::parse_socket_addr(&addr).await.unwrap();
        assert_eq!(socket_addr.port(), 8080);
    }

    // A trailing dot is allowed to fail to resolve; the address is only checked
    // when resolution succeeds.
    #[tokio::test]
    async fn test_hostname_resolution_with_trailing_dot() {
        let addr = Address::Hostname("localhost.".to_string());
        let result = NodeConfig::parse_socket_addr(&addr).await;
        if let Ok(socket_addr) = result {
            assert!(
                socket_addr.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
                    || socket_addr.ip() == IpAddr::V6(Ipv6Addr::LOCALHOST)
            );
        }
    }

    // A ready-made socket address must be returned unchanged.
    #[tokio::test]
    async fn test_hostname_resolution_direct_socket_addr() {
        let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
        let addr = Address::HostAddress(socket);
        let resolved = NodeConfig::parse_socket_addr(&addr).await.unwrap();
        assert_eq!(resolved, socket);
    }

    // A non-numeric port must be rejected.
    #[tokio::test]
    async fn test_hostname_resolution_invalid_port() {
        let addr = Address::Hostname("localhost:not_a_port".to_string());
        let result = NodeConfig::parse_socket_addr(&addr).await;
        assert!(result.is_err());
    }

    // `PeerId` equality depends on the address only.
    // NOTE(review): both cases use two distinct keypairs, so the second case
    // name ("same_key") does not describe what is actually constructed.
    #[rstest]
    #[case::same_addr_different_keys(8080, 8080, true)]
    #[case::different_addr_same_key(8080, 8081, false)]
    fn test_peer_id_equality(#[case] port1: u16, #[case] port2: u16, #[case] expected_equal: bool) {
        let keypair1 = TransportKeypair::new();
        let keypair2 = TransportKeypair::new();
        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port1);
        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port2);

        let peer1 = PeerId::new(addr1, keypair1.public().clone());
        let peer2 = PeerId::new(addr2, keypair2.public().clone());

        assert_eq!(peer1 == peer2, expected_equal);
    }

    // `PeerId` ordering follows the address ordering (same IP, differing port).
    #[rstest]
    #[case::lower_port_first(8080, 8081)]
    #[case::high_port_diff(1024, 65535)]
    fn test_peer_id_ordering(#[case] lower_port: u16, #[case] higher_port: u16) {
        let keypair = TransportKeypair::new();
        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), lower_port);
        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), higher_port);

        let peer1 = PeerId::new(addr1, keypair.public().clone());
        let peer2 = PeerId::new(addr2, keypair.public().clone());

        assert!(peer1 < peer2);
        assert!(peer2 > peer1);
    }

    // Peers with the same address but different keys must hash identically,
    // consistent with the address-only equality.
    #[test]
    fn test_peer_id_hash_consistency() {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let keypair1 = TransportKeypair::new();
        let keypair2 = TransportKeypair::new();
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);

        let peer1 = PeerId::new(addr, keypair1.public().clone());
        let peer2 = PeerId::new(addr, keypair2.public().clone());

        let mut hasher1 = DefaultHasher::new();
        let mut hasher2 = DefaultHasher::new();
        peer1.hash(&mut hasher1);
        peer2.hash(&mut hasher2);

        assert_eq!(hasher1.finish(), hasher2.finish());
    }

    // Two consecutive random peers are expected to get different addresses.
    #[test]
    fn test_peer_id_random_produces_unique() {
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();

        assert_ne!(peer1.addr, peer2.addr);
    }

    // Round-trips a `PeerId` through bincode; only the address is compared.
    #[test]
    fn test_peer_id_serialization() {
        let peer = PeerId::random();
        let bytes = peer.clone().to_bytes();
        assert!(!bytes.is_empty());

        let deserialized: PeerId = bincode::deserialize(&bytes).unwrap();
        assert_eq!(peer.addr, deserialized.addr);
    }

    // `Debug` delegates to `Display`, so both renderings must match.
    #[test]
    fn test_peer_id_display() {
        let peer = PeerId::random();
        let display = format!("{}", peer);
        let debug = format!("{:?}", peer);

        assert_eq!(display, debug);
        assert!(!display.is_empty());
    }

    // `InitPeerNode::new` must store its arguments unchanged.
    #[test]
    fn test_init_peer_node_construction() {
        let keypair = TransportKeypair::new();
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
        let peer_key_location = PeerKeyLocation::new(keypair.public().clone(), addr);
        let location = Location::new(0.5);

        let init_peer = InitPeerNode::new(peer_key_location.clone(), location);

        assert_eq!(init_peer.peer_key_location, peer_key_location);
        assert_eq!(init_peer.location, location);
    }

    // Results without an operation (`Ok(None)` or any error) are never completed.
    #[rstest]
    #[case::with_none(Ok(None), false)]
    #[case::with_running_error(Err(OpError::OpNotAvailable(super::OpNotAvailable::Running)), false)]
    #[case::with_state_pushed_error(Err(OpError::StatePushed), false)]
    fn test_is_operation_completed(
        #[case] result: Result<Option<OpEnum>, OpError>,
        #[case] expected: bool,
    ) {
        assert_eq!(is_operation_completed(&result), expected);
    }

    // Checks how `classify_op_outcome` treats the payload-free outcomes
    // (`Irrelevant`, `Incomplete`) and `Connect` transactions.
    mod classify_op_outcome_tests {
        use super::super::{classify_op_outcome, network_status::OpType};
        use crate::message::TransactionType;
        use crate::operations::OpOutcome;

        #[test]
        fn irrelevant_counted_as_success() {
            let (op_type, success) =
                classify_op_outcome(TransactionType::Update, OpOutcome::Irrelevant);
            assert!(matches!(op_type, Some(OpType::Update)));
            assert!(success);
        }

        #[test]
        fn incomplete_counted_as_failure() {
            let (op_type, success) =
                classify_op_outcome(TransactionType::Get, OpOutcome::Incomplete);
            assert!(matches!(op_type, Some(OpType::Get)));
            assert!(!success);
        }

        // Connect transactions yield no operation type, whatever the outcome.
        #[test]
        fn connect_skipped() {
            let (op_type, _) = classify_op_outcome(TransactionType::Connect, OpOutcome::Irrelevant);
            assert!(op_type.is_none());

            let (op_type, _) = classify_op_outcome(TransactionType::Connect, OpOutcome::Incomplete);
            assert!(op_type.is_none());
        }

        #[test]
        fn subscribe_irrelevant_is_success() {
            let (op_type, success) =
                classify_op_outcome(TransactionType::Subscribe, OpOutcome::Irrelevant);
            assert!(matches!(op_type, Some(OpType::Subscribe)));
            assert!(success);
        }

        #[test]
        fn put_incomplete_is_failure() {
            let (op_type, success) =
                classify_op_outcome(TransactionType::Put, OpOutcome::Incomplete);
            assert!(matches!(op_type, Some(OpType::Put)));
            assert!(!success);
        }
    }
}