Skip to main content

datum/actor/
stream_ref.rs

1use std::{
2    collections::VecDeque,
3    fmt,
4    sync::{
5        Arc, Condvar, Mutex, MutexGuard,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, Instant},
9};
10
11use crate::stream::{
12    BoxStream, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult,
13};
14
15use super::{Actor, ActorProcessingErr, ActorRef, ActorResult, Message, block_on_ractor_runtime};
16
17const DEFAULT_STREAM_REF_BUFFER_CAPACITY: usize = 32;
18const DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(30);
19const DEFAULT_STREAM_REF_DEMAND_REDELIVERY: Duration = Duration::from_secs(1);
20const STREAM_REF_WAIT_POLL: Duration = Duration::from_millis(1);
21
22/// Settings used by [`StreamRefs`] endpoints.
23///
24/// The defaults mirror Akka StreamRefs: a 32 element receive buffer, a 30 second
25/// subscription timeout, and 1 second demand redelivery. Demand is cumulative,
26/// so redelivery is idempotent.
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub struct StreamRefSettings {
29    buffer_capacity: usize,
30    subscription_timeout: Duration,
31    demand_redelivery_interval: Duration,
32}
33
34impl Default for StreamRefSettings {
35    fn default() -> Self {
36        Self {
37            buffer_capacity: DEFAULT_STREAM_REF_BUFFER_CAPACITY,
38            subscription_timeout: DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT,
39            demand_redelivery_interval: DEFAULT_STREAM_REF_DEMAND_REDELIVERY,
40        }
41    }
42}
43
44impl StreamRefSettings {
45    #[must_use]
46    pub fn buffer_capacity(&self) -> usize {
47        self.buffer_capacity
48    }
49
50    #[must_use]
51    pub fn subscription_timeout(&self) -> Duration {
52        self.subscription_timeout
53    }
54
55    #[must_use]
56    pub fn demand_redelivery_interval(&self) -> Duration {
57        self.demand_redelivery_interval
58    }
59
60    #[must_use]
61    pub fn with_buffer_capacity(mut self, capacity: usize) -> Self {
62        assert!(
63            capacity > 0,
64            "StreamRef buffer capacity must be greater than zero"
65        );
66        self.buffer_capacity = capacity;
67        self
68    }
69
70    #[must_use]
71    pub fn with_subscription_timeout(mut self, timeout: Duration) -> Self {
72        self.subscription_timeout = timeout;
73        self
74    }
75
76    #[must_use]
77    pub fn with_demand_redelivery_interval(mut self, interval: Duration) -> Self {
78        self.demand_redelivery_interval = interval;
79        self
80    }
81}
82
83/// Factories for Akka-style stream references.
84///
85/// `source_ref` is a local sink that materializes a [`SourceRef`]. The returned
86/// handle can be used once to create a remote source. `sink_ref` is the dual: a
87/// local source that materializes a [`SinkRef`], which can be used once as a
88/// remote sink.
89pub struct StreamRefs;
90
91impl StreamRefs {
92    #[must_use]
93    pub fn source_ref<T>() -> Sink<T, SourceRef<T>>
94    where
95        T: Send + 'static,
96    {
97        Self::source_ref_with_settings(StreamRefSettings::default())
98    }
99
100    #[must_use]
101    pub fn source_ref_with_settings<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
102    where
103        T: Send + 'static,
104    {
105        stream_ref_source_sink(settings)
106    }
107
108    #[must_use]
109    pub fn sink_ref<T>() -> Source<T, SinkRef<T>>
110    where
111        T: Send + 'static,
112    {
113        Self::sink_ref_with_settings(StreamRefSettings::default())
114    }
115
116    #[must_use]
117    pub fn sink_ref_with_settings<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
118    where
119        T: Send + 'static,
120    {
121        stream_ref_sink_source(settings)
122    }
123}
124
125/// A handle that exposes a remote stream as a local [`Source`].
126///
127/// Stream refs are one-shot pairings. Calling [`SourceRef::source`] more than
128/// once returns a source that fails when materialized.
129pub struct SourceRef<T> {
130    inner: Arc<SourceRefInner<T>>,
131}
132
133struct SourceRefInner<T> {
134    producer: ActorRef<ProducerCommand<T>>,
135    settings: StreamRefSettings,
136    subscribed: AtomicBool,
137    _keep_alive: Mutex<Option<StreamCompletion<NotUsed>>>,
138}
139
140impl<T> Clone for SourceRef<T> {
141    fn clone(&self) -> Self {
142        Self {
143            inner: Arc::clone(&self.inner),
144        }
145    }
146}
147
148impl<T> fmt::Debug for SourceRef<T> {
149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150        f.debug_struct("SourceRef").finish_non_exhaustive()
151    }
152}
153
154impl<T> SourceRef<T>
155where
156    T: Send + 'static,
157{
158    #[must_use]
159    pub fn source(&self) -> Source<T, NotUsed> {
160        let inner = Arc::clone(&self.inner);
161        Source::from_materialized_factory(move |_materializer| {
162            if inner.subscribed.swap(true, Ordering::SeqCst) {
163                return Ok((
164                    failed_once("source ref has already been materialized"),
165                    NotUsed,
166                ));
167            }
168
169            let shared = Arc::new(ConsumerShared::new(inner.settings));
170            let (consumer_ref, _handle) =
171                spawn_consumer_actor(Some(inner.producer.clone()), Arc::clone(&shared))?;
172            Ok((
173                Box::new(ConsumerStream {
174                    shared,
175                    actor_ref: Some(consumer_ref),
176                    settings: inner.settings,
177                    terminated: false,
178                    source_ref_keep_alive: Some(Arc::clone(&inner)),
179                }) as BoxStream<T>,
180                NotUsed,
181            ))
182        })
183    }
184}
185
186/// A handle that exposes a remote stream as a local [`Sink`].
187///
188/// Stream refs are one-shot pairings. Calling [`SinkRef::sink`] more than once
189/// returns a sink whose materialized completion fails.
190pub struct SinkRef<T> {
191    inner: Arc<SinkRefInner<T>>,
192}
193
194struct SinkRefInner<T> {
195    consumer: ActorRef<ConsumerCommand<T>>,
196    settings: StreamRefSettings,
197    subscribed: AtomicBool,
198}
199
200impl<T> Clone for SinkRef<T> {
201    fn clone(&self) -> Self {
202        Self {
203            inner: Arc::clone(&self.inner),
204        }
205    }
206}
207
208impl<T> fmt::Debug for SinkRef<T> {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        f.debug_struct("SinkRef").finish_non_exhaustive()
211    }
212}
213
214impl<T> SinkRef<T>
215where
216    T: Send + 'static,
217{
218    #[must_use]
219    pub fn sink(&self) -> Sink<T, StreamCompletion<NotUsed>> {
220        let inner = Arc::clone(&self.inner);
221        Sink::from_runner(move |input, materializer| {
222            if inner.subscribed.swap(true, Ordering::SeqCst) {
223                return Ok(StreamCompletion::ready(Err(StreamError::Failed(
224                    "sink ref has already been materialized".to_owned(),
225                ))));
226            }
227
228            let shared = Arc::new(ProducerShared::new());
229            let (producer_ref, _handle) =
230                spawn_producer_actor(Some(inner.consumer.clone()), Arc::clone(&shared))?;
231            let settings = inner.settings;
232            Ok(materializer.spawn_stream(move |cancelled| {
233                run_producer_endpoint(input, shared, producer_ref, settings, cancelled)
234            }))
235        })
236    }
237}
238
239enum ProducerCommand<T> {
240    Subscribe {
241        consumer: ActorRef<ConsumerCommand<T>>,
242    },
243    Demand {
244        consumer: ActorRef<ConsumerCommand<T>>,
245        cumulative: u64,
246    },
247    Cancel {
248        consumer: ActorRef<ConsumerCommand<T>>,
249    },
250    RemoteFailure {
251        consumer: ActorRef<ConsumerCommand<T>>,
252        error: StreamError,
253    },
254    Ack,
255}
256
257#[cfg(feature = "cluster")]
258impl<T: Send + 'static> Message for ProducerCommand<T> {}
259
260enum ConsumerCommand<T> {
261    OnSubscribe {
262        producer: ActorRef<ProducerCommand<T>>,
263    },
264    Element {
265        producer: ActorRef<ProducerCommand<T>>,
266        seq: u64,
267        item: T,
268    },
269    Complete {
270        producer: ActorRef<ProducerCommand<T>>,
271        seq: u64,
272    },
273    Failure {
274        producer: ActorRef<ProducerCommand<T>>,
275        error: StreamError,
276    },
277}
278
279#[cfg(feature = "cluster")]
280impl<T: Send + 'static> Message for ConsumerCommand<T> {}
281
282struct ProducerShared<T> {
283    inner: Mutex<ProducerInner<T>>,
284    changed: Condvar,
285}
286
287struct ProducerInner<T> {
288    consumer: Option<ActorRef<ConsumerCommand<T>>>,
289    cumulative_demand: u64,
290    sent: u64,
291    stopped: Option<StreamError>,
292}
293
294impl<T> ProducerShared<T>
295where
296    T: Send + 'static,
297{
298    fn new() -> Self {
299        Self {
300            inner: Mutex::new(ProducerInner {
301                consumer: None,
302                cumulative_demand: 0,
303                sent: 0,
304                stopped: None,
305            }),
306            changed: Condvar::new(),
307        }
308    }
309
310    fn lock(&self) -> MutexGuard<'_, ProducerInner<T>> {
311        self.inner
312            .lock()
313            .unwrap_or_else(|poison| poison.into_inner())
314    }
315
316    fn set_consumer(
317        &self,
318        consumer: ActorRef<ConsumerCommand<T>>,
319    ) -> Result<bool, ActorRef<ConsumerCommand<T>>> {
320        let mut inner = self.lock();
321        match &inner.consumer {
322            Some(existing) if !same_actor(existing, &consumer) => Err(consumer),
323            Some(_) => Ok(false),
324            None => {
325                inner.consumer = Some(consumer);
326                drop(inner);
327                self.changed.notify_all();
328                Ok(true)
329            }
330        }
331    }
332
333    fn update_demand(&self, consumer: &ActorRef<ConsumerCommand<T>>, cumulative: u64) {
334        let mut inner = self.lock();
335        if inner
336            .consumer
337            .as_ref()
338            .is_some_and(|existing| same_actor(existing, consumer))
339            && cumulative > inner.cumulative_demand
340        {
341            inner.cumulative_demand = cumulative;
342            drop(inner);
343            self.changed.notify_all();
344        }
345    }
346
347    fn stop_from_consumer(&self, consumer: &ActorRef<ConsumerCommand<T>>, error: StreamError) {
348        let mut inner = self.lock();
349        if inner
350            .consumer
351            .as_ref()
352            .is_none_or(|existing| same_actor(existing, consumer))
353            && inner.stopped.is_none()
354        {
355            inner.stopped = Some(error);
356            drop(inner);
357            self.changed.notify_all();
358        }
359    }
360
361    fn stop_unless_finished(&self, error: StreamError) {
362        let mut inner = self.lock();
363        if inner.stopped.is_none() {
364            inner.stopped = Some(error);
365            drop(inner);
366            self.changed.notify_all();
367        }
368    }
369}
370
371struct ConsumerShared<T> {
372    inner: Mutex<ConsumerInner<T>>,
373    changed: Condvar,
374}
375
376struct ConsumerInner<T> {
377    producer: Option<ActorRef<ProducerCommand<T>>>,
378    queue: VecDeque<T>,
379    terminal: Option<ConsumerTerminal>,
380    expected_seq: u64,
381    delivered: u64,
382    cumulative_demand: u64,
383}
384
385#[derive(Clone)]
386enum ConsumerTerminal {
387    Complete,
388    Error(StreamError),
389}
390
391impl<T> ConsumerShared<T>
392where
393    T: Send + 'static,
394{
395    fn new(_settings: StreamRefSettings) -> Self {
396        Self {
397            inner: Mutex::new(ConsumerInner {
398                producer: None,
399                queue: VecDeque::new(),
400                terminal: None,
401                expected_seq: 0,
402                delivered: 0,
403                cumulative_demand: 0,
404            }),
405            changed: Condvar::new(),
406        }
407    }
408
409    fn lock(&self) -> MutexGuard<'_, ConsumerInner<T>> {
410        self.inner
411            .lock()
412            .unwrap_or_else(|poison| poison.into_inner())
413    }
414
415    fn set_producer(&self, producer: ActorRef<ProducerCommand<T>>) -> bool {
416        let mut inner = self.lock();
417        match &inner.producer {
418            Some(existing) => same_actor(existing, &producer),
419            None => {
420                inner.producer = Some(producer);
421                drop(inner);
422                self.changed.notify_all();
423                true
424            }
425        }
426    }
427
428    fn push(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64, item: T) {
429        let mut inner = self.lock();
430        if inner.terminal.is_some() || !producer_matches(&inner, producer) {
431            return;
432        }
433        if seq != inner.expected_seq {
434            inner.queue.clear();
435            inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
436                inner.expected_seq,
437                seq,
438                "stream ref element sequence gap",
439            )));
440        } else {
441            inner.expected_seq += 1;
442            inner.queue.push_back(item);
443        }
444        drop(inner);
445        self.changed.notify_all();
446    }
447
448    fn complete(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64) {
449        let mut inner = self.lock();
450        if inner.terminal.is_some() || !producer_matches(&inner, producer) {
451            return;
452        }
453        if seq != inner.expected_seq {
454            inner.queue.clear();
455            inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
456                inner.expected_seq,
457                seq,
458                "stream ref completion sequence gap",
459            )));
460        } else {
461            inner.terminal = Some(ConsumerTerminal::Complete);
462        }
463        drop(inner);
464        self.changed.notify_all();
465    }
466
467    fn fail(&self, producer: &ActorRef<ProducerCommand<T>>, error: StreamError) {
468        let mut inner = self.lock();
469        if inner.terminal.is_some() || !producer_matches(&inner, producer) {
470            return;
471        }
472        inner.queue.clear();
473        inner.terminal = Some(ConsumerTerminal::Error(error));
474        drop(inner);
475        self.changed.notify_all();
476    }
477
478    fn fail_local(&self, error: StreamError) {
479        let mut inner = self.lock();
480        if inner.terminal.is_none() {
481            inner.queue.clear();
482            inner.terminal = Some(ConsumerTerminal::Error(error));
483            drop(inner);
484            self.changed.notify_all();
485        }
486    }
487}
488
489fn producer_matches<T>(inner: &ConsumerInner<T>, producer: &ActorRef<ProducerCommand<T>>) -> bool
490where
491    T: Send + 'static,
492{
493    inner
494        .producer
495        .as_ref()
496        .is_some_and(|existing| same_actor(existing, producer))
497}
498
499struct ProducerActor<T> {
500    shared: Arc<ProducerShared<T>>,
501    initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
502}
503
504impl<T> Actor for ProducerActor<T>
505where
506    T: Send + 'static,
507{
508    type Msg = ProducerCommand<T>;
509    type State = ();
510    type Arguments = ();
511
512    async fn pre_start(
513        &self,
514        myself: ActorRef<Self::Msg>,
515        _args: Self::Arguments,
516    ) -> Result<Self::State, ActorProcessingErr> {
517        if let Some(consumer) = &self.initial_consumer {
518            register_producer_consumer(&self.shared, myself, consumer.clone());
519        }
520        Ok(())
521    }
522
523    async fn handle(
524        &self,
525        myself: ActorRef<Self::Msg>,
526        message: Self::Msg,
527        _state: &mut Self::State,
528    ) -> ActorResult {
529        match message {
530            ProducerCommand::Subscribe { consumer } => {
531                register_producer_consumer(&self.shared, myself, consumer);
532            }
533            ProducerCommand::Demand {
534                consumer,
535                cumulative,
536            } => self.shared.update_demand(&consumer, cumulative),
537            ProducerCommand::Cancel { consumer } => {
538                self.shared
539                    .stop_from_consumer(&consumer, StreamError::Cancelled);
540            }
541            ProducerCommand::RemoteFailure { consumer, error } => {
542                self.shared.stop_from_consumer(&consumer, error);
543            }
544            ProducerCommand::Ack => myself.stop(None),
545        }
546        Ok(())
547    }
548
549    async fn post_stop(
550        &self,
551        _myself: ActorRef<Self::Msg>,
552        _state: &mut Self::State,
553    ) -> ActorResult {
554        self.shared
555            .stop_unless_finished(StreamError::ActorTerminated);
556        Ok(())
557    }
558}
559
560fn register_producer_consumer<T>(
561    shared: &Arc<ProducerShared<T>>,
562    producer: ActorRef<ProducerCommand<T>>,
563    consumer: ActorRef<ConsumerCommand<T>>,
564) where
565    T: Send + 'static,
566{
567    match shared.set_consumer(consumer.clone()) {
568        Ok(true) | Ok(false) => {
569            let _ = cast_actor(
570                &consumer,
571                ConsumerCommand::OnSubscribe {
572                    producer: producer.clone(),
573                },
574            );
575        }
576        Err(duplicate) => {
577            let _ = cast_actor(
578                &duplicate,
579                ConsumerCommand::Failure {
580                    producer,
581                    error: StreamError::Failed(
582                        "stream ref was already subscribed by another endpoint".to_owned(),
583                    ),
584                },
585            );
586        }
587    }
588}
589
590struct ConsumerActor<T> {
591    shared: Arc<ConsumerShared<T>>,
592    initial_producer: Option<ActorRef<ProducerCommand<T>>>,
593}
594
595impl<T> Actor for ConsumerActor<T>
596where
597    T: Send + 'static,
598{
599    type Msg = ConsumerCommand<T>;
600    type State = ();
601    type Arguments = ();
602
603    async fn pre_start(
604        &self,
605        myself: ActorRef<Self::Msg>,
606        _args: Self::Arguments,
607    ) -> Result<Self::State, ActorProcessingErr> {
608        if let Some(producer) = &self.initial_producer {
609            self.shared.set_producer(producer.clone());
610            if let Err(error) = cast_actor(
611                producer,
612                ProducerCommand::Subscribe {
613                    consumer: myself.clone(),
614                },
615            ) {
616                self.shared.fail_local(error);
617                myself.stop(None);
618            }
619        }
620        Ok(())
621    }
622
623    async fn handle(
624        &self,
625        _myself: ActorRef<Self::Msg>,
626        message: Self::Msg,
627        _state: &mut Self::State,
628    ) -> ActorResult {
629        match message {
630            ConsumerCommand::OnSubscribe { producer } => {
631                if !self.shared.set_producer(producer.clone()) {
632                    let _ = cast_actor(
633                        &producer,
634                        ProducerCommand::RemoteFailure {
635                            consumer: _myself.clone(),
636                            error: StreamError::Failed(
637                                "stream ref was already subscribed by another endpoint".to_owned(),
638                            ),
639                        },
640                    );
641                }
642            }
643            ConsumerCommand::Element {
644                producer,
645                seq,
646                item,
647            } => self.shared.push(&producer, seq, item),
648            ConsumerCommand::Complete { producer, seq } => {
649                self.shared.complete(&producer, seq);
650                let _ = cast_actor(&producer, ProducerCommand::Ack);
651            }
652            ConsumerCommand::Failure { producer, error } => {
653                self.shared.fail(&producer, error);
654                let _ = cast_actor(&producer, ProducerCommand::Ack);
655            }
656        }
657        Ok(())
658    }
659
660    async fn post_stop(
661        &self,
662        _myself: ActorRef<Self::Msg>,
663        _state: &mut Self::State,
664    ) -> ActorResult {
665        self.shared.fail_local(StreamError::ActorTerminated);
666        Ok(())
667    }
668}
669
670struct ConsumerStream<T>
671where
672    T: Send + 'static,
673{
674    shared: Arc<ConsumerShared<T>>,
675    actor_ref: Option<ActorRef<ConsumerCommand<T>>>,
676    settings: StreamRefSettings,
677    terminated: bool,
678    source_ref_keep_alive: Option<Arc<SourceRefInner<T>>>,
679}
680
681impl<T> Iterator for ConsumerStream<T>
682where
683    T: Send + 'static,
684{
685    type Item = StreamResult<T>;
686
687    fn next(&mut self) -> Option<Self::Item> {
688        if self.terminated {
689            return None;
690        }
691
692        if let Err(error) = self.wait_for_subscription() {
693            self.terminated = true;
694            return Some(Err(error));
695        }
696
697        if let Err(error) = self.redeliver_or_extend_demand() {
698            self.terminated = true;
699            return Some(Err(error));
700        }
701
702        let mut next_redelivery = next_redelivery_deadline(self.settings);
703        loop {
704            let demand_after_pop = {
705                let mut inner = self.shared.lock();
706                if let Some(item) = inner.queue.pop_front() {
707                    inner.delivered = inner.delivered.saturating_add(1);
708                    let demand = next_demand(&mut inner, self.settings);
709                    drop(inner);
710                    if let Some((producer, cumulative)) = demand
711                        && let Err(error) = send_demand(&self.actor_ref, &producer, cumulative)
712                    {
713                        self.terminated = true;
714                        return Some(Err(error));
715                    }
716                    return Some(Ok(item));
717                }
718
719                if let Some(terminal) = inner.terminal.clone() {
720                    drop(inner);
721                    match terminal {
722                        ConsumerTerminal::Complete => {
723                            self.terminated = true;
724                            self.stop_actor();
725                            return None;
726                        }
727                        ConsumerTerminal::Error(error) => {
728                            self.terminated = true;
729                            self.stop_actor();
730                            return Some(Err(error));
731                        }
732                    }
733                }
734                None::<(ActorRef<ProducerCommand<T>>, u64)>
735            };
736            debug_assert!(demand_after_pop.is_none());
737
738            let now = Instant::now();
739            let timeout = next_redelivery.saturating_duration_since(now);
740            let mut inner = self.shared.lock();
741            if !inner.queue.is_empty() || inner.terminal.is_some() {
742                continue;
743            }
744            let (next_inner, result) = wait_timeout_unpoison(
745                &self.shared.changed,
746                inner,
747                timeout.min(STREAM_REF_WAIT_POLL),
748            );
749            inner = next_inner;
750            drop(inner);
751            if result.timed_out() && Instant::now() >= next_redelivery {
752                if let Err(error) = self.redeliver_demand() {
753                    self.terminated = true;
754                    return Some(Err(error));
755                }
756                next_redelivery = next_redelivery_deadline(self.settings);
757            }
758        }
759    }
760}
761
762impl<T> ConsumerStream<T>
763where
764    T: Send + 'static,
765{
766    fn wait_for_subscription(&self) -> StreamResult<()> {
767        let deadline = Instant::now()
768            .checked_add(self.settings.subscription_timeout)
769            .unwrap_or_else(far_future);
770        let mut inner = self.shared.lock();
771        loop {
772            if inner.producer.is_some() {
773                return Ok(());
774            }
775            if let Some(terminal) = inner.terminal.clone() {
776                return match terminal {
777                    ConsumerTerminal::Complete => Ok(()),
778                    ConsumerTerminal::Error(error) => Err(error),
779                };
780            }
781            let now = Instant::now();
782            if now >= deadline {
783                drop(inner);
784                let error = subscription_timeout_error("stream ref source");
785                self.shared.fail_local(error.clone());
786                self.stop_actor_ref();
787                return Err(error);
788            }
789            let remaining = deadline.saturating_duration_since(now);
790            let (next, _) = wait_timeout_unpoison(
791                &self.shared.changed,
792                inner,
793                remaining.min(STREAM_REF_WAIT_POLL),
794            );
795            inner = next;
796        }
797    }
798
799    fn redeliver_or_extend_demand(&self) -> StreamResult<()> {
800        let demand = {
801            let mut inner = self.shared.lock();
802            next_demand(&mut inner, self.settings)
803        };
804        if let Some((producer, cumulative)) = demand {
805            send_demand(&self.actor_ref, &producer, cumulative)?;
806        }
807        Ok(())
808    }
809
810    fn redeliver_demand(&self) -> StreamResult<()> {
811        let (producer, cumulative) = {
812            let inner = self.shared.lock();
813            let Some(producer) = inner.producer.clone() else {
814                return Ok(());
815            };
816            if inner.cumulative_demand == 0 {
817                return Ok(());
818            }
819            (producer, inner.cumulative_demand)
820        };
821        send_demand(&self.actor_ref, &producer, cumulative)
822    }
823
824    fn stop_actor(&mut self) {
825        if let Some(actor_ref) = self.actor_ref.take() {
826            actor_ref.stop(None);
827        }
828    }
829
830    fn stop_actor_ref(&self) {
831        if let Some(actor_ref) = &self.actor_ref {
832            actor_ref.stop(None);
833        }
834    }
835}
836
837impl<T> Drop for ConsumerStream<T>
838where
839    T: Send + 'static,
840{
841    fn drop(&mut self) {
842        if !self.terminated {
843            let producer = self.shared.lock().producer.clone();
844            if let (Some(consumer), Some(producer)) = (&self.actor_ref, producer) {
845                let _ = cast_actor(
846                    &producer,
847                    ProducerCommand::Cancel {
848                        consumer: consumer.clone(),
849                    },
850                );
851            }
852        }
853        self.stop_actor();
854        drop(self.source_ref_keep_alive.take());
855    }
856}
857
858fn next_demand<T>(
859    inner: &mut ConsumerInner<T>,
860    settings: StreamRefSettings,
861) -> Option<(ActorRef<ProducerCommand<T>>, u64)> {
862    if inner.terminal.is_some() {
863        return None;
864    }
865    let target = inner
866        .delivered
867        .saturating_add(settings.buffer_capacity as u64);
868    if inner.cumulative_demand >= target {
869        return None;
870    }
871    inner.cumulative_demand = target;
872    inner
873        .producer
874        .as_ref()
875        .map(|producer| (producer.clone(), inner.cumulative_demand))
876}
877
878fn send_demand<T>(
879    consumer: &Option<ActorRef<ConsumerCommand<T>>>,
880    producer: &ActorRef<ProducerCommand<T>>,
881    cumulative: u64,
882) -> StreamResult<()>
883where
884    T: Send + 'static,
885{
886    let Some(consumer) = consumer else {
887        return Err(StreamError::ActorTerminated);
888    };
889    cast_actor(
890        producer,
891        ProducerCommand::Demand {
892            consumer: consumer.clone(),
893            cumulative,
894        },
895    )
896}
897
898fn stream_ref_source_sink<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
899where
900    T: Send + 'static,
901{
902    Sink::from_runner(move |input, materializer| {
903        let shared = Arc::new(ProducerShared::new());
904        let (producer_ref, _handle) = spawn_producer_actor(None, Arc::clone(&shared))?;
905        let producer_for_task = producer_ref.clone();
906        let completion = materializer.spawn_stream(move |cancelled| {
907            run_producer_endpoint(input, shared, producer_for_task, settings, cancelled)
908        });
909        Ok(SourceRef {
910            inner: Arc::new(SourceRefInner {
911                producer: producer_ref,
912                settings,
913                subscribed: AtomicBool::new(false),
914                _keep_alive: Mutex::new(Some(completion)),
915            }),
916        })
917    })
918}
919
920fn stream_ref_sink_source<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
921where
922    T: Send + 'static,
923{
924    Source::from_materialized_factory(move |_materializer| {
925        let shared = Arc::new(ConsumerShared::new(settings));
926        let (consumer_ref, _handle) = spawn_consumer_actor(None, Arc::clone(&shared))?;
927        let sink_ref = SinkRef {
928            inner: Arc::new(SinkRefInner {
929                consumer: consumer_ref.clone(),
930                settings,
931                subscribed: AtomicBool::new(false),
932            }),
933        };
934        Ok((
935            Box::new(ConsumerStream {
936                shared,
937                actor_ref: Some(consumer_ref),
938                settings,
939                terminated: false,
940                source_ref_keep_alive: None,
941            }) as BoxStream<T>,
942            sink_ref,
943        ))
944    })
945}
946
947fn run_producer_endpoint<T>(
948    mut input: BoxStream<T>,
949    shared: Arc<ProducerShared<T>>,
950    producer_ref: ActorRef<ProducerCommand<T>>,
951    settings: StreamRefSettings,
952    cancelled: Arc<AtomicBool>,
953) -> StreamResult<NotUsed>
954where
955    T: Send + 'static,
956{
957    let deadline = Instant::now()
958        .checked_add(settings.subscription_timeout)
959        .unwrap_or_else(far_future);
960
961    loop {
962        let consumer = match wait_for_remote_demand(&shared, deadline, &cancelled) {
963            Ok(consumer) => consumer,
964            Err(error) => {
965                producer_ref.stop(None);
966                return Err(error);
967            }
968        };
969        if cancelled.load(Ordering::SeqCst) {
970            return Err(StreamError::Cancelled);
971        }
972
973        match input.next() {
974            Some(Ok(item)) => {
975                let seq = {
976                    let mut inner = shared.lock();
977                    let seq = inner.sent;
978                    inner.sent = inner.sent.saturating_add(1);
979                    seq
980                };
981                if let Err(error) = cast_actor(
982                    &consumer,
983                    ConsumerCommand::Element {
984                        producer: producer_ref.clone(),
985                        seq,
986                        item,
987                    },
988                ) {
989                    return Err(match error {
990                        StreamError::ActorTerminated => StreamError::Cancelled,
991                        other => other,
992                    });
993                }
994            }
995            Some(Err(error)) => {
996                let _ = cast_actor(
997                    &consumer,
998                    ConsumerCommand::Failure {
999                        producer: producer_ref.clone(),
1000                        error: error.clone(),
1001                    },
1002                );
1003                return Err(error);
1004            }
1005            None => {
1006                let seq = shared.lock().sent;
1007                let _ = cast_actor(
1008                    &consumer,
1009                    ConsumerCommand::Complete {
1010                        producer: producer_ref.clone(),
1011                        seq,
1012                    },
1013                );
1014                return Ok(NotUsed);
1015            }
1016        }
1017    }
1018}
1019
1020fn wait_for_remote_demand<T>(
1021    shared: &Arc<ProducerShared<T>>,
1022    deadline: Instant,
1023    cancelled: &Arc<AtomicBool>,
1024) -> StreamResult<ActorRef<ConsumerCommand<T>>>
1025where
1026    T: Send + 'static,
1027{
1028    let mut inner = shared.lock();
1029    loop {
1030        if cancelled.load(Ordering::SeqCst) {
1031            return Err(StreamError::Cancelled);
1032        }
1033        if let Some(error) = inner.stopped.clone() {
1034            return Err(error);
1035        }
1036        if let Some(consumer) = &inner.consumer
1037            && inner.sent < inner.cumulative_demand
1038        {
1039            return Ok(consumer.clone());
1040        }
1041        let now = Instant::now();
1042        if inner.consumer.is_none() && now >= deadline {
1043            return Err(subscription_timeout_error("stream ref sink"));
1044        }
1045        let remaining = deadline.saturating_duration_since(now);
1046        let timeout = if inner.consumer.is_none() {
1047            remaining.min(STREAM_REF_WAIT_POLL)
1048        } else {
1049            STREAM_REF_WAIT_POLL
1050        };
1051        let (next, _) = wait_timeout_unpoison(&shared.changed, inner, timeout);
1052        inner = next;
1053    }
1054}
1055
1056fn spawn_producer_actor<T>(
1057    initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
1058    shared: Arc<ProducerShared<T>>,
1059) -> StreamResult<(
1060    ActorRef<ProducerCommand<T>>,
1061    ractor::concurrency::JoinHandle<()>,
1062)>
1063where
1064    T: Send + 'static,
1065{
1066    block_on_ractor_runtime(Actor::spawn(
1067        None,
1068        ProducerActor {
1069            shared,
1070            initial_consumer,
1071        },
1072        (),
1073    ))?
1074    .map_err(|error| {
1075        StreamError::Failed(format!(
1076            "stream ref producer actor failed to spawn: {error}"
1077        ))
1078    })
1079}
1080
1081fn spawn_consumer_actor<T>(
1082    initial_producer: Option<ActorRef<ProducerCommand<T>>>,
1083    shared: Arc<ConsumerShared<T>>,
1084) -> StreamResult<(
1085    ActorRef<ConsumerCommand<T>>,
1086    ractor::concurrency::JoinHandle<()>,
1087)>
1088where
1089    T: Send + 'static,
1090{
1091    block_on_ractor_runtime(Actor::spawn(
1092        None,
1093        ConsumerActor {
1094            shared,
1095            initial_producer,
1096        },
1097        (),
1098    ))?
1099    .map_err(|error| {
1100        StreamError::Failed(format!(
1101            "stream ref consumer actor failed to spawn: {error}"
1102        ))
1103    })
1104}
1105
1106fn cast_actor<Msg>(actor_ref: &ActorRef<Msg>, message: Msg) -> StreamResult<()>
1107where
1108    Msg: Message,
1109{
1110    match actor_ref.cast(message) {
1111        Ok(()) => Ok(()),
1112        Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
1113            Err(StreamError::ActorTerminated)
1114        }
1115        Err(error) => Err(StreamError::ActorAskSendFailed {
1116            reason: error.to_string(),
1117        }),
1118    }
1119}
1120
1121fn same_actor<MsgA, MsgB>(left: &ActorRef<MsgA>, right: &ActorRef<MsgB>) -> bool
1122where
1123    MsgA: Message,
1124    MsgB: Message,
1125{
1126    left.get_cell().get_id() == right.get_cell().get_id()
1127}
1128
1129fn failed_once<T>(reason: &str) -> BoxStream<T>
1130where
1131    T: Send + 'static,
1132{
1133    let error = StreamError::Failed(reason.to_owned());
1134    Box::new(std::iter::once(Err(error)))
1135}
1136
1137fn subscription_timeout_error(side: &str) -> StreamError {
1138    StreamError::Failed(format!(
1139        "{side} remote side did not subscribe within subscription timeout"
1140    ))
1141}
1142
1143fn invalid_sequence_error(expected: u64, got: u64, context: &str) -> StreamError {
1144    StreamError::Failed(format!(
1145        "{context}: expected sequence {expected}, got {got}"
1146    ))
1147}
1148
1149fn next_redelivery_deadline(settings: StreamRefSettings) -> Instant {
1150    Instant::now()
1151        .checked_add(settings.demand_redelivery_interval)
1152        .unwrap_or_else(far_future)
1153}
1154
1155fn far_future() -> Instant {
1156    Instant::now() + Duration::from_secs(60 * 60 * 24 * 365)
1157}
1158
1159fn wait_timeout_unpoison<'a, T>(
1160    condvar: &Condvar,
1161    guard: MutexGuard<'a, T>,
1162    timeout: Duration,
1163) -> (MutexGuard<'a, T>, std::sync::WaitTimeoutResult) {
1164    condvar
1165        .wait_timeout(guard, timeout)
1166        .unwrap_or_else(|poison| poison.into_inner())
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171    use super::*;
1172    use crate::{
1173        stream::{Keep, Source},
1174        testkit::TestSink,
1175    };
1176    use std::sync::{
1177        Arc as StdArc,
1178        atomic::{AtomicBool, AtomicUsize, Ordering},
1179    };
1180
1181    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
1182        let deadline = Instant::now() + timeout;
1183        while Instant::now() < deadline {
1184            if condition() {
1185                return true;
1186            }
1187            std::thread::park_timeout(Duration::from_millis(1));
1188        }
1189        condition()
1190    }
1191
1192    fn assert_condition_holds(timeout: Duration, mut condition: impl FnMut() -> bool) {
1193        let deadline = Instant::now() + timeout;
1194        while Instant::now() < deadline {
1195            assert!(condition());
1196            std::thread::park_timeout(Duration::from_millis(1));
1197        }
1198        assert!(condition());
1199    }
1200
1201    fn short_settings() -> StreamRefSettings {
1202        StreamRefSettings::default()
1203            .with_buffer_capacity(1)
1204            .with_subscription_timeout(Duration::from_millis(50))
1205            .with_demand_redelivery_interval(Duration::from_millis(10))
1206    }
1207
1208    #[test]
1209    fn source_ref_streams_elements_and_completion() {
1210        let source_ref = Source::from_iter(1_u64..=3)
1211            .run_with(StreamRefs::source_ref())
1212            .unwrap();
1213
1214        assert_eq!(source_ref.source().run_collect().unwrap(), vec![1, 2, 3]);
1215    }
1216
1217    #[test]
1218    fn sink_ref_streams_elements_and_completion() {
1219        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
1220            .to_mat(Sink::collect(), Keep::both)
1221            .run()
1222            .unwrap();
1223
1224        Source::from_iter(1_u64..=3)
1225            .run_with(sink_ref.sink())
1226            .unwrap()
1227            .wait()
1228            .unwrap();
1229
1230        assert_eq!(completion.wait().unwrap(), vec![1, 2, 3]);
1231    }
1232
1233    #[test]
1234    fn source_ref_propagates_upstream_failure() {
1235        let source_ref = Source::<u64>::failed(StreamError::Failed("boom".to_owned()))
1236            .run_with(StreamRefs::source_ref())
1237            .unwrap();
1238
1239        assert_eq!(
1240            source_ref.source().run_collect(),
1241            Err(StreamError::Failed("boom".to_owned()))
1242        );
1243    }
1244
1245    #[test]
1246    fn sink_ref_propagates_upstream_failure() {
1247        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
1248            .to_mat(Sink::collect(), Keep::both)
1249            .run()
1250            .unwrap();
1251
1252        let failure = Source::<u64>::failed(StreamError::Failed("remote boom".to_owned()))
1253            .run_with(sink_ref.sink())
1254            .unwrap()
1255            .wait();
1256
1257        assert_eq!(failure, Err(StreamError::Failed("remote boom".to_owned())));
1258        assert_eq!(
1259            completion.wait(),
1260            Err(StreamError::Failed("remote boom".to_owned()))
1261        );
1262    }
1263
1264    #[test]
1265    fn source_ref_cancellation_reaches_origin() {
1266        let closed = StdArc::new(AtomicBool::new(false));
1267        let close_flag = StdArc::clone(&closed);
1268        let source = Source::unfold_resource(
1269            || Ok(()),
1270            |_state| Ok(Some(1_u64)),
1271            move |_state| {
1272                close_flag.store(true, Ordering::SeqCst);
1273                Ok(())
1274            },
1275        );
1276        let source_ref = source.run_with(StreamRefs::source_ref()).unwrap();
1277
1278        assert_eq!(source_ref.source().take(1).run_collect().unwrap(), vec![1]);
1279        assert!(wait_until(Duration::from_secs(1), || {
1280            closed.load(Ordering::SeqCst)
1281        }));
1282    }
1283
1284    #[test]
1285    fn sink_ref_cancellation_stops_remote_producer() {
1286        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
1287            .take(1)
1288            .to_mat(Sink::collect(), Keep::both)
1289            .run()
1290            .unwrap();
1291
1292        let producer = Source::repeat(1_u64)
1293            .run_with(sink_ref.sink())
1294            .unwrap()
1295            .wait();
1296
1297        assert_eq!(completion.wait().unwrap(), vec![1]);
1298        assert_eq!(producer, Err(StreamError::Cancelled));
1299    }
1300
1301    #[test]
1302    fn source_ref_backpressures_across_ref() {
1303        let pulled = StdArc::new(AtomicUsize::new(0));
1304        let pulled_for_source = StdArc::clone(&pulled);
1305        let source = Source::unfold(0_u64, move |next| {
1306            pulled_for_source.fetch_add(1, Ordering::SeqCst);
1307            Some((next + 1, next))
1308        });
1309        let source_ref = source
1310            .run_with(StreamRefs::source_ref_with_settings(short_settings()))
1311            .unwrap();
1312        let mut probe = source_ref.source().run_with(TestSink::probe()).unwrap();
1313
1314        probe.request(1);
1315        probe.assert_next(0);
1316        assert!(wait_until(Duration::from_secs(1), || {
1317            pulled.load(Ordering::SeqCst) >= 2
1318        }));
1319        assert_condition_holds(Duration::from_millis(50), || {
1320            pulled.load(Ordering::SeqCst) <= 2
1321        });
1322
1323        probe.request(1);
1324        probe.assert_next(1);
1325        assert!(wait_until(Duration::from_secs(1), || {
1326            pulled.load(Ordering::SeqCst) >= 3
1327        }));
1328        assert_condition_holds(Duration::from_millis(50), || {
1329            pulled.load(Ordering::SeqCst) <= 3
1330        });
1331
1332        probe.cancel();
1333    }
1334
1335    #[test]
1336    fn source_ref_late_subscription_observes_timeout() {
1337        let source_ref = Source::repeat(1_u64)
1338            .run_with(StreamRefs::source_ref_with_settings(short_settings()))
1339            .unwrap();
1340
1341        assert!(wait_until(Duration::from_secs(1), || {
1342            source_ref.inner.producer.get_status() == ractor::ActorStatus::Stopped
1343        }));
1344
1345        let result = source_ref.source().run_collect();
1346        assert!(matches!(
1347            result,
1348            Err(StreamError::ActorTerminated) | Err(StreamError::Failed(_))
1349        ));
1350    }
1351
1352    #[test]
1353    fn sink_ref_subscription_timeout_fails_local_source() {
1354        let (_sink_ref, probe) = StreamRefs::sink_ref_with_settings::<u64>(short_settings())
1355            .to_mat(TestSink::probe(), Keep::both)
1356            .run()
1357            .unwrap();
1358
1359        probe.request(1);
1360        let error = probe.expect_error();
1361        assert!(
1362            matches!(error, StreamError::Failed(message) if message.contains("did not subscribe"))
1363        );
1364    }
1365
1366    #[test]
1367    fn stream_refs_are_one_shot() {
1368        let source_ref = Source::from_iter([1_u64])
1369            .run_with(StreamRefs::source_ref())
1370            .unwrap();
1371        assert_eq!(source_ref.source().run_collect().unwrap(), vec![1]);
1372        assert!(matches!(
1373            source_ref.source().run_collect(),
1374            Err(StreamError::Failed(message)) if message.contains("already")
1375        ));
1376
1377        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
1378            .to_mat(Sink::collect(), Keep::both)
1379            .run()
1380            .unwrap();
1381        Source::single(1_u64)
1382            .run_with(sink_ref.sink())
1383            .unwrap()
1384            .wait()
1385            .unwrap();
1386        assert!(matches!(
1387            Source::single(2_u64).run_with(sink_ref.sink()).unwrap().wait(),
1388            Err(StreamError::Failed(message)) if message.contains("already")
1389        ));
1390        assert_eq!(completion.wait().unwrap(), vec![1]);
1391    }
1392}