resonator 0.3.2

This crate allows 2 devices to send live PCM audio data to each other through a server
Documentation
use std::{sync::{Arc, Mutex}, thread};

use crate::{common::audiobuffer::AudioBuffer};

use rust_socketio::{ClientBuilder, Payload, RawClient};
use rust_socketio::client::Client;
use serde_json::json;
use crate::common::headerdata::HeaderData;
use crate::common::metadata::Metadata;
use crate::common::packet::Packet;

#[derive(Clone)]
pub struct ResonatorClient {
    server_address: String,                          //The address of the server
    outgoing_buffers: Arc<Mutex<Vec<AudioBuffer>>>,  //The buffers which are to be sent to the server
    incoming_buffers: Arc<Mutex<Vec<AudioBuffer>>>,  //The buffers which have been received from the server, but not yet processed
    id: i32,                                         //The id of this client
    room_id: String,                                 //The id of the session which this client resides in
    socket: Option<Arc<Mutex<Client>>>,
    pub on_info_log: Option<fn(String)>,
    pub on_error_log: Option<fn(String)>,
    pub on_warn_log: Option<fn(String)>,
}

impl ResonatorClient {
    /**

     * Constructs a new ResonatorClient using the specified ports
     * If external_buffer_input is provided then the client will use that buffer as the audio source
     * If external_buffer_output is provided then the client will use that buffer as the audio output
     */
    pub fn new(server_address: String, id: i32, room_id: String, external_buffer_input: Option<Arc<Mutex<Vec<AudioBuffer>>>>, external_buffer_output: Option<Arc<Mutex<Vec<AudioBuffer>>>>) -> ResonatorClient {
        ResonatorClient {
            server_address,
            outgoing_buffers: external_buffer_input.unwrap_or(Arc::new(Mutex::new(Vec::new()))), //Outgoing buffers will be pulled from the input buffer
            incoming_buffers: external_buffer_output.unwrap_or(Arc::new(Mutex::new(Vec::new()))), //Incoming buffers will be pushed to the output buffer
            id,
            room_id,
            socket: None,
            on_info_log: Some(|msg| println!("{}", msg)),
            on_error_log: Some(|msg| eprintln!("{}", msg)),
            on_warn_log: Some(|msg| eprintln!("{}", msg)),
        }
    }

    //Getter function for returning the vector of recieved audio buffers
    pub fn get_incoming_buffers(&self) -> Arc<Mutex<Vec<AudioBuffer>>> {
        self.incoming_buffers.clone()
    }

    pub fn get_outgoing_buffers(&self) -> Arc<Mutex<Vec<AudioBuffer>>> {
        self.outgoing_buffers.clone()
    }

    //Returns the first buffer in the incoming buffers vec, and removes it
    pub fn process_buffer(&self) -> Option<AudioBuffer> {
        if self.incoming_buffers.lock().unwrap().len() > 0 {
            return Some(self.incoming_buffers.lock().unwrap().remove(0));
        }

        None
    }

    //Pushes a buffer to the outgoing buffers vec to be sent to the server
    pub fn push_buffer(&self, audio_buffer: AudioBuffer) {
        self.outgoing_buffers.lock().unwrap().push(audio_buffer);
    }

    //Begins the client, spawning the receiver and sender threads
    pub fn begin(&mut self) {
        let audiodatacallback = |incoming_buffers: Arc<Mutex<Vec<AudioBuffer>>>, payload: Payload, socket: RawClient| {
            println!("Received packet from server");

            match payload {
                Payload::String(packet_str) => {
                    let packet: Packet = serde_json::from_slice(packet_str.as_ref()).unwrap();
                    incoming_buffers.lock().unwrap().push(packet.buffer);
                    println!("Received buffers: {}", incoming_buffers.lock().unwrap().len());
                },
                Payload::Binary(_) => {
                    println!("Received binary packet");
                }
            }
        };

        println!("Connecting to server at {}", self.server_address);
        self.on_info_log.as_ref().unwrap()("Connecting to server".to_string());
        let incoming_buffers = self.incoming_buffers.clone();
        self.socket = Some(Arc::new(Mutex::new(ClientBuilder::new(self.server_address.clone())
            .namespace("/")
            .on("audiobuffer", move |payload: Payload, socket: RawClient| {
                audiodatacallback(incoming_buffers.clone(), payload, socket);
            })
            .connect()
            .unwrap())));

        self.socket.clone().unwrap().lock().unwrap().emit("joinsession", json!(self.room_id)).unwrap();

        let arc_self = Arc::new(Mutex::new(self.clone()));
        {
            thread::spawn(move || {
                arc_self.lock().unwrap()._sender_thread();
            });
        }
    }

    fn _sender_thread(&self) {
        loop {
            match self.outgoing_buffers.lock() {
                Ok(mut outbuf) => {
                    if outbuf.len() == 0 {
                        continue;
                    }

                    //Otherwise send the first buffer in the outgoing buffers vec
                    println!("Sending buffer to client");
                    self.on_info_log.as_ref().unwrap()("Sending buffer to client".to_string());
                    let audio_buffer = outbuf.remove(0);
                    //let raw_buffer = audio_buffer.as_buffer().unwrap().raw_buffer_with_header(self.id, self.receiver_id);

                    let header = HeaderData {
                        sender_id: self.id,
                        room_id: self.room_id.clone(),
                    };

                    let metadata = Metadata {
                        sample_rate: audio_buffer.metadata.sample_rate,
                        buffer_size: audio_buffer.metadata.buffer_size,
                    };

                    let packet = Packet {
                        header,
                        metadata,
                        buffer: audio_buffer,
                    };

                    let buffer_payload = json!(packet);
                    self.socket.as_ref().unwrap().lock().unwrap().emit("audiobuffer", buffer_payload).unwrap();
                },
                Err(e) => panic!("Failed to lock buffer mutex: {}", e),
            }
        }
    }
}