1use std::{
20 any::Any,
21 error::Error,
22 fmt,
23 panic::{AssertUnwindSafe, catch_unwind},
24 sync::{
25 Arc, Mutex, MutexGuard,
26 atomic::{AtomicBool, AtomicU8, Ordering},
27 },
28 thread::{self, Thread, ThreadId},
29 time::{Duration, Instant},
30};
31
32use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
33
34pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
35
36mod interop;
37mod stream_ref;
38pub mod stream_ref_proto;
39pub use interop::{
40 ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
41 ActorSourceMessage, ActorStatus, WatchEvent,
42};
43pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
44pub use stream_ref_proto::{
45 StreamRefFrame, StreamRefId, StreamRefMessage, StreamRefOutbound, StreamRefPayload,
46 StreamRefPayloadBatch, StreamRefPayloadBytes, StreamRefProtoConsumer, StreamRefProtoEndpoint,
47 StreamRefProtoProducer,
48};
49
50pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
51
/// Number of busy-spin polls `wait_for_ready_ask` performs before it starts
/// yielding the thread.
const ASK_READY_SPINS: usize = 256;
/// Number of `thread::yield_now` rounds `wait_for_ready_ask` performs (after
/// the spin budget is used up) before it parks the thread.
const ASK_IDLE_YIELDS: usize = 64;
/// Upper bound for a single `thread::park_timeout` while waiting for replies.
const ASK_MAX_PARK: Duration = Duration::from_millis(1);
/// While spinning, the cached `Instant` is re-read once every this many polls.
const ASK_TIME_REFRESH_ITERS: u32 = 64;
63
/// Gate value: no reply has been stored yet.
const REPLY_PENDING: u8 = 0;
/// Gate value: a reply has been stored in the slot.
const REPLY_READY: u8 = 1;
/// Gate value: the reply port was dropped without sending a reply.
const REPLY_DROPPED: u8 = 2;
70
71pub struct ActorFlow;
73
74impl ActorFlow {
75 #[must_use]
81 pub fn ask<In, Msg, Out, F>(
82 actor_ref: ActorRef<Msg>,
83 parallelism: usize,
84 timeout: Duration,
85 make_msg: F,
86 ) -> Flow<In, Out, NotUsed>
87 where
88 In: Send + 'static,
89 Msg: Message,
90 Out: Send + 'static,
91 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
92 {
93 ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
94 }
95}
96
97pub struct ReplyPort<T> {
103 inner: Arc<ReplyState<T>>,
104 active: bool,
105}
106
107#[derive(Debug)]
108struct ReplyState<T> {
109 timeout: Duration,
110 receiver_closed: AtomicBool,
111 gate: AtomicU8,
112 slot: Mutex<ReplySlotState<T>>,
113}
114
115#[derive(Debug)]
116struct ReplySlotState<T> {
117 value: ReplySlot<T>,
118 waiter: Option<Thread>,
119}
120
/// Content of the reply slot.
#[derive(Debug)]
enum ReplySlot<T> {
    /// Nothing has been stored (or the stored value was already taken).
    Pending,
    /// The reply that was sent.
    Ready(T),
    /// The reply port was dropped without sending.
    Dropped,
}
127
/// What the waiting side observes when it checks a reply state.
enum ReplyPoll<T> {
    /// No outcome is available yet.
    Pending,
    /// The reply was taken out of the slot.
    Ready(T),
    /// The reply port was dropped without sending.
    Dropped,
}
133
134impl<T> fmt::Debug for ReplyPort<T> {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 f.debug_struct("ReplyPort")
137 .field("timeout", &self.timeout())
138 .field("closed", &self.is_closed())
139 .finish_non_exhaustive()
140 }
141}
142
/// Error returned when a reply cannot be delivered because the waiting side
/// no longer accepts it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplySendError;

impl fmt::Display for ReplySendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "actor reply receiver dropped")
    }
}

impl Error for ReplySendError {}
154
155impl<T> ReplyPort<T> {
156 fn new(inner: Arc<ReplyState<T>>) -> Self {
157 Self {
158 inner,
159 active: true,
160 }
161 }
162
163 #[must_use]
164 pub fn timeout(&self) -> Option<Duration> {
165 Some(self.inner.timeout)
166 }
167
168 #[must_use]
169 pub fn is_closed(&self) -> bool {
170 self.inner.is_closed()
171 }
172
173 pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
174 self.active = false;
175 self.inner.send(reply)
176 }
177}
178
179impl<T> Drop for ReplyPort<T> {
180 fn drop(&mut self) {
181 if self.active {
182 self.inner.drop_sender();
183 }
184 }
185}
186
187impl<T> ReplyState<T> {
188 fn new(timeout: Duration) -> Self {
189 Self {
190 timeout,
191 receiver_closed: AtomicBool::new(false),
192 gate: AtomicU8::new(REPLY_PENDING),
193 slot: Mutex::new(ReplySlotState {
194 value: ReplySlot::Pending,
195 waiter: None,
196 }),
197 }
198 }
199
200 fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
206 self.slot
207 .lock()
208 .unwrap_or_else(|poison| poison.into_inner())
209 }
210
211 fn reset(&self, timeout: Duration) {
212 self.receiver_closed.store(false, Ordering::Release);
213 self.gate.store(REPLY_PENDING, Ordering::Release);
214 let mut slot = self.lock_slot();
215 slot.value = ReplySlot::Pending;
216 slot.waiter = None;
217 debug_assert_eq!(self.timeout, timeout);
218 }
219
220 fn is_closed(&self) -> bool {
221 self.receiver_closed.load(Ordering::Acquire)
222 }
223
224 fn send(&self, reply: T) -> Result<(), ReplySendError> {
225 if self.is_closed() {
226 return Err(ReplySendError);
227 }
228
229 let mut slot = self.lock_slot();
230 if self.is_closed() {
231 return Err(ReplySendError);
232 }
233
234 if !matches!(slot.value, ReplySlot::Pending) {
235 return Err(ReplySendError);
236 }
237
238 slot.value = ReplySlot::Ready(reply);
239 let waiter = slot.waiter.clone();
240 self.gate.store(REPLY_READY, Ordering::Release);
243 drop(slot);
244 wake_waiter(waiter);
245 Ok(())
246 }
247
248 fn poll(&self) -> ReplyPoll<T> {
249 if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
250 return ReplyPoll::Pending;
252 }
253 self.take_locked()
254 }
255
256 fn close_on_timeout(&self) -> ReplyPoll<T> {
261 let mut slot = self.lock_slot();
262 self.receiver_closed.store(true, Ordering::Release);
263 let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
264 ReplySlot::Pending => ReplyPoll::Pending,
265 ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
266 ReplySlot::Dropped => ReplyPoll::Dropped,
267 };
268 let waiter = slot.waiter.take();
269 drop(slot);
270 wake_waiter(waiter);
271 outcome
272 }
273
274 fn take_locked(&self) -> ReplyPoll<T> {
275 let mut slot = self.lock_slot();
276 slot.waiter = None;
277 match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
278 ReplySlot::Pending => ReplyPoll::Pending,
279 ReplySlot::Ready(reply) => {
280 self.close_receiver();
281 ReplyPoll::Ready(reply)
282 }
283 ReplySlot::Dropped => {
284 self.close_receiver();
285 ReplyPoll::Dropped
286 }
287 }
288 }
289
290 fn drop_sender(&self) {
291 let mut slot = self.lock_slot();
292 if matches!(slot.value, ReplySlot::Pending) {
293 slot.value = ReplySlot::Dropped;
294 let waiter = slot.waiter.clone();
295 self.gate.store(REPLY_DROPPED, Ordering::Release);
296 drop(slot);
297 wake_waiter(waiter);
298 }
299 }
300
301 fn close_receiver(&self) {
302 self.receiver_closed.store(true, Ordering::Release);
303 }
304
305 fn register_waiter(&self, waiter: Thread) {
306 self.lock_slot().waiter = Some(waiter);
307 }
308
309 fn unregister_waiter(&self, waiter_id: ThreadId) {
310 let mut slot = self.lock_slot();
311 if slot
312 .waiter
313 .as_ref()
314 .is_some_and(|thread| thread.id() == waiter_id)
315 {
316 slot.waiter = None;
317 }
318 }
319}
320
/// Unparks `waiter` if one is given; does nothing for `None`.
fn wake_waiter(waiter: Option<Thread>) {
    let Some(thread) = waiter else {
        return;
    };
    thread.unpark();
}
326
327fn ask_flow<In, Msg, Out, F>(
328 actor_ref: ActorRef<Msg>,
329 parallelism: usize,
330 timeout: Duration,
331 make_msg: Arc<F>,
332) -> Flow<In, Out>
333where
334 In: Send + 'static,
335 Msg: Message,
336 Out: Send + 'static,
337 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
338{
339 assert!(
340 parallelism > 0,
341 "ActorFlow::ask parallelism must be greater than zero"
342 );
343 Flow::from_transform(move |input| {
351 ask_ractor_ordered(
352 input,
353 actor_ref.clone(),
354 parallelism,
355 timeout,
356 Arc::clone(&make_msg),
357 )
358 })
359}
360
361fn ask_ractor_ordered<In, Msg, Out, F>(
362 mut input: BoxStream<In>,
363 actor_ref: ActorRef<Msg>,
364 parallelism: usize,
365 timeout: Duration,
366 make_msg: Arc<F>,
367) -> BoxStream<Out>
368where
369 In: Send + 'static,
370 Msg: Message,
371 Out: Send + 'static,
372 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
373{
374 let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
375 let mut next_index = 0_usize;
376 let mut next_to_emit = 0_usize;
377 let mut completed = Vec::with_capacity(parallelism);
378 let mut reply_pool = Vec::with_capacity(parallelism);
379 let mut input_done = false;
380
381 Box::new(std::iter::from_fn(move || {
382 loop {
383 if let Some(result) = take_completed(&mut completed, next_to_emit) {
384 next_to_emit += 1;
385 return Some(result);
386 }
387
388 while in_flight.len() < parallelism && !input_done {
389 match input.next() {
390 Some(Ok(item)) => {
391 let index = next_index;
392 next_index += 1;
393 match start_ractor_ask(
394 index,
395 actor_ref.clone(),
396 timeout,
397 item,
398 Arc::clone(&make_msg),
399 &mut reply_pool,
400 ) {
401 Ok(ask) => in_flight.push(ask),
402 Err(error) => {
403 completed.push((index, Err(error)));
404 input_done = true;
405 }
406 }
407 }
408 Some(Err(error)) => {
409 completed.push((next_index, Err(error)));
410 next_index += 1;
411 input_done = true;
412 }
413 None => input_done = true,
414 }
415 }
416
417 if let Some(result) = take_completed(&mut completed, next_to_emit) {
418 next_to_emit += 1;
419 return Some(result);
420 }
421
422 if in_flight.is_empty() {
423 return None;
424 }
425
426 let ask = wait_for_ready_ask(&mut in_flight, timeout);
427 let index = ask.index;
428 let result = ask.result;
429 recycle_reply_state(ask.state, &mut reply_pool);
430 if index == next_to_emit {
431 next_to_emit += 1;
432 return Some(result);
433 }
434 completed.push((index, result));
435 }
436 }))
437}
438
439fn take_completed<Out>(
440 completed: &mut Vec<(usize, StreamResult<Out>)>,
441 index: usize,
442) -> Option<StreamResult<Out>> {
443 let position = completed
444 .iter()
445 .position(|(completed_index, _)| *completed_index == index)?;
446 Some(completed.swap_remove(position).1)
447}
448
449struct InFlightAsk<Out> {
450 index: usize,
451 state: Option<Arc<ReplyState<Out>>>,
452 deadline: Option<Instant>,
455}
456
457impl<Out> InFlightAsk<Out> {
458 fn state(&self) -> &Arc<ReplyState<Out>> {
459 self.state.as_ref().expect("in-flight ask has reply state")
460 }
461
462 fn into_state(mut self) -> Arc<ReplyState<Out>> {
463 self.state.take().expect("in-flight ask has reply state")
464 }
465}
466
467impl<Out> Drop for InFlightAsk<Out> {
468 fn drop(&mut self) {
469 if let Some(state) = &self.state {
470 state.close_receiver();
471 }
472 }
473}
474
475struct CompletedAsk<Out> {
476 index: usize,
477 result: StreamResult<Out>,
478 state: Arc<ReplyState<Out>>,
479}
480
481fn start_ractor_ask<In, Msg, Out, F>(
482 index: usize,
483 actor_ref: ActorRef<Msg>,
484 timeout: Duration,
485 input: In,
486 make_msg: Arc<F>,
487 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
488) -> StreamResult<InFlightAsk<Out>>
489where
490 In: Send + 'static,
491 Msg: Message,
492 Out: Send + 'static,
493 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
494{
495 let reply_state = match reply_pool.pop() {
496 Some(state) => {
499 state.reset(timeout);
500 state
501 }
502 None => Arc::new(ReplyState::new(timeout)),
503 };
504 let reply_to = ReplyPort::new(Arc::clone(&reply_state));
505 let message =
506 catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
507 StreamError::ActorAskTaskFailed {
508 reason: panic_reason(panic),
509 }
510 })?;
511
512 match actor_ref.cast(message) {
513 Ok(()) => {}
514 Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
515 return Err(StreamError::ActorTerminated);
516 }
517 Err(error) => {
518 return Err(StreamError::ActorAskSendFailed {
519 reason: error.to_string(),
520 });
521 }
522 }
523
524 Ok(InFlightAsk {
525 index,
526 state: Some(reply_state),
527 deadline: Instant::now().checked_add(timeout),
528 })
529}
530
531fn wait_for_ready_ask<Out>(
532 in_flight: &mut Vec<InFlightAsk<Out>>,
533 timeout: Duration,
534) -> CompletedAsk<Out> {
535 let mut idle_spins = 0;
536 let mut idle_yields = 0;
537 let mut time_refresh = 0_u32;
538 let mut now = Instant::now();
539 loop {
540 if time_refresh == 0 {
541 now = Instant::now();
542 }
543 time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
544 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
545 return ask;
546 }
547
548 if idle_spins < ASK_READY_SPINS {
549 idle_spins += 1;
550 std::hint::spin_loop();
551 } else if idle_yields < ASK_IDLE_YIELDS {
552 idle_yields += 1;
553 time_refresh = 0;
554 thread::yield_now();
555 } else {
556 idle_spins = 0;
557 idle_yields = 0;
558 time_refresh = 0;
559 let current = thread::current();
560 let registered = register_ask_waiters(in_flight, ¤t);
561 now = Instant::now();
562 if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
563 unregister_ask_waiters(registered, current.id());
564 return ask;
565 }
566 thread::park_timeout(next_ask_park(in_flight, now));
567 unregister_ask_waiters(registered, current.id());
568 }
569 }
570}
571
572fn take_ready_ask<Out>(
573 in_flight: &mut Vec<InFlightAsk<Out>>,
574 timeout: Duration,
575 now: Instant,
576) -> Option<CompletedAsk<Out>> {
577 let mut index = 0;
578 while index < in_flight.len() {
579 match in_flight[index].state().poll() {
580 ReplyPoll::Ready(reply) => {
581 let ask = in_flight.swap_remove(index);
582 return Some(CompletedAsk {
583 index: ask.index,
584 result: Ok(reply),
585 state: ask.into_state(),
586 });
587 }
588 ReplyPoll::Dropped => {
589 let ask = in_flight.swap_remove(index);
590 return Some(CompletedAsk {
591 index: ask.index,
592 result: Err(StreamError::ActorAskResponseDropped),
593 state: ask.into_state(),
594 });
595 }
596 ReplyPoll::Pending => {
597 if in_flight[index]
598 .deadline
599 .is_some_and(|deadline| now >= deadline)
600 {
601 let outcome = in_flight[index].state().close_on_timeout();
604 let ask = in_flight.swap_remove(index);
605 let result = match outcome {
606 ReplyPoll::Ready(reply) => Ok(reply),
607 ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
608 ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
609 };
610 return Some(CompletedAsk {
611 index: ask.index,
612 result,
613 state: ask.into_state(),
614 });
615 }
616 index += 1;
617 }
618 }
619 }
620 None
621}
622
623fn register_ask_waiters<Out>(
624 in_flight: &[InFlightAsk<Out>],
625 current: &Thread,
626) -> Vec<Arc<ReplyState<Out>>> {
627 let mut registered = Vec::with_capacity(in_flight.len());
628 for ask in in_flight {
629 ask.state().register_waiter(current.clone());
630 registered.push(Arc::clone(ask.state()));
631 }
632 registered
633}
634
635fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
636 for state in registered {
637 state.unregister_waiter(current_id);
638 }
639}
640
641fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
642 next_ask_deadline_remaining(in_flight, now)
643 .unwrap_or(ASK_MAX_PARK)
644 .min(ASK_MAX_PARK)
645}
646
647fn next_ask_deadline_remaining<Out>(
648 in_flight: &[InFlightAsk<Out>],
649 now: Instant,
650) -> Option<Duration> {
651 in_flight
652 .iter()
653 .filter_map(|ask| ask.deadline)
654 .map(|deadline| deadline.saturating_duration_since(now))
655 .min()
656}
657
658fn recycle_reply_state<Out>(
659 state: Arc<ReplyState<Out>>,
660 reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
661) {
662 if Arc::strong_count(&state) == 1 {
663 reply_pool.push(state);
664 }
665}
666
/// Extracts a readable message from a panic payload.
///
/// `&str` and `String` payloads are returned as-is; any other payload type
/// yields a fixed fallback message.
fn panic_reason(panic: Box<dyn Any + Send>) -> String {
    panic
        .downcast_ref::<&str>()
        .map(|reason| (*reason).to_owned())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "actor ask task panicked".to_owned())
}
676
677pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
678 static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
679 std::sync::OnceLock::new();
680
681 match RUNTIME.get_or_init(|| {
682 tokio::runtime::Builder::new_multi_thread()
683 .thread_name("datum-ractor-runtime")
684 .enable_all()
685 .build()
686 .map_err(|error| format!("ractor runtime failed to start: {error}"))
687 }) {
688 Ok(runtime) => Ok(runtime),
689 Err(error) => Err(StreamError::Failed(error.clone())),
690 }
691}
692
693pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
694where
695 T: Send + 'static,
696 F: std::future::Future<Output = T> + Send + 'static,
697{
698 let runtime = ractor_runtime()?;
699 let result = if tokio::runtime::Handle::try_current().is_ok() {
700 std::thread::spawn(move || runtime.block_on(future))
701 .join()
702 .map_err(|_| StreamError::AbruptTermination)?
703 } else {
704 runtime.block_on(future)
705 };
706 Ok(result)
707}
708
/// Test helper: like `block_on_ractor_runtime`, but panics instead of
/// returning an error.
#[cfg(test)]
fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
where
    T: Send + 'static,
    F: std::future::Future<Output = T> + Send + 'static,
{
    let outcome = block_on_ractor_runtime(future);
    outcome.expect("ractor ask runtime starts")
}
717
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::{Sink, Source, StreamCompletion};
    use std::sync::{
        Arc as StdArc,
        atomic::{AtomicUsize, Ordering},
    };

    /// Messages understood by `AskTestActor`; each variant exercises one way
    /// an ask can end.
    enum AskTestMessage {
        /// Reply with `input` after `delay`.
        Delayed {
            input: u64,
            delay: Duration,
            reply_to: ReplyPort<u64>,
        },
        /// Reply with `input` after a short pause while counting how many of
        /// these are active at once.
        Track {
            input: u64,
            reply_to: ReplyPort<u64>,
        },
        /// Keep the port alive and never answer.
        NeverReply {
            _reply_to: ReplyPort<u64>,
        },
        /// Drop the port without answering.
        DropReply {
            _reply_to: ReplyPort<u64>,
        },
        /// Bind a TCP listener inside the handler and reply with its port.
        BindTcp {
            reply_to: ReplyPort<u64>,
        },
    }

    #[cfg(feature = "cluster")]
    impl Message for AskTestMessage {}

    struct AskTestActor;

    /// State (and spawn arguments) of the test actor.
    struct AskTestState {
        /// Number of `Track` requests currently being processed.
        active: StdArc<AtomicUsize>,
        /// Highest value `active` has reached.
        max_active: StdArc<AtomicUsize>,
        /// Ports parked by `NeverReply` so that they are not dropped.
        held_replies: Vec<ReplyPort<u64>>,
    }

    impl Actor for AskTestActor {
        type Msg = AskTestMessage;
        type State = AskTestState;
        type Arguments = AskTestState;

        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            args: Self::Arguments,
        ) -> Result<Self::State, ActorProcessingErr> {
            // The spawn arguments already are the actor state.
            Ok(args)
        }

        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            state: &mut Self::State,
        ) -> Result<(), ActorProcessingErr> {
            match message {
                AskTestMessage::Delayed {
                    input,
                    delay,
                    reply_to,
                } => {
                    // Answer from a separate task so the handler returns at once.
                    ractor::concurrency::spawn(async move {
                        ractor::concurrency::sleep(delay).await;
                        let _ = reply_to.send(input);
                    });
                }
                AskTestMessage::Track { input, reply_to } => {
                    let active = StdArc::clone(&state.active);
                    let max_active = StdArc::clone(&state.max_active);
                    let now_active = active.fetch_add(1, Ordering::SeqCst) + 1;
                    max_active.fetch_max(now_active, Ordering::SeqCst);
                    ractor::concurrency::spawn(async move {
                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
                        active.fetch_sub(1, Ordering::SeqCst);
                        let _ = reply_to.send(input);
                    });
                }
                AskTestMessage::NeverReply {
                    _reply_to: reply_to,
                } => {
                    state.held_replies.push(reply_to);
                }
                AskTestMessage::DropReply {
                    _reply_to: reply_to,
                } => drop(reply_to),
                AskTestMessage::BindTcp { reply_to } => {
                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
                    let port = listener.local_addr()?.port();
                    let _ = reply_to.send(u64::from(port));
                }
            }
            Ok(())
        }
    }

    /// Waits for a stream completion and unwraps its result.
    fn wait<T>(completion: StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }

    /// Spawns the test actor and returns its reference, its join handle, and
    /// the counter recording the peak number of concurrent `Track` requests.
    fn spawn_test_actor() -> (
        ActorRef<AskTestMessage>,
        ractor::concurrency::JoinHandle<()>,
        StdArc<AtomicUsize>,
    ) {
        let active = StdArc::new(AtomicUsize::new(0));
        let max_active = StdArc::new(AtomicUsize::new(0));
        let state = AskTestState {
            active,
            max_active: StdArc::clone(&max_active),
            held_replies: Vec::new(),
        };
        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
            Actor::spawn(None, AskTestActor, state)
                .await
                .expect("test actor spawns")
        });
        (actor_ref, handle, max_active)
    }

    /// Stops the actor and waits for its task to finish.
    fn stop_test_actor(
        actor_ref: ActorRef<AskTestMessage>,
        handle: ractor::concurrency::JoinHandle<()>,
    ) {
        actor_ref.stop(None);
        block_on_ractor_ask_runtime(async move {
            handle.await.expect("test actor task joins");
        });
    }

    #[test]
    fn actor_flow_ask_preserves_order_with_parallelism() {
        let (actor_ref, handle, _) = spawn_test_actor();

        // Earlier elements get longer delays, so replies arrive in reverse.
        let collected = Source::from_iter(0_u64..5)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                5,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::from_millis((5 - input) * 10),
                    reply_to,
                },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(collected), vec![0, 1, 2, 3, 4]);
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_respects_parallelism() {
        let (actor_ref, handle, max_active) = spawn_test_actor();

        let collected = Source::from_iter(0_u64..8)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                3,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Track { input, reply_to },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(collected), (0_u64..8).collect::<Vec<_>>());
        assert_eq!(max_active.load(Ordering::SeqCst), 3);
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_into_head_returns_value() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let first = Source::single(7_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Track { input, reply_to },
            ))
            .run_with(Sink::head())
            .unwrap();

        assert_eq!(wait(first), 7);

        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let ports = Source::single(0_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
            ))
            .run_collect()
            .unwrap();

        assert_eq!(ports.len(), 1);
        assert_ne!(ports[0], 0);
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
        let (actor_ref, handle, _) = spawn_test_actor();

        // Replies take 25 ms each while the ask timeout is 30 s.
        let collected = Source::from_iter(0_u64..8)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                4,
                Duration::from_secs(30),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::from_millis(25),
                    reply_to,
                },
            ))
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(wait(collected), (0_u64..8).collect::<Vec<_>>());
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn dropped_in_flight_ask_closes_reply_port() {
        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
        let reply_to = ReplyPort::new(StdArc::clone(&state));
        let ask = InFlightAsk {
            index: 0,
            state: Some(StdArc::clone(&state)),
            deadline: None,
        };

        drop(ask);

        assert!(reply_to.is_closed());
        assert_eq!(reply_to.send(1), Err(ReplySendError));
    }

    #[test]
    fn actor_flow_ask_times_out() {
        let (actor_ref, handle, _) = spawn_test_actor();
        let timeout = Duration::from_millis(10);

        let outcome = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                timeout,
                |_input, reply_to| AskTestMessage::NeverReply {
                    _reply_to: reply_to,
                },
            ))
            .run_collect();

        assert_eq!(outcome, Err(StreamError::ActorAskTimeout { timeout }));
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_fails_when_actor_is_stopped() {
        let (actor_ref, handle, _) = spawn_test_actor();
        // Stop the actor up front but keep the reference for the ask below.
        stop_test_actor(actor_ref.clone(), handle);

        let outcome = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref,
                1,
                Duration::from_secs(1),
                |input, reply_to| AskTestMessage::Delayed {
                    input,
                    delay: Duration::ZERO,
                    reply_to,
                },
            ))
            .run_collect();

        assert_eq!(outcome, Err(StreamError::ActorTerminated));
    }

    #[test]
    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let outcome = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, reply_to| AskTestMessage::DropReply {
                    _reply_to: reply_to,
                },
            ))
            .run_collect();

        assert_eq!(outcome, Err(StreamError::ActorAskResponseDropped));
        stop_test_actor(actor_ref, handle);
    }

    #[test]
    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
        let (actor_ref, handle, _) = spawn_test_actor();

        let outcome = Source::single(1_u64)
            .via(ActorFlow::ask(
                actor_ref.clone(),
                1,
                Duration::from_secs(1),
                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
                    panic!("message builder failed");
                },
            ))
            .run_collect();

        assert_eq!(
            outcome,
            Err(StreamError::ActorAskTaskFailed {
                reason: "message builder failed".to_owned()
            })
        );
        stop_test_actor(actor_ref, handle);
    }
}