torrust_index/databases/
mysql.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use chrono::{DateTime, NaiveDateTime, Utc};
6use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
7use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool};
8use url::Url;
9
10use super::database::TABLES_TO_TRUNCATE;
11use crate::databases::database;
12use crate::databases::database::{Category, Database, Driver, Sorting, TorrentCompact};
13use crate::models::category::CategoryId;
14use crate::models::info_hash::InfoHash;
15use crate::models::response::TorrentsResponse;
16use crate::models::torrent::{Metadata, TorrentListing};
17use crate::models::torrent_file::{
18    DbTorrent, DbTorrentAnnounceUrl, DbTorrentFile, DbTorrentHttpSeedUrl, DbTorrentNode, Torrent, TorrentFile,
19};
20use crate::models::torrent_tag::{TagId, TorrentTag};
21use crate::models::tracker_key::TrackerKey;
22use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
23use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
24use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
25use crate::utils::hex::from_bytes;
26
27pub struct Mysql {
28    pub pool: MySqlPool,
29}
30
31#[async_trait]
32impl Database for Mysql {
33    fn get_database_driver(&self) -> Driver {
34        Driver::Mysql
35    }
36
37    async fn new(database_url: &str) -> Self {
38        let connection_options = MySqlConnectOptions::from_str(database_url)
39            .expect("Unable to create connection options.")
40            .log_statements(log::LevelFilter::Debug)
41            .log_slow_statements(log::LevelFilter::Info, Duration::from_secs(1));
42
43        let db = MySqlPoolOptions::new()
44            .connect_with(connection_options)
45            .await
46            .expect("Unable to create database pool.");
47
48        sqlx::migrate!("migrations/mysql")
49            .run(&db)
50            .await
51            .expect("Could not run database migrations.");
52
53        Self { pool: db }
54    }
55
56    async fn insert_user_and_get_id(&self, username: &str, email: &str, password_hash: &str) -> Result<i64, database::Error> {
57        // open pool connection
58        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
59
60        // start db transaction
61        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
62
63        // create the user account and get the user id
64        let user_id = query("INSERT INTO torrust_users (date_registered) VALUES (UTC_TIMESTAMP())")
65            .execute(&mut *tx)
66            .await
67            .map(|v| v.last_insert_id())
68            .map_err(|_| database::Error::Error)?;
69
70        // add password hash for account
71        let insert_user_auth_result = query("INSERT INTO torrust_user_authentication (user_id, password_hash) VALUES (?, ?)")
72            .bind(user_id)
73            .bind(password_hash)
74            .execute(&mut *tx)
75            .await
76            .map_err(|_| database::Error::Error);
77
78        // rollback transaction on error
79        if let Err(e) = insert_user_auth_result {
80            drop(tx.rollback().await);
81            return Err(e);
82        }
83
84        // add account profile details
85        let insert_user_profile_result = query(r#"INSERT INTO torrust_user_profiles (user_id, username, email, email_verified, bio, avatar) VALUES (?, ?, NULLIF(?, ""), 0, NULL, NULL)"#)
86            .bind(user_id)
87            .bind(username)
88            .bind(email)
89            .execute(&mut *tx)
90            .await
91            .map_err(|e| match e {
92                sqlx::Error::Database(err) => {
93                    if err.message().contains("username") {
94                        database::Error::UsernameTaken
95                    } else if err.message().contains("email") {
96                        database::Error::EmailTaken
97                    } else {
98                        database::Error::Error
99                    }
100                }
101                _ => database::Error::Error
102            });
103
104        // commit or rollback transaction and return user_id on success
105        match insert_user_profile_result {
106            Ok(_) => {
107                drop(tx.commit().await);
108                Ok(i64::overflowing_add_unsigned(0, user_id).0)
109            }
110            Err(e) => {
111                drop(tx.rollback().await);
112                Err(e)
113            }
114        }
115    }
116
117    /// Change user's password.
118    async fn change_user_password(&self, user_id: i64, new_password: &str) -> Result<(), database::Error> {
119        query("UPDATE torrust_user_authentication SET password_hash = ? WHERE user_id = ?")
120            .bind(new_password)
121            .bind(user_id)
122            .execute(&self.pool)
123            .await
124            .map_err(|_| database::Error::Error)
125            .and_then(|v| {
126                if v.rows_affected() > 0 {
127                    Ok(())
128                } else {
129                    Err(database::Error::UserNotFound)
130                }
131            })
132    }
133
134    async fn get_user_from_id(&self, user_id: i64) -> Result<User, database::Error> {
135        query_as::<_, User>("SELECT * FROM torrust_users WHERE user_id = ?")
136            .bind(user_id)
137            .fetch_one(&self.pool)
138            .await
139            .map_err(|_| database::Error::UserNotFound)
140    }
141
142    async fn get_user_authentication_from_id(&self, user_id: UserId) -> Result<UserAuthentication, database::Error> {
143        query_as::<_, UserAuthentication>("SELECT * FROM torrust_user_authentication WHERE user_id = ?")
144            .bind(user_id)
145            .fetch_one(&self.pool)
146            .await
147            .map_err(|_| database::Error::UserNotFound)
148    }
149
150    async fn get_user_profile_from_username(&self, username: &str) -> Result<UserProfile, database::Error> {
151        query_as::<_, UserProfile>(r#"SELECT user_id, username, COALESCE(email, "") as email, email_verified, COALESCE(bio, "") as bio, COALESCE(avatar, "") as avatar FROM torrust_user_profiles WHERE username = ?"#)
152            .bind(username)
153            .fetch_one(&self.pool)
154            .await
155            .map_err(|_| database::Error::UserNotFound)
156    }
157
158    async fn get_user_compact_from_id(&self, user_id: i64) -> Result<UserCompact, database::Error> {
159        query_as::<_, UserCompact>("SELECT tu.user_id, tp.username, tu.administrator FROM torrust_users tu INNER JOIN torrust_user_profiles tp ON tu.user_id = tp.user_id WHERE tu.user_id = ?")
160            .bind(user_id)
161            .fetch_one(&self.pool)
162            .await
163            .map_err(|_| database::Error::UserNotFound)
164    }
165
166    /// Gets User Tracker Key
167    ///
168    /// # Panics
169    ///
170    /// Will panic if the input time overflows the `u64` seconds overflows the `i64` type.
171    /// (this will naturally happen in 292.5 billion years)
172    async fn get_user_tracker_key(&self, user_id: i64) -> Option<TrackerKey> {
173        const HOUR_IN_SECONDS: i64 = 3600;
174
175        let current_time_plus_hour = i64::try_from(clock::now()).unwrap().saturating_add(HOUR_IN_SECONDS);
176
177        // get tracker key that is valid for at least one hour from now
178        query_as::<_, TrackerKey>("SELECT tracker_key AS 'key', date_expiry AS valid_until FROM torrust_tracker_keys WHERE user_id = ? AND date_expiry > ? ORDER BY date_expiry DESC")
179            .bind(user_id)
180            .bind(current_time_plus_hour)
181            .fetch_one(&self.pool)
182            .await
183            .ok()
184    }
185
186    async fn count_users(&self) -> Result<i64, database::Error> {
187        query_as("SELECT COUNT(*) FROM torrust_users")
188            .fetch_one(&self.pool)
189            .await
190            .map(|(v,)| v)
191            .map_err(|_| database::Error::Error)
192    }
193
194    async fn ban_user(&self, user_id: i64, reason: &str, date_expiry: NaiveDateTime) -> Result<(), database::Error> {
195        // date needs to be in ISO 8601 format
196        let date_expiry_string = date_expiry.format("%Y-%m-%d %H:%M:%S").to_string();
197
198        query("INSERT INTO torrust_user_bans (user_id, reason, date_expiry) VALUES (?, ?, ?)")
199            .bind(user_id)
200            .bind(reason)
201            .bind(date_expiry_string)
202            .execute(&self.pool)
203            .await
204            .map(|_| ())
205            .map_err(|_| database::Error::Error)
206    }
207
208    async fn grant_admin_role(&self, user_id: i64) -> Result<(), database::Error> {
209        query("UPDATE torrust_users SET administrator = TRUE WHERE user_id = ?")
210            .bind(user_id)
211            .execute(&self.pool)
212            .await
213            .map_err(|_| database::Error::Error)
214            .and_then(|v| {
215                if v.rows_affected() > 0 {
216                    Ok(())
217                } else {
218                    Err(database::Error::UserNotFound)
219                }
220            })
221    }
222
223    async fn verify_email(&self, user_id: i64) -> Result<(), database::Error> {
224        query("UPDATE torrust_user_profiles SET email_verified = TRUE WHERE user_id = ?")
225            .bind(user_id)
226            .execute(&self.pool)
227            .await
228            .map_err(|_| database::Error::Error)
229            .and_then(|v| {
230                if v.rows_affected() > 0 {
231                    Ok(())
232                } else {
233                    Err(database::Error::UserNotFound)
234                }
235            })
236    }
237
238    async fn add_tracker_key(&self, user_id: i64, tracker_key: &TrackerKey) -> Result<(), database::Error> {
239        let key = tracker_key.key.clone();
240
241        query("INSERT INTO torrust_tracker_keys (user_id, tracker_key, date_expiry) VALUES (?, ?, ?)")
242            .bind(user_id)
243            .bind(key)
244            .bind(tracker_key.valid_until)
245            .execute(&self.pool)
246            .await
247            .map(|_| ())
248            .map_err(|_| database::Error::Error)
249    }
250
251    async fn delete_user(&self, user_id: i64) -> Result<(), database::Error> {
252        query("DELETE FROM torrust_users WHERE user_id = ?")
253            .bind(user_id)
254            .execute(&self.pool)
255            .await
256            .map_err(|_| database::Error::Error)
257            .and_then(|v| {
258                if v.rows_affected() > 0 {
259                    Ok(())
260                } else {
261                    Err(database::Error::UserNotFound)
262                }
263            })
264    }
265
266    async fn insert_category_and_get_id(&self, category_name: &str) -> Result<i64, database::Error> {
267        query("INSERT INTO torrust_categories (name) VALUES (?)")
268            .bind(category_name)
269            .execute(&self.pool)
270            .await
271            .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
272            .map_err(|_| database::Error::Error)
273    }
274
275    async fn get_category_from_id(&self, category_id: i64) -> Result<Category, database::Error> {
276        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE category_id = ?")
277            .bind(category_id)
278            .fetch_one(&self.pool)
279            .await
280            .map_err(|_| database::Error::CategoryNotFound)
281    }
282
283    async fn get_category_from_name(&self, category_name: &str) -> Result<Category, database::Error> {
284        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE name = ?")
285            .bind(category_name)
286            .fetch_one(&self.pool)
287            .await
288            .map_err(|_| database::Error::CategoryNotFound)
289    }
290
291    async fn get_categories(&self) -> Result<Vec<Category>, database::Error> {
292        query_as::<_, Category>("SELECT tc.category_id, tc.name, COUNT(tt.category_id) as num_torrents FROM torrust_categories tc LEFT JOIN torrust_torrents tt on tc.category_id = tt.category_id GROUP BY tc.name")
293            .fetch_all(&self.pool)
294            .await
295            .map_err(|_| database::Error::Error)
296    }
297
298    async fn delete_category(&self, category_name: &str) -> Result<(), database::Error> {
299        query("DELETE FROM torrust_categories WHERE name = ?")
300            .bind(category_name)
301            .execute(&self.pool)
302            .await
303            .map_err(|_| database::Error::Error)
304            .and_then(|v| {
305                if v.rows_affected() > 0 {
306                    Ok(())
307                } else {
308                    Err(database::Error::CategoryNotFound)
309                }
310            })
311    }
312
313    // todo: refactor this
314    #[allow(clippy::too_many_lines)]
315    async fn get_torrents_search_sorted_paginated(
316        &self,
317        search: &Option<String>,
318        categories: &Option<Vec<String>>,
319        tags: &Option<Vec<String>>,
320        sort: &Sorting,
321        offset: u64,
322        limit: u8,
323    ) -> Result<TorrentsResponse, database::Error> {
324        let title = match search {
325            None => "%".to_string(),
326            Some(v) => format!("%{v}%"),
327        };
328
329        let sort_query: String = match sort {
330            Sorting::UploadedAsc => "date_uploaded ASC".to_string(),
331            Sorting::UploadedDesc => "date_uploaded DESC".to_string(),
332            Sorting::SeedersAsc => "seeders ASC".to_string(),
333            Sorting::SeedersDesc => "seeders DESC".to_string(),
334            Sorting::LeechersAsc => "leechers ASC".to_string(),
335            Sorting::LeechersDesc => "leechers DESC".to_string(),
336            Sorting::NameAsc => "title ASC".to_string(),
337            Sorting::NameDesc => "title DESC".to_string(),
338            Sorting::SizeAsc => "size ASC".to_string(),
339            Sorting::SizeDesc => "size DESC".to_string(),
340        };
341
342        let category_filter_query = if let Some(c) = categories {
343            let mut i = 0;
344            let mut category_filters = String::new();
345            for category in c {
346                // don't take user input in the db query
347                if let Ok(sanitized_category) = self.get_category_from_name(category).await {
348                    let mut str = format!("tc.name = '{}'", sanitized_category.name);
349                    if i > 0 {
350                        str = format!(" OR {str}");
351                    }
352                    category_filters.push_str(&str);
353                    i += 1;
354                }
355            }
356            if category_filters.is_empty() {
357                String::new()
358            } else {
359                format!("INNER JOIN torrust_categories tc ON tt.category_id = tc.category_id AND ({category_filters}) ")
360            }
361        } else {
362            String::new()
363        };
364
365        let tag_filter_query = if let Some(t) = tags {
366            let mut i = 0;
367            let mut tag_filters = String::new();
368            for tag in t {
369                // don't take user input in the db query
370                if let Ok(sanitized_tag) = self.get_tag_from_name(tag).await {
371                    let mut str = format!("tl.tag_id = '{}'", sanitized_tag.tag_id);
372                    if i > 0 {
373                        str = format!(" OR {str}");
374                    }
375                    tag_filters.push_str(&str);
376                    i += 1;
377                }
378            }
379            if tag_filters.is_empty() {
380                String::new()
381            } else {
382                format!("INNER JOIN torrust_torrent_tag_links tl ON tt.torrent_id = tl.torrent_id AND ({tag_filters}) ")
383            }
384        } else {
385            String::new()
386        };
387
388        let mut query_string = format!(
389            "SELECT
390            tt.torrent_id,
391            tp.username AS uploader,
392            tt.info_hash,
393            ti.title,
394            ti.description,
395            tt.category_id,
396            DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded,
397            tt.size AS file_size,
398            tt.name,
399            tt.comment,
400            tt.creation_date,
401            tt.created_by,
402            tt.`encoding`,
403            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
404            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
405            FROM torrust_torrents tt
406            {category_filter_query}
407            {tag_filter_query}
408            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
409            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
410            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
411            WHERE title LIKE ?
412            GROUP BY tt.torrent_id"
413        );
414
415        let count_query = format!("SELECT COUNT(*) as count FROM ({query_string}) AS count_table");
416
417        let count_result: Result<i64, database::Error> = query_as(&count_query)
418            .bind(title.clone())
419            .fetch_one(&self.pool)
420            .await
421            .map(|(v,)| v)
422            .map_err(|_| database::Error::Error);
423
424        let count = count_result?;
425
426        query_string = format!("{query_string} ORDER BY {sort_query} LIMIT ?, ?");
427
428        let res: Vec<TorrentListing> = sqlx::query_as::<_, TorrentListing>(&query_string)
429            .bind(title)
430            .bind(i64::saturating_add_unsigned(0, offset))
431            .bind(limit)
432            .fetch_all(&self.pool)
433            .await
434            .map_err(|_| database::Error::Error)?;
435
436        Ok(TorrentsResponse {
437            total: u32::try_from(count).expect("variable `count` is larger than u32"),
438            results: res,
439        })
440    }
441
442    #[allow(clippy::too_many_lines)]
443    async fn insert_torrent_and_get_id(
444        &self,
445        original_info_hash: &InfoHash,
446        torrent: &Torrent,
447        uploader_id: UserId,
448        metadata: &Metadata,
449    ) -> Result<i64, database::Error> {
450        let info_hash = torrent.canonical_info_hash_hex();
451        let canonical_info_hash = torrent.canonical_info_hash();
452
453        // open pool connection
454        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
455
456        // start db transaction
457        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
458
459        // BEP 30: <http://www.bittorrent.org/beps/bep_0030.html>.
460        // Torrent file can only hold a `pieces` key or a `root hash` key
461        let is_bep_30 = !matches!(&torrent.info.pieces, Some(_pieces));
462
463        let pieces = torrent.info.pieces.as_ref().map(|pieces| from_bytes(pieces.as_ref()));
464
465        let root_hash = torrent
466            .info
467            .root_hash
468            .as_ref()
469            .map(|root_hash| from_bytes(root_hash.as_ref()));
470
471        // add torrent
472        let torrent_id = query(
473            "INSERT INTO torrust_torrents (
474            uploader_id,
475            category_id,
476            info_hash,
477            size,
478            name,
479            pieces,
480            root_hash,
481            piece_length,
482            private,
483            is_bep_30,
484            `source`,
485            comment,
486            date_uploaded,
487            creation_date,
488            created_by,
489            `encoding`
490        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, UTC_TIMESTAMP(), ?, ?, ?)",
491        )
492        .bind(uploader_id)
493        .bind(metadata.category_id)
494        .bind(info_hash.to_lowercase())
495        .bind(torrent.file_size())
496        .bind(torrent.info.name.to_string())
497        .bind(pieces)
498        .bind(root_hash)
499        .bind(torrent.info.piece_length)
500        .bind(torrent.info.private)
501        .bind(is_bep_30)
502        .bind(torrent.info.source.clone())
503        .bind(torrent.comment.clone())
504        .bind(torrent.creation_date)
505        .bind(torrent.created_by.clone())
506        .bind(torrent.encoding.clone())
507        .execute(&mut *tx)
508        .await
509        .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
510        .map_err(|e| match e {
511            sqlx::Error::Database(err) => {
512                tracing::error!("DB error: {:?}", err);
513                if err.message().contains("Duplicate entry") && err.message().contains("info_hash") {
514                    database::Error::TorrentAlreadyExists
515                } else {
516                    database::Error::Error
517                }
518            }
519            _ => database::Error::Error,
520        })?;
521
522        // add torrent canonical infohash
523
524        let insert_info_hash_result =
525            query("INSERT INTO torrust_torrent_info_hashes (info_hash, canonical_info_hash, original_is_known) VALUES (?, ?, ?)")
526                .bind(original_info_hash.to_hex_string())
527                .bind(canonical_info_hash.to_hex_string())
528                .bind(true)
529                .execute(&mut *tx)
530                .await
531                .map(|_| ())
532                .map_err(|err| {
533                    tracing::error!("DB error: {:?}", err);
534                    database::Error::Error
535                });
536
537        // rollback transaction on error
538        if let Err(e) = insert_info_hash_result {
539            drop(tx.rollback().await);
540            return Err(e);
541        }
542
543        let insert_torrent_files_result = if let Some(length) = torrent.info.length {
544            query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length) VALUES (?, ?, ?)")
545                .bind(torrent.info.md5sum.clone())
546                .bind(torrent_id)
547                .bind(length)
548                .execute(&mut *tx)
549                .await
550                .map(|_| ())
551                .map_err(|_| database::Error::Error)
552        } else {
553            let files = torrent.info.files.as_ref().unwrap();
554
555            for file in files {
556                let path = file.path.join("/");
557
558                let _ = query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length, path) VALUES (?, ?, ?, ?)")
559                    .bind(file.md5sum.clone())
560                    .bind(torrent_id)
561                    .bind(file.length)
562                    .bind(path)
563                    .execute(&mut *tx)
564                    .await
565                    .map_err(|_| database::Error::Error)?;
566            }
567
568            Ok(())
569        };
570
571        // rollback transaction on error
572        if let Err(e) = insert_torrent_files_result {
573            drop(tx.rollback().await);
574            return Err(e);
575        }
576
577        let insert_torrent_announce_urls_result: Result<(), database::Error> = if let Some(announce_urls) = &torrent.announce_list
578        {
579            // flatten the nested vec (this will however remove the)
580            let announce_urls = announce_urls.iter().flatten().collect::<Vec<&String>>();
581
582            for tracker_url in &announce_urls {
583                let () = query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
584                    .bind(torrent_id)
585                    .bind(tracker_url)
586                    .execute(&mut *tx)
587                    .await
588                    .map(|_| ())
589                    .map_err(|_| database::Error::Error)?;
590            }
591
592            Ok(())
593        } else {
594            let tracker_url = torrent.announce.as_ref().unwrap();
595
596            query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
597                .bind(torrent_id)
598                .bind(tracker_url)
599                .execute(&mut *tx)
600                .await
601                .map(|_| ())
602                .map_err(|_| database::Error::Error)
603        };
604
605        // rollback transaction on error
606        if let Err(e) = insert_torrent_announce_urls_result {
607            drop(tx.rollback().await);
608            return Err(e);
609        }
610
611        // add HTTP seeds
612
613        let insert_torrent_http_seeds_result: Result<(), database::Error> = if let Some(http_seeds) = &torrent.httpseeds {
614            for seed_url in http_seeds {
615                let () = query("INSERT INTO torrust_torrent_http_seeds (torrent_id, seed_url) VALUES (?, ?)")
616                    .bind(torrent_id)
617                    .bind(seed_url)
618                    .execute(&mut *tx)
619                    .await
620                    .map(|_| ())
621                    .map_err(|_| database::Error::Error)?;
622            }
623
624            Ok(())
625        } else {
626            Ok(())
627        };
628
629        // rollback transaction on error
630        if let Err(e) = insert_torrent_http_seeds_result {
631            drop(tx.rollback().await);
632            return Err(e);
633        }
634
635        // add nodes
636
637        let insert_torrent_nodes_result: Result<(), database::Error> = if let Some(nodes) = &torrent.nodes {
638            for node in nodes {
639                let () = query("INSERT INTO torrust_torrent_nodes (torrent_id, node_ip, node_port) VALUES (?, ?, ?)")
640                    .bind(torrent_id)
641                    .bind(node.0.clone())
642                    .bind(node.1)
643                    .execute(&mut *tx)
644                    .await
645                    .map(|_| ())
646                    .map_err(|_| database::Error::Error)?;
647            }
648
649            Ok(())
650        } else {
651            Ok(())
652        };
653
654        // rollback transaction on error
655        if let Err(e) = insert_torrent_nodes_result {
656            drop(tx.rollback().await);
657            return Err(e);
658        }
659
660        // add tags
661
662        for tag_id in &metadata.tags {
663            let insert_torrent_tag_result = query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
664                .bind(torrent_id)
665                .bind(tag_id)
666                .execute(&mut *tx)
667                .await
668                .map_err(|err| database::Error::ErrorWithText(err.to_string()));
669
670            // rollback transaction on error
671            if let Err(e) = insert_torrent_tag_result {
672                drop(tx.rollback().await);
673                return Err(e);
674            }
675        }
676
677        let insert_torrent_info_result =
678            query(r#"INSERT INTO torrust_torrent_info (torrent_id, title, description) VALUES (?, ?, NULLIF(?, ""))"#)
679                .bind(torrent_id)
680                .bind(metadata.title.clone())
681                .bind(metadata.description.clone())
682                .execute(&mut *tx)
683                .await
684                .map_err(|e| match e {
685                    sqlx::Error::Database(err) => {
686                        tracing::error!("DB error: {:?}", err);
687                        if err.message().contains("Duplicate entry") && err.message().contains("title") {
688                            database::Error::TorrentTitleAlreadyExists
689                        } else {
690                            database::Error::Error
691                        }
692                    }
693                    _ => database::Error::Error,
694                });
695
696        // commit or rollback transaction and return user_id on success
697        match insert_torrent_info_result {
698            Ok(_) => {
699                drop(tx.commit().await);
700                Ok(torrent_id)
701            }
702            Err(e) => {
703                drop(tx.rollback().await);
704                Err(e)
705            }
706        }
707    }
708
709    async fn get_torrent_canonical_info_hash_group(
710        &self,
711        canonical: &InfoHash,
712    ) -> Result<CanonicalInfoHashGroup, database::Error> {
713        let db_info_hashes = query_as::<_, DbTorrentInfoHash>(
714            "SELECT info_hash, canonical_info_hash, original_is_known FROM torrust_torrent_info_hashes WHERE canonical_info_hash = ?",
715        )
716        .bind(canonical.to_hex_string())
717        .fetch_all(&self.pool)
718        .await
719        .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
720
721        let info_hashes: Vec<InfoHash> = db_info_hashes
722            .into_iter()
723            .map(|db_info_hash| {
724                InfoHash::from_str(&db_info_hash.info_hash)
725                    .unwrap_or_else(|_| panic!("Invalid info-hash in database: {}", db_info_hash.info_hash))
726            })
727            .collect();
728
729        Ok(CanonicalInfoHashGroup {
730            canonical_info_hash: *canonical,
731            original_info_hashes: info_hashes,
732        })
733    }
734
735    async fn find_canonical_info_hash_for(&self, info_hash: &InfoHash) -> Result<Option<InfoHash>, database::Error> {
736        let maybe_db_torrent_info_hash = query_as::<_, DbTorrentInfoHash>(
737            "SELECT info_hash, canonical_info_hash, original_is_known FROM torrust_torrent_info_hashes WHERE info_hash = ?",
738        )
739        .bind(info_hash.to_hex_string())
740        .fetch_optional(&self.pool)
741        .await
742        .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
743
744        match maybe_db_torrent_info_hash {
745            Some(db_torrent_info_hash) => Ok(Some(
746                InfoHash::from_str(&db_torrent_info_hash.canonical_info_hash)
747                    .unwrap_or_else(|_| panic!("Invalid info-hash in database: {}", db_torrent_info_hash.canonical_info_hash)),
748            )),
749            None => Ok(None),
750        }
751    }
752
753    async fn add_info_hash_to_canonical_info_hash_group(
754        &self,
755        info_hash: &InfoHash,
756        canonical: &InfoHash,
757    ) -> Result<(), database::Error> {
758        query("INSERT INTO torrust_torrent_info_hashes (info_hash, canonical_info_hash, original_is_known) VALUES (?, ?, ?)")
759            .bind(info_hash.to_hex_string())
760            .bind(canonical.to_hex_string())
761            .bind(true)
762            .execute(&self.pool)
763            .await
764            .map(|_| ())
765            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
766    }
767
768    async fn get_torrent_info_from_id(&self, torrent_id: i64) -> Result<DbTorrent, database::Error> {
769        query_as::<_, DbTorrent>("SELECT * FROM torrust_torrents WHERE torrent_id = ?")
770            .bind(torrent_id)
771            .fetch_one(&self.pool)
772            .await
773            .map_err(|_| database::Error::TorrentNotFound)
774    }
775
776    async fn get_torrent_info_from_info_hash(&self, info_hash: &InfoHash) -> Result<DbTorrent, database::Error> {
777        query_as::<_, DbTorrent>("SELECT * FROM torrust_torrents WHERE info_hash = ?")
778            .bind(info_hash.to_hex_string().to_lowercase())
779            .fetch_one(&self.pool)
780            .await
781            .map_err(|_| database::Error::TorrentNotFound)
782    }
783
784    async fn get_torrent_files_from_id(&self, torrent_id: i64) -> Result<Vec<TorrentFile>, database::Error> {
785        let db_torrent_files =
786            query_as::<_, DbTorrentFile>("SELECT md5sum, length, path FROM torrust_torrent_files WHERE torrent_id = ?")
787                .bind(torrent_id)
788                .fetch_all(&self.pool)
789                .await
790                .map_err(|_| database::Error::TorrentNotFound)?;
791
792        let torrent_files: Vec<TorrentFile> = db_torrent_files
793            .into_iter()
794            .map(|tf| TorrentFile {
795                path: tf
796                    .path
797                    .unwrap_or_default()
798                    .split('/')
799                    .map(std::string::ToString::to_string)
800                    .collect(),
801                length: tf.length,
802                md5sum: tf.md5sum,
803            })
804            .collect();
805
806        Ok(torrent_files)
807    }
808
809    async fn get_torrent_announce_urls_from_id(&self, torrent_id: i64) -> Result<Vec<Vec<String>>, database::Error> {
810        query_as::<_, DbTorrentAnnounceUrl>("SELECT tracker_url FROM torrust_torrent_announce_urls WHERE torrent_id = ?")
811            .bind(torrent_id)
812            .fetch_all(&self.pool)
813            .await
814            .map(|v| v.iter().map(|a| vec![a.tracker_url.to_string()]).collect())
815            .map_err(|_| database::Error::TorrentNotFound)
816    }
817
818    async fn get_torrent_http_seed_urls_from_id(&self, torrent_id: i64) -> Result<Vec<String>, database::Error> {
819        query_as::<_, DbTorrentHttpSeedUrl>("SELECT seed_url FROM torrust_torrent_http_seeds WHERE torrent_id = ?")
820            .bind(torrent_id)
821            .fetch_all(&self.pool)
822            .await
823            .map(|v| v.iter().map(|a| a.seed_url.to_string()).collect())
824            .map_err(|_| database::Error::TorrentNotFound)
825    }
826
827    async fn get_torrent_nodes_from_id(&self, torrent_id: i64) -> Result<Vec<(String, i64)>, database::Error> {
828        query_as::<_, DbTorrentNode>("SELECT node_ip, node_port FROM torrust_torrent_nodes WHERE torrent_id = ?")
829            .bind(torrent_id)
830            .fetch_all(&self.pool)
831            .await
832            .map(|v| v.iter().map(|a| (a.node_ip.to_string(), a.node_port)).collect())
833            .map_err(|_| database::Error::TorrentNotFound)
834    }
835
836    async fn get_torrent_listing_from_id(&self, torrent_id: i64) -> Result<TorrentListing, database::Error> {
837        query_as::<_, TorrentListing>(
838            "SELECT
839            tt.torrent_id,
840            tp.username AS uploader,
841            tt.info_hash,
842            ti.title,
843            ti.description,
844            tt.category_id,
845            DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded,
846            tt.size AS file_size,
847            tt.name,
848            tt.comment,
849            tt.creation_date,
850            tt.created_by,
851            tt.`encoding`,
852            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
853            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
854            FROM torrust_torrents tt
855            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
856            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
857            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
858            WHERE tt.torrent_id = ?
859            GROUP BY torrent_id",
860        )
861        .bind(torrent_id)
862        .fetch_one(&self.pool)
863        .await
864        .map_err(|_| database::Error::TorrentNotFound)
865    }
866
867    async fn get_torrent_listing_from_info_hash(&self, info_hash: &InfoHash) -> Result<TorrentListing, database::Error> {
868        query_as::<_, TorrentListing>(
869            "SELECT
870            tt.torrent_id,
871            tp.username AS uploader,
872            tt.info_hash,
873            ti.title,
874            ti.description,
875            tt.category_id,
876            DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded,
877            tt.size AS file_size,
878            tt.name,
879            tt.comment,
880            tt.creation_date,
881            tt.created_by,
882            tt.`encoding`,
883            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
884            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
885            FROM torrust_torrents tt
886            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
887            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
888            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
889            WHERE tt.info_hash = ?
890            GROUP BY torrent_id",
891        )
892        .bind(info_hash.to_hex_string().to_lowercase())
893        .fetch_one(&self.pool)
894        .await
895        .map_err(|_| database::Error::TorrentNotFound)
896    }
897
898    async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, database::Error> {
899        query_as::<_, TorrentCompact>("SELECT torrent_id, info_hash FROM torrust_torrents")
900            .fetch_all(&self.pool)
901            .await
902            .map_err(|_| database::Error::Error)
903    }
904
905    async fn get_torrents_with_stats_not_updated_since(
906        &self,
907        datetime: DateTime<Utc>,
908        limit: i64,
909    ) -> Result<Vec<TorrentCompact>, database::Error> {
910        query_as::<_, TorrentCompact>(
911            "SELECT tt.torrent_id, tt.info_hash
912             FROM torrust_torrents tt
913             LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
914             WHERE tts.updated_at < ? OR tts.updated_at IS NULL
915             ORDER BY tts.updated_at ASC
916             LIMIT ?
917        ",
918        )
919        .bind(datetime.format(DATETIME_FORMAT).to_string())
920        .bind(limit)
921        .fetch_all(&self.pool)
922        .await
923        .map_err(|_| database::Error::Error)
924    }
925
926    async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
927        query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?")
928            .bind(title)
929            .bind(torrent_id)
930            .execute(&self.pool)
931            .await
932            .map_err(|e| match e {
933                sqlx::Error::Database(err) => {
934                    tracing::error!("DB error: {:?}", err);
935                    if err.message().contains("Duplicate entry") && err.message().contains("title") {
936                        database::Error::TorrentTitleAlreadyExists
937                    } else {
938                        database::Error::Error
939                    }
940                }
941                _ => database::Error::Error,
942            })
943            .and_then(|v| {
944                if v.rows_affected() > 0 {
945                    Ok(())
946                } else {
947                    Err(database::Error::TorrentNotFound)
948                }
949            })
950    }
951
952    async fn update_torrent_description(&self, torrent_id: i64, description: &str) -> Result<(), database::Error> {
953        query("UPDATE torrust_torrent_info SET description = ? WHERE torrent_id = ?")
954            .bind(description)
955            .bind(torrent_id)
956            .execute(&self.pool)
957            .await
958            .map_err(|_| database::Error::Error)
959            .and_then(|v| {
960                if v.rows_affected() > 0 {
961                    Ok(())
962                } else {
963                    Err(database::Error::TorrentNotFound)
964                }
965            })
966    }
967
968    async fn update_torrent_category(&self, torrent_id: i64, category_id: CategoryId) -> Result<(), database::Error> {
969        query("UPDATE torrust_torrents SET category_id = ? WHERE torrent_id = ?")
970            .bind(category_id)
971            .bind(torrent_id)
972            .execute(&self.pool)
973            .await
974            .map_err(|_| database::Error::Error)
975            .and_then(|v| {
976                if v.rows_affected() > 0 {
977                    Ok(())
978                } else {
979                    Err(database::Error::TorrentNotFound)
980                }
981            })
982    }
983
984    async fn insert_tag_and_get_id(&self, name: &str) -> Result<i64, database::Error> {
985        query("INSERT INTO torrust_torrent_tags (name) VALUES (?)")
986            .bind(name)
987            .execute(&self.pool)
988            .await
989            .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
990            .map_err(|e| match e {
991                sqlx::Error::Database(err) => {
992                    tracing::error!("DB error: {:?}", err);
993                    if err.message().contains("Duplicate entry") && err.message().contains("name") {
994                        database::Error::TagAlreadyExists
995                    } else {
996                        database::Error::Error
997                    }
998                }
999                _ => database::Error::Error,
1000            })
1001    }
1002
1003    async fn delete_tag(&self, tag_id: TagId) -> Result<(), database::Error> {
1004        query("DELETE FROM torrust_torrent_tags WHERE tag_id = ?")
1005            .bind(tag_id)
1006            .execute(&self.pool)
1007            .await
1008            .map(|_| ())
1009            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1010    }
1011
1012    async fn add_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
1013        query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
1014            .bind(torrent_id)
1015            .bind(tag_id)
1016            .execute(&self.pool)
1017            .await
1018            .map(|_| ())
1019            .map_err(|_| database::Error::Error)
1020    }
1021
1022    async fn add_torrent_tag_links(&self, torrent_id: i64, tag_ids: &[TagId]) -> Result<(), database::Error> {
1023        let mut tx = self
1024            .pool
1025            .begin()
1026            .await
1027            .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
1028
1029        for tag_id in tag_ids {
1030            query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
1031                .bind(torrent_id)
1032                .bind(tag_id)
1033                .execute(&mut *tx)
1034                .await
1035                .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
1036        }
1037
1038        tx.commit()
1039            .await
1040            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1041    }
1042
1043    async fn delete_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
1044        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ? AND tag_id = ?")
1045            .bind(torrent_id)
1046            .bind(tag_id)
1047            .execute(&self.pool)
1048            .await
1049            .map(|_| ())
1050            .map_err(|_| database::Error::Error)
1051    }
1052
1053    async fn delete_all_torrent_tag_links(&self, torrent_id: i64) -> Result<(), database::Error> {
1054        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ?")
1055            .bind(torrent_id)
1056            .execute(&self.pool)
1057            .await
1058            .map(|_| ())
1059            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1060    }
1061
1062    async fn get_tag_from_name(&self, name: &str) -> Result<TorrentTag, database::Error> {
1063        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags WHERE name = ?")
1064            .bind(name)
1065            .fetch_one(&self.pool)
1066            .await
1067            .map_err(|_| database::Error::TagNotFound)
1068    }
1069
1070    async fn get_tags(&self) -> Result<Vec<TorrentTag>, database::Error> {
1071        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags")
1072            .fetch_all(&self.pool)
1073            .await
1074            .map_err(|_| database::Error::Error)
1075    }
1076
1077    async fn get_tags_for_torrent_id(&self, torrent_id: i64) -> Result<Vec<TorrentTag>, database::Error> {
1078        query_as::<_, TorrentTag>(
1079            "SELECT torrust_torrent_tags.tag_id, torrust_torrent_tags.name
1080            FROM torrust_torrent_tags
1081            JOIN torrust_torrent_tag_links ON torrust_torrent_tags.tag_id = torrust_torrent_tag_links.tag_id
1082            WHERE torrust_torrent_tag_links.torrent_id = ?",
1083        )
1084        .bind(torrent_id)
1085        .fetch_all(&self.pool)
1086        .await
1087        .map_err(|_| database::Error::Error)
1088    }
1089
1090    async fn update_tracker_info(
1091        &self,
1092        torrent_id: i64,
1093        tracker_url: &Url,
1094        seeders: i64,
1095        leechers: i64,
1096    ) -> Result<(), database::Error> {
1097        query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES (?, ?, ?, ?, ?)")
1098            .bind(torrent_id)
1099            .bind(tracker_url.to_string())
1100            .bind(seeders)
1101            .bind(leechers)
1102            .bind(datetime_now())
1103            .execute(&self.pool)
1104            .await
1105            .map(|_| ())
1106            .map_err(|_| database::Error::TorrentNotFound)
1107    }
1108
1109    async fn delete_torrent(&self, torrent_id: i64) -> Result<(), database::Error> {
1110        query("DELETE FROM torrust_torrents WHERE torrent_id = ?")
1111            .bind(torrent_id)
1112            .execute(&self.pool)
1113            .await
1114            .map_err(|_| database::Error::Error)
1115            .and_then(|v| {
1116                if v.rows_affected() > 0 {
1117                    Ok(())
1118                } else {
1119                    Err(database::Error::TorrentNotFound)
1120                }
1121            })
1122    }
1123
1124    async fn delete_all_database_rows(&self) -> Result<(), database::Error> {
1125        for table in TABLES_TO_TRUNCATE {
1126            query(&format!("DELETE FROM {table};"))
1127                .execute(&self.pool)
1128                .await
1129                .map_err(|_| database::Error::Error)?;
1130        }
1131
1132        Ok(())
1133    }
1134}