1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
use std::net::IpAddr;
use tokio::sync::mpsc;
use {
    crate::{channels, decenc, error::Result, message},
    futures::future,
    log::info,
    tokio::{io, net, time},
};

/// Client side of a TCP link to a remote consumer.
///
/// Holds only the remote endpoint; the connection itself is established
/// (and re-established) by `Consumer::run`.
#[derive(Debug, Clone)]
pub struct Consumer {
    /// Remote endpoint as an `(IP address, port)` pair, passed as-is to
    /// `TcpStream::connect`.
    consumer_address: (IpAddr, u16),
}

impl Consumer {
    pub fn new(consumer_address: (IpAddr, u16)) -> Consumer {
        Consumer { consumer_address }
    }

    pub async fn run(
        &self,
        mut consumer_recv: channels::ChanPair<message::Message>,
        consumer_conn: mpsc::Sender<mpsc::Sender<message::Message>>,
    ) -> Result<()> {
        let (send, mut recv) = consumer_recv.split();

        loop {
            info!(
                "Connecting to {}:{}",
                self.consumer_address.0, self.consumer_address.1
            );
            let socket = net::TcpStream::connect(self.consumer_address).await;

            let (r, w) = io::split(match socket {
                Err(_) => {
                    time::sleep(time::Duration::from_secs(5)).await;
                    continue;
                }
                Ok(s) => {
                    info!("Connected!");
                    s
                }
            });

            let read_future = decenc::run_read(r, &send);
            let write_future = decenc::run_write(w, &mut recv);

            consumer_conn.send(send.clone()).await?;

            future::try_join(read_future, write_future).await?;
        }
    }
}