Skip to main content

atomr_core/routing/
scatter_gather.rs

//! Scatter-gather-first-completed router.
//! Akka.NET: `Routing/ScatterGatherFirstCompletedPool.cs`.
3
4use std::time::Duration;
5
6use tokio::sync::oneshot;
7
8use crate::actor::{ActorRef, AskError};
9
10pub struct ScatterGatherFirstCompletedRouter<M: Send + 'static> {
11    pub routees: Vec<ActorRef<M>>,
12    pub within: Duration,
13}
14
15impl<M: Send + 'static> ScatterGatherFirstCompletedRouter<M> {
16    pub fn new(routees: Vec<ActorRef<M>>, within: Duration) -> Self {
17        Self { routees, within }
18    }
19
20    /// Fan out a query and return the first non-error reply.
21    pub async fn ask_first<R, F>(&self, mut build: F) -> Result<R, AskError>
22    where
23        R: Send + 'static,
24        F: FnMut(oneshot::Sender<R>) -> M,
25    {
26        let mut joins = futures_util::stream::FuturesUnordered::new();
27        for r in &self.routees {
28            let (tx, rx) = oneshot::channel::<R>();
29            r.tell(build(tx));
30            joins.push(async move { tokio::time::timeout(self.within, rx).await });
31        }
32        use futures_util::StreamExt;
33        while let Some(res) = joins.next().await {
34            match res {
35                Ok(Ok(v)) => return Ok(v),
36                _ => continue,
37            }
38        }
39        Err(AskError::Timeout)
40    }
41}