1use crate::models::{Format, NewTrack, Track, TrackBounds};
2use crate::{Db, ReadWrite, Result};
3use rusqlite::{Row, params};
4
5macro_rules! track_select {
10 ($tail:literal) => {
11 concat!(
12 "SELECT id, backing_path, format, audio_offset, audio_length, \
13 backing_size, backing_mtime_ns, backing_ctime_ns, content_version, updated_at, \
14 fingerprint, content_hash \
15 FROM tracks ",
16 $tail
17 )
18 };
19}
20
21fn parse_format_col(fmt: &str) -> rusqlite::Result<Format> {
24 fmt.parse::<Format>().ok().ok_or_else(|| {
25 rusqlite::Error::FromSqlConversionFailure(
26 usize::MAX,
27 rusqlite::types::Type::Text,
28 format!("unknown format {fmt}").into(),
29 )
30 })
31}
32
33fn row_to_track(r: &Row) -> rusqlite::Result<Track> {
34 let fmt: String = r.get("format")?;
35 let format = parse_format_col(&fmt)?;
36 let audio_offset: u64 = r.get("audio_offset")?;
37 let audio_length: u64 = r.get("audio_length")?;
38 let backing_size: u64 = r.get("backing_size")?;
39 let bounds = TrackBounds::new(audio_offset, audio_length, backing_size).map_err(|e| {
40 rusqlite::Error::FromSqlConversionFailure(
41 usize::MAX,
42 rusqlite::types::Type::Integer,
43 e.to_string().into(),
44 )
45 })?;
46 Ok(Track {
47 id: r.get("id")?,
48 backing_path: r.get("backing_path")?,
49 format,
50 bounds,
51 backing_size,
52 backing_mtime_ns: r.get("backing_mtime_ns")?,
53 backing_ctime_ns: r.get("backing_ctime_ns")?,
54 content_version: r.get("content_version")?,
55 updated_at: r.get("updated_at")?,
56 fingerprint: r.get("fingerprint")?,
57 content_hash: r.get("content_hash")?,
58 })
59}
60
61pub(crate) fn upsert_track_in(conn: &rusqlite::Connection, t: &NewTrack) -> Result<i64> {
65 Ok(conn.query_row(
66 "INSERT INTO tracks
67 (backing_path, format, audio_offset, audio_length, backing_size, backing_mtime_ns, backing_ctime_ns, updated_at)
68 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, CAST(strftime('%s','now') AS INTEGER))
69 ON CONFLICT(backing_path) DO UPDATE SET
70 format=excluded.format, audio_offset=excluded.audio_offset,
71 audio_length=excluded.audio_length, backing_size=excluded.backing_size,
72 backing_mtime_ns=excluded.backing_mtime_ns,
73 backing_ctime_ns=excluded.backing_ctime_ns,
74 updated_at=CAST(strftime('%s','now') AS INTEGER)
75 RETURNING id",
76 params![
77 t.backing_path,
78 t.format.as_str(),
79 t.audio_offset,
80 t.audio_length,
81 t.backing_size,
82 t.backing_mtime_ns,
83 t.backing_ctime_ns,
84 ],
85 |r| r.get(0),
86 )?)
87}
88
89pub(crate) fn get_track_by_path_in(
90 conn: &rusqlite::Connection,
91 path: &str,
92) -> Result<Option<Track>> {
93 crate::query_optional(
94 conn,
95 track_select!("WHERE backing_path = ?1"),
96 params![path],
97 |r| Ok(row_to_track(r)?),
98 )
99}
100
101pub(crate) fn tracks_by_fingerprint_in(
102 conn: &rusqlite::Connection,
103 fp: &str,
104) -> Result<Vec<Track>> {
105 let mut stmt = conn.prepare_cached(track_select!("WHERE fingerprint = ?1 ORDER BY id"))?;
106 let rows = stmt.query_map(params![fp], row_to_track)?;
107 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
108}
109
110pub(crate) fn set_track_checksums_in(
111 conn: &rusqlite::Connection,
112 id: i64,
113 fingerprint: Option<&str>,
114 content_hash: Option<&str>,
115) -> Result<()> {
116 conn.execute(
117 "UPDATE tracks SET
118 fingerprint = COALESCE(?2, fingerprint),
119 content_hash = COALESCE(?3, content_hash)
120 WHERE id = ?1",
121 params![id, fingerprint, content_hash],
122 )?;
123 Ok(())
124}
125
126#[allow(clippy::too_many_arguments)]
127pub(crate) fn retarget_track_in(
128 conn: &rusqlite::Connection,
129 id: i64,
130 new_backing_path: &str,
131 backing_size: u64,
132 backing_mtime_ns: i64,
133 backing_ctime_ns: i64,
134 audio_offset: u64,
135 audio_length: u64,
136 fingerprint: Option<&str>,
137 content_hash: Option<&str>,
138) -> Result<()> {
139 conn.execute(
140 "UPDATE tracks SET
141 backing_path = ?2,
142 backing_size = ?3,
143 backing_mtime_ns = ?4,
144 backing_ctime_ns = ?5,
145 audio_offset = ?6,
146 audio_length = ?7,
147 fingerprint = COALESCE(?8, fingerprint),
148 content_hash = COALESCE(?9, content_hash),
149 updated_at = CAST(strftime('%s','now') AS INTEGER)
150 WHERE id = ?1",
151 params![
152 id,
153 new_backing_path,
154 backing_size,
155 backing_mtime_ns,
156 backing_ctime_ns,
157 audio_offset,
158 audio_length,
159 fingerprint,
160 content_hash,
161 ],
162 )?;
163 Ok(())
164}
165
166#[derive(Debug, Default, PartialEq, Eq)]
170pub struct ChangelogRead {
171 pub changed_ids: Vec<i64>,
172 pub min_seq: i64,
173 pub max_seq: i64,
174}
175
176impl<M> Db<M> {
177 pub fn get_track(&self, id: i64) -> Result<Option<Track>> {
178 self.query_optional_track(track_select!("WHERE id = ?1"), params![id])
179 }
180
181 pub fn get_track_by_path(&self, path: &str) -> Result<Option<Track>> {
182 get_track_by_path_in(&self.conn, path)
183 }
184
185 pub fn list_tracks(&self) -> Result<Vec<Track>> {
186 let mut stmt = self.conn.prepare_cached(track_select!("ORDER BY id"))?;
187 let rows = stmt.query_map([], row_to_track)?;
188 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
189 }
190
191 pub fn track_content_version(&self, id: i64) -> Result<i64> {
192 Ok(self.conn.query_row(
193 "SELECT content_version FROM tracks WHERE id = ?1",
194 params![id],
195 |r| r.get(0),
196 )?)
197 }
198
199 pub fn track_version_and_path(&self, id: i64) -> Result<Option<(i64, String)>> {
204 crate::query_optional(
205 &self.conn,
206 "SELECT content_version, backing_path FROM tracks WHERE id = ?1",
207 params![id],
208 |r| Ok((r.get(0)?, r.get(1)?)),
209 )
210 }
211
212 pub fn begin_read(&self) -> Result<()> {
216 if !self.conn.is_autocommit() {
224 log::warn!(
225 "begin_read found a leaked read transaction on this connection; \
226 rolling it back (a prior end_read likely failed to release the snapshot)"
227 );
228 self.conn.execute_batch("ROLLBACK")?;
229 }
230 self.conn.execute_batch("BEGIN DEFERRED")?;
231 Ok(())
232 }
233
234 pub fn end_read(&self) -> Result<()> {
236 self.conn.execute_batch("ROLLBACK")?;
237 Ok(())
238 }
239
240 fn query_optional_track(&self, sql: &str, p: impl rusqlite::Params) -> Result<Option<Track>> {
241 crate::query_optional(&self.conn, sql, p, |r| Ok(row_to_track(r)?))
242 }
243
244 pub fn list_render_keys(&self) -> Result<Vec<(i64, i64, Format)>> {
248 let mut stmt = self
249 .conn
250 .prepare("SELECT id, content_version, format FROM tracks ORDER BY id")?;
251 let rows = stmt.query_map([], |r| {
252 let fmt: String = r.get(2)?;
253 Ok((
254 r.get::<_, i64>(0)?,
255 r.get::<_, i64>(1)?,
256 parse_format_col(&fmt)?,
257 ))
258 })?;
259 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
260 }
261
262 pub fn changelog_since(&self, last_seq: i64) -> Result<ChangelogRead> {
266 let tx = self.conn.unchecked_transaction()?;
271 let (min_seq, max_seq): (i64, i64) = tx.query_row(
272 "SELECT COALESCE(MIN(seq),0), COALESCE(MAX(seq),0) FROM track_changes",
273 [],
274 |r| Ok((r.get(0)?, r.get(1)?)),
275 )?;
276 let changed_ids = {
277 let mut stmt = tx.prepare(
278 "SELECT DISTINCT track_id FROM track_changes WHERE seq > ?1 ORDER BY track_id",
279 )?;
280 stmt.query_map([last_seq], |r| r.get(0))?
281 .collect::<rusqlite::Result<Vec<i64>>>()?
282 };
283 tx.commit()?;
284 Ok(ChangelogRead {
285 changed_ids,
286 min_seq,
287 max_seq,
288 })
289 }
290
291 pub fn render_keys_for(&self, ids: &[i64]) -> Result<Vec<(i64, i64, Format)>> {
294 let mut out = Vec::with_capacity(ids.len());
295 crate::query_in_chunks(
296 &self.conn,
297 ids,
298 |ph| {
299 format!(
300 "SELECT id, content_version, format FROM tracks \
301 WHERE id IN ({ph}) ORDER BY id"
302 )
303 },
304 |rows| {
305 while let Some(r) = rows.next()? {
306 let fmt: String = r.get(2)?;
307 out.push((
308 r.get::<_, i64>(0)?,
309 r.get::<_, i64>(1)?,
310 parse_format_col(&fmt)?,
311 ));
312 }
313 Ok(())
314 },
315 )?;
316 Ok(out)
317 }
318}
319
320impl Db<ReadWrite> {
321 pub fn upsert_track(&self, t: &NewTrack) -> Result<i64> {
322 upsert_track_in(&self.conn, t)
323 }
324
325 pub fn delete_track(&self, id: i64) -> Result<()> {
328 self.conn
329 .execute("DELETE FROM tracks WHERE id = ?1", params![id])?;
330 Ok(())
331 }
332
333 pub fn tracks_by_fingerprint(&self, fp: &str) -> Result<Vec<Track>> {
336 tracks_by_fingerprint_in(&self.conn, fp)
337 }
338
339 pub fn set_track_checksums(
343 &self,
344 id: i64,
345 fingerprint: Option<&str>,
346 content_hash: Option<&str>,
347 ) -> Result<()> {
348 set_track_checksums_in(&self.conn, id, fingerprint, content_hash)
349 }
350
351 #[allow(clippy::too_many_arguments)]
358 pub fn retarget_track(
359 &self,
360 id: i64,
361 new_backing_path: &str,
362 backing_size: u64,
363 backing_mtime_ns: i64,
364 backing_ctime_ns: i64,
365 audio_offset: u64,
366 audio_length: u64,
367 fingerprint: Option<&str>,
368 content_hash: Option<&str>,
369 ) -> Result<()> {
370 retarget_track_in(
371 &self.conn,
372 id,
373 new_backing_path,
374 backing_size,
375 backing_mtime_ns,
376 backing_ctime_ns,
377 audio_offset,
378 audio_length,
379 fingerprint,
380 content_hash,
381 )
382 }
383
384 #[doc(hidden)]
390 pub fn set_format_for_test(&self, id: i64, fmt: Format) -> Result<()> {
391 self.conn.execute(
392 "UPDATE tracks SET format = ?1, updated_at = CAST(strftime('%s','now') AS INTEGER) WHERE id = ?2",
393 params![fmt.as_str(), id],
394 )?;
395 Ok(())
396 }
397
398 #[doc(hidden)]
402 pub fn delete_changelog_through_for_test(&self, seq: i64) -> Result<()> {
403 self.conn
404 .execute("DELETE FROM track_changes WHERE seq <= ?1", [seq])?;
405 Ok(())
406 }
407}
408
409#[cfg(test)]
410mod negative_audio_bounds_tests {
411 use crate::{Db, Format, NewTrack};
412
413 #[test]
414 fn negative_audio_bounds_error_at_row_read() {
415 let db = Db::open_in_memory().unwrap();
416 let id = db
417 .upsert_track(&NewTrack {
418 backing_path: "/x.flac".into(),
419 format: Format::Flac,
420 audio_offset: 0,
421 audio_length: 1,
422 backing_size: 1,
423 backing_mtime_ns: 0,
424 backing_ctime_ns: 0,
425 })
426 .unwrap();
427 db.conn
432 .pragma_update(None, "ignore_check_constraints", true)
433 .unwrap();
434 db.conn
435 .execute("UPDATE tracks SET audio_offset = -1 WHERE id = ?1", [id])
436 .unwrap();
437 db.conn
438 .pragma_update(None, "ignore_check_constraints", false)
439 .unwrap();
440 assert!(
441 db.get_track(id).is_err(),
442 "negative audio_offset must fail row-read, not wrap"
443 );
444 }
445
446 #[test]
447 fn out_of_range_bounds_error_at_row_read() {
448 let db = Db::open_in_memory().unwrap();
449 let id = db
450 .upsert_track(&NewTrack {
451 backing_path: "/x.flac".into(),
452 format: Format::Flac,
453 audio_offset: 0,
454 audio_length: 1,
455 backing_size: 1,
456 backing_mtime_ns: 0,
457 backing_ctime_ns: 0,
458 })
459 .unwrap();
460 db.conn
463 .pragma_update(None, "ignore_check_constraints", true)
464 .unwrap();
465 db.conn
466 .execute("UPDATE tracks SET audio_length = 5 WHERE id = ?1", [id])
467 .unwrap();
468 db.conn
469 .pragma_update(None, "ignore_check_constraints", false)
470 .unwrap();
471 assert!(
472 db.get_track(id).is_err(),
473 "audio_offset + audio_length > backing_size must fail row-read"
474 );
475 }
476}
477
478#[cfg(test)]
479mod render_key_tests {
480 use super::*;
481 use crate::{Format, NewTrack, Tag};
482
483 fn open_mem() -> Db {
484 Db::open_in_memory().unwrap()
485 }
486
487 fn new_track(path: &str, fmt: Format) -> NewTrack {
488 NewTrack {
489 backing_path: path.to_string(),
490 format: fmt,
491 audio_offset: 0,
492 audio_length: 1,
493 backing_size: 1,
494 backing_mtime_ns: 0,
495 backing_ctime_ns: 0,
496 }
497 }
498
499 #[test]
500 fn list_render_keys_returns_id_version_format_sorted_by_id() {
501 let db = open_mem();
502 let a = db
503 .upsert_track(&new_track("/a.flac", Format::Flac))
504 .unwrap();
505 let b = db.upsert_track(&new_track("/b.mp3", Format::Mp3)).unwrap();
506 db.replace_tags(a, &[Tag::new("TITLE", "x", 0)]).unwrap();
508
509 let keys = db.list_render_keys().unwrap();
510 assert_eq!(keys.len(), 2);
511 assert_eq!(keys[0].0, a);
512 assert_eq!(keys[1].0, b);
513 assert!(keys[0].1 >= 1, "a content_version should have risen");
514 assert_eq!(keys[1].1, 0, "b content_version untouched");
515 assert_eq!(keys[0].2, Format::Flac);
516 assert_eq!(keys[1].2, Format::Mp3);
517 }
518
519 #[test]
520 fn set_format_for_test_persists_the_new_format() {
521 let db = open_mem();
522 let id = db
523 .upsert_track(&new_track("/a.flac", Format::Flac))
524 .unwrap();
525 db.set_format_for_test(id, Format::Mp3).unwrap();
526 let keys = db.list_render_keys().unwrap();
527 assert_eq!(keys[0].0, id);
528 assert_eq!(
529 keys[0].2,
530 Format::Mp3,
531 "set_format_for_test must actually UPDATE the format column"
532 );
533 }
534
535 #[test]
541 fn begin_read_pins_a_single_wal_snapshot_against_external_writes() {
542 let dir = tempfile::tempdir().unwrap();
543 let path = dir.path().join("m.db");
544 let writer = Db::open(&path).unwrap();
545 let id = writer
546 .upsert_track(&new_track("/a.mp3", Format::Mp3))
547 .unwrap();
548 assert_eq!(writer.track_content_version(id).unwrap(), 0);
549
550 let reader = Db::open(&path).unwrap();
552 assert_eq!(reader.track_content_version(id).unwrap(), 0);
553
554 reader.begin_read().unwrap();
555 assert_eq!(reader.track_content_version(id).unwrap(), 0);
557
558 writer
560 .replace_tags(id, &[Tag::new("artist", "Alice", 0)])
561 .unwrap();
562 assert_eq!(
563 reader.track_content_version(id).unwrap(),
564 0,
565 "snapshot must pin to the pre-write content_version"
566 );
567 assert_eq!(writer.track_content_version(id).unwrap(), 1);
569
570 reader.end_read().unwrap();
571 assert_eq!(reader.track_content_version(id).unwrap(), 1);
573 }
574
575 #[test]
581 fn begin_read_self_heals_a_leaked_prior_snapshot() {
582 let db = open_mem();
583 db.begin_read().unwrap();
584 assert!(
586 db.begin_read().is_ok(),
587 "a leaked read snapshot must self-heal, not surface an opaque error"
588 );
589 db.end_read().unwrap();
590 }
591}
592
593#[cfg(test)]
594mod checksum_tests {
595 use crate::{Db, NewTrack, models::Format};
596
597 fn new_track(path: &str) -> NewTrack {
598 NewTrack {
599 backing_path: path.to_string(),
600 format: Format::Flac,
601 audio_offset: 0,
602 audio_length: 10,
603 backing_size: 10,
604 backing_mtime_ns: 0,
605 backing_ctime_ns: 0,
606 }
607 }
608
609 #[test]
610 fn set_and_read_back_checksums() {
611 let db = Db::open_in_memory().unwrap();
612 let id = db.upsert_track(&new_track("/a.flac")).unwrap();
613 db.set_track_checksums(id, Some(&"a".repeat(64)), Some(&"d".repeat(64)))
614 .unwrap();
615 let t = db.get_track(id).unwrap().unwrap();
616 assert_eq!(t.fingerprint.as_deref(), Some(&"a".repeat(64)[..]));
617 assert_eq!(t.content_hash.as_deref(), Some(&"d".repeat(64)[..]));
618 }
619
620 #[test]
621 fn set_checksums_none_does_not_clobber_existing() {
622 let db = Db::open_in_memory().unwrap();
623 let id = db.upsert_track(&new_track("/a.flac")).unwrap();
624 db.set_track_checksums(id, Some(&"a".repeat(64)), Some(&"d".repeat(64)))
625 .unwrap();
626 db.set_track_checksums(id, None, None).unwrap();
628 let t = db.get_track(id).unwrap().unwrap();
629 assert_eq!(t.fingerprint.as_deref(), Some(&"a".repeat(64)[..]));
630 assert_eq!(t.content_hash.as_deref(), Some(&"d".repeat(64)[..]));
631 }
632
633 #[test]
634 fn tracks_by_fingerprint_returns_matches() {
635 let db = Db::open_in_memory().unwrap();
636 let a = db.upsert_track(&new_track("/a.flac")).unwrap();
637 let b = db.upsert_track(&new_track("/b.flac")).unwrap();
638 db.set_track_checksums(a, Some(&"b".repeat(64)), None)
639 .unwrap();
640 db.set_track_checksums(b, Some(&"b".repeat(64)), None)
641 .unwrap();
642 db.upsert_track(&new_track("/c.flac")).unwrap(); let mut ids: Vec<i64> = db
644 .tracks_by_fingerprint(&"b".repeat(64))
645 .unwrap()
646 .into_iter()
647 .map(|t| t.id)
648 .collect();
649 ids.sort_unstable();
650 assert_eq!(ids, vec![a, b]);
651 assert!(
652 db.tracks_by_fingerprint(&"c".repeat(64))
653 .unwrap()
654 .is_empty()
655 );
656 }
657
658 #[test]
659 fn retarget_updates_path_stamp_and_bounds_keeping_id() {
660 let db = Db::open_in_memory().unwrap();
661 let id = db.upsert_track(&new_track("/old.flac")).unwrap();
662 db.set_track_checksums(id, Some(&"a".repeat(64)), None)
663 .unwrap();
664 db.retarget_track(
665 id,
666 "/new.flac",
667 99,
668 1234,
669 5678,
670 42,
671 50,
672 None,
673 Some(&"e".repeat(64)),
674 )
675 .unwrap();
676 let t = db.get_track(id).unwrap().unwrap();
677 assert_eq!(t.id, id);
678 assert_eq!(t.backing_path, "/new.flac");
679 assert_eq!(t.backing_size, 99);
680 assert_eq!(t.backing_mtime_ns, 1234);
681 assert_eq!(t.backing_ctime_ns, 5678);
682 assert_eq!(t.bounds.audio_offset(), 42);
683 assert_eq!(t.bounds.audio_length(), 50);
684 assert_eq!(t.fingerprint.as_deref(), Some(&"a".repeat(64)[..])); assert_eq!(t.content_hash.as_deref(), Some(&"e".repeat(64)[..]));
686 assert!(db.get_track_by_path("/old.flac").unwrap().is_none());
687 }
688
689 #[test]
693 fn bulk_writer_reads_back_fingerprint_and_path() {
694 let db = Db::open_in_memory().unwrap();
695 let fp = "f".repeat(64);
696 let mut bw = db.bulk_writer().unwrap();
697 let id = bw.upsert_track(&new_track("/x.flac")).unwrap();
698 bw.set_track_checksums(id, Some(&fp), None).unwrap();
699
700 let by_fp = bw.tracks_by_fingerprint(&fp).unwrap();
701 assert_eq!(by_fp.len(), 1, "fingerprint match must be returned");
702 assert_eq!(by_fp[0].id, id);
703
704 let by_path = bw.get_track_by_path("/x.flac").unwrap();
705 assert_eq!(by_path.map(|t| t.id), Some(id), "path lookup must hit");
706
707 bw.commit().unwrap();
708 }
709
710 #[test]
711 fn bulk_writer_retarget_and_checksums_match_db() {
712 let db = Db::open_in_memory().unwrap();
713 let id = {
714 let mut bw = db.bulk_writer().unwrap();
715 let id = bw.upsert_track(&new_track("/old.flac")).unwrap();
716 bw.set_track_checksums(id, Some(&"a".repeat(64)), None)
717 .unwrap();
718 bw.retarget_track(id, "/new.flac", 10, 1, 2, 0, 10, None, None)
719 .unwrap();
720 bw.commit().unwrap();
721 id
722 };
723 let t = db.get_track(id).unwrap().unwrap();
724 assert_eq!(t.backing_path, "/new.flac");
725 assert_eq!(t.fingerprint.as_deref(), Some(&"a".repeat(64)[..]));
726 }
727}