use super::*;
use crate::actor::{Actor, ActorProcessingErr, ActorRef};
use crate::graph::AsyncBoundaryExecutionConfig;
pub(crate) enum AsyncBoundaryMessage<T> {
Item(T),
Done,
Failed(StreamError),
}
pub(crate) enum RactorBoundaryCommand {
Run,
}
#[cfg(feature = "cluster")]
impl ractor::Message for RactorBoundaryCommand {}
pub(crate) struct RactorBoundarySourceActor<I, T> {
_marker: PhantomData<fn() -> (I, T)>,
}
impl<I, T> RactorBoundarySourceActor<I, T> {
pub(crate) fn new() -> Self {
Self {
_marker: PhantomData,
}
}
}
pub(crate) struct RactorBoundarySourceState<I, T> {
pub(crate) input: Option<I>,
pub(crate) output: ractor::concurrency::MpscSender<AsyncBoundaryMessage<T>>,
}
impl<I, T> Actor for RactorBoundarySourceActor<I, T>
where
I: Iterator<Item = StreamResult<T>> + Send + 'static,
T: Send + 'static,
{
type Msg = RactorBoundaryCommand;
type State = RactorBoundarySourceState<I, T>;
type Arguments = RactorBoundarySourceState<I, T>;
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(args)
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
RactorBoundaryCommand::Run => {
let input = state.input.take().ok_or_else(|| {
actor_processing_error(StreamError::GraphValidation(
"ractor async boundary source actor was run more than once".into(),
))
})?;
feed_ractor_boundary_input(input, &state.output)
.await
.map_err(actor_processing_error)?;
myself.stop(None);
}
}
Ok(())
}
}
struct LinearAsyncBoundaryStream<T> {
receiver: ractor::concurrency::MpscReceiver<AsyncBoundaryMessage<T>>,
actor: Option<ActorRef<RactorBoundaryCommand>>,
handle: Option<ractor::concurrency::JoinHandle<()>>,
cancelled: Arc<AtomicBool>,
terminated: bool,
}
impl<T> Iterator for LinearAsyncBoundaryStream<T>
where
T: Send + 'static,
{
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
match recv_boundary_message(&mut self.receiver) {
Some(AsyncBoundaryMessage::Item(item)) => Some(Ok(item)),
Some(AsyncBoundaryMessage::Done) => {
self.terminated = true;
self.join_actor().err().map(Err)
}
Some(AsyncBoundaryMessage::Failed(error)) => {
self.terminated = true;
let _ = self.join_actor();
Some(Err(error))
}
None => {
self.terminated = true;
let _ = self.join_actor();
Some(Err(StreamError::AbruptTermination))
}
}
}
}
impl<T> LinearAsyncBoundaryStream<T> {
fn join_actor(&mut self) -> StreamResult<()> {
self.cancelled.store(true, Ordering::SeqCst);
if let Some(actor) = self.actor.take() {
actor.stop(None);
}
if let Some(handle) = self.handle.take() {
block_on_ractor_boundary(async move { join_ractor_boundary_actor(handle).await })
} else {
Ok(())
}
}
}
impl<T> Drop for LinearAsyncBoundaryStream<T> {
fn drop(&mut self) {
self.cancelled.store(true, Ordering::SeqCst);
if let Some(actor) = self.actor.take() {
actor.stop(None);
}
}
}
pub(super) fn linear_async_boundary_stream<T>(
input: BoxStream<T>,
materializer: &Materializer,
config: AsyncBoundaryExecutionConfig,
) -> StreamResult<BoxStream<T>>
where
T: Send + 'static,
{
if config.buffer_size == 0 {
return Err(StreamError::GraphValidation(
"linear async boundary execution requires buffer_size greater than zero".into(),
));
}
let (sender, receiver) = ractor::concurrency::mpsc_bounded(config.buffer_size);
let cancelled = Arc::new(AtomicBool::new(false));
let input = runtime_checked_stream(
input,
Arc::clone(&materializer.inner.state),
Some(Arc::clone(&cancelled)),
);
let (actor, handle) = block_on_ractor_boundary(async move {
Actor::spawn(
None,
RactorBoundarySourceActor::<_, T>::new(),
RactorBoundarySourceState {
input: Some(input),
output: sender,
},
)
.await
.map_err(ractor_spawn_error)
})?;
if actor.send_message(RactorBoundaryCommand::Run).is_err() {
actor.stop(None);
let _ = block_on_ractor_boundary(async move { join_ractor_boundary_actor(handle).await });
return Err(StreamError::AbruptTermination);
}
Ok(Box::new(LinearAsyncBoundaryStream {
receiver,
actor: Some(actor),
handle: Some(handle),
cancelled,
terminated: false,
}))
}
pub(crate) fn ractor_boundary_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
static RUNTIME: OnceLock<Result<tokio::runtime::Runtime, String>> = OnceLock::new();
match RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.build()
.map_err(|error| format!("ractor async boundary runtime failed to start: {error}"))
}) {
Ok(runtime) => Ok(runtime),
Err(error) => Err(StreamError::Failed(error.clone())),
}
}
pub(crate) fn block_on_ractor_boundary<F, T>(future: F) -> StreamResult<T>
where
F: Future<Output = StreamResult<T>> + Send,
T: Send,
{
let runtime = ractor_boundary_runtime()?;
if tokio::runtime::Handle::try_current().is_ok() {
thread::scope(|scope| {
let handle = scope.spawn(move || runtime.block_on(future));
handle.join().map_err(|_| {
StreamError::Failed("ractor async boundary runtime thread panicked".into())
})?
})
} else {
runtime.block_on(future)
}
}
pub(crate) async fn feed_ractor_boundary_input<I, T>(
input: I,
output: &ractor::concurrency::MpscSender<AsyncBoundaryMessage<T>>,
) -> StreamResult<()>
where
I: Iterator<Item = StreamResult<T>>,
{
for item in input {
match item {
Ok(item) => output
.send(AsyncBoundaryMessage::Item(item))
.await
.map_err(|_| StreamError::AbruptTermination)?,
Err(error) => {
let _ = output
.send(AsyncBoundaryMessage::Failed(error.clone()))
.await;
return Err(error);
}
}
}
output
.send(AsyncBoundaryMessage::Done)
.await
.map_err(|_| StreamError::AbruptTermination)
}
fn recv_boundary_message<T>(
receiver: &mut ractor::concurrency::MpscReceiver<AsyncBoundaryMessage<T>>,
) -> Option<AsyncBoundaryMessage<T>>
where
T: Send,
{
if tokio::runtime::Handle::try_current().is_ok() {
thread::scope(|scope| {
let handle = scope.spawn(|| receiver.blocking_recv());
handle.join().ok().flatten()
})
} else {
receiver.blocking_recv()
}
}
async fn join_ractor_boundary_actor(
handle: ractor::concurrency::JoinHandle<()>,
) -> StreamResult<()> {
handle.await.map_err(|error| {
StreamError::Failed(format!("ractor async boundary actor task failed: {error}"))
})
}
fn ractor_spawn_error(error: ractor::SpawnErr) -> StreamError {
StreamError::Failed(format!(
"ractor async boundary actor failed to spawn: {error}"
))
}
fn actor_processing_error(error: StreamError) -> ActorProcessingErr {
Box::new(error)
}