use std::{
collections::{HashSet, VecDeque},
fmt::Display,
sync::Arc,
};
use actor_helper::{Action, Handle, Receiver, act_ok};
use anyhow::Result;
use futures_lite::StreamExt;
use iroh::EndpointId;
use sha2::Digest;
use crate::{MAX_MESSAGE_HASHES, Topic};
/// Caller-facing handle to a [`GossipReceiverActor`].
///
/// Actor state is queried through `api`, while gossip events and join
/// notifications are delivered over two broadcast channels. Each handle owns
/// its own receivers; the `Clone` impl subscribes fresh ones through the weak
/// senders stored here.
#[derive(Debug)]
pub struct GossipReceiver {
/// Handle used to run calls against the actor's state.
api: Handle<GossipReceiverActor, anyhow::Error>,
/// Optional shared reference to a `Topic`; `new` initialises it to `None`.
/// NOTE(review): presumably held only so the topic outlives this handle —
/// confirm against the code that assigns it.
pub(crate) _topic_keep_alive: Option<Arc<Topic>>,
/// Weak sender of the event channel; upgraded when cloning to subscribe a
/// new receiver.
_next_channel_sender: tokio::sync::broadcast::WeakSender<Option<iroh_gossip::api::Event>>,
/// Receiver read by `next`; a `None` item is reported as `ChannelError::Closed`.
next_channel_receiver: tokio::sync::broadcast::Receiver<Option<iroh_gossip::api::Event>>,
/// Weak sender of the join channel; upgraded when cloning to subscribe a
/// new receiver.
_join_channel_sender: tokio::sync::broadcast::WeakSender<Option<()>>,
/// Receiver read by `joined`; a `None` item is reported as `ChannelError::Closed`.
join_channel_receiver: tokio::sync::broadcast::Receiver<Option<()>>,
}
impl Clone for GossipReceiver {
    /// Creates another handle to the same actor.
    ///
    /// The clone gets its own broadcast receivers, subscribed at clone time.
    /// If the senders are already gone, the clone receives receivers of
    /// channels whose sender has been dropped.
    fn clone(&self) -> Self {
        // Subscribes through `weak` while its sender is still alive; otherwise
        // hands back the receiver of a channel whose only sender was dropped.
        fn fresh_receiver<T: Clone>(
            weak: &tokio::sync::broadcast::WeakSender<T>,
        ) -> tokio::sync::broadcast::Receiver<T> {
            if let Some(sender) = weak.upgrade() {
                return sender.subscribe();
            }
            let (closed_tx, closed_rx) = tokio::sync::broadcast::channel(1);
            drop(closed_tx);
            closed_rx
        }

        let next_channel_receiver = fresh_receiver(&self._next_channel_sender);
        let join_channel_receiver = fresh_receiver(&self._join_channel_sender);

        Self {
            api: self.api.clone(),
            _topic_keep_alive: self._topic_keep_alive.clone(),
            _next_channel_sender: self._next_channel_sender.clone(),
            next_channel_receiver,
            _join_channel_sender: self._join_channel_sender.clone(),
            join_channel_receiver,
        }
    }
}
/// Actor state behind [`GossipReceiver`].
///
/// Its `run` loop polls the underlying gossip receiver, records message
/// hashes, and forwards events and join signals over the broadcast senders.
#[derive(Debug)]
pub struct GossipReceiverActor {
/// Underlying iroh-gossip receiver polled for events by `run`.
gossip_receiver: iroh_gossip::api::GossipReceiver,
/// First 32 bytes of the SHA-512 digest of each received message's content,
/// oldest first; `run` drops the oldest entry once `MAX_MESSAGE_HASHES` are held.
last_message_hashes: VecDeque<[u8; 32]>,
/// Cancelled by `run` when the gossip stream ends or errors; `run` also
/// stops when this token is cancelled.
cancel_token: tokio_util::sync::CancellationToken,
/// Broadcasts every gossip event as `Some(event)` and `None` on shutdown.
next_channel_sender: tokio::sync::broadcast::Sender<Option<iroh_gossip::api::Event>>,
/// Broadcasts `Some(())` on `Received` and `NeighborUp` events and `None`
/// on shutdown.
join_channel_sender: tokio::sync::broadcast::Sender<Option<()>>,
}
/// Error returned when reading from one of the receiver's broadcast channels.
///
/// Derives `Clone`, `PartialEq` and `Eq` so callers can compare errors
/// directly (for example with `assert_eq!`) and duplicate them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelError {
    /// The channel is closed and no further items will arrive.
    Closed,
    /// The receiver fell behind; the payload is the number of skipped messages.
    Lagged(u64),
}
impl From<tokio::sync::broadcast::error::RecvError> for ChannelError {
fn from(err: tokio::sync::broadcast::error::RecvError) -> Self {
match err {
tokio::sync::broadcast::error::RecvError::Closed => ChannelError::Closed,
tokio::sync::broadcast::error::RecvError::Lagged(skipped) => {
ChannelError::Lagged(skipped)
}
}
}
}
// Marker impl: every method of `std::error::Error` keeps its default, so no
// underlying source error is exposed.
impl std::error::Error for ChannelError {}
impl Display for ChannelError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ChannelError::Closed => write!(f, "channel closed"),
ChannelError::Lagged(skipped) => {
write!(f, "channel lagged, skipped {} messages", skipped)
}
}
}
}
impl GossipReceiver {
pub fn new(
gossip_receiver: iroh_gossip::api::GossipReceiver,
cancel_token: tokio_util::sync::CancellationToken,
) -> Self {
let (next_tx, next_rx) = tokio::sync::broadcast::channel(64);
let (join_tx, join_rx) = tokio::sync::broadcast::channel(64);
let api = Handle::spawn_with(
GossipReceiverActor {
gossip_receiver,
last_message_hashes: VecDeque::with_capacity(MAX_MESSAGE_HASHES),
cancel_token,
next_channel_sender: next_tx.clone(),
join_channel_sender: join_tx.clone(),
},
|mut actor, rx| async move { actor.run(rx).await },
)
.0;
Self {
api,
_topic_keep_alive: None,
next_channel_receiver: next_rx,
_next_channel_sender: next_tx.downgrade(),
join_channel_receiver: join_rx,
_join_channel_sender: join_tx.downgrade(),
}
}
pub async fn neighbors(&self) -> Result<HashSet<EndpointId>> {
self.api
.call(act_ok!(actor => async move {
actor.gossip_receiver.neighbors().collect::<HashSet<EndpointId>>()
}))
.await
}
pub async fn is_joined(&self) -> Result<bool> {
self.api
.call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
.await
}
pub async fn next(&mut self) -> Result<iroh_gossip::api::Event, ChannelError> {
match self.next_channel_receiver.recv().await {
Ok(event) => match event {
Some(event) => Ok(event),
None => Err(ChannelError::Closed),
},
Err(err) => Err(err.into()),
}
}
pub async fn joined(&mut self) -> Result<(), ChannelError> {
if self.is_joined().await.map_err(|_| ChannelError::Closed)? {
return Ok(());
}
match self.join_channel_receiver.recv().await {
Ok(event) => match event {
Some(event) => Ok(event),
None => Err(ChannelError::Closed),
},
Err(err) => Err(err.into()),
}
}
pub async fn last_message_hashes(&self) -> Result<VecDeque<[u8; 32]>> {
self.api
.call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
.await
}
}
impl GossipReceiverActor {
async fn run(&mut self, rx: Receiver<Action<GossipReceiverActor>>) -> Result<()> {
tracing::debug!("GossipReceiver: starting gossip receiver actor");
loop {
tokio::select! {
result = rx.recv_async() => {
match result {
Ok(action) => action(self).await,
Err(_) => break Ok(()),
}
}
raw_event = self.gossip_receiver.next() => {
let event = match raw_event {
None => {
tracing::debug!("GossipReceiver: gossip receiver closed");
self.join_channel_sender.send(None).ok();
self.next_channel_sender.send(None).ok();
self.cancel_token.cancel();
break Ok(());
}
Some(Err(err)) => {
tracing::warn!("GossipReceiver: error receiving gossip event: {err}");
self.next_channel_sender.send(None).ok();
self.join_channel_sender.send(None).ok();
self.cancel_token.cancel();
break Ok(());
}
Some(Ok(ref event)) => {
match event {
iroh_gossip::api::Event::Received(msg) => {
tracing::debug!("GossipReceiver: received message from {:?}", msg.delivered_from);
let mut hash = sha2::Sha512::new();
hash.update(&msg.content);
if let Ok(lmh) = hash.finalize()[..32].try_into() {
if self.last_message_hashes.len() == MAX_MESSAGE_HASHES {
self.last_message_hashes.pop_front();
}
self.last_message_hashes.push_back(lmh);
}
self.join_channel_sender.send(Some(())).ok();
}
iroh_gossip::api::Event::NeighborUp(pub_key) => {
tracing::debug!("GossipReceiver: neighbor UP: {}", pub_key);
self.join_channel_sender.send(Some(())).ok();
}
iroh_gossip::api::Event::NeighborDown(pub_key) => {
tracing::debug!("GossipReceiver: neighbor DOWN: {}", pub_key);
}
iroh_gossip::api::Event::Lagged => {
tracing::debug!("GossipReceiver: event stream lagged");
}
};
event.clone()
}
};
self.next_channel_sender.send(Some(event)).ok();
}
_ = self.cancel_token.cancelled() => {
self.join_channel_sender.send(None).ok();
self.next_channel_sender.send(None).ok();
break Ok(());
}
else => break Ok(()),
}
}
}
}