// ant_node/replication/fresh.rs
//! Fresh replication (Section 6.1).
//!
//! When a node accepts a newly written record with valid `PoP`:
//! 1. Store locally (already done by chunk handler).
//! 2. Send fresh offers to `CLOSE_GROUP_SIZE` nearest peers (excluding self).
//! 3. Send `PaidNotify` to all peers in `PaidCloseGroup(K)`.

use std::sync::Arc;

use crate::logging::{debug, warn};
use rand::Rng;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
use tokio::sync::Semaphore;

use crate::ant_protocol::XorName;
use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
use crate::replication::paid_list::PaidList;
use crate::replication::protocol::{
    FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody,
};

23/// A newly-stored chunk that needs fresh replication.
24///
25/// Sent from the chunk PUT handler to the replication engine via an
26/// unbounded channel so that the PUT response is not blocked by
27/// replication fan-out.
28pub struct FreshWriteEvent {
29    /// Content-address of the stored chunk.
30    pub key: XorName,
31    /// The chunk data.
32    pub data: Vec<u8>,
33    /// Serialized proof-of-payment.
34    pub payment_proof: Vec<u8>,
35}
36
37/// Execute fresh replication for a newly accepted record.
38///
39/// Sends fresh offers to close group members and `PaidNotify` to
40/// `PaidCloseGroup`. Both are fire-and-forget (no ack tracking or retry per
41/// Section 6.1, rule 8).
42///
43/// The `send_semaphore` limits how many outbound chunk transfers can be
44/// in-flight concurrently across the entire replication engine, preventing
45/// bandwidth saturation on home broadband connections.
46pub async fn replicate_fresh(
47    key: &XorName,
48    data: &[u8],
49    proof_of_payment: &[u8],
50    p2p_node: &Arc<P2PNode>,
51    paid_list: &Arc<PaidList>,
52    config: &ReplicationConfig,
53    send_semaphore: &Arc<Semaphore>,
54) {
55    let self_id = *p2p_node.peer_id();
56
57    // Rule 6: Node that validates PoP adds K to PaidForList(self).
58    if let Err(e) = paid_list.insert(key).await {
59        warn!("Failed to add key {} to PaidForList: {e}", hex::encode(key));
60    }
61
62    // Rule 2-3: Send fresh offers to CLOSE_GROUP_SIZE nearest peers
63    // (excluding self). Use self-inclusive query to get the true close group,
64    // then filter self out.
65    let closest = p2p_node
66        .dht_manager()
67        .find_closest_nodes_local_with_self(key, config.close_group_size)
68        .await;
69    let target_peers: Vec<PeerId> = closest
70        .iter()
71        .filter(|n| n.peer_id != self_id)
72        .map(|n| n.peer_id)
73        .collect();
74
75    let offer = FreshReplicationOffer {
76        key: *key,
77        data: data.to_vec(),
78        proof_of_payment: proof_of_payment.to_vec(),
79    };
80    let request_id = rand::thread_rng().gen::<u64>();
81    let offer_msg = ReplicationMessage {
82        request_id,
83        body: ReplicationMessageBody::FreshReplicationOffer(offer),
84    };
85
86    let Ok(encoded) = offer_msg.encode() else {
87        warn!(
88            "Failed to encode FreshReplicationOffer for {}",
89            hex::encode(key),
90        );
91        return;
92    };
93    for peer in &target_peers {
94        let p2p = Arc::clone(p2p_node);
95        let data = encoded.clone();
96        let peer_id = *peer;
97        let sem = Arc::clone(send_semaphore);
98        tokio::spawn(async move {
99            // Acquire a permit before sending — this caps the number of
100            // concurrent outbound replication transfers across the engine.
101            let _permit = sem.acquire().await;
102            debug!(
103                "Replication send permit acquired for peer {peer_id} ({} available)",
104                sem.available_permits()
105            );
106            if let Err(e) = p2p
107                .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[])
108                .await
109            {
110                debug!("Failed to send fresh offer to {peer_id}: {e}");
111            }
112        });
113    }
114
115    // Rule 7-8: Send PaidNotify to every member of PaidCloseGroup(K).
116    // PaidNotify messages are small metadata (no chunk data), so they don't
117    // need semaphore gating.
118    send_paid_notify(key, proof_of_payment, p2p_node, config).await;
119
120    debug!(
121        "Fresh replication initiated for {} to {} peers + PaidNotify",
122        hex::encode(key),
123        target_peers.len()
124    );
125}
126
127/// Send `PaidNotify(K)` to every peer in `PaidCloseGroup(K)` (fire-and-forget).
128///
129/// Per Invariant 16: sender MUST attempt delivery to every member.
130async fn send_paid_notify(
131    key: &XorName,
132    proof_of_payment: &[u8],
133    p2p_node: &Arc<P2PNode>,
134    config: &ReplicationConfig,
135) {
136    let self_id = *p2p_node.peer_id();
137    let paid_group = p2p_node
138        .dht_manager()
139        .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
140        .await;
141
142    let notify = PaidNotify {
143        key: *key,
144        proof_of_payment: proof_of_payment.to_vec(),
145    };
146    let request_id = rand::thread_rng().gen::<u64>();
147    let msg = ReplicationMessage {
148        request_id,
149        body: ReplicationMessageBody::PaidNotify(notify),
150    };
151
152    let Ok(encoded) = msg.encode() else {
153        warn!("Failed to encode PaidNotify for {}", hex::encode(key));
154        return;
155    };
156
157    for node in &paid_group {
158        if node.peer_id == self_id {
159            continue;
160        }
161        let p2p = Arc::clone(p2p_node);
162        let data = encoded.clone();
163        let peer_id = node.peer_id;
164        tokio::spawn(async move {
165            if let Err(e) = p2p
166                .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[])
167                .await
168            {
169                debug!("Failed to send PaidNotify to {peer_id}: {e}");
170            }
171        });
172    }
173}