use std::{
any::Any,
error::Error,
fmt,
panic::{AssertUnwindSafe, catch_unwind},
sync::{
Arc, Mutex, MutexGuard,
atomic::{AtomicBool, AtomicU8, Ordering},
},
thread::{self, Thread, ThreadId},
time::{Duration, Instant},
};
use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
mod interop;
mod stream_ref;
pub mod stream_ref_proto;
pub use interop::{
ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
ActorSourceMessage, ActorStatus, WatchEvent,
};
pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
pub use stream_ref_proto::{
StreamRefFrame, StreamRefId, StreamRefMessage, StreamRefPayload, StreamRefPayloadBytes,
StreamRefProtoConsumer, StreamRefProtoEndpoint, StreamRefProtoProducer,
};
/// Convenience alias for a `Result` whose error type is `ActorProcessingErr`.
pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
/// Number of busy-spin iterations `wait_for_ready_ask` performs before it starts yielding.
const ASK_READY_SPINS: usize = 256;
/// Number of `thread::yield_now` rounds `wait_for_ready_ask` performs before it parks.
const ASK_IDLE_YIELDS: usize = 64;
/// Upper bound on a single `thread::park_timeout` call while waiting for replies.
const ASK_MAX_PARK: Duration = Duration::from_millis(1);
/// While spinning, the wait loop re-reads `Instant::now()` once every this many iterations.
const ASK_TIME_REFRESH_ITERS: u32 = 64;
/// `ReplyState::gate` value: nothing has been published yet.
const REPLY_PENDING: u8 = 0;
/// `ReplyState::gate` value: a reply has been stored in the slot.
const REPLY_READY: u8 = 1;
/// `ReplyState::gate` value: the sender was dropped without replying.
const REPLY_DROPPED: u8 = 2;
/// Namespace type for stream flows that exchange messages with ractor actors.
pub struct ActorFlow;

impl ActorFlow {
    /// Builds a flow that sends one message per input element to `actor_ref`
    /// and emits the replies in input order.
    ///
    /// `make_msg` receives the element together with a [`ReplyPort`] that the
    /// actor uses to answer. At most `parallelism` asks are outstanding at a
    /// time, and each ask is given `timeout` to complete.
    ///
    /// # Panics
    ///
    /// Panics if `parallelism` is zero.
    #[must_use]
    pub fn ask<In, Msg, Out, F>(
        actor_ref: ActorRef<Msg>,
        parallelism: usize,
        timeout: Duration,
        make_msg: F,
    ) -> Flow<In, Out, NotUsed>
    where
        In: Send + 'static,
        Msg: Message,
        Out: Send + 'static,
        F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
    {
        // The builder is shared between every materialization of the flow.
        let shared_builder = Arc::new(make_msg);
        ask_flow(actor_ref, parallelism, timeout, shared_builder)
    }
}
/// Sending half of a one-shot reply channel, passed to the message builder of
/// `ActorFlow::ask`.
///
/// Dropping the port without calling `send` marks the reply as dropped.
pub struct ReplyPort<T> {
/// Reply state shared with the side that waits for the reply.
inner: Arc<ReplyState<T>>,
/// `true` until `send` consumes the port; tells `Drop` whether to report a dropped sender.
active: bool,
}
/// Shared state behind a `ReplyPort` and the in-flight ask waiting on it.
#[derive(Debug)]
struct ReplyState<T> {
/// Timeout this state was created with; reported by `ReplyPort::timeout`.
timeout: Duration,
/// Set once the receiving side no longer accepts a reply.
receiver_closed: AtomicBool,
/// One of `REPLY_PENDING`, `REPLY_READY`, `REPLY_DROPPED`; lets `poll` skip the mutex while pending.
gate: AtomicU8,
/// Reply value and the registered waiter thread, guarded by a mutex.
slot: Mutex<ReplySlotState<T>>,
}
/// Mutex-protected part of a `ReplyState`.
#[derive(Debug)]
struct ReplySlotState<T> {
/// Current content of the reply slot.
value: ReplySlot<T>,
/// Thread to unpark when the slot changes, if one is registered.
waiter: Option<Thread>,
}
/// Content of the reply slot stored inside `ReplySlotState`.
#[derive(Debug)]
enum ReplySlot<T> {
/// No reply has been stored (also the value left behind after a reply is taken).
Pending,
/// A reply was sent and has not been taken yet.
Ready(T),
/// The sender was dropped before sending a reply.
Dropped,
}
/// Result of inspecting a `ReplyState` from the receiving side.
enum ReplyPoll<T> {
/// Nothing to take yet.
Pending,
/// The reply, moved out of the slot.
Ready(T),
/// The sender went away without replying.
Dropped,
}
/// Debug output shows the timeout and closed flag only, so `T` does not need
/// to implement `Debug`.
impl<T> fmt::Debug for ReplyPort<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("ReplyPort");
        let timeout = self.timeout();
        builder.field("timeout", &timeout);
        let closed = self.is_closed();
        builder.field("closed", &closed);
        builder.finish_non_exhaustive()
    }
}
/// Error returned when a reply cannot be delivered through a reply port.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplySendError;

impl fmt::Display for ReplySendError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "actor reply receiver dropped")
    }
}

impl Error for ReplySendError {}
impl<T> ReplyPort<T> {
/// Wraps shared reply state in an active port.
fn new(inner: Arc<ReplyState<T>>) -> Self {
Self {
inner,
active: true,
}
}
/// Returns the timeout stored in the shared state; this is always `Some`.
#[must_use]
pub fn timeout(&self) -> Option<Duration> {
Some(self.inner.timeout)
}
/// Returns `true` once the receiving side has been marked closed.
#[must_use]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
/// Delivers `reply`, consuming the port.
///
/// # Errors
///
/// Returns `ReplySendError` when the receiver is already closed or the slot
/// is no longer pending.
pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
// Deactivate first so the `Drop` impl does not also report a dropped sender.
self.active = false;
self.inner.send(reply)
}
}
impl<T> Drop for ReplyPort<T> {
    /// Reports a dropped sender unless `send` already consumed the port.
    fn drop(&mut self) {
        if !self.active {
            return;
        }
        self.inner.drop_sender();
    }
}
impl<T> ReplyState<T> {
/// Creates a pending state: receiver open, gate pending, empty slot, no waiter.
fn new(timeout: Duration) -> Self {
Self {
timeout,
receiver_closed: AtomicBool::new(false),
gate: AtomicU8::new(REPLY_PENDING),
slot: Mutex::new(ReplySlotState {
value: ReplySlot::Pending,
waiter: None,
}),
}
}
/// Locks the slot, recovering the guard if the mutex was poisoned.
fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
self.slot
.lock()
.unwrap_or_else(|poison| poison.into_inner())
}
/// Returns the state to its initial pending condition so it can be reused.
///
/// The stored timeout is not changed; `timeout` is only compared against it
/// in debug builds.
fn reset(&self, timeout: Duration) {
self.receiver_closed.store(false, Ordering::Release);
self.gate.store(REPLY_PENDING, Ordering::Release);
let mut slot = self.lock_slot();
slot.value = ReplySlot::Pending;
slot.waiter = None;
debug_assert_eq!(self.timeout, timeout);
}
/// Returns `true` once the receiving side has been marked closed.
fn is_closed(&self) -> bool {
self.receiver_closed.load(Ordering::Acquire)
}
/// Stores `reply` in the slot and wakes the registered waiter, if any.
///
/// Fails with `ReplySendError` when the receiver is closed or the slot is
/// not pending.
fn send(&self, reply: T) -> Result<(), ReplySendError> {
// Cheap check before taking the lock.
if self.is_closed() {
return Err(ReplySendError);
}
let mut slot = self.lock_slot();
// Re-check under the lock: `close_on_timeout` and `take_locked` set the
// flag while holding it.
if self.is_closed() {
return Err(ReplySendError);
}
if !matches!(slot.value, ReplySlot::Pending) {
return Err(ReplySendError);
}
slot.value = ReplySlot::Ready(reply);
let waiter = slot.waiter.clone();
// Publish the gate only after the value is in place.
self.gate.store(REPLY_READY, Ordering::Release);
// Release the lock before unparking the waiter.
drop(slot);
wake_waiter(waiter);
Ok(())
}
/// Checks for a reply; returns `Pending` without locking while the gate is pending.
fn poll(&self) -> ReplyPoll<T> {
if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
return ReplyPoll::Pending;
}
self.take_locked()
}
/// Marks the receiver closed and returns whatever the slot held at that moment.
///
/// A reply or drop that was stored before the lock was taken is still
/// returned; `Pending` means nothing had arrived.
fn close_on_timeout(&self) -> ReplyPoll<T> {
let mut slot = self.lock_slot();
self.receiver_closed.store(true, Ordering::Release);
let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
ReplySlot::Pending => ReplyPoll::Pending,
ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
ReplySlot::Dropped => ReplyPoll::Dropped,
};
let waiter = slot.waiter.take();
drop(slot);
wake_waiter(waiter);
outcome
}
/// Takes the slot content under the lock, clears the waiter, and closes the
/// receiver when a reply or a drop was found.
fn take_locked(&self) -> ReplyPoll<T> {
let mut slot = self.lock_slot();
slot.waiter = None;
match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
ReplySlot::Pending => ReplyPoll::Pending,
ReplySlot::Ready(reply) => {
self.close_receiver();
ReplyPoll::Ready(reply)
}
ReplySlot::Dropped => {
self.close_receiver();
ReplyPoll::Dropped
}
}
}
/// Records that the sender went away without replying and wakes the waiter.
///
/// Does nothing when the slot is no longer pending.
fn drop_sender(&self) {
let mut slot = self.lock_slot();
if matches!(slot.value, ReplySlot::Pending) {
slot.value = ReplySlot::Dropped;
let waiter = slot.waiter.clone();
self.gate.store(REPLY_DROPPED, Ordering::Release);
drop(slot);
wake_waiter(waiter);
}
}
/// Marks the receiving side as closed.
fn close_receiver(&self) {
self.receiver_closed.store(true, Ordering::Release);
}
/// Registers `waiter` as the thread to unpark, replacing any previous waiter.
fn register_waiter(&self, waiter: Thread) {
self.lock_slot().waiter = Some(waiter);
}
/// Clears the registered waiter, but only if it is the thread identified by `waiter_id`.
fn unregister_waiter(&self, waiter_id: ThreadId) {
let mut slot = self.lock_slot();
if slot
.waiter
.as_ref()
.is_some_and(|thread| thread.id() == waiter_id)
{
slot.waiter = None;
}
}
}
/// Unparks `waiter` when one is present; does nothing for `None`.
fn wake_waiter(waiter: Option<Thread>) {
    match waiter {
        Some(thread) => thread.unpark(),
        None => {}
    }
}
/// Builds the flow behind `ActorFlow::ask` from an already shared message builder.
///
/// Every time the flow's transform runs, it starts an ordered ask driver with
/// its own clones of the actor reference and the builder.
///
/// # Panics
///
/// Panics if `parallelism` is zero.
fn ask_flow<In, Msg, Out, F>(
    actor_ref: ActorRef<Msg>,
    parallelism: usize,
    timeout: Duration,
    make_msg: Arc<F>,
) -> Flow<In, Out>
where
    In: Send + 'static,
    Msg: Message,
    Out: Send + 'static,
    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
{
    assert!(
        parallelism > 0,
        "ActorFlow::ask parallelism must be greater than zero"
    );
    Flow::from_transform(move |upstream| {
        let target = actor_ref.clone();
        let builder = Arc::clone(&make_msg);
        ask_ractor_ordered(upstream, target, parallelism, timeout, builder)
    })
}
/// Drives asks against `actor_ref` for every element of `input` and yields the
/// results in input order.
///
/// Each pulled element is assigned a sequence index. Results that finish ahead
/// of the next index to emit are parked in a `completed` buffer until their
/// turn comes. An upstream error, or a failure to start an ask, is queued at
/// its index and stops further pulling from `input`; asks already in flight
/// are still awaited and emitted.
///
/// Back-pressure: the number of in-flight asks plus buffered out-of-order
/// results never exceeds `parallelism`. Counting only in-flight asks would let
/// the `completed` buffer grow without bound while the head-of-line ask is
/// slow, because every finished ask would free a slot for a new one.
fn ask_ractor_ordered<In, Msg, Out, F>(
    mut input: BoxStream<In>,
    actor_ref: ActorRef<Msg>,
    parallelism: usize,
    timeout: Duration,
    make_msg: Arc<F>,
) -> BoxStream<Out>
where
    In: Send + 'static,
    Msg: Message,
    Out: Send + 'static,
    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
{
    let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    let mut completed = Vec::with_capacity(parallelism);
    let mut reply_pool = Vec::with_capacity(parallelism);
    let mut input_done = false;
    Box::new(std::iter::from_fn(move || {
        loop {
            // Emit a buffered result first if it is the next one in order.
            if let Some(result) = take_completed(&mut completed, next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }
            // Refill: buffered results occupy a slot until they are emitted.
            while !input_done && in_flight.len() + completed.len() < parallelism {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        match start_ractor_ask(
                            index,
                            actor_ref.clone(),
                            timeout,
                            item,
                            Arc::clone(&make_msg),
                            &mut reply_pool,
                        ) {
                            Ok(ask) => in_flight.push(ask),
                            Err(error) => {
                                completed.push((index, Err(error)));
                                input_done = true;
                            }
                        }
                    }
                    Some(Err(error)) => {
                        completed.push((next_index, Err(error)));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }
            // The refill may have queued an error at exactly the next index.
            if let Some(result) = take_completed(&mut completed, next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }
            // Every index that is not yet emitted is either buffered or in
            // flight, so with nothing in flight the stream is finished.
            if in_flight.is_empty() {
                return None;
            }
            let ask = wait_for_ready_ask(&mut in_flight, timeout);
            let index = ask.index;
            let result = ask.result;
            recycle_reply_state(ask.state, &mut reply_pool);
            if index == next_to_emit {
                next_to_emit += 1;
                return Some(result);
            }
            completed.push((index, result));
        }
    }))
}
/// Removes and returns the buffered result stored under `index`, if any.
///
/// The buffer is unordered, so the entry is removed with `swap_remove`.
fn take_completed<Out>(
    completed: &mut Vec<(usize, StreamResult<Out>)>,
    index: usize,
) -> Option<StreamResult<Out>> {
    let found = completed.iter().position(|entry| entry.0 == index);
    found.map(|slot| {
        let (_, result) = completed.swap_remove(slot);
        result
    })
}
/// An ask that has been sent to the actor and is still awaiting its reply.
struct InFlightAsk<Out> {
/// Position of the originating element in the input sequence.
index: usize,
/// Shared reply state; `None` only after `into_state` moved it out.
state: Option<Arc<ReplyState<Out>>>,
/// Instant after which the ask times out; `None` means no deadline is enforced.
deadline: Option<Instant>,
}
impl<Out> InFlightAsk<Out> {
/// Borrows the reply state.
///
/// Panics if the state has already been moved out.
fn state(&self) -> &Arc<ReplyState<Out>> {
self.state.as_ref().expect("in-flight ask has reply state")
}
/// Consumes the ask and returns its reply state.
///
/// Taking the state leaves `None` behind, so the `Drop` impl does not close the receiver.
fn into_state(mut self) -> Arc<ReplyState<Out>> {
self.state.take().expect("in-flight ask has reply state")
}
}
impl<Out> Drop for InFlightAsk<Out> {
    /// Closes the receiving side when the ask is dropped while it still owns
    /// its reply state, so a later `send` on the port fails.
    fn drop(&mut self) {
        match self.state.as_ref() {
            Some(state) => state.close_receiver(),
            None => {}
        }
    }
}
/// A finished ask: its sequence index, its outcome, and the reply state to recycle.
struct CompletedAsk<Out> {
/// Position of the originating element in the input sequence.
index: usize,
/// Reply value, or the error the ask ended with.
result: StreamResult<Out>,
/// Reply state taken from the in-flight ask.
state: Arc<ReplyState<Out>>,
}
/// Builds the message for `input`, casts it to the actor, and returns the
/// resulting in-flight ask.
///
/// A reply state is taken from `reply_pool` (and reset) when one is available;
/// otherwise a new one is allocated.
///
/// # Errors
///
/// * `StreamError::ActorAskTaskFailed` when `make_msg` panics.
/// * `StreamError::ActorTerminated` when the cast fails with `SendErr` or
///   `ChannelClosed`.
/// * `StreamError::ActorAskSendFailed` for any other cast failure.
fn start_ractor_ask<In, Msg, Out, F>(
    index: usize,
    actor_ref: ActorRef<Msg>,
    timeout: Duration,
    input: In,
    make_msg: Arc<F>,
    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
) -> StreamResult<InFlightAsk<Out>>
where
    In: Send + 'static,
    Msg: Message,
    Out: Send + 'static,
    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
{
    let reply_state = if let Some(recycled) = reply_pool.pop() {
        recycled.reset(timeout);
        recycled
    } else {
        Arc::new(ReplyState::new(timeout))
    };
    let port = ReplyPort::new(Arc::clone(&reply_state));

    // The builder is user code; a panic is turned into a stream error.
    let built = catch_unwind(AssertUnwindSafe(|| make_msg(input, port)));
    let message = match built {
        Ok(message) => message,
        Err(payload) => {
            return Err(StreamError::ActorAskTaskFailed {
                reason: panic_reason(payload),
            });
        }
    };

    if let Err(failure) = actor_ref.cast(message) {
        return Err(match failure {
            ractor::MessagingErr::SendErr(_) | ractor::MessagingErr::ChannelClosed => {
                StreamError::ActorTerminated
            }
            other => StreamError::ActorAskSendFailed {
                reason: other.to_string(),
            },
        });
    }

    // The deadline is measured from the moment the message has been handed over.
    let deadline = Instant::now().checked_add(timeout);
    Ok(InFlightAsk {
        index,
        state: Some(reply_state),
        deadline,
    })
}
/// Blocks the calling thread until one of the in-flight asks completes, fails,
/// or times out, then removes it from `in_flight` and returns it.
///
/// Waiting escalates in three phases:
/// 1. busy-spin for up to `ASK_READY_SPINS` iterations, re-reading the clock
///    only every `ASK_TIME_REFRESH_ITERS` iterations;
/// 2. yield to the scheduler for up to `ASK_IDLE_YIELDS` rounds;
/// 3. register the current thread as waiter on every in-flight reply state and
///    park for at most `ASK_MAX_PARK` (less if a deadline is nearer), then
///    start over with phase 1.
///
/// `in_flight` is expected to be non-empty. With an empty list and no
/// deadlines the loop would never find anything to return.
fn wait_for_ready_ask<Out>(
    in_flight: &mut Vec<InFlightAsk<Out>>,
    timeout: Duration,
) -> CompletedAsk<Out> {
    let mut idle_spins = 0;
    let mut idle_yields = 0;
    let mut time_refresh = 0_u32;
    let mut now = Instant::now();
    loop {
        if time_refresh == 0 {
            now = Instant::now();
        }
        time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
        if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
            return ask;
        }
        if idle_spins < ASK_READY_SPINS {
            idle_spins += 1;
            std::hint::spin_loop();
        } else if idle_yields < ASK_IDLE_YIELDS {
            idle_yields += 1;
            // Time has passed while yielded; force a clock refresh next round.
            time_refresh = 0;
            thread::yield_now();
        } else {
            idle_spins = 0;
            idle_yields = 0;
            time_refresh = 0;
            let current = thread::current();
            let registered = register_ask_waiters(in_flight, &current);
            now = Instant::now();
            // Re-check after registering so a reply that arrived in between
            // is not slept through.
            if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
                unregister_ask_waiters(registered, current.id());
                return ask;
            }
            thread::park_timeout(next_ask_park(in_flight, now));
            unregister_ask_waiters(registered, current.id());
        }
    }
}
fn take_ready_ask<Out>(
in_flight: &mut Vec<InFlightAsk<Out>>,
timeout: Duration,
now: Instant,
) -> Option<CompletedAsk<Out>> {
let mut index = 0;
while index < in_flight.len() {
match in_flight[index].state().poll() {
ReplyPoll::Ready(reply) => {
let ask = in_flight.swap_remove(index);
return Some(CompletedAsk {
index: ask.index,
result: Ok(reply),
state: ask.into_state(),
});
}
ReplyPoll::Dropped => {
let ask = in_flight.swap_remove(index);
return Some(CompletedAsk {
index: ask.index,
result: Err(StreamError::ActorAskResponseDropped),
state: ask.into_state(),
});
}
ReplyPoll::Pending => {
if in_flight[index]
.deadline
.is_some_and(|deadline| now >= deadline)
{
let outcome = in_flight[index].state().close_on_timeout();
let ask = in_flight.swap_remove(index);
let result = match outcome {
ReplyPoll::Ready(reply) => Ok(reply),
ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
};
return Some(CompletedAsk {
index: ask.index,
result,
state: ask.into_state(),
});
}
index += 1;
}
}
}
None
}
/// Registers `current` as the waiter on every in-flight reply state.
///
/// Returns a handle to each state so the caller can unregister later, even
/// for asks that have since left `in_flight`.
fn register_ask_waiters<Out>(
    in_flight: &[InFlightAsk<Out>],
    current: &Thread,
) -> Vec<Arc<ReplyState<Out>>> {
    in_flight
        .iter()
        .map(|ask| {
            let state = ask.state();
            state.register_waiter(current.clone());
            Arc::clone(state)
        })
        .collect()
}
/// Removes the thread identified by `current_id` as waiter from every state in
/// `registered`, releasing each handle as it goes.
fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
    registered
        .into_iter()
        .for_each(|state| state.unregister_waiter(current_id));
}
/// Computes how long the waiting thread may park: the time left until the
/// nearest deadline, capped at `ASK_MAX_PARK`. Without any deadline the cap
/// itself is used.
fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
    match next_ask_deadline_remaining(in_flight, now) {
        Some(remaining) if remaining < ASK_MAX_PARK => remaining,
        _ => ASK_MAX_PARK,
    }
}
/// Returns the shortest time remaining until any in-flight deadline.
///
/// Deadlines that have already passed count as zero. Returns `None` when no
/// ask carries a deadline.
fn next_ask_deadline_remaining<Out>(
    in_flight: &[InFlightAsk<Out>],
    now: Instant,
) -> Option<Duration> {
    let mut shortest: Option<Duration> = None;
    for ask in in_flight {
        let Some(deadline) = ask.deadline else {
            continue;
        };
        let remaining = deadline.saturating_duration_since(now);
        shortest = Some(match shortest {
            Some(best) if best <= remaining => best,
            _ => remaining,
        });
    }
    shortest
}
/// Returns `state` to the pool for reuse, but only when this is the last
/// handle to it. A state still referenced elsewhere (for example by a reply
/// port that has not been dropped yet) is simply released.
fn recycle_reply_state<Out>(
    state: Arc<ReplyState<Out>>,
    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
) {
    let shared_elsewhere = Arc::strong_count(&state) != 1;
    if shared_elsewhere {
        return;
    }
    reply_pool.push(state);
}
/// Extracts a readable message from a panic payload.
///
/// `&str` and `String` payloads are returned as they are; any other payload
/// type yields a fixed fallback message.
fn panic_reason(panic: Box<dyn Any + Send>) -> String {
    panic
        .downcast_ref::<&str>()
        .map(|reason| (*reason).to_owned())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "actor ask task panicked".to_owned())
}
/// Returns the process-wide multi-threaded tokio runtime, building it on first use.
///
/// The outcome of the first build attempt is cached. If that attempt failed,
/// every call returns `StreamError::Failed` with the same message.
pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
    static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
        std::sync::OnceLock::new();
    let started = RUNTIME.get_or_init(|| {
        tokio::runtime::Builder::new_multi_thread()
            .thread_name("datum-ractor-runtime")
            .enable_all()
            .build()
            .map_err(|error| format!("ractor runtime failed to start: {error}"))
    });
    started
        .as_ref()
        .map_err(|message| StreamError::Failed(message.clone()))
}
/// Runs `future` to completion on the shared ractor runtime and returns its output.
///
/// When the caller is already inside a tokio runtime context, the future is
/// driven from a freshly spawned helper thread that is then joined. A helper
/// thread that cannot be joined is reported as `StreamError::AbruptTermination`.
///
/// # Errors
///
/// Also fails when the shared runtime could not be started.
pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
where
    T: Send + 'static,
    F: std::future::Future<Output = T> + Send + 'static,
{
    let runtime = ractor_runtime()?;
    let inside_tokio = tokio::runtime::Handle::try_current().is_ok();
    if !inside_tokio {
        return Ok(runtime.block_on(future));
    }
    let helper = std::thread::spawn(move || runtime.block_on(future));
    helper.join().map_err(|_| StreamError::AbruptTermination)
}
/// Test-only wrapper around `block_on_ractor_runtime` that panics instead of
/// returning an error.
#[cfg(test)]
fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
where
T: Send + 'static,
F: std::future::Future<Output = T> + Send + 'static,
{
block_on_ractor_runtime(future).expect("ractor ask runtime starts")
}
// Tests for `ActorFlow::ask` against a small ractor test actor, plus one unit
// test of the reply-port state handling.
#[cfg(test)]
mod tests {
use super::*;
use crate::stream::{Sink, Source, StreamCompletion};
use std::sync::{
Arc as StdArc,
atomic::{AtomicUsize, Ordering},
};
// Messages understood by `AskTestActor`; each variant exercises one reply behavior.
enum AskTestMessage {
// Reply with `input` after `delay`.
Delayed {
input: u64,
delay: Duration,
reply_to: ReplyPort<u64>,
},
// Reply with `input` after a fixed delay while counting concurrently active asks.
Track {
input: u64,
reply_to: ReplyPort<u64>,
},
// Keep the reply port alive without ever answering.
NeverReply {
_reply_to: ReplyPort<u64>,
},
// Drop the reply port immediately.
DropReply {
_reply_to: ReplyPort<u64>,
},
// Bind a TCP listener inside the handler and reply with the bound port.
BindTcp {
reply_to: ReplyPort<u64>,
},
}
// NOTE(review): presumably ractor supplies a blanket `Message` impl when the
// `cluster` feature is disabled, so the explicit impl is only needed here — confirm.
#[cfg(feature = "cluster")]
impl Message for AskTestMessage {}
struct AskTestActor;
// Actor state: counters for the `Track` message and the ports parked by `NeverReply`.
struct AskTestState {
active: StdArc<AtomicUsize>,
max_active: StdArc<AtomicUsize>,
held_replies: Vec<ReplyPort<u64>>,
}
impl Actor for AskTestActor {
type Msg = AskTestMessage;
type State = AskTestState;
type Arguments = AskTestState;
// The spawn arguments are used directly as the actor state.
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(args)
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
AskTestMessage::Delayed {
input,
delay,
reply_to,
} => {
// Reply from a spawned task so the handler itself returns immediately.
ractor::concurrency::spawn(async move {
ractor::concurrency::sleep(delay).await;
let _ = reply_to.send(input);
});
}
AskTestMessage::Track { input, reply_to } => {
let active = StdArc::clone(&state.active);
let max_active = StdArc::clone(&state.max_active);
// Count this ask as active and record the highest count seen so far.
let current = active.fetch_add(1, Ordering::SeqCst) + 1;
max_active.fetch_max(current, Ordering::SeqCst);
ractor::concurrency::spawn(async move {
ractor::concurrency::sleep(Duration::from_millis(20)).await;
// Decrement before replying so the count drops before the stream can start another ask.
active.fetch_sub(1, Ordering::SeqCst);
let _ = reply_to.send(input);
});
}
AskTestMessage::NeverReply { _reply_to } => {
// Holding the port keeps the sender alive, so the ask can only end by timing out.
state.held_replies.push(_reply_to);
}
AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
AskTestMessage::BindTcp { reply_to } => {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let port = listener.local_addr()?.port();
let _ = reply_to.send(u64::from(port));
}
}
Ok(())
}
}
// Waits for a stream completion and unwraps its result.
fn wait<T>(completion: StreamCompletion<T>) -> T {
completion.wait().unwrap()
}
// Spawns the test actor on the shared runtime; also returns its join handle
// and the `max_active` counter.
fn spawn_test_actor() -> (
ActorRef<AskTestMessage>,
ractor::concurrency::JoinHandle<()>,
StdArc<AtomicUsize>,
) {
let active = StdArc::new(AtomicUsize::new(0));
let max_active = StdArc::new(AtomicUsize::new(0));
let state = AskTestState {
active,
max_active: StdArc::clone(&max_active),
held_replies: Vec::new(),
};
let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
Actor::spawn(None, AskTestActor, state)
.await
.expect("test actor spawns")
});
(actor_ref, handle, max_active)
}
// Stops the actor and waits for its task to finish.
fn stop_test_actor(
actor_ref: ActorRef<AskTestMessage>,
handle: ractor::concurrency::JoinHandle<()>,
) {
actor_ref.stop(None);
block_on_ractor_ask_runtime(async move {
handle.await.expect("test actor task joins");
});
}
// Later inputs get shorter delays, so replies arrive in reverse; output must still be in input order.
#[test]
fn actor_flow_ask_preserves_order_with_parallelism() {
let (actor_ref, handle, _) = spawn_test_actor();
let values = Source::from_iter(0_u64..5)
.via(ActorFlow::ask(
actor_ref.clone(),
5,
Duration::from_secs(1),
|input, reply_to| AskTestMessage::Delayed {
input,
delay: Duration::from_millis((5 - input) * 10),
reply_to,
},
))
.run_with(Sink::collect())
.unwrap();
assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
stop_test_actor(actor_ref, handle);
}
// With parallelism 3 the actor must observe exactly 3 concurrently active asks at its peak.
#[test]
fn actor_flow_ask_respects_parallelism() {
let (actor_ref, handle, max_active) = spawn_test_actor();
let values = Source::from_iter(0_u64..8)
.via(ActorFlow::ask(
actor_ref.clone(),
3,
Duration::from_secs(1),
|input, reply_to| AskTestMessage::Track { input, reply_to },
))
.run_with(Sink::collect())
.unwrap();
assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
assert_eq!(max_active.load(Ordering::SeqCst), 3);
stop_test_actor(actor_ref, handle);
}
// A single element run into `Sink::head` yields the actor's reply.
#[test]
fn actor_flow_ask_into_head_returns_value() {
let (actor_ref, handle, _) = spawn_test_actor();
let head = Source::single(7_u64)
.via(ActorFlow::ask(
actor_ref.clone(),
1,
Duration::from_secs(1),
|input, reply_to| AskTestMessage::Track { input, reply_to },
))
.run_with(Sink::head())
.unwrap();
assert_eq!(wait(head), 7);
stop_test_actor(actor_ref, handle);
}
// The actor handler can perform tokio I/O (binding a TCP listener) and reply with the result.
#[test]
fn actor_flow_ask_actor_handler_can_use_tokio_io() {
let (actor_ref, handle, _) = spawn_test_actor();
let ports = Source::single(0_u64)
.via(ActorFlow::ask(
actor_ref.clone(),
1,
Duration::from_secs(1),
|_input, reply_to| AskTestMessage::BindTcp { reply_to },
))
.run_collect()
.unwrap();
assert_eq!(ports.len(), 1);
assert_ne!(ports[0], 0);
stop_test_actor(actor_ref, handle);
}
// Replies delayed by 25 ms under a 30 s timeout must still be delivered, in order.
#[test]
fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
let (actor_ref, handle, _) = spawn_test_actor();
let values = Source::from_iter(0_u64..8)
.via(ActorFlow::ask(
actor_ref.clone(),
4,
Duration::from_secs(30),
|input, reply_to| AskTestMessage::Delayed {
input,
delay: Duration::from_millis(25),
reply_to,
},
))
.run_with(Sink::collect())
.unwrap();
assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
stop_test_actor(actor_ref, handle);
}
// Dropping an in-flight ask closes the receiver, so a later `send` on the port fails.
#[test]
fn dropped_in_flight_ask_closes_reply_port() {
let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
let reply_to = ReplyPort::new(StdArc::clone(&state));
let ask = InFlightAsk {
index: 0,
state: Some(StdArc::clone(&state)),
deadline: None,
};
drop(ask);
assert!(reply_to.is_closed());
assert_eq!(reply_to.send(1), Err(ReplySendError));
}
// An ask that is never answered fails with `ActorAskTimeout` carrying the configured timeout.
#[test]
fn actor_flow_ask_times_out() {
let (actor_ref, handle, _) = spawn_test_actor();
let timeout = Duration::from_millis(10);
let result = Source::single(1_u64)
.via(ActorFlow::ask(
actor_ref.clone(),
1,
timeout,
|_input, reply_to| AskTestMessage::NeverReply {
_reply_to: reply_to,
},
))
.run_collect();
assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
stop_test_actor(actor_ref, handle);
}
// Asking an actor that has already stopped fails with `ActorTerminated`.
#[test]
fn actor_flow_ask_fails_when_actor_is_stopped() {
let (actor_ref, handle, _) = spawn_test_actor();
actor_ref.stop(None);
block_on_ractor_ask_runtime(async move {
handle.await.expect("test actor task joins");
});
let result = Source::single(1_u64)
.via(ActorFlow::ask(
actor_ref,
1,
Duration::from_secs(1),
|input, reply_to| AskTestMessage::Delayed {
input,
delay: Duration::ZERO,
reply_to,
},
))
.run_collect();
assert_eq!(result, Err(StreamError::ActorTerminated));
}
// An actor that drops the reply port makes the ask fail with `ActorAskResponseDropped`.
#[test]
fn actor_flow_ask_fails_when_reply_port_is_dropped() {
let (actor_ref, handle, _) = spawn_test_actor();
let result = Source::single(1_u64)
.via(ActorFlow::ask(
actor_ref.clone(),
1,
Duration::from_secs(1),
|_input, reply_to| AskTestMessage::DropReply {
_reply_to: reply_to,
},
))
.run_collect();
assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
stop_test_actor(actor_ref, handle);
}
// A panic inside the message builder surfaces as `ActorAskTaskFailed` with the panic message.
#[test]
fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
let (actor_ref, handle, _) = spawn_test_actor();
let result = Source::single(1_u64)
.via(ActorFlow::ask(
actor_ref.clone(),
1,
Duration::from_secs(1),
|_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
panic!("message builder failed");
},
))
.run_collect();
assert_eq!(
result,
Err(StreamError::ActorAskTaskFailed {
reason: "message builder failed".to_owned()
})
);
stop_test_actor(actor_ref, handle);
}
}