use std::sync::Arc;
use crate::logging::{debug, warn};
use rand::Rng;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
use tokio::sync::Semaphore;
use crate::ant_protocol::XorName;
use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
use crate::replication::paid_list::PaidList;
use crate::replication::protocol::{
FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody,
};
/// Payload describing a single freshly written record.
///
/// NOTE(review): nothing in this part of the file constructs or consumes this
/// type; the fields look like the owned counterparts of the `key`, `data` and
/// `proof_of_payment` arguments of `replicate_fresh` — confirm against the caller.
pub struct FreshWriteEvent {
/// Address (`XorName`) of the record that was written.
pub key: XorName,
/// Raw bytes of the record.
pub data: Vec<u8>,
/// Payment proof bytes accompanying the write (format not visible here).
pub payment_proof: Vec<u8>,
}
/// Offers a freshly written record to the peers closest to `key` and then
/// announces the payment via `send_paid_notify`.
///
/// Steps, in order:
/// 1. Inserts `key` into the local `paid_list`; a failure is logged and ignored.
/// 2. Asks the DHT manager (`find_closest_nodes_local_with_self`) for the nodes
///    closest to `key`, passing `config.close_group_size`, and drops this node
///    from the result.
/// 3. Encodes one `FreshReplicationOffer` message and spawns a detached task
///    per target peer. Each task waits for a permit from `send_semaphore`
///    before sending on `REPLICATION_PROTOCOL_ID`.
/// 4. Awaits `send_paid_notify` for the same key and proof.
///
/// The function is best-effort: it returns nothing, and every failure is only
/// logged. If the offer cannot be encoded it returns early, so no
/// `PaidNotify` is sent in that case. The spawned sends are not awaited and
/// may still be running when this function returns.
///
/// # Arguments
/// * `key` - address of the record being replicated.
/// * `data` - record bytes, copied into the offer.
/// * `proof_of_payment` - proof bytes, copied into the offer and the notify.
/// * `p2p_node` - node handle used for DHT lookups and message sending.
/// * `paid_list` - local paid list that `key` is added to.
/// * `config` - supplies `close_group_size` (and the notify group size).
/// * `send_semaphore` - limits how many offer sends run at the same time.
pub async fn replicate_fresh(
    key: &XorName,
    data: &[u8],
    proof_of_payment: &[u8],
    p2p_node: &Arc<P2PNode>,
    paid_list: &Arc<PaidList>,
    config: &ReplicationConfig,
    send_semaphore: &Arc<Semaphore>,
) {
    let self_id = *p2p_node.peer_id();

    if let Err(e) = paid_list.insert(key).await {
        warn!("Failed to add key {} to PaidForList: {e}", hex::encode(key));
    }

    let closest = p2p_node
        .dht_manager()
        .find_closest_nodes_local_with_self(key, config.close_group_size)
        .await;
    // The lookup result may contain this node; never send an offer to ourselves.
    let target_peers: Vec<PeerId> = closest
        .iter()
        .map(|n| n.peer_id)
        .filter(|id| *id != self_id)
        .collect();

    let offer = FreshReplicationOffer {
        key: *key,
        data: data.to_vec(),
        proof_of_payment: proof_of_payment.to_vec(),
    };
    let request_id = rand::thread_rng().gen::<u64>();
    let offer_msg = ReplicationMessage {
        request_id,
        body: ReplicationMessageBody::FreshReplicationOffer(offer),
    };
    let Ok(encoded) = offer_msg.encode() else {
        warn!(
            "Failed to encode FreshReplicationOffer for {}",
            hex::encode(key),
        );
        return;
    };

    // Share a single encoded buffer between the tasks. Each task makes its own
    // owned copy only once it holds a permit, so tasks that are still queued
    // on the semaphore do not each keep a full copy of the payload alive.
    let encoded = Arc::new(encoded);
    for &peer_id in &target_peers {
        let p2p = Arc::clone(p2p_node);
        let payload = Arc::clone(&encoded);
        let sem = Arc::clone(send_semaphore);
        tokio::spawn(async move {
            // `acquire` only fails when the semaphore has been closed. Sending
            // anyway would bypass the concurrency limit, so skip this peer.
            let Ok(_permit) = sem.acquire().await else {
                debug!("Replication send semaphore closed; skipping fresh offer to {peer_id}");
                return;
            };
            debug!(
                "Replication send permit acquired for peer {peer_id} ({} available)",
                sem.available_permits()
            );
            let data = (*payload).clone();
            // Release our share of the buffer before the (possibly slow) send.
            drop(payload);
            if let Err(e) = p2p
                .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[])
                .await
            {
                debug!("Failed to send fresh offer to {peer_id}: {e}");
            }
        });
    }

    send_paid_notify(key, proof_of_payment, p2p_node, config).await;

    debug!(
        "Fresh replication initiated for {} to {} peers + PaidNotify",
        hex::encode(key),
        target_peers.len()
    );
}
/// Sends a `PaidNotify` for `key` to every node in the paid-list close group
/// except this node.
///
/// The group comes from the DHT manager's `find_closest_nodes_local_with_self`,
/// called with `config.paid_list_close_group_size`. One message is encoded and
/// a detached task is spawned per recipient; the sends are not awaited and
/// are not throttled here. Failures (encoding or sending) are only logged.
async fn send_paid_notify(
    key: &XorName,
    proof_of_payment: &[u8],
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) {
    let local_peer = *p2p_node.peer_id();
    let paid_group = p2p_node
        .dht_manager()
        .find_closest_nodes_local_with_self(key, config.paid_list_close_group_size)
        .await;

    let request_id = rand::thread_rng().gen::<u64>();
    let msg = ReplicationMessage {
        request_id,
        body: ReplicationMessageBody::PaidNotify(PaidNotify {
            key: *key,
            proof_of_payment: proof_of_payment.to_vec(),
        }),
    };
    let encoded = match msg.encode() {
        Ok(bytes) => bytes,
        Err(_) => {
            warn!("Failed to encode PaidNotify for {}", hex::encode(key));
            return;
        }
    };

    // Everyone in the group except ourselves gets a copy of the same message.
    let recipients = paid_group
        .iter()
        .map(|node| node.peer_id)
        .filter(|candidate| *candidate != local_peer);
    for peer_id in recipients {
        let node_handle = Arc::clone(p2p_node);
        let payload = encoded.clone();
        tokio::spawn(async move {
            let outcome = node_handle
                .send_message(&peer_id, REPLICATION_PROTOCOL_ID, payload, &[])
                .await;
            if let Err(e) = outcome {
                debug!("Failed to send PaidNotify to {peer_id}: {e}");
            }
        });
    }
}