1use crate::models::{Format, NewTrack, Track, TrackBounds};
2use crate::{Db, ReadWrite, Result};
3use rusqlite::{Row, params};
4
5macro_rules! track_select {
10 ($tail:literal) => {
11 concat!(
12 "SELECT id, backing_path, format, audio_offset, audio_length, \
13 backing_size, backing_mtime_ns, backing_ctime_ns, content_version, updated_at \
14 FROM tracks ",
15 $tail
16 )
17 };
18}
19
20fn parse_format_col(fmt: &str) -> rusqlite::Result<Format> {
23 fmt.parse::<Format>().ok().ok_or_else(|| {
24 rusqlite::Error::FromSqlConversionFailure(
25 usize::MAX,
26 rusqlite::types::Type::Text,
27 format!("unknown format {fmt}").into(),
28 )
29 })
30}
31
32fn row_to_track(r: &Row) -> rusqlite::Result<Track> {
33 let fmt: String = r.get("format")?;
34 let format = parse_format_col(&fmt)?;
35 let audio_offset: u64 = r.get("audio_offset")?;
36 let audio_length: u64 = r.get("audio_length")?;
37 let backing_size: u64 = r.get("backing_size")?;
38 let bounds = TrackBounds::new(audio_offset, audio_length, backing_size).map_err(|e| {
39 rusqlite::Error::FromSqlConversionFailure(
40 usize::MAX,
41 rusqlite::types::Type::Integer,
42 e.to_string().into(),
43 )
44 })?;
45 Ok(Track {
46 id: r.get("id")?,
47 backing_path: r.get("backing_path")?,
48 format,
49 bounds,
50 backing_size,
51 backing_mtime_ns: r.get("backing_mtime_ns")?,
52 backing_ctime_ns: r.get("backing_ctime_ns")?,
53 content_version: r.get("content_version")?,
54 updated_at: r.get("updated_at")?,
55 })
56}
57
58#[derive(Debug, Default, PartialEq, Eq)]
62pub struct ChangelogRead {
63 pub changed_ids: Vec<i64>,
64 pub min_seq: i64,
65 pub max_seq: i64,
66}
67
68impl<M> Db<M> {
69 pub fn get_track(&self, id: i64) -> Result<Option<Track>> {
70 self.query_optional_track(track_select!("WHERE id = ?1"), params![id])
71 }
72
73 pub fn get_track_by_path(&self, path: &str) -> Result<Option<Track>> {
74 self.query_optional_track(track_select!("WHERE backing_path = ?1"), params![path])
75 }
76
77 pub fn list_tracks(&self) -> Result<Vec<Track>> {
78 let mut stmt = self.conn.prepare_cached(track_select!("ORDER BY id"))?;
79 let rows = stmt.query_map([], row_to_track)?;
80 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
81 }
82
83 pub fn track_content_version(&self, id: i64) -> Result<i64> {
84 Ok(self.conn.query_row(
85 "SELECT content_version FROM tracks WHERE id = ?1",
86 params![id],
87 |r| r.get(0),
88 )?)
89 }
90
91 pub fn begin_read(&self) -> Result<()> {
95 self.conn.execute_batch("BEGIN DEFERRED")?;
96 Ok(())
97 }
98
99 pub fn end_read(&self) -> Result<()> {
101 self.conn.execute_batch("ROLLBACK")?;
102 Ok(())
103 }
104
105 fn query_optional_track(&self, sql: &str, p: impl rusqlite::Params) -> Result<Option<Track>> {
106 let mut stmt = self.conn.prepare_cached(sql)?;
107 let mut rows = stmt.query(p)?;
108 match rows.next()? {
109 Some(r) => Ok(Some(row_to_track(r)?)),
110 None => Ok(None),
111 }
112 }
113
114 pub fn list_render_keys(&self) -> Result<Vec<(i64, i64, Format)>> {
118 let mut stmt = self
119 .conn
120 .prepare("SELECT id, content_version, format FROM tracks ORDER BY id")?;
121 let rows = stmt.query_map([], |r| {
122 let fmt: String = r.get(2)?;
123 Ok((
124 r.get::<_, i64>(0)?,
125 r.get::<_, i64>(1)?,
126 parse_format_col(&fmt)?,
127 ))
128 })?;
129 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
130 }
131
132 pub fn changelog_since(&self, last_seq: i64) -> Result<ChangelogRead> {
136 let tx = self.conn.unchecked_transaction()?;
141 let (min_seq, max_seq): (i64, i64) = tx.query_row(
142 "SELECT COALESCE(MIN(seq),0), COALESCE(MAX(seq),0) FROM track_changes",
143 [],
144 |r| Ok((r.get(0)?, r.get(1)?)),
145 )?;
146 let changed_ids = {
147 let mut stmt = tx.prepare(
148 "SELECT DISTINCT track_id FROM track_changes WHERE seq > ?1 ORDER BY track_id",
149 )?;
150 stmt.query_map([last_seq], |r| r.get(0))?
151 .collect::<rusqlite::Result<Vec<i64>>>()?
152 };
153 tx.commit()?;
154 Ok(ChangelogRead {
155 changed_ids,
156 min_seq,
157 max_seq,
158 })
159 }
160
161 pub fn render_keys_for(&self, ids: &[i64]) -> Result<Vec<(i64, i64, Format)>> {
164 const CHUNK: usize = 900;
165 let mut out = Vec::with_capacity(ids.len());
166 for chunk in ids.chunks(CHUNK) {
167 let placeholders = vec!["?"; chunk.len()].join(",");
168 let sql = format!(
169 "SELECT id, content_version, format FROM tracks \
170 WHERE id IN ({placeholders}) ORDER BY id"
171 );
172 let mut stmt = self.conn.prepare(&sql)?;
173 let params = rusqlite::params_from_iter(chunk.iter());
174 let rows = stmt.query_map(params, |r| {
175 let fmt: String = r.get(2)?;
176 Ok((
177 r.get::<_, i64>(0)?,
178 r.get::<_, i64>(1)?,
179 parse_format_col(&fmt)?,
180 ))
181 })?;
182 out.extend(rows.collect::<rusqlite::Result<Vec<_>>>()?);
183 }
184 Ok(out)
185 }
186}
187
188impl Db<ReadWrite> {
189 pub fn upsert_track(&self, t: &NewTrack) -> Result<i64> {
190 self.conn.execute(
191 "INSERT INTO tracks
192 (backing_path, format, audio_offset, audio_length, backing_size, backing_mtime_ns, backing_ctime_ns, updated_at)
193 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, CAST(strftime('%s','now') AS INTEGER))
194 ON CONFLICT(backing_path) DO UPDATE SET
195 format = excluded.format,
196 audio_offset = excluded.audio_offset,
197 audio_length = excluded.audio_length,
198 backing_size = excluded.backing_size,
199 backing_mtime_ns = excluded.backing_mtime_ns,
200 backing_ctime_ns = excluded.backing_ctime_ns,
201 updated_at = CAST(strftime('%s','now') AS INTEGER)",
202 params![
203 t.backing_path,
204 t.format.as_str(),
205 t.audio_offset,
206 t.audio_length,
207 t.backing_size,
208 t.backing_mtime_ns,
209 t.backing_ctime_ns,
210 ],
211 )?;
212 let id = self.conn.query_row(
213 "SELECT id FROM tracks WHERE backing_path = ?1",
214 params![t.backing_path],
215 |r| r.get(0),
216 )?;
217 Ok(id)
218 }
219
220 pub fn delete_track(&self, id: i64) -> Result<()> {
223 self.conn
224 .execute("DELETE FROM tracks WHERE id = ?1", params![id])?;
225 Ok(())
226 }
227
228 #[doc(hidden)]
234 pub fn set_format_for_test(&self, id: i64, fmt: Format) -> Result<()> {
235 self.conn.execute(
236 "UPDATE tracks SET format = ?1, updated_at = CAST(strftime('%s','now') AS INTEGER) WHERE id = ?2",
237 params![fmt.as_str(), id],
238 )?;
239 Ok(())
240 }
241
242 #[doc(hidden)]
246 pub fn delete_changelog_through_for_test(&self, seq: i64) -> Result<()> {
247 self.conn
248 .execute("DELETE FROM track_changes WHERE seq <= ?1", [seq])?;
249 Ok(())
250 }
251}
252
253#[cfg(test)]
254mod negative_audio_bounds_tests {
255 use crate::{Db, Format, NewTrack};
256
257 #[test]
258 fn negative_audio_bounds_error_at_row_read() {
259 let db = Db::open_in_memory().unwrap();
260 let id = db
261 .upsert_track(&NewTrack {
262 backing_path: "/x.flac".into(),
263 format: Format::Flac,
264 audio_offset: 0,
265 audio_length: 1,
266 backing_size: 1,
267 backing_mtime_ns: 0,
268 backing_ctime_ns: 0,
269 })
270 .unwrap();
271 db.conn
276 .pragma_update(None, "ignore_check_constraints", true)
277 .unwrap();
278 db.conn
279 .execute("UPDATE tracks SET audio_offset = -1 WHERE id = ?1", [id])
280 .unwrap();
281 db.conn
282 .pragma_update(None, "ignore_check_constraints", false)
283 .unwrap();
284 assert!(
285 db.get_track(id).is_err(),
286 "negative audio_offset must fail row-read, not wrap"
287 );
288 }
289
290 #[test]
291 fn out_of_range_bounds_error_at_row_read() {
292 let db = Db::open_in_memory().unwrap();
293 let id = db
294 .upsert_track(&NewTrack {
295 backing_path: "/x.flac".into(),
296 format: Format::Flac,
297 audio_offset: 0,
298 audio_length: 1,
299 backing_size: 1,
300 backing_mtime_ns: 0,
301 backing_ctime_ns: 0,
302 })
303 .unwrap();
304 db.conn
307 .pragma_update(None, "ignore_check_constraints", true)
308 .unwrap();
309 db.conn
310 .execute("UPDATE tracks SET audio_length = 5 WHERE id = ?1", [id])
311 .unwrap();
312 db.conn
313 .pragma_update(None, "ignore_check_constraints", false)
314 .unwrap();
315 assert!(
316 db.get_track(id).is_err(),
317 "audio_offset + audio_length > backing_size must fail row-read"
318 );
319 }
320}
321
322#[cfg(test)]
323mod render_key_tests {
324 use super::*;
325 use crate::{Format, NewTrack, Tag};
326
327 fn open_mem() -> Db {
328 Db::open_in_memory().unwrap()
329 }
330
331 fn new_track(path: &str, fmt: Format) -> NewTrack {
332 NewTrack {
333 backing_path: path.to_string(),
334 format: fmt,
335 audio_offset: 0,
336 audio_length: 1,
337 backing_size: 1,
338 backing_mtime_ns: 0,
339 backing_ctime_ns: 0,
340 }
341 }
342
343 #[test]
344 fn list_render_keys_returns_id_version_format_sorted_by_id() {
345 let db = open_mem();
346 let a = db
347 .upsert_track(&new_track("/a.flac", Format::Flac))
348 .unwrap();
349 let b = db.upsert_track(&new_track("/b.mp3", Format::Mp3)).unwrap();
350 db.replace_tags(a, &[Tag::new("TITLE", "x", 0)]).unwrap();
352
353 let keys = db.list_render_keys().unwrap();
354 assert_eq!(keys.len(), 2);
355 assert_eq!(keys[0].0, a);
356 assert_eq!(keys[1].0, b);
357 assert!(keys[0].1 >= 1, "a content_version should have risen");
358 assert_eq!(keys[1].1, 0, "b content_version untouched");
359 assert_eq!(keys[0].2, Format::Flac);
360 assert_eq!(keys[1].2, Format::Mp3);
361 }
362
363 #[test]
364 fn set_format_for_test_persists_the_new_format() {
365 let db = open_mem();
366 let id = db
367 .upsert_track(&new_track("/a.flac", Format::Flac))
368 .unwrap();
369 db.set_format_for_test(id, Format::Mp3).unwrap();
370 let keys = db.list_render_keys().unwrap();
371 assert_eq!(keys[0].0, id);
372 assert_eq!(
373 keys[0].2,
374 Format::Mp3,
375 "set_format_for_test must actually UPDATE the format column"
376 );
377 }
378
379 #[test]
385 fn begin_read_pins_a_single_wal_snapshot_against_external_writes() {
386 let dir = tempfile::tempdir().unwrap();
387 let path = dir.path().join("m.db");
388 let writer = Db::open(&path).unwrap();
389 let id = writer
390 .upsert_track(&new_track("/a.mp3", Format::Mp3))
391 .unwrap();
392 assert_eq!(writer.track_content_version(id).unwrap(), 0);
393
394 let reader = Db::open(&path).unwrap();
396 assert_eq!(reader.track_content_version(id).unwrap(), 0);
397
398 reader.begin_read().unwrap();
399 assert_eq!(reader.track_content_version(id).unwrap(), 0);
401
402 writer
404 .replace_tags(id, &[Tag::new("artist", "Alice", 0)])
405 .unwrap();
406 assert_eq!(
407 reader.track_content_version(id).unwrap(),
408 0,
409 "snapshot must pin to the pre-write content_version"
410 );
411 assert_eq!(writer.track_content_version(id).unwrap(), 1);
413
414 reader.end_read().unwrap();
415 assert_eq!(reader.track_content_version(id).unwrap(), 1);
417 }
418}