1use std::{
2 collections::VecDeque,
3 fmt,
4 sync::{
5 Arc, Condvar, Mutex, MutexGuard,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, Instant},
9};
10
11use crate::stream::{
12 BoxStream, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult,
13};
14
15use super::{Actor, ActorProcessingErr, ActorRef, ActorResult, Message, block_on_ractor_runtime};
16
17const DEFAULT_STREAM_REF_BUFFER_CAPACITY: usize = 32;
18const DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(30);
19const DEFAULT_STREAM_REF_DEMAND_REDELIVERY: Duration = Duration::from_secs(1);
20const STREAM_REF_WAIT_POLL: Duration = Duration::from_millis(1);
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub struct StreamRefSettings {
29 buffer_capacity: usize,
30 subscription_timeout: Duration,
31 demand_redelivery_interval: Duration,
32}
33
34impl Default for StreamRefSettings {
35 fn default() -> Self {
36 Self {
37 buffer_capacity: DEFAULT_STREAM_REF_BUFFER_CAPACITY,
38 subscription_timeout: DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT,
39 demand_redelivery_interval: DEFAULT_STREAM_REF_DEMAND_REDELIVERY,
40 }
41 }
42}
43
44impl StreamRefSettings {
45 #[must_use]
46 pub fn buffer_capacity(&self) -> usize {
47 self.buffer_capacity
48 }
49
50 #[must_use]
51 pub fn subscription_timeout(&self) -> Duration {
52 self.subscription_timeout
53 }
54
55 #[must_use]
56 pub fn demand_redelivery_interval(&self) -> Duration {
57 self.demand_redelivery_interval
58 }
59
60 #[must_use]
61 pub fn with_buffer_capacity(mut self, capacity: usize) -> Self {
62 assert!(
63 capacity > 0,
64 "StreamRef buffer capacity must be greater than zero"
65 );
66 self.buffer_capacity = capacity;
67 self
68 }
69
70 #[must_use]
71 pub fn with_subscription_timeout(mut self, timeout: Duration) -> Self {
72 self.subscription_timeout = timeout;
73 self
74 }
75
76 #[must_use]
77 pub fn with_demand_redelivery_interval(mut self, interval: Duration) -> Self {
78 self.demand_redelivery_interval = interval;
79 self
80 }
81}
82
83pub struct StreamRefs;
90
91impl StreamRefs {
92 #[must_use]
93 pub fn source_ref<T>() -> Sink<T, SourceRef<T>>
94 where
95 T: Send + 'static,
96 {
97 Self::source_ref_with_settings(StreamRefSettings::default())
98 }
99
100 #[must_use]
101 pub fn source_ref_with_settings<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
102 where
103 T: Send + 'static,
104 {
105 stream_ref_source_sink(settings)
106 }
107
108 #[must_use]
109 pub fn sink_ref<T>() -> Source<T, SinkRef<T>>
110 where
111 T: Send + 'static,
112 {
113 Self::sink_ref_with_settings(StreamRefSettings::default())
114 }
115
116 #[must_use]
117 pub fn sink_ref_with_settings<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
118 where
119 T: Send + 'static,
120 {
121 stream_ref_sink_source(settings)
122 }
123}
124
125pub struct SourceRef<T> {
130 inner: Arc<SourceRefInner<T>>,
131}
132
133struct SourceRefInner<T> {
134 producer: ActorRef<ProducerCommand<T>>,
135 settings: StreamRefSettings,
136 subscribed: AtomicBool,
137 _keep_alive: Mutex<Option<StreamCompletion<NotUsed>>>,
138}
139
140impl<T> Clone for SourceRef<T> {
141 fn clone(&self) -> Self {
142 Self {
143 inner: Arc::clone(&self.inner),
144 }
145 }
146}
147
148impl<T> fmt::Debug for SourceRef<T> {
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 f.debug_struct("SourceRef").finish_non_exhaustive()
151 }
152}
153
154impl<T> SourceRef<T>
155where
156 T: Send + 'static,
157{
158 #[must_use]
159 pub fn source(&self) -> Source<T, NotUsed> {
160 let inner = Arc::clone(&self.inner);
161 Source::from_materialized_factory(move |_materializer| {
162 if inner.subscribed.swap(true, Ordering::SeqCst) {
163 return Ok((
164 failed_once("source ref has already been materialized"),
165 NotUsed,
166 ));
167 }
168
169 let shared = Arc::new(ConsumerShared::new(inner.settings));
170 let (consumer_ref, _handle) =
171 spawn_consumer_actor(Some(inner.producer.clone()), Arc::clone(&shared))?;
172 Ok((
173 Box::new(ConsumerStream {
174 shared,
175 actor_ref: Some(consumer_ref),
176 settings: inner.settings,
177 terminated: false,
178 source_ref_keep_alive: Some(Arc::clone(&inner)),
179 }) as BoxStream<T>,
180 NotUsed,
181 ))
182 })
183 }
184}
185
186pub struct SinkRef<T> {
191 inner: Arc<SinkRefInner<T>>,
192}
193
194struct SinkRefInner<T> {
195 consumer: ActorRef<ConsumerCommand<T>>,
196 settings: StreamRefSettings,
197 subscribed: AtomicBool,
198}
199
200impl<T> Clone for SinkRef<T> {
201 fn clone(&self) -> Self {
202 Self {
203 inner: Arc::clone(&self.inner),
204 }
205 }
206}
207
208impl<T> fmt::Debug for SinkRef<T> {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 f.debug_struct("SinkRef").finish_non_exhaustive()
211 }
212}
213
214impl<T> SinkRef<T>
215where
216 T: Send + 'static,
217{
218 #[must_use]
219 pub fn sink(&self) -> Sink<T, StreamCompletion<NotUsed>> {
220 let inner = Arc::clone(&self.inner);
221 Sink::from_runner(move |input, materializer| {
222 if inner.subscribed.swap(true, Ordering::SeqCst) {
223 return Ok(StreamCompletion::ready(Err(StreamError::Failed(
224 "sink ref has already been materialized".to_owned(),
225 ))));
226 }
227
228 let shared = Arc::new(ProducerShared::new());
229 let (producer_ref, _handle) =
230 spawn_producer_actor(Some(inner.consumer.clone()), Arc::clone(&shared))?;
231 let settings = inner.settings;
232 Ok(materializer.spawn_stream(move |cancelled| {
233 run_producer_endpoint(input, shared, producer_ref, settings, cancelled)
234 }))
235 })
236 }
237}
238
239enum ProducerCommand<T> {
240 Subscribe {
241 consumer: ActorRef<ConsumerCommand<T>>,
242 },
243 Demand {
244 consumer: ActorRef<ConsumerCommand<T>>,
245 cumulative: u64,
246 },
247 Cancel {
248 consumer: ActorRef<ConsumerCommand<T>>,
249 },
250 RemoteFailure {
251 consumer: ActorRef<ConsumerCommand<T>>,
252 error: StreamError,
253 },
254 Ack,
255}
256
257#[cfg(feature = "cluster")]
258impl<T: Send + 'static> Message for ProducerCommand<T> {}
259
260enum ConsumerCommand<T> {
261 OnSubscribe {
262 producer: ActorRef<ProducerCommand<T>>,
263 },
264 Element {
265 producer: ActorRef<ProducerCommand<T>>,
266 seq: u64,
267 item: T,
268 },
269 Complete {
270 producer: ActorRef<ProducerCommand<T>>,
271 seq: u64,
272 },
273 Failure {
274 producer: ActorRef<ProducerCommand<T>>,
275 error: StreamError,
276 },
277}
278
279#[cfg(feature = "cluster")]
280impl<T: Send + 'static> Message for ConsumerCommand<T> {}
281
282struct ProducerShared<T> {
283 inner: Mutex<ProducerInner<T>>,
284 changed: Condvar,
285}
286
287struct ProducerInner<T> {
288 consumer: Option<ActorRef<ConsumerCommand<T>>>,
289 cumulative_demand: u64,
290 sent: u64,
291 stopped: Option<StreamError>,
292}
293
294impl<T> ProducerShared<T>
295where
296 T: Send + 'static,
297{
298 fn new() -> Self {
299 Self {
300 inner: Mutex::new(ProducerInner {
301 consumer: None,
302 cumulative_demand: 0,
303 sent: 0,
304 stopped: None,
305 }),
306 changed: Condvar::new(),
307 }
308 }
309
310 fn lock(&self) -> MutexGuard<'_, ProducerInner<T>> {
311 self.inner
312 .lock()
313 .unwrap_or_else(|poison| poison.into_inner())
314 }
315
316 fn set_consumer(
317 &self,
318 consumer: ActorRef<ConsumerCommand<T>>,
319 ) -> Result<bool, ActorRef<ConsumerCommand<T>>> {
320 let mut inner = self.lock();
321 match &inner.consumer {
322 Some(existing) if !same_actor(existing, &consumer) => Err(consumer),
323 Some(_) => Ok(false),
324 None => {
325 inner.consumer = Some(consumer);
326 drop(inner);
327 self.changed.notify_all();
328 Ok(true)
329 }
330 }
331 }
332
333 fn update_demand(&self, consumer: &ActorRef<ConsumerCommand<T>>, cumulative: u64) {
334 let mut inner = self.lock();
335 if inner
336 .consumer
337 .as_ref()
338 .is_some_and(|existing| same_actor(existing, consumer))
339 && cumulative > inner.cumulative_demand
340 {
341 inner.cumulative_demand = cumulative;
342 drop(inner);
343 self.changed.notify_all();
344 }
345 }
346
347 fn stop_from_consumer(&self, consumer: &ActorRef<ConsumerCommand<T>>, error: StreamError) {
348 let mut inner = self.lock();
349 if inner
350 .consumer
351 .as_ref()
352 .is_none_or(|existing| same_actor(existing, consumer))
353 && inner.stopped.is_none()
354 {
355 inner.stopped = Some(error);
356 drop(inner);
357 self.changed.notify_all();
358 }
359 }
360
361 fn stop_unless_finished(&self, error: StreamError) {
362 let mut inner = self.lock();
363 if inner.stopped.is_none() {
364 inner.stopped = Some(error);
365 drop(inner);
366 self.changed.notify_all();
367 }
368 }
369}
370
371struct ConsumerShared<T> {
372 inner: Mutex<ConsumerInner<T>>,
373 changed: Condvar,
374}
375
376struct ConsumerInner<T> {
377 producer: Option<ActorRef<ProducerCommand<T>>>,
378 queue: VecDeque<T>,
379 terminal: Option<ConsumerTerminal>,
380 expected_seq: u64,
381 delivered: u64,
382 cumulative_demand: u64,
383}
384
385#[derive(Clone)]
386enum ConsumerTerminal {
387 Complete,
388 Error(StreamError),
389}
390
391impl<T> ConsumerShared<T>
392where
393 T: Send + 'static,
394{
395 fn new(_settings: StreamRefSettings) -> Self {
396 Self {
397 inner: Mutex::new(ConsumerInner {
398 producer: None,
399 queue: VecDeque::new(),
400 terminal: None,
401 expected_seq: 0,
402 delivered: 0,
403 cumulative_demand: 0,
404 }),
405 changed: Condvar::new(),
406 }
407 }
408
409 fn lock(&self) -> MutexGuard<'_, ConsumerInner<T>> {
410 self.inner
411 .lock()
412 .unwrap_or_else(|poison| poison.into_inner())
413 }
414
415 fn set_producer(&self, producer: ActorRef<ProducerCommand<T>>) -> bool {
416 let mut inner = self.lock();
417 match &inner.producer {
418 Some(existing) => same_actor(existing, &producer),
419 None => {
420 inner.producer = Some(producer);
421 drop(inner);
422 self.changed.notify_all();
423 true
424 }
425 }
426 }
427
428 fn push(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64, item: T) {
429 let mut inner = self.lock();
430 if inner.terminal.is_some() || !producer_matches(&inner, producer) {
431 return;
432 }
433 if seq != inner.expected_seq {
434 inner.queue.clear();
435 inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
436 inner.expected_seq,
437 seq,
438 "stream ref element sequence gap",
439 )));
440 } else {
441 inner.expected_seq += 1;
442 inner.queue.push_back(item);
443 }
444 drop(inner);
445 self.changed.notify_all();
446 }
447
448 fn complete(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64) {
449 let mut inner = self.lock();
450 if inner.terminal.is_some() || !producer_matches(&inner, producer) {
451 return;
452 }
453 if seq != inner.expected_seq {
454 inner.queue.clear();
455 inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
456 inner.expected_seq,
457 seq,
458 "stream ref completion sequence gap",
459 )));
460 } else {
461 inner.terminal = Some(ConsumerTerminal::Complete);
462 }
463 drop(inner);
464 self.changed.notify_all();
465 }
466
467 fn fail(&self, producer: &ActorRef<ProducerCommand<T>>, error: StreamError) {
468 let mut inner = self.lock();
469 if inner.terminal.is_some() || !producer_matches(&inner, producer) {
470 return;
471 }
472 inner.queue.clear();
473 inner.terminal = Some(ConsumerTerminal::Error(error));
474 drop(inner);
475 self.changed.notify_all();
476 }
477
478 fn fail_local(&self, error: StreamError) {
479 let mut inner = self.lock();
480 if inner.terminal.is_none() {
481 inner.queue.clear();
482 inner.terminal = Some(ConsumerTerminal::Error(error));
483 drop(inner);
484 self.changed.notify_all();
485 }
486 }
487}
488
489fn producer_matches<T>(inner: &ConsumerInner<T>, producer: &ActorRef<ProducerCommand<T>>) -> bool
490where
491 T: Send + 'static,
492{
493 inner
494 .producer
495 .as_ref()
496 .is_some_and(|existing| same_actor(existing, producer))
497}
498
499struct ProducerActor<T> {
500 shared: Arc<ProducerShared<T>>,
501 initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
502}
503
504impl<T> Actor for ProducerActor<T>
505where
506 T: Send + 'static,
507{
508 type Msg = ProducerCommand<T>;
509 type State = ();
510 type Arguments = ();
511
512 async fn pre_start(
513 &self,
514 myself: ActorRef<Self::Msg>,
515 _args: Self::Arguments,
516 ) -> Result<Self::State, ActorProcessingErr> {
517 if let Some(consumer) = &self.initial_consumer {
518 register_producer_consumer(&self.shared, myself, consumer.clone());
519 }
520 Ok(())
521 }
522
523 async fn handle(
524 &self,
525 myself: ActorRef<Self::Msg>,
526 message: Self::Msg,
527 _state: &mut Self::State,
528 ) -> ActorResult {
529 match message {
530 ProducerCommand::Subscribe { consumer } => {
531 register_producer_consumer(&self.shared, myself, consumer);
532 }
533 ProducerCommand::Demand {
534 consumer,
535 cumulative,
536 } => self.shared.update_demand(&consumer, cumulative),
537 ProducerCommand::Cancel { consumer } => {
538 self.shared
539 .stop_from_consumer(&consumer, StreamError::Cancelled);
540 }
541 ProducerCommand::RemoteFailure { consumer, error } => {
542 self.shared.stop_from_consumer(&consumer, error);
543 }
544 ProducerCommand::Ack => myself.stop(None),
545 }
546 Ok(())
547 }
548
549 async fn post_stop(
550 &self,
551 _myself: ActorRef<Self::Msg>,
552 _state: &mut Self::State,
553 ) -> ActorResult {
554 self.shared
555 .stop_unless_finished(StreamError::ActorTerminated);
556 Ok(())
557 }
558}
559
560fn register_producer_consumer<T>(
561 shared: &Arc<ProducerShared<T>>,
562 producer: ActorRef<ProducerCommand<T>>,
563 consumer: ActorRef<ConsumerCommand<T>>,
564) where
565 T: Send + 'static,
566{
567 match shared.set_consumer(consumer.clone()) {
568 Ok(true) | Ok(false) => {
569 let _ = cast_actor(
570 &consumer,
571 ConsumerCommand::OnSubscribe {
572 producer: producer.clone(),
573 },
574 );
575 }
576 Err(duplicate) => {
577 let _ = cast_actor(
578 &duplicate,
579 ConsumerCommand::Failure {
580 producer,
581 error: StreamError::Failed(
582 "stream ref was already subscribed by another endpoint".to_owned(),
583 ),
584 },
585 );
586 }
587 }
588}
589
590struct ConsumerActor<T> {
591 shared: Arc<ConsumerShared<T>>,
592 initial_producer: Option<ActorRef<ProducerCommand<T>>>,
593}
594
595impl<T> Actor for ConsumerActor<T>
596where
597 T: Send + 'static,
598{
599 type Msg = ConsumerCommand<T>;
600 type State = ();
601 type Arguments = ();
602
603 async fn pre_start(
604 &self,
605 myself: ActorRef<Self::Msg>,
606 _args: Self::Arguments,
607 ) -> Result<Self::State, ActorProcessingErr> {
608 if let Some(producer) = &self.initial_producer {
609 self.shared.set_producer(producer.clone());
610 if let Err(error) = cast_actor(
611 producer,
612 ProducerCommand::Subscribe {
613 consumer: myself.clone(),
614 },
615 ) {
616 self.shared.fail_local(error);
617 myself.stop(None);
618 }
619 }
620 Ok(())
621 }
622
623 async fn handle(
624 &self,
625 _myself: ActorRef<Self::Msg>,
626 message: Self::Msg,
627 _state: &mut Self::State,
628 ) -> ActorResult {
629 match message {
630 ConsumerCommand::OnSubscribe { producer } => {
631 if !self.shared.set_producer(producer.clone()) {
632 let _ = cast_actor(
633 &producer,
634 ProducerCommand::RemoteFailure {
635 consumer: _myself.clone(),
636 error: StreamError::Failed(
637 "stream ref was already subscribed by another endpoint".to_owned(),
638 ),
639 },
640 );
641 }
642 }
643 ConsumerCommand::Element {
644 producer,
645 seq,
646 item,
647 } => self.shared.push(&producer, seq, item),
648 ConsumerCommand::Complete { producer, seq } => {
649 self.shared.complete(&producer, seq);
650 let _ = cast_actor(&producer, ProducerCommand::Ack);
651 }
652 ConsumerCommand::Failure { producer, error } => {
653 self.shared.fail(&producer, error);
654 let _ = cast_actor(&producer, ProducerCommand::Ack);
655 }
656 }
657 Ok(())
658 }
659
660 async fn post_stop(
661 &self,
662 _myself: ActorRef<Self::Msg>,
663 _state: &mut Self::State,
664 ) -> ActorResult {
665 self.shared.fail_local(StreamError::ActorTerminated);
666 Ok(())
667 }
668}
669
670struct ConsumerStream<T>
671where
672 T: Send + 'static,
673{
674 shared: Arc<ConsumerShared<T>>,
675 actor_ref: Option<ActorRef<ConsumerCommand<T>>>,
676 settings: StreamRefSettings,
677 terminated: bool,
678 source_ref_keep_alive: Option<Arc<SourceRefInner<T>>>,
679}
680
681impl<T> Iterator for ConsumerStream<T>
682where
683 T: Send + 'static,
684{
685 type Item = StreamResult<T>;
686
687 fn next(&mut self) -> Option<Self::Item> {
688 if self.terminated {
689 return None;
690 }
691
692 if let Err(error) = self.wait_for_subscription() {
693 self.terminated = true;
694 return Some(Err(error));
695 }
696
697 if let Err(error) = self.redeliver_or_extend_demand() {
698 self.terminated = true;
699 return Some(Err(error));
700 }
701
702 let mut next_redelivery = next_redelivery_deadline(self.settings);
703 loop {
704 let demand_after_pop = {
705 let mut inner = self.shared.lock();
706 if let Some(item) = inner.queue.pop_front() {
707 inner.delivered = inner.delivered.saturating_add(1);
708 let demand = next_demand(&mut inner, self.settings);
709 drop(inner);
710 if let Some((producer, cumulative)) = demand
711 && let Err(error) = send_demand(&self.actor_ref, &producer, cumulative)
712 {
713 self.terminated = true;
714 return Some(Err(error));
715 }
716 return Some(Ok(item));
717 }
718
719 if let Some(terminal) = inner.terminal.clone() {
720 drop(inner);
721 match terminal {
722 ConsumerTerminal::Complete => {
723 self.terminated = true;
724 self.stop_actor();
725 return None;
726 }
727 ConsumerTerminal::Error(error) => {
728 self.terminated = true;
729 self.stop_actor();
730 return Some(Err(error));
731 }
732 }
733 }
734 None::<(ActorRef<ProducerCommand<T>>, u64)>
735 };
736 debug_assert!(demand_after_pop.is_none());
737
738 let now = Instant::now();
739 let timeout = next_redelivery.saturating_duration_since(now);
740 let mut inner = self.shared.lock();
741 if !inner.queue.is_empty() || inner.terminal.is_some() {
742 continue;
743 }
744 let (next_inner, result) = wait_timeout_unpoison(
745 &self.shared.changed,
746 inner,
747 timeout.min(STREAM_REF_WAIT_POLL),
748 );
749 inner = next_inner;
750 drop(inner);
751 if result.timed_out() && Instant::now() >= next_redelivery {
752 if let Err(error) = self.redeliver_demand() {
753 self.terminated = true;
754 return Some(Err(error));
755 }
756 next_redelivery = next_redelivery_deadline(self.settings);
757 }
758 }
759 }
760}
761
762impl<T> ConsumerStream<T>
763where
764 T: Send + 'static,
765{
766 fn wait_for_subscription(&self) -> StreamResult<()> {
767 let deadline = Instant::now()
768 .checked_add(self.settings.subscription_timeout)
769 .unwrap_or_else(far_future);
770 let mut inner = self.shared.lock();
771 loop {
772 if inner.producer.is_some() {
773 return Ok(());
774 }
775 if let Some(terminal) = inner.terminal.clone() {
776 return match terminal {
777 ConsumerTerminal::Complete => Ok(()),
778 ConsumerTerminal::Error(error) => Err(error),
779 };
780 }
781 let now = Instant::now();
782 if now >= deadline {
783 drop(inner);
784 let error = subscription_timeout_error("stream ref source");
785 self.shared.fail_local(error.clone());
786 self.stop_actor_ref();
787 return Err(error);
788 }
789 let remaining = deadline.saturating_duration_since(now);
790 let (next, _) = wait_timeout_unpoison(
791 &self.shared.changed,
792 inner,
793 remaining.min(STREAM_REF_WAIT_POLL),
794 );
795 inner = next;
796 }
797 }
798
799 fn redeliver_or_extend_demand(&self) -> StreamResult<()> {
800 let demand = {
801 let mut inner = self.shared.lock();
802 next_demand(&mut inner, self.settings)
803 };
804 if let Some((producer, cumulative)) = demand {
805 send_demand(&self.actor_ref, &producer, cumulative)?;
806 }
807 Ok(())
808 }
809
810 fn redeliver_demand(&self) -> StreamResult<()> {
811 let (producer, cumulative) = {
812 let inner = self.shared.lock();
813 let Some(producer) = inner.producer.clone() else {
814 return Ok(());
815 };
816 if inner.cumulative_demand == 0 {
817 return Ok(());
818 }
819 (producer, inner.cumulative_demand)
820 };
821 send_demand(&self.actor_ref, &producer, cumulative)
822 }
823
824 fn stop_actor(&mut self) {
825 if let Some(actor_ref) = self.actor_ref.take() {
826 actor_ref.stop(None);
827 }
828 }
829
830 fn stop_actor_ref(&self) {
831 if let Some(actor_ref) = &self.actor_ref {
832 actor_ref.stop(None);
833 }
834 }
835}
836
837impl<T> Drop for ConsumerStream<T>
838where
839 T: Send + 'static,
840{
841 fn drop(&mut self) {
842 if !self.terminated {
843 let producer = self.shared.lock().producer.clone();
844 if let (Some(consumer), Some(producer)) = (&self.actor_ref, producer) {
845 let _ = cast_actor(
846 &producer,
847 ProducerCommand::Cancel {
848 consumer: consumer.clone(),
849 },
850 );
851 }
852 }
853 self.stop_actor();
854 drop(self.source_ref_keep_alive.take());
855 }
856}
857
858fn next_demand<T>(
859 inner: &mut ConsumerInner<T>,
860 settings: StreamRefSettings,
861) -> Option<(ActorRef<ProducerCommand<T>>, u64)> {
862 if inner.terminal.is_some() {
863 return None;
864 }
865 let target = inner
866 .delivered
867 .saturating_add(settings.buffer_capacity as u64);
868 if inner.cumulative_demand >= target {
869 return None;
870 }
871 inner.cumulative_demand = target;
872 inner
873 .producer
874 .as_ref()
875 .map(|producer| (producer.clone(), inner.cumulative_demand))
876}
877
878fn send_demand<T>(
879 consumer: &Option<ActorRef<ConsumerCommand<T>>>,
880 producer: &ActorRef<ProducerCommand<T>>,
881 cumulative: u64,
882) -> StreamResult<()>
883where
884 T: Send + 'static,
885{
886 let Some(consumer) = consumer else {
887 return Err(StreamError::ActorTerminated);
888 };
889 cast_actor(
890 producer,
891 ProducerCommand::Demand {
892 consumer: consumer.clone(),
893 cumulative,
894 },
895 )
896}
897
898fn stream_ref_source_sink<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
899where
900 T: Send + 'static,
901{
902 Sink::from_runner(move |input, materializer| {
903 let shared = Arc::new(ProducerShared::new());
904 let (producer_ref, _handle) = spawn_producer_actor(None, Arc::clone(&shared))?;
905 let producer_for_task = producer_ref.clone();
906 let completion = materializer.spawn_stream(move |cancelled| {
907 run_producer_endpoint(input, shared, producer_for_task, settings, cancelled)
908 });
909 Ok(SourceRef {
910 inner: Arc::new(SourceRefInner {
911 producer: producer_ref,
912 settings,
913 subscribed: AtomicBool::new(false),
914 _keep_alive: Mutex::new(Some(completion)),
915 }),
916 })
917 })
918}
919
920fn stream_ref_sink_source<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
921where
922 T: Send + 'static,
923{
924 Source::from_materialized_factory(move |_materializer| {
925 let shared = Arc::new(ConsumerShared::new(settings));
926 let (consumer_ref, _handle) = spawn_consumer_actor(None, Arc::clone(&shared))?;
927 let sink_ref = SinkRef {
928 inner: Arc::new(SinkRefInner {
929 consumer: consumer_ref.clone(),
930 settings,
931 subscribed: AtomicBool::new(false),
932 }),
933 };
934 Ok((
935 Box::new(ConsumerStream {
936 shared,
937 actor_ref: Some(consumer_ref),
938 settings,
939 terminated: false,
940 source_ref_keep_alive: None,
941 }) as BoxStream<T>,
942 sink_ref,
943 ))
944 })
945}
946
947fn run_producer_endpoint<T>(
948 mut input: BoxStream<T>,
949 shared: Arc<ProducerShared<T>>,
950 producer_ref: ActorRef<ProducerCommand<T>>,
951 settings: StreamRefSettings,
952 cancelled: Arc<AtomicBool>,
953) -> StreamResult<NotUsed>
954where
955 T: Send + 'static,
956{
957 let deadline = Instant::now()
958 .checked_add(settings.subscription_timeout)
959 .unwrap_or_else(far_future);
960
961 loop {
962 let consumer = match wait_for_remote_demand(&shared, deadline, &cancelled) {
963 Ok(consumer) => consumer,
964 Err(error) => {
965 producer_ref.stop(None);
966 return Err(error);
967 }
968 };
969 if cancelled.load(Ordering::SeqCst) {
970 return Err(StreamError::Cancelled);
971 }
972
973 match input.next() {
974 Some(Ok(item)) => {
975 let seq = {
976 let mut inner = shared.lock();
977 let seq = inner.sent;
978 inner.sent = inner.sent.saturating_add(1);
979 seq
980 };
981 if let Err(error) = cast_actor(
982 &consumer,
983 ConsumerCommand::Element {
984 producer: producer_ref.clone(),
985 seq,
986 item,
987 },
988 ) {
989 return Err(match error {
990 StreamError::ActorTerminated => StreamError::Cancelled,
991 other => other,
992 });
993 }
994 }
995 Some(Err(error)) => {
996 let _ = cast_actor(
997 &consumer,
998 ConsumerCommand::Failure {
999 producer: producer_ref.clone(),
1000 error: error.clone(),
1001 },
1002 );
1003 return Err(error);
1004 }
1005 None => {
1006 let seq = shared.lock().sent;
1007 let _ = cast_actor(
1008 &consumer,
1009 ConsumerCommand::Complete {
1010 producer: producer_ref.clone(),
1011 seq,
1012 },
1013 );
1014 return Ok(NotUsed);
1015 }
1016 }
1017 }
1018}
1019
1020fn wait_for_remote_demand<T>(
1021 shared: &Arc<ProducerShared<T>>,
1022 deadline: Instant,
1023 cancelled: &Arc<AtomicBool>,
1024) -> StreamResult<ActorRef<ConsumerCommand<T>>>
1025where
1026 T: Send + 'static,
1027{
1028 let mut inner = shared.lock();
1029 loop {
1030 if cancelled.load(Ordering::SeqCst) {
1031 return Err(StreamError::Cancelled);
1032 }
1033 if let Some(error) = inner.stopped.clone() {
1034 return Err(error);
1035 }
1036 if let Some(consumer) = &inner.consumer
1037 && inner.sent < inner.cumulative_demand
1038 {
1039 return Ok(consumer.clone());
1040 }
1041 let now = Instant::now();
1042 if inner.consumer.is_none() && now >= deadline {
1043 return Err(subscription_timeout_error("stream ref sink"));
1044 }
1045 let remaining = deadline.saturating_duration_since(now);
1046 let timeout = if inner.consumer.is_none() {
1047 remaining.min(STREAM_REF_WAIT_POLL)
1048 } else {
1049 STREAM_REF_WAIT_POLL
1050 };
1051 let (next, _) = wait_timeout_unpoison(&shared.changed, inner, timeout);
1052 inner = next;
1053 }
1054}
1055
1056fn spawn_producer_actor<T>(
1057 initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
1058 shared: Arc<ProducerShared<T>>,
1059) -> StreamResult<(
1060 ActorRef<ProducerCommand<T>>,
1061 ractor::concurrency::JoinHandle<()>,
1062)>
1063where
1064 T: Send + 'static,
1065{
1066 block_on_ractor_runtime(Actor::spawn(
1067 None,
1068 ProducerActor {
1069 shared,
1070 initial_consumer,
1071 },
1072 (),
1073 ))?
1074 .map_err(|error| {
1075 StreamError::Failed(format!(
1076 "stream ref producer actor failed to spawn: {error}"
1077 ))
1078 })
1079}
1080
1081fn spawn_consumer_actor<T>(
1082 initial_producer: Option<ActorRef<ProducerCommand<T>>>,
1083 shared: Arc<ConsumerShared<T>>,
1084) -> StreamResult<(
1085 ActorRef<ConsumerCommand<T>>,
1086 ractor::concurrency::JoinHandle<()>,
1087)>
1088where
1089 T: Send + 'static,
1090{
1091 block_on_ractor_runtime(Actor::spawn(
1092 None,
1093 ConsumerActor {
1094 shared,
1095 initial_producer,
1096 },
1097 (),
1098 ))?
1099 .map_err(|error| {
1100 StreamError::Failed(format!(
1101 "stream ref consumer actor failed to spawn: {error}"
1102 ))
1103 })
1104}
1105
1106fn cast_actor<Msg>(actor_ref: &ActorRef<Msg>, message: Msg) -> StreamResult<()>
1107where
1108 Msg: Message,
1109{
1110 match actor_ref.cast(message) {
1111 Ok(()) => Ok(()),
1112 Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
1113 Err(StreamError::ActorTerminated)
1114 }
1115 Err(error) => Err(StreamError::ActorAskSendFailed {
1116 reason: error.to_string(),
1117 }),
1118 }
1119}
1120
1121fn same_actor<MsgA, MsgB>(left: &ActorRef<MsgA>, right: &ActorRef<MsgB>) -> bool
1122where
1123 MsgA: Message,
1124 MsgB: Message,
1125{
1126 left.get_cell().get_id() == right.get_cell().get_id()
1127}
1128
1129fn failed_once<T>(reason: &str) -> BoxStream<T>
1130where
1131 T: Send + 'static,
1132{
1133 let error = StreamError::Failed(reason.to_owned());
1134 Box::new(std::iter::once(Err(error)))
1135}
1136
1137fn subscription_timeout_error(side: &str) -> StreamError {
1138 StreamError::Failed(format!(
1139 "{side} remote side did not subscribe within subscription timeout"
1140 ))
1141}
1142
1143fn invalid_sequence_error(expected: u64, got: u64, context: &str) -> StreamError {
1144 StreamError::Failed(format!(
1145 "{context}: expected sequence {expected}, got {got}"
1146 ))
1147}
1148
1149fn next_redelivery_deadline(settings: StreamRefSettings) -> Instant {
1150 Instant::now()
1151 .checked_add(settings.demand_redelivery_interval)
1152 .unwrap_or_else(far_future)
1153}
1154
1155fn far_future() -> Instant {
1156 Instant::now() + Duration::from_secs(60 * 60 * 24 * 365)
1157}
1158
1159fn wait_timeout_unpoison<'a, T>(
1160 condvar: &Condvar,
1161 guard: MutexGuard<'a, T>,
1162 timeout: Duration,
1163) -> (MutexGuard<'a, T>, std::sync::WaitTimeoutResult) {
1164 condvar
1165 .wait_timeout(guard, timeout)
1166 .unwrap_or_else(|poison| poison.into_inner())
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use super::*;
1172 use crate::{
1173 stream::{Keep, Source},
1174 testkit::TestSink,
1175 };
1176 use std::sync::{
1177 Arc as StdArc,
1178 atomic::{AtomicBool, AtomicUsize, Ordering},
1179 };
1180
1181 fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
1182 let deadline = Instant::now() + timeout;
1183 while Instant::now() < deadline {
1184 if condition() {
1185 return true;
1186 }
1187 std::thread::park_timeout(Duration::from_millis(1));
1188 }
1189 condition()
1190 }
1191
1192 fn assert_condition_holds(timeout: Duration, mut condition: impl FnMut() -> bool) {
1193 let deadline = Instant::now() + timeout;
1194 while Instant::now() < deadline {
1195 assert!(condition());
1196 std::thread::park_timeout(Duration::from_millis(1));
1197 }
1198 assert!(condition());
1199 }
1200
1201 fn short_settings() -> StreamRefSettings {
1202 StreamRefSettings::default()
1203 .with_buffer_capacity(1)
1204 .with_subscription_timeout(Duration::from_millis(50))
1205 .with_demand_redelivery_interval(Duration::from_millis(10))
1206 }
1207
1208 #[test]
1209 fn source_ref_streams_elements_and_completion() {
1210 let source_ref = Source::from_iter(1_u64..=3)
1211 .run_with(StreamRefs::source_ref())
1212 .unwrap();
1213
1214 assert_eq!(source_ref.source().run_collect().unwrap(), vec![1, 2, 3]);
1215 }
1216
1217 #[test]
1218 fn sink_ref_streams_elements_and_completion() {
1219 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
1220 .to_mat(Sink::collect(), Keep::both)
1221 .run()
1222 .unwrap();
1223
1224 Source::from_iter(1_u64..=3)
1225 .run_with(sink_ref.sink())
1226 .unwrap()
1227 .wait()
1228 .unwrap();
1229
1230 assert_eq!(completion.wait().unwrap(), vec![1, 2, 3]);
1231 }
1232
1233 #[test]
1234 fn source_ref_propagates_upstream_failure() {
1235 let source_ref = Source::<u64>::failed(StreamError::Failed("boom".to_owned()))
1236 .run_with(StreamRefs::source_ref())
1237 .unwrap();
1238
1239 assert_eq!(
1240 source_ref.source().run_collect(),
1241 Err(StreamError::Failed("boom".to_owned()))
1242 );
1243 }
1244
1245 #[test]
1246 fn sink_ref_propagates_upstream_failure() {
1247 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
1248 .to_mat(Sink::collect(), Keep::both)
1249 .run()
1250 .unwrap();
1251
1252 let failure = Source::<u64>::failed(StreamError::Failed("remote boom".to_owned()))
1253 .run_with(sink_ref.sink())
1254 .unwrap()
1255 .wait();
1256
1257 assert_eq!(failure, Err(StreamError::Failed("remote boom".to_owned())));
1258 assert_eq!(
1259 completion.wait(),
1260 Err(StreamError::Failed("remote boom".to_owned()))
1261 );
1262 }
1263
1264 #[test]
1265 fn source_ref_cancellation_reaches_origin() {
1266 let closed = StdArc::new(AtomicBool::new(false));
1267 let close_flag = StdArc::clone(&closed);
1268 let source = Source::unfold_resource(
1269 || Ok(()),
1270 |_state| Ok(Some(1_u64)),
1271 move |_state| {
1272 close_flag.store(true, Ordering::SeqCst);
1273 Ok(())
1274 },
1275 );
1276 let source_ref = source.run_with(StreamRefs::source_ref()).unwrap();
1277
1278 assert_eq!(source_ref.source().take(1).run_collect().unwrap(), vec![1]);
1279 assert!(wait_until(Duration::from_secs(1), || {
1280 closed.load(Ordering::SeqCst)
1281 }));
1282 }
1283
1284 #[test]
1285 fn sink_ref_cancellation_stops_remote_producer() {
1286 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
1287 .take(1)
1288 .to_mat(Sink::collect(), Keep::both)
1289 .run()
1290 .unwrap();
1291
1292 let producer = Source::repeat(1_u64)
1293 .run_with(sink_ref.sink())
1294 .unwrap()
1295 .wait();
1296
1297 assert_eq!(completion.wait().unwrap(), vec![1]);
1298 assert_eq!(producer, Err(StreamError::Cancelled));
1299 }
1300
1301 #[test]
1302 fn source_ref_backpressures_across_ref() {
1303 let pulled = StdArc::new(AtomicUsize::new(0));
1304 let pulled_for_source = StdArc::clone(&pulled);
1305 let source = Source::unfold(0_u64, move |next| {
1306 pulled_for_source.fetch_add(1, Ordering::SeqCst);
1307 Some((next + 1, next))
1308 });
1309 let source_ref = source
1310 .run_with(StreamRefs::source_ref_with_settings(short_settings()))
1311 .unwrap();
1312 let mut probe = source_ref.source().run_with(TestSink::probe()).unwrap();
1313
1314 probe.request(1);
1315 probe.assert_next(0);
1316 assert!(wait_until(Duration::from_secs(1), || {
1317 pulled.load(Ordering::SeqCst) >= 2
1318 }));
1319 assert_condition_holds(Duration::from_millis(50), || {
1320 pulled.load(Ordering::SeqCst) <= 2
1321 });
1322
1323 probe.request(1);
1324 probe.assert_next(1);
1325 assert!(wait_until(Duration::from_secs(1), || {
1326 pulled.load(Ordering::SeqCst) >= 3
1327 }));
1328 assert_condition_holds(Duration::from_millis(50), || {
1329 pulled.load(Ordering::SeqCst) <= 3
1330 });
1331
1332 probe.cancel();
1333 }
1334
1335 #[test]
1336 fn source_ref_late_subscription_observes_timeout() {
1337 let source_ref = Source::repeat(1_u64)
1338 .run_with(StreamRefs::source_ref_with_settings(short_settings()))
1339 .unwrap();
1340
1341 assert!(wait_until(Duration::from_secs(1), || {
1342 source_ref.inner.producer.get_status() == ractor::ActorStatus::Stopped
1343 }));
1344
1345 let result = source_ref.source().run_collect();
1346 assert!(matches!(
1347 result,
1348 Err(StreamError::ActorTerminated) | Err(StreamError::Failed(_))
1349 ));
1350 }
1351
1352 #[test]
1353 fn sink_ref_subscription_timeout_fails_local_source() {
1354 let (_sink_ref, probe) = StreamRefs::sink_ref_with_settings::<u64>(short_settings())
1355 .to_mat(TestSink::probe(), Keep::both)
1356 .run()
1357 .unwrap();
1358
1359 probe.request(1);
1360 let error = probe.expect_error();
1361 assert!(
1362 matches!(error, StreamError::Failed(message) if message.contains("did not subscribe"))
1363 );
1364 }
1365
1366 #[test]
1367 fn stream_refs_are_one_shot() {
1368 let source_ref = Source::from_iter([1_u64])
1369 .run_with(StreamRefs::source_ref())
1370 .unwrap();
1371 assert_eq!(source_ref.source().run_collect().unwrap(), vec![1]);
1372 assert!(matches!(
1373 source_ref.source().run_collect(),
1374 Err(StreamError::Failed(message)) if message.contains("already")
1375 ));
1376
1377 let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
1378 .to_mat(Sink::collect(), Keep::both)
1379 .run()
1380 .unwrap();
1381 Source::single(1_u64)
1382 .run_with(sink_ref.sink())
1383 .unwrap()
1384 .wait()
1385 .unwrap();
1386 assert!(matches!(
1387 Source::single(2_u64).run_with(sink_ref.sink()).unwrap().wait(),
1388 Err(StreamError::Failed(message)) if message.contains("already")
1389 ));
1390 assert_eq!(completion.wait().unwrap(), vec![1]);
1391 }
1392}