use std::time::Duration;
use iroh_gossip::api::{Event as IrohEvent, GossipTopic as IrohGossipTopic};
use p2panda_core::Topic;
use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};
use ractor::{ActorProcessingErr, ActorRef, SupervisionEvent};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, trace, warn};
use crate::NodeId;
use crate::address_book::AddressBook;
use crate::gossip::actors::ToGossipManager;
use crate::gossip::actors::healer::{GossipHealer, ToGossipHealer};
use crate::gossip::actors::joiner::{GossipJoiner, ToGossipJoiner};
use crate::gossip::actors::listener::GossipListener;
use crate::gossip::actors::receiver::{GossipReceiver, ToGossipReceiver};
use crate::gossip::actors::sender::{GossipSender, ToGossipSender};
use crate::utils::{ShortFormat, to_verifying_key};
pub enum ToGossipSession {
ProcessEvent(IrohEvent),
ProcessJoined(Vec<iroh::EndpointId>),
JoinNodes(Vec<iroh::EndpointId>),
}
pub struct GossipSessionState {
topic: Topic,
#[allow(unused)]
gossip_healer_actor: ActorRef<ToGossipHealer>,
gossip_joiner_actor: ActorRef<ToGossipJoiner>,
gossip_sender_actor: ActorRef<ToGossipSender>,
gossip_receiver_actor: ActorRef<ToGossipReceiver>,
gossip_actor: ActorRef<ToGossipManager>,
}
#[derive(Default)]
pub struct GossipSession;
impl ThreadLocalActor for GossipSession {
type State = GossipSessionState;
type Msg = ToGossipSession;
type Arguments = (
NodeId,
AddressBook,
Topic,
IrohGossipTopic,
mpsc::Receiver<Vec<u8>>,
oneshot::Receiver<()>,
ActorRef<ToGossipManager>,
ThreadLocalActorSpawner,
);
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (
my_node_id,
address_book,
topic,
subscription,
receiver_from_user,
gossip_joined,
gossip_actor,
gossip_thread_pool,
) = args;
let (sender, receiver) = subscription.split();
let (gossip_receiver_actor, _) = GossipReceiver::spawn_linked(
None,
(receiver, myself.clone()),
myself.clone().into(),
gossip_thread_pool.clone(),
)
.await?;
let (gossip_sender_actor, _) = GossipSender::spawn_linked(
None,
(sender.clone(), gossip_joined),
myself.clone().into(),
gossip_thread_pool.clone(),
)
.await?;
let (_gossip_listener_actor, _) = GossipListener::spawn_linked(
None,
(receiver_from_user, gossip_sender_actor.clone()),
myself.clone().into(),
gossip_thread_pool.clone(),
)
.await?;
let (gossip_joiner_actor, _) = GossipJoiner::spawn_linked(
None,
sender,
myself.clone().into(),
gossip_thread_pool.clone(),
)
.await?;
let (gossip_healer_actor, _) = GossipHealer::spawn_linked(
None,
(my_node_id, address_book, topic, myself.clone()),
myself.clone().into(),
gossip_thread_pool.clone(),
)
.await?;
let state = GossipSessionState {
topic,
gossip_healer_actor,
gossip_joiner_actor,
gossip_sender_actor,
gossip_receiver_actor,
gossip_actor,
};
Ok(state)
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.gossip_healer_actor.stop(None);
state.gossip_joiner_actor.stop_and_wait(None, None).await?;
state.gossip_sender_actor.stop_and_wait(None, None).await?;
state
.gossip_receiver_actor
.stop_and_wait(None, None)
.await?;
Ok(())
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ToGossipSession::ProcessJoined(nodes) => {
let topic = state.topic;
let nodes: Vec<NodeId> = nodes.into_iter().map(to_verifying_key).collect();
let session_id = myself.get_id();
let _ = state.gossip_actor.cast(ToGossipManager::Joined {
topic,
nodes,
session_id,
});
}
ToGossipSession::ProcessEvent(event) => match event {
IrohEvent::Lagged => {
warn!("gossip session actor: missed messages - dropping gossip event")
}
IrohEvent::Received(msg) => {
let bytes = msg.content.into();
let delivered_from = to_verifying_key(msg.delivered_from);
let delivery_scope = msg.scope;
let topic = state.topic;
let session_id = myself.get_id();
let _ = state.gossip_actor.cast(ToGossipManager::ReceivedMessage {
bytes,
delivered_from,
delivery_scope,
topic,
session_id,
});
}
IrohEvent::NeighborUp(peer) => {
let node_id = to_verifying_key(peer);
let session_id = myself.get_id();
let _ = state.gossip_actor.cast(ToGossipManager::NeighborUp {
node_id,
session_id,
});
}
IrohEvent::NeighborDown(peer) => {
let node_id = to_verifying_key(peer);
let session_id = myself.get_id();
let _ = state.gossip_actor.cast(ToGossipManager::NeighborDown {
node_id,
session_id,
});
}
},
ToGossipSession::JoinNodes(nodes) => {
debug!(
topic = %state.topic.fmt_short(),
nodes = %nodes.fmt_short(),
"(re-) join gossip overlay"
);
let _ = state
.gossip_joiner_actor
.cast(ToGossipJoiner::JoinNodes(nodes));
}
}
Ok(())
}
async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorStarted(actor) => {
let actor_id = actor.get_id();
if actor_id == state.gossip_sender_actor.get_id() {
trace!(%actor_id, "received ready from gossip sender",)
} else if actor_id == state.gossip_receiver_actor.get_id() {
trace!(%actor_id, "received ready from gossip receiver",)
}
}
SupervisionEvent::ActorTerminated(actor, _last_state, reason) => {
let actor_id = actor.get_id();
if actor_id == state.gossip_sender_actor.get_id() {
debug!(%actor_id, "gossip sender actor terminated: {reason:?}",);
} else if actor_id == state.gossip_receiver_actor.get_id() {
debug!(%actor_id, "gossip receiver actor terminated: {reason:?}",);
}
myself
.drain_children_and_wait(Some(Duration::from_millis(100)))
.await;
myself.stop(Some("lost connection to gossip overlay".to_string()));
}
SupervisionEvent::ActorFailed(actor, panic_msg) => {
let actor_id = actor.get_id();
if actor_id == state.gossip_sender_actor.get_id() {
warn!(%actor_id, "gossip sender actor failed: {panic_msg:#?}",);
} else if actor_id == state.gossip_receiver_actor.get_id() {
warn!(%actor_id, "gossip receiver actor failed: {panic_msg:#?}",);
}
myself
.drain_children_and_wait(Some(Duration::from_millis(100)))
.await;
myself.stop(Some("lost connection to gossip overlay".to_string()));
}
_ => (),
}
Ok(())
}
}