monolib/
lib.rs

1//! A library implementing the lonely radio audio streaming protocol
2//!
3//! Example usage (play for 10 seconds):
4//! ```rust
5//! extern crate monolib;
6//! use std::thread::{sleep, spawn};
7//! use std::time::Duration;
8//! use monolib::lonelyradio_types::{Settings, Encoder};
9//!
10//! spawn(|| monolib::run("someserver:someport", Settings {encoder: Encoder::Flac, cover: -1}, "my_playlist"));
11//! while monolib::get_metadata().is_none() {}
12//! let seconds = md.length / md.sample_rate as u64 / 2;
13//! println!("Playing: {} - {} - {} ({}:{:02})", md.artist, md.album, md.title, seconds / 60, seconds % 60);
14//! sleep(Duration::from_secs(10));
15//! monolib::stop();
16//! ```
17
18/// Functions, providing C-like API
19pub mod c;
20
21pub use lonelyradio_types;
22
23use anyhow::{bail, Context};
24use decode::decode;
25use lonelyradio_types::{
26	Encoder, PlayMessage, Request, RequestResult, ServerCapabilities, Settings, TrackMetadata,
27};
28use rodio::buffer::SamplesBuffer;
29use rodio::{OutputStream, Sink};
30use std::io::Write;
31use std::net::TcpStream;
32use std::sync::atomic::AtomicU8;
33use std::sync::RwLock;
34use std::time::Instant;
35
36mod decode;
37
38const CACHE_SIZE_PCM: usize = 32;
39const CACHE_SIZE_COMPRESSED: usize = 4;
40
41const SUPPORTED_DECODERS: &[Encoder] = &[
42	Encoder::Pcm16,
43	Encoder::PcmFloat,
44	#[cfg(feature = "flac")]
45	Encoder::Flac,
46	#[cfg(feature = "alac")]
47	Encoder::Alac,
48	#[cfg(feature = "vorbis")]
49	Encoder::Vorbis,
50];
51
52static SINK: RwLock<Option<Sink>> = RwLock::new(None);
53static VOLUME: AtomicU8 = AtomicU8::new(255);
54static MD: RwLock<Option<TrackMetadata>> = RwLock::new(None);
55static STATE: RwLock<State> = RwLock::new(State::NotStarted);
56
57/// Player state
58#[derive(Clone, Copy, PartialEq)]
59#[repr(u8)]
60pub enum State {
61	NotStarted = 0,
62	Resetting = 1,
63	Playing = 2,
64	Paused = 3,
65}
66
67/// Play/pauses playback
68pub fn toggle() {
69	let mut state = crate::STATE.write().unwrap();
70	if *state == State::Playing {
71		*state = State::Paused;
72
73		let sink = SINK.read().unwrap();
74		if let Some(sink) = sink.as_ref() {
75			sink.pause()
76		}
77	} else if *state == State::Paused {
78		*state = State::Playing;
79
80		let sink = SINK.read().unwrap();
81		if let Some(sink) = sink.as_ref() {
82			sink.play()
83		}
84	}
85}
86
87/// Stops playback
88pub fn stop() {
89	let mut state = STATE.write().unwrap();
90	if *state == State::NotStarted {
91		return;
92	}
93	*state = State::Resetting;
94
95	let sink = SINK.read().unwrap();
96	if let Some(sink) = sink.as_ref() {
97		sink.pause();
98		sink.clear()
99	}
100	drop(sink);
101	drop(state);
102
103	// Blocking main thread
104	while *STATE.read().unwrap() == State::Resetting {
105		std::thread::sleep(std::time::Duration::from_secs_f32(0.1))
106	}
107}
108
109pub fn get_state() -> State {
110	*STATE.read().unwrap()
111}
112
113pub fn get_metadata() -> Option<TrackMetadata> {
114	MD.read().unwrap().clone()
115}
116
117fn _stop() {
118	let sink = SINK.read().unwrap();
119	if let Some(sink) = sink.as_ref() {
120		sink.clear();
121	}
122	let mut md = MD.write().unwrap();
123	if md.is_some() {
124		*md = None;
125	}
126
127	*STATE.write().unwrap() = State::NotStarted;
128}
129
130// Reset - true, not - false
131fn watching_sleep(dur: f32) -> bool {
132	let start = Instant::now();
133	while Instant::now() < start + std::time::Duration::from_secs_f32(dur) {
134		std::thread::sleep(std::time::Duration::from_secs_f32(0.01));
135		if *STATE.read().unwrap() == State::Resetting {
136			return true;
137		}
138	}
139	false
140}
141
142fn watching_sleep_until_end() -> bool {
143	while SINK.read().unwrap().as_ref().unwrap().len() != 0 {
144		std::thread::sleep(std::time::Duration::from_secs_f32(0.01));
145		if *STATE.read().unwrap() == State::Resetting {
146			return true;
147		}
148	}
149	false
150}
151
152pub fn get_volume() -> u8 {
153	VOLUME.load(std::sync::atomic::Ordering::Acquire)
154}
155
156pub fn set_volume(volume: u8) {
157	let sink = SINK.read().unwrap();
158	if let Some(sink) = sink.as_ref() {
159		sink.set_volume(get_volume() as f32 / 255.0)
160	}
161	VOLUME.store(volume, std::sync::atomic::Ordering::Relaxed)
162}
163
164/// Download track as samples
165pub fn get_track(
166	server: &str,
167	mut settings: Settings,
168	playlist: &str,
169) -> anyhow::Result<(TrackMetadata, Vec<f32>)> {
170	let mut connection = TcpStream::connect(server)?;
171	connection.write_all(lonelyradio_types::HELLO_MAGIC)?;
172	let capabilities: ServerCapabilities = rmp_serde::from_read(&mut connection)?;
173	if !capabilities.encoders.contains(&settings.encoder) {
174		settings.encoder = Encoder::Pcm16
175	}
176
177	let request = if playlist.is_empty() {
178		Request::Play(settings)
179	} else {
180		Request::PlayPlaylist(playlist.to_string(), settings)
181	};
182	connection.write_all(&rmp_serde::to_vec_named(&request).unwrap())?;
183
184	let response: RequestResult = rmp_serde::from_read(&connection)?;
185	if let RequestResult::Error(e) = response {
186		bail!("{e:?}")
187	}
188
189	let mut samples = vec![];
190	let mut md: Option<TrackMetadata> = None;
191
192	loop {
193		let recv_md: PlayMessage = rmp_serde::from_read(&mut connection)?;
194		match recv_md {
195			PlayMessage::T(tmd) => {
196				if md.is_some() {
197					break;
198				}
199				md = Some(tmd);
200			}
201			PlayMessage::F(fmd) => {
202				samples.extend(decode(&mut connection, md.as_ref().unwrap(), &fmd)?)
203			}
204		}
205	}
206
207	if let Some(md) = md {
208		Ok((md, samples))
209	} else {
210		bail!("No metadata")
211	}
212}
213
214pub fn list_playlists(server: &str) -> Option<Vec<String>> {
215	let mut connection = TcpStream::connect(server).ok()?;
216	connection.write_all(lonelyradio_types::HELLO_MAGIC).ok()?;
217	let _: ServerCapabilities = rmp_serde::from_read(&mut connection).ok()?;
218	connection.write_all(&rmp_serde::to_vec_named(&Request::ListPlaylist).ok()?).ok()?;
219	let res: RequestResult = rmp_serde::from_read(connection).ok()?;
220	match res {
221		RequestResult::Playlist(plist) => Some(plist.playlists),
222		_ => None,
223	}
224}
225
226/// Starts playing at "server:port"
227pub fn run(server: &str, settings: Settings, playlist: &str) {
228	let result = _run(server, settings, playlist);
229	if let Err(e) = result {
230		println!("{:?}", e);
231		*STATE.write().unwrap() = State::NotStarted;
232	}
233}
234
235pub(crate) fn _run(server: &str, mut settings: Settings, playlist: &str) -> anyhow::Result<()> {
236	if !SUPPORTED_DECODERS.contains(&settings.encoder) {
237		eprintln!(
238			"monolib was built without support for {:?}, falling back to Pcm16",
239			settings.encoder
240		);
241		settings.encoder = Encoder::Pcm16
242	}
243	let mut state = STATE.write().unwrap();
244	if *state == State::Playing || *state == State::Paused {
245		return Ok(());
246	}
247	*state = State::Playing;
248	drop(state);
249
250	let mut connection = TcpStream::connect(server).context("failed to connect to the server")?;
251	connection.write_all(lonelyradio_types::HELLO_MAGIC)?;
252	let capabilities: ServerCapabilities = rmp_serde::from_read(&mut connection)?;
253	if !capabilities.encoders.contains(&settings.encoder) {
254		settings.encoder = Encoder::Pcm16
255	}
256
257	let request = if playlist.is_empty() {
258		Request::Play(settings)
259	} else {
260		Request::PlayPlaylist(playlist.to_string(), settings)
261	};
262	connection.write_all(&rmp_serde::to_vec_named(&request).unwrap())?;
263
264	let response: RequestResult = rmp_serde::from_read(&connection).unwrap();
265	if let RequestResult::Error(e) = response {
266		bail!("{:?}", e)
267	}
268	let mut stream = connection;
269
270	let mut sink = SINK.write().unwrap();
271	let (_stream, stream_handle) =
272		OutputStream::try_default().context("failed to determine audio device")?;
273
274	// Can't reuse old sink for some reason
275	let audio_sink = Sink::try_new(&stream_handle).context("failed to create audio sink")?;
276	*sink = Some(audio_sink);
277	drop(sink);
278
279	let mut samples = Vec::with_capacity(8192);
280	loop {
281		let recv_md: PlayMessage =
282			rmp_serde::from_read(&mut stream).expect("Failed to parse message");
283		match recv_md {
284			PlayMessage::T(tmd) => {
285				// No metadata shift
286				if watching_sleep_until_end() {
287					_stop();
288					return Ok(());
289				}
290				let mut md = MD.write().unwrap();
291				*md = Some(tmd.clone());
292
293				drop(md);
294			}
295			PlayMessage::F(fmd) => {
296				while *STATE.read().unwrap() == State::Paused {
297					std::thread::sleep(std::time::Duration::from_secs_f32(0.25))
298				}
299				if *STATE.read().unwrap() == State::Resetting {
300					_stop();
301					return Ok(());
302				}
303
304				samples.extend(decode(&mut stream, &MD.read().unwrap().clone().unwrap(), &fmd)?);
305
306				// Synchronizing with sink
307				let sink = SINK.read().unwrap();
308				let _md = MD.read().unwrap();
309				let md = _md.as_ref().unwrap().clone();
310				drop(_md);
311				if let Some(sink) = sink.as_ref() {
312					while (sink.len() >= CACHE_SIZE_PCM
313						&& md.encoder == Encoder::Pcm16
314						&& md.encoder == Encoder::PcmFloat)
315						|| (sink.len() >= CACHE_SIZE_COMPRESSED
316							&& md.encoder != Encoder::Pcm16
317							&& md.encoder != Encoder::PcmFloat)
318					{
319						// Sleeping exactly one buffer and watching for reset signal
320						if watching_sleep(
321							if sink.len() > 2 {
322								sink.len() as f32 - 2.0
323							} else {
324								0.25
325							} * samples.len() as f32 / md.sample_rate as f32
326								/ 4.0,
327						) {
328							_stop();
329							return Ok(());
330						}
331					}
332					sink.append(SamplesBuffer::new(
333						md.channels,
334						md.sample_rate,
335						samples.as_slice(),
336					));
337					samples.clear();
338				}
339			}
340		}
341	}
342}