selene-daemon 0.9.0-alpha.2

Official music player daemon for Selene
Documentation
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    thread,
    time::{Duration, Instant, SystemTime},
};

use crossbeam::channel::{self, Receiver, Sender, TryRecvError};
use lunar_lib::{
    log::error,
    runners::{DispatchError, Executor, Runner},
};
use selene_core::symphonia_helpers::raw_decoder::{Decoder, SymphoniaError};

use crate::{
    SHUTDOWN,
    ipc_common::{AudioPacket, TrackInfo},
    session::{DecoderMessageExt, SessionEventExt, SessionEventThread, SessionMainThread},
    should_shutdown, wait,
};

/// Handle to the background decoder thread, returned by [`DecoderHandle::open`].
pub(crate) struct DecoderHandle {
    /// Sending side of the command channel whose receiving side
    /// ([`DecoderInner::exec`]) is polled by the decoder thread.
    pub runner: Runner<DecoderInner>,
    /// Receiving side of the bounded packet channel the decoder thread
    /// writes [`AudioPacket`]s into.
    pub audio_buffer: Receiver<AudioPacket>,
}

/// State owned by the decoder thread; consumed by `run`.
pub(crate) struct DecoderInner {
    /// Receiving side of the command channel. Each received command is
    /// invoked with `&mut self` from inside the run loop.
    exec: Executor<Self>,
    /// Used to ask the session for a preloaded decoder when the current
    /// track runs out of packets.
    session_runner: Runner<SessionMainThread>,
    /// Used to report playback events (now playing, scrobble, track changed,
    /// play/pause changed, stopped).
    event_runner: Runner<SessionEventThread>,

    /// Currently loaded decoder; `None` until `load` is called and again
    /// after `stop`.
    decoder: Option<Decoder>,
    /// `(unix seconds, Instant)` captured when the first packet of the
    /// current track is decoded; `None` means the track has not started yet.
    /// The unix timestamp is passed along with the scrobble event, the
    /// `Instant` is the reference point for pacing and is re-anchored on seek.
    start_time: Option<(u64, Instant)>,
    /// Whether the scrobble event was already emitted for the current track.
    sent_scrobble: bool,

    /// Sending side of the bounded packet channel read through
    /// [`DecoderHandle::audio_buffer`].
    packet_buffer: Sender<AudioPacket>,

    /// Shared play/pause flag; the run loop only decodes while it is `true`.
    is_playing: Arc<AtomicBool>,
    /// Shared flag; when set, a finished track is restarted from position 0
    /// instead of requesting a preload.
    looping: Arc<AtomicBool>,
}

impl DecoderHandle {
    /// Spawns the decoder thread and returns the handle used to talk to it.
    ///
    /// * `playback_state` - shared play/pause flag; the thread only decodes
    ///   while it is `true`.
    /// * `looping` - shared flag that makes a finished track restart.
    /// * `session_runner` - used by the thread to request a preloaded decoder.
    /// * `event_runner` - used by the thread to emit playback events.
    ///
    /// The thread raises the global `SHUTDOWN` flag when it terminates for any
    /// reason: a clean return, an error (which is logged first), or a panic.
    pub(crate) fn open(
        playback_state: Arc<AtomicBool>,
        looping: Arc<AtomicBool>,
        session_runner: Runner<SessionMainThread>,
        event_runner: Runner<SessionEventThread>,
    ) -> Self {
        /// Raises `SHUTDOWN` when dropped. Dropping also happens while
        /// unwinding, so a panic in the decoder thread still takes the
        /// daemon down instead of leaving it alive without a decoder.
        struct ShutdownOnExit;

        impl Drop for ShutdownOnExit {
            fn drop(&mut self) {
                SHUTDOWN.store(true, Ordering::Relaxed);
            }
        }

        let (buf_prod, buf_cons) = channel::bounded(2);
        let (runner, exec) = lunar_lib::runners::channel();

        let inner = DecoderInner {
            exec,
            session_runner,
            event_runner,

            decoder: None,
            start_time: None,
            sent_scrobble: false,

            packet_buffer: buf_prod,

            is_playing: playback_state,
            looping,
        };

        thread::spawn(move || {
            // Declared first so it is dropped last: the error is logged
            // before the shutdown flag is raised.
            let _shutdown_guard = ShutdownOnExit;

            if let Err(err) = inner.run() {
                error!("DecoderHandle failed with error: {err}");
            }
        });

        Self {
            runner,
            audio_buffer: buf_cons,
        }
    }
}

impl DecoderInner {
    fn run(mut self) -> anyhow::Result<()> {
        while !should_shutdown() {
            loop {
                match self.exec.try_recv() {
                    Ok(command) => command(&mut self),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => return Ok(()),
                }
            }

            let decoder = if let Some(ref mut decoder) = self.decoder
                && self.is_playing.load(Ordering::Relaxed)
            {
                decoder
            } else {
                match self.exec.recv() {
                    Ok(command) => command(&mut self),
                    Err(_) => return Ok(()),
                }
                continue;
            };

            if self.packet_buffer.is_full() {
                wait();
                continue;
            }

            if let Some(packet) = decoder.decode_next_packet()? {
                let mut frames = vec![0.0; packet.samples_interleaved()];
                packet.copy_to_slice_interleaved(&mut frames);

                let (start_unix, start_instant) = *self.start_time.get_or_insert_with(|| {
                    let time = SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)
                        .expect("Time went backwards")
                        .as_secs();
                    let instant = Instant::now();

                    self.packet_buffer
                        .send(AudioPacket::TrackInfo(TrackInfo::new(
                            decoder.stream.codec_params.sample_rate,
                            decoder.stream.codec_params.channels as u16,
                            None,
                            None,
                        )))
                        .expect("Session disconnected at the worst time");

                    self.event_runner
                        .now_playing()
                        .expect("Session disconnected at the worst time");

                    (time, instant)
                });

                let decoded_time = decoder.decoded_time();
                if !self.sent_scrobble
                    && (decoded_time >= (decoder.stream.duration() / 2.0) || decoded_time >= 240.0)
                {
                    self.sent_scrobble = true;
                    self.event_runner.scrobble(start_unix)?;
                }

                self.packet_buffer.try_send(AudioPacket::Audio(frames))?;

                let expected = decoder.current_frame as f64
                    / f64::from(decoder.stream.codec_params.sample_rate);
                let elapsed = start_instant.elapsed().as_secs_f64();

                if expected > elapsed {
                    thread::sleep(Duration::from_secs_f64(expected - elapsed));
                }
            } else {
                if self.looping.load(Ordering::Relaxed) {
                    decoder.seek(0.0, false)?;
                    self.start_time = None;
                    self.sent_scrobble = false;
                    decoder.decoded_frames = 0;
                    continue;
                }

                if let Some((preload, source)) = self.session_runner.request_preload()? {
                    *decoder = preload;
                    self.start_time = None;
                    self.sent_scrobble = false;
                    self.event_runner.currently_playing_changed(source)?;
                } else {
                    while !self.packet_buffer.is_empty() {
                        wait();
                        continue;
                    }

                    self.is_playing.store(false, Ordering::SeqCst);
                    self.event_runner.playback_stopped()?;
                }
            }
        }

        Ok(())
    }
}

// Commands that can be sent to the decoder thread. Every body below runs with
// `&mut DecoderInner`.
// NOTE(review): the macro presumably generates the `DecoderCommandExt` trait
// on `Runner<DecoderInner>` and ships each call through the command channel
// drained in `DecoderInner::run` - confirm against the macro definition.
lunar_lib::make_runner_command_ext!(DecoderInner => pub(crate) trait DecoderCommandExt {
    // Replaces the current decoder and clears the per-track state, so the run
    // loop treats the next decoded packet as the start of a new track.
    fn load(&mut self, decoder: Decoder) -> () {
        self.decoder = Some(decoder);
        self.start_time = None;
        self.sent_scrobble = false;
    }

    // Seeks the loaded decoder and returns the value reported by
    // `Decoder::seek`, or `None` when no decoder is loaded.
    fn seek(&mut self, time: f64, increment: bool) -> Result<Option<f64>, SymphoniaError> {
        if let Some(ref mut decoder) = self.decoder {
            let time = decoder.seek(time, increment)?;

            // Re-anchor the pacing reference to "now minus the new position"
            // so the run loop's elapsed time matches the decoder position
            // again. The unix start time is kept as is.
            // NOTE(review): `unwrap` panics when `checked_sub` returns `None`,
            // i.e. when the position cannot be subtracted from the current
            // `Instant` - confirm this cannot happen on supported platforms.
            self.start_time = self.start_time.map(|(unix, _)| {
                let new_instant = Instant::now()
                    .checked_sub(Duration::from_secs_f64(
                        decoder.current_frame as f64
                            / f64::from(decoder.stream.codec_params.sample_rate),
                    ))
                    .unwrap();
                (unix, new_instant)
            });

            Ok(Some(time))
        } else {
            Ok(None)
        }
    }

    // Returns `Decoder::time` of the loaded decoder, or `None` without one.
    fn get_time(&mut self) -> Option<f64> {
        self.decoder.as_ref().map(Decoder::time)
    }

    // Drops the decoder, clears the playing flag and emits the
    // playback-stopped event.
    fn stop(&mut self) -> Result<(), DispatchError> {
        self.decoder = None;
        self.is_playing.store(false, Ordering::SeqCst);
        self.event_runner.playback_stopped()?;
        Ok(())
    }

    // Sets the playing flag and emits the change event together with the
    // decoder's current time. Without a decoder nothing is changed, no event
    // is emitted and `false` is returned.
    // NOTE(review): resuming does not re-anchor `start_time` the way `seek`
    // does, so the pacing in `run` counts the paused time as elapsed -
    // confirm whether that is intended.
    fn set_playing(&mut self, is_playing: bool) -> Result<bool, DispatchError> {
        if let Some(ref decoder) = self.decoder {
            self.is_playing.store(is_playing, Ordering::Relaxed);
            self.event_runner.playback_is_playing_changed(is_playing, decoder.time())?;
            Ok(is_playing)
        } else {
            Ok(false)
        }
    }

    // Flips the playing flag and returns the new state. Without a decoder the
    // flag is left untouched, but a change event with `(false, 0.0)` is still
    // emitted.
    fn toggle_playing(&mut self) -> Result<bool, DispatchError> {
        if let Some(ref decoder) = self.decoder {
            // `fetch_not` returns the previous value; its negation is the
            // value now stored.
            let state = !self.is_playing.fetch_not(Ordering::Relaxed);
            self.event_runner.playback_is_playing_changed(state, decoder.time())?;
            Ok(state)
        } else {
            self.event_runner.playback_is_playing_changed(false, 0.0)?;
            Ok(false)
        }
    }
});