torrust_index/databases/
sqlite.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use chrono::{DateTime, NaiveDateTime, Utc};
6use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
7use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool};
8use url::Url;
9
10use super::database::TABLES_TO_TRUNCATE;
11use crate::databases::database;
12use crate::databases::database::{Category, Database, Driver, Sorting, TorrentCompact};
13use crate::models::category::CategoryId;
14use crate::models::info_hash::InfoHash;
15use crate::models::response::TorrentsResponse;
16use crate::models::torrent::{Metadata, TorrentListing};
17use crate::models::torrent_file::{
18    DbTorrent, DbTorrentAnnounceUrl, DbTorrentFile, DbTorrentHttpSeedUrl, DbTorrentNode, Torrent, TorrentFile,
19};
20use crate::models::torrent_tag::{TagId, TorrentTag};
21use crate::models::tracker_key::TrackerKey;
22use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
23use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
24use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
25use crate::utils::hex::from_bytes;
26
27pub struct Sqlite {
28    pub pool: SqlitePool,
29}
30
31#[async_trait]
32impl Database for Sqlite {
33    fn get_database_driver(&self) -> Driver {
34        Driver::Sqlite3
35    }
36
37    async fn new(database_url: &str) -> Self {
38        let connection_options = SqliteConnectOptions::from_str(database_url)
39            .expect("Unable to create connection options.")
40            .log_statements(log::LevelFilter::Debug)
41            .log_slow_statements(log::LevelFilter::Info, Duration::from_secs(1));
42
43        let db = SqlitePoolOptions::new()
44            .connect_with(connection_options)
45            .await
46            .expect("Unable to create database pool.");
47
48        sqlx::migrate!("migrations/sqlite3")
49            .run(&db)
50            .await
51            .expect("Could not run database migrations.");
52
53        Self { pool: db }
54    }
55
56    async fn insert_user_and_get_id(&self, username: &str, email: &str, password_hash: &str) -> Result<i64, database::Error> {
57        // open pool connection
58        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
59
60        // start db transaction
61        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
62
63        // create the user account and get the user id
64        let user_id =
65            query("INSERT INTO torrust_users (date_registered) VALUES (strftime('%Y-%m-%d %H:%M:%S',DATETIME('now', 'utc')))")
66                .execute(&mut *tx)
67                .await
68                .map(|v| v.last_insert_rowid())
69                .map_err(|_| database::Error::Error)?;
70
71        // add password hash for account
72        let insert_user_auth_result = query("INSERT INTO torrust_user_authentication (user_id, password_hash) VALUES (?, ?)")
73            .bind(user_id)
74            .bind(password_hash)
75            .execute(&mut *tx)
76            .await
77            .map_err(|_| database::Error::Error);
78
79        // rollback transaction on error
80        if let Err(e) = insert_user_auth_result {
81            drop(tx.rollback().await);
82            return Err(e);
83        }
84
85        // add account profile details
86        let insert_user_profile_result = query(r#"INSERT INTO torrust_user_profiles (user_id, username, email, email_verified, bio, avatar) VALUES (?, ?, NULLIF(?, ""), 0, NULL, NULL)"#)
87            .bind(user_id)
88            .bind(username)
89            .bind(email)
90            .execute(&mut *tx)
91            .await
92            .map_err(|e| match e {
93                sqlx::Error::Database(err) => {
94                    if err.message().contains("username") {
95                        database::Error::UsernameTaken
96                    } else if err.message().contains("email") {
97                        database::Error::EmailTaken
98                    } else {
99                        database::Error::Error
100                    }
101                }
102                _ => database::Error::Error
103            });
104
105        // commit or rollback transaction and return user_id on success
106        match insert_user_profile_result {
107            Ok(_) => {
108                drop(tx.commit().await);
109                Ok(user_id)
110            }
111            Err(e) => {
112                drop(tx.rollback().await);
113                Err(e)
114            }
115        }
116    }
117
118    /// Change user's password.
119    async fn change_user_password(&self, user_id: i64, new_password: &str) -> Result<(), database::Error> {
120        query("UPDATE torrust_user_authentication SET password_hash = ? WHERE user_id = ?")
121            .bind(new_password)
122            .bind(user_id)
123            .execute(&self.pool)
124            .await
125            .map_err(|_| database::Error::Error)
126            .and_then(|v| {
127                if v.rows_affected() > 0 {
128                    Ok(())
129                } else {
130                    Err(database::Error::UserNotFound)
131                }
132            })
133    }
134
135    async fn get_user_from_id(&self, user_id: i64) -> Result<User, database::Error> {
136        query_as::<_, User>("SELECT * FROM torrust_users WHERE user_id = ?")
137            .bind(user_id)
138            .fetch_one(&self.pool)
139            .await
140            .map_err(|_| database::Error::UserNotFound)
141    }
142
143    async fn get_user_authentication_from_id(&self, user_id: UserId) -> Result<UserAuthentication, database::Error> {
144        query_as::<_, UserAuthentication>("SELECT * FROM torrust_user_authentication WHERE user_id = ?")
145            .bind(user_id)
146            .fetch_one(&self.pool)
147            .await
148            .map_err(|_| database::Error::UserNotFound)
149    }
150
151    async fn get_user_profile_from_username(&self, username: &str) -> Result<UserProfile, database::Error> {
152        query_as::<_, UserProfile>("SELECT * FROM torrust_user_profiles WHERE username = ?")
153            .bind(username)
154            .fetch_one(&self.pool)
155            .await
156            .map_err(|_| database::Error::UserNotFound)
157    }
158
159    async fn get_user_compact_from_id(&self, user_id: i64) -> Result<UserCompact, database::Error> {
160        query_as::<_, UserCompact>("SELECT tu.user_id, tp.username, tu.administrator FROM torrust_users tu INNER JOIN torrust_user_profiles tp ON tu.user_id = tp.user_id WHERE tu.user_id = ?")
161            .bind(user_id)
162            .fetch_one(&self.pool)
163            .await
164            .map_err(|_| database::Error::UserNotFound)
165    }
166
167    async fn get_user_tracker_key(&self, user_id: i64) -> Option<TrackerKey> {
168        const HOUR_IN_SECONDS: i64 = 3600;
169
170        // casting current_time() to i64 will overflow in the year 2262
171        let current_time_plus_hour = i64::try_from(clock::now()).unwrap().saturating_add(HOUR_IN_SECONDS);
172
173        // get tracker key that is valid for at least one hour from now
174        query_as::<_, TrackerKey>("SELECT tracker_key AS key, date_expiry AS valid_until FROM torrust_tracker_keys WHERE user_id = $1 AND date_expiry > $2 ORDER BY date_expiry DESC")
175            .bind(user_id)
176            .bind(current_time_plus_hour)
177            .fetch_one(&self.pool)
178            .await
179            .ok()
180    }
181
182    async fn count_users(&self) -> Result<i64, database::Error> {
183        query_as("SELECT COUNT(*) FROM torrust_users")
184            .fetch_one(&self.pool)
185            .await
186            .map(|(v,)| v)
187            .map_err(|_| database::Error::Error)
188    }
189
190    async fn ban_user(&self, user_id: i64, reason: &str, date_expiry: NaiveDateTime) -> Result<(), database::Error> {
191        // date needs to be in ISO 8601 format
192        let date_expiry_string = date_expiry.format("%Y-%m-%d %H:%M:%S").to_string();
193
194        query("INSERT INTO torrust_user_bans (user_id, reason, date_expiry) VALUES ($1, $2, $3)")
195            .bind(user_id)
196            .bind(reason)
197            .bind(date_expiry_string)
198            .execute(&self.pool)
199            .await
200            .map(|_| ())
201            .map_err(|_| database::Error::Error)
202    }
203
204    async fn grant_admin_role(&self, user_id: i64) -> Result<(), database::Error> {
205        query("UPDATE torrust_users SET administrator = TRUE WHERE user_id = ?")
206            .bind(user_id)
207            .execute(&self.pool)
208            .await
209            .map_err(|_| database::Error::Error)
210            .and_then(|v| {
211                if v.rows_affected() > 0 {
212                    Ok(())
213                } else {
214                    Err(database::Error::UserNotFound)
215                }
216            })
217    }
218
219    async fn verify_email(&self, user_id: i64) -> Result<(), database::Error> {
220        query("UPDATE torrust_user_profiles SET email_verified = TRUE WHERE user_id = ?")
221            .bind(user_id)
222            .execute(&self.pool)
223            .await
224            .map(|_| ())
225            .map_err(|_| database::Error::Error)
226    }
227
228    async fn add_tracker_key(&self, user_id: i64, tracker_key: &TrackerKey) -> Result<(), database::Error> {
229        let key = tracker_key.key.clone();
230
231        query("INSERT INTO torrust_tracker_keys (user_id, tracker_key, date_expiry) VALUES ($1, $2, $3)")
232            .bind(user_id)
233            .bind(key)
234            .bind(tracker_key.valid_until)
235            .execute(&self.pool)
236            .await
237            .map(|_| ())
238            .map_err(|_| database::Error::Error)
239    }
240
241    async fn delete_user(&self, user_id: i64) -> Result<(), database::Error> {
242        query("DELETE FROM torrust_users WHERE user_id = ?")
243            .bind(user_id)
244            .execute(&self.pool)
245            .await
246            .map_err(|_| database::Error::Error)
247            .and_then(|v| {
248                if v.rows_affected() > 0 {
249                    Ok(())
250                } else {
251                    Err(database::Error::UserNotFound)
252                }
253            })
254    }
255
256    async fn insert_category_and_get_id(&self, category_name: &str) -> Result<i64, database::Error> {
257        query("INSERT INTO torrust_categories (name) VALUES (?)")
258            .bind(category_name)
259            .execute(&self.pool)
260            .await
261            .map(|v| v.last_insert_rowid())
262            .map_err(|_| database::Error::Error)
263    }
264
265    async fn get_category_from_id(&self, category_id: i64) -> Result<Category, database::Error> {
266        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE category_id = ?")
267            .bind(category_id)
268            .fetch_one(&self.pool)
269            .await
270            .map_err(|_| database::Error::CategoryNotFound)
271    }
272
273    async fn get_category_from_name(&self, category_name: &str) -> Result<Category, database::Error> {
274        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE name = ?")
275            .bind(category_name)
276            .fetch_one(&self.pool)
277            .await
278            .map_err(|_| database::Error::CategoryNotFound)
279    }
280
281    async fn get_categories(&self) -> Result<Vec<Category>, database::Error> {
282        query_as::<_, Category>("SELECT tc.category_id, tc.name, COUNT(tt.category_id) as num_torrents FROM torrust_categories tc LEFT JOIN torrust_torrents tt on tc.category_id = tt.category_id GROUP BY tc.name;")
283            .fetch_all(&self.pool)
284            .await
285            .map_err(|_| database::Error::Error)
286    }
287
288    async fn delete_category(&self, category_name: &str) -> Result<(), database::Error> {
289        query("DELETE FROM torrust_categories WHERE name = ?")
290            .bind(category_name)
291            .execute(&self.pool)
292            .await
293            .map_err(|_| database::Error::Error)
294            .and_then(|v| {
295                if v.rows_affected() > 0 {
296                    Ok(())
297                } else {
298                    Err(database::Error::CategoryNotFound)
299                }
300            })
301    }
302
303    // todo: refactor this
304    #[allow(clippy::too_many_lines)]
305    async fn get_torrents_search_sorted_paginated(
306        &self,
307        search: &Option<String>,
308        categories: &Option<Vec<String>>,
309        tags: &Option<Vec<String>>,
310        sort: &Sorting,
311        offset: u64,
312        limit: u8,
313    ) -> Result<TorrentsResponse, database::Error> {
314        let title = match search {
315            None => "%".to_string(),
316            Some(v) => format!("%{v}%"),
317        };
318
319        let sort_query: String = match sort {
320            Sorting::UploadedAsc => "date_uploaded ASC".to_string(),
321            Sorting::UploadedDesc => "date_uploaded DESC".to_string(),
322            Sorting::SeedersAsc => "seeders ASC".to_string(),
323            Sorting::SeedersDesc => "seeders DESC".to_string(),
324            Sorting::LeechersAsc => "leechers ASC".to_string(),
325            Sorting::LeechersDesc => "leechers DESC".to_string(),
326            Sorting::NameAsc => "title ASC".to_string(),
327            Sorting::NameDesc => "title DESC".to_string(),
328            Sorting::SizeAsc => "size ASC".to_string(),
329            Sorting::SizeDesc => "size DESC".to_string(),
330        };
331
332        let category_filter_query = if let Some(c) = categories {
333            let mut i = 0;
334            let mut category_filters = String::new();
335            for category in c {
336                // don't take user input in the db query
337                if let Ok(sanitized_category) = self.get_category_from_name(category).await {
338                    let mut str = format!("tc.name = '{}'", sanitized_category.name);
339                    if i > 0 {
340                        str = format!(" OR {str}");
341                    }
342                    category_filters.push_str(&str);
343                    i += 1;
344                }
345            }
346            if category_filters.is_empty() {
347                String::new()
348            } else {
349                format!("INNER JOIN torrust_categories tc ON tt.category_id = tc.category_id AND ({category_filters}) ")
350            }
351        } else {
352            String::new()
353        };
354
355        let tag_filter_query = if let Some(t) = tags {
356            let mut i = 0;
357            let mut tag_filters = String::new();
358            for tag in t {
359                // don't take user input in the db query
360                if let Ok(sanitized_tag) = self.get_tag_from_name(tag).await {
361                    let mut str = format!("tl.tag_id = '{}'", sanitized_tag.tag_id);
362                    if i > 0 {
363                        str = format!(" OR {str}");
364                    }
365                    tag_filters.push_str(&str);
366                    i += 1;
367                }
368            }
369            if tag_filters.is_empty() {
370                String::new()
371            } else {
372                format!("INNER JOIN torrust_torrent_tag_links tl ON tt.torrent_id = tl.torrent_id AND ({tag_filters}) ")
373            }
374        } else {
375            String::new()
376        };
377
378        let mut query_string = format!(
379            "SELECT
380            tt.torrent_id,
381            tp.username AS uploader,
382            tt.info_hash,
383            ti.title,
384            ti.description,
385            tt.category_id,
386            tt.date_uploaded,
387            tt.size AS file_size,
388            tt.name,
389            tt.comment,
390            tt.creation_date,
391            tt.created_by,
392            tt.`encoding`,
393            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
394            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
395            FROM torrust_torrents tt
396            {category_filter_query}
397            {tag_filter_query}
398            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
399            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
400            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
401            WHERE title LIKE ?
402            GROUP BY tt.torrent_id"
403        );
404
405        let count_query = format!("SELECT COUNT(*) as count FROM ({query_string}) AS count_table");
406
407        let count_result: Result<i64, database::Error> = query_as(&count_query)
408            .bind(title.clone())
409            .fetch_one(&self.pool)
410            .await
411            .map(|(v,)| v)
412            .map_err(|_| database::Error::Error);
413
414        let count = count_result?;
415
416        query_string = format!("{query_string} ORDER BY {sort_query} LIMIT ?, ?");
417
418        let res: Vec<TorrentListing> = sqlx::query_as::<_, TorrentListing>(&query_string)
419            .bind(title)
420            .bind(i64::saturating_add_unsigned(0, offset))
421            .bind(limit)
422            .fetch_all(&self.pool)
423            .await
424            .map_err(|_| database::Error::Error)?;
425
426        Ok(TorrentsResponse {
427            total: u32::try_from(count).expect("variable `count` is larger than u32"),
428            results: res,
429        })
430    }
431
432    #[allow(clippy::too_many_lines)]
433    async fn insert_torrent_and_get_id(
434        &self,
435        original_info_hash: &InfoHash,
436        torrent: &Torrent,
437        uploader_id: UserId,
438        metadata: &Metadata,
439    ) -> Result<i64, database::Error> {
440        let info_hash = torrent.canonical_info_hash_hex();
441        let canonical_info_hash = torrent.canonical_info_hash();
442
443        // open pool connection
444        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
445
446        // start db transaction
447        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
448
449        // BEP 30: <http://www.bittorrent.org/beps/bep_0030.html>.
450        // Torrent file can only hold a `pieces` key or a `root hash` key
451        let is_bep_30 = !matches!(&torrent.info.pieces, Some(_pieces));
452
453        let pieces = torrent.info.pieces.as_ref().map(|pieces| from_bytes(pieces.as_ref()));
454
455        let root_hash = torrent
456            .info
457            .root_hash
458            .as_ref()
459            .map(|root_hash| from_bytes(root_hash.as_ref()));
460
461        // add torrent
462        let torrent_id = query(
463            "INSERT INTO torrust_torrents (
464            uploader_id,
465            category_id,
466            info_hash,
467            size,
468            name,
469            pieces,
470            root_hash,
471            piece_length,
472            private,
473            is_bep_30,
474            `source`,
475            comment,
476            date_uploaded,
477            creation_date,
478            created_by,
479            `encoding`
480        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S',DATETIME('now', 'utc')), ?, ?, ?)",
481        )
482        .bind(uploader_id)
483        .bind(metadata.category_id)
484        .bind(info_hash.to_lowercase())
485        .bind(torrent.file_size())
486        .bind(torrent.info.name.to_string())
487        .bind(pieces)
488        .bind(root_hash)
489        .bind(torrent.info.piece_length)
490        .bind(torrent.info.private)
491        .bind(is_bep_30)
492        .bind(torrent.info.source.clone())
493        .bind(torrent.comment.clone())
494        .bind(torrent.creation_date)
495        .bind(torrent.created_by.clone())
496        .bind(torrent.encoding.clone())
497        .execute(&mut *tx)
498        .await
499        .map(|v| v.last_insert_rowid())
500        .map_err(|e| match e {
501            sqlx::Error::Database(err) => {
502                tracing::error!("DB error: {:?}", err);
503                if err.message().contains("UNIQUE") && err.message().contains("info_hash") {
504                    database::Error::TorrentAlreadyExists
505                } else {
506                    database::Error::Error
507                }
508            }
509            _ => database::Error::Error,
510        })?;
511
512        // add torrent canonical infohash
513
514        let insert_info_hash_result =
515            query("INSERT INTO torrust_torrent_info_hashes (info_hash, canonical_info_hash, original_is_known) VALUES (?, ?, ?)")
516                .bind(original_info_hash.to_hex_string())
517                .bind(canonical_info_hash.to_hex_string())
518                .bind(true)
519                .execute(&mut *tx)
520                .await
521                .map(|_| ())
522                .map_err(|err| {
523                    tracing::error!("DB error: {:?}", err);
524                    database::Error::Error
525                });
526
527        // rollback transaction on error
528        if let Err(e) = insert_info_hash_result {
529            drop(tx.rollback().await);
530            return Err(e);
531        }
532
533        // add torrent files
534
535        let insert_torrent_files_result = if let Some(length) = torrent.info.length {
536            query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length) VALUES (?, ?, ?)")
537                .bind(torrent.info.md5sum.clone())
538                .bind(torrent_id)
539                .bind(length)
540                .execute(&mut *tx)
541                .await
542                .map(|_| ())
543                .map_err(|_| database::Error::Error)
544        } else {
545            let files = torrent.info.files.as_ref().unwrap();
546
547            for file in files {
548                let path = file.path.join("/");
549
550                let _ = query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length, path) VALUES (?, ?, ?, ?)")
551                    .bind(file.md5sum.clone())
552                    .bind(torrent_id)
553                    .bind(file.length)
554                    .bind(path)
555                    .execute(&mut *tx)
556                    .await
557                    .map_err(|_| database::Error::Error)?;
558            }
559
560            Ok(())
561        };
562
563        // rollback transaction on error
564        if let Err(e) = insert_torrent_files_result {
565            drop(tx.rollback().await);
566            return Err(e);
567        }
568
569        // add announce URLs
570
571        let insert_torrent_announce_urls_result: Result<(), database::Error> = if let Some(announce_urls) = &torrent.announce_list
572        {
573            // flatten the nested vec (this will however remove the)
574            let announce_urls = announce_urls.iter().flatten().collect::<Vec<&String>>();
575
576            for tracker_url in &announce_urls {
577                let () = query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
578                    .bind(torrent_id)
579                    .bind(tracker_url)
580                    .execute(&mut *tx)
581                    .await
582                    .map(|_| ())
583                    .map_err(|_| database::Error::Error)?;
584            }
585
586            Ok(())
587        } else {
588            let tracker_url = torrent.announce.as_ref().unwrap();
589
590            query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
591                .bind(torrent_id)
592                .bind(tracker_url)
593                .execute(&mut *tx)
594                .await
595                .map(|_| ())
596                .map_err(|_| database::Error::Error)
597        };
598
599        // rollback transaction on error
600        if let Err(e) = insert_torrent_announce_urls_result {
601            drop(tx.rollback().await);
602            return Err(e);
603        }
604
605        // add HTTP seeds
606
607        let insert_torrent_http_seeds_result: Result<(), database::Error> = if let Some(http_seeds) = &torrent.httpseeds {
608            for seed_url in http_seeds {
609                let () = query("INSERT INTO torrust_torrent_http_seeds (torrent_id, seed_url) VALUES (?, ?)")
610                    .bind(torrent_id)
611                    .bind(seed_url)
612                    .execute(&mut *tx)
613                    .await
614                    .map(|_| ())
615                    .map_err(|_| database::Error::Error)?;
616            }
617
618            Ok(())
619        } else {
620            Ok(())
621        };
622
623        // rollback transaction on error
624        if let Err(e) = insert_torrent_http_seeds_result {
625            drop(tx.rollback().await);
626            return Err(e);
627        }
628
629        // add nodes
630
631        let insert_torrent_nodes_result: Result<(), database::Error> = if let Some(nodes) = &torrent.nodes {
632            for node in nodes {
633                let () = query("INSERT INTO torrust_torrent_nodes (torrent_id, node_ip, node_port) VALUES (?, ?, ?)")
634                    .bind(torrent_id)
635                    .bind(node.0.clone())
636                    .bind(node.1)
637                    .execute(&mut *tx)
638                    .await
639                    .map(|_| ())
640                    .map_err(|_| database::Error::Error)?;
641            }
642
643            Ok(())
644        } else {
645            Ok(())
646        };
647
648        // rollback transaction on error
649        if let Err(e) = insert_torrent_nodes_result {
650            drop(tx.rollback().await);
651            return Err(e);
652        }
653
654        // add tags
655
656        for tag_id in &metadata.tags {
657            let insert_torrent_tag_result = query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
658                .bind(torrent_id)
659                .bind(tag_id)
660                .execute(&mut *tx)
661                .await
662                .map_err(|err| database::Error::ErrorWithText(err.to_string()));
663
664            // rollback transaction on error
665            if let Err(e) = insert_torrent_tag_result {
666                drop(tx.rollback().await);
667                return Err(e);
668            }
669        }
670
671        let insert_torrent_info_result =
672            query(r#"INSERT INTO torrust_torrent_info (torrent_id, title, description) VALUES (?, ?, NULLIF(?, ""))"#)
673                .bind(torrent_id)
674                .bind(metadata.title.clone())
675                .bind(metadata.description.clone())
676                .execute(&mut *tx)
677                .await
678                .map_err(|e| match e {
679                    sqlx::Error::Database(err) => {
680                        tracing::error!("DB error: {:?}", err);
681                        if err.message().contains("UNIQUE") && err.message().contains("title") {
682                            database::Error::TorrentTitleAlreadyExists
683                        } else {
684                            database::Error::Error
685                        }
686                    }
687                    _ => database::Error::Error,
688                });
689
690        // commit or rollback transaction and return user_id on success
691        match insert_torrent_info_result {
692            Ok(_) => {
693                drop(tx.commit().await);
694                Ok(torrent_id)
695            }
696            Err(e) => {
697                drop(tx.rollback().await);
698                Err(e)
699            }
700        }
701    }
702
703    async fn get_torrent_canonical_info_hash_group(
704        &self,
705        canonical: &InfoHash,
706    ) -> Result<CanonicalInfoHashGroup, database::Error> {
707        let db_info_hashes = query_as::<_, DbTorrentInfoHash>(
708            "SELECT info_hash, canonical_info_hash, original_is_known FROM torrust_torrent_info_hashes WHERE canonical_info_hash = ?",
709        )
710        .bind(canonical.to_hex_string())
711        .fetch_all(&self.pool)
712        .await
713        .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
714
715        let info_hashes: Vec<InfoHash> = db_info_hashes
716            .into_iter()
717            .map(|db_info_hash| {
718                InfoHash::from_str(&db_info_hash.info_hash)
719                    .unwrap_or_else(|_| panic!("Invalid info-hash in database: {}", db_info_hash.info_hash))
720            })
721            .collect();
722
723        Ok(CanonicalInfoHashGroup {
724            canonical_info_hash: *canonical,
725            original_info_hashes: info_hashes,
726        })
727    }
728
729    async fn find_canonical_info_hash_for(&self, info_hash: &InfoHash) -> Result<Option<InfoHash>, database::Error> {
730        let maybe_db_torrent_info_hash = query_as::<_, DbTorrentInfoHash>(
731            "SELECT info_hash, canonical_info_hash, original_is_known FROM torrust_torrent_info_hashes WHERE info_hash = ?",
732        )
733        .bind(info_hash.to_hex_string())
734        .fetch_optional(&self.pool)
735        .await
736        .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
737
738        match maybe_db_torrent_info_hash {
739            Some(db_torrent_info_hash) => Ok(Some(
740                InfoHash::from_str(&db_torrent_info_hash.canonical_info_hash)
741                    .unwrap_or_else(|_| panic!("Invalid info-hash in database: {}", db_torrent_info_hash.canonical_info_hash)),
742            )),
743            None => Ok(None),
744        }
745    }
746
747    async fn add_info_hash_to_canonical_info_hash_group(
748        &self,
749        original: &InfoHash,
750        canonical: &InfoHash,
751    ) -> Result<(), database::Error> {
752        query("INSERT INTO torrust_torrent_info_hashes (info_hash, canonical_info_hash, original_is_known) VALUES (?, ?, ?)")
753            .bind(original.to_hex_string())
754            .bind(canonical.to_hex_string())
755            .bind(true)
756            .execute(&self.pool)
757            .await
758            .map(|_| ())
759            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
760    }
761
762    async fn get_torrent_info_from_id(&self, torrent_id: i64) -> Result<DbTorrent, database::Error> {
763        query_as::<_, DbTorrent>("SELECT * FROM torrust_torrents WHERE torrent_id = ?")
764            .bind(torrent_id)
765            .fetch_one(&self.pool)
766            .await
767            .map_err(|_| database::Error::TorrentNotFound)
768    }
769
770    async fn get_torrent_info_from_info_hash(&self, info_hash: &InfoHash) -> Result<DbTorrent, database::Error> {
771        query_as::<_, DbTorrent>("SELECT * FROM torrust_torrents WHERE info_hash = ?")
772            .bind(info_hash.to_hex_string().to_lowercase())
773            .fetch_one(&self.pool)
774            .await
775            .map_err(|_| database::Error::TorrentNotFound)
776    }
777
778    async fn get_torrent_files_from_id(&self, torrent_id: i64) -> Result<Vec<TorrentFile>, database::Error> {
779        let db_torrent_files =
780            query_as::<_, DbTorrentFile>("SELECT md5sum, length, path FROM torrust_torrent_files WHERE torrent_id = ?")
781                .bind(torrent_id)
782                .fetch_all(&self.pool)
783                .await
784                .map_err(|_| database::Error::TorrentNotFound)?;
785
786        let torrent_files: Vec<TorrentFile> = db_torrent_files
787            .into_iter()
788            .map(|tf| TorrentFile {
789                path: tf
790                    .path
791                    .unwrap_or_default()
792                    .split('/')
793                    .map(std::string::ToString::to_string)
794                    .collect(),
795                length: tf.length,
796                md5sum: tf.md5sum,
797            })
798            .collect();
799
800        Ok(torrent_files)
801    }
802
803    async fn get_torrent_announce_urls_from_id(&self, torrent_id: i64) -> Result<Vec<Vec<String>>, database::Error> {
804        query_as::<_, DbTorrentAnnounceUrl>("SELECT tracker_url FROM torrust_torrent_announce_urls WHERE torrent_id = ?")
805            .bind(torrent_id)
806            .fetch_all(&self.pool)
807            .await
808            .map(|v| v.iter().map(|a| vec![a.tracker_url.to_string()]).collect())
809            .map_err(|_| database::Error::TorrentNotFound)
810    }
811
812    async fn get_torrent_http_seed_urls_from_id(&self, torrent_id: i64) -> Result<Vec<String>, database::Error> {
813        query_as::<_, DbTorrentHttpSeedUrl>("SELECT seed_url FROM torrust_torrent_http_seeds WHERE torrent_id = ?")
814            .bind(torrent_id)
815            .fetch_all(&self.pool)
816            .await
817            .map(|v| v.iter().map(|a| a.seed_url.to_string()).collect())
818            .map_err(|_| database::Error::TorrentNotFound)
819    }
820
821    async fn get_torrent_nodes_from_id(&self, torrent_id: i64) -> Result<Vec<(String, i64)>, database::Error> {
822        query_as::<_, DbTorrentNode>("SELECT node_ip, node_port FROM torrust_torrent_nodes WHERE torrent_id = ?")
823            .bind(torrent_id)
824            .fetch_all(&self.pool)
825            .await
826            .map(|v| v.iter().map(|a| (a.node_ip.to_string(), a.node_port)).collect())
827            .map_err(|_| database::Error::TorrentNotFound)
828    }
829
830    async fn get_torrent_listing_from_id(&self, torrent_id: i64) -> Result<TorrentListing, database::Error> {
831        query_as::<_, TorrentListing>(
832            "SELECT 
833            tt.torrent_id,
834            tp.username AS uploader,
835            tt.info_hash, ti.title,
836            ti.description,
837            tt.category_id,
838            tt.date_uploaded,
839            tt.size AS file_size,
840            tt.name,
841            tt.comment,
842            tt.creation_date,
843            tt.created_by,
844            tt.`encoding`,
845            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
846            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
847            FROM torrust_torrents tt
848            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
849            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
850            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
851            WHERE tt.torrent_id = ?
852            GROUP BY ts.torrent_id",
853        )
854        .bind(torrent_id)
855        .fetch_one(&self.pool)
856        .await
857        .map_err(|_| database::Error::TorrentNotFound)
858    }
859
860    async fn get_torrent_listing_from_info_hash(&self, info_hash: &InfoHash) -> Result<TorrentListing, database::Error> {
861        query_as::<_, TorrentListing>(
862            "SELECT
863            tt.torrent_id,
864            tp.username AS uploader,
865            tt.info_hash, ti.title,
866            ti.description,
867            tt.category_id,
868            tt.date_uploaded,
869            tt.size AS file_size,
870            tt.name,
871            tt.comment,
872            tt.creation_date,
873            tt.created_by,
874            tt.`encoding`,
875            CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
876            CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
877            FROM torrust_torrents tt
878            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
879            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
880            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
881            WHERE tt.info_hash = ?
882            GROUP BY ts.torrent_id",
883        )
884        .bind(info_hash.to_string().to_lowercase())
885        .fetch_one(&self.pool)
886        .await
887        .map_err(|_| database::Error::TorrentNotFound)
888    }
889
890    async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, database::Error> {
891        query_as::<_, TorrentCompact>("SELECT torrent_id, info_hash FROM torrust_torrents")
892            .fetch_all(&self.pool)
893            .await
894            .map_err(|_| database::Error::Error)
895    }
896
897    async fn get_torrents_with_stats_not_updated_since(
898        &self,
899        datetime: DateTime<Utc>,
900        limit: i64,
901    ) -> Result<Vec<TorrentCompact>, database::Error> {
902        query_as::<_, TorrentCompact>(
903            "SELECT tt.torrent_id, tt.info_hash
904             FROM torrust_torrents tt
905             LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
906             WHERE tts.updated_at < ? OR tts.updated_at IS NULL
907             ORDER BY tts.updated_at ASC
908             LIMIT ?
909        ",
910        )
911        .bind(datetime.format(DATETIME_FORMAT).to_string())
912        .bind(limit)
913        .fetch_all(&self.pool)
914        .await
915        .map_err(|_| database::Error::Error)
916    }
917
918    async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
919        query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2")
920            .bind(title)
921            .bind(torrent_id)
922            .execute(&self.pool)
923            .await
924            .map_err(|e| match e {
925                sqlx::Error::Database(err) => {
926                    tracing::error!("DB error: {:?}", err);
927                    if err.message().contains("UNIQUE") && err.message().contains("title") {
928                        database::Error::TorrentTitleAlreadyExists
929                    } else {
930                        database::Error::Error
931                    }
932                }
933                _ => database::Error::Error,
934            })
935            .and_then(|v| {
936                if v.rows_affected() > 0 {
937                    Ok(())
938                } else {
939                    Err(database::Error::TorrentNotFound)
940                }
941            })
942    }
943
944    async fn update_torrent_description(&self, torrent_id: i64, description: &str) -> Result<(), database::Error> {
945        query("UPDATE torrust_torrent_info SET description = $1 WHERE torrent_id = $2")
946            .bind(description)
947            .bind(torrent_id)
948            .execute(&self.pool)
949            .await
950            .map_err(|_| database::Error::Error)
951            .and_then(|v| {
952                if v.rows_affected() > 0 {
953                    Ok(())
954                } else {
955                    Err(database::Error::TorrentNotFound)
956                }
957            })
958    }
959
960    async fn update_torrent_category(&self, torrent_id: i64, category_id: CategoryId) -> Result<(), database::Error> {
961        query("UPDATE torrust_torrents SET category_id = $1 WHERE torrent_id = $2")
962            .bind(category_id)
963            .bind(torrent_id)
964            .execute(&self.pool)
965            .await
966            .map_err(|_| database::Error::Error)
967            .and_then(|v| {
968                if v.rows_affected() > 0 {
969                    Ok(())
970                } else {
971                    Err(database::Error::TorrentNotFound)
972                }
973            })
974    }
975
976    async fn insert_tag_and_get_id(&self, tag_name: &str) -> Result<i64, database::Error> {
977        query("INSERT INTO torrust_torrent_tags (name) VALUES (?)")
978            .bind(tag_name)
979            .execute(&self.pool)
980            .await
981            .map(|v| v.last_insert_rowid())
982            .map_err(|e| match e {
983                sqlx::Error::Database(err) => {
984                    tracing::error!("DB error: {:?}", err);
985                    if err.message().contains("UNIQUE") && err.message().contains("name") {
986                        database::Error::TagAlreadyExists
987                    } else {
988                        database::Error::Error
989                    }
990                }
991                _ => database::Error::Error,
992            })
993    }
994
995    async fn delete_tag(&self, tag_id: TagId) -> Result<(), database::Error> {
996        query("DELETE FROM torrust_torrent_tags WHERE tag_id = ?")
997            .bind(tag_id)
998            .execute(&self.pool)
999            .await
1000            .map(|_| ())
1001            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1002    }
1003
1004    async fn add_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
1005        query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
1006            .bind(torrent_id)
1007            .bind(tag_id)
1008            .execute(&self.pool)
1009            .await
1010            .map(|_| ())
1011            .map_err(|_| database::Error::Error)
1012    }
1013
1014    async fn add_torrent_tag_links(&self, torrent_id: i64, tag_ids: &[TagId]) -> Result<(), database::Error> {
1015        let mut tx = self
1016            .pool
1017            .begin()
1018            .await
1019            .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
1020
1021        for tag_id in tag_ids {
1022            query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
1023                .bind(torrent_id)
1024                .bind(tag_id)
1025                .execute(&mut *tx)
1026                .await
1027                .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
1028        }
1029
1030        tx.commit()
1031            .await
1032            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1033    }
1034
1035    async fn delete_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
1036        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ? AND tag_id = ?")
1037            .bind(torrent_id)
1038            .bind(tag_id)
1039            .execute(&self.pool)
1040            .await
1041            .map(|_| ())
1042            .map_err(|_| database::Error::Error)
1043    }
1044
1045    async fn delete_all_torrent_tag_links(&self, torrent_id: i64) -> Result<(), database::Error> {
1046        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ?")
1047            .bind(torrent_id)
1048            .execute(&self.pool)
1049            .await
1050            .map(|_| ())
1051            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1052    }
1053
1054    async fn get_tag_from_name(&self, name: &str) -> Result<TorrentTag, database::Error> {
1055        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags WHERE name = ?")
1056            .bind(name)
1057            .fetch_one(&self.pool)
1058            .await
1059            .map_err(|_| database::Error::TagNotFound)
1060    }
1061
1062    async fn get_tags(&self) -> Result<Vec<TorrentTag>, database::Error> {
1063        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags")
1064            .fetch_all(&self.pool)
1065            .await
1066            .map_err(|_| database::Error::Error)
1067    }
1068
1069    async fn get_tags_for_torrent_id(&self, torrent_id: i64) -> Result<Vec<TorrentTag>, database::Error> {
1070        query_as::<_, TorrentTag>(
1071            "SELECT torrust_torrent_tags.tag_id, torrust_torrent_tags.name
1072            FROM torrust_torrent_tags
1073            JOIN torrust_torrent_tag_links ON torrust_torrent_tags.tag_id = torrust_torrent_tag_links.tag_id
1074            WHERE torrust_torrent_tag_links.torrent_id = ?",
1075        )
1076        .bind(torrent_id)
1077        .fetch_all(&self.pool)
1078        .await
1079        .map_err(|_| database::Error::Error)
1080    }
1081
1082    async fn update_tracker_info(
1083        &self,
1084        torrent_id: i64,
1085        tracker_url: &Url,
1086        seeders: i64,
1087        leechers: i64,
1088    ) -> Result<(), database::Error> {
1089        query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES ($1, $2, $3, $4, $5)")
1090            .bind(torrent_id)
1091            .bind(tracker_url.to_string())
1092            .bind(seeders)
1093            .bind(leechers)
1094            .bind(datetime_now())
1095            .execute(&self.pool)
1096            .await
1097            .map(|_| ())
1098            .map_err(|_| database::Error::TorrentNotFound)
1099    }
1100
1101    async fn delete_torrent(&self, torrent_id: i64) -> Result<(), database::Error> {
1102        query("DELETE FROM torrust_torrents WHERE torrent_id = ?")
1103            .bind(torrent_id)
1104            .execute(&self.pool)
1105            .await
1106            .map_err(|_| database::Error::Error)
1107            .and_then(|v| {
1108                if v.rows_affected() > 0 {
1109                    Ok(())
1110                } else {
1111                    Err(database::Error::TorrentNotFound)
1112                }
1113            })
1114    }
1115
1116    async fn delete_all_database_rows(&self) -> Result<(), database::Error> {
1117        for table in TABLES_TO_TRUNCATE {
1118            query(&format!("DELETE FROM {table};"))
1119                .execute(&self.pool)
1120                .await
1121                .map_err(|_| database::Error::Error)?;
1122        }
1123
1124        Ok(())
1125    }
1126}