use std::{
collections::VecDeque,
fmt,
marker::PhantomData,
panic::{AssertUnwindSafe, catch_unwind},
sync::{
Arc, Condvar, Mutex, MutexGuard,
atomic::{AtomicBool, Ordering},
},
thread::{self, Thread, ThreadId},
time::{Duration, Instant},
};
use crate::{
context::FlowWithContext,
stream::{BoxStream, Flow, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult},
};
use super::{
ASK_IDLE_YIELDS, ASK_MAX_PARK, ASK_READY_SPINS, ASK_TIME_REFRESH_ITERS, Actor, ActorFlow,
ActorRef, ActorResult, InFlightAsk, Message, ReplyPoll, ReplyPort, ReplyState,
block_on_ractor_runtime, panic_reason, recycle_reply_state, wait_for_ready_ask,
};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorStatus<T> {
Ok(T),
Err(StreamError),
}
impl<T> From<StreamResult<T>> for ActorStatus<T> {
fn from(value: StreamResult<T>) -> Self {
match value {
Ok(value) => Self::Ok(value),
Err(error) => Self::Err(error),
}
}
}
impl<T> ActorStatus<T> {
fn into_result(self) -> StreamResult<T> {
match self {
Self::Ok(value) => Ok(value),
Self::Err(error) => Err(error),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorSourceMessage<T> {
Element(T),
Complete,
Fail(String),
}
#[cfg(feature = "cluster")]
impl<T: Send + 'static> Message for ActorSourceMessage<T> {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorSinkMessage<T> {
Element(T),
Complete,
Fail(StreamError),
}
#[cfg(feature = "cluster")]
impl<T: Send + 'static> Message for ActorSinkMessage<T> {}
#[derive(Debug)]
pub enum ActorSinkBackpressureMessage<T, Ack> {
Init(ReplyPort<Ack>),
Element(T, ReplyPort<Ack>),
Complete,
Fail(StreamError),
}
#[cfg(feature = "cluster")]
impl<T: Send + 'static, Ack: Send + 'static> Message for ActorSinkBackpressureMessage<T, Ack> {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatchEvent {
reason: String,
}
impl WatchEvent {
#[must_use]
pub fn reason(&self) -> &str {
&self.reason
}
fn into_stream_error(self) -> StreamError {
StreamError::Failed(self.reason)
}
}
impl fmt::Display for WatchEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.reason)
}
}
pub struct ActorSource;
pub struct ActorSink;
pub struct ActorPubSub;
impl ActorFlow {
#[must_use]
pub fn ask_with_status<In, Msg, Out, F>(
actor_ref: ActorRef<Msg>,
parallelism: usize,
timeout: Duration,
make_msg: F,
) -> Flow<In, Out, NotUsed>
where
In: Send + 'static,
Msg: Message,
Out: Send + 'static,
F: Fn(In, ReplyPort<ActorStatus<Out>>) -> Msg + Send + Sync + 'static,
{
ask_flow_with_pending(
actor_ref,
parallelism,
timeout,
move |input, reply_to| (make_msg(input, reply_to), ()),
|(), reply| reply.into_result(),
)
}
#[must_use]
pub fn ask_with_context<In, Ctx, Msg, Out, F>(
actor_ref: ActorRef<Msg>,
parallelism: usize,
timeout: Duration,
make_msg: F,
) -> FlowWithContext<In, Ctx, Out, Ctx, NotUsed>
where
In: Send + 'static,
Ctx: Send + 'static,
Msg: Message,
Out: Send + 'static,
F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
{
FlowWithContext::from_flow(ask_flow_with_pending(
actor_ref,
parallelism,
timeout,
move |(input, context), reply_to| (make_msg(input, reply_to), context),
|context, reply| Ok((reply, context)),
))
}
#[must_use]
pub fn ask_with_status_and_context<In, Ctx, Msg, Out, F>(
actor_ref: ActorRef<Msg>,
parallelism: usize,
timeout: Duration,
make_msg: F,
) -> FlowWithContext<In, Ctx, Out, Ctx, NotUsed>
where
In: Send + 'static,
Ctx: Send + 'static,
Msg: Message,
Out: Send + 'static,
F: Fn(In, ReplyPort<ActorStatus<Out>>) -> Msg + Send + Sync + 'static,
{
FlowWithContext::from_flow(ask_flow_with_pending(
actor_ref,
parallelism,
timeout,
move |(input, context), reply_to| (make_msg(input, reply_to), context),
|context, reply| reply.into_result().map(|reply| (reply, context)),
))
}
#[must_use]
pub fn watch<T, Msg>(actor_ref: ActorRef<Msg>) -> Flow<T, T, NotUsed>
where
T: Send + 'static,
Msg: Message,
{
Flow::from_runtime_transform(move |input, materializer| {
let shared = Arc::new(WatchShared::default());
let (monitor_ref, _handle) =
spawn_watch_monitor(actor_ref.get_cell(), Arc::clone(&shared))?;
let producer_shared = Arc::clone(&shared);
let producer_completion = materializer.spawn_stream(move |cancelled| {
run_watch_upstream(input, producer_shared, cancelled)
});
Ok(Box::new(WatchStream {
shared,
monitor_ref: Some(monitor_ref),
producer_completion: Some(producer_completion),
terminated: false,
}) as BoxStream<T>)
})
}
}
impl ActorSource {
#[must_use]
pub fn actor_ref<T>() -> Source<T, ActorRef<ActorSourceMessage<T>>>
where
T: Send + 'static,
{
actor_source(None::<SourceBackpressure<ActorSourceMessage<T>>>)
}
#[must_use]
pub fn typed<T>() -> Source<T, ActorRef<ActorSourceMessage<T>>>
where
T: Send + 'static,
{
Self::actor_ref()
}
#[must_use]
pub fn actor_ref_with_backpressure<T, AckMsg>(
ack_to: ActorRef<AckMsg>,
ack_message: AckMsg,
) -> Source<T, ActorRef<ActorSourceMessage<T>>>
where
T: Send + 'static,
AckMsg: Message + Clone + Sync,
{
actor_source(Some(SourceBackpressure {
ack_to,
make_ack: Arc::new(move || ack_message.clone()),
_marker: PhantomData,
}))
}
}
impl ActorSink {
#[must_use]
pub fn actor_ref<In, Msg, Elem, Complete, Failure>(
actor_ref: ActorRef<Msg>,
make_element_message: Elem,
on_complete_message: Complete,
on_failure_message: Failure,
) -> Sink<In, StreamCompletion<NotUsed>>
where
In: Send + 'static,
Msg: Message,
Elem: Fn(In) -> Msg + Send + Sync + 'static,
Complete: Fn() -> Msg + Send + Sync + 'static,
Failure: Fn(StreamError) -> Msg + Send + Sync + 'static,
{
let make_element_message = Arc::new(make_element_message);
let on_complete_message = Arc::new(on_complete_message);
let on_failure_message = Arc::new(on_failure_message);
Sink::from_runner(move |mut input, materializer| {
let actor_ref = actor_ref.clone();
let make_element_message = Arc::clone(&make_element_message);
let on_complete_message = Arc::clone(&on_complete_message);
let on_failure_message = Arc::clone(&on_failure_message);
Ok(materializer.spawn_stream(move |cancelled| {
run_actor_ref_sink(
&mut input,
cancelled,
actor_ref,
make_element_message,
on_complete_message,
on_failure_message,
)
}))
})
}
#[must_use]
pub fn typed<In>(
actor_ref: ActorRef<ActorSinkMessage<In>>,
) -> Sink<In, StreamCompletion<NotUsed>>
where
In: Send + 'static,
{
Self::actor_ref(
actor_ref,
ActorSinkMessage::Element,
|| ActorSinkMessage::Complete,
ActorSinkMessage::Fail,
)
}
#[must_use]
pub fn actor_ref_with_backpressure<In, Msg, Ack, Init, Elem, Complete, Failure>(
actor_ref: ActorRef<Msg>,
timeout: Duration,
make_init_message: Init,
make_element_message: Elem,
on_complete_message: Complete,
on_failure_message: Failure,
) -> Sink<In, StreamCompletion<NotUsed>>
where
In: Send + 'static,
Msg: Message,
Ack: Send + 'static,
Init: Fn(ReplyPort<Ack>) -> Msg + Send + Sync + 'static,
Elem: Fn(In, ReplyPort<Ack>) -> Msg + Send + Sync + 'static,
Complete: Fn() -> Msg + Send + Sync + 'static,
Failure: Fn(StreamError) -> Msg + Send + Sync + 'static,
{
let make_init_message = Arc::new(make_init_message);
let make_element_message = Arc::new(make_element_message);
let on_complete_message = Arc::new(on_complete_message);
let on_failure_message = Arc::new(on_failure_message);
Sink::from_runner(move |mut input, materializer| {
let actor_ref = actor_ref.clone();
let make_init_message = Arc::clone(&make_init_message);
let make_element_message = Arc::clone(&make_element_message);
let on_complete_message = Arc::clone(&on_complete_message);
let on_failure_message = Arc::clone(&on_failure_message);
Ok(materializer.spawn_stream(move |cancelled| {
send_and_wait_ack(&actor_ref, timeout, |reply_to| make_init_message(reply_to))?;
run_actor_ref_backpressure_sink(
&mut input,
cancelled,
actor_ref,
timeout,
make_element_message,
on_complete_message,
on_failure_message,
)
}))
})
}
#[must_use]
pub fn typed_with_backpressure<In, Ack>(
actor_ref: ActorRef<ActorSinkBackpressureMessage<In, Ack>>,
timeout: Duration,
) -> Sink<In, StreamCompletion<NotUsed>>
where
In: Send + 'static,
Ack: Send + 'static,
{
Self::actor_ref_with_backpressure(
actor_ref,
timeout,
ActorSinkBackpressureMessage::Init,
ActorSinkBackpressureMessage::Element,
|| ActorSinkBackpressureMessage::Complete,
ActorSinkBackpressureMessage::Fail,
)
}
}
impl ActorPubSub {
#[must_use]
pub fn source<T>(group: impl Into<String>) -> Source<T, ActorRef<ActorSourceMessage<T>>>
where
T: Send + 'static,
{
let group = group.into();
ActorSource::actor_ref().map_materialized_value(move |actor_ref| {
ractor::pg::join(group.clone(), vec![actor_ref.get_cell()]);
actor_ref
})
}
#[must_use]
pub fn sink<T>(group: impl Into<String>) -> Sink<T, StreamCompletion<NotUsed>>
where
T: Clone + Send + 'static,
{
let group = group.into();
Sink::from_runner(move |mut input, materializer| {
let group = group.clone();
Ok(materializer.spawn_stream(move |cancelled| {
loop {
if cancelled.load(Ordering::SeqCst) {
return Err(StreamError::Cancelled);
}
match input.next() {
Some(Ok(item)) => {
broadcast_to_group(&group, ActorSourceMessage::Element(item))?;
}
Some(Err(error)) => {
let _ = broadcast_to_group(
&group,
ActorSourceMessage::<T>::Fail(error.to_string()),
);
return Err(error);
}
None => {
broadcast_to_group(&group, ActorSourceMessage::<T>::Complete)?;
return Ok(NotUsed);
}
}
}
}))
})
}
}
fn ask_flow_with_pending<In, Msg, Reply, Out, Pending, Prepare, Finish>(
actor_ref: ActorRef<Msg>,
parallelism: usize,
timeout: Duration,
prepare: Prepare,
finish: Finish,
) -> Flow<In, Out, NotUsed>
where
In: Send + 'static,
Msg: Message,
Reply: Send + 'static,
Out: Send + 'static,
Pending: Send + 'static,
Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
Finish: Fn(Pending, Reply) -> StreamResult<Out> + Send + Sync + 'static,
{
assert!(
parallelism > 0,
"ActorFlow ask parallelism must be greater than zero"
);
let prepare = Arc::new(prepare);
let finish = Arc::new(finish);
Flow::from_transform(move |input| {
ask_ractor_ordered_with_pending(
input,
actor_ref.clone(),
parallelism,
timeout,
Arc::clone(&prepare),
Arc::clone(&finish),
)
})
}
fn ask_ractor_ordered_with_pending<In, Msg, Reply, Out, Pending, Prepare, Finish>(
mut input: BoxStream<In>,
actor_ref: ActorRef<Msg>,
parallelism: usize,
timeout: Duration,
prepare: Arc<Prepare>,
finish: Arc<Finish>,
) -> BoxStream<Out>
where
In: Send + 'static,
Msg: Message,
Reply: Send + 'static,
Out: Send + 'static,
Pending: Send + 'static,
Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
Finish: Fn(Pending, Reply) -> StreamResult<Out> + Send + Sync + 'static,
{
let mut in_flight = Vec::<InFlightAskWith<Reply, Pending>>::with_capacity(parallelism);
let mut next_index = 0_usize;
let mut next_to_emit = 0_usize;
let mut completed = Vec::with_capacity(parallelism);
let mut reply_pool = Vec::with_capacity(parallelism);
let mut input_done = false;
Box::new(std::iter::from_fn(move || {
loop {
if let Some(result) = take_completed_with_pending(&mut completed, next_to_emit) {
next_to_emit += 1;
return Some(result);
}
while in_flight.len() < parallelism && !input_done {
match input.next() {
Some(Ok(item)) => {
let index = next_index;
next_index += 1;
match start_ractor_ask_with_pending(
index,
actor_ref.clone(),
timeout,
item,
Arc::clone(&prepare),
&mut reply_pool,
) {
Ok(ask) => in_flight.push(ask),
Err(error) => {
completed.push((index, Err(error)));
input_done = true;
}
}
}
Some(Err(error)) => {
completed.push((next_index, Err(error)));
next_index += 1;
input_done = true;
}
None => input_done = true,
}
}
if let Some(result) = take_completed_with_pending(&mut completed, next_to_emit) {
next_to_emit += 1;
return Some(result);
}
if in_flight.is_empty() {
return None;
}
let ask = wait_for_ready_ask_with_pending(&mut in_flight, timeout, finish.as_ref());
let index = ask.index;
let result = ask.result;
recycle_reply_state(ask.state, &mut reply_pool);
if index == next_to_emit {
next_to_emit += 1;
return Some(result);
}
completed.push((index, result));
}
}))
}
fn take_completed_with_pending<Out>(
completed: &mut Vec<(usize, StreamResult<Out>)>,
index: usize,
) -> Option<StreamResult<Out>> {
let position = completed
.iter()
.position(|(completed_index, _)| *completed_index == index)?;
Some(completed.swap_remove(position).1)
}
struct InFlightAskWith<Reply, Pending> {
index: usize,
state: Option<Arc<ReplyState<Reply>>>,
pending: Option<Pending>,
deadline: Option<Instant>,
}
impl<Reply, Pending> InFlightAskWith<Reply, Pending> {
fn state(&self) -> &Arc<ReplyState<Reply>> {
self.state.as_ref().expect("in-flight ask has reply state")
}
fn into_parts(mut self) -> (Arc<ReplyState<Reply>>, Pending) {
let state = self.state.take().expect("in-flight ask has reply state");
let pending = self
.pending
.take()
.expect("in-flight ask has pending state");
(state, pending)
}
}
impl<Reply, Pending> Drop for InFlightAskWith<Reply, Pending> {
fn drop(&mut self) {
if let Some(state) = &self.state {
state.close_receiver();
}
}
}
struct CompletedAskWith<Reply, Out> {
index: usize,
result: StreamResult<Out>,
state: Arc<ReplyState<Reply>>,
}
fn start_ractor_ask_with_pending<In, Msg, Reply, Pending, Prepare>(
index: usize,
actor_ref: ActorRef<Msg>,
timeout: Duration,
input: In,
prepare: Arc<Prepare>,
reply_pool: &mut Vec<Arc<ReplyState<Reply>>>,
) -> StreamResult<InFlightAskWith<Reply, Pending>>
where
In: Send + 'static,
Msg: Message,
Reply: Send + 'static,
Pending: Send + 'static,
Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
{
let reply_state = match reply_pool.pop() {
Some(state) => {
state.reset(timeout);
state
}
None => Arc::new(ReplyState::new(timeout)),
};
let reply_to = ReplyPort::new(Arc::clone(&reply_state));
let (message, pending) =
catch_unwind(AssertUnwindSafe(|| prepare(input, reply_to))).map_err(|panic| {
StreamError::ActorAskTaskFailed {
reason: panic_reason(panic),
}
})?;
send_actor_message(&actor_ref, message)?;
Ok(InFlightAskWith {
index,
state: Some(reply_state),
pending: Some(pending),
deadline: Instant::now().checked_add(timeout),
})
}
fn wait_for_ready_ask_with_pending<Reply, Pending, Out, Finish>(
in_flight: &mut Vec<InFlightAskWith<Reply, Pending>>,
timeout: Duration,
finish: &Finish,
) -> CompletedAskWith<Reply, Out>
where
Reply: Send + 'static,
Pending: Send + 'static,
Out: Send + 'static,
Finish: Fn(Pending, Reply) -> StreamResult<Out>,
{
let mut idle_spins = 0;
let mut idle_yields = 0;
let mut time_refresh = 0_u32;
let mut now = Instant::now();
loop {
if time_refresh == 0 {
now = Instant::now();
}
time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
if let Some(ask) = take_ready_ask_with_pending(in_flight, timeout, now, finish) {
return ask;
}
if idle_spins < ASK_READY_SPINS {
idle_spins += 1;
std::hint::spin_loop();
} else if idle_yields < ASK_IDLE_YIELDS {
idle_yields += 1;
time_refresh = 0;
thread::yield_now();
} else {
idle_spins = 0;
idle_yields = 0;
time_refresh = 0;
let current = thread::current();
let registered = register_ask_waiters_with_pending(in_flight, ¤t);
now = Instant::now();
if let Some(ask) = take_ready_ask_with_pending(in_flight, timeout, now, finish) {
unregister_ask_waiters_with_pending(registered, current.id());
return ask;
}
thread::park_timeout(next_ask_park_with_pending(in_flight, now));
unregister_ask_waiters_with_pending(registered, current.id());
}
}
}
fn take_ready_ask_with_pending<Reply, Pending, Out, Finish>(
in_flight: &mut Vec<InFlightAskWith<Reply, Pending>>,
timeout: Duration,
now: Instant,
finish: &Finish,
) -> Option<CompletedAskWith<Reply, Out>>
where
Finish: Fn(Pending, Reply) -> StreamResult<Out>,
{
let mut index = 0;
while index < in_flight.len() {
match in_flight[index].state().poll() {
ReplyPoll::Ready(reply) => {
let ask = in_flight.swap_remove(index);
let ask_index = ask.index;
let (state, pending) = ask.into_parts();
return Some(CompletedAskWith {
index: ask_index,
result: finish(pending, reply),
state,
});
}
ReplyPoll::Dropped => {
let ask = in_flight.swap_remove(index);
let ask_index = ask.index;
let (state, _pending) = ask.into_parts();
return Some(CompletedAskWith {
index: ask_index,
result: Err(StreamError::ActorAskResponseDropped),
state,
});
}
ReplyPoll::Pending => {
if in_flight[index]
.deadline
.is_some_and(|deadline| now >= deadline)
{
let outcome = in_flight[index].state().close_on_timeout();
let ask = in_flight.swap_remove(index);
let ask_index = ask.index;
let (state, pending) = ask.into_parts();
let result = match outcome {
ReplyPoll::Ready(reply) => finish(pending, reply),
ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
};
return Some(CompletedAskWith {
index: ask_index,
result,
state,
});
}
index += 1;
}
}
}
None
}
fn register_ask_waiters_with_pending<Reply, Pending>(
in_flight: &[InFlightAskWith<Reply, Pending>],
current: &Thread,
) -> Vec<Arc<ReplyState<Reply>>> {
let mut registered = Vec::with_capacity(in_flight.len());
for ask in in_flight {
ask.state().register_waiter(current.clone());
registered.push(Arc::clone(ask.state()));
}
registered
}
fn unregister_ask_waiters_with_pending<Reply>(
registered: Vec<Arc<ReplyState<Reply>>>,
current_id: ThreadId,
) {
for state in registered {
state.unregister_waiter(current_id);
}
}
fn next_ask_park_with_pending<Reply, Pending>(
in_flight: &[InFlightAskWith<Reply, Pending>],
now: Instant,
) -> Duration {
in_flight
.iter()
.filter_map(|ask| ask.deadline)
.map(|deadline| deadline.saturating_duration_since(now))
.min()
.unwrap_or(ASK_MAX_PARK)
.min(ASK_MAX_PARK)
}
struct SourceBackpressure<AckMsg> {
ack_to: ActorRef<AckMsg>,
make_ack: Arc<dyn Fn() -> AckMsg + Send + Sync>,
_marker: PhantomData<fn() -> AckMsg>,
}
impl<AckMsg> Clone for SourceBackpressure<AckMsg> {
fn clone(&self) -> Self {
Self {
ack_to: self.ack_to.clone(),
make_ack: Arc::clone(&self.make_ack),
_marker: PhantomData,
}
}
}
fn actor_source<T, AckMsg>(
backpressure: Option<SourceBackpressure<AckMsg>>,
) -> Source<T, ActorRef<ActorSourceMessage<T>>>
where
T: Send + 'static,
AckMsg: Message,
{
Source::from_materialized_factory(move |_materializer| {
let shared = Arc::new(ActorSourceShared::new(backpressure.is_none()));
let actor = SourceActor {
shared: Arc::clone(&shared),
backpressure: backpressure.clone(),
};
let (actor_ref, _handle) =
block_on_ractor_runtime(Actor::spawn(None, actor, SourceActorState::default()))?
.map_err(|error| {
StreamError::Failed(format!("actor source failed to spawn: {error}"))
})?;
let stream = ActorSourceStream {
shared,
actor_ref: Some(actor_ref.clone()),
backpressure: backpressure.clone(),
terminated: false,
};
Ok((Box::new(stream) as BoxStream<T>, actor_ref))
})
}
struct ActorSourceShared<T> {
inner: Mutex<ActorSourceInner<T>>,
available: Condvar,
}
impl<T> ActorSourceShared<T> {
fn new(ready: bool) -> Self {
Self {
inner: Mutex::new(ActorSourceInner {
queue: VecDeque::new(),
completed: false,
ready,
}),
available: Condvar::new(),
}
}
fn lock(&self) -> MutexGuard<'_, ActorSourceInner<T>> {
self.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner())
}
fn push(&self, item: T) {
let mut inner = self.lock();
if !inner.completed {
inner.queue.push_back(Ok(item));
}
drop(inner);
self.available.notify_all();
}
fn complete(&self) {
let mut inner = self.lock();
inner.completed = true;
drop(inner);
self.available.notify_all();
}
fn fail(&self, error: StreamError) {
let mut inner = self.lock();
inner.queue.clear();
inner.queue.push_back(Err(error));
inner.completed = true;
drop(inner);
self.available.notify_all();
}
fn mark_ready(&self) {
let mut inner = self.lock();
if !inner.completed {
inner.ready = true;
}
drop(inner);
self.available.notify_all();
}
}
struct ActorSourceInner<T> {
queue: VecDeque<StreamResult<T>>,
completed: bool,
ready: bool,
}
struct SourceActor<T, AckMsg> {
shared: Arc<ActorSourceShared<T>>,
backpressure: Option<SourceBackpressure<AckMsg>>,
}
#[derive(Default)]
struct SourceActorState {
stopped_by_stream: bool,
}
impl<T, AckMsg> Actor for SourceActor<T, AckMsg>
where
T: Send + 'static,
AckMsg: Message,
{
type Msg = ActorSourceMessage<T>;
type State = SourceActorState;
type Arguments = SourceActorState;
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> ActorResult<Self::State> {
if let Some(backpressure) = &self.backpressure {
match send_source_ack(backpressure) {
Ok(()) => {
self.shared.mark_ready();
}
Err(error) => {
self.shared.fail(error);
myself.stop(None);
}
}
}
Ok(args)
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
match message {
ActorSourceMessage::Element(item) => {
if self.backpressure.is_some() {
let mut inner = self.shared.lock();
if !inner.ready {
inner.queue.clear();
inner.queue.push_back(Err(actor_interop_error(
"actor source backpressure protocol violation: element arrived before ack",
)));
inner.completed = true;
drop(inner);
self.shared.available.notify_all();
myself.stop(None);
return Ok(());
}
inner.ready = false;
drop(inner);
}
self.shared.push(item);
}
ActorSourceMessage::Complete => {
self.shared.complete();
myself.stop(None);
}
ActorSourceMessage::Fail(reason) => {
self.shared.fail(StreamError::Failed(reason));
myself.stop(None);
}
}
Ok(())
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> ActorResult {
if !state.stopped_by_stream {
self.shared.complete();
}
Ok(())
}
}
struct ActorSourceStream<T, AckMsg> {
shared: Arc<ActorSourceShared<T>>,
actor_ref: Option<ActorRef<ActorSourceMessage<T>>>,
backpressure: Option<SourceBackpressure<AckMsg>>,
terminated: bool,
}
impl<T, AckMsg> Iterator for ActorSourceStream<T, AckMsg>
where
T: Send + 'static,
AckMsg: Message,
{
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
let next = {
let mut inner = self.shared.lock();
loop {
if let Some(item) = inner.queue.pop_front() {
break Some(item);
}
if inner.completed {
self.terminated = true;
break None;
}
inner = self
.shared
.available
.wait(inner)
.unwrap_or_else(|poison| poison.into_inner());
}
};
if let Some(Ok(_)) = &next
&& let Some(backpressure) = &self.backpressure
{
match send_source_ack(backpressure) {
Ok(()) => self.shared.mark_ready(),
Err(error) => {
self.shared.fail(error.clone());
return Some(Err(error));
}
}
}
next
}
}
impl<T, AckMsg> Drop for ActorSourceStream<T, AckMsg> {
fn drop(&mut self) {
if let Some(actor_ref) = self.actor_ref.take() {
actor_ref.stop(None);
}
}
}
fn send_source_ack<AckMsg>(backpressure: &SourceBackpressure<AckMsg>) -> StreamResult<()>
where
AckMsg: Message,
{
let ack = catch_unwind(AssertUnwindSafe(|| (backpressure.make_ack)())).map_err(|panic| {
StreamError::Failed(format!(
"actor source ack builder failed: {}",
panic_reason(panic)
))
})?;
send_actor_message(&backpressure.ack_to, ack)
}
fn run_actor_ref_sink<In, Msg, Elem, Complete, Failure>(
input: &mut BoxStream<In>,
cancelled: Arc<AtomicBool>,
actor_ref: ActorRef<Msg>,
make_element_message: Arc<Elem>,
on_complete_message: Arc<Complete>,
on_failure_message: Arc<Failure>,
) -> StreamResult<NotUsed>
where
Msg: Message,
Elem: Fn(In) -> Msg,
Complete: Fn() -> Msg,
Failure: Fn(StreamError) -> Msg,
{
loop {
if cancelled.load(Ordering::SeqCst) {
return Err(StreamError::Cancelled);
}
match input.next() {
Some(Ok(item)) => {
let message = catch_unwind(AssertUnwindSafe(|| make_element_message(item)))
.map_err(actor_sink_panic)?;
send_actor_message(&actor_ref, message)?;
}
Some(Err(error)) => {
let message = catch_unwind(AssertUnwindSafe(|| on_failure_message(error.clone())))
.map_err(actor_sink_panic)?;
send_actor_message(&actor_ref, message)?;
return Err(error);
}
None => {
let message = catch_unwind(AssertUnwindSafe(|| on_complete_message()))
.map_err(actor_sink_panic)?;
send_actor_message(&actor_ref, message)?;
return Ok(NotUsed);
}
}
}
}
fn run_actor_ref_backpressure_sink<In, Msg, Ack, Elem, Complete, Failure>(
input: &mut BoxStream<In>,
cancelled: Arc<AtomicBool>,
actor_ref: ActorRef<Msg>,
timeout: Duration,
make_element_message: Arc<Elem>,
on_complete_message: Arc<Complete>,
on_failure_message: Arc<Failure>,
) -> StreamResult<NotUsed>
where
Msg: Message,
Ack: Send + 'static,
Elem: Fn(In, ReplyPort<Ack>) -> Msg,
Complete: Fn() -> Msg,
Failure: Fn(StreamError) -> Msg,
{
loop {
if cancelled.load(Ordering::SeqCst) {
return Err(StreamError::Cancelled);
}
match input.next() {
Some(Ok(item)) => {
send_and_wait_ack(&actor_ref, timeout, |reply_to| {
make_element_message(item, reply_to)
})?;
}
Some(Err(error)) => {
let message = catch_unwind(AssertUnwindSafe(|| on_failure_message(error.clone())))
.map_err(actor_sink_panic)?;
send_actor_message(&actor_ref, message)?;
return Err(error);
}
None => {
let message = catch_unwind(AssertUnwindSafe(|| on_complete_message()))
.map_err(actor_sink_panic)?;
send_actor_message(&actor_ref, message)?;
return Ok(NotUsed);
}
}
}
}
fn send_and_wait_ack<Msg, Ack, Build>(
actor_ref: &ActorRef<Msg>,
timeout: Duration,
build: Build,
) -> StreamResult<()>
where
Msg: Message,
Ack: Send + 'static,
Build: FnOnce(ReplyPort<Ack>) -> Msg,
{
let reply_state = Arc::new(ReplyState::new(timeout));
let reply_to = ReplyPort::new(Arc::clone(&reply_state));
let message = catch_unwind(AssertUnwindSafe(|| build(reply_to))).map_err(actor_sink_panic)?;
send_actor_message(actor_ref, message)?;
let ask = InFlightAsk {
index: 0,
state: Some(reply_state),
deadline: Instant::now().checked_add(timeout),
};
let mut in_flight = vec![ask];
wait_for_ready_ask(&mut in_flight, timeout)
.result
.map(|_ack| ())
}
fn actor_sink_panic(panic: Box<dyn std::any::Any + Send>) -> StreamError {
StreamError::Failed(format!(
"actor sink message builder failed: {}",
panic_reason(panic)
))
}
fn send_actor_message<Msg>(actor_ref: &ActorRef<Msg>, message: Msg) -> StreamResult<()>
where
Msg: Message,
{
match actor_ref.cast(message) {
Ok(()) => Ok(()),
Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
Err(StreamError::ActorTerminated)
}
Err(error) => Err(StreamError::ActorAskSendFailed {
reason: error.to_string(),
}),
}
}
fn broadcast_to_group<T>(group: &str, message: ActorSourceMessage<T>) -> StreamResult<()>
where
T: Clone + Send + 'static,
{
for member in ractor::pg::get_members(&group.to_owned()) {
let actor_ref: ActorRef<ActorSourceMessage<T>> = member.into();
match send_actor_message(&actor_ref, message.clone()) {
Ok(()) | Err(StreamError::ActorTerminated) => {}
Err(error) => return Err(error),
}
}
Ok(())
}
struct WatchShared<T> {
inner: Mutex<WatchInner<T>>,
available: Condvar,
}
impl<T> Default for WatchShared<T> {
fn default() -> Self {
Self {
inner: Mutex::new(WatchInner {
queue: VecDeque::new(),
event: None,
upstream_done: false,
}),
available: Condvar::new(),
}
}
}
impl<T> WatchShared<T> {
fn lock(&self) -> MutexGuard<'_, WatchInner<T>> {
self.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner())
}
fn push(&self, item: StreamResult<T>) -> bool {
let mut inner = self.lock();
if inner.event.is_some() || inner.upstream_done {
return false;
}
inner.queue.push_back(item);
drop(inner);
self.available.notify_all();
true
}
fn complete(&self) {
let mut inner = self.lock();
inner.upstream_done = true;
drop(inner);
self.available.notify_all();
}
fn fail(&self, event: WatchEvent) {
let mut inner = self.lock();
if inner.event.is_none() {
inner.queue.clear();
inner.event = Some(event);
inner.upstream_done = true;
}
drop(inner);
self.available.notify_all();
}
}
struct WatchInner<T> {
queue: VecDeque<StreamResult<T>>,
event: Option<WatchEvent>,
upstream_done: bool,
}
struct WatchMonitorActor<T> {
watched: ractor::ActorCell,
shared: Arc<WatchShared<T>>,
ready: Mutex<Option<std::sync::mpsc::Sender<()>>>,
}
impl<T> Actor for WatchMonitorActor<T>
where
T: Send + 'static,
{
type Msg = ();
type State = ();
type Arguments = ();
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> ActorResult<Self::State> {
myself.monitor(self.watched.clone());
let ready = self
.ready
.lock()
.unwrap_or_else(|poison| poison.into_inner())
.take();
if let Some(ready) = ready {
let _ = ready.send(());
}
Ok(())
}
async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
event: ractor::SupervisionEvent,
_state: &mut Self::State,
) -> ActorResult {
match event {
ractor::SupervisionEvent::ActorTerminated(who, _, reason)
if who.get_id() == self.watched.get_id() =>
{
let detail = reason.unwrap_or_else(|| "terminated".to_owned());
self.shared.fail(WatchEvent {
reason: format!("watched actor terminated: {detail}"),
});
myself.stop(None);
}
ractor::SupervisionEvent::ActorFailed(who, error)
if who.get_id() == self.watched.get_id() =>
{
self.shared.fail(WatchEvent {
reason: format!("watched actor terminated: {error}"),
});
myself.stop(None);
}
_ => {}
}
Ok(())
}
async fn post_stop(
&self,
myself: ActorRef<Self::Msg>,
_state: &mut Self::State,
) -> ActorResult {
myself.unmonitor(self.watched.clone());
Ok(())
}
}
fn spawn_watch_monitor<T>(
watched: ractor::ActorCell,
shared: Arc<WatchShared<T>>,
) -> StreamResult<(ActorRef<()>, ractor::concurrency::JoinHandle<()>)>
where
T: Send + 'static,
{
let (ready_tx, ready_rx) = std::sync::mpsc::channel();
block_on_ractor_runtime(Actor::spawn(
None,
WatchMonitorActor {
watched,
shared,
ready: Mutex::new(Some(ready_tx)),
},
(),
))?
.map_err(|error| StreamError::Failed(format!("watch monitor failed to spawn: {error}")))
.and_then(|(monitor_ref, handle)| {
ready_rx.recv_timeout(Duration::from_secs(1)).map_err(|_| {
monitor_ref.stop(None);
StreamError::Failed("watch monitor did not become ready".to_owned())
})?;
Ok((monitor_ref, handle))
})
}
struct WatchStream<T> {
shared: Arc<WatchShared<T>>,
monitor_ref: Option<ActorRef<()>>,
producer_completion: Option<StreamCompletion<NotUsed>>,
terminated: bool,
}
impl<T> Iterator for WatchStream<T>
where
T: Send + 'static,
{
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
let mut inner = self.shared.lock();
loop {
if let Some(event) = inner.event.take() {
self.terminated = true;
return Some(Err(event.into_stream_error()));
}
if let Some(item) = inner.queue.pop_front() {
if item.is_err() {
self.terminated = true;
}
return Some(item);
}
if inner.upstream_done {
self.terminated = true;
return None;
}
inner = self
.shared
.available
.wait(inner)
.unwrap_or_else(|poison| poison.into_inner());
}
}
}
impl<T> Drop for WatchStream<T> {
fn drop(&mut self) {
if let Some(monitor_ref) = self.monitor_ref.take() {
monitor_ref.stop(None);
}
drop(self.producer_completion.take());
}
}
fn run_watch_upstream<T>(
mut input: BoxStream<T>,
shared: Arc<WatchShared<T>>,
cancelled: Arc<AtomicBool>,
) -> StreamResult<NotUsed>
where
T: Send + 'static,
{
loop {
if cancelled.load(Ordering::SeqCst) {
shared.complete();
return Err(StreamError::Cancelled);
}
match input.next() {
Some(Ok(item)) => {
if !shared.push(Ok(item)) {
return Err(StreamError::Cancelled);
}
}
Some(Err(error)) => {
let _ = shared.push(Err(error.clone()));
shared.complete();
return Err(error);
}
None => {
shared.complete();
return Ok(NotUsed);
}
}
}
}
fn actor_interop_error(reason: impl Into<String>) -> StreamError {
StreamError::Failed(reason.into())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::stream::{Keep, Sink};
use std::{
sync::{
Arc as StdArc, Mutex as StdMutex,
atomic::{AtomicUsize, Ordering},
mpsc,
},
time::{Duration as StdDuration, Instant as StdInstant},
};
fn wait_until<F>(timeout: StdDuration, mut condition: F) -> bool
where
F: FnMut() -> bool,
{
let deadline = StdInstant::now() + timeout;
while StdInstant::now() < deadline {
if condition() {
return true;
}
std::thread::park_timeout(StdDuration::from_millis(1));
}
condition()
}
fn spawn_actor<A>(
actor: A,
args: A::Arguments,
) -> (ActorRef<A::Msg>, ractor::concurrency::JoinHandle<()>)
where
A: Actor,
A::Msg: Message,
A::Arguments: Send + 'static,
{
block_on_ractor_runtime(Actor::spawn(None, actor, args))
.expect("ractor runtime runs")
.expect("actor spawns")
}
fn stop_actor<Msg>(actor_ref: ActorRef<Msg>, handle: ractor::concurrency::JoinHandle<()>)
where
Msg: Message,
{
actor_ref.stop(None);
block_on_ractor_runtime(async move {
handle.await.expect("actor task joins");
})
.expect("ractor runtime joins actor");
}
enum AskInteropMsg {
Status {
input: u64,
reply_to: ReplyPort<ActorStatus<u64>>,
},
Plain {
input: u64,
reply_to: ReplyPort<u64>,
},
}
#[cfg(feature = "cluster")]
impl Message for AskInteropMsg {}
struct AskInteropActor;
impl Actor for AskInteropActor {
type Msg = AskInteropMsg;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> ActorResult<Self::State> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
match message {
AskInteropMsg::Status { input: 0, reply_to } => {
let _ =
reply_to.send(ActorStatus::Err(StreamError::Failed("bad status".into())));
}
AskInteropMsg::Status { input, reply_to } => {
let _ = reply_to.send(ActorStatus::Ok(input + 10));
}
AskInteropMsg::Plain { input, reply_to } => {
let _ = reply_to.send(input * 2);
}
}
Ok(())
}
}
#[test]
fn ask_with_status_unwraps_ok_and_fails_err() {
let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
let values = Source::from_iter([1_u64, 2])
.via(ActorFlow::ask_with_status(
actor_ref.clone(),
2,
Duration::from_secs(1),
|input, reply_to| AskInteropMsg::Status { input, reply_to },
))
.run_collect()
.unwrap();
assert_eq!(values, vec![11, 12]);
let failed = Source::single(0_u64)
.via(ActorFlow::ask_with_status(
actor_ref.clone(),
1,
Duration::from_secs(1),
|input, reply_to| AskInteropMsg::Status { input, reply_to },
))
.run_collect();
assert_eq!(failed, Err(StreamError::Failed("bad status".into())));
stop_actor(actor_ref, handle);
}
#[test]
fn ask_with_context_preserves_context() {
let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
let values = Source::from_iter([1_u64, 2, 3])
.as_source_with_context(|item| item + 100)
.via(ActorFlow::ask_with_context(
actor_ref.clone(),
2,
Duration::from_secs(1),
|input, reply_to| AskInteropMsg::Plain { input, reply_to },
))
.run_collect()
.unwrap();
assert_eq!(values, vec![(2, 101), (4, 102), (6, 103)]);
stop_actor(actor_ref, handle);
}
#[test]
fn ask_with_status_and_context_preserves_context_and_fails_status() {
let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
let values = Source::from_iter([1_u64, 2])
.as_source_with_context(|item| item + 10)
.via(ActorFlow::ask_with_status_and_context(
actor_ref.clone(),
2,
Duration::from_secs(1),
|input, reply_to| AskInteropMsg::Status { input, reply_to },
))
.run_collect()
.unwrap();
assert_eq!(values, vec![(11, 11), (12, 12)]);
let failed = Source::single(0_u64)
.as_source_with_context(|_| 99_u64)
.via(ActorFlow::ask_with_status_and_context(
actor_ref.clone(),
1,
Duration::from_secs(1),
|input, reply_to| AskInteropMsg::Status { input, reply_to },
))
.run_collect();
assert_eq!(failed, Err(StreamError::Failed("bad status".into())));
stop_actor(actor_ref, handle);
}
#[test]
fn actor_source_actor_ref_emits_and_completes() {
let (actor_ref, completion) = ActorSource::actor_ref::<u64>()
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
actor_ref.cast(ActorSourceMessage::Element(1)).unwrap();
actor_ref.cast(ActorSourceMessage::Element(2)).unwrap();
actor_ref.cast(ActorSourceMessage::Complete).unwrap();
assert_eq!(completion.wait().unwrap(), vec![1, 2]);
}
#[derive(Clone)]
enum SourceAckMsg {
Ack,
}
#[cfg(feature = "cluster")]
impl Message for SourceAckMsg {}
struct SourceAckActor {
count: StdArc<AtomicUsize>,
}
impl Actor for SourceAckActor {
type Msg = SourceAckMsg;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> ActorResult<Self::State> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
_message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
self.count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
#[test]
fn actor_source_with_backpressure_acks_startup_and_each_element() {
let ack_count = StdArc::new(AtomicUsize::new(0));
let (ack_ref, ack_handle) = spawn_actor(
SourceAckActor {
count: StdArc::clone(&ack_count),
},
(),
);
let (actor_ref, completion) =
ActorSource::actor_ref_with_backpressure::<u64, SourceAckMsg>(
ack_ref.clone(),
SourceAckMsg::Ack,
)
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
assert!(wait_until(StdDuration::from_secs(1), || {
ack_count.load(Ordering::SeqCst) >= 1
}));
actor_ref.cast(ActorSourceMessage::Element(1)).unwrap();
assert!(wait_until(StdDuration::from_secs(1), || {
ack_count.load(Ordering::SeqCst) >= 2
}));
actor_ref.cast(ActorSourceMessage::Element(2)).unwrap();
assert!(wait_until(StdDuration::from_secs(1), || {
ack_count.load(Ordering::SeqCst) >= 3
}));
actor_ref.cast(ActorSourceMessage::Complete).unwrap();
assert_eq!(completion.wait().unwrap(), vec![1, 2]);
stop_actor(ack_ref, ack_handle);
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum SinkEvent {
Element(u64),
Complete,
Fail(String),
}
#[cfg(feature = "cluster")]
impl Message for SinkEvent {}
struct EventActor {
sender: mpsc::Sender<SinkEvent>,
}
impl Actor for EventActor {
type Msg = SinkEvent;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> ActorResult<Self::State> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
self.sender
.send(message)
.expect("event receiver stays open");
Ok(())
}
}
#[test]
fn actor_sink_actor_ref_sends_elements_and_complete() {
let (tx, rx) = mpsc::channel();
let (actor_ref, handle) = spawn_actor(EventActor { sender: tx }, ());
Source::from_iter([1_u64, 2])
.run_with(ActorSink::actor_ref(
actor_ref.clone(),
SinkEvent::Element,
|| SinkEvent::Complete,
|error| SinkEvent::Fail(error.to_string()),
))
.unwrap()
.wait()
.unwrap();
assert_eq!(
[
rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
],
[
SinkEvent::Element(1),
SinkEvent::Element(2),
SinkEvent::Complete,
]
);
stop_actor(actor_ref, handle);
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum BackpressureEvent {
Init,
Element(u64),
Complete,
Fail(String),
}
enum BackpressureSinkMsg {
Init(ReplyPort<()>),
Element(u64, ReplyPort<()>),
Complete,
Fail(String),
}
#[cfg(feature = "cluster")]
impl Message for BackpressureSinkMsg {}
struct BackpressureSinkActor {
sender: mpsc::Sender<BackpressureEvent>,
}
impl Actor for BackpressureSinkActor {
type Msg = BackpressureSinkMsg;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> ActorResult<Self::State> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
match message {
BackpressureSinkMsg::Init(reply_to) => {
self.sender.send(BackpressureEvent::Init).unwrap();
let _ = reply_to.send(());
}
BackpressureSinkMsg::Element(item, reply_to) => {
self.sender.send(BackpressureEvent::Element(item)).unwrap();
let _ = reply_to.send(());
}
BackpressureSinkMsg::Complete => {
self.sender.send(BackpressureEvent::Complete).unwrap();
}
BackpressureSinkMsg::Fail(error) => {
self.sender.send(BackpressureEvent::Fail(error)).unwrap();
}
}
Ok(())
}
}
#[test]
fn actor_sink_with_backpressure_waits_for_init_and_element_acks() {
let (tx, rx) = mpsc::channel();
let (actor_ref, handle) = spawn_actor(BackpressureSinkActor { sender: tx }, ());
Source::from_iter([1_u64, 2])
.run_with(ActorSink::actor_ref_with_backpressure(
actor_ref.clone(),
Duration::from_secs(1),
BackpressureSinkMsg::Init,
BackpressureSinkMsg::Element,
|| BackpressureSinkMsg::Complete,
|error| BackpressureSinkMsg::Fail(error.to_string()),
))
.unwrap()
.wait()
.unwrap();
assert_eq!(
[
rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
],
[
BackpressureEvent::Init,
BackpressureEvent::Element(1),
BackpressureEvent::Element(2),
BackpressureEvent::Complete,
]
);
stop_actor(actor_ref, handle);
}
enum WatchMsg {
Stop,
}
#[cfg(feature = "cluster")]
impl Message for WatchMsg {}
struct WatchActor;
impl Actor for WatchActor {
type Msg = WatchMsg;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> ActorResult<Self::State> {
Ok(())
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
_message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
myself.stop(Some("watched-stop".into()));
Ok(())
}
}
#[test]
fn watch_fails_idle_stream_when_actor_terminates() {
let (watched_ref, watched_handle) = spawn_actor(WatchActor, ());
let (source_ref, completion) = ActorSource::actor_ref::<u64>()
.via(ActorFlow::watch(watched_ref.clone()))
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
watched_ref.cast(WatchMsg::Stop).unwrap();
assert!(wait_until(StdDuration::from_secs(1), || {
watched_ref.get_status() == ractor::ActorStatus::Stopped
}));
let result = completion.wait();
let _ = source_ref.cast(ActorSourceMessage::Complete);
assert!(
matches!(result, Err(StreamError::Failed(reason)) if reason.contains("watched actor terminated"))
);
block_on_ractor_runtime(async move {
watched_handle.await.expect("watched actor joins");
})
.expect("ractor runtime joins watched actor");
}
static NEXT_GROUP: AtomicUsize = AtomicUsize::new(0);
#[test]
fn pubsub_sink_broadcasts_to_pg_source() {
let group = format!(
"datum-test-pubsub-{}",
NEXT_GROUP.fetch_add(1, Ordering::SeqCst)
);
let (_source_ref, completion) = ActorPubSub::source::<u64>(group.clone())
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
assert!(wait_until(StdDuration::from_secs(1), || {
!ractor::pg::get_members(&group).is_empty()
}));
Source::from_iter([1_u64, 2])
.run_with(ActorPubSub::sink(group))
.unwrap()
.wait()
.unwrap();
assert_eq!(completion.wait().unwrap(), vec![1, 2]);
}
#[test]
fn actor_source_fail_message_fails_stream() {
let (actor_ref, completion) = ActorSource::typed::<u64>()
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
actor_ref
.cast(ActorSourceMessage::Fail("source failed".into()))
.unwrap();
assert_eq!(
completion.wait(),
Err(StreamError::Failed("source failed".into()))
);
}
#[test]
fn actor_sink_typed_sends_protocol_messages() {
let received = StdArc::new(StdMutex::new(Vec::<String>::new()));
struct TypedSinkActor {
received: StdArc<StdMutex<Vec<String>>>,
}
impl Actor for TypedSinkActor {
type Msg = ActorSinkMessage<u64>;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> ActorResult<Self::State> {
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
let label = match message {
ActorSinkMessage::Element(item) => format!("element:{item}"),
ActorSinkMessage::Complete => "complete".to_owned(),
ActorSinkMessage::Fail(error) => format!("fail:{error}"),
};
self.received.lock().unwrap().push(label);
Ok(())
}
}
let (actor_ref, handle) = spawn_actor(
TypedSinkActor {
received: StdArc::clone(&received),
},
(),
);
Source::from_iter([7_u64])
.run_with(ActorSink::typed(actor_ref.clone()))
.unwrap()
.wait()
.unwrap();
assert!(wait_until(StdDuration::from_secs(1), || {
received.lock().unwrap().len() == 2
}));
assert_eq!(
*received.lock().unwrap(),
vec!["element:7".to_owned(), "complete".to_owned()]
);
stop_actor(actor_ref, handle);
}
}