1use std::{
2 collections::VecDeque,
3 fmt,
4 marker::PhantomData,
5 panic::{AssertUnwindSafe, catch_unwind},
6 sync::{
7 Arc, Condvar, Mutex, MutexGuard,
8 atomic::{AtomicBool, Ordering},
9 },
10 thread::{self, Thread, ThreadId},
11 time::{Duration, Instant},
12};
13
14use crate::{
15 context::FlowWithContext,
16 stream::{BoxStream, Flow, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult},
17};
18
19use super::{
20 ASK_IDLE_YIELDS, ASK_MAX_PARK, ASK_READY_SPINS, ASK_TIME_REFRESH_ITERS, Actor, ActorFlow,
21 ActorRef, ActorResult, InFlightAsk, Message, ReplyPoll, ReplyPort, ReplyState,
22 block_on_ractor_runtime, panic_reason, recycle_reply_state, wait_for_ready_ask,
23};
24
/// Reply payload for status-carrying asks: either a successful value or a
/// [`StreamError`] reported by the actor.
///
/// The shape mirrors [`StreamResult`]; this file converts between the two
/// with the `From<StreamResult<T>>` impl and the private `into_result` helper.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorStatus<T> {
    /// The request succeeded and produced a value.
    Ok(T),
    /// The request failed with the given stream error.
    Err(StreamError),
}
34
35impl<T> From<StreamResult<T>> for ActorStatus<T> {
36 fn from(value: StreamResult<T>) -> Self {
37 match value {
38 Ok(value) => Self::Ok(value),
39 Err(error) => Self::Err(error),
40 }
41 }
42}
43
44impl<T> ActorStatus<T> {
45 fn into_result(self) -> StreamResult<T> {
46 match self {
47 Self::Ok(value) => Ok(value),
48 Self::Err(error) => Err(error),
49 }
50 }
51}
52
/// Protocol message understood by the actor behind an actor-backed source.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorSourceMessage<T> {
    /// Hand one element to the stream.
    Element(T),
    /// Complete the stream; the source actor stops after handling it.
    Complete,
    /// Fail the stream; the reason is surfaced as `StreamError::Failed`.
    Fail(String),
}

// Explicit `Message` impl, compiled only when the `cluster` feature is on.
// NOTE(review): presumably a blanket impl covers the non-cluster build — confirm.
#[cfg(feature = "cluster")]
impl<T: Send + 'static> Message for ActorSourceMessage<T> {}
64
/// Message vocabulary used by `ActorSink::typed` to forward stream events to
/// an actor without backpressure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorSinkMessage<T> {
    /// One upstream element.
    Element(T),
    /// Upstream finished without error.
    Complete,
    /// Upstream failed with the given error.
    Fail(StreamError),
}

// Explicit `Message` impl, compiled only when the `cluster` feature is on.
// NOTE(review): presumably a blanket impl covers the non-cluster build — confirm.
#[cfg(feature = "cluster")]
impl<T: Send + 'static> Message for ActorSinkMessage<T> {}
75
/// Message vocabulary used by `ActorSink::typed_with_backpressure`.
///
/// `Init` and `Element` carry a [`ReplyPort`]; the sink waits for a reply on
/// that port (up to its timeout) before sending anything further.
#[derive(Debug)]
pub enum ActorSinkBackpressureMessage<T, Ack> {
    /// Handshake sent once before any element.
    Init(ReplyPort<Ack>),
    /// One upstream element together with the port to acknowledge it on.
    Element(T, ReplyPort<Ack>),
    /// Upstream finished without error.
    Complete,
    /// Upstream failed with the given error.
    Fail(StreamError),
}

// Explicit `Message` impl, compiled only when the `cluster` feature is on.
// NOTE(review): presumably a blanket impl covers the non-cluster build — confirm.
#[cfg(feature = "cluster")]
impl<T: Send + 'static, Ack: Send + 'static> Message for ActorSinkBackpressureMessage<T, Ack> {}
87
/// Describes why a watched actor went away.
///
/// Instances are created by the watch monitor in this file with a reason of
/// the form `"watched actor terminated: …"`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatchEvent {
    // Human-readable description of the termination.
    reason: String,
}
94
95impl WatchEvent {
96 #[must_use]
97 pub fn reason(&self) -> &str {
98 &self.reason
99 }
100
101 fn into_stream_error(self) -> StreamError {
102 StreamError::Failed(self.reason)
103 }
104}
105
106impl fmt::Display for WatchEvent {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 f.write_str(&self.reason)
109 }
110}
111
/// Namespace type for constructors of actor-backed sources (see its
/// associated functions).
pub struct ActorSource;
114
/// Namespace type for constructors of sinks that forward stream events to an
/// actor (see its associated functions).
pub struct ActorSink;
117
/// Namespace type for sources and sinks that communicate through a named
/// `ractor::pg` process group (see its associated functions).
pub struct ActorPubSub;
125
126impl ActorFlow {
127 #[must_use]
132 pub fn ask_with_status<In, Msg, Out, F>(
133 actor_ref: ActorRef<Msg>,
134 parallelism: usize,
135 timeout: Duration,
136 make_msg: F,
137 ) -> Flow<In, Out, NotUsed>
138 where
139 In: Send + 'static,
140 Msg: Message,
141 Out: Send + 'static,
142 F: Fn(In, ReplyPort<ActorStatus<Out>>) -> Msg + Send + Sync + 'static,
143 {
144 ask_flow_with_pending(
145 actor_ref,
146 parallelism,
147 timeout,
148 move |input, reply_to| (make_msg(input, reply_to), ()),
149 |(), reply| reply.into_result(),
150 )
151 }
152
153 #[must_use]
158 pub fn ask_with_context<In, Ctx, Msg, Out, F>(
159 actor_ref: ActorRef<Msg>,
160 parallelism: usize,
161 timeout: Duration,
162 make_msg: F,
163 ) -> FlowWithContext<In, Ctx, Out, Ctx, NotUsed>
164 where
165 In: Send + 'static,
166 Ctx: Send + 'static,
167 Msg: Message,
168 Out: Send + 'static,
169 F: Fn(In, ReplyPort<Out>) -> Msg + Send + Sync + 'static,
170 {
171 FlowWithContext::from_flow(ask_flow_with_pending(
172 actor_ref,
173 parallelism,
174 timeout,
175 move |(input, context), reply_to| (make_msg(input, reply_to), context),
176 |context, reply| Ok((reply, context)),
177 ))
178 }
179
180 #[must_use]
182 pub fn ask_with_status_and_context<In, Ctx, Msg, Out, F>(
183 actor_ref: ActorRef<Msg>,
184 parallelism: usize,
185 timeout: Duration,
186 make_msg: F,
187 ) -> FlowWithContext<In, Ctx, Out, Ctx, NotUsed>
188 where
189 In: Send + 'static,
190 Ctx: Send + 'static,
191 Msg: Message,
192 Out: Send + 'static,
193 F: Fn(In, ReplyPort<ActorStatus<Out>>) -> Msg + Send + Sync + 'static,
194 {
195 FlowWithContext::from_flow(ask_flow_with_pending(
196 actor_ref,
197 parallelism,
198 timeout,
199 move |(input, context), reply_to| (make_msg(input, reply_to), context),
200 |context, reply| reply.into_result().map(|reply| (reply, context)),
201 ))
202 }
203
204 #[must_use]
211 pub fn watch<T, Msg>(actor_ref: ActorRef<Msg>) -> Flow<T, T, NotUsed>
212 where
213 T: Send + 'static,
214 Msg: Message,
215 {
216 Flow::from_runtime_transform(move |input, materializer| {
217 let shared = Arc::new(WatchShared::default());
218 let (monitor_ref, _handle) =
219 spawn_watch_monitor(actor_ref.get_cell(), Arc::clone(&shared))?;
220 let producer_shared = Arc::clone(&shared);
221 let producer_completion = materializer.spawn_stream(move |cancelled| {
222 run_watch_upstream(input, producer_shared, cancelled)
223 });
224 Ok(Box::new(WatchStream {
225 shared,
226 monitor_ref: Some(monitor_ref),
227 producer_completion: Some(producer_completion),
228 terminated: false,
229 }) as BoxStream<T>)
230 })
231 }
232}
233
234impl ActorSource {
235 #[must_use]
245 pub fn actor_ref<T>() -> Source<T, ActorRef<ActorSourceMessage<T>>>
246 where
247 T: Send + 'static,
248 {
249 actor_source(None::<SourceBackpressure<ActorSourceMessage<T>>>)
250 }
251
252 #[must_use]
255 pub fn typed<T>() -> Source<T, ActorRef<ActorSourceMessage<T>>>
256 where
257 T: Send + 'static,
258 {
259 Self::actor_ref()
260 }
261
262 #[must_use]
268 pub fn actor_ref_with_backpressure<T, AckMsg>(
269 ack_to: ActorRef<AckMsg>,
270 ack_message: AckMsg,
271 ) -> Source<T, ActorRef<ActorSourceMessage<T>>>
272 where
273 T: Send + 'static,
274 AckMsg: Message + Clone + Sync,
275 {
276 actor_source(Some(SourceBackpressure {
277 ack_to,
278 make_ack: Arc::new(move || ack_message.clone()),
279 _marker: PhantomData,
280 }))
281 }
282}
283
284impl ActorSink {
285 #[must_use]
295 pub fn actor_ref<In, Msg, Elem, Complete, Failure>(
296 actor_ref: ActorRef<Msg>,
297 make_element_message: Elem,
298 on_complete_message: Complete,
299 on_failure_message: Failure,
300 ) -> Sink<In, StreamCompletion<NotUsed>>
301 where
302 In: Send + 'static,
303 Msg: Message,
304 Elem: Fn(In) -> Msg + Send + Sync + 'static,
305 Complete: Fn() -> Msg + Send + Sync + 'static,
306 Failure: Fn(StreamError) -> Msg + Send + Sync + 'static,
307 {
308 let make_element_message = Arc::new(make_element_message);
309 let on_complete_message = Arc::new(on_complete_message);
310 let on_failure_message = Arc::new(on_failure_message);
311 Sink::from_runner(move |mut input, materializer| {
312 let actor_ref = actor_ref.clone();
313 let make_element_message = Arc::clone(&make_element_message);
314 let on_complete_message = Arc::clone(&on_complete_message);
315 let on_failure_message = Arc::clone(&on_failure_message);
316 Ok(materializer.spawn_stream(move |cancelled| {
317 run_actor_ref_sink(
318 &mut input,
319 cancelled,
320 actor_ref,
321 make_element_message,
322 on_complete_message,
323 on_failure_message,
324 )
325 }))
326 })
327 }
328
329 #[must_use]
336 pub fn typed<In>(
337 actor_ref: ActorRef<ActorSinkMessage<In>>,
338 ) -> Sink<In, StreamCompletion<NotUsed>>
339 where
340 In: Send + 'static,
341 {
342 Self::actor_ref(
343 actor_ref,
344 ActorSinkMessage::Element,
345 || ActorSinkMessage::Complete,
346 ActorSinkMessage::Fail,
347 )
348 }
349
350 #[must_use]
357 pub fn actor_ref_with_backpressure<In, Msg, Ack, Init, Elem, Complete, Failure>(
358 actor_ref: ActorRef<Msg>,
359 timeout: Duration,
360 make_init_message: Init,
361 make_element_message: Elem,
362 on_complete_message: Complete,
363 on_failure_message: Failure,
364 ) -> Sink<In, StreamCompletion<NotUsed>>
365 where
366 In: Send + 'static,
367 Msg: Message,
368 Ack: Send + 'static,
369 Init: Fn(ReplyPort<Ack>) -> Msg + Send + Sync + 'static,
370 Elem: Fn(In, ReplyPort<Ack>) -> Msg + Send + Sync + 'static,
371 Complete: Fn() -> Msg + Send + Sync + 'static,
372 Failure: Fn(StreamError) -> Msg + Send + Sync + 'static,
373 {
374 let make_init_message = Arc::new(make_init_message);
375 let make_element_message = Arc::new(make_element_message);
376 let on_complete_message = Arc::new(on_complete_message);
377 let on_failure_message = Arc::new(on_failure_message);
378 Sink::from_runner(move |mut input, materializer| {
379 let actor_ref = actor_ref.clone();
380 let make_init_message = Arc::clone(&make_init_message);
381 let make_element_message = Arc::clone(&make_element_message);
382 let on_complete_message = Arc::clone(&on_complete_message);
383 let on_failure_message = Arc::clone(&on_failure_message);
384 Ok(materializer.spawn_stream(move |cancelled| {
385 send_and_wait_ack(&actor_ref, timeout, |reply_to| make_init_message(reply_to))?;
386 run_actor_ref_backpressure_sink(
387 &mut input,
388 cancelled,
389 actor_ref,
390 timeout,
391 make_element_message,
392 on_complete_message,
393 on_failure_message,
394 )
395 }))
396 })
397 }
398
399 #[must_use]
401 pub fn typed_with_backpressure<In, Ack>(
402 actor_ref: ActorRef<ActorSinkBackpressureMessage<In, Ack>>,
403 timeout: Duration,
404 ) -> Sink<In, StreamCompletion<NotUsed>>
405 where
406 In: Send + 'static,
407 Ack: Send + 'static,
408 {
409 Self::actor_ref_with_backpressure(
410 actor_ref,
411 timeout,
412 ActorSinkBackpressureMessage::Init,
413 ActorSinkBackpressureMessage::Element,
414 || ActorSinkBackpressureMessage::Complete,
415 ActorSinkBackpressureMessage::Fail,
416 )
417 }
418}
419
420impl ActorPubSub {
421 #[must_use]
426 pub fn source<T>(group: impl Into<String>) -> Source<T, ActorRef<ActorSourceMessage<T>>>
427 where
428 T: Send + 'static,
429 {
430 let group = group.into();
431 ActorSource::actor_ref().map_materialized_value(move |actor_ref| {
432 ractor::pg::join(group.clone(), vec![actor_ref.get_cell()]);
433 actor_ref
434 })
435 }
436
437 #[must_use]
444 pub fn sink<T>(group: impl Into<String>) -> Sink<T, StreamCompletion<NotUsed>>
445 where
446 T: Clone + Send + 'static,
447 {
448 let group = group.into();
449 Sink::from_runner(move |mut input, materializer| {
450 let group = group.clone();
451 Ok(materializer.spawn_stream(move |cancelled| {
452 loop {
453 if cancelled.load(Ordering::SeqCst) {
454 return Err(StreamError::Cancelled);
455 }
456 match input.next() {
457 Some(Ok(item)) => {
458 broadcast_to_group(&group, ActorSourceMessage::Element(item))?;
459 }
460 Some(Err(error)) => {
461 let _ = broadcast_to_group(
462 &group,
463 ActorSourceMessage::<T>::Fail(error.to_string()),
464 );
465 return Err(error);
466 }
467 None => {
468 broadcast_to_group(&group, ActorSourceMessage::<T>::Complete)?;
469 return Ok(NotUsed);
470 }
471 }
472 }
473 }))
474 })
475 }
476}
477
478fn ask_flow_with_pending<In, Msg, Reply, Out, Pending, Prepare, Finish>(
479 actor_ref: ActorRef<Msg>,
480 parallelism: usize,
481 timeout: Duration,
482 prepare: Prepare,
483 finish: Finish,
484) -> Flow<In, Out, NotUsed>
485where
486 In: Send + 'static,
487 Msg: Message,
488 Reply: Send + 'static,
489 Out: Send + 'static,
490 Pending: Send + 'static,
491 Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
492 Finish: Fn(Pending, Reply) -> StreamResult<Out> + Send + Sync + 'static,
493{
494 assert!(
495 parallelism > 0,
496 "ActorFlow ask parallelism must be greater than zero"
497 );
498 let prepare = Arc::new(prepare);
499 let finish = Arc::new(finish);
500 Flow::from_transform(move |input| {
501 ask_ractor_ordered_with_pending(
502 input,
503 actor_ref.clone(),
504 parallelism,
505 timeout,
506 Arc::clone(&prepare),
507 Arc::clone(&finish),
508 )
509 })
510}
511
/// Runs the ask protocol over `input` and yields results in input order.
///
/// Up to `parallelism` asks are kept in flight at once. Each ask gets a
/// monotonically increasing index; replies that arrive ahead of their turn
/// are parked in `completed` until every earlier index has been emitted.
///
/// An upstream error, or a failure to start an ask, is queued at its own
/// index (so earlier results are still emitted first) and stops further
/// pulling from `input`.
///
/// NOTE(review): only `in_flight` is bounded by `parallelism`. While the
/// head-of-line ask is slow, faster replies keep accumulating in `completed`
/// and new asks keep being started — confirm this buffering is intended.
fn ask_ractor_ordered_with_pending<In, Msg, Reply, Out, Pending, Prepare, Finish>(
    mut input: BoxStream<In>,
    actor_ref: ActorRef<Msg>,
    parallelism: usize,
    timeout: Duration,
    prepare: Arc<Prepare>,
    finish: Arc<Finish>,
) -> BoxStream<Out>
where
    In: Send + 'static,
    Msg: Message,
    Reply: Send + 'static,
    Out: Send + 'static,
    Pending: Send + 'static,
    Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
    Finish: Fn(Pending, Reply) -> StreamResult<Out> + Send + Sync + 'static,
{
    // Asks that have been sent and are still waiting for a reply.
    let mut in_flight = Vec::<InFlightAskWith<Reply, Pending>>::with_capacity(parallelism);
    // Index assigned to the next element pulled from `input`.
    let mut next_index = 0_usize;
    // Index of the next result that may be emitted downstream.
    let mut next_to_emit = 0_usize;
    // Finished results that are waiting for their turn.
    let mut completed = Vec::with_capacity(parallelism);
    // Reply states kept for reuse by later asks.
    let mut reply_pool = Vec::with_capacity(parallelism);
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            // Emit a parked result if it is now at the head of the line.
            if let Some(result) = take_completed_with_pending(&mut completed, next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Top up the in-flight window from upstream.
            while in_flight.len() < parallelism && !input_done {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        match start_ractor_ask_with_pending(
                            index,
                            actor_ref.clone(),
                            timeout,
                            item,
                            Arc::clone(&prepare),
                            &mut reply_pool,
                        ) {
                            Ok(ask) => in_flight.push(ask),
                            Err(error) => {
                                completed.push((index, Err(error)));
                                input_done = true;
                            }
                        }
                    }
                    Some(Err(error)) => {
                        completed.push((next_index, Err(error)));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }

            // The refill may have queued an error at the head of the line.
            if let Some(result) = take_completed_with_pending(&mut completed, next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Nothing parked for this index and nothing outstanding: done.
            if in_flight.is_empty() {
                return None;
            }

            // Block until any in-flight ask settles (reply, drop or timeout).
            let ask = wait_for_ready_ask_with_pending(&mut in_flight, timeout, finish.as_ref());
            let index = ask.index;
            let result = ask.result;
            recycle_reply_state(ask.state, &mut reply_pool);
            if index == next_to_emit {
                next_to_emit += 1;
                return Some(result);
            }
            // Out of order: park it until the earlier indices are emitted.
            completed.push((index, result));
        }
    }))
}
593
594fn take_completed_with_pending<Out>(
595 completed: &mut Vec<(usize, StreamResult<Out>)>,
596 index: usize,
597) -> Option<StreamResult<Out>> {
598 let position = completed
599 .iter()
600 .position(|(completed_index, _)| *completed_index == index)?;
601 Some(completed.swap_remove(position).1)
602}
603
/// An ask that has been sent and is waiting for its reply, together with the
/// caller-supplied pending state that `finish` will consume.
///
/// `state` and `pending` are `Option`s so `into_parts` can move them out
/// even though the type implements `Drop`.
struct InFlightAskWith<Reply, Pending> {
    // Position of the originating element in the input stream.
    index: usize,
    // Shared reply slot; `None` only after `into_parts`.
    state: Option<Arc<ReplyState<Reply>>>,
    // Per-request state produced by `prepare`; `None` only after `into_parts`.
    pending: Option<Pending>,
    // Instant at which the ask times out; `None` if `now + timeout` overflowed.
    deadline: Option<Instant>,
}
610
611impl<Reply, Pending> InFlightAskWith<Reply, Pending> {
612 fn state(&self) -> &Arc<ReplyState<Reply>> {
613 self.state.as_ref().expect("in-flight ask has reply state")
614 }
615
616 fn into_parts(mut self) -> (Arc<ReplyState<Reply>>, Pending) {
617 let state = self.state.take().expect("in-flight ask has reply state");
618 let pending = self
619 .pending
620 .take()
621 .expect("in-flight ask has pending state");
622 (state, pending)
623 }
624}
625
626impl<Reply, Pending> Drop for InFlightAskWith<Reply, Pending> {
627 fn drop(&mut self) {
628 if let Some(state) = &self.state {
629 state.close_receiver();
630 }
631 }
632}
633
/// Outcome of a settled ask: its input index, the result to emit and the
/// reply state, which the caller hands back for recycling.
struct CompletedAskWith<Reply, Out> {
    // Position of the originating element in the input stream.
    index: usize,
    // Value or error to emit for that element.
    result: StreamResult<Out>,
    // Reply slot that was used for this ask.
    state: Arc<ReplyState<Reply>>,
}
639
640fn start_ractor_ask_with_pending<In, Msg, Reply, Pending, Prepare>(
641 index: usize,
642 actor_ref: ActorRef<Msg>,
643 timeout: Duration,
644 input: In,
645 prepare: Arc<Prepare>,
646 reply_pool: &mut Vec<Arc<ReplyState<Reply>>>,
647) -> StreamResult<InFlightAskWith<Reply, Pending>>
648where
649 In: Send + 'static,
650 Msg: Message,
651 Reply: Send + 'static,
652 Pending: Send + 'static,
653 Prepare: Fn(In, ReplyPort<Reply>) -> (Msg, Pending) + Send + Sync + 'static,
654{
655 let reply_state = match reply_pool.pop() {
656 Some(state) => {
657 state.reset(timeout);
658 state
659 }
660 None => Arc::new(ReplyState::new(timeout)),
661 };
662 let reply_to = ReplyPort::new(Arc::clone(&reply_state));
663 let (message, pending) =
664 catch_unwind(AssertUnwindSafe(|| prepare(input, reply_to))).map_err(|panic| {
665 StreamError::ActorAskTaskFailed {
666 reason: panic_reason(panic),
667 }
668 })?;
669
670 send_actor_message(&actor_ref, message)?;
671
672 Ok(InFlightAskWith {
673 index,
674 state: Some(reply_state),
675 pending: Some(pending),
676 deadline: Instant::now().checked_add(timeout),
677 })
678}
679
680fn wait_for_ready_ask_with_pending<Reply, Pending, Out, Finish>(
681 in_flight: &mut Vec<InFlightAskWith<Reply, Pending>>,
682 timeout: Duration,
683 finish: &Finish,
684) -> CompletedAskWith<Reply, Out>
685where
686 Reply: Send + 'static,
687 Pending: Send + 'static,
688 Out: Send + 'static,
689 Finish: Fn(Pending, Reply) -> StreamResult<Out>,
690{
691 let mut idle_spins = 0;
692 let mut idle_yields = 0;
693 let mut time_refresh = 0_u32;
694 let mut now = Instant::now();
695 loop {
696 if time_refresh == 0 {
697 now = Instant::now();
698 }
699 time_refresh = (time_refresh + 1) % ASK_TIME_REFRESH_ITERS;
700 if let Some(ask) = take_ready_ask_with_pending(in_flight, timeout, now, finish) {
701 return ask;
702 }
703
704 if idle_spins < ASK_READY_SPINS {
705 idle_spins += 1;
706 std::hint::spin_loop();
707 } else if idle_yields < ASK_IDLE_YIELDS {
708 idle_yields += 1;
709 time_refresh = 0;
710 thread::yield_now();
711 } else {
712 idle_spins = 0;
713 idle_yields = 0;
714 time_refresh = 0;
715 let current = thread::current();
716 let registered = register_ask_waiters_with_pending(in_flight, ¤t);
717 now = Instant::now();
718 if let Some(ask) = take_ready_ask_with_pending(in_flight, timeout, now, finish) {
719 unregister_ask_waiters_with_pending(registered, current.id());
720 return ask;
721 }
722 thread::park_timeout(next_ask_park_with_pending(in_flight, now));
723 unregister_ask_waiters_with_pending(registered, current.id());
724 }
725 }
726}
727
728fn take_ready_ask_with_pending<Reply, Pending, Out, Finish>(
729 in_flight: &mut Vec<InFlightAskWith<Reply, Pending>>,
730 timeout: Duration,
731 now: Instant,
732 finish: &Finish,
733) -> Option<CompletedAskWith<Reply, Out>>
734where
735 Finish: Fn(Pending, Reply) -> StreamResult<Out>,
736{
737 let mut index = 0;
738 while index < in_flight.len() {
739 match in_flight[index].state().poll() {
740 ReplyPoll::Ready(reply) => {
741 let ask = in_flight.swap_remove(index);
742 let ask_index = ask.index;
743 let (state, pending) = ask.into_parts();
744 return Some(CompletedAskWith {
745 index: ask_index,
746 result: finish(pending, reply),
747 state,
748 });
749 }
750 ReplyPoll::Dropped => {
751 let ask = in_flight.swap_remove(index);
752 let ask_index = ask.index;
753 let (state, _pending) = ask.into_parts();
754 return Some(CompletedAskWith {
755 index: ask_index,
756 result: Err(StreamError::ActorAskResponseDropped),
757 state,
758 });
759 }
760 ReplyPoll::Pending => {
761 if in_flight[index]
762 .deadline
763 .is_some_and(|deadline| now >= deadline)
764 {
765 let outcome = in_flight[index].state().close_on_timeout();
766 let ask = in_flight.swap_remove(index);
767 let ask_index = ask.index;
768 let (state, pending) = ask.into_parts();
769 let result = match outcome {
770 ReplyPoll::Ready(reply) => finish(pending, reply),
771 ReplyPoll::Dropped => Err(StreamError::ActorAskResponseDropped),
772 ReplyPoll::Pending => Err(StreamError::ActorAskTimeout { timeout }),
773 };
774 return Some(CompletedAskWith {
775 index: ask_index,
776 result,
777 state,
778 });
779 }
780 index += 1;
781 }
782 }
783 }
784 None
785}
786
787fn register_ask_waiters_with_pending<Reply, Pending>(
788 in_flight: &[InFlightAskWith<Reply, Pending>],
789 current: &Thread,
790) -> Vec<Arc<ReplyState<Reply>>> {
791 let mut registered = Vec::with_capacity(in_flight.len());
792 for ask in in_flight {
793 ask.state().register_waiter(current.clone());
794 registered.push(Arc::clone(ask.state()));
795 }
796 registered
797}
798
799fn unregister_ask_waiters_with_pending<Reply>(
800 registered: Vec<Arc<ReplyState<Reply>>>,
801 current_id: ThreadId,
802) {
803 for state in registered {
804 state.unregister_waiter(current_id);
805 }
806}
807
808fn next_ask_park_with_pending<Reply, Pending>(
809 in_flight: &[InFlightAskWith<Reply, Pending>],
810 now: Instant,
811) -> Duration {
812 in_flight
813 .iter()
814 .filter_map(|ask| ask.deadline)
815 .map(|deadline| deadline.saturating_duration_since(now))
816 .min()
817 .unwrap_or(ASK_MAX_PARK)
818 .min(ASK_MAX_PARK)
819}
820
/// Backpressure configuration for an actor-backed source: where to send
/// acknowledgements and how to build each one.
struct SourceBackpressure<AckMsg> {
    // Actor that receives an ack whenever the source can take another element.
    ack_to: ActorRef<AckMsg>,
    // Factory producing a fresh ack message per acknowledgement.
    make_ack: Arc<dyn Fn() -> AckMsg + Send + Sync>,
    // Ties the type to `AckMsg` without storing a value of it.
    _marker: PhantomData<fn() -> AckMsg>,
}
826
827impl<AckMsg> Clone for SourceBackpressure<AckMsg> {
828 fn clone(&self) -> Self {
829 Self {
830 ack_to: self.ack_to.clone(),
831 make_ack: Arc::clone(&self.make_ack),
832 _marker: PhantomData,
833 }
834 }
835}
836
837fn actor_source<T, AckMsg>(
838 backpressure: Option<SourceBackpressure<AckMsg>>,
839) -> Source<T, ActorRef<ActorSourceMessage<T>>>
840where
841 T: Send + 'static,
842 AckMsg: Message,
843{
844 Source::from_materialized_factory(move |_materializer| {
845 let shared = Arc::new(ActorSourceShared::new(backpressure.is_none()));
846 let actor = SourceActor {
847 shared: Arc::clone(&shared),
848 backpressure: backpressure.clone(),
849 };
850 let (actor_ref, _handle) =
851 block_on_ractor_runtime(Actor::spawn(None, actor, SourceActorState::default()))?
852 .map_err(|error| {
853 StreamError::Failed(format!("actor source failed to spawn: {error}"))
854 })?;
855 let stream = ActorSourceStream {
856 shared,
857 actor_ref: Some(actor_ref.clone()),
858 backpressure: backpressure.clone(),
859 terminated: false,
860 };
861 Ok((Box::new(stream) as BoxStream<T>, actor_ref))
862 })
863}
864
/// State shared between the source actor (producer) and the stream
/// (consumer): a mutex-protected queue plus a condition variable used to
/// wake the consumer.
struct ActorSourceShared<T> {
    // Queue and flags; always accessed through `lock`.
    inner: Mutex<ActorSourceInner<T>>,
    // Notified whenever the queue or one of the flags changes.
    available: Condvar,
}
869
870impl<T> ActorSourceShared<T> {
871 fn new(ready: bool) -> Self {
872 Self {
873 inner: Mutex::new(ActorSourceInner {
874 queue: VecDeque::new(),
875 completed: false,
876 ready,
877 }),
878 available: Condvar::new(),
879 }
880 }
881
882 fn lock(&self) -> MutexGuard<'_, ActorSourceInner<T>> {
883 self.inner
884 .lock()
885 .unwrap_or_else(|poison| poison.into_inner())
886 }
887
888 fn push(&self, item: T) {
889 let mut inner = self.lock();
890 if !inner.completed {
891 inner.queue.push_back(Ok(item));
892 }
893 drop(inner);
894 self.available.notify_all();
895 }
896
897 fn complete(&self) {
898 let mut inner = self.lock();
899 inner.completed = true;
900 drop(inner);
901 self.available.notify_all();
902 }
903
904 fn fail(&self, error: StreamError) {
905 let mut inner = self.lock();
906 inner.queue.clear();
907 inner.queue.push_back(Err(error));
908 inner.completed = true;
909 drop(inner);
910 self.available.notify_all();
911 }
912
913 fn mark_ready(&self) {
914 let mut inner = self.lock();
915 if !inner.completed {
916 inner.ready = true;
917 }
918 drop(inner);
919 self.available.notify_all();
920 }
921}
922
/// Mutex-protected part of [`ActorSourceShared`].
struct ActorSourceInner<T> {
    // Elements (or a terminal error) waiting to be pulled by the stream.
    queue: VecDeque<StreamResult<T>>,
    // Set once the source has completed or failed; later pushes are ignored.
    completed: bool,
    // Backpressure flag: true while the source may accept one more element.
    ready: bool,
}
928
/// Actor that receives [`ActorSourceMessage`]s and feeds them into the
/// shared queue read by the stream.
struct SourceActor<T, AckMsg> {
    // Queue shared with the consuming stream.
    shared: Arc<ActorSourceShared<T>>,
    // Present when the source runs the ack-based backpressure protocol.
    backpressure: Option<SourceBackpressure<AckMsg>>,
}
933
/// Mutable state of [`SourceActor`]; also used as its spawn arguments.
#[derive(Default)]
struct SourceActorState {
    // When true, `post_stop` does not mark the shared queue as completed.
    // NOTE(review): nothing in the visible code sets this flag — confirm
    // where (or whether) it is written.
    stopped_by_stream: bool,
}
938
939impl<T, AckMsg> Actor for SourceActor<T, AckMsg>
940where
941 T: Send + 'static,
942 AckMsg: Message,
943{
944 type Msg = ActorSourceMessage<T>;
945 type State = SourceActorState;
946 type Arguments = SourceActorState;
947
948 async fn pre_start(
949 &self,
950 myself: ActorRef<Self::Msg>,
951 args: Self::Arguments,
952 ) -> ActorResult<Self::State> {
953 if let Some(backpressure) = &self.backpressure {
954 match send_source_ack(backpressure) {
955 Ok(()) => {
956 self.shared.mark_ready();
957 }
958 Err(error) => {
959 self.shared.fail(error);
960 myself.stop(None);
961 }
962 }
963 }
964 Ok(args)
965 }
966
967 async fn handle(
968 &self,
969 myself: ActorRef<Self::Msg>,
970 message: Self::Msg,
971 _state: &mut Self::State,
972 ) -> ActorResult {
973 match message {
974 ActorSourceMessage::Element(item) => {
975 if self.backpressure.is_some() {
976 let mut inner = self.shared.lock();
977 if !inner.ready {
978 inner.queue.clear();
979 inner.queue.push_back(Err(actor_interop_error(
980 "actor source backpressure protocol violation: element arrived before ack",
981 )));
982 inner.completed = true;
983 drop(inner);
984 self.shared.available.notify_all();
985 myself.stop(None);
986 return Ok(());
987 }
988 inner.ready = false;
989 drop(inner);
990 }
991 self.shared.push(item);
992 }
993 ActorSourceMessage::Complete => {
994 self.shared.complete();
995 myself.stop(None);
996 }
997 ActorSourceMessage::Fail(reason) => {
998 self.shared.fail(StreamError::Failed(reason));
999 myself.stop(None);
1000 }
1001 }
1002 Ok(())
1003 }
1004
1005 async fn post_stop(
1006 &self,
1007 _myself: ActorRef<Self::Msg>,
1008 state: &mut Self::State,
1009 ) -> ActorResult {
1010 if !state.stopped_by_stream {
1011 self.shared.complete();
1012 }
1013 Ok(())
1014 }
1015}
1016
/// Consumer side of an actor-backed source: a blocking iterator over the
/// queue filled by the source actor.
struct ActorSourceStream<T, AckMsg> {
    // Queue shared with the source actor.
    shared: Arc<ActorSourceShared<T>>,
    // Handle used to stop the actor when the stream is dropped.
    actor_ref: Option<ActorRef<ActorSourceMessage<T>>>,
    // Present when an ack must be sent after each consumed element.
    backpressure: Option<SourceBackpressure<AckMsg>>,
    // Set once the end of the stream has been observed.
    terminated: bool,
}
1023
1024impl<T, AckMsg> Iterator for ActorSourceStream<T, AckMsg>
1025where
1026 T: Send + 'static,
1027 AckMsg: Message,
1028{
1029 type Item = StreamResult<T>;
1030
1031 fn next(&mut self) -> Option<Self::Item> {
1032 if self.terminated {
1033 return None;
1034 }
1035
1036 let next = {
1037 let mut inner = self.shared.lock();
1038 loop {
1039 if let Some(item) = inner.queue.pop_front() {
1040 break Some(item);
1041 }
1042 if inner.completed {
1043 self.terminated = true;
1044 break None;
1045 }
1046 inner = self
1047 .shared
1048 .available
1049 .wait(inner)
1050 .unwrap_or_else(|poison| poison.into_inner());
1051 }
1052 };
1053
1054 if let Some(Ok(_)) = &next
1055 && let Some(backpressure) = &self.backpressure
1056 {
1057 match send_source_ack(backpressure) {
1058 Ok(()) => self.shared.mark_ready(),
1059 Err(error) => {
1060 self.shared.fail(error.clone());
1061 return Some(Err(error));
1062 }
1063 }
1064 }
1065
1066 next
1067 }
1068}
1069
1070impl<T, AckMsg> Drop for ActorSourceStream<T, AckMsg> {
1071 fn drop(&mut self) {
1072 if let Some(actor_ref) = self.actor_ref.take() {
1073 actor_ref.stop(None);
1074 }
1075 }
1076}
1077
1078fn send_source_ack<AckMsg>(backpressure: &SourceBackpressure<AckMsg>) -> StreamResult<()>
1079where
1080 AckMsg: Message,
1081{
1082 let ack = catch_unwind(AssertUnwindSafe(|| (backpressure.make_ack)())).map_err(|panic| {
1083 StreamError::Failed(format!(
1084 "actor source ack builder failed: {}",
1085 panic_reason(panic)
1086 ))
1087 })?;
1088 send_actor_message(&backpressure.ack_to, ack)
1089}
1090
1091fn run_actor_ref_sink<In, Msg, Elem, Complete, Failure>(
1092 input: &mut BoxStream<In>,
1093 cancelled: Arc<AtomicBool>,
1094 actor_ref: ActorRef<Msg>,
1095 make_element_message: Arc<Elem>,
1096 on_complete_message: Arc<Complete>,
1097 on_failure_message: Arc<Failure>,
1098) -> StreamResult<NotUsed>
1099where
1100 Msg: Message,
1101 Elem: Fn(In) -> Msg,
1102 Complete: Fn() -> Msg,
1103 Failure: Fn(StreamError) -> Msg,
1104{
1105 loop {
1106 if cancelled.load(Ordering::SeqCst) {
1107 return Err(StreamError::Cancelled);
1108 }
1109 match input.next() {
1110 Some(Ok(item)) => {
1111 let message = catch_unwind(AssertUnwindSafe(|| make_element_message(item)))
1112 .map_err(actor_sink_panic)?;
1113 send_actor_message(&actor_ref, message)?;
1114 }
1115 Some(Err(error)) => {
1116 let message = catch_unwind(AssertUnwindSafe(|| on_failure_message(error.clone())))
1117 .map_err(actor_sink_panic)?;
1118 send_actor_message(&actor_ref, message)?;
1119 return Err(error);
1120 }
1121 None => {
1122 let message = catch_unwind(AssertUnwindSafe(|| on_complete_message()))
1123 .map_err(actor_sink_panic)?;
1124 send_actor_message(&actor_ref, message)?;
1125 return Ok(NotUsed);
1126 }
1127 }
1128 }
1129}
1130
1131fn run_actor_ref_backpressure_sink<In, Msg, Ack, Elem, Complete, Failure>(
1132 input: &mut BoxStream<In>,
1133 cancelled: Arc<AtomicBool>,
1134 actor_ref: ActorRef<Msg>,
1135 timeout: Duration,
1136 make_element_message: Arc<Elem>,
1137 on_complete_message: Arc<Complete>,
1138 on_failure_message: Arc<Failure>,
1139) -> StreamResult<NotUsed>
1140where
1141 Msg: Message,
1142 Ack: Send + 'static,
1143 Elem: Fn(In, ReplyPort<Ack>) -> Msg,
1144 Complete: Fn() -> Msg,
1145 Failure: Fn(StreamError) -> Msg,
1146{
1147 loop {
1148 if cancelled.load(Ordering::SeqCst) {
1149 return Err(StreamError::Cancelled);
1150 }
1151 match input.next() {
1152 Some(Ok(item)) => {
1153 send_and_wait_ack(&actor_ref, timeout, |reply_to| {
1154 make_element_message(item, reply_to)
1155 })?;
1156 }
1157 Some(Err(error)) => {
1158 let message = catch_unwind(AssertUnwindSafe(|| on_failure_message(error.clone())))
1159 .map_err(actor_sink_panic)?;
1160 send_actor_message(&actor_ref, message)?;
1161 return Err(error);
1162 }
1163 None => {
1164 let message = catch_unwind(AssertUnwindSafe(|| on_complete_message()))
1165 .map_err(actor_sink_panic)?;
1166 send_actor_message(&actor_ref, message)?;
1167 return Ok(NotUsed);
1168 }
1169 }
1170 }
1171}
1172
1173fn send_and_wait_ack<Msg, Ack, Build>(
1174 actor_ref: &ActorRef<Msg>,
1175 timeout: Duration,
1176 build: Build,
1177) -> StreamResult<()>
1178where
1179 Msg: Message,
1180 Ack: Send + 'static,
1181 Build: FnOnce(ReplyPort<Ack>) -> Msg,
1182{
1183 let reply_state = Arc::new(ReplyState::new(timeout));
1184 let reply_to = ReplyPort::new(Arc::clone(&reply_state));
1185 let message = catch_unwind(AssertUnwindSafe(|| build(reply_to))).map_err(actor_sink_panic)?;
1186 send_actor_message(actor_ref, message)?;
1187 let ask = InFlightAsk {
1188 index: 0,
1189 state: Some(reply_state),
1190 deadline: Instant::now().checked_add(timeout),
1191 };
1192 let mut in_flight = vec![ask];
1193 wait_for_ready_ask(&mut in_flight, timeout)
1194 .result
1195 .map(|_ack| ())
1196}
1197
1198fn actor_sink_panic(panic: Box<dyn std::any::Any + Send>) -> StreamError {
1199 StreamError::Failed(format!(
1200 "actor sink message builder failed: {}",
1201 panic_reason(panic)
1202 ))
1203}
1204
1205fn send_actor_message<Msg>(actor_ref: &ActorRef<Msg>, message: Msg) -> StreamResult<()>
1206where
1207 Msg: Message,
1208{
1209 match actor_ref.cast(message) {
1210 Ok(()) => Ok(()),
1211 Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
1212 Err(StreamError::ActorTerminated)
1213 }
1214 Err(error) => Err(StreamError::ActorAskSendFailed {
1215 reason: error.to_string(),
1216 }),
1217 }
1218}
1219
1220fn broadcast_to_group<T>(group: &str, message: ActorSourceMessage<T>) -> StreamResult<()>
1221where
1222 T: Clone + Send + 'static,
1223{
1224 for member in ractor::pg::get_members(&group.to_owned()) {
1225 let actor_ref: ActorRef<ActorSourceMessage<T>> = member.into();
1226 match send_actor_message(&actor_ref, message.clone()) {
1227 Ok(()) | Err(StreamError::ActorTerminated) => {}
1228 Err(error) => return Err(error),
1229 }
1230 }
1231 Ok(())
1232}
1233
/// State shared by the watch flow's upstream pump, its monitor actor and the
/// downstream stream: a mutex-protected queue plus a condition variable.
struct WatchShared<T> {
    // Queue and termination flags; always accessed through `lock`.
    inner: Mutex<WatchInner<T>>,
    // Notified whenever the queue or one of the flags changes.
    available: Condvar,
}
1238
1239impl<T> Default for WatchShared<T> {
1240 fn default() -> Self {
1241 Self {
1242 inner: Mutex::new(WatchInner {
1243 queue: VecDeque::new(),
1244 event: None,
1245 upstream_done: false,
1246 }),
1247 available: Condvar::new(),
1248 }
1249 }
1250}
1251
1252impl<T> WatchShared<T> {
1253 fn lock(&self) -> MutexGuard<'_, WatchInner<T>> {
1254 self.inner
1255 .lock()
1256 .unwrap_or_else(|poison| poison.into_inner())
1257 }
1258
1259 fn push(&self, item: StreamResult<T>) -> bool {
1260 let mut inner = self.lock();
1261 if inner.event.is_some() || inner.upstream_done {
1262 return false;
1263 }
1264 inner.queue.push_back(item);
1265 drop(inner);
1266 self.available.notify_all();
1267 true
1268 }
1269
1270 fn complete(&self) {
1271 let mut inner = self.lock();
1272 inner.upstream_done = true;
1273 drop(inner);
1274 self.available.notify_all();
1275 }
1276
1277 fn fail(&self, event: WatchEvent) {
1278 let mut inner = self.lock();
1279 if inner.event.is_none() {
1280 inner.queue.clear();
1281 inner.event = Some(event);
1282 inner.upstream_done = true;
1283 }
1284 drop(inner);
1285 self.available.notify_all();
1286 }
1287}
1288
/// Mutex-protected portion of [`WatchShared`].
struct WatchInner<T> {
    /// Items pushed by upstream that the consumer has not taken yet.
    queue: VecDeque<StreamResult<T>>,
    /// Termination event recorded by `WatchShared::fail`; taken by the consumer.
    event: Option<WatchEvent>,
    /// Set once upstream completed or a termination event was recorded.
    upstream_done: bool,
}
1294
/// Actor that monitors `watched` and reports its termination or failure into
/// `shared`.
struct WatchMonitorActor<T> {
    /// The actor being monitored.
    watched: ractor::ActorCell,
    /// Shared stream state that receives the termination event.
    shared: Arc<WatchShared<T>>,
    /// Sender signalled once from `pre_start`, after monitoring is registered.
    ready: Mutex<Option<std::sync::mpsc::Sender<()>>>,
}
1300
1301impl<T> Actor for WatchMonitorActor<T>
1302where
1303 T: Send + 'static,
1304{
1305 type Msg = ();
1306 type State = ();
1307 type Arguments = ();
1308
1309 async fn pre_start(
1310 &self,
1311 myself: ActorRef<Self::Msg>,
1312 _args: Self::Arguments,
1313 ) -> ActorResult<Self::State> {
1314 myself.monitor(self.watched.clone());
1315 let ready = self
1316 .ready
1317 .lock()
1318 .unwrap_or_else(|poison| poison.into_inner())
1319 .take();
1320 if let Some(ready) = ready {
1321 let _ = ready.send(());
1322 }
1323 Ok(())
1324 }
1325
1326 async fn handle_supervisor_evt(
1327 &self,
1328 myself: ActorRef<Self::Msg>,
1329 event: ractor::SupervisionEvent,
1330 _state: &mut Self::State,
1331 ) -> ActorResult {
1332 match event {
1333 ractor::SupervisionEvent::ActorTerminated(who, _, reason)
1334 if who.get_id() == self.watched.get_id() =>
1335 {
1336 let detail = reason.unwrap_or_else(|| "terminated".to_owned());
1337 self.shared.fail(WatchEvent {
1338 reason: format!("watched actor terminated: {detail}"),
1339 });
1340 myself.stop(None);
1341 }
1342 ractor::SupervisionEvent::ActorFailed(who, error)
1343 if who.get_id() == self.watched.get_id() =>
1344 {
1345 self.shared.fail(WatchEvent {
1346 reason: format!("watched actor terminated: {error}"),
1347 });
1348 myself.stop(None);
1349 }
1350 _ => {}
1351 }
1352 Ok(())
1353 }
1354
1355 async fn post_stop(
1356 &self,
1357 myself: ActorRef<Self::Msg>,
1358 _state: &mut Self::State,
1359 ) -> ActorResult {
1360 myself.unmonitor(self.watched.clone());
1361 Ok(())
1362 }
1363}
1364
1365fn spawn_watch_monitor<T>(
1366 watched: ractor::ActorCell,
1367 shared: Arc<WatchShared<T>>,
1368) -> StreamResult<(ActorRef<()>, ractor::concurrency::JoinHandle<()>)>
1369where
1370 T: Send + 'static,
1371{
1372 let (ready_tx, ready_rx) = std::sync::mpsc::channel();
1373 block_on_ractor_runtime(Actor::spawn(
1374 None,
1375 WatchMonitorActor {
1376 watched,
1377 shared,
1378 ready: Mutex::new(Some(ready_tx)),
1379 },
1380 (),
1381 ))?
1382 .map_err(|error| StreamError::Failed(format!("watch monitor failed to spawn: {error}")))
1383 .and_then(|(monitor_ref, handle)| {
1384 ready_rx.recv_timeout(Duration::from_secs(1)).map_err(|_| {
1385 monitor_ref.stop(None);
1386 StreamError::Failed("watch monitor did not become ready".to_owned())
1387 })?;
1388 Ok((monitor_ref, handle))
1389 })
1390}
1391
/// Blocking iterator over items buffered in [`WatchShared`].
struct WatchStream<T> {
    /// Queue and termination state this stream reads from.
    shared: Arc<WatchShared<T>>,
    /// Monitor actor to stop when the stream is dropped, if any.
    monitor_ref: Option<ActorRef<()>>,
    /// Completion handle of the producer; released when the stream is dropped.
    producer_completion: Option<StreamCompletion<NotUsed>>,
    /// Set once the stream has yielded an error or reached its end.
    terminated: bool,
}
1398
1399impl<T> Iterator for WatchStream<T>
1400where
1401 T: Send + 'static,
1402{
1403 type Item = StreamResult<T>;
1404
1405 fn next(&mut self) -> Option<Self::Item> {
1406 if self.terminated {
1407 return None;
1408 }
1409
1410 let mut inner = self.shared.lock();
1411 loop {
1412 if let Some(event) = inner.event.take() {
1413 self.terminated = true;
1414 return Some(Err(event.into_stream_error()));
1415 }
1416 if let Some(item) = inner.queue.pop_front() {
1417 if item.is_err() {
1418 self.terminated = true;
1419 }
1420 return Some(item);
1421 }
1422 if inner.upstream_done {
1423 self.terminated = true;
1424 return None;
1425 }
1426 inner = self
1427 .shared
1428 .available
1429 .wait(inner)
1430 .unwrap_or_else(|poison| poison.into_inner());
1431 }
1432 }
1433}
1434
1435impl<T> Drop for WatchStream<T> {
1436 fn drop(&mut self) {
1437 if let Some(monitor_ref) = self.monitor_ref.take() {
1438 monitor_ref.stop(None);
1439 }
1440 drop(self.producer_completion.take());
1441 }
1442}
1443
1444fn run_watch_upstream<T>(
1445 mut input: BoxStream<T>,
1446 shared: Arc<WatchShared<T>>,
1447 cancelled: Arc<AtomicBool>,
1448) -> StreamResult<NotUsed>
1449where
1450 T: Send + 'static,
1451{
1452 loop {
1453 if cancelled.load(Ordering::SeqCst) {
1454 shared.complete();
1455 return Err(StreamError::Cancelled);
1456 }
1457 match input.next() {
1458 Some(Ok(item)) => {
1459 if !shared.push(Ok(item)) {
1460 return Err(StreamError::Cancelled);
1461 }
1462 }
1463 Some(Err(error)) => {
1464 let _ = shared.push(Err(error.clone()));
1465 shared.complete();
1466 return Err(error);
1467 }
1468 None => {
1469 shared.complete();
1470 return Ok(NotUsed);
1471 }
1472 }
1473 }
1474}
1475
1476fn actor_interop_error(reason: impl Into<String>) -> StreamError {
1477 StreamError::Failed(reason.into())
1478}
1479
1480#[cfg(test)]
1481mod tests {
1482 use super::*;
1483 use crate::stream::{Keep, Sink};
1484 use std::{
1485 sync::{
1486 Arc as StdArc, Mutex as StdMutex,
1487 atomic::{AtomicUsize, Ordering},
1488 mpsc,
1489 },
1490 time::{Duration as StdDuration, Instant as StdInstant},
1491 };
1492
1493 fn wait_until<F>(timeout: StdDuration, mut condition: F) -> bool
1494 where
1495 F: FnMut() -> bool,
1496 {
1497 let deadline = StdInstant::now() + timeout;
1498 while StdInstant::now() < deadline {
1499 if condition() {
1500 return true;
1501 }
1502 std::thread::park_timeout(StdDuration::from_millis(1));
1503 }
1504 condition()
1505 }
1506
1507 fn spawn_actor<A>(
1508 actor: A,
1509 args: A::Arguments,
1510 ) -> (ActorRef<A::Msg>, ractor::concurrency::JoinHandle<()>)
1511 where
1512 A: Actor,
1513 A::Msg: Message,
1514 A::Arguments: Send + 'static,
1515 {
1516 block_on_ractor_runtime(Actor::spawn(None, actor, args))
1517 .expect("ractor runtime runs")
1518 .expect("actor spawns")
1519 }
1520
1521 fn stop_actor<Msg>(actor_ref: ActorRef<Msg>, handle: ractor::concurrency::JoinHandle<()>)
1522 where
1523 Msg: Message,
1524 {
1525 actor_ref.stop(None);
1526 block_on_ractor_runtime(async move {
1527 handle.await.expect("actor task joins");
1528 })
1529 .expect("ractor runtime joins actor");
1530 }
1531
    /// Request protocol of `AskInteropActor`.
    enum AskInteropMsg {
        /// Request answered with an `ActorStatus` reply.
        Status {
            input: u64,
            reply_to: ReplyPort<ActorStatus<u64>>,
        },
        /// Request answered with a bare `u64` reply.
        Plain {
            input: u64,
            reply_to: ReplyPort<u64>,
        },
    }

    // Explicit impl only when the `cluster` feature is enabled.
    // NOTE(review): presumably a blanket impl covers the non-cluster build — confirm.
    #[cfg(feature = "cluster")]
    impl Message for AskInteropMsg {}
1545
1546 struct AskInteropActor;
1547
1548 impl Actor for AskInteropActor {
1549 type Msg = AskInteropMsg;
1550 type State = ();
1551 type Arguments = ();
1552
1553 async fn pre_start(
1554 &self,
1555 _myself: ActorRef<Self::Msg>,
1556 _args: Self::Arguments,
1557 ) -> ActorResult<Self::State> {
1558 Ok(())
1559 }
1560
1561 async fn handle(
1562 &self,
1563 _myself: ActorRef<Self::Msg>,
1564 message: Self::Msg,
1565 _state: &mut Self::State,
1566 ) -> ActorResult {
1567 match message {
1568 AskInteropMsg::Status { input: 0, reply_to } => {
1569 let _ =
1570 reply_to.send(ActorStatus::Err(StreamError::Failed("bad status".into())));
1571 }
1572 AskInteropMsg::Status { input, reply_to } => {
1573 let _ = reply_to.send(ActorStatus::Ok(input + 10));
1574 }
1575 AskInteropMsg::Plain { input, reply_to } => {
1576 let _ = reply_to.send(input * 2);
1577 }
1578 }
1579 Ok(())
1580 }
1581 }
1582
1583 #[test]
1584 fn ask_with_status_unwraps_ok_and_fails_err() {
1585 let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
1586
1587 let values = Source::from_iter([1_u64, 2])
1588 .via(ActorFlow::ask_with_status(
1589 actor_ref.clone(),
1590 2,
1591 Duration::from_secs(1),
1592 |input, reply_to| AskInteropMsg::Status { input, reply_to },
1593 ))
1594 .run_collect()
1595 .unwrap();
1596 assert_eq!(values, vec![11, 12]);
1597
1598 let failed = Source::single(0_u64)
1599 .via(ActorFlow::ask_with_status(
1600 actor_ref.clone(),
1601 1,
1602 Duration::from_secs(1),
1603 |input, reply_to| AskInteropMsg::Status { input, reply_to },
1604 ))
1605 .run_collect();
1606 assert_eq!(failed, Err(StreamError::Failed("bad status".into())));
1607
1608 stop_actor(actor_ref, handle);
1609 }
1610
1611 #[test]
1612 fn ask_with_context_preserves_context() {
1613 let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
1614
1615 let values = Source::from_iter([1_u64, 2, 3])
1616 .as_source_with_context(|item| item + 100)
1617 .via(ActorFlow::ask_with_context(
1618 actor_ref.clone(),
1619 2,
1620 Duration::from_secs(1),
1621 |input, reply_to| AskInteropMsg::Plain { input, reply_to },
1622 ))
1623 .run_collect()
1624 .unwrap();
1625
1626 assert_eq!(values, vec![(2, 101), (4, 102), (6, 103)]);
1627 stop_actor(actor_ref, handle);
1628 }
1629
1630 #[test]
1631 fn ask_with_status_and_context_preserves_context_and_fails_status() {
1632 let (actor_ref, handle) = spawn_actor(AskInteropActor, ());
1633
1634 let values = Source::from_iter([1_u64, 2])
1635 .as_source_with_context(|item| item + 10)
1636 .via(ActorFlow::ask_with_status_and_context(
1637 actor_ref.clone(),
1638 2,
1639 Duration::from_secs(1),
1640 |input, reply_to| AskInteropMsg::Status { input, reply_to },
1641 ))
1642 .run_collect()
1643 .unwrap();
1644 assert_eq!(values, vec![(11, 11), (12, 12)]);
1645
1646 let failed = Source::single(0_u64)
1647 .as_source_with_context(|_| 99_u64)
1648 .via(ActorFlow::ask_with_status_and_context(
1649 actor_ref.clone(),
1650 1,
1651 Duration::from_secs(1),
1652 |input, reply_to| AskInteropMsg::Status { input, reply_to },
1653 ))
1654 .run_collect();
1655 assert_eq!(failed, Err(StreamError::Failed("bad status".into())));
1656
1657 stop_actor(actor_ref, handle);
1658 }
1659
1660 #[test]
1661 fn actor_source_actor_ref_emits_and_completes() {
1662 let (actor_ref, completion) = ActorSource::actor_ref::<u64>()
1663 .to_mat(Sink::collect(), Keep::both)
1664 .run()
1665 .unwrap();
1666
1667 actor_ref.cast(ActorSourceMessage::Element(1)).unwrap();
1668 actor_ref.cast(ActorSourceMessage::Element(2)).unwrap();
1669 actor_ref.cast(ActorSourceMessage::Complete).unwrap();
1670
1671 assert_eq!(completion.wait().unwrap(), vec![1, 2]);
1672 }
1673
    /// Message type received by `SourceAckActor`.
    #[derive(Clone)]
    enum SourceAckMsg {
        /// The only message; each one received is counted.
        Ack,
    }

    // Explicit impl only when the `cluster` feature is enabled.
    #[cfg(feature = "cluster")]
    impl Message for SourceAckMsg {}
1681
    /// Test actor that counts every message it receives.
    struct SourceAckActor {
        /// Shared counter the test inspects.
        count: StdArc<AtomicUsize>,
    }

    impl Actor for SourceAckActor {
        type Msg = SourceAckMsg;
        type State = ();
        type Arguments = ();

        /// No start-up work.
        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> ActorResult<Self::State> {
            Ok(())
        }

        /// Increments the shared counter once per received message.
        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            _message: Self::Msg,
            _state: &mut Self::State,
        ) -> ActorResult {
            self.count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }
1709
1710 #[test]
1711 fn actor_source_with_backpressure_acks_startup_and_each_element() {
1712 let ack_count = StdArc::new(AtomicUsize::new(0));
1713 let (ack_ref, ack_handle) = spawn_actor(
1714 SourceAckActor {
1715 count: StdArc::clone(&ack_count),
1716 },
1717 (),
1718 );
1719
1720 let (actor_ref, completion) =
1721 ActorSource::actor_ref_with_backpressure::<u64, SourceAckMsg>(
1722 ack_ref.clone(),
1723 SourceAckMsg::Ack,
1724 )
1725 .to_mat(Sink::collect(), Keep::both)
1726 .run()
1727 .unwrap();
1728
1729 assert!(wait_until(StdDuration::from_secs(1), || {
1730 ack_count.load(Ordering::SeqCst) >= 1
1731 }));
1732 actor_ref.cast(ActorSourceMessage::Element(1)).unwrap();
1733 assert!(wait_until(StdDuration::from_secs(1), || {
1734 ack_count.load(Ordering::SeqCst) >= 2
1735 }));
1736 actor_ref.cast(ActorSourceMessage::Element(2)).unwrap();
1737 assert!(wait_until(StdDuration::from_secs(1), || {
1738 ack_count.load(Ordering::SeqCst) >= 3
1739 }));
1740 actor_ref.cast(ActorSourceMessage::Complete).unwrap();
1741
1742 assert_eq!(completion.wait().unwrap(), vec![1, 2]);
1743 stop_actor(ack_ref, ack_handle);
1744 }
1745
    /// Message protocol received by `EventActor` in the actor-sink test.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum SinkEvent {
        /// One stream element.
        Element(u64),
        /// The stream completed.
        Complete,
        /// The stream failed; carries the error rendered as text.
        Fail(String),
    }

    // Explicit impl only when the `cluster` feature is enabled.
    #[cfg(feature = "cluster")]
    impl Message for SinkEvent {}
1755
    /// Test actor that forwards every received `SinkEvent` to an mpsc channel.
    struct EventActor {
        /// Channel the test reads forwarded events from.
        sender: mpsc::Sender<SinkEvent>,
    }

    impl Actor for EventActor {
        type Msg = SinkEvent;
        type State = ();
        type Arguments = ();

        /// No start-up work.
        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> ActorResult<Self::State> {
            Ok(())
        }

        /// Forwards `message` to the channel; panics if the receiver is gone.
        async fn handle(
            &self,
            _myself: ActorRef<Self::Msg>,
            message: Self::Msg,
            _state: &mut Self::State,
        ) -> ActorResult {
            self.sender
                .send(message)
                .expect("event receiver stays open");
            Ok(())
        }
    }
1785
1786 #[test]
1787 fn actor_sink_actor_ref_sends_elements_and_complete() {
1788 let (tx, rx) = mpsc::channel();
1789 let (actor_ref, handle) = spawn_actor(EventActor { sender: tx }, ());
1790
1791 Source::from_iter([1_u64, 2])
1792 .run_with(ActorSink::actor_ref(
1793 actor_ref.clone(),
1794 SinkEvent::Element,
1795 || SinkEvent::Complete,
1796 |error| SinkEvent::Fail(error.to_string()),
1797 ))
1798 .unwrap()
1799 .wait()
1800 .unwrap();
1801
1802 assert_eq!(
1803 [
1804 rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1805 rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1806 rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1807 ],
1808 [
1809 SinkEvent::Element(1),
1810 SinkEvent::Element(2),
1811 SinkEvent::Complete,
1812 ]
1813 );
1814 stop_actor(actor_ref, handle);
1815 }
1816
    /// Comparable record of what `BackpressureSinkActor` received, without
    /// the reply ports carried by `BackpressureSinkMsg`.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum BackpressureEvent {
        /// An `Init` message was received.
        Init,
        /// An element was received.
        Element(u64),
        /// The completion message was received.
        Complete,
        /// The failure message was received, with its text.
        Fail(String),
    }
1824
    /// Message protocol of `BackpressureSinkActor`.
    enum BackpressureSinkMsg {
        /// Initial handshake; acknowledged through the reply port.
        Init(ReplyPort<()>),
        /// One element; acknowledged through the reply port.
        Element(u64, ReplyPort<()>),
        /// The stream completed.
        Complete,
        /// The stream failed; carries the error rendered as text.
        Fail(String),
    }

    // Explicit impl only when the `cluster` feature is enabled.
    #[cfg(feature = "cluster")]
    impl Message for BackpressureSinkMsg {}
1834
1835 struct BackpressureSinkActor {
1836 sender: mpsc::Sender<BackpressureEvent>,
1837 }
1838
1839 impl Actor for BackpressureSinkActor {
1840 type Msg = BackpressureSinkMsg;
1841 type State = ();
1842 type Arguments = ();
1843
1844 async fn pre_start(
1845 &self,
1846 _myself: ActorRef<Self::Msg>,
1847 _args: Self::Arguments,
1848 ) -> ActorResult<Self::State> {
1849 Ok(())
1850 }
1851
1852 async fn handle(
1853 &self,
1854 _myself: ActorRef<Self::Msg>,
1855 message: Self::Msg,
1856 _state: &mut Self::State,
1857 ) -> ActorResult {
1858 match message {
1859 BackpressureSinkMsg::Init(reply_to) => {
1860 self.sender.send(BackpressureEvent::Init).unwrap();
1861 let _ = reply_to.send(());
1862 }
1863 BackpressureSinkMsg::Element(item, reply_to) => {
1864 self.sender.send(BackpressureEvent::Element(item)).unwrap();
1865 let _ = reply_to.send(());
1866 }
1867 BackpressureSinkMsg::Complete => {
1868 self.sender.send(BackpressureEvent::Complete).unwrap();
1869 }
1870 BackpressureSinkMsg::Fail(error) => {
1871 self.sender.send(BackpressureEvent::Fail(error)).unwrap();
1872 }
1873 }
1874 Ok(())
1875 }
1876 }
1877
1878 #[test]
1879 fn actor_sink_with_backpressure_waits_for_init_and_element_acks() {
1880 let (tx, rx) = mpsc::channel();
1881 let (actor_ref, handle) = spawn_actor(BackpressureSinkActor { sender: tx }, ());
1882
1883 Source::from_iter([1_u64, 2])
1884 .run_with(ActorSink::actor_ref_with_backpressure(
1885 actor_ref.clone(),
1886 Duration::from_secs(1),
1887 BackpressureSinkMsg::Init,
1888 BackpressureSinkMsg::Element,
1889 || BackpressureSinkMsg::Complete,
1890 |error| BackpressureSinkMsg::Fail(error.to_string()),
1891 ))
1892 .unwrap()
1893 .wait()
1894 .unwrap();
1895
1896 assert_eq!(
1897 [
1898 rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1899 rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1900 rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1901 rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1902 ],
1903 [
1904 BackpressureEvent::Init,
1905 BackpressureEvent::Element(1),
1906 BackpressureEvent::Element(2),
1907 BackpressureEvent::Complete,
1908 ]
1909 );
1910 stop_actor(actor_ref, handle);
1911 }
1912
    /// Message type of `WatchActor`.
    enum WatchMsg {
        /// Asks the actor to stop itself.
        Stop,
    }

    // Explicit impl only when the `cluster` feature is enabled.
    #[cfg(feature = "cluster")]
    impl Message for WatchMsg {}
1919
    /// Test actor that stops itself when it receives any message.
    struct WatchActor;

    impl Actor for WatchActor {
        type Msg = WatchMsg;
        type State = ();
        type Arguments = ();

        /// No start-up work.
        async fn pre_start(
            &self,
            _myself: ActorRef<Self::Msg>,
            _args: Self::Arguments,
        ) -> ActorResult<Self::State> {
            Ok(())
        }

        /// Stops the actor with the reason `"watched-stop"`, whatever the message.
        async fn handle(
            &self,
            myself: ActorRef<Self::Msg>,
            _message: Self::Msg,
            _state: &mut Self::State,
        ) -> ActorResult {
            myself.stop(Some("watched-stop".into()));
            Ok(())
        }
    }
1945
    /// A stream passed through `ActorFlow::watch` fails when the watched actor
    /// stops, even though no element was ever sent.
    #[test]
    fn watch_fails_idle_stream_when_actor_terminates() {
        let (watched_ref, watched_handle) = spawn_actor(WatchActor, ());
        // The source never receives an element, so the stream stays idle.
        let (source_ref, completion) = ActorSource::actor_ref::<u64>()
            .via(ActorFlow::watch(watched_ref.clone()))
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        // Make the watched actor stop itself and wait until it reports `Stopped`.
        watched_ref.cast(WatchMsg::Stop).unwrap();
        assert!(wait_until(StdDuration::from_secs(1), || {
            watched_ref.get_status() == ractor::ActorStatus::Stopped
        }));

        let result = completion.wait();
        // Best-effort completion of the source; the result is deliberately ignored.
        let _ = source_ref.cast(ActorSourceMessage::Complete);
        assert!(
            matches!(result, Err(StreamError::Failed(reason)) if reason.contains("watched actor terminated"))
        );
        // Join the watched actor's task so it does not outlive the test.
        block_on_ractor_runtime(async move {
            watched_handle.await.expect("watched actor joins");
        })
        .expect("ractor runtime joins watched actor");
    }
1970
    /// Counter used to build a distinct process-group name per use.
    static NEXT_GROUP: AtomicUsize = AtomicUsize::new(0);
1972
    /// Elements published through `ActorPubSub::sink` reach a source
    /// subscribed to the same group.
    #[test]
    fn pubsub_sink_broadcasts_to_pg_source() {
        // Build a group name from the shared counter.
        let group = format!(
            "datum-test-pubsub-{}",
            NEXT_GROUP.fetch_add(1, Ordering::SeqCst)
        );

        let (_source_ref, completion) = ActorPubSub::source::<u64>(group.clone())
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        // Wait until the group has at least one member before publishing.
        assert!(wait_until(StdDuration::from_secs(1), || {
            !ractor::pg::get_members(&group).is_empty()
        }));

        Source::from_iter([1_u64, 2])
            .run_with(ActorPubSub::sink(group))
            .unwrap()
            .wait()
            .unwrap();

        // NOTE(review): the source ending here presumably relies on the sink
        // broadcasting a completion message when it finishes — confirm.
        assert_eq!(completion.wait().unwrap(), vec![1, 2]);
    }
1997
1998 #[test]
1999 fn actor_source_fail_message_fails_stream() {
2000 let (actor_ref, completion) = ActorSource::typed::<u64>()
2001 .to_mat(Sink::collect(), Keep::both)
2002 .run()
2003 .unwrap();
2004
2005 actor_ref
2006 .cast(ActorSourceMessage::Fail("source failed".into()))
2007 .unwrap();
2008
2009 assert_eq!(
2010 completion.wait(),
2011 Err(StreamError::Failed("source failed".into()))
2012 );
2013 }
2014
    /// `ActorSink::typed` sends `ActorSinkMessage::Element` for each element
    /// followed by `ActorSinkMessage::Complete`.
    #[test]
    fn actor_sink_typed_sends_protocol_messages() {
        // Labels of the received messages, in arrival order.
        let received = StdArc::new(StdMutex::new(Vec::<String>::new()));

        /// Actor that records a text label for every protocol message it gets.
        struct TypedSinkActor {
            received: StdArc<StdMutex<Vec<String>>>,
        }

        impl Actor for TypedSinkActor {
            type Msg = ActorSinkMessage<u64>;
            type State = ();
            type Arguments = ();

            async fn pre_start(
                &self,
                _myself: ActorRef<Self::Msg>,
                _args: Self::Arguments,
            ) -> ActorResult<Self::State> {
                Ok(())
            }

            async fn handle(
                &self,
                _myself: ActorRef<Self::Msg>,
                message: Self::Msg,
                _state: &mut Self::State,
            ) -> ActorResult {
                // Render the message as a label the test can compare.
                let label = match message {
                    ActorSinkMessage::Element(item) => format!("element:{item}"),
                    ActorSinkMessage::Complete => "complete".to_owned(),
                    ActorSinkMessage::Fail(error) => format!("fail:{error}"),
                };
                self.received.lock().unwrap().push(label);
                Ok(())
            }
        }

        let (actor_ref, handle) = spawn_actor(
            TypedSinkActor {
                received: StdArc::clone(&received),
            },
            (),
        );

        Source::from_iter([7_u64])
            .run_with(ActorSink::typed(actor_ref.clone()))
            .unwrap()
            .wait()
            .unwrap();

        // The actor may still be handling messages after the sink finished,
        // so poll until both labels are recorded.
        assert!(wait_until(StdDuration::from_secs(1), || {
            received.lock().unwrap().len() == 2
        }));
        assert_eq!(
            *received.lock().unwrap(),
            vec!["element:7".to_owned(), "complete".to_owned()]
        );
        stop_actor(actor_ref, handle);
    }
2074}