use crate::models::{Format, NewTrack, Track, TrackBounds};
use crate::{Db, ReadWrite, Result};
use rusqlite::{Row, params};
macro_rules! track_select {
($tail:literal) => {
concat!(
"SELECT id, backing_path, format, audio_offset, audio_length, \
backing_size, backing_mtime_ns, backing_ctime_ns, content_version, updated_at \
FROM tracks ",
$tail
)
};
}
fn parse_format_col(fmt: &str) -> rusqlite::Result<Format> {
fmt.parse::<Format>().ok().ok_or_else(|| {
rusqlite::Error::FromSqlConversionFailure(
usize::MAX,
rusqlite::types::Type::Text,
format!("unknown format {fmt}").into(),
)
})
}
fn row_to_track(r: &Row) -> rusqlite::Result<Track> {
let fmt: String = r.get("format")?;
let format = parse_format_col(&fmt)?;
let audio_offset: u64 = r.get("audio_offset")?;
let audio_length: u64 = r.get("audio_length")?;
let backing_size: u64 = r.get("backing_size")?;
let bounds = TrackBounds::new(audio_offset, audio_length, backing_size).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
usize::MAX,
rusqlite::types::Type::Integer,
e.to_string().into(),
)
})?;
Ok(Track {
id: r.get("id")?,
backing_path: r.get("backing_path")?,
format,
bounds,
backing_size,
backing_mtime_ns: r.get("backing_mtime_ns")?,
backing_ctime_ns: r.get("backing_ctime_ns")?,
content_version: r.get("content_version")?,
updated_at: r.get("updated_at")?,
})
}
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ChangelogRead {
pub changed_ids: Vec<i64>,
pub min_seq: i64,
pub max_seq: i64,
}
impl<M> Db<M> {
pub fn get_track(&self, id: i64) -> Result<Option<Track>> {
self.query_optional_track(track_select!("WHERE id = ?1"), params![id])
}
pub fn get_track_by_path(&self, path: &str) -> Result<Option<Track>> {
self.query_optional_track(track_select!("WHERE backing_path = ?1"), params![path])
}
pub fn list_tracks(&self) -> Result<Vec<Track>> {
let mut stmt = self.conn.prepare_cached(track_select!("ORDER BY id"))?;
let rows = stmt.query_map([], row_to_track)?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
pub fn track_content_version(&self, id: i64) -> Result<i64> {
Ok(self.conn.query_row(
"SELECT content_version FROM tracks WHERE id = ?1",
params![id],
|r| r.get(0),
)?)
}
pub fn begin_read(&self) -> Result<()> {
self.conn.execute_batch("BEGIN DEFERRED")?;
Ok(())
}
pub fn end_read(&self) -> Result<()> {
self.conn.execute_batch("ROLLBACK")?;
Ok(())
}
fn query_optional_track(&self, sql: &str, p: impl rusqlite::Params) -> Result<Option<Track>> {
let mut stmt = self.conn.prepare_cached(sql)?;
let mut rows = stmt.query(p)?;
match rows.next()? {
Some(r) => Ok(Some(row_to_track(r)?)),
None => Ok(None),
}
}
pub fn list_render_keys(&self) -> Result<Vec<(i64, i64, Format)>> {
let mut stmt = self
.conn
.prepare("SELECT id, content_version, format FROM tracks ORDER BY id")?;
let rows = stmt.query_map([], |r| {
let fmt: String = r.get(2)?;
Ok((
r.get::<_, i64>(0)?,
r.get::<_, i64>(1)?,
parse_format_col(&fmt)?,
))
})?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
pub fn changelog_since(&self, last_seq: i64) -> Result<ChangelogRead> {
let tx = self.conn.unchecked_transaction()?;
let (min_seq, max_seq): (i64, i64) = tx.query_row(
"SELECT COALESCE(MIN(seq),0), COALESCE(MAX(seq),0) FROM track_changes",
[],
|r| Ok((r.get(0)?, r.get(1)?)),
)?;
let changed_ids = {
let mut stmt = tx.prepare(
"SELECT DISTINCT track_id FROM track_changes WHERE seq > ?1 ORDER BY track_id",
)?;
stmt.query_map([last_seq], |r| r.get(0))?
.collect::<rusqlite::Result<Vec<i64>>>()?
};
tx.commit()?;
Ok(ChangelogRead {
changed_ids,
min_seq,
max_seq,
})
}
pub fn render_keys_for(&self, ids: &[i64]) -> Result<Vec<(i64, i64, Format)>> {
const CHUNK: usize = 900;
let mut out = Vec::with_capacity(ids.len());
for chunk in ids.chunks(CHUNK) {
let placeholders = vec!["?"; chunk.len()].join(",");
let sql = format!(
"SELECT id, content_version, format FROM tracks \
WHERE id IN ({placeholders}) ORDER BY id"
);
let mut stmt = self.conn.prepare(&sql)?;
let params = rusqlite::params_from_iter(chunk.iter());
let rows = stmt.query_map(params, |r| {
let fmt: String = r.get(2)?;
Ok((
r.get::<_, i64>(0)?,
r.get::<_, i64>(1)?,
parse_format_col(&fmt)?,
))
})?;
out.extend(rows.collect::<rusqlite::Result<Vec<_>>>()?);
}
Ok(out)
}
}
impl Db<ReadWrite> {
pub fn upsert_track(&self, t: &NewTrack) -> Result<i64> {
self.conn.execute(
"INSERT INTO tracks
(backing_path, format, audio_offset, audio_length, backing_size, backing_mtime_ns, backing_ctime_ns, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, CAST(strftime('%s','now') AS INTEGER))
ON CONFLICT(backing_path) DO UPDATE SET
format = excluded.format,
audio_offset = excluded.audio_offset,
audio_length = excluded.audio_length,
backing_size = excluded.backing_size,
backing_mtime_ns = excluded.backing_mtime_ns,
backing_ctime_ns = excluded.backing_ctime_ns,
updated_at = CAST(strftime('%s','now') AS INTEGER)",
params![
t.backing_path,
t.format.as_str(),
t.audio_offset,
t.audio_length,
t.backing_size,
t.backing_mtime_ns,
t.backing_ctime_ns,
],
)?;
let id = self.conn.query_row(
"SELECT id FROM tracks WHERE backing_path = ?1",
params![t.backing_path],
|r| r.get(0),
)?;
Ok(id)
}
pub fn delete_track(&self, id: i64) -> Result<()> {
self.conn
.execute("DELETE FROM tracks WHERE id = ?1", params![id])?;
Ok(())
}
#[doc(hidden)]
pub fn set_format_for_test(&self, id: i64, fmt: Format) -> Result<()> {
self.conn.execute(
"UPDATE tracks SET format = ?1, updated_at = CAST(strftime('%s','now') AS INTEGER) WHERE id = ?2",
params![fmt.as_str(), id],
)?;
Ok(())
}
#[doc(hidden)]
pub fn delete_changelog_through_for_test(&self, seq: i64) -> Result<()> {
self.conn
.execute("DELETE FROM track_changes WHERE seq <= ?1", [seq])?;
Ok(())
}
}
#[cfg(test)]
mod negative_audio_bounds_tests {
use crate::{Db, Format, NewTrack};
#[test]
fn negative_audio_bounds_error_at_row_read() {
let db = Db::open_in_memory().unwrap();
let id = db
.upsert_track(&NewTrack {
backing_path: "/x.flac".into(),
format: Format::Flac,
audio_offset: 0,
audio_length: 1,
backing_size: 1,
backing_mtime_ns: 0,
backing_ctime_ns: 0,
})
.unwrap();
db.conn
.pragma_update(None, "ignore_check_constraints", true)
.unwrap();
db.conn
.execute("UPDATE tracks SET audio_offset = -1 WHERE id = ?1", [id])
.unwrap();
db.conn
.pragma_update(None, "ignore_check_constraints", false)
.unwrap();
assert!(
db.get_track(id).is_err(),
"negative audio_offset must fail row-read, not wrap"
);
}
#[test]
fn out_of_range_bounds_error_at_row_read() {
let db = Db::open_in_memory().unwrap();
let id = db
.upsert_track(&NewTrack {
backing_path: "/x.flac".into(),
format: Format::Flac,
audio_offset: 0,
audio_length: 1,
backing_size: 1,
backing_mtime_ns: 0,
backing_ctime_ns: 0,
})
.unwrap();
db.conn
.pragma_update(None, "ignore_check_constraints", true)
.unwrap();
db.conn
.execute("UPDATE tracks SET audio_length = 5 WHERE id = ?1", [id])
.unwrap();
db.conn
.pragma_update(None, "ignore_check_constraints", false)
.unwrap();
assert!(
db.get_track(id).is_err(),
"audio_offset + audio_length > backing_size must fail row-read"
);
}
}
#[cfg(test)]
mod render_key_tests {
use super::*;
use crate::{Format, NewTrack, Tag};
fn open_mem() -> Db {
Db::open_in_memory().unwrap()
}
fn new_track(path: &str, fmt: Format) -> NewTrack {
NewTrack {
backing_path: path.to_string(),
format: fmt,
audio_offset: 0,
audio_length: 1,
backing_size: 1,
backing_mtime_ns: 0,
backing_ctime_ns: 0,
}
}
#[test]
fn list_render_keys_returns_id_version_format_sorted_by_id() {
let db = open_mem();
let a = db
.upsert_track(&new_track("/a.flac", Format::Flac))
.unwrap();
let b = db.upsert_track(&new_track("/b.mp3", Format::Mp3)).unwrap();
db.replace_tags(a, &[Tag::new("TITLE", "x", 0)]).unwrap();
let keys = db.list_render_keys().unwrap();
assert_eq!(keys.len(), 2);
assert_eq!(keys[0].0, a);
assert_eq!(keys[1].0, b);
assert!(keys[0].1 >= 1, "a content_version should have risen");
assert_eq!(keys[1].1, 0, "b content_version untouched");
assert_eq!(keys[0].2, Format::Flac);
assert_eq!(keys[1].2, Format::Mp3);
}
#[test]
fn set_format_for_test_persists_the_new_format() {
let db = open_mem();
let id = db
.upsert_track(&new_track("/a.flac", Format::Flac))
.unwrap();
db.set_format_for_test(id, Format::Mp3).unwrap();
let keys = db.list_render_keys().unwrap();
assert_eq!(keys[0].0, id);
assert_eq!(
keys[0].2,
Format::Mp3,
"set_format_for_test must actually UPDATE the format column"
);
}
#[test]
fn begin_read_pins_a_single_wal_snapshot_against_external_writes() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("m.db");
let writer = Db::open(&path).unwrap();
let id = writer
.upsert_track(&new_track("/a.mp3", Format::Mp3))
.unwrap();
assert_eq!(writer.track_content_version(id).unwrap(), 0);
let reader = Db::open(&path).unwrap();
assert_eq!(reader.track_content_version(id).unwrap(), 0);
reader.begin_read().unwrap();
assert_eq!(reader.track_content_version(id).unwrap(), 0);
writer
.replace_tags(id, &[Tag::new("artist", "Alice", 0)])
.unwrap();
assert_eq!(
reader.track_content_version(id).unwrap(),
0,
"snapshot must pin to the pre-write content_version"
);
assert_eq!(writer.track_content_version(id).unwrap(), 1);
reader.end_read().unwrap();
assert_eq!(reader.track_content_version(id).unwrap(), 1);
}
}