use crate::models::{Format, NewTrack, Track, TrackBounds};
use crate::{Db, ReadWrite, Result};
use rusqlite::{Row, params};
macro_rules! track_select {
($tail:literal) => {
concat!(
"SELECT id, backing_path, format, audio_offset, audio_length, \
backing_size, backing_mtime_ns, backing_ctime_ns, content_version, updated_at, \
fingerprint, content_hash \
FROM tracks ",
$tail
)
};
}
fn parse_format_col(fmt: &str) -> rusqlite::Result<Format> {
fmt.parse::<Format>().ok().ok_or_else(|| {
rusqlite::Error::FromSqlConversionFailure(
usize::MAX,
rusqlite::types::Type::Text,
format!("unknown format {fmt}").into(),
)
})
}
fn row_to_track(r: &Row) -> rusqlite::Result<Track> {
let fmt: String = r.get("format")?;
let format = parse_format_col(&fmt)?;
let audio_offset: u64 = r.get("audio_offset")?;
let audio_length: u64 = r.get("audio_length")?;
let backing_size: u64 = r.get("backing_size")?;
let bounds = TrackBounds::new(audio_offset, audio_length, backing_size).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
usize::MAX,
rusqlite::types::Type::Integer,
e.to_string().into(),
)
})?;
Ok(Track {
id: r.get("id")?,
backing_path: r.get("backing_path")?,
format,
bounds,
backing_size,
backing_mtime_ns: r.get("backing_mtime_ns")?,
backing_ctime_ns: r.get("backing_ctime_ns")?,
content_version: r.get("content_version")?,
updated_at: r.get("updated_at")?,
fingerprint: r.get("fingerprint")?,
content_hash: r.get("content_hash")?,
})
}
pub(crate) fn upsert_track_in(conn: &rusqlite::Connection, t: &NewTrack) -> Result<i64> {
Ok(conn.query_row(
"INSERT INTO tracks
(backing_path, format, audio_offset, audio_length, backing_size, backing_mtime_ns, backing_ctime_ns, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, CAST(strftime('%s','now') AS INTEGER))
ON CONFLICT(backing_path) DO UPDATE SET
format=excluded.format, audio_offset=excluded.audio_offset,
audio_length=excluded.audio_length, backing_size=excluded.backing_size,
backing_mtime_ns=excluded.backing_mtime_ns,
backing_ctime_ns=excluded.backing_ctime_ns,
updated_at=CAST(strftime('%s','now') AS INTEGER)
RETURNING id",
params![
t.backing_path,
t.format.as_str(),
t.audio_offset,
t.audio_length,
t.backing_size,
t.backing_mtime_ns,
t.backing_ctime_ns,
],
|r| r.get(0),
)?)
}
pub(crate) fn get_track_by_path_in(
conn: &rusqlite::Connection,
path: &str,
) -> Result<Option<Track>> {
crate::query_optional(
conn,
track_select!("WHERE backing_path = ?1"),
params![path],
|r| Ok(row_to_track(r)?),
)
}
pub(crate) fn tracks_by_fingerprint_in(
conn: &rusqlite::Connection,
fp: &str,
) -> Result<Vec<Track>> {
let mut stmt = conn.prepare_cached(track_select!("WHERE fingerprint = ?1 ORDER BY id"))?;
let rows = stmt.query_map(params![fp], row_to_track)?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
pub(crate) fn set_track_checksums_in(
conn: &rusqlite::Connection,
id: i64,
fingerprint: Option<&str>,
content_hash: Option<&str>,
) -> Result<()> {
conn.execute(
"UPDATE tracks SET
fingerprint = COALESCE(?2, fingerprint),
content_hash = COALESCE(?3, content_hash)
WHERE id = ?1",
params![id, fingerprint, content_hash],
)?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn retarget_track_in(
conn: &rusqlite::Connection,
id: i64,
new_backing_path: &str,
backing_size: u64,
backing_mtime_ns: i64,
backing_ctime_ns: i64,
audio_offset: u64,
audio_length: u64,
fingerprint: Option<&str>,
content_hash: Option<&str>,
) -> Result<()> {
conn.execute(
"UPDATE tracks SET
backing_path = ?2,
backing_size = ?3,
backing_mtime_ns = ?4,
backing_ctime_ns = ?5,
audio_offset = ?6,
audio_length = ?7,
fingerprint = COALESCE(?8, fingerprint),
content_hash = COALESCE(?9, content_hash),
updated_at = CAST(strftime('%s','now') AS INTEGER)
WHERE id = ?1",
params![
id,
new_backing_path,
backing_size,
backing_mtime_ns,
backing_ctime_ns,
audio_offset,
audio_length,
fingerprint,
content_hash,
],
)?;
Ok(())
}
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ChangelogRead {
pub changed_ids: Vec<i64>,
pub min_seq: i64,
pub max_seq: i64,
}
impl<M> Db<M> {
pub fn get_track(&self, id: i64) -> Result<Option<Track>> {
self.query_optional_track(track_select!("WHERE id = ?1"), params![id])
}
pub fn get_track_by_path(&self, path: &str) -> Result<Option<Track>> {
get_track_by_path_in(&self.conn, path)
}
pub fn list_tracks(&self) -> Result<Vec<Track>> {
let mut stmt = self.conn.prepare_cached(track_select!("ORDER BY id"))?;
let rows = stmt.query_map([], row_to_track)?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
pub fn track_content_version(&self, id: i64) -> Result<i64> {
Ok(self.conn.query_row(
"SELECT content_version FROM tracks WHERE id = ?1",
params![id],
|r| r.get(0),
)?)
}
pub fn track_version_and_path(&self, id: i64) -> Result<Option<(i64, String)>> {
crate::query_optional(
&self.conn,
"SELECT content_version, backing_path FROM tracks WHERE id = ?1",
params![id],
|r| Ok((r.get(0)?, r.get(1)?)),
)
}
pub fn begin_read(&self) -> Result<()> {
if !self.conn.is_autocommit() {
log::warn!(
"begin_read found a leaked read transaction on this connection; \
rolling it back (a prior end_read likely failed to release the snapshot)"
);
self.conn.execute_batch("ROLLBACK")?;
}
self.conn.execute_batch("BEGIN DEFERRED")?;
Ok(())
}
pub fn end_read(&self) -> Result<()> {
self.conn.execute_batch("ROLLBACK")?;
Ok(())
}
fn query_optional_track(&self, sql: &str, p: impl rusqlite::Params) -> Result<Option<Track>> {
crate::query_optional(&self.conn, sql, p, |r| Ok(row_to_track(r)?))
}
pub fn list_render_keys(&self) -> Result<Vec<(i64, i64, Format)>> {
let mut stmt = self
.conn
.prepare("SELECT id, content_version, format FROM tracks ORDER BY id")?;
let rows = stmt.query_map([], |r| {
let fmt: String = r.get(2)?;
Ok((
r.get::<_, i64>(0)?,
r.get::<_, i64>(1)?,
parse_format_col(&fmt)?,
))
})?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
pub fn changelog_since(&self, last_seq: i64) -> Result<ChangelogRead> {
let tx = self.conn.unchecked_transaction()?;
let (min_seq, max_seq): (i64, i64) = tx.query_row(
"SELECT COALESCE(MIN(seq),0), COALESCE(MAX(seq),0) FROM track_changes",
[],
|r| Ok((r.get(0)?, r.get(1)?)),
)?;
let changed_ids = {
let mut stmt = tx.prepare(
"SELECT DISTINCT track_id FROM track_changes WHERE seq > ?1 ORDER BY track_id",
)?;
stmt.query_map([last_seq], |r| r.get(0))?
.collect::<rusqlite::Result<Vec<i64>>>()?
};
tx.commit()?;
Ok(ChangelogRead {
changed_ids,
min_seq,
max_seq,
})
}
pub fn render_keys_for(&self, ids: &[i64]) -> Result<Vec<(i64, i64, Format)>> {
let mut out = Vec::with_capacity(ids.len());
crate::query_in_chunks(
&self.conn,
ids,
|ph| {
format!(
"SELECT id, content_version, format FROM tracks \
WHERE id IN ({ph}) ORDER BY id"
)
},
|rows| {
while let Some(r) = rows.next()? {
let fmt: String = r.get(2)?;
out.push((
r.get::<_, i64>(0)?,
r.get::<_, i64>(1)?,
parse_format_col(&fmt)?,
));
}
Ok(())
},
)?;
Ok(out)
}
}
impl Db<ReadWrite> {
pub fn upsert_track(&self, t: &NewTrack) -> Result<i64> {
upsert_track_in(&self.conn, t)
}
pub fn delete_track(&self, id: i64) -> Result<()> {
self.conn
.execute("DELETE FROM tracks WHERE id = ?1", params![id])?;
Ok(())
}
pub fn tracks_by_fingerprint(&self, fp: &str) -> Result<Vec<Track>> {
tracks_by_fingerprint_in(&self.conn, fp)
}
pub fn set_track_checksums(
&self,
id: i64,
fingerprint: Option<&str>,
content_hash: Option<&str>,
) -> Result<()> {
set_track_checksums_in(&self.conn, id, fingerprint, content_hash)
}
#[allow(clippy::too_many_arguments)]
pub fn retarget_track(
&self,
id: i64,
new_backing_path: &str,
backing_size: u64,
backing_mtime_ns: i64,
backing_ctime_ns: i64,
audio_offset: u64,
audio_length: u64,
fingerprint: Option<&str>,
content_hash: Option<&str>,
) -> Result<()> {
retarget_track_in(
&self.conn,
id,
new_backing_path,
backing_size,
backing_mtime_ns,
backing_ctime_ns,
audio_offset,
audio_length,
fingerprint,
content_hash,
)
}
#[doc(hidden)]
pub fn set_format_for_test(&self, id: i64, fmt: Format) -> Result<()> {
self.conn.execute(
"UPDATE tracks SET format = ?1, updated_at = CAST(strftime('%s','now') AS INTEGER) WHERE id = ?2",
params![fmt.as_str(), id],
)?;
Ok(())
}
#[doc(hidden)]
pub fn delete_changelog_through_for_test(&self, seq: i64) -> Result<()> {
self.conn
.execute("DELETE FROM track_changes WHERE seq <= ?1", [seq])?;
Ok(())
}
}
#[cfg(test)]
mod negative_audio_bounds_tests {
use crate::{Db, Format, NewTrack};
#[test]
fn negative_audio_bounds_error_at_row_read() {
let db = Db::open_in_memory().unwrap();
let id = db
.upsert_track(&NewTrack {
backing_path: "/x.flac".into(),
format: Format::Flac,
audio_offset: 0,
audio_length: 1,
backing_size: 1,
backing_mtime_ns: 0,
backing_ctime_ns: 0,
})
.unwrap();
db.conn
.pragma_update(None, "ignore_check_constraints", true)
.unwrap();
db.conn
.execute("UPDATE tracks SET audio_offset = -1 WHERE id = ?1", [id])
.unwrap();
db.conn
.pragma_update(None, "ignore_check_constraints", false)
.unwrap();
assert!(
db.get_track(id).is_err(),
"negative audio_offset must fail row-read, not wrap"
);
}
#[test]
fn out_of_range_bounds_error_at_row_read() {
let db = Db::open_in_memory().unwrap();
let id = db
.upsert_track(&NewTrack {
backing_path: "/x.flac".into(),
format: Format::Flac,
audio_offset: 0,
audio_length: 1,
backing_size: 1,
backing_mtime_ns: 0,
backing_ctime_ns: 0,
})
.unwrap();
db.conn
.pragma_update(None, "ignore_check_constraints", true)
.unwrap();
db.conn
.execute("UPDATE tracks SET audio_length = 5 WHERE id = ?1", [id])
.unwrap();
db.conn
.pragma_update(None, "ignore_check_constraints", false)
.unwrap();
assert!(
db.get_track(id).is_err(),
"audio_offset + audio_length > backing_size must fail row-read"
);
}
}
#[cfg(test)]
mod render_key_tests {
use super::*;
use crate::{Format, NewTrack, Tag};
fn open_mem() -> Db {
Db::open_in_memory().unwrap()
}
fn new_track(path: &str, fmt: Format) -> NewTrack {
NewTrack {
backing_path: path.to_string(),
format: fmt,
audio_offset: 0,
audio_length: 1,
backing_size: 1,
backing_mtime_ns: 0,
backing_ctime_ns: 0,
}
}
#[test]
fn list_render_keys_returns_id_version_format_sorted_by_id() {
let db = open_mem();
let a = db
.upsert_track(&new_track("/a.flac", Format::Flac))
.unwrap();
let b = db.upsert_track(&new_track("/b.mp3", Format::Mp3)).unwrap();
db.replace_tags(a, &[Tag::new("TITLE", "x", 0)]).unwrap();
let keys = db.list_render_keys().unwrap();
assert_eq!(keys.len(), 2);
assert_eq!(keys[0].0, a);
assert_eq!(keys[1].0, b);
assert!(keys[0].1 >= 1, "a content_version should have risen");
assert_eq!(keys[1].1, 0, "b content_version untouched");
assert_eq!(keys[0].2, Format::Flac);
assert_eq!(keys[1].2, Format::Mp3);
}
#[test]
fn set_format_for_test_persists_the_new_format() {
let db = open_mem();
let id = db
.upsert_track(&new_track("/a.flac", Format::Flac))
.unwrap();
db.set_format_for_test(id, Format::Mp3).unwrap();
let keys = db.list_render_keys().unwrap();
assert_eq!(keys[0].0, id);
assert_eq!(
keys[0].2,
Format::Mp3,
"set_format_for_test must actually UPDATE the format column"
);
}
#[test]
fn begin_read_pins_a_single_wal_snapshot_against_external_writes() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("m.db");
let writer = Db::open(&path).unwrap();
let id = writer
.upsert_track(&new_track("/a.mp3", Format::Mp3))
.unwrap();
assert_eq!(writer.track_content_version(id).unwrap(), 0);
let reader = Db::open(&path).unwrap();
assert_eq!(reader.track_content_version(id).unwrap(), 0);
reader.begin_read().unwrap();
assert_eq!(reader.track_content_version(id).unwrap(), 0);
writer
.replace_tags(id, &[Tag::new("artist", "Alice", 0)])
.unwrap();
assert_eq!(
reader.track_content_version(id).unwrap(),
0,
"snapshot must pin to the pre-write content_version"
);
assert_eq!(writer.track_content_version(id).unwrap(), 1);
reader.end_read().unwrap();
assert_eq!(reader.track_content_version(id).unwrap(), 1);
}
#[test]
fn begin_read_self_heals_a_leaked_prior_snapshot() {
let db = open_mem();
db.begin_read().unwrap();
assert!(
db.begin_read().is_ok(),
"a leaked read snapshot must self-heal, not surface an opaque error"
);
db.end_read().unwrap();
}
}
#[cfg(test)]
mod checksum_tests {
use crate::{Db, NewTrack, models::Format};
fn new_track(path: &str) -> NewTrack {
NewTrack {
backing_path: path.to_string(),
format: Format::Flac,
audio_offset: 0,
audio_length: 10,
backing_size: 10,
backing_mtime_ns: 0,
backing_ctime_ns: 0,
}
}
#[test]
fn set_and_read_back_checksums() {
let db = Db::open_in_memory().unwrap();
let id = db.upsert_track(&new_track("/a.flac")).unwrap();
db.set_track_checksums(id, Some(&"a".repeat(64)), Some(&"d".repeat(64)))
.unwrap();
let t = db.get_track(id).unwrap().unwrap();
assert_eq!(t.fingerprint.as_deref(), Some(&"a".repeat(64)[..]));
assert_eq!(t.content_hash.as_deref(), Some(&"d".repeat(64)[..]));
}
#[test]
fn set_checksums_none_does_not_clobber_existing() {
let db = Db::open_in_memory().unwrap();
let id = db.upsert_track(&new_track("/a.flac")).unwrap();
db.set_track_checksums(id, Some(&"a".repeat(64)), Some(&"d".repeat(64)))
.unwrap();
db.set_track_checksums(id, None, None).unwrap();
let t = db.get_track(id).unwrap().unwrap();
assert_eq!(t.fingerprint.as_deref(), Some(&"a".repeat(64)[..]));
assert_eq!(t.content_hash.as_deref(), Some(&"d".repeat(64)[..]));
}
#[test]
fn tracks_by_fingerprint_returns_matches() {
let db = Db::open_in_memory().unwrap();
let a = db.upsert_track(&new_track("/a.flac")).unwrap();
let b = db.upsert_track(&new_track("/b.flac")).unwrap();
db.set_track_checksums(a, Some(&"b".repeat(64)), None)
.unwrap();
db.set_track_checksums(b, Some(&"b".repeat(64)), None)
.unwrap();
db.upsert_track(&new_track("/c.flac")).unwrap(); let mut ids: Vec<i64> = db
.tracks_by_fingerprint(&"b".repeat(64))
.unwrap()
.into_iter()
.map(|t| t.id)
.collect();
ids.sort_unstable();
assert_eq!(ids, vec![a, b]);
assert!(
db.tracks_by_fingerprint(&"c".repeat(64))
.unwrap()
.is_empty()
);
}
#[test]
fn retarget_updates_path_stamp_and_bounds_keeping_id() {
let db = Db::open_in_memory().unwrap();
let id = db.upsert_track(&new_track("/old.flac")).unwrap();
db.set_track_checksums(id, Some(&"a".repeat(64)), None)
.unwrap();
db.retarget_track(
id,
"/new.flac",
99,
1234,
5678,
42,
50,
None,
Some(&"e".repeat(64)),
)
.unwrap();
let t = db.get_track(id).unwrap().unwrap();
assert_eq!(t.id, id);
assert_eq!(t.backing_path, "/new.flac");
assert_eq!(t.backing_size, 99);
assert_eq!(t.backing_mtime_ns, 1234);
assert_eq!(t.backing_ctime_ns, 5678);
assert_eq!(t.bounds.audio_offset(), 42);
assert_eq!(t.bounds.audio_length(), 50);
assert_eq!(t.fingerprint.as_deref(), Some(&"a".repeat(64)[..])); assert_eq!(t.content_hash.as_deref(), Some(&"e".repeat(64)[..]));
assert!(db.get_track_by_path("/old.flac").unwrap().is_none());
}
#[test]
fn bulk_writer_reads_back_fingerprint_and_path() {
let db = Db::open_in_memory().unwrap();
let fp = "f".repeat(64);
let mut bw = db.bulk_writer().unwrap();
let id = bw.upsert_track(&new_track("/x.flac")).unwrap();
bw.set_track_checksums(id, Some(&fp), None).unwrap();
let by_fp = bw.tracks_by_fingerprint(&fp).unwrap();
assert_eq!(by_fp.len(), 1, "fingerprint match must be returned");
assert_eq!(by_fp[0].id, id);
let by_path = bw.get_track_by_path("/x.flac").unwrap();
assert_eq!(by_path.map(|t| t.id), Some(id), "path lookup must hit");
bw.commit().unwrap();
}
#[test]
fn bulk_writer_retarget_and_checksums_match_db() {
let db = Db::open_in_memory().unwrap();
let id = {
let mut bw = db.bulk_writer().unwrap();
let id = bw.upsert_track(&new_track("/old.flac")).unwrap();
bw.set_track_checksums(id, Some(&"a".repeat(64)), None)
.unwrap();
bw.retarget_track(id, "/new.flac", 10, 1, 2, 0, 10, None, None)
.unwrap();
bw.commit().unwrap();
id
};
let t = db.get_track(id).unwrap().unwrap();
assert_eq!(t.backing_path, "/new.flac");
assert_eq!(t.fingerprint.as_deref(), Some(&"a".repeat(64)[..]));
}
}