1use async_flow::{Inputs, Outputs, Result};
4use core::ops::Add;
5use tokio::try_join;
6
7pub async fn add<T>(mut lhs: Inputs<T>, mut rhs: Inputs<T>, sums: Outputs<T>) -> Result
9where
10 T: Add<Output = T>,
11{
12 loop {
13 let (a, b) = try_join!(lhs.recv(), rhs.recv())?;
14 match (a, b) {
15 (Some(a), Some(b)) => sums.send(a + b).await?,
16 _ => break,
17 }
18 }
19 Ok(())
20}
21
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::boxed::Box;
    use async_flow::Channel;
    use core::error::Error;

    /// Feeds one value into each input, closes both, and checks that the
    /// adder task finishes successfully after emitting exactly one sum.
    #[tokio::test]
    async fn test_add() -> Result<(), Box<dyn Error>> {
        let mut lhs = Channel::bounded(1);
        let mut rhs = Channel::bounded(1);
        let mut sums = Channel::bounded(10);

        let adder = tokio::spawn(add::<isize>(lhs.rx, rhs.rx, sums.tx));

        lhs.tx.send(1).await.unwrap();
        lhs.tx.close();

        rhs.tx.send(2).await.unwrap();
        rhs.tx.close();

        // Do not discard the task outcome: the outer `Ok` means the task did
        // not panic or get cancelled, the inner `Ok(())` means `add` itself
        // returned success.
        assert!(
            matches!(adder.await, Ok(Ok(()))),
            "adder task should finish without panicking or returning an error"
        );

        let sum = sums.rx.recv().await.unwrap();
        assert_eq!(sum, Some(3));

        // `sums.tx` was moved into the (now finished) adder, so no further
        // values are expected.
        let sum = sums.rx.recv().await.unwrap();
        assert_eq!(sum, None);

        Ok(())
    }
}