use actor_helper::{Action, Handle, Receiver};
use iroh::EndpointId;
use std::{collections::HashSet, time::Duration};
use crate::{GossipReceiver, GossipSender, RecordPublisher, gossip::GossipRecordContent};
use anyhow::Result;
/// Handle to a background actor that looks for a gossip split-brain and tries to heal it.
///
/// Instances are created by [`MessageOverlapMerge::new`], which spawns the actor. This type
/// only keeps the actor's `Handle`. Cloning clones that `Handle` (presumably still referring
/// to the same actor — TODO confirm against `actor_helper::Handle`).
#[derive(Clone, Debug)]
pub struct MessageOverlapMerge {
    /// Handle to the spawned actor; stored but never read here, hence the leading underscore.
    _api: Handle<MessageOverlapMergeActor, anyhow::Error>,
}
/// State owned by the overlap-merge actor task.
///
/// On every tick the actor compares this node's recent gossip message hashes with the hashes
/// advertised in published records. It asks to join the peers behind records whose hashes do
/// not overlap with the local ones at all.
#[derive(Debug)]
struct MessageOverlapMergeActor {
    /// Source of the published records that are read on each check.
    record_publisher: RecordPublisher,
    /// Supplies this node's recent message hashes and its current neighbors.
    gossip_receiver: GossipReceiver,
    /// Used to issue `join_peers` requests for peers found across a split.
    gossip_sender: GossipSender,
    /// Schedules checks; it is reset after every tick with a jittered delay.
    ticker: tokio::time::Interval,
    /// Ends the actor loop when cancelled; also forwarded to record lookups.
    cancel_token: tokio_util::sync::CancellationToken,
    /// Passed to `join_peers` as its optional limit argument.
    max_join_peers: usize,
    /// Fixed part of the delay between checks (clamped to at least 1s by the constructor).
    base_interval: Duration,
    /// Exclusive upper bound of the random delay added on top of `base_interval`.
    max_jitter: Duration,
}
impl MessageOverlapMerge {
    /// Spawns the overlap-merge actor and returns a handle to it.
    ///
    /// * `record_publisher` – used to read published records and this node's public key.
    /// * `gossip_sender` / `gossip_receiver` – the gossip topic endpoints the actor inspects
    ///   and, when a split is detected, uses to join peers.
    /// * `cancel_token` – cancelling it stops the actor loop.
    /// * `max_join_peers` – forwarded to `join_peers` as its limit argument.
    /// * `base_interval` – fixed delay between checks; values below one second are raised to
    ///   one second.
    /// * `max_jitter` – exclusive upper bound of a random extra delay added to each interval.
    ///
    /// The first tick of a `tokio::time::interval` completes immediately, so the first check
    /// runs as soon as the actor loop starts.
    ///
    /// # Errors
    /// The current implementation always returns `Ok`.
    pub fn new(
        record_publisher: RecordPublisher,
        gossip_sender: GossipSender,
        gossip_receiver: GossipReceiver,
        cancel_token: tokio_util::sync::CancellationToken,
        max_join_peers: usize,
        base_interval: Duration,
        max_jitter: Duration,
    ) -> Result<Self> {
        // Never check more often than once per second, regardless of the requested interval.
        const MIN_INTERVAL: Duration = Duration::from_secs(1);
        let base_interval = std::cmp::max(base_interval, MIN_INTERVAL);

        let ticker = {
            let mut interval = tokio::time::interval(base_interval);
            // Skip missed ticks instead of firing them back-to-back (tokio's default `Burst`).
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            interval
        };

        let actor = MessageOverlapMergeActor {
            record_publisher,
            gossip_receiver,
            gossip_sender,
            ticker,
            cancel_token,
            max_join_peers,
            base_interval,
            max_jitter,
        };
        let handle =
            Handle::spawn_with(actor, |mut actor, rx| async move { actor.run(rx).await }).0;
        Ok(Self { _api: handle })
    }
}
impl MessageOverlapMergeActor {
async fn run(&mut self, rx: Receiver<Action<MessageOverlapMergeActor>>) -> Result<()> {
tracing::debug!("MessageOverlapMerge: starting message overlap merge actor");
loop {
tokio::select! {
result = rx.recv_async() => {
match result {
Ok(action) => action(self).await,
Err(_) => break Ok(()),
}
}
_ = self.ticker.tick() => {
tracing::debug!("MessageOverlapMerge: tick fired, checking for split-brain");
if let Err(e) = self.merge().await {
tracing::warn!("MessageOverlapMerge: error during merge: {:?}", e);
}
let jitter = if self.max_jitter > Duration::ZERO {
Duration::from_nanos((rand::random::<u128>() % self.max_jitter.as_nanos()) as u64)
} else {
Duration::ZERO
};
let next_interval = self.base_interval + jitter;
tracing::debug!("MessageOverlapMerge: next check in {}ms", next_interval.as_millis());
self.ticker.reset_after(next_interval);
}
_ = self.cancel_token.cancelled() => {
break Ok(());
}
else => break Ok(()),
}
}
}
}
impl MessageOverlapMergeActor {
    /// Runs one split-brain detection pass.
    ///
    /// 1. Fetches the published records for `crate::unix_minute(0)` (presumably the current
    ///    minute) and the minute before it.
    /// 2. Compares each record's non-zero `last_message_hashes` against this node's own
    ///    recent message hashes. A record qualifies when it advertises at least one non-zero
    ///    hash and none of its hashes are known locally.
    /// 3. Collects, for every qualifying record, its publisher key and its non-zero
    ///    `active_peers`, excluding this node and its current neighbors.
    /// 4. Hands those keys to `join_peers` with `max_join_peers` as the limit argument.
    ///
    /// The pass is skipped entirely while this node has no local message hashes.
    ///
    /// # Errors
    /// Errors from fetching records, reading local hashes or neighbors, and from `join_peers`
    /// are propagated. Records whose content fails to decode are skipped, and so are keys
    /// that fail to parse as `EndpointId`.
    async fn merge(&mut self) -> Result<()> {
        let unix_minute = crate::unix_minute(0);
        let mut records = self
            .record_publisher
            .get_records(unix_minute - 1, self.cancel_token.clone())
            .await?;
        records.extend(
            self.record_publisher
                .get_records(unix_minute, self.cancel_token.clone())
                .await?,
        );
        let local_hashes = self.gossip_receiver.last_message_hashes().await?;
        tracing::debug!(
            "MessageOverlapMerge: checking {} records with {} local message hashes",
            records.len(),
            local_hashes.len()
        );
        if local_hashes.is_empty() {
            tracing::debug!(
                "MessageOverlapMerge: no local message hashes yet, skipping overlap detection"
            );
            return Ok(());
        }

        // Decode each record's content exactly once and carry it forward, so the
        // peer-collection step below does not deserialize it a second time.
        let disjoint_records = records
            .iter()
            .filter_map(|record| {
                let content = record.content::<GossipRecordContent>().ok()?;
                let disjoint = {
                    // All-zero hashes are ignored when comparing.
                    let remote_hashes = content
                        .last_message_hashes
                        .iter()
                        .filter(|last_message_hash| **last_message_hash != [0; 32])
                        .collect::<Vec<_>>();
                    !remote_hashes.is_empty()
                        && remote_hashes
                            .iter()
                            .all(|last_message_hash| !local_hashes.contains(*last_message_hash))
                };
                disjoint.then_some((record, content))
            })
            .collect::<Vec<_>>();
        tracing::debug!(
            "MessageOverlapMerge: found {} peers with no overlapping message hashes",
            disjoint_records.len()
        );
        if disjoint_records.is_empty() {
            return Ok(());
        }

        let active_neighbors = self.gossip_receiver.neighbors().await?;
        let self_pub_key = EndpointId::from_verifying_key(self.record_publisher.pub_key());
        let pub_keys = disjoint_records
            .into_iter()
            .flat_map(|(record, content)| {
                let publisher = EndpointId::from_bytes(&record.pub_key()).ok();
                // Skip all-zero entries (presumably unused slots) and unparsable keys.
                let active_peers = content
                    .active_peers
                    .into_iter()
                    .filter(|active_peer| *active_peer != [0; 32])
                    .filter_map(|active_peer| EndpointId::from_bytes(&active_peer).ok());
                publisher.into_iter().chain(active_peers)
            })
            // Never try to join ourselves or peers we are already connected to.
            .filter(|pub_key| *pub_key != self_pub_key && !active_neighbors.contains(pub_key))
            .collect::<HashSet<_>>();
        if pub_keys.is_empty() {
            return Ok(());
        }

        tracing::debug!(
            "MessageOverlapMerge: attempting to join {} pub_keys with no overlapping messages",
            pub_keys.len()
        );
        self.gossip_sender
            .join_peers(
                pub_keys.into_iter().collect::<Vec<_>>(),
                Some(self.max_join_peers),
            )
            .await?;
        tracing::debug!("MessageOverlapMerge: join_peers request sent for split-brain recovery");
        Ok(())
    }
}