use anyhow::Result;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::bucket_log::BucketLogProvider;
use crate::crypto::PublicKey;
use crate::linked_data::Link;
use crate::mount::Manifest;
use crate::peer::protocol::bidirectional::BidirectionalHandler;
use crate::peer::protocol::messages::Message;
use crate::peer::Peer;
/// Ping request describing the sender's view of a single bucket.
///
/// The receiver compares `height` against its own head for `bucket_id` and,
/// when it turns out to be behind (or does not have the bucket), uses `link`
/// and `height` as the target of the sync job it schedules.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PingMessage {
    /// Identifier of the bucket being compared.
    pub bucket_id: Uuid,
    /// Link the sender reports for the bucket.
    pub link: Link,
    /// Height the sender reports for the bucket.
    pub height: u64,
}
/// Outcome of comparing the responder's bucket head with the one in a ping.
///
/// Every variant is phrased from the responder's point of view. The
/// comparison performed by the handler is on heights only.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum PingReplyStatus {
    /// The responder could not read a head for the bucket.
    NotFound,
    /// The responder's height is greater than the pinged height; carries the
    /// responder's link and height.
    Ahead(Link, u64),
    /// The responder's height is lower than the pinged height; carries the
    /// responder's link and height.
    Behind(Link, u64),
    /// The responder's height equals the pinged height.
    InSync,
}
/// Reply to a [`PingMessage`]: the bucket it refers to plus the comparison
/// result computed by the responder.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PingReply {
    /// Bucket the reply is about (copied from the request).
    pub bucket_id: Uuid,
    /// Result of the head comparison, from the responder's perspective.
    pub status: PingReplyStatus,
}
impl PingReply {
pub fn not_found(bucket_id: Uuid) -> Self {
Self {
bucket_id,
status: PingReplyStatus::NotFound,
}
}
pub fn ahead(bucket_id: Uuid, link: Link, height: u64) -> Self {
Self {
bucket_id,
status: PingReplyStatus::Ahead(link, height),
}
}
pub fn behind(bucket_id: Uuid, link: Link, height: u64) -> Self {
Self {
bucket_id,
status: PingReplyStatus::Behind(link, height),
}
}
pub fn in_sync(bucket_id: Uuid) -> Self {
Self {
bucket_id,
status: PingReplyStatus::InSync,
}
}
}
/// Marker type for the ping exchange; it implements [`BidirectionalHandler`]
/// with [`PingMessage`] as the request and [`PingReply`] as the reply.
pub struct Ping;
impl BidirectionalHandler for Ping {
type Message = PingMessage;
type Reply = PingReply;
fn wrap_request(request: Self::Message) -> Message {
Message::Ping(request)
}
async fn handle_message<L: BucketLogProvider>(
peer: &Peer<L>,
_sender_node_id: &PublicKey,
ping: &PingMessage,
) -> PingReply {
let logs = peer.logs();
let bucket_id = ping.bucket_id;
let (link, height) = match logs.head(bucket_id, None).await {
Ok((link, height)) => (link, height),
Err(_) => {
return PingReply::not_found(bucket_id);
}
};
if height < ping.height {
PingReply::behind(bucket_id, link, height)
} else if height == ping.height {
PingReply::in_sync(bucket_id)
} else {
PingReply::ahead(bucket_id, link, height)
}
}
async fn handle_message_side_effect<L: BucketLogProvider>(
peer: &Peer<L>,
sender_node_id: &PublicKey,
ping: &PingMessage,
pong: &PingReply,
) -> Result<()>
where
L::Error: std::error::Error + Send + Sync + 'static,
{
let should_sync = peer
.logs()
.should_sync_content(ping.bucket_id)
.await
.unwrap_or(true);
match &pong.status {
PingReplyStatus::Behind(our_link, our_height) => {
if !should_sync {
tracing::debug!("Skipping sync for bucket {} (not active)", ping.bucket_id);
return Ok(());
}
tracing::info!(
"We're behind peer for bucket {} (our height: {}, their height: {}), dispatching sync job",
ping.bucket_id,
our_height,
ping.height
);
let peer_ids = match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
Ok(manifest) => manifest.get_peer_ids(),
Err(e) => {
tracing::warn!(
"Failed to load manifest for peer list, using sender only: {}",
e
);
vec![*sender_node_id]
}
};
use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
if let Err(e) = peer
.dispatch(SyncJob::SyncBucket(SyncBucketJob {
bucket_id: ping.bucket_id,
target: SyncTarget {
link: ping.link.clone(),
height: ping.height,
peer_ids,
},
}))
.await
{
tracing::error!("Failed to dispatch sync job: {}", e);
}
}
PingReplyStatus::Ahead(_, our_height) => {
tracing::debug!(
"We're ahead of peer for bucket {} (our height: {}, their height: {})",
ping.bucket_id,
our_height,
ping.height
);
}
PingReplyStatus::InSync => {
tracing::debug!("In sync with peer for bucket {}", ping.bucket_id);
}
PingReplyStatus::NotFound => {
tracing::debug!(
"We don't have bucket {} that peer is asking about",
ping.bucket_id
);
let peer_ids = vec![*sender_node_id];
use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
if let Err(e) = peer
.dispatch(SyncJob::SyncBucket(SyncBucketJob {
bucket_id: ping.bucket_id,
target: SyncTarget {
link: ping.link.clone(),
height: ping.height,
peer_ids,
},
}))
.await
{
tracing::error!("Failed to dispatch sync job: {}", e);
}
}
}
Ok(())
}
async fn handle_reply<L: BucketLogProvider>(
peer: &Peer<L>,
recipient_node_id: &PublicKey,
pong: &PingReply,
) -> Result<()>
where
L::Error: std::error::Error + Send + Sync + 'static,
{
match &pong.status {
PingReplyStatus::NotFound => {
tracing::info!(
"Remote peer {} doesn't have bucket {}",
recipient_node_id.to_hex(),
pong.bucket_id
);
}
PingReplyStatus::Ahead(link, height) => {
tracing::info!(
"Remote peer {} is ahead for bucket {} at height {} with link {:?}, dispatching sync job",
recipient_node_id.to_hex(),
pong.bucket_id,
height,
link
);
let peer_ids = match peer.logs().head(pong.bucket_id, None).await {
Ok((our_link, _)) => {
match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
Ok(manifest) => manifest.get_peer_ids(),
Err(e) => {
tracing::warn!(
"Failed to load manifest for peer list, using recipient only: {}",
e
);
vec![*recipient_node_id]
}
}
}
Err(e) => {
tracing::warn!(
"Failed to get head for peer list, using recipient only: {}",
e
);
vec![*recipient_node_id]
}
};
use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
if let Err(e) = peer
.dispatch(SyncJob::SyncBucket(SyncBucketJob {
bucket_id: pong.bucket_id,
target: SyncTarget {
link: link.clone(),
height: *height,
peer_ids,
},
}))
.await
{
tracing::error!("Failed to dispatch sync job: {}", e);
}
}
PingReplyStatus::Behind(link, height) => {
tracing::info!(
"Remote peer {} is behind for bucket {} at height {} with link {:?}",
recipient_node_id.to_hex(),
pong.bucket_id,
height,
link
);
}
PingReplyStatus::InSync => {
tracing::info!(
"In sync with peer {} for bucket {}",
recipient_node_id.to_hex(),
pong.bucket_id
);
}
}
Ok(())
}
}