datum-core 0.6.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use super::*;
use crate::actor::{Actor, ActorProcessingErr, ActorRef};
use crate::graph::AsyncBoundaryExecutionConfig;

/// Message sent over the boundary channel from the producing side to the
/// consuming side of an async boundary.
pub(crate) enum AsyncBoundaryMessage<T> {
    /// One successfully produced stream element.
    Item(T),
    /// The input iterator was exhausted without yielding an error.
    Done,
    /// The input iterator yielded an error; carries that error.
    Failed(StreamError),
}

/// Commands understood by the boundary source actor.
pub(crate) enum RactorBoundaryCommand {
    /// Take the actor's input iterator and feed it into the output channel.
    Run,
}

// Explicit (empty) `ractor::Message` impl, compiled only with the `cluster` feature.
// NOTE(review): presumably ractor provides a blanket `Message` impl only when
// `cluster` is disabled, which is why this is feature-gated — confirm against ractor docs.
#[cfg(feature = "cluster")]
impl ractor::Message for RactorBoundaryCommand {}

/// Actor type that drives an input iterator `I` yielding elements of type `T`.
///
/// The value itself stores no data; the iterator and the output channel are
/// carried by the actor's state.
pub(crate) struct RactorBoundarySourceActor<I, T> {
    // Ties `I` and `T` to the type without storing a value of either.
    _marker: PhantomData<fn() -> (I, T)>,
}

impl<I, T> RactorBoundarySourceActor<I, T> {
    /// Builds the (data-less) actor value.
    pub(crate) fn new() -> Self {
        let _marker = PhantomData;
        Self { _marker }
    }
}

/// State of the boundary source actor; the same type is also used as the
/// actor's spawn arguments.
pub(crate) struct RactorBoundarySourceState<I, T> {
    /// Input iterator. It is taken (leaving `None`) when `Run` is handled, so a
    /// second `Run` finds nothing to drive and is reported as an error.
    pub(crate) input: Option<I>,
    /// Sending half of the channel that boundary messages are written to.
    pub(crate) output: ractor::concurrency::MpscSender<AsyncBoundaryMessage<T>>,
}

impl<I, T> Actor for RactorBoundarySourceActor<I, T>
where
    I: Iterator<Item = StreamResult<T>> + Send + 'static,
    T: Send + 'static,
{
    type Msg = RactorBoundaryCommand;
    type State = RactorBoundarySourceState<I, T>;
    type Arguments = RactorBoundarySourceState<I, T>;

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(args)
    }

    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            RactorBoundaryCommand::Run => {
                let input = state.input.take().ok_or_else(|| {
                    actor_processing_error(StreamError::GraphValidation(
                        "ractor async boundary source actor was run more than once".into(),
                    ))
                })?;
                feed_ractor_boundary_input(input, &state.output)
                    .await
                    .map_err(actor_processing_error)?;
                myself.stop(None);
            }
        }
        Ok(())
    }
}

/// Consuming side of a linear async boundary: iterates over the messages that
/// the boundary source actor writes to the channel.
struct LinearAsyncBoundaryStream<T> {
    /// Receiving half of the boundary channel.
    receiver: ractor::concurrency::MpscReceiver<AsyncBoundaryMessage<T>>,
    /// Reference to the source actor; taken when a stop is requested (join or drop).
    actor: Option<ActorRef<RactorBoundaryCommand>>,
    /// Join handle of the source actor; taken when the actor is joined.
    handle: Option<ractor::concurrency::JoinHandle<()>>,
    /// Flag set to `true` when the stream is joined or dropped.
    cancelled: Arc<AtomicBool>,
    /// Set once a terminal outcome was observed; `next` returns `None` afterwards.
    terminated: bool,
}

impl<T> Iterator for LinearAsyncBoundaryStream<T>
where
    T: Send + 'static,
{
    type Item = StreamResult<T>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.terminated {
            return None;
        }

        match recv_boundary_message(&mut self.receiver) {
            Some(AsyncBoundaryMessage::Item(item)) => Some(Ok(item)),
            Some(AsyncBoundaryMessage::Done) => {
                self.terminated = true;
                self.join_actor().err().map(Err)
            }
            Some(AsyncBoundaryMessage::Failed(error)) => {
                self.terminated = true;
                let _ = self.join_actor();
                Some(Err(error))
            }
            None => {
                self.terminated = true;
                let _ = self.join_actor();
                Some(Err(StreamError::AbruptTermination))
            }
        }
    }
}

impl<T> LinearAsyncBoundaryStream<T> {
    /// Flags cancellation, asks the actor to stop, and blocks until the actor's
    /// join handle resolves.
    ///
    /// Returns `Ok(())` immediately if the handle was already taken.
    fn join_actor(&mut self) -> StreamResult<()> {
        self.cancelled.store(true, Ordering::SeqCst);

        if let Some(actor) = self.actor.take() {
            actor.stop(None);
        }

        let Some(handle) = self.handle.take() else {
            return Ok(());
        };
        block_on_ractor_boundary(join_ractor_boundary_actor(handle))
    }
}

/// Dropping the stream flags cancellation and requests the actor to stop.
/// The join handle is not awaited here.
impl<T> Drop for LinearAsyncBoundaryStream<T> {
    fn drop(&mut self) {
        self.cancelled.store(true, Ordering::SeqCst);

        // Nothing to stop if the actor reference was already taken.
        let Some(actor) = self.actor.take() else {
            return;
        };
        actor.stop(None);
    }
}

/// Moves `input` behind an async boundary: spawns a source actor that feeds the
/// input into a bounded channel and returns a stream reading from that channel.
///
/// # Errors
/// - `StreamError::GraphValidation` if `config.buffer_size` is zero.
/// - The spawn error (or runtime start-up error) if the actor cannot be spawned.
/// - `StreamError::AbruptTermination` if the initial `Run` command cannot be
///   delivered to the actor.
pub(super) fn linear_async_boundary_stream<T>(
    input: BoxStream<T>,
    materializer: &Materializer,
    config: AsyncBoundaryExecutionConfig,
) -> StreamResult<BoxStream<T>>
where
    T: Send + 'static,
{
    let capacity = config.buffer_size;
    if capacity == 0 {
        return Err(StreamError::GraphValidation(
            "linear async boundary execution requires buffer_size greater than zero".into(),
        ));
    }

    let cancelled = Arc::new(AtomicBool::new(false));
    let (sender, receiver) = ractor::concurrency::mpsc_bounded(capacity);

    // The stream shares the `cancelled` flag with the wrapped input.
    let checked_input = runtime_checked_stream(
        input,
        Arc::clone(&materializer.inner.state),
        Some(Arc::clone(&cancelled)),
    );
    let initial_state = RactorBoundarySourceState {
        input: Some(checked_input),
        output: sender,
    };

    let spawn_actor = async move {
        Actor::spawn(None, RactorBoundarySourceActor::<_, T>::new(), initial_state)
            .await
            .map_err(ractor_spawn_error)
    };
    let (actor, handle) = block_on_ractor_boundary(spawn_actor)?;

    // Kick off the feed; if the command cannot be delivered, tear the actor down again.
    if actor.send_message(RactorBoundaryCommand::Run).is_err() {
        actor.stop(None);
        let _ = block_on_ractor_boundary(join_ractor_boundary_actor(handle));
        return Err(StreamError::AbruptTermination);
    }

    let stream = LinearAsyncBoundaryStream {
        receiver,
        actor: Some(actor),
        handle: Some(handle),
        cancelled,
        terminated: false,
    };
    Ok(Box::new(stream))
}

/// Returns the process-wide tokio runtime used for boundary actors, building it
/// on first use.
///
/// # Errors
/// If the runtime failed to build, the stored failure message is returned as
/// `StreamError::Failed` on this and every later call.
pub(crate) fn ractor_boundary_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
    static RUNTIME: OnceLock<Result<tokio::runtime::Runtime, String>> = OnceLock::new();

    let build_runtime = || {
        tokio::runtime::Builder::new_multi_thread()
            .build()
            .map_err(|error| format!("ractor async boundary runtime failed to start: {error}"))
    };

    RUNTIME
        .get_or_init(build_runtime)
        .as_ref()
        .map_err(|message| StreamError::Failed(message.clone()))
}

/// Runs `future` to completion on the boundary runtime and returns its result.
///
/// When the caller is already inside a tokio runtime context, the blocking call
/// is made from a scoped helper thread instead of the current thread.
///
/// # Errors
/// Propagates the runtime start-up error and the future's own error; a panic on
/// the helper thread is reported as `StreamError::Failed`.
pub(crate) fn block_on_ractor_boundary<F, T>(future: F) -> StreamResult<T>
where
    F: Future<Output = StreamResult<T>> + Send,
    T: Send,
{
    let runtime = ractor_boundary_runtime()?;

    // No ambient runtime: block directly on the calling thread.
    if tokio::runtime::Handle::try_current().is_err() {
        return runtime.block_on(future);
    }

    thread::scope(|scope| {
        let worker = scope.spawn(move || runtime.block_on(future));
        match worker.join() {
            Ok(outcome) => outcome,
            Err(_) => Err(StreamError::Failed(
                "ractor async boundary runtime thread panicked".into(),
            )),
        }
    })
}

/// Forwards every element of `input` to `output`, followed by a `Done` marker.
///
/// If the input yields an error, a `Failed` message carrying a clone of it is
/// sent (best effort) and the error is returned.
///
/// # Errors
/// - The input's own error, as described above.
/// - `StreamError::AbruptTermination` if sending an element or the `Done`
///   marker fails.
pub(crate) async fn feed_ractor_boundary_input<I, T>(
    input: I,
    output: &ractor::concurrency::MpscSender<AsyncBoundaryMessage<T>>,
) -> StreamResult<()>
where
    I: Iterator<Item = StreamResult<T>>,
{
    for next in input {
        let value = match next {
            Ok(value) => value,
            Err(error) => {
                // Best effort: the send result is deliberately ignored.
                let _ = output
                    .send(AsyncBoundaryMessage::Failed(error.clone()))
                    .await;
                return Err(error);
            }
        };

        if output
            .send(AsyncBoundaryMessage::Item(value))
            .await
            .is_err()
        {
            return Err(StreamError::AbruptTermination);
        }
    }

    match output.send(AsyncBoundaryMessage::Done).await {
        Ok(()) => Ok(()),
        Err(_) => Err(StreamError::AbruptTermination),
    }
}

/// Blocks until the next boundary message arrives.
///
/// When called inside a tokio runtime context, the blocking receive runs on a
/// scoped helper thread; a panic on that thread is reported as `None`.
/// `None` is otherwise whatever `blocking_recv` returns.
fn recv_boundary_message<T>(
    receiver: &mut ractor::concurrency::MpscReceiver<AsyncBoundaryMessage<T>>,
) -> Option<AsyncBoundaryMessage<T>>
where
    T: Send,
{
    // No ambient runtime: receive directly on the calling thread.
    if tokio::runtime::Handle::try_current().is_err() {
        return receiver.blocking_recv();
    }

    thread::scope(|scope| {
        let worker = scope.spawn(|| receiver.blocking_recv());
        match worker.join() {
            Ok(message) => message,
            Err(_) => None,
        }
    })
}

async fn join_ractor_boundary_actor(
    handle: ractor::concurrency::JoinHandle<()>,
) -> StreamResult<()> {
    handle.await.map_err(|error| {
        StreamError::Failed(format!("ractor async boundary actor task failed: {error}"))
    })
}

/// Converts an actor spawn failure into `StreamError::Failed`.
fn ractor_spawn_error(error: ractor::SpawnErr) -> StreamError {
    let message = format!("ractor async boundary actor failed to spawn: {error}");
    StreamError::Failed(message)
}

fn actor_processing_error(error: StreamError) -> ActorProcessingErr {
    Box::new(error)
}