thfmr_protocol/
consumer.rs

1use std::net::IpAddr;
2use tokio::sync::mpsc;
3use {
4    crate::{channels, decenc, error::Result, message},
5    futures::future,
6    log::info,
7    tokio::{io, net, time},
8};
9
/// Client side of a consumer connection.
///
/// Holds the remote endpoint that [`Consumer::run`] dials over TCP.
#[derive(Debug, Clone)]
pub struct Consumer {
    /// Remote endpoint as an `(ip, port)` pair.
    consumer_address: (IpAddr, u16),
}
13
14impl Consumer {
15    pub fn new(consumer_address: (IpAddr, u16)) -> Consumer {
16        Consumer { consumer_address }
17    }
18
19    pub async fn run(
20        &self,
21        mut consumer_recv: channels::ChanPair<message::Message>,
22        consumer_conn: mpsc::Sender<mpsc::Sender<message::Message>>,
23    ) -> Result<()> {
24        let (send, mut recv) = consumer_recv.split();
25
26        loop {
27            info!(
28                "Connecting to {}:{}",
29                self.consumer_address.0, self.consumer_address.1
30            );
31            let socket = net::TcpStream::connect(self.consumer_address).await;
32
33            let (r, w) = io::split(match socket {
34                Err(_) => {
35                    time::sleep(time::Duration::from_secs(5)).await;
36                    continue;
37                }
38                Ok(s) => {
39                    info!("Connected!");
40                    s
41                }
42            });
43
44            let read_future = decenc::run_read(r, &send);
45            let write_future = decenc::run_write(w, &mut recv);
46
47            consumer_conn.send(send.clone()).await?;
48
49            future::try_join(read_future, write_future).await?;
50        }
51    }
52}