atomr_core/routing/
scatter_gather.rs1use std::time::Duration;
5
6use tokio::sync::oneshot;
7
8use crate::actor::{ActorRef, AskError};
9
10pub struct ScatterGatherFirstCompletedRouter<M: Send + 'static> {
11 pub routees: Vec<ActorRef<M>>,
12 pub within: Duration,
13}
14
15impl<M: Send + 'static> ScatterGatherFirstCompletedRouter<M> {
16 pub fn new(routees: Vec<ActorRef<M>>, within: Duration) -> Self {
17 Self { routees, within }
18 }
19
20 pub async fn ask_first<R, F>(&self, mut build: F) -> Result<R, AskError>
22 where
23 R: Send + 'static,
24 F: FnMut(oneshot::Sender<R>) -> M,
25 {
26 let mut joins = futures_util::stream::FuturesUnordered::new();
27 for r in &self.routees {
28 let (tx, rx) = oneshot::channel::<R>();
29 r.tell(build(tx));
30 joins.push(async move { tokio::time::timeout(self.within, rx).await });
31 }
32 use futures_util::StreamExt;
33 while let Some(res) = joins.next().await {
34 match res {
35 Ok(Ok(v)) => return Ok(v),
36 _ => continue,
37 }
38 }
39 Err(AskError::Timeout)
40 }
41}