#![allow(unused_imports)]
#![allow(unused_variables)]
use websocket_toolkit::controller::WebSocketController;
use tokio::time::{timeout, Duration, sleep};
use log::{info, error};
use env_logger;
use serde::{Deserialize, Serialize};
use serde_cbor;
use tokio::sync::Mutex;
use std::sync::Arc;
#[derive(Serialize, Deserialize, Debug)]
struct Message {
#[serde(rename = "type")]
msg_type: String,
content: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let url = "ws://node_server:9001";
let retries = 3;
let ping_interval = Some(5); let mut controller = WebSocketController::new(url, retries, ping_interval);
info!("Attempting to connect...");
let ws_stream = match timeout(Duration::from_secs(5), controller.connect()).await {
Ok(Ok(stream)) => {
info!("Connection successful.");
Arc::new(Mutex::new(stream))
}
Ok(Err(e)) => {
error!("Connection failed: {}", e);
return Ok(());
}
Err(_) => {
error!("Connection timed out.");
return Ok(());
}
};
send_test_messages(&mut controller, &ws_stream).await?;
simulate_keep_alive_and_reconnect(&mut controller, ws_stream, ping_interval).await?;
info!("Example successfully tested. Closing connection and exiting.");
Ok(())
}
async fn send_test_messages(
controller: &mut WebSocketController,
ws_stream: &Arc<Mutex<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>,
) -> Result<(), Box<dyn std::error::Error>> {
let json_message = serde_json::to_vec(&Message {
msg_type: "greeting".to_string(),
content: "Hello, server (JSON)!".to_string(),
})?;
controller
.send_message(&mut *ws_stream.lock().await, json_message.as_slice())
.await?;
info!("Sent JSON message.");
let cbor_message = serde_cbor::to_vec(&Message {
msg_type: "greeting".to_string(),
content: "Hello, server (CBOR)!".to_string(),
})?;
controller
.send_message(&mut *ws_stream.lock().await, cbor_message.as_slice())
.await?;
info!("Sent CBOR message.");
Ok(())
}
async fn simulate_keep_alive_and_reconnect(
controller: &mut WebSocketController,
ws_stream: Arc<Mutex<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>,
ping_interval: Option<u64>,
) -> Result<(), Box<dyn std::error::Error>> {
if let Some(interval) = ping_interval {
controller.maintain_connection(ws_stream.clone()).await?;
}
for _ in 0..3 {
let mut stream = ws_stream.lock().await;
match controller.receive_message(&mut *stream).await {
Ok(Some(msg)) => {
if let Ok(json_msg) = serde_json::from_slice::<Message>(&msg) {
info!("Received JSON: {:?}", json_msg);
} else if let Ok(cbor_msg) = serde_cbor::from_slice::<Message>(&msg) {
info!("Received CBOR: {:?}", cbor_msg);
} else {
error!(
"Unsupported message format: {:?}",
String::from_utf8_lossy(&msg)
);
}
}
Ok(None) => continue, Err(e) => {
error!("Error receiving message: {}", e);
break;
}
}
sleep(Duration::from_secs(ping_interval.unwrap_or(5))).await;
}
info!("Simulating server disconnect...");
controller.disconnect().await?;
sleep(Duration::from_secs(2)).await;
info!("Reconnecting...");
match controller.connect().await {
Ok(new_stream) => {
let ws_stream = Arc::new(Mutex::new(new_stream));
info!("Reconnected successfully!");
}
Err(e) => error!("Reconnection failed: {}", e),
}
Ok(())
}