use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use p2panda_core::{Extensions, Hash, LogId, Operation, Topic, VerifyingKey};
use p2panda_store::logs::LogStore;
use p2panda_store::topics::TopicStore;
use p2panda_sync::protocols::TopicLogSyncEvent;
use ractor::{ActorRef, call};
use thiserror::Error;
use tokio::sync::RwLock;
use crate::gossip::Gossip;
use crate::iroh_endpoint::Endpoint;
use crate::sync::actors::ToSyncManager;
use crate::sync::handle::SyncHandle;
use crate::sync::log_sync::Builder;
#[derive(Clone, Debug)]
pub struct LogSync<S, L, E>
where
S: LogStore<Operation<E>, VerifyingKey, L, u64, Hash>
+ TopicStore<Topic, VerifyingKey, L>
+ Clone
+ Send
+ 'static,
L: LogId + Debug + Send + 'static,
E: Extensions + Send + 'static,
{
inner: Arc<RwLock<Inner<E>>>,
_phantom: PhantomData<(S, L)>,
}
#[derive(Debug)]
struct Inner<E>
where
E: Extensions + Send + 'static,
{
#[allow(clippy::type_complexity)]
actor_ref: ActorRef<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>,
}
impl<S, L, E> LogSync<S, L, E>
where
S: LogStore<Operation<E>, VerifyingKey, L, u64, Hash>
+ TopicStore<Topic, VerifyingKey, L>
+ Clone
+ Send
+ 'static,
L: LogId + Debug + Send + 'static,
E: Extensions + Send + 'static,
{
#[allow(clippy::type_complexity)]
pub(crate) fn new(
actor_ref: ActorRef<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>,
) -> Self {
Self {
inner: Arc::new(RwLock::new(Inner { actor_ref })),
_phantom: PhantomData,
}
}
pub fn builder(store: S, endpoint: Endpoint, gossip: Gossip) -> Builder<S, L, E> {
Builder::<S, L, E>::new(store, endpoint, gossip)
}
pub async fn stream(
&self,
topic: Topic,
live_mode: bool,
) -> Result<SyncHandle<Operation<E>, TopicLogSyncEvent<E>>, LogSyncError<E>> {
let inner = self.inner.read().await;
let sync_manager_ref =
call!(inner.actor_ref, ToSyncManager::Create, topic, live_mode).map_err(Box::new)?;
Ok(SyncHandle::new(
topic,
inner.actor_ref.clone(),
sync_manager_ref,
))
}
}
impl<E> Drop for Inner<E>
where
E: Extensions + Send + 'static,
{
fn drop(&mut self) {
self.actor_ref.stop(None);
}
}
#[derive(Debug, Error)]
pub enum LogSyncError<E> {
#[error(transparent)]
ActorSpawn(#[from] ractor::SpawnErr),
#[error(transparent)]
ActorRpc(#[from] Box<ractor::RactorErr<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>>),
}