Skip to main content

datum/
actor.rs

1//! Actor interop APIs backed by Ractor for local execution.
2
3use std::{
4    any::Any,
5    error::Error,
6    fmt,
7    panic::{AssertUnwindSafe, catch_unwind},
8    sync::{
9        Arc, Mutex, MutexGuard,
10        atomic::{AtomicBool, AtomicU8, Ordering},
11    },
12    thread::{self, Thread, ThreadId},
13    time::{Duration, Instant},
14};
15
16use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
17
18pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
19
20mod interop;
21pub use interop::{
22    ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
23    ActorSourceMessage, ActorStatus, WatchEvent,
24};
25
26pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
27
28const ASK_READY_SPINS: usize = 256;
29/// Bounded number of `yield_now()` calls between the spin window and parking.
30/// This is deliberately deadline-independent: parking is gated only by this
31/// fixed transition budget, and the *park duration* is what respects the
32/// deadline (see [`next_ask_park`]). A fast reply is caught during the spin or
33/// yield window; a genuinely slow reply (or a long, idle timeout) parks and is
34/// woken by `ReplyState::send`/`drop_sender`/`close_on_timeout` rather than
35/// burning a core busy-yielding until the deadline.
36const ASK_IDLE_YIELDS: usize = 64;
37const ASK_MAX_PARK: Duration = Duration::from_millis(1);
38const ASK_TIME_REFRESH_ITERS: u32 = 64;
39
40// Fast-path reply gate values. `poll()` reads this atomically and only takes
41// the slot mutex once a reply (or drop) is actually present, so the busy-spin
42// consumer does not contend with the replying actor on every spin iteration.
43const REPLY_PENDING: u8 = 0;
44const REPLY_READY: u8 = 1;
45const REPLY_DROPPED: u8 = 2;
46
47/// Collection of actor-aware stream flows.
48pub struct ActorFlow;
49
50impl ActorFlow {
51    /// Sends each stream element to an actor using a request/reply message.
52    ///
53    /// The returned flow preserves input order while allowing up to `parallelism`
54    /// requests in flight. It fails the stream if a request times out, the actor
55    /// cannot be sent to, or the reply port is dropped before a response arrives.
56    #[must_use]
57    pub fn ask<In, Msg, Out, F>(
58        actor_ref: ActorRef<Msg>,
59        parallelism: usize,
60        timeout: Duration,
61        make_msg: F,
62    ) -> Flow<In, Out, NotUsed>
63    where
64        In: Send + 'static,
65        Msg: Message,
66        Out: Send + 'static,
67        F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
68    {
69        ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
70    }
71}
72
73/// One-shot reply handle supplied to actor ask messages.
74///
75/// This wrapper keeps Datum's stream-facing API from exposing Ractor's concrete
76/// RPC port type. The current implementation is local/Ractor-backed, and the
77/// wrapper leaves room for a later transport-specific reply implementation.
78pub struct ReplyPort<T> {
79    inner: Arc<ReplyState<T>>,
80    active: bool,
81}
82
83#[derive(Debug)]
84struct ReplyState<T> {
85    timeout: Duration,
86    receiver_closed: AtomicBool,
87    gate: AtomicU8,
88    slot: Mutex<ReplySlotState<T>>,
89}
90
91#[derive(Debug)]
92struct ReplySlotState<T> {
93    value: ReplySlot<T>,
94    waiter: Option<Thread>,
95}
96
97#[derive(Debug)]
98enum ReplySlot<T> {
99    Pending,
100    Ready(T),
101    Dropped,
102}
103
104enum ReplyPoll<T> {
105    Pending,
106    Ready(T),
107    Dropped,
108}
109
110impl<T> fmt::Debug for ReplyPort<T> {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        f.debug_struct("ReplyPort")
113            .field("timeout", &self.timeout())
114            .field("closed", &self.is_closed())
115            .finish_non_exhaustive()
116    }
117}
118
119/// Error returned when an actor tries to answer an ask whose receiver is gone.
120#[derive(Debug, Clone, Copy, PartialEq, Eq)]
121pub struct ReplySendError;
122
123impl fmt::Display for ReplySendError {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.write_str("actor reply receiver dropped")
126    }
127}
128
129impl Error for ReplySendError {}
130
131impl<T> ReplyPort<T> {
132    fn new(inner: Arc<ReplyState<T>>) -> Self {
133        Self {
134            inner,
135            active: true,
136        }
137    }
138
139    #[must_use]
140    pub fn timeout(&self) -> Option<Duration> {
141        Some(self.inner.timeout)
142    }
143
144    #[must_use]
145    pub fn is_closed(&self) -> bool {
146        self.inner.is_closed()
147    }
148
149    pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
150        self.active = false;
151        self.inner.send(reply)
152    }
153}
154
155impl<T> Drop for ReplyPort<T> {
156    fn drop(&mut self) {
157        if self.active {
158            self.inner.drop_sender();
159        }
160    }
161}
162
163impl<T> ReplyState<T> {
164    fn new(timeout: Duration) -> Self {
165        Self {
166            timeout,
167            receiver_closed: AtomicBool::new(false),
168            gate: AtomicU8::new(REPLY_PENDING),
169            slot: Mutex::new(ReplySlotState {
170                value: ReplySlot::Pending,
171                waiter: None,
172            }),
173        }
174    }
175
176    /// Locks the reply slot, recovering the guard if a previous holder panicked
177    /// while holding it (e.g. a user reply value whose `Drop` panicked). The
178    /// slot is a simple three-state enum with no cross-field invariants, so
179    /// recovering from poison cannot produce unsound or wrong-result behavior
180    /// and avoids escalating one localized panic into a stream-wide crash.
181    fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
182        self.slot
183            .lock()
184            .unwrap_or_else(|poison| poison.into_inner())
185    }
186
187    fn reset(&self, timeout: Duration) {
188        self.receiver_closed.store(false, Ordering::Release);
189        self.gate.store(REPLY_PENDING, Ordering::Release);
190        let mut slot = self.lock_slot();
191        slot.value = ReplySlot::Pending;
192        slot.waiter = None;
193        debug_assert_eq!(self.timeout, timeout);
194    }
195
196    fn is_closed(&self) -> bool {
197        self.receiver_closed.load(Ordering::Acquire)
198    }
199
200    fn send(&self, reply: T) -> Result<(), ReplySendError> {
201        if self.is_closed() {
202            return Err(ReplySendError);
203        }
204
205        let mut slot = self.lock_slot();
206        if self.is_closed() {
207            return Err(ReplySendError);
208        }
209
210        if !matches!(slot.value, ReplySlot::Pending) {
211            return Err(ReplySendError);
212        }
213
214        slot.value = ReplySlot::Ready(reply);
215        let waiter = slot.waiter.clone();
216        // Publish AFTER the value write; the consumer's Acquire load of the
217        // gate synchronizes-with this store.
218        self.gate.store(REPLY_READY, Ordering::Release);
219        drop(slot);
220        wake_waiter(waiter);
221        Ok(())
222    }
223
224    fn poll(&self) -> ReplyPoll<T> {
225        if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
226            // Lock-free fast path: nothing has been delivered yet.
227            return ReplyPoll::Pending;
228        }
229        self.take_locked()
230    }
231
232    /// Closes the receiver under the slot lock so a concurrent `send()` observes
233    /// the closure on its under-lock recheck and reports `ReplySendError`
234    /// instead of disagreeing with a reported timeout. If a reply (or drop)
235    /// raced in just before the deadline, it is delivered rather than lost.
236    fn close_on_timeout(&self) -> ReplyPoll<T> {
237        let mut slot = self.lock_slot();
238        self.receiver_closed.store(true, Ordering::Release);
239        let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
240            ReplySlot::Pending => ReplyPoll::Pending,
241            ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
242            ReplySlot::Dropped => ReplyPoll::Dropped,
243        };
244        let waiter = slot.waiter.take();
245        drop(slot);
246        wake_waiter(waiter);
247        outcome
248    }
249
250    fn take_locked(&self) -> ReplyPoll<T> {
251        let mut slot = self.lock_slot();
252        slot.waiter = None;
253        match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
254            ReplySlot::Pending => ReplyPoll::Pending,
255            ReplySlot::Ready(reply) => {
256                self.close_receiver();
257                ReplyPoll::Ready(reply)
258            }
259            ReplySlot::Dropped => {
260                self.close_receiver();
261                ReplyPoll::Dropped
262            }
263        }
264    }
265
266    fn drop_sender(&self) {
267        let mut slot = self.lock_slot();
268        if matches!(slot.value, ReplySlot::Pending) {
269            slot.value = ReplySlot::Dropped;
270            let waiter = slot.waiter.clone();
271            self.gate.store(REPLY_DROPPED, Ordering::Release);
272            drop(slot);
273            wake_waiter(waiter);
274        }
275    }
276
277    fn close_receiver(&self) {
278        self.receiver_closed.store(true, Ordering::Release);
279    }
280
281    fn register_waiter(&self, waiter: Thread) {
282        self.lock_slot().waiter = Some(waiter);
283    }
284
285    fn unregister_waiter(&self, waiter_id: ThreadId) {
286        let mut slot = self.lock_slot();
287        if slot
288            .waiter
289            .as_ref()
290            .is_some_and(|thread| thread.id() == waiter_id)
291        {
292            slot.waiter = None;
293        }
294    }
295}
296
297fn wake_waiter(waiter: Option<Thread>) {
298    if let Some(waiter) = waiter {
299        waiter.unpark();
300    }
301}
302
303fn ask_flow<In, Msg, Out, F>(
304    actor_ref: ActorRef<Msg>,
305    parallelism: usize,
306    timeout: Duration,
307    make_msg: Arc<F>,
308) -> Flow<In, Out>
309where
310    In: Send + 'static,
311    Msg: Message,
312    Out: Send + 'static,
313    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
314{
315    assert!(
316        parallelism > 0,
317        "ActorFlow::ask parallelism must be greater than zero"
318    );
319    // Deliberately `from_transform`, not `from_preserving_transform`: an ask must
320    // not inherit the inline head-terminal hint. If it did, a bounded eager source
321    // feeding `Sink::head` would run the ask's blocking cross-thread reply wait
322    // inline on the caller during materialization, which breaks the non-blocking /
323    // awaitable contract of `StreamCompletion` and can deadlock if
324    // `run_with_materializer()` is `.await`ed on a runtime thread the actor needs
325    // to make progress. Ask graphs always run on a worker.
326    Flow::from_transform(move |input| {
327        ask_ractor_ordered(
328            input,
329            actor_ref.clone(),
330            parallelism,
331            timeout,
332            Arc::clone(&make_msg),
333        )
334    })
335}
336
337fn ask_ractor_ordered<In, Msg, Out, F>(
338    mut input: BoxStream<In>,
339    actor_ref: ActorRef<Msg>,
340    parallelism: usize,
341    timeout: Duration,
342    make_msg: Arc<F>,
343) -> BoxStream<Out>
344where
345    In: Send + 'static,
346    Msg: Message,
347    Out: Send + 'static,
348    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
349{
350    let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
351    let mut next_index = 0_usize;
352    let mut next_to_emit = 0_usize;
353    let mut completed = Vec::with_capacity(parallelism);
354    let mut reply_pool = Vec::with_capacity(parallelism);
355    let mut input_done = false;
356
357    Box::new(std::iter::from_fn(move || {
358        loop {
359            if let Some(result) = take_completed(&mut completed, next_to_emit) {
360                next_to_emit += 1;
361                return Some(result);
362            }
363
364            while in_flight.len() < parallelism && !input_done {
365                match input.next() {
366                    Some(Ok(item)) => {
367                        let index = next_index;
368                        next_index += 1;
369                        match start_ractor_ask(
370                            index,
371                            actor_ref.clone(),
372                            timeout,
373                            item,
374                            Arc::clone(&make_msg),
375                            &mut reply_pool,
376                        ) {
377                            Ok(ask) => in_flight.push(ask),
378                            Err(error) => {
379                                completed.push((index, Err(error)));
380                                input_done = true;
381                            }
382                        }
383                    }
384                    Some(Err(error)) => {
385                        completed.push((next_index, Err(error)));
386                        next_index += 1;
387                        input_done = true;
388                    }
389                    None => input_done = true,
390                }
391            }
392
393            if let Some(result) = take_completed(&mut completed, next_to_emit) {
394                next_to_emit += 1;
395                return Some(result);
396            }
397
398            if in_flight.is_empty() {
399                return None;
400            }
401
402            let ask = wait_for_ready_ask(&mut in_flight, timeout);
403            let index = ask.index;
404            let result = ask.result;
405            recycle_reply_state(ask.state, &mut reply_pool);
406            if index == next_to_emit {
407                next_to_emit += 1;
408                return Some(result);
409            }
410            completed.push((index, result));
411        }
412    }))
413}
414
415fn take_completed<Out>(
416    completed: &mut Vec<(usize, StreamResult<Out>)>,
417    index: usize,
418) -> Option<StreamResult<Out>> {
419    let position = completed
420        .iter()
421        .position(|(completed_index, _)| *completed_index == index)?;
422    Some(completed.swap_remove(position).1)
423}
424
425struct InFlightAsk<Out> {
426    index: usize,
427    state: Option<Arc<ReplyState<Out>>>,
428    /// `None` when `Instant::now() + timeout` would overflow; treated as "never
429    /// time out" rather than panicking on the add.
430    deadline: Option<Instant>,
431}
432
433impl<Out> InFlightAsk<Out> {
434    fn state(&self) -> &Arc<ReplyState<Out>> {
435        self.state.as_ref().expect("in-flight ask has reply state")
436    }
437
438    fn into_state(mut self) -> Arc<ReplyState<Out>> {
439        self.state.take().expect("in-flight ask has reply state")
440    }
441}
442
443impl<Out> Drop for InFlightAsk<Out> {
444    fn drop(&mut self) {
445        if let Some(state) = &self.state {
446            state.close_receiver();
447        }
448    }
449}
450
451struct CompletedAsk<Out> {
452    index: usize,
453    result: StreamResult<Out>,
454    state: Arc<ReplyState<Out>>,
455}
456
457fn start_ractor_ask<In, Msg, Out, F>(
458    index: usize,
459    actor_ref: ActorRef<Msg>,
460    timeout: Duration,
461    input: In,
462    make_msg: Arc<F>,
463    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
464) -> StreamResult<InFlightAsk<Out>>
465where
466    In: Send + 'static,
467    Msg: Message,
468    Out: Send + 'static,
469    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
470{
471    let reply_state = match reply_pool.pop() {
472        // A freshly allocated state is already Pending/open, so only recycled
473        // states need a reset.
474        Some(state) => {
475            state.reset(timeout);
476            state
477        }
478        None => Arc::new(ReplyState::new(timeout)),
479    };
480    let reply_to = ReplyPort::new(Arc::clone(&reply_state));
481    let message =
482        catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
483            StreamError::ActorAskTaskFailed {
484                reason: panic_reason(panic),
485            }
486        })?;
487
488    match actor_ref.cast(message) {
489        Ok(()) => {}
490        Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
491            return Err(StreamError::ActorTerminated);
492        }
493        Err(error) => {
494            return Err(StreamError::ActorAskSendFailed {
495                reason: error.to_string(),
496            });
497        }
498    }
499
500    Ok(InFlightAsk {
501        index,
502        state: Some(reply_state),
503        deadline: Instant::now().checked_add(timeout),
504    })
505}
506
507fn wait_for_ready_ask<Out>(
508    in_flight: &mut Vec<InFlightAsk<Out>>,
509    timeout: Duration,
510) -> CompletedAsk<Out> {
511    let mut idle_spins = 0;
512    let mut idle_yields = 0;
513    let mut time_refresh = 0_u32;
514    let mut now = Instant::now();
515    loop {
516        if time_refresh == 0 {
517            now = Instant::now();
518        }
519        time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
520        if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
521            return ask;
522        }
523
524        if idle_spins < ASK_READY_SPINS {
525            idle_spins += 1;
526            std::hint::spin_loop();
527        } else if idle_yields < ASK_IDLE_YIELDS {
528            idle_yields += 1;
529            time_refresh = 0;
530            thread::yield_now();
531        } else {
532            idle_spins = 0;
533            idle_yields = 0;
534            time_refresh = 0;
535            let current = thread::current();
536            let registered = register_ask_waiters(in_flight, &current);
537            now = Instant::now();
538            if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
539                unregister_ask_waiters(registered, current.id());
540                return ask;
541            }
542            thread::park_timeout(next_ask_park(in_flight, now));
543            unregister_ask_waiters(registered, current.id());
544        }
545    }
546}
547
548fn take_ready_ask<Out>(
549    in_flight: &mut Vec<InFlightAsk<Out>>,
550    timeout: Duration,
551    now: Instant,
552) -> Option<CompletedAsk<Out>> {
553    let mut index = 0;
554    while index < in_flight.len() {
555        match in_flight[index].state().poll() {
556            ReplyPoll::Ready(reply) => {
557                let ask = in_flight.swap_remove(index);
558                return Some(CompletedAsk {
559                    index: ask.index,
560                    result: Ok(reply),
561                    state: ask.into_state(),
562                });
563            }
564            ReplyPoll::Dropped => {
565                let ask = in_flight.swap_remove(index);
566                return Some(CompletedAsk {
567                    index: ask.index,
568                    result: Err(StreamError::ActorAskResponseDropped),
569                    state: ask.into_state(),
570                });
571            }
572            ReplyPoll::Pending => {
573                if in_flight[index]
574                    .deadline
575                    .is_some_and(|deadline| now >= deadline)
576                {
577                    // Close under the slot lock; deliver a reply that raced in
578                    // just before the deadline rather than losing it.
579                    let outcome = in_flight[index].state().close_on_timeout();
580                    let ask = in_flight.swap_remove(index);
581                    let result = match outcome {
582                        ReplyPoll::Ready(reply) => Ok(reply),
583                        ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
584                        ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
585                    };
586                    return Some(CompletedAsk {
587                        index: ask.index,
588                        result,
589                        state: ask.into_state(),
590                    });
591                }
592                index += 1;
593            }
594        }
595    }
596    None
597}
598
599fn register_ask_waiters<Out>(
600    in_flight: &[InFlightAsk<Out>],
601    current: &Thread,
602) -> Vec<Arc<ReplyState<Out>>> {
603    let mut registered = Vec::with_capacity(in_flight.len());
604    for ask in in_flight {
605        ask.state().register_waiter(current.clone());
606        registered.push(Arc::clone(ask.state()));
607    }
608    registered
609}
610
611fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
612    for state in registered {
613        state.unregister_waiter(current_id);
614    }
615}
616
617fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
618    next_ask_deadline_remaining(in_flight, now)
619        .unwrap_or(ASK_MAX_PARK)
620        .min(ASK_MAX_PARK)
621}
622
623fn next_ask_deadline_remaining<Out>(
624    in_flight: &[InFlightAsk<Out>],
625    now: Instant,
626) -> Option<Duration> {
627    in_flight
628        .iter()
629        .filter_map(|ask| ask.deadline)
630        .map(|deadline| deadline.saturating_duration_since(now))
631        .min()
632}
633
634fn recycle_reply_state<Out>(
635    state: Arc<ReplyState<Out>>,
636    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
637) {
638    if Arc::strong_count(&state) == 1 {
639        reply_pool.push(state);
640    }
641}
642
643fn panic_reason(panic: Box<dyn Any + Send>) -> String {
644    if let Some(reason) = panic.downcast_ref::<&str>() {
645        (*reason).to_owned()
646    } else if let Some(reason) = panic.downcast_ref::<String>() {
647        reason.clone()
648    } else {
649        "actor ask task panicked".to_owned()
650    }
651}
652
653pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
654    static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
655        std::sync::OnceLock::new();
656
657    match RUNTIME.get_or_init(|| {
658        tokio::runtime::Builder::new_multi_thread()
659            .thread_name("datum-ractor-runtime")
660            .enable_all()
661            .build()
662            .map_err(|error| format!("ractor runtime failed to start: {error}"))
663    }) {
664        Ok(runtime) => Ok(runtime),
665        Err(error) => Err(StreamError::Failed(error.clone())),
666    }
667}
668
669pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
670where
671    T: Send + 'static,
672    F: std::future::Future<Output = T> + Send + 'static,
673{
674    let runtime = ractor_runtime()?;
675    let result = if tokio::runtime::Handle::try_current().is_ok() {
676        std::thread::spawn(move || runtime.block_on(future))
677            .join()
678            .map_err(|_| StreamError::AbruptTermination)?
679    } else {
680        runtime.block_on(future)
681    };
682    Ok(result)
683}
684
685#[cfg(test)]
686fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
687where
688    T: Send + 'static,
689    F: std::future::Future<Output = T> + Send + 'static,
690{
691    block_on_ractor_runtime(future).expect("ractor ask runtime starts")
692}
693
694#[cfg(test)]
695mod tests {
696    use super::*;
697    use crate::stream::{Sink, Source, StreamCompletion};
698    use std::sync::{
699        Arc as StdArc,
700        atomic::{AtomicUsize, Ordering},
701    };
702
703    enum AskTestMessage {
704        Delayed {
705            input: u64,
706            delay: Duration,
707            reply_to: ReplyPort<u64>,
708        },
709        Track {
710            input: u64,
711            reply_to: ReplyPort<u64>,
712        },
713        NeverReply {
714            _reply_to: ReplyPort<u64>,
715        },
716        DropReply {
717            _reply_to: ReplyPort<u64>,
718        },
719        BindTcp {
720            reply_to: ReplyPort<u64>,
721        },
722    }
723
724    #[cfg(feature = "cluster")]
725    impl Message for AskTestMessage {}
726
727    struct AskTestActor;
728
729    struct AskTestState {
730        active: StdArc<AtomicUsize>,
731        max_active: StdArc<AtomicUsize>,
732        held_replies: Vec<ReplyPort<u64>>,
733    }
734
735    impl Actor for AskTestActor {
736        type Msg = AskTestMessage;
737        type State = AskTestState;
738        type Arguments = AskTestState;
739
740        async fn pre_start(
741            &self,
742            _myself: ActorRef<Self::Msg>,
743            args: Self::Arguments,
744        ) -> Result<Self::State, ActorProcessingErr> {
745            Ok(args)
746        }
747
748        async fn handle(
749            &self,
750            _myself: ActorRef<Self::Msg>,
751            message: Self::Msg,
752            state: &mut Self::State,
753        ) -> Result<(), ActorProcessingErr> {
754            match message {
755                AskTestMessage::Delayed {
756                    input,
757                    delay,
758                    reply_to,
759                } => {
760                    ractor::concurrency::spawn(async move {
761                        ractor::concurrency::sleep(delay).await;
762                        let _ = reply_to.send(input);
763                    });
764                }
765                AskTestMessage::Track { input, reply_to } => {
766                    let active = StdArc::clone(&state.active);
767                    let max_active = StdArc::clone(&state.max_active);
768                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
769                    max_active.fetch_max(current, Ordering::SeqCst);
770                    ractor::concurrency::spawn(async move {
771                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
772                        active.fetch_sub(1, Ordering::SeqCst);
773                        let _ = reply_to.send(input);
774                    });
775                }
776                AskTestMessage::NeverReply { _reply_to } => {
777                    state.held_replies.push(_reply_to);
778                }
779                AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
780                AskTestMessage::BindTcp { reply_to } => {
781                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
782                    let port = listener.local_addr()?.port();
783                    let _ = reply_to.send(u64::from(port));
784                }
785            }
786            Ok(())
787        }
788    }
789
790    fn wait<T>(completion: StreamCompletion<T>) -> T {
791        completion.wait().unwrap()
792    }
793
794    fn spawn_test_actor() -> (
795        ActorRef<AskTestMessage>,
796        ractor::concurrency::JoinHandle<()>,
797        StdArc<AtomicUsize>,
798    ) {
799        let active = StdArc::new(AtomicUsize::new(0));
800        let max_active = StdArc::new(AtomicUsize::new(0));
801        let state = AskTestState {
802            active,
803            max_active: StdArc::clone(&max_active),
804            held_replies: Vec::new(),
805        };
806        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
807            Actor::spawn(None, AskTestActor, state)
808                .await
809                .expect("test actor spawns")
810        });
811        (actor_ref, handle, max_active)
812    }
813
814    fn stop_test_actor(
815        actor_ref: ActorRef<AskTestMessage>,
816        handle: ractor::concurrency::JoinHandle<()>,
817    ) {
818        actor_ref.stop(None);
819        block_on_ractor_ask_runtime(async move {
820            handle.await.expect("test actor task joins");
821        });
822    }
823
824    #[test]
825    fn actor_flow_ask_preserves_order_with_parallelism() {
826        let (actor_ref, handle, _) = spawn_test_actor();
827
828        let values = Source::from_iter(0_u64..5)
829            .via(ActorFlow::ask(
830                actor_ref.clone(),
831                5,
832                Duration::from_secs(1),
833                |input, reply_to| AskTestMessage::Delayed {
834                    input,
835                    delay: Duration::from_millis((5 - input) * 10),
836                    reply_to,
837                },
838            ))
839            .run_with(Sink::collect())
840            .unwrap();
841
842        assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
843        stop_test_actor(actor_ref, handle);
844    }
845
846    #[test]
847    fn actor_flow_ask_respects_parallelism() {
848        let (actor_ref, handle, max_active) = spawn_test_actor();
849
850        let values = Source::from_iter(0_u64..8)
851            .via(ActorFlow::ask(
852                actor_ref.clone(),
853                3,
854                Duration::from_secs(1),
855                |input, reply_to| AskTestMessage::Track { input, reply_to },
856            ))
857            .run_with(Sink::collect())
858            .unwrap();
859
860        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
861        assert_eq!(max_active.load(Ordering::SeqCst), 3);
862        stop_test_actor(actor_ref, handle);
863    }
864
865    #[test]
866    fn actor_flow_ask_into_head_returns_value() {
867        // `ActorFlow::ask` does *not* preserve the inline head-terminal hint, so a
868        // bounded eager source feeding `Sink::head` through an ask runs the ask
869        // graph (including the cross-thread reply wait) on a runtime worker rather
870        // than inline on the caller. That keeps `run_with_materializer()`
871        // non-blocking/awaitable: it returns a `StreamCompletion` backed by a
872        // oneshot that resolves once the worker drains the ask round-trip. This
873        // guards that the ask -> head graph still returns the correct first reply
874        // end to end. (The reply is delayed: `Track` sleeps 20 ms on the actor
875        // runtime before replying.)
876        let (actor_ref, handle, _) = spawn_test_actor();
877
878        let head = Source::single(7_u64)
879            .via(ActorFlow::ask(
880                actor_ref.clone(),
881                1,
882                Duration::from_secs(1),
883                |input, reply_to| AskTestMessage::Track { input, reply_to },
884            ))
885            .run_with(Sink::head())
886            .unwrap();
887
888        assert_eq!(wait(head), 7);
889
890        stop_test_actor(actor_ref, handle);
891    }
892
893    #[test]
894    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
895        let (actor_ref, handle, _) = spawn_test_actor();
896
897        let ports = Source::single(0_u64)
898            .via(ActorFlow::ask(
899                actor_ref.clone(),
900                1,
901                Duration::from_secs(1),
902                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
903            ))
904            .run_collect()
905            .unwrap();
906
907        assert_eq!(ports.len(), 1);
908        assert_ne!(ports[0], 0);
909        stop_test_actor(actor_ref, handle);
910    }
911
912    #[test]
913    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
914        // Replies are delayed well past the spin+yield window, so each wait must
915        // reach the park branch and be woken by `ReplyState::send` rather than
916        // busy-yielding until the (long) deadline. Parallelism > 1 also exercises
917        // registering/unregistering several in-flight waiters on one parked
918        // consumer. This is the regression guard for the long-deadline ask path
919        // that previously yielded indefinitely (never parked).
920        let (actor_ref, handle, _) = spawn_test_actor();
921
922        let values = Source::from_iter(0_u64..8)
923            .via(ActorFlow::ask(
924                actor_ref.clone(),
925                4,
926                Duration::from_secs(30),
927                |input, reply_to| AskTestMessage::Delayed {
928                    input,
929                    delay: Duration::from_millis(25),
930                    reply_to,
931                },
932            ))
933            .run_with(Sink::collect())
934            .unwrap();
935
936        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
937        stop_test_actor(actor_ref, handle);
938    }
939
940    #[test]
941    fn dropped_in_flight_ask_closes_reply_port() {
942        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
943        let reply_to = ReplyPort::new(StdArc::clone(&state));
944        let ask = InFlightAsk {
945            index: 0,
946            state: Some(StdArc::clone(&state)),
947            deadline: None,
948        };
949
950        drop(ask);
951
952        assert!(reply_to.is_closed());
953        assert_eq!(reply_to.send(1), Err(ReplySendError));
954    }
955
956    #[test]
957    fn actor_flow_ask_times_out() {
958        let (actor_ref, handle, _) = spawn_test_actor();
959        let timeout = Duration::from_millis(10);
960
961        let result = Source::single(1_u64)
962            .via(ActorFlow::ask(
963                actor_ref.clone(),
964                1,
965                timeout,
966                |_input, reply_to| AskTestMessage::NeverReply {
967                    _reply_to: reply_to,
968                },
969            ))
970            .run_collect();
971
972        assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
973        stop_test_actor(actor_ref, handle);
974    }
975
976    #[test]
977    fn actor_flow_ask_fails_when_actor_is_stopped() {
978        let (actor_ref, handle, _) = spawn_test_actor();
979        actor_ref.stop(None);
980        block_on_ractor_ask_runtime(async move {
981            handle.await.expect("test actor task joins");
982        });
983
984        let result = Source::single(1_u64)
985            .via(ActorFlow::ask(
986                actor_ref,
987                1,
988                Duration::from_secs(1),
989                |input, reply_to| AskTestMessage::Delayed {
990                    input,
991                    delay: Duration::ZERO,
992                    reply_to,
993                },
994            ))
995            .run_collect();
996
997        assert_eq!(result, Err(StreamError::ActorTerminated));
998    }
999
1000    #[test]
1001    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
1002        let (actor_ref, handle, _) = spawn_test_actor();
1003
1004        let result = Source::single(1_u64)
1005            .via(ActorFlow::ask(
1006                actor_ref.clone(),
1007                1,
1008                Duration::from_secs(1),
1009                |_input, reply_to| AskTestMessage::DropReply {
1010                    _reply_to: reply_to,
1011                },
1012            ))
1013            .run_collect();
1014
1015        assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
1016        stop_test_actor(actor_ref, handle);
1017    }
1018
1019    #[test]
1020    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
1021        let (actor_ref, handle, _) = spawn_test_actor();
1022
1023        let result = Source::single(1_u64)
1024            .via(ActorFlow::ask(
1025                actor_ref.clone(),
1026                1,
1027                Duration::from_secs(1),
1028                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
1029                    panic!("message builder failed");
1030                },
1031            ))
1032            .run_collect();
1033
1034        assert_eq!(
1035            result,
1036            Err(StreamError::ActorAskTaskFailed {
1037                reason: "message builder failed".to_owned()
1038            })
1039        );
1040        stop_test_actor(actor_ref, handle);
1041    }
1042}