Skip to main content

datum/actor/
interop.rs

1use std::{
2    collections::VecDeque,
3    fmt,
4    marker::PhantomData,
5    panic::{AssertUnwindSafe, catch_unwind},
6    sync::{
7        Arc, Condvar, Mutex, MutexGuard,
8        atomic::{AtomicBool, Ordering},
9    },
10    thread::{self, Thread, ThreadId},
11    time::{Duration, Instant},
12};
13
14use crate::{
15    context::FlowWithContext,
16    stream::{BoxStream, Flow, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult},
17};
18
19use super::{
20    ASK_IDLE_YIELDS, ASK_MAX_PARK, ASK_READY_SPINS, ASK_TIME_REFRESH_ITERS, Actor, ActorFlow,
21    ActorRef, ActorResult, InFlightAsk, Message, ReplyPoll, ReplyPort, ReplyState,
22    block_on_ractor_runtime, panic_reason, recycle_reply_state, wait_for_ready_ask,
23};
24
25/// Status envelope used by [`ActorFlow::ask_with_status`].
26///
27/// `Ok` replies are unwrapped into stream elements. `Err` replies fail the
28/// stream with the supplied [`StreamError`].
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum ActorStatus<T> {
31    Ok(T),
32    Err(StreamError),
33}
34
35impl<T> From<StreamResult<T>> for ActorStatus<T> {
36    fn from(value: StreamResult<T>) -> Self {
37        match value {
38            Ok(value) => Self::Ok(value),
39            Err(error) => Self::Err(error),
40        }
41    }
42}
43
44impl<T> ActorStatus<T> {
45    fn into_result(self) -> StreamResult<T> {
46        match self {
47            Self::Ok(value) => Ok(value),
48            Self::Err(error) => Err(error),
49        }
50    }
51}
52
/// Messages understood by the actor refs materialized by
/// [`ActorSource::actor_ref`] and
/// [`ActorSource::actor_ref_with_backpressure`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ActorSourceMessage<T> {
    /// One element offered to the source.
    Element(T),
    /// Signals that no further elements will follow.
    Complete,
    /// Signals a failure, described by a textual reason.
    Fail(String),
}

#[cfg(feature = "cluster")]
impl<T> Message for ActorSourceMessage<T> where T: Send + 'static {}
64
65/// Default typed protocol emitted by [`ActorSink::typed`].
66#[derive(Debug, Clone, PartialEq, Eq)]
67pub enum ActorSinkMessage<T> {
68    Element(T),
69    Complete,
70    Fail(StreamError),
71}
72
73#[cfg(feature = "cluster")]
74impl<T: Send + 'static> Message for ActorSinkMessage<T> {}
75
76/// Default typed protocol emitted by [`ActorSink::typed_with_backpressure`].
77#[derive(Debug)]
78pub enum ActorSinkBackpressureMessage<T, Ack> {
79    Init(ReplyPort<Ack>),
80    Element(T, ReplyPort<Ack>),
81    Complete,
82    Fail(StreamError),
83}
84
85#[cfg(feature = "cluster")]
86impl<T: Send + 'static, Ack: Send + 'static> Message for ActorSinkBackpressureMessage<T, Ack> {}
87
/// Failure details from which [`ActorFlow::watch`] builds its stream
/// failure.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WatchEvent {
    /// Human-readable description of the failure.
    reason: String,
}
94
95impl WatchEvent {
96    #[must_use]
97    pub fn reason(&self) -> &str {
98        &self.reason
99    }
100
101    fn into_stream_error(self) -> StreamError {
102        StreamError::Failed(self.reason)
103    }
104}
105
106impl fmt::Display for WatchEvent {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        f.write_str(&self.reason)
109    }
110}
111
/// Namespace type whose associated functions build sources fed by actors.
pub struct ActorSource;
114
/// Namespace type whose associated functions build sinks that deliver to
/// actors.
pub struct ActorSink;
117
/// PubSub bridge built on Ractor `pg` process groups.
///
/// A `pg` group is the natural local primitive for a named set of subscriber
/// actors. A PubSub source joins its materialized actor to a group, and a
/// PubSub sink broadcasts to whichever actors are currently members of that
/// group. Every member of a group is expected to speak the same
/// `ActorSourceMessage<T>` protocol.
pub struct ActorPubSub;
125
126impl ActorFlow {
127    /// Sends each stream element to an actor and unwraps a status reply.
128    ///
129    /// `ActorStatus::Ok` emits the wrapped value. `ActorStatus::Err` fails the
130    /// stream with that error.
131    #[must_use]
132    pub fn ask_with_status<In, Msg, Out, F>(
133        actor_ref: ActorRef<Msg>,
134        parallelism: usize,
135        timeout: Duration,
136        make_msg: F,
137    ) -> Flow<In, Out, NotUsed>
138    where
139        In: Send + 'static,
140        Msg: Message,
141        Out: Send + 'static,
142        F: Fn(In, ReplyPort<ActorStatus<Out>>) -> Msg + Send + Sync + 'static,
143    {
144        ask_flow_with_pending(
145            actor_ref,
146            parallelism,
147            timeout,
148            move |input, reply_to| (make_msg(input, reply_to), ()),
149            |(), reply| reply.into_result(),
150        )
151    }
152
153    /// Sends each `(element, context)` pair to an actor while preserving context.
154    ///
155    /// The user message builder receives only the element and a reply port. The
156    /// original context is reattached to the actor reply in-order.
157    #[must_use]
158    pub fn ask_with_context<In, Ctx, Msg, Out, F>(
159        actor_ref: ActorRef<Msg>,
160        parallelism: usize,
161        timeout: Duration,
162        make_msg: F,
163    ) -> FlowWithContext<In, Ctx, Out, Ctx, NotUsed>
164    where
165        In: Send + 'static,
166        Ctx: Send + 'static,
167        Msg: Message,
168        Out: Send + 'static,
169        F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
170    {
171        FlowWithContext::from_flow(ask_flow_with_pending(
172            actor_ref,
173            parallelism,
174            timeout,
175            move |(input, context), reply_to| (make_msg(input, reply_to), context),
176            |context, reply| Ok((reply, context)),
177        ))
178    }
179
180    /// Status-reply variant of [`ActorFlow::ask_with_context`].
181    #[must_use]
182    pub fn ask_with_status_and_context<In, Ctx, Msg, Out, F>(
183        actor_ref: ActorRef<Msg>,
184        parallelism: usize,
185        timeout: Duration,
186        make_msg: F,
187    ) -> FlowWithContext<In, Ctx, Out, Ctx, NotUsed>
188    where
189        In: Send + 'static,
190        Ctx: Send + 'static,
191        Msg: Message,
192        Out: Send + 'static,
193        F: Fn(In, ReplyPort<ActorStatus<Out>>) -> Msg + Send + Sync + 'static,
194    {
195        FlowWithContext::from_flow(ask_flow_with_pending(
196            actor_ref,
197            parallelism,
198            timeout,
199            move |(input, context), reply_to| (make_msg(input, reply_to), context),
200            |context, reply| reply.into_result().map(|reply| (reply, context)),
201        ))
202    }
203
204    /// Fails a pass-through flow if `actor_ref` terminates while the stream is
205    /// being consumed.
206    ///
207    /// This binds to Ractor's monitor supervision events. Once the monitor sees
208    /// termination, the stream fails with `StreamError::Failed` and a
209    /// watched-actor termination message, even if upstream is idle.
210    #[must_use]
211    pub fn watch<T, Msg>(actor_ref: ActorRef<Msg>) -> Flow<T, T, NotUsed>
212    where
213        T: Send + 'static,
214        Msg: Message,
215    {
216        Flow::from_runtime_transform(move |input, materializer| {
217            let shared = Arc::new(WatchShared::default());
218            let (monitor_ref, _handle) =
219                spawn_watch_monitor(actor_ref.get_cell(), Arc::clone(&shared))?;
220            let producer_shared = Arc::clone(&shared);
221            let producer_completion = materializer.spawn_stream(move |cancelled| {
222                run_watch_upstream(input, producer_shared, cancelled)
223            });
224            Ok(Box::new(WatchStream {
225                shared,
226                monitor_ref: Some(monitor_ref),
227                producer_completion: Some(producer_completion),
228                terminated: false,
229            }) as BoxStream<T>)
230        })
231    }
232}
233
234impl ActorSource {
235    /// Materializes a typed actor ref that feeds this source.
236    ///
237    /// # Mailbox risk
238    ///
239    /// This source uses Ractor's regular actor mailbox. It is not backpressured:
240    /// a fast sender can enqueue messages faster than the stream consumes them,
241    /// and the actor mailbox and this source's internal buffer can grow without
242    /// bound. Prefer [`ActorSource::actor_ref_with_backpressure`] when the
243    /// producer may outpace downstream demand.
244    #[must_use]
245    pub fn actor_ref<T>() -> Source<T, ActorRef<ActorSourceMessage<T>>>
246    where
247        T: Send + 'static,
248    {
249        actor_source(None::<SourceBackpressure<ActorSourceMessage<T>>>)
250    }
251
252    /// Alias for [`ActorSource::actor_ref`] using Datum's explicit protocol
253    /// enum.
254    #[must_use]
255    pub fn typed<T>() -> Source<T, ActorRef<ActorSourceMessage<T>>>
256    where
257        T: Send + 'static,
258    {
259        Self::actor_ref()
260    }
261
262    /// Materializes a typed actor ref that accepts one element per ack.
263    ///
264    /// The source sends `ack_message` to `ack_to` at startup and after each
265    /// emitted element. Sending another `Element` before the ack arrives at the
266    /// producer violates the protocol and fails the stream.
267    #[must_use]
268    pub fn actor_ref_with_backpressure<T, AckMsg>(
269        ack_to: ActorRef<AckMsg>,
270        ack_message: AckMsg,
271    ) -> Source<T, ActorRef<ActorSourceMessage<T>>>
272    where
273        T: Send + 'static,
274        AckMsg: Message + Clone + Sync,
275    {
276        actor_source(Some(SourceBackpressure {
277            ack_to,
278            make_ack: Arc::new(move || ack_message.clone()),
279            _marker: PhantomData,
280        }))
281    }
282}
283
284impl ActorSink {
285    /// Sends stream elements to `actor_ref` without waiting for an actor ack.
286    ///
287    /// # Mailbox risk
288    ///
289    /// This sink has no backpressure signal from the destination actor. A fast
290    /// upstream can enqueue messages faster than the actor processes them, so
291    /// the target actor's mailbox may grow without bound. Use
292    /// [`ActorSink::actor_ref_with_backpressure`] for slow or untrusted
293    /// consumers.
294    #[must_use]
295    pub fn actor_ref<In, Msg, Elem, Complete, Failure>(
296        actor_ref: ActorRef<Msg>,
297        make_element_message: Elem,
298        on_complete_message: Complete,
299        on_failure_message: Failure,
300    ) -> Sink<In, StreamCompletion<NotUsed>>
301    where
302        In: Send + 'static,
303        Msg: Message,
304        Elem: Fn(In) -> Msg + Send + Sync + 'static,
305        Complete: Fn() -> Msg + Send + Sync + 'static,
306        Failure: Fn(StreamError) -> Msg + Send + Sync + 'static,
307    {
308        let make_element_message = Arc::new(make_element_message);
309        let on_complete_message = Arc::new(on_complete_message);
310        let on_failure_message = Arc::new(on_failure_message);
311        Sink::from_runner(move |mut input, materializer| {
312            let actor_ref = actor_ref.clone();
313            let make_element_message = Arc::clone(&make_element_message);
314            let on_complete_message = Arc::clone(&on_complete_message);
315            let on_failure_message = Arc::clone(&on_failure_message);
316            Ok(materializer.spawn_stream(move |cancelled| {
317                run_actor_ref_sink(
318                    &mut input,
319                    cancelled,
320                    actor_ref,
321                    make_element_message,
322                    on_complete_message,
323                    on_failure_message,
324                )
325            }))
326        })
327    }
328
329    /// Sends stream elements using [`ActorSinkMessage`].
330    ///
331    /// # Mailbox risk
332    ///
333    /// This is the typed-protocol form of [`ActorSink::actor_ref`] and has the
334    /// same unbounded-mailbox risk.
335    #[must_use]
336    pub fn typed<In>(
337        actor_ref: ActorRef<ActorSinkMessage<In>>,
338    ) -> Sink<In, StreamCompletion<NotUsed>>
339    where
340        In: Send + 'static,
341    {
342        Self::actor_ref(
343            actor_ref,
344            ActorSinkMessage::Element,
345            || ActorSinkMessage::Complete,
346            ActorSinkMessage::Fail,
347        )
348    }
349
350    /// Sends an init message, then sends one element per actor ack.
351    ///
352    /// `make_init_message` and `make_element_message` receive a [`ReplyPort`].
353    /// The destination actor must send any value to that port to acknowledge
354    /// readiness for the next message. Completion and failure messages are sent
355    /// without waiting for an ack.
356    #[must_use]
357    pub fn actor_ref_with_backpressure<In, Msg, Ack, Init, Elem, Complete, Failure>(
358        actor_ref: ActorRef<Msg>,
359        timeout: Duration,
360        make_init_message: Init,
361        make_element_message: Elem,
362        on_complete_message: Complete,
363        on_failure_message: Failure,
364    ) -> Sink<In, StreamCompletion<NotUsed>>
365    where
366        In: Send + 'static,
367        Msg: Message,
368        Ack: Send + 'static,
369        Init: Fn(ReplyPort<Ack>) -> Msg + Send + Sync + 'static,
370        Elem: Fn(In, ReplyPort<Ack>) -> Msg + Send + Sync + 'static,
371        Complete: Fn() -> Msg + Send + Sync + 'static,
372        Failure: Fn(StreamError) -> Msg + Send + Sync + 'static,
373    {
374        let make_init_message = Arc::new(make_init_message);
375        let make_element_message = Arc::new(make_element_message);
376        let on_complete_message = Arc::new(on_complete_message);
377        let on_failure_message = Arc::new(on_failure_message);
378        Sink::from_runner(move |mut input, materializer| {
379            let actor_ref = actor_ref.clone();
380            let make_init_message = Arc::clone(&make_init_message);
381            let make_element_message = Arc::clone(&make_element_message);
382            let on_complete_message = Arc::clone(&on_complete_message);
383            let on_failure_message = Arc::clone(&on_failure_message);
384            Ok(materializer.spawn_stream(move |cancelled| {
385                send_and_wait_ack(&actor_ref, timeout, |reply_to| make_init_message(reply_to))?;
386                run_actor_ref_backpressure_sink(
387                    &mut input,
388                    cancelled,
389                    actor_ref,
390                    timeout,
391                    make_element_message,
392                    on_complete_message,
393                    on_failure_message,
394                )
395            }))
396        })
397    }
398
399    /// Backpressured typed-protocol sink.
400    #[must_use]
401    pub fn typed_with_backpressure<In, Ack>(
402        actor_ref: ActorRef<ActorSinkBackpressureMessage<In, Ack>>,
403        timeout: Duration,
404    ) -> Sink<In, StreamCompletion<NotUsed>>
405    where
406        In: Send + 'static,
407        Ack: Send + 'static,
408    {
409        Self::actor_ref_with_backpressure(
410            actor_ref,
411            timeout,
412            ActorSinkBackpressureMessage::Init,
413            ActorSinkBackpressureMessage::Element,
414            || ActorSinkBackpressureMessage::Complete,
415            ActorSinkBackpressureMessage::Fail,
416        )
417    }
418}
419
420impl ActorPubSub {
421    /// Creates a source whose materialized actor joins a Ractor pg group.
422    ///
423    /// Messages broadcast by [`ActorPubSub::sink`] to the same group are emitted
424    /// by this source.
425    #[must_use]
426    pub fn source<T>(group: impl Into<String>) -> Source<T, ActorRef<ActorSourceMessage<T>>>
427    where
428        T: Send + 'static,
429    {
430        let group = group.into();
431        ActorSource::actor_ref().map_materialized_value(move |actor_ref| {
432            ractor::pg::join(group.clone(), vec![actor_ref.get_cell()]);
433            actor_ref
434        })
435    }
436
437    /// Broadcasts each stream element to current members of a Ractor pg group.
438    ///
439    /// Group members are expected to be actors accepting
440    /// `ActorSourceMessage<T>`. Members that terminate before Ractor's pg
441    /// membership catches up are skipped so one stale entry does not prevent
442    /// delivery to the remaining subscribers.
443    #[must_use]
444    pub fn sink<T>(group: impl Into<String>) -> Sink<T, StreamCompletion<NotUsed>>
445    where
446        T: Clone + Send + 'static,
447    {
448        let group = group.into();
449        Sink::from_runner(move |mut input, materializer| {
450            let group = group.clone();
451            Ok(materializer.spawn_stream(move |cancelled| {
452                loop {
453                    if cancelled.load(Ordering::SeqCst) {
454                        return Err(StreamError::Cancelled);
455                    }
456                    match input.next() {
457                        Some(Ok(item)) => {
458                            broadcast_to_group(&group, ActorSourceMessage::Element(item))?;
459                        }
460                        Some(Err(error)) => {
461                            let _ = broadcast_to_group(
462                                &group,
463                                ActorSourceMessage::<T>::Fail(error.to_string()),
464                            );
465                            return Err(error);
466                        }
467                        None => {
468                            broadcast_to_group(&group, ActorSourceMessage::<T>::Complete)?;
469                            return Ok(NotUsed);
470                        }
471                    }
472                }
473            }))
474        })
475    }
476}
477
478fn ask_flow_with_pending<In, Msg, Reply, Out, Pending, Prepare, Finish>(
479    actor_ref: ActorRef<Msg>,
480    parallelism: usize,
481    timeout: Duration,
482    prepare: Prepare,
483    finish: Finish,
484) -> Flow<In, Out, NotUsed>
485where
486    In: Send + 'static,
487    Msg: Message,
488    Reply: Send + 'static,
489    Out: Send + 'static,
490    Pending: Send + 'static,
491    Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
492    Finish: Fn(Pending, Reply) -> StreamResult<Out> + Send + Sync + 'static,
493{
494    assert!(
495        parallelism > 0,
496        "ActorFlow ask parallelism must be greater than zero"
497    );
498    let prepare = Arc::new(prepare);
499    let finish = Arc::new(finish);
500    Flow::from_transform(move |input| {
501        ask_ractor_ordered_with_pending(
502            input,
503            actor_ref.clone(),
504            parallelism,
505            timeout,
506            Arc::clone(&prepare),
507            Arc::clone(&finish),
508        )
509    })
510}
511
512fn ask_ractor_ordered_with_pending<In, Msg, Reply, Out, Pending, Prepare, Finish>(
513    mut input: BoxStream<In>,
514    actor_ref: ActorRef<Msg>,
515    parallelism: usize,
516    timeout: Duration,
517    prepare: Arc<Prepare>,
518    finish: Arc<Finish>,
519) -> BoxStream<Out>
520where
521    In: Send + 'static,
522    Msg: Message,
523    Reply: Send + 'static,
524    Out: Send + 'static,
525    Pending: Send + 'static,
526    Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
527    Finish: Fn(Pending, Reply) -> StreamResult<Out> + Send + Sync + 'static,
528{
529    let mut in_flight = Vec::<InFlightAskWith<Reply, Pending>>::with_capacity(parallelism);
530    let mut next_index = 0_usize;
531    let mut next_to_emit = 0_usize;
532    let mut completed = Vec::with_capacity(parallelism);
533    let mut reply_pool = Vec::with_capacity(parallelism);
534    let mut input_done = false;
535
536    Box::new(std::iter::from_fn(move || {
537        loop {
538            if let Some(result) = take_completed_with_pending(&mut completed, next_to_emit) {
539                next_to_emit += 1;
540                return Some(result);
541            }
542
543            while in_flight.len() < parallelism && !input_done {
544                match input.next() {
545                    Some(Ok(item)) => {
546                        let index = next_index;
547                        next_index += 1;
548                        match start_ractor_ask_with_pending(
549                            index,
550                            actor_ref.clone(),
551                            timeout,
552                            item,
553                            Arc::clone(&prepare),
554                            &mut reply_pool,
555                        ) {
556                            Ok(ask) => in_flight.push(ask),
557                            Err(error) => {
558                                completed.push((index, Err(error)));
559                                input_done = true;
560                            }
561                        }
562                    }
563                    Some(Err(error)) => {
564                        completed.push((next_index, Err(error)));
565                        next_index += 1;
566                        input_done = true;
567                    }
568                    None => input_done = true,
569                }
570            }
571
572            if let Some(result) = take_completed_with_pending(&mut completed, next_to_emit) {
573                next_to_emit += 1;
574                return Some(result);
575            }
576
577            if in_flight.is_empty() {
578                return None;
579            }
580
581            let ask = wait_for_ready_ask_with_pending(&mut in_flight, timeout, finish.as_ref());
582            let index = ask.index;
583            let result = ask.result;
584            recycle_reply_state(ask.state, &mut reply_pool);
585            if index == next_to_emit {
586                next_to_emit += 1;
587                return Some(result);
588            }
589            completed.push((index, result));
590        }
591    }))
592}
593
594fn take_completed_with_pending<Out>(
595    completed: &mut Vec<(usize, StreamResult<Out>)>,
596    index: usize,
597) -> Option<StreamResult<Out>> {
598    let position = completed
599        .iter()
600        .position(|(completed_index, _)| *completed_index == index)?;
601    Some(completed.swap_remove(position).1)
602}
603
604struct InFlightAskWith<Reply, Pending> {
605    index: usize,
606    state: Option<Arc<ReplyState<Reply>>>,
607    pending: Option<Pending>,
608    deadline: Option<Instant>,
609}
610
611impl<Reply, Pending> InFlightAskWith<Reply, Pending> {
612    fn state(&self) -> &Arc<ReplyState<Reply>> {
613        self.state.as_ref().expect("in-flight ask has reply state")
614    }
615
616    fn into_parts(mut self) -> (Arc<ReplyState<Reply>>, Pending) {
617        let state = self.state.take().expect("in-flight ask has reply state");
618        let pending = self
619            .pending
620            .take()
621            .expect("in-flight ask has pending state");
622        (state, pending)
623    }
624}
625
626impl<Reply, Pending> Drop for InFlightAskWith<Reply, Pending> {
627    fn drop(&mut self) {
628        if let Some(state) = &self.state {
629            state.close_receiver();
630        }
631    }
632}
633
634struct CompletedAskWith<Reply, Out> {
635    index: usize,
636    result: StreamResult<Out>,
637    state: Arc<ReplyState<Reply>>,
638}
639
640fn start_ractor_ask_with_pending<In, Msg, Reply, Pending, Prepare>(
641    index: usize,
642    actor_ref: ActorRef<Msg>,
643    timeout: Duration,
644    input: In,
645    prepare: Arc<Prepare>,
646    reply_pool: &mut Vec<Arc<ReplyState<Reply>>>,
647) -> StreamResult<InFlightAskWith<Reply, Pending>>
648where
649    In: Send + 'static,
650    Msg: Message,
651    Reply: Send + 'static,
652    Pending: Send + 'static,
653    Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
654{
655    let reply_state = match reply_pool.pop() {
656        Some(state) => {
657            state.reset(timeout);
658            state
659        }
660        None => Arc::new(ReplyState::new(timeout)),
661    };
662    let reply_to = ReplyPort::new(Arc::clone(&reply_state));
663    let (message, pending) =
664        catch_unwind(AssertUnwindSafe(|| prepare(input, reply_to))).map_err(|panic| {
665            StreamError::ActorAskTaskFailed {
666                reason: panic_reason(panic),
667            }
668        })?;
669
670    send_actor_message(&actor_ref, message)?;
671
672    Ok(InFlightAskWith {
673        index,
674        state: Some(reply_state),
675        pending: Some(pending),
676        deadline: Instant::now().checked_add(timeout),
677    })
678}
679
680fn wait_for_ready_ask_with_pending<Reply, Pending, Out, Finish>(
681    in_flight: &mut Vec<InFlightAskWith<Reply, Pending>>,
682    timeout: Duration,
683    finish: &Finish,
684) -> CompletedAskWith<Reply, Out>
685where
686    Reply: Send + 'static,
687    Pending: Send + 'static,
688    Out: Send + 'static,
689    Finish: Fn(Pending, Reply) -> StreamResult<Out>,
690{
691    let mut idle_spins = 0;
692    let mut idle_yields = 0;
693    let mut time_refresh = 0_u32;
694    let mut now = Instant::now();
695    loop {
696        if time_refresh == 0 {
697            now = Instant::now();
698        }
699        time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
700        if let Some(ask) = take_ready_ask_with_pending(in_flight, timeout, now, finish) {
701            return ask;
702        }
703
704        if idle_spins < ASK_READY_SPINS {
705            idle_spins += 1;
706            std::hint::spin_loop();
707        } else if idle_yields < ASK_IDLE_YIELDS {
708            idle_yields += 1;
709            time_refresh = 0;
710            thread::yield_now();
711        } else {
712            idle_spins = 0;
713            idle_yields = 0;
714            time_refresh = 0;
715            let current = thread::current();
716            let registered = register_ask_waiters_with_pending(in_flight, &current);
717            now = Instant::now();
718            if let Some(ask) = take_ready_ask_with_pending(in_flight, timeout, now, finish) {
719                unregister_ask_waiters_with_pending(registered, current.id());
720                return ask;
721            }
722            thread::park_timeout(next_ask_park_with_pending(in_flight, now));
723            unregister_ask_waiters_with_pending(registered, current.id());
724        }
725    }
726}
727
728fn take_ready_ask_with_pending<Reply, Pending, Out, Finish>(
729    in_flight: &mut Vec<InFlightAskWith<Reply, Pending>>,
730    timeout: Duration,
731    now: Instant,
732    finish: &Finish,
733) -> Option<CompletedAskWith<Reply, Out>>
734where
735    Finish: Fn(Pending, Reply) -> StreamResult<Out>,
736{
737    let mut index = 0;
738    while index < in_flight.len() {
739        match in_flight[index].state().poll() {
740            ReplyPoll::Ready(reply) => {
741                let ask = in_flight.swap_remove(index);
742                let ask_index = ask.index;
743                let (state, pending) = ask.into_parts();
744                return Some(CompletedAskWith {
745                    index: ask_index,
746                    result: finish(pending, reply),
747                    state,
748                });
749            }
750            ReplyPoll::Dropped => {
751                let ask = in_flight.swap_remove(index);
752                let ask_index = ask.index;
753                let (state, _pending) = ask.into_parts();
754                return Some(CompletedAskWith {
755                    index: ask_index,
756                    result: Err(StreamError::ActorAskResponseDropped),
757                    state,
758                });
759            }
760            ReplyPoll::Pending => {
761                if in_flight[index]
762                    .deadline
763                    .is_some_and(|deadline| now >= deadline)
764                {
765                    let outcome = in_flight[index].state().close_on_timeout();
766                    let ask = in_flight.swap_remove(index);
767                    let ask_index = ask.index;
768                    let (state, pending) = ask.into_parts();
769                    let result = match outcome {
770                        ReplyPoll::Ready(reply) => finish(pending, reply),
771                        ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
772                        ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
773                    };
774                    return Some(CompletedAskWith {
775                        index: ask_index,
776                        result,
777                        state,
778                    });
779                }
780                index += 1;
781            }
782        }
783    }
784    None
785}
786
787fn register_ask_waiters_with_pending<Reply, Pending>(
788    in_flight: &[InFlightAskWith<Reply, Pending>],
789    current: &Thread,
790) -> Vec<Arc<ReplyState<Reply>>> {
791    let mut registered = Vec::with_capacity(in_flight.len());
792    for ask in in_flight {
793        ask.state().register_waiter(current.clone());
794        registered.push(Arc::clone(ask.state()));
795    }
796    registered
797}
798
799fn unregister_ask_waiters_with_pending<Reply>(
800    registered: Vec<Arc<ReplyState<Reply>>>,
801    current_id: ThreadId,
802) {
803    for state in registered {
804        state.unregister_waiter(current_id);
805    }
806}
807
808fn next_ask_park_with_pending<Reply, Pending>(
809    in_flight: &[InFlightAskWith<Reply, Pending>],
810    now: Instant,
811) -> Duration {
812    in_flight
813        .iter()
814        .filter_map(|ask| ask.deadline)
815        .map(|deadline| deadline.saturating_duration_since(now))
816        .min()
817        .unwrap_or(ASK_MAX_PARK)
818        .min(ASK_MAX_PARK)
819}
820
821struct SourceBackpressure<AckMsg> {
822    ack_to: ActorRef<AckMsg>,
823    make_ack: Arc<dyn Fn() -> AckMsg + Send + Sync>,
824    _marker: PhantomData<fn() -> AckMsg>,
825}
826
827impl<AckMsg> Clone for SourceBackpressure<AckMsg> {
828    fn clone(&self) -> Self {
829        Self {
830            ack_to: self.ack_to.clone(),
831            make_ack: Arc::clone(&self.make_ack),
832            _marker: PhantomData,
833        }
834    }
835}
836
837fn actor_source<T, AckMsg>(
838    backpressure: Option<SourceBackpressure<AckMsg>>,
839) -> Source<T, ActorRef<ActorSourceMessage<T>>>
840where
841    T: Send + 'static,
842    AckMsg: Message,
843{
844    Source::from_materialized_factory(move |_materializer| {
845        let shared = Arc::new(ActorSourceShared::new(backpressure.is_none()));
846        let actor = SourceActor {
847            shared: Arc::clone(&shared),
848            backpressure: backpressure.clone(),
849        };
850        let (actor_ref, _handle) =
851            block_on_ractor_runtime(Actor::spawn(None, actor, SourceActorState::default()))?
852                .map_err(|error| {
853                    StreamError::Failed(format!("actor source failed to spawn: {error}"))
854                })?;
855        let stream = ActorSourceStream {
856            shared,
857            actor_ref: Some(actor_ref.clone()),
858            backpressure: backpressure.clone(),
859            terminated: false,
860        };
861        Ok((Box::new(stream) as BoxStream<T>, actor_ref))
862    })
863}
864
865struct ActorSourceShared<T> {
866    inner: Mutex<ActorSourceInner<T>>,
867    available: Condvar,
868}
869
870impl<T> ActorSourceShared<T> {
871    fn new(ready: bool) -> Self {
872        Self {
873            inner: Mutex::new(ActorSourceInner {
874                queue: VecDeque::new(),
875                completed: false,
876                ready,
877            }),
878            available: Condvar::new(),
879        }
880    }
881
882    fn lock(&self) -> MutexGuard<'_, ActorSourceInner<T>> {
883        self.inner
884            .lock()
885            .unwrap_or_else(|poison| poison.into_inner())
886    }
887
888    fn push(&self, item: T) {
889        let mut inner = self.lock();
890        if !inner.completed {
891            inner.queue.push_back(Ok(item));
892        }
893        drop(inner);
894        self.available.notify_all();
895    }
896
897    fn complete(&self) {
898        let mut inner = self.lock();
899        inner.completed = true;
900        drop(inner);
901        self.available.notify_all();
902    }
903
904    fn fail(&self, error: StreamError) {
905        let mut inner = self.lock();
906        inner.queue.clear();
907        inner.queue.push_back(Err(error));
908        inner.completed = true;
909        drop(inner);
910        self.available.notify_all();
911    }
912
913    fn mark_ready(&self) {
914        let mut inner = self.lock();
915        if !inner.completed {
916            inner.ready = true;
917        }
918        drop(inner);
919        self.available.notify_all();
920    }
921}
922
/// Mutex-protected portion of [`ActorSourceShared`].
struct ActorSourceInner<T> {
    /// Buffered elements, or a terminal error, waiting for the consumer.
    queue: VecDeque<StreamResult<T>>,
    /// Set once the source completed or failed; later pushes are dropped.
    completed: bool,
    /// Readiness flag consulted by the actor when backpressure is enabled.
    ready: bool,
}
928
/// Actor that receives [`ActorSourceMessage`]s and forwards them into the
/// shared queue read by the stream.
struct SourceActor<T, AckMsg> {
    /// Queue and flags shared with the consuming stream.
    shared: Arc<ActorSourceShared<T>>,
    /// Ack configuration; `None` disables the ack handshake.
    backpressure: Option<SourceBackpressure<AckMsg>>,
}
933
/// Mutable state of [`SourceActor`]; also used as its spawn arguments.
#[derive(Default)]
struct SourceActorState {
    /// When `true`, `post_stop` does not complete the shared state.
    // NOTE(review): nothing in the visible code sets this flag to `true` —
    // confirm whether it is still needed.
    stopped_by_stream: bool,
}
938
/// Actor behaviour backing the actor-ref source: elements are queued for the
/// stream, `Complete`/`Fail` terminate it.
impl<T, AckMsg> Actor for SourceActor<T, AckMsg>
where
    T: Send + 'static,
    AckMsg: Message,
{
    type Msg = ActorSourceMessage<T>;
    type State = SourceActorState;
    type Arguments = SourceActorState;

    /// With backpressure configured, sends the initial ack and marks the
    /// shared state ready. If the ack cannot be sent, the shared state is
    /// failed and the actor requests its own stop. The arguments become the
    /// initial state unchanged.
    async fn pre_start(
        &self,
        myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> ActorResult<Self::State> {
        if let Some(backpressure) = &self.backpressure {
            match send_source_ack(backpressure) {
                Ok(()) => {
                    self.shared.mark_ready();
                }
                Err(error) => {
                    self.shared.fail(error);
                    myself.stop(None);
                }
            }
        }
        Ok(args)
    }

    /// Handles one protocol message.
    ///
    /// * `Element` — with backpressure, the element is only accepted while
    ///   the shared state is ready; otherwise the stream is failed with a
    ///   protocol violation and the actor stops. Accepting an element
    ///   consumes the readiness.
    /// * `Complete` — completes the shared state and stops the actor.
    /// * `Fail` — fails the shared state with the given reason and stops the
    ///   actor.
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        _state: &mut Self::State,
    ) -> ActorResult {
        match message {
            ActorSourceMessage::Element(item) => {
                if self.backpressure.is_some() {
                    let mut inner = self.shared.lock();
                    if !inner.ready {
                        // Fail under the same lock that observed `!ready`, so
                        // the check and the failure are atomic.
                        inner.queue.clear();
                        inner.queue.push_back(Err(actor_interop_error(
                            "actor source backpressure protocol violation: element arrived before ack",
                        )));
                        inner.completed = true;
                        drop(inner);
                        self.shared.available.notify_all();
                        myself.stop(None);
                        return Ok(());
                    }
                    // Consume the readiness; it is granted again by
                    // `mark_ready`.
                    inner.ready = false;
                    // Release the guard before `push`, which locks again.
                    drop(inner);
                }
                self.shared.push(item);
            }
            ActorSourceMessage::Complete => {
                self.shared.complete();
                myself.stop(None);
            }
            ActorSourceMessage::Fail(reason) => {
                self.shared.fail(StreamError::Failed(reason));
                myself.stop(None);
            }
        }
        Ok(())
    }

    /// Completes the shared state when the actor stops, unless the state says
    /// the stop was initiated by the stream.
    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> ActorResult {
        if !state.stopped_by_stream {
            self.shared.complete();
        }
        Ok(())
    }
}
1016
/// Blocking iterator that drains the queue filled by [`SourceActor`].
struct ActorSourceStream<T, AckMsg> {
    /// Queue and flags shared with the actor.
    shared: Arc<ActorSourceShared<T>>,
    /// Handle used to stop the actor when the stream is dropped.
    actor_ref: Option<ActorRef<ActorSourceMessage<T>>>,
    /// Ack configuration; `None` disables per-element acks.
    backpressure: Option<SourceBackpressure<AckMsg>>,
    /// Set once the stream has finished; `next` then returns `None`.
    terminated: bool,
}
1023
1024impl<T, AckMsg> Iterator for ActorSourceStream<T, AckMsg>
1025where
1026    T: Send + 'static,
1027    AckMsg: Message,
1028{
1029    type Item = StreamResult<T>;
1030
1031    fn next(&mut self) -> Option<Self::Item> {
1032        if self.terminated {
1033            return None;
1034        }
1035
1036        let next = {
1037            let mut inner = self.shared.lock();
1038            loop {
1039                if let Some(item) = inner.queue.pop_front() {
1040                    break Some(item);
1041                }
1042                if inner.completed {
1043                    self.terminated = true;
1044                    break None;
1045                }
1046                inner = self
1047                    .shared
1048                    .available
1049                    .wait(inner)
1050                    .unwrap_or_else(|poison| poison.into_inner());
1051            }
1052        };
1053
1054        if let Some(Ok(_)) = &next
1055            && let Some(backpressure) = &self.backpressure
1056        {
1057            match send_source_ack(backpressure) {
1058                Ok(()) => self.shared.mark_ready(),
1059                Err(error) => {
1060                    self.shared.fail(error.clone());
1061                    return Some(Err(error));
1062                }
1063            }
1064        }
1065
1066        next
1067    }
1068}
1069
1070impl<T, AckMsg> Drop for ActorSourceStream<T, AckMsg> {
1071    fn drop(&mut self) {
1072        if let Some(actor_ref) = self.actor_ref.take() {
1073            actor_ref.stop(None);
1074        }
1075    }
1076}
1077
1078fn send_source_ack<AckMsg>(backpressure: &SourceBackpressure<AckMsg>) -> StreamResult<()>
1079where
1080    AckMsg: Message,
1081{
1082    let ack = catch_unwind(AssertUnwindSafe(|| (backpressure.make_ack)())).map_err(|panic| {
1083        StreamError::Failed(format!(
1084            "actor source ack builder failed: {}",
1085            panic_reason(panic)
1086        ))
1087    })?;
1088    send_actor_message(&backpressure.ack_to, ack)
1089}
1090
1091fn run_actor_ref_sink<In, Msg, Elem, Complete, Failure>(
1092    input: &mut BoxStream<In>,
1093    cancelled: Arc<AtomicBool>,
1094    actor_ref: ActorRef<Msg>,
1095    make_element_message: Arc<Elem>,
1096    on_complete_message: Arc<Complete>,
1097    on_failure_message: Arc<Failure>,
1098) -> StreamResult<NotUsed>
1099where
1100    Msg: Message,
1101    Elem: Fn(In) -> Msg,
1102    Complete: Fn() -> Msg,
1103    Failure: Fn(StreamError) -> Msg,
1104{
1105    loop {
1106        if cancelled.load(Ordering::SeqCst) {
1107            return Err(StreamError::Cancelled);
1108        }
1109        match input.next() {
1110            Some(Ok(item)) => {
1111                let message = catch_unwind(AssertUnwindSafe(|| make_element_message(item)))
1112                    .map_err(actor_sink_panic)?;
1113                send_actor_message(&actor_ref, message)?;
1114            }
1115            Some(Err(error)) => {
1116                let message = catch_unwind(AssertUnwindSafe(|| on_failure_message(error.clone())))
1117                    .map_err(actor_sink_panic)?;
1118                send_actor_message(&actor_ref, message)?;
1119                return Err(error);
1120            }
1121            None => {
1122                let message = catch_unwind(AssertUnwindSafe(|| on_complete_message()))
1123                    .map_err(actor_sink_panic)?;
1124                send_actor_message(&actor_ref, message)?;
1125                return Ok(NotUsed);
1126            }
1127        }
1128    }
1129}
1130
1131fn run_actor_ref_backpressure_sink<In, Msg, Ack, Elem, Complete, Failure>(
1132    input: &mut BoxStream<In>,
1133    cancelled: Arc<AtomicBool>,
1134    actor_ref: ActorRef<Msg>,
1135    timeout: Duration,
1136    make_element_message: Arc<Elem>,
1137    on_complete_message: Arc<Complete>,
1138    on_failure_message: Arc<Failure>,
1139) -> StreamResult<NotUsed>
1140where
1141    Msg: Message,
1142    Ack: Send + 'static,
1143    Elem: Fn(In, ReplyPort<Ack>) -> Msg,
1144    Complete: Fn() -> Msg,
1145    Failure: Fn(StreamError) -> Msg,
1146{
1147    loop {
1148        if cancelled.load(Ordering::SeqCst) {
1149            return Err(StreamError::Cancelled);
1150        }
1151        match input.next() {
1152            Some(Ok(item)) => {
1153                send_and_wait_ack(&actor_ref, timeout, |reply_to| {
1154                    make_element_message(item, reply_to)
1155                })?;
1156            }
1157            Some(Err(error)) => {
1158                let message = catch_unwind(AssertUnwindSafe(|| on_failure_message(error.clone())))
1159                    .map_err(actor_sink_panic)?;
1160                send_actor_message(&actor_ref, message)?;
1161                return Err(error);
1162            }
1163            None => {
1164                let message = catch_unwind(AssertUnwindSafe(|| on_complete_message()))
1165                    .map_err(actor_sink_panic)?;
1166                send_actor_message(&actor_ref, message)?;
1167                return Ok(NotUsed);
1168            }
1169        }
1170    }
1171}
1172
1173fn send_and_wait_ack<Msg, Ack, Build>(
1174    actor_ref: &ActorRef<Msg>,
1175    timeout: Duration,
1176    build: Build,
1177) -> StreamResult<()>
1178where
1179    Msg: Message,
1180    Ack: Send + 'static,
1181    Build: FnOnce(ReplyPort<Ack>) -> Msg,
1182{
1183    let reply_state = Arc::new(ReplyState::new(timeout));
1184    let reply_to = ReplyPort::new(Arc::clone(&reply_state));
1185    let message = catch_unwind(AssertUnwindSafe(|| build(reply_to))).map_err(actor_sink_panic)?;
1186    send_actor_message(actor_ref, message)?;
1187    let ask = InFlightAsk {
1188        index: 0,
1189        state: Some(reply_state),
1190        deadline: Instant::now().checked_add(timeout),
1191    };
1192    let mut in_flight = vec![ask];
1193    wait_for_ready_ask(&mut in_flight, timeout)
1194        .result
1195        .map(|_ack| ())
1196}
1197
1198fn actor_sink_panic(panic: Box<dyn std::any::Any + Send>) -> StreamError {
1199    StreamError::Failed(format!(
1200        "actor sink message builder failed: {}",
1201        panic_reason(panic)
1202    ))
1203}
1204
1205fn send_actor_message<Msg>(actor_ref: &ActorRef<Msg>, message: Msg) -> StreamResult<()>
1206where
1207    Msg: Message,
1208{
1209    match actor_ref.cast(message) {
1210        Ok(()) => Ok(()),
1211        Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
1212            Err(StreamError::ActorTerminated)
1213        }
1214        Err(error) => Err(StreamError::ActorAskSendFailed {
1215            reason: error.to_string(),
1216        }),
1217    }
1218}
1219
1220fn broadcast_to_group<T>(group: &str, message: ActorSourceMessage<T>) -> StreamResult<()>
1221where
1222    T: Clone + Send + 'static,
1223{
1224    for member in ractor::pg::get_members(&group.to_owned()) {
1225        let actor_ref: ActorRef<ActorSourceMessage<T>> = member.into();
1226        match send_actor_message(&actor_ref, message.clone()) {
1227            Ok(()) | Err(StreamError::ActorTerminated) => {}
1228            Err(error) => return Err(error),
1229        }
1230    }
1231    Ok(())
1232}
1233
/// State shared between the watch upstream runner, the monitor actor and the
/// [`WatchStream`] consumer.
struct WatchShared<T> {
    /// Queue, termination event and completion flag, guarded by one mutex.
    inner: Mutex<WatchInner<T>>,
    /// Signalled after state changes so a waiting consumer re-checks.
    available: Condvar,
}
1238
1239impl<T> Default for WatchShared<T> {
1240    fn default() -> Self {
1241        Self {
1242            inner: Mutex::new(WatchInner {
1243                queue: VecDeque::new(),
1244                event: None,
1245                upstream_done: false,
1246            }),
1247            available: Condvar::new(),
1248        }
1249    }
1250}
1251
1252impl<T> WatchShared<T> {
1253    fn lock(&self) -> MutexGuard<'_, WatchInner<T>> {
1254        self.inner
1255            .lock()
1256            .unwrap_or_else(|poison| poison.into_inner())
1257    }
1258
1259    fn push(&self, item: StreamResult<T>) -> bool {
1260        let mut inner = self.lock();
1261        if inner.event.is_some() || inner.upstream_done {
1262            return false;
1263        }
1264        inner.queue.push_back(item);
1265        drop(inner);
1266        self.available.notify_all();
1267        true
1268    }
1269
1270    fn complete(&self) {
1271        let mut inner = self.lock();
1272        inner.upstream_done = true;
1273        drop(inner);
1274        self.available.notify_all();
1275    }
1276
1277    fn fail(&self, event: WatchEvent) {
1278        let mut inner = self.lock();
1279        if inner.event.is_none() {
1280            inner.queue.clear();
1281            inner.event = Some(event);
1282            inner.upstream_done = true;
1283        }
1284        drop(inner);
1285        self.available.notify_all();
1286    }
1287}
1288
/// Mutex-protected portion of [`WatchShared`].
struct WatchInner<T> {
    /// Items pushed by the upstream runner and not yet consumed.
    queue: VecDeque<StreamResult<T>>,
    /// Termination event for the watched actor, if one was recorded.
    event: Option<WatchEvent>,
    /// Set once the upstream finished or a termination event was recorded.
    upstream_done: bool,
}
1294
/// Helper actor that monitors `watched` and reports its termination to the
/// shared watch state.
struct WatchMonitorActor<T> {
    /// The actor being monitored.
    watched: ractor::ActorCell,
    /// Shared state that receives the termination event.
    shared: Arc<WatchShared<T>>,
    /// One-shot sender signalled from `pre_start` once monitoring is set up.
    ready: Mutex<Option<std::sync::mpsc::Sender<()>>>,
}
1300
1301impl<T> Actor for WatchMonitorActor<T>
1302where
1303    T: Send + 'static,
1304{
1305    type Msg = ();
1306    type State = ();
1307    type Arguments = ();
1308
1309    async fn pre_start(
1310        &self,
1311        myself: ActorRef<Self::Msg>,
1312        _args: Self::Arguments,
1313    ) -> ActorResult<Self::State> {
1314        myself.monitor(self.watched.clone());
1315        let ready = self
1316            .ready
1317            .lock()
1318            .unwrap_or_else(|poison| poison.into_inner())
1319            .take();
1320        if let Some(ready) = ready {
1321            let _ = ready.send(());
1322        }
1323        Ok(())
1324    }
1325
1326    async fn handle_supervisor_evt(
1327        &self,
1328        myself: ActorRef<Self::Msg>,
1329        event: ractor::SupervisionEvent,
1330        _state: &mut Self::State,
1331    ) -> ActorResult {
1332        match event {
1333            ractor::SupervisionEvent::ActorTerminated(who, _, reason)
1334                if who.get_id() == self.watched.get_id() =>
1335            {
1336                let detail = reason.unwrap_or_else(|| "terminated".to_owned());
1337                self.shared.fail(WatchEvent {
1338                    reason: format!("watched actor terminated: {detail}"),
1339                });
1340                myself.stop(None);
1341            }
1342            ractor::SupervisionEvent::ActorFailed(who, error)
1343                if who.get_id() == self.watched.get_id() =>
1344            {
1345                self.shared.fail(WatchEvent {
1346                    reason: format!("watched actor terminated: {error}"),
1347                });
1348                myself.stop(None);
1349            }
1350            _ => {}
1351        }
1352        Ok(())
1353    }
1354
1355    async fn post_stop(
1356        &self,
1357        myself: ActorRef<Self::Msg>,
1358        _state: &mut Self::State,
1359    ) -> ActorResult {
1360        myself.unmonitor(self.watched.clone());
1361        Ok(())
1362    }
1363}
1364
1365fn spawn_watch_monitor<T>(
1366    watched: ractor::ActorCell,
1367    shared: Arc<WatchShared<T>>,
1368) -> StreamResult<(ActorRef<()>, ractor::concurrency::JoinHandle<()>)>
1369where
1370    T: Send + 'static,
1371{
1372    let (ready_tx, ready_rx) = std::sync::mpsc::channel();
1373    block_on_ractor_runtime(Actor::spawn(
1374        None,
1375        WatchMonitorActor {
1376            watched,
1377            shared,
1378            ready: Mutex::new(Some(ready_tx)),
1379        },
1380        (),
1381    ))?
1382    .map_err(|error| StreamError::Failed(format!("watch monitor failed to spawn: {error}")))
1383    .and_then(|(monitor_ref, handle)| {
1384        ready_rx.recv_timeout(Duration::from_secs(1)).map_err(|_| {
1385            monitor_ref.stop(None);
1386            StreamError::Failed("watch monitor did not become ready".to_owned())
1387        })?;
1388        Ok((monitor_ref, handle))
1389    })
1390}
1391
/// Blocking iterator over the watch queue that fails when the watched actor
/// terminates.
struct WatchStream<T> {
    /// Queue and termination state shared with the producer and the monitor.
    shared: Arc<WatchShared<T>>,
    /// Monitor actor handle; the monitor is stopped when the stream is dropped.
    monitor_ref: Option<ActorRef<()>>,
    /// Completion handle of the upstream producer, released on drop.
    producer_completion: Option<StreamCompletion<NotUsed>>,
    /// Set once the stream has finished; `next` then returns `None`.
    terminated: bool,
}
1398
1399impl<T> Iterator for WatchStream<T>
1400where
1401    T: Send + 'static,
1402{
1403    type Item = StreamResult<T>;
1404
1405    fn next(&mut self) -> Option<Self::Item> {
1406        if self.terminated {
1407            return None;
1408        }
1409
1410        let mut inner = self.shared.lock();
1411        loop {
1412            if let Some(event) = inner.event.take() {
1413                self.terminated = true;
1414                return Some(Err(event.into_stream_error()));
1415            }
1416            if let Some(item) = inner.queue.pop_front() {
1417                if item.is_err() {
1418                    self.terminated = true;
1419                }
1420                return Some(item);
1421            }
1422            if inner.upstream_done {
1423                self.terminated = true;
1424                return None;
1425            }
1426            inner = self
1427                .shared
1428                .available
1429                .wait(inner)
1430                .unwrap_or_else(|poison| poison.into_inner());
1431        }
1432    }
1433}
1434
1435impl<T> Drop for WatchStream<T> {
1436    fn drop(&mut self) {
1437        if let Some(monitor_ref) = self.monitor_ref.take() {
1438            monitor_ref.stop(None);
1439        }
1440        drop(self.producer_completion.take());
1441    }
1442}
1443
1444fn run_watch_upstream<T>(
1445    mut input: BoxStream<T>,
1446    shared: Arc<WatchShared<T>>,
1447    cancelled: Arc<AtomicBool>,
1448) -> StreamResult<NotUsed>
1449where
1450    T: Send + 'static,
1451{
1452    loop {
1453        if cancelled.load(Ordering::SeqCst) {
1454            shared.complete();
1455            return Err(StreamError::Cancelled);
1456        }
1457        match input.next() {
1458            Some(Ok(item)) => {
1459                if !shared.push(Ok(item)) {
1460                    return Err(StreamError::Cancelled);
1461                }
1462            }
1463            Some(Err(error)) => {
1464                let _ = shared.push(Err(error.clone()));
1465                shared.complete();
1466                return Err(error);
1467            }
1468            None => {
1469                shared.complete();
1470                return Ok(NotUsed);
1471            }
1472        }
1473    }
1474}
1475
1476fn actor_interop_error(reason: impl Into<String>) -> StreamError {
1477    StreamError::Failed(reason.into())
1478}
1479
1480#[cfg(test)]
1481mod tests {
1482    use super::*;
1483    use crate::stream::{Keep, Sink};
1484    use std::{
1485        sync::{
1486            Arc as StdArc, Mutex as StdMutex,
1487            atomic::{AtomicUsize, Ordering},
1488            mpsc,
1489        },
1490        time::{Duration as StdDuration, Instant as StdInstant},
1491    };
1492
1493    fn wait_until<F>(timeout: StdDuration, mut condition: F) -> bool
1494    where
1495        F: FnMut() -> bool,
1496    {
1497        let deadline = StdInstant::now() + timeout;
1498        while StdInstant::now() < deadline {
1499            if condition() {
1500                return true;
1501            }
1502            std::thread::park_timeout(StdDuration::from_millis(1));
1503        }
1504        condition()
1505    }
1506
1507    fn spawn_actor<A>(
1508        actor: A,
1509        args: A::Arguments,
1510    ) -> (ActorRef<A::Msg>, ractor::concurrency::JoinHandle<()>)
1511    where
1512        A: Actor,
1513        A::Msg: Message,
1514        A::Arguments: Send + 'static,
1515    {
1516        block_on_ractor_runtime(Actor::spawn(None, actor, args))
1517            .expect("ractor runtime runs")
1518            .expect("actor spawns")
1519    }
1520
1521    fn stop_actor<Msg>(actor_ref: ActorRef<Msg>, handle: ractor::concurrency::JoinHandle<()>)
1522    where
1523        Msg: Message,
1524    {
1525        actor_ref.stop(None);
1526        block_on_ractor_runtime(async move {
1527            handle.await.expect("actor task joins");
1528        })
1529        .expect("ractor runtime joins actor");
1530    }
1531
    /// Requests understood by [`AskInteropActor`].
    enum AskInteropMsg {
        /// Ask answered with an [`ActorStatus`] envelope.
        Status {
            input: u64,
            reply_to: ReplyPort<ActorStatus<u64>>,
        },
        /// Ask answered with a bare value.
        Plain {
            input: u64,
            reply_to: ReplyPort<u64>,
        },
    }

    // Explicit marker impl, compiled only with the `cluster` feature.
    // NOTE(review): presumably a blanket impl covers the non-cluster build —
    // confirm.
    #[cfg(feature = "cluster")]
    impl Message for AskInteropMsg {}
1545
1546    struct AskInteropActor;
1547
1548    impl Actor for AskInteropActor {
1549        type Msg = AskInteropMsg;
1550        type State = ();
1551        type Arguments = ();
1552
1553        async fn pre_start(
1554            &self,
1555            _myself: ActorRef<Self::Msg>,
1556            _args: Self::Arguments,
1557        ) -> ActorResult<Self::State> {
1558            Ok(())
1559        }
1560
1561        async fn handle(
1562            &self,
1563            _myself: ActorRef<Self::Msg>,
1564            message: Self::Msg,
1565            _state: &mut Self::State,
1566        ) -> ActorResult {
1567            match message {
1568                AskInteropMsg::Status { input: 0, reply_to } => {
1569                    let _ =
1570                        reply_to.send(ActorStatus::Err(StreamError::Failed("bad status".into())));
1571                }
1572                AskInteropMsg::Status { input, reply_to } => {
1573                    let _ = reply_to.send(ActorStatus::Ok(input + 10));
1574                }
1575                AskInteropMsg::Plain { input, reply_to } => {
1576                    let _ = reply_to.send(input * 2);
1577                }
1578            }
1579            Ok(())
1580        }
1581    }
1582
1583    #[test]
1584    fn ask_with_status_unwraps_ok_and_fails_err() {
1585        let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
1586
1587        let values = Source::from_iter([1_u64, 2])
1588            .via(ActorFlow::ask_with_status(
1589                actor_ref.clone(),
1590                2,
1591                Duration::from_secs(1),
1592                |input, reply_to| AskInteropMsg::Status { input, reply_to },
1593            ))
1594            .run_collect()
1595            .unwrap();
1596        assert_eq!(values, vec![11, 12]);
1597
1598        let failed = Source::single(0_u64)
1599            .via(ActorFlow::ask_with_status(
1600                actor_ref.clone(),
1601                1,
1602                Duration::from_secs(1),
1603                |input, reply_to| AskInteropMsg::Status { input, reply_to },
1604            ))
1605            .run_collect();
1606        assert_eq!(failed, Err(StreamError::Failed("bad status".into())));
1607
1608        stop_actor(actor_ref, handle);
1609    }
1610
1611    #[test]
1612    fn ask_with_context_preserves_context() {
1613        let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
1614
1615        let values = Source::from_iter([1_u64, 2, 3])
1616            .as_source_with_context(|item| item + 100)
1617            .via(ActorFlow::ask_with_context(
1618                actor_ref.clone(),
1619                2,
1620                Duration::from_secs(1),
1621                |input, reply_to| AskInteropMsg::Plain { input, reply_to },
1622            ))
1623            .run_collect()
1624            .unwrap();
1625
1626        assert_eq!(values, vec![(2, 101), (4, 102), (6, 103)]);
1627        stop_actor(actor_ref, handle);
1628    }
1629
1630    #[test]
1631    fn ask_with_status_and_context_preserves_context_and_fails_status() {
1632        let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
1633
1634        let values = Source::from_iter([1_u64, 2])
1635            .as_source_with_context(|item| item + 10)
1636            .via(ActorFlow::ask_with_status_and_context(
1637                actor_ref.clone(),
1638                2,
1639                Duration::from_secs(1),
1640                |input, reply_to| AskInteropMsg::Status { input, reply_to },
1641            ))
1642            .run_collect()
1643            .unwrap();
1644        assert_eq!(values, vec![(11, 11), (12, 12)]);
1645
1646        let failed = Source::single(0_u64)
1647            .as_source_with_context(|_| 99_u64)
1648            .via(ActorFlow::ask_with_status_and_context(
1649                actor_ref.clone(),
1650                1,
1651                Duration::from_secs(1),
1652                |input, reply_to| AskInteropMsg::Status { input, reply_to },
1653            ))
1654            .run_collect();
1655        assert_eq!(failed, Err(StreamError::Failed("bad status".into())));
1656
1657        stop_actor(actor_ref, handle);
1658    }
1659
1660    #[test]
1661    fn actor_source_actor_ref_emits_and_completes() {
1662        let (actor_ref, completion) = ActorSource::actor_ref::<u64>()
1663            .to_mat(Sink::collect(), Keep::both)
1664            .run()
1665            .unwrap();
1666
1667        actor_ref.cast(ActorSourceMessage::Element(1)).unwrap();
1668        actor_ref.cast(ActorSourceMessage::Element(2)).unwrap();
1669        actor_ref.cast(ActorSourceMessage::Complete).unwrap();
1670
1671        assert_eq!(completion.wait().unwrap(), vec![1, 2]);
1672    }
1673
    /// Ack message sent by the backpressured actor source in tests.
    #[derive(Clone)]
    enum SourceAckMsg {
        Ack,
    }

    // Explicit marker impl, compiled only with the `cluster` feature.
    #[cfg(feature = "cluster")]
    impl Message for SourceAckMsg {}
1681
    /// Test actor that counts every [`SourceAckMsg`] it receives.
    struct SourceAckActor {
        /// Number of acks handled so far; shared with the test body.
        count: StdArc<AtomicUsize>,
    }

    impl Actor for SourceAckActor {
        type Msg = SourceAckMsg;
        type State = ();
        type Arguments = ();

        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> ActorResult<Self::State> {
            Ok(())
        }

        /// Increments the shared counter once per received message.
        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            _message: Self::Msg,
            _state: &mut Self::State,
        ) -> ActorResult {
            self.count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }
1709
    /// The backpressured source sends one ack at startup and one more after
    /// each consumed element; the producer here waits for each ack before
    /// sending the next element.
    #[test]
    fn actor_source_with_backpressure_acks_startup_and_each_element() {
        let ack_count = StdArc::new(AtomicUsize::new(0));
        let (ack_ref, ack_handle) = spawn_actor(
            SourceAckActor {
                count: StdArc::clone(&ack_count),
            },
            (),
        );

        let (actor_ref, completion) =
            ActorSource::actor_ref_with_backpressure::<u64, SourceAckMsg>(
                ack_ref.clone(),
                SourceAckMsg::Ack,
            )
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        // Ack #1: startup handshake.
        assert!(wait_until(StdDuration::from_secs(1), || {
            ack_count.load(Ordering::SeqCst) >= 1
        }));
        actor_ref.cast(ActorSourceMessage::Element(1)).unwrap();
        // Ack #2: first element consumed.
        assert!(wait_until(StdDuration::from_secs(1), || {
            ack_count.load(Ordering::SeqCst) >= 2
        }));
        actor_ref.cast(ActorSourceMessage::Element(2)).unwrap();
        // Ack #3: second element consumed.
        assert!(wait_until(StdDuration::from_secs(1), || {
            ack_count.load(Ordering::SeqCst) >= 3
        }));
        actor_ref.cast(ActorSourceMessage::Complete).unwrap();

        assert_eq!(completion.wait().unwrap(), vec![1, 2]);
        stop_actor(ack_ref, ack_handle);
    }
1745
    /// Messages delivered to [`EventActor`] by the plain actor sink in tests.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum SinkEvent {
        Element(u64),
        Complete,
        Fail(String),
    }

    // Explicit marker impl, compiled only with the `cluster` feature.
    #[cfg(feature = "cluster")]
    impl Message for SinkEvent {}
1755
    /// Test actor that forwards every received [`SinkEvent`] to a channel.
    struct EventActor {
        /// Channel on which the test observes received events.
        sender: mpsc::Sender<SinkEvent>,
    }

    impl Actor for EventActor {
        type Msg = SinkEvent;
        type State = ();
        type Arguments = ();

        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> ActorResult<Self::State> {
            Ok(())
        }

        /// Forwards the message; panics if the test dropped the receiver.
        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            _state: &mut Self::State,
        ) -> ActorResult {
            self.sender
                .send(message)
                .expect("event receiver stays open");
            Ok(())
        }
    }
1785
1786    #[test]
1787    fn actor_sink_actor_ref_sends_elements_and_complete() {
1788        let (tx, rx) = mpsc::channel();
1789        let (actor_ref, handle) = spawn_actor(EventActor { sender: tx }, ());
1790
1791        Source::from_iter([1_u64, 2])
1792            .run_with(ActorSink::actor_ref(
1793                actor_ref.clone(),
1794                SinkEvent::Element,
1795                || SinkEvent::Complete,
1796                |error| SinkEvent::Fail(error.to_string()),
1797            ))
1798            .unwrap()
1799            .wait()
1800            .unwrap();
1801
1802        assert_eq!(
1803            [
1804                rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1805                rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1806                rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1807            ],
1808            [
1809                SinkEvent::Element(1),
1810                SinkEvent::Element(2),
1811                SinkEvent::Complete,
1812            ]
1813        );
1814        stop_actor(actor_ref, handle);
1815    }
1816
    /// Comparable record of what [`BackpressureSinkActor`] received, without
    /// the reply ports carried by [`BackpressureSinkMsg`].
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum BackpressureEvent {
        Init,
        Element(u64),
        Complete,
        Fail(String),
    }
1824
    /// Protocol of the backpressured actor sink used in tests; `Init` and
    /// `Element` carry a reply port for the ack.
    enum BackpressureSinkMsg {
        Init(ReplyPort<()>),
        Element(u64, ReplyPort<()>),
        Complete,
        Fail(String),
    }

    // Explicit marker impl, compiled only with the `cluster` feature.
    #[cfg(feature = "cluster")]
    impl Message for BackpressureSinkMsg {}
1834
    /// Test actor that records every message as a [`BackpressureEvent`] and
    /// acks `Init` and `Element` messages.
    struct BackpressureSinkActor {
        /// Channel on which the test observes received events.
        sender: mpsc::Sender<BackpressureEvent>,
    }

    impl Actor for BackpressureSinkActor {
        type Msg = BackpressureSinkMsg;
        type State = ();
        type Arguments = ();

        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> ActorResult<Self::State> {
            Ok(())
        }

        /// Records the event first and then replies, so the recorded order
        /// matches the order in which acks are released.
        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            _state: &mut Self::State,
        ) -> ActorResult {
            match message {
                BackpressureSinkMsg::Init(reply_to) => {
                    self.sender.send(BackpressureEvent::Init).unwrap();
                    let _ = reply_to.send(());
                }
                BackpressureSinkMsg::Element(item, reply_to) => {
                    self.sender.send(BackpressureEvent::Element(item)).unwrap();
                    let _ = reply_to.send(());
                }
                BackpressureSinkMsg::Complete => {
                    self.sender.send(BackpressureEvent::Complete).unwrap();
                }
                BackpressureSinkMsg::Fail(error) => {
                    self.sender.send(BackpressureEvent::Fail(error)).unwrap();
                }
            }
            Ok(())
        }
    }
1877
1878    #[test]
1879    fn actor_sink_with_backpressure_waits_for_init_and_element_acks() {
1880        let (tx, rx) = mpsc::channel();
1881        let (actor_ref, handle) = spawn_actor(BackpressureSinkActor { sender: tx }, ());
1882
1883        Source::from_iter([1_u64, 2])
1884            .run_with(ActorSink::actor_ref_with_backpressure(
1885                actor_ref.clone(),
1886                Duration::from_secs(1),
1887                BackpressureSinkMsg::Init,
1888                BackpressureSinkMsg::Element,
1889                || BackpressureSinkMsg::Complete,
1890                |error| BackpressureSinkMsg::Fail(error.to_string()),
1891            ))
1892            .unwrap()
1893            .wait()
1894            .unwrap();
1895
1896        assert_eq!(
1897            [
1898                rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1899                rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1900                rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1901                rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1902            ],
1903            [
1904                BackpressureEvent::Init,
1905                BackpressureEvent::Element(1),
1906                BackpressureEvent::Element(2),
1907                BackpressureEvent::Complete,
1908            ]
1909        );
1910        stop_actor(actor_ref, handle);
1911    }
1912
    /// Message type of `WatchActor`, used by the watch test to make the actor stop.
    enum WatchMsg {
        /// Sent by the test; `WatchActor` stops itself when it handles a message.
        Stop,
    }

    // Only compiled when the `cluster` feature is enabled.
    #[cfg(feature = "cluster")]
    impl Message for WatchMsg {}
1919
1920    struct WatchActor;
1921
1922    impl Actor for WatchActor {
1923        type Msg = WatchMsg;
1924        type State = ();
1925        type Arguments = ();
1926
1927        async fn pre_start(
1928            &self,
1929            _myself: ActorRef<Self::Msg>,
1930            _args: Self::Arguments,
1931        ) -> ActorResult<Self::State> {
1932            Ok(())
1933        }
1934
1935        async fn handle(
1936            &self,
1937            myself: ActorRef<Self::Msg>,
1938            _message: Self::Msg,
1939            _state: &mut Self::State,
1940        ) -> ActorResult {
1941            myself.stop(Some("watched-stop".into()));
1942            Ok(())
1943        }
1944    }
1945
1946    #[test]
1947    fn watch_fails_idle_stream_when_actor_terminates() {
1948        let (watched_ref, watched_handle) = spawn_actor(WatchActor, ());
1949        let (source_ref, completion) = ActorSource::actor_ref::<u64>()
1950            .via(ActorFlow::watch(watched_ref.clone()))
1951            .to_mat(Sink::collect(), Keep::both)
1952            .run()
1953            .unwrap();
1954
1955        watched_ref.cast(WatchMsg::Stop).unwrap();
1956        assert!(wait_until(StdDuration::from_secs(1), || {
1957            watched_ref.get_status() == ractor::ActorStatus::Stopped
1958        }));
1959
1960        let result = completion.wait();
1961        let _ = source_ref.cast(ActorSourceMessage::Complete);
1962        assert!(
1963            matches!(result, Err(StreamError::Failed(reason)) if reason.contains("watched actor terminated"))
1964        );
1965        block_on_ractor_runtime(async move {
1966            watched_handle.await.expect("watched actor joins");
1967        })
1968        .expect("ractor runtime joins watched actor");
1969    }
1970
1971    static NEXT_GROUP: AtomicUsize = AtomicUsize::new(0);
1972
1973    #[test]
1974    fn pubsub_sink_broadcasts_to_pg_source() {
1975        let group = format!(
1976            "datum-test-pubsub-{}",
1977            NEXT_GROUP.fetch_add(1, Ordering::SeqCst)
1978        );
1979
1980        let (_source_ref, completion) = ActorPubSub::source::<u64>(group.clone())
1981            .to_mat(Sink::collect(), Keep::both)
1982            .run()
1983            .unwrap();
1984
1985        assert!(wait_until(StdDuration::from_secs(1), || {
1986            !ractor::pg::get_members(&group).is_empty()
1987        }));
1988
1989        Source::from_iter([1_u64, 2])
1990            .run_with(ActorPubSub::sink(group))
1991            .unwrap()
1992            .wait()
1993            .unwrap();
1994
1995        assert_eq!(completion.wait().unwrap(), vec![1, 2]);
1996    }
1997
1998    #[test]
1999    fn actor_source_fail_message_fails_stream() {
2000        let (actor_ref, completion) = ActorSource::typed::<u64>()
2001            .to_mat(Sink::collect(), Keep::both)
2002            .run()
2003            .unwrap();
2004
2005        actor_ref
2006            .cast(ActorSourceMessage::Fail("source failed".into()))
2007            .unwrap();
2008
2009        assert_eq!(
2010            completion.wait(),
2011            Err(StreamError::Failed("source failed".into()))
2012        );
2013    }
2014
2015    #[test]
2016    fn actor_sink_typed_sends_protocol_messages() {
2017        let received = StdArc::new(StdMutex::new(Vec::<String>::new()));
2018
2019        struct TypedSinkActor {
2020            received: StdArc<StdMutex<Vec<String>>>,
2021        }
2022
2023        impl Actor for TypedSinkActor {
2024            type Msg = ActorSinkMessage<u64>;
2025            type State = ();
2026            type Arguments = ();
2027
2028            async fn pre_start(
2029                &self,
2030                _myself: ActorRef<Self::Msg>,
2031                _args: Self::Arguments,
2032            ) -> ActorResult<Self::State> {
2033                Ok(())
2034            }
2035
2036            async fn handle(
2037                &self,
2038                _myself: ActorRef<Self::Msg>,
2039                message: Self::Msg,
2040                _state: &mut Self::State,
2041            ) -> ActorResult {
2042                let label = match message {
2043                    ActorSinkMessage::Element(item) => format!("element:{item}"),
2044                    ActorSinkMessage::Complete => "complete".to_owned(),
2045                    ActorSinkMessage::Fail(error) => format!("fail:{error}"),
2046                };
2047                self.received.lock().unwrap().push(label);
2048                Ok(())
2049            }
2050        }
2051
2052        let (actor_ref, handle) = spawn_actor(
2053            TypedSinkActor {
2054                received: StdArc::clone(&received),
2055            },
2056            (),
2057        );
2058
2059        Source::from_iter([7_u64])
2060            .run_with(ActorSink::typed(actor_ref.clone()))
2061            .unwrap()
2062            .wait()
2063            .unwrap();
2064
2065        assert!(wait_until(StdDuration::from_secs(1), || {
2066            received.lock().unwrap().len() == 2
2067        }));
2068        assert_eq!(
2069            *received.lock().unwrap(),
2070            vec!["element:7".to_owned(), "complete".to_owned()]
2071        );
2072        stop_actor(actor_ref, handle);
2073    }
2074}