use std::fmt::Debug;
use futures_util::{Stream, StreamExt};
use p2panda_core::Topic;
use p2panda_sync::FromSync;
use ractor::{ActorRef, call};
use thiserror::Error;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use crate::sync::actors::{ToSyncManager, ToTopicManager};
#[derive(Debug)]
pub struct SyncHandle<M, E>
where
M: Clone + Send + 'static,
E: Clone + Send + 'static,
{
topic: Topic,
manager_ref: ActorRef<ToSyncManager<M, E>>,
topic_manager_ref: ActorRef<ToTopicManager<M>>,
}
impl<M, E> SyncHandle<M, E>
where
M: Clone + Send + 'static,
E: Clone + Send + 'static,
{
pub(crate) fn new(
topic: Topic,
manager_ref: ActorRef<ToSyncManager<M, E>>,
topic_manager_ref: ActorRef<ToTopicManager<M>>,
) -> Self {
Self {
topic,
manager_ref,
topic_manager_ref,
}
}
pub async fn publish(&self, data: M) -> Result<(), SyncHandleError<M, E>> {
self.topic_manager_ref
.send_message(ToTopicManager::Publish(data))
.map_err(Box::new)?;
Ok(())
}
pub async fn subscribe(&self) -> Result<SyncSubscription<E>, SyncHandleError<M, E>> {
if let Some(stream) =
call!(self.manager_ref, ToSyncManager::Subscribe, self.topic).map_err(Box::new)?
{
Ok(SyncSubscription::<E>::new(self.topic, stream))
} else {
Err(SyncHandleError::StreamNotFound)
}
}
pub fn topic(&self) -> Topic {
self.topic
}
#[cfg(test)]
pub fn initiate_session(&self, node_id: crate::NodeId) {
self.manager_ref
.send_message(ToSyncManager::InitiateSync(self.topic, node_id))
.unwrap();
}
}
impl<M, E> Drop for SyncHandle<M, E>
where
M: Clone + Send + 'static,
E: Clone + Send + 'static,
{
fn drop(&mut self) {
let _ = self
.manager_ref
.send_message(ToSyncManager::Close(self.topic));
}
}
pub struct SyncSubscription<E> {
topic: Topic,
from_sync_rx: BroadcastStream<FromSync<E>>,
}
impl<E> SyncSubscription<E>
where
E: Clone + Send + 'static,
{
pub(crate) fn new(topic: Topic, from_sync_rx: broadcast::Receiver<FromSync<E>>) -> Self {
Self {
topic,
from_sync_rx: BroadcastStream::new(from_sync_rx),
}
}
pub fn topic(&self) -> Topic {
self.topic
}
}
impl<E> Stream for SyncSubscription<E>
where
E: Clone + Send + 'static,
{
type Item = Result<FromSync<E>, BroadcastStreamRecvError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.from_sync_rx.poll_next_unpin(cx)
}
}
#[derive(Debug, Error)]
pub enum SyncHandleError<M, E> {
#[error(transparent)]
ActorRpc(#[from] Box<ractor::RactorErr<ToSyncManager<M, E>>>),
#[error(transparent)]
Publish(#[from] Box<ractor::MessagingErr<ToTopicManager<M>>>),
#[error("no stream exists for the given topic")]
StreamNotFound,
}