atomr_core/routing/
scatter_gather.rs1use std::time::Duration;
4
5use tokio::sync::oneshot;
6
7use crate::actor::{ActorRef, AskError};
8
9pub struct ScatterGatherFirstCompletedRouter<M: Send + 'static> {
10 pub routees: Vec<ActorRef<M>>,
11 pub within: Duration,
12}
13
14impl<M: Send + 'static> ScatterGatherFirstCompletedRouter<M> {
15 pub fn new(routees: Vec<ActorRef<M>>, within: Duration) -> Self {
16 Self { routees, within }
17 }
18
19 pub async fn ask_first<R, F>(&self, mut build: F) -> Result<R, AskError>
21 where
22 R: Send + 'static,
23 F: FnMut(oneshot::Sender<R>) -> M,
24 {
25 let mut joins = futures_util::stream::FuturesUnordered::new();
26 for r in &self.routees {
27 let (tx, rx) = oneshot::channel::<R>();
28 r.tell(build(tx));
29 joins.push(async move { tokio::time::timeout(self.within, rx).await });
30 }
31 use futures_util::StreamExt;
32 while let Some(res) = joins.next().await {
33 match res {
34 Ok(Ok(v)) => return Ok(v),
35 _ => continue,
36 }
37 }
38 Err(AskError::Timeout)
39 }
40}