use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use iroh::endpoint::Connection;
use iroh::protocol::ProtocolHandler;
use p2panda_core::Topic;
use p2panda_sync::FromSync;
use p2panda_sync::protocols::{TopicHandshakeAcceptor, TopicHandshakeEvent, TopicHandshakeMessage};
use p2panda_sync::traits::{Manager as SyncManagerTrait, Protocol};
use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};
use ractor::{ActorId, ActorProcessingErr, ActorRef, RpcReplyPort, SupervisionEvent};
use tokio::sync::{RwLock, broadcast};
use tokio::task::JoinHandle;
use tracing::{debug, warn};
use crate::cbor::{into_cbor_sink, into_cbor_stream};
use crate::gossip::{Gossip, GossipEvent, GossipHandle};
use crate::iroh_endpoint::Endpoint;
use crate::sync::actors::{ToTopicManager, TopicManager};
use crate::utils::{ShortFormat, to_verifying_key};
use crate::{NodeId, ProtocolId};
/// Flag telling whether live mode is enabled for a topic.
///
/// It is stored next to each topic manager and passed on as `live_mode` whenever a sync session
/// is initiated or accepted for that topic.
type IsLiveModeEnabled = bool;
/// Fixed 32-byte value which is hashed together with a sync topic (see `derive_topic`) to obtain
/// the topic of the gossip overlay joined for peer-sampling.
///
/// NOTE(review): the bytes look arbitrary; presumably they only need to be identical on all
/// peers so that everyone derives the same gossip topic - confirm.
const GOSSIP_TOPIC_MIX_VALUE: [u8; 32] = [
253, 6, 251, 217, 173, 228, 215, 244, 130, 181, 150, 142, 220, 244, 49, 219, 35, 94, 163, 197,
229, 93, 143, 227, 97, 61, 38, 202, 63, 250, 26, 233,
];
/// Messages understood by the sync manager actor.
///
/// `M` is the message type of the per-topic manager actors, `E` the event type delivered to
/// subscribers wrapped in [`FromSync`].
pub enum ToSyncManager<M, E> {
/// Create the topic manager for a topic (or reuse the existing one) and reply with its actor
/// reference. The flag is remembered and used as `live_mode` for later sync sessions.
Create(
Topic,
IsLiveModeEnabled,
RpcReplyPort<ActorRef<ToTopicManager<M>>>,
),
/// Reply with a fresh receiver for the sync events of a topic, or `None` when no receiver is
/// registered for it.
Subscribe(
Topic,
RpcReplyPort<Option<broadcast::Receiver<FromSync<E>>>>,
),
/// Close all sessions of a topic, drain its topic manager and drop the state kept for it.
Close(Topic),
/// Ask the topic manager of the topic to initiate a sync session with the given node.
InitiateSync(Topic, NodeId),
/// Hand an accepted connection from the given node over to the topic manager of the topic.
Accept(NodeId, Topic, Connection),
/// Ask the topic manager of the topic to close its session with the given node.
EndSync(Topic, NodeId),
/// Register the sync protocol handler on the endpoint; sent by the actor to itself during
/// start-up.
RegisterProtocol,
}
/// Broadcast receivers for [`FromSync`] events, keyed by sync topic. Subscribers get a
/// resubscribed copy of the receiver stored here.
type TopicManagerReceivers<E> = HashMap<Topic, broadcast::Receiver<FromSync<E>>>;
/// Bookkeeping for the spawned topic manager actors, indexed in both directions.
struct TopicManagers<T> {
// Sync topic -> topic manager actor and the live-mode flag it was created with.
topic_manager_map: HashMap<Topic, (ActorRef<ToTopicManager<T>>, IsLiveModeEnabled)>,
// Actor id -> sync topic, used to resolve the topic when a supervision event arrives.
actor_topic_map: HashMap<ActorId, Topic>,
}
/// Per sync topic: the handle returned when joining the derived gossip topic and the task
/// forwarding gossip membership events to the sync manager.
type GossipHandles = HashMap<Topic, (GossipHandle, JoinHandle<()>)>;
/// Shared lookup table from derived gossip topic to the sync topic it was derived from.
type GossipTopicMap = Arc<RwLock<HashMap<Topic, Topic>>>;
impl<T> Default for TopicManagers<T> {
fn default() -> Self {
Self {
topic_manager_map: Default::default(),
actor_topic_map: Default::default(),
}
}
}
/// State owned by the sync manager actor.
pub struct SyncManagerState<M>
where
M: SyncManagerTrait<Topic> + Send + 'static,
{
// Identifier under which the sync protocol is registered; also handed to topic managers.
protocol_id: ProtocolId,
// Endpoint on which the protocol handler is registered; cloned into every topic manager.
endpoint: Endpoint,
// Used to subscribe to gossip events and to join the derived gossip topics.
gossip: Gossip,
// Gossip handle and membership-event task per sync topic.
gossip_handles: GossipHandles,
// Spawned topic manager actors.
topic_managers: TopicManagers<M::Message>,
// One stored receiver per topic from which subscriptions are created.
sync_receivers: TopicManagerReceivers<M::Event>,
// Derived gossip topic -> sync topic, shared with the membership-event tasks.
gossip_topics: GossipTopicMap,
// Arguments cloned into every spawned topic manager.
sync_args: M::Args,
// Spawner used for the topic manager actors.
thread_pool: ThreadLocalActorSpawner,
}
impl<M> SyncManagerState<M>
where
M: SyncManagerTrait<Topic> + Send + 'static,
{
fn drop_topic_state(&mut self, topic: &Topic) {
self.topic_managers.topic_manager_map.remove(topic);
self.sync_receivers.remove(topic);
if let Some((_, handle)) = self.gossip_handles.remove(topic) {
handle.abort();
}
}
async fn spawn_membership_task(
&mut self,
myself: &ActorRef<ToSyncManager<M::Message, M::Event>>,
topic: Topic,
) -> Result<(), ActorProcessingErr> {
let gossip_topic = derive_topic(topic, GOSSIP_TOPIC_MIX_VALUE);
self.gossip_topics.write().await.insert(gossip_topic, topic);
debug!(
sync_topic = topic.fmt_short(),
gossip_topic = gossip_topic.fmt_short(),
"join gossip overlay for peer-sampling",
);
let mut events = self.gossip.events().await?;
let gossip_handle = self.gossip.stream(gossip_topic).await?;
let gossip_events_handle = {
let myself = myself.clone();
let gossip_topics = self.gossip_topics.clone();
tokio::spawn(async move {
loop {
let Ok(event) = events.recv().await else {
break;
};
let (topic_from_event, nodes, is_initiate) = match event {
GossipEvent::Joined { topic, ref nodes } => {
(topic, Vec::from_iter(nodes.iter().cloned()), true)
}
GossipEvent::NeighbourUp { node, topic } => (topic, vec![node], true),
GossipEvent::NeighbourDown { node, topic } => (topic, vec![node], false),
GossipEvent::Left { .. } => {
continue;
}
};
let Some(sync_topic) =
gossip_topics.read().await.get(&topic_from_event).cloned()
else {
continue;
};
for node in nodes {
let message = if is_initiate {
ToSyncManager::InitiateSync(sync_topic, node)
} else {
ToSyncManager::EndSync(sync_topic, node)
};
if myself.send_message(message).is_err() {
break;
}
}
}
})
};
self.gossip_handles
.insert(topic, (gossip_handle, gossip_events_handle));
Ok(())
}
}
/// Actor managing one topic manager per sync topic.
///
/// The type itself carries no data; `M` only selects the sync manager implementation used by
/// the actor's state and messages.
pub struct SyncManager<M> {
_phantom: PhantomData<M>,
}
impl<M> Default for SyncManager<M> {
fn default() -> Self {
Self {
_phantom: Default::default(),
}
}
}
impl<M> ThreadLocalActor for SyncManager<M>
where
M: SyncManagerTrait<Topic> + Send + 'static,
{
type State = SyncManagerState<M>;
type Msg = ToSyncManager<M::Message, M::Event>;
type Arguments = (ProtocolId, M::Args, Endpoint, Gossip);
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (protocol_id, sync_args, endpoint, gossip) = args;
let gossip_handles = HashMap::new();
let sync_receivers = HashMap::new();
let sync_managers = Default::default();
let thread_pool = ThreadLocalActorSpawner::new();
let _ = myself.cast(ToSyncManager::RegisterProtocol);
Ok(SyncManagerState {
protocol_id,
endpoint,
gossip,
gossip_handles,
topic_managers: sync_managers,
gossip_topics: Arc::default(),
sync_receivers,
sync_args,
thread_pool,
})
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
for (_, (actor, _)) in state.topic_managers.topic_manager_map.drain() {
actor.send_message(ToTopicManager::CloseAll)?;
}
Ok(())
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ToSyncManager::RegisterProtocol => {
debug!(
protocol_id = state.protocol_id.fmt_short(),
"register sync protocol",
);
state
.endpoint
.accept(
state.protocol_id.clone(),
SyncProtocolHandler {
stream_ref: myself.clone(),
},
)
.await?;
}
ToSyncManager::Create(topic, live_mode, reply) => {
if let Some((sync_manager_ref, _)) =
state.topic_managers.topic_manager_map.get(&topic)
{
let _ = reply.send(sync_manager_ref.clone());
return Ok(());
}
debug!(topic = topic.fmt_short(), live_mode, "create sync manager");
state.spawn_membership_task(&myself, topic).await?;
let (from_sync_tx, from_sync_rx) = broadcast::channel(256);
state.sync_receivers.insert(topic, from_sync_rx);
let (sync_manager_ref, _) = TopicManager::<M>::spawn_linked(
None,
(
state.protocol_id.clone(),
topic,
state.sync_args.clone(),
from_sync_tx,
state.endpoint.clone(),
),
myself.clone().into(),
state.thread_pool.clone(),
)
.await?;
state
.topic_managers
.topic_manager_map
.insert(topic, (sync_manager_ref.clone(), live_mode));
state
.topic_managers
.actor_topic_map
.insert(sync_manager_ref.get_id(), topic);
let _ = reply.send(sync_manager_ref);
}
ToSyncManager::Subscribe(topic, reply) => {
if let Some(from_sync_rx) = state.sync_receivers.get(&topic) {
let subscription = from_sync_rx.resubscribe();
let _ = reply.send(Some(subscription));
} else {
let _ = reply.send(None);
}
}
ToSyncManager::Close(topic) => {
if let Some((actor, _)) = state.topic_managers.topic_manager_map.get(&topic) {
actor.send_message(ToTopicManager::CloseAll)?;
}
if let Some((sync_manager, _)) =
state.topic_managers.topic_manager_map.remove(&topic)
{
state
.topic_managers
.actor_topic_map
.remove(&sync_manager.get_id());
sync_manager.drain()?;
}
state.drop_topic_state(&topic);
debug!(topic = topic.fmt_short(), "close sync manager");
}
ToSyncManager::InitiateSync(topic, node_id) => {
if let Some((sync_manager_actor, live_mode)) =
state.topic_managers.topic_manager_map.get(&topic)
{
debug!(
topic = topic.fmt_short(),
node_id = node_id.fmt_short(),
"initiate sync session",
);
sync_manager_actor.send_message(ToTopicManager::Initiate {
node_id,
topic,
live_mode: *live_mode,
})?;
}
}
ToSyncManager::Accept(node_id, topic, connection) => {
if let Some((sync_manager_actor, live_mode)) =
state.topic_managers.topic_manager_map.get(&topic)
{
debug!(
topic = topic.fmt_short(),
node_id = node_id.fmt_short(),
"accept sync session",
);
sync_manager_actor.send_message(ToTopicManager::Accept {
node_id,
topic,
live_mode: *live_mode,
connection,
})?;
}
}
ToSyncManager::EndSync(topic, node_id) => {
if let Some((sync_manager_actor, _)) =
state.topic_managers.topic_manager_map.get(&topic)
{
debug!(
topic = topic.fmt_short(),
node_id = node_id.fmt_short(),
"end sync session",
);
sync_manager_actor.send_message(ToTopicManager::Close { node_id })?;
}
}
}
Ok(())
}
async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorStarted(actor) => {
let actor_id = actor.get_id();
if let Some(topic) = state.topic_managers.actor_topic_map.get(&actor_id) {
debug!(
%actor_id,
topic = %topic.fmt_short(),
"received ready from sync manager"
);
}
}
SupervisionEvent::ActorTerminated(actor, _last_state, reason) => {
let actor_id = actor.get_id();
if let Some(topic) = state.topic_managers.actor_topic_map.remove(&actor_id) {
debug!(
%actor_id,
topic = %topic.fmt_short(),
"sync manager terminated: {reason:?}",
);
state.drop_topic_state(&topic);
}
}
SupervisionEvent::ActorFailed(actor, panic_msg) => {
let actor_id = actor.get_id();
if let Some(topic) = state.topic_managers.actor_topic_map.remove(&actor_id) {
warn!(
%actor_id,
topic = %topic.fmt_short(),
"sync manager failed: {panic_msg:#?}",
);
myself.send_message(ToSyncManager::Close(topic))?;
}
}
_ => (),
}
Ok(())
}
}
/// Protocol handler registered on the endpoint for the sync protocol.
///
/// It forwards accepted connections to the sync manager actor referenced by `stream_ref`.
struct SyncProtocolHandler<M, E>
where
M: Send + 'static,
E: Send + 'static,
{
// Reference to the sync manager actor receiving `ToSyncManager::Accept` messages.
stream_ref: ActorRef<ToSyncManager<M, E>>,
}
impl<M, E> std::fmt::Debug for SyncProtocolHandler<M, E>
where
    M: Send + 'static,
    E: Send + 'static,
{
    /// Prints only the type name; the actor reference is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct` emits exactly the struct name.
        f.write_str("SyncProtocolHandler")
    }
}
impl<M, E> ProtocolHandler for SyncProtocolHandler<M, E>
where
    M: Send + 'static,
    E: Send + 'static,
{
    /// Accepts an incoming connection for the sync protocol.
    ///
    /// Accepts a bi-directional stream, runs the topic handshake over it as the accepting side
    /// to learn the topic and then passes node id, topic and connection on to the sync manager.
    ///
    /// # Errors
    ///
    /// Fails when no stream can be accepted, the handshake fails or the sync manager cannot be
    /// reached.
    async fn accept(
        &self,
        connection: iroh::endpoint::Connection,
    ) -> Result<(), iroh::protocol::AcceptError> {
        let node_id = to_verifying_key(connection.remote_id());

        let (send_stream, recv_stream) = connection.accept_bi().await?;
        let mut sink = into_cbor_sink::<TopicHandshakeMessage<Topic>, _>(send_stream);
        let mut stream = into_cbor_stream::<TopicHandshakeMessage<Topic>, _>(recv_stream);

        // Handshake events are not consumed here; the receiver is only kept alive until the
        // end of this function.
        let (event_tx, _event_rx) =
            futures_channel::mpsc::channel::<TopicHandshakeEvent<Topic>>(128);
        let handshake = TopicHandshakeAcceptor::new(event_tx);

        let topic = match handshake.run(&mut sink, &mut stream).await {
            Ok(topic) => topic,
            Err(err) => return Err(iroh::protocol::AcceptError::from_err(err)),
        };

        match self
            .stream_ref
            .send_message(ToSyncManager::Accept(node_id, topic, connection))
        {
            Ok(()) => Ok(()),
            Err(err) => Err(iroh::protocol::AcceptError::from_err(err)),
        }
    }
}
/// Derives a new topic by hashing the bytes of `topic` followed by the bytes of `value`.
fn derive_topic(topic: Topic, value: impl AsRef<[u8]>) -> Topic {
    let topic_bytes = topic.as_bytes();
    let mix_bytes = value.as_ref();

    // Same pre-image as concatenating both slices: topic first, mix value second.
    let mut preimage = Vec::with_capacity(topic_bytes.len() + mix_bytes.len());
    preimage.extend_from_slice(topic_bytes);
    preimage.extend_from_slice(mix_bytes);

    p2panda_core::Hash::digest(preimage).into()
}