Skip to main content

datum/
actor.rs

1//! Actor interop APIs backed by Ractor for local execution.
2
3use std::{
4    any::Any,
5    error::Error,
6    fmt,
7    panic::{AssertUnwindSafe, catch_unwind},
8    sync::{
9        Arc, Mutex, MutexGuard,
10        atomic::{AtomicBool, AtomicU8, Ordering},
11    },
12    thread::{self, Thread, ThreadId},
13    time::{Duration, Instant},
14};
15
16use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
17
18pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
19
20mod interop;
21mod stream_ref;
22pub mod stream_ref_proto;
23pub use interop::{
24    ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
25    ActorSourceMessage, ActorStatus, WatchEvent,
26};
27pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
28pub use stream_ref_proto::{
29    StreamRefFrame, StreamRefId, StreamRefMessage, StreamRefOutbound, StreamRefPayload,
30    StreamRefPayloadBatch, StreamRefPayloadBytes, StreamRefProtoConsumer, StreamRefProtoEndpoint,
31    StreamRefProtoProducer,
32};
33
34pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
35
36const ASK_READY_SPINS: usize = 256;
37/// Bounded number of `yield_now()` calls between the spin window and parking.
38/// This is deliberately deadline-independent: parking is gated only by this
39/// fixed transition budget, and the *park duration* is what respects the
40/// deadline (see [`next_ask_park`]). A fast reply is caught during the spin or
41/// yield window; a genuinely slow reply (or a long, idle timeout) parks and is
42/// woken by `ReplyState::send`/`drop_sender`/`close_on_timeout` rather than
43/// burning a core busy-yielding until the deadline.
44const ASK_IDLE_YIELDS: usize = 64;
45const ASK_MAX_PARK: Duration = Duration::from_millis(1);
46const ASK_TIME_REFRESH_ITERS: u32 = 64;
47
48// Fast-path reply gate values. `poll()` reads this atomically and only takes
49// the slot mutex once a reply (or drop) is actually present, so the busy-spin
50// consumer does not contend with the replying actor on every spin iteration.
51const REPLY_PENDING: u8 = 0;
52const REPLY_READY: u8 = 1;
53const REPLY_DROPPED: u8 = 2;
54
55/// Collection of actor-aware stream flows.
56pub struct ActorFlow;
57
58impl ActorFlow {
59    /// Sends each stream element to an actor using a request/reply message.
60    ///
61    /// The returned flow preserves input order while allowing up to `parallelism`
62    /// requests in flight. It fails the stream if a request times out, the actor
63    /// cannot be sent to, or the reply port is dropped before a response arrives.
64    #[must_use]
65    pub fn ask<In, Msg, Out, F>(
66        actor_ref: ActorRef<Msg>,
67        parallelism: usize,
68        timeout: Duration,
69        make_msg: F,
70    ) -> Flow<In, Out, NotUsed>
71    where
72        In: Send + 'static,
73        Msg: Message,
74        Out: Send + 'static,
75        F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
76    {
77        ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
78    }
79}
80
81/// One-shot reply handle supplied to actor ask messages.
82///
83/// This wrapper keeps Datum's stream-facing API from exposing Ractor's concrete
84/// RPC port type. The current implementation is local/Ractor-backed, and the
85/// wrapper leaves room for a later transport-specific reply implementation.
86pub struct ReplyPort<T> {
87    inner: Arc<ReplyState<T>>,
88    active: bool,
89}
90
91#[derive(Debug)]
92struct ReplyState<T> {
93    timeout: Duration,
94    receiver_closed: AtomicBool,
95    gate: AtomicU8,
96    slot: Mutex<ReplySlotState<T>>,
97}
98
99#[derive(Debug)]
100struct ReplySlotState<T> {
101    value: ReplySlot<T>,
102    waiter: Option<Thread>,
103}
104
105#[derive(Debug)]
106enum ReplySlot<T> {
107    Pending,
108    Ready(T),
109    Dropped,
110}
111
112enum ReplyPoll<T> {
113    Pending,
114    Ready(T),
115    Dropped,
116}
117
118impl<T> fmt::Debug for ReplyPort<T> {
119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120        f.debug_struct("ReplyPort")
121            .field("timeout", &self.timeout())
122            .field("closed", &self.is_closed())
123            .finish_non_exhaustive()
124    }
125}
126
127/// Error returned when an actor tries to answer an ask whose receiver is gone.
128#[derive(Debug, Clone, Copy, PartialEq, Eq)]
129pub struct ReplySendError;
130
131impl fmt::Display for ReplySendError {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        f.write_str("actor reply receiver dropped")
134    }
135}
136
137impl Error for ReplySendError {}
138
139impl<T> ReplyPort<T> {
140    fn new(inner: Arc<ReplyState<T>>) -> Self {
141        Self {
142            inner,
143            active: true,
144        }
145    }
146
147    #[must_use]
148    pub fn timeout(&self) -> Option<Duration> {
149        Some(self.inner.timeout)
150    }
151
152    #[must_use]
153    pub fn is_closed(&self) -> bool {
154        self.inner.is_closed()
155    }
156
157    pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
158        self.active = false;
159        self.inner.send(reply)
160    }
161}
162
163impl<T> Drop for ReplyPort<T> {
164    fn drop(&mut self) {
165        if self.active {
166            self.inner.drop_sender();
167        }
168    }
169}
170
171impl<T> ReplyState<T> {
172    fn new(timeout: Duration) -> Self {
173        Self {
174            timeout,
175            receiver_closed: AtomicBool::new(false),
176            gate: AtomicU8::new(REPLY_PENDING),
177            slot: Mutex::new(ReplySlotState {
178                value: ReplySlot::Pending,
179                waiter: None,
180            }),
181        }
182    }
183
184    /// Locks the reply slot, recovering the guard if a previous holder panicked
185    /// while holding it (e.g. a user reply value whose `Drop` panicked). The
186    /// slot is a simple three-state enum with no cross-field invariants, so
187    /// recovering from poison cannot produce unsound or wrong-result behavior
188    /// and avoids escalating one localized panic into a stream-wide crash.
189    fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
190        self.slot
191            .lock()
192            .unwrap_or_else(|poison| poison.into_inner())
193    }
194
195    fn reset(&self, timeout: Duration) {
196        self.receiver_closed.store(false, Ordering::Release);
197        self.gate.store(REPLY_PENDING, Ordering::Release);
198        let mut slot = self.lock_slot();
199        slot.value = ReplySlot::Pending;
200        slot.waiter = None;
201        debug_assert_eq!(self.timeout, timeout);
202    }
203
204    fn is_closed(&self) -> bool {
205        self.receiver_closed.load(Ordering::Acquire)
206    }
207
208    fn send(&self, reply: T) -> Result<(), ReplySendError> {
209        if self.is_closed() {
210            return Err(ReplySendError);
211        }
212
213        let mut slot = self.lock_slot();
214        if self.is_closed() {
215            return Err(ReplySendError);
216        }
217
218        if !matches!(slot.value, ReplySlot::Pending) {
219            return Err(ReplySendError);
220        }
221
222        slot.value = ReplySlot::Ready(reply);
223        let waiter = slot.waiter.clone();
224        // Publish AFTER the value write; the consumer's Acquire load of the
225        // gate synchronizes-with this store.
226        self.gate.store(REPLY_READY, Ordering::Release);
227        drop(slot);
228        wake_waiter(waiter);
229        Ok(())
230    }
231
232    fn poll(&self) -> ReplyPoll<T> {
233        if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
234            // Lock-free fast path: nothing has been delivered yet.
235            return ReplyPoll::Pending;
236        }
237        self.take_locked()
238    }
239
240    /// Closes the receiver under the slot lock so a concurrent `send()` observes
241    /// the closure on its under-lock recheck and reports `ReplySendError`
242    /// instead of disagreeing with a reported timeout. If a reply (or drop)
243    /// raced in just before the deadline, it is delivered rather than lost.
244    fn close_on_timeout(&self) -> ReplyPoll<T> {
245        let mut slot = self.lock_slot();
246        self.receiver_closed.store(true, Ordering::Release);
247        let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
248            ReplySlot::Pending => ReplyPoll::Pending,
249            ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
250            ReplySlot::Dropped => ReplyPoll::Dropped,
251        };
252        let waiter = slot.waiter.take();
253        drop(slot);
254        wake_waiter(waiter);
255        outcome
256    }
257
258    fn take_locked(&self) -> ReplyPoll<T> {
259        let mut slot = self.lock_slot();
260        slot.waiter = None;
261        match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
262            ReplySlot::Pending => ReplyPoll::Pending,
263            ReplySlot::Ready(reply) => {
264                self.close_receiver();
265                ReplyPoll::Ready(reply)
266            }
267            ReplySlot::Dropped => {
268                self.close_receiver();
269                ReplyPoll::Dropped
270            }
271        }
272    }
273
274    fn drop_sender(&self) {
275        let mut slot = self.lock_slot();
276        if matches!(slot.value, ReplySlot::Pending) {
277            slot.value = ReplySlot::Dropped;
278            let waiter = slot.waiter.clone();
279            self.gate.store(REPLY_DROPPED, Ordering::Release);
280            drop(slot);
281            wake_waiter(waiter);
282        }
283    }
284
285    fn close_receiver(&self) {
286        self.receiver_closed.store(true, Ordering::Release);
287    }
288
289    fn register_waiter(&self, waiter: Thread) {
290        self.lock_slot().waiter = Some(waiter);
291    }
292
293    fn unregister_waiter(&self, waiter_id: ThreadId) {
294        let mut slot = self.lock_slot();
295        if slot
296            .waiter
297            .as_ref()
298            .is_some_and(|thread| thread.id() == waiter_id)
299        {
300            slot.waiter = None;
301        }
302    }
303}
304
305fn wake_waiter(waiter: Option<Thread>) {
306    if let Some(waiter) = waiter {
307        waiter.unpark();
308    }
309}
310
311fn ask_flow<In, Msg, Out, F>(
312    actor_ref: ActorRef<Msg>,
313    parallelism: usize,
314    timeout: Duration,
315    make_msg: Arc<F>,
316) -> Flow<In, Out>
317where
318    In: Send + 'static,
319    Msg: Message,
320    Out: Send + 'static,
321    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
322{
323    assert!(
324        parallelism > 0,
325        "ActorFlow::ask parallelism must be greater than zero"
326    );
327    // Deliberately `from_transform`, not `from_preserving_transform`: an ask must
328    // not inherit the inline head-terminal hint. If it did, a bounded eager source
329    // feeding `Sink::head` would run the ask's blocking cross-thread reply wait
330    // inline on the caller during materialization, which breaks the non-blocking /
331    // awaitable contract of `StreamCompletion` and can deadlock if
332    // `run_with_materializer()` is `.await`ed on a runtime thread the actor needs
333    // to make progress. Ask graphs always run on a worker.
334    Flow::from_transform(move |input| {
335        ask_ractor_ordered(
336            input,
337            actor_ref.clone(),
338            parallelism,
339            timeout,
340            Arc::clone(&make_msg),
341        )
342    })
343}
344
345fn ask_ractor_ordered<In, Msg, Out, F>(
346    mut input: BoxStream<In>,
347    actor_ref: ActorRef<Msg>,
348    parallelism: usize,
349    timeout: Duration,
350    make_msg: Arc<F>,
351) -> BoxStream<Out>
352where
353    In: Send + 'static,
354    Msg: Message,
355    Out: Send + 'static,
356    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
357{
358    let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
359    let mut next_index = 0_usize;
360    let mut next_to_emit = 0_usize;
361    let mut completed = Vec::with_capacity(parallelism);
362    let mut reply_pool = Vec::with_capacity(parallelism);
363    let mut input_done = false;
364
365    Box::new(std::iter::from_fn(move || {
366        loop {
367            if let Some(result) = take_completed(&mut completed, next_to_emit) {
368                next_to_emit += 1;
369                return Some(result);
370            }
371
372            while in_flight.len() < parallelism && !input_done {
373                match input.next() {
374                    Some(Ok(item)) => {
375                        let index = next_index;
376                        next_index += 1;
377                        match start_ractor_ask(
378                            index,
379                            actor_ref.clone(),
380                            timeout,
381                            item,
382                            Arc::clone(&make_msg),
383                            &mut reply_pool,
384                        ) {
385                            Ok(ask) => in_flight.push(ask),
386                            Err(error) => {
387                                completed.push((index, Err(error)));
388                                input_done = true;
389                            }
390                        }
391                    }
392                    Some(Err(error)) => {
393                        completed.push((next_index, Err(error)));
394                        next_index += 1;
395                        input_done = true;
396                    }
397                    None => input_done = true,
398                }
399            }
400
401            if let Some(result) = take_completed(&mut completed, next_to_emit) {
402                next_to_emit += 1;
403                return Some(result);
404            }
405
406            if in_flight.is_empty() {
407                return None;
408            }
409
410            let ask = wait_for_ready_ask(&mut in_flight, timeout);
411            let index = ask.index;
412            let result = ask.result;
413            recycle_reply_state(ask.state, &mut reply_pool);
414            if index == next_to_emit {
415                next_to_emit += 1;
416                return Some(result);
417            }
418            completed.push((index, result));
419        }
420    }))
421}
422
423fn take_completed<Out>(
424    completed: &mut Vec<(usize, StreamResult<Out>)>,
425    index: usize,
426) -> Option<StreamResult<Out>> {
427    let position = completed
428        .iter()
429        .position(|(completed_index, _)| *completed_index == index)?;
430    Some(completed.swap_remove(position).1)
431}
432
433struct InFlightAsk<Out> {
434    index: usize,
435    state: Option<Arc<ReplyState<Out>>>,
436    /// `None` when `Instant::now() + timeout` would overflow; treated as "never
437    /// time out" rather than panicking on the add.
438    deadline: Option<Instant>,
439}
440
441impl<Out> InFlightAsk<Out> {
442    fn state(&self) -> &Arc<ReplyState<Out>> {
443        self.state.as_ref().expect("in-flight ask has reply state")
444    }
445
446    fn into_state(mut self) -> Arc<ReplyState<Out>> {
447        self.state.take().expect("in-flight ask has reply state")
448    }
449}
450
451impl<Out> Drop for InFlightAsk<Out> {
452    fn drop(&mut self) {
453        if let Some(state) = &self.state {
454            state.close_receiver();
455        }
456    }
457}
458
459struct CompletedAsk<Out> {
460    index: usize,
461    result: StreamResult<Out>,
462    state: Arc<ReplyState<Out>>,
463}
464
465fn start_ractor_ask<In, Msg, Out, F>(
466    index: usize,
467    actor_ref: ActorRef<Msg>,
468    timeout: Duration,
469    input: In,
470    make_msg: Arc<F>,
471    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
472) -> StreamResult<InFlightAsk<Out>>
473where
474    In: Send + 'static,
475    Msg: Message,
476    Out: Send + 'static,
477    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
478{
479    let reply_state = match reply_pool.pop() {
480        // A freshly allocated state is already Pending/open, so only recycled
481        // states need a reset.
482        Some(state) => {
483            state.reset(timeout);
484            state
485        }
486        None => Arc::new(ReplyState::new(timeout)),
487    };
488    let reply_to = ReplyPort::new(Arc::clone(&reply_state));
489    let message =
490        catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
491            StreamError::ActorAskTaskFailed {
492                reason: panic_reason(panic),
493            }
494        })?;
495
496    match actor_ref.cast(message) {
497        Ok(()) => {}
498        Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
499            return Err(StreamError::ActorTerminated);
500        }
501        Err(error) => {
502            return Err(StreamError::ActorAskSendFailed {
503                reason: error.to_string(),
504            });
505        }
506    }
507
508    Ok(InFlightAsk {
509        index,
510        state: Some(reply_state),
511        deadline: Instant::now().checked_add(timeout),
512    })
513}
514
515fn wait_for_ready_ask<Out>(
516    in_flight: &mut Vec<InFlightAsk<Out>>,
517    timeout: Duration,
518) -> CompletedAsk<Out> {
519    let mut idle_spins = 0;
520    let mut idle_yields = 0;
521    let mut time_refresh = 0_u32;
522    let mut now = Instant::now();
523    loop {
524        if time_refresh == 0 {
525            now = Instant::now();
526        }
527        time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
528        if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
529            return ask;
530        }
531
532        if idle_spins < ASK_READY_SPINS {
533            idle_spins += 1;
534            std::hint::spin_loop();
535        } else if idle_yields < ASK_IDLE_YIELDS {
536            idle_yields += 1;
537            time_refresh = 0;
538            thread::yield_now();
539        } else {
540            idle_spins = 0;
541            idle_yields = 0;
542            time_refresh = 0;
543            let current = thread::current();
544            let registered = register_ask_waiters(in_flight, &current);
545            now = Instant::now();
546            if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
547                unregister_ask_waiters(registered, current.id());
548                return ask;
549            }
550            thread::park_timeout(next_ask_park(in_flight, now));
551            unregister_ask_waiters(registered, current.id());
552        }
553    }
554}
555
556fn take_ready_ask<Out>(
557    in_flight: &mut Vec<InFlightAsk<Out>>,
558    timeout: Duration,
559    now: Instant,
560) -> Option<CompletedAsk<Out>> {
561    let mut index = 0;
562    while index < in_flight.len() {
563        match in_flight[index].state().poll() {
564            ReplyPoll::Ready(reply) => {
565                let ask = in_flight.swap_remove(index);
566                return Some(CompletedAsk {
567                    index: ask.index,
568                    result: Ok(reply),
569                    state: ask.into_state(),
570                });
571            }
572            ReplyPoll::Dropped => {
573                let ask = in_flight.swap_remove(index);
574                return Some(CompletedAsk {
575                    index: ask.index,
576                    result: Err(StreamError::ActorAskResponseDropped),
577                    state: ask.into_state(),
578                });
579            }
580            ReplyPoll::Pending => {
581                if in_flight[index]
582                    .deadline
583                    .is_some_and(|deadline| now >= deadline)
584                {
585                    // Close under the slot lock; deliver a reply that raced in
586                    // just before the deadline rather than losing it.
587                    let outcome = in_flight[index].state().close_on_timeout();
588                    let ask = in_flight.swap_remove(index);
589                    let result = match outcome {
590                        ReplyPoll::Ready(reply) => Ok(reply),
591                        ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
592                        ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
593                    };
594                    return Some(CompletedAsk {
595                        index: ask.index,
596                        result,
597                        state: ask.into_state(),
598                    });
599                }
600                index += 1;
601            }
602        }
603    }
604    None
605}
606
607fn register_ask_waiters<Out>(
608    in_flight: &[InFlightAsk<Out>],
609    current: &Thread,
610) -> Vec<Arc<ReplyState<Out>>> {
611    let mut registered = Vec::with_capacity(in_flight.len());
612    for ask in in_flight {
613        ask.state().register_waiter(current.clone());
614        registered.push(Arc::clone(ask.state()));
615    }
616    registered
617}
618
619fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
620    for state in registered {
621        state.unregister_waiter(current_id);
622    }
623}
624
625fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
626    next_ask_deadline_remaining(in_flight, now)
627        .unwrap_or(ASK_MAX_PARK)
628        .min(ASK_MAX_PARK)
629}
630
631fn next_ask_deadline_remaining<Out>(
632    in_flight: &[InFlightAsk<Out>],
633    now: Instant,
634) -> Option<Duration> {
635    in_flight
636        .iter()
637        .filter_map(|ask| ask.deadline)
638        .map(|deadline| deadline.saturating_duration_since(now))
639        .min()
640}
641
642fn recycle_reply_state<Out>(
643    state: Arc<ReplyState<Out>>,
644    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
645) {
646    if Arc::strong_count(&state) == 1 {
647        reply_pool.push(state);
648    }
649}
650
651fn panic_reason(panic: Box<dyn Any + Send>) -> String {
652    if let Some(reason) = panic.downcast_ref::<&str>() {
653        (*reason).to_owned()
654    } else if let Some(reason) = panic.downcast_ref::<String>() {
655        reason.clone()
656    } else {
657        "actor ask task panicked".to_owned()
658    }
659}
660
661pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
662    static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
663        std::sync::OnceLock::new();
664
665    match RUNTIME.get_or_init(|| {
666        tokio::runtime::Builder::new_multi_thread()
667            .thread_name("datum-ractor-runtime")
668            .enable_all()
669            .build()
670            .map_err(|error| format!("ractor runtime failed to start: {error}"))
671    }) {
672        Ok(runtime) => Ok(runtime),
673        Err(error) => Err(StreamError::Failed(error.clone())),
674    }
675}
676
677pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
678where
679    T: Send + 'static,
680    F: std::future::Future<Output = T> + Send + 'static,
681{
682    let runtime = ractor_runtime()?;
683    let result = if tokio::runtime::Handle::try_current().is_ok() {
684        std::thread::spawn(move || runtime.block_on(future))
685            .join()
686            .map_err(|_| StreamError::AbruptTermination)?
687    } else {
688        runtime.block_on(future)
689    };
690    Ok(result)
691}
692
693#[cfg(test)]
694fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
695where
696    T: Send + 'static,
697    F: std::future::Future<Output = T> + Send + 'static,
698{
699    block_on_ractor_runtime(future).expect("ractor ask runtime starts")
700}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705    use crate::stream::{Sink, Source, StreamCompletion};
706    use std::sync::{
707        Arc as StdArc,
708        atomic::{AtomicUsize, Ordering},
709    };
710
711    enum AskTestMessage {
712        Delayed {
713            input: u64,
714            delay: Duration,
715            reply_to: ReplyPort<u64>,
716        },
717        Track {
718            input: u64,
719            reply_to: ReplyPort<u64>,
720        },
721        NeverReply {
722            _reply_to: ReplyPort<u64>,
723        },
724        DropReply {
725            _reply_to: ReplyPort<u64>,
726        },
727        BindTcp {
728            reply_to: ReplyPort<u64>,
729        },
730    }
731
732    #[cfg(feature = "cluster")]
733    impl Message for AskTestMessage {}
734
735    struct AskTestActor;
736
737    struct AskTestState {
738        active: StdArc<AtomicUsize>,
739        max_active: StdArc<AtomicUsize>,
740        held_replies: Vec<ReplyPort<u64>>,
741    }
742
743    impl Actor for AskTestActor {
744        type Msg = AskTestMessage;
745        type State = AskTestState;
746        type Arguments = AskTestState;
747
748        async fn pre_start(
749            &self,
750            _myself: ActorRef<Self::Msg>,
751            args: Self::Arguments,
752        ) -> Result<Self::State, ActorProcessingErr> {
753            Ok(args)
754        }
755
756        async fn handle(
757            &self,
758            _myself: ActorRef<Self::Msg>,
759            message: Self::Msg,
760            state: &mut Self::State,
761        ) -> Result<(), ActorProcessingErr> {
762            match message {
763                AskTestMessage::Delayed {
764                    input,
765                    delay,
766                    reply_to,
767                } => {
768                    ractor::concurrency::spawn(async move {
769                        ractor::concurrency::sleep(delay).await;
770                        let _ = reply_to.send(input);
771                    });
772                }
773                AskTestMessage::Track { input, reply_to } => {
774                    let active = StdArc::clone(&state.active);
775                    let max_active = StdArc::clone(&state.max_active);
776                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
777                    max_active.fetch_max(current, Ordering::SeqCst);
778                    ractor::concurrency::spawn(async move {
779                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
780                        active.fetch_sub(1, Ordering::SeqCst);
781                        let _ = reply_to.send(input);
782                    });
783                }
784                AskTestMessage::NeverReply { _reply_to } => {
785                    state.held_replies.push(_reply_to);
786                }
787                AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
788                AskTestMessage::BindTcp { reply_to } => {
789                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
790                    let port = listener.local_addr()?.port();
791                    let _ = reply_to.send(u64::from(port));
792                }
793            }
794            Ok(())
795        }
796    }
797
798    fn wait<T>(completion: StreamCompletion<T>) -> T {
799        completion.wait().unwrap()
800    }
801
802    fn spawn_test_actor() -> (
803        ActorRef<AskTestMessage>,
804        ractor::concurrency::JoinHandle<()>,
805        StdArc<AtomicUsize>,
806    ) {
807        let active = StdArc::new(AtomicUsize::new(0));
808        let max_active = StdArc::new(AtomicUsize::new(0));
809        let state = AskTestState {
810            active,
811            max_active: StdArc::clone(&max_active),
812            held_replies: Vec::new(),
813        };
814        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
815            Actor::spawn(None, AskTestActor, state)
816                .await
817                .expect("test actor spawns")
818        });
819        (actor_ref, handle, max_active)
820    }
821
822    fn stop_test_actor(
823        actor_ref: ActorRef<AskTestMessage>,
824        handle: ractor::concurrency::JoinHandle<()>,
825    ) {
826        actor_ref.stop(None);
827        block_on_ractor_ask_runtime(async move {
828            handle.await.expect("test actor task joins");
829        });
830    }
831
832    #[test]
833    fn actor_flow_ask_preserves_order_with_parallelism() {
834        let (actor_ref, handle, _) = spawn_test_actor();
835
836        let values = Source::from_iter(0_u64..5)
837            .via(ActorFlow::ask(
838                actor_ref.clone(),
839                5,
840                Duration::from_secs(1),
841                |input, reply_to| AskTestMessage::Delayed {
842                    input,
843                    delay: Duration::from_millis((5 - input) * 10),
844                    reply_to,
845                },
846            ))
847            .run_with(Sink::collect())
848            .unwrap();
849
850        assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
851        stop_test_actor(actor_ref, handle);
852    }
853
854    #[test]
855    fn actor_flow_ask_respects_parallelism() {
856        let (actor_ref, handle, max_active) = spawn_test_actor();
857
858        let values = Source::from_iter(0_u64..8)
859            .via(ActorFlow::ask(
860                actor_ref.clone(),
861                3,
862                Duration::from_secs(1),
863                |input, reply_to| AskTestMessage::Track { input, reply_to },
864            ))
865            .run_with(Sink::collect())
866            .unwrap();
867
868        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
869        assert_eq!(max_active.load(Ordering::SeqCst), 3);
870        stop_test_actor(actor_ref, handle);
871    }
872
873    #[test]
874    fn actor_flow_ask_into_head_returns_value() {
875        // `ActorFlow::ask` does *not* preserve the inline head-terminal hint, so a
876        // bounded eager source feeding `Sink::head` through an ask runs the ask
877        // graph (including the cross-thread reply wait) on a runtime worker rather
878        // than inline on the caller. That keeps `run_with_materializer()`
879        // non-blocking/awaitable: it returns a `StreamCompletion` backed by a
880        // oneshot that resolves once the worker drains the ask round-trip. This
881        // guards that the ask -> head graph still returns the correct first reply
882        // end to end. (The reply is delayed: `Track` sleeps 20 ms on the actor
883        // runtime before replying.)
884        let (actor_ref, handle, _) = spawn_test_actor();
885
886        let head = Source::single(7_u64)
887            .via(ActorFlow::ask(
888                actor_ref.clone(),
889                1,
890                Duration::from_secs(1),
891                |input, reply_to| AskTestMessage::Track { input, reply_to },
892            ))
893            .run_with(Sink::head())
894            .unwrap();
895
896        assert_eq!(wait(head), 7);
897
898        stop_test_actor(actor_ref, handle);
899    }
900
901    #[test]
902    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
903        let (actor_ref, handle, _) = spawn_test_actor();
904
905        let ports = Source::single(0_u64)
906            .via(ActorFlow::ask(
907                actor_ref.clone(),
908                1,
909                Duration::from_secs(1),
910                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
911            ))
912            .run_collect()
913            .unwrap();
914
915        assert_eq!(ports.len(), 1);
916        assert_ne!(ports[0], 0);
917        stop_test_actor(actor_ref, handle);
918    }
919
920    #[test]
921    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
922        // Replies are delayed well past the spin+yield window, so each wait must
923        // reach the park branch and be woken by `ReplyState::send` rather than
924        // busy-yielding until the (long) deadline. Parallelism > 1 also exercises
925        // registering/unregistering several in-flight waiters on one parked
926        // consumer. This is the regression guard for the long-deadline ask path
927        // that previously yielded indefinitely (never parked).
928        let (actor_ref, handle, _) = spawn_test_actor();
929
930        let values = Source::from_iter(0_u64..8)
931            .via(ActorFlow::ask(
932                actor_ref.clone(),
933                4,
934                Duration::from_secs(30),
935                |input, reply_to| AskTestMessage::Delayed {
936                    input,
937                    delay: Duration::from_millis(25),
938                    reply_to,
939                },
940            ))
941            .run_with(Sink::collect())
942            .unwrap();
943
944        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
945        stop_test_actor(actor_ref, handle);
946    }
947
948    #[test]
949    fn dropped_in_flight_ask_closes_reply_port() {
950        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
951        let reply_to = ReplyPort::new(StdArc::clone(&state));
952        let ask = InFlightAsk {
953            index: 0,
954            state: Some(StdArc::clone(&state)),
955            deadline: None,
956        };
957
958        drop(ask);
959
960        assert!(reply_to.is_closed());
961        assert_eq!(reply_to.send(1), Err(ReplySendError));
962    }
963
964    #[test]
965    fn actor_flow_ask_times_out() {
966        let (actor_ref, handle, _) = spawn_test_actor();
967        let timeout = Duration::from_millis(10);
968
969        let result = Source::single(1_u64)
970            .via(ActorFlow::ask(
971                actor_ref.clone(),
972                1,
973                timeout,
974                |_input, reply_to| AskTestMessage::NeverReply {
975                    _reply_to: reply_to,
976                },
977            ))
978            .run_collect();
979
980        assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
981        stop_test_actor(actor_ref, handle);
982    }
983
984    #[test]
985    fn actor_flow_ask_fails_when_actor_is_stopped() {
986        let (actor_ref, handle, _) = spawn_test_actor();
987        actor_ref.stop(None);
988        block_on_ractor_ask_runtime(async move {
989            handle.await.expect("test actor task joins");
990        });
991
992        let result = Source::single(1_u64)
993            .via(ActorFlow::ask(
994                actor_ref,
995                1,
996                Duration::from_secs(1),
997                |input, reply_to| AskTestMessage::Delayed {
998                    input,
999                    delay: Duration::ZERO,
1000                    reply_to,
1001                },
1002            ))
1003            .run_collect();
1004
1005        assert_eq!(result, Err(StreamError::ActorTerminated));
1006    }
1007
1008    #[test]
1009    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
1010        let (actor_ref, handle, _) = spawn_test_actor();
1011
1012        let result = Source::single(1_u64)
1013            .via(ActorFlow::ask(
1014                actor_ref.clone(),
1015                1,
1016                Duration::from_secs(1),
1017                |_input, reply_to| AskTestMessage::DropReply {
1018                    _reply_to: reply_to,
1019                },
1020            ))
1021            .run_collect();
1022
1023        assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
1024        stop_test_actor(actor_ref, handle);
1025    }
1026
1027    #[test]
1028    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
1029        let (actor_ref, handle, _) = spawn_test_actor();
1030
1031        let result = Source::single(1_u64)
1032            .via(ActorFlow::ask(
1033                actor_ref.clone(),
1034                1,
1035                Duration::from_secs(1),
1036                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
1037                    panic!("message builder failed");
1038                },
1039            ))
1040            .run_collect();
1041
1042        assert_eq!(
1043            result,
1044            Err(StreamError::ActorAskTaskFailed {
1045                reason: "message builder failed".to_owned()
1046            })
1047        );
1048        stop_test_actor(actor_ref, handle);
1049    }
1050}