Skip to main content

datum/
actor.rs

1//! Actor interop APIs backed by Ractor for local execution.
2
3use std::{
4    any::Any,
5    error::Error,
6    fmt,
7    panic::{AssertUnwindSafe, catch_unwind},
8    sync::{
9        Arc, Mutex, MutexGuard,
10        atomic::{AtomicBool, AtomicU8, Ordering},
11    },
12    thread::{self, Thread, ThreadId},
13    time::{Duration, Instant},
14};
15
16use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
17
18pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
19
20mod interop;
21mod stream_ref;
22pub mod stream_ref_proto;
23pub use interop::{
24    ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
25    ActorSourceMessage, ActorStatus, WatchEvent,
26};
27pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
28pub use stream_ref_proto::{
29    StreamRefFrame, StreamRefId, StreamRefMessage, StreamRefPayload, StreamRefPayloadBytes,
30    StreamRefProtoConsumer, StreamRefProtoEndpoint, StreamRefProtoProducer,
31};
32
/// Result alias for actor code: the error type is Ractor's
/// [`ActorProcessingErr`] and the success type defaults to `()`.
pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
34
/// Number of `std::hint::spin_loop()` iterations `wait_for_ready_ask` performs
/// before it starts yielding the thread.
const ASK_READY_SPINS: usize = 256;
/// Bounded number of `yield_now()` calls between the spin window and parking.
/// This is deliberately deadline-independent: parking is gated only by this
/// fixed transition budget, and the *park duration* is what respects the
/// deadline (see [`next_ask_park`]). A fast reply is caught during the spin or
/// yield window; a genuinely slow reply (or a long, idle timeout) parks and is
/// woken by `ReplyState::send`/`drop_sender`/`close_on_timeout` rather than
/// burning a core busy-yielding until the deadline.
const ASK_IDLE_YIELDS: usize = 64;
/// Upper bound on a single `park_timeout` while waiting for a reply (applied
/// in `next_ask_park`).
const ASK_MAX_PARK: Duration = Duration::from_millis(1);
/// During the spin window the cached `Instant::now()` is refreshed only once
/// per this many wait-loop iterations.
const ASK_TIME_REFRESH_ITERS: u32 = 64;

// Fast-path reply gate values. `poll()` reads this atomically and only takes
// the slot mutex once a reply (or drop) is actually present, so the busy-spin
// consumer does not contend with the replying actor on every spin iteration.
const REPLY_PENDING: u8 = 0;
const REPLY_READY: u8 = 1;
const REPLY_DROPPED: u8 = 2;
53
/// Collection of actor-aware stream flows.
pub struct ActorFlow;

impl ActorFlow {
    /// Sends each stream element to an actor using a request/reply message.
    ///
    /// The returned flow preserves input order while allowing up to `parallelism`
    /// requests in flight. It fails the stream if a request times out, the actor
    /// cannot be sent to, or the reply port is dropped before a response arrives.
    ///
    /// `make_msg` builds the actor message for one element from that element
    /// and the [`ReplyPort`] the actor is expected to answer on.
    ///
    /// # Panics
    ///
    /// Panics if `parallelism` is zero.
    #[must_use]
    pub fn ask<In, Msg, Out, F>(
        actor_ref: ActorRef<Msg>,
        parallelism: usize,
        timeout: Duration,
        make_msg: F,
    ) -> Flow<In, Out, NotUsed>
    where
        In: Send + 'static,
        Msg: Message,
        Out: Send + 'static,
        F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
    {
        // The closure is shared (via `Arc`) by every ask the flow starts.
        ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
    }
}
79
/// One-shot reply handle supplied to actor ask messages.
///
/// This wrapper keeps Datum's stream-facing API from exposing Ractor's concrete
/// RPC port type. The current implementation is local/Ractor-backed, and the
/// wrapper leaves room for a later transport-specific reply implementation.
pub struct ReplyPort<T> {
    // Reply state shared with the stream side that waits for the answer.
    inner: Arc<ReplyState<T>>,
    // `true` until `send` consumes the port; `Drop` only signals a dropped
    // sender while this is still set.
    active: bool,
}
89
/// State shared between a [`ReplyPort`] (the replying side) and the ask that is
/// waiting for the reply.
#[derive(Debug)]
struct ReplyState<T> {
    // Ask timeout, surfaced through `ReplyPort::timeout`.
    timeout: Duration,
    // Set once the waiting side no longer accepts a reply; `send` fails after.
    receiver_closed: AtomicBool,
    // One of `REPLY_PENDING` / `REPLY_READY` / `REPLY_DROPPED`; lets `poll`
    // skip the mutex while nothing has been delivered.
    gate: AtomicU8,
    // The reply value plus the thread (if any) to unpark when it changes.
    slot: Mutex<ReplySlotState<T>>,
}
97
/// Mutex-protected part of a `ReplyState`.
#[derive(Debug)]
struct ReplySlotState<T> {
    // Current reply slot contents.
    value: ReplySlot<T>,
    // Thread to unpark when a reply or drop is recorded; `None` when no thread
    // is registered.
    waiter: Option<Thread>,
}
103
/// Contents of the reply slot.
#[derive(Debug)]
enum ReplySlot<T> {
    /// Nothing delivered (also the value left behind after a take).
    Pending,
    /// A reply was sent and has not been taken yet.
    Ready(T),
    /// The reply port was dropped without sending.
    Dropped,
}
110
/// Outcome handed to the waiting side by `ReplyState::poll`, `take_locked`,
/// and `close_on_timeout`.
enum ReplyPoll<T> {
    /// No reply or drop was present.
    Pending,
    /// The reply value, moved out of the slot.
    Ready(T),
    /// The reply port was dropped without replying.
    Dropped,
}
116
117impl<T> fmt::Debug for ReplyPort<T> {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.debug_struct("ReplyPort")
120            .field("timeout", &self.timeout())
121            .field("closed", &self.is_closed())
122            .finish_non_exhaustive()
123    }
124}
125
/// Error returned when an actor tries to answer an ask whose receiver is gone.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplySendError;

impl fmt::Display for ReplySendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MESSAGE: &str = "actor reply receiver dropped";
        f.write_str(MESSAGE)
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl Error for ReplySendError {}
137
impl<T> ReplyPort<T> {
    /// Wraps shared reply state in a port that has not replied yet.
    fn new(inner: Arc<ReplyState<T>>) -> Self {
        Self {
            inner,
            active: true,
        }
    }

    /// Returns the timeout of the ask this port belongs to. The current
    /// implementation always returns `Some`.
    #[must_use]
    pub fn timeout(&self) -> Option<Duration> {
        Some(self.inner.timeout)
    }

    /// Returns `true` once the waiting side has stopped accepting a reply.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Sends the reply, consuming the port.
    ///
    /// # Errors
    ///
    /// Returns [`ReplySendError`] when the receiver is already closed or the
    /// reply slot is no longer pending.
    pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
        // Mark the port as used so the `Drop` that runs when `self` goes out
        // of scope does not also signal a dropped sender.
        self.active = false;
        self.inner.send(reply)
    }
}
161
/// Dropping a port that never called `send` tells the waiting side that no
/// reply will arrive.
impl<T> Drop for ReplyPort<T> {
    fn drop(&mut self) {
        // `active` is cleared by `send`, so a used port is silent here.
        if self.active {
            self.inner.drop_sender();
        }
    }
}
169
impl<T> ReplyState<T> {
    /// Creates an open state: nothing delivered, no waiter registered.
    fn new(timeout: Duration) -> Self {
        Self {
            timeout,
            receiver_closed: AtomicBool::new(false),
            gate: AtomicU8::new(REPLY_PENDING),
            slot: Mutex::new(ReplySlotState {
                value: ReplySlot::Pending,
                waiter: None,
            }),
        }
    }

    /// Locks the reply slot, recovering the guard if a previous holder panicked
    /// while holding it (e.g. a user reply value whose `Drop` panicked). The
    /// slot is a simple three-state enum with no cross-field invariants, so
    /// recovering from poison cannot produce unsound or wrong-result behavior
    /// and avoids escalating one localized panic into a stream-wide crash.
    fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
        self.slot
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
    }

    /// Puts a recycled state back into its freshly-constructed condition:
    /// receiver open, gate pending, slot empty, no waiter. The stored timeout
    /// is not rewritten; debug builds assert that it matches `timeout`.
    fn reset(&self, timeout: Duration) {
        self.receiver_closed.store(false, Ordering::Release);
        self.gate.store(REPLY_PENDING, Ordering::Release);
        let mut slot = self.lock_slot();
        slot.value = ReplySlot::Pending;
        slot.waiter = None;
        debug_assert_eq!(self.timeout, timeout);
    }

    /// Returns `true` once the receiver has been closed.
    fn is_closed(&self) -> bool {
        self.receiver_closed.load(Ordering::Acquire)
    }

    /// Stores `reply` for the waiting side and unparks its registered waiter.
    ///
    /// Fails with `ReplySendError` if the receiver is closed (checked once
    /// without the lock and again under it) or if the slot is not pending.
    fn send(&self, reply: T) -> Result<(), ReplySendError> {
        // Cheap early-out that avoids taking the lock for a closed receiver.
        if self.is_closed() {
            return Err(ReplySendError);
        }

        let mut slot = self.lock_slot();
        // Recheck under the lock: `close_on_timeout` closes while holding it.
        if self.is_closed() {
            return Err(ReplySendError);
        }

        if !matches!(slot.value, ReplySlot::Pending) {
            return Err(ReplySendError);
        }

        slot.value = ReplySlot::Ready(reply);
        let waiter = slot.waiter.clone();
        // Publish AFTER the value write; the consumer's Acquire load of the
        // gate synchronizes-with this store.
        self.gate.store(REPLY_READY, Ordering::Release);
        // Release the lock before unparking the waiter.
        drop(slot);
        wake_waiter(waiter);
        Ok(())
    }

    /// Non-blocking check for a delivered reply or a dropped sender.
    fn poll(&self) -> ReplyPoll<T> {
        if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
            // Lock-free fast path: nothing has been delivered yet.
            return ReplyPoll::Pending;
        }
        self.take_locked()
    }

    /// Closes the receiver under the slot lock so a concurrent `send()` observes
    /// the closure on its under-lock recheck and reports `ReplySendError`
    /// instead of disagreeing with a reported timeout. If a reply (or drop)
    /// raced in just before the deadline, it is delivered rather than lost.
    fn close_on_timeout(&self) -> ReplyPoll<T> {
        let mut slot = self.lock_slot();
        self.receiver_closed.store(true, Ordering::Release);
        let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
            ReplySlot::Pending => ReplyPoll::Pending,
            ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
            ReplySlot::Dropped => ReplyPoll::Dropped,
        };
        let waiter = slot.waiter.take();
        drop(slot);
        wake_waiter(waiter);
        outcome
    }

    /// Takes whatever is in the slot under the lock and clears the registered
    /// waiter. The receiver is closed when a reply or drop was taken; a slot
    /// that is still pending leaves it open.
    fn take_locked(&self) -> ReplyPoll<T> {
        let mut slot = self.lock_slot();
        slot.waiter = None;
        match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
            ReplySlot::Pending => ReplyPoll::Pending,
            ReplySlot::Ready(reply) => {
                self.close_receiver();
                ReplyPoll::Ready(reply)
            }
            ReplySlot::Dropped => {
                self.close_receiver();
                ReplyPoll::Dropped
            }
        }
    }

    /// Records that the reply port was dropped without replying and unparks
    /// the registered waiter. Does nothing unless the slot is still pending.
    fn drop_sender(&self) {
        let mut slot = self.lock_slot();
        if matches!(slot.value, ReplySlot::Pending) {
            slot.value = ReplySlot::Dropped;
            let waiter = slot.waiter.clone();
            // Publish after the slot write, mirroring `send`.
            self.gate.store(REPLY_DROPPED, Ordering::Release);
            drop(slot);
            wake_waiter(waiter);
        }
    }

    /// Marks the receiver closed without taking the slot lock.
    fn close_receiver(&self) {
        self.receiver_closed.store(true, Ordering::Release);
    }

    /// Registers `waiter` as the thread to unpark, replacing any previous one.
    fn register_waiter(&self, waiter: Thread) {
        self.lock_slot().waiter = Some(waiter);
    }

    /// Clears the registered waiter, but only if it is the thread identified
    /// by `waiter_id`.
    fn unregister_waiter(&self, waiter_id: ThreadId) {
        let mut slot = self.lock_slot();
        if slot
            .waiter
            .as_ref()
            .is_some_and(|thread| thread.id() == waiter_id)
        {
            slot.waiter = None;
        }
    }
}
303
/// Unparks the given thread, if there is one.
fn wake_waiter(waiter: Option<Thread>) {
    let Some(thread) = waiter else {
        return;
    };
    thread.unpark();
}
309
/// Builds the flow behind `ActorFlow::ask`: each materialization runs
/// `ask_ractor_ordered` over its input with a clone of the actor reference and
/// the shared message constructor.
///
/// # Panics
///
/// Panics if `parallelism` is zero.
fn ask_flow<In, Msg, Out, F>(
    actor_ref: ActorRef<Msg>,
    parallelism: usize,
    timeout: Duration,
    make_msg: Arc<F>,
) -> Flow<In, Out>
where
    In: Send + 'static,
    Msg: Message,
    Out: Send + 'static,
    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
{
    assert!(
        parallelism > 0,
        "ActorFlow::ask parallelism must be greater than zero"
    );
    // Deliberately `from_transform`, not `from_preserving_transform`: an ask must
    // not inherit the inline head-terminal hint. If it did, a bounded eager source
    // feeding `Sink::head` would run the ask's blocking cross-thread reply wait
    // inline on the caller during materialization, which breaks the non-blocking /
    // awaitable contract of `StreamCompletion` and can deadlock if
    // `run_with_materializer()` is `.await`ed on a runtime thread the actor needs
    // to make progress. Ask graphs always run on a worker.
    Flow::from_transform(move |input| {
        ask_ractor_ordered(
            input,
            actor_ref.clone(),
            parallelism,
            timeout,
            Arc::clone(&make_msg),
        )
    })
}
343
344fn ask_ractor_ordered<In, Msg, Out, F>(
345    mut input: BoxStream<In>,
346    actor_ref: ActorRef<Msg>,
347    parallelism: usize,
348    timeout: Duration,
349    make_msg: Arc<F>,
350) -> BoxStream<Out>
351where
352    In: Send + 'static,
353    Msg: Message,
354    Out: Send + 'static,
355    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
356{
357    let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
358    let mut next_index = 0_usize;
359    let mut next_to_emit = 0_usize;
360    let mut completed = Vec::with_capacity(parallelism);
361    let mut reply_pool = Vec::with_capacity(parallelism);
362    let mut input_done = false;
363
364    Box::new(std::iter::from_fn(move || {
365        loop {
366            if let Some(result) = take_completed(&mut completed, next_to_emit) {
367                next_to_emit += 1;
368                return Some(result);
369            }
370
371            while in_flight.len() < parallelism && !input_done {
372                match input.next() {
373                    Some(Ok(item)) => {
374                        let index = next_index;
375                        next_index += 1;
376                        match start_ractor_ask(
377                            index,
378                            actor_ref.clone(),
379                            timeout,
380                            item,
381                            Arc::clone(&make_msg),
382                            &mut reply_pool,
383                        ) {
384                            Ok(ask) => in_flight.push(ask),
385                            Err(error) => {
386                                completed.push((index, Err(error)));
387                                input_done = true;
388                            }
389                        }
390                    }
391                    Some(Err(error)) => {
392                        completed.push((next_index, Err(error)));
393                        next_index += 1;
394                        input_done = true;
395                    }
396                    None => input_done = true,
397                }
398            }
399
400            if let Some(result) = take_completed(&mut completed, next_to_emit) {
401                next_to_emit += 1;
402                return Some(result);
403            }
404
405            if in_flight.is_empty() {
406                return None;
407            }
408
409            let ask = wait_for_ready_ask(&mut in_flight, timeout);
410            let index = ask.index;
411            let result = ask.result;
412            recycle_reply_state(ask.state, &mut reply_pool);
413            if index == next_to_emit {
414                next_to_emit += 1;
415                return Some(result);
416            }
417            completed.push((index, result));
418        }
419    }))
420}
421
422fn take_completed<Out>(
423    completed: &mut Vec<(usize, StreamResult<Out>)>,
424    index: usize,
425) -> Option<StreamResult<Out>> {
426    let position = completed
427        .iter()
428        .position(|(completed_index, _)| *completed_index == index)?;
429    Some(completed.swap_remove(position).1)
430}
431
/// An ask that has been sent to the actor and is awaiting its reply.
struct InFlightAsk<Out> {
    // Input position of the element this ask was started for.
    index: usize,
    // Reply state shared with the actor's `ReplyPort`; `None` only after
    // `into_state` has moved it out.
    state: Option<Arc<ReplyState<Out>>>,
    /// `None` when `Instant::now() + timeout` would overflow; treated as "never
    /// time out" rather than panicking on the add.
    deadline: Option<Instant>,
}

impl<Out> InFlightAsk<Out> {
    /// Borrows the reply state.
    ///
    /// Panics if the state has already been moved out by `into_state`.
    fn state(&self) -> &Arc<ReplyState<Out>> {
        self.state.as_ref().expect("in-flight ask has reply state")
    }

    /// Consumes the ask and returns its reply state. Taking the state first
    /// means the `Drop` impl below finds `None` and does not close the
    /// receiver.
    fn into_state(mut self) -> Arc<ReplyState<Out>> {
        self.state.take().expect("in-flight ask has reply state")
    }
}

/// Dropping an ask that still owns its state closes the receiver, so a later
/// `ReplyPort::send` fails instead of writing a reply nobody will read.
impl<Out> Drop for InFlightAsk<Out> {
    fn drop(&mut self) {
        if let Some(state) = &self.state {
            state.close_receiver();
        }
    }
}
457
/// A finished ask: its input position, its outcome, and the reply state it
/// used (handed back so the caller can recycle it).
struct CompletedAsk<Out> {
    // Input position of the element this ask answered.
    index: usize,
    // The reply, or the stream error the ask ended with.
    result: StreamResult<Out>,
    // Reply state taken out of the in-flight ask.
    state: Arc<ReplyState<Out>>,
}
463
464fn start_ractor_ask<In, Msg, Out, F>(
465    index: usize,
466    actor_ref: ActorRef<Msg>,
467    timeout: Duration,
468    input: In,
469    make_msg: Arc<F>,
470    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
471) -> StreamResult<InFlightAsk<Out>>
472where
473    In: Send + 'static,
474    Msg: Message,
475    Out: Send + 'static,
476    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
477{
478    let reply_state = match reply_pool.pop() {
479        // A freshly allocated state is already Pending/open, so only recycled
480        // states need a reset.
481        Some(state) => {
482            state.reset(timeout);
483            state
484        }
485        None => Arc::new(ReplyState::new(timeout)),
486    };
487    let reply_to = ReplyPort::new(Arc::clone(&reply_state));
488    let message =
489        catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
490            StreamError::ActorAskTaskFailed {
491                reason: panic_reason(panic),
492            }
493        })?;
494
495    match actor_ref.cast(message) {
496        Ok(()) => {}
497        Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
498            return Err(StreamError::ActorTerminated);
499        }
500        Err(error) => {
501            return Err(StreamError::ActorAskSendFailed {
502                reason: error.to_string(),
503            });
504        }
505    }
506
507    Ok(InFlightAsk {
508        index,
509        state: Some(reply_state),
510        deadline: Instant::now().checked_add(timeout),
511    })
512}
513
/// Blocks the calling thread until one in-flight ask finishes (reply, dropped
/// reply port, or timeout), removes it from `in_flight`, and returns it.
///
/// Waiting escalates in three phases: up to `ASK_READY_SPINS` spin-loop
/// hints, then up to `ASK_IDLE_YIELDS` thread yields, then the thread
/// registers itself as waiter on every in-flight ask and parks for at most
/// `next_ask_park`. After a park both budgets restart.
///
/// The loop only exits through `take_ready_ask` returning an ask; the caller
/// in this file checks that `in_flight` is non-empty before calling.
fn wait_for_ready_ask<Out>(
    in_flight: &mut Vec<InFlightAsk<Out>>,
    timeout: Duration,
) -> CompletedAsk<Out> {
    let mut idle_spins = 0;
    let mut idle_yields = 0;
    // Counts iterations since `now` was last refreshed; 0 forces a refresh.
    let mut time_refresh = 0_u32;
    let mut now = Instant::now();
    loop {
        if time_refresh == 0 {
            now = Instant::now();
        }
        time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
        if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
            return ask;
        }

        if idle_spins < ASK_READY_SPINS {
            idle_spins += 1;
            std::hint::spin_loop();
        } else if idle_yields < ASK_IDLE_YIELDS {
            idle_yields += 1;
            // Re-read the clock on the next iteration after yielding.
            time_refresh = 0;
            thread::yield_now();
        } else {
            idle_spins = 0;
            idle_yields = 0;
            time_refresh = 0;
            let current = thread::current();
            let registered = register_ask_waiters(in_flight, &current);
            now = Instant::now();
            // Check again after registering: a reply delivered before the
            // waiter was registered would not have unparked this thread.
            if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
                unregister_ask_waiters(registered, current.id());
                return ask;
            }
            thread::park_timeout(next_ask_park(in_flight, now));
            unregister_ask_waiters(registered, current.id());
        }
    }
}
554
555fn take_ready_ask<Out>(
556    in_flight: &mut Vec<InFlightAsk<Out>>,
557    timeout: Duration,
558    now: Instant,
559) -> Option<CompletedAsk<Out>> {
560    let mut index = 0;
561    while index < in_flight.len() {
562        match in_flight[index].state().poll() {
563            ReplyPoll::Ready(reply) => {
564                let ask = in_flight.swap_remove(index);
565                return Some(CompletedAsk {
566                    index: ask.index,
567                    result: Ok(reply),
568                    state: ask.into_state(),
569                });
570            }
571            ReplyPoll::Dropped => {
572                let ask = in_flight.swap_remove(index);
573                return Some(CompletedAsk {
574                    index: ask.index,
575                    result: Err(StreamError::ActorAskResponseDropped),
576                    state: ask.into_state(),
577                });
578            }
579            ReplyPoll::Pending => {
580                if in_flight[index]
581                    .deadline
582                    .is_some_and(|deadline| now >= deadline)
583                {
584                    // Close under the slot lock; deliver a reply that raced in
585                    // just before the deadline rather than losing it.
586                    let outcome = in_flight[index].state().close_on_timeout();
587                    let ask = in_flight.swap_remove(index);
588                    let result = match outcome {
589                        ReplyPoll::Ready(reply) => Ok(reply),
590                        ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
591                        ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
592                    };
593                    return Some(CompletedAsk {
594                        index: ask.index,
595                        result,
596                        state: ask.into_state(),
597                    });
598                }
599                index += 1;
600            }
601        }
602    }
603    None
604}
605
606fn register_ask_waiters<Out>(
607    in_flight: &[InFlightAsk<Out>],
608    current: &Thread,
609) -> Vec<Arc<ReplyState<Out>>> {
610    let mut registered = Vec::with_capacity(in_flight.len());
611    for ask in in_flight {
612        ask.state().register_waiter(current.clone());
613        registered.push(Arc::clone(ask.state()));
614    }
615    registered
616}
617
618fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
619    for state in registered {
620        state.unregister_waiter(current_id);
621    }
622}
623
624fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
625    next_ask_deadline_remaining(in_flight, now)
626        .unwrap_or(ASK_MAX_PARK)
627        .min(ASK_MAX_PARK)
628}
629
630fn next_ask_deadline_remaining<Out>(
631    in_flight: &[InFlightAsk<Out>],
632    now: Instant,
633) -> Option<Duration> {
634    in_flight
635        .iter()
636        .filter_map(|ask| ask.deadline)
637        .map(|deadline| deadline.saturating_duration_since(now))
638        .min()
639}
640
641fn recycle_reply_state<Out>(
642    state: Arc<ReplyState<Out>>,
643    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
644) {
645    if Arc::strong_count(&state) == 1 {
646        reply_pool.push(state);
647    }
648}
649
/// Extracts a readable message from a panic payload.
///
/// `&str` and `String` payloads are returned as text; any other payload type
/// yields a fixed fallback message.
fn panic_reason(panic: Box<dyn Any + Send>) -> String {
    panic
        .downcast_ref::<&str>()
        .map(|text| (*text).to_owned())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "actor ask task panicked".to_owned())
}
659
660pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
661    static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
662        std::sync::OnceLock::new();
663
664    match RUNTIME.get_or_init(|| {
665        tokio::runtime::Builder::new_multi_thread()
666            .thread_name("datum-ractor-runtime")
667            .enable_all()
668            .build()
669            .map_err(|error| format!("ractor runtime failed to start: {error}"))
670    }) {
671        Ok(runtime) => Ok(runtime),
672        Err(error) => Err(StreamError::Failed(error.clone())),
673    }
674}
675
676pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
677where
678    T: Send + 'static,
679    F: std::future::Future<Output = T> + Send + 'static,
680{
681    let runtime = ractor_runtime()?;
682    let result = if tokio::runtime::Handle::try_current().is_ok() {
683        std::thread::spawn(move || runtime.block_on(future))
684            .join()
685            .map_err(|_| StreamError::AbruptTermination)?
686    } else {
687        runtime.block_on(future)
688    };
689    Ok(result)
690}
691
/// Test-only convenience over `block_on_ractor_runtime` that panics instead of
/// returning an error.
#[cfg(test)]
fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
where
    T: Send + 'static,
    F: std::future::Future<Output = T> + Send + 'static,
{
    block_on_ractor_runtime(future).expect("ractor ask runtime starts")
}
700
701#[cfg(test)]
702mod tests {
703    use super::*;
704    use crate::stream::{Sink, Source, StreamCompletion};
705    use std::sync::{
706        Arc as StdArc,
707        atomic::{AtomicUsize, Ordering},
708    };
709
710    enum AskTestMessage {
711        Delayed {
712            input: u64,
713            delay: Duration,
714            reply_to: ReplyPort<u64>,
715        },
716        Track {
717            input: u64,
718            reply_to: ReplyPort<u64>,
719        },
720        NeverReply {
721            _reply_to: ReplyPort<u64>,
722        },
723        DropReply {
724            _reply_to: ReplyPort<u64>,
725        },
726        BindTcp {
727            reply_to: ReplyPort<u64>,
728        },
729    }
730
731    #[cfg(feature = "cluster")]
732    impl Message for AskTestMessage {}
733
734    struct AskTestActor;
735
736    struct AskTestState {
737        active: StdArc<AtomicUsize>,
738        max_active: StdArc<AtomicUsize>,
739        held_replies: Vec<ReplyPort<u64>>,
740    }
741
742    impl Actor for AskTestActor {
743        type Msg = AskTestMessage;
744        type State = AskTestState;
745        type Arguments = AskTestState;
746
747        async fn pre_start(
748            &self,
749            _myself: ActorRef<Self::Msg>,
750            args: Self::Arguments,
751        ) -> Result<Self::State, ActorProcessingErr> {
752            Ok(args)
753        }
754
755        async fn handle(
756            &self,
757            _myself: ActorRef<Self::Msg>,
758            message: Self::Msg,
759            state: &mut Self::State,
760        ) -> Result<(), ActorProcessingErr> {
761            match message {
762                AskTestMessage::Delayed {
763                    input,
764                    delay,
765                    reply_to,
766                } => {
767                    ractor::concurrency::spawn(async move {
768                        ractor::concurrency::sleep(delay).await;
769                        let _ = reply_to.send(input);
770                    });
771                }
772                AskTestMessage::Track { input, reply_to } => {
773                    let active = StdArc::clone(&state.active);
774                    let max_active = StdArc::clone(&state.max_active);
775                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
776                    max_active.fetch_max(current, Ordering::SeqCst);
777                    ractor::concurrency::spawn(async move {
778                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
779                        active.fetch_sub(1, Ordering::SeqCst);
780                        let _ = reply_to.send(input);
781                    });
782                }
783                AskTestMessage::NeverReply { _reply_to } => {
784                    state.held_replies.push(_reply_to);
785                }
786                AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
787                AskTestMessage::BindTcp { reply_to } => {
788                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
789                    let port = listener.local_addr()?.port();
790                    let _ = reply_to.send(u64::from(port));
791                }
792            }
793            Ok(())
794        }
795    }
796
797    fn wait<T>(completion: StreamCompletion<T>) -> T {
798        completion.wait().unwrap()
799    }
800
801    fn spawn_test_actor() -> (
802        ActorRef<AskTestMessage>,
803        ractor::concurrency::JoinHandle<()>,
804        StdArc<AtomicUsize>,
805    ) {
806        let active = StdArc::new(AtomicUsize::new(0));
807        let max_active = StdArc::new(AtomicUsize::new(0));
808        let state = AskTestState {
809            active,
810            max_active: StdArc::clone(&max_active),
811            held_replies: Vec::new(),
812        };
813        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
814            Actor::spawn(None, AskTestActor, state)
815                .await
816                .expect("test actor spawns")
817        });
818        (actor_ref, handle, max_active)
819    }
820
821    fn stop_test_actor(
822        actor_ref: ActorRef<AskTestMessage>,
823        handle: ractor::concurrency::JoinHandle<()>,
824    ) {
825        actor_ref.stop(None);
826        block_on_ractor_ask_runtime(async move {
827            handle.await.expect("test actor task joins");
828        });
829    }
830
831    #[test]
832    fn actor_flow_ask_preserves_order_with_parallelism() {
833        let (actor_ref, handle, _) = spawn_test_actor();
834
835        let values = Source::from_iter(0_u64..5)
836            .via(ActorFlow::ask(
837                actor_ref.clone(),
838                5,
839                Duration::from_secs(1),
840                |input, reply_to| AskTestMessage::Delayed {
841                    input,
842                    delay: Duration::from_millis((5 - input) * 10),
843                    reply_to,
844                },
845            ))
846            .run_with(Sink::collect())
847            .unwrap();
848
849        assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
850        stop_test_actor(actor_ref, handle);
851    }
852
853    #[test]
854    fn actor_flow_ask_respects_parallelism() {
855        let (actor_ref, handle, max_active) = spawn_test_actor();
856
857        let values = Source::from_iter(0_u64..8)
858            .via(ActorFlow::ask(
859                actor_ref.clone(),
860                3,
861                Duration::from_secs(1),
862                |input, reply_to| AskTestMessage::Track { input, reply_to },
863            ))
864            .run_with(Sink::collect())
865            .unwrap();
866
867        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
868        assert_eq!(max_active.load(Ordering::SeqCst), 3);
869        stop_test_actor(actor_ref, handle);
870    }
871
872    #[test]
873    fn actor_flow_ask_into_head_returns_value() {
874        // `ActorFlow::ask` does *not* preserve the inline head-terminal hint, so a
875        // bounded eager source feeding `Sink::head` through an ask runs the ask
876        // graph (including the cross-thread reply wait) on a runtime worker rather
877        // than inline on the caller. That keeps `run_with_materializer()`
878        // non-blocking/awaitable: it returns a `StreamCompletion` backed by a
879        // oneshot that resolves once the worker drains the ask round-trip. This
880        // guards that the ask -> head graph still returns the correct first reply
881        // end to end. (The reply is delayed: `Track` sleeps 20 ms on the actor
882        // runtime before replying.)
883        let (actor_ref, handle, _) = spawn_test_actor();
884
885        let head = Source::single(7_u64)
886            .via(ActorFlow::ask(
887                actor_ref.clone(),
888                1,
889                Duration::from_secs(1),
890                |input, reply_to| AskTestMessage::Track { input, reply_to },
891            ))
892            .run_with(Sink::head())
893            .unwrap();
894
895        assert_eq!(wait(head), 7);
896
897        stop_test_actor(actor_ref, handle);
898    }
899
900    #[test]
901    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
902        let (actor_ref, handle, _) = spawn_test_actor();
903
904        let ports = Source::single(0_u64)
905            .via(ActorFlow::ask(
906                actor_ref.clone(),
907                1,
908                Duration::from_secs(1),
909                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
910            ))
911            .run_collect()
912            .unwrap();
913
914        assert_eq!(ports.len(), 1);
915        assert_ne!(ports[0], 0);
916        stop_test_actor(actor_ref, handle);
917    }
918
919    #[test]
920    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
921        // Replies are delayed well past the spin+yield window, so each wait must
922        // reach the park branch and be woken by `ReplyState::send` rather than
923        // busy-yielding until the (long) deadline. Parallelism > 1 also exercises
924        // registering/unregistering several in-flight waiters on one parked
925        // consumer. This is the regression guard for the long-deadline ask path
926        // that previously yielded indefinitely (never parked).
927        let (actor_ref, handle, _) = spawn_test_actor();
928
929        let values = Source::from_iter(0_u64..8)
930            .via(ActorFlow::ask(
931                actor_ref.clone(),
932                4,
933                Duration::from_secs(30),
934                |input, reply_to| AskTestMessage::Delayed {
935                    input,
936                    delay: Duration::from_millis(25),
937                    reply_to,
938                },
939            ))
940            .run_with(Sink::collect())
941            .unwrap();
942
943        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
944        stop_test_actor(actor_ref, handle);
945    }
946
947    #[test]
948    fn dropped_in_flight_ask_closes_reply_port() {
949        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
950        let reply_to = ReplyPort::new(StdArc::clone(&state));
951        let ask = InFlightAsk {
952            index: 0,
953            state: Some(StdArc::clone(&state)),
954            deadline: None,
955        };
956
957        drop(ask);
958
959        assert!(reply_to.is_closed());
960        assert_eq!(reply_to.send(1), Err(ReplySendError));
961    }
962
963    #[test]
964    fn actor_flow_ask_times_out() {
965        let (actor_ref, handle, _) = spawn_test_actor();
966        let timeout = Duration::from_millis(10);
967
968        let result = Source::single(1_u64)
969            .via(ActorFlow::ask(
970                actor_ref.clone(),
971                1,
972                timeout,
973                |_input, reply_to| AskTestMessage::NeverReply {
974                    _reply_to: reply_to,
975                },
976            ))
977            .run_collect();
978
979        assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
980        stop_test_actor(actor_ref, handle);
981    }
982
983    #[test]
984    fn actor_flow_ask_fails_when_actor_is_stopped() {
985        let (actor_ref, handle, _) = spawn_test_actor();
986        actor_ref.stop(None);
987        block_on_ractor_ask_runtime(async move {
988            handle.await.expect("test actor task joins");
989        });
990
991        let result = Source::single(1_u64)
992            .via(ActorFlow::ask(
993                actor_ref,
994                1,
995                Duration::from_secs(1),
996                |input, reply_to| AskTestMessage::Delayed {
997                    input,
998                    delay: Duration::ZERO,
999                    reply_to,
1000                },
1001            ))
1002            .run_collect();
1003
1004        assert_eq!(result, Err(StreamError::ActorTerminated));
1005    }
1006
1007    #[test]
1008    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
1009        let (actor_ref, handle, _) = spawn_test_actor();
1010
1011        let result = Source::single(1_u64)
1012            .via(ActorFlow::ask(
1013                actor_ref.clone(),
1014                1,
1015                Duration::from_secs(1),
1016                |_input, reply_to| AskTestMessage::DropReply {
1017                    _reply_to: reply_to,
1018                },
1019            ))
1020            .run_collect();
1021
1022        assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
1023        stop_test_actor(actor_ref, handle);
1024    }
1025
1026    #[test]
1027    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
1028        let (actor_ref, handle, _) = spawn_test_actor();
1029
1030        let result = Source::single(1_u64)
1031            .via(ActorFlow::ask(
1032                actor_ref.clone(),
1033                1,
1034                Duration::from_secs(1),
1035                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
1036                    panic!("message builder failed");
1037                },
1038            ))
1039            .run_collect();
1040
1041        assert_eq!(
1042            result,
1043            Err(StreamError::ActorAskTaskFailed {
1044                reason: "message builder failed".to_owned()
1045            })
1046        );
1047        stop_test_actor(actor_ref, handle);
1048    }
1049}