1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
use std::net::IpAddr; use tokio::sync::mpsc; use { crate::{channels, decenc, error::Result, message}, futures::future, log::info, tokio::{io, net, time}, }; pub struct Consumer { consumer_address: (IpAddr, u16), } impl Consumer { pub fn new(consumer_address: (IpAddr, u16)) -> Consumer { Consumer { consumer_address } } pub async fn run( &self, mut consumer_recv: channels::ChanPair<message::Message>, consumer_conn: mpsc::Sender<mpsc::Sender<message::Message>>, ) -> Result<()> { let (send, mut recv) = consumer_recv.split(); loop { info!( "Connecting to {}:{}", self.consumer_address.0, self.consumer_address.1 ); let socket = net::TcpStream::connect(self.consumer_address).await; let (r, w) = io::split(match socket { Err(_) => { time::sleep(time::Duration::from_secs(5)).await; continue; } Ok(s) => { info!("Connected!"); s } }); let read_future = decenc::run_read(r, &send); let write_future = decenc::run_write(w, &mut recv); consumer_conn.send(send.clone()).await?; future::try_join(read_future, write_future).await?; } } }