torrust_index_backend/databases/
mysql.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use chrono::NaiveDateTime;
6use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
7use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool};
8
9use super::database::TABLES_TO_TRUNCATE;
10use crate::databases::database;
11use crate::databases::database::{Category, Database, Driver, Sorting, TorrentCompact};
12use crate::models::category::CategoryId;
13use crate::models::info_hash::InfoHash;
14use crate::models::response::TorrentsResponse;
15use crate::models::torrent::TorrentListing;
16use crate::models::torrent_file::{DbTorrentAnnounceUrl, DbTorrentFile, DbTorrentInfo, Torrent, TorrentFile};
17use crate::models::torrent_tag::{TagId, TorrentTag};
18use crate::models::tracker_key::TrackerKey;
19use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
20use crate::utils::clock;
21use crate::utils::hex::from_bytes;
22
23pub struct Mysql {
24    pub pool: MySqlPool,
25}
26
27#[async_trait]
28impl Database for Mysql {
29    fn get_database_driver(&self) -> Driver {
30        Driver::Mysql
31    }
32
33    async fn new(database_url: &str) -> Self {
34        let mut connection_options = MySqlConnectOptions::from_str(database_url).expect("Unable to create connection options.");
35        connection_options
36            .log_statements(log::LevelFilter::Error)
37            .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(1));
38
39        let db = MySqlPoolOptions::new()
40            .connect_with(connection_options)
41            .await
42            .expect("Unable to create database pool.");
43
44        sqlx::migrate!("migrations/mysql")
45            .run(&db)
46            .await
47            .expect("Could not run database migrations.");
48
49        Self { pool: db }
50    }
51
52    async fn insert_user_and_get_id(&self, username: &str, email: &str, password_hash: &str) -> Result<i64, database::Error> {
53        // open pool connection
54        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
55
56        // start db transaction
57        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
58
59        // create the user account and get the user id
60        let user_id = query("INSERT INTO torrust_users (date_registered) VALUES (UTC_TIMESTAMP())")
61            .execute(&mut tx)
62            .await
63            .map(|v| v.last_insert_id())
64            .map_err(|_| database::Error::Error)?;
65
66        // add password hash for account
67        let insert_user_auth_result = query("INSERT INTO torrust_user_authentication (user_id, password_hash) VALUES (?, ?)")
68            .bind(user_id)
69            .bind(password_hash)
70            .execute(&mut tx)
71            .await
72            .map_err(|_| database::Error::Error);
73
74        // rollback transaction on error
75        if let Err(e) = insert_user_auth_result {
76            let _ = tx.rollback().await;
77            return Err(e);
78        }
79
80        // add account profile details
81        let insert_user_profile_result = query(r#"INSERT INTO torrust_user_profiles (user_id, username, email, email_verified, bio, avatar) VALUES (?, ?, NULLIF(?, ""), 0, NULL, NULL)"#)
82            .bind(user_id)
83            .bind(username)
84            .bind(email)
85            .execute(&mut tx)
86            .await
87            .map_err(|e| match e {
88                sqlx::Error::Database(err) => {
89                    if err.message().contains("username") {
90                        database::Error::UsernameTaken
91                    } else if err.message().contains("email") {
92                        database::Error::EmailTaken
93                    } else {
94                        database::Error::Error
95                    }
96                }
97                _ => database::Error::Error
98            });
99
100        // commit or rollback transaction and return user_id on success
101        match insert_user_profile_result {
102            Ok(_) => {
103                let _ = tx.commit().await;
104                Ok(i64::overflowing_add_unsigned(0, user_id).0)
105            }
106            Err(e) => {
107                let _ = tx.rollback().await;
108                Err(e)
109            }
110        }
111    }
112
113    async fn get_user_from_id(&self, user_id: i64) -> Result<User, database::Error> {
114        query_as::<_, User>("SELECT * FROM torrust_users WHERE user_id = ?")
115            .bind(user_id)
116            .fetch_one(&self.pool)
117            .await
118            .map_err(|_| database::Error::UserNotFound)
119    }
120
121    async fn get_user_authentication_from_id(&self, user_id: UserId) -> Result<UserAuthentication, database::Error> {
122        query_as::<_, UserAuthentication>("SELECT * FROM torrust_user_authentication WHERE user_id = ?")
123            .bind(user_id)
124            .fetch_one(&self.pool)
125            .await
126            .map_err(|_| database::Error::UserNotFound)
127    }
128
129    async fn get_user_profile_from_username(&self, username: &str) -> Result<UserProfile, database::Error> {
130        query_as::<_, UserProfile>(r#"SELECT user_id, username, COALESCE(email, "") as email, email_verified, COALESCE(bio, "") as bio, COALESCE(avatar, "") as avatar FROM torrust_user_profiles WHERE username = ?"#)
131            .bind(username)
132            .fetch_one(&self.pool)
133            .await
134            .map_err(|_| database::Error::UserNotFound)
135    }
136
137    async fn get_user_compact_from_id(&self, user_id: i64) -> Result<UserCompact, database::Error> {
138        query_as::<_, UserCompact>("SELECT tu.user_id, tp.username, tu.administrator FROM torrust_users tu INNER JOIN torrust_user_profiles tp ON tu.user_id = tp.user_id WHERE tu.user_id = ?")
139            .bind(user_id)
140            .fetch_one(&self.pool)
141            .await
142            .map_err(|_| database::Error::UserNotFound)
143    }
144
145    /// Gets User Tracker Key
146    ///
147    /// # Panics
148    ///
149    /// Will panic if the input time overflows the `u64` seconds overflows the `i64` type.
150    /// (this will naturally happen in 292.5 billion years)
151    async fn get_user_tracker_key(&self, user_id: i64) -> Option<TrackerKey> {
152        const HOUR_IN_SECONDS: i64 = 3600;
153
154        let current_time_plus_hour = i64::try_from(clock::now()).unwrap().saturating_add(HOUR_IN_SECONDS);
155
156        // get tracker key that is valid for at least one hour from now
157        query_as::<_, TrackerKey>("SELECT tracker_key AS 'key', date_expiry AS valid_until FROM torrust_tracker_keys WHERE user_id = ? AND date_expiry > ? ORDER BY date_expiry DESC")
158            .bind(user_id)
159            .bind(current_time_plus_hour)
160            .fetch_one(&self.pool)
161            .await
162            .ok()
163    }
164
165    async fn count_users(&self) -> Result<i64, database::Error> {
166        query_as("SELECT COUNT(*) FROM torrust_users")
167            .fetch_one(&self.pool)
168            .await
169            .map(|(v,)| v)
170            .map_err(|_| database::Error::Error)
171    }
172
173    async fn ban_user(&self, user_id: i64, reason: &str, date_expiry: NaiveDateTime) -> Result<(), database::Error> {
174        // date needs to be in ISO 8601 format
175        let date_expiry_string = date_expiry.format("%Y-%m-%d %H:%M:%S").to_string();
176
177        query("INSERT INTO torrust_user_bans (user_id, reason, date_expiry) VALUES (?, ?, ?)")
178            .bind(user_id)
179            .bind(reason)
180            .bind(date_expiry_string)
181            .execute(&self.pool)
182            .await
183            .map(|_| ())
184            .map_err(|_| database::Error::Error)
185    }
186
187    async fn grant_admin_role(&self, user_id: i64) -> Result<(), database::Error> {
188        query("UPDATE torrust_users SET administrator = TRUE WHERE user_id = ?")
189            .bind(user_id)
190            .execute(&self.pool)
191            .await
192            .map_err(|_| database::Error::Error)
193            .and_then(|v| {
194                if v.rows_affected() > 0 {
195                    Ok(())
196                } else {
197                    Err(database::Error::UserNotFound)
198                }
199            })
200    }
201
202    async fn verify_email(&self, user_id: i64) -> Result<(), database::Error> {
203        query("UPDATE torrust_user_profiles SET email_verified = TRUE WHERE user_id = ?")
204            .bind(user_id)
205            .execute(&self.pool)
206            .await
207            .map_err(|_| database::Error::Error)
208            .and_then(|v| {
209                if v.rows_affected() > 0 {
210                    Ok(())
211                } else {
212                    Err(database::Error::UserNotFound)
213                }
214            })
215    }
216
217    async fn add_tracker_key(&self, user_id: i64, tracker_key: &TrackerKey) -> Result<(), database::Error> {
218        let key = tracker_key.key.clone();
219
220        query("INSERT INTO torrust_tracker_keys (user_id, tracker_key, date_expiry) VALUES (?, ?, ?)")
221            .bind(user_id)
222            .bind(key)
223            .bind(tracker_key.valid_until)
224            .execute(&self.pool)
225            .await
226            .map(|_| ())
227            .map_err(|_| database::Error::Error)
228    }
229
230    async fn delete_user(&self, user_id: i64) -> Result<(), database::Error> {
231        query("DELETE FROM torrust_users WHERE user_id = ?")
232            .bind(user_id)
233            .execute(&self.pool)
234            .await
235            .map_err(|_| database::Error::Error)
236            .and_then(|v| {
237                if v.rows_affected() > 0 {
238                    Ok(())
239                } else {
240                    Err(database::Error::UserNotFound)
241                }
242            })
243    }
244
245    async fn insert_category_and_get_id(&self, category_name: &str) -> Result<i64, database::Error> {
246        query("INSERT INTO torrust_categories (name) VALUES (?)")
247            .bind(category_name)
248            .execute(&self.pool)
249            .await
250            .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
251            .map_err(|e| match e {
252                sqlx::Error::Database(err) => {
253                    if err.message().contains("UNIQUE") {
254                        database::Error::CategoryAlreadyExists
255                    } else {
256                        database::Error::Error
257                    }
258                }
259                _ => database::Error::Error,
260            })
261    }
262
263    async fn get_category_from_id(&self, category_id: i64) -> Result<Category, database::Error> {
264        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE category_id = ?")
265            .bind(category_id)
266            .fetch_one(&self.pool)
267            .await
268            .map_err(|_| database::Error::CategoryNotFound)
269    }
270
271    async fn get_category_from_name(&self, category_name: &str) -> Result<Category, database::Error> {
272        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE name = ?")
273            .bind(category_name)
274            .fetch_one(&self.pool)
275            .await
276            .map_err(|_| database::Error::CategoryNotFound)
277    }
278
279    async fn get_categories(&self) -> Result<Vec<Category>, database::Error> {
280        query_as::<_, Category>("SELECT tc.category_id, tc.name, COUNT(tt.category_id) as num_torrents FROM torrust_categories tc LEFT JOIN torrust_torrents tt on tc.category_id = tt.category_id GROUP BY tc.name")
281            .fetch_all(&self.pool)
282            .await
283            .map_err(|_| database::Error::Error)
284    }
285
286    async fn delete_category(&self, category_name: &str) -> Result<(), database::Error> {
287        query("DELETE FROM torrust_categories WHERE name = ?")
288            .bind(category_name)
289            .execute(&self.pool)
290            .await
291            .map_err(|_| database::Error::Error)
292            .and_then(|v| {
293                if v.rows_affected() > 0 {
294                    Ok(())
295                } else {
296                    Err(database::Error::CategoryNotFound)
297                }
298            })
299    }
300
301    // TODO: refactor this
302    async fn get_torrents_search_sorted_paginated(
303        &self,
304        search: &Option<String>,
305        categories: &Option<Vec<String>>,
306        tags: &Option<Vec<String>>,
307        sort: &Sorting,
308        offset: u64,
309        limit: u8,
310    ) -> Result<TorrentsResponse, database::Error> {
311        let title = match search {
312            None => "%".to_string(),
313            Some(v) => format!("%{v}%"),
314        };
315
316        let sort_query: String = match sort {
317            Sorting::UploadedAsc => "date_uploaded ASC".to_string(),
318            Sorting::UploadedDesc => "date_uploaded DESC".to_string(),
319            Sorting::SeedersAsc => "seeders ASC".to_string(),
320            Sorting::SeedersDesc => "seeders DESC".to_string(),
321            Sorting::LeechersAsc => "leechers ASC".to_string(),
322            Sorting::LeechersDesc => "leechers DESC".to_string(),
323            Sorting::NameAsc => "title ASC".to_string(),
324            Sorting::NameDesc => "title DESC".to_string(),
325            Sorting::SizeAsc => "size ASC".to_string(),
326            Sorting::SizeDesc => "size DESC".to_string(),
327        };
328
329        let category_filter_query = if let Some(c) = categories {
330            let mut i = 0;
331            let mut category_filters = String::new();
332            for category in c {
333                // don't take user input in the db query
334                if let Ok(sanitized_category) = self.get_category_from_name(category).await {
335                    let mut str = format!("tc.name = '{}'", sanitized_category.name);
336                    if i > 0 {
337                        str = format!(" OR {str}");
338                    }
339                    category_filters.push_str(&str);
340                    i += 1;
341                }
342            }
343            if category_filters.is_empty() {
344                String::new()
345            } else {
346                format!("INNER JOIN torrust_categories tc ON tt.category_id = tc.category_id AND ({category_filters}) ")
347            }
348        } else {
349            String::new()
350        };
351
352        let tag_filter_query = if let Some(t) = tags {
353            let mut i = 0;
354            let mut tag_filters = String::new();
355            for tag in t {
356                // don't take user input in the db query
357                if let Ok(sanitized_tag) = self.get_tag_from_name(tag).await {
358                    let mut str = format!("tl.tag_id = '{}'", sanitized_tag.tag_id);
359                    if i > 0 {
360                        str = format!(" OR {str}");
361                    }
362                    tag_filters.push_str(&str);
363                    i += 1;
364                }
365            }
366            if tag_filters.is_empty() {
367                String::new()
368            } else {
369                format!("INNER JOIN torrust_torrent_tag_links tl ON tt.torrent_id = tl.torrent_id AND ({tag_filters}) ")
370            }
371        } else {
372            String::new()
373        };
374
375        let mut query_string = format!(
376            "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded, tt.size AS file_size,
377            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
378            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
379            FROM torrust_torrents tt
380            {category_filter_query}
381            {tag_filter_query}
382            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
383            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
384            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
385            WHERE title LIKE ?
386            GROUP BY tt.torrent_id"
387        );
388
389        let count_query = format!("SELECT COUNT(*) as count FROM ({query_string}) AS count_table");
390
391        let count_result: Result<i64, database::Error> = query_as(&count_query)
392            .bind(title.clone())
393            .fetch_one(&self.pool)
394            .await
395            .map(|(v,)| v)
396            .map_err(|_| database::Error::Error);
397
398        let count = count_result?;
399
400        query_string = format!("{query_string} ORDER BY {sort_query} LIMIT ?, ?");
401
402        let res: Vec<TorrentListing> = sqlx::query_as::<_, TorrentListing>(&query_string)
403            .bind(title)
404            .bind(i64::saturating_add_unsigned(0, offset))
405            .bind(limit)
406            .fetch_all(&self.pool)
407            .await
408            .map_err(|_| database::Error::Error)?;
409
410        Ok(TorrentsResponse {
411            total: u32::try_from(count).expect("variable `count` is larger than u32"),
412            results: res,
413        })
414    }
415
416    #[allow(clippy::too_many_lines)]
417    async fn insert_torrent_and_get_id(
418        &self,
419        torrent: &Torrent,
420        uploader_id: UserId,
421        category_id: i64,
422        title: &str,
423        description: &str,
424    ) -> Result<i64, database::Error> {
425        let info_hash = torrent.info_hash();
426
427        // open pool connection
428        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
429
430        // start db transaction
431        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
432
433        // torrent file can only hold a pieces key or a root hash key: http://www.bittorrent.org/beps/bep_0030.html
434        let (pieces, root_hash): (String, bool) = if let Some(pieces) = &torrent.info.pieces {
435            (from_bytes(pieces.as_ref()), false)
436        } else {
437            let root_hash = torrent.info.root_hash.as_ref().ok_or(database::Error::Error)?;
438            (root_hash.to_string(), true)
439        };
440
441        let private = torrent.info.private.unwrap_or(0);
442
443        // add torrent
444        let torrent_id = query("INSERT INTO torrust_torrents (uploader_id, category_id, info_hash, size, name, pieces, piece_length, private, root_hash, date_uploaded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, UTC_TIMESTAMP())")
445            .bind(uploader_id)
446            .bind(category_id)
447            .bind(info_hash.to_lowercase())
448            .bind(torrent.file_size())
449            .bind(torrent.info.name.to_string())
450            .bind(pieces)
451            .bind(torrent.info.piece_length)
452            .bind(private)
453            .bind(root_hash)
454            .execute(&self.pool)
455            .await
456            .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
457            .map_err(|e| match e {
458                sqlx::Error::Database(err) => {
459                    if err.message().contains("info_hash") {
460                        database::Error::TorrentAlreadyExists
461                    } else if err.message().contains("title") {
462                        database::Error::TorrentTitleAlreadyExists
463                    } else {
464                        database::Error::Error
465                    }
466                }
467                _ => database::Error::Error
468            })?;
469
470        let insert_torrent_files_result = if let Some(length) = torrent.info.length {
471            query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length) VALUES (?, ?, ?)")
472                .bind(torrent.info.md5sum.clone())
473                .bind(torrent_id)
474                .bind(length)
475                .execute(&mut tx)
476                .await
477                .map(|_| ())
478                .map_err(|_| database::Error::Error)
479        } else {
480            let files = torrent.info.files.as_ref().unwrap();
481
482            for file in files {
483                let path = file.path.join("/");
484
485                let _ = query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length, path) VALUES (?, ?, ?, ?)")
486                    .bind(file.md5sum.clone())
487                    .bind(torrent_id)
488                    .bind(file.length)
489                    .bind(path)
490                    .execute(&mut tx)
491                    .await
492                    .map_err(|_| database::Error::Error)?;
493            }
494
495            Ok(())
496        };
497
498        // rollback transaction on error
499        if let Err(e) = insert_torrent_files_result {
500            let _ = tx.rollback().await;
501            return Err(e);
502        }
503
504        let insert_torrent_announce_urls_result: Result<(), database::Error> = if let Some(announce_urls) = &torrent.announce_list
505        {
506            // flatten the nested vec (this will however remove the)
507            let announce_urls = announce_urls.iter().flatten().collect::<Vec<&String>>();
508
509            for tracker_url in &announce_urls {
510                let _ = query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
511                    .bind(torrent_id)
512                    .bind(tracker_url)
513                    .execute(&mut tx)
514                    .await
515                    .map(|_| ())
516                    .map_err(|_| database::Error::Error)?;
517            }
518
519            Ok(())
520        } else {
521            let tracker_url = torrent.announce.as_ref().unwrap();
522
523            query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
524                .bind(torrent_id)
525                .bind(tracker_url)
526                .execute(&mut tx)
527                .await
528                .map(|_| ())
529                .map_err(|_| database::Error::Error)
530        };
531
532        // rollback transaction on error
533        if let Err(e) = insert_torrent_announce_urls_result {
534            let _ = tx.rollback().await;
535            return Err(e);
536        }
537
538        let insert_torrent_info_result =
539            query(r#"INSERT INTO torrust_torrent_info (torrent_id, title, description) VALUES (?, ?, NULLIF(?, ""))"#)
540                .bind(torrent_id)
541                .bind(title)
542                .bind(description)
543                .execute(&mut tx)
544                .await
545                .map_err(|e| match e {
546                    sqlx::Error::Database(err) => {
547                        if err.message().contains("info_hash") {
548                            database::Error::TorrentAlreadyExists
549                        } else if err.message().contains("title") {
550                            database::Error::TorrentTitleAlreadyExists
551                        } else {
552                            database::Error::Error
553                        }
554                    }
555                    _ => database::Error::Error,
556                });
557
558        // commit or rollback transaction and return user_id on success
559        match insert_torrent_info_result {
560            Ok(_) => {
561                let _ = tx.commit().await;
562                Ok(torrent_id)
563            }
564            Err(e) => {
565                let _ = tx.rollback().await;
566                Err(e)
567            }
568        }
569    }
570
571    async fn get_torrent_info_from_id(&self, torrent_id: i64) -> Result<DbTorrentInfo, database::Error> {
572        query_as::<_, DbTorrentInfo>(
573            "SELECT torrent_id, info_hash, name, pieces, piece_length, private, root_hash FROM torrust_torrents WHERE torrent_id = ?",
574        )
575        .bind(torrent_id)
576        .fetch_one(&self.pool)
577        .await
578        .map_err(|_| database::Error::TorrentNotFound)
579    }
580
581    async fn get_torrent_info_from_info_hash(&self, info_hash: &InfoHash) -> Result<DbTorrentInfo, database::Error> {
582        query_as::<_, DbTorrentInfo>(
583            "SELECT torrent_id, info_hash, name, pieces, piece_length, private, root_hash FROM torrust_torrents WHERE info_hash = ?",
584        )
585        .bind(info_hash.to_hex_string().to_lowercase())
586        .fetch_one(&self.pool)
587        .await
588        .map_err(|_| database::Error::TorrentNotFound)
589    }
590
591    async fn get_torrent_files_from_id(&self, torrent_id: i64) -> Result<Vec<TorrentFile>, database::Error> {
592        let db_torrent_files =
593            query_as::<_, DbTorrentFile>("SELECT md5sum, length, path FROM torrust_torrent_files WHERE torrent_id = ?")
594                .bind(torrent_id)
595                .fetch_all(&self.pool)
596                .await
597                .map_err(|_| database::Error::TorrentNotFound)?;
598
599        let torrent_files: Vec<TorrentFile> = db_torrent_files
600            .into_iter()
601            .map(|tf| TorrentFile {
602                path: tf
603                    .path
604                    .unwrap_or_default()
605                    .split('/')
606                    .map(std::string::ToString::to_string)
607                    .collect(),
608                length: tf.length,
609                md5sum: tf.md5sum,
610            })
611            .collect();
612
613        Ok(torrent_files)
614    }
615
616    async fn get_torrent_announce_urls_from_id(&self, torrent_id: i64) -> Result<Vec<Vec<String>>, database::Error> {
617        query_as::<_, DbTorrentAnnounceUrl>("SELECT tracker_url FROM torrust_torrent_announce_urls WHERE torrent_id = ?")
618            .bind(torrent_id)
619            .fetch_all(&self.pool)
620            .await
621            .map(|v| v.iter().map(|a| vec![a.tracker_url.to_string()]).collect())
622            .map_err(|_| database::Error::TorrentNotFound)
623    }
624
625    async fn get_torrent_listing_from_id(&self, torrent_id: i64) -> Result<TorrentListing, database::Error> {
626        query_as::<_, TorrentListing>(
627            "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded, tt.size AS file_size,
628            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
629            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
630            FROM torrust_torrents tt
631            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
632            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
633            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
634            WHERE tt.torrent_id = ?
635            GROUP BY torrent_id"
636        )
637            .bind(torrent_id)
638            .fetch_one(&self.pool)
639            .await
640            .map_err(|_| database::Error::TorrentNotFound)
641    }
642
643    async fn get_torrent_listing_from_info_hash(&self, info_hash: &InfoHash) -> Result<TorrentListing, database::Error> {
644        query_as::<_, TorrentListing>(
645            "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded, tt.size AS file_size,
646            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
647            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
648            FROM torrust_torrents tt
649            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
650            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
651            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
652            WHERE tt.info_hash = ?
653            GROUP BY torrent_id"
654        )
655            .bind(info_hash.to_hex_string().to_lowercase())
656            .fetch_one(&self.pool)
657            .await
658            .map_err(|_| database::Error::TorrentNotFound)
659    }
660
661    async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, database::Error> {
662        query_as::<_, TorrentCompact>("SELECT torrent_id, info_hash FROM torrust_torrents")
663            .fetch_all(&self.pool)
664            .await
665            .map_err(|_| database::Error::Error)
666    }
667
668    async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
669        query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?")
670            .bind(title)
671            .bind(torrent_id)
672            .execute(&self.pool)
673            .await
674            .map_err(|e| match e {
675                sqlx::Error::Database(err) => {
676                    if err.message().contains("UNIQUE") {
677                        database::Error::TorrentTitleAlreadyExists
678                    } else {
679                        database::Error::Error
680                    }
681                }
682                _ => database::Error::Error,
683            })
684            .and_then(|v| {
685                if v.rows_affected() > 0 {
686                    Ok(())
687                } else {
688                    Err(database::Error::TorrentNotFound)
689                }
690            })
691    }
692
693    async fn update_torrent_description(&self, torrent_id: i64, description: &str) -> Result<(), database::Error> {
694        query("UPDATE torrust_torrent_info SET description = ? WHERE torrent_id = ?")
695            .bind(description)
696            .bind(torrent_id)
697            .execute(&self.pool)
698            .await
699            .map_err(|_| database::Error::Error)
700            .and_then(|v| {
701                if v.rows_affected() > 0 {
702                    Ok(())
703                } else {
704                    Err(database::Error::TorrentNotFound)
705                }
706            })
707    }
708
709    async fn update_torrent_category(&self, torrent_id: i64, category_id: CategoryId) -> Result<(), database::Error> {
710        query("UPDATE torrust_torrents SET category_id = ? WHERE torrent_id = ?")
711            .bind(category_id)
712            .bind(torrent_id)
713            .execute(&self.pool)
714            .await
715            .map_err(|_| database::Error::Error)
716            .and_then(|v| {
717                if v.rows_affected() > 0 {
718                    Ok(())
719                } else {
720                    Err(database::Error::TorrentNotFound)
721                }
722            })
723    }
724
725    async fn add_tag(&self, name: &str) -> Result<(), database::Error> {
726        query("INSERT INTO torrust_torrent_tags (name) VALUES (?)")
727            .bind(name)
728            .execute(&self.pool)
729            .await
730            .map(|_| ())
731            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
732    }
733
734    async fn delete_tag(&self, tag_id: TagId) -> Result<(), database::Error> {
735        query("DELETE FROM torrust_torrent_tags WHERE tag_id = ?")
736            .bind(tag_id)
737            .execute(&self.pool)
738            .await
739            .map(|_| ())
740            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
741    }
742
743    async fn add_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
744        query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
745            .bind(torrent_id)
746            .bind(tag_id)
747            .execute(&self.pool)
748            .await
749            .map(|_| ())
750            .map_err(|_| database::Error::Error)
751    }
752
753    async fn add_torrent_tag_links(&self, torrent_id: i64, tag_ids: &[TagId]) -> Result<(), database::Error> {
754        let mut transaction = self
755            .pool
756            .begin()
757            .await
758            .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
759
760        for tag_id in tag_ids {
761            query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
762                .bind(torrent_id)
763                .bind(tag_id)
764                .execute(&mut transaction)
765                .await
766                .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
767        }
768
769        transaction
770            .commit()
771            .await
772            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
773    }
774
775    async fn delete_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
776        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ? AND tag_id = ?")
777            .bind(torrent_id)
778            .bind(tag_id)
779            .execute(&self.pool)
780            .await
781            .map(|_| ())
782            .map_err(|_| database::Error::Error)
783    }
784
785    async fn delete_all_torrent_tag_links(&self, torrent_id: i64) -> Result<(), database::Error> {
786        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ?")
787            .bind(torrent_id)
788            .execute(&self.pool)
789            .await
790            .map(|_| ())
791            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
792    }
793
794    async fn get_tag_from_name(&self, name: &str) -> Result<TorrentTag, database::Error> {
795        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags WHERE name = ?")
796            .bind(name)
797            .fetch_one(&self.pool)
798            .await
799            .map_err(|_| database::Error::TagNotFound)
800    }
801
802    async fn get_tags(&self) -> Result<Vec<TorrentTag>, database::Error> {
803        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags")
804            .fetch_all(&self.pool)
805            .await
806            .map_err(|_| database::Error::Error)
807    }
808
809    async fn get_tags_for_torrent_id(&self, torrent_id: i64) -> Result<Vec<TorrentTag>, database::Error> {
810        query_as::<_, TorrentTag>(
811            "SELECT torrust_torrent_tags.tag_id, torrust_torrent_tags.name
812            FROM torrust_torrent_tags
813            JOIN torrust_torrent_tag_links ON torrust_torrent_tags.tag_id = torrust_torrent_tag_links.tag_id
814            WHERE torrust_torrent_tag_links.torrent_id = ?",
815        )
816        .bind(torrent_id)
817        .fetch_all(&self.pool)
818        .await
819        .map_err(|_| database::Error::Error)
820    }
821
822    async fn update_tracker_info(
823        &self,
824        torrent_id: i64,
825        tracker_url: &str,
826        seeders: i64,
827        leechers: i64,
828    ) -> Result<(), database::Error> {
829        query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES (?, ?, ?, ?)")
830            .bind(torrent_id)
831            .bind(tracker_url)
832            .bind(seeders)
833            .bind(leechers)
834            .execute(&self.pool)
835            .await
836            .map(|_| ())
837            .map_err(|_| database::Error::TorrentNotFound)
838    }
839
840    async fn delete_torrent(&self, torrent_id: i64) -> Result<(), database::Error> {
841        query("DELETE FROM torrust_torrents WHERE torrent_id = ?")
842            .bind(torrent_id)
843            .execute(&self.pool)
844            .await
845            .map_err(|_| database::Error::Error)
846            .and_then(|v| {
847                if v.rows_affected() > 0 {
848                    Ok(())
849                } else {
850                    Err(database::Error::TorrentNotFound)
851                }
852            })
853    }
854
855    async fn delete_all_database_rows(&self) -> Result<(), database::Error> {
856        for table in TABLES_TO_TRUNCATE {
857            query(&format!("DELETE FROM {table};"))
858                .execute(&self.pool)
859                .await
860                .map_err(|_| database::Error::Error)?;
861        }
862
863        Ok(())
864    }
865}