torrust_index_backend/databases/
sqlite.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use chrono::NaiveDateTime;
6use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
7use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool};
8
9use super::database::TABLES_TO_TRUNCATE;
10use crate::databases::database;
11use crate::databases::database::{Category, Database, Driver, Sorting, TorrentCompact};
12use crate::models::category::CategoryId;
13use crate::models::info_hash::InfoHash;
14use crate::models::response::TorrentsResponse;
15use crate::models::torrent::TorrentListing;
16use crate::models::torrent_file::{DbTorrentAnnounceUrl, DbTorrentFile, DbTorrentInfo, Torrent, TorrentFile};
17use crate::models::torrent_tag::{TagId, TorrentTag};
18use crate::models::tracker_key::TrackerKey;
19use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
20use crate::utils::clock;
21use crate::utils::hex::from_bytes;
22
23pub struct Sqlite {
24    pub pool: SqlitePool,
25}
26
27#[async_trait]
28impl Database for Sqlite {
29    fn get_database_driver(&self) -> Driver {
30        Driver::Sqlite3
31    }
32
33    async fn new(database_url: &str) -> Self {
34        let mut connection_options = SqliteConnectOptions::from_str(database_url).expect("Unable to create connection options.");
35        connection_options
36            .log_statements(log::LevelFilter::Error)
37            .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(1));
38
39        let db = SqlitePoolOptions::new()
40            .connect_with(connection_options)
41            .await
42            .expect("Unable to create database pool.");
43
44        sqlx::migrate!("migrations/sqlite3")
45            .run(&db)
46            .await
47            .expect("Could not run database migrations.");
48
49        Self { pool: db }
50    }
51
52    async fn insert_user_and_get_id(&self, username: &str, email: &str, password_hash: &str) -> Result<i64, database::Error> {
53        // open pool connection
54        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
55
56        // start db transaction
57        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
58
59        // create the user account and get the user id
60        let user_id =
61            query("INSERT INTO torrust_users (date_registered) VALUES (strftime('%Y-%m-%d %H:%M:%S',DATETIME('now', 'utc')))")
62                .execute(&mut tx)
63                .await
64                .map(|v| v.last_insert_rowid())
65                .map_err(|_| database::Error::Error)?;
66
67        // add password hash for account
68        let insert_user_auth_result = query("INSERT INTO torrust_user_authentication (user_id, password_hash) VALUES (?, ?)")
69            .bind(user_id)
70            .bind(password_hash)
71            .execute(&mut tx)
72            .await
73            .map_err(|_| database::Error::Error);
74
75        // rollback transaction on error
76        if let Err(e) = insert_user_auth_result {
77            let _ = tx.rollback().await;
78            return Err(e);
79        }
80
81        // add account profile details
82        let insert_user_profile_result = query(r#"INSERT INTO torrust_user_profiles (user_id, username, email, email_verified, bio, avatar) VALUES (?, ?, NULLIF(?, ""), 0, NULL, NULL)"#)
83            .bind(user_id)
84            .bind(username)
85            .bind(email)
86            .execute(&mut tx)
87            .await
88            .map_err(|e| match e {
89                sqlx::Error::Database(err) => {
90                    if err.message().contains("username") {
91                        database::Error::UsernameTaken
92                    } else if err.message().contains("email") {
93                        database::Error::EmailTaken
94                    } else {
95                        database::Error::Error
96                    }
97                }
98                _ => database::Error::Error
99            });
100
101        // commit or rollback transaction and return user_id on success
102        match insert_user_profile_result {
103            Ok(_) => {
104                let _ = tx.commit().await;
105                Ok(user_id)
106            }
107            Err(e) => {
108                let _ = tx.rollback().await;
109                Err(e)
110            }
111        }
112    }
113
114    async fn get_user_from_id(&self, user_id: i64) -> Result<User, database::Error> {
115        query_as::<_, User>("SELECT * FROM torrust_users WHERE user_id = ?")
116            .bind(user_id)
117            .fetch_one(&self.pool)
118            .await
119            .map_err(|_| database::Error::UserNotFound)
120    }
121
122    async fn get_user_authentication_from_id(&self, user_id: UserId) -> Result<UserAuthentication, database::Error> {
123        query_as::<_, UserAuthentication>("SELECT * FROM torrust_user_authentication WHERE user_id = ?")
124            .bind(user_id)
125            .fetch_one(&self.pool)
126            .await
127            .map_err(|_| database::Error::UserNotFound)
128    }
129
130    async fn get_user_profile_from_username(&self, username: &str) -> Result<UserProfile, database::Error> {
131        query_as::<_, UserProfile>("SELECT * FROM torrust_user_profiles WHERE username = ?")
132            .bind(username)
133            .fetch_one(&self.pool)
134            .await
135            .map_err(|_| database::Error::UserNotFound)
136    }
137
138    async fn get_user_compact_from_id(&self, user_id: i64) -> Result<UserCompact, database::Error> {
139        query_as::<_, UserCompact>("SELECT tu.user_id, tp.username, tu.administrator FROM torrust_users tu INNER JOIN torrust_user_profiles tp ON tu.user_id = tp.user_id WHERE tu.user_id = ?")
140            .bind(user_id)
141            .fetch_one(&self.pool)
142            .await
143            .map_err(|_| database::Error::UserNotFound)
144    }
145
146    async fn get_user_tracker_key(&self, user_id: i64) -> Option<TrackerKey> {
147        const HOUR_IN_SECONDS: i64 = 3600;
148
149        // casting current_time() to i64 will overflow in the year 2262
150        let current_time_plus_hour = i64::try_from(clock::now()).unwrap().saturating_add(HOUR_IN_SECONDS);
151
152        // get tracker key that is valid for at least one hour from now
153        query_as::<_, TrackerKey>("SELECT tracker_key AS key, date_expiry AS valid_until FROM torrust_tracker_keys WHERE user_id = $1 AND date_expiry > $2 ORDER BY date_expiry DESC")
154            .bind(user_id)
155            .bind(current_time_plus_hour)
156            .fetch_one(&self.pool)
157            .await
158            .ok()
159    }
160
161    async fn count_users(&self) -> Result<i64, database::Error> {
162        query_as("SELECT COUNT(*) FROM torrust_users")
163            .fetch_one(&self.pool)
164            .await
165            .map(|(v,)| v)
166            .map_err(|_| database::Error::Error)
167    }
168
169    async fn ban_user(&self, user_id: i64, reason: &str, date_expiry: NaiveDateTime) -> Result<(), database::Error> {
170        // date needs to be in ISO 8601 format
171        let date_expiry_string = date_expiry.format("%Y-%m-%d %H:%M:%S").to_string();
172
173        query("INSERT INTO torrust_user_bans (user_id, reason, date_expiry) VALUES ($1, $2, $3)")
174            .bind(user_id)
175            .bind(reason)
176            .bind(date_expiry_string)
177            .execute(&self.pool)
178            .await
179            .map(|_| ())
180            .map_err(|_| database::Error::Error)
181    }
182
183    async fn grant_admin_role(&self, user_id: i64) -> Result<(), database::Error> {
184        query("UPDATE torrust_users SET administrator = TRUE WHERE user_id = ?")
185            .bind(user_id)
186            .execute(&self.pool)
187            .await
188            .map_err(|_| database::Error::Error)
189            .and_then(|v| {
190                if v.rows_affected() > 0 {
191                    Ok(())
192                } else {
193                    Err(database::Error::UserNotFound)
194                }
195            })
196    }
197
198    async fn verify_email(&self, user_id: i64) -> Result<(), database::Error> {
199        query("UPDATE torrust_user_profiles SET email_verified = TRUE WHERE user_id = ?")
200            .bind(user_id)
201            .execute(&self.pool)
202            .await
203            .map(|_| ())
204            .map_err(|_| database::Error::Error)
205    }
206
207    async fn add_tracker_key(&self, user_id: i64, tracker_key: &TrackerKey) -> Result<(), database::Error> {
208        let key = tracker_key.key.clone();
209
210        query("INSERT INTO torrust_tracker_keys (user_id, tracker_key, date_expiry) VALUES ($1, $2, $3)")
211            .bind(user_id)
212            .bind(key)
213            .bind(tracker_key.valid_until)
214            .execute(&self.pool)
215            .await
216            .map(|_| ())
217            .map_err(|_| database::Error::Error)
218    }
219
220    async fn delete_user(&self, user_id: i64) -> Result<(), database::Error> {
221        query("DELETE FROM torrust_users WHERE user_id = ?")
222            .bind(user_id)
223            .execute(&self.pool)
224            .await
225            .map_err(|_| database::Error::Error)
226            .and_then(|v| {
227                if v.rows_affected() > 0 {
228                    Ok(())
229                } else {
230                    Err(database::Error::UserNotFound)
231                }
232            })
233    }
234
235    async fn insert_category_and_get_id(&self, category_name: &str) -> Result<i64, database::Error> {
236        query("INSERT INTO torrust_categories (name) VALUES (?)")
237            .bind(category_name)
238            .execute(&self.pool)
239            .await
240            .map(|v| v.last_insert_rowid())
241            .map_err(|e| match e {
242                sqlx::Error::Database(err) => {
243                    if err.message().contains("UNIQUE") {
244                        database::Error::CategoryAlreadyExists
245                    } else {
246                        database::Error::Error
247                    }
248                }
249                _ => database::Error::Error,
250            })
251    }
252
253    async fn get_category_from_id(&self, category_id: i64) -> Result<Category, database::Error> {
254        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE category_id = ?")
255            .bind(category_id)
256            .fetch_one(&self.pool)
257            .await
258            .map_err(|_| database::Error::CategoryNotFound)
259    }
260
261    async fn get_category_from_name(&self, category_name: &str) -> Result<Category, database::Error> {
262        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE name = ?")
263            .bind(category_name)
264            .fetch_one(&self.pool)
265            .await
266            .map_err(|_| database::Error::CategoryNotFound)
267    }
268
269    async fn get_categories(&self) -> Result<Vec<Category>, database::Error> {
270        query_as::<_, Category>("SELECT tc.category_id, tc.name, COUNT(tt.category_id) as num_torrents FROM torrust_categories tc LEFT JOIN torrust_torrents tt on tc.category_id = tt.category_id GROUP BY tc.name")
271            .fetch_all(&self.pool)
272            .await
273            .map_err(|_| database::Error::Error)
274    }
275
276    async fn delete_category(&self, category_name: &str) -> Result<(), database::Error> {
277        query("DELETE FROM torrust_categories WHERE name = ?")
278            .bind(category_name)
279            .execute(&self.pool)
280            .await
281            .map_err(|_| database::Error::Error)
282            .and_then(|v| {
283                if v.rows_affected() > 0 {
284                    Ok(())
285                } else {
286                    Err(database::Error::CategoryNotFound)
287                }
288            })
289    }
290
291    // TODO: refactor this
292    async fn get_torrents_search_sorted_paginated(
293        &self,
294        search: &Option<String>,
295        categories: &Option<Vec<String>>,
296        tags: &Option<Vec<String>>,
297        sort: &Sorting,
298        offset: u64,
299        limit: u8,
300    ) -> Result<TorrentsResponse, database::Error> {
301        let title = match search {
302            None => "%".to_string(),
303            Some(v) => format!("%{v}%"),
304        };
305
306        let sort_query: String = match sort {
307            Sorting::UploadedAsc => "date_uploaded ASC".to_string(),
308            Sorting::UploadedDesc => "date_uploaded DESC".to_string(),
309            Sorting::SeedersAsc => "seeders ASC".to_string(),
310            Sorting::SeedersDesc => "seeders DESC".to_string(),
311            Sorting::LeechersAsc => "leechers ASC".to_string(),
312            Sorting::LeechersDesc => "leechers DESC".to_string(),
313            Sorting::NameAsc => "title ASC".to_string(),
314            Sorting::NameDesc => "title DESC".to_string(),
315            Sorting::SizeAsc => "size ASC".to_string(),
316            Sorting::SizeDesc => "size DESC".to_string(),
317        };
318
319        let category_filter_query = if let Some(c) = categories {
320            let mut i = 0;
321            let mut category_filters = String::new();
322            for category in c {
323                // don't take user input in the db query
324                if let Ok(sanitized_category) = self.get_category_from_name(category).await {
325                    let mut str = format!("tc.name = '{}'", sanitized_category.name);
326                    if i > 0 {
327                        str = format!(" OR {str}");
328                    }
329                    category_filters.push_str(&str);
330                    i += 1;
331                }
332            }
333            if category_filters.is_empty() {
334                String::new()
335            } else {
336                format!("INNER JOIN torrust_categories tc ON tt.category_id = tc.category_id AND ({category_filters}) ")
337            }
338        } else {
339            String::new()
340        };
341
342        let tag_filter_query = if let Some(t) = tags {
343            let mut i = 0;
344            let mut tag_filters = String::new();
345            for tag in t {
346                // don't take user input in the db query
347                if let Ok(sanitized_tag) = self.get_tag_from_name(tag).await {
348                    let mut str = format!("tl.tag_id = '{}'", sanitized_tag.tag_id);
349                    if i > 0 {
350                        str = format!(" OR {str}");
351                    }
352                    tag_filters.push_str(&str);
353                    i += 1;
354                }
355            }
356            if tag_filters.is_empty() {
357                String::new()
358            } else {
359                format!("INNER JOIN torrust_torrent_tag_links tl ON tt.torrent_id = tl.torrent_id AND ({tag_filters}) ")
360            }
361        } else {
362            String::new()
363        };
364
365        let mut query_string = format!(
366            "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, tt.date_uploaded, tt.size AS file_size,
367            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
368            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
369            FROM torrust_torrents tt
370            {category_filter_query}
371            {tag_filter_query}
372            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
373            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
374            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
375            WHERE title LIKE ?
376            GROUP BY tt.torrent_id"
377        );
378
379        let count_query = format!("SELECT COUNT(*) as count FROM ({query_string}) AS count_table");
380
381        let count_result: Result<i64, database::Error> = query_as(&count_query)
382            .bind(title.clone())
383            .fetch_one(&self.pool)
384            .await
385            .map(|(v,)| v)
386            .map_err(|_| database::Error::Error);
387
388        let count = count_result?;
389
390        query_string = format!("{query_string} ORDER BY {sort_query} LIMIT ?, ?");
391
392        let res: Vec<TorrentListing> = sqlx::query_as::<_, TorrentListing>(&query_string)
393            .bind(title)
394            .bind(i64::saturating_add_unsigned(0, offset))
395            .bind(limit)
396            .fetch_all(&self.pool)
397            .await
398            .map_err(|_| database::Error::Error)?;
399
400        Ok(TorrentsResponse {
401            total: u32::try_from(count).expect("variable `count` is larger than u32"),
402            results: res,
403        })
404    }
405
406    #[allow(clippy::too_many_lines)]
407    async fn insert_torrent_and_get_id(
408        &self,
409        torrent: &Torrent,
410        uploader_id: UserId,
411        category_id: i64,
412        title: &str,
413        description: &str,
414    ) -> Result<i64, database::Error> {
415        let info_hash = torrent.info_hash();
416
417        // open pool connection
418        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
419
420        // start db transaction
421        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
422
423        // torrent file can only hold a pieces key or a root hash key: http://www.bittorrent.org/beps/bep_0030.html
424        let (pieces, root_hash): (String, bool) = if let Some(pieces) = &torrent.info.pieces {
425            (from_bytes(pieces.as_ref()), false)
426        } else {
427            let root_hash = torrent.info.root_hash.as_ref().ok_or(database::Error::Error)?;
428            (root_hash.to_string(), true)
429        };
430
431        let private = torrent.info.private.unwrap_or(0);
432
433        // add torrent
434        let torrent_id = query("INSERT INTO torrust_torrents (uploader_id, category_id, info_hash, size, name, pieces, piece_length, private, root_hash, date_uploaded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S',DATETIME('now', 'utc')))")
435            .bind(uploader_id)
436            .bind(category_id)
437            .bind(info_hash.to_lowercase())
438            .bind(torrent.file_size())
439            .bind(torrent.info.name.to_string())
440            .bind(pieces)
441            .bind(torrent.info.piece_length)
442            .bind(private)
443            .bind(root_hash)
444            .execute(&self.pool)
445            .await
446            .map(|v| v.last_insert_rowid())
447            .map_err(|e| match e {
448                sqlx::Error::Database(err) => {
449                    if err.message().contains("info_hash") {
450                        database::Error::TorrentAlreadyExists
451                    } else if err.message().contains("title") {
452                        database::Error::TorrentTitleAlreadyExists
453                    } else {
454                        database::Error::Error
455                    }
456                }
457                _ => database::Error::Error
458            })?;
459
460        let insert_torrent_files_result = if let Some(length) = torrent.info.length {
461            query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length) VALUES (?, ?, ?)")
462                .bind(torrent.info.md5sum.clone())
463                .bind(torrent_id)
464                .bind(length)
465                .execute(&mut tx)
466                .await
467                .map(|_| ())
468                .map_err(|_| database::Error::Error)
469        } else {
470            let files = torrent.info.files.as_ref().unwrap();
471
472            for file in files {
473                let path = file.path.join("/");
474
475                let _ = query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length, path) VALUES (?, ?, ?, ?)")
476                    .bind(file.md5sum.clone())
477                    .bind(torrent_id)
478                    .bind(file.length)
479                    .bind(path)
480                    .execute(&mut tx)
481                    .await
482                    .map_err(|_| database::Error::Error)?;
483            }
484
485            Ok(())
486        };
487
488        // rollback transaction on error
489        if let Err(e) = insert_torrent_files_result {
490            let _ = tx.rollback().await;
491            return Err(e);
492        }
493
494        let insert_torrent_announce_urls_result: Result<(), database::Error> = if let Some(announce_urls) = &torrent.announce_list
495        {
496            // flatten the nested vec (this will however remove the)
497            let announce_urls = announce_urls.iter().flatten().collect::<Vec<&String>>();
498
499            for tracker_url in &announce_urls {
500                let _ = query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
501                    .bind(torrent_id)
502                    .bind(tracker_url)
503                    .execute(&mut tx)
504                    .await
505                    .map(|_| ())
506                    .map_err(|_| database::Error::Error)?;
507            }
508
509            Ok(())
510        } else {
511            let tracker_url = torrent.announce.as_ref().unwrap();
512
513            query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
514                .bind(torrent_id)
515                .bind(tracker_url)
516                .execute(&mut tx)
517                .await
518                .map(|_| ())
519                .map_err(|_| database::Error::Error)
520        };
521
522        // rollback transaction on error
523        if let Err(e) = insert_torrent_announce_urls_result {
524            let _ = tx.rollback().await;
525            return Err(e);
526        }
527
528        let insert_torrent_info_result =
529            query(r#"INSERT INTO torrust_torrent_info (torrent_id, title, description) VALUES (?, ?, NULLIF(?, ""))"#)
530                .bind(torrent_id)
531                .bind(title)
532                .bind(description)
533                .execute(&mut tx)
534                .await
535                .map_err(|e| match e {
536                    sqlx::Error::Database(err) => {
537                        if err.message().contains("info_hash") {
538                            database::Error::TorrentAlreadyExists
539                        } else if err.message().contains("title") {
540                            database::Error::TorrentTitleAlreadyExists
541                        } else {
542                            database::Error::Error
543                        }
544                    }
545                    _ => database::Error::Error,
546                });
547
548        // commit or rollback transaction and return user_id on success
549        match insert_torrent_info_result {
550            Ok(_) => {
551                let _ = tx.commit().await;
552                Ok(torrent_id)
553            }
554            Err(e) => {
555                let _ = tx.rollback().await;
556                Err(e)
557            }
558        }
559    }
560
561    async fn get_torrent_info_from_id(&self, torrent_id: i64) -> Result<DbTorrentInfo, database::Error> {
562        query_as::<_, DbTorrentInfo>(
563            "SELECT torrent_id, info_hash, name, pieces, piece_length, private, root_hash FROM torrust_torrents WHERE torrent_id = ?",
564        )
565        .bind(torrent_id)
566        .fetch_one(&self.pool)
567        .await
568        .map_err(|_| database::Error::TorrentNotFound)
569    }
570
571    async fn get_torrent_info_from_info_hash(&self, info_hash: &InfoHash) -> Result<DbTorrentInfo, database::Error> {
572        query_as::<_, DbTorrentInfo>(
573            "SELECT torrent_id, info_hash, name, pieces, piece_length, private, root_hash FROM torrust_torrents WHERE info_hash = ?",
574        )
575        .bind(info_hash.to_hex_string().to_lowercase())
576        .fetch_one(&self.pool)
577        .await
578        .map_err(|_| database::Error::TorrentNotFound)
579    }
580
581    async fn get_torrent_files_from_id(&self, torrent_id: i64) -> Result<Vec<TorrentFile>, database::Error> {
582        let db_torrent_files =
583            query_as::<_, DbTorrentFile>("SELECT md5sum, length, path FROM torrust_torrent_files WHERE torrent_id = ?")
584                .bind(torrent_id)
585                .fetch_all(&self.pool)
586                .await
587                .map_err(|_| database::Error::TorrentNotFound)?;
588
589        let torrent_files: Vec<TorrentFile> = db_torrent_files
590            .into_iter()
591            .map(|tf| TorrentFile {
592                path: tf
593                    .path
594                    .unwrap_or_default()
595                    .split('/')
596                    .map(std::string::ToString::to_string)
597                    .collect(),
598                length: tf.length,
599                md5sum: tf.md5sum,
600            })
601            .collect();
602
603        Ok(torrent_files)
604    }
605
606    async fn get_torrent_announce_urls_from_id(&self, torrent_id: i64) -> Result<Vec<Vec<String>>, database::Error> {
607        query_as::<_, DbTorrentAnnounceUrl>("SELECT tracker_url FROM torrust_torrent_announce_urls WHERE torrent_id = ?")
608            .bind(torrent_id)
609            .fetch_all(&self.pool)
610            .await
611            .map(|v| v.iter().map(|a| vec![a.tracker_url.to_string()]).collect())
612            .map_err(|_| database::Error::TorrentNotFound)
613    }
614
615    async fn get_torrent_listing_from_id(&self, torrent_id: i64) -> Result<TorrentListing, database::Error> {
616        query_as::<_, TorrentListing>(
617            "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, tt.date_uploaded, tt.size AS file_size,
618            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
619            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
620            FROM torrust_torrents tt
621            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
622            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
623            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
624            WHERE tt.torrent_id = ?
625            GROUP BY ts.torrent_id"
626        )
627            .bind(torrent_id)
628            .fetch_one(&self.pool)
629            .await
630            .map_err(|_| database::Error::TorrentNotFound)
631    }
632
633    async fn get_torrent_listing_from_info_hash(&self, info_hash: &InfoHash) -> Result<TorrentListing, database::Error> {
634        query_as::<_, TorrentListing>(
635            "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, tt.date_uploaded, tt.size AS file_size,
636            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
637            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
638            FROM torrust_torrents tt
639            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
640            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
641            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
642            WHERE tt.info_hash = ?
643            GROUP BY ts.torrent_id"
644        )
645            .bind(info_hash.to_string().to_lowercase())
646            .fetch_one(&self.pool)
647            .await
648            .map_err(|_| database::Error::TorrentNotFound)
649    }
650
651    async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, database::Error> {
652        query_as::<_, TorrentCompact>("SELECT torrent_id, info_hash FROM torrust_torrents")
653            .fetch_all(&self.pool)
654            .await
655            .map_err(|_| database::Error::Error)
656    }
657
658    async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
659        query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2")
660            .bind(title)
661            .bind(torrent_id)
662            .execute(&self.pool)
663            .await
664            .map_err(|e| match e {
665                sqlx::Error::Database(err) => {
666                    if err.message().contains("UNIQUE") {
667                        database::Error::TorrentTitleAlreadyExists
668                    } else {
669                        database::Error::Error
670                    }
671                }
672                _ => database::Error::Error,
673            })
674            .and_then(|v| {
675                if v.rows_affected() > 0 {
676                    Ok(())
677                } else {
678                    Err(database::Error::TorrentNotFound)
679                }
680            })
681    }
682
683    async fn update_torrent_description(&self, torrent_id: i64, description: &str) -> Result<(), database::Error> {
684        query("UPDATE torrust_torrent_info SET description = $1 WHERE torrent_id = $2")
685            .bind(description)
686            .bind(torrent_id)
687            .execute(&self.pool)
688            .await
689            .map_err(|_| database::Error::Error)
690            .and_then(|v| {
691                if v.rows_affected() > 0 {
692                    Ok(())
693                } else {
694                    Err(database::Error::TorrentNotFound)
695                }
696            })
697    }
698
699    async fn update_torrent_category(&self, torrent_id: i64, category_id: CategoryId) -> Result<(), database::Error> {
700        query("UPDATE torrust_torrents SET category_id = $1 WHERE torrent_id = $2")
701            .bind(category_id)
702            .bind(torrent_id)
703            .execute(&self.pool)
704            .await
705            .map_err(|_| database::Error::Error)
706            .and_then(|v| {
707                if v.rows_affected() > 0 {
708                    Ok(())
709                } else {
710                    Err(database::Error::TorrentNotFound)
711                }
712            })
713    }
714
715    async fn add_tag(&self, name: &str) -> Result<(), database::Error> {
716        query("INSERT INTO torrust_torrent_tags (name) VALUES (?)")
717            .bind(name)
718            .execute(&self.pool)
719            .await
720            .map(|_| ())
721            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
722    }
723
724    async fn delete_tag(&self, tag_id: TagId) -> Result<(), database::Error> {
725        query("DELETE FROM torrust_torrent_tags WHERE tag_id = ?")
726            .bind(tag_id)
727            .execute(&self.pool)
728            .await
729            .map(|_| ())
730            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
731    }
732
733    async fn add_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
734        query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
735            .bind(torrent_id)
736            .bind(tag_id)
737            .execute(&self.pool)
738            .await
739            .map(|_| ())
740            .map_err(|_| database::Error::Error)
741    }
742
743    async fn add_torrent_tag_links(&self, torrent_id: i64, tag_ids: &[TagId]) -> Result<(), database::Error> {
744        let mut transaction = self
745            .pool
746            .begin()
747            .await
748            .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
749
750        for tag_id in tag_ids {
751            query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
752                .bind(torrent_id)
753                .bind(tag_id)
754                .execute(&mut transaction)
755                .await
756                .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
757        }
758
759        transaction
760            .commit()
761            .await
762            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
763    }
764
765    async fn delete_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
766        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ? AND tag_id = ?")
767            .bind(torrent_id)
768            .bind(tag_id)
769            .execute(&self.pool)
770            .await
771            .map(|_| ())
772            .map_err(|_| database::Error::Error)
773    }
774
775    async fn delete_all_torrent_tag_links(&self, torrent_id: i64) -> Result<(), database::Error> {
776        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ?")
777            .bind(torrent_id)
778            .execute(&self.pool)
779            .await
780            .map(|_| ())
781            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
782    }
783
784    async fn get_tag_from_name(&self, name: &str) -> Result<TorrentTag, database::Error> {
785        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags WHERE name = ?")
786            .bind(name)
787            .fetch_one(&self.pool)
788            .await
789            .map_err(|_| database::Error::TagNotFound)
790    }
791
792    async fn get_tags(&self) -> Result<Vec<TorrentTag>, database::Error> {
793        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags")
794            .fetch_all(&self.pool)
795            .await
796            .map_err(|_| database::Error::Error)
797    }
798
799    async fn get_tags_for_torrent_id(&self, torrent_id: i64) -> Result<Vec<TorrentTag>, database::Error> {
800        query_as::<_, TorrentTag>(
801            "SELECT torrust_torrent_tags.tag_id, torrust_torrent_tags.name
802            FROM torrust_torrent_tags
803            JOIN torrust_torrent_tag_links ON torrust_torrent_tags.tag_id = torrust_torrent_tag_links.tag_id
804            WHERE torrust_torrent_tag_links.torrent_id = ?",
805        )
806        .bind(torrent_id)
807        .fetch_all(&self.pool)
808        .await
809        .map_err(|_| database::Error::Error)
810    }
811
812    async fn update_tracker_info(
813        &self,
814        torrent_id: i64,
815        tracker_url: &str,
816        seeders: i64,
817        leechers: i64,
818    ) -> Result<(), database::Error> {
819        query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES ($1, $2, $3, $4)")
820            .bind(torrent_id)
821            .bind(tracker_url)
822            .bind(seeders)
823            .bind(leechers)
824            .execute(&self.pool)
825            .await
826            .map(|_| ())
827            .map_err(|_| database::Error::TorrentNotFound)
828    }
829
830    async fn delete_torrent(&self, torrent_id: i64) -> Result<(), database::Error> {
831        query("DELETE FROM torrust_torrents WHERE torrent_id = ?")
832            .bind(torrent_id)
833            .execute(&self.pool)
834            .await
835            .map_err(|_| database::Error::Error)
836            .and_then(|v| {
837                if v.rows_affected() > 0 {
838                    Ok(())
839                } else {
840                    Err(database::Error::TorrentNotFound)
841                }
842            })
843    }
844
845    async fn delete_all_database_rows(&self) -> Result<(), database::Error> {
846        for table in TABLES_TO_TRUNCATE {
847            query(&format!("DELETE FROM {table};"))
848                .execute(&self.pool)
849                .await
850                .map_err(|_| database::Error::Error)?;
851        }
852
853        Ok(())
854    }
855}