pub mod c;
pub use lonelyradio_types;
use anyhow::{bail, Context};
use decode::decode;
use lonelyradio_types::{
Encoder, PlayMessage, Request, RequestResult, ServerCapabilities, Settings, TrackMetadata,
};
use rodio::buffer::SamplesBuffer;
use rodio::{OutputStream, Sink};
use std::io::Write;
use std::net::TcpStream;
use std::sync::atomic::AtomicU8;
use std::sync::RwLock;
use std::time::Instant;
mod decode;
const CACHE_SIZE_PCM: usize = 32;
const CACHE_SIZE_COMPRESSED: usize = 4;
const SUPPORTED_DECODERS: &[Encoder] = &[
Encoder::Pcm16,
Encoder::PcmFloat,
#[cfg(feature = "flac")]
Encoder::Flac,
#[cfg(feature = "alac")]
Encoder::Alac,
#[cfg(feature = "vorbis")]
Encoder::Vorbis,
];
static SINK: RwLock<Option<Sink>> = RwLock::new(None);
static VOLUME: AtomicU8 = AtomicU8::new(255);
static MD: RwLock<Option<TrackMetadata>> = RwLock::new(None);
static STATE: RwLock<State> = RwLock::new(State::NotStarted);
#[derive(Clone, Copy, PartialEq)]
#[repr(u8)]
pub enum State {
NotStarted = 0,
Resetting = 1,
Playing = 2,
Paused = 3,
}
pub fn toggle() {
let mut state = crate::STATE.write().unwrap();
if *state == State::Playing {
*state = State::Paused;
let sink = SINK.read().unwrap();
if let Some(sink) = sink.as_ref() {
sink.pause()
}
} else if *state == State::Paused {
*state = State::Playing;
let sink = SINK.read().unwrap();
if let Some(sink) = sink.as_ref() {
sink.play()
}
}
}
pub fn stop() {
let mut state = STATE.write().unwrap();
if *state == State::NotStarted {
return;
}
*state = State::Resetting;
let sink = SINK.read().unwrap();
if let Some(sink) = sink.as_ref() {
sink.pause();
sink.clear()
}
drop(sink);
drop(state);
while *STATE.read().unwrap() == State::Resetting {
std::thread::sleep(std::time::Duration::from_secs_f32(0.1))
}
}
pub fn get_state() -> State {
*STATE.read().unwrap()
}
pub fn get_metadata() -> Option<TrackMetadata> {
MD.read().unwrap().clone()
}
fn _stop() {
let sink = SINK.read().unwrap();
if let Some(sink) = sink.as_ref() {
sink.clear();
}
let mut md = MD.write().unwrap();
if md.is_some() {
*md = None;
}
*STATE.write().unwrap() = State::NotStarted;
}
fn watching_sleep(dur: f32) -> bool {
let start = Instant::now();
while Instant::now() < start + std::time::Duration::from_secs_f32(dur) {
std::thread::sleep(std::time::Duration::from_secs_f32(0.01));
if *STATE.read().unwrap() == State::Resetting {
return true;
}
}
false
}
fn watching_sleep_until_end() -> bool {
while SINK.read().unwrap().as_ref().unwrap().len() != 0 {
std::thread::sleep(std::time::Duration::from_secs_f32(0.01));
if *STATE.read().unwrap() == State::Resetting {
return true;
}
}
false
}
pub fn get_volume() -> u8 {
VOLUME.load(std::sync::atomic::Ordering::Acquire)
}
pub fn set_volume(volume: u8) {
let sink = SINK.read().unwrap();
if let Some(sink) = sink.as_ref() {
sink.set_volume(get_volume() as f32 / 255.0)
}
VOLUME.store(volume, std::sync::atomic::Ordering::Relaxed)
}
pub fn get_track(
server: &str,
mut settings: Settings,
playlist: &str,
) -> anyhow::Result<(TrackMetadata, Vec<f32>)> {
let mut connection = TcpStream::connect(server)?;
connection.write_all(lonelyradio_types::HELLO_MAGIC)?;
let capabilities: ServerCapabilities = rmp_serde::from_read(&mut connection)?;
if !capabilities.encoders.contains(&settings.encoder) {
settings.encoder = Encoder::Pcm16
}
let request = if playlist.is_empty() {
Request::Play(settings)
} else {
Request::PlayPlaylist(playlist.to_string(), settings)
};
connection.write_all(&rmp_serde::to_vec_named(&request).unwrap())?;
let response: RequestResult = rmp_serde::from_read(&connection)?;
if let RequestResult::Error(e) = response {
bail!("{e:?}")
}
let mut samples = vec![];
let mut md: Option<TrackMetadata> = None;
loop {
let recv_md: PlayMessage = rmp_serde::from_read(&mut connection)?;
match recv_md {
PlayMessage::T(tmd) => {
if md.is_some() {
break;
}
md = Some(tmd);
}
PlayMessage::F(fmd) => {
samples.extend(decode(&mut connection, md.as_ref().unwrap(), &fmd)?)
}
}
}
if let Some(md) = md {
Ok((md, samples))
} else {
bail!("No metadata")
}
}
pub fn list_playlists(server: &str) -> Option<Vec<String>> {
let mut connection = TcpStream::connect(server).ok()?;
connection.write_all(lonelyradio_types::HELLO_MAGIC).ok()?;
let _: ServerCapabilities = rmp_serde::from_read(&mut connection).ok()?;
connection.write_all(&rmp_serde::to_vec_named(&Request::ListPlaylist).ok()?).ok()?;
let res: RequestResult = rmp_serde::from_read(connection).ok()?;
match res {
RequestResult::Playlist(plist) => Some(plist.playlists),
_ => None,
}
}
pub fn run(server: &str, settings: Settings, playlist: &str) {
let result = _run(server, settings, playlist);
if let Err(e) = result {
println!("{:?}", e);
*STATE.write().unwrap() = State::NotStarted;
}
}
pub(crate) fn _run(server: &str, mut settings: Settings, playlist: &str) -> anyhow::Result<()> {
if !SUPPORTED_DECODERS.contains(&settings.encoder) {
eprintln!(
"monolib was built without support for {:?}, falling back to Pcm16",
settings.encoder
);
settings.encoder = Encoder::Pcm16
}
let mut state = STATE.write().unwrap();
if *state == State::Playing || *state == State::Paused {
return Ok(());
}
*state = State::Playing;
drop(state);
let mut connection = TcpStream::connect(server).context("failed to connect to the server")?;
connection.write_all(lonelyradio_types::HELLO_MAGIC)?;
let capabilities: ServerCapabilities = rmp_serde::from_read(&mut connection)?;
if !capabilities.encoders.contains(&settings.encoder) {
settings.encoder = Encoder::Pcm16
}
let request = if playlist.is_empty() {
Request::Play(settings)
} else {
Request::PlayPlaylist(playlist.to_string(), settings)
};
connection.write_all(&rmp_serde::to_vec_named(&request).unwrap())?;
let response: RequestResult = rmp_serde::from_read(&connection).unwrap();
if let RequestResult::Error(e) = response {
bail!("{:?}", e)
}
let mut stream = connection;
let mut sink = SINK.write().unwrap();
let (_stream, stream_handle) =
OutputStream::try_default().context("failed to determine audio device")?;
let audio_sink = Sink::try_new(&stream_handle).context("failed to create audio sink")?;
*sink = Some(audio_sink);
drop(sink);
let mut samples = Vec::with_capacity(8192);
loop {
let recv_md: PlayMessage =
rmp_serde::from_read(&mut stream).expect("Failed to parse message");
match recv_md {
PlayMessage::T(tmd) => {
if watching_sleep_until_end() {
_stop();
return Ok(());
}
let mut md = MD.write().unwrap();
*md = Some(tmd.clone());
drop(md);
}
PlayMessage::F(fmd) => {
while *STATE.read().unwrap() == State::Paused {
std::thread::sleep(std::time::Duration::from_secs_f32(0.25))
}
if *STATE.read().unwrap() == State::Resetting {
_stop();
return Ok(());
}
samples.extend(decode(&mut stream, &MD.read().unwrap().clone().unwrap(), &fmd)?);
let sink = SINK.read().unwrap();
let _md = MD.read().unwrap();
let md = _md.as_ref().unwrap().clone();
drop(_md);
if let Some(sink) = sink.as_ref() {
while (sink.len() >= CACHE_SIZE_PCM
&& md.encoder == Encoder::Pcm16
&& md.encoder == Encoder::PcmFloat)
|| (sink.len() >= CACHE_SIZE_COMPRESSED
&& md.encoder != Encoder::Pcm16
&& md.encoder != Encoder::PcmFloat)
{
if watching_sleep(
if sink.len() > 2 {
sink.len() as f32 - 2.0
} else {
0.25
} * samples.len() as f32 / md.sample_rate as f32
/ 4.0,
) {
_stop();
return Ok(());
}
}
sink.append(SamplesBuffer::new(
md.channels,
md.sample_rate,
samples.as_slice(),
));
samples.clear();
}
}
}
}
}