1use crate::art::sha256_hex;
2use crate::models::{BinaryTag, NewArt, NewTrack, StructuralBlock, Tag, TrackArt};
3use crate::{Db, ReadWrite, Result};
4use rusqlite::{Transaction, params};
5
6impl Db<ReadWrite> {
7 pub(crate) fn apply_bulk_pragmas(conn: &rusqlite::Connection) -> Result<()> {
11 conn.pragma_update(None, "synchronous", "NORMAL")?;
12 conn.pragma_update(None, "cache_size", -65536)?; conn.pragma_update(None, "temp_store", "MEMORY")?;
14 Ok(())
15 }
16
17 pub fn apply_bulk_pragmas_self(&self) -> Result<()> {
19 Self::apply_bulk_pragmas(&self.conn)
20 }
21
22 pub fn bulk_writer(&self) -> Result<BulkWriter<'_>> {
25 Ok(BulkWriter {
26 tx: self.conn.unchecked_transaction()?,
27 })
28 }
29}
30
31pub struct BulkWriter<'c> {
35 tx: Transaction<'c>,
36}
37
38impl BulkWriter<'_> {
39 pub fn upsert_track(&mut self, t: &NewTrack) -> Result<i64> {
40 self.tx.execute(
41 "INSERT INTO tracks
42 (backing_path, format, audio_offset, audio_length, backing_size, backing_mtime_ns, backing_ctime_ns, updated_at)
43 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, CAST(strftime('%s','now') AS INTEGER))
44 ON CONFLICT(backing_path) DO UPDATE SET
45 format=excluded.format, audio_offset=excluded.audio_offset,
46 audio_length=excluded.audio_length, backing_size=excluded.backing_size,
47 backing_mtime_ns=excluded.backing_mtime_ns,
48 backing_ctime_ns=excluded.backing_ctime_ns,
49 updated_at=CAST(strftime('%s','now') AS INTEGER)",
50 params![t.backing_path, t.format.as_str(), t.audio_offset, t.audio_length, t.backing_size, t.backing_mtime_ns, t.backing_ctime_ns],
51 )?;
52 Ok(self.tx.query_row(
53 "SELECT id FROM tracks WHERE backing_path = ?1",
54 params![t.backing_path],
55 |r| r.get(0),
56 )?)
57 }
58
59 pub fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> Result<()> {
60 self.tx.execute(
61 "DELETE FROM tags WHERE track_id = ?1 AND value_blob IS NULL",
62 params![track_id],
63 )?;
64 let mut stmt = self.tx.prepare_cached(
65 "INSERT INTO tags (track_id, key, value, ordinal) VALUES (?1, ?2, ?3, ?4)",
66 )?;
67 for t in tags {
68 stmt.execute(params![track_id, t.key, t.value, t.ordinal])?;
69 }
70 Ok(())
71 }
72
73 pub fn set_binary_tags(&mut self, track_id: i64, tags: &[BinaryTag]) -> Result<()> {
74 self.tx.execute(
75 "DELETE FROM tags WHERE track_id = ?1 AND value_blob IS NOT NULL",
76 params![track_id],
77 )?;
78 let mut stmt = self.tx.prepare(
79 "INSERT INTO tags (track_id, key, value, value_blob, ordinal) \
80 VALUES (?1, ?2, '', ?3, ?4)",
81 )?;
82 for t in tags {
83 stmt.execute(params![track_id, t.key, t.payload, t.ordinal])?;
84 }
85 Ok(())
86 }
87
88 pub fn set_structural_blocks(
89 &mut self,
90 track_id: i64,
91 blocks: &[StructuralBlock],
92 ) -> Result<()> {
93 self.tx.execute(
94 "DELETE FROM structural_blocks WHERE track_id = ?1",
95 params![track_id],
96 )?;
97 let mut stmt = self.tx.prepare(
98 "INSERT INTO structural_blocks (track_id, kind, ordinal, body) \
99 VALUES (?1, ?2, ?3, ?4)",
100 )?;
101 for b in blocks {
102 stmt.execute(params![track_id, b.kind, b.ordinal, b.body])?;
103 }
104 Ok(())
105 }
106
107 pub fn upsert_art(&mut self, a: &NewArt) -> Result<i64> {
108 let sha = sha256_hex(&a.data);
109 self.tx.execute(
110 "INSERT INTO art (sha256, mime, width, height, byte_len, data)
111 VALUES (?1, ?2, ?3, ?4, ?5, ?6) ON CONFLICT(sha256) DO NOTHING",
112 params![sha, a.mime, a.width, a.height, a.data.len() as u64, a.data],
113 )?;
114 Ok(self
115 .tx
116 .query_row("SELECT id FROM art WHERE sha256 = ?1", params![sha], |r| {
117 r.get(0)
118 })?)
119 }
120
121 pub fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> Result<()> {
122 self.tx.execute(
123 "DELETE FROM track_art WHERE track_id = ?1",
124 params![track_id],
125 )?;
126 let mut stmt = self.tx.prepare_cached(
127 "INSERT INTO track_art (track_id, art_id, picture_type, description, ordinal)
128 VALUES (?1, ?2, ?3, ?4, ?5)",
129 )?;
130 for it in items {
131 stmt.execute(params![
132 track_id,
133 it.art_id,
134 it.picture_type,
135 it.description,
136 it.ordinal
137 ])?;
138 }
139 Ok(())
140 }
141
142 pub fn commit(self) -> Result<()> {
143 self.tx.commit()?;
144 Ok(())
145 }
146}
147
148#[cfg(test)]
149mod tests {
150 use crate::Db;
151 use crate::models::{Format, NewArt, NewTrack, Tag, TrackArt};
152
153 #[test]
154 fn bulk_writer_persists_a_batch_in_one_commit() {
155 let db = Db::open_in_memory().unwrap();
156 {
157 let mut bw = db.bulk_writer().unwrap();
158 for i in 0..3 {
159 let id = bw
160 .upsert_track(&NewTrack {
161 backing_path: format!("/m/{i}.flac"),
162 format: Format::Flac,
163 audio_offset: 100,
164 audio_length: 200,
165 backing_size: 300,
166 backing_mtime_ns: 1,
167 backing_ctime_ns: 0,
168 })
169 .unwrap();
170 bw.replace_tags(id, &[Tag::new("title", &format!("t{i}"), 0)])
171 .unwrap();
172 let art_id = bw
173 .upsert_art(&NewArt {
174 mime: "image/png".into(),
175 width: None,
176 height: None,
177 data: vec![1, 2, 3, 4],
178 })
179 .unwrap();
180 bw.set_track_art(
181 id,
182 &[TrackArt {
183 art_id,
184 picture_type: 3,
185 description: String::new(),
186 ordinal: 0,
187 }],
188 )
189 .unwrap();
190 }
191 bw.commit().unwrap();
192 }
193 assert_eq!(db.list_tracks().unwrap().len(), 3);
194 let count: i64 = db
196 .conn
197 .query_row("SELECT COUNT(*) FROM art", [], |r| r.get(0))
198 .unwrap();
199 assert_eq!(count, 1);
200 let tag_count: i64 = db
202 .conn
203 .query_row("SELECT COUNT(*) FROM tags", [], |r| r.get(0))
204 .unwrap();
205 assert_eq!(tag_count, 3);
206 let title0: String = db
207 .conn
208 .query_row(
209 "SELECT value FROM tags WHERE key = 'title' ORDER BY value LIMIT 1",
210 [],
211 |r| r.get(0),
212 )
213 .unwrap();
214 assert_eq!(title0, "t0");
215 let track_art_count: i64 = db
217 .conn
218 .query_row("SELECT COUNT(*) FROM track_art", [], |r| r.get(0))
219 .unwrap();
220 assert_eq!(track_art_count, 3);
221 }
222
223 #[test]
224 fn sha256_hex_matches_known_digest() {
225 assert_eq!(
227 super::sha256_hex(b"abc"),
228 "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
229 );
230 }
231
232 #[test]
233 fn apply_bulk_pragmas_self_sets_non_default_pragmas() {
234 let db = Db::open_in_memory().unwrap();
235 db.apply_bulk_pragmas_self().unwrap();
236 let synchronous: i64 = db
238 .conn
239 .pragma_query_value(None, "synchronous", |r| r.get(0))
240 .unwrap();
241 assert_eq!(synchronous, 1);
242 let cache_size: i64 = db
244 .conn
245 .pragma_query_value(None, "cache_size", |r| r.get(0))
246 .unwrap();
247 assert_eq!(cache_size, -65536);
248 let temp_store: i64 = db
250 .conn
251 .pragma_query_value(None, "temp_store", |r| r.get(0))
252 .unwrap();
253 assert_eq!(temp_store, 2);
254 }
255
256 #[test]
257 fn bulk_replace_tags_preserves_binary_rows() {
258 let db = Db::open_in_memory().unwrap();
259 let tid = db
260 .upsert_track(&crate::NewTrack {
261 backing_path: "/a.mp3".into(),
262 format: crate::Format::Mp3,
263 audio_offset: 0,
264 audio_length: 0,
265 backing_size: 0,
266 backing_mtime_ns: 0,
267 backing_ctime_ns: 0,
268 })
269 .unwrap();
270 db.set_binary_tags(
271 tid,
272 &[crate::BinaryTag {
273 key: "PRIV".into(),
274 payload: vec![1, 2, 3],
275 ordinal: 0,
276 }],
277 )
278 .unwrap();
279
280 {
281 let mut bw = db.bulk_writer().unwrap();
282 bw.replace_tags(tid, &[crate::Tag::new("artist", "A", 0)])
283 .unwrap();
284 bw.commit().unwrap();
285 }
286
287 assert_eq!(
288 db.get_binary_tags(tid).unwrap().len(),
289 1,
290 "bulk replace_tags wiped binary rows"
291 );
292 assert_eq!(
293 db.get_tags(tid).unwrap(),
294 vec![crate::Tag::new("artist", "A", 0)]
295 );
296 }
297
298 #[test]
299 fn bulk_set_binary_tags_round_trips_and_scopes_to_binary_rows() {
300 let db = Db::open_in_memory().unwrap();
301 let tid = db
302 .upsert_track(&crate::NewTrack {
303 backing_path: "/a.mp3".into(),
304 format: crate::Format::Mp3,
305 audio_offset: 0,
306 audio_length: 0,
307 backing_size: 0,
308 backing_mtime_ns: 0,
309 backing_ctime_ns: 0,
310 })
311 .unwrap();
312 {
313 let mut bw = db.bulk_writer().unwrap();
314 bw.replace_tags(tid, &[crate::Tag::new("artist", "A", 0)])
315 .unwrap();
316 bw.set_binary_tags(
317 tid,
318 &[crate::BinaryTag {
319 key: "PRIV".into(),
320 payload: vec![7, 7, 7],
321 ordinal: 0,
322 }],
323 )
324 .unwrap();
325 bw.commit().unwrap();
326 }
327 let rows = db.get_binary_tags(tid).unwrap();
328 assert_eq!(rows.len(), 1);
329 assert_eq!(rows[0].key, "PRIV");
330 assert_eq!(rows[0].byte_len, 3);
331 assert_eq!(
332 db.get_tags(tid).unwrap(),
333 vec![crate::Tag::new("artist", "A", 0)]
334 );
335 }
336
337 #[test]
338 fn bulk_set_structural_blocks_round_trips() {
339 use crate::StructuralBlock;
340 let db = Db::open_in_memory().unwrap();
341 let id = {
342 let mut bw = db.bulk_writer().unwrap();
343 let id = bw
344 .upsert_track(&NewTrack {
345 backing_path: "/a.flac".into(),
346 format: Format::Flac,
347 audio_offset: 0,
348 audio_length: 1,
349 backing_size: 1,
350 backing_mtime_ns: 0,
351 backing_ctime_ns: 0,
352 })
353 .unwrap();
354 bw.set_structural_blocks(
355 id,
356 &[
357 StructuralBlock {
358 kind: "STREAMINFO".into(),
359 ordinal: 0,
360 body: vec![1, 2],
361 },
362 StructuralBlock {
363 kind: "SEEKTABLE".into(),
364 ordinal: 0,
365 body: vec![3],
366 },
367 ],
368 )
369 .unwrap();
370 bw.commit().unwrap();
371 id
372 };
373 let got = db.get_structural_blocks(id).unwrap();
374 assert_eq!(got.len(), 2);
375 assert_eq!(got[0].kind, "SEEKTABLE");
377 assert_eq!(got[1].body, vec![1, 2]);
378 }
379
380 #[test]
381 fn bulk_writer_dropped_without_commit_rolls_back() {
382 let db = Db::open_in_memory().unwrap();
383 {
384 let mut bw = db.bulk_writer().unwrap();
385 bw.upsert_track(&NewTrack {
386 backing_path: "/m/ghost.flac".into(),
387 format: Format::Flac,
388 audio_offset: 0,
389 audio_length: 0,
390 backing_size: 0,
391 backing_mtime_ns: 0,
392 backing_ctime_ns: 0,
393 })
394 .unwrap();
395 }
397 assert_eq!(db.list_tracks().unwrap().len(), 0);
398 }
399}