1use std::{
4 any::Any,
5 error::Error,
6 fmt,
7 panic::{AssertUnwindSafe, catch_unwind},
8 sync::{
9 Arc, Mutex, MutexGuard,
10 atomic::{AtomicBool, AtomicU8, Ordering},
11 },
12 thread::{self, Thread, ThreadId},
13 time::{Duration, Instant},
14};
15
16use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
17
18pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
19
20mod interop;
21mod stream_ref;
22pub mod stream_ref_proto;
23pub use interop::{
24 ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
25 ActorSourceMessage, ActorStatus, WatchEvent,
26};
27pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
28pub use stream_ref_proto::{
29 StreamRefFrame, StreamRefId, StreamRefMessage, StreamRefOutbound, StreamRefPayload,
30 StreamRefPayloadBatch, StreamRefPayloadBytes, StreamRefProtoConsumer, StreamRefProtoEndpoint,
31 StreamRefProtoProducer,
32};
33
/// Convenience alias for fallible actor operations: a [`Result`] whose error
/// type is [`ActorProcessingErr`] and whose success type defaults to `()`.
pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
35
/// Number of busy-spin iterations `wait_for_ready_ask` performs before it
/// switches to yielding the thread.
const ASK_READY_SPINS: usize = 256;
/// Number of `thread::yield_now` iterations `wait_for_ready_ask` performs
/// after spinning and before it parks the thread.
const ASK_IDLE_YIELDS: usize = 64;
/// Upper bound on a single `thread::park_timeout` while waiting for replies.
const ASK_MAX_PARK: Duration = Duration::from_millis(1);
/// While spinning, the cached `Instant` used for deadline checks is refreshed
/// once every this many loop iterations.
const ASK_TIME_REFRESH_ITERS: u32 = 64;

/// `ReplyState::gate` value: nothing has been published yet.
const REPLY_PENDING: u8 = 0;
/// `ReplyState::gate` value: `ReplyState::send` stored a reply.
const REPLY_READY: u8 = 1;
/// `ReplyState::gate` value: the sender was dropped without replying.
const REPLY_DROPPED: u8 = 2;
54
55pub struct ActorFlow;
57
58impl ActorFlow {
59 #[must_use]
65 pub fn ask<In, Msg, Out, F>(
66 actor_ref: ActorRef<Msg>,
67 parallelism: usize,
68 timeout: Duration,
69 make_msg: F,
70 ) -> Flow<In, Out, NotUsed>
71 where
72 In: Send + 'static,
73 Msg: Message,
74 Out: Send + 'static,
75 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
76 {
77 ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
78 }
79}
80
/// One-shot handle through which an actor delivers the reply for one ask.
///
/// Dropping the port while it is still `active` (that is, without calling
/// `send`) reports the sender as dropped to the shared [`ReplyState`].
pub struct ReplyPort<T> {
    /// Reply cell shared with the side that waits for the answer.
    inner: Arc<ReplyState<T>>,
    /// `true` until `send` is called; consulted by the `Drop` impl.
    active: bool,
}
90
/// Shared cell connecting a [`ReplyPort`] (sender) with the waiting ask.
#[derive(Debug)]
struct ReplyState<T> {
    /// Timeout the ask was created with; exposed via `ReplyPort::timeout`.
    timeout: Duration,
    /// Set once the waiting side no longer accepts a reply.
    receiver_closed: AtomicBool,
    /// One of `REPLY_PENDING`, `REPLY_READY`, `REPLY_DROPPED`; lets `poll`
    /// skip the mutex while nothing has been published.
    gate: AtomicU8,
    /// The reply value and the thread to unpark, guarded together.
    slot: Mutex<ReplySlotState<T>>,
}
98
/// Mutex-protected part of a [`ReplyState`].
#[derive(Debug)]
struct ReplySlotState<T> {
    /// Current content of the reply slot.
    value: ReplySlot<T>,
    /// Thread to unpark when the slot changes, if one registered itself.
    waiter: Option<Thread>,
}
104
/// Content of the reply slot stored inside [`ReplySlotState`].
#[derive(Debug)]
enum ReplySlot<T> {
    /// No reply stored (initial state, and the state left after a take).
    Pending,
    /// A reply was sent and has not been taken yet.
    Ready(T),
    /// The sender was dropped before sending.
    Dropped,
}
111
/// Outcome of inspecting a [`ReplyState`] from the waiting side.
enum ReplyPoll<T> {
    /// Nothing to take yet.
    Pending,
    /// The reply, moved out of the slot.
    Ready(T),
    /// The sender went away without replying.
    Dropped,
}
117
118impl<T> fmt::Debug for ReplyPort<T> {
119 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120 f.debug_struct("ReplyPort")
121 .field("timeout", &self.timeout())
122 .field("closed", &self.is_closed())
123 .finish_non_exhaustive()
124 }
125}
126
/// Error returned when a reply could not be handed to the waiting ask.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplySendError;

impl fmt::Display for ReplySendError {
    /// Writes the fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "actor reply receiver dropped")
    }
}

impl Error for ReplySendError {}
138
139impl<T> ReplyPort<T> {
140 fn new(inner: Arc<ReplyState<T>>) -> Self {
141 Self {
142 inner,
143 active: true,
144 }
145 }
146
147 #[must_use]
148 pub fn timeout(&self) -> Option<Duration> {
149 Some(self.inner.timeout)
150 }
151
152 #[must_use]
153 pub fn is_closed(&self) -> bool {
154 self.inner.is_closed()
155 }
156
157 pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
158 self.active = false;
159 self.inner.send(reply)
160 }
161}
162
163impl<T> Drop for ReplyPort<T> {
164 fn drop(&mut self) {
165 if self.active {
166 self.inner.drop_sender();
167 }
168 }
169}
170
impl<T> ReplyState<T> {
    /// Creates a pending reply cell for an ask issued with `timeout`.
    fn new(timeout: Duration) -> Self {
        Self {
            timeout,
            receiver_closed: AtomicBool::new(false),
            gate: AtomicU8::new(REPLY_PENDING),
            slot: Mutex::new(ReplySlotState {
                value: ReplySlot::Pending,
                waiter: None,
            }),
        }
    }

    /// Locks the slot, recovering the guard if the mutex was poisoned.
    fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
        self.slot
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
    }

    /// Returns the cell to its initial pending state so it can be reused.
    ///
    /// `timeout` is only compared with the stored timeout (debug builds); the
    /// stored value itself is not changed.
    fn reset(&self, timeout: Duration) {
        self.receiver_closed.store(false, Ordering::Release);
        self.gate.store(REPLY_PENDING, Ordering::Release);
        let mut slot = self.lock_slot();
        slot.value = ReplySlot::Pending;
        slot.waiter = None;
        debug_assert_eq!(self.timeout, timeout);
    }

    /// Whether the receiving side has been marked closed.
    fn is_closed(&self) -> bool {
        self.receiver_closed.load(Ordering::Acquire)
    }

    /// Stores `reply` and wakes the registered waiter, if any.
    ///
    /// Fails with [`ReplySendError`] when the receiver is closed or the slot
    /// is no longer pending.
    fn send(&self, reply: T) -> Result<(), ReplySendError> {
        // Cheap early exit before taking the lock.
        if self.is_closed() {
            return Err(ReplySendError);
        }

        let mut slot = self.lock_slot();
        // Re-check under the lock: `close_on_timeout` sets the flag while
        // holding this same lock.
        if self.is_closed() {
            return Err(ReplySendError);
        }

        if !matches!(slot.value, ReplySlot::Pending) {
            return Err(ReplySendError);
        }

        slot.value = ReplySlot::Ready(reply);
        let waiter = slot.waiter.clone();
        // Publish only after the value is in place so `poll` finds it.
        self.gate.store(REPLY_READY, Ordering::Release);
        // Release the lock before unparking the waiter.
        drop(slot);
        wake_waiter(waiter);
        Ok(())
    }

    /// Non-blocking check: skips the mutex while the gate is still pending,
    /// otherwise takes the slot content.
    fn poll(&self) -> ReplyPoll<T> {
        if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
            return ReplyPoll::Pending;
        }
        self.take_locked()
    }

    /// Closes the receiver under the slot lock and returns whatever the slot
    /// held at that moment, so a reply stored just before the close is still
    /// handed out instead of being lost.
    fn close_on_timeout(&self) -> ReplyPoll<T> {
        let mut slot = self.lock_slot();
        self.receiver_closed.store(true, Ordering::Release);
        let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
            ReplySlot::Pending => ReplyPoll::Pending,
            ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
            ReplySlot::Dropped => ReplyPoll::Dropped,
        };
        let waiter = slot.waiter.take();
        drop(slot);
        wake_waiter(waiter);
        outcome
    }

    /// Takes the slot content under the lock, clearing the waiter; marks the
    /// receiver closed when a final outcome (`Ready`/`Dropped`) was taken.
    fn take_locked(&self) -> ReplyPoll<T> {
        let mut slot = self.lock_slot();
        slot.waiter = None;
        match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
            ReplySlot::Pending => ReplyPoll::Pending,
            ReplySlot::Ready(reply) => {
                self.close_receiver();
                ReplyPoll::Ready(reply)
            }
            ReplySlot::Dropped => {
                self.close_receiver();
                ReplyPoll::Dropped
            }
        }
    }

    /// Records that the sender went away without replying and wakes the
    /// waiter. Does nothing if the slot is no longer pending.
    fn drop_sender(&self) {
        let mut slot = self.lock_slot();
        if matches!(slot.value, ReplySlot::Pending) {
            slot.value = ReplySlot::Dropped;
            let waiter = slot.waiter.clone();
            self.gate.store(REPLY_DROPPED, Ordering::Release);
            drop(slot);
            wake_waiter(waiter);
        }
    }

    /// Marks the receiving side as closed.
    fn close_receiver(&self) {
        self.receiver_closed.store(true, Ordering::Release);
    }

    /// Registers `waiter` as the thread to unpark when the slot changes,
    /// replacing any previously registered thread.
    fn register_waiter(&self, waiter: Thread) {
        self.lock_slot().waiter = Some(waiter);
    }

    /// Clears the registered waiter, but only if it is the thread identified
    /// by `waiter_id`.
    fn unregister_waiter(&self, waiter_id: ThreadId) {
        let mut slot = self.lock_slot();
        if slot
            .waiter
            .as_ref()
            .is_some_and(|thread| thread.id() == waiter_id)
        {
            slot.waiter = None;
        }
    }
}
304
/// Unparks `waiter` when one is present; a `None` is ignored.
fn wake_waiter(waiter: Option<Thread>) {
    let Some(thread) = waiter else {
        return;
    };
    thread.unpark();
}
310
311fn ask_flow<In, Msg, Out, F>(
312 actor_ref: ActorRef<Msg>,
313 parallelism: usize,
314 timeout: Duration,
315 make_msg: Arc<F>,
316) -> Flow<In, Out>
317where
318 In: Send + 'static,
319 Msg: Message,
320 Out: Send + 'static,
321 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
322{
323 assert!(
324 parallelism > 0,
325 "ActorFlow::ask parallelism must be greater than zero"
326 );
327 Flow::from_transform(move |input| {
335 ask_ractor_ordered(
336 input,
337 actor_ref.clone(),
338 parallelism,
339 timeout,
340 Arc::clone(&make_msg),
341 )
342 })
343}
344
/// Turns `input` into a lazy stream of actor replies emitted in input order.
///
/// Up to `parallelism` asks are kept in flight at once. Replies that arrive
/// out of order are parked in `completed` until their index is the next one
/// to emit. An upstream error, or a failure to start an ask, is queued at its
/// index and stops further pulling from `input`; asks already in flight are
/// still awaited.
fn ask_ractor_ordered<In, Msg, Out, F>(
    mut input: BoxStream<In>,
    actor_ref: ActorRef<Msg>,
    parallelism: usize,
    timeout: Duration,
    make_msg: Arc<F>,
) -> BoxStream<Out>
where
    In: Send + 'static,
    Msg: Message,
    Out: Send + 'static,
    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
{
    // Asks that were sent and are still awaiting an outcome.
    let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
    // Index assigned to the next element pulled from `input`.
    let mut next_index = 0_usize;
    // Index whose result must be emitted next to preserve order.
    let mut next_to_emit = 0_usize;
    // Finished results waiting for their turn, keyed by index.
    let mut completed = Vec::with_capacity(parallelism);
    // Reply cells kept for reuse by later asks.
    let mut reply_pool = Vec::with_capacity(parallelism);
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            // Emit a buffered result first if it is the next one in order.
            if let Some(result) = take_completed(&mut completed, next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Top up the in-flight set from upstream.
            while in_flight.len() < parallelism && !input_done {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        match start_ractor_ask(
                            index,
                            actor_ref.clone(),
                            timeout,
                            item,
                            Arc::clone(&make_msg),
                            &mut reply_pool,
                        ) {
                            Ok(ask) => in_flight.push(ask),
                            Err(error) => {
                                completed.push((index, Err(error)));
                                input_done = true;
                            }
                        }
                    }
                    Some(Err(error)) => {
                        completed.push((next_index, Err(error)));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }

            // The refill may have queued an error at the next index.
            if let Some(result) = take_completed(&mut completed, next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            if in_flight.is_empty() {
                return None;
            }

            // Block until any in-flight ask finishes (reply, drop or timeout).
            let ask = wait_for_ready_ask(&mut in_flight, timeout);
            let index = ask.index;
            let result = ask.result;
            recycle_reply_state(ask.state, &mut reply_pool);
            if index == next_to_emit {
                next_to_emit += 1;
                return Some(result);
            }
            // Finished out of order: hold it back until its turn.
            completed.push((index, result));
        }
    }))
}
422
423fn take_completed<Out>(
424 completed: &mut Vec<(usize, StreamResult<Out>)>,
425 index: usize,
426) -> Option<StreamResult<Out>> {
427 let position = completed
428 .iter()
429 .position(|(completed_index, _)| *completed_index == index)?;
430 Some(completed.swap_remove(position).1)
431}
432
/// An ask that has been sent to the actor and is awaiting its outcome.
struct InFlightAsk<Out> {
    /// Position of the originating element in the input stream.
    index: usize,
    /// Shared reply cell; `None` only after `into_state` moved it out.
    state: Option<Arc<ReplyState<Out>>>,
    /// Instant at which the ask times out; `None` means no deadline is checked.
    deadline: Option<Instant>,
}
440
441impl<Out> InFlightAsk<Out> {
442 fn state(&self) -> &Arc<ReplyState<Out>> {
443 self.state.as_ref().expect("in-flight ask has reply state")
444 }
445
446 fn into_state(mut self) -> Arc<ReplyState<Out>> {
447 self.state.take().expect("in-flight ask has reply state")
448 }
449}
450
451impl<Out> Drop for InFlightAsk<Out> {
452 fn drop(&mut self) {
453 if let Some(state) = &self.state {
454 state.close_receiver();
455 }
456 }
457}
458
/// Outcome of a finished ask together with its reply cell.
struct CompletedAsk<Out> {
    /// Position of the originating element in the input stream.
    index: usize,
    /// The reply, or the error that ended the ask.
    result: StreamResult<Out>,
    /// Reply cell of the finished ask, returned so the caller can recycle it.
    state: Arc<ReplyState<Out>>,
}
464
465fn start_ractor_ask<In, Msg, Out, F>(
466 index: usize,
467 actor_ref: ActorRef<Msg>,
468 timeout: Duration,
469 input: In,
470 make_msg: Arc<F>,
471 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
472) -> StreamResult<InFlightAsk<Out>>
473where
474 In: Send + 'static,
475 Msg: Message,
476 Out: Send + 'static,
477 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
478{
479 let reply_state = match reply_pool.pop() {
480 Some(state) => {
483 state.reset(timeout);
484 state
485 }
486 None => Arc::new(ReplyState::new(timeout)),
487 };
488 let reply_to = ReplyPort::new(Arc::clone(&reply_state));
489 let message =
490 catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
491 StreamError::ActorAskTaskFailed {
492 reason: panic_reason(panic),
493 }
494 })?;
495
496 match actor_ref.cast(message) {
497 Ok(()) => {}
498 Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
499 return Err(StreamError::ActorTerminated);
500 }
501 Err(error) => {
502 return Err(StreamError::ActorAskSendFailed {
503 reason: error.to_string(),
504 });
505 }
506 }
507
508 Ok(InFlightAsk {
509 index,
510 state: Some(reply_state),
511 deadline: Instant::now().checked_add(timeout),
512 })
513}
514
515fn wait_for_ready_ask<Out>(
516 in_flight: &mut Vec<InFlightAsk<Out>>,
517 timeout: Duration,
518) -> CompletedAsk<Out> {
519 let mut idle_spins = 0;
520 let mut idle_yields = 0;
521 let mut time_refresh = 0_u32;
522 let mut now = Instant::now();
523 loop {
524 if time_refresh == 0 {
525 now = Instant::now();
526 }
527 time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
528 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
529 return ask;
530 }
531
532 if idle_spins < ASK_READY_SPINS {
533 idle_spins += 1;
534 std::hint::spin_loop();
535 } else if idle_yields < ASK_IDLE_YIELDS {
536 idle_yields += 1;
537 time_refresh = 0;
538 thread::yield_now();
539 } else {
540 idle_spins = 0;
541 idle_yields = 0;
542 time_refresh = 0;
543 let current = thread::current();
544 let registered = register_ask_waiters(in_flight, ¤t);
545 now = Instant::now();
546 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
547 unregister_ask_waiters(registered, current.id());
548 return ask;
549 }
550 thread::park_timeout(next_ask_park(in_flight, now));
551 unregister_ask_waiters(registered, current.id());
552 }
553 }
554}
555
556fn take_ready_ask<Out>(
557 in_flight: &mut Vec<InFlightAsk<Out>>,
558 timeout: Duration,
559 now: Instant,
560) -> Option<CompletedAsk<Out>> {
561 let mut index = 0;
562 while index < in_flight.len() {
563 match in_flight[index].state().poll() {
564 ReplyPoll::Ready(reply) => {
565 let ask = in_flight.swap_remove(index);
566 return Some(CompletedAsk {
567 index: ask.index,
568 result: Ok(reply),
569 state: ask.into_state(),
570 });
571 }
572 ReplyPoll::Dropped => {
573 let ask = in_flight.swap_remove(index);
574 return Some(CompletedAsk {
575 index: ask.index,
576 result: Err(StreamError::ActorAskResponseDropped),
577 state: ask.into_state(),
578 });
579 }
580 ReplyPoll::Pending => {
581 if in_flight[index]
582 .deadline
583 .is_some_and(|deadline| now >= deadline)
584 {
585 let outcome = in_flight[index].state().close_on_timeout();
588 let ask = in_flight.swap_remove(index);
589 let result = match outcome {
590 ReplyPoll::Ready(reply) => Ok(reply),
591 ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
592 ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
593 };
594 return Some(CompletedAsk {
595 index: ask.index,
596 result,
597 state: ask.into_state(),
598 });
599 }
600 index += 1;
601 }
602 }
603 }
604 None
605}
606
607fn register_ask_waiters<Out>(
608 in_flight: &[InFlightAsk<Out>],
609 current: &Thread,
610) -> Vec<Arc<ReplyState<Out>>> {
611 let mut registered = Vec::with_capacity(in_flight.len());
612 for ask in in_flight {
613 ask.state().register_waiter(current.clone());
614 registered.push(Arc::clone(ask.state()));
615 }
616 registered
617}
618
619fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
620 for state in registered {
621 state.unregister_waiter(current_id);
622 }
623}
624
625fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
626 next_ask_deadline_remaining(in_flight, now)
627 .unwrap_or(ASK_MAX_PARK)
628 .min(ASK_MAX_PARK)
629}
630
631fn next_ask_deadline_remaining<Out>(
632 in_flight: &[InFlightAsk<Out>],
633 now: Instant,
634) -> Option<Duration> {
635 in_flight
636 .iter()
637 .filter_map(|ask| ask.deadline)
638 .map(|deadline| deadline.saturating_duration_since(now))
639 .min()
640}
641
642fn recycle_reply_state<Out>(
643 state: Arc<ReplyState<Out>>,
644 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
645) {
646 if Arc::strong_count(&state) == 1 {
647 reply_pool.push(state);
648 }
649}
650
/// Extracts a human-readable message from a panic payload.
///
/// `&str` and `String` payloads are returned as-is; any other payload type
/// yields a fixed fallback message.
fn panic_reason(panic: Box<dyn Any + Send>) -> String {
    let from_literal = panic
        .downcast_ref::<&str>()
        .map(|reason| (*reason).to_owned());
    from_literal
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "actor ask task panicked".to_owned())
}
660
661pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
662 static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
663 std::sync::OnceLock::new();
664
665 match RUNTIME.get_or_init(|| {
666 tokio::runtime::Builder::new_multi_thread()
667 .thread_name("datum-ractor-runtime")
668 .enable_all()
669 .build()
670 .map_err(|error| format!("ractor runtime failed to start: {error}"))
671 }) {
672 Ok(runtime) => Ok(runtime),
673 Err(error) => Err(StreamError::Failed(error.clone())),
674 }
675}
676
677pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
678where
679 T: Send + 'static,
680 F: std::future::Future<Output = T> + Send + 'static,
681{
682 let runtime = ractor_runtime()?;
683 let result = if tokio::runtime::Handle::try_current().is_ok() {
684 std::thread::spawn(move || runtime.block_on(future))
685 .join()
686 .map_err(|_| StreamError::AbruptTermination)?
687 } else {
688 runtime.block_on(future)
689 };
690 Ok(result)
691}
692
/// Test-only wrapper around `block_on_ractor_runtime` that panics instead of
/// returning an error.
#[cfg(test)]
fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
where
    T: Send + 'static,
    F: std::future::Future<Output = T> + Send + 'static,
{
    let outcome = block_on_ractor_runtime(future);
    outcome.expect("ractor ask runtime starts")
}
701
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::{Sink, Source, StreamCompletion};
    use std::sync::{
        Arc as StdArc,
        atomic::{AtomicUsize, Ordering},
    };

    /// Messages understood by [`AskTestActor`]; each variant exercises one
    /// reply behavior.
    enum AskTestMessage {
        /// Reply with `input` after `delay`.
        Delayed {
            input: u64,
            delay: Duration,
            reply_to: ReplyPort<u64>,
        },
        /// Reply with `input` after 20 ms while counting concurrent requests.
        Track {
            input: u64,
            reply_to: ReplyPort<u64>,
        },
        /// Keep the port alive in the actor state and never answer.
        NeverReply {
            _reply_to: ReplyPort<u64>,
        },
        /// Drop the port without answering.
        DropReply {
            _reply_to: ReplyPort<u64>,
        },
        /// Bind a TCP listener and reply with the chosen port number.
        BindTcp {
            reply_to: ReplyPort<u64>,
        },
    }

    #[cfg(feature = "cluster")]
    impl Message for AskTestMessage {}

    struct AskTestActor;

    /// State (and spawn arguments) of [`AskTestActor`].
    struct AskTestState {
        /// Number of `Track` requests currently being processed.
        active: StdArc<AtomicUsize>,
        /// Highest value `active` has reached.
        max_active: StdArc<AtomicUsize>,
        /// Ports held by `NeverReply` so they are not dropped.
        held_replies: Vec<ReplyPort<u64>>,
    }

    impl Actor for AskTestActor {
        type Msg = AskTestMessage;
        type State = AskTestState;
        type Arguments = AskTestState;

        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            args: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            Ok(args)
        }

        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            state: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            match message {
                AskTestMessage::Delayed {
                    input,
                    delay,
                    reply_to,
                } => {
                    // Reply from a spawned task so the handler returns at once.
                    ractor::concurrency::spawn(async move {
                        ractor::concurrency::sleep(delay).await;
                        let _ = reply_to.send(input);
                    });
                }
                AskTestMessage::Track { input, reply_to } => {
                    let active = StdArc::clone(&state.active);
                    let max_active = StdArc::clone(&state.max_active);
                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                    max_active.fetch_max(current, Ordering::SeqCst);
                    ractor::concurrency::spawn(async move {
                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
                        active.fetch_sub(1, Ordering::SeqCst);
                        let _ = reply_to.send(input);
                    });
                }
                AskTestMessage::NeverReply { _reply_to } => {
                    state.held_replies.push(_reply_to);
                }
                AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
                AskTestMessage::BindTcp { reply_to } => {
                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
                    let port = listener.local_addr()?.port();
                    let _ = reply_to.send(u64::from(port));
                }
            }
            Ok(())
        }
    }

    /// Waits for a stream completion and unwraps its result.
    fn wait<T>(completion: StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }

    /// Spawns an [`AskTestActor`] and returns its reference, its join handle
    /// and the shared `max_active` counter.
    fn spawn_test_actor() -> (
        ActorRef<AskTestMessage>,
        ractor::concurrency::JoinHandle<()>,
        StdArc<AtomicUsize>,
    ) {
        let active = StdArc::new(AtomicUsize::new(0));
        let max_active = StdArc::new(AtomicUsize::new(0));
        let state = AskTestState {
            active,
            max_active: StdArc::clone(&max_active),
            held_replies: Vec::new(),
        };
        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
            Actor::spawn(None, AskTestActor, state)
                .await
                .expect("test actor spawns")
        });
        (actor_ref, handle, max_active)
    }

    /// Stops the actor and waits for its task to finish.
    fn stop_test_actor(
        actor_ref: ActorRef<AskTestMessage>,
        handle: ractor::concurrency::JoinHandle<()>,
    ) {
        actor_ref.stop(None);
        block_on_ractor_ask_runtime(async move {
            handle.await.expect("test actor task joins");
        });
    }

    #[test]
    fn actor_flow_ask_preserves_order_with_parallelism() {
        let (actor_ref, handle, _) = spawn_test_actor();

        // Earlier inputs get longer delays, so replies arrive in reverse order.
        let values = Source::from_iter(0_u64..5)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                5,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::from_millis((5 - input) * 10),
                    reply_to,
                },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_respects_parallelism() {
        let (actor_ref, handle, max_active) = spawn_test_actor();

        let values = Source::from_iter(0_u64..8)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                3,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Track { input, reply_to },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
        assert_eq!(max_active.load(Ordering::SeqCst), 3);
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_into_head_returns_value() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let head = Source::single(7_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Track { input, reply_to },
            ))
            .run_with(Sink::head())
            .unwrap();

        assert_eq!(wait(head), 7);

        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let ports = Source::single(0_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
            ))
            .run_collect()
            .unwrap();

        assert_eq!(ports.len(), 1);
        assert_ne!(ports[0], 0);
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
        let (actor_ref, handle, _) = spawn_test_actor();

        // The 25 ms reply delay is far below the 30 s timeout.
        let values = Source::from_iter(0_u64..8)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                4,
                Duration::from_secs(30),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::from_millis(25),
                    reply_to,
                },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn dropped_in_flight_ask_closes_reply_port() {
        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
        let reply_to = ReplyPort::new(StdArc::clone(&state));
        let ask = InFlightAsk {
            index: 0,
            state: Some(StdArc::clone(&state)),
            deadline: None,
        };

        drop(ask);

        assert!(reply_to.is_closed());
        assert_eq!(reply_to.send(1), Err(ReplySendError));
    }

    #[test]
    fn actor_flow_ask_times_out() {
        let (actor_ref, handle, _) = spawn_test_actor();
        let timeout = Duration::from_millis(10);

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                timeout,
                |_input, reply_to| AskTestMessage::NeverReply {
                    _reply_to: reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_fails_when_actor_is_stopped() {
        let (actor_ref, handle, _) = spawn_test_actor();
        // Stop the actor before the stream runs so the cast itself fails.
        actor_ref.stop(None);
        block_on_ractor_ask_runtime(async move {
            handle.await.expect("test actor task joins");
        });

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref,
                1,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::ZERO,
                    reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorTerminated));
    }

    #[test]
    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, reply_to| AskTestMessage::DropReply {
                    _reply_to: reply_to,
                },
            ))
            .run_collect();

        assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let result = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
                    panic!("message builder failed");
                },
            ))
            .run_collect();

        assert_eq!(
            result,
            Err(StreamError::ActorAskTaskFailed {
                reason: "message builder failed".to_owned()
            })
        );
        stop_test_actor(actor_ref, handle);
    }
}
1050}