1use std::path::{Path, PathBuf};
2use std::time::Duration;
3
4use ahash::AHashMap;
5use anyhow::{anyhow, Context, Result};
6use chrono::{DateTime, Utc};
7use episode_db::{EpisodeDB, EpisodeDBInsertable};
8use file_db::{FileDB, FileDBInsertable};
9use rusqlite::{params, Connection};
10
11use super::{Episode, EpisodeNoId, Podcast, PodcastNoId, RE_ARTICLES};
12use crate::track::Track;
13use podcast_db::{PodcastDB, PodcastDBInsertable};
14
15mod episode_db;
16mod file_db;
17mod migration;
18mod podcast_db;
19
/// Integer id type used for database rows in this module (an alias for `i64`).
///
/// Despite the name, it is used for episode ids as well as podcast ids.
pub type PodcastDBId = i64;
22
/// Outcome of synchronising a podcast's episode list with the database.
///
/// Both counters are plain integers, so the type is cheap to copy, can be compared
/// directly, and defaults to "nothing added, nothing updated".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SyncResult {
    /// Number of episodes that were newly inserted.
    pub added: u64,
    /// Number of already-known episodes whose stored data was rewritten.
    pub updated: u64,
}
28
/// Handle to the podcast SQLite database.
#[derive(Debug)]
pub struct Database {
    /// Full path of the database file; used to open additional connections
    /// for operations that run inside their own transaction.
    path: PathBuf,
    /// Connection opened by [`Database::new`] and used for direct queries.
    conn: Connection,
}
36
37impl Database {
38 pub fn new(path: &Path) -> Result<Database> {
46 let mut db_path = path.to_path_buf();
47 std::fs::create_dir_all(&db_path).context("Unable to create subdirectory for database.")?;
48 db_path.push("data.db");
49 let conn = Connection::open(&db_path)?;
50
51 migration::migrate(&conn).context("Database creation / migration")?;
52
53 conn.execute("PRAGMA foreign_keys=ON;", [])
55 .context("Could not set database parameters.")?;
56
57 Ok(Database {
58 path: db_path,
59 conn,
60 })
61 }
62
63 pub fn insert_podcast(&self, podcast: &PodcastNoId) -> Result<u64> {
66 let mut conn = Connection::open(&self.path).context("Error connecting to database.")?;
67 let tx = conn.transaction()?;
68
69 PodcastDBInsertable::from(podcast).insert_podcast(&tx)?;
70
71 let pod_id: PodcastDBId = {
72 let mut stmt = tx.prepare_cached("SELECT id FROM podcasts WHERE url = ?")?;
73 stmt.query_row(params![podcast.url], |row| row.get(0))?
74 };
75 let mut inserted = 0;
76 for ep in podcast.episodes.iter().rev() {
77 Self::insert_episode(&tx, pod_id, ep)?;
78 inserted += 1;
79 }
80 tx.commit()?;
81
82 Ok(inserted)
83 }
84
85 pub fn insert_episode(
87 conn: &Connection,
88 podcast_id: PodcastDBId,
89 episode: &EpisodeNoId,
90 ) -> Result<PodcastDBId> {
91 EpisodeDBInsertable::new(episode, podcast_id).insert_episode(conn)?;
92
93 Ok(conn.last_insert_rowid())
94 }
95
96 pub fn insert_file(&self, episode_id: PodcastDBId, path: &Path) -> Result<()> {
98 FileDBInsertable::new(episode_id, path).insert_file(&self.conn)?;
99
100 Ok(())
101 }
102
103 pub fn remove_file(&self, episode_id: PodcastDBId) -> Result<()> {
106 file_db::delete_file(episode_id, &self.conn)?;
107
108 Ok(())
109 }
110
111 pub fn remove_files(&self, episode_ids: &[PodcastDBId]) -> Result<()> {
113 file_db::delete_files(episode_ids, &self.conn)?;
114
115 Ok(())
116 }
117
118 pub fn remove_podcast(&self, podcast_id: PodcastDBId) -> Result<()> {
120 podcast_db::delete_podcast(podcast_id, &self.conn)?;
121
122 Ok(())
123 }
124
125 pub fn update_podcast(&self, pod_id: PodcastDBId, podcast: &PodcastNoId) -> Result<SyncResult> {
129 PodcastDBInsertable::from(podcast).update_podcast(pod_id, &self.conn)?;
130
131 let result = self.update_episodes(pod_id, &podcast.episodes)?;
132 Ok(result)
133 }
134
135 fn update_episodes(
144 &self,
145 podcast_id: PodcastDBId,
146 episodes: &[EpisodeNoId],
147 ) -> Result<SyncResult> {
148 let old_episodes = self.get_episodes(podcast_id, true)?;
149 let mut old_ep_map = AHashMap::new();
150 for ep in &old_episodes {
151 if !ep.guid.is_empty() {
152 old_ep_map.insert(&ep.guid, ep);
153 }
154 }
155
156 let mut conn = Connection::open(&self.path).context("Error connecting to database.")?;
157 let tx = conn.transaction()?;
158
159 let mut inserted = 0;
160 let mut updated = 0;
161 for new_ep in episodes.iter().rev() {
162 let new_pd = new_ep.pubdate.map(|dt| dt.timestamp());
163
164 let mut existing_id = None;
165 let mut update = false;
166
167 if !new_ep.guid.is_empty() {
170 if let Some(old_ep) = old_ep_map.get(&new_ep.guid) {
171 existing_id = Some(old_ep.id);
172 update = Self::check_for_updates(old_ep, new_ep);
173 }
174 }
175
176 if existing_id.is_none() {
181 for old_ep in old_episodes.iter().rev() {
182 let mut matching = 0;
183 matching += i32::from(new_ep.title == old_ep.title);
184 matching += i32::from(new_ep.url == old_ep.url);
185
186 if let Some(pd) = new_pd {
187 if let Some(old_pd) = old_ep.pubdate {
188 matching += i32::from(pd == old_pd.timestamp());
189 }
190 }
191
192 if matching >= 2 {
193 existing_id = Some(old_ep.id);
194 update = Self::check_for_updates(old_ep, new_ep);
195 break;
196 }
197 }
198 }
199
200 if let Some(id) = existing_id {
201 if update {
202 EpisodeDBInsertable::new(new_ep, podcast_id).update_episode(id, &tx)?;
203
204 updated += 1;
205 }
206 } else {
207 Self::insert_episode(&tx, podcast_id, new_ep)?;
208
209 inserted += 1;
210 }
211 }
212 tx.commit()?;
213 Ok(SyncResult {
214 added: inserted,
215 updated,
216 })
217 }
218
219 fn check_for_updates(old_ep: &Episode, new_ep: &EpisodeNoId) -> bool {
223 let new_pd = new_ep.pubdate.map(|dt| dt.timestamp());
224 let mut pd_match = false;
225 if let Some(pd) = new_pd {
226 if let Some(old_pd) = old_ep.pubdate {
227 pd_match = pd == old_pd.timestamp();
228 }
229 }
230 if !(new_ep.title == old_ep.title
231 && new_ep.url == old_ep.url
232 && new_ep.guid == old_ep.guid
233 && new_ep.description == old_ep.description
234 && new_ep.duration == old_ep.duration
235 && pd_match)
236 {
237 return true;
238 }
239 false
240 }
241
242 pub fn set_played_status(&self, episode_id: PodcastDBId, played: bool) -> Result<()> {
244 let mut stmt = self
245 .conn
246 .prepare_cached("UPDATE episodes SET played = ? WHERE id = ?;")?;
247 stmt.execute(params![played, episode_id])?;
248 Ok(())
249 }
250
251 pub fn set_all_played_status(
253 &self,
254 episode_id_vec: &[PodcastDBId],
255 played: bool,
256 ) -> Result<()> {
257 let mut conn = Connection::open(&self.path).context("Error connecting to database.")?;
258 let tx = conn.transaction()?;
259
260 for episode_id in episode_id_vec {
261 let mut stmt = tx.prepare_cached("UPDATE episodes SET played = ? WHERE id = ?;")?;
262 stmt.execute(params![played, episode_id])?;
263 }
264 tx.commit()?;
265 Ok(())
266 }
267
268 pub fn hide_episode(&self, episode_id: PodcastDBId, hide: bool) -> Result<()> {
272 let mut stmt = self
273 .conn
274 .prepare_cached("UPDATE episodes SET hidden = ? WHERE id = ?;")?;
275 stmt.execute(params![hide, episode_id])?;
276 Ok(())
277 }
278
279 pub fn get_podcasts(&self) -> Result<Vec<Podcast>> {
282 let mut stmt = self.conn.prepare_cached("SELECT * FROM podcasts;")?;
283 let podcasts = stmt
284 .query_map([], PodcastDB::try_from_row_named)?
285 .flatten()
286 .map(|podcast| {
287 let episodes = match self.get_episodes(podcast.id, false) {
288 Ok(ep_list) => Ok(ep_list),
289 Err(_) => Err(rusqlite::Error::QueryReturnedNoRows),
290 }?;
291
292 let title_lower = podcast.title.to_lowercase();
293 let sort_title = RE_ARTICLES.replace(&title_lower, "").to_string();
294
295 Ok(Podcast {
296 id: podcast.id,
297 title: podcast.title,
298 sort_title,
299 url: podcast.url,
300 description: podcast.description,
301 author: podcast.author,
302 explicit: podcast.explicit,
303 last_checked: podcast.last_checked,
304 episodes,
305 image_url: podcast.image_url,
306 })
307 })
308 .collect::<Result<_, rusqlite::Error>>()?;
309
310 Ok(podcasts)
311 }
312
313 pub fn get_episodes(&self, pod_id: PodcastDBId, include_hidden: bool) -> Result<Vec<Episode>> {
315 let mut stmt = if include_hidden {
316 self.conn.prepare_cached(
317 "SELECT episodes.id as epid, files.id as fileid, * FROM episodes
318 LEFT JOIN files ON episodes.id = files.episode_id
319 WHERE episodes.podcast_id = ?
320 ORDER BY pubdate DESC;",
321 )?
322 } else {
323 self.conn.prepare_cached(
324 "SELECT episodes.id as epid, files.id as fileid, * FROM episodes
325 LEFT JOIN files ON episodes.id = files.episode_id
326 WHERE episodes.podcast_id = ?
327 AND episodes.hidden = 0
328 ORDER BY pubdate DESC;",
329 )?
330 };
331
332 let episodes = stmt
333 .query_map(params![pod_id], |row| {
334 let episode = EpisodeDB::try_from_row_named_alias_id(row)?;
335 let file = FileDB::try_from_row_named_alias_id(row).ok();
336
337 Ok(Episode {
338 id: episode.id,
339 pod_id,
340 title: episode.title,
341 url: episode.url,
342 guid: episode.guid,
343 description: episode.description,
344 pubdate: episode.pubdate,
345 duration: episode.duration,
346 path: file.map(|v| v.path),
347 played: episode.played,
348 last_position: episode.last_position,
349 image_url: episode.image_url,
350 })
351 })?
352 .flatten()
353 .collect();
354
355 Ok(episodes)
356 }
357
358 pub fn get_episode_by_url(&self, ep_uri: &str) -> Result<Episode> {
360 let mut stmt = self.conn.prepare_cached(
361 "SELECT episodes.id as epid, files.id as fileid, * FROM episodes
362 LEFT JOIN files ON episodes.id = files.episode_id
363 WHERE episodes.url = ?
364 ORDER BY pubdate DESC;",
365 )?;
366
367 let episode = stmt
368 .query_map(params![ep_uri], |row| {
369 let episode = EpisodeDB::try_from_row_named_alias_id(row)?;
370 let file = FileDB::try_from_row_named_alias_id(row).ok();
371
372 Ok(Episode {
373 id: episode.id,
374 pod_id: episode.pod_id,
375 title: episode.title,
376 url: episode.url,
377 guid: episode.guid,
378 description: episode.description,
379 pubdate: episode.pubdate,
380 duration: episode.duration,
381 path: file.map(|v| v.path),
382 played: episode.played,
383 last_position: episode.last_position,
384 image_url: episode.image_url,
385 })
386 })?
387 .flatten()
388 .next();
389
390 episode.ok_or(anyhow!("No Episode found with url \"{ep_uri}\""))
391 }
392
393 pub fn clear_db(&self) -> Result<()> {
395 self.conn.execute("DELETE FROM files;", [])?;
396 self.conn.execute("DELETE FROM episodes;", [])?;
397 self.conn.execute("DELETE FROM podcasts;", [])?;
398 Ok(())
399 }
400
401 pub fn get_last_position(&mut self, track: &Track) -> Result<Duration> {
402 let query = "SELECT last_position FROM episodes WHERE url = ?1";
403
404 let mut last_position: Duration = Duration::from_secs(0);
405 self.conn.query_row(
406 query,
407 params![track.file().unwrap_or("Unknown File").to_string(),],
408 |row| {
409 let last_position_u64: u64 = row.get(0)?;
410 last_position = Duration::from_secs(last_position_u64);
412 Ok(last_position)
413 },
414 )?;
415 Ok(last_position)
417 }
418
419 pub fn set_last_position(&self, track: &Track, last_position: Duration) -> Result<()> {
424 let query = "UPDATE episodes SET last_position = ?1 WHERE url = ?2";
425 self.conn
426 .execute(
427 query,
428 params![
429 last_position.as_secs(),
430 track.file().unwrap_or("Unknown File Name").to_string(),
431 ],
432 )
433 .context("update last position failed.")?;
434 Ok(())
437 }
438}
439
440fn convert_date(result: &Result<i64, rusqlite::Error>) -> Option<DateTime<Utc>> {
443 match result {
444 Ok(timestamp) => DateTime::from_timestamp(*timestamp, 0),
445 Err(_) => None,
446 }
447}
448
/// Helpers that are compiled only for tests.
#[cfg(test)]
mod test_utils {
    use rusqlite::Connection;

    /// Opens a fresh in-memory SQLite connection.
    ///
    /// # Panics
    /// Panics with "open db failed" if the connection cannot be opened.
    pub fn gen_database() -> Connection {
        Connection::open_in_memory().expect("open db failed")
    }
}