use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use futures_util::{Stream, StreamExt};
use p2panda_core::Topic;
use p2panda_store::address_book::NodeInfo as _;
use ractor::{ActorRef, call};
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{trace, warn};
use crate::NodeId;
use crate::address_book::{AddressBook, AddressBookError};
use crate::gossip::GossipConfig;
use crate::gossip::actors::ToGossipManager;
use crate::gossip::builder::Builder;
use crate::gossip::events::GossipEvent;
use crate::iroh_endpoint::Endpoint;
use crate::utils::ShortFormat;
/// Per-topic bookkeeping kept by [`Gossip`]: for every topic the map stores the two channel
/// senders obtained when subscribing plus a [`TopicDropGuard`] sharing the topic's counter.
type GossipSenders = HashMap<
Topic,
(
// Sender passed to `GossipHandle` as `to_topic_tx`; `publish` pushes outgoing bytes into it.
mpsc::Sender<Vec<u8>>,
// Broadcast sender from which every `GossipSubscription` obtains its receiver.
broadcast::Sender<Vec<u8>>,
// Stored through `clone_without_increment`, so the map entry itself is not counted.
TopicDropGuard,
),
>;
/// Entry point of the gossip API: opens per-topic handles via `stream` and exposes the gossip
/// event receiver via `events`.
///
/// Clones share the same `inner` and `senders` through `Arc`s.
#[derive(Clone, Debug)]
pub struct Gossip {
/// Our own node id; it is filtered out of the node ids sent along with a subscribe request.
my_node_id: NodeId,
/// Queried for the nodes known for a topic when a new subscription is created.
address_book: AddressBook,
/// Shared reference to the gossip manager actor.
inner: Arc<RwLock<Inner>>,
/// Channel senders and guard for every topic subscribed through this instance (or a clone).
senders: Arc<RwLock<GossipSenders>>,
/// Gossip configuration; `max_message_size` is copied into every handle.
config: GossipConfig,
}
/// Holds the reference to the gossip manager actor. The `Drop` implementation of this type
/// notifies the actor when the value goes away.
#[derive(Debug)]
struct Inner {
/// Reference used for all requests to the gossip manager.
actor_ref: ActorRef<ToGossipManager>,
}
impl Gossip {
    /// Creates the gossip API around the given gossip manager actor reference.
    ///
    /// `my_node_id` is our own node id, `address_book` is used to look up nodes per topic and
    /// `config` supplies the maximum message size enforced by every [`GossipHandle`].
    pub(crate) fn new(
        actor_ref: ActorRef<ToGossipManager>,
        my_node_id: NodeId,
        address_book: AddressBook,
        config: GossipConfig,
    ) -> Self {
        Self {
            my_node_id,
            address_book,
            inner: Arc::new(RwLock::new(Inner { actor_ref })),
            senders: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Returns a [`Builder`] configured with the given address book and endpoint.
    pub fn builder(address_book: AddressBook, endpoint: Endpoint) -> Builder {
        Builder::new(address_book, endpoint)
    }

    /// Returns a [`GossipHandle`] for `topic`.
    ///
    /// If senders for the topic are already registered and their guard still reports
    /// subscriptions, the existing channels are reused and the shared counter is incremented.
    /// Otherwise the node ids the address book knows for this topic (excluding our own) are
    /// sent along with a `Subscribe` request to the gossip manager; the returned senders are
    /// stored for later calls, replacing any previous entry for the topic.
    ///
    /// # Errors
    ///
    /// Returns [`GossipError::AddressBook`] when the address book lookup fails and
    /// [`GossipError::ActorRpc`] when the request to the gossip manager fails.
    pub async fn stream(&self, topic: Topic) -> Result<GossipHandle, GossipError> {
        let max_message_size = self.config.max_message_size;

        // Fast path: reuse the channels of a topic which still has counted guards alive. An
        // entry whose counter dropped to zero is ignored here and overwritten further below.
        //
        // NOTE(review): the `has_subscriptions` check and the `clone` are two separate atomic
        // operations; the last guard could be dropped in between. Confirm whether this window
        // matters in practice.
        if let Some((to_gossip_tx, from_gossip_tx, guard)) = self.senders.read().await.get(&topic)
            && guard.has_subscriptions()
        {
            return Ok(GossipHandle::new(
                topic,
                max_message_size,
                to_gossip_tx.clone(),
                from_gossip_tx.clone(),
                guard.clone(),
            ));
        }

        let inner = self.inner.read().await;

        // Collect all nodes known for this topic, leaving out ourselves.
        let node_ids = {
            let node_infos = self.address_book.node_infos_by_topics([topic]).await?;
            node_infos
                .iter()
                .filter_map(|info| {
                    let node_id = info.id();
                    (node_id != self.my_node_id).then_some(node_id)
                })
                .collect()
        };

        // Create the guard only after the address book lookup succeeded: if the lookup fails
        // nothing was subscribed, so no `Unsubscribe` must be sent by a dropped guard. It is
        // still created *before* the subscribe request, so that a failed or cancelled request
        // drops the guard and the topic gets unsubscribed again.
        let guard = TopicDropGuard::new(topic, inner.actor_ref.clone());

        let (to_gossip_tx, from_gossip_tx) =
            call!(inner.actor_ref, ToGossipManager::Subscribe, topic, node_ids)
                .map_err(Box::new)?;

        // Remember the senders for later calls. The stored guard shares the counter but is not
        // counted itself, otherwise the counter could never reach zero.
        let mut senders = self.senders.write().await;
        senders.insert(
            topic,
            (
                to_gossip_tx.clone(),
                from_gossip_tx.clone(),
                guard.clone_without_increment(),
            ),
        );

        Ok(GossipHandle::new(
            topic,
            max_message_size,
            to_gossip_tx,
            from_gossip_tx,
            guard,
        ))
    }

    /// Requests a receiver of [`GossipEvent`]s from the gossip manager.
    ///
    /// # Errors
    ///
    /// Returns [`GossipError::ActorRpc`] when the request to the gossip manager fails.
    pub async fn events(&self) -> Result<broadcast::Receiver<GossipEvent>, GossipError> {
        let inner = self.inner.read().await;
        let result = call!(inner.actor_ref, ToGossipManager::Events).map_err(Box::new)?;
        Ok(result)
    }
}
impl Drop for Inner {
    /// Tears down the gossip manager: calls `drain_children`, sends the `Shutdown` message and
    /// finally calls `drain` on the actor reference. Failures are logged and otherwise ignored.
    fn drop(&mut self) {
        let manager = &self.actor_ref;

        trace!(
            actor_id = %manager.get_id(),
            "drop gossip actor reference"
        );

        manager.drain_children();

        match manager.send_message(ToGossipManager::Shutdown) {
            Ok(_) => {}
            Err(err) => warn!("failed to send shutdown event to gossip manager: {}", err),
        }

        match manager.drain() {
            Ok(_) => {}
            Err(err) => warn!("failed to drain gossip manager: {}", err),
        }
    }
}
/// Errors returned by the [`Gossip`] API. Every variant forwards the message of the wrapped
/// error unchanged (`#[error(transparent)]`).
#[derive(Debug, Error)]
pub enum GossipError {
/// Wraps `ractor::SpawnErr`.
#[error(transparent)]
ActorSpawn(#[from] ractor::SpawnErr),
/// A request to the gossip manager actor failed; boxed `ractor::RactorErr`.
#[error(transparent)]
ActorRpc(#[from] Box<ractor::RactorErr<ToGossipManager>>),
/// The address book reported an error.
#[error(transparent)]
AddressBook(#[from] AddressBookError),
}
/// Errors returned when publishing a message through a [`GossipHandle`].
#[derive(Debug, Error, PartialEq)]
pub enum GossipPublishError {
/// The message is larger than allowed. The tuple holds `(message size, maximum size)`, both
/// compared as byte lengths of the payload.
#[error("message size exceeds maximum limit ({} vs {})", .0.0, .0.1)]
MessageTooLarge((usize, usize)),
/// Sending into the topic channel failed; the wrapped error carries the unsent payload.
#[error(transparent)]
SendError(SendError<Vec<u8>>),
}
/// Handle for a single gossip topic, used to publish messages and to create subscriptions.
///
/// The handle owns a [`TopicDropGuard`]; cloning the handle clones the guard and therefore
/// increments the topic's counter.
#[derive(Clone, Debug)]
pub struct GossipHandle {
/// Topic this handle belongs to.
topic: Topic,
/// Largest payload length accepted by `publish`.
max_message_size: usize,
/// Channel for outgoing messages.
to_topic_tx: mpsc::Sender<Vec<u8>>,
/// Broadcast sender used to create receivers for new subscriptions.
from_gossip_tx: broadcast::Sender<Vec<u8>>,
/// Counts this handle for the topic for as long as it lives.
_guard: TopicDropGuard,
}
impl GossipHandle {
    /// Assembles a handle from its parts; the guard is kept for the lifetime of the handle.
    fn new(
        topic: Topic,
        max_message_size: usize,
        to_topic_tx: mpsc::Sender<Vec<u8>>,
        from_gossip_tx: broadcast::Sender<Vec<u8>>,
        _guard: TopicDropGuard,
    ) -> Self {
        Self {
            _guard,
            from_gossip_tx,
            to_topic_tx,
            max_message_size,
            topic,
        }
    }

    /// Publishes a message to the topic.
    ///
    /// # Errors
    ///
    /// Returns [`GossipPublishError::MessageTooLarge`] when the payload is longer than the
    /// configured maximum and [`GossipPublishError::SendError`] when the channel rejects it.
    pub async fn publish(&self, bytes: impl Into<Vec<u8>>) -> Result<(), GossipPublishError> {
        let payload: Vec<u8> = bytes.into();
        let limit = self.max_message_size;
        let size = payload.len();

        if size <= limit {
            match self.to_topic_tx.send(payload).await {
                Ok(()) => Ok(()),
                Err(err) => Err(GossipPublishError::SendError(err)),
            }
        } else {
            Err(GossipPublishError::MessageTooLarge((size, limit)))
        }
    }

    /// Creates a new subscription for the topic. The subscription carries its own clone of
    /// the guard and is therefore counted separately from this handle.
    pub fn subscribe(&self) -> GossipSubscription {
        let receiver = self.from_gossip_tx.subscribe();
        let guard = self._guard.clone();
        GossipSubscription::new(self.topic, receiver, guard)
    }

    /// Returns the topic of this handle.
    pub fn topic(&self) -> Topic {
        let Self { topic, .. } = self;
        *topic
    }
}
/// Stream of messages received for a single topic.
///
/// Implements `Stream` and yields `Result<Vec<u8>, BroadcastStreamRecvError>` items.
#[derive(Debug)]
pub struct GossipSubscription {
/// Topic this subscription belongs to.
topic: Topic,
/// Broadcast receiver wrapped as a stream.
from_topic_rx: BroadcastStream<Vec<u8>>,
/// Counts this subscription for the topic for as long as it lives.
_guard: TopicDropGuard,
}
impl GossipSubscription {
    /// Wraps the broadcast receiver into a stream and ties the subscription to the guard.
    fn new(
        topic: Topic,
        from_topic_rx: broadcast::Receiver<Vec<u8>>,
        _guard: TopicDropGuard,
    ) -> Self {
        let from_topic_rx = BroadcastStream::new(from_topic_rx);
        Self {
            topic,
            from_topic_rx,
            _guard,
        }
    }

    /// Returns the topic of this subscription.
    pub fn topic(&self) -> Topic {
        let Self { topic, .. } = self;
        *topic
    }
}
impl Stream for GossipSubscription {
    type Item = Result<Vec<u8>, BroadcastStreamRecvError>;

    /// Forwards polling to the wrapped broadcast stream.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let subscription = self.get_mut();
        subscription.from_topic_rx.poll_next_unpin(cx)
    }
}
/// Reference-counting guard for a topic.
///
/// Cloning increments the shared counter and dropping decrements it; when the last counted
/// guard is dropped an `Unsubscribe` message for the topic is sent to the gossip manager.
#[derive(Debug)]
struct TopicDropGuard {
/// Topic to unsubscribe from.
topic: Topic,
/// Counter shared by all guards created from the same original guard.
counter: Arc<AtomicUsize>,
/// Receiver of the `Unsubscribe` message.
actor_ref: ActorRef<ToGossipManager>,
/// When set, dropping this guard neither touches the counter nor sends a message.
ignore_drop: bool,
}
/// Counter value of a freshly created guard. It is also the value the counter holds right
/// before the last counted guard is dropped.
const INITIAL_COUNTER: usize = 1;
impl TopicDropGuard {
    /// Creates the first guard for a topic with the counter set to `INITIAL_COUNTER`.
    fn new(topic: Topic, actor_ref: ActorRef<ToGossipManager>) -> Self {
        let counter = Arc::new(AtomicUsize::new(INITIAL_COUNTER));

        trace!(
            topic = topic.fmt_short(),
            counter = INITIAL_COUNTER,
            actor_id = %actor_ref.get_id(),
            "new topic drop guard"
        );

        Self {
            topic,
            counter,
            actor_ref,
            ignore_drop: false,
        }
    }

    /// Returns the current value of the shared counter.
    fn counter(&self) -> usize {
        use std::sync::atomic::Ordering;

        self.counter.load(Ordering::SeqCst)
    }

    /// Returns `true` while the shared counter is at least `INITIAL_COUNTER`.
    fn has_subscriptions(&self) -> bool {
        let current = self.counter();
        current >= INITIAL_COUNTER
    }

    /// Returns a guard sharing the same counter without incrementing it. Dropping the returned
    /// guard is a no-op, while cloning it yields a regular, counted guard.
    fn clone_without_increment(&self) -> Self {
        let counter = Arc::clone(&self.counter);
        let actor_ref = self.actor_ref.clone();

        Self {
            topic: self.topic,
            counter,
            actor_ref,
            ignore_drop: true,
        }
    }
}
impl Clone for TopicDropGuard {
fn clone(&self) -> Self {
let value = self
.counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
trace!(
topic = self.topic.fmt_short(),
counter = value + 1,
actor_id = %self.actor_ref.get_id(),
"clone topic drop guard +1"
);
Self {
topic: self.topic,
counter: self.counter.clone(),
actor_ref: self.actor_ref.clone(),
ignore_drop: false,
}
}
}
impl Drop for TopicDropGuard {
    /// Decrements the shared counter and, if this was the last counted guard, asks the gossip
    /// manager to unsubscribe from the topic. Guards flagged with `ignore_drop` do nothing.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        if self.ignore_drop {
            return;
        }

        // `fetch_sub` hands back the value *before* the decrement.
        let before = self.counter.fetch_sub(1, Ordering::SeqCst);

        trace!(
            topic = self.topic.fmt_short(),
            counter = before - 1,
            actor_id = %self.actor_ref.get_id(),
            "drop topic drop guard -1"
        );

        // Other counted guards are still alive, nothing more to do.
        if before != INITIAL_COUNTER {
            return;
        }

        trace!(
            topic = self.topic.fmt_short(),
            actor_id = %self.actor_ref.get_id(),
            "send unsubscribe message"
        );

        // Best effort: a failure to deliver the message is deliberately ignored.
        let unsubscribe = ToGossipManager::Unsubscribe(self.topic);
        let _ = self.actor_ref.send_message(unsubscribe);
    }
}
#[cfg(test)]
mod tests {
use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};
use crate::gossip::GossipConfig;
use crate::gossip::actors::GossipManager;
use crate::{AddressBook, Endpoint};
use super::TopicDropGuard;
/// Checks that cloning a guard increments the shared counter and dropping a clone
/// decrements it again.
#[tokio::test]
async fn topic_drop_guard() {
// Spawn a gossip manager only to obtain an actor reference for the guard.
let (actor_ref, _) = {
let address_book = AddressBook::builder().spawn().await.unwrap();
let endpoint = Endpoint::builder(address_book.clone())
.spawn()
.await
.unwrap();
let thread_pool = ThreadLocalActorSpawner::new();
let args = (GossipConfig::default(), address_book, endpoint);
GossipManager::spawn(None, args, thread_pool).await.unwrap()
};
// A fresh guard starts at one.
let guard_1 = TopicDropGuard::new([1; 32].into(), actor_ref);
assert_eq!(guard_1.counter(), 1);
// Every clone adds one to the shared counter.
let _guard_2 = guard_1.clone();
assert_eq!(guard_1.counter(), 2);
let guard_3 = guard_1.clone();
assert_eq!(guard_1.counter(), 3);
// Dropping a clone subtracts one again.
drop(guard_3);
assert_eq!(guard_1.counter(), 2);
}
}