Skip to main content

musefs_db/
bulk.rs

1use crate::art::{set_track_art_in, upsert_art_in};
2use crate::models::{BinaryTag, NewArt, NewTrack, StructuralBlock, Tag, Track, TrackArt};
3use crate::structural::set_structural_blocks_in;
4use crate::tags::{replace_tags_in, set_binary_tags_in};
5use crate::tracks::{
6    get_track_by_path_in, retarget_track_in, set_track_checksums_in, tracks_by_fingerprint_in,
7    upsert_track_in,
8};
9use crate::{Db, ReadWrite, Result};
10use rusqlite::Transaction;
11
12impl Db<ReadWrite> {
13    /// Apply the bulk-write pragmas to an open connection. WAL is left untouched
14    /// (retained from `open`), so concurrent mount readers keep working. Safe on
15    /// in-memory DBs. Intended for a scan-scoped `Db` the caller drops at scan end.
16    pub(crate) fn apply_bulk_pragmas(conn: &rusqlite::Connection) -> Result<()> {
17        conn.pragma_update(None, "synchronous", "NORMAL")?;
18        conn.pragma_update(None, "cache_size", -65536)?; // 64 MiB
19        conn.pragma_update(None, "temp_store", "MEMORY")?;
20        Ok(())
21    }
22
23    /// Apply bulk pragmas to this DB's own connection.
24    pub fn apply_bulk_pragmas_self(&self) -> Result<()> {
25        Self::apply_bulk_pragmas(&self.conn)
26    }
27
28    /// Begin a batch transaction. All writes go through the returned handle and
29    /// land atomically on `commit()`.
30    pub fn bulk_writer(&self) -> Result<BulkWriter<'_>> {
31        Ok(BulkWriter {
32            tx: self.conn.unchecked_transaction()?,
33        })
34    }
35}
36
37/// A batch of track writes held in one transaction. Each method delegates to the
38/// shared per-row writer helper (`upsert_track_in` / `replace_tags_in` /
39/// `set_binary_tags_in` / `set_structural_blocks_in` / `upsert_art_in` /
40/// `set_track_art_in`) that also backs the `Db<ReadWrite>` writers, but runs it
41/// on a single caller-held transaction so a whole batch commits with one fsync.
42pub struct BulkWriter<'c> {
43    tx: Transaction<'c>,
44}
45
46impl BulkWriter<'_> {
47    pub fn upsert_track(&mut self, t: &NewTrack) -> Result<i64> {
48        upsert_track_in(&self.tx, t)
49    }
50
51    pub fn tracks_by_fingerprint(&self, fp: &str) -> Result<Vec<Track>> {
52        tracks_by_fingerprint_in(&self.tx, fp)
53    }
54
55    pub fn set_track_checksums(
56        &self,
57        id: i64,
58        fingerprint: Option<&str>,
59        content_hash: Option<&str>,
60    ) -> Result<()> {
61        set_track_checksums_in(&self.tx, id, fingerprint, content_hash)
62    }
63
64    #[allow(clippy::too_many_arguments)]
65    pub fn retarget_track(
66        &self,
67        id: i64,
68        new_backing_path: &str,
69        backing_size: u64,
70        backing_mtime_ns: i64,
71        backing_ctime_ns: i64,
72        audio_offset: u64,
73        audio_length: u64,
74        fingerprint: Option<&str>,
75        content_hash: Option<&str>,
76    ) -> Result<()> {
77        retarget_track_in(
78            &self.tx,
79            id,
80            new_backing_path,
81            backing_size,
82            backing_mtime_ns,
83            backing_ctime_ns,
84            audio_offset,
85            audio_length,
86            fingerprint,
87            content_hash,
88        )
89    }
90
91    pub fn get_track_by_path(&self, path: &str) -> Result<Option<Track>> {
92        get_track_by_path_in(&self.tx, path)
93    }
94
95    pub fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> Result<()> {
96        replace_tags_in(&self.tx, track_id, tags)
97    }
98
99    pub fn set_binary_tags(&mut self, track_id: i64, tags: &[BinaryTag]) -> Result<()> {
100        set_binary_tags_in(&self.tx, track_id, tags)
101    }
102
103    pub fn set_structural_blocks(
104        &mut self,
105        track_id: i64,
106        blocks: &[StructuralBlock],
107    ) -> Result<()> {
108        set_structural_blocks_in(&self.tx, track_id, blocks)
109    }
110
111    pub fn upsert_art(&mut self, a: &NewArt) -> Result<i64> {
112        upsert_art_in(&self.tx, a)
113    }
114
115    pub fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> Result<()> {
116        set_track_art_in(&self.tx, track_id, items)
117    }
118
119    pub fn commit(self) -> Result<()> {
120        self.tx.commit()?;
121        Ok(())
122    }
123}
124
#[cfg(test)]
mod tests {
    use crate::Db;
    use crate::models::{BinaryTag, Format, NewArt, NewTrack, StructuralBlock, Tag, TrackArt};

    /// Build a `NewTrack` for `path` whose offsets, sizes and timestamps are all
    /// zero. Tests that need other values override them with struct-update syntax.
    fn blank_track(path: &str, format: Format) -> NewTrack {
        NewTrack {
            backing_path: path.into(),
            format,
            audio_offset: 0,
            audio_length: 0,
            backing_size: 0,
            backing_mtime_ns: 0,
            backing_ctime_ns: 0,
        }
    }

    /// Run a parameterless query that yields a single integer (e.g. `COUNT(*)`).
    fn scalar(conn: &rusqlite::Connection, sql: &str) -> i64 {
        conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    #[test]
    fn bulk_writer_persists_a_batch_in_one_commit() {
        let db = Db::open_in_memory().unwrap();

        let mut writer = db.bulk_writer().unwrap();
        for i in 0..3 {
            let track = NewTrack {
                backing_path: format!("/m/{i}.flac"),
                format: Format::Flac,
                audio_offset: 100,
                audio_length: 200,
                backing_size: 300,
                backing_mtime_ns: 1,
                backing_ctime_ns: 0,
            };
            let track_id = writer.upsert_track(&track).unwrap();

            writer
                .replace_tags(track_id, &[Tag::new("title", &format!("t{i}"), 0)])
                .unwrap();

            // The same blob on every iteration, so the art table must dedup it.
            let cover = NewArt {
                mime: "image/png".into(),
                width: None,
                height: None,
                data: vec![1, 2, 3, 4],
            };
            let art_id = writer.upsert_art(&cover).unwrap();

            let links = [TrackArt {
                art_id,
                picture_type: 3,
                description: String::new(),
                ordinal: 0,
            }];
            writer.set_track_art(track_id, &links).unwrap();
        }
        writer.commit().unwrap();

        assert_eq!(db.list_tracks().unwrap().len(), 3);
        // Dedup: identical art blob stored once.
        assert_eq!(scalar(&db.conn, "SELECT COUNT(*) FROM art"), 1);
        // replace_tags actually persisted one tag per track (kills no-op replace_tags).
        assert_eq!(scalar(&db.conn, "SELECT COUNT(*) FROM tags"), 3);
        let first_title: String = db
            .conn
            .query_row(
                "SELECT value FROM tags WHERE key = 'title' ORDER BY value LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(first_title, "t0");
        // set_track_art actually persisted one link per track (kills no-op set_track_art).
        assert_eq!(scalar(&db.conn, "SELECT COUNT(*) FROM track_art"), 3);
    }

    #[test]
    fn apply_bulk_pragmas_self_sets_non_default_pragmas() {
        let db = Db::open_in_memory().unwrap();
        db.apply_bulk_pragmas_self().unwrap();

        // Read one integer-valued pragma back from the connection.
        let pragma = |name: &str| -> i64 {
            db.conn
                .pragma_query_value(None, name, |row| row.get(0))
                .unwrap()
        };

        // synchronous NORMAL == 1 (default for in-memory is FULL == 2).
        assert_eq!(pragma("synchronous"), 1);
        // cache_size == -65536 (negative => KiB; sign matters, default is -2000).
        assert_eq!(pragma("cache_size"), -65536);
        // temp_store MEMORY == 2 (default is 0).
        assert_eq!(pragma("temp_store"), 2);
    }

    #[test]
    fn bulk_replace_tags_preserves_binary_rows() {
        let db = Db::open_in_memory().unwrap();
        let track_id = db
            .upsert_track(&blank_track("/a.mp3", Format::Mp3))
            .unwrap();

        // Seed a binary row through the non-bulk writer first.
        let seeded = [BinaryTag {
            key: "PRIV".into(),
            payload: vec![1, 2, 3],
            ordinal: 0,
        }];
        db.set_binary_tags(track_id, &seeded).unwrap();

        let mut writer = db.bulk_writer().unwrap();
        writer
            .replace_tags(track_id, &[Tag::new("artist", "A", 0)])
            .unwrap();
        writer.commit().unwrap();

        assert_eq!(
            db.get_binary_tags(track_id).unwrap().len(),
            1,
            "bulk replace_tags wiped binary rows"
        );
        assert_eq!(
            db.get_tags(track_id).unwrap(),
            vec![Tag::new("artist", "A", 0)]
        );
    }

    #[test]
    fn bulk_set_binary_tags_round_trips_and_scopes_to_binary_rows() {
        let db = Db::open_in_memory().unwrap();
        let track_id = db
            .upsert_track(&blank_track("/a.mp3", Format::Mp3))
            .unwrap();

        let mut writer = db.bulk_writer().unwrap();
        writer
            .replace_tags(track_id, &[Tag::new("artist", "A", 0)])
            .unwrap();
        let binary = [BinaryTag {
            key: "PRIV".into(),
            payload: vec![7, 7, 7],
            ordinal: 0,
        }];
        writer.set_binary_tags(track_id, &binary).unwrap();
        writer.commit().unwrap();

        let stored = db.get_binary_tags(track_id).unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].key, "PRIV");
        assert_eq!(stored[0].byte_len, 3);
        assert_eq!(
            db.get_tags(track_id).unwrap(),
            vec![Tag::new("artist", "A", 0)]
        );
    }

    #[test]
    fn bulk_set_structural_blocks_round_trips() {
        let db = Db::open_in_memory().unwrap();

        let mut writer = db.bulk_writer().unwrap();
        let track = NewTrack {
            audio_length: 1,
            backing_size: 1,
            ..blank_track("/a.flac", Format::Flac)
        };
        let track_id = writer.upsert_track(&track).unwrap();
        let blocks = [
            StructuralBlock {
                kind: "STREAMINFO".into(),
                ordinal: 0,
                body: vec![1, 2],
            },
            StructuralBlock {
                kind: "SEEKTABLE".into(),
                ordinal: 0,
                body: vec![3],
            },
        ];
        writer.set_structural_blocks(track_id, &blocks).unwrap();
        writer.commit().unwrap();

        let stored = db.get_structural_blocks(track_id).unwrap();
        assert_eq!(stored.len(), 2);
        // get_structural_blocks orders by kind: SEEKTABLE before STREAMINFO.
        assert_eq!(stored[0].kind, "SEEKTABLE");
        assert_eq!(stored[1].body, vec![1, 2]);
    }

    #[test]
    fn bulk_writer_dropped_without_commit_rolls_back() {
        let db = Db::open_in_memory().unwrap();

        let mut writer = db.bulk_writer().unwrap();
        writer
            .upsert_track(&blank_track("/m/ghost.flac", Format::Flac))
            .unwrap();
        // Dropped without `commit()` → Transaction rolls back.
        drop(writer);

        assert_eq!(db.list_tracks().unwrap().len(), 0);
    }
}