use std::{sync::{Arc, Mutex}, net::{TcpListener, TcpStream}, thread, io::{Read, Write}};
use std::thread::JoinHandle;
use crate::{config::{AUDIO_BUFFER_SIZE, BUFFER_WITH_HEADER_SIZE, HEADER_SIZE, METADATA_SIZE}, common::{audiobuffer::AudioBuffer, headerdata::HeaderData}};
use crate::common::audiosample::AudioSample;
use crate::common::clientport::ClientPort;
use crate::common::handshakedata::HandshakeData;
use crate::common::parsable::Parsable;
use crate::config::HANDSHAKE_BUFFER_SIZE;
use crate::server::portmanager::PortManager;
use super::{client::{ClientManager, Client}, metadata::Metadata};
pub struct AudioData {
pub sender: u32,
pub reciever: u32,
pub buffer_size: i32,
pub sample_rate: i32,
pub samples: [f32; AUDIO_BUFFER_SIZE],
}
pub struct ResonatorServer {
pub handshake_port: u16, pub client_receive_ports: Arc<Mutex< Vec<Arc<Mutex<ClientPort>>> >>, pub client_send_ports: Arc<Mutex< Vec<Arc<Mutex<ClientPort>>> >>, }
impl ResonatorServer {
pub fn new(handshake_port: u16) -> ResonatorServer {
ResonatorServer {
client_send_ports: Arc::new(Mutex::new(Vec::new())),
handshake_port,
client_receive_ports: Arc::new(Mutex::new(Vec::new()))
}
}
pub fn begin(&self) {
println!("Beginning socket server. Port {} will be used for clients to handshake with the server.", self.handshake_port);
let client_manager: Arc<Mutex<ClientManager>> = Arc::new(Mutex::new(ClientManager::new()));
println!("Spawning client handshake thread on port {}", self.handshake_port);
let client_handshake_listener: TcpListener = TcpListener::bind(format!("127.0.0.1:{}", self.handshake_port)).unwrap();
println!("Bound to port");
{
let client_manager = client_manager.clone();
let client_receive_ports = self.client_receive_ports.clone();
thread::spawn(move || {
for stream in client_handshake_listener.incoming() {
let client_receive_ports = client_receive_ports.clone();
match stream {
Ok(tcp_stream) => {
println!("Client connected to handshake port");
let client_manager = client_manager.clone();
thread::spawn(move || {
handle_client_handshake(tcp_stream, client_manager, client_receive_ports.clone());
});
}
Err(e) => eprintln!("Error accepting connection: {}", e),
}
}
});
}
println!("Spawning listeners for client send and receive ports");
}
}
fn spawn_client_receive_thread(crp: Arc<Mutex<ClientPort>>, client_manager: Arc<Mutex<ClientManager>>) -> JoinHandle<()> {
let port = crp.lock().unwrap().port;
println!("Spawning client receive thread on port {}", port);
let client_receive_listener: TcpListener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
println!("Bound to port");
let client_manager = client_manager.clone();
thread::spawn(move || {
for stream in client_receive_listener.incoming() {
let crp = crp.clone();
match stream {
Ok(tcp_stream) => {
let client_manager = client_manager.clone();
thread::spawn(move || {
handle_client_will_receive_data(tcp_stream, client_manager, crp.clone());
});
}
Err(e) => eprintln!("Error accepting connection: {}", e),
}
}
})
}
fn spawn_client_send_thread(crp: Arc<Mutex<ClientPort>>, client_manager: Arc<Mutex<ClientManager>>) -> JoinHandle<()> {
let port = crp.lock().unwrap().port;
println!("Spawning client send thread on port {}", port);
let client_send_listener: TcpListener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap();
println!("Bound to port");
let client_manager = client_manager.clone();
thread::spawn(move || {
for stream in client_send_listener.incoming() {
match stream {
Ok(tcp_stream) => {
println!("Client connected to incoming data port");
let client_manager = client_manager.clone();
let crp = crp.clone();
thread::spawn(move || {
handle_client_sent_data(tcp_stream, client_manager, crp.clone());
});
}
Err(e) => eprintln!("Error accepting connection: {}", e),
}
}
})
}
fn handle_client_handshake(mut tcp_stream: TcpStream, client_manager: Arc<Mutex<ClientManager>>, client_receive_ports: Arc<Mutex< Vec<Arc<Mutex<ClientPort>>> >>) {
let mut buffer = [0; HANDSHAKE_BUFFER_SIZE];
loop {
match tcp_stream.read(&mut buffer) {
Ok(size) if size == 0 => {
println!("Client closed connection");
break;
}
Ok(size) => {
println!("Client handshake data received");
size
},
Err(_) => {
println!("Client error");
break;
}
};
let bytes = &buffer[..];
let handshake_data = HandshakeData::from_bytes(bytes);
println!("{:#?}", handshake_data);
client_manager.lock().unwrap().add(handshake_data.id, handshake_data.receive_port, handshake_data.send_port);
PortManager::allocate_port_if_necessary(client_receive_ports.clone(), client_manager.clone(), handshake_data.send_port, spawn_client_send_thread);
PortManager::allocate_port_if_necessary(client_receive_ports.clone(), client_manager.clone(), handshake_data.receive_port, spawn_client_receive_thread);
let temp_client_receive_port = ClientPort {
port: handshake_data.receive_port,
connected_client_ids: Vec::new(),
handle: None
};
println!("Checking if thread already exists for client receive port {}", temp_client_receive_port.port);
let mut port_found = false;
let client_receive_ports = client_receive_ports.clone();
for crp in client_receive_ports.clone().lock().unwrap().iter() {
println!("Checking client receive port {}", crp.lock().unwrap().port);
if crp.lock().unwrap().port == temp_client_receive_port.port {
println!("Thread already exists for client receive port {}", temp_client_receive_port.port);
port_found = true;
break;
}
}
if !port_found {
println!("No thread exists for client receive port {}", temp_client_receive_port.port);
client_receive_ports.lock().unwrap().push(Arc::new(Mutex::new(temp_client_receive_port.clone())));
}
}
}
fn handle_client_sent_data(mut tcp_stream: TcpStream, client_manager: Arc<Mutex<ClientManager>>, crp: Arc<Mutex<ClientPort>>) {
let mut buffer = [0; BUFFER_WITH_HEADER_SIZE]; let mut id: i32 = 0;
loop {
println!("Receiver Thread: Reading data from stream");
match tcp_stream.read(&mut buffer) {
Ok(size) if size == 0 => {
println!("Client closed connection");
break;
}
Ok(size) => {
println!("New data on incoming data port");
size
},
Err(_) => {
println!("Client error");
break;
}
};
println!("Receiver Thread: Data has been read");
let bytes = &buffer[..];
let header_bytes = &bytes[..HEADER_SIZE];
let metadata_bytes = &bytes[HEADER_SIZE..(HEADER_SIZE + METADATA_SIZE)];
let audio_bytes = &bytes[(HEADER_SIZE + METADATA_SIZE)..];
let header: HeaderData = HeaderData::from_bytes(header_bytes);
let metadata: Metadata = Metadata::from_bytes(metadata_bytes);
let samples: Vec<f32> = AudioSample::parse_samples(audio_bytes).iter().map(|sample| sample.value).collect::<Vec<f32>>();
println!("Received {} samples from client {} to send to client {}", samples.len(), header.sender_id, header.receiver_id);
id = header.sender_id;
let mut temp_client_manager = client_manager.lock().unwrap();
let client_added = temp_client_manager.get_by_id(header.sender_id).is_some();
if !client_added {
println!("Client not found in client manager but sent data.");
break;
}
let new_buff = AudioBuffer {
samples,
metadata: Metadata {
sample_rate: metadata.sample_rate,
buffer_size: metadata.buffer_size,
}
};
let temp_client: &mut Client = temp_client_manager.get_by_id(header.receiver_id).unwrap();
temp_client.enqueue_buffer(new_buff.clone());
}
println!("Receiver Thread: Client disconnected");
let mut temp_client_manager = client_manager.lock().unwrap();
temp_client_manager.remove(id);
let mut crp = crp.lock().unwrap();
let index = crp.connected_client_ids.iter().position(|_id| *_id == id);
if index.is_some() {
crp.connected_client_ids.remove(index.unwrap());
}
}
fn handle_client_will_receive_data(mut tcp_stream: TcpStream, client_manager: Arc<Mutex<ClientManager>>, crp: Arc<Mutex<ClientPort>>) {
let mut buffer = [0; std::mem::size_of::<i32>()]; let mut id: i32 = 0;
loop {
if id == 0 {
match tcp_stream.read(&mut buffer) {
Ok(size) if size == 0 => {
println!("Client closed connection");
break;
}
Ok(size) => size,
Err(_) => {
println!("Client error");
break;
}
};
let bytes = &buffer[..];
id = i32::from_le_bytes(bytes.try_into().unwrap());
} else {
let mut temp_client_manager = client_manager.lock().unwrap();
let temp_client: &mut Client = temp_client_manager.get_by_id(id).unwrap();
let next_buffer = temp_client.get_next_buffer();
if next_buffer.is_some() {
let next_buffer = next_buffer.unwrap().as_buffer().unwrap();
let raw_buffer = next_buffer.raw_buffer();
tcp_stream.write_all(raw_buffer).unwrap();
}
}
}
let mut temp_client_manager = client_manager.lock().unwrap();
temp_client_manager.remove(id);
let mut crp = crp.lock().unwrap();
let index = crp.connected_client_ids.iter().position(|_id| *_id == id);
if index.is_some() {
crp.connected_client_ids.remove(index.unwrap());
}
}