1use std::{
4 any::Any,
5 error::Error,
6 fmt,
7 panic::{AssertUnwindSafe, catch_unwind},
8 sync::{
9 Arc, Mutex, MutexGuard,
10 atomic::{AtomicBool, AtomicU8, Ordering},
11 },
12 thread::{self, Thread, ThreadId},
13 time::{Duration, Instant},
14};
15
16use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
17
18pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
19
20mod interop;
21pub use interop::{
22 ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
23 ActorSourceMessage, ActorStatus, WatchEvent,
24};
25
/// Result alias for actor code: the error type is ractor's [`ActorProcessingErr`] and the
/// success type defaults to `()`.
pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
27
/// Busy-spin iterations `wait_for_ready_ask` performs before it starts yielding the thread.
const ASK_READY_SPINS: usize = 256;
/// `thread::yield_now` rounds performed after the spin phase before falling back to parking.
const ASK_IDLE_YIELDS: usize = 64;
/// Upper bound on a single `thread::park_timeout` call while waiting for replies.
const ASK_MAX_PARK: Duration = Duration::from_millis(1);
/// Number of wait-loop iterations that may reuse one cached `Instant::now()` reading.
const ASK_TIME_REFRESH_ITERS: u32 = 64;
39
/// `ReplyState::gate` value: nothing has been published to the slot yet.
const REPLY_PENDING: u8 = 0;
/// `ReplyState::gate` value: a reply has been stored in the slot.
const REPLY_READY: u8 = 1;
/// `ReplyState::gate` value: the `ReplyPort` was dropped without sending a reply.
const REPLY_DROPPED: u8 = 2;
46
47pub struct ActorFlow;
49
50impl ActorFlow {
51 #[must_use]
57 pub fn ask<In, Msg, Out, F>(
58 actor_ref: ActorRef<Msg>,
59 parallelism: usize,
60 timeout: Duration,
61 make_msg: F,
62 ) -> Flow<In, Out, NotUsed>
63 where
64 In: Send + 'static,
65 Msg: Message,
66 Out: Send + 'static,
67 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
68 {
69 ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
70 }
71}
72
/// Single-use handle an actor uses to answer one ask.
///
/// Dropping the port without calling [`ReplyPort::send`] marks the ask as dropped.
pub struct ReplyPort<T> {
    /// State shared with the side that waits for the reply.
    inner: Arc<ReplyState<T>>,
    /// `true` until `send` consumes the port; tells `Drop` whether to report a dropped reply.
    active: bool,
}
82
/// State shared between one [`ReplyPort`] and the side polling for its reply.
#[derive(Debug)]
struct ReplyState<T> {
    /// Ask timeout this state was created with; exposed through `ReplyPort::timeout`.
    timeout: Duration,
    /// Set once the receiving side no longer accepts a reply.
    receiver_closed: AtomicBool,
    /// One of the `REPLY_*` constants; lets `poll` skip the mutex while nothing was published.
    gate: AtomicU8,
    /// Reply value plus the thread (if any) to unpark when the value changes.
    slot: Mutex<ReplySlotState<T>>,
}
90
/// Mutex-protected part of [`ReplyState`].
#[derive(Debug)]
struct ReplySlotState<T> {
    /// Current content of the reply slot.
    value: ReplySlot<T>,
    /// Thread registered to be unparked when a reply arrives or the port is dropped.
    waiter: Option<Thread>,
}
96
/// Content of the reply slot stored inside the mutex.
#[derive(Debug)]
enum ReplySlot<T> {
    /// No reply stored (initial state, and the state left behind after a value is taken).
    Pending,
    /// A reply was sent and has not been taken yet.
    Ready(T),
    /// The `ReplyPort` was dropped before sending.
    Dropped,
}
103
/// Outcome of checking a [`ReplyState`] for a reply.
enum ReplyPoll<T> {
    /// Nothing has been published yet.
    Pending,
    /// The reply, moved out of the slot.
    Ready(T),
    /// The `ReplyPort` was dropped without sending.
    Dropped,
}
109
110impl<T> fmt::Debug for ReplyPort<T> {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 f.debug_struct("ReplyPort")
113 .field("timeout", &self.timeout())
114 .field("closed", &self.is_closed())
115 .finish_non_exhaustive()
116 }
117}
118
/// Error returned by `ReplyPort::send` when the reply can no longer be delivered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplySendError;

impl fmt::Display for ReplySendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MESSAGE: &str = "actor reply receiver dropped";
        // Written verbatim: width/precision flags on the formatter are not applied.
        f.write_str(MESSAGE)
    }
}

impl Error for ReplySendError {}
130
131impl<T> ReplyPort<T> {
132 fn new(inner: Arc<ReplyState<T>>) -> Self {
133 Self {
134 inner,
135 active: true,
136 }
137 }
138
139 #[must_use]
140 pub fn timeout(&self) -> Option<Duration> {
141 Some(self.inner.timeout)
142 }
143
144 #[must_use]
145 pub fn is_closed(&self) -> bool {
146 self.inner.is_closed()
147 }
148
149 pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
150 self.active = false;
151 self.inner.send(reply)
152 }
153}
154
155impl<T> Drop for ReplyPort<T> {
156 fn drop(&mut self) {
157 if self.active {
158 self.inner.drop_sender();
159 }
160 }
161}
162
163impl<T> ReplyState<T> {
164 fn new(timeout: Duration) -> Self {
165 Self {
166 timeout,
167 receiver_closed: AtomicBool::new(false),
168 gate: AtomicU8::new(REPLY_PENDING),
169 slot: Mutex::new(ReplySlotState {
170 value: ReplySlot::Pending,
171 waiter: None,
172 }),
173 }
174 }
175
176 fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
182 self.slot
183 .lock()
184 .unwrap_or_else(|poison| poison.into_inner())
185 }
186
187 fn reset(&self, timeout: Duration) {
188 self.receiver_closed.store(false, Ordering::Release);
189 self.gate.store(REPLY_PENDING, Ordering::Release);
190 let mut slot = self.lock_slot();
191 slot.value = ReplySlot::Pending;
192 slot.waiter = None;
193 debug_assert_eq!(self.timeout, timeout);
194 }
195
196 fn is_closed(&self) -> bool {
197 self.receiver_closed.load(Ordering::Acquire)
198 }
199
200 fn send(&self, reply: T) -> Result<(), ReplySendError> {
201 if self.is_closed() {
202 return Err(ReplySendError);
203 }
204
205 let mut slot = self.lock_slot();
206 if self.is_closed() {
207 return Err(ReplySendError);
208 }
209
210 if !matches!(slot.value, ReplySlot::Pending) {
211 return Err(ReplySendError);
212 }
213
214 slot.value = ReplySlot::Ready(reply);
215 let waiter = slot.waiter.clone();
216 self.gate.store(REPLY_READY, Ordering::Release);
219 drop(slot);
220 wake_waiter(waiter);
221 Ok(())
222 }
223
224 fn poll(&self) -> ReplyPoll<T> {
225 if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
226 return ReplyPoll::Pending;
228 }
229 self.take_locked()
230 }
231
232 fn close_on_timeout(&self) -> ReplyPoll<T> {
237 let mut slot = self.lock_slot();
238 self.receiver_closed.store(true, Ordering::Release);
239 let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
240 ReplySlot::Pending => ReplyPoll::Pending,
241 ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
242 ReplySlot::Dropped => ReplyPoll::Dropped,
243 };
244 let waiter = slot.waiter.take();
245 drop(slot);
246 wake_waiter(waiter);
247 outcome
248 }
249
250 fn take_locked(&self) -> ReplyPoll<T> {
251 let mut slot = self.lock_slot();
252 slot.waiter = None;
253 match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
254 ReplySlot::Pending => ReplyPoll::Pending,
255 ReplySlot::Ready(reply) => {
256 self.close_receiver();
257 ReplyPoll::Ready(reply)
258 }
259 ReplySlot::Dropped => {
260 self.close_receiver();
261 ReplyPoll::Dropped
262 }
263 }
264 }
265
266 fn drop_sender(&self) {
267 let mut slot = self.lock_slot();
268 if matches!(slot.value, ReplySlot::Pending) {
269 slot.value = ReplySlot::Dropped;
270 let waiter = slot.waiter.clone();
271 self.gate.store(REPLY_DROPPED, Ordering::Release);
272 drop(slot);
273 wake_waiter(waiter);
274 }
275 }
276
277 fn close_receiver(&self) {
278 self.receiver_closed.store(true, Ordering::Release);
279 }
280
281 fn register_waiter(&self, waiter: Thread) {
282 self.lock_slot().waiter = Some(waiter);
283 }
284
285 fn unregister_waiter(&self, waiter_id: ThreadId) {
286 let mut slot = self.lock_slot();
287 if slot
288 .waiter
289 .as_ref()
290 .is_some_and(|thread| thread.id() == waiter_id)
291 {
292 slot.waiter = None;
293 }
294 }
295}
296
/// Unparks `waiter` when one is present; `None` is a no-op.
fn wake_waiter(waiter: Option<Thread>) {
    let Some(thread) = waiter else {
        return;
    };
    thread.unpark();
}
302
303fn ask_flow<In, Msg, Out, F>(
304 actor_ref: ActorRef<Msg>,
305 parallelism: usize,
306 timeout: Duration,
307 make_msg: Arc<F>,
308) -> Flow<In, Out>
309where
310 In: Send + 'static,
311 Msg: Message,
312 Out: Send + 'static,
313 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
314{
315 assert!(
316 parallelism > 0,
317 "ActorFlow::ask parallelism must be greater than zero"
318 );
319 Flow::from_transform(move |input| {
327 ask_ractor_ordered(
328 input,
329 actor_ref.clone(),
330 parallelism,
331 timeout,
332 Arc::clone(&make_msg),
333 )
334 })
335}
336
337fn ask_ractor_ordered<In, Msg, Out, F>(
338 mut input: BoxStream<In>,
339 actor_ref: ActorRef<Msg>,
340 parallelism: usize,
341 timeout: Duration,
342 make_msg: Arc<F>,
343) -> BoxStream<Out>
344where
345 In: Send + 'static,
346 Msg: Message,
347 Out: Send + 'static,
348 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
349{
350 let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
351 let mut next_index = 0_usize;
352 let mut next_to_emit = 0_usize;
353 let mut completed = Vec::with_capacity(parallelism);
354 let mut reply_pool = Vec::with_capacity(parallelism);
355 let mut input_done = false;
356
357 Box::new(std::iter::from_fn(move || {
358 loop {
359 if let Some(result) = take_completed(&mut completed, next_to_emit) {
360 next_to_emit += 1;
361 return Some(result);
362 }
363
364 while in_flight.len() < parallelism && !input_done {
365 match input.next() {
366 Some(Ok(item)) => {
367 let index = next_index;
368 next_index += 1;
369 match start_ractor_ask(
370 index,
371 actor_ref.clone(),
372 timeout,
373 item,
374 Arc::clone(&make_msg),
375 &mut reply_pool,
376 ) {
377 Ok(ask) => in_flight.push(ask),
378 Err(error) => {
379 completed.push((index, Err(error)));
380 input_done = true;
381 }
382 }
383 }
384 Some(Err(error)) => {
385 completed.push((next_index, Err(error)));
386 next_index += 1;
387 input_done = true;
388 }
389 None => input_done = true,
390 }
391 }
392
393 if let Some(result) = take_completed(&mut completed, next_to_emit) {
394 next_to_emit += 1;
395 return Some(result);
396 }
397
398 if in_flight.is_empty() {
399 return None;
400 }
401
402 let ask = wait_for_ready_ask(&mut in_flight, timeout);
403 let index = ask.index;
404 let result = ask.result;
405 recycle_reply_state(ask.state, &mut reply_pool);
406 if index == next_to_emit {
407 next_to_emit += 1;
408 return Some(result);
409 }
410 completed.push((index, result));
411 }
412 }))
413}
414
415fn take_completed<Out>(
416 completed: &mut Vec<(usize, StreamResult<Out>)>,
417 index: usize,
418) -> Option<StreamResult<Out>> {
419 let position = completed
420 .iter()
421 .position(|(completed_index, _)| *completed_index == index)?;
422 Some(completed.swap_remove(position).1)
423}
424
/// Bookkeeping for one ask that was sent to the actor and has not completed yet.
struct InFlightAsk<Out> {
    /// Position of the originating element in the input stream.
    index: usize,
    /// Shared reply state; `None` only after `into_state` moved it out.
    state: Option<Arc<ReplyState<Out>>>,
    /// Instant at which the ask times out; `None` means no deadline is enforced.
    deadline: Option<Instant>,
}
432
433impl<Out> InFlightAsk<Out> {
434 fn state(&self) -> &Arc<ReplyState<Out>> {
435 self.state.as_ref().expect("in-flight ask has reply state")
436 }
437
438 fn into_state(mut self) -> Arc<ReplyState<Out>> {
439 self.state.take().expect("in-flight ask has reply state")
440 }
441}
442
443impl<Out> Drop for InFlightAsk<Out> {
444 fn drop(&mut self) {
445 if let Some(state) = &self.state {
446 state.close_receiver();
447 }
448 }
449}
450
/// Outcome of a finished ask together with its reply state, which the caller may recycle.
struct CompletedAsk<Out> {
    /// Position of the originating element in the input stream.
    index: usize,
    /// Reply value or the error that ended the ask.
    result: StreamResult<Out>,
    /// Reply state that belonged to the ask.
    state: Arc<ReplyState<Out>>,
}
456
457fn start_ractor_ask<In, Msg, Out, F>(
458 index: usize,
459 actor_ref: ActorRef<Msg>,
460 timeout: Duration,
461 input: In,
462 make_msg: Arc<F>,
463 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
464) -> StreamResult<InFlightAsk<Out>>
465where
466 In: Send + 'static,
467 Msg: Message,
468 Out: Send + 'static,
469 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
470{
471 let reply_state = match reply_pool.pop() {
472 Some(state) => {
475 state.reset(timeout);
476 state
477 }
478 None => Arc::new(ReplyState::new(timeout)),
479 };
480 let reply_to = ReplyPort::new(Arc::clone(&reply_state));
481 let message =
482 catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
483 StreamError::ActorAskTaskFailed {
484 reason: panic_reason(panic),
485 }
486 })?;
487
488 match actor_ref.cast(message) {
489 Ok(()) => {}
490 Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
491 return Err(StreamError::ActorTerminated);
492 }
493 Err(error) => {
494 return Err(StreamError::ActorAskSendFailed {
495 reason: error.to_string(),
496 });
497 }
498 }
499
500 Ok(InFlightAsk {
501 index,
502 state: Some(reply_state),
503 deadline: Instant::now().checked_add(timeout),
504 })
505}
506
507fn wait_for_ready_ask<Out>(
508 in_flight: &mut Vec<InFlightAsk<Out>>,
509 timeout: Duration,
510) -> CompletedAsk<Out> {
511 let mut idle_spins = 0;
512 let mut idle_yields = 0;
513 let mut time_refresh = 0_u32;
514 let mut now = Instant::now();
515 loop {
516 if time_refresh == 0 {
517 now = Instant::now();
518 }
519 time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
520 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
521 return ask;
522 }
523
524 if idle_spins < ASK_READY_SPINS {
525 idle_spins += 1;
526 std::hint::spin_loop();
527 } else if idle_yields < ASK_IDLE_YIELDS {
528 idle_yields += 1;
529 time_refresh = 0;
530 thread::yield_now();
531 } else {
532 idle_spins = 0;
533 idle_yields = 0;
534 time_refresh = 0;
535 let current = thread::current();
536 let registered = register_ask_waiters(in_flight, ¤t);
537 now = Instant::now();
538 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
539 unregister_ask_waiters(registered, current.id());
540 return ask;
541 }
542 thread::park_timeout(next_ask_park(in_flight, now));
543 unregister_ask_waiters(registered, current.id());
544 }
545 }
546}
547
548fn take_ready_ask<Out>(
549 in_flight: &mut Vec<InFlightAsk<Out>>,
550 timeout: Duration,
551 now: Instant,
552) -> Option<CompletedAsk<Out>> {
553 let mut index = 0;
554 while index < in_flight.len() {
555 match in_flight[index].state().poll() {
556 ReplyPoll::Ready(reply) => {
557 let ask = in_flight.swap_remove(index);
558 return Some(CompletedAsk {
559 index: ask.index,
560 result: Ok(reply),
561 state: ask.into_state(),
562 });
563 }
564 ReplyPoll::Dropped => {
565 let ask = in_flight.swap_remove(index);
566 return Some(CompletedAsk {
567 index: ask.index,
568 result: Err(StreamError::ActorAskResponseDropped),
569 state: ask.into_state(),
570 });
571 }
572 ReplyPoll::Pending => {
573 if in_flight[index]
574 .deadline
575 .is_some_and(|deadline| now >= deadline)
576 {
577 let outcome = in_flight[index].state().close_on_timeout();
580 let ask = in_flight.swap_remove(index);
581 let result = match outcome {
582 ReplyPoll::Ready(reply) => Ok(reply),
583 ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
584 ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
585 };
586 return Some(CompletedAsk {
587 index: ask.index,
588 result,
589 state: ask.into_state(),
590 });
591 }
592 index += 1;
593 }
594 }
595 }
596 None
597}
598
599fn register_ask_waiters<Out>(
600 in_flight: &[InFlightAsk<Out>],
601 current: &Thread,
602) -> Vec<Arc<ReplyState<Out>>> {
603 let mut registered = Vec::with_capacity(in_flight.len());
604 for ask in in_flight {
605 ask.state().register_waiter(current.clone());
606 registered.push(Arc::clone(ask.state()));
607 }
608 registered
609}
610
611fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
612 for state in registered {
613 state.unregister_waiter(current_id);
614 }
615}
616
617fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
618 next_ask_deadline_remaining(in_flight, now)
619 .unwrap_or(ASK_MAX_PARK)
620 .min(ASK_MAX_PARK)
621}
622
623fn next_ask_deadline_remaining<Out>(
624 in_flight: &[InFlightAsk<Out>],
625 now: Instant,
626) -> Option<Duration> {
627 in_flight
628 .iter()
629 .filter_map(|ask| ask.deadline)
630 .map(|deadline| deadline.saturating_duration_since(now))
631 .min()
632}
633
634fn recycle_reply_state<Out>(
635 state: Arc<ReplyState<Out>>,
636 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
637) {
638 if Arc::strong_count(&state) == 1 {
639 reply_pool.push(state);
640 }
641}
642
/// Extracts a readable message from a panic payload.
///
/// `&str` and `String` payloads are returned as text; any other payload type yields a fixed
/// fallback message.
fn panic_reason(panic: Box<dyn Any + Send>) -> String {
    panic
        .downcast_ref::<&str>()
        .map(|text| (*text).to_owned())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "actor ask task panicked".to_owned())
}
652
653pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
654 static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
655 std::sync::OnceLock::new();
656
657 match RUNTIME.get_or_init(|| {
658 tokio::runtime::Builder::new_multi_thread()
659 .thread_name("datum-ractor-runtime")
660 .enable_all()
661 .build()
662 .map_err(|error| format!("ractor runtime failed to start: {error}"))
663 }) {
664 Ok(runtime) => Ok(runtime),
665 Err(error) => Err(StreamError::Failed(error.clone())),
666 }
667}
668
669pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
670where
671 T: Send + 'static,
672 F: std::future::Future<Output = T> + Send + 'static,
673{
674 let runtime = ractor_runtime()?;
675 let result = if tokio::runtime::Handle::try_current().is_ok() {
676 std::thread::spawn(move || runtime.block_on(future))
677 .join()
678 .map_err(|_| StreamError::AbruptTermination)?
679 } else {
680 runtime.block_on(future)
681 };
682 Ok(result)
683}
684
/// Test-only wrapper around `block_on_ractor_runtime` that panics on failure.
#[cfg(test)]
fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
where
    T: Send + 'static,
    F: std::future::Future<Output = T> + Send + 'static,
{
    let outcome = block_on_ractor_runtime(future);
    outcome.expect("ractor ask runtime starts")
}
693
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::{Sink, Source, StreamCompletion};
    use std::sync::{
        Arc as StdArc,
        atomic::{AtomicUsize, Ordering},
    };

    /// Messages understood by the test actor; each variant exercises one reply behaviour.
    enum AskTestMessage {
        /// Reply with `input` after `delay`.
        Delayed {
            input: u64,
            delay: Duration,
            reply_to: ReplyPort<u64>,
        },
        /// Reply with `input` after 20 ms while counting concurrently active requests.
        Track {
            input: u64,
            reply_to: ReplyPort<u64>,
        },
        /// Keep the port alive without ever replying.
        NeverReply {
            _reply_to: ReplyPort<u64>,
        },
        /// Drop the port immediately without replying.
        DropReply {
            _reply_to: ReplyPort<u64>,
        },
        /// Bind a TCP listener inside the handler and reply with the bound port.
        BindTcp {
            reply_to: ReplyPort<u64>,
        },
    }

    // An explicit `Message` impl is only provided when the `cluster` feature is enabled.
    #[cfg(feature = "cluster")]
    impl Message for AskTestMessage {}

    struct AskTestActor;

    /// State of the test actor.
    struct AskTestState {
        /// Number of `Track` requests currently being processed.
        active: StdArc<AtomicUsize>,
        /// Highest value `active` has reached.
        max_active: StdArc<AtomicUsize>,
        /// Ports from `NeverReply` messages, held so they are not dropped.
        held_replies: Vec<ReplyPort<u64>>,
    }

    impl Actor for AskTestActor {
        type Msg = AskTestMessage;
        type State = AskTestState;
        type Arguments = AskTestState;

        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            args: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            // The spawn arguments are used directly as the actor state.
            Ok(args)
        }

        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            state: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            match message {
                AskTestMessage::Delayed {
                    input,
                    delay,
                    reply_to,
                } => {
                    // Reply from a spawned task so the handler itself returns immediately.
                    ractor::concurrency::spawn(async move {
                        ractor::concurrency::sleep(delay).await;
                        let _ = reply_to.send(input);
                    });
                }
                AskTestMessage::Track { input, reply_to } => {
                    let active = StdArc::clone(&state.active);
                    let max_active = StdArc::clone(&state.max_active);
                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                    max_active.fetch_max(current, Ordering::SeqCst);
                    ractor::concurrency::spawn(async move {
                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
                        // Decrement before replying, so the next ask starts after the count drops.
                        active.fetch_sub(1, Ordering::SeqCst);
                        let _ = reply_to.send(input);
                    });
                }
                AskTestMessage::NeverReply { _reply_to } => {
                    state.held_replies.push(_reply_to);
                }
                AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
                AskTestMessage::BindTcp { reply_to } => {
                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
                    let port = listener.local_addr()?.port();
                    let _ = reply_to.send(u64::from(port));
                }
            }
            Ok(())
        }
    }

    /// Waits for a stream completion and unwraps its result.
    fn wait<T>(completion: StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }

    /// Spawns the test actor on the shared runtime.
    ///
    /// Returns the actor reference, its join handle and the shared `max_active` counter.
    fn spawn_test_actor() -> (
        ActorRef<AskTestMessage>,
        ractor::concurrency::JoinHandle<()>,
        StdArc<AtomicUsize>,
    ) {
        let active = StdArc::new(AtomicUsize::new(0));
        let max_active = StdArc::new(AtomicUsize::new(0));
        let state = AskTestState {
            active,
            max_active: StdArc::clone(&max_active),
            held_replies: Vec::new(),
        };
        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
            Actor::spawn(None, AskTestActor, state)
                .await
                .expect("test actor spawns")
        });
        (actor_ref, handle, max_active)
    }

    /// Stops the actor and waits for its task to finish.
    fn stop_test_actor(
        actor_ref: ActorRef<AskTestMessage>,
        handle: ractor::concurrency::JoinHandle<()>,
    ) {
        actor_ref.stop(None);
        block_on_ractor_ask_runtime(async move {
            handle.await.expect("test actor task joins");
        });
    }

    // Earlier inputs get longer delays, so replies arrive in reverse order; the output must
    // still follow input order.
    #[test]
    fn actor_flow_ask_preserves_order_with_parallelism() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let values = Source::from_iter(0_u64..5)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                5,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::from_millis((5 - input) * 10),
                    reply_to,
                },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
        stop_test_actor(actor_ref, handle);
    }

    // With parallelism 3 the actor must observe exactly three concurrently active requests
    // at its peak.
    #[test]
    fn actor_flow_ask_respects_parallelism() {
        let (actor_ref, handle, max_active) = spawn_test_actor();

        let values = Source::from_iter(0_u64..8)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                3,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Track { input, reply_to },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
        assert_eq!(max_active.load(Ordering::SeqCst), 3);
        stop_test_actor(actor_ref, handle);
    }

    // A single element run into `Sink::head` yields the actor's reply.
    #[test]
    fn actor_flow_ask_into_head_returns_value() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let head = Source::single(7_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Track { input, reply_to },
            ))
            .run_with(Sink::head())
            .unwrap();

        assert_eq!(wait(head), 7);

        stop_test_actor(actor_ref, handle);
    }

    // The actor handler awaits tokio I/O (binding a TCP listener) before replying.
    #[test]
    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let ports = Source::single(0_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
            ))
            .run_collect()
            .unwrap();

        assert_eq!(ports.len(), 1);
        assert_ne!(ports[0], 0);
        stop_test_actor(actor_ref, handle);
    }

    // Replies arrive after 25 ms while the timeout is 30 s, so results must be delivered
    // well before any deadline.
    #[test]
    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let values = Source::from_iter(0_u64..8)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                4,
                Duration::from_secs(30),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::from_millis(25),
                    reply_to,
                },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
        stop_test_actor(actor_ref, handle);
    }

    // Dropping an `InFlightAsk` that still owns its state must close the port for senders.
    #[test]
    fn dropped_in_flight_ask_closes_reply_port() {
        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
        let reply_to = ReplyPort::new(StdArc::clone(&state));
        let ask = InFlightAsk {
            index: 0,
            state: Some(StdArc::clone(&state)),
            deadline: None,
        };

        drop(ask);

        assert!(reply_to.is_closed());
        assert_eq!(reply_to.send(1), Err(ReplySendError));
    }

    // The actor holds the port without replying, so the ask must fail with the timeout error.
    #[test]
    fn actor_flow_ask_times_out() {
        let (actor_ref, handle, _) = spawn_test_actor();
        let timeout = Duration::from_millis(10);

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                timeout,
                |_input, reply_to| AskTestMessage::NeverReply {
                    _reply_to: reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
        stop_test_actor(actor_ref, handle);
    }

    // The actor is stopped and joined before the stream runs, so the ask must report
    // `ActorTerminated`.
    #[test]
    fn actor_flow_ask_fails_when_actor_is_stopped() {
        let (actor_ref, handle, _) = spawn_test_actor();
        actor_ref.stop(None);
        block_on_ractor_ask_runtime(async move {
            handle.await.expect("test actor task joins");
        });

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref,
                1,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::ZERO,
                    reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorTerminated));
    }

    // The actor drops the port without replying, which must surface as a dropped response.
    #[test]
    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, reply_to| AskTestMessage::DropReply {
                    _reply_to: reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
        stop_test_actor(actor_ref, handle);
    }

    // A panic inside the message builder must become `ActorAskTaskFailed` carrying the
    // panic message.
    #[test]
    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
                    panic!("message builder failed");
                },
            ))
            .run_collect();

        assert_eq!(
            result,
            Err(StreamError::ActorAskTaskFailed {
                reason: "message builder failed".to_owned()
            })
        );
        stop_test_actor(actor_ref, handle);
    }
}