thfmr_protocol/
consumer.rs1use std::net::IpAddr;
2use tokio::sync::mpsc;
3use {
4 crate::{channels, decenc, error::Result, message},
5 futures::future,
6 log::info,
7 tokio::{io, net, time},
8};
9
10pub struct Consumer {
11 consumer_address: (IpAddr, u16),
12}
13
14impl Consumer {
15 pub fn new(consumer_address: (IpAddr, u16)) -> Consumer {
16 Consumer { consumer_address }
17 }
18
19 pub async fn run(
20 &self,
21 mut consumer_recv: channels::ChanPair<message::Message>,
22 consumer_conn: mpsc::Sender<mpsc::Sender<message::Message>>,
23 ) -> Result<()> {
24 let (send, mut recv) = consumer_recv.split();
25
26 loop {
27 info!(
28 "Connecting to {}:{}",
29 self.consumer_address.0, self.consumer_address.1
30 );
31 let socket = net::TcpStream::connect(self.consumer_address).await;
32
33 let (r, w) = io::split(match socket {
34 Err(_) => {
35 time::sleep(time::Duration::from_secs(5)).await;
36 continue;
37 }
38 Ok(s) => {
39 info!("Connected!");
40 s
41 }
42 });
43
44 let read_future = decenc::run_read(r, &send);
45 let write_future = decenc::run_write(w, &mut recv);
46
47 consumer_conn.send(send.clone()).await?;
48
49 future::try_join(read_future, write_future).await?;
50 }
51 }
52}