use rodio::{Decoder, OutputStream, Sink};
use std::io::{Read, Write};
use std::sync::mpsc;
use std::sync::atomic::{AtomicU64, AtomicU8, AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Condvar};
use std::collections::VecDeque;
use std::time::Duration;
use std::fs::File;
use std::path::PathBuf;
/// Commands sent from the application to the audio thread.
#[derive(Debug, Clone)]
pub enum AudioCommand {
    /// Connect to the stream at the given URL and start playing it.
    Play(String),
    /// Fade the current sink out and pause it.
    Pause,
    /// Un-pause the current sink and fade back in to the target volume.
    Resume,
    /// Fade out, drop the current sink and abandon any connection attempt.
    Stop,
    /// Set the target playback volume.
    SetVolume(f32),
    /// Arm recording; capture starts at the next track-title change.
    StartRecording {
        /// Root directory under which recordings are written.
        recording_dir: String,
        /// Sub-directory (sanitized) inside `recording_dir`.
        category: String,
        /// Keep files that would otherwise be discarded as short or ad/speech.
        keep_snippets: bool,
        /// Recordings shorter than this many seconds count as snippets.
        min_song_duration_secs: u32,
    },
    /// Disarm recording and close the file currently being written.
    StopRecording,
}
/// Status events emitted by the audio thread and its helper threads.
#[derive(Debug, Clone)]
pub enum AudioStatus {
    /// A sink is playing (sent after connecting or resuming).
    Playing,
    /// Playback was paused.
    Paused,
    /// Playback stopped, either on request or because the sink ran empty.
    Stopped,
    /// A human-readable message. Used for failures, and also for the
    /// informational "Discarded ..." notices of the recorder.
    Error(String),
    /// A connection attempt (or retry) is in progress.
    Connecting,
    /// The stream announced a new title via in-band metadata.
    TrackChanged { url: String, title: String },
    /// Recording state changed: `state` is 0 (off), 1 (armed, waiting for the
    /// next track) or 2 (writing to `filepath`).
    RecordingStateChanged { state: u8, filepath: Option<String> },
    /// Fill level of the network buffer.
    BufferLevel { percent: u8, seconds: u32 },
}
/// Recording settings shared between the command loop and the stream reader.
pub struct RecordStateShared {
    /// 0 = recording off, 1 = armed (waiting for the next track change),
    /// 2 = actively writing a file.
    pub state: AtomicU8,
    /// Root directory for recordings; an empty string disables file creation.
    pub recording_dir: Mutex<String>,
    /// Sub-directory name used below `recording_dir`.
    pub category: Mutex<String>,
    /// When set, short or ad/speech recordings are kept instead of deleted.
    pub keep_snippets: AtomicBool,
    /// Minimum duration in seconds for a recording to count as a full song.
    pub min_song_duration_secs: std::sync::atomic::AtomicU32,
}
/// Bounded, blocking byte FIFO connecting one producer thread (network
/// reader) to one consumer (the decoder's reader).
struct BufferQueue {
    /// Buffered bytes, oldest first.
    queue: Mutex<VecDeque<u8>>,
    /// Signalled when bytes become available or the queue is disconnected.
    cv_read: Condvar,
    /// Signalled when space becomes available or the queue is disconnected.
    cv_write: Condvar,
    /// Maximum number of bytes held at once.
    capacity: usize,
    /// Once set, blocked callers wake up, `push` discards its input and
    /// `pop` returns `Ok(0)` after the remaining bytes are drained.
    disconnected: AtomicBool,
}

impl BufferQueue {
    /// Creates an empty queue that holds at most `capacity` bytes.
    fn new(capacity: usize) -> Self {
        Self {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            cv_read: Condvar::new(),
            cv_write: Condvar::new(),
            capacity,
            disconnected: AtomicBool::new(false),
        }
    }

    /// Appends `bytes`, blocking while the queue is full.
    ///
    /// Slices larger than `capacity` are fed in capacity-sized pieces; pushing
    /// them in one go could never satisfy the space check and would block
    /// forever. Returns early, discarding what is left, once the queue is
    /// disconnected.
    fn push(&self, bytes: &[u8]) {
        // `chunks` panics on a zero chunk size, hence the `max(1)`.
        let chunk_len = self.capacity.max(1);
        for chunk in bytes.chunks(chunk_len) {
            let mut queue = self.queue.lock().unwrap();
            while queue.len() + chunk.len() > self.capacity
                && !self.disconnected.load(Ordering::SeqCst)
            {
                queue = self.cv_write.wait(queue).unwrap();
            }
            if self.disconnected.load(Ordering::SeqCst) {
                return;
            }
            queue.extend(chunk);
            self.cv_read.notify_all();
        }
    }

    /// Moves up to `buf.len()` bytes into `buf`, blocking while the queue is
    /// empty and still connected.
    ///
    /// Returns the number of bytes copied; `Ok(0)` means the queue is empty
    /// and disconnected (end of stream), or that `buf` is empty.
    fn pop(&self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut queue = self.queue.lock().unwrap();
        while queue.is_empty() && !self.disconnected.load(Ordering::SeqCst) {
            queue = self.cv_read.wait(queue).unwrap();
        }
        if queue.is_empty() {
            // The wait loop only exits with an empty queue after a disconnect.
            return Ok(0);
        }
        let count = buf.len().min(queue.len());
        for (slot, byte) in buf.iter_mut().zip(queue.drain(..count)) {
            *slot = byte;
        }
        self.cv_write.notify_all();
        Ok(count)
    }

    /// Number of bytes currently buffered.
    fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Sets the disconnected flag and wakes every blocked producer/consumer.
    fn set_disconnected(&self, disc: bool) {
        self.disconnected.store(disc, Ordering::SeqCst);
        // Taking the lock before notifying ensures a waiter that has already
        // checked the flag is parked and therefore receives the wake-up.
        let _queue = self.queue.lock().unwrap();
        self.cv_read.notify_all();
        self.cv_write.notify_all();
    }
}
/// Handle to the background audio thread.
pub struct AudioEngine {
    /// Sending half of the command channel consumed by the audio thread.
    cmd_tx: mpsc::Sender<AudioCommand>,
    /// Receiving half of the status channel; poll it for [`AudioStatus`] events.
    pub status_rx: mpsc::Receiver<AudioStatus>,
}

impl AudioEngine {
    /// Starts the audio thread and returns the handle used to talk to it.
    pub fn spawn() -> Self {
        let (cmd_tx, cmd_rx) = mpsc::channel();
        let (status_tx, status_rx) = mpsc::channel();
        std::thread::spawn(move || audio_loop(cmd_rx, status_tx));
        AudioEngine { cmd_tx, status_rx }
    }

    /// Queues `cmd` for the audio thread.
    ///
    /// A send failure (the audio thread has exited) is deliberately ignored.
    pub fn send(&self, cmd: AudioCommand) {
        let _ = self.cmd_tx.send(cmd);
    }
}
fn audio_loop(
cmd_rx: mpsc::Receiver<AudioCommand>,
status_tx: mpsc::Sender<AudioStatus>,
) {
let (_stream, handle) = match OutputStream::try_default() {
Ok(s) => s,
Err(e) => {
let _ = status_tx.send(AudioStatus::Error(format!("Soundcard error: {}", e)));
return;
}
};
let mut current_sink: Option<Sink> = None;
let mut connect_thread: Option<std::thread::JoinHandle<Result<Sink, String>>> = None;
let active_conn_id = Arc::new(AtomicU64::new(0));
let mut current_conn_id: u64 = 0;
let record_state = Arc::new(RecordStateShared {
state: AtomicU8::new(0), recording_dir: Mutex::new(String::new()),
category: Mutex::new(String::new()),
keep_snippets: AtomicBool::new(false),
min_song_duration_secs: std::sync::atomic::AtomicU32::new(90),
});
let mut target_volume: f32 = 0.8;
let mut current_fade_volume: Option<f32> = None;
let mut pending_action: Option<AudioCommand> = None;
loop {
match cmd_rx.recv_timeout(Duration::from_millis(10)) {
Ok(cmd) => {
match cmd {
AudioCommand::Play(url) => {
if current_sink.is_some() {
pending_action = Some(AudioCommand::Play(url));
} else {
drop(connect_thread.take());
current_conn_id += 1;
active_conn_id.store(current_conn_id, Ordering::SeqCst);
let _ = status_tx.send(AudioStatus::Connecting);
let handle_clone = handle.clone();
let status_tx_clone = status_tx.clone();
let conn_id = current_conn_id;
let active_conn_id_clone = active_conn_id.clone();
let record_state_clone = record_state.clone();
connect_thread = Some(std::thread::spawn(move || {
connect_and_decode(&url, &handle_clone, status_tx_clone, conn_id, active_conn_id_clone, record_state_clone)
}));
}
}
AudioCommand::Pause => {
if current_sink.is_some() {
pending_action = Some(AudioCommand::Pause);
} else {
let _ = status_tx.send(AudioStatus::Paused);
}
}
AudioCommand::Resume => {
if let Some(ref sink) = current_sink {
sink.play();
let _ = status_tx.send(AudioStatus::Playing);
current_fade_volume = Some(0.0);
}
}
AudioCommand::Stop => {
if current_sink.is_some() {
pending_action = Some(AudioCommand::Stop);
} else {
active_conn_id.store(0, Ordering::SeqCst); connect_thread = None;
let _ = status_tx.send(AudioStatus::Stopped);
}
}
AudioCommand::SetVolume(vol) => {
target_volume = vol;
if current_fade_volume.is_none() && pending_action.is_none() {
if let Some(ref sink) = current_sink {
sink.set_volume(vol);
}
}
}
AudioCommand::StartRecording { recording_dir, category, keep_snippets, min_song_duration_secs } => {
*record_state.recording_dir.lock().unwrap() = recording_dir;
*record_state.category.lock().unwrap() = category;
record_state.keep_snippets.store(keep_snippets, Ordering::SeqCst);
record_state.min_song_duration_secs.store(min_song_duration_secs, Ordering::SeqCst);
record_state.state.store(1, Ordering::SeqCst); let _ = status_tx.send(AudioStatus::RecordingStateChanged { state: 1, filepath: None });
}
AudioCommand::StopRecording => {
record_state.state.store(0, Ordering::SeqCst); let _ = status_tx.send(AudioStatus::RecordingStateChanged { state: 0, filepath: None });
}
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => {
break;
}
}
if pending_action.is_some() {
if let Some(ref sink) = current_sink {
let current_vol = sink.volume();
if current_vol <= 0.05 {
sink.set_volume(0.0);
let cmd = pending_action.take().unwrap();
match cmd {
AudioCommand::Play(url) => {
current_conn_id += 1;
active_conn_id.store(current_conn_id, Ordering::SeqCst);
let _ = status_tx.send(AudioStatus::Connecting);
let handle_clone = handle.clone();
let status_tx_clone = status_tx.clone();
let conn_id = current_conn_id;
let active_conn_id_clone = active_conn_id.clone();
let record_state_clone = record_state.clone();
if let Some(old_sink) = current_sink.take() {
old_sink.stop();
}
drop(connect_thread.take());
connect_thread = Some(std::thread::spawn(move || {
connect_and_decode(&url, &handle_clone, status_tx_clone, conn_id, active_conn_id_clone, record_state_clone)
}));
}
AudioCommand::Stop => {
active_conn_id.store(0, Ordering::SeqCst); connect_thread = None;
if let Some(old_sink) = current_sink.take() {
old_sink.stop();
}
let _ = status_tx.send(AudioStatus::Stopped);
}
AudioCommand::Pause => {
sink.pause();
let _ = status_tx.send(AudioStatus::Paused);
}
_ => {}
}
} else {
let step = current_vol * 0.15; sink.set_volume((current_vol - step).max(0.0));
}
} else {
let cmd = pending_action.take().unwrap();
match cmd {
AudioCommand::Play(url) => {
current_conn_id += 1;
active_conn_id.store(current_conn_id, Ordering::SeqCst);
let _ = status_tx.send(AudioStatus::Connecting);
let handle_clone = handle.clone();
let status_tx_clone = status_tx.clone();
let conn_id = current_conn_id;
let active_conn_id_clone = active_conn_id.clone();
let record_state_clone = record_state.clone();
drop(connect_thread.take());
connect_thread = Some(std::thread::spawn(move || {
connect_and_decode(&url, &handle_clone, status_tx_clone, conn_id, active_conn_id_clone, record_state_clone)
}));
}
AudioCommand::Stop => {
active_conn_id.store(0, Ordering::SeqCst);
connect_thread = None;
let _ = status_tx.send(AudioStatus::Stopped);
}
AudioCommand::Pause => {
let _ = status_tx.send(AudioStatus::Paused);
}
_ => {}
}
}
}
if pending_action.is_none() && current_fade_volume.is_some() {
if let Some(ref sink) = current_sink {
let current_vol = sink.volume();
if (current_vol - target_volume).abs() <= 0.03 {
sink.set_volume(target_volume);
current_fade_volume = None;
} else {
let step = (target_volume - current_vol) * 0.15;
sink.set_volume(current_vol + step);
}
} else {
current_fade_volume = None;
}
}
if let Some(ref handle) = connect_thread {
if handle.is_finished() {
let finished = connect_thread.take().unwrap();
match finished.join() {
Ok(Ok(sink)) => {
sink.set_volume(0.0);
current_sink = Some(sink);
let _ = status_tx.send(AudioStatus::Playing);
current_fade_volume = Some(0.0);
}
Ok(Err(e)) => {
if e != "Abandoned" {
let _ = status_tx.send(AudioStatus::Error(e));
}
}
Err(_) => {
let _ = status_tx.send(AudioStatus::Error(
"Connection thread panicked".into(),
));
}
}
}
}
if let Some(ref sink) = current_sink {
if sink.empty() {
current_sink = None;
let _ = status_tx.send(AudioStatus::Stopped);
}
}
}
}
/// Connects to `url` and builds a playing [`Sink`], retrying on failure.
///
/// Up to five attempts are made. The pause between attempts starts at one
/// second and doubles up to eight seconds; it is slept in 100 ms slices so
/// that an abandoned connection (`active_conn_id` no longer equal to
/// `conn_id`) is noticed quickly.
///
/// # Errors
/// Returns `"Abandoned"` when the connection was superseded, otherwise a
/// message describing the last failure.
fn connect_and_decode(
    url: &str,
    handle: &rodio::OutputStreamHandle,
    status_tx: mpsc::Sender<AudioStatus>,
    conn_id: u64,
    active_conn_id: Arc<AtomicU64>,
    record_state: Arc<RecordStateShared>,
) -> Result<Sink, String> {
    const MAX_RETRIES: u32 = 5;
    const POLL_INTERVAL: Duration = Duration::from_millis(100);
    const MAX_BACKOFF: Duration = Duration::from_secs(8);

    let mut failures = 0;
    let mut backoff = Duration::from_secs(1);
    loop {
        if active_conn_id.load(Ordering::SeqCst) != conn_id {
            return Err("Abandoned".into());
        }

        let attempt = try_connect_and_decode_once(
            url,
            handle,
            status_tx.clone(),
            conn_id,
            active_conn_id.clone(),
            record_state.clone(),
        );
        let error = match attempt {
            Ok(sink) => return Ok(sink),
            Err(error) => error,
        };
        if error == "Abandoned" {
            return Err("Abandoned".into());
        }

        failures += 1;
        if failures >= MAX_RETRIES {
            return Err(format!("Failed after {} retries: {}", MAX_RETRIES, error));
        }
        let _ = status_tx.send(AudioStatus::Connecting);

        // Sleep in short slices so abandonment interrupts the back-off.
        let slices = (backoff.as_millis() / POLL_INTERVAL.as_millis()) as usize;
        for _ in 0..slices {
            if active_conn_id.load(Ordering::SeqCst) != conn_id {
                return Err("Abandoned".into());
            }
            std::thread::sleep(POLL_INTERVAL);
        }
        backoff = (backoff * 2).min(MAX_BACKOFF);
    }
}
fn try_connect_and_decode_once(
url: &str,
handle: &rodio::OutputStreamHandle,
status_tx: mpsc::Sender<AudioStatus>,
conn_id: u64,
active_conn_id: Arc<AtomicU64>,
record_state: Arc<RecordStateShared>,
) -> Result<Sink, String> {
let client = reqwest::blocking::Client::builder()
.timeout(None)
.connect_timeout(Duration::from_secs(5))
.user_agent("DriftFM/0.1.0")
.build()
.map_err(|e| format!("HTTP client error: {}", e))?;
if active_conn_id.load(Ordering::SeqCst) != conn_id {
return Err("Abandoned".into());
}
let response = client
.get(url)
.header("Icy-MetaData", "1")
.send()
.map_err(|e| format!("Connection failed: {}", e))?;
if !response.status().is_success() {
return Err(format!("HTTP {}", response.status()));
}
if active_conn_id.load(Ordering::SeqCst) != conn_id {
return Err("Abandoned".into());
}
let buffer_capacity = 1024 * 1024; let queue = Arc::new(BufferQueue::new(buffer_capacity));
let queue_clone = queue.clone();
let active_conn_id_clone = active_conn_id.clone();
let conn_id_clone = conn_id;
let status_tx_clone = status_tx.clone();
let mut response_reader = response;
std::thread::spawn(move || {
let mut buf = [0u8; 8192];
loop {
if active_conn_id_clone.load(Ordering::SeqCst) != conn_id_clone {
queue_clone.set_disconnected(true);
break;
}
match response_reader.read(&mut buf) {
Ok(0) => {
queue_clone.set_disconnected(true);
break;
}
Ok(n) => {
queue_clone.push(&buf[..n]);
let len = queue_clone.len();
let cap = queue_clone.capacity;
let percent = ((len * 100) / cap) as u8;
let seconds = (len / 16000) as u32;
let _ = status_tx_clone.send(AudioStatus::BufferLevel { percent, seconds });
}
Err(_) => {
queue_clone.set_disconnected(true);
break;
}
}
}
});
let reader = StreamReader::new(url.to_string(), queue, status_tx, conn_id, active_conn_id, record_state);
let source = Decoder::new(reader)
.map_err(|e| format!("Decode error: {}", e))?;
let sink = Sink::try_new(handle)
.map_err(|e| format!("Sink error: {}", e))?;
sink.append(source);
Ok(sink)
}
/// `Read + Seek` adapter between the network [`BufferQueue`] and the decoder.
///
/// Besides handing out audio bytes it strips in-band metadata blocks,
/// announces title changes and writes the audio to disk while recording.
struct StreamReader {
    /// Stream URL; reported with title changes and used to guess the file extension.
    url: String,
    /// Source of raw bytes filled by the network thread.
    queue: Arc<BufferQueue>,
    /// Number of audio bytes handed out so far.
    pos: u64,
    /// Audio bytes between two metadata blocks; `None` means the stream
    /// carries no metadata.
    metaint: Option<usize>,
    /// Audio bytes left before the next metadata block.
    bytes_until_meta: usize,
    /// Channel for status events.
    status_tx: mpsc::Sender<AudioStatus>,
    /// Id of the connection this reader belongs to.
    conn_id: u64,
    /// Id of the connection that is currently allowed to run.
    active_conn_id: Arc<AtomicU64>,
    /// Recording settings shared with the command loop.
    record_state: Arc<RecordStateShared>,
    /// File currently being recorded to, if any.
    active_writer: Option<File>,
    /// Path of `active_writer`.
    active_file_path: Option<String>,
    /// Title of the track being recorded.
    active_track_title: Option<String>,
    /// When the current recording file was opened.
    active_track_start_time: Option<std::time::Instant>,
}
impl StreamReader {
fn new(
url: String,
queue: Arc<BufferQueue>,
status_tx: mpsc::Sender<AudioStatus>,
conn_id: u64,
active_conn_id: Arc<AtomicU64>,
record_state: Arc<RecordStateShared>,
) -> Self {
Self {
url,
queue,
pos: 0,
metaint: Some(16000), bytes_until_meta: 16000,
status_tx,
conn_id,
active_conn_id,
record_state,
active_writer: None,
active_file_path: None,
active_track_title: None,
active_track_start_time: None,
}
}
fn read_metadata_block(&mut self) -> std::io::Result<()> {
let mut length_byte = [0u8; 1];
self.queue.pop(&mut length_byte)?;
let length = length_byte[0] as usize * 16;
if length > 0 {
let mut meta_buf = vec![0u8; length];
self.queue.pop(&mut meta_buf)?;
if let Ok(meta_str) = String::from_utf8(meta_buf) {
if let Some(title) = parse_stream_title(&meta_str) {
let _ = self.status_tx.send(AudioStatus::TrackChanged {
url: self.url.clone(),
title: title.clone(),
});
self.handle_track_change(&title);
}
}
}
Ok(())
}
fn handle_track_change(&mut self, new_title: &str) {
let record = self.record_state.clone();
let mut state = record.state.load(Ordering::SeqCst);
if state == 1 {
record.state.store(2, Ordering::SeqCst);
state = 2;
}
if state == 2 {
self.close_recording_file();
self.open_recording_file(new_title);
}
}
fn open_recording_file(&mut self, title: &str) {
let recording_dir = self.record_state.recording_dir.lock().unwrap().clone();
let category = self.record_state.category.lock().unwrap().clone();
if recording_dir.is_empty() {
return;
}
let clean_category = sanitize_filename(&category);
let clean_title = sanitize_filename(title);
let mut path = PathBuf::from(&recording_dir);
path.push(&clean_category);
if let Err(e) = std::fs::create_dir_all(&path) {
let _ = self.status_tx.send(AudioStatus::Error(format!("Failed to create folders: {}", e)));
return;
}
let ext = if self.url.contains("aac") || self.url.contains("m4a") {
"aac"
} else {
"mp3"
};
path.push(format!("{}.{}", clean_title, ext));
match File::create(&path) {
Ok(file) => {
self.active_writer = Some(file);
self.active_file_path = Some(path.to_string_lossy().to_string());
self.active_track_title = Some(title.to_string());
self.active_track_start_time = Some(std::time::Instant::now());
let _ = self.status_tx.send(AudioStatus::RecordingStateChanged {
state: 2,
filepath: Some(path.to_string_lossy().to_string()),
});
}
Err(e) => {
let _ = self.status_tx.send(AudioStatus::Error(format!("Failed to create segment file: {}", e)));
}
}
}
fn close_recording_file(&mut self) {
if let Some(mut file) = self.active_writer.take() {
let _ = file.flush();
drop(file);
let duration = self.active_track_start_time.take()
.map(|t| t.elapsed())
.unwrap_or(std::time::Duration::ZERO);
let keep_snippets = self.record_state.keep_snippets.load(Ordering::SeqCst);
let min_secs = self.record_state.min_song_duration_secs.load(Ordering::SeqCst);
let mut is_ad_or_speech = false;
if let Some(ref title) = self.active_track_title {
let t_upper = title.to_uppercase();
if t_upper.contains("ADVERT")
|| t_upper.contains("COMMERCIAL")
|| t_upper.contains("STATION ID")
|| t_upper.contains("DJ SPEECH")
|| t_upper.contains("NEWS CAST")
|| t_upper.contains("NEWS UPDATE")
|| t_upper.contains("WEATHER REPORT")
|| t_upper.contains("TRAFFIC REPORT")
|| t_upper.trim().is_empty()
{
is_ad_or_speech = true;
}
} else {
is_ad_or_speech = true;
}
let is_short = duration.as_secs() < min_secs as u64;
if let Some(ref filepath) = self.active_file_path {
if (is_short || is_ad_or_speech) && !keep_snippets {
if let Err(e) = std::fs::remove_file(filepath) {
let _ = self.status_tx.send(AudioStatus::Error(format!("Failed to delete partial file: {}", e)));
} else {
let title = self.active_track_title.as_deref().unwrap_or("Unknown Track");
let reason = if is_ad_or_speech { "Speech/Ad Filter" } else { "Short Snippet" };
let _ = self.status_tx.send(AudioStatus::Error(format!(
"🗑️ Discarded {} - {} ({:.1}s)",
reason, title, duration.as_secs_f32()
)));
}
} else {
if filepath.ends_with(".mp3") {
if let Some(ref title) = self.active_track_title {
let _ = inject_id3_tags(filepath, title);
}
}
}
}
}
self.active_file_path = None;
self.active_track_title = None;
self.active_track_start_time = None;
}
}
impl std::io::Read for StreamReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.active_conn_id.load(Ordering::SeqCst) != self.conn_id {
self.close_recording_file();
return Err(std::io::Error::other("Abandoned"));
}
let global_state = self.record_state.state.load(Ordering::SeqCst);
if global_state == 0 && self.active_writer.is_some() {
self.close_recording_file();
}
let Some(metaint) = self.metaint else {
let n = self.queue.pop(buf)?;
self.pos += n as u64;
if global_state == 2 {
if let Some(ref mut file) = self.active_writer {
let _ = file.write_all(&buf[..n]);
}
}
return Ok(n);
};
if self.bytes_until_meta == 0 {
self.read_metadata_block()?;
self.bytes_until_meta = metaint;
}
let max_to_read = buf.len().min(self.bytes_until_meta);
let n = self.queue.pop(&mut buf[..max_to_read])?;
self.pos += n as u64;
self.bytes_until_meta -= n;
if global_state == 2 {
if let Some(ref mut file) = self.active_writer {
let _ = file.write_all(&buf[..n]);
}
}
Ok(n)
}
}
impl std::io::Seek for StreamReader {
    /// Minimal `Seek` for a forward-only stream.
    ///
    /// A positive `SeekFrom::Current` offset is honoured by reading and
    /// discarding that many bytes (stopping early at end of stream). Every
    /// other request is a no-op. The current position is returned in all
    /// cases.
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        if let std::io::SeekFrom::Current(offset) = pos {
            if offset > 0 {
                let mut scratch = [0u8; 8192];
                let mut left = offset as u64;
                while left > 0 {
                    let want = left.min(scratch.len() as u64) as usize;
                    match self.read(&mut scratch[..want])? {
                        0 => break,
                        n => left -= n as u64,
                    }
                }
            }
        }
        Ok(self.pos)
    }
}
/// Writes ID3v2.4 artist/title/album tags to the file at `filepath`.
///
/// `title` is split at the first `" - "` into artist and track title (both
/// trimmed); without that separator the artist is "Unknown Artist" and the
/// whole string is used as the title.
///
/// # Errors
/// Returns the error reported by the tag writer.
fn inject_id3_tags(filepath: &str, title: &str) -> Result<(), Box<dyn std::error::Error>> {
    use id3::{Tag, TagLike, Version};

    let (artist, track_title) = match title.split_once(" - ") {
        Some((artist, track)) => (artist.trim(), track.trim()),
        None => ("Unknown Artist", title),
    };

    let mut tag = Tag::new();
    tag.set_artist(artist);
    tag.set_title(track_title);
    tag.set_album("DriftFM live capturing");
    tag.write_to_path(filepath, Version::Id3v24)?;
    Ok(())
}
/// Makes `name` usable as a single path component.
///
/// Path separators, the other characters Windows reserves in file names and
/// all control characters (NUL, newline, tab, ...) are each replaced by `-`.
/// Every other character is kept, so the result has the same number of
/// characters as the input.
fn sanitize_filename(name: &str) -> String {
    name.chars()
        .map(|c| match c {
            '\\' | '/' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '-',
            // Titles come from the remote stream; control characters are
            // invalid or hazardous in file names.
            c if c.is_control() => '-',
            other => other,
        })
        .collect()
}
/// Extracts the trimmed value of `StreamTitle='...';` from a metadata string.
///
/// Returns `None` when the key or the closing `';` is missing. An empty
/// title yields `Some("")`.
fn parse_stream_title(meta: &str) -> Option<String> {
    const KEY: &str = "StreamTitle='";
    let (_, after_key) = meta.split_once(KEY)?;
    let (raw_title, _) = after_key.split_once("';")?;
    Some(raw_title.trim().to_string())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Reserved characters are replaced by `-`; everything else is kept.
    #[test]
    fn test_sanitize_filename() {
        let cases = [
            ("normal_file.mp3", "normal_file.mp3"),
            ("artist/song?.mp3", "artist-song-.mp3"),
            ("windows\\invalid:name*char\".mp3", "windows-invalid-name-char-.mp3"),
            ("<tag> | pipe.mp3", "-tag- - pipe.mp3"),
        ];
        for (input, expected) in cases {
            assert_eq!(sanitize_filename(input), expected, "input: {input:?}");
        }
    }

    /// Titles are extracted and trimmed; a missing key yields `None`.
    #[test]
    fn test_parse_stream_title() {
        let cases = [
            (
                "StreamTitle='Lazerhawk - King of The Streets';StreamUrl='';",
                Some("Lazerhawk - King of The Streets"),
            ),
            (
                "StreamTitle=' Kavinsky - Nightcall ';StreamUrl='';",
                Some("Kavinsky - Nightcall"),
            ),
            ("StreamUrl='';", None),
            ("StreamTitle='';", Some("")),
        ];
        for (meta, expected) in cases {
            assert_eq!(
                parse_stream_title(meta),
                expected.map(str::to_string),
                "meta: {meta:?}"
            );
        }
    }
}