1use std::{
2 collections::VecDeque,
3 fmt,
4 sync::{
5 Arc, Condvar, Mutex, MutexGuard, Weak,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, Instant},
9};
10
11use futures::channel::oneshot;
12#[cfg(feature = "cluster")]
13use ractor::{ActorCell, BytesConvertable};
14#[cfg(feature = "cluster")]
15use std::{
16 sync::atomic::AtomicU64,
17 time::{SystemTime, UNIX_EPOCH},
18};
19
20use crate::stream::{
21 BoxStream, Cancellable, NotUsed, Sink, Source, StreamCancellation, StreamCompletion,
22 StreamError, StreamResult,
23};
24
25use super::{Actor, ActorProcessingErr, ActorRef, ActorResult, Message, block_on_ractor_runtime};
26
27const DEFAULT_STREAM_REF_BUFFER_CAPACITY: usize = 32;
28const DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(30);
29const DEFAULT_STREAM_REF_DEMAND_REDELIVERY: Duration = Duration::from_secs(1);
30const STREAM_REF_WAIT_POLL: Duration = Duration::from_millis(1);
31#[cfg(feature = "cluster")]
32const STREAM_REF_REMOTE_SCOPE: &str = "datum.streamrefs";
33#[cfg(feature = "cluster")]
34static STREAM_REF_REMOTE_ID: AtomicU64 = AtomicU64::new(1);
35
36#[cfg(feature = "cluster")]
37mod stream_ref_actor_bound {
38 pub trait Bound: ractor::BytesConvertable {}
39
40 impl<T> Bound for T where T: ractor::BytesConvertable {}
41}
42
43#[cfg(not(feature = "cluster"))]
44mod stream_ref_actor_bound {
45 pub trait Bound {}
46
47 impl<T> Bound for T {}
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct StreamRefSettings {
57 buffer_capacity: usize,
58 subscription_timeout: Duration,
59 demand_redelivery_interval: Duration,
60}
61
62impl Default for StreamRefSettings {
63 fn default() -> Self {
64 Self {
65 buffer_capacity: DEFAULT_STREAM_REF_BUFFER_CAPACITY,
66 subscription_timeout: DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT,
67 demand_redelivery_interval: DEFAULT_STREAM_REF_DEMAND_REDELIVERY,
68 }
69 }
70}
71
72impl StreamRefSettings {
73 #[must_use]
74 pub fn buffer_capacity(&self) -> usize {
75 self.buffer_capacity
76 }
77
78 #[must_use]
79 pub fn subscription_timeout(&self) -> Duration {
80 self.subscription_timeout
81 }
82
83 #[must_use]
84 pub fn demand_redelivery_interval(&self) -> Duration {
85 self.demand_redelivery_interval
86 }
87
88 #[must_use]
89 pub fn with_buffer_capacity(mut self, capacity: usize) -> Self {
90 assert!(
91 capacity > 0,
92 "StreamRef buffer capacity must be greater than zero"
93 );
94 self.buffer_capacity = capacity;
95 self
96 }
97
98 #[must_use]
99 pub fn with_subscription_timeout(mut self, timeout: Duration) -> Self {
100 self.subscription_timeout = timeout;
101 self
102 }
103
104 #[must_use]
105 pub fn with_demand_redelivery_interval(mut self, interval: Duration) -> Self {
106 self.demand_redelivery_interval = interval;
107 self
108 }
109}
110
111#[cfg(feature = "cluster")]
112#[derive(Debug, Clone, PartialEq, Eq)]
113struct RemoteStreamRefEndpoint {
114 scope: String,
115 group: String,
116}
117
118#[cfg(feature = "cluster")]
119impl RemoteStreamRefEndpoint {
120 fn new(kind: &str) -> Self {
121 let sequence = STREAM_REF_REMOTE_ID.fetch_add(1, Ordering::Relaxed);
122 let timestamp = SystemTime::now()
123 .duration_since(UNIX_EPOCH)
124 .map(|duration| duration.as_nanos())
125 .unwrap_or_default();
126 Self {
127 scope: STREAM_REF_REMOTE_SCOPE.to_owned(),
128 group: format!(
129 "datum-streamref-{kind}-{}-{sequence}-{timestamp}",
130 std::process::id()
131 ),
132 }
133 }
134}
135
136#[cfg(feature = "cluster")]
137#[derive(Debug, Clone)]
138enum RemoteRefEnvelope {
139 Ready {
140 endpoint: RemoteStreamRefEndpoint,
141 settings: StreamRefSettings,
142 },
143 Failed(StreamError),
144}
145
146#[cfg(feature = "cluster")]
147enum RemoteSourceRefState {
148 Ready(RemoteStreamRefEndpoint),
149 Failed(StreamError),
150}
151
152#[cfg(feature = "cluster")]
153enum RemoteSinkRefState {
154 Ready(RemoteStreamRefEndpoint),
155 Failed(StreamError),
156}
157
158#[cfg(feature = "cluster")]
159impl BytesConvertable for RemoteStreamRefEndpoint {
160 fn into_bytes(self) -> Vec<u8> {
161 let mut bytes = Vec::new();
162 put_string(&mut bytes, self.scope);
163 put_string(&mut bytes, self.group);
164 bytes
165 }
166
167 fn from_bytes(bytes: Vec<u8>) -> Self {
168 let mut cursor = 0;
169 let scope = take_string(&bytes, &mut cursor);
170 let group = take_string(&bytes, &mut cursor);
171 Self { scope, group }
172 }
173}
174
175#[cfg(feature = "cluster")]
176impl BytesConvertable for StreamRefSettings {
177 fn into_bytes(self) -> Vec<u8> {
178 let mut bytes = Vec::with_capacity(40);
179 put_u64(&mut bytes, self.buffer_capacity as u64);
180 put_duration(&mut bytes, self.subscription_timeout);
181 put_duration(&mut bytes, self.demand_redelivery_interval);
182 bytes
183 }
184
185 fn from_bytes(bytes: Vec<u8>) -> Self {
186 let mut cursor = 0;
187 let buffer_capacity = take_u64(&bytes, &mut cursor) as usize;
188 let subscription_timeout = take_duration(&bytes, &mut cursor);
189 let demand_redelivery_interval = take_duration(&bytes, &mut cursor);
190 Self {
191 buffer_capacity,
192 subscription_timeout,
193 demand_redelivery_interval,
194 }
195 }
196}
197
198#[cfg(feature = "cluster")]
199impl BytesConvertable for StreamError {
200 fn into_bytes(self) -> Vec<u8> {
201 let mut bytes = Vec::new();
202 match self {
203 StreamError::Cancelled => put_u8(&mut bytes, 0),
204 StreamError::AbruptTermination => put_u8(&mut bytes, 1),
205 StreamError::Backpressured => put_u8(&mut bytes, 2),
206 StreamError::EmptyStream => put_u8(&mut bytes, 3),
207 StreamError::MaybeIncomplete => put_u8(&mut bytes, 4),
208 StreamError::LimitExceeded { max } => {
209 put_u8(&mut bytes, 5);
210 put_u64(&mut bytes, max);
211 }
212 StreamError::InvalidPortOperation {
213 operation,
214 port,
215 reason,
216 } => {
217 put_u8(&mut bytes, 6);
218 put_string(&mut bytes, operation.to_owned());
219 put_string(&mut bytes, port);
220 put_string(&mut bytes, reason);
221 }
222 StreamError::GraphValidation(message) => {
223 put_u8(&mut bytes, 7);
224 put_string(&mut bytes, message);
225 }
226 StreamError::EventLimitExceeded { limit } => {
227 put_u8(&mut bytes, 8);
228 put_u64(&mut bytes, limit as u64);
229 }
230 StreamError::ActorAskTimeout { timeout } => {
231 put_u8(&mut bytes, 9);
232 put_duration(&mut bytes, timeout);
233 }
234 StreamError::ActorTerminated => put_u8(&mut bytes, 10),
235 StreamError::ActorAskResponseDropped => put_u8(&mut bytes, 11),
236 StreamError::ActorAskSendFailed { reason } => {
237 put_u8(&mut bytes, 12);
238 put_string(&mut bytes, reason);
239 }
240 StreamError::ActorAskTaskFailed { reason } => {
241 put_u8(&mut bytes, 13);
242 put_string(&mut bytes, reason);
243 }
244 StreamError::Failed(message) => {
245 put_u8(&mut bytes, 14);
246 put_string(&mut bytes, message);
247 }
248 }
249 bytes
250 }
251
252 fn from_bytes(bytes: Vec<u8>) -> Self {
253 let mut cursor = 0;
254 match take_u8(&bytes, &mut cursor) {
255 0 => StreamError::Cancelled,
256 1 => StreamError::AbruptTermination,
257 2 => StreamError::Backpressured,
258 3 => StreamError::EmptyStream,
259 4 => StreamError::MaybeIncomplete,
260 5 => StreamError::LimitExceeded {
261 max: take_u64(&bytes, &mut cursor),
262 },
263 6 => StreamError::InvalidPortOperation {
264 operation: remote_port_operation(take_string(&bytes, &mut cursor)),
265 port: take_string(&bytes, &mut cursor),
266 reason: take_string(&bytes, &mut cursor),
267 },
268 7 => StreamError::GraphValidation(take_string(&bytes, &mut cursor)),
269 8 => StreamError::EventLimitExceeded {
270 limit: take_u64(&bytes, &mut cursor) as usize,
271 },
272 9 => StreamError::ActorAskTimeout {
273 timeout: take_duration(&bytes, &mut cursor),
274 },
275 10 => StreamError::ActorTerminated,
276 11 => StreamError::ActorAskResponseDropped,
277 12 => StreamError::ActorAskSendFailed {
278 reason: take_string(&bytes, &mut cursor),
279 },
280 13 => StreamError::ActorAskTaskFailed {
281 reason: take_string(&bytes, &mut cursor),
282 },
283 14 => StreamError::Failed(take_string(&bytes, &mut cursor)),
284 _ => StreamError::Failed("invalid remote stream error encoding".to_owned()),
285 }
286 }
287}
288
289#[cfg(feature = "cluster")]
290impl BytesConvertable for RemoteRefEnvelope {
291 fn into_bytes(self) -> Vec<u8> {
292 let mut bytes = Vec::new();
293 match self {
294 Self::Ready { endpoint, settings } => {
295 put_u8(&mut bytes, 0);
296 put_bytes(&mut bytes, endpoint.into_bytes());
297 put_bytes(&mut bytes, settings.into_bytes());
298 }
299 Self::Failed(error) => {
300 put_u8(&mut bytes, 1);
301 put_bytes(&mut bytes, error.into_bytes());
302 }
303 }
304 bytes
305 }
306
307 fn from_bytes(bytes: Vec<u8>) -> Self {
308 let mut cursor = 0;
309 match take_u8(&bytes, &mut cursor) {
310 0 => {
311 let endpoint = RemoteStreamRefEndpoint::from_bytes(take_bytes(&bytes, &mut cursor));
312 let settings = StreamRefSettings::from_bytes(take_bytes(&bytes, &mut cursor));
313 Self::Ready { endpoint, settings }
314 }
315 1 => Self::Failed(StreamError::from_bytes(take_bytes(&bytes, &mut cursor))),
316 _ => Self::Failed(StreamError::Failed(
317 "invalid remote stream ref encoding".to_owned(),
318 )),
319 }
320 }
321}
322
323pub struct StreamRefs;
330
331impl StreamRefs {
332 #[must_use]
333 pub fn source_ref<T>() -> Sink<T, SourceRef<T>>
334 where
335 T: Send + 'static,
336 {
337 Self::source_ref_with_settings(StreamRefSettings::default())
338 }
339
340 #[must_use]
341 pub fn source_ref_with_settings<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
342 where
343 T: Send + 'static,
344 {
345 stream_ref_source_sink(settings)
346 }
347
348 #[must_use]
349 pub fn sink_ref<T>() -> Source<T, SinkRef<T>>
350 where
351 T: Send + 'static,
352 {
353 Self::sink_ref_with_settings(StreamRefSettings::default())
354 }
355
356 #[must_use]
357 pub fn sink_ref_with_settings<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
358 where
359 T: Send + 'static,
360 {
361 stream_ref_sink_source(settings)
362 }
363}
364
365pub struct SourceRef<T> {
370 inner: Arc<SourceRefInner<T>>,
371}
372
373struct SourceRefInner<T> {
374 #[allow(dead_code)]
375 producer: LazyProducerStatus<T>,
376 direct: Arc<SourceRefDirectState<T>>,
377 settings: StreamRefSettings,
378 subscribed: AtomicBool,
379 timeout: Option<Cancellable>,
380 #[cfg(feature = "cluster")]
381 remote: Option<RemoteSourceRefState>,
382}
383
384impl<T> Clone for SourceRef<T> {
385 fn clone(&self) -> Self {
386 Self {
387 inner: Arc::clone(&self.inner),
388 }
389 }
390}
391
392impl<T> fmt::Debug for SourceRef<T> {
393 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
394 f.debug_struct("SourceRef").finish_non_exhaustive()
395 }
396}
397
398#[cfg(feature = "cluster")]
399impl<T> BytesConvertable for SourceRef<T>
400where
401 T: BytesConvertable + Send + 'static,
402{
403 fn into_bytes(self) -> Vec<u8> {
404 match self.activate_remote_producer() {
405 Ok(endpoint) => RemoteRefEnvelope::Ready {
406 endpoint,
407 settings: self.inner.settings,
408 },
409 Err(error) => RemoteRefEnvelope::Failed(error),
410 }
411 .into_bytes()
412 }
413
414 fn from_bytes(bytes: Vec<u8>) -> Self {
415 match RemoteRefEnvelope::from_bytes(bytes) {
416 RemoteRefEnvelope::Ready { endpoint, settings } => {
417 Self::remote(settings, RemoteSourceRefState::Ready(endpoint))
418 }
419 RemoteRefEnvelope::Failed(error) => Self::remote(
420 StreamRefSettings::default(),
421 RemoteSourceRefState::Failed(error),
422 ),
423 }
424 }
425}
426
427impl<T> SourceRef<T>
428where
429 T: stream_ref_actor_bound::Bound + Send + 'static,
430{
431 #[must_use]
432 pub fn source(&self) -> Source<T, NotUsed> {
433 let inner = Arc::clone(&self.inner);
434 Source::from_materialized_factory(move |_materializer| {
435 if inner.subscribed.swap(true, Ordering::SeqCst) {
436 return Ok((
437 failed_once("source ref has already been materialized"),
438 NotUsed,
439 ));
440 }
441
442 #[cfg(feature = "cluster")]
443 if let Some(remote) = &inner.remote {
444 return match remote {
445 RemoteSourceRefState::Ready(endpoint) => {
446 match materialize_remote_source_ref(
447 endpoint.clone(),
448 inner.settings,
449 Arc::clone(&inner),
450 ) {
451 Ok(stream) => Ok((stream, NotUsed)),
452 Err(error) => Ok((failed_stream(error), NotUsed)),
453 }
454 }
455 RemoteSourceRefState::Failed(error) => {
456 Ok((failed_stream(error.clone()), NotUsed))
457 }
458 };
459 }
460
461 if let Some(timeout) = &inner.timeout {
462 timeout.cancel();
463 }
464 match inner.direct.claim_input() {
465 Ok(input) => Ok((
466 Box::new(SourceRefDirectStream::new(
467 input,
468 inner.settings,
469 Arc::clone(&inner),
470 )) as BoxStream<T>,
471 NotUsed,
472 )),
473 Err(error) => Ok((failed_stream(error), NotUsed)),
474 }
475 })
476 }
477}
478
479pub(super) fn proto_source<T>(source_ref: &SourceRef<T>) -> Source<T, NotUsed>
480where
481 T: Send + 'static,
482{
483 let inner = Arc::clone(&source_ref.inner);
484 Source::from_materialized_factory(move |_materializer| {
485 if inner.subscribed.swap(true, Ordering::SeqCst) {
486 return Ok((
487 failed_stream(StreamError::Failed(
488 "source ref has already been materialized".to_owned(),
489 )),
490 NotUsed,
491 ));
492 }
493
494 if let Some(timeout) = &inner.timeout {
495 timeout.cancel();
496 }
497 match inner.direct.claim_input() {
498 Ok(input) => Ok((
499 Box::new(SourceRefDirectStream::new(
500 input,
501 inner.settings,
502 Arc::clone(&inner),
503 )) as BoxStream<T>,
504 NotUsed,
505 )),
506 Err(error) => Ok((failed_stream(error), NotUsed)),
507 }
508 })
509}
510
511#[cfg(feature = "cluster")]
512impl<T> SourceRef<T>
513where
514 T: BytesConvertable + Send + 'static,
515{
516 fn remote(settings: StreamRefSettings, remote: RemoteSourceRefState) -> Self {
517 let direct = Arc::new(SourceRefDirectState::terminal(StreamError::ActorTerminated));
518 Self {
519 inner: Arc::new(SourceRefInner {
520 producer: LazyProducerStatus::new(Arc::clone(&direct)),
521 direct,
522 settings,
523 subscribed: AtomicBool::new(false),
524 timeout: None,
525 remote: Some(remote),
526 }),
527 }
528 }
529
530 fn activate_remote_producer(&self) -> StreamResult<RemoteStreamRefEndpoint> {
531 if let Some(RemoteSourceRefState::Ready(endpoint)) = &self.inner.remote {
532 return Ok(endpoint.clone());
533 }
534 if let Some(RemoteSourceRefState::Failed(error)) = &self.inner.remote {
535 return Err(error.clone());
536 }
537 if self.inner.subscribed.swap(true, Ordering::SeqCst) {
538 return Err(StreamError::Failed(
539 "source ref has already been materialized".to_owned(),
540 ));
541 }
542 if let Some(timeout) = &self.inner.timeout {
543 timeout.cancel();
544 }
545 let input = self.inner.direct.claim_input()?;
546 let shared = Arc::new(ProducerShared::new());
547 let endpoint = RemoteStreamRefEndpoint::new("source");
548 let (actor, _handle) = spawn_producer_actor(
549 None,
550 Arc::clone(&shared),
551 self.inner.settings,
552 Some(endpoint.clone()),
553 )?;
554 join_remote_endpoint(&endpoint, &actor);
555 let guard_actor = actor.clone();
556 let producer_endpoint = ProducerEndpointHandle::Actor {
557 actor,
558 shared: Arc::clone(&shared),
559 endpoint: Some(endpoint.clone()),
560 };
561 let cancelled = Arc::new(AtomicBool::new(false));
562 let settings = self.inner.settings;
563 let guard = ProducerEndpointDropGuard::new(guard_actor, None);
564 std::thread::spawn(move || {
565 let _guard = guard;
566 let _ = run_producer_endpoint(input, shared, producer_endpoint, settings, cancelled);
567 });
568 Ok(endpoint)
569 }
570}
571
572pub struct SinkRef<T> {
577 inner: Arc<SinkRefInner<T>>,
578}
579
580struct SinkRefInner<T> {
581 direct: Arc<SinkRefDirectState<T>>,
582 #[cfg(feature = "cluster")]
583 settings: StreamRefSettings,
584 subscribed: AtomicBool,
585 #[cfg(feature = "cluster")]
586 remote: Option<RemoteSinkRefState>,
587}
588
589impl<T> Clone for SinkRef<T> {
590 fn clone(&self) -> Self {
591 Self {
592 inner: Arc::clone(&self.inner),
593 }
594 }
595}
596
597impl<T> fmt::Debug for SinkRef<T> {
598 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599 f.debug_struct("SinkRef").finish_non_exhaustive()
600 }
601}
602
603#[cfg(feature = "cluster")]
604impl<T> BytesConvertable for SinkRef<T>
605where
606 T: BytesConvertable + Send + 'static,
607{
608 fn into_bytes(self) -> Vec<u8> {
609 match self.activate_remote_consumer() {
610 Ok(endpoint) => RemoteRefEnvelope::Ready {
611 endpoint,
612 settings: self.inner.settings,
613 },
614 Err(error) => RemoteRefEnvelope::Failed(error),
615 }
616 .into_bytes()
617 }
618
619 fn from_bytes(bytes: Vec<u8>) -> Self {
620 match RemoteRefEnvelope::from_bytes(bytes) {
621 RemoteRefEnvelope::Ready { endpoint, settings } => {
622 Self::remote(settings, RemoteSinkRefState::Ready(endpoint))
623 }
624 RemoteRefEnvelope::Failed(error) => Self::remote(
625 StreamRefSettings::default(),
626 RemoteSinkRefState::Failed(error),
627 ),
628 }
629 }
630}
631
632impl<T> SinkRef<T>
633where
634 T: stream_ref_actor_bound::Bound + Send + 'static,
635{
636 #[must_use]
637 pub fn sink(&self) -> Sink<T, StreamCompletion<NotUsed>> {
638 let inner = Arc::clone(&self.inner);
639 Sink::from_runner(move |input, _materializer| {
640 if inner.subscribed.swap(true, Ordering::SeqCst) {
641 return Ok(StreamCompletion::ready(Err(StreamError::Failed(
642 "sink ref has already been materialized".to_owned(),
643 ))));
644 }
645
646 #[cfg(feature = "cluster")]
647 if let Some(remote) = &inner.remote {
648 return match remote {
649 RemoteSinkRefState::Ready(endpoint) => {
650 materialize_remote_sink_ref(endpoint.clone(), inner.settings, input)
651 }
652 RemoteSinkRefState::Failed(error) => {
653 Ok(StreamCompletion::ready(Err(error.clone())))
654 }
655 };
656 }
657
658 Ok(inner.direct.attach_input(input))
659 })
660 }
661}
662
663#[cfg(feature = "cluster")]
664impl<T> SinkRef<T>
665where
666 T: BytesConvertable + Send + 'static,
667{
668 fn remote(settings: StreamRefSettings, remote: RemoteSinkRefState) -> Self {
669 Self {
670 inner: Arc::new(SinkRefInner {
671 direct: Arc::new(SinkRefDirectState::new()),
672 #[cfg(feature = "cluster")]
673 settings,
674 subscribed: AtomicBool::new(false),
675 remote: Some(remote),
676 }),
677 }
678 }
679
680 fn activate_remote_consumer(&self) -> StreamResult<RemoteStreamRefEndpoint> {
681 if let Some(RemoteSinkRefState::Ready(endpoint)) = &self.inner.remote {
682 return Ok(endpoint.clone());
683 }
684 if let Some(RemoteSinkRefState::Failed(error)) = &self.inner.remote {
685 return Err(error.clone());
686 }
687 if self.inner.subscribed.swap(true, Ordering::SeqCst) {
688 return Err(StreamError::Failed(
689 "sink ref has already been materialized".to_owned(),
690 ));
691 }
692
693 let shared = Arc::new(ConsumerShared::new(self.inner.settings));
694 let endpoint = RemoteStreamRefEndpoint::new("sink");
695 let (actor, _handle) = spawn_consumer_actor(
696 None,
697 Arc::clone(&shared),
698 self.inner.settings,
699 Some(endpoint.clone()),
700 )?;
701 join_remote_endpoint(&endpoint, &actor);
702 let stream = ConsumerStream {
703 shared,
704 actor_ref: Some(actor),
705 endpoint: Some(endpoint.clone()),
706 settings: self.inner.settings,
707 terminated: false,
708 source_ref_keep_alive: None,
709 };
710 self.inner.direct.attach_input_unmanaged(Box::new(stream))?;
711 Ok(endpoint)
712 }
713}
714
715#[allow(dead_code)]
716enum ProducerCommand<T> {
717 Subscribe {
718 consumer: ActorRef<ConsumerCommand<T>>,
719 },
720 #[cfg(feature = "cluster")]
721 SubscribeRemote {
722 consumer: RemoteStreamRefEndpoint,
723 },
724 Demand {
725 consumer: ActorRef<ConsumerCommand<T>>,
726 cumulative: u64,
727 },
728 #[cfg(feature = "cluster")]
729 DemandRemote {
730 consumer: RemoteStreamRefEndpoint,
731 cumulative: u64,
732 },
733 Cancel {
734 consumer: ActorRef<ConsumerCommand<T>>,
735 },
736 #[cfg(feature = "cluster")]
737 CancelRemote {
738 consumer: RemoteStreamRefEndpoint,
739 },
740 RemoteFailure {
741 consumer: ActorRef<ConsumerCommand<T>>,
742 error: StreamError,
743 },
744 #[cfg(feature = "cluster")]
745 RemoteFailureRemote {
746 consumer: RemoteStreamRefEndpoint,
747 error: StreamError,
748 },
749 Ack,
750}
751
752#[cfg(feature = "cluster")]
753impl<T> Message for ProducerCommand<T>
754where
755 T: BytesConvertable + Send + 'static,
756{
757 fn serializable() -> bool {
758 true
759 }
760
761 fn serialize(
762 self,
763 ) -> Result<ractor::message::SerializedMessage, ractor::message::BoxedDowncastErr> {
764 match self {
765 Self::SubscribeRemote { consumer } => {
766 Ok(remote_cast("SubscribeRemote", vec![consumer.into_bytes()]))
767 }
768 Self::DemandRemote {
769 consumer,
770 cumulative,
771 } => Ok(remote_cast(
772 "DemandRemote",
773 vec![consumer.into_bytes(), cumulative.into_bytes()],
774 )),
775 Self::CancelRemote { consumer } => {
776 Ok(remote_cast("CancelRemote", vec![consumer.into_bytes()]))
777 }
778 Self::RemoteFailureRemote { consumer, error } => Ok(remote_cast(
779 "RemoteFailureRemote",
780 vec![consumer.into_bytes(), error.into_bytes()],
781 )),
782 Self::Ack => Ok(remote_cast("Ack", Vec::new())),
783 Self::Subscribe { .. }
784 | Self::Demand { .. }
785 | Self::Cancel { .. }
786 | Self::RemoteFailure { .. } => Err(ractor::message::BoxedDowncastErr),
787 }
788 }
789
790 fn deserialize(
791 message: ractor::message::SerializedMessage,
792 ) -> Result<Self, ractor::message::BoxedDowncastErr> {
793 let ractor::message::SerializedMessage::Cast { variant, args, .. } = message else {
794 return Err(ractor::message::BoxedDowncastErr);
795 };
796 let mut cursor = 0;
797 match variant.as_str() {
798 "SubscribeRemote" => Ok(Self::SubscribeRemote {
799 consumer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
800 }),
801 "DemandRemote" => Ok(Self::DemandRemote {
802 consumer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
803 cumulative: u64::from_bytes(take_bytes(&args, &mut cursor)),
804 }),
805 "CancelRemote" => Ok(Self::CancelRemote {
806 consumer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
807 }),
808 "RemoteFailureRemote" => Ok(Self::RemoteFailureRemote {
809 consumer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
810 error: StreamError::from_bytes(take_bytes(&args, &mut cursor)),
811 }),
812 "Ack" => Ok(Self::Ack),
813 _ => Err(ractor::message::BoxedDowncastErr),
814 }
815 }
816}
817
818#[allow(dead_code)]
819enum ConsumerCommand<T> {
820 OnSubscribe {
821 producer: ActorRef<ProducerCommand<T>>,
822 },
823 #[cfg(feature = "cluster")]
824 OnSubscribeRemote { producer: RemoteStreamRefEndpoint },
825 Element {
826 producer: ActorRef<ProducerCommand<T>>,
827 seq: u64,
828 item: T,
829 },
830 #[cfg(feature = "cluster")]
831 ElementRemote {
832 producer: RemoteStreamRefEndpoint,
833 seq: u64,
834 item: T,
835 },
836 Complete {
837 producer: ActorRef<ProducerCommand<T>>,
838 seq: u64,
839 },
840 #[cfg(feature = "cluster")]
841 CompleteRemote {
842 producer: RemoteStreamRefEndpoint,
843 seq: u64,
844 },
845 Failure {
846 producer: ActorRef<ProducerCommand<T>>,
847 error: StreamError,
848 },
849 #[cfg(feature = "cluster")]
850 FailureRemote {
851 producer: RemoteStreamRefEndpoint,
852 error: StreamError,
853 },
854}
855
856#[cfg(feature = "cluster")]
857impl<T> Message for ConsumerCommand<T>
858where
859 T: BytesConvertable + Send + 'static,
860{
861 fn serializable() -> bool {
862 true
863 }
864
865 fn serialize(
866 self,
867 ) -> Result<ractor::message::SerializedMessage, ractor::message::BoxedDowncastErr> {
868 match self {
869 Self::OnSubscribeRemote { producer } => Ok(remote_cast(
870 "OnSubscribeRemote",
871 vec![producer.into_bytes()],
872 )),
873 Self::ElementRemote {
874 producer,
875 seq,
876 item,
877 } => Ok(remote_cast(
878 "ElementRemote",
879 vec![producer.into_bytes(), seq.into_bytes(), item.into_bytes()],
880 )),
881 Self::CompleteRemote { producer, seq } => Ok(remote_cast(
882 "CompleteRemote",
883 vec![producer.into_bytes(), seq.into_bytes()],
884 )),
885 Self::FailureRemote { producer, error } => Ok(remote_cast(
886 "FailureRemote",
887 vec![producer.into_bytes(), error.into_bytes()],
888 )),
889 Self::OnSubscribe { .. }
890 | Self::Element { .. }
891 | Self::Complete { .. }
892 | Self::Failure { .. } => Err(ractor::message::BoxedDowncastErr),
893 }
894 }
895
896 fn deserialize(
897 message: ractor::message::SerializedMessage,
898 ) -> Result<Self, ractor::message::BoxedDowncastErr> {
899 let ractor::message::SerializedMessage::Cast { variant, args, .. } = message else {
900 return Err(ractor::message::BoxedDowncastErr);
901 };
902 let mut cursor = 0;
903 match variant.as_str() {
904 "OnSubscribeRemote" => Ok(Self::OnSubscribeRemote {
905 producer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
906 }),
907 "ElementRemote" => Ok(Self::ElementRemote {
908 producer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
909 seq: u64::from_bytes(take_bytes(&args, &mut cursor)),
910 item: T::from_bytes(take_bytes(&args, &mut cursor)),
911 }),
912 "CompleteRemote" => Ok(Self::CompleteRemote {
913 producer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
914 seq: u64::from_bytes(take_bytes(&args, &mut cursor)),
915 }),
916 "FailureRemote" => Ok(Self::FailureRemote {
917 producer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
918 error: StreamError::from_bytes(take_bytes(&args, &mut cursor)),
919 }),
920 _ => Err(ractor::message::BoxedDowncastErr),
921 }
922 }
923}
924
925#[allow(dead_code)]
926enum ConsumerEndpoint<T> {
927 Actor(ActorRef<ConsumerCommand<T>>),
928 Direct(Weak<ConsumerShared<T>>),
929}
930
931impl<T> Clone for ConsumerEndpoint<T> {
932 fn clone(&self) -> Self {
933 match self {
934 Self::Actor(actor) => Self::Actor(actor.clone()),
935 Self::Direct(shared) => Self::Direct(Weak::clone(shared)),
936 }
937 }
938}
939
940#[allow(dead_code)]
941enum ProducerEndpoint<T> {
942 Actor(ActorRef<ProducerCommand<T>>),
943 Direct(Weak<ProducerShared<T>>),
944}
945
946impl<T> Clone for ProducerEndpoint<T> {
947 fn clone(&self) -> Self {
948 match self {
949 Self::Actor(actor) => Self::Actor(actor.clone()),
950 Self::Direct(shared) => Self::Direct(Weak::clone(shared)),
951 }
952 }
953}
954
955#[allow(dead_code)]
956enum ProducerEndpointHandle<T> {
957 Actor {
958 actor: ActorRef<ProducerCommand<T>>,
959 shared: Arc<ProducerShared<T>>,
960 #[cfg(feature = "cluster")]
961 endpoint: Option<RemoteStreamRefEndpoint>,
962 },
963 Direct(Arc<ProducerShared<T>>),
964}
965
966impl<T> ProducerEndpointHandle<T>
967where
968 T: stream_ref_actor_bound::Bound + Send + 'static,
969{
970 fn stop_actor(&self) {
971 if let Self::Actor { actor, .. } = self {
972 actor.stop(None);
973 }
974 }
975
976 fn actor_ref(&self) -> Option<ActorRef<ProducerCommand<T>>> {
977 match self {
978 Self::Actor { actor, .. } => Some(actor.clone()),
979 Self::Direct(_) => None,
980 }
981 }
982
983 #[cfg(feature = "cluster")]
984 fn endpoint(&self) -> Option<RemoteStreamRefEndpoint> {
985 match self {
986 Self::Actor { endpoint, .. } => endpoint.clone(),
987 Self::Direct(_) => None,
988 }
989 }
990
991 fn direct_shared(&self) -> Option<Arc<ProducerShared<T>>> {
992 match self {
993 Self::Actor { shared, .. } | Self::Direct(shared) => Some(Arc::clone(shared)),
994 }
995 }
996}
997
998#[cfg(feature = "cluster")]
1003struct ProducerEndpointDropGuard<T: stream_ref_actor_bound::Bound> {
1004 actor: Option<ActorRef<ProducerCommand<T>>>,
1005 cancelled: Option<Arc<AtomicBool>>,
1006}
1007
1008#[cfg(feature = "cluster")]
1009impl<T: stream_ref_actor_bound::Bound> ProducerEndpointDropGuard<T> {
1010 fn new(actor: ActorRef<ProducerCommand<T>>, cancelled: Option<Arc<AtomicBool>>) -> Self {
1011 Self {
1012 actor: Some(actor),
1013 cancelled,
1014 }
1015 }
1016}
1017
1018#[cfg(feature = "cluster")]
1019impl<T: stream_ref_actor_bound::Bound> Drop for ProducerEndpointDropGuard<T> {
1020 fn drop(&mut self) {
1021 if let Some(actor) = self.actor.take() {
1022 actor.stop(None);
1023 }
1024 if let Some(cancelled) = self.cancelled.as_ref() {
1025 cancelled.store(true, Ordering::SeqCst);
1026 }
1027 }
1028}
1029
1030#[allow(dead_code)]
1031struct LazyProducerStatus<T> {
1032 direct: Arc<SourceRefDirectState<T>>,
1033}
1034
1035#[allow(dead_code)]
1036impl<T> LazyProducerStatus<T>
1037where
1038 T: Send + 'static,
1039{
1040 fn new(direct: Arc<SourceRefDirectState<T>>) -> Self {
1041 Self { direct }
1042 }
1043
1044 fn get_status(&self) -> ractor::ActorStatus {
1045 self.direct.status()
1046 }
1047}
1048
1049struct SourceRefDirectState<T> {
1050 inner: Mutex<SourceRefDirectInner<T>>,
1051}
1052
1053struct SourceRefDirectInner<T> {
1054 input: Option<BoxStream<T>>,
1055 terminal: Option<StreamError>,
1056 subscribed: bool,
1057}
1058
1059impl<T> SourceRefDirectState<T>
1060where
1061 T: Send + 'static,
1062{
1063 fn new(input: BoxStream<T>) -> Self {
1064 Self {
1065 inner: Mutex::new(SourceRefDirectInner {
1066 input: Some(input),
1067 terminal: None,
1068 subscribed: false,
1069 }),
1070 }
1071 }
1072
1073 #[cfg(feature = "cluster")]
1074 fn terminal(error: StreamError) -> Self {
1075 Self {
1076 inner: Mutex::new(SourceRefDirectInner {
1077 input: None,
1078 terminal: Some(error),
1079 subscribed: true,
1080 }),
1081 }
1082 }
1083
1084 fn lock(&self) -> MutexGuard<'_, SourceRefDirectInner<T>> {
1085 self.inner
1086 .lock()
1087 .unwrap_or_else(|poison| poison.into_inner())
1088 }
1089
1090 fn claim_input(&self) -> StreamResult<BoxStream<T>> {
1091 let mut inner = self.lock();
1092 inner.subscribed = true;
1093 if let Some(error) = inner.terminal.clone() {
1094 return Err(error);
1095 }
1096 inner.input.take().ok_or(StreamError::ActorTerminated)
1097 }
1098
1099 fn timeout_if_unsubscribed(&self) {
1100 let mut inner = self.lock();
1101 if !inner.subscribed && inner.terminal.is_none() {
1102 let input = inner.input.take();
1103 inner.terminal = Some(subscription_timeout_error("stream ref sink"));
1104 drop(inner);
1105 drop(input);
1106 }
1107 }
1108
1109 #[allow(dead_code)]
1110 fn status(&self) -> ractor::ActorStatus {
1111 let inner = self.lock();
1112 if inner.terminal.is_some() || inner.input.is_none() {
1113 ractor::ActorStatus::Stopped
1114 } else {
1115 ractor::ActorStatus::Running
1116 }
1117 }
1118}
1119
1120struct SourceRefDirectStream<T>
1121where
1122 T: Send + 'static,
1123{
1124 input: Option<BoxStream<T>>,
1125 queue: VecDeque<T>,
1126 terminal: Option<ConsumerTerminal>,
1127 settings: StreamRefSettings,
1128 terminated: bool,
1129 _keep_alive: Arc<SourceRefInner<T>>,
1130}
1131
1132impl<T> SourceRefDirectStream<T>
1133where
1134 T: Send + 'static,
1135{
1136 fn new(
1137 input: BoxStream<T>,
1138 settings: StreamRefSettings,
1139 keep_alive: Arc<SourceRefInner<T>>,
1140 ) -> Self {
1141 Self {
1142 input: Some(input),
1143 queue: VecDeque::new(),
1144 terminal: None,
1145 settings,
1146 terminated: false,
1147 _keep_alive: keep_alive,
1148 }
1149 }
1150
1151 fn fill_prefetch(&mut self) {
1152 while self.terminal.is_none() && self.queue.len() < self.settings.buffer_capacity {
1153 let Some(input) = self.input.as_mut() else {
1154 self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
1155 break;
1156 };
1157 match input.next() {
1158 Some(Ok(item)) => self.queue.push_back(item),
1159 Some(Err(error)) => {
1160 self.input.take();
1161 self.terminal = Some(ConsumerTerminal::Error(error));
1162 }
1163 None => {
1164 self.input.take();
1165 self.terminal = Some(ConsumerTerminal::Complete);
1166 }
1167 }
1168 }
1169 }
1170}
1171
1172impl<T> Iterator for SourceRefDirectStream<T>
1173where
1174 T: Send + 'static,
1175{
1176 type Item = StreamResult<T>;
1177
1178 fn next(&mut self) -> Option<Self::Item> {
1179 if self.terminated {
1180 return None;
1181 }
1182
1183 self.fill_prefetch();
1184 if let Some(item) = self.queue.pop_front() {
1185 self.fill_prefetch();
1186 return Some(Ok(item));
1187 }
1188
1189 match self.terminal.clone() {
1190 Some(ConsumerTerminal::Complete) => {
1191 self.terminated = true;
1192 None
1193 }
1194 Some(ConsumerTerminal::Error(error)) => {
1195 self.terminated = true;
1196 Some(Err(error))
1197 }
1198 None => None,
1199 }
1200 }
1201}
1202
1203impl<T> Drop for SourceRefDirectStream<T>
1204where
1205 T: Send + 'static,
1206{
1207 fn drop(&mut self) {
1208 self.input.take();
1209 self.queue.clear();
1210 }
1211}
1212
1213struct SinkRefDirectState<T> {
1214 inner: Mutex<SinkRefDirectInner<T>>,
1215 changed: Condvar,
1216}
1217
1218struct SinkRefDirectInner<T> {
1219 input: Option<BoxStream<T>>,
1220 completion: Option<oneshot::Sender<StreamResult<NotUsed>>>,
1221 completion_cancelled: Option<Arc<AtomicBool>>,
1222 terminal: Option<StreamError>,
1223 subscribed: bool,
1224}
1225
1226impl<T> SinkRefDirectState<T>
1227where
1228 T: Send + 'static,
1229{
1230 fn new() -> Self {
1231 Self {
1232 inner: Mutex::new(SinkRefDirectInner {
1233 input: None,
1234 completion: None,
1235 completion_cancelled: None,
1236 terminal: None,
1237 subscribed: false,
1238 }),
1239 changed: Condvar::new(),
1240 }
1241 }
1242
1243 fn lock(&self) -> MutexGuard<'_, SinkRefDirectInner<T>> {
1244 self.inner
1245 .lock()
1246 .unwrap_or_else(|poison| poison.into_inner())
1247 }
1248
1249 fn attach_input(&self, input: BoxStream<T>) -> StreamCompletion<NotUsed> {
1250 let mut inner = self.lock();
1251 if let Some(error) = inner.terminal.clone() {
1252 return StreamCompletion::ready(Err(error));
1253 }
1254 if inner.subscribed {
1255 return StreamCompletion::ready(Err(StreamError::Failed(
1256 "stream ref was already subscribed by another endpoint".to_owned(),
1257 )));
1258 }
1259
1260 let (sender, receiver) = oneshot::channel();
1261 let cancellation = StreamCancellation::for_external_completion();
1262 inner.input = Some(input);
1263 inner.completion = Some(sender);
1264 inner.completion_cancelled = Some(cancellation.cancelled());
1265 inner.subscribed = true;
1266 drop(inner);
1267 self.changed.notify_all();
1268 StreamCompletion::from_receiver(receiver, Some(cancellation))
1269 }
1270
1271 #[cfg(feature = "cluster")]
1272 fn attach_input_unmanaged(&self, input: BoxStream<T>) -> StreamResult<()> {
1273 let mut inner = self.lock();
1274 if let Some(error) = inner.terminal.clone() {
1275 return Err(error);
1276 }
1277 if inner.subscribed {
1278 return Err(StreamError::Failed(
1279 "stream ref was already subscribed by another endpoint".to_owned(),
1280 ));
1281 }
1282
1283 inner.input = Some(input);
1284 inner.completion = None;
1285 inner.completion_cancelled = None;
1286 inner.subscribed = true;
1287 drop(inner);
1288 self.changed.notify_all();
1289 Ok(())
1290 }
1291
1292 fn wait_for_input(&self, settings: StreamRefSettings) -> StreamResult<BoxStream<T>> {
1293 let deadline = Instant::now()
1294 .checked_add(settings.subscription_timeout)
1295 .unwrap_or_else(far_future);
1296 let mut inner = self.lock();
1297 loop {
1298 if let Some(cancelled) = &inner.completion_cancelled
1299 && cancelled.load(Ordering::SeqCst)
1300 {
1301 let input = inner.input.take();
1302 inner.terminal = Some(StreamError::Cancelled);
1303 inner.completion.take();
1304 inner.completion_cancelled = None;
1305 drop(inner);
1306 drop(input);
1307 self.changed.notify_all();
1308 return Err(StreamError::Cancelled);
1309 }
1310 if let Some(input) = inner.input.take() {
1311 return Ok(input);
1312 }
1313 if let Some(error) = inner.terminal.clone() {
1314 return Err(error);
1315 }
1316 let now = Instant::now();
1317 if now >= deadline {
1318 let error = subscription_timeout_error("stream ref source");
1319 inner.terminal = Some(error.clone());
1320 if let Some(sender) = inner.completion.take() {
1321 let _ = sender.send(Err(error.clone()));
1322 }
1323 return Err(error);
1324 }
1325 let remaining = deadline.saturating_duration_since(now);
1326 let (next, _) =
1327 wait_timeout_unpoison(&self.changed, inner, remaining.min(STREAM_REF_WAIT_POLL));
1328 inner = next;
1329 }
1330 }
1331
1332 fn settle(&self, result: StreamResult<NotUsed>) {
1333 let mut inner = self.lock();
1334 if let Some(sender) = inner.completion.take() {
1335 let _ = sender.send(result);
1336 }
1337 inner.completion_cancelled = None;
1338 drop(inner);
1339 self.changed.notify_all();
1340 }
1341
1342 fn fail_unattached(&self, error: StreamError) {
1343 let mut inner = self.lock();
1344 if inner.terminal.is_none() {
1345 let input = inner.input.take();
1346 inner.terminal = Some(error.clone());
1347 if let Some(sender) = inner.completion.take() {
1348 let _ = sender.send(Err(error));
1349 }
1350 inner.completion_cancelled = None;
1351 drop(inner);
1352 drop(input);
1353 self.changed.notify_all();
1354 }
1355 }
1356
1357 fn is_completion_cancelled(&self) -> bool {
1358 self.lock()
1359 .completion_cancelled
1360 .as_ref()
1361 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
1362 }
1363}
1364
1365struct SinkRefDirectStream<T>
1366where
1367 T: Send + 'static,
1368{
1369 state: Arc<SinkRefDirectState<T>>,
1370 input: Option<BoxStream<T>>,
1371 queue: VecDeque<T>,
1372 terminal: Option<ConsumerTerminal>,
1373 settings: StreamRefSettings,
1374 terminated: bool,
1375}
1376
1377impl<T> SinkRefDirectStream<T>
1378where
1379 T: Send + 'static,
1380{
1381 fn new(state: Arc<SinkRefDirectState<T>>, settings: StreamRefSettings) -> Self {
1382 Self {
1383 state,
1384 input: None,
1385 queue: VecDeque::new(),
1386 terminal: None,
1387 settings,
1388 terminated: false,
1389 }
1390 }
1391
1392 fn ensure_input(&mut self) -> StreamResult<()> {
1393 if self.input.is_none() && self.terminal.is_none() {
1394 self.input = Some(self.state.wait_for_input(self.settings)?);
1395 }
1396 Ok(())
1397 }
1398
1399 fn fill_prefetch(&mut self) -> StreamResult<()> {
1400 self.ensure_input()?;
1401 while self.terminal.is_none() && self.queue.len() < self.settings.buffer_capacity {
1402 if self.state.is_completion_cancelled() {
1403 self.input.take();
1404 self.state.settle(Err(StreamError::Cancelled));
1405 self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
1406 break;
1407 }
1408 let Some(input) = self.input.as_mut() else {
1409 self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
1410 break;
1411 };
1412 match input.next() {
1413 Some(Ok(item)) => self.queue.push_back(item),
1414 Some(Err(error)) => {
1415 self.input.take();
1416 self.state.settle(Err(error.clone()));
1417 self.terminal = Some(ConsumerTerminal::Error(error));
1418 }
1419 None => {
1420 self.input.take();
1421 self.state.settle(Ok(NotUsed));
1422 self.terminal = Some(ConsumerTerminal::Complete);
1423 }
1424 }
1425 }
1426 Ok(())
1427 }
1428}
1429
1430impl<T> Iterator for SinkRefDirectStream<T>
1431where
1432 T: Send + 'static,
1433{
1434 type Item = StreamResult<T>;
1435
1436 fn next(&mut self) -> Option<Self::Item> {
1437 if self.terminated {
1438 return None;
1439 }
1440
1441 if let Err(error) = self.fill_prefetch() {
1442 self.terminated = true;
1443 return Some(Err(error));
1444 }
1445 if let Some(item) = self.queue.pop_front() {
1446 if let Err(error) = self.fill_prefetch() {
1447 self.terminal = Some(ConsumerTerminal::Error(error));
1448 }
1449 return Some(Ok(item));
1450 }
1451
1452 match self.terminal.clone() {
1453 Some(ConsumerTerminal::Complete) => {
1454 self.terminated = true;
1455 None
1456 }
1457 Some(ConsumerTerminal::Error(error)) => {
1458 self.terminated = true;
1459 Some(Err(error))
1460 }
1461 None => None,
1462 }
1463 }
1464}
1465
1466impl<T> Drop for SinkRefDirectStream<T>
1467where
1468 T: Send + 'static,
1469{
1470 fn drop(&mut self) {
1471 if !self.terminated {
1472 self.input.take();
1473 self.queue.clear();
1474 self.state.fail_unattached(StreamError::Cancelled);
1475 self.state.settle(Err(StreamError::Cancelled));
1476 }
1477 }
1478}
1479
1480#[allow(dead_code)]
1481struct ProducerShared<T> {
1482 inner: Mutex<ProducerInner<T>>,
1483 changed: Condvar,
1484}
1485
1486#[allow(dead_code)]
1487struct ProducerInner<T> {
1488 consumer: Option<ConsumerEndpoint<T>>,
1489 cumulative_demand: u64,
1490 sent: u64,
1491 stopped: Option<StreamError>,
1492}
1493
1494#[allow(dead_code)]
1495impl<T> ProducerShared<T>
1496where
1497 T: stream_ref_actor_bound::Bound + Send + 'static,
1498{
1499 fn new() -> Self {
1500 Self {
1501 inner: Mutex::new(ProducerInner {
1502 consumer: None,
1503 cumulative_demand: 0,
1504 sent: 0,
1505 stopped: None,
1506 }),
1507 changed: Condvar::new(),
1508 }
1509 }
1510
1511 fn lock(&self) -> MutexGuard<'_, ProducerInner<T>> {
1512 self.inner
1513 .lock()
1514 .unwrap_or_else(|poison| poison.into_inner())
1515 }
1516
1517 fn set_consumer(
1518 &self,
1519 consumer: ActorRef<ConsumerCommand<T>>,
1520 ) -> Result<bool, ActorRef<ConsumerCommand<T>>> {
1521 let mut inner = self.lock();
1522 match &inner.consumer {
1523 Some(ConsumerEndpoint::Actor(existing)) if same_actor(existing, &consumer) => Ok(false),
1524 Some(_) => Err(consumer),
1525 None if inner.stopped.is_some() => Err(consumer),
1526 None => {
1527 inner.consumer = Some(ConsumerEndpoint::Actor(consumer));
1528 drop(inner);
1529 self.changed.notify_all();
1530 Ok(true)
1531 }
1532 }
1533 }
1534
1535 fn set_direct_consumer(&self, consumer: &Arc<ConsumerShared<T>>) -> StreamResult<bool> {
1536 let mut inner = self.lock();
1537 match &inner.consumer {
1538 Some(ConsumerEndpoint::Direct(existing))
1539 if existing.ptr_eq(&Arc::downgrade(consumer)) =>
1540 {
1541 Ok(false)
1542 }
1543 Some(_) => Err(StreamError::Failed(
1544 "stream ref was already subscribed by another endpoint".to_owned(),
1545 )),
1546 None => {
1547 if let Some(error) = inner.stopped.clone() {
1548 return Err(error);
1549 }
1550 inner.consumer = Some(ConsumerEndpoint::Direct(Arc::downgrade(consumer)));
1551 drop(inner);
1552 self.changed.notify_all();
1553 Ok(true)
1554 }
1555 }
1556 }
1557
1558 fn update_demand(&self, consumer: &ActorRef<ConsumerCommand<T>>, cumulative: u64) {
1559 self.update_demand_if(cumulative, |existing| {
1560 matches!(existing, ConsumerEndpoint::Actor(actor) if same_actor(actor, consumer))
1561 });
1562 }
1563
1564 fn update_direct_demand(&self, consumer: &Arc<ConsumerShared<T>>, cumulative: u64) {
1565 let consumer = Arc::downgrade(consumer);
1566 self.update_demand_if(cumulative, |existing| {
1567 matches!(existing, ConsumerEndpoint::Direct(shared) if shared.ptr_eq(&consumer))
1568 });
1569 }
1570
1571 fn update_demand_if(
1572 &self,
1573 cumulative: u64,
1574 matches_consumer: impl FnOnce(&ConsumerEndpoint<T>) -> bool,
1575 ) {
1576 let mut inner = self.lock();
1577 if inner.consumer.as_ref().is_some_and(matches_consumer)
1578 && cumulative > inner.cumulative_demand
1579 {
1580 inner.cumulative_demand = cumulative;
1581 drop(inner);
1582 self.changed.notify_all();
1583 }
1584 }
1585
1586 fn stop_from_consumer(&self, consumer: &ActorRef<ConsumerCommand<T>>, error: StreamError) {
1587 self.stop_from_consumer_if(error, |existing| {
1588 matches!(existing, ConsumerEndpoint::Actor(actor) if same_actor(actor, consumer))
1589 });
1590 }
1591
1592 fn stop_from_direct_consumer(&self, consumer: &Arc<ConsumerShared<T>>, error: StreamError) {
1593 let consumer = Arc::downgrade(consumer);
1594 self.stop_from_consumer_if(error, |existing| {
1595 matches!(existing, ConsumerEndpoint::Direct(shared) if shared.ptr_eq(&consumer))
1596 });
1597 }
1598
1599 fn stop_from_consumer_if(
1600 &self,
1601 error: StreamError,
1602 matches_consumer: impl FnOnce(&ConsumerEndpoint<T>) -> bool,
1603 ) {
1604 let mut inner = self.lock();
1605 if inner.consumer.as_ref().is_none_or(matches_consumer) && inner.stopped.is_none() {
1606 inner.stopped = Some(error);
1607 drop(inner);
1608 self.changed.notify_all();
1609 }
1610 }
1611
1612 fn stop_unless_finished(&self, error: StreamError) {
1613 let mut inner = self.lock();
1614 if inner.stopped.is_none() {
1615 inner.stopped = Some(error);
1616 drop(inner);
1617 self.changed.notify_all();
1618 }
1619 }
1620}
1621
1622#[allow(dead_code)]
1623struct ConsumerShared<T> {
1624 inner: Mutex<ConsumerInner<T>>,
1625 changed: Condvar,
1626}
1627
1628#[allow(dead_code)]
1629struct ConsumerInner<T> {
1630 producer: Option<ProducerEndpoint<T>>,
1631 queue: VecDeque<T>,
1632 terminal: Option<ConsumerTerminal>,
1633 expected_seq: u64,
1634 delivered: u64,
1635 cumulative_demand: u64,
1636}
1637
1638#[derive(Clone)]
1639enum ConsumerTerminal {
1640 Complete,
1641 Error(StreamError),
1642}
1643
1644#[allow(dead_code)]
1645impl<T> ConsumerShared<T>
1646where
1647 T: stream_ref_actor_bound::Bound + Send + 'static,
1648{
1649 fn new(_settings: StreamRefSettings) -> Self {
1650 Self {
1651 inner: Mutex::new(ConsumerInner {
1652 producer: None,
1653 queue: VecDeque::new(),
1654 terminal: None,
1655 expected_seq: 0,
1656 delivered: 0,
1657 cumulative_demand: 0,
1658 }),
1659 changed: Condvar::new(),
1660 }
1661 }
1662
1663 fn lock(&self) -> MutexGuard<'_, ConsumerInner<T>> {
1664 self.inner
1665 .lock()
1666 .unwrap_or_else(|poison| poison.into_inner())
1667 }
1668
1669 fn set_producer(&self, producer: ActorRef<ProducerCommand<T>>) -> bool {
1670 let mut inner = self.lock();
1671 match &inner.producer {
1672 Some(ProducerEndpoint::Actor(existing)) => same_actor(existing, &producer),
1673 Some(ProducerEndpoint::Direct(_)) => false,
1674 None => {
1675 inner.producer = Some(ProducerEndpoint::Actor(producer));
1676 drop(inner);
1677 self.changed.notify_all();
1678 true
1679 }
1680 }
1681 }
1682
1683 fn set_direct_producer(&self, producer: &Arc<ProducerShared<T>>) -> StreamResult<bool> {
1684 let mut inner = self.lock();
1685 match &inner.producer {
1686 Some(ProducerEndpoint::Direct(existing))
1687 if existing.ptr_eq(&Arc::downgrade(producer)) =>
1688 {
1689 Ok(false)
1690 }
1691 Some(_) => Err(StreamError::Failed(
1692 "stream ref was already subscribed by another endpoint".to_owned(),
1693 )),
1694 None => {
1695 if let Some(terminal) = inner.terminal.clone() {
1696 return match terminal {
1697 ConsumerTerminal::Complete => Err(StreamError::ActorTerminated),
1698 ConsumerTerminal::Error(error) => Err(error),
1699 };
1700 }
1701 inner.producer = Some(ProducerEndpoint::Direct(Arc::downgrade(producer)));
1702 drop(inner);
1703 self.changed.notify_all();
1704 Ok(true)
1705 }
1706 }
1707 }
1708
1709 fn push(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64, item: T) {
1710 self.push_if(seq, item, |inner| producer_matches_actor(inner, producer));
1711 }
1712
1713 fn push_direct(&self, producer: &Arc<ProducerShared<T>>, seq: u64, item: T) {
1714 let producer = Arc::downgrade(producer);
1715 self.push_if(seq, item, |inner| producer_matches_direct(inner, &producer));
1716 }
1717
1718 fn push_if(&self, seq: u64, item: T, matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool) {
1719 let mut inner = self.lock();
1720 if inner.terminal.is_some() || !matches_producer(&inner) {
1721 return;
1722 }
1723 let should_notify;
1724 if seq != inner.expected_seq {
1725 inner.queue.clear();
1726 inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
1727 inner.expected_seq,
1728 seq,
1729 "stream ref element sequence gap",
1730 )));
1731 should_notify = true;
1732 } else {
1733 should_notify = inner.queue.is_empty();
1734 inner.expected_seq += 1;
1735 inner.queue.push_back(item);
1736 }
1737 drop(inner);
1738 if should_notify {
1739 self.changed.notify_all();
1740 }
1741 }
1742
1743 fn complete(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64) {
1744 self.complete_if(seq, |inner| producer_matches_actor(inner, producer));
1745 }
1746
1747 fn complete_direct(&self, producer: &Arc<ProducerShared<T>>, seq: u64) {
1748 let producer = Arc::downgrade(producer);
1749 self.complete_if(seq, |inner| producer_matches_direct(inner, &producer));
1750 }
1751
1752 fn complete_if(&self, seq: u64, matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool) {
1753 let mut inner = self.lock();
1754 if inner.terminal.is_some() || !matches_producer(&inner) {
1755 return;
1756 }
1757 if seq != inner.expected_seq {
1758 inner.queue.clear();
1759 inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
1760 inner.expected_seq,
1761 seq,
1762 "stream ref completion sequence gap",
1763 )));
1764 } else {
1765 inner.terminal = Some(ConsumerTerminal::Complete);
1766 }
1767 drop(inner);
1768 self.changed.notify_all();
1769 }
1770
1771 fn fail(&self, producer: &ActorRef<ProducerCommand<T>>, error: StreamError) {
1772 self.fail_if(error, |inner| producer_matches_actor(inner, producer));
1773 }
1774
1775 fn fail_direct(&self, producer: &Arc<ProducerShared<T>>, error: StreamError) {
1776 let producer = Arc::downgrade(producer);
1777 self.fail_if(error, |inner| producer_matches_direct(inner, &producer));
1778 }
1779
1780 fn fail_if(
1781 &self,
1782 error: StreamError,
1783 matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool,
1784 ) {
1785 let mut inner = self.lock();
1786 if inner.terminal.is_some() || !matches_producer(&inner) {
1787 return;
1788 }
1789 inner.queue.clear();
1790 inner.terminal = Some(ConsumerTerminal::Error(error));
1791 drop(inner);
1792 self.changed.notify_all();
1793 }
1794
1795 fn fail_local(&self, error: StreamError) {
1796 let mut inner = self.lock();
1797 if inner.terminal.is_none() {
1798 inner.queue.clear();
1799 inner.terminal = Some(ConsumerTerminal::Error(error));
1800 drop(inner);
1801 self.changed.notify_all();
1802 }
1803 }
1804}
1805
1806#[allow(dead_code)]
1807fn producer_matches_actor<T>(
1808 inner: &ConsumerInner<T>,
1809 producer: &ActorRef<ProducerCommand<T>>,
1810) -> bool
1811where
1812 T: stream_ref_actor_bound::Bound + Send + 'static,
1813{
1814 matches!(
1815 inner.producer.as_ref(),
1816 Some(ProducerEndpoint::Actor(existing)) if same_actor(existing, producer)
1817 )
1818}
1819
1820#[allow(dead_code)]
1821fn producer_matches_direct<T>(inner: &ConsumerInner<T>, producer: &Weak<ProducerShared<T>>) -> bool
1822where
1823 T: stream_ref_actor_bound::Bound + Send + 'static,
1824{
1825 matches!(
1826 inner.producer.as_ref(),
1827 Some(ProducerEndpoint::Direct(existing)) if existing.ptr_eq(producer)
1828 )
1829}
1830
1831#[allow(dead_code)]
1832struct ProducerActor<T> {
1833 shared: Arc<ProducerShared<T>>,
1834 initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
1835 settings: StreamRefSettings,
1836 #[cfg(feature = "cluster")]
1837 endpoint: Option<RemoteStreamRefEndpoint>,
1838}
1839
1840impl<T> Actor for ProducerActor<T>
1841where
1842 T: stream_ref_actor_bound::Bound + Send + 'static,
1843{
1844 type Msg = ProducerCommand<T>;
1845 type State = ();
1846 type Arguments = ();
1847
1848 async fn pre_start(
1849 &self,
1850 myself: ActorRef<Self::Msg>,
1851 _args: Self::Arguments,
1852 ) -> Result<Self::State, ActorProcessingErr> {
1853 if let Some(consumer) = &self.initial_consumer {
1854 register_producer_consumer(
1855 &self.shared,
1856 myself,
1857 consumer.clone(),
1858 #[cfg(feature = "cluster")]
1859 self.endpoint.as_ref(),
1860 );
1861 }
1862 Ok(())
1863 }
1864
1865 async fn handle(
1866 &self,
1867 myself: ActorRef<Self::Msg>,
1868 message: Self::Msg,
1869 _state: &mut Self::State,
1870 ) -> ActorResult {
1871 match message {
1872 ProducerCommand::Subscribe { consumer } => {
1873 register_producer_consumer(
1874 &self.shared,
1875 myself,
1876 consumer,
1877 #[cfg(feature = "cluster")]
1878 self.endpoint.as_ref(),
1879 );
1880 }
1881 #[cfg(feature = "cluster")]
1882 ProducerCommand::SubscribeRemote { consumer } => {
1883 match resolve_remote_actor(&consumer, self.settings.subscription_timeout) {
1884 Ok(consumer) => register_producer_consumer(
1885 &self.shared,
1886 myself,
1887 consumer,
1888 self.endpoint.as_ref(),
1889 ),
1890 Err(error) => self.shared.stop_unless_finished(error),
1891 }
1892 }
1893 ProducerCommand::Demand {
1894 consumer,
1895 cumulative,
1896 } => self.shared.update_demand(&consumer, cumulative),
1897 #[cfg(feature = "cluster")]
1898 ProducerCommand::DemandRemote {
1899 consumer,
1900 cumulative,
1901 } => {
1902 if let Ok(consumer) =
1903 resolve_remote_actor(&consumer, self.settings.subscription_timeout)
1904 {
1905 self.shared.update_demand(&consumer, cumulative);
1906 }
1907 }
1908 ProducerCommand::Cancel { consumer } => {
1909 self.shared
1910 .stop_from_consumer(&consumer, StreamError::Cancelled);
1911 }
1912 #[cfg(feature = "cluster")]
1913 ProducerCommand::CancelRemote { consumer } => {
1914 if let Ok(consumer) =
1915 resolve_remote_actor(&consumer, self.settings.subscription_timeout)
1916 {
1917 self.shared
1918 .stop_from_consumer(&consumer, StreamError::Cancelled);
1919 }
1920 }
1921 ProducerCommand::RemoteFailure { consumer, error } => {
1922 self.shared.stop_from_consumer(&consumer, error);
1923 }
1924 #[cfg(feature = "cluster")]
1925 ProducerCommand::RemoteFailureRemote { consumer, error } => {
1926 if let Ok(consumer) =
1927 resolve_remote_actor(&consumer, self.settings.subscription_timeout)
1928 {
1929 self.shared.stop_from_consumer(&consumer, error);
1930 }
1931 }
1932 ProducerCommand::Ack => myself.stop(None),
1933 }
1934 Ok(())
1935 }
1936
1937 async fn post_stop(
1938 &self,
1939 _myself: ActorRef<Self::Msg>,
1940 _state: &mut Self::State,
1941 ) -> ActorResult {
1942 self.shared
1943 .stop_unless_finished(StreamError::ActorTerminated);
1944 Ok(())
1945 }
1946}
1947
1948#[allow(dead_code)]
1949fn register_producer_consumer<T>(
1950 shared: &Arc<ProducerShared<T>>,
1951 producer: ActorRef<ProducerCommand<T>>,
1952 consumer: ActorRef<ConsumerCommand<T>>,
1953 #[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
1954) where
1955 T: stream_ref_actor_bound::Bound + Send + 'static,
1956{
1957 match shared.set_consumer(consumer.clone()) {
1958 Ok(true) | Ok(false) => {
1959 let _ = cast_consumer_on_subscribe(
1960 &consumer,
1961 producer.clone(),
1962 #[cfg(feature = "cluster")]
1963 producer_endpoint,
1964 );
1965 }
1966 Err(duplicate) => {
1967 let _ = cast_consumer_failure(
1968 &duplicate,
1969 producer,
1970 #[cfg(feature = "cluster")]
1971 producer_endpoint,
1972 StreamError::Failed(
1973 "stream ref was already subscribed by another endpoint".to_owned(),
1974 ),
1975 );
1976 }
1977 }
1978}
1979
1980#[allow(dead_code)]
1981fn register_direct_producer_consumer<T>(
1982 producer: &Arc<ProducerShared<T>>,
1983 consumer: &Arc<ConsumerShared<T>>,
1984) -> StreamResult<()>
1985where
1986 T: stream_ref_actor_bound::Bound + Send + 'static,
1987{
1988 consumer.set_direct_producer(producer)?;
1989 if let Err(error) = producer.set_direct_consumer(consumer) {
1990 consumer.fail_local(error.clone());
1991 return Err(error);
1992 }
1993 Ok(())
1994}
1995
1996#[allow(dead_code)]
1997struct ConsumerActor<T> {
1998 shared: Arc<ConsumerShared<T>>,
1999 initial_producer: Option<ActorRef<ProducerCommand<T>>>,
2000 settings: StreamRefSettings,
2001 #[cfg(feature = "cluster")]
2002 endpoint: Option<RemoteStreamRefEndpoint>,
2003}
2004
2005impl<T> Actor for ConsumerActor<T>
2006where
2007 T: stream_ref_actor_bound::Bound + Send + 'static,
2008{
2009 type Msg = ConsumerCommand<T>;
2010 type State = ();
2011 type Arguments = ();
2012
2013 async fn pre_start(
2014 &self,
2015 myself: ActorRef<Self::Msg>,
2016 _args: Self::Arguments,
2017 ) -> Result<Self::State, ActorProcessingErr> {
2018 if let Some(producer) = &self.initial_producer {
2019 self.shared.set_producer(producer.clone());
2020 if let Err(error) = cast_producer_subscribe(
2021 producer,
2022 myself.clone(),
2023 #[cfg(feature = "cluster")]
2024 self.endpoint.as_ref(),
2025 ) {
2026 self.shared.fail_local(error);
2027 myself.stop(None);
2028 }
2029 }
2030 Ok(())
2031 }
2032
2033 async fn handle(
2034 &self,
2035 _myself: ActorRef<Self::Msg>,
2036 message: Self::Msg,
2037 _state: &mut Self::State,
2038 ) -> ActorResult {
2039 match message {
2040 ConsumerCommand::OnSubscribe { producer } => {
2041 if !self.shared.set_producer(producer.clone()) {
2042 let _ = cast_producer_remote_failure(
2043 &producer,
2044 _myself.clone(),
2045 #[cfg(feature = "cluster")]
2046 self.endpoint.as_ref(),
2047 StreamError::Failed(
2048 "stream ref was already subscribed by another endpoint".to_owned(),
2049 ),
2050 );
2051 }
2052 }
2053 #[cfg(feature = "cluster")]
2054 ConsumerCommand::OnSubscribeRemote { producer } => {
2055 match resolve_remote_actor(&producer, self.settings.subscription_timeout) {
2056 Ok(producer) => {
2057 if !self.shared.set_producer(producer.clone()) {
2058 let _ = cast_producer_remote_failure(
2059 &producer,
2060 _myself.clone(),
2061 self.endpoint.as_ref(),
2062 StreamError::Failed(
2063 "stream ref was already subscribed by another endpoint"
2064 .to_owned(),
2065 ),
2066 );
2067 }
2068 }
2069 Err(error) => self.shared.fail_local(error),
2070 }
2071 }
2072 ConsumerCommand::Element {
2073 producer,
2074 seq,
2075 item,
2076 } => self.shared.push(&producer, seq, item),
2077 #[cfg(feature = "cluster")]
2078 ConsumerCommand::ElementRemote {
2079 producer,
2080 seq,
2081 item,
2082 } => {
2083 if let Ok(producer) =
2084 resolve_remote_actor(&producer, self.settings.subscription_timeout)
2085 {
2086 self.shared.push(&producer, seq, item);
2087 }
2088 }
2089 ConsumerCommand::Complete { producer, seq } => {
2090 self.shared.complete(&producer, seq);
2091 let _ = cast_actor(&producer, ProducerCommand::Ack);
2092 }
2093 #[cfg(feature = "cluster")]
2094 ConsumerCommand::CompleteRemote { producer, seq } => {
2095 if let Ok(producer) =
2096 resolve_remote_actor(&producer, self.settings.subscription_timeout)
2097 {
2098 self.shared.complete(&producer, seq);
2099 let _ = cast_actor(&producer, ProducerCommand::Ack);
2100 }
2101 }
2102 ConsumerCommand::Failure { producer, error } => {
2103 self.shared.fail(&producer, error);
2104 let _ = cast_actor(&producer, ProducerCommand::Ack);
2105 }
2106 #[cfg(feature = "cluster")]
2107 ConsumerCommand::FailureRemote { producer, error } => {
2108 if let Ok(producer) =
2109 resolve_remote_actor(&producer, self.settings.subscription_timeout)
2110 {
2111 self.shared.fail(&producer, error);
2112 let _ = cast_actor(&producer, ProducerCommand::Ack);
2113 }
2114 }
2115 }
2116 Ok(())
2117 }
2118
2119 async fn post_stop(
2120 &self,
2121 _myself: ActorRef<Self::Msg>,
2122 _state: &mut Self::State,
2123 ) -> ActorResult {
2124 self.shared.fail_local(StreamError::ActorTerminated);
2125 Ok(())
2126 }
2127}
2128
2129#[allow(dead_code)]
2130struct ConsumerStream<T>
2131where
2132 T: stream_ref_actor_bound::Bound + Send + 'static,
2133{
2134 shared: Arc<ConsumerShared<T>>,
2135 actor_ref: Option<ActorRef<ConsumerCommand<T>>>,
2136 #[cfg(feature = "cluster")]
2137 endpoint: Option<RemoteStreamRefEndpoint>,
2138 settings: StreamRefSettings,
2139 terminated: bool,
2140 source_ref_keep_alive: Option<Arc<SourceRefInner<T>>>,
2141}
2142
2143impl<T> Iterator for ConsumerStream<T>
2144where
2145 T: stream_ref_actor_bound::Bound + Send + 'static,
2146{
2147 type Item = StreamResult<T>;
2148
2149 fn next(&mut self) -> Option<Self::Item> {
2150 if self.terminated {
2151 return None;
2152 }
2153
2154 if let Err(error) = self.wait_for_subscription() {
2155 self.terminated = true;
2156 return Some(Err(error));
2157 }
2158
2159 if let Err(error) = self.redeliver_or_extend_demand() {
2160 self.terminated = true;
2161 return Some(Err(error));
2162 }
2163
2164 let mut next_redelivery = next_redelivery_deadline(self.settings);
2165 loop {
2166 let demand_after_pop = {
2167 let mut inner = self.shared.lock();
2168 if let Some(item) = inner.queue.pop_front() {
2169 inner.delivered = inner.delivered.saturating_add(1);
2170 let demand = next_demand(&mut inner, self.settings);
2171 drop(inner);
2172 if let Some((producer, cumulative)) = demand
2173 && let Err(error) = send_demand(
2174 &self.shared,
2175 &self.actor_ref,
2176 #[cfg(feature = "cluster")]
2177 &self.endpoint,
2178 &producer,
2179 cumulative,
2180 )
2181 {
2182 self.terminated = true;
2183 return Some(Err(error));
2184 }
2185 return Some(Ok(item));
2186 }
2187
2188 if let Some(terminal) = inner.terminal.clone() {
2189 drop(inner);
2190 match terminal {
2191 ConsumerTerminal::Complete => {
2192 self.terminated = true;
2193 self.stop_actor();
2194 return None;
2195 }
2196 ConsumerTerminal::Error(error) => {
2197 self.terminated = true;
2198 self.stop_actor();
2199 return Some(Err(error));
2200 }
2201 }
2202 }
2203 None::<(ProducerEndpoint<T>, u64)>
2204 };
2205 debug_assert!(demand_after_pop.is_none());
2206
2207 let now = Instant::now();
2208 let timeout = next_redelivery.saturating_duration_since(now);
2209 let mut inner = self.shared.lock();
2210 if !inner.queue.is_empty() || inner.terminal.is_some() {
2211 continue;
2212 }
2213 let (next_inner, result) = wait_timeout_unpoison(
2214 &self.shared.changed,
2215 inner,
2216 timeout.min(STREAM_REF_WAIT_POLL),
2217 );
2218 inner = next_inner;
2219 drop(inner);
2220 if result.timed_out() && Instant::now() >= next_redelivery {
2221 if let Err(error) = self.redeliver_demand() {
2222 self.terminated = true;
2223 return Some(Err(error));
2224 }
2225 next_redelivery = next_redelivery_deadline(self.settings);
2226 }
2227 }
2228 }
2229}
2230
2231#[allow(dead_code)]
2232impl<T> ConsumerStream<T>
2233where
2234 T: stream_ref_actor_bound::Bound + Send + 'static,
2235{
2236 fn wait_for_subscription(&self) -> StreamResult<()> {
2237 let deadline = Instant::now()
2238 .checked_add(self.settings.subscription_timeout)
2239 .unwrap_or_else(far_future);
2240 let mut inner = self.shared.lock();
2241 loop {
2242 if inner.producer.is_some() {
2243 return Ok(());
2244 }
2245 if let Some(terminal) = inner.terminal.clone() {
2246 return match terminal {
2247 ConsumerTerminal::Complete => Ok(()),
2248 ConsumerTerminal::Error(error) => Err(error),
2249 };
2250 }
2251 let now = Instant::now();
2252 if now >= deadline {
2253 drop(inner);
2254 let error = subscription_timeout_error("stream ref source");
2255 self.shared.fail_local(error.clone());
2256 self.stop_actor_ref();
2257 return Err(error);
2258 }
2259 let remaining = deadline.saturating_duration_since(now);
2260 let (next, _) = wait_timeout_unpoison(
2261 &self.shared.changed,
2262 inner,
2263 remaining.min(STREAM_REF_WAIT_POLL),
2264 );
2265 inner = next;
2266 }
2267 }
2268
2269 fn redeliver_or_extend_demand(&self) -> StreamResult<()> {
2270 let demand = {
2271 let mut inner = self.shared.lock();
2272 next_demand(&mut inner, self.settings)
2273 };
2274 if let Some((producer, cumulative)) = demand {
2275 send_demand(
2276 &self.shared,
2277 &self.actor_ref,
2278 #[cfg(feature = "cluster")]
2279 &self.endpoint,
2280 &producer,
2281 cumulative,
2282 )?;
2283 }
2284 Ok(())
2285 }
2286
2287 fn redeliver_demand(&self) -> StreamResult<()> {
2288 let (producer, cumulative) = {
2289 let inner = self.shared.lock();
2290 let Some(producer) = inner.producer.clone() else {
2291 return Ok(());
2292 };
2293 if inner.cumulative_demand == 0 {
2294 return Ok(());
2295 }
2296 (producer, inner.cumulative_demand)
2297 };
2298 send_demand(
2299 &self.shared,
2300 &self.actor_ref,
2301 #[cfg(feature = "cluster")]
2302 &self.endpoint,
2303 &producer,
2304 cumulative,
2305 )
2306 }
2307
2308 fn stop_actor(&mut self) {
2309 if let Some(actor_ref) = self.actor_ref.take() {
2310 actor_ref.stop(None);
2311 }
2312 }
2313
2314 fn stop_actor_ref(&self) {
2315 if let Some(actor_ref) = &self.actor_ref {
2316 actor_ref.stop(None);
2317 }
2318 }
2319}
2320
2321impl<T> Drop for ConsumerStream<T>
2322where
2323 T: stream_ref_actor_bound::Bound + Send + 'static,
2324{
2325 fn drop(&mut self) {
2326 if !self.terminated {
2327 let producer = self.shared.lock().producer.clone();
2328 match producer {
2329 Some(ProducerEndpoint::Actor(producer)) => {
2330 if let Some(consumer) = &self.actor_ref {
2331 let _ = cast_producer_cancel(
2332 &producer,
2333 consumer.clone(),
2334 #[cfg(feature = "cluster")]
2335 self.endpoint.as_ref(),
2336 );
2337 }
2338 }
2339 Some(ProducerEndpoint::Direct(producer)) => {
2340 if let Some(producer) = producer.upgrade() {
2341 producer.stop_from_direct_consumer(&self.shared, StreamError::Cancelled);
2342 }
2343 }
2344 None => {}
2345 }
2346 }
2347 self.stop_actor();
2348 drop(self.source_ref_keep_alive.take());
2349 }
2350}
2351
2352#[allow(dead_code)]
2353fn next_demand<T>(
2354 inner: &mut ConsumerInner<T>,
2355 settings: StreamRefSettings,
2356) -> Option<(ProducerEndpoint<T>, u64)> {
2357 if inner.terminal.is_some() {
2358 return None;
2359 }
2360 let remaining_credit = inner.cumulative_demand.saturating_sub(inner.delivered);
2361 if inner.cumulative_demand != 0 && remaining_credit > demand_replenish_threshold(settings) {
2362 return None;
2363 }
2364 let target = inner
2365 .delivered
2366 .saturating_add(settings.buffer_capacity as u64);
2367 if inner.cumulative_demand >= target {
2368 return None;
2369 }
2370 inner.cumulative_demand = target;
2371 inner
2372 .producer
2373 .as_ref()
2374 .map(|producer| (producer.clone(), inner.cumulative_demand))
2375}
2376
2377#[allow(dead_code)]
2378fn send_demand<T>(
2379 consumer_shared: &Arc<ConsumerShared<T>>,
2380 consumer: &Option<ActorRef<ConsumerCommand<T>>>,
2381 #[cfg(feature = "cluster")] consumer_endpoint: &Option<RemoteStreamRefEndpoint>,
2382 producer: &ProducerEndpoint<T>,
2383 cumulative: u64,
2384) -> StreamResult<()>
2385where
2386 T: stream_ref_actor_bound::Bound + Send + 'static,
2387{
2388 match producer {
2389 ProducerEndpoint::Actor(producer) => {
2390 let Some(consumer) = consumer else {
2391 return Err(StreamError::ActorTerminated);
2392 };
2393 cast_producer_demand(
2394 producer,
2395 consumer.clone(),
2396 #[cfg(feature = "cluster")]
2397 consumer_endpoint.as_ref(),
2398 cumulative,
2399 )
2400 }
2401 ProducerEndpoint::Direct(producer) => {
2402 let Some(producer) = producer.upgrade() else {
2403 return Err(StreamError::ActorTerminated);
2404 };
2405 producer.update_direct_demand(consumer_shared, cumulative);
2406 Ok(())
2407 }
2408 }
2409}
2410
2411#[allow(dead_code)]
2412fn demand_replenish_threshold(settings: StreamRefSettings) -> u64 {
2413 (settings.buffer_capacity as u64) / 2
2414}
2415
2416#[cfg(feature = "cluster")]
2417fn materialize_remote_source_ref<T>(
2418 producer_endpoint: RemoteStreamRefEndpoint,
2419 settings: StreamRefSettings,
2420 keep_alive: Arc<SourceRefInner<T>>,
2421) -> StreamResult<BoxStream<T>>
2422where
2423 T: BytesConvertable + Send + 'static,
2424{
2425 let producer = resolve_remote_actor(&producer_endpoint, settings.subscription_timeout)?;
2426 let shared = Arc::new(ConsumerShared::new(settings));
2427 let consumer_endpoint = RemoteStreamRefEndpoint::new("consumer");
2428 let (consumer, _handle) = spawn_consumer_actor(
2429 None,
2430 Arc::clone(&shared),
2431 settings,
2432 Some(consumer_endpoint.clone()),
2433 )?;
2434 join_remote_endpoint(&consumer_endpoint, &consumer);
2435 cast_producer_subscribe(&producer, consumer.clone(), Some(&consumer_endpoint))?;
2436 Ok(Box::new(ConsumerStream {
2437 shared,
2438 actor_ref: Some(consumer),
2439 endpoint: Some(consumer_endpoint),
2440 settings,
2441 terminated: false,
2442 source_ref_keep_alive: Some(keep_alive),
2443 }) as BoxStream<T>)
2444}
2445
2446#[cfg(feature = "cluster")]
2447fn materialize_remote_sink_ref<T>(
2448 consumer_endpoint: RemoteStreamRefEndpoint,
2449 settings: StreamRefSettings,
2450 input: BoxStream<T>,
2451) -> StreamResult<StreamCompletion<NotUsed>>
2452where
2453 T: BytesConvertable + Send + 'static,
2454{
2455 let consumer = resolve_remote_actor(&consumer_endpoint, settings.subscription_timeout)?;
2456 let shared = Arc::new(ProducerShared::new());
2457 let producer_endpoint = RemoteStreamRefEndpoint::new("producer");
2458 let (producer, _handle) = spawn_producer_actor(
2459 None,
2460 Arc::clone(&shared),
2461 settings,
2462 Some(producer_endpoint.clone()),
2463 )?;
2464 join_remote_endpoint(&producer_endpoint, &producer);
2465 register_producer_consumer(
2466 &shared,
2467 producer.clone(),
2468 consumer,
2469 Some(&producer_endpoint),
2470 );
2471 let cancelled = StreamCancellation::for_external_completion();
2472 let cancel_flag = cancelled.cancelled();
2473 let guard_cancel_flag = Arc::clone(&cancel_flag);
2474 let (sender, receiver) = oneshot::channel();
2475 let guard_producer = producer.clone();
2476 let endpoint_handle = ProducerEndpointHandle::Actor {
2477 actor: producer,
2478 shared: Arc::clone(&shared),
2479 endpoint: Some(producer_endpoint),
2480 };
2481 let guard = ProducerEndpointDropGuard::new(guard_producer, Some(guard_cancel_flag));
2482 std::thread::spawn(move || {
2483 let _guard = guard;
2484 let result = run_producer_endpoint(input, shared, endpoint_handle, settings, cancel_flag);
2485 let _ = sender.send(result);
2486 });
2487 Ok(StreamCompletion::from_receiver(receiver, Some(cancelled)))
2488}
2489
2490fn stream_ref_source_sink<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
2491where
2492 T: Send + 'static,
2493{
2494 Sink::from_runner(move |input, materializer| {
2495 let direct = Arc::new(SourceRefDirectState::new(input));
2496 let direct_for_timeout = Arc::clone(&direct);
2497 let timeout = materializer.schedule_once(settings.subscription_timeout, move || {
2498 direct_for_timeout.timeout_if_unsubscribed();
2499 });
2500 Ok(SourceRef {
2501 inner: Arc::new(SourceRefInner {
2502 producer: LazyProducerStatus::new(Arc::clone(&direct)),
2503 direct,
2504 settings,
2505 subscribed: AtomicBool::new(false),
2506 timeout: Some(timeout),
2507 #[cfg(feature = "cluster")]
2508 remote: None,
2509 }),
2510 })
2511 })
2512}
2513
2514fn stream_ref_sink_source<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
2515where
2516 T: Send + 'static,
2517{
2518 Source::from_materialized_factory(move |_materializer| {
2519 let direct = Arc::new(SinkRefDirectState::new());
2520 let sink_ref = SinkRef {
2521 inner: Arc::new(SinkRefInner {
2522 direct: Arc::clone(&direct),
2523 #[cfg(feature = "cluster")]
2524 settings,
2525 subscribed: AtomicBool::new(false),
2526 #[cfg(feature = "cluster")]
2527 remote: None,
2528 }),
2529 };
2530 Ok((
2531 Box::new(SinkRefDirectStream::new(direct, settings)) as BoxStream<T>,
2532 sink_ref,
2533 ))
2534 })
2535}
2536
2537#[allow(dead_code)]
2538fn send_element_to_consumer<T>(
2539 consumer: &ConsumerEndpoint<T>,
2540 producer: &ProducerEndpointHandle<T>,
2541 seq: u64,
2542 item: T,
2543) -> StreamResult<()>
2544where
2545 T: stream_ref_actor_bound::Bound + Send + 'static,
2546{
2547 match consumer {
2548 ConsumerEndpoint::Actor(consumer) => {
2549 let Some(producer_actor) = producer.actor_ref() else {
2550 return Err(StreamError::ActorTerminated);
2551 };
2552 #[cfg(feature = "cluster")]
2553 if is_remote_actor(consumer) {
2554 let Some(producer_endpoint) = producer.endpoint() else {
2555 return Err(StreamError::ActorTerminated);
2556 };
2557 return cast_actor(
2558 consumer,
2559 ConsumerCommand::ElementRemote {
2560 producer: producer_endpoint,
2561 seq,
2562 item,
2563 },
2564 );
2565 }
2566 cast_actor(
2567 consumer,
2568 ConsumerCommand::Element {
2569 producer: producer_actor,
2570 seq,
2571 item,
2572 },
2573 )
2574 }
2575 ConsumerEndpoint::Direct(consumer) => {
2576 let Some(consumer) = consumer.upgrade() else {
2577 return Err(StreamError::Cancelled);
2578 };
2579 let Some(producer) = producer.direct_shared() else {
2580 return Err(StreamError::ActorTerminated);
2581 };
2582 consumer.push_direct(&producer, seq, item);
2583 Ok(())
2584 }
2585 }
2586}
2587
2588#[allow(dead_code)]
2589fn fail_consumer<T>(
2590 consumer: &ConsumerEndpoint<T>,
2591 producer: &ProducerEndpointHandle<T>,
2592 error: StreamError,
2593) -> StreamResult<bool>
2594where
2595 T: stream_ref_actor_bound::Bound + Send + 'static,
2596{
2597 match consumer {
2598 ConsumerEndpoint::Actor(consumer) => {
2599 let Some(producer_actor) = producer.actor_ref() else {
2600 return Err(StreamError::ActorTerminated);
2601 };
2602 #[cfg(feature = "cluster")]
2603 if is_remote_actor(consumer) {
2604 let Some(producer_endpoint) = producer.endpoint() else {
2605 return Err(StreamError::ActorTerminated);
2606 };
2607 cast_actor(
2608 consumer,
2609 ConsumerCommand::FailureRemote {
2610 producer: producer_endpoint,
2611 error,
2612 },
2613 )?;
2614 return Ok(false);
2615 }
2616 cast_actor(
2617 consumer,
2618 ConsumerCommand::Failure {
2619 producer: producer_actor,
2620 error,
2621 },
2622 )?;
2623 Ok(false)
2624 }
2625 ConsumerEndpoint::Direct(consumer) => {
2626 let Some(consumer) = consumer.upgrade() else {
2627 return Err(StreamError::Cancelled);
2628 };
2629 let Some(producer) = producer.direct_shared() else {
2630 return Err(StreamError::ActorTerminated);
2631 };
2632 consumer.fail_direct(&producer, error);
2633 Ok(true)
2634 }
2635 }
2636}
2637
2638#[allow(dead_code)]
2639fn complete_consumer<T>(
2640 consumer: &ConsumerEndpoint<T>,
2641 producer: &ProducerEndpointHandle<T>,
2642 seq: u64,
2643) -> StreamResult<bool>
2644where
2645 T: stream_ref_actor_bound::Bound + Send + 'static,
2646{
2647 match consumer {
2648 ConsumerEndpoint::Actor(consumer) => {
2649 let Some(producer_actor) = producer.actor_ref() else {
2650 return Err(StreamError::ActorTerminated);
2651 };
2652 #[cfg(feature = "cluster")]
2653 if is_remote_actor(consumer) {
2654 let Some(producer_endpoint) = producer.endpoint() else {
2655 return Err(StreamError::ActorTerminated);
2656 };
2657 cast_actor(
2658 consumer,
2659 ConsumerCommand::CompleteRemote {
2660 producer: producer_endpoint,
2661 seq,
2662 },
2663 )?;
2664 return Ok(false);
2665 }
2666 cast_actor(
2667 consumer,
2668 ConsumerCommand::Complete {
2669 producer: producer_actor,
2670 seq,
2671 },
2672 )?;
2673 Ok(false)
2674 }
2675 ConsumerEndpoint::Direct(consumer) => {
2676 let Some(consumer) = consumer.upgrade() else {
2677 return Err(StreamError::Cancelled);
2678 };
2679 let Some(producer) = producer.direct_shared() else {
2680 return Err(StreamError::ActorTerminated);
2681 };
2682 consumer.complete_direct(&producer, seq);
2683 Ok(true)
2684 }
2685 }
2686}
2687
2688#[allow(dead_code)]
2689fn run_producer_endpoint<T>(
2690 mut input: BoxStream<T>,
2691 shared: Arc<ProducerShared<T>>,
2692 producer_endpoint: ProducerEndpointHandle<T>,
2693 settings: StreamRefSettings,
2694 cancelled: Arc<AtomicBool>,
2695) -> StreamResult<NotUsed>
2696where
2697 T: stream_ref_actor_bound::Bound + Send + 'static,
2698{
2699 let deadline = Instant::now()
2700 .checked_add(settings.subscription_timeout)
2701 .unwrap_or_else(far_future);
2702
2703 loop {
2704 let consumer = match wait_for_remote_demand(&shared, deadline, &cancelled) {
2705 Ok(consumer) => consumer,
2706 Err(error) => {
2707 producer_endpoint.stop_actor();
2708 return Err(error);
2709 }
2710 };
2711 if cancelled.load(Ordering::SeqCst) {
2712 return Err(StreamError::Cancelled);
2713 }
2714
2715 match input.next() {
2716 Some(Ok(item)) => {
2717 let seq = {
2718 let mut inner = shared.lock();
2719 let seq = inner.sent;
2720 inner.sent = inner.sent.saturating_add(1);
2721 seq
2722 };
2723 if let Err(error) =
2724 send_element_to_consumer(&consumer, &producer_endpoint, seq, item)
2725 {
2726 producer_endpoint.stop_actor();
2727 return Err(match error {
2728 StreamError::ActorTerminated => StreamError::Cancelled,
2729 other => other,
2730 });
2731 }
2732 }
2733 Some(Err(error)) => {
2734 if fail_consumer(&consumer, &producer_endpoint, error.clone()).unwrap_or(false) {
2735 producer_endpoint.stop_actor();
2736 }
2737 return Err(error);
2738 }
2739 None => {
2740 let seq = shared.lock().sent;
2741 if complete_consumer(&consumer, &producer_endpoint, seq).unwrap_or(false) {
2742 producer_endpoint.stop_actor();
2743 }
2744 return Ok(NotUsed);
2745 }
2746 }
2747 }
2748}
2749
2750#[allow(dead_code)]
2751fn wait_for_remote_demand<T>(
2752 shared: &Arc<ProducerShared<T>>,
2753 deadline: Instant,
2754 cancelled: &Arc<AtomicBool>,
2755) -> StreamResult<ConsumerEndpoint<T>>
2756where
2757 T: stream_ref_actor_bound::Bound + Send + 'static,
2758{
2759 let mut inner = shared.lock();
2760 loop {
2761 if cancelled.load(Ordering::SeqCst) {
2762 return Err(StreamError::Cancelled);
2763 }
2764 if let Some(error) = inner.stopped.clone() {
2765 return Err(error);
2766 }
2767 if let Some(consumer) = &inner.consumer
2768 && inner.sent < inner.cumulative_demand
2769 {
2770 return Ok(consumer.clone());
2771 }
2772 let now = Instant::now();
2773 if inner.consumer.is_none() && now >= deadline {
2774 return Err(subscription_timeout_error("stream ref sink"));
2775 }
2776 let remaining = deadline.saturating_duration_since(now);
2777 let timeout = if inner.consumer.is_none() {
2778 remaining.min(STREAM_REF_WAIT_POLL)
2779 } else {
2780 STREAM_REF_WAIT_POLL
2781 };
2782 let (next, _) = wait_timeout_unpoison(&shared.changed, inner, timeout);
2783 inner = next;
2784 }
2785}
2786
2787#[allow(dead_code)]
2788fn spawn_producer_actor<T>(
2789 initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
2790 shared: Arc<ProducerShared<T>>,
2791 settings: StreamRefSettings,
2792 #[cfg(feature = "cluster")] endpoint: Option<RemoteStreamRefEndpoint>,
2793) -> StreamResult<(
2794 ActorRef<ProducerCommand<T>>,
2795 ractor::concurrency::JoinHandle<()>,
2796)>
2797where
2798 T: stream_ref_actor_bound::Bound + Send + 'static,
2799{
2800 block_on_ractor_runtime(Actor::spawn(
2801 None,
2802 ProducerActor {
2803 shared,
2804 initial_consumer,
2805 settings,
2806 #[cfg(feature = "cluster")]
2807 endpoint,
2808 },
2809 (),
2810 ))?
2811 .map_err(|error| {
2812 StreamError::Failed(format!(
2813 "stream ref producer actor failed to spawn: {error}"
2814 ))
2815 })
2816}
2817
2818#[allow(dead_code)]
2819fn spawn_consumer_actor<T>(
2820 initial_producer: Option<ActorRef<ProducerCommand<T>>>,
2821 shared: Arc<ConsumerShared<T>>,
2822 settings: StreamRefSettings,
2823 #[cfg(feature = "cluster")] endpoint: Option<RemoteStreamRefEndpoint>,
2824) -> StreamResult<(
2825 ActorRef<ConsumerCommand<T>>,
2826 ractor::concurrency::JoinHandle<()>,
2827)>
2828where
2829 T: stream_ref_actor_bound::Bound + Send + 'static,
2830{
2831 block_on_ractor_runtime(Actor::spawn(
2832 None,
2833 ConsumerActor {
2834 shared,
2835 initial_producer,
2836 settings,
2837 #[cfg(feature = "cluster")]
2838 endpoint,
2839 },
2840 (),
2841 ))?
2842 .map_err(|error| {
2843 StreamError::Failed(format!(
2844 "stream ref consumer actor failed to spawn: {error}"
2845 ))
2846 })
2847}
2848
2849#[allow(dead_code)]
2850fn cast_producer_subscribe<T>(
2851 producer: &ActorRef<ProducerCommand<T>>,
2852 consumer: ActorRef<ConsumerCommand<T>>,
2853 #[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
2854) -> StreamResult<()>
2855where
2856 T: stream_ref_actor_bound::Bound + Send + 'static,
2857{
2858 #[cfg(feature = "cluster")]
2859 if is_remote_actor(producer) {
2860 let Some(consumer_endpoint) = consumer_endpoint else {
2861 return Err(StreamError::ActorTerminated);
2862 };
2863 return cast_actor(
2864 producer,
2865 ProducerCommand::SubscribeRemote {
2866 consumer: consumer_endpoint.clone(),
2867 },
2868 );
2869 }
2870
2871 cast_actor(producer, ProducerCommand::Subscribe { consumer })
2872}
2873
2874#[allow(dead_code)]
2875fn cast_producer_demand<T>(
2876 producer: &ActorRef<ProducerCommand<T>>,
2877 consumer: ActorRef<ConsumerCommand<T>>,
2878 #[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
2879 cumulative: u64,
2880) -> StreamResult<()>
2881where
2882 T: stream_ref_actor_bound::Bound + Send + 'static,
2883{
2884 #[cfg(feature = "cluster")]
2885 if is_remote_actor(producer) {
2886 let Some(consumer_endpoint) = consumer_endpoint else {
2887 return Err(StreamError::ActorTerminated);
2888 };
2889 return cast_actor(
2890 producer,
2891 ProducerCommand::DemandRemote {
2892 consumer: consumer_endpoint.clone(),
2893 cumulative,
2894 },
2895 );
2896 }
2897
2898 cast_actor(
2899 producer,
2900 ProducerCommand::Demand {
2901 consumer,
2902 cumulative,
2903 },
2904 )
2905}
2906
2907#[allow(dead_code)]
2908fn cast_producer_cancel<T>(
2909 producer: &ActorRef<ProducerCommand<T>>,
2910 consumer: ActorRef<ConsumerCommand<T>>,
2911 #[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
2912) -> StreamResult<()>
2913where
2914 T: stream_ref_actor_bound::Bound + Send + 'static,
2915{
2916 #[cfg(feature = "cluster")]
2917 if is_remote_actor(producer) {
2918 let Some(consumer_endpoint) = consumer_endpoint else {
2919 return Err(StreamError::ActorTerminated);
2920 };
2921 return cast_actor(
2922 producer,
2923 ProducerCommand::CancelRemote {
2924 consumer: consumer_endpoint.clone(),
2925 },
2926 );
2927 }
2928
2929 cast_actor(producer, ProducerCommand::Cancel { consumer })
2930}
2931
2932#[allow(dead_code)]
2933fn cast_producer_remote_failure<T>(
2934 producer: &ActorRef<ProducerCommand<T>>,
2935 consumer: ActorRef<ConsumerCommand<T>>,
2936 #[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
2937 error: StreamError,
2938) -> StreamResult<()>
2939where
2940 T: stream_ref_actor_bound::Bound + Send + 'static,
2941{
2942 #[cfg(feature = "cluster")]
2943 if is_remote_actor(producer) {
2944 let Some(consumer_endpoint) = consumer_endpoint else {
2945 return Err(StreamError::ActorTerminated);
2946 };
2947 return cast_actor(
2948 producer,
2949 ProducerCommand::RemoteFailureRemote {
2950 consumer: consumer_endpoint.clone(),
2951 error,
2952 },
2953 );
2954 }
2955
2956 cast_actor(producer, ProducerCommand::RemoteFailure { consumer, error })
2957}
2958
2959#[allow(dead_code)]
2960fn cast_consumer_on_subscribe<T>(
2961 consumer: &ActorRef<ConsumerCommand<T>>,
2962 producer: ActorRef<ProducerCommand<T>>,
2963 #[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
2964) -> StreamResult<()>
2965where
2966 T: stream_ref_actor_bound::Bound + Send + 'static,
2967{
2968 #[cfg(feature = "cluster")]
2969 if is_remote_actor(consumer) {
2970 let Some(producer_endpoint) = producer_endpoint else {
2971 return Err(StreamError::ActorTerminated);
2972 };
2973 return cast_actor(
2974 consumer,
2975 ConsumerCommand::OnSubscribeRemote {
2976 producer: producer_endpoint.clone(),
2977 },
2978 );
2979 }
2980
2981 cast_actor(consumer, ConsumerCommand::OnSubscribe { producer })
2982}
2983
2984#[allow(dead_code)]
2985fn cast_consumer_failure<T>(
2986 consumer: &ActorRef<ConsumerCommand<T>>,
2987 producer: ActorRef<ProducerCommand<T>>,
2988 #[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
2989 error: StreamError,
2990) -> StreamResult<()>
2991where
2992 T: stream_ref_actor_bound::Bound + Send + 'static,
2993{
2994 #[cfg(feature = "cluster")]
2995 if is_remote_actor(consumer) {
2996 let Some(producer_endpoint) = producer_endpoint else {
2997 return Err(StreamError::ActorTerminated);
2998 };
2999 return cast_actor(
3000 consumer,
3001 ConsumerCommand::FailureRemote {
3002 producer: producer_endpoint.clone(),
3003 error,
3004 },
3005 );
3006 }
3007
3008 cast_actor(consumer, ConsumerCommand::Failure { producer, error })
3009}
3010
3011#[allow(dead_code)]
3012fn cast_actor<Msg>(actor_ref: &ActorRef<Msg>, message: Msg) -> StreamResult<()>
3013where
3014 Msg: Message,
3015{
3016 match actor_ref.cast(message) {
3017 Ok(()) => Ok(()),
3018 Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
3019 Err(StreamError::ActorTerminated)
3020 }
3021 Err(error) => Err(StreamError::ActorAskSendFailed {
3022 reason: error.to_string(),
3023 }),
3024 }
3025}
3026
3027#[cfg(feature = "cluster")]
3028fn is_remote_actor<Msg>(actor_ref: &ActorRef<Msg>) -> bool {
3029 !actor_ref.get_id().is_local()
3030}
3031
3032#[cfg(feature = "cluster")]
3033fn join_remote_endpoint<Msg>(endpoint: &RemoteStreamRefEndpoint, actor_ref: &ActorRef<Msg>) {
3034 ractor::pg::join_scoped(
3035 endpoint.scope.clone(),
3036 endpoint.group.clone(),
3037 vec![actor_ref.get_cell()],
3038 );
3039}
3040
3041#[cfg(feature = "cluster")]
3042fn resolve_remote_actor<Msg>(
3043 endpoint: &RemoteStreamRefEndpoint,
3044 timeout: Duration,
3045) -> StreamResult<ActorRef<Msg>>
3046where
3047 Msg: Message,
3048{
3049 let deadline = Instant::now()
3050 .checked_add(timeout)
3051 .unwrap_or_else(far_future);
3052 loop {
3053 if let Some(actor) = resolve_remote_actor_once(endpoint) {
3054 return Ok(actor);
3055 }
3056 if Instant::now() >= deadline {
3057 return Err(subscription_timeout_error("stream ref endpoint"));
3058 }
3059 std::thread::park_timeout(STREAM_REF_WAIT_POLL);
3060 }
3061}
3062
3063#[cfg(feature = "cluster")]
3064fn resolve_remote_actor_once<Msg>(endpoint: &RemoteStreamRefEndpoint) -> Option<ActorRef<Msg>>
3065where
3066 Msg: Message,
3067{
3068 ractor::pg::get_scoped_members(&endpoint.scope, &endpoint.group)
3069 .into_iter()
3070 .next()
3071 .map(ActorCell::into)
3072}
3073
3074#[allow(dead_code)]
3075fn same_actor<MsgA, MsgB>(left: &ActorRef<MsgA>, right: &ActorRef<MsgB>) -> bool
3076where
3077 MsgA: Message,
3078 MsgB: Message,
3079{
3080 left.get_cell().get_id() == right.get_cell().get_id()
3081}
3082
3083fn failed_once<T>(reason: &str) -> BoxStream<T>
3084where
3085 T: stream_ref_actor_bound::Bound + Send + 'static,
3086{
3087 failed_stream(StreamError::Failed(reason.to_owned()))
3088}
3089
3090fn failed_stream<T>(error: StreamError) -> BoxStream<T>
3091where
3092 T: Send + 'static,
3093{
3094 Box::new(std::iter::once(Err(error)))
3095}
3096
3097fn subscription_timeout_error(side: &str) -> StreamError {
3098 StreamError::Failed(format!(
3099 "{side} remote side did not subscribe within subscription timeout"
3100 ))
3101}
3102
3103#[allow(dead_code)]
3104fn invalid_sequence_error(expected: u64, got: u64, context: &str) -> StreamError {
3105 StreamError::Failed(format!(
3106 "{context}: expected sequence {expected}, got {got}"
3107 ))
3108}
3109
3110#[allow(dead_code)]
3111fn next_redelivery_deadline(settings: StreamRefSettings) -> Instant {
3112 Instant::now()
3113 .checked_add(settings.demand_redelivery_interval)
3114 .unwrap_or_else(far_future)
3115}
3116
3117fn far_future() -> Instant {
3118 Instant::now() + Duration::from_secs(60 * 60 * 24 * 365)
3119}
3120
3121fn wait_timeout_unpoison<'a, T>(
3122 condvar: &Condvar,
3123 guard: MutexGuard<'a, T>,
3124 timeout: Duration,
3125) -> (MutexGuard<'a, T>, std::sync::WaitTimeoutResult) {
3126 condvar
3127 .wait_timeout(guard, timeout)
3128 .unwrap_or_else(|poison| poison.into_inner())
3129}
3130
3131#[cfg(feature = "cluster")]
3132fn put_u8(bytes: &mut Vec<u8>, value: u8) {
3133 bytes.push(value);
3134}
3135
3136#[cfg(feature = "cluster")]
3137fn take_u8(bytes: &[u8], cursor: &mut usize) -> u8 {
3138 let value = bytes.get(*cursor).copied().unwrap_or_default();
3139 *cursor = (*cursor).saturating_add(1);
3140 value
3141}
3142
3143#[cfg(feature = "cluster")]
3144fn put_u32(bytes: &mut Vec<u8>, value: u32) {
3145 bytes.extend(value.to_be_bytes());
3146}
3147
3148#[cfg(feature = "cluster")]
3149fn take_u32(bytes: &[u8], cursor: &mut usize) -> u32 {
3150 let mut value = [0_u8; 4];
3151 if let Some(slice) = take_exact(bytes, cursor, value.len()) {
3152 value.copy_from_slice(slice);
3153 }
3154 u32::from_be_bytes(value)
3155}
3156
3157#[cfg(feature = "cluster")]
3158fn put_u64(bytes: &mut Vec<u8>, value: u64) {
3159 bytes.extend(value.to_be_bytes());
3160}
3161
3162#[cfg(feature = "cluster")]
3163fn take_u64(bytes: &[u8], cursor: &mut usize) -> u64 {
3164 let mut value = [0_u8; 8];
3165 if let Some(slice) = take_exact(bytes, cursor, value.len()) {
3166 value.copy_from_slice(slice);
3167 }
3168 u64::from_be_bytes(value)
3169}
3170
3171#[cfg(feature = "cluster")]
3172fn put_bytes(bytes: &mut Vec<u8>, value: Vec<u8>) {
3173 put_u64(bytes, value.len() as u64);
3174 bytes.extend(value);
3175}
3176
3177#[cfg(feature = "cluster")]
3178fn remote_cast(variant: &str, fields: Vec<Vec<u8>>) -> ractor::message::SerializedMessage {
3179 let mut args = Vec::new();
3180 for field in fields {
3181 put_bytes(&mut args, field);
3182 }
3183 ractor::message::SerializedMessage::Cast {
3184 variant: variant.to_owned(),
3185 args,
3186 metadata: None,
3187 }
3188}
3189
3190#[cfg(feature = "cluster")]
3191fn take_bytes(bytes: &[u8], cursor: &mut usize) -> Vec<u8> {
3192 let len = take_u64(bytes, cursor) as usize;
3193 take_exact(bytes, cursor, len).unwrap_or_default().to_vec()
3194}
3195
3196#[cfg(feature = "cluster")]
3197fn put_string(bytes: &mut Vec<u8>, value: String) {
3198 put_bytes(bytes, value.into_bytes());
3199}
3200
3201#[cfg(feature = "cluster")]
3202fn take_string(bytes: &[u8], cursor: &mut usize) -> String {
3203 String::from_utf8(take_bytes(bytes, cursor)).unwrap_or_default()
3204}
3205
3206#[cfg(feature = "cluster")]
3207fn put_duration(bytes: &mut Vec<u8>, value: Duration) {
3208 put_u64(bytes, value.as_secs());
3209 put_u32(bytes, value.subsec_nanos());
3210}
3211
3212#[cfg(feature = "cluster")]
3213fn take_duration(bytes: &[u8], cursor: &mut usize) -> Duration {
3214 Duration::new(take_u64(bytes, cursor), take_u32(bytes, cursor))
3215}
3216
3217#[cfg(feature = "cluster")]
3218fn take_exact<'a>(bytes: &'a [u8], cursor: &mut usize, len: usize) -> Option<&'a [u8]> {
3219 let end = cursor.checked_add(len)?;
3220 let value = bytes.get(*cursor..end)?;
3221 *cursor = end;
3222 Some(value)
3223}
3224
3225#[cfg(feature = "cluster")]
3226fn remote_port_operation(value: String) -> &'static str {
3227 match value.as_str() {
3228 "abort_emitting" => "abort_emitting",
3229 "abort_reading" => "abort_reading",
3230 "complete" => "complete",
3231 "fail" => "fail",
3232 "grab" => "grab",
3233 "offer" => "offer",
3234 "pull" => "pull",
3235 "push" => "push",
3236 "read_n" => "read_n",
3237 "request" => "request",
3238 "set_handler" => "set_handler",
3239 "set_out_handler" => "set_out_handler",
3240 _ => "remote",
3241 }
3242}
3243
3244#[cfg(test)]
3245mod tests {
3246 use super::*;
3247 use crate::{
3248 stream::{Keep, Source},
3249 testkit::TestSink,
3250 };
3251 use std::sync::{
3252 Arc as StdArc,
3253 atomic::{AtomicBool, AtomicUsize, Ordering},
3254 };
3255
3256 fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
3257 let deadline = Instant::now() + timeout;
3258 while Instant::now() < deadline {
3259 if condition() {
3260 return true;
3261 }
3262 std::thread::park_timeout(Duration::from_millis(1));
3263 }
3264 condition()
3265 }
3266
3267 fn assert_condition_holds(timeout: Duration, mut condition: impl FnMut() -> bool) {
3268 let deadline = Instant::now() + timeout;
3269 while Instant::now() < deadline {
3270 assert!(condition());
3271 std::thread::park_timeout(Duration::from_millis(1));
3272 }
3273 assert!(condition());
3274 }
3275
3276 fn short_settings() -> StreamRefSettings {
3277 StreamRefSettings::default()
3278 .with_buffer_capacity(1)
3279 .with_subscription_timeout(Duration::from_millis(50))
3280 .with_demand_redelivery_interval(Duration::from_millis(10))
3281 }
3282
3283 #[test]
3284 fn source_ref_streams_elements_and_completion() {
3285 let source_ref = Source::from_iter(1_u64..=3)
3286 .run_with(StreamRefs::source_ref())
3287 .unwrap();
3288
3289 assert_eq!(source_ref.source().run_collect().unwrap(), vec![1, 2, 3]);
3290 }
3291
3292 #[test]
3293 fn sink_ref_streams_elements_and_completion() {
3294 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
3295 .to_mat(Sink::collect(), Keep::both)
3296 .run()
3297 .unwrap();
3298
3299 Source::from_iter(1_u64..=3)
3300 .run_with(sink_ref.sink())
3301 .unwrap()
3302 .wait()
3303 .unwrap();
3304
3305 assert_eq!(completion.wait().unwrap(), vec![1, 2, 3]);
3306 }
3307
3308 #[test]
3309 fn source_ref_propagates_upstream_failure() {
3310 let source_ref = Source::<u64>::failed(StreamError::Failed("boom".to_owned()))
3311 .run_with(StreamRefs::source_ref())
3312 .unwrap();
3313
3314 assert_eq!(
3315 source_ref.source().run_collect(),
3316 Err(StreamError::Failed("boom".to_owned()))
3317 );
3318 }
3319
3320 #[test]
3321 fn sink_ref_propagates_upstream_failure() {
3322 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
3323 .to_mat(Sink::collect(), Keep::both)
3324 .run()
3325 .unwrap();
3326
3327 let failure = Source::<u64>::failed(StreamError::Failed("remote boom".to_owned()))
3328 .run_with(sink_ref.sink())
3329 .unwrap()
3330 .wait();
3331
3332 assert_eq!(failure, Err(StreamError::Failed("remote boom".to_owned())));
3333 assert_eq!(
3334 completion.wait(),
3335 Err(StreamError::Failed("remote boom".to_owned()))
3336 );
3337 }
3338
3339 #[test]
3340 fn source_ref_cancellation_reaches_origin() {
3341 let closed = StdArc::new(AtomicBool::new(false));
3342 let close_flag = StdArc::clone(&closed);
3343 let source = Source::unfold_resource(
3344 || Ok(()),
3345 |_state| Ok(Some(1_u64)),
3346 move |_state| {
3347 close_flag.store(true, Ordering::SeqCst);
3348 Ok(())
3349 },
3350 );
3351 let source_ref = source.run_with(StreamRefs::source_ref()).unwrap();
3352
3353 assert_eq!(source_ref.source().take(1).run_collect().unwrap(), vec![1]);
3354 assert!(wait_until(Duration::from_secs(1), || {
3355 closed.load(Ordering::SeqCst)
3356 }));
3357 }
3358
3359 #[test]
3360 fn sink_ref_cancellation_stops_remote_producer() {
3361 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
3362 .take(1)
3363 .to_mat(Sink::collect(), Keep::both)
3364 .run()
3365 .unwrap();
3366
3367 let producer = Source::repeat(1_u64)
3368 .run_with(sink_ref.sink())
3369 .unwrap()
3370 .wait();
3371
3372 assert_eq!(completion.wait().unwrap(), vec![1]);
3373 assert_eq!(producer, Err(StreamError::Cancelled));
3374 }
3375
3376 #[test]
3377 fn source_ref_backpressures_across_ref() {
3378 let pulled = StdArc::new(AtomicUsize::new(0));
3379 let pulled_for_source = StdArc::clone(&pulled);
3380 let source = Source::unfold(0_u64, move |next| {
3381 pulled_for_source.fetch_add(1, Ordering::SeqCst);
3382 Some((next + 1, next))
3383 });
3384 let source_ref = source
3385 .run_with(StreamRefs::source_ref_with_settings(short_settings()))
3386 .unwrap();
3387 let mut probe = source_ref.source().run_with(TestSink::probe()).unwrap();
3388
3389 probe.request(1);
3390 probe.assert_next(0);
3391 assert!(wait_until(Duration::from_secs(1), || {
3392 pulled.load(Ordering::SeqCst) >= 2
3393 }));
3394 assert_condition_holds(Duration::from_millis(50), || {
3395 pulled.load(Ordering::SeqCst) <= 2
3396 });
3397
3398 probe.request(1);
3399 probe.assert_next(1);
3400 assert!(wait_until(Duration::from_secs(1), || {
3401 pulled.load(Ordering::SeqCst) >= 3
3402 }));
3403 assert_condition_holds(Duration::from_millis(50), || {
3404 pulled.load(Ordering::SeqCst) <= 3
3405 });
3406
3407 probe.cancel();
3408 }
3409
3410 #[test]
3411 fn source_ref_late_subscription_observes_timeout() {
3412 let source_ref = Source::repeat(1_u64)
3413 .run_with(StreamRefs::source_ref_with_settings(short_settings()))
3414 .unwrap();
3415
3416 assert!(wait_until(Duration::from_secs(1), || {
3417 source_ref.inner.producer.get_status() == ractor::ActorStatus::Stopped
3418 }));
3419
3420 let result = source_ref.source().run_collect();
3421 assert!(matches!(
3422 result,
3423 Err(StreamError::ActorTerminated) | Err(StreamError::Failed(_))
3424 ));
3425 }
3426
3427 #[test]
3428 fn sink_ref_subscription_timeout_fails_local_source() {
3429 let (_sink_ref, probe) = StreamRefs::sink_ref_with_settings::<u64>(short_settings())
3430 .to_mat(TestSink::probe(), Keep::both)
3431 .run()
3432 .unwrap();
3433
3434 probe.request(1);
3435 let error = probe.expect_error();
3436 assert!(
3437 matches!(error, StreamError::Failed(message) if message.contains("did not subscribe"))
3438 );
3439 }
3440
3441 #[test]
3442 fn stream_refs_are_one_shot() {
3443 let source_ref = Source::from_iter([1_u64])
3444 .run_with(StreamRefs::source_ref())
3445 .unwrap();
3446 assert_eq!(source_ref.source().run_collect().unwrap(), vec![1]);
3447 assert!(matches!(
3448 source_ref.source().run_collect(),
3449 Err(StreamError::Failed(message)) if message.contains("already")
3450 ));
3451
3452 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
3453 .to_mat(Sink::collect(), Keep::both)
3454 .run()
3455 .unwrap();
3456 Source::single(1_u64)
3457 .run_with(sink_ref.sink())
3458 .unwrap()
3459 .wait()
3460 .unwrap();
3461 assert!(matches!(
3462 Source::single(2_u64).run_with(sink_ref.sink()).unwrap().wait(),
3463 Err(StreamError::Failed(message)) if message.contains("already")
3464 ));
3465 assert_eq!(completion.wait().unwrap(), vec![1]);
3466 }
3467
3468 #[cfg(feature = "cluster")]
3469 fn serialize_source_ref<T>(source_ref: SourceRef<T>) -> SourceRef<T>
3470 where
3471 T: BytesConvertable + Send + 'static,
3472 {
3473 SourceRef::from_bytes(source_ref.into_bytes())
3474 }
3475
3476 #[cfg(feature = "cluster")]
3477 fn serialize_sink_ref<T>(sink_ref: SinkRef<T>) -> SinkRef<T>
3478 where
3479 T: BytesConvertable + Send + 'static,
3480 {
3481 SinkRef::from_bytes(sink_ref.into_bytes())
3482 }
3483
3484 #[cfg(feature = "cluster")]
3485 #[test]
3486 fn remote_source_ref_serialized_path_streams_elements_and_completion() {
3487 let source_ref = Source::from_iter(1_u64..=3)
3488 .run_with(StreamRefs::source_ref())
3489 .unwrap();
3490 let remote_ref = serialize_source_ref(source_ref);
3491
3492 assert_eq!(remote_ref.source().run_collect().unwrap(), vec![1, 2, 3]);
3493 }
3494
3495 #[cfg(feature = "cluster")]
3496 #[test]
3497 fn remote_sink_ref_serialized_path_streams_elements_and_completion() {
3498 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
3499 .to_mat(Sink::collect(), Keep::both)
3500 .run()
3501 .unwrap();
3502 let remote_ref = serialize_sink_ref(sink_ref);
3503
3504 Source::from_iter(1_u64..=3)
3505 .run_with(remote_ref.sink())
3506 .unwrap()
3507 .wait()
3508 .unwrap();
3509
3510 assert_eq!(completion.wait().unwrap(), vec![1, 2, 3]);
3511 }
3512
3513 #[cfg(feature = "cluster")]
3514 #[test]
3515 fn remote_source_ref_serialized_path_propagates_failure() {
3516 let source_ref = Source::<u64>::failed(StreamError::Failed("remote boom".to_owned()))
3517 .run_with(StreamRefs::source_ref())
3518 .unwrap();
3519 let remote_ref = serialize_source_ref(source_ref);
3520
3521 assert_eq!(
3522 remote_ref.source().run_collect(),
3523 Err(StreamError::Failed("remote boom".to_owned()))
3524 );
3525 }
3526
3527 #[cfg(feature = "cluster")]
3528 #[test]
3529 fn remote_sink_ref_serialized_path_propagates_failure() {
3530 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
3531 .to_mat(Sink::collect(), Keep::both)
3532 .run()
3533 .unwrap();
3534 let remote_ref = serialize_sink_ref(sink_ref);
3535
3536 let failure = Source::<u64>::failed(StreamError::Failed("sink boom".to_owned()))
3537 .run_with(remote_ref.sink())
3538 .unwrap()
3539 .wait();
3540
3541 assert_eq!(failure, Err(StreamError::Failed("sink boom".to_owned())));
3542 assert_eq!(
3543 completion.wait(),
3544 Err(StreamError::Failed("sink boom".to_owned()))
3545 );
3546 }
3547
3548 #[cfg(feature = "cluster")]
3549 #[test]
3550 fn remote_source_ref_serialized_path_cancellation_reaches_origin() {
3551 let closed = StdArc::new(AtomicBool::new(false));
3552 let close_flag = StdArc::clone(&closed);
3553 let source = Source::unfold_resource(
3554 || Ok(()),
3555 |_state| Ok(Some(1_u64)),
3556 move |_state| {
3557 close_flag.store(true, Ordering::SeqCst);
3558 Ok(())
3559 },
3560 );
3561 let source_ref = source.run_with(StreamRefs::source_ref()).unwrap();
3562 let remote_ref = serialize_source_ref(source_ref);
3563
3564 assert_eq!(remote_ref.source().take(1).run_collect().unwrap(), vec![1]);
3565 assert!(wait_until(Duration::from_secs(1), || {
3566 closed.load(Ordering::SeqCst)
3567 }));
3568 }
3569
3570 #[cfg(feature = "cluster")]
3571 #[test]
3572 fn remote_source_ref_serialized_path_backpressures_across_ref() {
3573 let pulled = StdArc::new(AtomicUsize::new(0));
3574 let pulled_for_source = StdArc::clone(&pulled);
3575 let source = Source::unfold(0_u64, move |next| {
3576 pulled_for_source.fetch_add(1, Ordering::SeqCst);
3577 Some((next + 1, next))
3578 });
3579 let source_ref = source
3580 .run_with(StreamRefs::source_ref_with_settings(short_settings()))
3581 .unwrap();
3582 let remote_ref = serialize_source_ref(source_ref);
3583 let mut probe = remote_ref.source().run_with(TestSink::probe()).unwrap();
3584
3585 probe.request(1);
3586 probe.assert_next(0);
3587 assert!(wait_until(Duration::from_secs(1), || {
3588 pulled.load(Ordering::SeqCst) >= 1
3589 }));
3590 assert_condition_holds(Duration::from_secs(1), || {
3591 pulled.load(Ordering::SeqCst) <= 2
3592 });
3593
3594 probe.request(1);
3595 probe.assert_next(1);
3596 assert!(wait_until(Duration::from_secs(1), || {
3597 pulled.load(Ordering::SeqCst) >= 2
3598 }));
3599 assert_condition_holds(Duration::from_secs(1), || {
3600 pulled.load(Ordering::SeqCst) <= 3
3601 });
3602
3603 probe.cancel();
3604 }
3605
3606 #[cfg(feature = "cluster")]
3607 #[test]
3608 fn remote_sink_ref_serialized_path_producer_panic_surfaces_error() {
3609 let (sink_ref, _completion) = StreamRefs::sink_ref::<u64>()
3610 .to_mat(Sink::head(), Keep::both)
3611 .run()
3612 .unwrap();
3613 let remote_sink = serialize_sink_ref(sink_ref);
3614
3615 let panicking = Source::unfold(0_u64, move |state| {
3616 if state >= 5 {
3617 panic!("intentional test panic in producer thread");
3618 }
3619 Some((state + 1, state))
3620 });
3621 let mut completion = panicking
3622 .run_with(remote_sink.sink())
3623 .expect("sink ref materializes");
3624
3625 let deadline = Instant::now() + Duration::from_secs(5);
3626 loop {
3627 if let Some(result) = completion.try_wait() {
3628 assert!(
3629 result.is_err(),
3630 "panic in producer thread should surface error to consumer, got {:?}",
3631 result
3632 );
3633 break;
3634 }
3635 assert!(
3636 Instant::now() < deadline,
3637 "consumer hung after producer panic (no result after 5s)"
3638 );
3639 std::thread::park_timeout(Duration::from_millis(10));
3640 }
3641 }
3642}