Skip to main content

datum/
actor.rs

1//! Actor interop: Ractor-backed stream bridges plus the remote StreamRefs seam.
2//!
3//! This module hosts [`ActorFlow::ask`] (and its status/context variants), the
4//! actor-backed [`ActorSource`]/[`ActorSink`]/[`ActorPubSub`] bridges, the
5//! one-shot [`ReplyPort`] reply handle, and the local-or-remote [`StreamRefs`]
6//! (`SourceRef`/`SinkRef`) handles. Submodules:
7//!
8//! - `interop` — `ask_with_status`/`ask_with_context`/`watch`, the actor
9//!   source/sink constructors, and the `pg`-backed pub/sub bridge.
10//! - `stream_ref` — `StreamRefs`: a same-process direct splice on the local
11//!   path, with an optional `cluster`-feature Ractor remote path.
12//! - [`stream_ref_proto`] — a transport-agnostic protobuf StreamRefs seam (no
13//!   actor transport) that `datum-net` drives over TCP/QUIC.
14//!
15//! The ask path uses a spin-then-park wait loop whose tuning constants are
16//! benchmark-load-bearing; see `CLAUDE.md` and this directory's `AGENTS.md`
17//! before touching it.
18
19use std::{
20    any::Any,
21    error::Error,
22    fmt,
23    panic::{AssertUnwindSafe, catch_unwind},
24    sync::{
25        Arc, Mutex, MutexGuard,
26        atomic::{AtomicBool, AtomicU8, Ordering},
27    },
28    thread::{self, Thread, ThreadId},
29    time::{Duration, Instant},
30};
31
32use crate::stream::{BoxStream, Flow, NotUsed, StreamError, StreamResult};
33
34pub use ractor::{Actor, ActorProcessingErr, ActorRef, Message};
35
36mod interop;
37mod stream_ref;
38pub mod stream_ref_proto;
39pub use interop::{
40    ActorPubSub, ActorSink, ActorSinkBackpressureMessage, ActorSinkMessage, ActorSource,
41    ActorSourceMessage, ActorStatus, WatchEvent,
42};
43pub use stream_ref::{SinkRef, SourceRef, StreamRefSettings, StreamRefs};
44pub use stream_ref_proto::{
45    StreamRefFrame, StreamRefId, StreamRefMessage, StreamRefOutbound, StreamRefPayload,
46    StreamRefPayloadBatch, StreamRefPayloadBytes, StreamRefProtoConsumer, StreamRefProtoEndpoint,
47    StreamRefProtoProducer,
48};
49
50pub type ActorResult<T = ()> = Result<T, ActorProcessingErr>;
51
52const ASK_READY_SPINS: usize = 256;
53/// Bounded number of `yield_now()` calls between the spin window and parking.
54/// This is deliberately deadline-independent: parking is gated only by this
55/// fixed transition budget, and the *park duration* is what respects the
56/// deadline (see [`next_ask_park`]). A fast reply is caught during the spin or
57/// yield window; a genuinely slow reply (or a long, idle timeout) parks and is
58/// woken by `ReplyState::send`/`drop_sender`/`close_on_timeout` rather than
59/// burning a core busy-yielding until the deadline.
60const ASK_IDLE_YIELDS: usize = 64;
61const ASK_MAX_PARK: Duration = Duration::from_millis(1);
62const ASK_TIME_REFRESH_ITERS: u32 = 64;
63
64// Fast-path reply gate values. `poll()` reads this atomically and only takes
65// the slot mutex once a reply (or drop) is actually present, so the busy-spin
66// consumer does not contend with the replying actor on every spin iteration.
67const REPLY_PENDING: u8 = 0;
68const REPLY_READY: u8 = 1;
69const REPLY_DROPPED: u8 = 2;
70
71/// Collection of actor-aware stream flows.
72pub struct ActorFlow;
73
74impl ActorFlow {
75    /// Sends each stream element to an actor using a request/reply message.
76    ///
77    /// The returned flow preserves input order while allowing up to `parallelism`
78    /// requests in flight. It fails the stream if a request times out, the actor
79    /// cannot be sent to, or the reply port is dropped before a response arrives.
80    #[must_use]
81    pub fn ask<In, Msg, Out, F>(
82        actor_ref: ActorRef<Msg>,
83        parallelism: usize,
84        timeout: Duration,
85        make_msg: F,
86    ) -> Flow<In, Out, NotUsed>
87    where
88        In: Send + 'static,
89        Msg: Message,
90        Out: Send + 'static,
91        F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
92    {
93        ask_flow(actor_ref, parallelism, timeout, Arc::new(make_msg))
94    }
95}
96
97/// One-shot reply handle supplied to actor ask messages.
98///
99/// This wrapper keeps Datum's stream-facing API from exposing Ractor's concrete
100/// RPC port type. The current implementation is local/Ractor-backed, and the
101/// wrapper leaves room for a later transport-specific reply implementation.
102pub struct ReplyPort<T> {
103    inner: Arc<ReplyState<T>>,
104    active: bool,
105}
106
107#[derive(Debug)]
108struct ReplyState<T> {
109    timeout: Duration,
110    receiver_closed: AtomicBool,
111    gate: AtomicU8,
112    slot: Mutex<ReplySlotState<T>>,
113}
114
115#[derive(Debug)]
116struct ReplySlotState<T> {
117    value: ReplySlot<T>,
118    waiter: Option<Thread>,
119}
120
121#[derive(Debug)]
122enum ReplySlot<T> {
123    Pending,
124    Ready(T),
125    Dropped,
126}
127
128enum ReplyPoll<T> {
129    Pending,
130    Ready(T),
131    Dropped,
132}
133
134impl<T> fmt::Debug for ReplyPort<T> {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        f.debug_struct("ReplyPort")
137            .field("timeout", &self.timeout())
138            .field("closed", &self.is_closed())
139            .finish_non_exhaustive()
140    }
141}
142
143/// Error returned when an actor tries to answer an ask whose receiver is gone.
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub struct ReplySendError;
146
147impl fmt::Display for ReplySendError {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        f.write_str("actor reply receiver dropped")
150    }
151}
152
153impl Error for ReplySendError {}
154
155impl<T> ReplyPort<T> {
156    fn new(inner: Arc<ReplyState<T>>) -> Self {
157        Self {
158            inner,
159            active: true,
160        }
161    }
162
163    #[must_use]
164    pub fn timeout(&self) -> Option<Duration> {
165        Some(self.inner.timeout)
166    }
167
168    #[must_use]
169    pub fn is_closed(&self) -> bool {
170        self.inner.is_closed()
171    }
172
173    pub fn send(mut self, reply: T) -> Result<(), ReplySendError> {
174        self.active = false;
175        self.inner.send(reply)
176    }
177}
178
179impl<T> Drop for ReplyPort<T> {
180    fn drop(&mut self) {
181        if self.active {
182            self.inner.drop_sender();
183        }
184    }
185}
186
187impl<T> ReplyState<T> {
188    fn new(timeout: Duration) -> Self {
189        Self {
190            timeout,
191            receiver_closed: AtomicBool::new(false),
192            gate: AtomicU8::new(REPLY_PENDING),
193            slot: Mutex::new(ReplySlotState {
194                value: ReplySlot::Pending,
195                waiter: None,
196            }),
197        }
198    }
199
200    /// Locks the reply slot, recovering the guard if a previous holder panicked
201    /// while holding it (e.g. a user reply value whose `Drop` panicked). The
202    /// slot is a simple three-state enum with no cross-field invariants, so
203    /// recovering from poison cannot produce unsound or wrong-result behavior
204    /// and avoids escalating one localized panic into a stream-wide crash.
205    fn lock_slot(&self) -> MutexGuard<'_, ReplySlotState<T>> {
206        self.slot
207            .lock()
208            .unwrap_or_else(|poison| poison.into_inner())
209    }
210
211    fn reset(&self, timeout: Duration) {
212        self.receiver_closed.store(false, Ordering::Release);
213        self.gate.store(REPLY_PENDING, Ordering::Release);
214        let mut slot = self.lock_slot();
215        slot.value = ReplySlot::Pending;
216        slot.waiter = None;
217        debug_assert_eq!(self.timeout, timeout);
218    }
219
220    fn is_closed(&self) -> bool {
221        self.receiver_closed.load(Ordering::Acquire)
222    }
223
224    fn send(&self, reply: T) -> Result<(), ReplySendError> {
225        if self.is_closed() {
226            return Err(ReplySendError);
227        }
228
229        let mut slot = self.lock_slot();
230        if self.is_closed() {
231            return Err(ReplySendError);
232        }
233
234        if !matches!(slot.value, ReplySlot::Pending) {
235            return Err(ReplySendError);
236        }
237
238        slot.value = ReplySlot::Ready(reply);
239        let waiter = slot.waiter.clone();
240        // Publish AFTER the value write; the consumer's Acquire load of the
241        // gate synchronizes-with this store.
242        self.gate.store(REPLY_READY, Ordering::Release);
243        drop(slot);
244        wake_waiter(waiter);
245        Ok(())
246    }
247
248    fn poll(&self) -> ReplyPoll<T> {
249        if self.gate.load(Ordering::Acquire) == REPLY_PENDING {
250            // Lock-free fast path: nothing has been delivered yet.
251            return ReplyPoll::Pending;
252        }
253        self.take_locked()
254    }
255
256    /// Closes the receiver under the slot lock so a concurrent `send()` observes
257    /// the closure on its under-lock recheck and reports `ReplySendError`
258    /// instead of disagreeing with a reported timeout. If a reply (or drop)
259    /// raced in just before the deadline, it is delivered rather than lost.
260    fn close_on_timeout(&self) -> ReplyPoll<T> {
261        let mut slot = self.lock_slot();
262        self.receiver_closed.store(true, Ordering::Release);
263        let outcome = match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
264            ReplySlot::Pending => ReplyPoll::Pending,
265            ReplySlot::Ready(reply) => ReplyPoll::Ready(reply),
266            ReplySlot::Dropped => ReplyPoll::Dropped,
267        };
268        let waiter = slot.waiter.take();
269        drop(slot);
270        wake_waiter(waiter);
271        outcome
272    }
273
274    fn take_locked(&self) -> ReplyPoll<T> {
275        let mut slot = self.lock_slot();
276        slot.waiter = None;
277        match std::mem::replace(&mut slot.value, ReplySlot::Pending) {
278            ReplySlot::Pending => ReplyPoll::Pending,
279            ReplySlot::Ready(reply) => {
280                self.close_receiver();
281                ReplyPoll::Ready(reply)
282            }
283            ReplySlot::Dropped => {
284                self.close_receiver();
285                ReplyPoll::Dropped
286            }
287        }
288    }
289
290    fn drop_sender(&self) {
291        let mut slot = self.lock_slot();
292        if matches!(slot.value, ReplySlot::Pending) {
293            slot.value = ReplySlot::Dropped;
294            let waiter = slot.waiter.clone();
295            self.gate.store(REPLY_DROPPED, Ordering::Release);
296            drop(slot);
297            wake_waiter(waiter);
298        }
299    }
300
301    fn close_receiver(&self) {
302        self.receiver_closed.store(true, Ordering::Release);
303    }
304
305    fn register_waiter(&self, waiter: Thread) {
306        self.lock_slot().waiter = Some(waiter);
307    }
308
309    fn unregister_waiter(&self, waiter_id: ThreadId) {
310        let mut slot = self.lock_slot();
311        if slot
312            .waiter
313            .as_ref()
314            .is_some_and(|thread| thread.id() == waiter_id)
315        {
316            slot.waiter = None;
317        }
318    }
319}
320
321fn wake_waiter(waiter: Option<Thread>) {
322    if let Some(waiter) = waiter {
323        waiter.unpark();
324    }
325}
326
327fn ask_flow<In, Msg, Out, F>(
328    actor_ref: ActorRef<Msg>,
329    parallelism: usize,
330    timeout: Duration,
331    make_msg: Arc<F>,
332) -> Flow<In, Out>
333where
334    In: Send + 'static,
335    Msg: Message,
336    Out: Send + 'static,
337    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
338{
339    assert!(
340        parallelism > 0,
341        "ActorFlow::ask parallelism must be greater than zero"
342    );
343    // Deliberately `from_transform`, not `from_preserving_transform`: an ask must
344    // not inherit the inline head-terminal hint. If it did, a bounded eager source
345    // feeding `Sink::head` would run the ask's blocking cross-thread reply wait
346    // inline on the caller during materialization, which breaks the non-blocking /
347    // awaitable contract of `StreamCompletion` and can deadlock if
348    // `run_with_materializer()` is `.await`ed on a runtime thread the actor needs
349    // to make progress. Ask graphs always run on a worker.
350    Flow::from_transform(move |input| {
351        ask_ractor_ordered(
352            input,
353            actor_ref.clone(),
354            parallelism,
355            timeout,
356            Arc::clone(&make_msg),
357        )
358    })
359}
360
361fn ask_ractor_ordered<In, Msg, Out, F>(
362    mut input: BoxStream<In>,
363    actor_ref: ActorRef<Msg>,
364    parallelism: usize,
365    timeout: Duration,
366    make_msg: Arc<F>,
367) -> BoxStream<Out>
368where
369    In: Send + 'static,
370    Msg: Message,
371    Out: Send + 'static,
372    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
373{
374    let mut in_flight = Vec::<InFlightAsk<Out>>::with_capacity(parallelism);
375    let mut next_index = 0_usize;
376    let mut next_to_emit = 0_usize;
377    let mut completed = Vec::with_capacity(parallelism);
378    let mut reply_pool = Vec::with_capacity(parallelism);
379    let mut input_done = false;
380
381    Box::new(std::iter::from_fn(move || {
382        loop {
383            if let Some(result) = take_completed(&mut completed, next_to_emit) {
384                next_to_emit += 1;
385                return Some(result);
386            }
387
388            while in_flight.len() < parallelism && !input_done {
389                match input.next() {
390                    Some(Ok(item)) => {
391                        let index = next_index;
392                        next_index += 1;
393                        match start_ractor_ask(
394                            index,
395                            actor_ref.clone(),
396                            timeout,
397                            item,
398                            Arc::clone(&make_msg),
399                            &mut reply_pool,
400                        ) {
401                            Ok(ask) => in_flight.push(ask),
402                            Err(error) => {
403                                completed.push((index, Err(error)));
404                                input_done = true;
405                            }
406                        }
407                    }
408                    Some(Err(error)) => {
409                        completed.push((next_index, Err(error)));
410                        next_index += 1;
411                        input_done = true;
412                    }
413                    None => input_done = true,
414                }
415            }
416
417            if let Some(result) = take_completed(&mut completed, next_to_emit) {
418                next_to_emit += 1;
419                return Some(result);
420            }
421
422            if in_flight.is_empty() {
423                return None;
424            }
425
426            let ask = wait_for_ready_ask(&mut in_flight, timeout);
427            let index = ask.index;
428            let result = ask.result;
429            recycle_reply_state(ask.state, &mut reply_pool);
430            if index == next_to_emit {
431                next_to_emit += 1;
432                return Some(result);
433            }
434            completed.push((index, result));
435        }
436    }))
437}
438
439fn take_completed<Out>(
440    completed: &mut Vec<(usize, StreamResult<Out>)>,
441    index: usize,
442) -> Option<StreamResult<Out>> {
443    let position = completed
444        .iter()
445        .position(|(completed_index, _)| *completed_index == index)?;
446    Some(completed.swap_remove(position).1)
447}
448
449struct InFlightAsk<Out> {
450    index: usize,
451    state: Option<Arc<ReplyState<Out>>>,
452    /// `None` when `Instant::now() + timeout` would overflow; treated as "never
453    /// time out" rather than panicking on the add.
454    deadline: Option<Instant>,
455}
456
457impl<Out> InFlightAsk<Out> {
458    fn state(&self) -> &Arc<ReplyState<Out>> {
459        self.state.as_ref().expect("in-flight ask has reply state")
460    }
461
462    fn into_state(mut self) -> Arc<ReplyState<Out>> {
463        self.state.take().expect("in-flight ask has reply state")
464    }
465}
466
467impl<Out> Drop for InFlightAsk<Out> {
468    fn drop(&mut self) {
469        if let Some(state) = &self.state {
470            state.close_receiver();
471        }
472    }
473}
474
475struct CompletedAsk<Out> {
476    index: usize,
477    result: StreamResult<Out>,
478    state: Arc<ReplyState<Out>>,
479}
480
481fn start_ractor_ask<In, Msg, Out, F>(
482    index: usize,
483    actor_ref: ActorRef<Msg>,
484    timeout: Duration,
485    input: In,
486    make_msg: Arc<F>,
487    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
488) -> StreamResult<InFlightAsk<Out>>
489where
490    In: Send + 'static,
491    Msg: Message,
492    Out: Send + 'static,
493    F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
494{
495    let reply_state = match reply_pool.pop() {
496        // A freshly allocated state is already Pending/open, so only recycled
497        // states need a reset.
498        Some(state) => {
499            state.reset(timeout);
500            state
501        }
502        None => Arc::new(ReplyState::new(timeout)),
503    };
504    let reply_to = ReplyPort::new(Arc::clone(&reply_state));
505    let message =
506        catch_unwind(AssertUnwindSafe(|| make_msg(input, reply_to))).map_err(|panic| {
507            StreamError::ActorAskTaskFailed {
508                reason: panic_reason(panic),
509            }
510        })?;
511
512    match actor_ref.cast(message) {
513        Ok(()) => {}
514        Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
515            return Err(StreamError::ActorTerminated);
516        }
517        Err(error) => {
518            return Err(StreamError::ActorAskSendFailed {
519                reason: error.to_string(),
520            });
521        }
522    }
523
524    Ok(InFlightAsk {
525        index,
526        state: Some(reply_state),
527        deadline: Instant::now().checked_add(timeout),
528    })
529}
530
531fn wait_for_ready_ask<Out>(
532    in_flight: &mut Vec<InFlightAsk<Out>>,
533    timeout: Duration,
534) -> CompletedAsk<Out> {
535    let mut idle_spins = 0;
536    let mut idle_yields = 0;
537    let mut time_refresh = 0_u32;
538    let mut now = Instant::now();
539    loop {
540        if time_refresh == 0 {
541            now = Instant::now();
542        }
543        time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
544        if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
545            return ask;
546        }
547
548        if idle_spins < ASK_READY_SPINS {
549            idle_spins += 1;
550            std::hint::spin_loop();
551        } else if idle_yields < ASK_IDLE_YIELDS {
552            idle_yields += 1;
553            time_refresh = 0;
554            thread::yield_now();
555        } else {
556            idle_spins = 0;
557            idle_yields = 0;
558            time_refresh = 0;
559            let current = thread::current();
560            let registered = register_ask_waiters(in_flight, &current);
561            now = Instant::now();
562            if let Some(ask) = take_ready_ask(in_flight, timeout, now) {
563                unregister_ask_waiters(registered, current.id());
564                return ask;
565            }
566            thread::park_timeout(next_ask_park(in_flight, now));
567            unregister_ask_waiters(registered, current.id());
568        }
569    }
570}
571
572fn take_ready_ask<Out>(
573    in_flight: &mut Vec<InFlightAsk<Out>>,
574    timeout: Duration,
575    now: Instant,
576) -> Option<CompletedAsk<Out>> {
577    let mut index = 0;
578    while index < in_flight.len() {
579        match in_flight[index].state().poll() {
580            ReplyPoll::Ready(reply) => {
581                let ask = in_flight.swap_remove(index);
582                return Some(CompletedAsk {
583                    index: ask.index,
584                    result: Ok(reply),
585                    state: ask.into_state(),
586                });
587            }
588            ReplyPoll::Dropped => {
589                let ask = in_flight.swap_remove(index);
590                return Some(CompletedAsk {
591                    index: ask.index,
592                    result: Err(StreamError::ActorAskResponseDropped),
593                    state: ask.into_state(),
594                });
595            }
596            ReplyPoll::Pending => {
597                if in_flight[index]
598                    .deadline
599                    .is_some_and(|deadline| now >= deadline)
600                {
601                    // Close under the slot lock; deliver a reply that raced in
602                    // just before the deadline rather than losing it.
603                    let outcome = in_flight[index].state().close_on_timeout();
604                    let ask = in_flight.swap_remove(index);
605                    let result = match outcome {
606                        ReplyPoll::Ready(reply) => Ok(reply),
607                        ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
608                        ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
609                    };
610                    return Some(CompletedAsk {
611                        index: ask.index,
612                        result,
613                        state: ask.into_state(),
614                    });
615                }
616                index += 1;
617            }
618        }
619    }
620    None
621}
622
623fn register_ask_waiters<Out>(
624    in_flight: &[InFlightAsk<Out>],
625    current: &Thread,
626) -> Vec<Arc<ReplyState<Out>>> {
627    let mut registered = Vec::with_capacity(in_flight.len());
628    for ask in in_flight {
629        ask.state().register_waiter(current.clone());
630        registered.push(Arc::clone(ask.state()));
631    }
632    registered
633}
634
635fn unregister_ask_waiters<Out>(registered: Vec<Arc<ReplyState<Out>>>, current_id: ThreadId) {
636    for state in registered {
637        state.unregister_waiter(current_id);
638    }
639}
640
641fn next_ask_park<Out>(in_flight: &[InFlightAsk<Out>], now: Instant) -> Duration {
642    next_ask_deadline_remaining(in_flight, now)
643        .unwrap_or(ASK_MAX_PARK)
644        .min(ASK_MAX_PARK)
645}
646
647fn next_ask_deadline_remaining<Out>(
648    in_flight: &[InFlightAsk<Out>],
649    now: Instant,
650) -> Option<Duration> {
651    in_flight
652        .iter()
653        .filter_map(|ask| ask.deadline)
654        .map(|deadline| deadline.saturating_duration_since(now))
655        .min()
656}
657
658fn recycle_reply_state<Out>(
659    state: Arc<ReplyState<Out>>,
660    reply_pool: &mut Vec<Arc<ReplyState<Out>>>,
661) {
662    if Arc::strong_count(&state) == 1 {
663        reply_pool.push(state);
664    }
665}
666
667fn panic_reason(panic: Box<dyn Any + Send>) -> String {
668    if let Some(reason) = panic.downcast_ref::<&str>() {
669        (*reason).to_owned()
670    } else if let Some(reason) = panic.downcast_ref::<String>() {
671        reason.clone()
672    } else {
673        "actor ask task panicked".to_owned()
674    }
675}
676
677pub(crate) fn ractor_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
678    static RUNTIME: std::sync::OnceLock<Result<tokio::runtime::Runtime, String>> =
679        std::sync::OnceLock::new();
680
681    match RUNTIME.get_or_init(|| {
682        tokio::runtime::Builder::new_multi_thread()
683            .thread_name("datum-ractor-runtime")
684            .enable_all()
685            .build()
686            .map_err(|error| format!("ractor runtime failed to start: {error}"))
687    }) {
688        Ok(runtime) => Ok(runtime),
689        Err(error) => Err(StreamError::Failed(error.clone())),
690    }
691}
692
693pub(crate) fn block_on_ractor_runtime<T, F>(future: F) -> StreamResult<T>
694where
695    T: Send + 'static,
696    F: std::future::Future<Output = T> + Send + 'static,
697{
698    let runtime = ractor_runtime()?;
699    let result = if tokio::runtime::Handle::try_current().is_ok() {
700        std::thread::spawn(move || runtime.block_on(future))
701            .join()
702            .map_err(|_| StreamError::AbruptTermination)?
703    } else {
704        runtime.block_on(future)
705    };
706    Ok(result)
707}
708
709#[cfg(test)]
710fn block_on_ractor_ask_runtime<T, F>(future: F) -> T
711where
712    T: Send + 'static,
713    F: std::future::Future<Output = T> + Send + 'static,
714{
715    block_on_ractor_runtime(future).expect("ractor ask runtime starts")
716}
717
718#[cfg(test)]
719mod tests {
720    use super::*;
721    use crate::stream::{Sink, Source, StreamCompletion};
722    use std::sync::{
723        Arc as StdArc,
724        atomic::{AtomicUsize, Ordering},
725    };
726
727    enum AskTestMessage {
728        Delayed {
729            input: u64,
730            delay: Duration,
731            reply_to: ReplyPort<u64>,
732        },
733        Track {
734            input: u64,
735            reply_to: ReplyPort<u64>,
736        },
737        NeverReply {
738            _reply_to: ReplyPort<u64>,
739        },
740        DropReply {
741            _reply_to: ReplyPort<u64>,
742        },
743        BindTcp {
744            reply_to: ReplyPort<u64>,
745        },
746    }
747
748    #[cfg(feature = "cluster")]
749    impl Message for AskTestMessage {}
750
751    struct AskTestActor;
752
753    struct AskTestState {
754        active: StdArc<AtomicUsize>,
755        max_active: StdArc<AtomicUsize>,
756        held_replies: Vec<ReplyPort<u64>>,
757    }
758
759    impl Actor for AskTestActor {
760        type Msg = AskTestMessage;
761        type State = AskTestState;
762        type Arguments = AskTestState;
763
764        async fn pre_start(
765            &self,
766            _myself: ActorRef<Self::Msg>,
767            args: Self::Arguments,
768        ) -> Result<Self::State, ActorProcessingErr> {
769            Ok(args)
770        }
771
772        async fn handle(
773            &self,
774            _myself: ActorRef<Self::Msg>,
775            message: Self::Msg,
776            state: &mut Self::State,
777        ) -> Result<(), ActorProcessingErr> {
778            match message {
779                AskTestMessage::Delayed {
780                    input,
781                    delay,
782                    reply_to,
783                } => {
784                    ractor::concurrency::spawn(async move {
785                        ractor::concurrency::sleep(delay).await;
786                        let _ = reply_to.send(input);
787                    });
788                }
789                AskTestMessage::Track { input, reply_to } => {
790                    let active = StdArc::clone(&state.active);
791                    let max_active = StdArc::clone(&state.max_active);
792                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
793                    max_active.fetch_max(current, Ordering::SeqCst);
794                    ractor::concurrency::spawn(async move {
795                        ractor::concurrency::sleep(Duration::from_millis(20)).await;
796                        active.fetch_sub(1, Ordering::SeqCst);
797                        let _ = reply_to.send(input);
798                    });
799                }
800                AskTestMessage::NeverReply { _reply_to } => {
801                    state.held_replies.push(_reply_to);
802                }
803                AskTestMessage::DropReply { _reply_to } => drop(_reply_to),
804                AskTestMessage::BindTcp { reply_to } => {
805                    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
806                    let port = listener.local_addr()?.port();
807                    let _ = reply_to.send(u64::from(port));
808                }
809            }
810            Ok(())
811        }
812    }
813
814    fn wait<T>(completion: StreamCompletion<T>) -> T {
815        completion.wait().unwrap()
816    }
817
818    fn spawn_test_actor() -> (
819        ActorRef<AskTestMessage>,
820        ractor::concurrency::JoinHandle<()>,
821        StdArc<AtomicUsize>,
822    ) {
823        let active = StdArc::new(AtomicUsize::new(0));
824        let max_active = StdArc::new(AtomicUsize::new(0));
825        let state = AskTestState {
826            active,
827            max_active: StdArc::clone(&max_active),
828            held_replies: Vec::new(),
829        };
830        let (actor_ref, handle) = block_on_ractor_ask_runtime(async move {
831            Actor::spawn(None, AskTestActor, state)
832                .await
833                .expect("test actor spawns")
834        });
835        (actor_ref, handle, max_active)
836    }
837
838    fn stop_test_actor(
839        actor_ref: ActorRef<AskTestMessage>,
840        handle: ractor::concurrency::JoinHandle<()>,
841    ) {
842        actor_ref.stop(None);
843        block_on_ractor_ask_runtime(async move {
844            handle.await.expect("test actor task joins");
845        });
846    }
847
848    #[test]
849    fn actor_flow_ask_preserves_order_with_parallelism() {
850        let (actor_ref, handle, _) = spawn_test_actor();
851
852        let values = Source::from_iter(0_u64..5)
853            .via(ActorFlow::ask(
854                actor_ref.clone(),
855                5,
856                Duration::from_secs(1),
857                |input, reply_to| AskTestMessage::Delayed {
858                    input,
859                    delay: Duration::from_millis((5 - input) * 10),
860                    reply_to,
861                },
862            ))
863            .run_with(Sink::collect())
864            .unwrap();
865
866        assert_eq!(wait(values), vec![0, 1, 2, 3, 4]);
867        stop_test_actor(actor_ref, handle);
868    }
869
870    #[test]
871    fn actor_flow_ask_respects_parallelism() {
872        let (actor_ref, handle, max_active) = spawn_test_actor();
873
874        let values = Source::from_iter(0_u64..8)
875            .via(ActorFlow::ask(
876                actor_ref.clone(),
877                3,
878                Duration::from_secs(1),
879                |input, reply_to| AskTestMessage::Track { input, reply_to },
880            ))
881            .run_with(Sink::collect())
882            .unwrap();
883
884        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
885        assert_eq!(max_active.load(Ordering::SeqCst), 3);
886        stop_test_actor(actor_ref, handle);
887    }
888
889    #[test]
890    fn actor_flow_ask_into_head_returns_value() {
891        // `ActorFlow::ask` does *not* preserve the inline head-terminal hint, so a
892        // bounded eager source feeding `Sink::head` through an ask runs the ask
893        // graph (including the cross-thread reply wait) on a runtime worker rather
894        // than inline on the caller. That keeps `run_with_materializer()`
895        // non-blocking/awaitable: it returns a `StreamCompletion` backed by a
896        // oneshot that resolves once the worker drains the ask round-trip. This
897        // guards that the ask -> head graph still returns the correct first reply
898        // end to end. (The reply is delayed: `Track` sleeps 20 ms on the actor
899        // runtime before replying.)
900        let (actor_ref, handle, _) = spawn_test_actor();
901
902        let head = Source::single(7_u64)
903            .via(ActorFlow::ask(
904                actor_ref.clone(),
905                1,
906                Duration::from_secs(1),
907                |input, reply_to| AskTestMessage::Track { input, reply_to },
908            ))
909            .run_with(Sink::head())
910            .unwrap();
911
912        assert_eq!(wait(head), 7);
913
914        stop_test_actor(actor_ref, handle);
915    }
916
917    #[test]
918    fn actor_flow_ask_actor_handler_can_use_tokio_io() {
919        let (actor_ref, handle, _) = spawn_test_actor();
920
921        let ports = Source::single(0_u64)
922            .via(ActorFlow::ask(
923                actor_ref.clone(),
924                1,
925                Duration::from_secs(1),
926                |_input, reply_to| AskTestMessage::BindTcp { reply_to },
927            ))
928            .run_collect()
929            .unwrap();
930
931        assert_eq!(ports.len(), 1);
932        assert_ne!(ports[0], 0);
933        stop_test_actor(actor_ref, handle);
934    }
935
936    #[test]
937    fn actor_flow_ask_parks_for_delayed_reply_under_long_timeout() {
938        // Replies are delayed well past the spin+yield window, so each wait must
939        // reach the park branch and be woken by `ReplyState::send` rather than
940        // busy-yielding until the (long) deadline. Parallelism > 1 also exercises
941        // registering/unregistering several in-flight waiters on one parked
942        // consumer. This is the regression guard for the long-deadline ask path
943        // that previously yielded indefinitely (never parked).
944        let (actor_ref, handle, _) = spawn_test_actor();
945
946        let values = Source::from_iter(0_u64..8)
947            .via(ActorFlow::ask(
948                actor_ref.clone(),
949                4,
950                Duration::from_secs(30),
951                |input, reply_to| AskTestMessage::Delayed {
952                    input,
953                    delay: Duration::from_millis(25),
954                    reply_to,
955                },
956            ))
957            .run_with(Sink::collect())
958            .unwrap();
959
960        assert_eq!(wait(values), (0_u64..8).collect::<Vec<_>>());
961        stop_test_actor(actor_ref, handle);
962    }
963
964    #[test]
965    fn dropped_in_flight_ask_closes_reply_port() {
966        let state = StdArc::new(ReplyState::new(Duration::from_secs(1)));
967        let reply_to = ReplyPort::new(StdArc::clone(&state));
968        let ask = InFlightAsk {
969            index: 0,
970            state: Some(StdArc::clone(&state)),
971            deadline: None,
972        };
973
974        drop(ask);
975
976        assert!(reply_to.is_closed());
977        assert_eq!(reply_to.send(1), Err(ReplySendError));
978    }
979
980    #[test]
981    fn actor_flow_ask_times_out() {
982        let (actor_ref, handle, _) = spawn_test_actor();
983        let timeout = Duration::from_millis(10);
984
985        let result = Source::single(1_u64)
986            .via(ActorFlow::ask(
987                actor_ref.clone(),
988                1,
989                timeout,
990                |_input, reply_to| AskTestMessage::NeverReply {
991                    _reply_to: reply_to,
992                },
993            ))
994            .run_collect();
995
996        assert_eq!(result, Err(StreamError::ActorAskTimeout { timeout }));
997        stop_test_actor(actor_ref, handle);
998    }
999
1000    #[test]
1001    fn actor_flow_ask_fails_when_actor_is_stopped() {
1002        let (actor_ref, handle, _) = spawn_test_actor();
1003        actor_ref.stop(None);
1004        block_on_ractor_ask_runtime(async move {
1005            handle.await.expect("test actor task joins");
1006        });
1007
1008        let result = Source::single(1_u64)
1009            .via(ActorFlow::ask(
1010                actor_ref,
1011                1,
1012                Duration::from_secs(1),
1013                |input, reply_to| AskTestMessage::Delayed {
1014                    input,
1015                    delay: Duration::ZERO,
1016                    reply_to,
1017                },
1018            ))
1019            .run_collect();
1020
1021        assert_eq!(result, Err(StreamError::ActorTerminated));
1022    }
1023
1024    #[test]
1025    fn actor_flow_ask_fails_when_reply_port_is_dropped() {
1026        let (actor_ref, handle, _) = spawn_test_actor();
1027
1028        let result = Source::single(1_u64)
1029            .via(ActorFlow::ask(
1030                actor_ref.clone(),
1031                1,
1032                Duration::from_secs(1),
1033                |_input, reply_to| AskTestMessage::DropReply {
1034                    _reply_to: reply_to,
1035                },
1036            ))
1037            .run_collect();
1038
1039        assert_eq!(result, Err(StreamError::ActorAskResponseDropped));
1040        stop_test_actor(actor_ref, handle);
1041    }
1042
1043    #[test]
1044    fn actor_flow_ask_maps_message_builder_panic_to_task_failure() {
1045        let (actor_ref, handle, _) = spawn_test_actor();
1046
1047        let result = Source::single(1_u64)
1048            .via(ActorFlow::ask(
1049                actor_ref.clone(),
1050                1,
1051                Duration::from_secs(1),
1052                |_input, _reply_to: ReplyPort<u64>| -> AskTestMessage {
1053                    panic!("message builder failed");
1054                },
1055            ))
1056            .run_collect();
1057
1058        assert_eq!(
1059            result,
1060            Err(StreamError::ActorAskTaskFailed {
1061                reason: "message builder failed".to_owned()
1062            })
1063        );
1064        stop_test_actor(actor_ref, handle);
1065    }
1066}