// client/client.rs
use futures::StreamExt;
use log::*;
use rand::distr::Alphanumeric;
use rand::{thread_rng, Rng};
use socket_flow::handshake::connect_async;
use tokio::select;
use tokio::time::{interval, Duration};
async fn handle_connection(addr: &str) {
match connect_async(addr, None).await {
Ok(mut ws_connection) => {
let mut ticker = interval(Duration::from_secs(5));
// it will be used for closing the connection
let mut counter = 0;
loop {
select! {
Some(result) = ws_connection.next() => {
match result {
Ok(message) => {
info!("Received message: {}", message.as_text().unwrap());
counter = counter + 1;
// close the connection if 3 messages have already been sent and received
if counter >= 3 {
if ws_connection.close_connection().await.is_err() {
error!("Error occurred when closing connection");
}
break;
}
}
Err(err) => {
error!("Received error from the stream: {}", err);
break;
}
}
}
_ = ticker.tick() => {
let random_string = generate_random_string();
let binary_data = Vec::from(random_string);
if ws_connection.send(binary_data).await.is_err() {
eprintln!("Failed to send message");
break;
}
}
}
}
}
Err(err) => error!("Error when performing handshake: {}", err),
}
}
/// Program entry point: sets up `env_logger` and runs the client against the
/// local WebSocket endpoint until `handle_connection` returns.
#[tokio::main]
async fn main() {
    env_logger::init();

    let server_addr = "ws://localhost:9002";
    handle_connection(server_addr).await;
}
/// Builds a random string of 30 characters sampled from the `Alphanumeric`
/// distribution using the thread-local random number generator.
fn generate_random_string() -> String {
    const LENGTH: usize = 30;

    let mut random_string = String::with_capacity(LENGTH);
    let samples = thread_rng().sample_iter(&Alphanumeric).take(LENGTH);
    random_string.extend(samples.map(char::from));
    random_string
}