Skip to main content

datum/actor/
stream_ref.rs

1use std::{
2    collections::VecDeque,
3    fmt,
4    sync::{
5        Arc, Condvar, Mutex, MutexGuard, Weak,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, Instant},
9};
10
11use futures::channel::oneshot;
12#[cfg(feature = "cluster")]
13use ractor::{ActorCell, BytesConvertable};
14#[cfg(feature = "cluster")]
15use std::{
16    sync::atomic::AtomicU64,
17    time::{SystemTime, UNIX_EPOCH},
18};
19
20use crate::stream::{
21    BoxStream, Cancellable, NotUsed, Sink, Source, StreamCancellation, StreamCompletion,
22    StreamError, StreamResult,
23};
24
25use super::{Actor, ActorProcessingErr, ActorRef, ActorResult, Message, block_on_ractor_runtime};
26
/// Default receive-buffer capacity, in elements, used by `StreamRefSettings::default`.
const DEFAULT_STREAM_REF_BUFFER_CAPACITY: usize = 32;
/// Default subscription timeout used by `StreamRefSettings::default`.
const DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Default demand redelivery interval used by `StreamRefSettings::default`.
const DEFAULT_STREAM_REF_DEMAND_REDELIVERY: Duration = Duration::from_secs(1);
// NOTE(review): presumably the polling granularity for blocking waits; its
// users are outside this part of the file — confirm.
const STREAM_REF_WAIT_POLL: Duration = Duration::from_millis(1);
/// Scope string stored in every `RemoteStreamRefEndpoint` created by `RemoteStreamRefEndpoint::new`.
#[cfg(feature = "cluster")]
const STREAM_REF_REMOTE_SCOPE: &str = "datum.streamrefs";
/// Process-wide counter supplying the sequence component of remote endpoint group names.
#[cfg(feature = "cluster")]
static STREAM_REF_REMOTE_ID: AtomicU64 = AtomicU64::new(1);
35
/// Marker bound for element types accepted by the stream-ref endpoints.
///
/// With the `cluster` feature the marker is implemented exactly for the types
/// that implement [`ractor::BytesConvertable`].
#[cfg(feature = "cluster")]
mod stream_ref_actor_bound {
    pub trait Bound: ractor::BytesConvertable {}

    impl<T: ractor::BytesConvertable> Bound for T {}
}

/// Marker bound for element types accepted by the stream-ref endpoints.
///
/// Without the `cluster` feature the blanket impl below makes every sized
/// type satisfy the bound.
#[cfg(not(feature = "cluster"))]
mod stream_ref_actor_bound {
    pub trait Bound {}

    impl<T> Bound for T {}
}
49
50/// Settings used by [`StreamRefs`] endpoints.
51///
52/// The defaults mirror Akka StreamRefs: a 32 element receive buffer, a 30 second
53/// subscription timeout, and 1 second demand redelivery. Demand is cumulative,
54/// so redelivery is idempotent.
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct StreamRefSettings {
57    buffer_capacity: usize,
58    subscription_timeout: Duration,
59    demand_redelivery_interval: Duration,
60}
61
62impl Default for StreamRefSettings {
63    fn default() -> Self {
64        Self {
65            buffer_capacity: DEFAULT_STREAM_REF_BUFFER_CAPACITY,
66            subscription_timeout: DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT,
67            demand_redelivery_interval: DEFAULT_STREAM_REF_DEMAND_REDELIVERY,
68        }
69    }
70}
71
72impl StreamRefSettings {
73    #[must_use]
74    pub fn buffer_capacity(&self) -> usize {
75        self.buffer_capacity
76    }
77
78    #[must_use]
79    pub fn subscription_timeout(&self) -> Duration {
80        self.subscription_timeout
81    }
82
83    #[must_use]
84    pub fn demand_redelivery_interval(&self) -> Duration {
85        self.demand_redelivery_interval
86    }
87
88    #[must_use]
89    pub fn with_buffer_capacity(mut self, capacity: usize) -> Self {
90        assert!(
91            capacity > 0,
92            "StreamRef buffer capacity must be greater than zero"
93        );
94        self.buffer_capacity = capacity;
95        self
96    }
97
98    #[must_use]
99    pub fn with_subscription_timeout(mut self, timeout: Duration) -> Self {
100        self.subscription_timeout = timeout;
101        self
102    }
103
104    #[must_use]
105    pub fn with_demand_redelivery_interval(mut self, interval: Duration) -> Self {
106        self.demand_redelivery_interval = interval;
107        self
108    }
109}
110
/// Address of a stream-ref endpoint that can be sent to another node: a scope
/// plus a group name.
// NOTE(review): presumably these are resolved as process-group scope/group
// names by `join_remote_endpoint`; that code is outside this view — confirm.
#[cfg(feature = "cluster")]
#[derive(Debug, Clone, PartialEq, Eq)]
struct RemoteStreamRefEndpoint {
    scope: String,
    group: String,
}

#[cfg(feature = "cluster")]
impl RemoteStreamRefEndpoint {
    /// Creates a fresh endpoint address for an endpoint of the given `kind`.
    ///
    /// The group name combines `kind`, the OS process id, a process-wide
    /// sequence number and the current Unix time in nanoseconds (zero when the
    /// system clock reads earlier than the epoch).
    fn new(kind: &str) -> Self {
        let sequence = STREAM_REF_REMOTE_ID.fetch_add(1, Ordering::Relaxed);
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos());
        let scope = STREAM_REF_REMOTE_SCOPE.to_owned();
        let group = format!(
            "datum-streamref-{kind}-{}-{sequence}-{timestamp}",
            std::process::id()
        );
        Self { scope, group }
    }
}
135
/// Wire envelope produced when a `SourceRef` or `SinkRef` is converted to
/// bytes, and consumed when one is rebuilt from bytes.
#[cfg(feature = "cluster")]
#[derive(Debug, Clone)]
enum RemoteRefEnvelope {
    /// The sending side has an endpoint available; carries its address and
    /// the settings of the ref.
    Ready {
        endpoint: RemoteStreamRefEndpoint,
        settings: StreamRefSettings,
    },
    /// The sending side could not provide an endpoint; carries the error that
    /// the rebuilt ref reports when it is used.
    Failed(StreamError),
}
145
/// Remote half of a `SourceRef`: either the endpoint to connect to, or the
/// error to surface when the ref is materialized.
#[cfg(feature = "cluster")]
enum RemoteSourceRefState {
    /// Address of the producer endpoint to subscribe to.
    Ready(RemoteStreamRefEndpoint),
    /// Error replayed as a failed stream on materialization.
    Failed(StreamError),
}
151
/// Remote half of a `SinkRef`: either the endpoint to connect to, or the
/// error to surface when the ref is materialized.
#[cfg(feature = "cluster")]
enum RemoteSinkRefState {
    /// Address of the consumer endpoint to push elements to.
    Ready(RemoteStreamRefEndpoint),
    /// Error reported through the sink's materialized completion.
    Failed(StreamError),
}
157
/// Wire encoding for an endpoint address: the scope string followed by the
/// group string, each written with `put_string` and read with `take_string`.
#[cfg(feature = "cluster")]
impl BytesConvertable for RemoteStreamRefEndpoint {
    fn into_bytes(self) -> Vec<u8> {
        let Self { scope, group } = self;
        let mut encoded = Vec::new();
        put_string(&mut encoded, scope);
        put_string(&mut encoded, group);
        encoded
    }

    fn from_bytes(bytes: Vec<u8>) -> Self {
        // Fields are read in the same order they were written.
        let mut offset = 0;
        Self {
            scope: take_string(&bytes, &mut offset),
            group: take_string(&bytes, &mut offset),
        }
    }
}
174
/// Wire encoding for [`StreamRefSettings`]: the buffer capacity as a `u64`,
/// followed by the subscription timeout and the demand redelivery interval.
#[cfg(feature = "cluster")]
impl BytesConvertable for StreamRefSettings {
    fn into_bytes(self) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(40);
        // `usize` is at most 64 bits wide on supported targets, so this
        // widening cast is lossless.
        put_u64(&mut bytes, self.buffer_capacity as u64);
        put_duration(&mut bytes, self.subscription_timeout);
        put_duration(&mut bytes, self.demand_redelivery_interval);
        bytes
    }

    /// Decodes settings received from a peer.
    ///
    /// `from_bytes` cannot report errors, so the decoded capacity is clamped
    /// into `1..=usize::MAX`: a zero capacity would violate the invariant
    /// enforced by `with_buffer_capacity`, and a value wider than the local
    /// `usize` saturates instead of being silently truncated.
    fn from_bytes(bytes: Vec<u8>) -> Self {
        let mut cursor = 0;
        let buffer_capacity =
            take_u64(&bytes, &mut cursor).clamp(1, usize::MAX as u64) as usize;
        let subscription_timeout = take_duration(&bytes, &mut cursor);
        let demand_redelivery_interval = take_duration(&bytes, &mut cursor);
        Self {
            buffer_capacity,
            subscription_timeout,
            demand_redelivery_interval,
        }
    }
}
197
/// Wire encoding for [`StreamError`]: a one-byte variant tag followed by the
/// variant's payload, if any.
///
/// The tags (0 through 14) are part of the wire format; `into_bytes` and
/// `from_bytes` must agree on them.
#[cfg(feature = "cluster")]
impl BytesConvertable for StreamError {
    fn into_bytes(self) -> Vec<u8> {
        let mut out = Vec::new();
        match self {
            Self::Cancelled => put_u8(&mut out, 0),
            Self::AbruptTermination => put_u8(&mut out, 1),
            Self::Backpressured => put_u8(&mut out, 2),
            Self::EmptyStream => put_u8(&mut out, 3),
            Self::MaybeIncomplete => put_u8(&mut out, 4),
            Self::LimitExceeded { max } => {
                put_u8(&mut out, 5);
                put_u64(&mut out, max);
            }
            Self::InvalidPortOperation {
                operation,
                port,
                reason,
            } => {
                put_u8(&mut out, 6);
                put_string(&mut out, operation.to_owned());
                put_string(&mut out, port);
                put_string(&mut out, reason);
            }
            Self::GraphValidation(message) => {
                put_u8(&mut out, 7);
                put_string(&mut out, message);
            }
            Self::EventLimitExceeded { limit } => {
                put_u8(&mut out, 8);
                put_u64(&mut out, limit as u64);
            }
            Self::ActorAskTimeout { timeout } => {
                put_u8(&mut out, 9);
                put_duration(&mut out, timeout);
            }
            Self::ActorTerminated => put_u8(&mut out, 10),
            Self::ActorAskResponseDropped => put_u8(&mut out, 11),
            Self::ActorAskSendFailed { reason } => {
                put_u8(&mut out, 12);
                put_string(&mut out, reason);
            }
            Self::ActorAskTaskFailed { reason } => {
                put_u8(&mut out, 13);
                put_string(&mut out, reason);
            }
            Self::Failed(message) => {
                put_u8(&mut out, 14);
                put_string(&mut out, message);
            }
        }
        out
    }

    /// Decodes an error; an unknown tag yields a `Failed` error describing the
    /// invalid encoding rather than panicking here.
    fn from_bytes(bytes: Vec<u8>) -> Self {
        let mut offset = 0;
        let tag = take_u8(&bytes, &mut offset);
        match tag {
            0 => Self::Cancelled,
            1 => Self::AbruptTermination,
            2 => Self::Backpressured,
            3 => Self::EmptyStream,
            4 => Self::MaybeIncomplete,
            5 => {
                let max = take_u64(&bytes, &mut offset);
                Self::LimitExceeded { max }
            }
            6 => {
                // Payload order matches `into_bytes`: operation, port, reason.
                let operation = remote_port_operation(take_string(&bytes, &mut offset));
                let port = take_string(&bytes, &mut offset);
                let reason = take_string(&bytes, &mut offset);
                Self::InvalidPortOperation {
                    operation,
                    port,
                    reason,
                }
            }
            7 => Self::GraphValidation(take_string(&bytes, &mut offset)),
            8 => {
                let limit = take_u64(&bytes, &mut offset) as usize;
                Self::EventLimitExceeded { limit }
            }
            9 => {
                let timeout = take_duration(&bytes, &mut offset);
                Self::ActorAskTimeout { timeout }
            }
            10 => Self::ActorTerminated,
            11 => Self::ActorAskResponseDropped,
            12 => {
                let reason = take_string(&bytes, &mut offset);
                Self::ActorAskSendFailed { reason }
            }
            13 => {
                let reason = take_string(&bytes, &mut offset);
                Self::ActorAskTaskFailed { reason }
            }
            14 => Self::Failed(take_string(&bytes, &mut offset)),
            _ => Self::Failed("invalid remote stream error encoding".to_owned()),
        }
    }
}
288
/// Wire encoding for [`RemoteRefEnvelope`]: a one-byte tag (0 = `Ready`,
/// 1 = `Failed`) followed by the nested values, each wrapped via `put_bytes`.
#[cfg(feature = "cluster")]
impl BytesConvertable for RemoteRefEnvelope {
    fn into_bytes(self) -> Vec<u8> {
        let mut out = Vec::new();
        match self {
            Self::Ready { endpoint, settings } => {
                put_u8(&mut out, 0);
                put_bytes(&mut out, endpoint.into_bytes());
                put_bytes(&mut out, settings.into_bytes());
            }
            Self::Failed(error) => {
                put_u8(&mut out, 1);
                put_bytes(&mut out, error.into_bytes());
            }
        }
        out
    }

    /// Decodes an envelope; an unknown tag becomes a `Failed` envelope that
    /// describes the invalid encoding.
    fn from_bytes(bytes: Vec<u8>) -> Self {
        let mut offset = 0;
        let tag = take_u8(&bytes, &mut offset);
        if tag == 0 {
            let endpoint = RemoteStreamRefEndpoint::from_bytes(take_bytes(&bytes, &mut offset));
            let settings = StreamRefSettings::from_bytes(take_bytes(&bytes, &mut offset));
            return Self::Ready { endpoint, settings };
        }

        let error = if tag == 1 {
            StreamError::from_bytes(take_bytes(&bytes, &mut offset))
        } else {
            StreamError::Failed("invalid remote stream ref encoding".to_owned())
        };
        Self::Failed(error)
    }
}
322
323/// Factories for Akka-style stream references.
324///
325/// `source_ref` is a local sink that materializes a [`SourceRef`]. The returned
326/// handle can be used once to create a remote source. `sink_ref` is the dual: a
327/// local source that materializes a [`SinkRef`], which can be used once as a
328/// remote sink.
329pub struct StreamRefs;
330
331impl StreamRefs {
332    #[must_use]
333    pub fn source_ref<T>() -> Sink<T, SourceRef<T>>
334    where
335        T: Send + 'static,
336    {
337        Self::source_ref_with_settings(StreamRefSettings::default())
338    }
339
340    #[must_use]
341    pub fn source_ref_with_settings<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
342    where
343        T: Send + 'static,
344    {
345        stream_ref_source_sink(settings)
346    }
347
348    #[must_use]
349    pub fn sink_ref<T>() -> Source<T, SinkRef<T>>
350    where
351        T: Send + 'static,
352    {
353        Self::sink_ref_with_settings(StreamRefSettings::default())
354    }
355
356    #[must_use]
357    pub fn sink_ref_with_settings<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
358    where
359        T: Send + 'static,
360    {
361        stream_ref_sink_source(settings)
362    }
363}
364
/// A handle that exposes a remote stream as a local [`Source`].
///
/// Stream refs are one-shot pairings. [`SourceRef::source`] may be called any
/// number of times, but only the first materialization of a returned source
/// attaches to the stream; every later materialization yields a source that
/// fails.
///
/// Clones share the same underlying state, so the one-shot rule applies
/// across all clones of a handle.
pub struct SourceRef<T> {
    inner: Arc<SourceRefInner<T>>,
}
372
/// State shared by all clones of a [`SourceRef`].
struct SourceRefInner<T> {
    // NOTE(review): never read in this part of the file (hence the
    // `dead_code` allowance); presumably held for its ownership/drop
    // behaviour — confirm.
    #[allow(dead_code)]
    producer: LazyProducerStatus<T>,
    // In-process state from which the stream input is claimed on materialization.
    direct: Arc<SourceRefDirectState<T>>,
    settings: StreamRefSettings,
    // Flipped to `true` by the first materialization (or remote activation);
    // enforces the one-shot rule.
    subscribed: AtomicBool,
    // Cancelled as soon as the ref is materialized or activated remotely.
    timeout: Option<Cancellable>,
    // `Some` for refs rebuilt from bytes; `None` for locally created refs.
    #[cfg(feature = "cluster")]
    remote: Option<RemoteSourceRefState>,
}
383
384impl<T> Clone for SourceRef<T> {
385    fn clone(&self) -> Self {
386        Self {
387            inner: Arc::clone(&self.inner),
388        }
389    }
390}
391
392impl<T> fmt::Debug for SourceRef<T> {
393    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
394        f.debug_struct("SourceRef").finish_non_exhaustive()
395    }
396}
397
/// Lets a [`SourceRef`] travel inside a message as a [`RemoteRefEnvelope`].
#[cfg(feature = "cluster")]
impl<T> BytesConvertable for SourceRef<T>
where
    T: BytesConvertable + Send + 'static,
{
    /// Activates the remote producer for this ref and encodes the resulting
    /// endpoint together with the ref's settings; an activation error is
    /// encoded instead, so encoding itself never fails.
    fn into_bytes(self) -> Vec<u8> {
        let activation = self.activate_remote_producer();
        let envelope = activation.map_or_else(RemoteRefEnvelope::Failed, |endpoint| {
            RemoteRefEnvelope::Ready {
                endpoint,
                settings: self.inner.settings,
            }
        });
        envelope.into_bytes()
    }

    /// Rebuilds a remote-backed ref. A failed envelope carries no settings, so
    /// the defaults are used alongside the recorded error.
    fn from_bytes(bytes: Vec<u8>) -> Self {
        let (settings, state) = match RemoteRefEnvelope::from_bytes(bytes) {
            RemoteRefEnvelope::Ready { endpoint, settings } => {
                (settings, RemoteSourceRefState::Ready(endpoint))
            }
            RemoteRefEnvelope::Failed(error) => (
                StreamRefSettings::default(),
                RemoteSourceRefState::Failed(error),
            ),
        };
        Self::remote(settings, state)
    }
}
426
427impl<T> SourceRef<T>
428where
429    T: stream_ref_actor_bound::Bound + Send + 'static,
430{
431    #[must_use]
432    pub fn source(&self) -> Source<T, NotUsed> {
433        let inner = Arc::clone(&self.inner);
434        Source::from_materialized_factory(move |_materializer| {
435            if inner.subscribed.swap(true, Ordering::SeqCst) {
436                return Ok((
437                    failed_once("source ref has already been materialized"),
438                    NotUsed,
439                ));
440            }
441
442            #[cfg(feature = "cluster")]
443            if let Some(remote) = &inner.remote {
444                return match remote {
445                    RemoteSourceRefState::Ready(endpoint) => {
446                        match materialize_remote_source_ref(
447                            endpoint.clone(),
448                            inner.settings,
449                            Arc::clone(&inner),
450                        ) {
451                            Ok(stream) => Ok((stream, NotUsed)),
452                            Err(error) => Ok((failed_stream(error), NotUsed)),
453                        }
454                    }
455                    RemoteSourceRefState::Failed(error) => {
456                        Ok((failed_stream(error.clone()), NotUsed))
457                    }
458                };
459            }
460
461            if let Some(timeout) = &inner.timeout {
462                timeout.cancel();
463            }
464            match inner.direct.claim_input() {
465                Ok(input) => Ok((
466                    Box::new(SourceRefDirectStream::new(
467                        input,
468                        inner.settings,
469                        Arc::clone(&inner),
470                    )) as BoxStream<T>,
471                    NotUsed,
472                )),
473                Err(error) => Ok((failed_stream(error), NotUsed)),
474            }
475        })
476    }
477}
478
479pub(super) fn proto_source<T>(source_ref: &SourceRef<T>) -> Source<T, NotUsed>
480where
481    T: Send + 'static,
482{
483    let inner = Arc::clone(&source_ref.inner);
484    Source::from_materialized_factory(move |_materializer| {
485        if inner.subscribed.swap(true, Ordering::SeqCst) {
486            return Ok((
487                failed_stream(StreamError::Failed(
488                    "source ref has already been materialized".to_owned(),
489                )),
490                NotUsed,
491            ));
492        }
493
494        if let Some(timeout) = &inner.timeout {
495            timeout.cancel();
496        }
497        match inner.direct.claim_input() {
498            Ok(input) => Ok((
499                Box::new(SourceRefDirectStream::new(
500                    input,
501                    inner.settings,
502                    Arc::clone(&inner),
503                )) as BoxStream<T>,
504                NotUsed,
505            )),
506            Err(error) => Ok((failed_stream(error), NotUsed)),
507        }
508    })
509}
510
#[cfg(feature = "cluster")]
impl<T> SourceRef<T>
where
    T: BytesConvertable + Send + 'static,
{
    /// Builds a ref backed by remote state (used when decoding from bytes).
    ///
    /// The direct state is created via `SourceRefDirectState::terminal` with
    /// `ActorTerminated`, and no timeout is armed.
    fn remote(settings: StreamRefSettings, remote: RemoteSourceRefState) -> Self {
        let direct = Arc::new(SourceRefDirectState::terminal(StreamError::ActorTerminated));
        let producer = LazyProducerStatus::new(Arc::clone(&direct));
        let inner = SourceRefInner {
            producer,
            direct,
            settings,
            subscribed: AtomicBool::new(false),
            timeout: None,
            remote: Some(remote),
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Produces the endpoint address a remote consumer should subscribe to.
    ///
    /// A ref that is already remote-backed simply reports its recorded
    /// endpoint or error. Otherwise the ref is claimed (one-shot), its timeout
    /// is cancelled, a producer actor is spawned and joined to a fresh
    /// endpoint, and the claimed input is driven on a dedicated thread.
    ///
    /// # Errors
    ///
    /// Fails when the ref has already been materialized, when the input
    /// cannot be claimed, or when the producer actor cannot be spawned.
    fn activate_remote_producer(&self) -> StreamResult<RemoteStreamRefEndpoint> {
        let inner = &self.inner;
        match &inner.remote {
            Some(RemoteSourceRefState::Ready(endpoint)) => return Ok(endpoint.clone()),
            Some(RemoteSourceRefState::Failed(error)) => return Err(error.clone()),
            None => {}
        }

        let already_claimed = inner.subscribed.swap(true, Ordering::SeqCst);
        if already_claimed {
            return Err(StreamError::Failed(
                "source ref has already been materialized".to_owned(),
            ));
        }
        if let Some(timeout) = inner.timeout.as_ref() {
            timeout.cancel();
        }

        let input = inner.direct.claim_input()?;
        let shared = Arc::new(ProducerShared::new());
        let endpoint = RemoteStreamRefEndpoint::new("source");
        // `_handle` stays bound (not `_`) so the spawn handle lives until this
        // function returns, exactly as long as before.
        let (actor, _handle) = spawn_producer_actor(
            None,
            Arc::clone(&shared),
            inner.settings,
            Some(endpoint.clone()),
        )?;
        join_remote_endpoint(&endpoint, &actor);

        let guard_actor = actor.clone();
        let producer_endpoint = ProducerEndpointHandle::Actor {
            actor,
            shared: Arc::clone(&shared),
            endpoint: Some(endpoint.clone()),
        };
        let cancelled = Arc::new(AtomicBool::new(false));
        let settings = inner.settings;
        let guard = ProducerEndpointDropGuard::new(guard_actor, None);
        std::thread::spawn(move || {
            // The guard is owned by the worker thread and dropped when the
            // endpoint run finishes; the run's result is deliberately ignored.
            let _guard = guard;
            let _ = run_producer_endpoint(input, shared, producer_endpoint, settings, cancelled);
        });
        Ok(endpoint)
    }
}
571
/// A handle that exposes a remote stream as a local [`Sink`].
///
/// Stream refs are one-shot pairings. [`SinkRef::sink`] may be called any
/// number of times, but only the first run of a returned sink attaches to the
/// stream; every later run yields a materialized completion that fails.
///
/// Clones share the same underlying state, so the one-shot rule applies
/// across all clones of a handle.
pub struct SinkRef<T> {
    inner: Arc<SinkRefInner<T>>,
}
579
/// State shared by all clones of a [`SinkRef`].
struct SinkRefInner<T> {
    // In-process state that the sink's input is attached to.
    direct: Arc<SinkRefDirectState<T>>,
    // Only needed for remote activation/materialization, hence cluster-only.
    #[cfg(feature = "cluster")]
    settings: StreamRefSettings,
    // Flipped to `true` by the first materialization (or remote activation);
    // enforces the one-shot rule.
    subscribed: AtomicBool,
    // `Some` for refs rebuilt from bytes; `None` for locally created refs.
    #[cfg(feature = "cluster")]
    remote: Option<RemoteSinkRefState>,
}
588
589impl<T> Clone for SinkRef<T> {
590    fn clone(&self) -> Self {
591        Self {
592            inner: Arc::clone(&self.inner),
593        }
594    }
595}
596
597impl<T> fmt::Debug for SinkRef<T> {
598    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599        f.debug_struct("SinkRef").finish_non_exhaustive()
600    }
601}
602
/// Lets a [`SinkRef`] travel inside a message as a [`RemoteRefEnvelope`].
#[cfg(feature = "cluster")]
impl<T> BytesConvertable for SinkRef<T>
where
    T: BytesConvertable + Send + 'static,
{
    /// Activates the remote consumer for this ref and encodes the resulting
    /// endpoint together with the ref's settings; an activation error is
    /// encoded instead, so encoding itself never fails.
    fn into_bytes(self) -> Vec<u8> {
        let activation = self.activate_remote_consumer();
        let envelope = activation.map_or_else(RemoteRefEnvelope::Failed, |endpoint| {
            RemoteRefEnvelope::Ready {
                endpoint,
                settings: self.inner.settings,
            }
        });
        envelope.into_bytes()
    }

    /// Rebuilds a remote-backed ref. A failed envelope carries no settings, so
    /// the defaults are used alongside the recorded error.
    fn from_bytes(bytes: Vec<u8>) -> Self {
        let (settings, state) = match RemoteRefEnvelope::from_bytes(bytes) {
            RemoteRefEnvelope::Ready { endpoint, settings } => {
                (settings, RemoteSinkRefState::Ready(endpoint))
            }
            RemoteRefEnvelope::Failed(error) => (
                StreamRefSettings::default(),
                RemoteSinkRefState::Failed(error),
            ),
        };
        Self::remote(settings, state)
    }
}
631
632impl<T> SinkRef<T>
633where
634    T: stream_ref_actor_bound::Bound + Send + 'static,
635{
636    #[must_use]
637    pub fn sink(&self) -> Sink<T, StreamCompletion<NotUsed>> {
638        let inner = Arc::clone(&self.inner);
639        Sink::from_runner(move |input, _materializer| {
640            if inner.subscribed.swap(true, Ordering::SeqCst) {
641                return Ok(StreamCompletion::ready(Err(StreamError::Failed(
642                    "sink ref has already been materialized".to_owned(),
643                ))));
644            }
645
646            #[cfg(feature = "cluster")]
647            if let Some(remote) = &inner.remote {
648                return match remote {
649                    RemoteSinkRefState::Ready(endpoint) => {
650                        materialize_remote_sink_ref(endpoint.clone(), inner.settings, input)
651                    }
652                    RemoteSinkRefState::Failed(error) => {
653                        Ok(StreamCompletion::ready(Err(error.clone())))
654                    }
655                };
656            }
657
658            Ok(inner.direct.attach_input(input))
659        })
660    }
661}
662
#[cfg(feature = "cluster")]
impl<T> SinkRef<T>
where
    T: BytesConvertable + Send + 'static,
{
    /// Builds a ref backed by remote state (used when decoding from bytes),
    /// with a fresh, unattached direct state.
    fn remote(settings: StreamRefSettings, remote: RemoteSinkRefState) -> Self {
        let inner = SinkRefInner {
            direct: Arc::new(SinkRefDirectState::new()),
            settings,
            subscribed: AtomicBool::new(false),
            remote: Some(remote),
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Produces the endpoint address a remote producer should push to.
    ///
    /// A ref that is already remote-backed simply reports its recorded
    /// endpoint or error. Otherwise the ref is claimed (one-shot), a consumer
    /// actor is spawned and joined to a fresh endpoint, and a `ConsumerStream`
    /// reading from that actor is attached as the input of the direct state.
    ///
    /// # Errors
    ///
    /// Fails when the ref has already been materialized, when the consumer
    /// actor cannot be spawned, or when the input cannot be attached.
    fn activate_remote_consumer(&self) -> StreamResult<RemoteStreamRefEndpoint> {
        let inner = &self.inner;
        match &inner.remote {
            Some(RemoteSinkRefState::Ready(endpoint)) => return Ok(endpoint.clone()),
            Some(RemoteSinkRefState::Failed(error)) => return Err(error.clone()),
            None => {}
        }

        let already_claimed = inner.subscribed.swap(true, Ordering::SeqCst);
        if already_claimed {
            return Err(StreamError::Failed(
                "sink ref has already been materialized".to_owned(),
            ));
        }

        let shared = Arc::new(ConsumerShared::new(inner.settings));
        let endpoint = RemoteStreamRefEndpoint::new("sink");
        // `_handle` stays bound (not `_`) so the spawn handle lives until this
        // function returns, exactly as long as before.
        let (actor, _handle) = spawn_consumer_actor(
            None,
            Arc::clone(&shared),
            inner.settings,
            Some(endpoint.clone()),
        )?;
        join_remote_endpoint(&endpoint, &actor);

        let stream = ConsumerStream {
            shared,
            actor_ref: Some(actor),
            endpoint: Some(endpoint.clone()),
            settings: inner.settings,
            terminated: false,
            source_ref_keep_alive: None,
        };
        inner.direct.attach_input_unmanaged(Box::new(stream))?;
        Ok(endpoint)
    }
}
714
/// Messages addressed to a producer endpoint.
///
/// Most commands come in two flavours: a local one that identifies the
/// consumer by `ActorRef`, and (with the `cluster` feature) a `*Remote` one
/// that identifies it by [`RemoteStreamRefEndpoint`]. Only the `*Remote`
/// variants and `Ack` are serializable.
// NOTE(review): `dead_code` is allowed presumably because some variants are
// only constructed under certain feature combinations — confirm.
#[allow(dead_code)]
enum ProducerCommand<T> {
    /// A local consumer subscribes.
    Subscribe {
        consumer: ActorRef<ConsumerCommand<T>>,
    },
    /// A remote consumer subscribes.
    #[cfg(feature = "cluster")]
    SubscribeRemote {
        consumer: RemoteStreamRefEndpoint,
    },
    /// A local consumer signals demand; `cumulative` is a running total
    /// (demand is cumulative, so redelivery is idempotent).
    Demand {
        consumer: ActorRef<ConsumerCommand<T>>,
        cumulative: u64,
    },
    /// A remote consumer signals demand; `cumulative` is a running total.
    #[cfg(feature = "cluster")]
    DemandRemote {
        consumer: RemoteStreamRefEndpoint,
        cumulative: u64,
    },
    /// A local consumer cancels.
    Cancel {
        consumer: ActorRef<ConsumerCommand<T>>,
    },
    /// A remote consumer cancels.
    #[cfg(feature = "cluster")]
    CancelRemote {
        consumer: RemoteStreamRefEndpoint,
    },
    /// A local consumer reports a failure.
    RemoteFailure {
        consumer: ActorRef<ConsumerCommand<T>>,
        error: StreamError,
    },
    /// A remote consumer reports a failure.
    #[cfg(feature = "cluster")]
    RemoteFailureRemote {
        consumer: RemoteStreamRefEndpoint,
        error: StreamError,
    },
    // NOTE(review): payload-free acknowledgement; its sender and handling are
    // outside this part of the file.
    Ack,
}
751
/// Cluster wire format for [`ProducerCommand`].
///
/// Each serializable command becomes a cast whose variant name matches the
/// enum variant and whose fields are encoded in declaration order. Commands
/// that carry a local `ActorRef` cannot be serialized.
#[cfg(feature = "cluster")]
impl<T> Message for ProducerCommand<T>
where
    T: BytesConvertable + Send + 'static,
{
    fn serializable() -> bool {
        true
    }

    /// # Errors
    ///
    /// Returns `BoxedDowncastErr` for the local-only variants.
    fn serialize(
        self,
    ) -> Result<ractor::message::SerializedMessage, ractor::message::BoxedDowncastErr> {
        let (variant, fields) = match self {
            Self::SubscribeRemote { consumer } => ("SubscribeRemote", vec![consumer.into_bytes()]),
            Self::DemandRemote {
                consumer,
                cumulative,
            } => (
                "DemandRemote",
                vec![consumer.into_bytes(), cumulative.into_bytes()],
            ),
            Self::CancelRemote { consumer } => ("CancelRemote", vec![consumer.into_bytes()]),
            Self::RemoteFailureRemote { consumer, error } => (
                "RemoteFailureRemote",
                vec![consumer.into_bytes(), error.into_bytes()],
            ),
            Self::Ack => ("Ack", Vec::new()),
            Self::Subscribe { .. }
            | Self::Demand { .. }
            | Self::Cancel { .. }
            | Self::RemoteFailure { .. } => return Err(ractor::message::BoxedDowncastErr),
        };
        Ok(remote_cast(variant, fields))
    }

    /// # Errors
    ///
    /// Returns `BoxedDowncastErr` when the message is not a cast or names an
    /// unknown variant.
    fn deserialize(
        message: ractor::message::SerializedMessage,
    ) -> Result<Self, ractor::message::BoxedDowncastErr> {
        let ractor::message::SerializedMessage::Cast { variant, args, .. } = message else {
            return Err(ractor::message::BoxedDowncastErr);
        };
        // Pulls the next encoded field; fields are read in the order written.
        let mut offset = 0;
        let mut next_field = || take_bytes(&args, &mut offset);
        let command = match variant.as_str() {
            "SubscribeRemote" => Self::SubscribeRemote {
                consumer: RemoteStreamRefEndpoint::from_bytes(next_field()),
            },
            "DemandRemote" => Self::DemandRemote {
                consumer: RemoteStreamRefEndpoint::from_bytes(next_field()),
                cumulative: u64::from_bytes(next_field()),
            },
            "CancelRemote" => Self::CancelRemote {
                consumer: RemoteStreamRefEndpoint::from_bytes(next_field()),
            },
            "RemoteFailureRemote" => Self::RemoteFailureRemote {
                consumer: RemoteStreamRefEndpoint::from_bytes(next_field()),
                error: StreamError::from_bytes(next_field()),
            },
            "Ack" => Self::Ack,
            _ => return Err(ractor::message::BoxedDowncastErr),
        };
        Ok(command)
    }
}
817
/// Messages addressed to a consumer endpoint.
///
/// Every command comes in two flavours: a local one that identifies the
/// producer by `ActorRef`, and (with the `cluster` feature) a `*Remote` one
/// that identifies it by [`RemoteStreamRefEndpoint`]. Only the `*Remote`
/// variants are serializable.
// NOTE(review): `dead_code` is allowed presumably because some variants are
// only constructed under certain feature combinations — confirm.
#[allow(dead_code)]
enum ConsumerCommand<T> {
    /// A local producer announces itself to the consumer.
    OnSubscribe {
        producer: ActorRef<ProducerCommand<T>>,
    },
    /// A remote producer announces itself to the consumer.
    #[cfg(feature = "cluster")]
    OnSubscribeRemote { producer: RemoteStreamRefEndpoint },
    /// A stream element from a local producer, tagged with a sequence number.
    Element {
        producer: ActorRef<ProducerCommand<T>>,
        seq: u64,
        item: T,
    },
    /// A stream element from a remote producer, tagged with a sequence number.
    #[cfg(feature = "cluster")]
    ElementRemote {
        producer: RemoteStreamRefEndpoint,
        seq: u64,
        item: T,
    },
    /// Completion signal from a local producer, tagged with a sequence number.
    Complete {
        producer: ActorRef<ProducerCommand<T>>,
        seq: u64,
    },
    /// Completion signal from a remote producer, tagged with a sequence number.
    #[cfg(feature = "cluster")]
    CompleteRemote {
        producer: RemoteStreamRefEndpoint,
        seq: u64,
    },
    /// Failure signal from a local producer.
    Failure {
        producer: ActorRef<ProducerCommand<T>>,
        error: StreamError,
    },
    /// Failure signal from a remote producer.
    #[cfg(feature = "cluster")]
    FailureRemote {
        producer: RemoteStreamRefEndpoint,
        error: StreamError,
    },
}
855
/// Cluster wire format for [`ConsumerCommand`].
///
/// Each serializable command becomes a cast whose variant name matches the
/// enum variant and whose fields are encoded in declaration order. Commands
/// that carry a local `ActorRef` cannot be serialized.
#[cfg(feature = "cluster")]
impl<T> Message for ConsumerCommand<T>
where
    T: BytesConvertable + Send + 'static,
{
    fn serializable() -> bool {
        true
    }

    /// # Errors
    ///
    /// Returns `BoxedDowncastErr` for the local-only variants.
    fn serialize(
        self,
    ) -> Result<ractor::message::SerializedMessage, ractor::message::BoxedDowncastErr> {
        let (variant, fields) = match self {
            Self::OnSubscribeRemote { producer } => {
                ("OnSubscribeRemote", vec![producer.into_bytes()])
            }
            Self::ElementRemote {
                producer,
                seq,
                item,
            } => (
                "ElementRemote",
                vec![producer.into_bytes(), seq.into_bytes(), item.into_bytes()],
            ),
            Self::CompleteRemote { producer, seq } => (
                "CompleteRemote",
                vec![producer.into_bytes(), seq.into_bytes()],
            ),
            Self::FailureRemote { producer, error } => (
                "FailureRemote",
                vec![producer.into_bytes(), error.into_bytes()],
            ),
            Self::OnSubscribe { .. }
            | Self::Element { .. }
            | Self::Complete { .. }
            | Self::Failure { .. } => return Err(ractor::message::BoxedDowncastErr),
        };
        Ok(remote_cast(variant, fields))
    }

    /// # Errors
    ///
    /// Returns `BoxedDowncastErr` when the message is not a cast or names an
    /// unknown variant.
    fn deserialize(
        message: ractor::message::SerializedMessage,
    ) -> Result<Self, ractor::message::BoxedDowncastErr> {
        let ractor::message::SerializedMessage::Cast { variant, args, .. } = message else {
            return Err(ractor::message::BoxedDowncastErr);
        };
        // Pulls the next encoded field; fields are read in the order written.
        let mut offset = 0;
        let mut next_field = || take_bytes(&args, &mut offset);
        let command = match variant.as_str() {
            "OnSubscribeRemote" => Self::OnSubscribeRemote {
                producer: RemoteStreamRefEndpoint::from_bytes(next_field()),
            },
            "ElementRemote" => Self::ElementRemote {
                producer: RemoteStreamRefEndpoint::from_bytes(next_field()),
                seq: u64::from_bytes(next_field()),
                item: T::from_bytes(next_field()),
            },
            "CompleteRemote" => Self::CompleteRemote {
                producer: RemoteStreamRefEndpoint::from_bytes(next_field()),
                seq: u64::from_bytes(next_field()),
            },
            "FailureRemote" => Self::FailureRemote {
                producer: RemoteStreamRefEndpoint::from_bytes(next_field()),
                error: StreamError::from_bytes(next_field()),
            },
            _ => return Err(ractor::message::BoxedDowncastErr),
        };
        Ok(command)
    }
}
924
925#[allow(dead_code)]
926enum ConsumerEndpoint<T> {
927    Actor(ActorRef<ConsumerCommand<T>>),
928    Direct(Weak<ConsumerShared<T>>),
929}
930
931impl<T> Clone for ConsumerEndpoint<T> {
932    fn clone(&self) -> Self {
933        match self {
934            Self::Actor(actor) => Self::Actor(actor.clone()),
935            Self::Direct(shared) => Self::Direct(Weak::clone(shared)),
936        }
937    }
938}
939
940#[allow(dead_code)]
941enum ProducerEndpoint<T> {
942    Actor(ActorRef<ProducerCommand<T>>),
943    Direct(Weak<ProducerShared<T>>),
944}
945
946impl<T> Clone for ProducerEndpoint<T> {
947    fn clone(&self) -> Self {
948        match self {
949            Self::Actor(actor) => Self::Actor(actor.clone()),
950            Self::Direct(shared) => Self::Direct(Weak::clone(shared)),
951        }
952    }
953}
954
955#[allow(dead_code)]
956enum ProducerEndpointHandle<T> {
957    Actor {
958        actor: ActorRef<ProducerCommand<T>>,
959        shared: Arc<ProducerShared<T>>,
960        #[cfg(feature = "cluster")]
961        endpoint: Option<RemoteStreamRefEndpoint>,
962    },
963    Direct(Arc<ProducerShared<T>>),
964}
965
966impl<T> ProducerEndpointHandle<T>
967where
968    T: stream_ref_actor_bound::Bound + Send + 'static,
969{
970    fn stop_actor(&self) {
971        if let Self::Actor { actor, .. } = self {
972            actor.stop(None);
973        }
974    }
975
976    fn actor_ref(&self) -> Option<ActorRef<ProducerCommand<T>>> {
977        match self {
978            Self::Actor { actor, .. } => Some(actor.clone()),
979            Self::Direct(_) => None,
980        }
981    }
982
983    #[cfg(feature = "cluster")]
984    fn endpoint(&self) -> Option<RemoteStreamRefEndpoint> {
985        match self {
986            Self::Actor { endpoint, .. } => endpoint.clone(),
987            Self::Direct(_) => None,
988        }
989    }
990
991    fn direct_shared(&self) -> Option<Arc<ProducerShared<T>>> {
992        match self {
993            Self::Actor { shared, .. } | Self::Direct(shared) => Some(Arc::clone(shared)),
994        }
995    }
996}
997
/// Drop guard that tears down a producer endpoint.
///
/// When the guard is dropped, the actor it still holds is stopped and the
/// cancellation flag it still holds is set. Because destructors also run
/// while a panic unwinds, a panic in a spawned `run_producer_endpoint` thread
/// still triggers this teardown instead of leaving the consumer side hanging
/// forever.
#[cfg(feature = "cluster")]
struct ProducerEndpointDropGuard<T>
where
    T: stream_ref_actor_bound::Bound,
{
    /// Actor to stop on drop.
    actor: Option<ActorRef<ProducerCommand<T>>>,
    /// Flag to set on drop, if any.
    cancelled: Option<Arc<AtomicBool>>,
}
1007
#[cfg(feature = "cluster")]
impl<T> ProducerEndpointDropGuard<T>
where
    T: stream_ref_actor_bound::Bound,
{
    /// Creates a guard for `actor`, optionally tied to a cancellation flag.
    fn new(actor: ActorRef<ProducerCommand<T>>, cancelled: Option<Arc<AtomicBool>>) -> Self {
        let actor = Some(actor);
        Self { actor, cancelled }
    }
}
1017
#[cfg(feature = "cluster")]
impl<T> Drop for ProducerEndpointDropGuard<T>
where
    T: stream_ref_actor_bound::Bound,
{
    /// Stops the held actor first, then sets the cancellation flag.
    fn drop(&mut self) {
        match self.actor.take() {
            Some(actor) => {
                actor.stop(None);
            }
            None => {}
        }
        if let Some(flag) = &self.cancelled {
            flag.store(true, Ordering::SeqCst);
        }
    }
}
1029
1030#[allow(dead_code)]
1031struct LazyProducerStatus<T> {
1032    direct: Arc<SourceRefDirectState<T>>,
1033}
1034
1035#[allow(dead_code)]
1036impl<T> LazyProducerStatus<T>
1037where
1038    T: Send + 'static,
1039{
1040    fn new(direct: Arc<SourceRefDirectState<T>>) -> Self {
1041        Self { direct }
1042    }
1043
1044    fn get_status(&self) -> ractor::ActorStatus {
1045        self.direct.status()
1046    }
1047}
1048
1049struct SourceRefDirectState<T> {
1050    inner: Mutex<SourceRefDirectInner<T>>,
1051}
1052
1053struct SourceRefDirectInner<T> {
1054    input: Option<BoxStream<T>>,
1055    terminal: Option<StreamError>,
1056    subscribed: bool,
1057}
1058
1059impl<T> SourceRefDirectState<T>
1060where
1061    T: Send + 'static,
1062{
1063    fn new(input: BoxStream<T>) -> Self {
1064        Self {
1065            inner: Mutex::new(SourceRefDirectInner {
1066                input: Some(input),
1067                terminal: None,
1068                subscribed: false,
1069            }),
1070        }
1071    }
1072
1073    #[cfg(feature = "cluster")]
1074    fn terminal(error: StreamError) -> Self {
1075        Self {
1076            inner: Mutex::new(SourceRefDirectInner {
1077                input: None,
1078                terminal: Some(error),
1079                subscribed: true,
1080            }),
1081        }
1082    }
1083
1084    fn lock(&self) -> MutexGuard<'_, SourceRefDirectInner<T>> {
1085        self.inner
1086            .lock()
1087            .unwrap_or_else(|poison| poison.into_inner())
1088    }
1089
1090    fn claim_input(&self) -> StreamResult<BoxStream<T>> {
1091        let mut inner = self.lock();
1092        inner.subscribed = true;
1093        if let Some(error) = inner.terminal.clone() {
1094            return Err(error);
1095        }
1096        inner.input.take().ok_or(StreamError::ActorTerminated)
1097    }
1098
1099    fn timeout_if_unsubscribed(&self) {
1100        let mut inner = self.lock();
1101        if !inner.subscribed && inner.terminal.is_none() {
1102            let input = inner.input.take();
1103            inner.terminal = Some(subscription_timeout_error("stream ref sink"));
1104            drop(inner);
1105            drop(input);
1106        }
1107    }
1108
1109    #[allow(dead_code)]
1110    fn status(&self) -> ractor::ActorStatus {
1111        let inner = self.lock();
1112        if inner.terminal.is_some() || inner.input.is_none() {
1113            ractor::ActorStatus::Stopped
1114        } else {
1115            ractor::ActorStatus::Running
1116        }
1117    }
1118}
1119
1120struct SourceRefDirectStream<T>
1121where
1122    T: Send + 'static,
1123{
1124    input: Option<BoxStream<T>>,
1125    queue: VecDeque<T>,
1126    terminal: Option<ConsumerTerminal>,
1127    settings: StreamRefSettings,
1128    terminated: bool,
1129    _keep_alive: Arc<SourceRefInner<T>>,
1130}
1131
1132impl<T> SourceRefDirectStream<T>
1133where
1134    T: Send + 'static,
1135{
1136    fn new(
1137        input: BoxStream<T>,
1138        settings: StreamRefSettings,
1139        keep_alive: Arc<SourceRefInner<T>>,
1140    ) -> Self {
1141        Self {
1142            input: Some(input),
1143            queue: VecDeque::new(),
1144            terminal: None,
1145            settings,
1146            terminated: false,
1147            _keep_alive: keep_alive,
1148        }
1149    }
1150
1151    fn fill_prefetch(&mut self) {
1152        while self.terminal.is_none() && self.queue.len() < self.settings.buffer_capacity {
1153            let Some(input) = self.input.as_mut() else {
1154                self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
1155                break;
1156            };
1157            match input.next() {
1158                Some(Ok(item)) => self.queue.push_back(item),
1159                Some(Err(error)) => {
1160                    self.input.take();
1161                    self.terminal = Some(ConsumerTerminal::Error(error));
1162                }
1163                None => {
1164                    self.input.take();
1165                    self.terminal = Some(ConsumerTerminal::Complete);
1166                }
1167            }
1168        }
1169    }
1170}
1171
1172impl<T> Iterator for SourceRefDirectStream<T>
1173where
1174    T: Send + 'static,
1175{
1176    type Item = StreamResult<T>;
1177
1178    fn next(&mut self) -> Option<Self::Item> {
1179        if self.terminated {
1180            return None;
1181        }
1182
1183        self.fill_prefetch();
1184        if let Some(item) = self.queue.pop_front() {
1185            self.fill_prefetch();
1186            return Some(Ok(item));
1187        }
1188
1189        match self.terminal.clone() {
1190            Some(ConsumerTerminal::Complete) => {
1191                self.terminated = true;
1192                None
1193            }
1194            Some(ConsumerTerminal::Error(error)) => {
1195                self.terminated = true;
1196                Some(Err(error))
1197            }
1198            None => None,
1199        }
1200    }
1201}
1202
1203impl<T> Drop for SourceRefDirectStream<T>
1204where
1205    T: Send + 'static,
1206{
1207    fn drop(&mut self) {
1208        self.input.take();
1209        self.queue.clear();
1210    }
1211}
1212
1213struct SinkRefDirectState<T> {
1214    inner: Mutex<SinkRefDirectInner<T>>,
1215    changed: Condvar,
1216}
1217
1218struct SinkRefDirectInner<T> {
1219    input: Option<BoxStream<T>>,
1220    completion: Option<oneshot::Sender<StreamResult<NotUsed>>>,
1221    completion_cancelled: Option<Arc<AtomicBool>>,
1222    terminal: Option<StreamError>,
1223    subscribed: bool,
1224}
1225
1226impl<T> SinkRefDirectState<T>
1227where
1228    T: Send + 'static,
1229{
1230    fn new() -> Self {
1231        Self {
1232            inner: Mutex::new(SinkRefDirectInner {
1233                input: None,
1234                completion: None,
1235                completion_cancelled: None,
1236                terminal: None,
1237                subscribed: false,
1238            }),
1239            changed: Condvar::new(),
1240        }
1241    }
1242
1243    fn lock(&self) -> MutexGuard<'_, SinkRefDirectInner<T>> {
1244        self.inner
1245            .lock()
1246            .unwrap_or_else(|poison| poison.into_inner())
1247    }
1248
1249    fn attach_input(&self, input: BoxStream<T>) -> StreamCompletion<NotUsed> {
1250        let mut inner = self.lock();
1251        if let Some(error) = inner.terminal.clone() {
1252            return StreamCompletion::ready(Err(error));
1253        }
1254        if inner.subscribed {
1255            return StreamCompletion::ready(Err(StreamError::Failed(
1256                "stream ref was already subscribed by another endpoint".to_owned(),
1257            )));
1258        }
1259
1260        let (sender, receiver) = oneshot::channel();
1261        let cancellation = StreamCancellation::for_external_completion();
1262        inner.input = Some(input);
1263        inner.completion = Some(sender);
1264        inner.completion_cancelled = Some(cancellation.cancelled());
1265        inner.subscribed = true;
1266        drop(inner);
1267        self.changed.notify_all();
1268        StreamCompletion::from_receiver(receiver, Some(cancellation))
1269    }
1270
1271    #[cfg(feature = "cluster")]
1272    fn attach_input_unmanaged(&self, input: BoxStream<T>) -> StreamResult<()> {
1273        let mut inner = self.lock();
1274        if let Some(error) = inner.terminal.clone() {
1275            return Err(error);
1276        }
1277        if inner.subscribed {
1278            return Err(StreamError::Failed(
1279                "stream ref was already subscribed by another endpoint".to_owned(),
1280            ));
1281        }
1282
1283        inner.input = Some(input);
1284        inner.completion = None;
1285        inner.completion_cancelled = None;
1286        inner.subscribed = true;
1287        drop(inner);
1288        self.changed.notify_all();
1289        Ok(())
1290    }
1291
1292    fn wait_for_input(&self, settings: StreamRefSettings) -> StreamResult<BoxStream<T>> {
1293        let deadline = Instant::now()
1294            .checked_add(settings.subscription_timeout)
1295            .unwrap_or_else(far_future);
1296        let mut inner = self.lock();
1297        loop {
1298            if let Some(cancelled) = &inner.completion_cancelled
1299                && cancelled.load(Ordering::SeqCst)
1300            {
1301                let input = inner.input.take();
1302                inner.terminal = Some(StreamError::Cancelled);
1303                inner.completion.take();
1304                inner.completion_cancelled = None;
1305                drop(inner);
1306                drop(input);
1307                self.changed.notify_all();
1308                return Err(StreamError::Cancelled);
1309            }
1310            if let Some(input) = inner.input.take() {
1311                return Ok(input);
1312            }
1313            if let Some(error) = inner.terminal.clone() {
1314                return Err(error);
1315            }
1316            let now = Instant::now();
1317            if now >= deadline {
1318                let error = subscription_timeout_error("stream ref source");
1319                inner.terminal = Some(error.clone());
1320                if let Some(sender) = inner.completion.take() {
1321                    let _ = sender.send(Err(error.clone()));
1322                }
1323                return Err(error);
1324            }
1325            let remaining = deadline.saturating_duration_since(now);
1326            let (next, _) =
1327                wait_timeout_unpoison(&self.changed, inner, remaining.min(STREAM_REF_WAIT_POLL));
1328            inner = next;
1329        }
1330    }
1331
1332    fn settle(&self, result: StreamResult<NotUsed>) {
1333        let mut inner = self.lock();
1334        if let Some(sender) = inner.completion.take() {
1335            let _ = sender.send(result);
1336        }
1337        inner.completion_cancelled = None;
1338        drop(inner);
1339        self.changed.notify_all();
1340    }
1341
1342    fn fail_unattached(&self, error: StreamError) {
1343        let mut inner = self.lock();
1344        if inner.terminal.is_none() {
1345            let input = inner.input.take();
1346            inner.terminal = Some(error.clone());
1347            if let Some(sender) = inner.completion.take() {
1348                let _ = sender.send(Err(error));
1349            }
1350            inner.completion_cancelled = None;
1351            drop(inner);
1352            drop(input);
1353            self.changed.notify_all();
1354        }
1355    }
1356
1357    fn is_completion_cancelled(&self) -> bool {
1358        self.lock()
1359            .completion_cancelled
1360            .as_ref()
1361            .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
1362    }
1363}
1364
1365struct SinkRefDirectStream<T>
1366where
1367    T: Send + 'static,
1368{
1369    state: Arc<SinkRefDirectState<T>>,
1370    input: Option<BoxStream<T>>,
1371    queue: VecDeque<T>,
1372    terminal: Option<ConsumerTerminal>,
1373    settings: StreamRefSettings,
1374    terminated: bool,
1375}
1376
1377impl<T> SinkRefDirectStream<T>
1378where
1379    T: Send + 'static,
1380{
1381    fn new(state: Arc<SinkRefDirectState<T>>, settings: StreamRefSettings) -> Self {
1382        Self {
1383            state,
1384            input: None,
1385            queue: VecDeque::new(),
1386            terminal: None,
1387            settings,
1388            terminated: false,
1389        }
1390    }
1391
1392    fn ensure_input(&mut self) -> StreamResult<()> {
1393        if self.input.is_none() && self.terminal.is_none() {
1394            self.input = Some(self.state.wait_for_input(self.settings)?);
1395        }
1396        Ok(())
1397    }
1398
1399    fn fill_prefetch(&mut self) -> StreamResult<()> {
1400        self.ensure_input()?;
1401        while self.terminal.is_none() && self.queue.len() < self.settings.buffer_capacity {
1402            if self.state.is_completion_cancelled() {
1403                self.input.take();
1404                self.state.settle(Err(StreamError::Cancelled));
1405                self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
1406                break;
1407            }
1408            let Some(input) = self.input.as_mut() else {
1409                self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
1410                break;
1411            };
1412            match input.next() {
1413                Some(Ok(item)) => self.queue.push_back(item),
1414                Some(Err(error)) => {
1415                    self.input.take();
1416                    self.state.settle(Err(error.clone()));
1417                    self.terminal = Some(ConsumerTerminal::Error(error));
1418                }
1419                None => {
1420                    self.input.take();
1421                    self.state.settle(Ok(NotUsed));
1422                    self.terminal = Some(ConsumerTerminal::Complete);
1423                }
1424            }
1425        }
1426        Ok(())
1427    }
1428}
1429
1430impl<T> Iterator for SinkRefDirectStream<T>
1431where
1432    T: Send + 'static,
1433{
1434    type Item = StreamResult<T>;
1435
1436    fn next(&mut self) -> Option<Self::Item> {
1437        if self.terminated {
1438            return None;
1439        }
1440
1441        if let Err(error) = self.fill_prefetch() {
1442            self.terminated = true;
1443            return Some(Err(error));
1444        }
1445        if let Some(item) = self.queue.pop_front() {
1446            if let Err(error) = self.fill_prefetch() {
1447                self.terminal = Some(ConsumerTerminal::Error(error));
1448            }
1449            return Some(Ok(item));
1450        }
1451
1452        match self.terminal.clone() {
1453            Some(ConsumerTerminal::Complete) => {
1454                self.terminated = true;
1455                None
1456            }
1457            Some(ConsumerTerminal::Error(error)) => {
1458                self.terminated = true;
1459                Some(Err(error))
1460            }
1461            None => None,
1462        }
1463    }
1464}
1465
1466impl<T> Drop for SinkRefDirectStream<T>
1467where
1468    T: Send + 'static,
1469{
1470    fn drop(&mut self) {
1471        if !self.terminated {
1472            self.input.take();
1473            self.queue.clear();
1474            self.state.fail_unattached(StreamError::Cancelled);
1475            self.state.settle(Err(StreamError::Cancelled));
1476        }
1477    }
1478}
1479
1480#[allow(dead_code)]
1481struct ProducerShared<T> {
1482    inner: Mutex<ProducerInner<T>>,
1483    changed: Condvar,
1484}
1485
1486#[allow(dead_code)]
1487struct ProducerInner<T> {
1488    consumer: Option<ConsumerEndpoint<T>>,
1489    cumulative_demand: u64,
1490    sent: u64,
1491    stopped: Option<StreamError>,
1492}
1493
1494#[allow(dead_code)]
1495impl<T> ProducerShared<T>
1496where
1497    T: stream_ref_actor_bound::Bound + Send + 'static,
1498{
1499    fn new() -> Self {
1500        Self {
1501            inner: Mutex::new(ProducerInner {
1502                consumer: None,
1503                cumulative_demand: 0,
1504                sent: 0,
1505                stopped: None,
1506            }),
1507            changed: Condvar::new(),
1508        }
1509    }
1510
1511    fn lock(&self) -> MutexGuard<'_, ProducerInner<T>> {
1512        self.inner
1513            .lock()
1514            .unwrap_or_else(|poison| poison.into_inner())
1515    }
1516
1517    fn set_consumer(
1518        &self,
1519        consumer: ActorRef<ConsumerCommand<T>>,
1520    ) -> Result<bool, ActorRef<ConsumerCommand<T>>> {
1521        let mut inner = self.lock();
1522        match &inner.consumer {
1523            Some(ConsumerEndpoint::Actor(existing)) if same_actor(existing, &consumer) => Ok(false),
1524            Some(_) => Err(consumer),
1525            None if inner.stopped.is_some() => Err(consumer),
1526            None => {
1527                inner.consumer = Some(ConsumerEndpoint::Actor(consumer));
1528                drop(inner);
1529                self.changed.notify_all();
1530                Ok(true)
1531            }
1532        }
1533    }
1534
1535    fn set_direct_consumer(&self, consumer: &Arc<ConsumerShared<T>>) -> StreamResult<bool> {
1536        let mut inner = self.lock();
1537        match &inner.consumer {
1538            Some(ConsumerEndpoint::Direct(existing))
1539                if existing.ptr_eq(&Arc::downgrade(consumer)) =>
1540            {
1541                Ok(false)
1542            }
1543            Some(_) => Err(StreamError::Failed(
1544                "stream ref was already subscribed by another endpoint".to_owned(),
1545            )),
1546            None => {
1547                if let Some(error) = inner.stopped.clone() {
1548                    return Err(error);
1549                }
1550                inner.consumer = Some(ConsumerEndpoint::Direct(Arc::downgrade(consumer)));
1551                drop(inner);
1552                self.changed.notify_all();
1553                Ok(true)
1554            }
1555        }
1556    }
1557
1558    fn update_demand(&self, consumer: &ActorRef<ConsumerCommand<T>>, cumulative: u64) {
1559        self.update_demand_if(cumulative, |existing| {
1560            matches!(existing, ConsumerEndpoint::Actor(actor) if same_actor(actor, consumer))
1561        });
1562    }
1563
1564    fn update_direct_demand(&self, consumer: &Arc<ConsumerShared<T>>, cumulative: u64) {
1565        let consumer = Arc::downgrade(consumer);
1566        self.update_demand_if(cumulative, |existing| {
1567            matches!(existing, ConsumerEndpoint::Direct(shared) if shared.ptr_eq(&consumer))
1568        });
1569    }
1570
1571    fn update_demand_if(
1572        &self,
1573        cumulative: u64,
1574        matches_consumer: impl FnOnce(&ConsumerEndpoint<T>) -> bool,
1575    ) {
1576        let mut inner = self.lock();
1577        if inner.consumer.as_ref().is_some_and(matches_consumer)
1578            && cumulative > inner.cumulative_demand
1579        {
1580            inner.cumulative_demand = cumulative;
1581            drop(inner);
1582            self.changed.notify_all();
1583        }
1584    }
1585
1586    fn stop_from_consumer(&self, consumer: &ActorRef<ConsumerCommand<T>>, error: StreamError) {
1587        self.stop_from_consumer_if(error, |existing| {
1588            matches!(existing, ConsumerEndpoint::Actor(actor) if same_actor(actor, consumer))
1589        });
1590    }
1591
1592    fn stop_from_direct_consumer(&self, consumer: &Arc<ConsumerShared<T>>, error: StreamError) {
1593        let consumer = Arc::downgrade(consumer);
1594        self.stop_from_consumer_if(error, |existing| {
1595            matches!(existing, ConsumerEndpoint::Direct(shared) if shared.ptr_eq(&consumer))
1596        });
1597    }
1598
1599    fn stop_from_consumer_if(
1600        &self,
1601        error: StreamError,
1602        matches_consumer: impl FnOnce(&ConsumerEndpoint<T>) -> bool,
1603    ) {
1604        let mut inner = self.lock();
1605        if inner.consumer.as_ref().is_none_or(matches_consumer) && inner.stopped.is_none() {
1606            inner.stopped = Some(error);
1607            drop(inner);
1608            self.changed.notify_all();
1609        }
1610    }
1611
1612    fn stop_unless_finished(&self, error: StreamError) {
1613        let mut inner = self.lock();
1614        if inner.stopped.is_none() {
1615            inner.stopped = Some(error);
1616            drop(inner);
1617            self.changed.notify_all();
1618        }
1619    }
1620}
1621
1622#[allow(dead_code)]
1623struct ConsumerShared<T> {
1624    inner: Mutex<ConsumerInner<T>>,
1625    changed: Condvar,
1626}
1627
1628#[allow(dead_code)]
1629struct ConsumerInner<T> {
1630    producer: Option<ProducerEndpoint<T>>,
1631    queue: VecDeque<T>,
1632    terminal: Option<ConsumerTerminal>,
1633    expected_seq: u64,
1634    delivered: u64,
1635    cumulative_demand: u64,
1636}
1637
1638#[derive(Clone)]
1639enum ConsumerTerminal {
1640    Complete,
1641    Error(StreamError),
1642}
1643
1644#[allow(dead_code)]
1645impl<T> ConsumerShared<T>
1646where
1647    T: stream_ref_actor_bound::Bound + Send + 'static,
1648{
1649    fn new(_settings: StreamRefSettings) -> Self {
1650        Self {
1651            inner: Mutex::new(ConsumerInner {
1652                producer: None,
1653                queue: VecDeque::new(),
1654                terminal: None,
1655                expected_seq: 0,
1656                delivered: 0,
1657                cumulative_demand: 0,
1658            }),
1659            changed: Condvar::new(),
1660        }
1661    }
1662
1663    fn lock(&self) -> MutexGuard<'_, ConsumerInner<T>> {
1664        self.inner
1665            .lock()
1666            .unwrap_or_else(|poison| poison.into_inner())
1667    }
1668
1669    fn set_producer(&self, producer: ActorRef<ProducerCommand<T>>) -> bool {
1670        let mut inner = self.lock();
1671        match &inner.producer {
1672            Some(ProducerEndpoint::Actor(existing)) => same_actor(existing, &producer),
1673            Some(ProducerEndpoint::Direct(_)) => false,
1674            None => {
1675                inner.producer = Some(ProducerEndpoint::Actor(producer));
1676                drop(inner);
1677                self.changed.notify_all();
1678                true
1679            }
1680        }
1681    }
1682
1683    fn set_direct_producer(&self, producer: &Arc<ProducerShared<T>>) -> StreamResult<bool> {
1684        let mut inner = self.lock();
1685        match &inner.producer {
1686            Some(ProducerEndpoint::Direct(existing))
1687                if existing.ptr_eq(&Arc::downgrade(producer)) =>
1688            {
1689                Ok(false)
1690            }
1691            Some(_) => Err(StreamError::Failed(
1692                "stream ref was already subscribed by another endpoint".to_owned(),
1693            )),
1694            None => {
1695                if let Some(terminal) = inner.terminal.clone() {
1696                    return match terminal {
1697                        ConsumerTerminal::Complete => Err(StreamError::ActorTerminated),
1698                        ConsumerTerminal::Error(error) => Err(error),
1699                    };
1700                }
1701                inner.producer = Some(ProducerEndpoint::Direct(Arc::downgrade(producer)));
1702                drop(inner);
1703                self.changed.notify_all();
1704                Ok(true)
1705            }
1706        }
1707    }
1708
1709    fn push(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64, item: T) {
1710        self.push_if(seq, item, |inner| producer_matches_actor(inner, producer));
1711    }
1712
1713    fn push_direct(&self, producer: &Arc<ProducerShared<T>>, seq: u64, item: T) {
1714        let producer = Arc::downgrade(producer);
1715        self.push_if(seq, item, |inner| producer_matches_direct(inner, &producer));
1716    }
1717
1718    fn push_if(&self, seq: u64, item: T, matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool) {
1719        let mut inner = self.lock();
1720        if inner.terminal.is_some() || !matches_producer(&inner) {
1721            return;
1722        }
1723        let should_notify;
1724        if seq != inner.expected_seq {
1725            inner.queue.clear();
1726            inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
1727                inner.expected_seq,
1728                seq,
1729                "stream ref element sequence gap",
1730            )));
1731            should_notify = true;
1732        } else {
1733            should_notify = inner.queue.is_empty();
1734            inner.expected_seq += 1;
1735            inner.queue.push_back(item);
1736        }
1737        drop(inner);
1738        if should_notify {
1739            self.changed.notify_all();
1740        }
1741    }
1742
1743    fn complete(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64) {
1744        self.complete_if(seq, |inner| producer_matches_actor(inner, producer));
1745    }
1746
1747    fn complete_direct(&self, producer: &Arc<ProducerShared<T>>, seq: u64) {
1748        let producer = Arc::downgrade(producer);
1749        self.complete_if(seq, |inner| producer_matches_direct(inner, &producer));
1750    }
1751
1752    fn complete_if(&self, seq: u64, matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool) {
1753        let mut inner = self.lock();
1754        if inner.terminal.is_some() || !matches_producer(&inner) {
1755            return;
1756        }
1757        if seq != inner.expected_seq {
1758            inner.queue.clear();
1759            inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
1760                inner.expected_seq,
1761                seq,
1762                "stream ref completion sequence gap",
1763            )));
1764        } else {
1765            inner.terminal = Some(ConsumerTerminal::Complete);
1766        }
1767        drop(inner);
1768        self.changed.notify_all();
1769    }
1770
1771    fn fail(&self, producer: &ActorRef<ProducerCommand<T>>, error: StreamError) {
1772        self.fail_if(error, |inner| producer_matches_actor(inner, producer));
1773    }
1774
1775    fn fail_direct(&self, producer: &Arc<ProducerShared<T>>, error: StreamError) {
1776        let producer = Arc::downgrade(producer);
1777        self.fail_if(error, |inner| producer_matches_direct(inner, &producer));
1778    }
1779
1780    fn fail_if(
1781        &self,
1782        error: StreamError,
1783        matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool,
1784    ) {
1785        let mut inner = self.lock();
1786        if inner.terminal.is_some() || !matches_producer(&inner) {
1787            return;
1788        }
1789        inner.queue.clear();
1790        inner.terminal = Some(ConsumerTerminal::Error(error));
1791        drop(inner);
1792        self.changed.notify_all();
1793    }
1794
1795    fn fail_local(&self, error: StreamError) {
1796        let mut inner = self.lock();
1797        if inner.terminal.is_none() {
1798            inner.queue.clear();
1799            inner.terminal = Some(ConsumerTerminal::Error(error));
1800            drop(inner);
1801            self.changed.notify_all();
1802        }
1803    }
1804}
1805
1806#[allow(dead_code)]
1807fn producer_matches_actor<T>(
1808    inner: &ConsumerInner<T>,
1809    producer: &ActorRef<ProducerCommand<T>>,
1810) -> bool
1811where
1812    T: stream_ref_actor_bound::Bound + Send + 'static,
1813{
1814    matches!(
1815        inner.producer.as_ref(),
1816        Some(ProducerEndpoint::Actor(existing)) if same_actor(existing, producer)
1817    )
1818}
1819
1820#[allow(dead_code)]
1821fn producer_matches_direct<T>(inner: &ConsumerInner<T>, producer: &Weak<ProducerShared<T>>) -> bool
1822where
1823    T: stream_ref_actor_bound::Bound + Send + 'static,
1824{
1825    matches!(
1826        inner.producer.as_ref(),
1827        Some(ProducerEndpoint::Direct(existing)) if existing.ptr_eq(producer)
1828    )
1829}
1830
1831#[allow(dead_code)]
1832struct ProducerActor<T> {
1833    shared: Arc<ProducerShared<T>>,
1834    initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
1835    settings: StreamRefSettings,
1836    #[cfg(feature = "cluster")]
1837    endpoint: Option<RemoteStreamRefEndpoint>,
1838}
1839
1840impl<T> Actor for ProducerActor<T>
1841where
1842    T: stream_ref_actor_bound::Bound + Send + 'static,
1843{
1844    type Msg = ProducerCommand<T>;
1845    type State = ();
1846    type Arguments = ();
1847
1848    async fn pre_start(
1849        &self,
1850        myself: ActorRef<Self::Msg>,
1851        _args: Self::Arguments,
1852    ) -> Result<Self::State, ActorProcessingErr> {
1853        if let Some(consumer) = &self.initial_consumer {
1854            register_producer_consumer(
1855                &self.shared,
1856                myself,
1857                consumer.clone(),
1858                #[cfg(feature = "cluster")]
1859                self.endpoint.as_ref(),
1860            );
1861        }
1862        Ok(())
1863    }
1864
1865    async fn handle(
1866        &self,
1867        myself: ActorRef<Self::Msg>,
1868        message: Self::Msg,
1869        _state: &mut Self::State,
1870    ) -> ActorResult {
1871        match message {
1872            ProducerCommand::Subscribe { consumer } => {
1873                register_producer_consumer(
1874                    &self.shared,
1875                    myself,
1876                    consumer,
1877                    #[cfg(feature = "cluster")]
1878                    self.endpoint.as_ref(),
1879                );
1880            }
1881            #[cfg(feature = "cluster")]
1882            ProducerCommand::SubscribeRemote { consumer } => {
1883                match resolve_remote_actor(&consumer, self.settings.subscription_timeout) {
1884                    Ok(consumer) => register_producer_consumer(
1885                        &self.shared,
1886                        myself,
1887                        consumer,
1888                        self.endpoint.as_ref(),
1889                    ),
1890                    Err(error) => self.shared.stop_unless_finished(error),
1891                }
1892            }
1893            ProducerCommand::Demand {
1894                consumer,
1895                cumulative,
1896            } => self.shared.update_demand(&consumer, cumulative),
1897            #[cfg(feature = "cluster")]
1898            ProducerCommand::DemandRemote {
1899                consumer,
1900                cumulative,
1901            } => {
1902                if let Ok(consumer) =
1903                    resolve_remote_actor(&consumer, self.settings.subscription_timeout)
1904                {
1905                    self.shared.update_demand(&consumer, cumulative);
1906                }
1907            }
1908            ProducerCommand::Cancel { consumer } => {
1909                self.shared
1910                    .stop_from_consumer(&consumer, StreamError::Cancelled);
1911            }
1912            #[cfg(feature = "cluster")]
1913            ProducerCommand::CancelRemote { consumer } => {
1914                if let Ok(consumer) =
1915                    resolve_remote_actor(&consumer, self.settings.subscription_timeout)
1916                {
1917                    self.shared
1918                        .stop_from_consumer(&consumer, StreamError::Cancelled);
1919                }
1920            }
1921            ProducerCommand::RemoteFailure { consumer, error } => {
1922                self.shared.stop_from_consumer(&consumer, error);
1923            }
1924            #[cfg(feature = "cluster")]
1925            ProducerCommand::RemoteFailureRemote { consumer, error } => {
1926                if let Ok(consumer) =
1927                    resolve_remote_actor(&consumer, self.settings.subscription_timeout)
1928                {
1929                    self.shared.stop_from_consumer(&consumer, error);
1930                }
1931            }
1932            ProducerCommand::Ack => myself.stop(None),
1933        }
1934        Ok(())
1935    }
1936
1937    async fn post_stop(
1938        &self,
1939        _myself: ActorRef<Self::Msg>,
1940        _state: &mut Self::State,
1941    ) -> ActorResult {
1942        self.shared
1943            .stop_unless_finished(StreamError::ActorTerminated);
1944        Ok(())
1945    }
1946}
1947
1948#[allow(dead_code)]
1949fn register_producer_consumer<T>(
1950    shared: &Arc<ProducerShared<T>>,
1951    producer: ActorRef<ProducerCommand<T>>,
1952    consumer: ActorRef<ConsumerCommand<T>>,
1953    #[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
1954) where
1955    T: stream_ref_actor_bound::Bound + Send + 'static,
1956{
1957    match shared.set_consumer(consumer.clone()) {
1958        Ok(true) | Ok(false) => {
1959            let _ = cast_consumer_on_subscribe(
1960                &consumer,
1961                producer.clone(),
1962                #[cfg(feature = "cluster")]
1963                producer_endpoint,
1964            );
1965        }
1966        Err(duplicate) => {
1967            let _ = cast_consumer_failure(
1968                &duplicate,
1969                producer,
1970                #[cfg(feature = "cluster")]
1971                producer_endpoint,
1972                StreamError::Failed(
1973                    "stream ref was already subscribed by another endpoint".to_owned(),
1974                ),
1975            );
1976        }
1977    }
1978}
1979
1980#[allow(dead_code)]
1981fn register_direct_producer_consumer<T>(
1982    producer: &Arc<ProducerShared<T>>,
1983    consumer: &Arc<ConsumerShared<T>>,
1984) -> StreamResult<()>
1985where
1986    T: stream_ref_actor_bound::Bound + Send + 'static,
1987{
1988    consumer.set_direct_producer(producer)?;
1989    if let Err(error) = producer.set_direct_consumer(consumer) {
1990        consumer.fail_local(error.clone());
1991        return Err(error);
1992    }
1993    Ok(())
1994}
1995
/// Actor that handles [`ConsumerCommand`] messages for the consuming side of a stream ref.
#[allow(dead_code)]
struct ConsumerActor<T> {
    /// Consumer state that the message handlers push elements and terminal signals into.
    shared: Arc<ConsumerShared<T>>,
    /// Producer recorded and subscribed to during `pre_start`, when one is supplied.
    initial_producer: Option<ActorRef<ProducerCommand<T>>>,
    /// Supplies the subscription timeout used when resolving remote producer endpoints.
    settings: StreamRefSettings,
    /// Remote endpoint forwarded with outgoing producer messages (cluster builds only).
    #[cfg(feature = "cluster")]
    endpoint: Option<RemoteStreamRefEndpoint>,
}
2004
2005impl<T> Actor for ConsumerActor<T>
2006where
2007    T: stream_ref_actor_bound::Bound + Send + 'static,
2008{
2009    type Msg = ConsumerCommand<T>;
2010    type State = ();
2011    type Arguments = ();
2012
2013    async fn pre_start(
2014        &self,
2015        myself: ActorRef<Self::Msg>,
2016        _args: Self::Arguments,
2017    ) -> Result<Self::State, ActorProcessingErr> {
2018        if let Some(producer) = &self.initial_producer {
2019            self.shared.set_producer(producer.clone());
2020            if let Err(error) = cast_producer_subscribe(
2021                producer,
2022                myself.clone(),
2023                #[cfg(feature = "cluster")]
2024                self.endpoint.as_ref(),
2025            ) {
2026                self.shared.fail_local(error);
2027                myself.stop(None);
2028            }
2029        }
2030        Ok(())
2031    }
2032
2033    async fn handle(
2034        &self,
2035        _myself: ActorRef<Self::Msg>,
2036        message: Self::Msg,
2037        _state: &mut Self::State,
2038    ) -> ActorResult {
2039        match message {
2040            ConsumerCommand::OnSubscribe { producer } => {
2041                if !self.shared.set_producer(producer.clone()) {
2042                    let _ = cast_producer_remote_failure(
2043                        &producer,
2044                        _myself.clone(),
2045                        #[cfg(feature = "cluster")]
2046                        self.endpoint.as_ref(),
2047                        StreamError::Failed(
2048                            "stream ref was already subscribed by another endpoint".to_owned(),
2049                        ),
2050                    );
2051                }
2052            }
2053            #[cfg(feature = "cluster")]
2054            ConsumerCommand::OnSubscribeRemote { producer } => {
2055                match resolve_remote_actor(&producer, self.settings.subscription_timeout) {
2056                    Ok(producer) => {
2057                        if !self.shared.set_producer(producer.clone()) {
2058                            let _ = cast_producer_remote_failure(
2059                                &producer,
2060                                _myself.clone(),
2061                                self.endpoint.as_ref(),
2062                                StreamError::Failed(
2063                                    "stream ref was already subscribed by another endpoint"
2064                                        .to_owned(),
2065                                ),
2066                            );
2067                        }
2068                    }
2069                    Err(error) => self.shared.fail_local(error),
2070                }
2071            }
2072            ConsumerCommand::Element {
2073                producer,
2074                seq,
2075                item,
2076            } => self.shared.push(&producer, seq, item),
2077            #[cfg(feature = "cluster")]
2078            ConsumerCommand::ElementRemote {
2079                producer,
2080                seq,
2081                item,
2082            } => {
2083                if let Ok(producer) =
2084                    resolve_remote_actor(&producer, self.settings.subscription_timeout)
2085                {
2086                    self.shared.push(&producer, seq, item);
2087                }
2088            }
2089            ConsumerCommand::Complete { producer, seq } => {
2090                self.shared.complete(&producer, seq);
2091                let _ = cast_actor(&producer, ProducerCommand::Ack);
2092            }
2093            #[cfg(feature = "cluster")]
2094            ConsumerCommand::CompleteRemote { producer, seq } => {
2095                if let Ok(producer) =
2096                    resolve_remote_actor(&producer, self.settings.subscription_timeout)
2097                {
2098                    self.shared.complete(&producer, seq);
2099                    let _ = cast_actor(&producer, ProducerCommand::Ack);
2100                }
2101            }
2102            ConsumerCommand::Failure { producer, error } => {
2103                self.shared.fail(&producer, error);
2104                let _ = cast_actor(&producer, ProducerCommand::Ack);
2105            }
2106            #[cfg(feature = "cluster")]
2107            ConsumerCommand::FailureRemote { producer, error } => {
2108                if let Ok(producer) =
2109                    resolve_remote_actor(&producer, self.settings.subscription_timeout)
2110                {
2111                    self.shared.fail(&producer, error);
2112                    let _ = cast_actor(&producer, ProducerCommand::Ack);
2113                }
2114            }
2115        }
2116        Ok(())
2117    }
2118
2119    async fn post_stop(
2120        &self,
2121        _myself: ActorRef<Self::Msg>,
2122        _state: &mut Self::State,
2123    ) -> ActorResult {
2124        self.shared.fail_local(StreamError::ActorTerminated);
2125        Ok(())
2126    }
2127}
2128
/// Blocking iterator over the elements received by a stream-ref consumer.
#[allow(dead_code)]
struct ConsumerStream<T>
where
    T: stream_ref_actor_bound::Bound + Send + 'static,
{
    /// Consumer state holding the element queue, terminal signal and demand counters.
    shared: Arc<ConsumerShared<T>>,
    /// Consumer actor, if any; stopped when the stream terminates or is dropped.
    actor_ref: Option<ActorRef<ConsumerCommand<T>>>,
    /// Remote endpoint forwarded with demand and cancel messages (cluster builds only).
    #[cfg(feature = "cluster")]
    endpoint: Option<RemoteStreamRefEndpoint>,
    /// Buffer capacity, subscription timeout and redelivery settings for this stream.
    settings: StreamRefSettings,
    /// Set once the iterator has produced its terminal result; later `next` calls return `None`.
    terminated: bool,
    /// Released when the stream is dropped.
    // NOTE(review): presumably keeps the originating `SourceRef` alive — confirm.
    source_ref_keep_alive: Option<Arc<SourceRefInner<T>>>,
}
2142
2143impl<T> Iterator for ConsumerStream<T>
2144where
2145    T: stream_ref_actor_bound::Bound + Send + 'static,
2146{
2147    type Item = StreamResult<T>;
2148
2149    fn next(&mut self) -> Option<Self::Item> {
2150        if self.terminated {
2151            return None;
2152        }
2153
2154        if let Err(error) = self.wait_for_subscription() {
2155            self.terminated = true;
2156            return Some(Err(error));
2157        }
2158
2159        if let Err(error) = self.redeliver_or_extend_demand() {
2160            self.terminated = true;
2161            return Some(Err(error));
2162        }
2163
2164        let mut next_redelivery = next_redelivery_deadline(self.settings);
2165        loop {
2166            let demand_after_pop = {
2167                let mut inner = self.shared.lock();
2168                if let Some(item) = inner.queue.pop_front() {
2169                    inner.delivered = inner.delivered.saturating_add(1);
2170                    let demand = next_demand(&mut inner, self.settings);
2171                    drop(inner);
2172                    if let Some((producer, cumulative)) = demand
2173                        && let Err(error) = send_demand(
2174                            &self.shared,
2175                            &self.actor_ref,
2176                            #[cfg(feature = "cluster")]
2177                            &self.endpoint,
2178                            &producer,
2179                            cumulative,
2180                        )
2181                    {
2182                        self.terminated = true;
2183                        return Some(Err(error));
2184                    }
2185                    return Some(Ok(item));
2186                }
2187
2188                if let Some(terminal) = inner.terminal.clone() {
2189                    drop(inner);
2190                    match terminal {
2191                        ConsumerTerminal::Complete => {
2192                            self.terminated = true;
2193                            self.stop_actor();
2194                            return None;
2195                        }
2196                        ConsumerTerminal::Error(error) => {
2197                            self.terminated = true;
2198                            self.stop_actor();
2199                            return Some(Err(error));
2200                        }
2201                    }
2202                }
2203                None::<(ProducerEndpoint<T>, u64)>
2204            };
2205            debug_assert!(demand_after_pop.is_none());
2206
2207            let now = Instant::now();
2208            let timeout = next_redelivery.saturating_duration_since(now);
2209            let mut inner = self.shared.lock();
2210            if !inner.queue.is_empty() || inner.terminal.is_some() {
2211                continue;
2212            }
2213            let (next_inner, result) = wait_timeout_unpoison(
2214                &self.shared.changed,
2215                inner,
2216                timeout.min(STREAM_REF_WAIT_POLL),
2217            );
2218            inner = next_inner;
2219            drop(inner);
2220            if result.timed_out() && Instant::now() >= next_redelivery {
2221                if let Err(error) = self.redeliver_demand() {
2222                    self.terminated = true;
2223                    return Some(Err(error));
2224                }
2225                next_redelivery = next_redelivery_deadline(self.settings);
2226            }
2227        }
2228    }
2229}
2230
2231#[allow(dead_code)]
2232impl<T> ConsumerStream<T>
2233where
2234    T: stream_ref_actor_bound::Bound + Send + 'static,
2235{
2236    fn wait_for_subscription(&self) -> StreamResult<()> {
2237        let deadline = Instant::now()
2238            .checked_add(self.settings.subscription_timeout)
2239            .unwrap_or_else(far_future);
2240        let mut inner = self.shared.lock();
2241        loop {
2242            if inner.producer.is_some() {
2243                return Ok(());
2244            }
2245            if let Some(terminal) = inner.terminal.clone() {
2246                return match terminal {
2247                    ConsumerTerminal::Complete => Ok(()),
2248                    ConsumerTerminal::Error(error) => Err(error),
2249                };
2250            }
2251            let now = Instant::now();
2252            if now >= deadline {
2253                drop(inner);
2254                let error = subscription_timeout_error("stream ref source");
2255                self.shared.fail_local(error.clone());
2256                self.stop_actor_ref();
2257                return Err(error);
2258            }
2259            let remaining = deadline.saturating_duration_since(now);
2260            let (next, _) = wait_timeout_unpoison(
2261                &self.shared.changed,
2262                inner,
2263                remaining.min(STREAM_REF_WAIT_POLL),
2264            );
2265            inner = next;
2266        }
2267    }
2268
2269    fn redeliver_or_extend_demand(&self) -> StreamResult<()> {
2270        let demand = {
2271            let mut inner = self.shared.lock();
2272            next_demand(&mut inner, self.settings)
2273        };
2274        if let Some((producer, cumulative)) = demand {
2275            send_demand(
2276                &self.shared,
2277                &self.actor_ref,
2278                #[cfg(feature = "cluster")]
2279                &self.endpoint,
2280                &producer,
2281                cumulative,
2282            )?;
2283        }
2284        Ok(())
2285    }
2286
2287    fn redeliver_demand(&self) -> StreamResult<()> {
2288        let (producer, cumulative) = {
2289            let inner = self.shared.lock();
2290            let Some(producer) = inner.producer.clone() else {
2291                return Ok(());
2292            };
2293            if inner.cumulative_demand == 0 {
2294                return Ok(());
2295            }
2296            (producer, inner.cumulative_demand)
2297        };
2298        send_demand(
2299            &self.shared,
2300            &self.actor_ref,
2301            #[cfg(feature = "cluster")]
2302            &self.endpoint,
2303            &producer,
2304            cumulative,
2305        )
2306    }
2307
2308    fn stop_actor(&mut self) {
2309        if let Some(actor_ref) = self.actor_ref.take() {
2310            actor_ref.stop(None);
2311        }
2312    }
2313
2314    fn stop_actor_ref(&self) {
2315        if let Some(actor_ref) = &self.actor_ref {
2316            actor_ref.stop(None);
2317        }
2318    }
2319}
2320
2321impl<T> Drop for ConsumerStream<T>
2322where
2323    T: stream_ref_actor_bound::Bound + Send + 'static,
2324{
2325    fn drop(&mut self) {
2326        if !self.terminated {
2327            let producer = self.shared.lock().producer.clone();
2328            match producer {
2329                Some(ProducerEndpoint::Actor(producer)) => {
2330                    if let Some(consumer) = &self.actor_ref {
2331                        let _ = cast_producer_cancel(
2332                            &producer,
2333                            consumer.clone(),
2334                            #[cfg(feature = "cluster")]
2335                            self.endpoint.as_ref(),
2336                        );
2337                    }
2338                }
2339                Some(ProducerEndpoint::Direct(producer)) => {
2340                    if let Some(producer) = producer.upgrade() {
2341                        producer.stop_from_direct_consumer(&self.shared, StreamError::Cancelled);
2342                    }
2343                }
2344                None => {}
2345            }
2346        }
2347        self.stop_actor();
2348        drop(self.source_ref_keep_alive.take());
2349    }
2350}
2351
2352#[allow(dead_code)]
2353fn next_demand<T>(
2354    inner: &mut ConsumerInner<T>,
2355    settings: StreamRefSettings,
2356) -> Option<(ProducerEndpoint<T>, u64)> {
2357    if inner.terminal.is_some() {
2358        return None;
2359    }
2360    let remaining_credit = inner.cumulative_demand.saturating_sub(inner.delivered);
2361    if inner.cumulative_demand != 0 && remaining_credit > demand_replenish_threshold(settings) {
2362        return None;
2363    }
2364    let target = inner
2365        .delivered
2366        .saturating_add(settings.buffer_capacity as u64);
2367    if inner.cumulative_demand >= target {
2368        return None;
2369    }
2370    inner.cumulative_demand = target;
2371    inner
2372        .producer
2373        .as_ref()
2374        .map(|producer| (producer.clone(), inner.cumulative_demand))
2375}
2376
2377#[allow(dead_code)]
2378fn send_demand<T>(
2379    consumer_shared: &Arc<ConsumerShared<T>>,
2380    consumer: &Option<ActorRef<ConsumerCommand<T>>>,
2381    #[cfg(feature = "cluster")] consumer_endpoint: &Option<RemoteStreamRefEndpoint>,
2382    producer: &ProducerEndpoint<T>,
2383    cumulative: u64,
2384) -> StreamResult<()>
2385where
2386    T: stream_ref_actor_bound::Bound + Send + 'static,
2387{
2388    match producer {
2389        ProducerEndpoint::Actor(producer) => {
2390            let Some(consumer) = consumer else {
2391                return Err(StreamError::ActorTerminated);
2392            };
2393            cast_producer_demand(
2394                producer,
2395                consumer.clone(),
2396                #[cfg(feature = "cluster")]
2397                consumer_endpoint.as_ref(),
2398                cumulative,
2399            )
2400        }
2401        ProducerEndpoint::Direct(producer) => {
2402            let Some(producer) = producer.upgrade() else {
2403                return Err(StreamError::ActorTerminated);
2404            };
2405            producer.update_direct_demand(consumer_shared, cumulative);
2406            Ok(())
2407        }
2408    }
2409}
2410
2411#[allow(dead_code)]
2412fn demand_replenish_threshold(settings: StreamRefSettings) -> u64 {
2413    (settings.buffer_capacity as u64) / 2
2414}
2415
/// Materializes the consuming side of a remote source ref as a blocking stream.
///
/// Resolves the remote producer, spawns a local consumer actor registered under a fresh
/// `"consumer"` endpoint, subscribes it to the producer, and wraps the shared consumer
/// state in a [`ConsumerStream`]. `keep_alive` is held by the stream until it is dropped.
///
/// # Errors
///
/// Returns the error from resolving the producer, spawning the consumer actor, or sending
/// the subscribe message. If subscribing fails, the consumer actor spawned here is stopped
/// before the error is returned, because no stream exists yet that would stop it on drop.
#[cfg(feature = "cluster")]
fn materialize_remote_source_ref<T>(
    producer_endpoint: RemoteStreamRefEndpoint,
    settings: StreamRefSettings,
    keep_alive: Arc<SourceRefInner<T>>,
) -> StreamResult<BoxStream<T>>
where
    T: BytesConvertable + Send + 'static,
{
    let producer = resolve_remote_actor(&producer_endpoint, settings.subscription_timeout)?;
    let shared = Arc::new(ConsumerShared::new(settings));
    let consumer_endpoint = RemoteStreamRefEndpoint::new("consumer");
    let (consumer, _handle) = spawn_consumer_actor(
        None,
        Arc::clone(&shared),
        settings,
        Some(consumer_endpoint.clone()),
    )?;
    join_remote_endpoint(&consumer_endpoint, &consumer);
    if let Err(error) =
        cast_producer_subscribe(&producer, consumer.clone(), Some(&consumer_endpoint))
    {
        // Without this the freshly spawned actor would keep running with no owner.
        consumer.stop(None);
        return Err(error);
    }
    Ok(Box::new(ConsumerStream {
        shared,
        actor_ref: Some(consumer),
        endpoint: Some(consumer_endpoint),
        settings,
        terminated: false,
        source_ref_keep_alive: Some(keep_alive),
    }) as BoxStream<T>)
}
2445
/// Materializes the producing side of a remote sink ref.
///
/// Resolves the remote consumer, spawns a local producer actor registered under a fresh
/// `"producer"` endpoint, registers the consumer with it, and then drives `input` through
/// `run_producer_endpoint` on a dedicated thread. The returned completion resolves with
/// that run's result and carries the cancellation handle shared with the thread.
///
/// # Errors
///
/// Returns the error from resolving the consumer or spawning the producer actor.
#[cfg(feature = "cluster")]
fn materialize_remote_sink_ref<T>(
    consumer_endpoint: RemoteStreamRefEndpoint,
    settings: StreamRefSettings,
    input: BoxStream<T>,
) -> StreamResult<StreamCompletion<NotUsed>>
where
    T: BytesConvertable + Send + 'static,
{
    let consumer = resolve_remote_actor(&consumer_endpoint, settings.subscription_timeout)?;
    let shared = Arc::new(ProducerShared::new());
    let producer_endpoint = RemoteStreamRefEndpoint::new("producer");
    let (producer, _handle) = spawn_producer_actor(
        None,
        Arc::clone(&shared),
        settings,
        Some(producer_endpoint.clone()),
    )?;
    join_remote_endpoint(&producer_endpoint, &producer);
    register_producer_consumer(
        &shared,
        producer.clone(),
        consumer,
        Some(&producer_endpoint),
    );

    let cancellation = StreamCancellation::for_external_completion();
    let cancel_flag = cancellation.cancelled();
    let (result_tx, result_rx) = oneshot::channel();
    let guard = ProducerEndpointDropGuard::new(producer.clone(), Some(Arc::clone(&cancel_flag)));
    let handle = ProducerEndpointHandle::Actor {
        actor: producer,
        shared: Arc::clone(&shared),
        endpoint: Some(producer_endpoint),
    };

    std::thread::spawn(move || {
        // Bound first so it is dropped last, after the result has been sent.
        let _guard = guard;
        let outcome = run_producer_endpoint(input, shared, handle, settings, cancel_flag);
        // The receiver may already be gone; nothing useful can be done about that here.
        let _ = result_tx.send(outcome);
    });
    Ok(StreamCompletion::from_receiver(result_rx, Some(cancellation)))
}
2489
2490fn stream_ref_source_sink<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
2491where
2492    T: Send + 'static,
2493{
2494    Sink::from_runner(move |input, materializer| {
2495        let direct = Arc::new(SourceRefDirectState::new(input));
2496        let direct_for_timeout = Arc::clone(&direct);
2497        let timeout = materializer.schedule_once(settings.subscription_timeout, move || {
2498            direct_for_timeout.timeout_if_unsubscribed();
2499        });
2500        Ok(SourceRef {
2501            inner: Arc::new(SourceRefInner {
2502                producer: LazyProducerStatus::new(Arc::clone(&direct)),
2503                direct,
2504                settings,
2505                subscribed: AtomicBool::new(false),
2506                timeout: Some(timeout),
2507                #[cfg(feature = "cluster")]
2508                remote: None,
2509            }),
2510        })
2511    })
2512}
2513
2514fn stream_ref_sink_source<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
2515where
2516    T: Send + 'static,
2517{
2518    Source::from_materialized_factory(move |_materializer| {
2519        let direct = Arc::new(SinkRefDirectState::new());
2520        let sink_ref = SinkRef {
2521            inner: Arc::new(SinkRefInner {
2522                direct: Arc::clone(&direct),
2523                #[cfg(feature = "cluster")]
2524                settings,
2525                subscribed: AtomicBool::new(false),
2526                #[cfg(feature = "cluster")]
2527                remote: None,
2528            }),
2529        };
2530        Ok((
2531            Box::new(SinkRefDirectStream::new(direct, settings)) as BoxStream<T>,
2532            sink_ref,
2533        ))
2534    })
2535}
2536
2537#[allow(dead_code)]
2538fn send_element_to_consumer<T>(
2539    consumer: &ConsumerEndpoint<T>,
2540    producer: &ProducerEndpointHandle<T>,
2541    seq: u64,
2542    item: T,
2543) -> StreamResult<()>
2544where
2545    T: stream_ref_actor_bound::Bound + Send + 'static,
2546{
2547    match consumer {
2548        ConsumerEndpoint::Actor(consumer) => {
2549            let Some(producer_actor) = producer.actor_ref() else {
2550                return Err(StreamError::ActorTerminated);
2551            };
2552            #[cfg(feature = "cluster")]
2553            if is_remote_actor(consumer) {
2554                let Some(producer_endpoint) = producer.endpoint() else {
2555                    return Err(StreamError::ActorTerminated);
2556                };
2557                return cast_actor(
2558                    consumer,
2559                    ConsumerCommand::ElementRemote {
2560                        producer: producer_endpoint,
2561                        seq,
2562                        item,
2563                    },
2564                );
2565            }
2566            cast_actor(
2567                consumer,
2568                ConsumerCommand::Element {
2569                    producer: producer_actor,
2570                    seq,
2571                    item,
2572                },
2573            )
2574        }
2575        ConsumerEndpoint::Direct(consumer) => {
2576            let Some(consumer) = consumer.upgrade() else {
2577                return Err(StreamError::Cancelled);
2578            };
2579            let Some(producer) = producer.direct_shared() else {
2580                return Err(StreamError::ActorTerminated);
2581            };
2582            consumer.push_direct(&producer, seq, item);
2583            Ok(())
2584        }
2585    }
2586}
2587
2588#[allow(dead_code)]
2589fn fail_consumer<T>(
2590    consumer: &ConsumerEndpoint<T>,
2591    producer: &ProducerEndpointHandle<T>,
2592    error: StreamError,
2593) -> StreamResult<bool>
2594where
2595    T: stream_ref_actor_bound::Bound + Send + 'static,
2596{
2597    match consumer {
2598        ConsumerEndpoint::Actor(consumer) => {
2599            let Some(producer_actor) = producer.actor_ref() else {
2600                return Err(StreamError::ActorTerminated);
2601            };
2602            #[cfg(feature = "cluster")]
2603            if is_remote_actor(consumer) {
2604                let Some(producer_endpoint) = producer.endpoint() else {
2605                    return Err(StreamError::ActorTerminated);
2606                };
2607                cast_actor(
2608                    consumer,
2609                    ConsumerCommand::FailureRemote {
2610                        producer: producer_endpoint,
2611                        error,
2612                    },
2613                )?;
2614                return Ok(false);
2615            }
2616            cast_actor(
2617                consumer,
2618                ConsumerCommand::Failure {
2619                    producer: producer_actor,
2620                    error,
2621                },
2622            )?;
2623            Ok(false)
2624        }
2625        ConsumerEndpoint::Direct(consumer) => {
2626            let Some(consumer) = consumer.upgrade() else {
2627                return Err(StreamError::Cancelled);
2628            };
2629            let Some(producer) = producer.direct_shared() else {
2630                return Err(StreamError::ActorTerminated);
2631            };
2632            consumer.fail_direct(&producer, error);
2633            Ok(true)
2634        }
2635    }
2636}
2637
2638#[allow(dead_code)]
2639fn complete_consumer<T>(
2640    consumer: &ConsumerEndpoint<T>,
2641    producer: &ProducerEndpointHandle<T>,
2642    seq: u64,
2643) -> StreamResult<bool>
2644where
2645    T: stream_ref_actor_bound::Bound + Send + 'static,
2646{
2647    match consumer {
2648        ConsumerEndpoint::Actor(consumer) => {
2649            let Some(producer_actor) = producer.actor_ref() else {
2650                return Err(StreamError::ActorTerminated);
2651            };
2652            #[cfg(feature = "cluster")]
2653            if is_remote_actor(consumer) {
2654                let Some(producer_endpoint) = producer.endpoint() else {
2655                    return Err(StreamError::ActorTerminated);
2656                };
2657                cast_actor(
2658                    consumer,
2659                    ConsumerCommand::CompleteRemote {
2660                        producer: producer_endpoint,
2661                        seq,
2662                    },
2663                )?;
2664                return Ok(false);
2665            }
2666            cast_actor(
2667                consumer,
2668                ConsumerCommand::Complete {
2669                    producer: producer_actor,
2670                    seq,
2671                },
2672            )?;
2673            Ok(false)
2674        }
2675        ConsumerEndpoint::Direct(consumer) => {
2676            let Some(consumer) = consumer.upgrade() else {
2677                return Err(StreamError::Cancelled);
2678            };
2679            let Some(producer) = producer.direct_shared() else {
2680                return Err(StreamError::ActorTerminated);
2681            };
2682            consumer.complete_direct(&producer, seq);
2683            Ok(true)
2684        }
2685    }
2686}
2687
2688#[allow(dead_code)]
2689fn run_producer_endpoint<T>(
2690    mut input: BoxStream<T>,
2691    shared: Arc<ProducerShared<T>>,
2692    producer_endpoint: ProducerEndpointHandle<T>,
2693    settings: StreamRefSettings,
2694    cancelled: Arc<AtomicBool>,
2695) -> StreamResult<NotUsed>
2696where
2697    T: stream_ref_actor_bound::Bound + Send + 'static,
2698{
2699    let deadline = Instant::now()
2700        .checked_add(settings.subscription_timeout)
2701        .unwrap_or_else(far_future);
2702
2703    loop {
2704        let consumer = match wait_for_remote_demand(&shared, deadline, &cancelled) {
2705            Ok(consumer) => consumer,
2706            Err(error) => {
2707                producer_endpoint.stop_actor();
2708                return Err(error);
2709            }
2710        };
2711        if cancelled.load(Ordering::SeqCst) {
2712            return Err(StreamError::Cancelled);
2713        }
2714
2715        match input.next() {
2716            Some(Ok(item)) => {
2717                let seq = {
2718                    let mut inner = shared.lock();
2719                    let seq = inner.sent;
2720                    inner.sent = inner.sent.saturating_add(1);
2721                    seq
2722                };
2723                if let Err(error) =
2724                    send_element_to_consumer(&consumer, &producer_endpoint, seq, item)
2725                {
2726                    producer_endpoint.stop_actor();
2727                    return Err(match error {
2728                        StreamError::ActorTerminated => StreamError::Cancelled,
2729                        other => other,
2730                    });
2731                }
2732            }
2733            Some(Err(error)) => {
2734                if fail_consumer(&consumer, &producer_endpoint, error.clone()).unwrap_or(false) {
2735                    producer_endpoint.stop_actor();
2736                }
2737                return Err(error);
2738            }
2739            None => {
2740                let seq = shared.lock().sent;
2741                if complete_consumer(&consumer, &producer_endpoint, seq).unwrap_or(false) {
2742                    producer_endpoint.stop_actor();
2743                }
2744                return Ok(NotUsed);
2745            }
2746        }
2747    }
2748}
2749
2750#[allow(dead_code)]
2751fn wait_for_remote_demand<T>(
2752    shared: &Arc<ProducerShared<T>>,
2753    deadline: Instant,
2754    cancelled: &Arc<AtomicBool>,
2755) -> StreamResult<ConsumerEndpoint<T>>
2756where
2757    T: stream_ref_actor_bound::Bound + Send + 'static,
2758{
2759    let mut inner = shared.lock();
2760    loop {
2761        if cancelled.load(Ordering::SeqCst) {
2762            return Err(StreamError::Cancelled);
2763        }
2764        if let Some(error) = inner.stopped.clone() {
2765            return Err(error);
2766        }
2767        if let Some(consumer) = &inner.consumer
2768            && inner.sent < inner.cumulative_demand
2769        {
2770            return Ok(consumer.clone());
2771        }
2772        let now = Instant::now();
2773        if inner.consumer.is_none() && now >= deadline {
2774            return Err(subscription_timeout_error("stream ref sink"));
2775        }
2776        let remaining = deadline.saturating_duration_since(now);
2777        let timeout = if inner.consumer.is_none() {
2778            remaining.min(STREAM_REF_WAIT_POLL)
2779        } else {
2780            STREAM_REF_WAIT_POLL
2781        };
2782        let (next, _) = wait_timeout_unpoison(&shared.changed, inner, timeout);
2783        inner = next;
2784    }
2785}
2786
2787#[allow(dead_code)]
2788fn spawn_producer_actor<T>(
2789    initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
2790    shared: Arc<ProducerShared<T>>,
2791    settings: StreamRefSettings,
2792    #[cfg(feature = "cluster")] endpoint: Option<RemoteStreamRefEndpoint>,
2793) -> StreamResult<(
2794    ActorRef<ProducerCommand<T>>,
2795    ractor::concurrency::JoinHandle<()>,
2796)>
2797where
2798    T: stream_ref_actor_bound::Bound + Send + 'static,
2799{
2800    block_on_ractor_runtime(Actor::spawn(
2801        None,
2802        ProducerActor {
2803            shared,
2804            initial_consumer,
2805            settings,
2806            #[cfg(feature = "cluster")]
2807            endpoint,
2808        },
2809        (),
2810    ))?
2811    .map_err(|error| {
2812        StreamError::Failed(format!(
2813            "stream ref producer actor failed to spawn: {error}"
2814        ))
2815    })
2816}
2817
2818#[allow(dead_code)]
2819fn spawn_consumer_actor<T>(
2820    initial_producer: Option<ActorRef<ProducerCommand<T>>>,
2821    shared: Arc<ConsumerShared<T>>,
2822    settings: StreamRefSettings,
2823    #[cfg(feature = "cluster")] endpoint: Option<RemoteStreamRefEndpoint>,
2824) -> StreamResult<(
2825    ActorRef<ConsumerCommand<T>>,
2826    ractor::concurrency::JoinHandle<()>,
2827)>
2828where
2829    T: stream_ref_actor_bound::Bound + Send + 'static,
2830{
2831    block_on_ractor_runtime(Actor::spawn(
2832        None,
2833        ConsumerActor {
2834            shared,
2835            initial_producer,
2836            settings,
2837            #[cfg(feature = "cluster")]
2838            endpoint,
2839        },
2840        (),
2841    ))?
2842    .map_err(|error| {
2843        StreamError::Failed(format!(
2844            "stream ref consumer actor failed to spawn: {error}"
2845        ))
2846    })
2847}
2848
2849#[allow(dead_code)]
2850fn cast_producer_subscribe<T>(
2851    producer: &ActorRef<ProducerCommand<T>>,
2852    consumer: ActorRef<ConsumerCommand<T>>,
2853    #[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
2854) -> StreamResult<()>
2855where
2856    T: stream_ref_actor_bound::Bound + Send + 'static,
2857{
2858    #[cfg(feature = "cluster")]
2859    if is_remote_actor(producer) {
2860        let Some(consumer_endpoint) = consumer_endpoint else {
2861            return Err(StreamError::ActorTerminated);
2862        };
2863        return cast_actor(
2864            producer,
2865            ProducerCommand::SubscribeRemote {
2866                consumer: consumer_endpoint.clone(),
2867            },
2868        );
2869    }
2870
2871    cast_actor(producer, ProducerCommand::Subscribe { consumer })
2872}
2873
2874#[allow(dead_code)]
2875fn cast_producer_demand<T>(
2876    producer: &ActorRef<ProducerCommand<T>>,
2877    consumer: ActorRef<ConsumerCommand<T>>,
2878    #[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
2879    cumulative: u64,
2880) -> StreamResult<()>
2881where
2882    T: stream_ref_actor_bound::Bound + Send + 'static,
2883{
2884    #[cfg(feature = "cluster")]
2885    if is_remote_actor(producer) {
2886        let Some(consumer_endpoint) = consumer_endpoint else {
2887            return Err(StreamError::ActorTerminated);
2888        };
2889        return cast_actor(
2890            producer,
2891            ProducerCommand::DemandRemote {
2892                consumer: consumer_endpoint.clone(),
2893                cumulative,
2894            },
2895        );
2896    }
2897
2898    cast_actor(
2899        producer,
2900        ProducerCommand::Demand {
2901            consumer,
2902            cumulative,
2903        },
2904    )
2905}
2906
2907#[allow(dead_code)]
2908fn cast_producer_cancel<T>(
2909    producer: &ActorRef<ProducerCommand<T>>,
2910    consumer: ActorRef<ConsumerCommand<T>>,
2911    #[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
2912) -> StreamResult<()>
2913where
2914    T: stream_ref_actor_bound::Bound + Send + 'static,
2915{
2916    #[cfg(feature = "cluster")]
2917    if is_remote_actor(producer) {
2918        let Some(consumer_endpoint) = consumer_endpoint else {
2919            return Err(StreamError::ActorTerminated);
2920        };
2921        return cast_actor(
2922            producer,
2923            ProducerCommand::CancelRemote {
2924                consumer: consumer_endpoint.clone(),
2925            },
2926        );
2927    }
2928
2929    cast_actor(producer, ProducerCommand::Cancel { consumer })
2930}
2931
2932#[allow(dead_code)]
2933fn cast_producer_remote_failure<T>(
2934    producer: &ActorRef<ProducerCommand<T>>,
2935    consumer: ActorRef<ConsumerCommand<T>>,
2936    #[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
2937    error: StreamError,
2938) -> StreamResult<()>
2939where
2940    T: stream_ref_actor_bound::Bound + Send + 'static,
2941{
2942    #[cfg(feature = "cluster")]
2943    if is_remote_actor(producer) {
2944        let Some(consumer_endpoint) = consumer_endpoint else {
2945            return Err(StreamError::ActorTerminated);
2946        };
2947        return cast_actor(
2948            producer,
2949            ProducerCommand::RemoteFailureRemote {
2950                consumer: consumer_endpoint.clone(),
2951                error,
2952            },
2953        );
2954    }
2955
2956    cast_actor(producer, ProducerCommand::RemoteFailure { consumer, error })
2957}
2958
2959#[allow(dead_code)]
2960fn cast_consumer_on_subscribe<T>(
2961    consumer: &ActorRef<ConsumerCommand<T>>,
2962    producer: ActorRef<ProducerCommand<T>>,
2963    #[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
2964) -> StreamResult<()>
2965where
2966    T: stream_ref_actor_bound::Bound + Send + 'static,
2967{
2968    #[cfg(feature = "cluster")]
2969    if is_remote_actor(consumer) {
2970        let Some(producer_endpoint) = producer_endpoint else {
2971            return Err(StreamError::ActorTerminated);
2972        };
2973        return cast_actor(
2974            consumer,
2975            ConsumerCommand::OnSubscribeRemote {
2976                producer: producer_endpoint.clone(),
2977            },
2978        );
2979    }
2980
2981    cast_actor(consumer, ConsumerCommand::OnSubscribe { producer })
2982}
2983
2984#[allow(dead_code)]
2985fn cast_consumer_failure<T>(
2986    consumer: &ActorRef<ConsumerCommand<T>>,
2987    producer: ActorRef<ProducerCommand<T>>,
2988    #[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
2989    error: StreamError,
2990) -> StreamResult<()>
2991where
2992    T: stream_ref_actor_bound::Bound + Send + 'static,
2993{
2994    #[cfg(feature = "cluster")]
2995    if is_remote_actor(consumer) {
2996        let Some(producer_endpoint) = producer_endpoint else {
2997            return Err(StreamError::ActorTerminated);
2998        };
2999        return cast_actor(
3000            consumer,
3001            ConsumerCommand::FailureRemote {
3002                producer: producer_endpoint.clone(),
3003                error,
3004            },
3005        );
3006    }
3007
3008    cast_actor(consumer, ConsumerCommand::Failure { producer, error })
3009}
3010
3011#[allow(dead_code)]
3012fn cast_actor<Msg>(actor_ref: &ActorRef<Msg>, message: Msg) -> StreamResult<()>
3013where
3014    Msg: Message,
3015{
3016    match actor_ref.cast(message) {
3017        Ok(()) => Ok(()),
3018        Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
3019            Err(StreamError::ActorTerminated)
3020        }
3021        Err(error) => Err(StreamError::ActorAskSendFailed {
3022            reason: error.to_string(),
3023        }),
3024    }
3025}
3026
/// Returns `true` when the id of `actor_ref` is not a local one.
#[cfg(feature = "cluster")]
fn is_remote_actor<Msg>(actor_ref: &ActorRef<Msg>) -> bool {
    let actor_id = actor_ref.get_id();
    let local = actor_id.is_local();
    !local
}
3031
/// Registers `actor_ref` as a member of the scoped process group named by
/// `endpoint` (its `scope` and `group`).
#[cfg(feature = "cluster")]
fn join_remote_endpoint<Msg>(endpoint: &RemoteStreamRefEndpoint, actor_ref: &ActorRef<Msg>) {
    let scope = endpoint.scope.clone();
    let group = endpoint.group.clone();
    let members = vec![actor_ref.get_cell()];
    ractor::pg::join_scoped(scope, group, members);
}
3040
/// Repeatedly looks up the actor registered for `endpoint` until one is found
/// or `timeout` has elapsed, parking the thread for `STREAM_REF_WAIT_POLL`
/// between attempts.
///
/// At least one lookup is always made, even with a zero `timeout`.
///
/// # Errors
///
/// Returns a subscription-timeout error when no actor is found by the deadline.
#[cfg(feature = "cluster")]
fn resolve_remote_actor<Msg>(
    endpoint: &RemoteStreamRefEndpoint,
    timeout: Duration,
) -> StreamResult<ActorRef<Msg>>
where
    Msg: Message,
{
    let deadline = match Instant::now().checked_add(timeout) {
        Some(instant) => instant,
        None => far_future(),
    };
    loop {
        match resolve_remote_actor_once(endpoint) {
            Some(actor) => return Ok(actor),
            None if Instant::now() >= deadline => {
                return Err(subscription_timeout_error("stream ref endpoint"));
            }
            None => std::thread::park_timeout(STREAM_REF_WAIT_POLL),
        }
    }
}
3062
/// Performs a single lookup of the scoped process group named by `endpoint`
/// and returns its first member as a typed actor reference, or `None` when the
/// group has no members.
#[cfg(feature = "cluster")]
fn resolve_remote_actor_once<Msg>(endpoint: &RemoteStreamRefEndpoint) -> Option<ActorRef<Msg>>
where
    Msg: Message,
{
    let members = ractor::pg::get_scoped_members(&endpoint.scope, &endpoint.group);
    let first = members.into_iter().next()?;
    Some(first.into())
}
3073
3074#[allow(dead_code)]
3075fn same_actor<MsgA, MsgB>(left: &ActorRef<MsgA>, right: &ActorRef<MsgB>) -> bool
3076where
3077    MsgA: Message,
3078    MsgB: Message,
3079{
3080    left.get_cell().get_id() == right.get_cell().get_id()
3081}
3082
3083fn failed_once<T>(reason: &str) -> BoxStream<T>
3084where
3085    T: stream_ref_actor_bound::Bound + Send + 'static,
3086{
3087    failed_stream(StreamError::Failed(reason.to_owned()))
3088}
3089
3090fn failed_stream<T>(error: StreamError) -> BoxStream<T>
3091where
3092    T: Send + 'static,
3093{
3094    Box::new(std::iter::once(Err(error)))
3095}
3096
3097fn subscription_timeout_error(side: &str) -> StreamError {
3098    StreamError::Failed(format!(
3099        "{side} remote side did not subscribe within subscription timeout"
3100    ))
3101}
3102
3103#[allow(dead_code)]
3104fn invalid_sequence_error(expected: u64, got: u64, context: &str) -> StreamError {
3105    StreamError::Failed(format!(
3106        "{context}: expected sequence {expected}, got {got}"
3107    ))
3108}
3109
3110#[allow(dead_code)]
3111fn next_redelivery_deadline(settings: StreamRefSettings) -> Instant {
3112    Instant::now()
3113        .checked_add(settings.demand_redelivery_interval)
3114        .unwrap_or_else(far_future)
3115}
3116
/// Returns an instant 365 days from now, used as a stand-in deadline when a
/// requested deadline cannot be represented.
fn far_future() -> Instant {
    const ONE_YEAR: Duration = Duration::from_secs(365 * 24 * 60 * 60);
    let now = Instant::now();
    now + ONE_YEAR
}
3120
/// Waits on `condvar` for at most `timeout`, returning the re-acquired guard
/// and the wait result.
///
/// A poisoned mutex is not treated as an error: the guard and result are
/// recovered from the poison error and returned as usual.
fn wait_timeout_unpoison<'a, T>(
    condvar: &Condvar,
    guard: MutexGuard<'a, T>,
    timeout: Duration,
) -> (MutexGuard<'a, T>, std::sync::WaitTimeoutResult) {
    match condvar.wait_timeout(guard, timeout) {
        Ok(woken) => woken,
        Err(poisoned) => poisoned.into_inner(),
    }
}
3130
/// Appends a single byte to `bytes`.
#[cfg(feature = "cluster")]
fn put_u8(bytes: &mut Vec<u8>, value: u8) {
    bytes.extend_from_slice(&[value]);
}
3135
/// Reads the byte at `*cursor`, yielding `0` when the cursor is past the end
/// of `bytes`.
///
/// The cursor is advanced by one (saturating) in either case.
#[cfg(feature = "cluster")]
fn take_u8(bytes: &[u8], cursor: &mut usize) -> u8 {
    let position = *cursor;
    *cursor = position.saturating_add(1);
    match bytes.get(position) {
        Some(&byte) => byte,
        None => 0,
    }
}
3142
/// Appends `value` to `bytes` as four big-endian bytes.
#[cfg(feature = "cluster")]
fn put_u32(bytes: &mut Vec<u8>, value: u32) {
    let encoded = value.to_be_bytes();
    bytes.extend_from_slice(&encoded);
}
3147
/// Reads a big-endian `u32` at `*cursor` and advances the cursor past it.
///
/// When fewer than four bytes remain, returns `0` and leaves the cursor where
/// `take_exact` left it.
#[cfg(feature = "cluster")]
fn take_u32(bytes: &[u8], cursor: &mut usize) -> u32 {
    let raw: [u8; 4] = take_exact(bytes, cursor, 4)
        .and_then(|slice| <[u8; 4]>::try_from(slice).ok())
        .unwrap_or_default();
    u32::from_be_bytes(raw)
}
3156
/// Appends `value` to `bytes` as eight big-endian bytes.
#[cfg(feature = "cluster")]
fn put_u64(bytes: &mut Vec<u8>, value: u64) {
    let encoded = value.to_be_bytes();
    bytes.extend_from_slice(&encoded);
}
3161
/// Reads a big-endian `u64` at `*cursor` and advances the cursor past it.
///
/// When fewer than eight bytes remain, returns `0` and leaves the cursor where
/// `take_exact` left it.
#[cfg(feature = "cluster")]
fn take_u64(bytes: &[u8], cursor: &mut usize) -> u64 {
    let raw: [u8; 8] = take_exact(bytes, cursor, 8)
        .and_then(|slice| <[u8; 8]>::try_from(slice).ok())
        .unwrap_or_default();
    u64::from_be_bytes(raw)
}
3170
/// Appends `value` to `bytes` as a length-prefixed field: the length as a
/// big-endian `u64`, followed by the raw bytes.
#[cfg(feature = "cluster")]
fn put_bytes(bytes: &mut Vec<u8>, value: Vec<u8>) {
    let length = value.len() as u64;
    put_u64(bytes, length);
    bytes.extend_from_slice(&value);
}
3176
/// Builds a `SerializedMessage::Cast` for `variant` whose argument payload is
/// the concatenation of `fields`, each written as a length-prefixed field.
/// No metadata is attached.
#[cfg(feature = "cluster")]
fn remote_cast(variant: &str, fields: Vec<Vec<u8>>) -> ractor::message::SerializedMessage {
    let args = fields
        .into_iter()
        .fold(Vec::new(), |mut encoded, field| {
            put_bytes(&mut encoded, field);
            encoded
        });
    ractor::message::SerializedMessage::Cast {
        variant: variant.to_owned(),
        args,
        metadata: None,
    }
}
3189
/// Reads a length-prefixed byte field at `*cursor`: a big-endian `u64` length
/// followed by that many bytes.
///
/// Returns an empty vector when the declared length exceeds the remaining
/// input. A declared length that does not fit in `usize` is treated the same
/// way instead of being truncated, so on 32-bit targets an oversized length
/// can no longer be misread as a small valid one.
#[cfg(feature = "cluster")]
fn take_bytes(bytes: &[u8], cursor: &mut usize) -> Vec<u8> {
    let declared = take_u64(bytes, cursor);
    usize::try_from(declared)
        .ok()
        .and_then(|len| take_exact(bytes, cursor, len))
        .unwrap_or_default()
        .to_vec()
}
3195
/// Appends `value` to `bytes` as a length-prefixed field holding its UTF-8
/// bytes.
#[cfg(feature = "cluster")]
fn put_string(bytes: &mut Vec<u8>, value: String) {
    let encoded = value.into_bytes();
    put_bytes(bytes, encoded);
}
3200
/// Reads a length-prefixed field at `*cursor` and decodes it as UTF-8,
/// returning an empty string when the bytes are not valid UTF-8.
#[cfg(feature = "cluster")]
fn take_string(bytes: &[u8], cursor: &mut usize) -> String {
    let raw = take_bytes(bytes, cursor);
    match String::from_utf8(raw) {
        Ok(text) => text,
        Err(_) => String::new(),
    }
}
3205
/// Appends `value` to `bytes` as whole seconds (`u64`) followed by the
/// sub-second nanoseconds (`u32`), both big-endian.
#[cfg(feature = "cluster")]
fn put_duration(bytes: &mut Vec<u8>, value: Duration) {
    let whole_seconds = value.as_secs();
    let fractional_nanos = value.subsec_nanos();
    put_u64(bytes, whole_seconds);
    put_u32(bytes, fractional_nanos);
}
3211
/// Reads a duration at `*cursor`: whole seconds as a big-endian `u64`,
/// followed by nanoseconds as a big-endian `u32`.
///
/// Nanosecond values of one second or more carry into the seconds, as with
/// `Duration::new`. Unlike `Duration::new`, a carry that would overflow the
/// seconds counter saturates at `Duration::MAX` rather than panicking, so
/// malformed input cannot crash the decoder.
#[cfg(feature = "cluster")]
fn take_duration(bytes: &[u8], cursor: &mut usize) -> Duration {
    // Field order matters: seconds are encoded first, then nanoseconds.
    let secs = take_u64(bytes, cursor);
    let nanos = take_u32(bytes, cursor);
    Duration::from_secs(secs).saturating_add(Duration::from_nanos(u64::from(nanos)))
}
3216
/// Borrows exactly `len` bytes starting at `*cursor` and advances the cursor
/// past them.
///
/// Returns `None`, leaving the cursor untouched, when the range overflows
/// `usize` or extends past the end of `bytes`.
#[cfg(feature = "cluster")]
fn take_exact<'a>(bytes: &'a [u8], cursor: &mut usize, len: usize) -> Option<&'a [u8]> {
    let start = *cursor;
    let end = start.checked_add(len)?;
    bytes.get(start..end).map(|window| {
        *cursor = end;
        window
    })
}
3224
/// Maps a decoded operation name onto its `'static` counterpart.
///
/// Recognised names are returned unchanged; any other value becomes
/// `"remote"`.
#[cfg(feature = "cluster")]
fn remote_port_operation(value: String) -> &'static str {
    const KNOWN_OPERATIONS: [&str; 12] = [
        "abort_emitting",
        "abort_reading",
        "complete",
        "fail",
        "grab",
        "offer",
        "pull",
        "push",
        "read_n",
        "request",
        "set_handler",
        "set_out_handler",
    ];

    KNOWN_OPERATIONS
        .iter()
        .copied()
        .find(|known| *known == value.as_str())
        .unwrap_or("remote")
}
3243
// Tests for stream refs: in-process source/sink refs, plus (behind the
// `cluster` feature) refs that are round-tripped through their byte encoding.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        stream::{Keep, Source},
        testkit::TestSink,
    };
    use std::sync::{
        Arc as StdArc,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    };

    /// Polls `condition` until it returns `true` or `timeout` elapses, parking
    /// for up to 1 ms between polls. Returns the result of one final check
    /// made after the deadline.
    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if condition() {
                return true;
            }
            std::thread::park_timeout(Duration::from_millis(1));
        }
        condition()
    }

    /// Asserts that `condition` stays `true` for the whole `timeout` window,
    /// re-checking after each park of up to 1 ms and once more at the end.
    fn assert_condition_holds(timeout: Duration, mut condition: impl FnMut() -> bool) {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            assert!(condition());
            std::thread::park_timeout(Duration::from_millis(1));
        }
        assert!(condition());
    }

    /// Settings with a buffer capacity of 1, a 50 ms subscription timeout and
    /// a 10 ms demand redelivery interval, so timing-related tests run quickly.
    fn short_settings() -> StreamRefSettings {
        StreamRefSettings::default()
            .with_buffer_capacity(1)
            .with_subscription_timeout(Duration::from_millis(50))
            .with_demand_redelivery_interval(Duration::from_millis(10))
    }

    // Elements and completion flow from the origin source through a SourceRef.
    #[test]
    fn source_ref_streams_elements_and_completion() {
        let source_ref = Source::from_iter(1_u64..=3)
            .run_with(StreamRefs::source_ref())
            .unwrap();

        assert_eq!(source_ref.source().run_collect().unwrap(), vec![1, 2, 3]);
    }

    // Elements and completion flow from a local source through a SinkRef.
    #[test]
    fn sink_ref_streams_elements_and_completion() {
        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        Source::from_iter(1_u64..=3)
            .run_with(sink_ref.sink())
            .unwrap()
            .wait()
            .unwrap();

        assert_eq!(completion.wait().unwrap(), vec![1, 2, 3]);
    }

    // A failure of the origin source is observed by the SourceRef's consumer.
    #[test]
    fn source_ref_propagates_upstream_failure() {
        let source_ref = Source::<u64>::failed(StreamError::Failed("boom".to_owned()))
            .run_with(StreamRefs::source_ref())
            .unwrap();

        assert_eq!(
            source_ref.source().run_collect(),
            Err(StreamError::Failed("boom".to_owned()))
        );
    }

    // A failure of the feeding source is seen on both sides of the SinkRef.
    #[test]
    fn sink_ref_propagates_upstream_failure() {
        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        let failure = Source::<u64>::failed(StreamError::Failed("remote boom".to_owned()))
            .run_with(sink_ref.sink())
            .unwrap()
            .wait();

        assert_eq!(failure, Err(StreamError::Failed("remote boom".to_owned())));
        assert_eq!(
            completion.wait(),
            Err(StreamError::Failed("remote boom".to_owned()))
        );
    }

    // Cancelling downstream (via `take(1)`) must run the origin's close hook.
    #[test]
    fn source_ref_cancellation_reaches_origin() {
        let closed = StdArc::new(AtomicBool::new(false));
        let close_flag = StdArc::clone(&closed);
        let source = Source::unfold_resource(
            || Ok(()),
            |_state| Ok(Some(1_u64)),
            move |_state| {
                close_flag.store(true, Ordering::SeqCst);
                Ok(())
            },
        );
        let source_ref = source.run_with(StreamRefs::source_ref()).unwrap();

        assert_eq!(source_ref.source().take(1).run_collect().unwrap(), vec![1]);
        assert!(wait_until(Duration::from_secs(1), || {
            closed.load(Ordering::SeqCst)
        }));
    }

    // When the receiving side stops after one element, the endless feeding
    // source finishes with `Cancelled`.
    #[test]
    fn sink_ref_cancellation_stops_remote_producer() {
        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
            .take(1)
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        let producer = Source::repeat(1_u64)
            .run_with(sink_ref.sink())
            .unwrap()
            .wait();

        assert_eq!(completion.wait().unwrap(), vec![1]);
        assert_eq!(producer, Err(StreamError::Cancelled));
    }

    // With a buffer capacity of 1, the origin is pulled at most one element
    // ahead of what the probe has requested.
    #[test]
    fn source_ref_backpressures_across_ref() {
        let pulled = StdArc::new(AtomicUsize::new(0));
        let pulled_for_source = StdArc::clone(&pulled);
        let source = Source::unfold(0_u64, move |next| {
            pulled_for_source.fetch_add(1, Ordering::SeqCst);
            Some((next + 1, next))
        });
        let source_ref = source
            .run_with(StreamRefs::source_ref_with_settings(short_settings()))
            .unwrap();
        let mut probe = source_ref.source().run_with(TestSink::probe()).unwrap();

        probe.request(1);
        probe.assert_next(0);
        assert!(wait_until(Duration::from_secs(1), || {
            pulled.load(Ordering::SeqCst) >= 2
        }));
        assert_condition_holds(Duration::from_millis(50), || {
            pulled.load(Ordering::SeqCst) <= 2
        });

        probe.request(1);
        probe.assert_next(1);
        assert!(wait_until(Duration::from_secs(1), || {
            pulled.load(Ordering::SeqCst) >= 3
        }));
        assert_condition_holds(Duration::from_millis(50), || {
            pulled.load(Ordering::SeqCst) <= 3
        });

        probe.cancel();
    }

    // If nobody subscribes within the (short) subscription timeout, the
    // producer actor stops and a late subscriber gets an error.
    #[test]
    fn source_ref_late_subscription_observes_timeout() {
        let source_ref = Source::repeat(1_u64)
            .run_with(StreamRefs::source_ref_with_settings(short_settings()))
            .unwrap();

        assert!(wait_until(Duration::from_secs(1), || {
            source_ref.inner.producer.get_status() == ractor::ActorStatus::Stopped
        }));

        let result = source_ref.source().run_collect();
        assert!(matches!(
            result,
            Err(StreamError::ActorTerminated) | Err(StreamError::Failed(_))
        ));
    }

    // A SinkRef that is never fed fails its local source with the
    // "did not subscribe" timeout error.
    #[test]
    fn sink_ref_subscription_timeout_fails_local_source() {
        let (_sink_ref, probe) = StreamRefs::sink_ref_with_settings::<u64>(short_settings())
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        probe.request(1);
        let error = probe.expect_error();
        assert!(
            matches!(error, StreamError::Failed(message) if message.contains("did not subscribe"))
        );
    }

    // A second materialization of the same ref fails with an "already" error,
    // for both source refs and sink refs.
    #[test]
    fn stream_refs_are_one_shot() {
        let source_ref = Source::from_iter([1_u64])
            .run_with(StreamRefs::source_ref())
            .unwrap();
        assert_eq!(source_ref.source().run_collect().unwrap(), vec![1]);
        assert!(matches!(
            source_ref.source().run_collect(),
            Err(StreamError::Failed(message)) if message.contains("already")
        ));

        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();
        Source::single(1_u64)
            .run_with(sink_ref.sink())
            .unwrap()
            .wait()
            .unwrap();
        assert!(matches!(
            Source::single(2_u64).run_with(sink_ref.sink()).unwrap().wait(),
            Err(StreamError::Failed(message)) if message.contains("already")
        ));
        assert_eq!(completion.wait().unwrap(), vec![1]);
    }

    /// Round-trips a `SourceRef` through `into_bytes` / `from_bytes`.
    #[cfg(feature = "cluster")]
    fn serialize_source_ref<T>(source_ref: SourceRef<T>) -> SourceRef<T>
    where
        T: BytesConvertable + Send + 'static,
    {
        SourceRef::from_bytes(source_ref.into_bytes())
    }

    /// Round-trips a `SinkRef` through `into_bytes` / `from_bytes`.
    #[cfg(feature = "cluster")]
    fn serialize_sink_ref<T>(sink_ref: SinkRef<T>) -> SinkRef<T>
    where
        T: BytesConvertable + Send + 'static,
    {
        SinkRef::from_bytes(sink_ref.into_bytes())
    }

    // Same as `source_ref_streams_elements_and_completion`, but through a
    // byte-round-tripped ref.
    #[cfg(feature = "cluster")]
    #[test]
    fn remote_source_ref_serialized_path_streams_elements_and_completion() {
        let source_ref = Source::from_iter(1_u64..=3)
            .run_with(StreamRefs::source_ref())
            .unwrap();
        let remote_ref = serialize_source_ref(source_ref);

        assert_eq!(remote_ref.source().run_collect().unwrap(), vec![1, 2, 3]);
    }

    // Same as `sink_ref_streams_elements_and_completion`, but through a
    // byte-round-tripped ref.
    #[cfg(feature = "cluster")]
    #[test]
    fn remote_sink_ref_serialized_path_streams_elements_and_completion() {
        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();
        let remote_ref = serialize_sink_ref(sink_ref);

        Source::from_iter(1_u64..=3)
            .run_with(remote_ref.sink())
            .unwrap()
            .wait()
            .unwrap();

        assert_eq!(completion.wait().unwrap(), vec![1, 2, 3]);
    }

    // Origin failure is propagated through a byte-round-tripped SourceRef.
    #[cfg(feature = "cluster")]
    #[test]
    fn remote_source_ref_serialized_path_propagates_failure() {
        let source_ref = Source::<u64>::failed(StreamError::Failed("remote boom".to_owned()))
            .run_with(StreamRefs::source_ref())
            .unwrap();
        let remote_ref = serialize_source_ref(source_ref);

        assert_eq!(
            remote_ref.source().run_collect(),
            Err(StreamError::Failed("remote boom".to_owned()))
        );
    }

    // Feeding-source failure is seen on both sides of a byte-round-tripped
    // SinkRef.
    #[cfg(feature = "cluster")]
    #[test]
    fn remote_sink_ref_serialized_path_propagates_failure() {
        let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();
        let remote_ref = serialize_sink_ref(sink_ref);

        let failure = Source::<u64>::failed(StreamError::Failed("sink boom".to_owned()))
            .run_with(remote_ref.sink())
            .unwrap()
            .wait();

        assert_eq!(failure, Err(StreamError::Failed("sink boom".to_owned())));
        assert_eq!(
            completion.wait(),
            Err(StreamError::Failed("sink boom".to_owned()))
        );
    }

    // Downstream cancellation through a byte-round-tripped SourceRef must run
    // the origin's close hook.
    #[cfg(feature = "cluster")]
    #[test]
    fn remote_source_ref_serialized_path_cancellation_reaches_origin() {
        let closed = StdArc::new(AtomicBool::new(false));
        let close_flag = StdArc::clone(&closed);
        let source = Source::unfold_resource(
            || Ok(()),
            |_state| Ok(Some(1_u64)),
            move |_state| {
                close_flag.store(true, Ordering::SeqCst);
                Ok(())
            },
        );
        let source_ref = source.run_with(StreamRefs::source_ref()).unwrap();
        let remote_ref = serialize_source_ref(source_ref);

        assert_eq!(remote_ref.source().take(1).run_collect().unwrap(), vec![1]);
        assert!(wait_until(Duration::from_secs(1), || {
            closed.load(Ordering::SeqCst)
        }));
    }

    // Backpressure check for the byte-round-tripped path. The lower bounds
    // here (>= 1, >= 2) are looser than in the in-process test, and the upper
    // bounds are held for a full second.
    #[cfg(feature = "cluster")]
    #[test]
    fn remote_source_ref_serialized_path_backpressures_across_ref() {
        let pulled = StdArc::new(AtomicUsize::new(0));
        let pulled_for_source = StdArc::clone(&pulled);
        let source = Source::unfold(0_u64, move |next| {
            pulled_for_source.fetch_add(1, Ordering::SeqCst);
            Some((next + 1, next))
        });
        let source_ref = source
            .run_with(StreamRefs::source_ref_with_settings(short_settings()))
            .unwrap();
        let remote_ref = serialize_source_ref(source_ref);
        let mut probe = remote_ref.source().run_with(TestSink::probe()).unwrap();

        probe.request(1);
        probe.assert_next(0);
        assert!(wait_until(Duration::from_secs(1), || {
            pulled.load(Ordering::SeqCst) >= 1
        }));
        assert_condition_holds(Duration::from_secs(1), || {
            pulled.load(Ordering::SeqCst) <= 2
        });

        probe.request(1);
        probe.assert_next(1);
        assert!(wait_until(Duration::from_secs(1), || {
            pulled.load(Ordering::SeqCst) >= 2
        }));
        assert_condition_holds(Duration::from_secs(1), || {
            pulled.load(Ordering::SeqCst) <= 3
        });

        probe.cancel();
    }

    // A panic inside the feeding source must surface as an error on the
    // feeding side's completion within 5 seconds rather than hanging.
    #[cfg(feature = "cluster")]
    #[test]
    fn remote_sink_ref_serialized_path_producer_panic_surfaces_error() {
        let (sink_ref, _completion) = StreamRefs::sink_ref::<u64>()
            .to_mat(Sink::head(), Keep::both)
            .run()
            .unwrap();
        let remote_sink = serialize_sink_ref(sink_ref);

        let panicking = Source::unfold(0_u64, move |state| {
            if state >= 5 {
                panic!("intentional test panic in producer thread");
            }
            Some((state + 1, state))
        });
        let mut completion = panicking
            .run_with(remote_sink.sink())
            .expect("sink ref materializes");

        // Poll instead of blocking so a hang turns into a test failure.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            if let Some(result) = completion.try_wait() {
                assert!(
                    result.is_err(),
                    "panic in producer thread should surface error to consumer, got {:?}",
                    result
                );
                break;
            }
            assert!(
                Instant::now() < deadline,
                "consumer hung after producer panic (no result after 5s)"
            );
            std::thread::park_timeout(Duration::from_millis(10));
        }
    }
}