use std::{
io,
sync::{
Arc,
atomic::{AtomicBool, AtomicU32, Ordering},
},
thread::{self, JoinHandle},
};
use lunar_lib::log::error;
use crate::{
ConnectionType, HandshakeType,
client::{ConnectError, ServerHandle},
common::{
AudioHandshakeRequest, AudioHandshakeResponse, HandshakeError, IPC_PROTOCOL_VERSION,
SessionTarget, SessionToken, Stream,
},
cpal_thread::CpalHandle,
ipc_common::{AudioPacket, Packetable},
};
trait AudioStreamExt: Stream {
fn handshake(
&mut self,
handshake: AudioHandshakeRequest,
) -> io::Result<Result<AudioHandshakeResponse, HandshakeError>> {
self.write_all(&HandshakeType::AUDIO)?;
let mut version = [0u8; 4];
self.read_exact(&mut version)?;
if version != IPC_PROTOCOL_VERSION {
return Ok(Err(HandshakeError::WrongVersion {
expected: u32::from_be_bytes(IPC_PROTOCOL_VERSION),
connected: u32::from_be_bytes(version),
}));
}
handshake.serialize_into_writer(self)?;
let handshake_response = self.read_data()?;
Packetable::deserialize_packet(&handshake_response)
}
}
impl<T: Stream + ?Sized> AudioStreamExt for T {}
pub struct AudioClient {
shutdown_sig: Arc<AtomicBool>,
thread_handle: Option<JoinHandle<()>>,
volume: Arc<AtomicU32>,
}
pub struct AudioConnectionBuilder {
connection_type: ConnectionType,
session_target: SessionTarget,
}
impl AudioConnectionBuilder {
#[must_use]
pub fn new(connection_type: ConnectionType, session_target: SessionTarget) -> Self {
Self {
connection_type,
session_target,
}
}
pub fn connect(self) -> Result<AudioClient, ConnectError> {
let mut stream = self.connection_type.connect()?;
let AudioHandshakeResponse { track_info } = stream.handshake(AudioHandshakeRequest {
session_target: self.session_target,
})??;
let shutdown_sig = Arc::new(AtomicBool::new(false));
let mut cpal_handle = CpalHandle::open()?;
if let Some(track_info) = track_info {
cpal_handle.set_track_info(track_info)?;
}
let volume = cpal_handle.volume();
let handle_sig = shutdown_sig.clone();
let handle = thread::spawn(move || {
let thread = AudioClientThread {
stream,
cpal_handle,
shutdown_sig: handle_sig,
};
if let Err(err) = thread.run() {
error!("Client returned with error: {err}");
}
});
Ok(AudioClient {
shutdown_sig,
thread_handle: Some(handle),
volume,
})
}
}
impl Default for AudioConnectionBuilder {
fn default() -> Self {
Self {
connection_type: ConnectionType::Local("Selene".to_string()),
session_target: SessionTarget::Join {
token: SessionToken::LOCAL_SESSION,
},
}
}
}
impl AudioClient {
#[must_use]
pub fn new_connection(
connection_type: ConnectionType,
session_target: SessionTarget,
) -> AudioConnectionBuilder {
AudioConnectionBuilder {
connection_type,
session_target,
}
}
#[must_use]
pub fn default_connection() -> AudioConnectionBuilder {
AudioConnectionBuilder::default()
}
pub fn set_volume(&self, volume: f32) {
let volume = volume.clamp(0.0, 1.0).to_bits();
self.volume.store(volume, Ordering::Relaxed);
}
#[must_use]
pub fn get_volume(&self) -> f32 {
f32::from_bits(self.volume.load(Ordering::Relaxed))
}
pub fn disconnect(mut self) {
self.shutdown_sig.store(true, Ordering::Relaxed);
if let Err(err) = self
.thread_handle
.take()
.expect("thread_handle should never be none")
.join()
{
error!("Failed to join AudioClient handle: {err:?}");
}
}
}
impl Drop for AudioClient {
fn drop(&mut self) {
self.shutdown_sig.store(true, Ordering::Relaxed);
}
}
pub struct AudioClientThread {
stream: ServerHandle,
cpal_handle: CpalHandle,
shutdown_sig: Arc<AtomicBool>,
}
impl AudioClientThread {
fn run(mut self) -> anyhow::Result<()> {
while self.shutdown_sig.load(Ordering::Relaxed) {
let data = self.stream.read_data()?;
let packet = AudioPacket::deserialize_packet(&data)?;
match packet {
AudioPacket::TrackInfo(track_info) => {
self.cpal_handle.set_track_info(track_info)?;
}
AudioPacket::Audio(items) => self.cpal_handle.input_audio_packet(&items)?,
AudioPacket::Disconnect => {
self.shutdown_sig.store(true, Ordering::Relaxed);
break;
}
}
}
Ok(())
}
}