1use async_flow::{Inputs, Outputs, Result};
4use core::ops::Add;
5use tokio::try_join;
6
7pub async fn add<T>(mut lhs: Inputs<T>, mut rhs: Inputs<T>, sums: Outputs<T>) -> Result
9where
10 T: Add<Output = T>,
11{
12 loop {
13 let (a, b) = try_join!(lhs.recv(), rhs.recv())?;
14 match (a, b) {
15 (Some(a), Some(b)) => sums.send(a + b).await?,
16 _ => break,
17 }
18 }
19 Ok(())
20}
21
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds one value into each input, closes both senders, and checks that
    /// exactly one sum is produced before the output stream ends.
    #[tokio::test]
    async fn test_add() {
        use async_flow::{Port, bounded};

        let (mut lhs_tx, lhs_rx) = bounded(1);
        let (mut rhs_tx, rhs_rx) = bounded(1);
        let (sums_tx, mut sums_rx) = bounded(10);

        let adder = tokio::spawn(add::<isize>(lhs_rx, rhs_rx, sums_tx));

        lhs_tx.send(1).await.unwrap();
        lhs_tx.close();

        rhs_tx.send(2).await.unwrap();
        rhs_tx.close();

        // Wait for the block to finish and surface a panic or an `Err` from it
        // instead of silently discarding the task's result.
        adder
            .await
            .expect("adder task panicked")
            .expect("adder returned an error");

        let sum = sums_rx.recv().await.unwrap();
        assert_eq!(sum, Some(3));

        // `sums_tx` was moved into the finished task, so the stream must end here.
        let sum = sums_rx.recv().await.unwrap();
        assert_eq!(sum, None);
    }
}