1use std::{
4 any::Any,
5 error::Error,
6 fmt,
7 panic::{AssertUnwindSafe, catch_unwind},
8 sync::{
9 Arc, Mutex, MutexGuard,
10 atomic::{AtomicBool, AtomicU8, Ordering},
11 },
12 thread::{self, Thread, ThreadId},
13 time::{Duration, Instant},
14};
15
16use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
17
18pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
19
20mod interop;
21mod stream_ref;
22pub mod stream_ref_proto;
23pub use interop::{
24 ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
25 ActorSourceMessage, ActorStatus, WatchEvent,
26};
27pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
28pub use stream_ref_proto::{
29 StreamRefFrame, StreamRefId, StreamRefMessage, StreamRefPayload, StreamRefPayloadBytes,
30 StreamRefProtoConsumer, StreamRefProtoEndpoint, StreamRefProtoProducer,
31};
32
33pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
34
35const ASK_READY_SPINS: usize = 256;
36const ASK_IDLE_YIELDS: usize = 64;
44const ASK_MAX_PARK: Duration = Duration::from_millis(1);
45const ASK_TIME_REFRESH_ITERS: u32 = 64;
46
47const REPLY_PENDING: u8 = 0;
51const REPLY_READY: u8 = 1;
52const REPLY_DROPPED: u8 = 2;
53
54pub struct ActorFlow;
56
57impl ActorFlow {
58 #[must_use]
64 pub fn ask<In, Msg, Out, F>(
65 actor_ref: ActorRef<Msg>,
66 parallelism: usize,
67 timeout: Duration,
68 make_msg: F,
69 ) -> Flow<In, Out, NotUsed>
70 where
71 In: Send + 'static,
72 Msg: Message,
73 Out: Send + 'static,
74 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
75 {
76 ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
77 }
78}
79
80pub struct ReplyPort<T> {
86 inner: Arc<ReplyState<T>>,
87 active: bool,
88}
89
90#[derive(Debug)]
91struct ReplyState<T> {
92 timeout: Duration,
93 receiver_closed: AtomicBool,
94 gate: AtomicU8,
95 slot: Mutex<ReplySlotState<T>>,
96}
97
98#[derive(Debug)]
99struct ReplySlotState<T> {
100 value: ReplySlot<T>,
101 waiter: Option<Thread>,
102}
103
104#[derive(Debug)]
105enum ReplySlot<T> {
106 Pending,
107 Ready(T),
108 Dropped,
109}
110
111enum ReplyPoll<T> {
112 Pending,
113 Ready(T),
114 Dropped,
115}
116
117impl<T> fmt::Debug for ReplyPort<T> {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.debug_struct("ReplyPort")
120 .field("timeout", &self.timeout())
121 .field("closed", &self.is_closed())
122 .finish_non_exhaustive()
123 }
124}
125
126#[derive(Debug, Clone, Copy, PartialEq, Eq)]
128pub struct ReplySendError;
129
130impl fmt::Display for ReplySendError {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 f.write_str("actor reply receiver dropped")
133 }
134}
135
136impl Error for ReplySendError {}
137
138impl<T> ReplyPort<T> {
139 fn new(inner: Arc<ReplyState<T>>) -> Self {
140 Self {
141 inner,
142 active: true,
143 }
144 }
145
146 #[must_use]
147 pub fn timeout(&self) -> Option<Duration> {
148 Some(self.inner.timeout)
149 }
150
151 #[must_use]
152 pub fn is_closed(&self) -> bool {
153 self.inner.is_closed()
154 }
155
156 pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
157 self.active = false;
158 self.inner.send(reply)
159 }
160}
161
162impl<T> Drop for ReplyPort<T> {
163 fn drop(&mut self) {
164 if self.active {
165 self.inner.drop_sender();
166 }
167 }
168}
169
170impl<T> ReplyState<T> {
171 fn new(timeout: Duration) -> Self {
172 Self {
173 timeout,
174 receiver_closed: AtomicBool::new(false),
175 gate: AtomicU8::new(REPLY_PENDING),
176 slot: Mutex::new(ReplySlotState {
177 value: ReplySlot::Pending,
178 waiter: None,
179 }),
180 }
181 }
182
183 fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
189 self.slot
190 .lock()
191 .unwrap_or_else(|poison| poison.into_inner())
192 }
193
194 fn reset(&self, timeout: Duration) {
195 self.receiver_closed.store(false, Ordering::Release);
196 self.gate.store(REPLY_PENDING, Ordering::Release);
197 let mut slot = self.lock_slot();
198 slot.value = ReplySlot::Pending;
199 slot.waiter = None;
200 debug_assert_eq!(self.timeout, timeout);
201 }
202
203 fn is_closed(&self) -> bool {
204 self.receiver_closed.load(Ordering::Acquire)
205 }
206
207 fn send(&self, reply: T) -> Result<(), ReplySendError> {
208 if self.is_closed() {
209 return Err(ReplySendError);
210 }
211
212 let mut slot = self.lock_slot();
213 if self.is_closed() {
214 return Err(ReplySendError);
215 }
216
217 if !matches!(slot.value, ReplySlot::Pending) {
218 return Err(ReplySendError);
219 }
220
221 slot.value = ReplySlot::Ready(reply);
222 let waiter = slot.waiter.clone();
223 self.gate.store(REPLY_READY, Ordering::Release);
226 drop(slot);
227 wake_waiter(waiter);
228 Ok(())
229 }
230
231 fn poll(&self) -> ReplyPoll<T> {
232 if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
233 return ReplyPoll::Pending;
235 }
236 self.take_locked()
237 }
238
239 fn close_on_timeout(&self) -> ReplyPoll<T> {
244 let mut slot = self.lock_slot();
245 self.receiver_closed.store(true, Ordering::Release);
246 let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
247 ReplySlot::Pending => ReplyPoll::Pending,
248 ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
249 ReplySlot::Dropped => ReplyPoll::Dropped,
250 };
251 let waiter = slot.waiter.take();
252 drop(slot);
253 wake_waiter(waiter);
254 outcome
255 }
256
257 fn take_locked(&self) -> ReplyPoll<T> {
258 let mut slot = self.lock_slot();
259 slot.waiter = None;
260 match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
261 ReplySlot::Pending => ReplyPoll::Pending,
262 ReplySlot::Ready(reply) => {
263 self.close_receiver();
264 ReplyPoll::Ready(reply)
265 }
266 ReplySlot::Dropped => {
267 self.close_receiver();
268 ReplyPoll::Dropped
269 }
270 }
271 }
272
273 fn drop_sender(&self) {
274 let mut slot = self.lock_slot();
275 if matches!(slot.value, ReplySlot::Pending) {
276 slot.value = ReplySlot::Dropped;
277 let waiter = slot.waiter.clone();
278 self.gate.store(REPLY_DROPPED, Ordering::Release);
279 drop(slot);
280 wake_waiter(waiter);
281 }
282 }
283
284 fn close_receiver(&self) {
285 self.receiver_closed.store(true, Ordering::Release);
286 }
287
288 fn register_waiter(&self, waiter: Thread) {
289 self.lock_slot().waiter = Some(waiter);
290 }
291
292 fn unregister_waiter(&self, waiter_id: ThreadId) {
293 let mut slot = self.lock_slot();
294 if slot
295 .waiter
296 .as_ref()
297 .is_some_and(|thread| thread.id() == waiter_id)
298 {
299 slot.waiter = None;
300 }
301 }
302}
303
304fn wake_waiter(waiter: Option<Thread>) {
305 if let Some(waiter) = waiter {
306 waiter.unpark();
307 }
308}
309
310fn ask_flow<In, Msg, Out, F>(
311 actor_ref: ActorRef<Msg>,
312 parallelism: usize,
313 timeout: Duration,
314 make_msg: Arc<F>,
315) -> Flow<In, Out>
316where
317 In: Send + 'static,
318 Msg: Message,
319 Out: Send + 'static,
320 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
321{
322 assert!(
323 parallelism > 0,
324 "ActorFlow::ask parallelism must be greater than zero"
325 );
326 Flow::from_transform(move |input| {
334 ask_ractor_ordered(
335 input,
336 actor_ref.clone(),
337 parallelism,
338 timeout,
339 Arc::clone(&make_msg),
340 )
341 })
342}
343
344fn ask_ractor_ordered<In, Msg, Out, F>(
345 mut input: BoxStream<In>,
346 actor_ref: ActorRef<Msg>,
347 parallelism: usize,
348 timeout: Duration,
349 make_msg: Arc<F>,
350) -> BoxStream<Out>
351where
352 In: Send + 'static,
353 Msg: Message,
354 Out: Send + 'static,
355 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
356{
357 let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
358 let mut next_index = 0_usize;
359 let mut next_to_emit = 0_usize;
360 let mut completed = Vec::with_capacity(parallelism);
361 let mut reply_pool = Vec::with_capacity(parallelism);
362 let mut input_done = false;
363
364 Box::new(std::iter::from_fn(move || {
365 loop {
366 if let Some(result) = take_completed(&mut completed, next_to_emit) {
367 next_to_emit += 1;
368 return Some(result);
369 }
370
371 while in_flight.len() < parallelism && !input_done {
372 match input.next() {
373 Some(Ok(item)) => {
374 let index = next_index;
375 next_index += 1;
376 match start_ractor_ask(
377 index,
378 actor_ref.clone(),
379 timeout,
380 item,
381 Arc::clone(&make_msg),
382 &mut reply_pool,
383 ) {
384 Ok(ask) => in_flight.push(ask),
385 Err(error) => {
386 completed.push((index, Err(error)));
387 input_done = true;
388 }
389 }
390 }
391 Some(Err(error)) => {
392 completed.push((next_index, Err(error)));
393 next_index += 1;
394 input_done = true;
395 }
396 None => input_done = true,
397 }
398 }
399
400 if let Some(result) = take_completed(&mut completed, next_to_emit) {
401 next_to_emit += 1;
402 return Some(result);
403 }
404
405 if in_flight.is_empty() {
406 return None;
407 }
408
409 let ask = wait_for_ready_ask(&mut in_flight, timeout);
410 let index = ask.index;
411 let result = ask.result;
412 recycle_reply_state(ask.state, &mut reply_pool);
413 if index == next_to_emit {
414 next_to_emit += 1;
415 return Some(result);
416 }
417 completed.push((index, result));
418 }
419 }))
420}
421
422fn take_completed<Out>(
423 completed: &mut Vec<(usize, StreamResult<Out>)>,
424 index: usize,
425) -> Option<StreamResult<Out>> {
426 let position = completed
427 .iter()
428 .position(|(completed_index, _)| *completed_index == index)?;
429 Some(completed.swap_remove(position).1)
430}
431
432struct InFlightAsk<Out> {
433 index: usize,
434 state: Option<Arc<ReplyState<Out>>>,
435 deadline: Option<Instant>,
438}
439
440impl<Out> InFlightAsk<Out> {
441 fn state(&self) -> &Arc<ReplyState<Out>> {
442 self.state.as_ref().expect("in-flight ask has reply state")
443 }
444
445 fn into_state(mut self) -> Arc<ReplyState<Out>> {
446 self.state.take().expect("in-flight ask has reply state")
447 }
448}
449
450impl<Out> Drop for InFlightAsk<Out> {
451 fn drop(&mut self) {
452 if let Some(state) = &self.state {
453 state.close_receiver();
454 }
455 }
456}
457
458struct CompletedAsk<Out> {
459 index: usize,
460 result: StreamResult<Out>,
461 state: Arc<ReplyState<Out>>,
462}
463
464fn start_ractor_ask<In, Msg, Out, F>(
465 index: usize,
466 actor_ref: ActorRef<Msg>,
467 timeout: Duration,
468 input: In,
469 make_msg: Arc<F>,
470 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
471) -> StreamResult<InFlightAsk<Out>>
472where
473 In: Send + 'static,
474 Msg: Message,
475 Out: Send + 'static,
476 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
477{
478 let reply_state = match reply_pool.pop() {
479 Some(state) => {
482 state.reset(timeout);
483 state
484 }
485 None => Arc::new(ReplyState::new(timeout)),
486 };
487 let reply_to = ReplyPort::new(Arc::clone(&reply_state));
488 let message =
489 catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
490 StreamError::ActorAskTaskFailed {
491 reason: panic_reason(panic),
492 }
493 })?;
494
495 match actor_ref.cast(message) {
496 Ok(()) => {}
497 Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
498 return Err(StreamError::ActorTerminated);
499 }
500 Err(error) => {
501 return Err(StreamError::ActorAskSendFailed {
502 reason: error.to_string(),
503 });
504 }
505 }
506
507 Ok(InFlightAsk {
508 index,
509 state: Some(reply_state),
510 deadline: Instant::now().checked_add(timeout),
511 })
512}
513
514fn wait_for_ready_ask<Out>(
515 in_flight: &mut Vec<InFlightAsk<Out>>,
516 timeout: Duration,
517) -> CompletedAsk<Out> {
518 let mut idle_spins = 0;
519 let mut idle_yields = 0;
520 let mut time_refresh = 0_u32;
521 let mut now = Instant::now();
522 loop {
523 if time_refresh == 0 {
524 now = Instant::now();
525 }
526 time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
527 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
528 return ask;
529 }
530
531 if idle_spins < ASK_READY_SPINS {
532 idle_spins += 1;
533 std::hint::spin_loop();
534 } else if idle_yields < ASK_IDLE_YIELDS {
535 idle_yields += 1;
536 time_refresh = 0;
537 thread::yield_now();
538 } else {
539 idle_spins = 0;
540 idle_yields = 0;
541 time_refresh = 0;
542 let current = thread::current();
543 let registered = register_ask_waiters(in_flight, ¤t);
544 now = Instant::now();
545 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
546 unregister_ask_waiters(registered, current.id());
547 return ask;
548 }
549 thread::park_timeout(next_ask_park(in_flight, now));
550 unregister_ask_waiters(registered, current.id());
551 }
552 }
553}
554
555fn take_ready_ask<Out>(
556 in_flight: &mut Vec<InFlightAsk<Out>>,
557 timeout: Duration,
558 now: Instant,
559) -> Option<CompletedAsk<Out>> {
560 let mut index = 0;
561 while index < in_flight.len() {
562 match in_flight[index].state().poll() {
563 ReplyPoll::Ready(reply) => {
564 let ask = in_flight.swap_remove(index);
565 return Some(CompletedAsk {
566 index: ask.index,
567 result: Ok(reply),
568 state: ask.into_state(),
569 });
570 }
571 ReplyPoll::Dropped => {
572 let ask = in_flight.swap_remove(index);
573 return Some(CompletedAsk {
574 index: ask.index,
575 result: Err(StreamError::ActorAskResponseDropped),
576 state: ask.into_state(),
577 });
578 }
579 ReplyPoll::Pending => {
580 if in_flight[index]
581 .deadline
582 .is_some_and(|deadline| now >= deadline)
583 {
584 let outcome = in_flight[index].state().close_on_timeout();
587 let ask = in_flight.swap_remove(index);
588 let result = match outcome {
589 ReplyPoll::Ready(reply) => Ok(reply),
590 ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
591 ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
592 };
593 return Some(CompletedAsk {
594 index: ask.index,
595 result,
596 state: ask.into_state(),
597 });
598 }
599 index += 1;
600 }
601 }
602 }
603 None
604}
605
606fn register_ask_waiters<Out>(
607 in_flight: &[InFlightAsk<Out>],
608 current: &Thread,
609) -> Vec<Arc<ReplyState<Out>>> {
610 let mut registered = Vec::with_capacity(in_flight.len());
611 for ask in in_flight {
612 ask.state().register_waiter(current.clone());
613 registered.push(Arc::clone(ask.state()));
614 }
615 registered
616}
617
618fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
619 for state in registered {
620 state.unregister_waiter(current_id);
621 }
622}
623
624fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
625 next_ask_deadline_remaining(in_flight, now)
626 .unwrap_or(ASK_MAX_PARK)
627 .min(ASK_MAX_PARK)
628}
629
630fn next_ask_deadline_remaining<Out>(
631 in_flight: &[InFlightAsk<Out>],
632 now: Instant,
633) -> Option<Duration> {
634 in_flight
635 .iter()
636 .filter_map(|ask| ask.deadline)
637 .map(|deadline| deadline.saturating_duration_since(now))
638 .min()
639}
640
641fn recycle_reply_state<Out>(
642 state: Arc<ReplyState<Out>>,
643 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
644) {
645 if Arc::strong_count(&state) == 1 {
646 reply_pool.push(state);
647 }
648}
649
650fn panic_reason(panic: Box<dyn Any + Send>) -> String {
651 if let Some(reason) = panic.downcast_ref::<&str>() {
652 (*reason).to_owned()
653 } else if let Some(reason) = panic.downcast_ref::<String>() {
654 reason.clone()
655 } else {
656 "actor ask task panicked".to_owned()
657 }
658}
659
660pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
661 static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
662 std::sync::OnceLock::new();
663
664 match RUNTIME.get_or_init(|| {
665 tokio::runtime::Builder::new_multi_thread()
666 .thread_name("datum-ractor-runtime")
667 .enable_all()
668 .build()
669 .map_err(|error| format!("ractor runtime failed to start: {error}"))
670 }) {
671 Ok(runtime) => Ok(runtime),
672 Err(error) => Err(StreamError::Failed(error.clone())),
673 }
674}
675
676pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
677where
678 T: Send + 'static,
679 F: std::future::Future<Output = T> + Send + 'static,
680{
681 let runtime = ractor_runtime()?;
682 let result = if tokio::runtime::Handle::try_current().is_ok() {
683 std::thread::spawn(move || runtime.block_on(future))
684 .join()
685 .map_err(|_| StreamError::AbruptTermination)?
686 } else {
687 runtime.block_on(future)
688 };
689 Ok(result)
690}
691
692#[cfg(test)]
693fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
694where
695 T: Send + 'static,
696 F: std::future::Future<Output = T> + Send + 'static,
697{
698 block_on_ractor_runtime(future).expect("ractor ask runtime starts")
699}
700
701#[cfg(test)]
702mod tests {
703 use super::*;
704 use crate::stream::{Sink, Source, StreamCompletion};
705 use std::sync::{
706 Arc as StdArc,
707 atomic::{AtomicUsize, Ordering},
708 };
709
710 enum AskTestMessage {
711 Delayed {
712 input: u64,
713 delay: Duration,
714 reply_to: ReplyPort<u64>,
715 },
716 Track {
717 input: u64,
718 reply_to: ReplyPort<u64>,
719 },
720 NeverReply {
721 _reply_to: ReplyPort<u64>,
722 },
723 DropReply {
724 _reply_to: ReplyPort<u64>,
725 },
726 BindTcp {
727 reply_to: ReplyPort<u64>,
728 },
729 }
730
731 #[cfg(feature = "cluster")]
732 impl Message for AskTestMessage {}
733
734 struct AskTestActor;
735
736 struct AskTestState {
737 active: StdArc<AtomicUsize>,
738 max_active: StdArc<AtomicUsize>,
739 held_replies: Vec<ReplyPort<u64>>,
740 }
741
742 impl Actor for AskTestActor {
743 type Msg = AskTestMessage;
744 type State = AskTestState;
745 type Arguments = AskTestState;
746
747 async fn pre_start(
748 &self,
749 _myself: ActorRef<Self::Msg>,
750 args: Self::Arguments,
751 ) -> Result<Self::State, ActorProcessingErr> {
752 Ok(args)
753 }
754
755 async fn handle(
756 &self,
757 _myself: ActorRef<Self::Msg>,
758 message: Self::Msg,
759 state: &mut Self::State,
760 ) -> Result<(), ActorProcessingErr> {
761 match message {
762 AskTestMessage::Delayed {
763 input,
764 delay,
765 reply_to,
766 } => {
767 ractor::concurrency::spawn(async move {
768 ractor::concurrency::sleep(delay).await;
769 let _ = reply_to.send(input);
770 });
771 }
772 AskTestMessage::Track { input, reply_to } => {
773 let active = StdArc::clone(&state.active);
774 let max_active = StdArc::clone(&state.max_active);
775 let current = active.fetch_add(1, Ordering::SeqCst) + 1;
776 max_active.fetch_max(current, Ordering::SeqCst);
777 ractor::concurrency::spawn(async move {
778 ractor::concurrency::sleep(Duration::from_millis(20)).await;
779 active.fetch_sub(1, Ordering::SeqCst);
780 let _ = reply_to.send(input);
781 });
782 }
783 AskTestMessage::NeverReply { _reply_to } => {
784 state.held_replies.push(_reply_to);
785 }
786 AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
787 AskTestMessage::BindTcp { reply_to } => {
788 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
789 let port = listener.local_addr()?.port();
790 let _ = reply_to.send(u64::from(port));
791 }
792 }
793 Ok(())
794 }
795 }
796
797 fn wait<T>(completion: StreamCompletion<T>) -> T {
798 completion.wait().unwrap()
799 }
800
801 fn spawn_test_actor() -> (
802 ActorRef<AskTestMessage>,
803 ractor::concurrency::JoinHandle<()>,
804 StdArc<AtomicUsize>,
805 ) {
806 let active = StdArc::new(AtomicUsize::new(0));
807 let max_active = StdArc::new(AtomicUsize::new(0));
808 let state = AskTestState {
809 active,
810 max_active: StdArc::clone(&max_active),
811 held_replies: Vec::new(),
812 };
813 let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
814 Actor::spawn(None, AskTestActor, state)
815 .await
816 .expect("test actor spawns")
817 });
818 (actor_ref, handle, max_active)
819 }
820
821 fn stop_test_actor(
822 actor_ref: ActorRef<AskTestMessage>,
823 handle: ractor::concurrency::JoinHandle<()>,
824 ) {
825 actor_ref.stop(None);
826 block_on_ractor_ask_runtime(async move {
827 handle.await.expect("test actor task joins");
828 });
829 }
830
831 #[test]
832 fn actor_flow_ask_preserves_order_with_parallelism() {
833 let (actor_ref, handle, _) = spawn_test_actor();
834
835 let values = Source::from_iter(0_u64..5)
836 .via(ActorFlow::ask(
837 actor_ref.clone(),
838 5,
839 Duration::from_secs(1),
840 |input, reply_to| AskTestMessage::Delayed {
841 input,
842 delay: Duration::from_millis((5 - input) * 10),
843 reply_to,
844 },
845 ))
846 .run_with(Sink::collect())
847 .unwrap();
848
849 assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
850 stop_test_actor(actor_ref, handle);
851 }
852
853 #[test]
854 fn actor_flow_ask_respects_parallelism() {
855 let (actor_ref, handle, max_active) = spawn_test_actor();
856
857 let values = Source::from_iter(0_u64..8)
858 .via(ActorFlow::ask(
859 actor_ref.clone(),
860 3,
861 Duration::from_secs(1),
862 |input, reply_to| AskTestMessage::Track { input, reply_to },
863 ))
864 .run_with(Sink::collect())
865 .unwrap();
866
867 assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
868 assert_eq!(max_active.load(Ordering::SeqCst), 3);
869 stop_test_actor(actor_ref, handle);
870 }
871
872 #[test]
873 fn actor_flow_ask_into_head_returns_value() {
874 let (actor_ref, handle, _) = spawn_test_actor();
884
885 let head = Source::single(7_u64)
886 .via(ActorFlow::ask(
887 actor_ref.clone(),
888 1,
889 Duration::from_secs(1),
890 |input, reply_to| AskTestMessage::Track { input, reply_to },
891 ))
892 .run_with(Sink::head())
893 .unwrap();
894
895 assert_eq!(wait(head), 7);
896
897 stop_test_actor(actor_ref, handle);
898 }
899
900 #[test]
901 fn actor_flow_ask_actor_handler_can_use_tokio_io() {
902 let (actor_ref, handle, _) = spawn_test_actor();
903
904 let ports = Source::single(0_u64)
905 .via(ActorFlow::ask(
906 actor_ref.clone(),
907 1,
908 Duration::from_secs(1),
909 |_input, reply_to| AskTestMessage::BindTcp { reply_to },
910 ))
911 .run_collect()
912 .unwrap();
913
914 assert_eq!(ports.len(), 1);
915 assert_ne!(ports[0], 0);
916 stop_test_actor(actor_ref, handle);
917 }
918
919 #[test]
920 fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
921 let (actor_ref, handle, _) = spawn_test_actor();
928
929 let values = Source::from_iter(0_u64..8)
930 .via(ActorFlow::ask(
931 actor_ref.clone(),
932 4,
933 Duration::from_secs(30),
934 |input, reply_to| AskTestMessage::Delayed {
935 input,
936 delay: Duration::from_millis(25),
937 reply_to,
938 },
939 ))
940 .run_with(Sink::collect())
941 .unwrap();
942
943 assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
944 stop_test_actor(actor_ref, handle);
945 }
946
947 #[test]
948 fn dropped_in_flight_ask_closes_reply_port() {
949 let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
950 let reply_to = ReplyPort::new(StdArc::clone(&state));
951 let ask = InFlightAsk {
952 index: 0,
953 state: Some(StdArc::clone(&state)),
954 deadline: None,
955 };
956
957 drop(ask);
958
959 assert!(reply_to.is_closed());
960 assert_eq!(reply_to.send(1), Err(ReplySendError));
961 }
962
963 #[test]
964 fn actor_flow_ask_times_out() {
965 let (actor_ref, handle, _) = spawn_test_actor();
966 let timeout = Duration::from_millis(10);
967
968 let result = Source::single(1_u64)
969 .via(ActorFlow::ask(
970 actor_ref.clone(),
971 1,
972 timeout,
973 |_input, reply_to| AskTestMessage::NeverReply {
974 _reply_to: reply_to,
975 },
976 ))
977 .run_collect();
978
979 assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
980 stop_test_actor(actor_ref, handle);
981 }
982
983 #[test]
984 fn actor_flow_ask_fails_when_actor_is_stopped() {
985 let (actor_ref, handle, _) = spawn_test_actor();
986 actor_ref.stop(None);
987 block_on_ractor_ask_runtime(async move {
988 handle.await.expect("test actor task joins");
989 });
990
991 let result = Source::single(1_u64)
992 .via(ActorFlow::ask(
993 actor_ref,
994 1,
995 Duration::from_secs(1),
996 |input, reply_to| AskTestMessage::Delayed {
997 input,
998 delay: Duration::ZERO,
999 reply_to,
1000 },
1001 ))
1002 .run_collect();
1003
1004 assert_eq!(result, Err(StreamError::ActorTerminated));
1005 }
1006
1007 #[test]
1008 fn actor_flow_ask_fails_when_reply_port_is_dropped() {
1009 let (actor_ref, handle, _) = spawn_test_actor();
1010
1011 let result = Source::single(1_u64)
1012 .via(ActorFlow::ask(
1013 actor_ref.clone(),
1014 1,
1015 Duration::from_secs(1),
1016 |_input, reply_to| AskTestMessage::DropReply {
1017 _reply_to: reply_to,
1018 },
1019 ))
1020 .run_collect();
1021
1022 assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
1023 stop_test_actor(actor_ref, handle);
1024 }
1025
1026 #[test]
1027 fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
1028 let (actor_ref, handle, _) = spawn_test_actor();
1029
1030 let result = Source::single(1_u64)
1031 .via(ActorFlow::ask(
1032 actor_ref.clone(),
1033 1,
1034 Duration::from_secs(1),
1035 |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
1036 panic!("message builder failed");
1037 },
1038 ))
1039 .run_collect();
1040
1041 assert_eq!(
1042 result,
1043 Err(StreamError::ActorAskTaskFailed {
1044 reason: "message builder failed".to_owned()
1045 })
1046 );
1047 stop_test_actor(actor_ref, handle);
1048 }
1049}