use core::time::Duration;
use futures_util::StreamExt;
use rand::prelude::*;
use tokio::net::TcpListener;
use tokio_websockets::{Error, ServerBuilder};
use tracing::{error, info, level_filters::LevelFilter};
use tracing_subscriber::EnvFilter;
use arduino_plotter::{
protocol::{ClientCommand, EndOfLine, MonitorModelState, MonitorSettings},
Client, Server, ServerError,
};
async fn run_server_task(mut server: Server, client: Client) {
while let Some(value) = server.next().await {
match value {
Ok(message) => {
info!("Received message: {message:?}");
match message {
ClientCommand::SendMessage(_) => {}
ClientCommand::ChangeSettings(monitor_settings) => {
match monitor_settings.monitor_ui_settings {
Some(MonitorModelState {
line_ending: Some(eol),
..
}) => {
let eol_result = client
.set_monitor_settings(MonitorSettings {
monitor_ui_settings: Some(MonitorModelState {
line_ending: Some(eol),
..Default::default()
}),
..Default::default()
})
.await;
match eol_result {
Ok(_) => info!("New End of Line is set: {eol}"),
Err(err) => {
error!(?err, "New End of Line was not set in the UI")
}
}
}
_ => {}
}
}
}
}
Err(err) => {
error!(?err, "Error when receiving from socket");
match err {
ServerError::Ws(Error::Io(_))
| ServerError::Ws(Error::AlreadyClosed)
| ServerError::Ws(Error::CannotResolveHost) => {
break;
}
_ => {}
}
}
}
}
}
async fn run_client_task(client: Client) {
{
let settings = MonitorSettings {
pluggable_monitor_settings: None,
monitor_ui_settings: Some(MonitorModelState {
dark_theme: Some(true),
connected: Some(true),
line_ending: Some(EndOfLine::NewLine),
..Default::default()
}),
};
info!("Monitor Settings to be sent: {settings:?}");
match client.set_monitor_settings(settings).await {
Ok(_) => {}
Err(err) => error!("Failed to set settings: {err}"),
}
}
loop {
let mut data = vec![];
for _i in 0..6 {
let mut rng = rand::thread_rng();
let rand: u32 = rng.gen_range(0..100);
data.push(rand);
}
let data1_str = format!("L1:{},L2:{},L3:{}\n", data[0], data[1], data[2]);
let data2_str = format!("A:{},B:{},C:{}\n", data[3], data[4], data[5]);
let data: Vec<&str> = vec![&data1_str, &data2_str];
let send_result = client.send(&data).await;
match send_result {
Ok(_) => info!("Sent data message: {data:?}"),
Err(err) => {
error!("Sending data message failed: {err:?}");
if matches!(
err,
Error::AlreadyClosed | Error::Io(_) | Error::CannotResolveHost
) {
break;
}
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::TRACE.into())
.from_env_lossy();
tracing_subscriber::fmt().with_env_filter(env_filter).init();
let listener = TcpListener::bind("127.0.0.1:3030").await?;
loop {
while let Ok((stream, _plotter_addr)) = listener.accept().await {
let ws_stream = match ServerBuilder::new().accept(stream).await {
Ok(x) => x,
Err(err) => {
error!("Error performing HTTP upgrade handshake request: {err}");
continue;
}
};
let (ws_sink, ws_stream) = ws_stream.split();
let (client, server) = (Client::new(ws_sink), Server::new(ws_stream));
tokio::spawn(run_server_task(server, client.clone()));
tokio::spawn(run_client_task(client));
}
}
}