Skip to main content

musefs_db/
bulk.rs

1use crate::art::sha256_hex;
2use crate::models::{BinaryTag, NewArt, NewTrack, StructuralBlock, Tag, TrackArt};
3use crate::{Db, ReadWrite, Result};
4use rusqlite::{Transaction, params};
5
6impl Db<ReadWrite> {
7    /// Apply the bulk-write pragmas to an open connection. WAL is left untouched
8    /// (retained from `open`), so concurrent mount readers keep working. Safe on
9    /// in-memory DBs. Intended for a scan-scoped `Db` the caller drops at scan end.
10    pub(crate) fn apply_bulk_pragmas(conn: &rusqlite::Connection) -> Result<()> {
11        conn.pragma_update(None, "synchronous", "NORMAL")?;
12        conn.pragma_update(None, "cache_size", -65536)?; // 64 MiB
13        conn.pragma_update(None, "temp_store", "MEMORY")?;
14        Ok(())
15    }
16
17    /// Apply bulk pragmas to this DB's own connection.
18    pub fn apply_bulk_pragmas_self(&self) -> Result<()> {
19        Self::apply_bulk_pragmas(&self.conn)
20    }
21
22    /// Begin a batch transaction. All writes go through the returned handle and
23    /// land atomically on `commit()`.
24    pub fn bulk_writer(&self) -> Result<BulkWriter<'_>> {
25        Ok(BulkWriter {
26            tx: self.conn.unchecked_transaction()?,
27        })
28    }
29}
30
31/// A batch of track writes held in one transaction. Mirrors `Db::upsert_track` /
32/// `replace_tags` / `upsert_art` / `set_track_art`, but executes on a single
33/// caller-held transaction so a whole batch commits with one fsync.
34pub struct BulkWriter<'c> {
35    tx: Transaction<'c>,
36}
37
38impl BulkWriter<'_> {
39    pub fn upsert_track(&mut self, t: &NewTrack) -> Result<i64> {
40        self.tx.execute(
41            "INSERT INTO tracks
42                (backing_path, format, audio_offset, audio_length, backing_size, backing_mtime_ns, backing_ctime_ns, updated_at)
43             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, CAST(strftime('%s','now') AS INTEGER))
44             ON CONFLICT(backing_path) DO UPDATE SET
45                format=excluded.format, audio_offset=excluded.audio_offset,
46                audio_length=excluded.audio_length, backing_size=excluded.backing_size,
47                backing_mtime_ns=excluded.backing_mtime_ns,
48                backing_ctime_ns=excluded.backing_ctime_ns,
49                updated_at=CAST(strftime('%s','now') AS INTEGER)",
50            params![t.backing_path, t.format.as_str(), t.audio_offset, t.audio_length, t.backing_size, t.backing_mtime_ns, t.backing_ctime_ns],
51        )?;
52        Ok(self.tx.query_row(
53            "SELECT id FROM tracks WHERE backing_path = ?1",
54            params![t.backing_path],
55            |r| r.get(0),
56        )?)
57    }
58
59    pub fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> Result<()> {
60        self.tx.execute(
61            "DELETE FROM tags WHERE track_id = ?1 AND value_blob IS NULL",
62            params![track_id],
63        )?;
64        let mut stmt = self.tx.prepare_cached(
65            "INSERT INTO tags (track_id, key, value, ordinal) VALUES (?1, ?2, ?3, ?4)",
66        )?;
67        for t in tags {
68            stmt.execute(params![track_id, t.key, t.value, t.ordinal])?;
69        }
70        Ok(())
71    }
72
73    pub fn set_binary_tags(&mut self, track_id: i64, tags: &[BinaryTag]) -> Result<()> {
74        self.tx.execute(
75            "DELETE FROM tags WHERE track_id = ?1 AND value_blob IS NOT NULL",
76            params![track_id],
77        )?;
78        let mut stmt = self.tx.prepare(
79            "INSERT INTO tags (track_id, key, value, value_blob, ordinal) \
80             VALUES (?1, ?2, '', ?3, ?4)",
81        )?;
82        for t in tags {
83            stmt.execute(params![track_id, t.key, t.payload, t.ordinal])?;
84        }
85        Ok(())
86    }
87
88    pub fn set_structural_blocks(
89        &mut self,
90        track_id: i64,
91        blocks: &[StructuralBlock],
92    ) -> Result<()> {
93        self.tx.execute(
94            "DELETE FROM structural_blocks WHERE track_id = ?1",
95            params![track_id],
96        )?;
97        let mut stmt = self.tx.prepare(
98            "INSERT INTO structural_blocks (track_id, kind, ordinal, body) \
99             VALUES (?1, ?2, ?3, ?4)",
100        )?;
101        for b in blocks {
102            stmt.execute(params![track_id, b.kind, b.ordinal, b.body])?;
103        }
104        Ok(())
105    }
106
107    pub fn upsert_art(&mut self, a: &NewArt) -> Result<i64> {
108        let sha = sha256_hex(&a.data);
109        self.tx.execute(
110            "INSERT INTO art (sha256, mime, width, height, byte_len, data)
111             VALUES (?1, ?2, ?3, ?4, ?5, ?6) ON CONFLICT(sha256) DO NOTHING",
112            params![sha, a.mime, a.width, a.height, a.data.len() as u64, a.data],
113        )?;
114        Ok(self
115            .tx
116            .query_row("SELECT id FROM art WHERE sha256 = ?1", params![sha], |r| {
117                r.get(0)
118            })?)
119    }
120
121    pub fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> Result<()> {
122        self.tx.execute(
123            "DELETE FROM track_art WHERE track_id = ?1",
124            params![track_id],
125        )?;
126        let mut stmt = self.tx.prepare_cached(
127            "INSERT INTO track_art (track_id, art_id, picture_type, description, ordinal)
128             VALUES (?1, ?2, ?3, ?4, ?5)",
129        )?;
130        for it in items {
131            stmt.execute(params![
132                track_id,
133                it.art_id,
134                it.picture_type,
135                it.description,
136                it.ordinal
137            ])?;
138        }
139        Ok(())
140    }
141
142    pub fn commit(self) -> Result<()> {
143        self.tx.commit()?;
144        Ok(())
145    }
146}
147
148#[cfg(test)]
149mod tests {
150    use crate::Db;
151    use crate::models::{Format, NewArt, NewTrack, Tag, TrackArt};
152
153    #[test]
154    fn bulk_writer_persists_a_batch_in_one_commit() {
155        let db = Db::open_in_memory().unwrap();
156        {
157            let mut bw = db.bulk_writer().unwrap();
158            for i in 0..3 {
159                let id = bw
160                    .upsert_track(&NewTrack {
161                        backing_path: format!("/m/{i}.flac"),
162                        format: Format::Flac,
163                        audio_offset: 100,
164                        audio_length: 200,
165                        backing_size: 300,
166                        backing_mtime_ns: 1,
167                        backing_ctime_ns: 0,
168                    })
169                    .unwrap();
170                bw.replace_tags(id, &[Tag::new("title", &format!("t{i}"), 0)])
171                    .unwrap();
172                let art_id = bw
173                    .upsert_art(&NewArt {
174                        mime: "image/png".into(),
175                        width: None,
176                        height: None,
177                        data: vec![1, 2, 3, 4],
178                    })
179                    .unwrap();
180                bw.set_track_art(
181                    id,
182                    &[TrackArt {
183                        art_id,
184                        picture_type: 3,
185                        description: String::new(),
186                        ordinal: 0,
187                    }],
188                )
189                .unwrap();
190            }
191            bw.commit().unwrap();
192        }
193        assert_eq!(db.list_tracks().unwrap().len(), 3);
194        // Dedup: identical art blob stored once.
195        let count: i64 = db
196            .conn
197            .query_row("SELECT COUNT(*) FROM art", [], |r| r.get(0))
198            .unwrap();
199        assert_eq!(count, 1);
200        // replace_tags actually persisted one tag per track (kills no-op replace_tags).
201        let tag_count: i64 = db
202            .conn
203            .query_row("SELECT COUNT(*) FROM tags", [], |r| r.get(0))
204            .unwrap();
205        assert_eq!(tag_count, 3);
206        let title0: String = db
207            .conn
208            .query_row(
209                "SELECT value FROM tags WHERE key = 'title' ORDER BY value LIMIT 1",
210                [],
211                |r| r.get(0),
212            )
213            .unwrap();
214        assert_eq!(title0, "t0");
215        // set_track_art actually persisted one link per track (kills no-op set_track_art).
216        let track_art_count: i64 = db
217            .conn
218            .query_row("SELECT COUNT(*) FROM track_art", [], |r| r.get(0))
219            .unwrap();
220        assert_eq!(track_art_count, 3);
221    }
222
223    #[test]
224    fn sha256_hex_matches_known_digest() {
225        // NIST sample vector: sha256("abc").
226        assert_eq!(
227            super::sha256_hex(b"abc"),
228            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
229        );
230    }
231
232    #[test]
233    fn apply_bulk_pragmas_self_sets_non_default_pragmas() {
234        let db = Db::open_in_memory().unwrap();
235        db.apply_bulk_pragmas_self().unwrap();
236        // synchronous NORMAL == 1 (default for in-memory is FULL == 2).
237        let synchronous: i64 = db
238            .conn
239            .pragma_query_value(None, "synchronous", |r| r.get(0))
240            .unwrap();
241        assert_eq!(synchronous, 1);
242        // cache_size == -65536 (negative => KiB; sign matters, default is -2000).
243        let cache_size: i64 = db
244            .conn
245            .pragma_query_value(None, "cache_size", |r| r.get(0))
246            .unwrap();
247        assert_eq!(cache_size, -65536);
248        // temp_store MEMORY == 2 (default is 0).
249        let temp_store: i64 = db
250            .conn
251            .pragma_query_value(None, "temp_store", |r| r.get(0))
252            .unwrap();
253        assert_eq!(temp_store, 2);
254    }
255
256    #[test]
257    fn bulk_replace_tags_preserves_binary_rows() {
258        let db = Db::open_in_memory().unwrap();
259        let tid = db
260            .upsert_track(&crate::NewTrack {
261                backing_path: "/a.mp3".into(),
262                format: crate::Format::Mp3,
263                audio_offset: 0,
264                audio_length: 0,
265                backing_size: 0,
266                backing_mtime_ns: 0,
267                backing_ctime_ns: 0,
268            })
269            .unwrap();
270        db.set_binary_tags(
271            tid,
272            &[crate::BinaryTag {
273                key: "PRIV".into(),
274                payload: vec![1, 2, 3],
275                ordinal: 0,
276            }],
277        )
278        .unwrap();
279
280        {
281            let mut bw = db.bulk_writer().unwrap();
282            bw.replace_tags(tid, &[crate::Tag::new("artist", "A", 0)])
283                .unwrap();
284            bw.commit().unwrap();
285        }
286
287        assert_eq!(
288            db.get_binary_tags(tid).unwrap().len(),
289            1,
290            "bulk replace_tags wiped binary rows"
291        );
292        assert_eq!(
293            db.get_tags(tid).unwrap(),
294            vec![crate::Tag::new("artist", "A", 0)]
295        );
296    }
297
298    #[test]
299    fn bulk_set_binary_tags_round_trips_and_scopes_to_binary_rows() {
300        let db = Db::open_in_memory().unwrap();
301        let tid = db
302            .upsert_track(&crate::NewTrack {
303                backing_path: "/a.mp3".into(),
304                format: crate::Format::Mp3,
305                audio_offset: 0,
306                audio_length: 0,
307                backing_size: 0,
308                backing_mtime_ns: 0,
309                backing_ctime_ns: 0,
310            })
311            .unwrap();
312        {
313            let mut bw = db.bulk_writer().unwrap();
314            bw.replace_tags(tid, &[crate::Tag::new("artist", "A", 0)])
315                .unwrap();
316            bw.set_binary_tags(
317                tid,
318                &[crate::BinaryTag {
319                    key: "PRIV".into(),
320                    payload: vec![7, 7, 7],
321                    ordinal: 0,
322                }],
323            )
324            .unwrap();
325            bw.commit().unwrap();
326        }
327        let rows = db.get_binary_tags(tid).unwrap();
328        assert_eq!(rows.len(), 1);
329        assert_eq!(rows[0].key, "PRIV");
330        assert_eq!(rows[0].byte_len, 3);
331        assert_eq!(
332            db.get_tags(tid).unwrap(),
333            vec![crate::Tag::new("artist", "A", 0)]
334        );
335    }
336
337    #[test]
338    fn bulk_set_structural_blocks_round_trips() {
339        use crate::StructuralBlock;
340        let db = Db::open_in_memory().unwrap();
341        let id = {
342            let mut bw = db.bulk_writer().unwrap();
343            let id = bw
344                .upsert_track(&NewTrack {
345                    backing_path: "/a.flac".into(),
346                    format: Format::Flac,
347                    audio_offset: 0,
348                    audio_length: 1,
349                    backing_size: 1,
350                    backing_mtime_ns: 0,
351                    backing_ctime_ns: 0,
352                })
353                .unwrap();
354            bw.set_structural_blocks(
355                id,
356                &[
357                    StructuralBlock {
358                        kind: "STREAMINFO".into(),
359                        ordinal: 0,
360                        body: vec![1, 2],
361                    },
362                    StructuralBlock {
363                        kind: "SEEKTABLE".into(),
364                        ordinal: 0,
365                        body: vec![3],
366                    },
367                ],
368            )
369            .unwrap();
370            bw.commit().unwrap();
371            id
372        };
373        let got = db.get_structural_blocks(id).unwrap();
374        assert_eq!(got.len(), 2);
375        // get_structural_blocks orders by kind: SEEKTABLE before STREAMINFO.
376        assert_eq!(got[0].kind, "SEEKTABLE");
377        assert_eq!(got[1].body, vec![1, 2]);
378    }
379
380    #[test]
381    fn bulk_writer_dropped_without_commit_rolls_back() {
382        let db = Db::open_in_memory().unwrap();
383        {
384            let mut bw = db.bulk_writer().unwrap();
385            bw.upsert_track(&NewTrack {
386                backing_path: "/m/ghost.flac".into(),
387                format: Format::Flac,
388                audio_offset: 0,
389                audio_length: 0,
390                backing_size: 0,
391                backing_mtime_ns: 0,
392                backing_ctime_ns: 0,
393            })
394            .unwrap();
395            // Dropped here without `commit()` → Transaction rolls back.
396        }
397        assert_eq!(db.list_tracks().unwrap().len(), 0);
398    }
399}