flows_math/
add.rs

1// This is free and unencumbered software released into the public domain.
2
3use async_flow::{Inputs, Outputs, Result};
4use core::ops::Add;
5use tokio::try_join;
6
7/// A block that outputs the sums of input numbers.
8pub async fn add<T>(mut lhs: Inputs<T>, mut rhs: Inputs<T>, sums: Outputs<T>) -> Result
9where
10    T: Add<Output = T>,
11{
12    loop {
13        let (a, b) = try_join!(lhs.recv(), rhs.recv())?;
14        match (a, b) {
15            (Some(a), Some(b)) => sums.send(a + b).await?,
16            _ => break,
17        }
18    }
19    Ok(())
20}
21
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::boxed::Box;
    use async_flow::Channel;
    use core::error::Error;

    /// Feeds one value into each input of `add`, closes both inputs, and
    /// checks that exactly one sum is emitted before the output ends.
    #[tokio::test]
    async fn test_add() -> Result<(), Box<dyn Error>> {
        let mut lhs = Channel::bounded(1);
        let mut rhs = Channel::bounded(1);
        let mut sums = Channel::bounded(10);

        let adder = tokio::spawn(add::<isize>(lhs.rx, rhs.rx, sums.tx));

        lhs.tx.send(1).await.unwrap();
        lhs.tx.close();

        rhs.tx.send(2).await.unwrap();
        rhs.tx.close();

        // Wait for the block to finish and surface its outcome: a panic in the
        // spawned task or an `Err` returned by `add` must fail the test rather
        // than being silently discarded.
        adder
            .await
            .expect("adder task should run to completion")
            .expect("add should finish without an error");

        let sum = sums.rx.recv().await.unwrap();
        assert_eq!(sum, Some(3));

        // The block has returned, so no further sums are expected.
        let sum = sums.rx.recv().await.unwrap();
        assert_eq!(sum, None);

        Ok(())
    }
}