use std::{sync::{Arc, Mutex}, thread};
use crate::{common::audiobuffer::AudioBuffer};
use rust_socketio::{ClientBuilder, Payload, RawClient};
use rust_socketio::client::Client;
use serde_json::json;
use crate::common::headerdata::HeaderData;
use crate::common::metadata::Metadata;
use crate::common::packet::Packet;
/// Client state for exchanging [`AudioBuffer`]s with a server through a
/// `rust_socketio` connection.
///
/// Cloning is shallow with respect to the queues and the socket: the clones
/// share the same `Arc`-wrapped buffers and connection.
#[derive(Clone)]
pub struct ResonatorClient {
    /// Address passed to the `ClientBuilder` when connecting.
    server_address: String,
    /// Buffers queued locally and waiting to be sent to the server.
    outgoing_buffers: Arc<Mutex<Vec<AudioBuffer>>>,
    /// Buffers received from the server and not yet consumed.
    incoming_buffers: Arc<Mutex<Vec<AudioBuffer>>>,
    /// Identifier written into the header of every outgoing packet.
    id: i32,
    /// Room identifier sent with the `joinsession` event and in packet headers.
    room_id: String,
    /// Active connection; `None` until `begin` has connected.
    socket: Option<Arc<Mutex<Client>>>,
    /// Hook invoked with informational messages.
    pub on_info_log: Option<fn(String)>,
    /// Hook invoked with error messages.
    pub on_error_log: Option<fn(String)>,
    /// Hook invoked with warning messages.
    pub on_warn_log: Option<fn(String)>,
}
impl ResonatorClient {
pub fn new(server_address: String, id: i32, room_id: String, external_buffer_input: Option<Arc<Mutex<Vec<AudioBuffer>>>>, external_buffer_output: Option<Arc<Mutex<Vec<AudioBuffer>>>>) -> ResonatorClient {
ResonatorClient {
server_address,
outgoing_buffers: external_buffer_input.unwrap_or(Arc::new(Mutex::new(Vec::new()))), incoming_buffers: external_buffer_output.unwrap_or(Arc::new(Mutex::new(Vec::new()))), id,
room_id,
socket: None,
on_info_log: Some(|msg| println!("{}", msg)),
on_error_log: Some(|msg| eprintln!("{}", msg)),
on_warn_log: Some(|msg| eprintln!("{}", msg)),
}
}
pub fn get_incoming_buffers(&self) -> Arc<Mutex<Vec<AudioBuffer>>> {
self.incoming_buffers.clone()
}
pub fn get_outgoing_buffers(&self) -> Arc<Mutex<Vec<AudioBuffer>>> {
self.outgoing_buffers.clone()
}
pub fn process_buffer(&self) -> Option<AudioBuffer> {
if self.incoming_buffers.lock().unwrap().len() > 0 {
return Some(self.incoming_buffers.lock().unwrap().remove(0));
}
None
}
pub fn push_buffer(&self, audio_buffer: AudioBuffer) {
self.outgoing_buffers.lock().unwrap().push(audio_buffer);
}
pub fn begin(&mut self) {
let audiodatacallback = |incoming_buffers: Arc<Mutex<Vec<AudioBuffer>>>, payload: Payload, socket: RawClient| {
println!("Received packet from server");
match payload {
Payload::String(packet_str) => {
let packet: Packet = serde_json::from_slice(packet_str.as_ref()).unwrap();
incoming_buffers.lock().unwrap().push(packet.buffer);
println!("Received buffers: {}", incoming_buffers.lock().unwrap().len());
},
Payload::Binary(_) => {
println!("Received binary packet");
}
}
};
println!("Connecting to server at {}", self.server_address);
self.on_info_log.as_ref().unwrap()("Connecting to server".to_string());
let incoming_buffers = self.incoming_buffers.clone();
self.socket = Some(Arc::new(Mutex::new(ClientBuilder::new(self.server_address.clone())
.namespace("/")
.on("audiobuffer", move |payload: Payload, socket: RawClient| {
audiodatacallback(incoming_buffers.clone(), payload, socket);
})
.connect()
.unwrap())));
self.socket.clone().unwrap().lock().unwrap().emit("joinsession", json!(self.room_id)).unwrap();
let arc_self = Arc::new(Mutex::new(self.clone()));
{
thread::spawn(move || {
arc_self.lock().unwrap()._sender_thread();
});
}
}
fn _sender_thread(&self) {
loop {
match self.outgoing_buffers.lock() {
Ok(mut outbuf) => {
if outbuf.len() == 0 {
continue;
}
println!("Sending buffer to client");
self.on_info_log.as_ref().unwrap()("Sending buffer to client".to_string());
let audio_buffer = outbuf.remove(0);
let header = HeaderData {
sender_id: self.id,
room_id: self.room_id.clone(),
};
let metadata = Metadata {
sample_rate: audio_buffer.metadata.sample_rate,
buffer_size: audio_buffer.metadata.buffer_size,
};
let packet = Packet {
header,
metadata,
buffer: audio_buffer,
};
let buffer_payload = json!(packet);
self.socket.as_ref().unwrap().lock().unwrap().emit("audiobuffer", buffer_payload).unwrap();
},
Err(e) => panic!("Failed to lock buffer mutex: {}", e),
}
}
}
}