use std::thread;
use bitvec::order::{Lsb0, Msb0};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Error},
net::TcpListener,
};
use crate::{
protocol::{
managers::bits::{
decoder::{BitDecoder, FrameDecoder},
encoder::FrameEncoder,
},
prelude::common::{
registry::EVENT_REGISTRY_MSB,
utils::{Listener, DEVICES},
},
},
server::prelude::answer_error,
};
#[allow(unused_must_use)]
pub async fn listen(port: String) -> Result<(), Error> {
let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).await?;
let static_listener: &'static TcpListener = Box::leak(Box::new(listener));
DEVICES.lock().unwrap().insert(
("127.0.0.1".to_string(), port.clone()),
Listener::TokioServer(static_listener),
);
println!("[SHDP:TCP] Listening on port {}", port.clone());
while let Ok((stream, _)) = DEVICES
.lock()
.unwrap()
.get(&("127.0.0.1".to_string(), port.clone()))
.unwrap()
.get_tokio_server()
.accept()
.await
{
thread::spawn(move || {
println!(
"[SHDP:TCP] New connection from {}",
stream.peer_addr().unwrap()
);
handle_client(stream);
});
}
Ok(())
}
pub async fn handle_client<IO: AsyncRead + AsyncWrite + Unpin>(mut stream: IO) {
let mut buffer = [0u8; 2usize.pow(32) / 8];
loop {
match stream.read(&mut buffer).await {
Ok(0) => break,
Ok(size) => {
let mut decoder = BitDecoder::<Msb0>::new(buffer[..size].to_vec());
let mut frame_decoder = FrameDecoder::<Msb0>::new(decoder);
let data = frame_decoder.decode().unwrap();
decoder = frame_decoder.get_decoder().to_owned();
let registry = EVENT_REGISTRY_MSB.lock().unwrap();
let factory = match registry.get_event((data.version, data.event)) {
Some(event) => event,
None => {
println!(
"[SHDP:TCP] Event not found: {} <-> {}",
data.version, data.event
);
stream
.write_all(&answer_error(
data.version,
crate::protocol::errors::Error {
code: 404,
message: "Event not found".to_string(),
kind: crate::protocol::errors::ErrorKind::NotFound,
},
))
.await
.unwrap();
return;
}
};
let mut event = factory(decoder);
match event.decode(data.clone()) {
Ok(_) => (),
Err(e) => {
stream
.write_all(&answer_error(data.version, e))
.await
.unwrap();
return;
}
}
let responses = match event.get_responses() {
Ok(responses) => responses,
Err(e) => {
stream
.write_all(&answer_error(data.version, e))
.await
.unwrap();
return;
}
};
for response in responses {
let mut encoder = match FrameEncoder::<Lsb0>::new(data.version) {
Ok(encoder) => encoder,
Err(e) => {
println!("[SHDP:TCP] Error creating encoder: {}", e);
return;
}
};
let frame = encoder.encode(response).unwrap();
match stream.write_all(&frame).await {
Ok(_) => (),
Err(e) => {
println!("[SHDP:TCP] Error writing to stream: {}", e);
return;
}
}
}
}
Err(e) => {
println!("[SHDP:TCP] Error reading from stream: {}", e);
break;
}
}
}
println!("[SHDP:TCP] Connection closed");
}