1use std::path::{Path, PathBuf};
2use std::time::Duration;
3
4use ahash::AHashMap;
5use anyhow::{Context, Result, anyhow};
6use chrono::{DateTime, Utc};
7use episode_db::{EpisodeDB, EpisodeDBInsertable};
8use file_db::{FileDB, FileDBInsertable};
9use indoc::indoc;
10use rusqlite::{Connection, params};
11
12use super::{Episode, EpisodeNoId, Podcast, PodcastNoId, RE_ARTICLES};
13use crate::track::Track;
14use podcast_db::{PodcastDB, PodcastDBInsertable};
15
16mod episode_db;
17mod file_db;
18mod migration;
19mod podcast_db;
20
/// Integer row identifier used by this module for database rows
/// (passed around for both podcast ids and episode ids).
pub type PodcastDBId = i64;
23
/// Outcome of synchronising a podcast's episode list with the database.
///
/// This is a plain pair of counters, so it is cheap to copy, can be compared
/// directly and has an all-zero default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SyncResult {
    /// Number of episodes that were newly inserted.
    pub added: u64,
    /// Number of already stored episodes whose details were rewritten.
    pub updated: u64,
}
29
/// Handle to the podcast database.
#[derive(Debug)]
pub struct Database {
    /// Path of the database file (`data.db` inside the directory given to `Database::new`).
    path: PathBuf,
    /// Connection opened in `Database::new`; foreign-key enforcement is enabled on it there.
    conn: Connection,
}
37
38impl Database {
39 pub fn new(path: &Path) -> Result<Database> {
47 let mut db_path = path.to_path_buf();
48 std::fs::create_dir_all(&db_path).context("Unable to create subdirectory for database.")?;
49 db_path.push("data.db");
50 let conn = Connection::open(&db_path)?;
51
52 migration::migrate(&conn).context("Database creation / migration")?;
53
54 conn.execute("PRAGMA foreign_keys=ON;", [])
56 .context("Could not set database parameters.")?;
57
58 Ok(Database {
59 path: db_path,
60 conn,
61 })
62 }
63
64 pub fn insert_podcast(&self, podcast: &PodcastNoId) -> Result<u64> {
67 let mut conn = Connection::open(&self.path).context("Error connecting to database.")?;
68 let tx = conn.transaction()?;
69
70 PodcastDBInsertable::from(podcast).insert_podcast(&tx)?;
71
72 let pod_id: PodcastDBId = {
73 let mut stmt = tx.prepare_cached("SELECT id FROM podcasts WHERE url = ?")?;
74 stmt.query_row(params![podcast.url], |row| row.get(0))?
75 };
76 let mut inserted = 0;
77 for ep in podcast.episodes.iter().rev() {
78 Self::insert_episode(&tx, pod_id, ep)?;
79 inserted += 1;
80 }
81 tx.commit()?;
82
83 Ok(inserted)
84 }
85
86 pub fn insert_episode(
88 conn: &Connection,
89 podcast_id: PodcastDBId,
90 episode: &EpisodeNoId,
91 ) -> Result<PodcastDBId> {
92 EpisodeDBInsertable::new(episode, podcast_id).insert_episode(conn)?;
93
94 Ok(conn.last_insert_rowid())
95 }
96
97 pub fn insert_file(&self, episode_id: PodcastDBId, path: &Path) -> Result<()> {
99 FileDBInsertable::new(episode_id, path).insert_file(&self.conn)?;
100
101 Ok(())
102 }
103
104 pub fn remove_file(&self, episode_id: PodcastDBId) -> Result<()> {
107 file_db::delete_file(episode_id, &self.conn)?;
108
109 Ok(())
110 }
111
112 pub fn remove_files(&self, episode_ids: &[PodcastDBId]) -> Result<()> {
114 file_db::delete_files(episode_ids, &self.conn)?;
115
116 Ok(())
117 }
118
119 pub fn remove_podcast(&self, podcast_id: PodcastDBId) -> Result<()> {
121 podcast_db::delete_podcast(podcast_id, &self.conn)?;
122
123 Ok(())
124 }
125
126 pub fn update_podcast(&self, pod_id: PodcastDBId, podcast: &PodcastNoId) -> Result<SyncResult> {
130 PodcastDBInsertable::from(podcast).update_podcast(pod_id, &self.conn)?;
131
132 let result = self.update_episodes(pod_id, &podcast.episodes)?;
133 Ok(result)
134 }
135
136 fn update_episodes(
145 &self,
146 podcast_id: PodcastDBId,
147 episodes: &[EpisodeNoId],
148 ) -> Result<SyncResult> {
149 let old_episodes = self.get_episodes(podcast_id, true)?;
150 let mut old_ep_map = AHashMap::new();
151 for ep in &old_episodes {
152 if !ep.guid.is_empty() {
153 old_ep_map.insert(&ep.guid, ep);
154 }
155 }
156
157 let mut conn = Connection::open(&self.path).context("Error connecting to database.")?;
158 let tx = conn.transaction()?;
159
160 let mut inserted = 0;
161 let mut updated = 0;
162 for new_ep in episodes.iter().rev() {
163 let new_pd = new_ep.pubdate.map(|dt| dt.timestamp());
164
165 let mut existing_id = None;
166 let mut update = false;
167
168 if !new_ep.guid.is_empty() {
171 if let Some(old_ep) = old_ep_map.get(&new_ep.guid) {
172 existing_id = Some(old_ep.id);
173 update = Self::check_for_updates(old_ep, new_ep);
174 }
175 }
176
177 if existing_id.is_none() {
182 for old_ep in old_episodes.iter().rev() {
183 let mut matching = 0;
184 matching += i32::from(new_ep.title == old_ep.title);
185 matching += i32::from(new_ep.url == old_ep.url);
186
187 if let Some(pd) = new_pd {
188 if let Some(old_pd) = old_ep.pubdate {
189 matching += i32::from(pd == old_pd.timestamp());
190 }
191 }
192
193 if matching >= 2 {
194 existing_id = Some(old_ep.id);
195 update = Self::check_for_updates(old_ep, new_ep);
196 break;
197 }
198 }
199 }
200
201 if let Some(id) = existing_id {
202 if update {
203 EpisodeDBInsertable::new(new_ep, podcast_id).update_episode(id, &tx)?;
204
205 updated += 1;
206 }
207 } else {
208 Self::insert_episode(&tx, podcast_id, new_ep)?;
209
210 inserted += 1;
211 }
212 }
213 tx.commit()?;
214 Ok(SyncResult {
215 added: inserted,
216 updated,
217 })
218 }
219
220 fn check_for_updates(old_ep: &Episode, new_ep: &EpisodeNoId) -> bool {
224 let new_pd = new_ep.pubdate.map(|dt| dt.timestamp());
225 let mut pd_match = false;
226 if let Some(pd) = new_pd {
227 if let Some(old_pd) = old_ep.pubdate {
228 pd_match = pd == old_pd.timestamp();
229 }
230 }
231 if !(new_ep.title == old_ep.title
232 && new_ep.url == old_ep.url
233 && new_ep.guid == old_ep.guid
234 && new_ep.description == old_ep.description
235 && new_ep.duration == old_ep.duration
236 && pd_match)
237 {
238 return true;
239 }
240 false
241 }
242
243 pub fn set_played_status(&self, episode_id: PodcastDBId, played: bool) -> Result<()> {
245 let mut stmt = self
246 .conn
247 .prepare_cached("UPDATE episodes SET played = ? WHERE id = ?;")?;
248 stmt.execute(params![played, episode_id])?;
249 Ok(())
250 }
251
252 pub fn set_all_played_status(
254 &self,
255 episode_id_vec: &[PodcastDBId],
256 played: bool,
257 ) -> Result<()> {
258 let mut conn = Connection::open(&self.path).context("Error connecting to database.")?;
259 let tx = conn.transaction()?;
260
261 for episode_id in episode_id_vec {
262 let mut stmt = tx.prepare_cached("UPDATE episodes SET played = ? WHERE id = ?;")?;
263 stmt.execute(params![played, episode_id])?;
264 }
265 tx.commit()?;
266 Ok(())
267 }
268
269 pub fn hide_episode(&self, episode_id: PodcastDBId, hide: bool) -> Result<()> {
273 let mut stmt = self
274 .conn
275 .prepare_cached("UPDATE episodes SET hidden = ? WHERE id = ?;")?;
276 stmt.execute(params![hide, episode_id])?;
277 Ok(())
278 }
279
280 pub fn get_podcasts(&self) -> Result<Vec<Podcast>> {
283 let mut stmt = self.conn.prepare_cached("SELECT * FROM podcasts;")?;
284 let podcasts = stmt
285 .query_map([], PodcastDB::try_from_row_named)?
286 .flatten()
287 .map(|podcast| {
288 let episodes = match self.get_episodes(podcast.id, false) {
289 Ok(ep_list) => Ok(ep_list),
290 Err(_) => Err(rusqlite::Error::QueryReturnedNoRows),
291 }?;
292
293 let title_lower = podcast.title.to_lowercase();
294 let sort_title = RE_ARTICLES.replace(&title_lower, "").to_string();
295
296 Ok(Podcast {
297 id: podcast.id,
298 title: podcast.title,
299 sort_title,
300 url: podcast.url,
301 description: podcast.description,
302 author: podcast.author,
303 explicit: podcast.explicit,
304 last_checked: podcast.last_checked,
305 episodes,
306 image_url: podcast.image_url,
307 })
308 })
309 .collect::<Result<_, rusqlite::Error>>()?;
310
311 Ok(podcasts)
312 }
313
314 pub fn get_episodes(&self, pod_id: PodcastDBId, include_hidden: bool) -> Result<Vec<Episode>> {
316 let mut stmt = if include_hidden {
317 self.conn.prepare_cached(indoc! {
318 "SELECT episodes.id as epid, files.id as fileid, * FROM episodes
319 LEFT JOIN files ON episodes.id = files.episode_id
320 WHERE episodes.podcast_id = ?
321 ORDER BY pubdate DESC;
322 "})?
323 } else {
324 self.conn.prepare_cached(indoc! {"
325 SELECT episodes.id as epid, files.id as fileid, * FROM episodes
326 LEFT JOIN files ON episodes.id = files.episode_id
327 WHERE episodes.podcast_id = ?
328 AND episodes.hidden = 0
329 ORDER BY pubdate DESC;
330 "})?
331 };
332
333 let episodes = stmt
334 .query_map(params![pod_id], |row| {
335 let episode = EpisodeDB::try_from_row_named_alias_id(row)?;
336 let file = FileDB::try_from_row_named_alias_id(row).ok();
337
338 Ok(Episode {
339 id: episode.id,
340 pod_id,
341 title: episode.title,
342 url: episode.url,
343 guid: episode.guid,
344 description: episode.description,
345 pubdate: episode.pubdate,
346 duration: episode.duration,
347 path: file.map(|v| v.path),
348 played: episode.played,
349 last_position: episode.last_position,
350 image_url: episode.image_url,
351 })
352 })?
353 .flatten()
354 .collect();
355
356 Ok(episodes)
357 }
358
359 pub fn get_episode_by_url(&self, ep_uri: &str) -> Result<Episode> {
361 let mut stmt = self.conn.prepare_cached(indoc! {"
362 SELECT episodes.id as epid, files.id as fileid, * FROM episodes
363 LEFT JOIN files ON episodes.id = files.episode_id
364 WHERE episodes.url = ?
365 ORDER BY pubdate DESC;
366 "})?;
367
368 let episode = stmt
369 .query_map(params![ep_uri], |row| {
370 let episode = EpisodeDB::try_from_row_named_alias_id(row)?;
371 let file = FileDB::try_from_row_named_alias_id(row).ok();
372
373 Ok(Episode {
374 id: episode.id,
375 pod_id: episode.pod_id,
376 title: episode.title,
377 url: episode.url,
378 guid: episode.guid,
379 description: episode.description,
380 pubdate: episode.pubdate,
381 duration: episode.duration,
382 path: file.map(|v| v.path),
383 played: episode.played,
384 last_position: episode.last_position,
385 image_url: episode.image_url,
386 })
387 })?
388 .flatten()
389 .next();
390
391 episode.ok_or(anyhow!("No Episode found with url \"{ep_uri}\""))
392 }
393
394 pub fn clear_db(&self) -> Result<()> {
396 self.conn.execute("DELETE FROM files;", [])?;
397 self.conn.execute("DELETE FROM episodes;", [])?;
398 self.conn.execute("DELETE FROM podcasts;", [])?;
399 Ok(())
400 }
401
402 pub fn get_last_position(&mut self, track: &Track) -> Result<Duration> {
403 let podcast_data = track
404 .as_podcast()
405 .ok_or(anyhow!("Track is not a Podcast track!"))?;
406 let query = "SELECT last_position FROM episodes WHERE url = ?1";
407
408 let mut last_position: Duration = Duration::from_secs(0);
409 self.conn
410 .query_row(query, params![podcast_data.url()], |row| {
411 let last_position_u64: u64 = row.get(0)?;
412 last_position = Duration::from_secs(last_position_u64);
414 Ok(last_position)
415 })?;
416 Ok(last_position)
418 }
419
420 pub fn set_last_position(&self, track: &Track, last_position: Duration) -> Result<()> {
425 let podcast_data = track
426 .as_podcast()
427 .ok_or(anyhow!("Track is not a Podcast track!"))?;
428 let query = "UPDATE episodes SET last_position = ?1 WHERE url = ?2";
429 self.conn
430 .execute(query, params![last_position.as_secs(), podcast_data.url(),])
431 .context("update last position failed.")?;
432 Ok(())
435 }
436}
437
438fn convert_date(result: &Result<i64, rusqlite::Error>) -> Option<DateTime<Utc>> {
441 match result {
442 Ok(timestamp) => DateTime::from_timestamp(*timestamp, 0),
443 Err(_) => None,
444 }
445}
446
#[cfg(test)]
mod test_utils {
    use rusqlite::Connection;

    /// Creates a fresh in-memory database connection for tests.
    ///
    /// # Panics
    ///
    /// Panics when the in-memory database cannot be opened.
    pub fn gen_database() -> Connection {
        let in_memory = Connection::open_in_memory();
        in_memory.expect("open db failed")
    }
}