use futures_util::StreamExt;
use iroh_gossip::api::GossipReceiver as IrohGossipReceiver;
use ractor::thread_local::ThreadLocalActor;
use ractor::{ActorProcessingErr, ActorRef};
use tracing::error;
use crate::gossip::actors::session::ToGossipSession;
pub enum ToGossipReceiver {
WaitForEvent,
WaitForJoin,
}
pub struct GossipReceiverState {
receiver: Option<IrohGossipReceiver>,
session_ref: ActorRef<ToGossipSession>,
}
#[derive(Default)]
pub struct GossipReceiver;
impl ThreadLocalActor for GossipReceiver {
type State = GossipReceiverState;
type Msg = ToGossipReceiver;
type Arguments = (IrohGossipReceiver, ActorRef<ToGossipSession>);
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (receiver, session_ref) = args;
let _ = myself.cast(ToGossipReceiver::WaitForJoin);
Ok(GossipReceiverState {
receiver: Some(receiver),
session_ref,
})
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
drop(state.receiver.take());
Ok(())
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ToGossipReceiver::WaitForJoin => {
if let Some(receiver) = &mut state.receiver {
receiver.joined().await?;
let nodes = receiver.neighbors().collect();
let _ = state
.session_ref
.cast(ToGossipSession::ProcessJoined(nodes));
}
let _ = myself.cast(ToGossipReceiver::WaitForEvent);
}
ToGossipReceiver::WaitForEvent => {
if let Some(receiver) = &mut state.receiver
&& let Some(received) = receiver.next().await
{
match received {
Ok(event) => {
let _ = state.session_ref.cast(ToGossipSession::ProcessEvent(event));
}
Err(err) => {
error!("gossip receiver actor: {}", err);
myself.stop(Some("channel closed".to_string()));
return Ok(());
}
}
}
let _ = myself.cast(ToGossipReceiver::WaitForEvent);
}
}
Ok(())
}
}