Skip to main content

ant_node/replication/
fresh.rs

1//! Fresh replication (Section 6.1).
2//!
3//! When a node accepts a newly written record with valid `PoP`:
4//! 1. Store locally (already done by chunk handler).
5//! 2. Send fresh offers to `CLOSE_GROUP_SIZE` nearest peers (excluding self).
6//! 3. Send `PaidNotify` to all peers in `PaidCloseGroup(K)`.
7
8use std::sync::Arc;
9
10use crate::logging::{debug, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14use tokio::sync::Semaphore;
15
16use crate::ant_protocol::XorName;
17use crate::replication::config::{
18    ReplicationConfig, FRESH_REPLICATION_DELIVERY_MAX_RETRIES, REPLICATION_PROTOCOL_ID,
19};
20use crate::replication::paid_list::PaidList;
21use crate::replication::protocol::{
22    FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody,
23};
24
/// A newly-stored chunk that needs fresh replication.
///
/// Sent from the chunk PUT handler to the replication engine via an
/// unbounded channel so that the PUT response is not blocked by
/// replication fan-out.
///
/// The three fields correspond to the `key`, `data` and `proof_of_payment`
/// arguments of [`replicate_fresh`]. NOTE(review): the code that receives an
/// event and makes that call is not part of this module — confirm the mapping
/// at the consumer.
pub struct FreshWriteEvent {
    /// Content-address of the stored chunk.
    pub key: XorName,
    /// The chunk data, held as an owned byte buffer.
    pub data: Vec<u8>,
    /// Serialized proof-of-payment, held as an owned byte buffer.
    // NOTE(review): presumably the same bytes that `replicate_fresh` copies
    // into `FreshReplicationOffer` and `PaidNotify` — verify against caller.
    pub payment_proof: Vec<u8>,
}
38
39/// Execute fresh replication for a newly accepted record.
40///
41/// Sends fresh offers to close group members (with bounded delivery retries,
42/// ADR-0003) and `PaidNotify` to `PaidCloseGroup`. Returns the close-group
43/// peers responsible for the key (excluding self) so the caller can schedule
44/// the delayed possession check; `PaidNotify` remains fire-and-forget.
45///
46/// The `send_semaphore` limits how many outbound chunk transfers can be
47/// in-flight concurrently across the entire replication engine, preventing
48/// bandwidth saturation on home broadband connections.
49pub async fn replicate_fresh(
50    key: &XorName,
51    data: &[u8],
52    proof_of_payment: &[u8],
53    p2p_node: &Arc<P2PNode>,
54    paid_list: &Arc<PaidList>,
55    config: &ReplicationConfig,
56    send_semaphore: &Arc<Semaphore>,
57) -> Vec<PeerId> {
58    let self_id = *p2p_node.peer_id();
59
60    // Rule 6: Node that validates PoP adds K to PaidForList(self).
61    if let Err(e) = paid_list.insert(key).await {
62        warn!("Failed to add key {} to PaidForList: {e}", hex::encode(key));
63    }
64
65    // Rule 2-3: Send fresh offers to CLOSE_GROUP_SIZE nearest peers
66    // (excluding self). Use self-inclusive query to get the true close group,
67    // then filter self out.
68    let closest = p2p_node
69        .dht_manager()
70        .find_closest_nodes_local_with_self(key, config.close_group_size)
71        .await;
72    let target_peers: Vec<PeerId> = closest
73        .iter()
74        .filter(|n| n.peer_id != self_id)
75        .map(|n| n.peer_id)
76        .collect();
77
78    let offer = FreshReplicationOffer {
79        key: *key,
80        data: data.to_vec(),
81        proof_of_payment: proof_of_payment.to_vec(),
82    };
83    let request_id = rand::thread_rng().gen::<u64>();
84    let offer_msg = ReplicationMessage {
85        request_id,
86        body: ReplicationMessageBody::FreshReplicationOffer(offer),
87    };
88
89    let Ok(encoded) = offer_msg.encode() else {
90        warn!(
91            "Failed to encode FreshReplicationOffer for {}",
92            hex::encode(key),
93        );
94        return Vec::new();
95    };
96    // Share one encoded copy across the per-peer send tasks so a retry only
97    // re-materialises the buffer for the (consuming) send call, keeping the
98    // common single-attempt path at one clone per peer.
99    let encoded = Arc::new(encoded);
100    for peer in &target_peers {
101        let p2p = Arc::clone(p2p_node);
102        let data = Arc::clone(&encoded);
103        let peer_id = *peer;
104        let sem = Arc::clone(send_semaphore);
105        tokio::spawn(async move {
106            // Acquire a permit before sending — this caps the number of
107            // concurrent outbound replication transfers across the engine.
108            let _permit = sem.acquire().await;
109            debug!(
110                "Replication send permit acquired for peer {peer_id} ({} available)",
111                sem.available_permits()
112            );
113            // ADR-0003: best-effort delivery. Retry the push up to
114            // FRESH_REPLICATION_DELIVERY_MAX_RETRIES times on a transport
115            // failure so a transient hiccup doesn't silently drop the offer.
116            // Possession is judged separately by the delayed possession check.
117            let mut attempt = 0u32;
118            loop {
119                match p2p
120                    .send_message(
121                        &peer_id,
122                        REPLICATION_PROTOCOL_ID,
123                        data.as_ref().clone(),
124                        &[],
125                    )
126                    .await
127                {
128                    Ok(()) => break,
129                    Err(e) => {
130                        if attempt >= FRESH_REPLICATION_DELIVERY_MAX_RETRIES {
131                            debug!(
132                                "Failed to send fresh offer to {peer_id} after {} attempts: {e}",
133                                attempt + 1
134                            );
135                            break;
136                        }
137                        attempt += 1;
138                        debug!(
139                            "Retrying fresh offer to {peer_id} (attempt {}): {e}",
140                            attempt + 1
141                        );
142                    }
143                }
144            }
145        });
146    }
147
148    // Rule 7-8: Send PaidNotify to every member of PaidCloseGroup(K).
149    // PaidNotify messages are small metadata (no chunk data), so they don't
150    // need semaphore gating.
151    send_paid_notify(key, proof_of_payment, p2p_node, config).await;
152
153    debug!(
154        "Fresh replication initiated for {} to {} peers + PaidNotify",
155        hex::encode(key),
156        target_peers.len()
157    );
158
159    target_peers
160}
161
162/// Send `PaidNotify(K)` to every peer in `PaidCloseGroup(K)` (fire-and-forget).
163///
164/// Per Invariant 16: sender MUST attempt delivery to every member.
165async fn send_paid_notify(
166    key: &XorName,
167    proof_of_payment: &[u8],
168    p2p_node: &Arc<P2PNode>,
169    config: &ReplicationConfig,
170) {
171    let self_id = *p2p_node.peer_id();
172    let paid_group = p2p_node
173        .dht_manager()
174        .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
175        .await;
176
177    let notify = PaidNotify {
178        key: *key,
179        proof_of_payment: proof_of_payment.to_vec(),
180    };
181    let request_id = rand::thread_rng().gen::<u64>();
182    let msg = ReplicationMessage {
183        request_id,
184        body: ReplicationMessageBody::PaidNotify(notify),
185    };
186
187    let Ok(encoded) = msg.encode() else {
188        warn!("Failed to encode PaidNotify for {}", hex::encode(key));
189        return;
190    };
191
192    for node in &paid_group {
193        if node.peer_id == self_id {
194            continue;
195        }
196        let p2p = Arc::clone(p2p_node);
197        let data = encoded.clone();
198        let peer_id = node.peer_id;
199        tokio::spawn(async move {
200            if let Err(e) = p2p
201                .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[])
202                .await
203            {
204                debug!("Failed to send PaidNotify to {peer_id}: {e}");
205            }
206        });
207    }
208}