resonator 0.2.12

This crate allows two devices to send live PCM audio data to each other through a server.
Documentation
use std::{sync::{Arc, Mutex}, net::{TcpListener, TcpStream}, thread, io::{Read, Write}};
use std::thread::JoinHandle;
use crate::{config::{AUDIO_BUFFER_SIZE, BUFFER_WITH_HEADER_SIZE, HEADER_SIZE, METADATA_SIZE}, common::{audiobuffer::AudioBuffer, headerdata::HeaderData}};
use crate::common::audiosample::AudioSample;
use crate::common::clientport::ClientPort;
use crate::common::handshakedata::HandshakeData;
use crate::common::parsable::Parsable;
use crate::config::HANDSHAKE_BUFFER_SIZE;
use crate::server::portmanager::PortManager;

use super::{client::{ClientManager, Client}, metadata::Metadata};

/// A fixed-size block of audio samples together with routing and format fields.
///
/// NOTE(review): nothing in this file constructs or reads `AudioData`, so the
/// field descriptions below are inferred from the field names only — confirm
/// against the code that actually uses this type.
pub struct AudioData {
    /// Presumably the id of the client that produced the samples.
    pub sender: u32,
    /// Presumably the id of the client the samples are addressed to.
    /// (The name is a misspelling of "receiver"; it is a public field, so it
    /// cannot be renamed without breaking callers.)
    pub reciever: u32,
    /// Presumably the number of valid samples in `samples` — TODO confirm.
    pub buffer_size: i32,
    /// Presumably the sample rate of the audio in Hz — TODO confirm.
    pub sample_rate: i32,
    /// Sample storage; always `AUDIO_BUFFER_SIZE` elements long.
    pub samples: [f32; AUDIO_BUFFER_SIZE],
}

/// State of the relay server: the handshake port plus the shared lists of
/// per-client ports.
///
/// Both port lists are wrapped in `Arc<Mutex<..>>` so they can be handed to the
/// worker threads spawned by `begin`.
pub struct ResonatorServer {
    /// Port used on initial client connection to specify the client receive
    /// port. `begin` binds a listener to it on `127.0.0.1`.
    pub handshake_port: u16,
    /// A vec of all of the client receive ports currently in use. This list is
    /// passed to the handshake handler, which adds entries to it.
    pub client_receive_ports: Arc<Mutex< Vec<Arc<Mutex<ClientPort>>> >>,
    /// A vec of all of the client send ports currently in use.
    /// NOTE(review): in this file the list is only initialised in `new`; the
    /// code that would have used it in `begin` is commented out.
    pub client_send_ports: Arc<Mutex< Vec<Arc<Mutex<ClientPort>>> >>,
}

impl ResonatorServer {
    /// Constructs a new server that will accept client handshakes on
    /// `handshake_port`. Both port lists start out empty.
    pub fn new(handshake_port: u16) -> ResonatorServer {
        let client_send_ports = Arc::new(Mutex::new(Vec::new()));
        let client_receive_ports = Arc::new(Mutex::new(Vec::new()));

        ResonatorServer {
            handshake_port,
            client_receive_ports,
            client_send_ports,
        }
    }

    /// Begins the server: binds the handshake port on `127.0.0.1` and spawns a
    /// background thread that accepts handshake connections.
    ///
    /// Every accepted connection is served on its own thread by
    /// `handle_client_handshake`. This method returns as soon as the accept
    /// thread has been spawned; it does not wait for it.
    ///
    /// # Panics
    ///
    /// Panics if the handshake port cannot be bound.
    pub fn begin(&self) {
        println!("Beginning socket server. Port {} will be used for clients to handshake with the server.", self.handshake_port);
        let client_manager: Arc<Mutex<ClientManager>> = Arc::new(Mutex::new(ClientManager::new()));

        // Listener for the initial client handshake.
        println!("Spawning client handshake thread on port {}", self.handshake_port);
        let handshake_address = format!("127.0.0.1:{}", self.handshake_port);
        let client_handshake_listener: TcpListener = TcpListener::bind(handshake_address).unwrap();
        println!("Bound to port");

        // Accept loop: one short-lived handler thread per handshake connection.
        let client_receive_ports = Arc::clone(&self.client_receive_ports);
        thread::spawn(move || {
            for stream in client_handshake_listener.incoming() {
                let tcp_stream = match stream {
                    Ok(tcp_stream) => tcp_stream,
                    Err(e) => {
                        eprintln!("Error accepting connection: {}", e);
                        continue;
                    }
                };

                println!("Client connected to handshake port");
                let client_manager = Arc::clone(&client_manager);
                let client_receive_ports = Arc::clone(&client_receive_ports);
                thread::spawn(move || {
                    handle_client_handshake(tcp_stream, client_manager, client_receive_ports);
                });
            }
        });

        // No send/receive listeners are started here despite the log line below:
        // they are allocated per port by the handshake handler through
        // `PortManager::allocate_port_if_necessary`.
        println!("Spawning listeners for client send and receive ports");
    }
}

/// Binds a listener to the port stored in `crp` (on `127.0.0.1`) and spawns a
/// thread that accepts connections on it.
///
/// Each accepted connection is served on its own thread by
/// `handle_client_will_receive_data`, which sends queued audio buffers back
/// down to the connected client. The returned handle belongs to the accept
/// thread, not to the per-connection threads.
///
/// # Panics
///
/// Panics if the `crp` mutex is poisoned or the port cannot be bound.
fn spawn_client_receive_thread(crp: Arc<Mutex<ClientPort>>, client_manager: Arc<Mutex<ClientManager>>) -> JoinHandle<()> {
    // Copy the port out so the lock is released before binding.
    let port = {
        let client_port = crp.lock().unwrap();
        client_port.port
    };

    println!("Spawning client receive thread on port {}", port);
    let address = format!("127.0.0.1:{}", port);
    let client_receive_listener: TcpListener = TcpListener::bind(address).unwrap();
    println!("Bound to port");

    thread::spawn(move || {
        for incoming in client_receive_listener.incoming() {
            let tcp_stream = match incoming {
                Ok(tcp_stream) => tcp_stream,
                Err(e) => {
                    eprintln!("Error accepting connection: {}", e);
                    continue;
                }
            };

            // Each connection thread gets its own handles to the shared state.
            let manager = Arc::clone(&client_manager);
            let client_port = Arc::clone(&crp);
            thread::spawn(move || {
                handle_client_will_receive_data(tcp_stream, manager, client_port);
            });
        }
    })
}

/// Binds a listener to the port stored in `crp` (on `127.0.0.1`) and spawns a
/// thread that accepts connections from clients that want to send audio.
///
/// Each accepted connection is served on its own thread by
/// `handle_client_sent_data`. The returned handle belongs to the accept
/// thread, not to the per-connection threads.
///
/// # Panics
///
/// Panics if the `crp` mutex is poisoned or the port cannot be bound.
fn spawn_client_send_thread(crp: Arc<Mutex<ClientPort>>, client_manager: Arc<Mutex<ClientManager>>) -> JoinHandle<()> {
    // Copy the port out so the lock is released before binding.
    let port = {
        let client_port = crp.lock().unwrap();
        client_port.port
    };

    println!("Spawning client send thread on port {}", port);
    let address = format!("127.0.0.1:{}", port);
    let client_send_listener: TcpListener = TcpListener::bind(address).unwrap();
    println!("Bound to port");

    thread::spawn(move || {
        for incoming in client_send_listener.incoming() {
            let tcp_stream = match incoming {
                Err(e) => {
                    eprintln!("Error accepting connection: {}", e);
                    continue;
                }
                Ok(tcp_stream) => tcp_stream,
            };

            println!("Client connected to incoming data port");

            // Each connection thread gets its own handles to the shared state.
            let manager = Arc::clone(&client_manager);
            let client_port = Arc::clone(&crp);
            thread::spawn(move || {
                handle_client_sent_data(tcp_stream, manager, client_port);
            });
        }
    })
}

/// Serves one connection on the handshake port.
///
/// Repeats the following until the client closes the connection or a read
/// fails:
///
/// 1. read a handshake message and parse it as `HandshakeData`;
/// 2. register the client with the `ClientManager`;
/// 3. ask `PortManager` to allocate listeners for the client's send and
///    receive ports if necessary;
/// 4. make sure `client_receive_ports` contains an entry for the client's
///    receive port.
///
/// Step 4 checks and inserts under a single lock acquisition, so two handshake
/// threads racing on the same port cannot both insert an entry.
///
/// # Panics
///
/// Panics if any of the shared mutexes is poisoned.
fn handle_client_handshake(mut tcp_stream: TcpStream, client_manager: Arc<Mutex<ClientManager>>, client_receive_ports: Arc<Mutex< Vec<Arc<Mutex<ClientPort>>> >>) {
    let mut buffer = [0; HANDSHAKE_BUFFER_SIZE]; //Stores the raw bytes of the initial data sent on connection

    loop {
        //Read next data from the client if available
        match tcp_stream.read(&mut buffer) {
            Ok(0) => {
                println!("Client closed connection");
                break;
            }
            Ok(_) => println!("Client handshake data received"),
            Err(_) => {
                println!("Client error");
                break;
            }
        }

        //Parse the handshake data
        // NOTE(review): the whole buffer is parsed regardless of how many bytes
        // this read returned, so a short TCP read leaves stale or zero bytes in
        // the tail. Switching to `read_exact` is only safe if every handshake
        // message is exactly HANDSHAKE_BUFFER_SIZE bytes — confirm with the client.
        let handshake_data = HandshakeData::from_bytes(&buffer[..]);
        println!("{:#?}", handshake_data);

        //Create a new client through the client manager
        client_manager.lock().unwrap().add(handshake_data.id, handshake_data.receive_port, handshake_data.send_port);

        //Allocate ports as needed
        // NOTE(review): the send port is also allocated against
        // `client_receive_ports`; this function is never given a send-port
        // list — confirm that this is intended.
        PortManager::allocate_port_if_necessary(client_receive_ports.clone(), client_manager.clone(), handshake_data.send_port, spawn_client_send_thread);
        PortManager::allocate_port_if_necessary(client_receive_ports.clone(), client_manager.clone(), handshake_data.receive_port, spawn_client_receive_thread);

        //Add a ClientPort entry to the client_receive_ports vec if there is not already one for this port
        let receive_port = handshake_data.receive_port;
        println!("Checking if thread already exists for client receive port {}", receive_port);

        // Hold the list lock across both the lookup and the insert; releasing it
        // in between would let another handshake thread insert the same port.
        // The guard is dropped at the end of this loop iteration, before the
        // next blocking read.
        let mut ports = client_receive_ports.lock().unwrap();
        let port_found = ports.iter().any(|crp| {
            let existing_port = crp.lock().unwrap().port;
            println!("Checking client receive port {}", existing_port);
            existing_port == receive_port
        });

        if port_found {
            println!("Thread already exists for client receive port {}", receive_port);
        } else {
            println!("No thread exists for client receive port {}", receive_port);
            ports.push(Arc::new(Mutex::new(ClientPort {
                port: receive_port,
                connected_client_ids: Vec::new(),
                handle: None
            })));
        }
    }
}

/// Loop which handles receiving incoming data sent from one connected client.
///
/// Every message read from `tcp_stream` is split into header, metadata and
/// audio sections. The audio is wrapped in an `AudioBuffer` and queued on the
/// client named by the header's receiver id.
///
/// The loop ends when the client closes the connection, a read fails, or the
/// sender id in the header is not registered in the `ClientManager`. A message
/// addressed to a receiver that is not registered is dropped and the loop
/// continues; previously that case panicked while holding the `ClientManager`
/// lock, which poisoned the mutex for every other thread.
///
/// On exit, the last sender id seen (0 if no message was ever parsed) is
/// removed from the `ClientManager` and from `crp.connected_client_ids`.
///
/// # Panics
///
/// Panics if the `ClientManager` or `crp` mutex is poisoned.
fn handle_client_sent_data(mut tcp_stream: TcpStream, client_manager: Arc<Mutex<ClientManager>>, crp: Arc<Mutex<ClientPort>>) {
    let mut buffer = [0; BUFFER_WITH_HEADER_SIZE]; //Stores the raw bytes of the audio buffer sent from the client
    let mut id: i32 = 0; //Stores the clients id

    loop {
        println!("Receiver Thread: Reading data from stream");
        //Read the next audio buffer sent from the client if available
        // NOTE(review): a single `read` may return fewer bytes than one full
        // message; the whole buffer is parsed regardless. If every message is
        // exactly BUFFER_WITH_HEADER_SIZE bytes, `read_exact` would be the
        // correct call — confirm the wire format before changing it.
        match tcp_stream.read(&mut buffer) {
            Ok(0) => {
                println!("Client closed connection");
                break;
            }
            Ok(_) => println!("New data on incoming data port"),
            Err(_) => {
                println!("Client error");
                break;
            }
        }
        println!("Receiver Thread: Data has been read");

        //Split the raw bytes into their respective sections
        let bytes = &buffer[..];
        let header_bytes = &bytes[..HEADER_SIZE];
        let metadata_bytes = &bytes[HEADER_SIZE..(HEADER_SIZE + METADATA_SIZE)];
        let audio_bytes = &bytes[(HEADER_SIZE + METADATA_SIZE)..];

        //Parse the sender and receiver ids from the header
        let header: HeaderData = HeaderData::from_bytes(header_bytes);

        //Parse the metadata
        let metadata: Metadata = Metadata::from_bytes(metadata_bytes);

        //Parse the audio samples
        let samples: Vec<f32> = AudioSample::parse_samples(audio_bytes).iter().map(|sample| sample.value).collect();

        println!("Received {} samples from client {} to send to client {}", samples.len(), header.sender_id, header.receiver_id);

        //Set the (sender) id to be the id specified in the header
        id = header.sender_id;

        //Lock & access the ClientManager Mutex (released at the end of this iteration)
        let mut temp_client_manager = client_manager.lock().unwrap();

        //Only clients that registered through the handshake may send data
        if temp_client_manager.get_by_id(header.sender_id).is_none() {
            println!("Client not found in client manager but sent data.");
            break;
        }

        //Add the buffer to the queue of buffers to be sent to the intended recipient.
        //An unknown recipient must not panic here: the manager lock is held, so a
        //panic would poison it for every other connection thread.
        if let Some(receiver) = temp_client_manager.get_by_id(header.receiver_id) {
            receiver.enqueue_buffer(AudioBuffer {
                samples,
                metadata: Metadata {
                    sample_rate: metadata.sample_rate,
                    buffer_size: metadata.buffer_size,
                }
            });
        } else {
            println!("Receiver {} is not registered; dropping buffer from client {}", header.receiver_id, header.sender_id);
        }
    }

    println!("Receiver Thread: Client disconnected");

    //Remove the client from the ClientManager; the lock is released before the port is locked
    client_manager.lock().unwrap().remove(id);

    //Remove the client from the ClientPort connected client ids vector
    let mut crp = crp.lock().unwrap();
    let index = crp.connected_client_ids.iter().position(|connected_id| *connected_id == id);
    if let Some(index) = index {
        crp.connected_client_ids.remove(index);
    }
}

/// Loop which handles sending outgoing data to one connected recipient.
///
/// The client first identifies itself by sending its id as a 4-byte
/// little-endian `i32`. An id of 0 is treated as "not yet identified", so the
/// id is read again. Once the id is known, the loop repeatedly takes the next
/// queued buffer for that client from the `ClientManager` and writes it to
/// `tcp_stream`.
///
/// The loop ends when the client closes the connection, a read or write fails,
/// or the client is no longer registered in the `ClientManager`. An
/// unregistered client or a failed write used to panic while holding the
/// `ClientManager` lock, poisoning it for every other thread; both now end the
/// loop so the cleanup below still runs.
///
/// On exit, the client id is removed from the `ClientManager` and from
/// `crp.connected_client_ids`.
///
/// # Panics
///
/// Panics if the `ClientManager` or `crp` mutex is poisoned, or if the queued
/// item cannot be converted with `as_buffer`.
fn handle_client_will_receive_data(mut tcp_stream: TcpStream, client_manager: Arc<Mutex<ClientManager>>, crp: Arc<Mutex<ClientPort>>) {
    let mut buffer = [0u8; std::mem::size_of::<i32>()]; //Stores the raw bytes of the initial data sent on connection
    let mut id: i32 = 0; //The clients id

    loop {
        //If the client has not yet sent their id, read the next data sent from the client. Otherwise, send the next buffer to the client
        if id == 0 {
            //`read_exact` waits for all four id bytes; a plain `read` may return
            //fewer and the id would then be decoded from a partially filled buffer
            match tcp_stream.read_exact(&mut buffer) {
                Ok(()) => id = i32::from_le_bytes(buffer),
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    println!("Client closed connection");
                    break;
                }
                Err(_) => {
                    println!("Client error");
                    break;
                }
            }
        } else {
            let buffer_sent = {
                //Lock & access the ClientManager Mutex
                let mut temp_client_manager = client_manager.lock().unwrap();
                let temp_client: &mut Client = match temp_client_manager.get_by_id(id) {
                    Some(client) => client,
                    None => {
                        //Unknown id, or the client was already removed by its sender connection
                        println!("Client {} is not registered; closing receive connection", id);
                        break;
                    }
                };

                //Check if there is a buffer ready to be sent to the client
                let sent = match temp_client.get_next_buffer() {
                    Some(next_buffer) => {
                        //Send the buffer to the client
                        // NOTE(review): `as_buffer` failing still panics under the
                        // lock; its error type is not visible from this file.
                        let next_buffer = next_buffer.as_buffer().unwrap();
                        let raw_buffer = next_buffer.raw_buffer();
                        if tcp_stream.write_all(raw_buffer).is_err() {
                            println!("Client error");
                            break;
                        }
                        true
                    }
                    None => false,
                };
                sent
            };

            //Nothing was queued: let other threads run instead of immediately
            //re-taking the ClientManager lock in a tight loop
            if !buffer_sent {
                thread::yield_now();
            }
        }
    }

    //On disconnect remove the client from the ClientManager; the lock is released before the port is locked
    client_manager.lock().unwrap().remove(id);

    //Remove the client from the ClientPort connected client ids vector
    let mut crp = crp.lock().unwrap();
    let index = crp.connected_client_ids.iter().position(|connected_id| *connected_id == id);
    if let Some(index) = index {
        crp.connected_client_ids.remove(index);
    }
}