Skip to main content

datum/
actor.rs

1//! Actor interop APIs backed by Ractor for local execution.
2
3use std::{
4    any::Any,
5    error::Error,
6    fmt,
7    panic::{AssertUnwindSafe, catch_unwind},
8    sync::{
9        Arc, Mutex, MutexGuard,
10        atomic::{AtomicBool, AtomicU8, Ordering},
11    },
12    thread::{self, Thread, ThreadId},
13    time::{Duration, Instant},
14};
15
16use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
17
18pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
19
20mod interop;
21mod stream_ref;
22pub use interop::{
23    ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
24    ActorSourceMessage, ActorStatus, WatchEvent,
25};
26pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
27
28pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
29
30const ASK_READY_SPINS: usize = 256;
31/// Bounded number of `yield_now()` calls between the spin window and parking.
32/// This is deliberately deadline-independent: parking is gated only by this
33/// fixed transition budget, and the *park duration* is what respects the
34/// deadline (see [`next_ask_park`]). A fast reply is caught during the spin or
35/// yield window; a genuinely slow reply (or a long, idle timeout) parks and is
36/// woken by `ReplyState::send`/`drop_sender`/`close_on_timeout` rather than
37/// burning a core busy-yielding until the deadline.
38const ASK_IDLE_YIELDS: usize = 64;
39const ASK_MAX_PARK: Duration = Duration::from_millis(1);
40const ASK_TIME_REFRESH_ITERS: u32 = 64;
41
42// Fast-path reply gate values. `poll()` reads this atomically and only takes
43// the slot mutex once a reply (or drop) is actually present, so the busy-spin
44// consumer does not contend with the replying actor on every spin iteration.
45const REPLY_PENDING: u8 = 0;
46const REPLY_READY: u8 = 1;
47const REPLY_DROPPED: u8 = 2;
48
49/// Collection of actor-aware stream flows.
50pub struct ActorFlow;
51
52impl ActorFlow {
53    /// Sends each stream element to an actor using a request/reply message.
54    ///
55    /// The returned flow preserves input order while allowing up to `parallelism`
56    /// requests in flight. It fails the stream if a request times out, the actor
57    /// cannot be sent to, or the reply port is dropped before a response arrives.
58    #[must_use]
59    pub fn ask<In, Msg, Out, F>(
60        actor_ref: ActorRef<Msg>,
61        parallelism: usize,
62        timeout: Duration,
63        make_msg: F,
64    ) -> Flow<In, Out, NotUsed>
65    where
66        In: Send + 'static,
67        Msg: Message,
68        Out: Send + 'static,
69        F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
70    {
71        ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
72    }
73}
74
75/// One-shot reply handle supplied to actor ask messages.
76///
77/// This wrapper keeps Datum's stream-facing API from exposing Ractor's concrete
78/// RPC port type. The current implementation is local/Ractor-backed, and the
79/// wrapper leaves room for a later transport-specific reply implementation.
80pub struct ReplyPort<T> {
81    inner: Arc<ReplyState<T>>,
82    active: bool,
83}
84
85#[derive(Debug)]
86struct ReplyState<T> {
87    timeout: Duration,
88    receiver_closed: AtomicBool,
89    gate: AtomicU8,
90    slot: Mutex<ReplySlotState<T>>,
91}
92
93#[derive(Debug)]
94struct ReplySlotState<T> {
95    value: ReplySlot<T>,
96    waiter: Option<Thread>,
97}
98
99#[derive(Debug)]
100enum ReplySlot<T> {
101    Pending,
102    Ready(T),
103    Dropped,
104}
105
106enum ReplyPoll<T> {
107    Pending,
108    Ready(T),
109    Dropped,
110}
111
112impl<T> fmt::Debug for ReplyPort<T> {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        f.debug_struct("ReplyPort")
115            .field("timeout", &self.timeout())
116            .field("closed", &self.is_closed())
117            .finish_non_exhaustive()
118    }
119}
120
121/// Error returned when an actor tries to answer an ask whose receiver is gone.
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub struct ReplySendError;
124
125impl fmt::Display for ReplySendError {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.write_str("actor reply receiver dropped")
128    }
129}
130
131impl Error for ReplySendError {}
132
133impl<T> ReplyPort<T> {
134    fn new(inner: Arc<ReplyState<T>>) -> Self {
135        Self {
136            inner,
137            active: true,
138        }
139    }
140
141    #[must_use]
142    pub fn timeout(&self) -> Option<Duration> {
143        Some(self.inner.timeout)
144    }
145
146    #[must_use]
147    pub fn is_closed(&self) -> bool {
148        self.inner.is_closed()
149    }
150
151    pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
152        self.active = false;
153        self.inner.send(reply)
154    }
155}
156
157impl<T> Drop for ReplyPort<T> {
158    fn drop(&mut self) {
159        if self.active {
160            self.inner.drop_sender();
161        }
162    }
163}
164
165impl<T> ReplyState<T> {
166    fn new(timeout: Duration) -> Self {
167        Self {
168            timeout,
169            receiver_closed: AtomicBool::new(false),
170            gate: AtomicU8::new(REPLY_PENDING),
171            slot: Mutex::new(ReplySlotState {
172                value: ReplySlot::Pending,
173                waiter: None,
174            }),
175        }
176    }
177
178    /// Locks the reply slot, recovering the guard if a previous holder panicked
179    /// while holding it (e.g. a user reply value whose `Drop` panicked). The
180    /// slot is a simple three-state enum with no cross-field invariants, so
181    /// recovering from poison cannot produce unsound or wrong-result behavior
182    /// and avoids escalating one localized panic into a stream-wide crash.
183    fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
184        self.slot
185            .lock()
186            .unwrap_or_else(|poison| poison.into_inner())
187    }
188
189    fn reset(&self, timeout: Duration) {
190        self.receiver_closed.store(false, Ordering::Release);
191        self.gate.store(REPLY_PENDING, Ordering::Release);
192        let mut slot = self.lock_slot();
193        slot.value = ReplySlot::Pending;
194        slot.waiter = None;
195        debug_assert_eq!(self.timeout, timeout);
196    }
197
198    fn is_closed(&self) -> bool {
199        self.receiver_closed.load(Ordering::Acquire)
200    }
201
202    fn send(&self, reply: T) -> Result<(), ReplySendError> {
203        if self.is_closed() {
204            return Err(ReplySendError);
205        }
206
207        let mut slot = self.lock_slot();
208        if self.is_closed() {
209            return Err(ReplySendError);
210        }
211
212        if !matches!(slot.value, ReplySlot::Pending) {
213            return Err(ReplySendError);
214        }
215
216        slot.value = ReplySlot::Ready(reply);
217        let waiter = slot.waiter.clone();
218        // Publish AFTER the value write; the consumer's Acquire load of the
219        // gate synchronizes-with this store.
220        self.gate.store(REPLY_READY, Ordering::Release);
221        drop(slot);
222        wake_waiter(waiter);
223        Ok(())
224    }
225
226    fn poll(&self) -> ReplyPoll<T> {
227        if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
228            // Lock-free fast path: nothing has been delivered yet.
229            return ReplyPoll::Pending;
230        }
231        self.take_locked()
232    }
233
234    /// Closes the receiver under the slot lock so a concurrent `send()` observes
235    /// the closure on its under-lock recheck and reports `ReplySendError`
236    /// instead of disagreeing with a reported timeout. If a reply (or drop)
237    /// raced in just before the deadline, it is delivered rather than lost.
238    fn close_on_timeout(&self) -> ReplyPoll<T> {
239        let mut slot = self.lock_slot();
240        self.receiver_closed.store(true, Ordering::Release);
241        let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
242            ReplySlot::Pending => ReplyPoll::Pending,
243            ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
244            ReplySlot::Dropped => ReplyPoll::Dropped,
245        };
246        let waiter = slot.waiter.take();
247        drop(slot);
248        wake_waiter(waiter);
249        outcome
250    }
251
252    fn take_locked(&self) -> ReplyPoll<T> {
253        let mut slot = self.lock_slot();
254        slot.waiter = None;
255        match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
256            ReplySlot::Pending => ReplyPoll::Pending,
257            ReplySlot::Ready(reply) => {
258                self.close_receiver();
259                ReplyPoll::Ready(reply)
260            }
261            ReplySlot::Dropped => {
262                self.close_receiver();
263                ReplyPoll::Dropped
264            }
265        }
266    }
267
268    fn drop_sender(&self) {
269        let mut slot = self.lock_slot();
270        if matches!(slot.value, ReplySlot::Pending) {
271            slot.value = ReplySlot::Dropped;
272            let waiter = slot.waiter.clone();
273            self.gate.store(REPLY_DROPPED, Ordering::Release);
274            drop(slot);
275            wake_waiter(waiter);
276        }
277    }
278
279    fn close_receiver(&self) {
280        self.receiver_closed.store(true, Ordering::Release);
281    }
282
283    fn register_waiter(&self, waiter: Thread) {
284        self.lock_slot().waiter = Some(waiter);
285    }
286
287    fn unregister_waiter(&self, waiter_id: ThreadId) {
288        let mut slot = self.lock_slot();
289        if slot
290            .waiter
291            .as_ref()
292            .is_some_and(|thread| thread.id() == waiter_id)
293        {
294            slot.waiter = None;
295        }
296    }
297}
298
299fn wake_waiter(waiter: Option<Thread>) {
300    if let Some(waiter) = waiter {
301        waiter.unpark();
302    }
303}
304
305fn ask_flow<In, Msg, Out, F>(
306    actor_ref: ActorRef<Msg>,
307    parallelism: usize,
308    timeout: Duration,
309    make_msg: Arc<F>,
310) -> Flow<In, Out>
311where
312    In: Send + 'static,
313    Msg: Message,
314    Out: Send + 'static,
315    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
316{
317    assert!(
318        parallelism > 0,
319        "ActorFlow::ask parallelism must be greater than zero"
320    );
321    // Deliberately `from_transform`, not `from_preserving_transform`: an ask must
322    // not inherit the inline head-terminal hint. If it did, a bounded eager source
323    // feeding `Sink::head` would run the ask's blocking cross-thread reply wait
324    // inline on the caller during materialization, which breaks the non-blocking /
325    // awaitable contract of `StreamCompletion` and can deadlock if
326    // `run_with_materializer()` is `.await`ed on a runtime thread the actor needs
327    // to make progress. Ask graphs always run on a worker.
328    Flow::from_transform(move |input| {
329        ask_ractor_ordered(
330            input,
331            actor_ref.clone(),
332            parallelism,
333            timeout,
334            Arc::clone(&make_msg),
335        )
336    })
337}
338
339fn ask_ractor_ordered<In, Msg, Out, F>(
340    mut input: BoxStream<In>,
341    actor_ref: ActorRef<Msg>,
342    parallelism: usize,
343    timeout: Duration,
344    make_msg: Arc<F>,
345) -> BoxStream<Out>
346where
347    In: Send + 'static,
348    Msg: Message,
349    Out: Send + 'static,
350    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
351{
352    let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
353    let mut next_index = 0_usize;
354    let mut next_to_emit = 0_usize;
355    let mut completed = Vec::with_capacity(parallelism);
356    let mut reply_pool = Vec::with_capacity(parallelism);
357    let mut input_done = false;
358
359    Box::new(std::iter::from_fn(move || {
360        loop {
361            if let Some(result) = take_completed(&mut completed, next_to_emit) {
362                next_to_emit += 1;
363                return Some(result);
364            }
365
366            while in_flight.len() < parallelism && !input_done {
367                match input.next() {
368                    Some(Ok(item)) => {
369                        let index = next_index;
370                        next_index += 1;
371                        match start_ractor_ask(
372                            index,
373                            actor_ref.clone(),
374                            timeout,
375                            item,
376                            Arc::clone(&make_msg),
377                            &mut reply_pool,
378                        ) {
379                            Ok(ask) => in_flight.push(ask),
380                            Err(error) => {
381                                completed.push((index, Err(error)));
382                                input_done = true;
383                            }
384                        }
385                    }
386                    Some(Err(error)) => {
387                        completed.push((next_index, Err(error)));
388                        next_index += 1;
389                        input_done = true;
390                    }
391                    None => input_done = true,
392                }
393            }
394
395            if let Some(result) = take_completed(&mut completed, next_to_emit) {
396                next_to_emit += 1;
397                return Some(result);
398            }
399
400            if in_flight.is_empty() {
401                return None;
402            }
403
404            let ask = wait_for_ready_ask(&mut in_flight, timeout);
405            let index = ask.index;
406            let result = ask.result;
407            recycle_reply_state(ask.state, &mut reply_pool);
408            if index == next_to_emit {
409                next_to_emit += 1;
410                return Some(result);
411            }
412            completed.push((index, result));
413        }
414    }))
415}
416
417fn take_completed<Out>(
418    completed: &mut Vec<(usize, StreamResult<Out>)>,
419    index: usize,
420) -> Option<StreamResult<Out>> {
421    let position = completed
422        .iter()
423        .position(|(completed_index, _)| *completed_index == index)?;
424    Some(completed.swap_remove(position).1)
425}
426
427struct InFlightAsk<Out> {
428    index: usize,
429    state: Option<Arc<ReplyState<Out>>>,
430    /// `None` when `Instant::now() + timeout` would overflow; treated as "never
431    /// time out" rather than panicking on the add.
432    deadline: Option<Instant>,
433}
434
435impl<Out> InFlightAsk<Out> {
436    fn state(&self) -> &Arc<ReplyState<Out>> {
437        self.state.as_ref().expect("in-flight ask has reply state")
438    }
439
440    fn into_state(mut self) -> Arc<ReplyState<Out>> {
441        self.state.take().expect("in-flight ask has reply state")
442    }
443}
444
445impl<Out> Drop for InFlightAsk<Out> {
446    fn drop(&mut self) {
447        if let Some(state) = &self.state {
448            state.close_receiver();
449        }
450    }
451}
452
453struct CompletedAsk<Out> {
454    index: usize,
455    result: StreamResult<Out>,
456    state: Arc<ReplyState<Out>>,
457}
458
459fn start_ractor_ask<In, Msg, Out, F>(
460    index: usize,
461    actor_ref: ActorRef<Msg>,
462    timeout: Duration,
463    input: In,
464    make_msg: Arc<F>,
465    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
466) -> StreamResult<InFlightAsk<Out>>
467where
468    In: Send + 'static,
469    Msg: Message,
470    Out: Send + 'static,
471    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
472{
473    let reply_state = match reply_pool.pop() {
474        // A freshly allocated state is already Pending/open, so only recycled
475        // states need a reset.
476        Some(state) => {
477            state.reset(timeout);
478            state
479        }
480        None => Arc::new(ReplyState::new(timeout)),
481    };
482    let reply_to = ReplyPort::new(Arc::clone(&reply_state));
483    let message =
484        catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
485            StreamError::ActorAskTaskFailed {
486                reason: panic_reason(panic),
487            }
488        })?;
489
490    match actor_ref.cast(message) {
491        Ok(()) => {}
492        Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
493            return Err(StreamError::ActorTerminated);
494        }
495        Err(error) => {
496            return Err(StreamError::ActorAskSendFailed {
497                reason: error.to_string(),
498            });
499        }
500    }
501
502    Ok(InFlightAsk {
503        index,
504        state: Some(reply_state),
505        deadline: Instant::now().checked_add(timeout),
506    })
507}
508
509fn wait_for_ready_ask<Out>(
510    in_flight: &mut Vec<InFlightAsk<Out>>,
511    timeout: Duration,
512) -> CompletedAsk<Out> {
513    let mut idle_spins = 0;
514    let mut idle_yields = 0;
515    let mut time_refresh = 0_u32;
516    let mut now = Instant::now();
517    loop {
518        if time_refresh == 0 {
519            now = Instant::now();
520        }
521        time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
522        if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
523            return ask;
524        }
525
526        if idle_spins < ASK_READY_SPINS {
527            idle_spins += 1;
528            std::hint::spin_loop();
529        } else if idle_yields < ASK_IDLE_YIELDS {
530            idle_yields += 1;
531            time_refresh = 0;
532            thread::yield_now();
533        } else {
534            idle_spins = 0;
535            idle_yields = 0;
536            time_refresh = 0;
537            let current = thread::current();
538            let registered = register_ask_waiters(in_flight, &current);
539            now = Instant::now();
540            if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
541                unregister_ask_waiters(registered, current.id());
542                return ask;
543            }
544            thread::park_timeout(next_ask_park(in_flight, now));
545            unregister_ask_waiters(registered, current.id());
546        }
547    }
548}
549
550fn take_ready_ask<Out>(
551    in_flight: &mut Vec<InFlightAsk<Out>>,
552    timeout: Duration,
553    now: Instant,
554) -> Option<CompletedAsk<Out>> {
555    let mut index = 0;
556    while index < in_flight.len() {
557        match in_flight[index].state().poll() {
558            ReplyPoll::Ready(reply) => {
559                let ask = in_flight.swap_remove(index);
560                return Some(CompletedAsk {
561                    index: ask.index,
562                    result: Ok(reply),
563                    state: ask.into_state(),
564                });
565            }
566            ReplyPoll::Dropped => {
567                let ask = in_flight.swap_remove(index);
568                return Some(CompletedAsk {
569                    index: ask.index,
570                    result: Err(StreamError::ActorAskResponseDropped),
571                    state: ask.into_state(),
572                });
573            }
574            ReplyPoll::Pending => {
575                if in_flight[index]
576                    .deadline
577                    .is_some_and(|deadline| now >= deadline)
578                {
579                    // Close under the slot lock; deliver a reply that raced in
580                    // just before the deadline rather than losing it.
581                    let outcome = in_flight[index].state().close_on_timeout();
582                    let ask = in_flight.swap_remove(index);
583                    let result = match outcome {
584                        ReplyPoll::Ready(reply) => Ok(reply),
585                        ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
586                        ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
587                    };
588                    return Some(CompletedAsk {
589                        index: ask.index,
590                        result,
591                        state: ask.into_state(),
592                    });
593                }
594                index += 1;
595            }
596        }
597    }
598    None
599}
600
601fn register_ask_waiters<Out>(
602    in_flight: &[InFlightAsk<Out>],
603    current: &Thread,
604) -> Vec<Arc<ReplyState<Out>>> {
605    let mut registered = Vec::with_capacity(in_flight.len());
606    for ask in in_flight {
607        ask.state().register_waiter(current.clone());
608        registered.push(Arc::clone(ask.state()));
609    }
610    registered
611}
612
613fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
614    for state in registered {
615        state.unregister_waiter(current_id);
616    }
617}
618
619fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
620    next_ask_deadline_remaining(in_flight, now)
621        .unwrap_or(ASK_MAX_PARK)
622        .min(ASK_MAX_PARK)
623}
624
625fn next_ask_deadline_remaining<Out>(
626    in_flight: &[InFlightAsk<Out>],
627    now: Instant,
628) -> Option<Duration> {
629    in_flight
630        .iter()
631        .filter_map(|ask| ask.deadline)
632        .map(|deadline| deadline.saturating_duration_since(now))
633        .min()
634}
635
636fn recycle_reply_state<Out>(
637    state: Arc<ReplyState<Out>>,
638    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
639) {
640    if Arc::strong_count(&state) == 1 {
641        reply_pool.push(state);
642    }
643}
644
645fn panic_reason(panic: Box<dyn Any + Send>) -> String {
646    if let Some(reason) = panic.downcast_ref::<&str>() {
647        (*reason).to_owned()
648    } else if let Some(reason) = panic.downcast_ref::<String>() {
649        reason.clone()
650    } else {
651        "actor ask task panicked".to_owned()
652    }
653}
654
655pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
656    static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
657        std::sync::OnceLock::new();
658
659    match RUNTIME.get_or_init(|| {
660        tokio::runtime::Builder::new_multi_thread()
661            .thread_name("datum-ractor-runtime")
662            .enable_all()
663            .build()
664            .map_err(|error| format!("ractor runtime failed to start: {error}"))
665    }) {
666        Ok(runtime) => Ok(runtime),
667        Err(error) => Err(StreamError::Failed(error.clone())),
668    }
669}
670
671pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
672where
673    T: Send + 'static,
674    F: std::future::Future<Output = T> + Send + 'static,
675{
676    let runtime = ractor_runtime()?;
677    let result = if tokio::runtime::Handle::try_current().is_ok() {
678        std::thread::spawn(move || runtime.block_on(future))
679            .join()
680            .map_err(|_| StreamError::AbruptTermination)?
681    } else {
682        runtime.block_on(future)
683    };
684    Ok(result)
685}
686
687#[cfg(test)]
688fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
689where
690    T: Send + 'static,
691    F: std::future::Future<Output = T> + Send + 'static,
692{
693    block_on_ractor_runtime(future).expect("ractor ask runtime starts")
694}
695
696#[cfg(test)]
697mod tests {
698    use super::*;
699    use crate::stream::{Sink, Source, StreamCompletion};
700    use std::sync::{
701        Arc as StdArc,
702        atomic::{AtomicUsize, Ordering},
703    };
704
705    enum AskTestMessage {
706        Delayed {
707            input: u64,
708            delay: Duration,
709            reply_to: ReplyPort<u64>,
710        },
711        Track {
712            input: u64,
713            reply_to: ReplyPort<u64>,
714        },
715        NeverReply {
716            _reply_to: ReplyPort<u64>,
717        },
718        DropReply {
719            _reply_to: ReplyPort<u64>,
720        },
721        BindTcp {
722            reply_to: ReplyPort<u64>,
723        },
724    }
725
726    #[cfg(feature = "cluster")]
727    impl Message for AskTestMessage {}
728
729    struct AskTestActor;
730
731    struct AskTestState {
732        active: StdArc<AtomicUsize>,
733        max_active: StdArc<AtomicUsize>,
734        held_replies: Vec<ReplyPort<u64>>,
735    }
736
737    impl Actor for AskTestActor {
738        type Msg = AskTestMessage;
739        type State = AskTestState;
740        type Arguments = AskTestState;
741
742        async fn pre_start(
743            &self,
744            _myself: ActorRef<Self::Msg>,
745            args: Self::Arguments,
746        ) -> Result<Self::State, ActorProcessingErr> {
747            Ok(args)
748        }
749
750        async fn handle(
751            &self,
752            _myself: ActorRef<Self::Msg>,
753            message: Self::Msg,
754            state: &mut Self::State,
755        ) -> Result<(), ActorProcessingErr> {
756            match message {
757                AskTestMessage::Delayed {
758                    input,
759                    delay,
760                    reply_to,
761                } => {
762                    ractor::concurrency::spawn(async move {
763                        ractor::concurrency::sleep(delay).await;
764                        let _ = reply_to.send(input);
765                    });
766                }
767                AskTestMessage::Track { input, reply_to } => {
768                    let active = StdArc::clone(&state.active);
769                    let max_active = StdArc::clone(&state.max_active);
770                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
771                    max_active.fetch_max(current, Ordering::SeqCst);
772                    ractor::concurrency::spawn(async move {
773                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
774                        active.fetch_sub(1, Ordering::SeqCst);
775                        let _ = reply_to.send(input);
776                    });
777                }
778                AskTestMessage::NeverReply { _reply_to } => {
779                    state.held_replies.push(_reply_to);
780                }
781                AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
782                AskTestMessage::BindTcp { reply_to } => {
783                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
784                    let port = listener.local_addr()?.port();
785                    let _ = reply_to.send(u64::from(port));
786                }
787            }
788            Ok(())
789        }
790    }
791
792    fn wait<T>(completion: StreamCompletion<T>) -> T {
793        completion.wait().unwrap()
794    }
795
796    fn spawn_test_actor() -> (
797        ActorRef<AskTestMessage>,
798        ractor::concurrency::JoinHandle<()>,
799        StdArc<AtomicUsize>,
800    ) {
801        let active = StdArc::new(AtomicUsize::new(0));
802        let max_active = StdArc::new(AtomicUsize::new(0));
803        let state = AskTestState {
804            active,
805            max_active: StdArc::clone(&max_active),
806            held_replies: Vec::new(),
807        };
808        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
809            Actor::spawn(None, AskTestActor, state)
810                .await
811                .expect("test actor spawns")
812        });
813        (actor_ref, handle, max_active)
814    }
815
816    fn stop_test_actor(
817        actor_ref: ActorRef<AskTestMessage>,
818        handle: ractor::concurrency::JoinHandle<()>,
819    ) {
820        actor_ref.stop(None);
821        block_on_ractor_ask_runtime(async move {
822            handle.await.expect("test actor task joins");
823        });
824    }
825
826    #[test]
827    fn actor_flow_ask_preserves_order_with_parallelism() {
828        let (actor_ref, handle, _) = spawn_test_actor();
829
830        let values = Source::from_iter(0_u64..5)
831            .via(ActorFlow::ask(
832                actor_ref.clone(),
833                5,
834                Duration::from_secs(1),
835                |input, reply_to| AskTestMessage::Delayed {
836                    input,
837                    delay: Duration::from_millis((5 - input) * 10),
838                    reply_to,
839                },
840            ))
841            .run_with(Sink::collect())
842            .unwrap();
843
844        assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
845        stop_test_actor(actor_ref, handle);
846    }
847
848    #[test]
849    fn actor_flow_ask_respects_parallelism() {
850        let (actor_ref, handle, max_active) = spawn_test_actor();
851
852        let values = Source::from_iter(0_u64..8)
853            .via(ActorFlow::ask(
854                actor_ref.clone(),
855                3,
856                Duration::from_secs(1),
857                |input, reply_to| AskTestMessage::Track { input, reply_to },
858            ))
859            .run_with(Sink::collect())
860            .unwrap();
861
862        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
863        assert_eq!(max_active.load(Ordering::SeqCst), 3);
864        stop_test_actor(actor_ref, handle);
865    }
866
867    #[test]
868    fn actor_flow_ask_into_head_returns_value() {
869        // `ActorFlow::ask` does *not* preserve the inline head-terminal hint, so a
870        // bounded eager source feeding `Sink::head` through an ask runs the ask
871        // graph (including the cross-thread reply wait) on a runtime worker rather
872        // than inline on the caller. That keeps `run_with_materializer()`
873        // non-blocking/awaitable: it returns a `StreamCompletion` backed by a
874        // oneshot that resolves once the worker drains the ask round-trip. This
875        // guards that the ask -> head graph still returns the correct first reply
876        // end to end. (The reply is delayed: `Track` sleeps 20 ms on the actor
877        // runtime before replying.)
878        let (actor_ref, handle, _) = spawn_test_actor();
879
880        let head = Source::single(7_u64)
881            .via(ActorFlow::ask(
882                actor_ref.clone(),
883                1,
884                Duration::from_secs(1),
885                |input, reply_to| AskTestMessage::Track { input, reply_to },
886            ))
887            .run_with(Sink::head())
888            .unwrap();
889
890        assert_eq!(wait(head), 7);
891
892        stop_test_actor(actor_ref, handle);
893    }
894
895    #[test]
896    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
897        let (actor_ref, handle, _) = spawn_test_actor();
898
899        let ports = Source::single(0_u64)
900            .via(ActorFlow::ask(
901                actor_ref.clone(),
902                1,
903                Duration::from_secs(1),
904                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
905            ))
906            .run_collect()
907            .unwrap();
908
909        assert_eq!(ports.len(), 1);
910        assert_ne!(ports[0], 0);
911        stop_test_actor(actor_ref, handle);
912    }
913
914    #[test]
915    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
916        // Replies are delayed well past the spin+yield window, so each wait must
917        // reach the park branch and be woken by `ReplyState::send` rather than
918        // busy-yielding until the (long) deadline. Parallelism > 1 also exercises
919        // registering/unregistering several in-flight waiters on one parked
920        // consumer. This is the regression guard for the long-deadline ask path
921        // that previously yielded indefinitely (never parked).
922        let (actor_ref, handle, _) = spawn_test_actor();
923
924        let values = Source::from_iter(0_u64..8)
925            .via(ActorFlow::ask(
926                actor_ref.clone(),
927                4,
928                Duration::from_secs(30),
929                |input, reply_to| AskTestMessage::Delayed {
930                    input,
931                    delay: Duration::from_millis(25),
932                    reply_to,
933                },
934            ))
935            .run_with(Sink::collect())
936            .unwrap();
937
938        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
939        stop_test_actor(actor_ref, handle);
940    }
941
942    #[test]
943    fn dropped_in_flight_ask_closes_reply_port() {
944        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
945        let reply_to = ReplyPort::new(StdArc::clone(&state));
946        let ask = InFlightAsk {
947            index: 0,
948            state: Some(StdArc::clone(&state)),
949            deadline: None,
950        };
951
952        drop(ask);
953
954        assert!(reply_to.is_closed());
955        assert_eq!(reply_to.send(1), Err(ReplySendError));
956    }
957
958    #[test]
959    fn actor_flow_ask_times_out() {
960        let (actor_ref, handle, _) = spawn_test_actor();
961        let timeout = Duration::from_millis(10);
962
963        let result = Source::single(1_u64)
964            .via(ActorFlow::ask(
965                actor_ref.clone(),
966                1,
967                timeout,
968                |_input, reply_to| AskTestMessage::NeverReply {
969                    _reply_to: reply_to,
970                },
971            ))
972            .run_collect();
973
974        assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
975        stop_test_actor(actor_ref, handle);
976    }
977
978    #[test]
979    fn actor_flow_ask_fails_when_actor_is_stopped() {
980        let (actor_ref, handle, _) = spawn_test_actor();
981        actor_ref.stop(None);
982        block_on_ractor_ask_runtime(async move {
983            handle.await.expect("test actor task joins");
984        });
985
986        let result = Source::single(1_u64)
987            .via(ActorFlow::ask(
988                actor_ref,
989                1,
990                Duration::from_secs(1),
991                |input, reply_to| AskTestMessage::Delayed {
992                    input,
993                    delay: Duration::ZERO,
994                    reply_to,
995                },
996            ))
997            .run_collect();
998
999        assert_eq!(result, Err(StreamError::ActorTerminated));
1000    }
1001
1002    #[test]
1003    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
1004        let (actor_ref, handle, _) = spawn_test_actor();
1005
1006        let result = Source::single(1_u64)
1007            .via(ActorFlow::ask(
1008                actor_ref.clone(),
1009                1,
1010                Duration::from_secs(1),
1011                |_input, reply_to| AskTestMessage::DropReply {
1012                    _reply_to: reply_to,
1013                },
1014            ))
1015            .run_collect();
1016
1017        assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
1018        stop_test_actor(actor_ref, handle);
1019    }
1020
1021    #[test]
1022    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
1023        let (actor_ref, handle, _) = spawn_test_actor();
1024
1025        let result = Source::single(1_u64)
1026            .via(ActorFlow::ask(
1027                actor_ref.clone(),
1028                1,
1029                Duration::from_secs(1),
1030                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
1031                    panic!("message builder failed");
1032                },
1033            ))
1034            .run_collect();
1035
1036        assert_eq!(
1037            result,
1038            Err(StreamError::ActorAskTaskFailed {
1039                reason: "message builder failed".to_owned()
1040            })
1041        );
1042        stop_test_actor(actor_ref, handle);
1043    }
1044}