pulsedeck 0.1.6

A cyber-synthwave internet radio player and smart tape recorder for your terminal
mod buffer;
mod metadata;
mod recording;
mod session;
mod stream_reader;
mod visualizer;

use rodio::{OutputStream, Sink};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};

use session::{connect_and_decode, ConnectionContext};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Commands sent from the UI thread to the audio thread.
#[derive(Debug, Clone)]
pub enum AudioCommand {
    Play(String), // URL
    Pause,
    Resume,
    Stop,
    SetVolume(f32), // 0.0 — 1.0
    StartRecording {
        recording_dir: String,
        category: String,
        keep_snippets: bool,
        min_song_duration_secs: u32,
    },
    StopRecording,
}

/// Status updates sent from the audio thread back to the UI.
#[derive(Debug, Clone)]
pub enum AudioStatus {
    Playing,
    Paused,
    Stopped,
    Error(String),
    Connecting,
    TrackChanged { url: String, title: String },
    RecordingStateChanged { state: u8, filepath: Option<String> }, // 0 = Off, 1 = Pending, 2 = Active
    BufferLevel { percent: u8, seconds: u32 },
}

/// Shared thread-safe recording configuration state
pub struct RecordStateShared {
    pub state: AtomicU8, // 0 = Off, 1 = Pending, 2 = Active
    pub recording_dir: Mutex<String>,
    pub category: Mutex<String>,
    pub keep_snippets: AtomicBool,
    pub min_song_duration_secs: std::sync::atomic::AtomicU32,
}

/// Handle to communicate with the audio engine running on a background thread.
pub struct AudioEngine {
    cmd_tx: mpsc::Sender<AudioCommand>,
    pub status_rx: mpsc::Receiver<AudioStatus>,
    #[allow(dead_code)]
    pub sample_buffer: Arc<Mutex<VecDeque<f32>>>,
}

impl AudioEngine {
    /// Spawn the audio engine on a dedicated OS thread.
    pub fn spawn(sample_buffer: Arc<Mutex<VecDeque<f32>>>) -> Self {
        let (cmd_tx, cmd_rx) = mpsc::channel::<AudioCommand>();
        let (status_tx, status_rx) = mpsc::channel::<AudioStatus>();

        let sample_buffer_clone = sample_buffer.clone();
        std::thread::spawn(move || {
            audio_loop(cmd_rx, status_tx, sample_buffer_clone);
        });

        Self {
            cmd_tx,
            status_rx,
            sample_buffer,
        }
    }

    pub fn send(&self, cmd: AudioCommand) {
        let _ = self.cmd_tx.send(cmd);
    }
}

/// The main audio loop. Pure blocking I/O on a dedicated OS thread.
fn audio_loop(
    cmd_rx: mpsc::Receiver<AudioCommand>,
    status_tx: mpsc::Sender<AudioStatus>,
    sample_buffer: Arc<Mutex<VecDeque<f32>>>,
) {
    // Lazily opened on first playback. This keeps browsing/search usable on
    // systems without an immediately available output device.
    let mut output_stream: Option<OutputStream> = None;
    let mut output_handle: Option<rodio::OutputStreamHandle> = None;

    let mut current_sink: Option<Sink> = None;
    let mut connect_thread: Option<std::thread::JoinHandle<Result<Sink, String>>> = None;

    // Concurrency guard to abandon stale threads instantly
    let active_conn_id = Arc::new(AtomicU64::new(0));
    let mut current_conn_id: u64 = 0;

    // Shared thread-safe recording control state
    let record_state = Arc::new(RecordStateShared {
        state: AtomicU8::new(0), // Default: Off
        recording_dir: Mutex::new(String::new()),
        category: Mutex::new(String::new()),
        keep_snippets: AtomicBool::new(false),
        min_song_duration_secs: std::sync::atomic::AtomicU32::new(90),
    });

    // Premium non-blocking volume crossfade/ramping parameters
    let mut target_volume: f32 = 0.8;
    let mut current_fade_volume: Option<f32> = None;
    let mut pending_action: Option<AudioCommand> = None;

    macro_rules! spawn_connection_for {
        ($url:expr) => {
            spawn_connection(
                $url,
                &mut SpawnConnectionState {
                    conn_id_ref: &mut current_conn_id,
                    active_ref: &active_conn_id,
                    connect_ref: &mut connect_thread,
                    output_stream: &mut output_stream,
                    output_handle: &mut output_handle,
                    status_tx: &status_tx,
                    record_state: &record_state,
                    sample_buffer: &sample_buffer,
                },
            );
        };
    }

    loop {
        // Non-blocking check for commands (10ms poll)
        match cmd_rx.recv_timeout(Duration::from_millis(10)) {
            Ok(cmd) => match cmd {
                AudioCommand::Play(url) => {
                    if current_sink.is_some() {
                        pending_action = Some(AudioCommand::Play(url));
                    } else {
                        spawn_connection_for!(url);
                    }
                }
                AudioCommand::Pause => {
                    if let Some(ref sink) = current_sink {
                        pending_action = None;
                        current_fade_volume = None;
                        sink.pause();
                        let _ = status_tx.send(AudioStatus::Paused);
                    }
                }
                AudioCommand::Resume => {
                    if let Some(ref sink) = current_sink {
                        pending_action = None;
                        sink.play();
                        let _ = status_tx.send(AudioStatus::Playing);
                        // Smooth fade-in
                        current_fade_volume = Some(0.0);
                    }
                }
                AudioCommand::Stop => {
                    if current_sink.is_some() {
                        pending_action = Some(AudioCommand::Stop);
                    } else {
                        active_conn_id.store(0, Ordering::SeqCst); // abandon in-flight
                        connect_thread = None;
                        let _ = status_tx.send(AudioStatus::Stopped);
                    }
                }
                AudioCommand::SetVolume(vol) => {
                    target_volume = vol;
                    if current_fade_volume.is_none() && pending_action.is_none() {
                        if let Some(ref sink) = current_sink {
                            sink.set_volume(vol);
                        }
                    }
                }
                AudioCommand::StartRecording {
                    recording_dir,
                    category,
                    keep_snippets,
                    min_song_duration_secs,
                } => {
                    *record_state.recording_dir.lock().unwrap() = recording_dir;
                    *record_state.category.lock().unwrap() = category;
                    record_state
                        .keep_snippets
                        .store(keep_snippets, Ordering::SeqCst);
                    record_state
                        .min_song_duration_secs
                        .store(min_song_duration_secs, Ordering::SeqCst);
                    record_state.state.store(1, Ordering::SeqCst); // Transition to Pending
                    let _ = status_tx.send(AudioStatus::RecordingStateChanged {
                        state: 1,
                        filepath: None,
                    });
                }
                AudioCommand::StopRecording => {
                    record_state.state.store(0, Ordering::SeqCst); // Transition to Off
                    let _ = status_tx.send(AudioStatus::RecordingStateChanged {
                        state: 0,
                        filepath: None,
                    });
                }
            },
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                break;
            }
        }

        // Process pending action / non-blocking fade-out
        if pending_action.is_some() {
            if let Some(ref sink) = current_sink {
                let current_vol = sink.volume();
                if current_vol <= 0.05 {
                    // Fade out completed! Execute pending command
                    sink.set_volume(0.0);
                    let cmd = pending_action.take().unwrap();
                    match cmd {
                        AudioCommand::Play(url) => {
                            // Stop current sink before spawning new connection
                            if let Some(old_sink) = current_sink.take() {
                                old_sink.stop();
                            }
                            spawn_connection_for!(url);
                        }
                        AudioCommand::Stop => {
                            active_conn_id.store(0, Ordering::SeqCst); // abandon in-flight
                            connect_thread = None;
                            if let Some(old_sink) = current_sink.take() {
                                old_sink.stop();
                            }
                            let _ = status_tx.send(AudioStatus::Stopped);
                        }
                        _ => {}
                    }
                } else {
                    // Exponential step-down for beautiful natural dimming
                    let step = current_vol * 0.15; // smooth 15% dimming step
                    sink.set_volume((current_vol - step).max(0.0));
                }
            } else {
                // No active sink, just execute pending immediately
                let cmd = pending_action.take().unwrap();
                match cmd {
                    AudioCommand::Play(url) => {
                        spawn_connection_for!(url);
                    }
                    AudioCommand::Stop => {
                        active_conn_id.store(0, Ordering::SeqCst);
                        connect_thread = None;
                        let _ = status_tx.send(AudioStatus::Stopped);
                    }
                    _ => {}
                }
            }
        }

        // Process non-blocking fade-in
        if pending_action.is_none() && current_fade_volume.is_some() {
            if let Some(ref sink) = current_sink {
                let current_vol = sink.volume();
                if (current_vol - target_volume).abs() <= 0.03 {
                    sink.set_volume(target_volume);
                    current_fade_volume = None;
                } else {
                    // Exponential step-up towards target_volume for organic swell
                    let step = (target_volume - current_vol) * 0.15;
                    sink.set_volume(current_vol + step);
                }
            } else {
                current_fade_volume = None;
            }
        }

        // Check if a pending connection has completed
        if let Some(ref handle) = connect_thread {
            if handle.is_finished() {
                let finished = connect_thread.take().unwrap();
                match finished.join() {
                    Ok(Ok(sink)) => {
                        // Start playing at 0.0 volume, trigger exponential swell
                        sink.set_volume(0.0);
                        current_sink = Some(sink);
                        let _ = status_tx.send(AudioStatus::Playing);
                        current_fade_volume = Some(0.0);
                    }
                    Ok(Err(e)) => {
                        // Stale thread errors are ignored (they are "Abandoned" or cancelled)
                        if e != "Abandoned" {
                            let _ = status_tx.send(AudioStatus::Error(e));
                        }
                    }
                    Err(_) => {
                        let _ =
                            status_tx.send(AudioStatus::Error("Connection thread panicked".into()));
                    }
                }
            }
        }

        // Check if current playback ended
        if let Some(ref sink) = current_sink {
            if sink.empty() {
                current_sink = None;
                let _ = status_tx.send(AudioStatus::Stopped);
            }
        }
    }
}

fn ensure_output_handle(
    output_stream: &mut Option<OutputStream>,
    output_handle: &mut Option<rodio::OutputStreamHandle>,
    status_tx: &mpsc::Sender<AudioStatus>,
) -> Option<rodio::OutputStreamHandle> {
    if output_handle.is_none() {
        match OutputStream::try_default() {
            Ok((stream, handle)) => {
                *output_stream = Some(stream);
                *output_handle = Some(handle);
            }
            Err(err) => {
                let _ = status_tx.send(AudioStatus::Error(format!("Soundcard error: {err}")));
                return None;
            }
        }
    }

    output_handle.clone()
}

struct SpawnConnectionState<'a> {
    conn_id_ref: &'a mut u64,
    active_ref: &'a Arc<AtomicU64>,
    connect_ref: &'a mut Option<std::thread::JoinHandle<Result<Sink, String>>>,
    output_stream: &'a mut Option<OutputStream>,
    output_handle: &'a mut Option<rodio::OutputStreamHandle>,
    status_tx: &'a mpsc::Sender<AudioStatus>,
    record_state: &'a Arc<RecordStateShared>,
    sample_buffer: &'a Arc<Mutex<VecDeque<f32>>>,
}

fn spawn_connection(url: String, state: &mut SpawnConnectionState<'_>) {
    let Some(handle) =
        ensure_output_handle(state.output_stream, state.output_handle, state.status_tx)
    else {
        return;
    };

    *state.conn_id_ref += 1;
    state.active_ref.store(*state.conn_id_ref, Ordering::SeqCst);
    let _ = state.status_tx.send(AudioStatus::Connecting);

    let conn_id = *state.conn_id_ref;
    let context = ConnectionContext {
        status_tx: state.status_tx.clone(),
        conn_id,
        active_conn_id: state.active_ref.clone(),
        record_state: state.record_state.clone(),
        sample_buffer: state.sample_buffer.clone(),
    };

    drop(state.connect_ref.take());
    *state.connect_ref = Some(std::thread::spawn(move || {
        connect_and_decode(url, handle, context)
    }));
}