use std::collections::{HashSet, VecDeque};
use actor_helper::{Action, Actor, Handle, Receiver, act, act_ok};
use anyhow::Result;
use futures_lite::StreamExt;
use iroh::EndpointId;
use sha2::Digest;
#[derive(Debug, Clone)]
pub struct GossipReceiver {
api: Handle<GossipReceiverActor, anyhow::Error>,
_gossip: iroh_gossip::net::Gossip,
}
#[derive(Debug)]
pub struct GossipReceiverActor {
rx: Receiver<Action<GossipReceiverActor>>,
gossip_receiver: iroh_gossip::api::GossipReceiver,
last_message_hashes: Vec<[u8; 32]>,
msg_queue: VecDeque<Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>>,
waiters: VecDeque<
tokio::sync::oneshot::Sender<
Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
>,
>,
_gossip: iroh_gossip::net::Gossip,
}
impl GossipReceiver {
pub fn new(
gossip_receiver: iroh_gossip::api::GossipReceiver,
gossip: iroh_gossip::net::Gossip,
) -> Self {
let (api, rx) = Handle::channel();
tokio::spawn({
let gossip = gossip.clone();
async move {
let mut actor = GossipReceiverActor {
rx,
gossip_receiver,
last_message_hashes: Vec::new(),
msg_queue: VecDeque::new(),
waiters: VecDeque::new(),
_gossip: gossip.clone(),
};
let _ = actor.run().await;
}
});
Self {
api,
_gossip: gossip.clone(),
}
}
pub async fn neighbors(&self) -> HashSet<EndpointId> {
self.api
.call(act_ok!(actor => async move {
actor.gossip_receiver.neighbors().collect::<HashSet<EndpointId>>()
}))
.await
.expect("actor stopped")
}
pub async fn is_joined(&self) -> bool {
self.api
.call(act_ok!(actor => async move { actor.gossip_receiver.is_joined() }))
.await
.expect("actor stopped")
}
pub async fn next(
&self,
) -> Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.api
.call(act!(actor => actor.register_next(tx)))
.await
.ok()?;
rx.await.ok()?
}
pub async fn last_message_hashes(&self) -> Vec<[u8; 32]> {
self.api
.call(act_ok!(actor => async move { actor.last_message_hashes.clone() }))
.await
.expect("void")
}
}
impl Actor<anyhow::Error> for GossipReceiverActor {
async fn run(&mut self) -> Result<()> {
tracing::debug!("GossipReceiver: starting gossip receiver actor");
loop {
tokio::select! {
Ok(action) = self.rx.recv_async() => {
action(self).await;
}
raw_event = self.gossip_receiver.next() => {
self.msg_queue.push_front(raw_event);
if let Some(waiter) = self.waiters.pop_back() {
if let Some(event) = self.msg_queue.pop_back() {
let _ = waiter.send(event);
} else {
let _ = waiter.send(None);
}
}
if let Some(Some(Ok(event))) = self.msg_queue.front() {
match event {
iroh_gossip::api::Event::Received(msg) => {
tracing::debug!("GossipReceiver: received message from {:?}", msg.delivered_from);
let mut hash = sha2::Sha512::new();
hash.update(msg.content.clone());
if let Ok(lmh) = hash.finalize()[..32].try_into() {
self.last_message_hashes.push(lmh);
}
}
iroh_gossip::api::Event::NeighborUp(node_id) => {
tracing::debug!("GossipReceiver: neighbor UP: {}", node_id);
}
iroh_gossip::api::Event::NeighborDown(node_id) => {
tracing::debug!("GossipReceiver: neighbor DOWN: {}", node_id);
}
iroh_gossip::api::Event::Lagged => {
tracing::debug!("GossipReceiver: event stream lagged");
}
}
}
}
_ = tokio::signal::ctrl_c() => {
break;
}
}
}
Ok(())
}
}
impl GossipReceiverActor {
pub async fn register_next(
&mut self,
waiter: tokio::sync::oneshot::Sender<
Option<Result<iroh_gossip::api::Event, iroh_gossip::api::ApiError>>,
>,
) -> Result<()> {
if let Some(event) = self.msg_queue.pop_back() {
let _ = waiter.send(event);
} else {
self.waiters.push_front(waiter);
}
Ok(())
}
}