Skip to main content

mecomp_daemon/
persistence.rs

1//! This module contains the implementation of the daemon's queue persistence mechanism.
2//!
3//! On shutdown, the daemon will save the current audio state (`StateAudio`) to disk.
4//! This includes:
5//! - The current position in the queue (`usize`)
6//! - The queues repeat mode (`RepeatMode`)
7//! - The current seek position in the current song (`Duration`)
8//! - The songs in the queue
9//! - etc.
10//!
11//! On startup, the daemon will load that data from disk and use it to restore the queue by:
12//! 1. restoring the repeat mode
13//! 2. setting the volume and mute
14//! 3. loading the songs into the queue
15//! 4. pausing playback
16//! 5. skipping to the correct position in the queue
17//! 6. skipping to the correct position in the current song
18//!
19//! > we always restore the queue as paused, even if the last state was playing.
20//!
21//! Both of these tasks should be atomic, that it, if any step fails the process should be aborted.
22//! - on startup, this means just logging the error and not restoring the queue
23//! - on shutdown, this means logging the error and exiting as normal
24
25use anyhow::{Context, Result};
26use mecomp_core::audio::AudioKernelSender;
27use mecomp_core::audio::commands::{AudioCommand, QueueCommand, VolumeCommand};
28use mecomp_storage::db::schemas::song::SongBrief;
29use serde::{Deserialize, Serialize};
30use std::path::Path;
31use std::time::Duration;
32use std::{
33    fs::File,
34    io::{BufReader, BufWriter},
35};
36
37use mecomp_core::state::{RepeatMode, SeekType, StateAudio};
38
39#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
40pub struct QueueState {
41    pub repeat_mode: RepeatMode,
42    pub muted: bool,
43    pub volume: f32,
44    pub queue: Box<[SongBrief]>,
45    pub queue_position: Option<usize>,
46    pub seek_position: Option<Duration>,
47}
48
49impl From<StateAudio> for QueueState {
50    #[inline]
51    fn from(state: StateAudio) -> Self {
52        // I unpack `state` this way so that any change to the `StateAudio` struct
53        // will cause an error here so I know to update the `QueueState` struct
54        // as well
55        let StateAudio {
56            queue,
57            queue_position,
58            current_song: _,
59            repeat_mode,
60            runtime,
61            status: _,
62            muted,
63            volume,
64        } = state;
65
66        let seek_position = runtime.map(|r| r.seek_position);
67
68        Self {
69            repeat_mode,
70            muted,
71            volume,
72            queue,
73            queue_position,
74            seek_position,
75        }
76    }
77}
78
79impl QueueState {
80    /// Get the state from the audio kernel
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the status cannot be retrieved from the audio kernel
85    #[inline]
86    pub async fn retrieve(audio_kernel: &AudioKernelSender) -> Result<Self> {
87        let (tx, rx) = tokio::sync::oneshot::channel();
88        audio_kernel.send(AudioCommand::ReportStatus(tx));
89        Ok(rx.await?.into())
90    }
91
92    #[doc(hidden)]
93    #[inline]
94    pub fn retrieve_blocking(audio_kernel: &AudioKernelSender) -> Result<Self> {
95        let (tx, rx) = tokio::sync::oneshot::channel();
96        audio_kernel.send(AudioCommand::ReportStatus(tx));
97        Ok(rx.blocking_recv()?.into())
98    }
99
100    /// Restore the state into the audio kernel
101    #[inline]
102    pub fn restore_to(&self, audio_kernel: &AudioKernelSender) {
103        // restore the repeat mode
104        audio_kernel.send(AudioCommand::Queue(QueueCommand::SetRepeatMode(
105            self.repeat_mode,
106        )));
107        // volume state
108        let mute_command = if self.muted {
109            AudioCommand::Volume(VolumeCommand::Mute)
110        } else {
111            AudioCommand::Volume(VolumeCommand::Unmute)
112        };
113        audio_kernel.send(mute_command);
114        audio_kernel.send(AudioCommand::Volume(VolumeCommand::Set(self.volume)));
115        // load songs into queue
116        audio_kernel.send(AudioCommand::Queue(QueueCommand::AddToQueue(
117            self.queue.as_ref().into(),
118        )));
119        // pause playback
120        audio_kernel.send(AudioCommand::Pause);
121        // skip to correct song
122        if let Some(position) = self.queue_position {
123            audio_kernel.send(AudioCommand::Queue(QueueCommand::SetPosition(position)));
124        }
125        // seek to the correct position
126        if let Some(seek) = self.seek_position {
127            audio_kernel.send(AudioCommand::Seek(SeekType::Absolute, seek));
128        }
129    }
130
131    /// Save the state to a file
132    /// This will overwrite the file if it exists
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the file cannot be created or written to
137    /// or if the state cannot be serialized.
138    #[inline]
139    pub fn save_to_file(&self, path: &Path) -> Result<()> {
140        let writer = BufWriter::new(File::create(path).context(format!(
141            "Queue Persistence: Failed to create/open {}",
142            path.display()
143        ))?);
144
145        serde_json::to_writer_pretty(writer, self)
146            .context("Queue Persistence: Failed to serialize state")?;
147
148        Ok(())
149    }
150
151    /// Load the state from a file
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the file cannot be opened or read
156    /// or if the state cannot be deserialized.
157    #[inline]
158    pub fn load_from_file(path: &Path) -> Result<Self> {
159        let reader = BufReader::new(File::open(path).context(format!(
160            "Queue Persistence: Failed to read {}",
161            path.display()
162        ))?);
163
164        serde_json::from_reader(reader).context("Queue Persistence: Failed to deserialize state")
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use std::fs;
171    use std::sync::{Arc, mpsc};
172
173    use super::*;
174    use mecomp_core::udp::StateChange;
175    use mecomp_storage::db::schemas::song::Song;
176    use mecomp_storage::test_utils::{
177        IndexMode, SongCase, arb_song_case, arb_vec, arb_vec_and_index, create_song_metadata,
178        init_test_database,
179    };
180    use pretty_assertions::assert_eq;
181    use rstest::{fixture, rstest};
182    use tempfile::tempdir;
183
184    #[fixture]
185    fn kernel() -> (Arc<AudioKernelSender>, mpsc::Receiver<StateChange>) {
186        let (event_tx, event_rx) = mpsc::channel();
187        (AudioKernelSender::start(event_tx), event_rx)
188    }
189
190    #[rstest]
191    fn test_retrieve_plain(kernel: (Arc<AudioKernelSender>, mpsc::Receiver<StateChange>)) {
192        let (audio_kernel, _event_rx) = kernel;
193        let state = QueueState::retrieve_blocking(&audio_kernel).unwrap();
194        assert_eq!(state, StateAudio::default().into());
195    }
196
197    #[rstest]
198    #[case::one_song(arb_vec_and_index( &arb_song_case(), 1..=1, IndexMode::InBounds)())]
199    #[case::many_songs(arb_vec_and_index( &arb_song_case(), 2..=10, IndexMode::InBounds)())]
200    #[case::many_songs_guaranteed_nonzero_index((arb_vec( &arb_song_case(), 2..=10)(), 1))]
201    #[tokio::test]
202    async fn test_restore_retrieve_e2e(
203        kernel: (Arc<AudioKernelSender>, mpsc::Receiver<StateChange>),
204        #[case] (song_cases, index): (Vec<SongCase>, usize),
205        #[values(true, false)] is_muted: bool,
206        #[values(0.0, 1.0)] volume: f32,
207        #[values(RepeatMode::None, RepeatMode::All)] repeat_mode: RepeatMode,
208    ) {
209        let temp_dir = tempdir().unwrap();
210        // load songs into the database
211        let db = init_test_database().await.unwrap();
212        let mut songs = Vec::new();
213        for sc in song_cases {
214            let metadata = create_song_metadata(&temp_dir, sc).unwrap();
215            let song = Song::try_load_into_db(&db, metadata).await.unwrap();
216            songs.push(song.into());
217        }
218
219        // create the queue state
220        let expected_queue_state = QueueState {
221            repeat_mode,
222            volume,
223            muted: is_muted,
224            queue: songs.into_boxed_slice(),
225            queue_position: Some(index),
226            seek_position: Some(Duration::from_secs(5)),
227        };
228
229        // load the queue state into the audio kernel
230        let (audio_kernel, event_rx) = kernel;
231
232        expected_queue_state.restore_to(&audio_kernel);
233
234        // event breakdown:
235        // 1. repeat mode change (1 event)
236        // 2. volume state (2 events) (mute/unmute + set volume)
237        // 3. loading songs into queue (2 event) (queue change + song change + unpause)
238        // 4. pause playback (1 event) (pause)
239        // 5. skip to correct song (1 event) (song change)
240        // 6. seek to correct position (1 event) (seek)
241
242        let mut expected_number_of_events = 9;
243        if volume == 1.0 {
244            expected_number_of_events -= 1; // no volume change event
245        }
246        if index == 0 {
247            expected_number_of_events -= 1; // no 2nd song change event
248        }
249        let mut event_count = 0;
250        while event_count < expected_number_of_events {
251            match event_rx.recv_timeout(std::time::Duration::from_millis(500)) {
252                Ok(event) => {
253                    dbg!(event);
254                    event_count += 1;
255                }
256                Err(_) => break,
257            }
258        }
259
260        // ensure that no more than the expected number of events were received
261        assert_eq!(event_count, expected_number_of_events);
262        // ensure that no more events remain
263        assert!(event_rx.try_recv().is_err());
264
265        // retrieve the state from the audio kernel
266        let retrieved_state = QueueState::retrieve(&audio_kernel).await.unwrap();
267
268        // check that the retrieved state is the same as the original state
269        assert_eq!(retrieved_state, expected_queue_state);
270    }
271
272    #[rstest]
273    #[case::one_song(arb_vec_and_index( &arb_song_case(), 1..=1, IndexMode::InBounds)())]
274    #[case::many_songs(arb_vec_and_index( &arb_song_case(), 2..=10, IndexMode::InBounds)())]
275    #[case::many_songs_guaranteed_nonzero_index((arb_vec( &arb_song_case(), 2..=10)(), 1))]
276    #[tokio::test]
277    async fn test_save_load_e2e(
278        #[case] (song_cases, index): (Vec<SongCase>, usize),
279        #[values(true, false)] is_muted: bool,
280        #[values(0.0, 1.0)] volume: f32,
281        #[values(RepeatMode::None, RepeatMode::One)] repeat_mode: RepeatMode,
282    ) {
283        let temp_dir = tempdir().unwrap();
284        // load songs into the database
285        let db = init_test_database().await.unwrap();
286        let mut songs = Vec::new();
287        for sc in song_cases {
288            let metadata = create_song_metadata(&temp_dir, sc).unwrap();
289            let song = Song::try_load_into_db(&db, metadata).await.unwrap();
290            songs.push(song.into());
291        }
292
293        // create the queue state
294        let queue_state = QueueState {
295            repeat_mode,
296            volume,
297            muted: is_muted,
298            queue: songs.into_boxed_slice(),
299            queue_position: Some(index),
300            seek_position: Some(Duration::from_secs(10)),
301        };
302
303        // save the queue state to a file
304        let path = temp_dir.path().join("test_queue_state.json");
305        queue_state.save_to_file(&path).unwrap();
306
307        // load the queue state from the file
308        let loaded_queue_state = QueueState::load_from_file(&path).unwrap();
309
310        // check that the loaded state is the same as the original state
311        assert_eq!(loaded_queue_state, queue_state);
312
313        // clean up
314        fs::remove_file(path).unwrap();
315    }
316}