use std::fmt::Debug;
use std::marker::PhantomData;
use futures_util::{Stream, StreamExt};
use p2panda_sync::FromSync;
use ractor::thread_local::ThreadLocalActor;
use ractor::{ActorProcessingErr, ActorRef};
use tokio::sync::broadcast;
pub enum ToSyncPoller {
WaitForEvent,
}
pub struct SyncPollerState<St, Ev> {
stream: St,
sender: broadcast::Sender<FromSync<Ev>>,
}
pub struct SyncPoller<S, Ev> {
_marker: PhantomData<(S, Ev)>,
}
impl<St, Ev> Default for SyncPoller<St, Ev> {
fn default() -> Self {
Self {
_marker: Default::default(),
}
}
}
impl<St, Ev> ThreadLocalActor for SyncPoller<St, Ev>
where
St: Stream<Item = FromSync<Ev>> + Send + Unpin + 'static,
Ev: Debug + Send + 'static,
{
type State = SyncPollerState<St, Ev>;
type Msg = ToSyncPoller;
type Arguments = (St, broadcast::Sender<FromSync<Ev>>);
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (stream, sender) = args;
let _ = myself.cast(ToSyncPoller::WaitForEvent);
Ok(SyncPollerState { stream, sender })
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
_message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
while let Some(event) = state.stream.next().await {
state.sender.send(event).map_err(|err| err.to_string())?;
}
Ok(())
}
}