ant_node/replication/
fresh.rs1use std::sync::Arc;
9
10use crate::logging::{debug, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14use tokio::sync::Semaphore;
15
16use crate::ant_protocol::XorName;
17use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
18use crate::replication::paid_list::PaidList;
19use crate::replication::protocol::{
20 FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody,
21};
22
23pub struct FreshWriteEvent {
29 pub key: XorName,
31 pub data: Vec<u8>,
33 pub payment_proof: Vec<u8>,
35}
36
37pub async fn replicate_fresh(
47 key: &XorName,
48 data: &[u8],
49 proof_of_payment: &[u8],
50 p2p_node: &Arc<P2PNode>,
51 paid_list: &Arc<PaidList>,
52 config: &ReplicationConfig,
53 send_semaphore: &Arc<Semaphore>,
54) {
55 let self_id = *p2p_node.peer_id();
56
57 if let Err(e) = paid_list.insert(key).await {
59 warn!("Failed to add key {} to PaidForList: {e}", hex::encode(key));
60 }
61
62 let closest = p2p_node
66 .dht_manager()
67 .find_closest_nodes_local_with_self(key, config.close_group_size)
68 .await;
69 let target_peers: Vec<PeerId> = closest
70 .iter()
71 .filter(|n| n.peer_id != self_id)
72 .map(|n| n.peer_id)
73 .collect();
74
75 let offer = FreshReplicationOffer {
76 key: *key,
77 data: data.to_vec(),
78 proof_of_payment: proof_of_payment.to_vec(),
79 };
80 let request_id = rand::thread_rng().gen::<u64>();
81 let offer_msg = ReplicationMessage {
82 request_id,
83 body: ReplicationMessageBody::FreshReplicationOffer(offer),
84 };
85
86 let Ok(encoded) = offer_msg.encode() else {
87 warn!(
88 "Failed to encode FreshReplicationOffer for {}",
89 hex::encode(key),
90 );
91 return;
92 };
93 for peer in &target_peers {
94 let p2p = Arc::clone(p2p_node);
95 let data = encoded.clone();
96 let peer_id = *peer;
97 let sem = Arc::clone(send_semaphore);
98 tokio::spawn(async move {
99 let _permit = sem.acquire().await;
102 debug!(
103 "Replication send permit acquired for peer {peer_id} ({} available)",
104 sem.available_permits()
105 );
106 if let Err(e) = p2p
107 .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[])
108 .await
109 {
110 debug!("Failed to send fresh offer to {peer_id}: {e}");
111 }
112 });
113 }
114
115 send_paid_notify(key, proof_of_payment, p2p_node, config).await;
119
120 debug!(
121 "Fresh replication initiated for {} to {} peers + PaidNotify",
122 hex::encode(key),
123 target_peers.len()
124 );
125}
126
127async fn send_paid_notify(
131 key: &XorName,
132 proof_of_payment: &[u8],
133 p2p_node: &Arc<P2PNode>,
134 config: &ReplicationConfig,
135) {
136 let self_id = *p2p_node.peer_id();
137 let paid_group = p2p_node
138 .dht_manager()
139 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
140 .await;
141
142 let notify = PaidNotify {
143 key: *key,
144 proof_of_payment: proof_of_payment.to_vec(),
145 };
146 let request_id = rand::thread_rng().gen::<u64>();
147 let msg = ReplicationMessage {
148 request_id,
149 body: ReplicationMessageBody::PaidNotify(notify),
150 };
151
152 let Ok(encoded) = msg.encode() else {
153 warn!("Failed to encode PaidNotify for {}", hex::encode(key));
154 return;
155 };
156
157 for node in &paid_group {
158 if node.peer_id == self_id {
159 continue;
160 }
161 let p2p = Arc::clone(p2p_node);
162 let data = encoded.clone();
163 let peer_id = node.peer_id;
164 tokio::spawn(async move {
165 if let Err(e) = p2p
166 .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[])
167 .await
168 {
169 debug!("Failed to send PaidNotify to {peer_id}: {e}");
170 }
171 });
172 }
173}