use std::collections::HashSet;
use p2panda_core::Topic;
use ractor::thread_local::ThreadLocalActor;
use ractor::{ActorProcessingErr, ActorRef};
use tracing::trace;
use crate::NodeId;
use crate::address_book::AddressBook;
use crate::addrs::NodeInfo;
use crate::gossip::actors::session::ToGossipSession;
use crate::utils::from_verifying_key;
use crate::watchers::WatcherReceiver;
/// Messages handled by the [`GossipHealer`] actor.
pub enum ToGossipHealer {
/// Create the address book watchers for the given topic and for our own node info, store them
/// in the actor state and then start waiting for events.
///
/// Queued by the actor itself from `pre_start`.
SubscribeToAddressBook(Topic),
/// Wait for the next event from either watcher, ask the gossip session to join the known topic
/// nodes where applicable, and queue this message again to keep listening.
WaitForEvent,
}
/// Mutable state of the [`GossipHealer`] actor.
pub struct GossipHealerState {
// Our own node id: used to watch our own node info and filtered out of the node set reported
// by the topic watcher.
my_node_id: NodeId,
// Handle used to create the topic and node info watchers.
address_book: AddressBook,
// Endpoint ids derived from the most recent topic watcher event (our own node excluded); this
// list is what gets sent to the gossip session in `JoinNodes`.
topic_endpoint_ids: Vec<iroh::EndpointId>,
// Watcher yielding the set of node ids for the topic; `None` until the
// `SubscribeToAddressBook` message has been handled.
topic_watcher: Option<WatcherReceiver<HashSet<NodeId>>>,
// Watcher for our own node info; `None` until the `SubscribeToAddressBook` message has been
// handled.
node_watcher: Option<WatcherReceiver<Option<NodeInfo>>>,
// Gossip session actor which receives the `JoinNodes` messages.
gossip_session_ref: ActorRef<ToGossipSession>,
}
/// Actor which watches the address book and asks a gossip session to (re-)join the nodes known
/// for a topic whenever the topic's node set or our own node info changes.
///
/// The type itself carries no data; all runtime state lives in [`GossipHealerState`].
#[derive(Default)]
pub struct GossipHealer;
/// Actor behaviour: subscribe to two address book watchers (nodes of a topic, our own node info)
/// and send `ToGossipSession::JoinNodes` to the gossip session whenever one of them fires.
impl ThreadLocalActor for GossipHealer {
    type State = GossipHealerState;
    type Msg = ToGossipHealer;
    /// `(our node id, address book handle, topic to watch, gossip session to notify)`.
    type Arguments = (NodeId, AddressBook, Topic, ActorRef<ToGossipSession>);

    /// Builds the initial state (no watchers, no known endpoint ids) and queues
    /// `SubscribeToAddressBook` so the watchers get created by the first handled message.
    async fn pre_start(
        &self,
        myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        let (my_node_id, address_book, topic, gossip_session_ref) = args;

        // Best-effort: a failed self-cast is deliberately ignored here.
        let _ = myself.cast(ToGossipHealer::SubscribeToAddressBook(topic));

        Ok(GossipHealerState {
            my_node_id,
            address_book,
            topic_endpoint_ids: Vec::new(),
            topic_watcher: None,
            node_watcher: None,
            gossip_session_ref,
        })
    }

    /// Drops both watcher receivers when the actor stops.
    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        state.topic_watcher.take();
        state.node_watcher.take();
        Ok(())
    }

    /// Handles one [`ToGossipHealer`] message.
    ///
    /// # Errors
    ///
    /// Returns an error when creating a watcher fails or when sending `JoinNodes` to the gossip
    /// session fails.
    ///
    /// # Panics
    ///
    /// Panics if `WaitForEvent` is handled before the watchers have been created by
    /// `SubscribeToAddressBook`.
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            ToGossipHealer::SubscribeToAddressBook(topic) => {
                // NOTE(review): the meaning of the boolean flags passed to `watch_topic` and
                // `watch_node_info` is defined by `AddressBook` - confirm there.
                let topic_watcher = state.address_book.watch_topic(topic, false).await?;
                state.topic_watcher = Some(topic_watcher);

                let node_watcher = state
                    .address_book
                    .watch_node_info(state.my_node_id, true)
                    .await?;
                state.node_watcher = Some(node_watcher);

                // Start the event loop now that both watchers exist.
                let _ = myself.cast(ToGossipHealer::WaitForEvent);
            }
            ToGossipHealer::WaitForEvent => {
                let topic_watcher = state
                    .topic_watcher
                    .as_mut()
                    .expect("was initialised before");
                let node_watcher = state
                    .node_watcher
                    .as_mut()
                    .expect("was initialised before");

                tokio::select! {
                    // The set of nodes for the topic changed: remember everyone but ourselves
                    // and ask the gossip session to join them.
                    Some(event) = topic_watcher.recv() => {
                        state.topic_endpoint_ids = event
                            .value
                            .into_iter()
                            .filter(|node_id| *node_id != state.my_node_id)
                            .map(from_verifying_key)
                            .collect();
                        state
                            .gossip_session_ref
                            .send_message(ToGossipSession::JoinNodes(state.topic_endpoint_ids.clone()))?;
                    },
                    // Our own node info changed: re-send the last known topic nodes, if any.
                    Some(_) = node_watcher.recv() => {
                        if !state.topic_endpoint_ids.is_empty() {
                            state
                                .gossip_session_ref
                                .send_message(ToGossipSession::JoinNodes(state.topic_endpoint_ids.clone()))?;
                        }
                    },
                    // Reached only when every branch above is disabled, i.e. both watchers
                    // yielded `None`.
                    else => {
                        trace!(
                            "gossip healer actor: address book dropped broadcast tx - channel closed"
                        );
                        myself.stop(Some("topic_watcher channel closed".to_string()));
                        // We asked to be stopped; do not queue another `WaitForEvent` for an
                        // actor that is shutting down.
                        return Ok(());
                    }
                }

                // Keep listening for the next event.
                let _ = myself.cast(ToGossipHealer::WaitForEvent);
            }
        }

        Ok(())
    }
}