use std::{sync::{Arc, Mutex}, net::TcpStream, io::{Write, Read}, thread};
use crate::{common::audiobuffer::AudioBuffer, config::BUFFER_SIZE};
use crate::common::handshakedata::HandshakeData;
/// TCP client that exchanges [`AudioBuffer`]s with a server.
///
/// Audio to be sent is queued in `outgoing_buffers`; audio read from the
/// server is queued in `incoming_buffers`. Both queues sit behind
/// `Arc<Mutex<..>>` so they can be shared with the worker threads and with
/// external code.
pub struct ResonatorClient {
    /// Server port the sending connection is opened to; also reported in the
    /// handshake.
    send_port: u16,
    /// Server port the receiving connection is opened to; also reported in
    /// the handshake.
    receive_port: u16,
    /// Host part of every address this client connects to.
    server_address: String,
    /// Server port the handshake connection is opened to.
    handshake_port: u16,
    /// Buffers waiting to be sent to the server, oldest first.
    outgoing_buffers: Arc<Mutex<Vec<AudioBuffer>>>,
    /// Buffers received from the server, oldest first.
    incoming_buffers: Arc<Mutex<Vec<AudioBuffer>>>,
    /// This client's identifier: sent in the handshake, sent on the receiving
    /// connection, and placed in the header of every outgoing buffer.
    id: i32,
    /// Identifier placed next to `id` in the header of every outgoing buffer
    /// (presumably the peer the audio is addressed to; confirm against the
    /// server).
    receiver_id: i32,
}
impl ResonatorClient {
    /// Creates a client; no network activity happens until [`Self::begin`]
    /// or [`Self::handshake`] is called.
    ///
    /// * `server_address` - host used for every connection.
    /// * `send_port` / `receive_port` / `handshake_port` - server ports for
    ///   the outgoing audio, incoming audio and handshake connections.
    /// * `id` - this client's identifier.
    /// * `receiver_id` - identifier written next to `id` in the header of
    ///   every outgoing buffer.
    /// * `external_buffer_input` - queue to use for outgoing buffers; a fresh
    ///   empty queue is created when `None`.
    /// * `external_buffer_output` - queue to use for incoming buffers; a
    ///   fresh empty queue is created when `None`.
    pub fn new(
        server_address: String,
        send_port: u16,
        receive_port: u16,
        handshake_port: u16,
        id: i32,
        receiver_id: i32,
        external_buffer_input: Option<Arc<Mutex<Vec<AudioBuffer>>>>,
        external_buffer_output: Option<Arc<Mutex<Vec<AudioBuffer>>>>,
    ) -> ResonatorClient {
        ResonatorClient {
            server_address,
            send_port,
            receive_port,
            handshake_port,
            // `unwrap_or_default` only allocates a queue when none was given.
            outgoing_buffers: external_buffer_input.unwrap_or_default(),
            incoming_buffers: external_buffer_output.unwrap_or_default(),
            id,
            receiver_id,
        }
    }

    /// Returns a shared handle to the queue of received buffers.
    pub fn get_incoming_buffers(&self) -> Arc<Mutex<Vec<AudioBuffer>>> {
        Arc::clone(&self.incoming_buffers)
    }

    /// Returns a shared handle to the queue of buffers waiting to be sent.
    pub fn get_outgoing_buffers(&self) -> Arc<Mutex<Vec<AudioBuffer>>> {
        Arc::clone(&self.outgoing_buffers)
    }

    /// Removes and returns the oldest received buffer, or `None` when the
    /// incoming queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if the incoming queue's mutex is poisoned.
    pub fn process_buffer(&self) -> Option<AudioBuffer> {
        Self::pop_oldest(&self.incoming_buffers)
    }

    /// Appends `audio_buffer` to the queue of buffers waiting to be sent.
    ///
    /// # Panics
    ///
    /// Panics if the outgoing queue's mutex is poisoned.
    pub fn push_buffer(&self, audio_buffer: AudioBuffer) {
        self.outgoing_buffers.lock().unwrap().push(audio_buffer);
    }

    /// Performs the handshake, waits one second, then spawns one detached
    /// thread that receives buffers and one that sends them.
    ///
    /// Each thread ends when its connection fails or is closed; the threads
    /// are never joined.
    ///
    /// # Panics
    ///
    /// Panics if the handshake fails (see [`Self::handshake`]).
    pub fn begin(&self) {
        println!("Starting client on ports {} and {}", self.send_port, self.receive_port);
        self.handshake();
        // NOTE(review): presumably gives the server time to open the data
        // ports after the handshake; confirm against the server.
        thread::sleep(std::time::Duration::from_millis(1000));

        {
            let server_address = self.server_address.clone();
            let incoming_buffers = Arc::clone(&self.incoming_buffers);
            let receive_port = self.receive_port;
            let id = self.id;
            thread::spawn(move || {
                Self::_receiver_thread(server_address, receive_port, incoming_buffers, id);
            });
        }

        {
            let server_address = self.server_address.clone();
            let outgoing_buffers = Arc::clone(&self.outgoing_buffers);
            let send_port = self.send_port;
            let id = self.id;
            let receiver_id = self.receiver_id;
            thread::spawn(move || {
                Self::_sender_thread(server_address, send_port, outgoing_buffers, id, receiver_id);
            });
        }
    }

    /// Connects to the handshake port and sends this client's send port,
    /// receive port and id as a serialized [`HandshakeData`].
    ///
    /// # Panics
    ///
    /// Panics if the connection cannot be established or the write fails.
    pub fn handshake(&self) {
        let address = format!("{}:{}", self.server_address, self.handshake_port);
        let mut stream = TcpStream::connect(address.as_str())
            .unwrap_or_else(|e| panic!("Failed to connect for handshake at {}: {}", address, e));
        let handshake_data = HandshakeData {
            send_port: self.send_port,
            receive_port: self.receive_port,
            id: self.id,
        };
        stream
            .write_all(&handshake_data.to_bytes())
            .unwrap_or_else(|e| panic!("Failed to send handshake to {}: {}", address, e));
    }

    /// Removes and returns the first element of `queue`, or `None` when it is
    /// empty. The check and the removal happen under a single lock so that a
    /// concurrent consumer cannot empty the queue in between.
    fn pop_oldest(queue: &Mutex<Vec<AudioBuffer>>) -> Option<AudioBuffer> {
        let mut queue = queue
            .lock()
            .unwrap_or_else(|e| panic!("Failed to lock buffer mutex: {}", e));
        if queue.is_empty() {
            None
        } else {
            Some(queue.remove(0))
        }
    }

    /// Body of the receiving thread: connects to `server_address:port`,
    /// sends `id` as little-endian bytes, then decodes every read into an
    /// [`AudioBuffer`] and appends it to `incoming_buffers`.
    ///
    /// Returns when the connection cannot be set up, the peer closes it, or a
    /// read fails.
    fn _receiver_thread(server_address: String, port: u16, incoming_buffers: Arc<Mutex<Vec<AudioBuffer>>>, id: i32) {
        println!("Connecting to receiver thread on server at {}:{}", server_address, port);
        let mut stream = match TcpStream::connect(format!("{}:{}", server_address, port)) {
            Ok(stream) => stream,
            Err(e) => {
                eprintln!("Failed to connect to {}:{}: {}", server_address, port, e);
                return;
            }
        };
        if let Err(e) = stream.write_all(&id.to_le_bytes()) {
            eprintln!("Failed to send client id to {}:{}: {}", server_address, port, e);
            return;
        }

        let mut buffer = [0; BUFFER_SIZE];
        loop {
            match stream.read(&mut buffer) {
                Ok(0) => {
                    println!("Client closed connection");
                    break;
                }
                Ok(_) => {}
                // An interrupted read is transient; retry instead of giving up.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    println!("Client error: {}", e);
                    break;
                }
            }
            // NOTE(review): the number of bytes read is not used, so a short
            // read decodes leftover bytes from the previous frame. Confirm
            // the wire framing before switching to `read_exact`.
            let audio_buffer = AudioBuffer::from_bytes(&buffer);
            incoming_buffers.lock().unwrap().push(audio_buffer);
            println!("Received buffer!");
        }
    }

    /// Body of the sending thread: connects to `server_address:port`, then
    /// repeatedly takes the oldest buffer from `outgoing_buffers` and writes
    /// it to the server with a header built from `id` and `receiver_id`.
    ///
    /// Returns when the connection cannot be set up or a write fails.
    ///
    /// # Panics
    ///
    /// Panics if the queue's mutex is poisoned or `AudioBuffer::as_buffer`
    /// yields no buffer.
    fn _sender_thread(server_address: String, port: u16, outgoing_buffers: Arc<Mutex<Vec<AudioBuffer>>>, id: i32, receiver_id: i32) {
        // Pause between polls of an empty queue so the thread neither pins a
        // core nor contends with producers for the mutex.
        const IDLE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1);

        println!("Connecting to sender thread on server at {}:{}", server_address, port);
        let mut stream = match TcpStream::connect(format!("{}:{}", server_address, port)) {
            Ok(stream) => stream,
            Err(e) => {
                eprintln!("Failed to connect to {}:{}: {}", server_address, port, e);
                return;
            }
        };
        println!("Connected");

        loop {
            // The lock is held only while popping, so producers are never
            // blocked behind network I/O and a failure below cannot poison
            // the mutex.
            let audio_buffer = match Self::pop_oldest(&outgoing_buffers) {
                Some(audio_buffer) => audio_buffer,
                None => {
                    thread::sleep(IDLE_POLL_INTERVAL);
                    continue;
                }
            };
            println!("Sending buffer from client");
            let raw_buffer = audio_buffer.as_buffer().unwrap().raw_buffer_with_header(id, receiver_id);
            println!("Sending buffer of size {}", raw_buffer.len());
            if let Err(e) = stream.write_all(&raw_buffer) {
                eprintln!("Failed to send buffer to {}:{}: {}", server_address, port, e);
                break;
            }
        }
    }
}