use std::marker::PhantomData;
use std::pin::Pin;
use ractor::cast;
use ractor::ActorCell;
use ractor::ActorProcessingErr;
use ractor::ActorRef;
use ractor::SpawnErr;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use crate::streams::{spawn_loop, IterationResult, Operation};
#[cfg(test)]
mod tests;
/// Mutable state carried by a [`Streamer`] between iterations of its loop.
///
/// * `S` is the stream being drained.
/// * `T` is the message type accepted by the receiving actor.
/// * `F` converts each polled value (`Some(item)`, or `None` once the stream
///   is exhausted) into a `T`.
struct StreamerState<S, T, F>
where
    S: Stream + ractor::State,
    T: ractor::Message,
    F: Fn(Option<S::Item>) -> T + ractor::State,
{
    /// The source stream, boxed and pinned so it can be polled through `&mut`.
    stream: Pin<Box<S>>,
    /// Mapping applied to every value polled from `stream`.
    fn_map: F,
    /// Actor that receives the mapped messages.
    who: ActorRef<T>,
}
/// Stateless handle that implements [`Operation`] for a stream pump.
///
/// The value itself stores no data: everything the pump needs lives in
/// [`StreamerState`]. The fields only tie the generic parameters `S`, `T` and
/// `F` to the type so the `Operation` implementation can name them.
///
/// The markers are `PhantomData<fn() -> X>` rather than `PhantomData<X>`.
/// `PhantomData<X>` is `Sync` only when `X` is `Sync`, whereas a function
/// pointer type is `Send + Sync` for any `X`. With this form the compiler
/// derives `Send` and `Sync` for `Streamer` on its own, so no hand-written
/// `unsafe impl Sync` is required.
struct Streamer<S, T, F>
where
    S: Stream + ractor::State,
    T: ractor::Message,
    F: Fn(Option<S::Item>) -> T + ractor::State,
{
    _s: PhantomData<fn() -> S>,
    _t: PhantomData<fn() -> T>,
    _f: PhantomData<fn() -> F>,
}
#[async_trait::async_trait]
impl<S, T, F> Operation for Streamer<S, T, F>
where
    S: Stream + ractor::State,
    T: ractor::Message,
    F: Fn(Option<S::Item>) -> T + ractor::State,
{
    type State = StreamerState<S, T, F>;

    /// Runs one pump iteration.
    ///
    /// Polls the next value from `state.stream`, converts it with
    /// `state.fn_map` and casts the resulting message to `state.who`.
    /// The conversion and cast also happen when the stream yields `None`,
    /// so the receiver is sent a message for the end of the stream as well.
    ///
    /// # Returns
    ///
    /// `IterationResult::End` when the polled value was `None`, otherwise
    /// `IterationResult::Continue`.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `cast!` when the message cannot be
    /// delivered to the receiver.
    async fn work(&self, state: &mut Self::State) -> Result<IterationResult, ActorProcessingErr> {
        let next = state.stream.next().await;
        // Record exhaustion before `next` is moved into the mapping function.
        let last = next.is_none();
        tracing::trace!("Streamer forwarding item: last {last}");

        let message = (state.fn_map)(next);
        cast!(state.who, message)?;

        if last {
            return Ok(IterationResult::End);
        }
        Ok(IterationResult::Continue)
    }
}
/// Spawns a loop that forwards the contents of `stream` to `receiver`.
///
/// Each value polled from the stream is passed through `fn_map` and the
/// resulting message is cast to `receiver`. `fn_map` receives `Some(item)` for
/// stream items and `None` when the stream is exhausted.
///
/// # Arguments
///
/// * `stream` - the stream to drain; it is boxed and pinned internally.
/// * `receiver` - the actor that receives the mapped messages.
/// * `fn_map` - converts each polled value into a message of type `T`.
/// * `supervisor` - cell handed to `spawn_loop` as the supervisor; when `None`,
///   the cell of `receiver` is used instead.
///
/// # Errors
///
/// Returns the `SpawnErr` reported by `spawn_loop`.
pub async fn spawn_stream_pump<S, T, F>(
    stream: S,
    receiver: ActorRef<T>,
    fn_map: F,
    supervisor: Option<ActorCell>,
) -> Result<ActorCell, SpawnErr>
where
    S: Stream + ractor::State,
    T: ractor::Message,
    F: Fn(Option<<S as Stream>::Item>) -> T + ractor::State,
{
    // Resolve the supervisor first: `receiver` is moved into the state below.
    let supervising_cell = match supervisor {
        Some(cell) => cell,
        None => receiver.get_cell(),
    };

    let state = StreamerState::<S, T, F> {
        stream: Box::pin(stream),
        fn_map,
        who: receiver,
    };
    let operation = Streamer::<S, T, F> {
        _s: PhantomData,
        _t: PhantomData,
        _f: PhantomData,
    };

    spawn_loop(operation, state, Some(supervising_cell)).await
}