Skip to main content

musefs_db/
tracks.rs

1use crate::models::{Format, NewTrack, Track, TrackBounds};
2use crate::{Db, ReadWrite, Result};
3use rusqlite::{Row, params};
4
5/// Build a `SELECT <track columns> FROM tracks <tail>` as a compile-time string
6/// literal, so every track read shares one column list (kept in lockstep with
7/// `row_to_track`) and can be served via `prepare_cached` — no per-call `format!`
8/// allocation and no SQL recompilation on the `getattr`/`read` hot path.
9macro_rules! track_select {
10    ($tail:literal) => {
11        concat!(
12            "SELECT id, backing_path, format, audio_offset, audio_length, \
13             backing_size, backing_mtime_ns, backing_ctime_ns, content_version, updated_at \
14             FROM tracks ",
15            $tail
16        )
17    };
18}
19
20/// Parse a `format` column value, mapping an unknown name to the rusqlite
21/// conversion error every row-mapper needs (single source — three readers).
22fn parse_format_col(fmt: &str) -> rusqlite::Result<Format> {
23    fmt.parse::<Format>().ok().ok_or_else(|| {
24        rusqlite::Error::FromSqlConversionFailure(
25            usize::MAX,
26            rusqlite::types::Type::Text,
27            format!("unknown format {fmt}").into(),
28        )
29    })
30}
31
32fn row_to_track(r: &Row) -> rusqlite::Result<Track> {
33    let fmt: String = r.get("format")?;
34    let format = parse_format_col(&fmt)?;
35    let audio_offset: u64 = r.get("audio_offset")?;
36    let audio_length: u64 = r.get("audio_length")?;
37    let backing_size: u64 = r.get("backing_size")?;
38    let bounds = TrackBounds::new(audio_offset, audio_length, backing_size).map_err(|e| {
39        rusqlite::Error::FromSqlConversionFailure(
40            usize::MAX,
41            rusqlite::types::Type::Integer,
42            e.to_string().into(),
43        )
44    })?;
45    Ok(Track {
46        id: r.get("id")?,
47        backing_path: r.get("backing_path")?,
48        format,
49        bounds,
50        backing_size,
51        backing_mtime_ns: r.get("backing_mtime_ns")?,
52        backing_ctime_ns: r.get("backing_ctime_ns")?,
53        content_version: r.get("content_version")?,
54        updated_at: r.get("updated_at")?,
55    })
56}
57
58/// One read of the changelog ring past `last_seq`: the distinct changed track
59/// ids (ascending) plus the table's retained seq bounds (0/0 when empty). The
60/// caller derives gap detection from `min_seq` (see musefs-core's refresh).
61#[derive(Debug, Default, PartialEq, Eq)]
62pub struct ChangelogRead {
63    pub changed_ids: Vec<i64>,
64    pub min_seq: i64,
65    pub max_seq: i64,
66}
67
68impl<M> Db<M> {
69    pub fn get_track(&self, id: i64) -> Result<Option<Track>> {
70        self.query_optional_track(track_select!("WHERE id = ?1"), params![id])
71    }
72
73    pub fn get_track_by_path(&self, path: &str) -> Result<Option<Track>> {
74        self.query_optional_track(track_select!("WHERE backing_path = ?1"), params![path])
75    }
76
77    pub fn list_tracks(&self) -> Result<Vec<Track>> {
78        let mut stmt = self.conn.prepare_cached(track_select!("ORDER BY id"))?;
79        let rows = stmt.query_map([], row_to_track)?;
80        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
81    }
82
83    pub fn track_content_version(&self, id: i64) -> Result<i64> {
84        Ok(self.conn.query_row(
85            "SELECT content_version FROM tracks WHERE id = ?1",
86            params![id],
87            |r| r.get(0),
88        )?)
89    }
90
91    /// Begin a deferred (read) transaction: subsequent reads on this connection see
92    /// a single consistent snapshot until `end_read`. Used to make a binary-tag
93    /// read's content_version check and its blob reads mutually consistent.
94    pub fn begin_read(&self) -> Result<()> {
95        self.conn.execute_batch("BEGIN DEFERRED")?;
96        Ok(())
97    }
98
99    /// End the read transaction opened by `begin_read` (rollback — it is read-only).
100    pub fn end_read(&self) -> Result<()> {
101        self.conn.execute_batch("ROLLBACK")?;
102        Ok(())
103    }
104
105    fn query_optional_track(&self, sql: &str, p: impl rusqlite::Params) -> Result<Option<Track>> {
106        let mut stmt = self.conn.prepare_cached(sql)?;
107        let mut rows = stmt.query(p)?;
108        match rows.next()? {
109            Some(r) => Ok(Some(row_to_track(r)?)),
110            None => Ok(None),
111        }
112    }
113
114    /// Cheap render-key identity scan for incremental refresh: `(id, content_version,
115    /// format)` for every track, ordered by id. No tags, no path columns — just the
116    /// two track-level inputs that determine a rendered path. See SP2 Component 1.
117    pub fn list_render_keys(&self) -> Result<Vec<(i64, i64, Format)>> {
118        let mut stmt = self
119            .conn
120            .prepare("SELECT id, content_version, format FROM tracks ORDER BY id")?;
121        let rows = stmt.query_map([], |r| {
122            let fmt: String = r.get(2)?;
123            Ok((
124                r.get::<_, i64>(0)?,
125                r.get::<_, i64>(1)?,
126                parse_format_col(&fmt)?,
127            ))
128        })?;
129        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
130    }
131
132    /// One read of the changelog ring past `last_seq`: the distinct changed track
133    /// ids (ascending) plus the table's retained seq bounds (0/0 when empty). The
134    /// caller derives gap detection from `min_seq` (see musefs-core's refresh).
135    pub fn changelog_since(&self, last_seq: i64) -> Result<ChangelogRead> {
136        // One deferred read transaction pins a single WAL snapshot for both
137        // queries: under separate implicit snapshots a concurrent write burst
138        // (with track_changes_prune trimming the old end) could pair fresh ids
139        // with stale bounds — masking a prune gap while advancing the watermark.
140        let tx = self.conn.unchecked_transaction()?;
141        let (min_seq, max_seq): (i64, i64) = tx.query_row(
142            "SELECT COALESCE(MIN(seq),0), COALESCE(MAX(seq),0) FROM track_changes",
143            [],
144            |r| Ok((r.get(0)?, r.get(1)?)),
145        )?;
146        let changed_ids = {
147            let mut stmt = tx.prepare(
148                "SELECT DISTINCT track_id FROM track_changes WHERE seq > ?1 ORDER BY track_id",
149            )?;
150            stmt.query_map([last_seq], |r| r.get(0))?
151                .collect::<rusqlite::Result<Vec<i64>>>()?
152        };
153        tx.commit()?;
154        Ok(ChangelogRead {
155            changed_ids,
156            min_seq,
157            max_seq,
158        })
159    }
160
161    /// Render keys for a specific id set (the changelog ids); ids no longer in
162    /// `tracks` are simply absent from the result. Chunked like `tags_for_tracks`.
163    pub fn render_keys_for(&self, ids: &[i64]) -> Result<Vec<(i64, i64, Format)>> {
164        const CHUNK: usize = 900;
165        let mut out = Vec::with_capacity(ids.len());
166        for chunk in ids.chunks(CHUNK) {
167            let placeholders = vec!["?"; chunk.len()].join(",");
168            let sql = format!(
169                "SELECT id, content_version, format FROM tracks \
170                 WHERE id IN ({placeholders}) ORDER BY id"
171            );
172            let mut stmt = self.conn.prepare(&sql)?;
173            let params = rusqlite::params_from_iter(chunk.iter());
174            let rows = stmt.query_map(params, |r| {
175                let fmt: String = r.get(2)?;
176                Ok((
177                    r.get::<_, i64>(0)?,
178                    r.get::<_, i64>(1)?,
179                    parse_format_col(&fmt)?,
180                ))
181            })?;
182            out.extend(rows.collect::<rusqlite::Result<Vec<_>>>()?);
183        }
184        Ok(out)
185    }
186}
187
188impl Db<ReadWrite> {
189    pub fn upsert_track(&self, t: &NewTrack) -> Result<i64> {
190        self.conn.execute(
191            "INSERT INTO tracks
192                (backing_path, format, audio_offset, audio_length, backing_size, backing_mtime_ns, backing_ctime_ns, updated_at)
193             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, CAST(strftime('%s','now') AS INTEGER))
194             ON CONFLICT(backing_path) DO UPDATE SET
195                format        = excluded.format,
196                audio_offset  = excluded.audio_offset,
197                audio_length  = excluded.audio_length,
198                backing_size  = excluded.backing_size,
199                backing_mtime_ns = excluded.backing_mtime_ns,
200                backing_ctime_ns = excluded.backing_ctime_ns,
201                updated_at    = CAST(strftime('%s','now') AS INTEGER)",
202            params![
203                t.backing_path,
204                t.format.as_str(),
205                t.audio_offset,
206                t.audio_length,
207                t.backing_size,
208                t.backing_mtime_ns,
209                t.backing_ctime_ns,
210            ],
211        )?;
212        let id = self.conn.query_row(
213            "SELECT id FROM tracks WHERE backing_path = ?1",
214            params![t.backing_path],
215            |r| r.get(0),
216        )?;
217        Ok(id)
218    }
219
220    /// Delete a track row. Foreign keys cascade to its `tags` and `track_art`
221    /// rows; the referenced `art` rows are left for `gc_orphan_art`.
222    pub fn delete_track(&self, id: i64) -> Result<()> {
223        self.conn
224            .execute("DELETE FROM tracks WHERE id = ?1", params![id])?;
225        Ok(())
226    }
227
228    /// Test-only: force a track's format column directly (no rescan), bumping
229    /// data_version. The only way to exercise a format-only change — production
230    /// never mutates format without a rescan. As of V5 this also bumps
231    /// content_version (the `tracks_geometry_au` format guard); it is no longer a
232    /// content_version-neutral edit.
233    #[doc(hidden)]
234    pub fn set_format_for_test(&self, id: i64, fmt: Format) -> Result<()> {
235        self.conn.execute(
236            "UPDATE tracks SET format = ?1, updated_at = CAST(strftime('%s','now') AS INTEGER) WHERE id = ?2",
237            params![fmt.as_str(), id],
238        )?;
239        Ok(())
240    }
241
242    /// Test-only: delete changelog rows up to and including `seq`, simulating the
243    /// ring having pruned past a sleeping mount (gap-path coverage). Follows the
244    /// `set_format_for_test` precedent.
245    #[doc(hidden)]
246    pub fn delete_changelog_through_for_test(&self, seq: i64) -> Result<()> {
247        self.conn
248            .execute("DELETE FROM track_changes WHERE seq <= ?1", [seq])?;
249        Ok(())
250    }
251}
252
253#[cfg(test)]
254mod negative_audio_bounds_tests {
255    use crate::{Db, Format, NewTrack};
256
257    #[test]
258    fn negative_audio_bounds_error_at_row_read() {
259        let db = Db::open_in_memory().unwrap();
260        let id = db
261            .upsert_track(&NewTrack {
262                backing_path: "/x.flac".into(),
263                format: Format::Flac,
264                audio_offset: 0,
265                audio_length: 1,
266                backing_size: 1,
267                backing_mtime_ns: 0,
268                backing_ctime_ns: 0,
269            })
270            .unwrap();
271        // Simulate a malformed external write to a contract column. The V4
272        // `audio_offset >= 0` CHECK would reject this on a normal connection, so
273        // bypass CHECK enforcement to plant the bad row — the row-reader defensive
274        // path (not the CHECK) is what this test pins.
275        db.conn
276            .pragma_update(None, "ignore_check_constraints", true)
277            .unwrap();
278        db.conn
279            .execute("UPDATE tracks SET audio_offset = -1 WHERE id = ?1", [id])
280            .unwrap();
281        db.conn
282            .pragma_update(None, "ignore_check_constraints", false)
283            .unwrap();
284        assert!(
285            db.get_track(id).is_err(),
286            "negative audio_offset must fail row-read, not wrap"
287        );
288    }
289
290    #[test]
291    fn out_of_range_bounds_error_at_row_read() {
292        let db = Db::open_in_memory().unwrap();
293        let id = db
294            .upsert_track(&NewTrack {
295                backing_path: "/x.flac".into(),
296                format: Format::Flac,
297                audio_offset: 0,
298                audio_length: 1,
299                backing_size: 1,
300                backing_mtime_ns: 0,
301                backing_ctime_ns: 0,
302            })
303            .unwrap();
304        // Plant offset+length > backing_size past the V4 CHECK (layer 1) so we can
305        // prove TrackBounds (layer 2) rejects it at row read.
306        db.conn
307            .pragma_update(None, "ignore_check_constraints", true)
308            .unwrap();
309        db.conn
310            .execute("UPDATE tracks SET audio_length = 5 WHERE id = ?1", [id])
311            .unwrap();
312        db.conn
313            .pragma_update(None, "ignore_check_constraints", false)
314            .unwrap();
315        assert!(
316            db.get_track(id).is_err(),
317            "audio_offset + audio_length > backing_size must fail row-read"
318        );
319    }
320}
321
322#[cfg(test)]
323mod render_key_tests {
324    use super::*;
325    use crate::{Format, NewTrack, Tag};
326
327    fn open_mem() -> Db {
328        Db::open_in_memory().unwrap()
329    }
330
331    fn new_track(path: &str, fmt: Format) -> NewTrack {
332        NewTrack {
333            backing_path: path.to_string(),
334            format: fmt,
335            audio_offset: 0,
336            audio_length: 1,
337            backing_size: 1,
338            backing_mtime_ns: 0,
339            backing_ctime_ns: 0,
340        }
341    }
342
343    #[test]
344    fn list_render_keys_returns_id_version_format_sorted_by_id() {
345        let db = open_mem();
346        let a = db
347            .upsert_track(&new_track("/a.flac", Format::Flac))
348            .unwrap();
349        let b = db.upsert_track(&new_track("/b.mp3", Format::Mp3)).unwrap();
350        // Bump a's content_version via a tag write (trigger).
351        db.replace_tags(a, &[Tag::new("TITLE", "x", 0)]).unwrap();
352
353        let keys = db.list_render_keys().unwrap();
354        assert_eq!(keys.len(), 2);
355        assert_eq!(keys[0].0, a);
356        assert_eq!(keys[1].0, b);
357        assert!(keys[0].1 >= 1, "a content_version should have risen");
358        assert_eq!(keys[1].1, 0, "b content_version untouched");
359        assert_eq!(keys[0].2, Format::Flac);
360        assert_eq!(keys[1].2, Format::Mp3);
361    }
362
363    #[test]
364    fn set_format_for_test_persists_the_new_format() {
365        let db = open_mem();
366        let id = db
367            .upsert_track(&new_track("/a.flac", Format::Flac))
368            .unwrap();
369        db.set_format_for_test(id, Format::Mp3).unwrap();
370        let keys = db.list_render_keys().unwrap();
371        assert_eq!(keys[0].0, id);
372        assert_eq!(
373            keys[0].2,
374            Format::Mp3,
375            "set_format_for_test must actually UPDATE the format column"
376        );
377    }
378
379    /// `begin_read`/`end_read` bracket a single WAL read snapshot on a connection,
380    /// so a write by another connection that bumps `content_version` (or reuses a
381    /// freed binary-tag rowid) is invisible until the snapshot ends. The
382    /// `read` fast path's BinaryTag guard depends on this consistency: it pins the
383    /// version + the blob reads to one snapshot so a reused rowid can't be served.
384    #[test]
385    fn begin_read_pins_a_single_wal_snapshot_against_external_writes() {
386        let dir = tempfile::tempdir().unwrap();
387        let path = dir.path().join("m.db");
388        let writer = Db::open(&path).unwrap();
389        let id = writer
390            .upsert_track(&new_track("/a.mp3", Format::Mp3))
391            .unwrap();
392        assert_eq!(writer.track_content_version(id).unwrap(), 0);
393
394        // The reader opens a second connection; the two share the WAL.
395        let reader = Db::open(&path).unwrap();
396        assert_eq!(reader.track_content_version(id).unwrap(), 0);
397
398        reader.begin_read().unwrap();
399        // Within the snapshot: the version is 0.
400        assert_eq!(reader.track_content_version(id).unwrap(), 0);
401
402        // An external write bumps the version. The reader's snapshot must NOT see it.
403        writer
404            .replace_tags(id, &[Tag::new("artist", "Alice", 0)])
405            .unwrap();
406        assert_eq!(
407            reader.track_content_version(id).unwrap(),
408            0,
409            "snapshot must pin to the pre-write content_version"
410        );
411        // Latest version (visible without the snapshot) is bumped.
412        assert_eq!(writer.track_content_version(id).unwrap(), 1);
413
414        reader.end_read().unwrap();
415        // After the snapshot ends, the reader sees the new version.
416        assert_eq!(reader.track_content_version(id).unwrap(), 1);
417    }
418}