use std::{sync::{Arc, Mutex}, net::{TcpListener, TcpStream}, thread, io::{Read, Write}};
use crate::{config::{AUDIO_BUFFER_SIZE, BUFFER_WITH_HEADER_SIZE, HEADER_SIZE, METADATA_SIZE}, common::audiobuffer::AudioBuffer};
use super::{client::{ClientManager, Client}, metadata::Metadata};
/// Plain-data record pairing one fixed-size block of audio samples with routing and
/// format fields.
///
/// Nothing in the visible part of this file constructs or reads this type, so the
/// field notes below are inferred from the names and should be confirmed against
/// whatever code actually uses it.
pub struct AudioData {
    /// Presumably the id of the client the audio came from — TODO confirm.
    /// NOTE(review): the connection handlers in this file decode ids as `i32`, not `u32`.
    pub sender: u32,
    /// Presumably the id of the client the audio is addressed to — TODO confirm.
    /// The spelling is part of the public field name and is kept as is.
    pub reciever: u32,
    /// Presumably the buffer size carried in the frame metadata — TODO confirm units.
    pub buffer_size: i32,
    /// Presumably the sample rate carried in the frame metadata — TODO confirm units.
    pub sample_rate: i32,
    /// Exactly `AUDIO_BUFFER_SIZE` samples stored as `f32`.
    pub samples: [f32; AUDIO_BUFFER_SIZE],
}
/// Port configuration for the audio relay server.
///
/// The server listens on two separate TCP ports: one on which clients push audio to
/// the server and one on which clients pull the audio queued for them.
pub struct ResonatorServer {
    /// TCP port clients connect to in order to send audio data to the server.
    pub client_send_port: u16,
    /// TCP port clients connect to in order to receive outgoing audio data from the server.
    pub client_receive_port: u16,
}
impl ResonatorServer {
pub fn new(client_send_port: u16, client_receive_port: u16) -> ResonatorServer {
ResonatorServer {
client_send_port,
client_receive_port
}
}
pub fn begin(&self) {
println!("Beginning socket server. Port {} will be used for sending audio data to this server, and port {} will be used for recieving outgoing audio data from this server.", self.client_send_port, self.client_receive_port);
let client_manager: Arc<Mutex<ClientManager>> = Arc::new(Mutex::new(ClientManager::new()));
let client_send_listener: TcpListener = TcpListener::bind(format!("127.0.0.1:{}", self.client_send_port)).unwrap();
let client_receive_listener: TcpListener = TcpListener::bind(format!("127.0.0.1:{}", self.client_receive_port)).unwrap();
{
let client_manager = client_manager.clone();
thread::spawn(move || {
for stream in client_send_listener.incoming() {
match stream {
Ok(tcp_stream) => {
let client_manager = client_manager.clone();
thread::spawn(move || {
handle_client_sent_data(tcp_stream, client_manager);
});
}
Err(e) => eprintln!("Error accepting connection: {}", e),
}
}
});
}
{
let client_manager = client_manager.clone();
thread::spawn(move || {
for stream in client_receive_listener.incoming() {
match stream {
Ok(tcp_stream) => {
let client_manager = client_manager.clone();
thread::spawn(move || {
handle_client_will_receive_data(tcp_stream, client_manager);
});
}
Err(e) => eprintln!("Error accepting connection: {}", e),
}
}
});
}
}
}
/// Serves one connection on the client-send port: reads fixed-size frames and queues
/// each frame's audio for the receiver named in its header.
///
/// Every frame is `BUFFER_WITH_HEADER_SIZE` bytes and is decoded as little-endian values:
///
/// * header (`HEADER_SIZE` bytes): sender id (`i32`) at offset 0, receiver id (`i32`) at offset 4;
/// * metadata (`METADATA_SIZE` bytes): buffer size (`i32`) at offset 0, sample rate (`i32`) at offset 4;
/// * audio: all remaining bytes, read as consecutive `f32` samples (a trailing partial
///   sample, if any, is ignored).
///
/// Sender and receiver ids are registered with `client_manager` when not already
/// known. When the connection ends, the sender id taken from the last complete frame
/// is removed from the manager; if no complete frame ever arrived, nothing is removed.
///
/// # Panics
///
/// Panics if the `client_manager` mutex is poisoned, or if the frame layout constants
/// leave fewer than 8 bytes for the header or the metadata.
fn handle_client_sent_data(mut tcp_stream: TcpStream, client_manager: Arc<Mutex<ClientManager>>) {
    /// Decodes the little-endian `i32` stored at `offset` in `bytes`.
    fn i32_le_at(bytes: &[u8], offset: usize) -> i32 {
        i32::from_le_bytes([bytes[offset], bytes[offset + 1], bytes[offset + 2], bytes[offset + 3]])
    }

    let mut frame = [0u8; BUFFER_WITH_HEADER_SIZE];
    // Sender id of the most recent frame; stays `None` until a full frame has arrived,
    // so a connection that never sent anything does not remove an unrelated client.
    let mut sender_id: Option<i32> = None;

    loop {
        // TCP is a byte stream: a single `read` may deliver only part of a frame.
        // `read_exact` keeps reading until the whole frame is available, so the
        // parser below never sees a mix of fresh and stale bytes.
        match tcp_stream.read_exact(&mut frame) {
            Ok(()) => {}
            // The peer closed the stream (possibly in the middle of a frame, which
            // is then discarded).
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                println!("Client closed connection");
                break;
            }
            Err(_) => {
                println!("Client error");
                break;
            }
        }

        let (header_bytes, payload) = frame.split_at(HEADER_SIZE);
        let (metadata_bytes, audio_bytes) = payload.split_at(METADATA_SIZE);

        let frame_sender_id = i32_le_at(header_bytes, 0);
        let receiver_id = i32_le_at(header_bytes, 4);
        let buffer_size = i32_le_at(metadata_bytes, 0);
        let sample_rate = i32_le_at(metadata_bytes, 4);
        let samples: Vec<f32> = audio_bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
            .collect();

        sender_id = Some(frame_sender_id);

        // The lock is taken only after the blocking read and all parsing are done,
        // and is released at the end of this iteration.
        let mut manager = client_manager.lock().expect("client manager mutex poisoned");
        if manager.get_by_id(frame_sender_id).is_none() {
            manager.add(frame_sender_id);
        }
        if manager.get_by_id(receiver_id).is_none() {
            manager.add(receiver_id);
        }
        let receiver: &mut Client = manager
            .get_by_id(receiver_id)
            .expect("receiver was registered just above");
        // The buffer is moved into the queue; no copy of the samples is made.
        receiver.enqueue_buffer(AudioBuffer {
            samples,
            metadata: Metadata {
                sample_rate,
                buffer_size,
            },
        });
    }

    if let Some(id) = sender_id {
        client_manager
            .lock()
            .expect("client manager mutex poisoned")
            .remove(id);
    }
}
fn handle_client_will_receive_data(mut tcp_stream: TcpStream, client_manager: Arc<Mutex<ClientManager>>) {
let mut buffer = [0; std::mem::size_of::<i32>()]; let mut id: i32 = 0;
loop {
if id == 0 {
match tcp_stream.read(&mut buffer) {
Ok(size) if size == 0 => {
println!("Client closed connection");
break;
}
Ok(size) => size,
Err(_) => {
println!("Client error");
break;
}
};
let bytes = &buffer[..];
id = i32::from_le_bytes(bytes.try_into().unwrap());
let mut temp_client_manager = client_manager.lock().unwrap();
if temp_client_manager.get_by_id(id).is_none() {
temp_client_manager.add(id);
}
} else {
let mut temp_client_manager = client_manager.lock().unwrap();
let temp_client: &mut Client = temp_client_manager.get_by_id(id).unwrap();
let next_buffer = temp_client.get_next_buffer();
if next_buffer.is_some() {
let next_buffer = next_buffer.unwrap().as_buffer().unwrap();
let raw_buffer = next_buffer.raw_buffer();
tcp_stream.write_all(raw_buffer).unwrap();
}
}
}
let mut temp_client_manager = client_manager.lock().unwrap();
temp_client_manager.remove(id);
}