ant_node/replication/
fresh.rs1use std::sync::Arc;
9
10use crate::logging::{debug, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14use tokio::sync::Semaphore;
15
16use crate::ant_protocol::XorName;
17use crate::replication::config::{
18 ReplicationConfig, FRESH_REPLICATION_DELIVERY_MAX_RETRIES, REPLICATION_PROTOCOL_ID,
19};
20use crate::replication::paid_list::PaidList;
21use crate::replication::protocol::{
22 FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody,
23};
24
25pub struct FreshWriteEvent {
31 pub key: XorName,
33 pub data: Vec<u8>,
35 pub payment_proof: Vec<u8>,
37}
38
39pub async fn replicate_fresh(
50 key: &XorName,
51 data: &[u8],
52 proof_of_payment: &[u8],
53 p2p_node: &Arc<P2PNode>,
54 paid_list: &Arc<PaidList>,
55 config: &ReplicationConfig,
56 send_semaphore: &Arc<Semaphore>,
57) -> Vec<PeerId> {
58 let self_id = *p2p_node.peer_id();
59
60 if let Err(e) = paid_list.insert(key).await {
62 warn!("Failed to add key {} to PaidForList: {e}", hex::encode(key));
63 }
64
65 let closest = p2p_node
69 .dht_manager()
70 .find_closest_nodes_local_with_self(key, config.close_group_size)
71 .await;
72 let target_peers: Vec<PeerId> = closest
73 .iter()
74 .filter(|n| n.peer_id != self_id)
75 .map(|n| n.peer_id)
76 .collect();
77
78 let offer = FreshReplicationOffer {
79 key: *key,
80 data: data.to_vec(),
81 proof_of_payment: proof_of_payment.to_vec(),
82 };
83 let request_id = rand::thread_rng().gen::<u64>();
84 let offer_msg = ReplicationMessage {
85 request_id,
86 body: ReplicationMessageBody::FreshReplicationOffer(offer),
87 };
88
89 let Ok(encoded) = offer_msg.encode() else {
90 warn!(
91 "Failed to encode FreshReplicationOffer for {}",
92 hex::encode(key),
93 );
94 return Vec::new();
95 };
96 let encoded = Arc::new(encoded);
100 for peer in &target_peers {
101 let p2p = Arc::clone(p2p_node);
102 let data = Arc::clone(&encoded);
103 let peer_id = *peer;
104 let sem = Arc::clone(send_semaphore);
105 tokio::spawn(async move {
106 let _permit = sem.acquire().await;
109 debug!(
110 "Replication send permit acquired for peer {peer_id} ({} available)",
111 sem.available_permits()
112 );
113 let mut attempt = 0u32;
118 loop {
119 match p2p
120 .send_message(
121 &peer_id,
122 REPLICATION_PROTOCOL_ID,
123 data.as_ref().clone(),
124 &[],
125 )
126 .await
127 {
128 Ok(()) => break,
129 Err(e) => {
130 if attempt >= FRESH_REPLICATION_DELIVERY_MAX_RETRIES {
131 debug!(
132 "Failed to send fresh offer to {peer_id} after {} attempts: {e}",
133 attempt + 1
134 );
135 break;
136 }
137 attempt += 1;
138 debug!(
139 "Retrying fresh offer to {peer_id} (attempt {}): {e}",
140 attempt + 1
141 );
142 }
143 }
144 }
145 });
146 }
147
148 send_paid_notify(key, proof_of_payment, p2p_node, config).await;
152
153 debug!(
154 "Fresh replication initiated for {} to {} peers + PaidNotify",
155 hex::encode(key),
156 target_peers.len()
157 );
158
159 target_peers
160}
161
162async fn send_paid_notify(
166 key: &XorName,
167 proof_of_payment: &[u8],
168 p2p_node: &Arc<P2PNode>,
169 config: &ReplicationConfig,
170) {
171 let self_id = *p2p_node.peer_id();
172 let paid_group = p2p_node
173 .dht_manager()
174 .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
175 .await;
176
177 let notify = PaidNotify {
178 key: *key,
179 proof_of_payment: proof_of_payment.to_vec(),
180 };
181 let request_id = rand::thread_rng().gen::<u64>();
182 let msg = ReplicationMessage {
183 request_id,
184 body: ReplicationMessageBody::PaidNotify(notify),
185 };
186
187 let Ok(encoded) = msg.encode() else {
188 warn!("Failed to encode PaidNotify for {}", hex::encode(key));
189 return;
190 };
191
192 for node in &paid_group {
193 if node.peer_id == self_id {
194 continue;
195 }
196 let p2p = Arc::clone(p2p_node);
197 let data = encoded.clone();
198 let peer_id = node.peer_id;
199 tokio::spawn(async move {
200 if let Err(e) = p2p
201 .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[])
202 .await
203 {
204 debug!("Failed to send PaidNotify to {peer_id}: {e}");
205 }
206 });
207 }
208}