flows_math/
add.rs

1// This is free and unencumbered software released into the public domain.
2
3use async_flow::{Inputs, Outputs, Result};
4use core::ops::Add;
5use tokio::try_join;
6
7/// A block that outputs the sums of input numbers.
8pub async fn add<T>(mut lhs: Inputs<T>, mut rhs: Inputs<T>, sums: Outputs<T>) -> Result
9where
10    T: Add<Output = T>,
11{
12    loop {
13        let (a, b) = try_join!(lhs.recv(), rhs.recv())?;
14        match (a, b) {
15            (Some(a), Some(b)) => sums.send(a + b).await?,
16            _ => break,
17        }
18    }
19    Ok(())
20}
21
#[cfg(test)]
mod tests {
    use super::*;

    /// Sends one value into each input, closes both senders, and checks that
    /// the output yields exactly one sum (`1 + 2 = 3`) followed by `None`.
    #[tokio::test]
    async fn test_add() {
        use async_flow::{Port, bounded};

        let (mut lhs_tx, lhs_rx) = bounded(1);
        let (mut rhs_tx, rhs_rx) = bounded(1);
        let (sums_tx, mut sums_rx) = bounded(10);

        // Run the block on its own task so the sends below can make progress.
        let adder = tokio::spawn(add::<isize>(lhs_rx, rhs_rx, sums_tx));

        lhs_tx.send(1).await.unwrap();
        lhs_tx.close();

        rhs_tx.send(2).await.unwrap();
        rhs_tx.close();

        // Wait for the block to finish and verify its outcome instead of
        // discarding it, so a panic or an error inside `add` fails the test
        // directly rather than surfacing as a confusing assertion below.
        let outcome = adder.await.expect("adder task panicked or was cancelled");
        assert!(outcome.is_ok(), "add returned an error");

        let sum = sums_rx.recv().await.unwrap();
        assert_eq!(sum, Some(3));

        let sum = sums_rx.recv().await.unwrap();
        assert_eq!(sum, None);
    }
}