1use std::{
4 any::Any,
5 error::Error,
6 fmt,
7 panic::{AssertUnwindSafe, catch_unwind},
8 sync::{
9 Arc, Mutex, MutexGuard,
10 atomic::{AtomicBool, AtomicU8, Ordering},
11 },
12 thread::{self, Thread, ThreadId},
13 time::{Duration, Instant},
14};
15
16use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
17
18pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
19
20mod interop;
21mod stream_ref;
22pub use interop::{
23 ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
24 ActorSourceMessage, ActorStatus, WatchEvent,
25};
26pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
27
/// Result alias whose error type is ractor's [`ActorProcessingErr`].
///
/// The success type defaults to `()`.
pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
29
/// Number of busy-spin iterations `wait_for_ready_ask` performs before it starts yielding.
const ASK_READY_SPINS: usize = 256;
/// Number of `thread::yield_now` rounds `wait_for_ready_ask` performs before it parks.
const ASK_IDLE_YIELDS: usize = 64;
/// Upper bound on a single `thread::park_timeout` call while waiting for replies.
const ASK_MAX_PARK: Duration = Duration::from_millis(1);
/// Period, in wait-loop iterations, between `Instant::now()` refreshes in `wait_for_ready_ask`.
const ASK_TIME_REFRESH_ITERS: u32 = 64;

/// Gate value: no outcome has been published yet (initial state, and the state after `reset`).
const REPLY_PENDING: u8 = 0;
/// Gate value stored by `ReplyState::send` once a reply has been placed in the slot.
const REPLY_READY: u8 = 1;
/// Gate value stored by `ReplyState::drop_sender` when the port is dropped without replying.
const REPLY_DROPPED: u8 = 2;
48
/// Namespace type for stream flows that talk to ractor actors; see [`ActorFlow::ask`].
pub struct ActorFlow;
51
52impl ActorFlow {
53 #[must_use]
59 pub fn ask<In, Msg, Out, F>(
60 actor_ref: ActorRef<Msg>,
61 parallelism: usize,
62 timeout: Duration,
63 make_msg: F,
64 ) -> Flow<In, Out, NotUsed>
65 where
66 In: Send + 'static,
67 Msg: Message,
68 Out: Send + 'static,
69 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
70 {
71 ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
72 }
73}
74
/// One-shot handle an actor uses to answer a single ask.
///
/// Dropping the port without calling [`ReplyPort::send`] reports the reply as
/// dropped to the waiting side.
pub struct ReplyPort<T> {
    /// Reply state shared with the side waiting for the answer.
    inner: Arc<ReplyState<T>>,
    /// `true` until `send` consumes the port; tells `Drop` whether it still has
    /// to report a dropped sender.
    active: bool,
}
84
/// State shared between a [`ReplyPort`] and the stream side waiting for its reply.
#[derive(Debug)]
struct ReplyState<T> {
    /// Ask timeout, exposed to the actor through `ReplyPort::timeout`.
    timeout: Duration,
    /// Set once the receiving side no longer accepts a reply (outcome taken,
    /// timed out, or the in-flight ask was dropped).
    receiver_closed: AtomicBool,
    /// Lock-free hint (`REPLY_PENDING` / `REPLY_READY` / `REPLY_DROPPED`) that
    /// `poll` checks before taking the mutex.
    gate: AtomicU8,
    /// The reply slot together with the registered waiter, guarded by a mutex.
    slot: Mutex<ReplySlotState<T>>,
}
92
/// Mutex-protected part of [`ReplyState`].
#[derive(Debug)]
struct ReplySlotState<T> {
    /// Current content of the one-shot reply slot.
    value: ReplySlot<T>,
    /// Thread to unpark when the slot changes, if one registered itself.
    waiter: Option<Thread>,
}
98
/// Content of the reply slot stored inside [`ReplySlotState`].
#[derive(Debug)]
enum ReplySlot<T> {
    /// Nothing stored; also the value left behind after an outcome is taken.
    Pending,
    /// A reply stored by `ReplyState::send`.
    Ready(T),
    /// The port was dropped without replying (`ReplyState::drop_sender`).
    Dropped,
}
105
/// Outcome handed to the waiting side by `ReplyState::poll` and
/// `ReplyState::close_on_timeout`.
enum ReplyPoll<T> {
    /// No reply was available.
    Pending,
    /// The reply, moved out of the slot.
    Ready(T),
    /// The reply port was dropped without sending.
    Dropped,
}
111
112impl<T> fmt::Debug for ReplyPort<T> {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 f.debug_struct("ReplyPort")
115 .field("timeout", &self.timeout())
116 .field("closed", &self.is_closed())
117 .finish_non_exhaustive()
118 }
119}
120
/// Error returned by `ReplyPort::send` when the reply can no longer be delivered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplySendError;

impl fmt::Display for ReplySendError {
    /// Writes the fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "actor reply receiver dropped")
    }
}

// No underlying cause to expose, so the default `Error` methods are sufficient.
impl Error for ReplySendError {}
132
133impl<T> ReplyPort<T> {
134 fn new(inner: Arc<ReplyState<T>>) -> Self {
135 Self {
136 inner,
137 active: true,
138 }
139 }
140
141 #[must_use]
142 pub fn timeout(&self) -> Option<Duration> {
143 Some(self.inner.timeout)
144 }
145
146 #[must_use]
147 pub fn is_closed(&self) -> bool {
148 self.inner.is_closed()
149 }
150
151 pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
152 self.active = false;
153 self.inner.send(reply)
154 }
155}
156
157impl<T> Drop for ReplyPort<T> {
158 fn drop(&mut self) {
159 if self.active {
160 self.inner.drop_sender();
161 }
162 }
163}
164
impl<T> ReplyState<T> {
    /// Creates a pending state: open receiver, pending gate, empty slot, no waiter.
    fn new(timeout: Duration) -> Self {
        Self {
            timeout,
            receiver_closed: AtomicBool::new(false),
            gate: AtomicU8::new(REPLY_PENDING),
            slot: Mutex::new(ReplySlotState {
                value: ReplySlot::Pending,
                waiter: None,
            }),
        }
    }

    /// Locks the slot, recovering the guard if the mutex was poisoned.
    fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
        self.slot
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
    }

    /// Returns a recycled state to the condition `new` would produce.
    ///
    /// `timeout` is not stored; it is only compared against the existing value
    /// by a debug assertion.
    fn reset(&self, timeout: Duration) {
        self.receiver_closed.store(false, Ordering::Release);
        self.gate.store(REPLY_PENDING, Ordering::Release);
        let mut slot = self.lock_slot();
        slot.value = ReplySlot::Pending;
        slot.waiter = None;
        debug_assert_eq!(self.timeout, timeout);
    }

    /// Reports whether the receiving side has stopped accepting a reply.
    fn is_closed(&self) -> bool {
        self.receiver_closed.load(Ordering::Acquire)
    }

    /// Stores `reply` in the slot, publishes `REPLY_READY` and wakes the
    /// registered waiter, if any.
    ///
    /// # Errors
    ///
    /// Returns [`ReplySendError`] if the receiver is closed or the slot is not
    /// `Pending`.
    fn send(&self, reply: T) -> Result<(), ReplySendError> {
        // Cheap early-out that avoids taking the lock.
        if self.is_closed() {
            return Err(ReplySendError);
        }

        let mut slot = self.lock_slot();
        // Re-check under the lock: `close_on_timeout` sets the flag while
        // holding it, so a timeout that got there first is observed here.
        if self.is_closed() {
            return Err(ReplySendError);
        }

        if !matches!(slot.value, ReplySlot::Pending) {
            return Err(ReplySendError);
        }

        slot.value = ReplySlot::Ready(reply);
        let waiter = slot.waiter.clone();
        self.gate.store(REPLY_READY, Ordering::Release);
        // Release the lock before unparking the waiter.
        drop(slot);
        wake_waiter(waiter);
        Ok(())
    }

    /// Non-blocking check for an outcome.
    ///
    /// Returns `Pending` without locking while the gate is still
    /// `REPLY_PENDING`; otherwise moves the outcome out via `take_locked`.
    fn poll(&self) -> ReplyPoll<T> {
        if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
            return ReplyPoll::Pending;
        }
        self.take_locked()
    }

    /// Closes the receiver under the lock and takes whatever the slot holds.
    ///
    /// Returns `Pending` when nothing had arrived, and `Ready` / `Dropped` when
    /// an outcome was already in the slot. Any registered waiter is removed
    /// and unparked.
    fn close_on_timeout(&self) -> ReplyPoll<T> {
        let mut slot = self.lock_slot();
        self.receiver_closed.store(true, Ordering::Release);
        let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
            ReplySlot::Pending => ReplyPoll::Pending,
            ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
            ReplySlot::Dropped => ReplyPoll::Dropped,
        };
        let waiter = slot.waiter.take();
        drop(slot);
        wake_waiter(waiter);
        outcome
    }

    /// Clears the waiter and moves the slot content out, leaving `Pending`.
    ///
    /// The receiver is closed when a `Ready` or `Dropped` outcome is taken; it
    /// stays open if the slot was still `Pending`.
    fn take_locked(&self) -> ReplyPoll<T> {
        let mut slot = self.lock_slot();
        slot.waiter = None;
        match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
            ReplySlot::Pending => ReplyPoll::Pending,
            ReplySlot::Ready(reply) => {
                self.close_receiver();
                ReplyPoll::Ready(reply)
            }
            ReplySlot::Dropped => {
                self.close_receiver();
                ReplyPoll::Dropped
            }
        }
    }

    /// Records that the port went away without replying.
    ///
    /// Only acts while the slot is `Pending`: marks it `Dropped`, publishes
    /// `REPLY_DROPPED` and wakes the registered waiter, if any.
    fn drop_sender(&self) {
        let mut slot = self.lock_slot();
        if matches!(slot.value, ReplySlot::Pending) {
            slot.value = ReplySlot::Dropped;
            let waiter = slot.waiter.clone();
            self.gate.store(REPLY_DROPPED, Ordering::Release);
            drop(slot);
            wake_waiter(waiter);
        }
    }

    /// Marks the receiver closed without touching the slot or the waiter.
    fn close_receiver(&self) {
        self.receiver_closed.store(true, Ordering::Release);
    }

    /// Registers `waiter` as the thread to unpark, replacing any previous one.
    fn register_waiter(&self, waiter: Thread) {
        self.lock_slot().waiter = Some(waiter);
    }

    /// Removes the registered waiter, but only if it is the thread `waiter_id`.
    fn unregister_waiter(&self, waiter_id: ThreadId) {
        let mut slot = self.lock_slot();
        if slot
            .waiter
            .as_ref()
            .is_some_and(|thread| thread.id() == waiter_id)
        {
            slot.waiter = None;
        }
    }
}
298
/// Unparks `waiter` when one is present; does nothing for `None`.
fn wake_waiter(waiter: Option<Thread>) {
    waiter.iter().for_each(Thread::unpark);
}
304
305fn ask_flow<In, Msg, Out, F>(
306 actor_ref: ActorRef<Msg>,
307 parallelism: usize,
308 timeout: Duration,
309 make_msg: Arc<F>,
310) -> Flow<In, Out>
311where
312 In: Send + 'static,
313 Msg: Message,
314 Out: Send + 'static,
315 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
316{
317 assert!(
318 parallelism > 0,
319 "ActorFlow::ask parallelism must be greater than zero"
320 );
321 Flow::from_transform(move |input| {
329 ask_ractor_ordered(
330 input,
331 actor_ref.clone(),
332 parallelism,
333 timeout,
334 Arc::clone(&make_msg),
335 )
336 })
337}
338
339fn ask_ractor_ordered<In, Msg, Out, F>(
340 mut input: BoxStream<In>,
341 actor_ref: ActorRef<Msg>,
342 parallelism: usize,
343 timeout: Duration,
344 make_msg: Arc<F>,
345) -> BoxStream<Out>
346where
347 In: Send + 'static,
348 Msg: Message,
349 Out: Send + 'static,
350 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
351{
352 let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
353 let mut next_index = 0_usize;
354 let mut next_to_emit = 0_usize;
355 let mut completed = Vec::with_capacity(parallelism);
356 let mut reply_pool = Vec::with_capacity(parallelism);
357 let mut input_done = false;
358
359 Box::new(std::iter::from_fn(move || {
360 loop {
361 if let Some(result) = take_completed(&mut completed, next_to_emit) {
362 next_to_emit += 1;
363 return Some(result);
364 }
365
366 while in_flight.len() < parallelism && !input_done {
367 match input.next() {
368 Some(Ok(item)) => {
369 let index = next_index;
370 next_index += 1;
371 match start_ractor_ask(
372 index,
373 actor_ref.clone(),
374 timeout,
375 item,
376 Arc::clone(&make_msg),
377 &mut reply_pool,
378 ) {
379 Ok(ask) => in_flight.push(ask),
380 Err(error) => {
381 completed.push((index, Err(error)));
382 input_done = true;
383 }
384 }
385 }
386 Some(Err(error)) => {
387 completed.push((next_index, Err(error)));
388 next_index += 1;
389 input_done = true;
390 }
391 None => input_done = true,
392 }
393 }
394
395 if let Some(result) = take_completed(&mut completed, next_to_emit) {
396 next_to_emit += 1;
397 return Some(result);
398 }
399
400 if in_flight.is_empty() {
401 return None;
402 }
403
404 let ask = wait_for_ready_ask(&mut in_flight, timeout);
405 let index = ask.index;
406 let result = ask.result;
407 recycle_reply_state(ask.state, &mut reply_pool);
408 if index == next_to_emit {
409 next_to_emit += 1;
410 return Some(result);
411 }
412 completed.push((index, result));
413 }
414 }))
415}
416
417fn take_completed<Out>(
418 completed: &mut Vec<(usize, StreamResult<Out>)>,
419 index: usize,
420) -> Option<StreamResult<Out>> {
421 let position = completed
422 .iter()
423 .position(|(completed_index, _)| *completed_index == index)?;
424 Some(completed.swap_remove(position).1)
425}
426
/// An ask that has been sent to the actor and is still awaiting its outcome.
struct InFlightAsk<Out> {
    /// Position of the originating element in the input; used to restore output order.
    index: usize,
    /// Shared reply state; `None` only after `into_state` has moved it out.
    state: Option<Arc<ReplyState<Out>>>,
    /// Instant at which the ask times out; `None` means no deadline is checked.
    deadline: Option<Instant>,
}
434
435impl<Out> InFlightAsk<Out> {
436 fn state(&self) -> &Arc<ReplyState<Out>> {
437 self.state.as_ref().expect("in-flight ask has reply state")
438 }
439
440 fn into_state(mut self) -> Arc<ReplyState<Out>> {
441 self.state.take().expect("in-flight ask has reply state")
442 }
443}
444
445impl<Out> Drop for InFlightAsk<Out> {
446 fn drop(&mut self) {
447 if let Some(state) = &self.state {
448 state.close_receiver();
449 }
450 }
451}
452
/// An ask that has resolved, as returned by `take_ready_ask` / `wait_for_ready_ask`.
struct CompletedAsk<Out> {
    /// Position of the originating element in the input.
    index: usize,
    /// The reply, or the error the ask ended with.
    result: StreamResult<Out>,
    /// The ask's reply state, handed back so the caller can recycle it.
    state: Arc<ReplyState<Out>>,
}
458
459fn start_ractor_ask<In, Msg, Out, F>(
460 index: usize,
461 actor_ref: ActorRef<Msg>,
462 timeout: Duration,
463 input: In,
464 make_msg: Arc<F>,
465 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
466) -> StreamResult<InFlightAsk<Out>>
467where
468 In: Send + 'static,
469 Msg: Message,
470 Out: Send + 'static,
471 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
472{
473 let reply_state = match reply_pool.pop() {
474 Some(state) => {
477 state.reset(timeout);
478 state
479 }
480 None => Arc::new(ReplyState::new(timeout)),
481 };
482 let reply_to = ReplyPort::new(Arc::clone(&reply_state));
483 let message =
484 catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
485 StreamError::ActorAskTaskFailed {
486 reason: panic_reason(panic),
487 }
488 })?;
489
490 match actor_ref.cast(message) {
491 Ok(()) => {}
492 Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
493 return Err(StreamError::ActorTerminated);
494 }
495 Err(error) => {
496 return Err(StreamError::ActorAskSendFailed {
497 reason: error.to_string(),
498 });
499 }
500 }
501
502 Ok(InFlightAsk {
503 index,
504 state: Some(reply_state),
505 deadline: Instant::now().checked_add(timeout),
506 })
507}
508
509fn wait_for_ready_ask<Out>(
510 in_flight: &mut Vec<InFlightAsk<Out>>,
511 timeout: Duration,
512) -> CompletedAsk<Out> {
513 let mut idle_spins = 0;
514 let mut idle_yields = 0;
515 let mut time_refresh = 0_u32;
516 let mut now = Instant::now();
517 loop {
518 if time_refresh == 0 {
519 now = Instant::now();
520 }
521 time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
522 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
523 return ask;
524 }
525
526 if idle_spins < ASK_READY_SPINS {
527 idle_spins += 1;
528 std::hint::spin_loop();
529 } else if idle_yields < ASK_IDLE_YIELDS {
530 idle_yields += 1;
531 time_refresh = 0;
532 thread::yield_now();
533 } else {
534 idle_spins = 0;
535 idle_yields = 0;
536 time_refresh = 0;
537 let current = thread::current();
538 let registered = register_ask_waiters(in_flight, ¤t);
539 now = Instant::now();
540 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
541 unregister_ask_waiters(registered, current.id());
542 return ask;
543 }
544 thread::park_timeout(next_ask_park(in_flight, now));
545 unregister_ask_waiters(registered, current.id());
546 }
547 }
548}
549
550fn take_ready_ask<Out>(
551 in_flight: &mut Vec<InFlightAsk<Out>>,
552 timeout: Duration,
553 now: Instant,
554) -> Option<CompletedAsk<Out>> {
555 let mut index = 0;
556 while index < in_flight.len() {
557 match in_flight[index].state().poll() {
558 ReplyPoll::Ready(reply) => {
559 let ask = in_flight.swap_remove(index);
560 return Some(CompletedAsk {
561 index: ask.index,
562 result: Ok(reply),
563 state: ask.into_state(),
564 });
565 }
566 ReplyPoll::Dropped => {
567 let ask = in_flight.swap_remove(index);
568 return Some(CompletedAsk {
569 index: ask.index,
570 result: Err(StreamError::ActorAskResponseDropped),
571 state: ask.into_state(),
572 });
573 }
574 ReplyPoll::Pending => {
575 if in_flight[index]
576 .deadline
577 .is_some_and(|deadline| now >= deadline)
578 {
579 let outcome = in_flight[index].state().close_on_timeout();
582 let ask = in_flight.swap_remove(index);
583 let result = match outcome {
584 ReplyPoll::Ready(reply) => Ok(reply),
585 ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
586 ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
587 };
588 return Some(CompletedAsk {
589 index: ask.index,
590 result,
591 state: ask.into_state(),
592 });
593 }
594 index += 1;
595 }
596 }
597 }
598 None
599}
600
601fn register_ask_waiters<Out>(
602 in_flight: &[InFlightAsk<Out>],
603 current: &Thread,
604) -> Vec<Arc<ReplyState<Out>>> {
605 let mut registered = Vec::with_capacity(in_flight.len());
606 for ask in in_flight {
607 ask.state().register_waiter(current.clone());
608 registered.push(Arc::clone(ask.state()));
609 }
610 registered
611}
612
613fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
614 for state in registered {
615 state.unregister_waiter(current_id);
616 }
617}
618
619fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
620 next_ask_deadline_remaining(in_flight, now)
621 .unwrap_or(ASK_MAX_PARK)
622 .min(ASK_MAX_PARK)
623}
624
625fn next_ask_deadline_remaining<Out>(
626 in_flight: &[InFlightAsk<Out>],
627 now: Instant,
628) -> Option<Duration> {
629 in_flight
630 .iter()
631 .filter_map(|ask| ask.deadline)
632 .map(|deadline| deadline.saturating_duration_since(now))
633 .min()
634}
635
636fn recycle_reply_state<Out>(
637 state: Arc<ReplyState<Out>>,
638 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
639) {
640 if Arc::strong_count(&state) == 1 {
641 reply_pool.push(state);
642 }
643}
644
/// Extracts a human-readable message from a panic payload.
///
/// `&str` and `String` payloads are returned as-is; any other payload type
/// yields a fixed fallback message.
fn panic_reason(panic: Box<dyn Any + Send>) -> String {
    let payload = &*panic;
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_owned())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "actor ask task panicked".to_owned())
}
654
655pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
656 static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
657 std::sync::OnceLock::new();
658
659 match RUNTIME.get_or_init(|| {
660 tokio::runtime::Builder::new_multi_thread()
661 .thread_name("datum-ractor-runtime")
662 .enable_all()
663 .build()
664 .map_err(|error| format!("ractor runtime failed to start: {error}"))
665 }) {
666 Ok(runtime) => Ok(runtime),
667 Err(error) => Err(StreamError::Failed(error.clone())),
668 }
669}
670
671pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
672where
673 T: Send + 'static,
674 F: std::future::Future<Output = T> + Send + 'static,
675{
676 let runtime = ractor_runtime()?;
677 let result = if tokio::runtime::Handle::try_current().is_ok() {
678 std::thread::spawn(move || runtime.block_on(future))
679 .join()
680 .map_err(|_| StreamError::AbruptTermination)?
681 } else {
682 runtime.block_on(future)
683 };
684 Ok(result)
685}
686
/// Test helper: runs `future` on the shared ractor runtime and unwraps the result.
///
/// # Panics
///
/// Panics if `block_on_ractor_runtime` returns an error.
#[cfg(test)]
fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
where
    T: Send + 'static,
    F: std::future::Future<Output = T> + Send + 'static,
{
    let outcome = block_on_ractor_runtime(future);
    outcome.expect("ractor ask runtime starts")
}
695
/// Tests for `ActorFlow::ask` and the reply plumbing, run against a small
/// ractor actor spawned on the shared runtime.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::{Sink, Source, StreamCompletion};
    use std::sync::{
        Arc as StdArc,
        atomic::{AtomicUsize, Ordering},
    };

    /// Messages understood by `AskTestActor`; every variant carries the reply
    /// port of one ask.
    enum AskTestMessage {
        /// Reply with `input` after sleeping for `delay`.
        Delayed {
            input: u64,
            delay: Duration,
            reply_to: ReplyPort<u64>,
        },
        /// Reply with `input` after 20 ms while tracking how many such
        /// handlers are active at once.
        Track {
            input: u64,
            reply_to: ReplyPort<u64>,
        },
        /// Keep the reply port alive in the actor state and never answer.
        NeverReply {
            _reply_to: ReplyPort<u64>,
        },
        /// Drop the reply port without answering.
        DropReply {
            _reply_to: ReplyPort<u64>,
        },
        /// Bind a TCP listener on an ephemeral port and reply with the port number.
        BindTcp {
            reply_to: ReplyPort<u64>,
        },
    }

    // NOTE(review): presumably ractor only requires an explicit `Message` impl
    // when the `cluster` feature is enabled — confirm against ractor's docs.
    #[cfg(feature = "cluster")]
    impl Message for AskTestMessage {}

    /// Actor used by the tests below; its behavior is driven entirely by `AskTestMessage`.
    struct AskTestActor;

    /// State (and spawn arguments) of `AskTestActor`.
    struct AskTestState {
        /// Number of `Track` handlers currently between start and reply.
        active: StdArc<AtomicUsize>,
        /// Highest value `active` has reached.
        max_active: StdArc<AtomicUsize>,
        /// Reply ports parked by `NeverReply` so they are not dropped.
        held_replies: Vec<ReplyPort<u64>>,
    }

    impl Actor for AskTestActor {
        type Msg = AskTestMessage;
        type State = AskTestState;
        type Arguments = AskTestState;

        /// Uses the spawn arguments as the initial state.
        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            args: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(args)
        }

        /// Handles one test message as described on each `AskTestMessage` variant.
        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            state: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            match message {
                AskTestMessage::Delayed {
                    input,
                    delay,
                    reply_to,
                } => {
                    // Reply from a spawned task so the handler returns right away.
                    ractor::concurrency::spawn(async move {
                        ractor::concurrency::sleep(delay).await;
                        let _ = reply_to.send(input);
                    });
                }
                AskTestMessage::Track { input, reply_to } => {
                    let active = StdArc::clone(&state.active);
                    let max_active = StdArc::clone(&state.max_active);
                    // Count this handler as active and record the new peak.
                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                    max_active.fetch_max(current, Ordering::SeqCst);
                    ractor::concurrency::spawn(async move {
                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
                        active.fetch_sub(1, Ordering::SeqCst);
                        let _ = reply_to.send(input);
                    });
                }
                AskTestMessage::NeverReply { _reply_to } => {
                    // Holding the port keeps the ask pending until it times out.
                    state.held_replies.push(_reply_to);
                }
                AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
                AskTestMessage::BindTcp { reply_to } => {
                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
                    let port = listener.local_addr()?.port();
                    let _ = reply_to.send(u64::from(port));
                }
            }
            Ok(())
        }
    }

    /// Waits for a stream completion and unwraps its result.
    fn wait<T>(completion: StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }

    /// Spawns an `AskTestActor` on the shared runtime.
    ///
    /// Returns the actor reference, its join handle and the shared
    /// `max_active` counter.
    fn spawn_test_actor() -> (
        ActorRef<AskTestMessage>,
        ractor::concurrency::JoinHandle<()>,
        StdArc<AtomicUsize>,
    ) {
        let active = StdArc::new(AtomicUsize::new(0));
        let max_active = StdArc::new(AtomicUsize::new(0));
        let state = AskTestState {
            active,
            max_active: StdArc::clone(&max_active),
            held_replies: Vec::new(),
        };
        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
            Actor::spawn(None, AskTestActor, state)
                .await
                .expect("test actor spawns")
        });
        (actor_ref, handle, max_active)
    }

    /// Stops the actor and waits for its task to finish.
    fn stop_test_actor(
        actor_ref: ActorRef<AskTestMessage>,
        handle: ractor::concurrency::JoinHandle<()>,
    ) {
        actor_ref.stop(None);
        block_on_ractor_ask_runtime(async move {
            handle.await.expect("test actor task joins");
        });
    }

    /// Reply delays shrink as the input grows, so later asks are answered
    /// first; the output must still come out in input order.
    #[test]
    fn actor_flow_ask_preserves_order_with_parallelism() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let values = Source::from_iter(0_u64..5)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                5,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::from_millis((5 - input) * 10),
                    reply_to,
                },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
        stop_test_actor(actor_ref, handle);
    }

    /// With parallelism 3, the peak number of simultaneously active `Track`
    /// handlers must be exactly 3.
    #[test]
    fn actor_flow_ask_respects_parallelism() {
        let (actor_ref, handle, max_active) = spawn_test_actor();

        let values = Source::from_iter(0_u64..8)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                3,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Track { input, reply_to },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
        assert_eq!(max_active.load(Ordering::SeqCst), 3);
        stop_test_actor(actor_ref, handle);
    }

    /// A single ask feeding `Sink::head` yields the actor's reply.
    #[test]
    fn actor_flow_ask_into_head_returns_value() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let head = Source::single(7_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Track { input, reply_to },
            ))
            .run_with(Sink::head())
            .unwrap();

        assert_eq!(wait(head), 7);

        stop_test_actor(actor_ref, handle);
    }

    /// The actor handler can perform tokio I/O (binding a TCP listener) and
    /// reply with the result.
    #[test]
    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let ports = Source::single(0_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
            ))
            .run_collect()
            .unwrap();

        assert_eq!(ports.len(), 1);
        assert_ne!(ports[0], 0);
        stop_test_actor(actor_ref, handle);
    }

    /// Replies delayed by 25 ms under a 30 s timeout are still delivered, in order.
    #[test]
    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let values = Source::from_iter(0_u64..8)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                4,
                Duration::from_secs(30),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::from_millis(25),
                    reply_to,
                },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
        stop_test_actor(actor_ref, handle);
    }

    /// Dropping an `InFlightAsk` that still owns its state closes the
    /// receiver, so the port reports closed and `send` fails.
    #[test]
    fn dropped_in_flight_ask_closes_reply_port() {
        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
        let reply_to = ReplyPort::new(StdArc::clone(&state));
        let ask = InFlightAsk {
            index: 0,
            state: Some(StdArc::clone(&state)),
            deadline: None,
        };

        drop(ask);

        assert!(reply_to.is_closed());
        assert_eq!(reply_to.send(1), Err(ReplySendError));
    }

    /// An ask the actor never answers fails with `ActorAskTimeout` carrying
    /// the configured timeout.
    #[test]
    fn actor_flow_ask_times_out() {
        let (actor_ref, handle, _) = spawn_test_actor();
        let timeout = Duration::from_millis(10);

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                timeout,
                |_input, reply_to| AskTestMessage::NeverReply {
                    _reply_to: reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
        stop_test_actor(actor_ref, handle);
    }

    /// Asking an actor that has already stopped fails with `ActorTerminated`.
    #[test]
    fn actor_flow_ask_fails_when_actor_is_stopped() {
        let (actor_ref, handle, _) = spawn_test_actor();
        actor_ref.stop(None);
        block_on_ractor_ask_runtime(async move {
            handle.await.expect("test actor task joins");
        });

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref,
                1,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::ZERO,
                    reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorTerminated));
    }

    /// An actor that drops the reply port without answering produces
    /// `ActorAskResponseDropped`.
    #[test]
    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, reply_to| AskTestMessage::DropReply {
                    _reply_to: reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
        stop_test_actor(actor_ref, handle);
    }

    /// A panic inside the message builder is reported as `ActorAskTaskFailed`
    /// with the panic message as the reason.
    #[test]
    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
                    panic!("message builder failed");
                },
            ))
            .run_collect();

        assert_eq!(
            result,
            Err(StreamError::ActorAskTaskFailed {
                reason: "message builder failed".to_owned()
            })
        );
        stop_test_actor(actor_ref, handle);
    }
}