Skip to main content

atomr_core/routing/
scatter_gather.rs

1//! Scatter-gather-first-completed router.
2
3use std::time::Duration;
4
5use tokio::sync::oneshot;
6
7use crate::actor::{ActorRef, AskError};
8
9pub struct ScatterGatherFirstCompletedRouter<M: Send + 'static> {
10    pub routees: Vec<ActorRef<M>>,
11    pub within: Duration,
12}
13
14impl<M: Send + 'static> ScatterGatherFirstCompletedRouter<M> {
15    pub fn new(routees: Vec<ActorRef<M>>, within: Duration) -> Self {
16        Self { routees, within }
17    }
18
19    /// Fan out a query and return the first non-error reply.
20    pub async fn ask_first<R, F>(&self, mut build: F) -> Result<R, AskError>
21    where
22        R: Send + 'static,
23        F: FnMut(oneshot::Sender<R>) -> M,
24    {
25        let mut joins = futures_util::stream::FuturesUnordered::new();
26        for r in &self.routees {
27            let (tx, rx) = oneshot::channel::<R>();
28            r.tell(build(tx));
29            joins.push(async move { tokio::time::timeout(self.within, rx).await });
30        }
31        use futures_util::StreamExt;
32        while let Some(res) = joins.next().await {
33            match res {
34                Ok(Ok(v)) => return Ok(v),
35                _ => continue,
36            }
37        }
38        Err(AskError::Timeout)
39    }
40}