use std::str::FromStr;
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool};
use url::Url;

use super::database::TABLES_TO_TRUNCATE;
use crate::databases::database;
use crate::databases::database::{Category, Database, Driver, Sorting, TorrentCompact};
use crate::models::category::CategoryId;
use crate::models::info_hash::InfoHash;
use crate::models::response::TorrentsResponse;
use crate::models::torrent::{Metadata, TorrentListing};
use crate::models::torrent_file::{
    DbTorrent, DbTorrentAnnounceUrl, DbTorrentFile, DbTorrentHttpSeedUrl, DbTorrentNode, Torrent, TorrentFile,
};
use crate::models::torrent_tag::{TagId, TorrentTag};
use crate::models::tracker_key::TrackerKey;
use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
use crate::utils::hex::from_bytes;

pub struct Mysql {
    pub pool: MySqlPool,
}

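// Illustrative usage sketch (not part of the driver): the URL, credentials and database
// name below are placeholders, and the calls assume a reachable MySQL server with the
// `Database` trait in scope.
//
//     let db = Mysql::new("mysql://user:password@localhost:3306/torrust_index").await;
//     let categories = db.get_categories().await.expect("query failed");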
#[async_trait]
impl Database for Mysql {
    fn get_database_driver(&self) -> Driver {
        Driver::Mysql
    }

    async fn new(database_url: &str) -> Self {
        let connection_options = MySqlConnectOptions::from_str(database_url)
            .expect("Unable to create connection options.")
            .log_statements(log::LevelFilter::Debug)
            .log_slow_statements(log::LevelFilter::Info, Duration::from_secs(1));

        let db = MySqlPoolOptions::new()
            .connect_with(connection_options)
            .await
            .expect("Unable to create database pool.");

        sqlx::migrate!("migrations/mysql")
            .run(&db)
            .await
            .expect("Could not run database migrations.");

        Self { pool: db }
    }

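    // Creates the user in three steps (`torrust_users`, `torrust_user_authentication`,
    // `torrust_user_profiles`) inside a single transaction, rolling back if any step fails.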
    async fn insert_user_and_get_id(&self, username: &str, email: &str, password_hash: &str) -> Result<i64, database::Error> {
        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;

        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;

        let user_id = query("INSERT INTO torrust_users (date_registered) VALUES (UTC_TIMESTAMP())")
            .execute(&mut *tx)
            .await
            .map(|v| v.last_insert_id())
            .map_err(|_| database::Error::Error)?;

        let insert_user_auth_result = query("INSERT INTO torrust_user_authentication (user_id, password_hash) VALUES (?, ?)")
            .bind(user_id)
            .bind(password_hash)
            .execute(&mut *tx)
            .await
            .map_err(|_| database::Error::Error);

        if let Err(e) = insert_user_auth_result {
            drop(tx.rollback().await);
            return Err(e);
        }

        let insert_user_profile_result = query(r#"INSERT INTO torrust_user_profiles (user_id, username, email, email_verified, bio, avatar) VALUES (?, ?, NULLIF(?, ""), 0, NULL, NULL)"#)
            .bind(user_id)
            .bind(username)
            .bind(email)
            .execute(&mut *tx)
            .await
            .map_err(|e| match e {
                sqlx::Error::Database(err) => {
                    if err.message().contains("username") {
                        database::Error::UsernameTaken
                    } else if err.message().contains("email") {
                        database::Error::EmailTaken
                    } else {
                        database::Error::Error
                    }
                }
                _ => database::Error::Error,
            });

        match insert_user_profile_result {
            Ok(_) => {
                drop(tx.commit().await);
                Ok(i64::overflowing_add_unsigned(0, user_id).0)
            }
            Err(e) => {
                drop(tx.rollback().await);
                Err(e)
            }
        }
    }

    async fn change_user_password(&self, user_id: i64, new_password: &str) -> Result<(), database::Error> {
        query("UPDATE torrust_user_authentication SET password_hash = ? WHERE user_id = ?")
            .bind(new_password)
            .bind(user_id)
            .execute(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::UserNotFound)
                }
            })
    }

    async fn get_user_from_id(&self, user_id: i64) -> Result<User, database::Error> {
        query_as::<_, User>("SELECT * FROM torrust_users WHERE user_id = ?")
            .bind(user_id)
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::UserNotFound)
    }

    async fn get_user_authentication_from_id(&self, user_id: UserId) -> Result<UserAuthentication, database::Error> {
        query_as::<_, UserAuthentication>("SELECT * FROM torrust_user_authentication WHERE user_id = ?")
            .bind(user_id)
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::UserNotFound)
    }

    async fn get_user_profile_from_username(&self, username: &str) -> Result<UserProfile, database::Error> {
        query_as::<_, UserProfile>(r#"SELECT user_id, username, COALESCE(email, "") as email, email_verified, COALESCE(bio, "") as bio, COALESCE(avatar, "") as avatar FROM torrust_user_profiles WHERE username = ?"#)
            .bind(username)
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::UserNotFound)
    }

    async fn get_user_compact_from_id(&self, user_id: i64) -> Result<UserCompact, database::Error> {
        query_as::<_, UserCompact>("SELECT tu.user_id, tp.username, tu.administrator FROM torrust_users tu INNER JOIN torrust_user_profiles tp ON tu.user_id = tp.user_id WHERE tu.user_id = ?")
            .bind(user_id)
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::UserNotFound)
    }

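    // Returns the user's tracker key with the latest expiry, provided it is still valid
    // for at least one more hour; otherwise `None`.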
    async fn get_user_tracker_key(&self, user_id: i64) -> Option<TrackerKey> {
        const HOUR_IN_SECONDS: i64 = 3600;

        let current_time_plus_hour = i64::try_from(clock::now()).unwrap().saturating_add(HOUR_IN_SECONDS);

        query_as::<_, TrackerKey>("SELECT tracker_key AS 'key', date_expiry AS valid_until FROM torrust_tracker_keys WHERE user_id = ? AND date_expiry > ? ORDER BY date_expiry DESC")
            .bind(user_id)
            .bind(current_time_plus_hour)
            .fetch_one(&self.pool)
            .await
            .ok()
    }

    async fn count_users(&self) -> Result<i64, database::Error> {
        query_as("SELECT COUNT(*) FROM torrust_users")
            .fetch_one(&self.pool)
            .await
            .map(|(v,)| v)
            .map_err(|_| database::Error::Error)
    }

    async fn ban_user(&self, user_id: i64, reason: &str, date_expiry: NaiveDateTime) -> Result<(), database::Error> {
        let date_expiry_string = date_expiry.format("%Y-%m-%d %H:%M:%S").to_string();

        query("INSERT INTO torrust_user_bans (user_id, reason, date_expiry) VALUES (?, ?, ?)")
            .bind(user_id)
            .bind(reason)
            .bind(date_expiry_string)
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(|_| database::Error::Error)
    }

    async fn grant_admin_role(&self, user_id: i64) -> Result<(), database::Error> {
        query("UPDATE torrust_users SET administrator = TRUE WHERE user_id = ?")
            .bind(user_id)
            .execute(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::UserNotFound)
                }
            })
    }

    async fn verify_email(&self, user_id: i64) -> Result<(), database::Error> {
        query("UPDATE torrust_user_profiles SET email_verified = TRUE WHERE user_id = ?")
            .bind(user_id)
            .execute(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::UserNotFound)
                }
            })
    }

    async fn add_tracker_key(&self, user_id: i64, tracker_key: &TrackerKey) -> Result<(), database::Error> {
        let key = tracker_key.key.clone();

        query("INSERT INTO torrust_tracker_keys (user_id, tracker_key, date_expiry) VALUES (?, ?, ?)")
            .bind(user_id)
            .bind(key)
            .bind(tracker_key.valid_until)
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(|_| database::Error::Error)
    }

    async fn delete_user(&self, user_id: i64) -> Result<(), database::Error> {
        query("DELETE FROM torrust_users WHERE user_id = ?")
            .bind(user_id)
            .execute(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::UserNotFound)
                }
            })
    }

    async fn insert_category_and_get_id(&self, category_name: &str) -> Result<i64, database::Error> {
        query("INSERT INTO torrust_categories (name) VALUES (?)")
            .bind(category_name)
            .execute(&self.pool)
            .await
            .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
            .map_err(|_| database::Error::Error)
    }

    async fn get_category_from_id(&self, category_id: i64) -> Result<Category, database::Error> {
        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE category_id = ?")
            .bind(category_id)
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::CategoryNotFound)
    }

    async fn get_category_from_name(&self, category_name: &str) -> Result<Category, database::Error> {
        query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE name = ?")
            .bind(category_name)
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::CategoryNotFound)
    }

    async fn get_categories(&self) -> Result<Vec<Category>, database::Error> {
        query_as::<_, Category>("SELECT tc.category_id, tc.name, COUNT(tt.category_id) as num_torrents FROM torrust_categories tc LEFT JOIN torrust_torrents tt on tc.category_id = tt.category_id GROUP BY tc.name")
            .fetch_all(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
    }

    async fn delete_category(&self, category_name: &str) -> Result<(), database::Error> {
        query("DELETE FROM torrust_categories WHERE name = ?")
            .bind(category_name)
            .execute(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::CategoryNotFound)
                }
            })
    }

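    // Builds the listing query dynamically: category and tag filters are assembled only from
    // names that have first been resolved via `get_category_from_name` / `get_tag_from_name`,
    // so only values already stored in the database end up interpolated into the SQL string.
    // A separate COUNT query over the same SELECT provides the total used for pagination.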
    #[allow(clippy::too_many_lines)]
    async fn get_torrents_search_sorted_paginated(
        &self,
        search: &Option<String>,
        categories: &Option<Vec<String>>,
        tags: &Option<Vec<String>>,
        sort: &Sorting,
        offset: u64,
        limit: u8,
    ) -> Result<TorrentsResponse, database::Error> {
        let title = match search {
            None => "%".to_string(),
            Some(v) => format!("%{v}%"),
        };

        let sort_query: String = match sort {
            Sorting::UploadedAsc => "date_uploaded ASC".to_string(),
            Sorting::UploadedDesc => "date_uploaded DESC".to_string(),
            Sorting::SeedersAsc => "seeders ASC".to_string(),
            Sorting::SeedersDesc => "seeders DESC".to_string(),
            Sorting::LeechersAsc => "leechers ASC".to_string(),
            Sorting::LeechersDesc => "leechers DESC".to_string(),
            Sorting::NameAsc => "title ASC".to_string(),
            Sorting::NameDesc => "title DESC".to_string(),
            Sorting::SizeAsc => "size ASC".to_string(),
            Sorting::SizeDesc => "size DESC".to_string(),
        };

        let category_filter_query = if let Some(c) = categories {
            let mut i = 0;
            let mut category_filters = String::new();
            for category in c {
                if let Ok(sanitized_category) = self.get_category_from_name(category).await {
                    let mut str = format!("tc.name = '{}'", sanitized_category.name);
                    if i > 0 {
                        str = format!(" OR {str}");
                    }
                    category_filters.push_str(&str);
                    i += 1;
                }
            }
            if category_filters.is_empty() {
                String::new()
            } else {
                format!("INNER JOIN torrust_categories tc ON tt.category_id = tc.category_id AND ({category_filters}) ")
            }
        } else {
            String::new()
        };

        let tag_filter_query = if let Some(t) = tags {
            let mut i = 0;
            let mut tag_filters = String::new();
            for tag in t {
                if let Ok(sanitized_tag) = self.get_tag_from_name(tag).await {
                    let mut str = format!("tl.tag_id = '{}'", sanitized_tag.tag_id);
                    if i > 0 {
                        str = format!(" OR {str}");
                    }
                    tag_filters.push_str(&str);
                    i += 1;
                }
            }
            if tag_filters.is_empty() {
                String::new()
            } else {
                format!("INNER JOIN torrust_torrent_tag_links tl ON tt.torrent_id = tl.torrent_id AND ({tag_filters}) ")
            }
        } else {
            String::new()
        };

        let mut query_string = format!(
            "SELECT
                tt.torrent_id,
                tp.username AS uploader,
                tt.info_hash,
                ti.title,
                ti.description,
                tt.category_id,
                DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded,
                tt.size AS file_size,
                tt.name,
                tt.comment,
                tt.creation_date,
                tt.created_by,
                tt.`encoding`,
                CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
                CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
            FROM torrust_torrents tt
            {category_filter_query}
            {tag_filter_query}
            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
            WHERE title LIKE ?
            GROUP BY tt.torrent_id"
        );

        let count_query = format!("SELECT COUNT(*) as count FROM ({query_string}) AS count_table");

        let count_result: Result<i64, database::Error> = query_as(&count_query)
            .bind(title.clone())
            .fetch_one(&self.pool)
            .await
            .map(|(v,)| v)
            .map_err(|_| database::Error::Error);

        let count = count_result?;

        query_string = format!("{query_string} ORDER BY {sort_query} LIMIT ?, ?");

        let res: Vec<TorrentListing> = sqlx::query_as::<_, TorrentListing>(&query_string)
            .bind(title)
            .bind(i64::saturating_add_unsigned(0, offset))
            .bind(limit)
            .fetch_all(&self.pool)
            .await
            .map_err(|_| database::Error::Error)?;

        Ok(TorrentsResponse {
            total: u32::try_from(count).expect("variable `count` is larger than u32"),
            results: res,
        })
    }

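    // Inserts a torrent and all of its related rows (info-hashes, files, announce URLs,
    // HTTP seeds, nodes, tag links and title/description) in a single transaction; any
    // failure rolls the whole insert back. A torrent without a `pieces` field is flagged
    // as BEP 30 and stored with its `root_hash` instead.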
    #[allow(clippy::too_many_lines)]
    async fn insert_torrent_and_get_id(
        &self,
        original_info_hash: &InfoHash,
        torrent: &Torrent,
        uploader_id: UserId,
        metadata: &Metadata,
    ) -> Result<i64, database::Error> {
        let info_hash = torrent.canonical_info_hash_hex();
        let canonical_info_hash = torrent.canonical_info_hash();

        let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;

        let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;

        let is_bep_30 = !matches!(&torrent.info.pieces, Some(_pieces));

        let pieces = torrent.info.pieces.as_ref().map(|pieces| from_bytes(pieces.as_ref()));

        let root_hash = torrent
            .info
            .root_hash
            .as_ref()
            .map(|root_hash| from_bytes(root_hash.as_ref()));

        let torrent_id = query(
            "INSERT INTO torrust_torrents (
                uploader_id,
                category_id,
                info_hash,
                size,
                name,
                pieces,
                root_hash,
                piece_length,
                private,
                is_bep_30,
                `source`,
                comment,
                date_uploaded,
                creation_date,
                created_by,
                `encoding`
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, UTC_TIMESTAMP(), ?, ?, ?)",
        )
        .bind(uploader_id)
        .bind(metadata.category_id)
        .bind(info_hash.to_lowercase())
        .bind(torrent.file_size())
        .bind(torrent.info.name.to_string())
        .bind(pieces)
        .bind(root_hash)
        .bind(torrent.info.piece_length)
        .bind(torrent.info.private)
        .bind(is_bep_30)
        .bind(torrent.info.source.clone())
        .bind(torrent.comment.clone())
        .bind(torrent.creation_date)
        .bind(torrent.created_by.clone())
        .bind(torrent.encoding.clone())
        .execute(&mut *tx)
        .await
        .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
        .map_err(|e| match e {
            sqlx::Error::Database(err) => {
                tracing::error!("DB error: {:?}", err);
                if err.message().contains("Duplicate entry") && err.message().contains("info_hash") {
                    database::Error::TorrentAlreadyExists
                } else {
                    database::Error::Error
                }
            }
            _ => database::Error::Error,
        })?;

        let insert_info_hash_result =
            query("INSERT INTO torrust_torrent_info_hashes (info_hash, canonical_info_hash, original_is_known) VALUES (?, ?, ?)")
                .bind(original_info_hash.to_hex_string())
                .bind(canonical_info_hash.to_hex_string())
                .bind(true)
                .execute(&mut *tx)
                .await
                .map(|_| ())
                .map_err(|err| {
                    tracing::error!("DB error: {:?}", err);
                    database::Error::Error
                });

        if let Err(e) = insert_info_hash_result {
            drop(tx.rollback().await);
            return Err(e);
        }

        let insert_torrent_files_result = if let Some(length) = torrent.info.length {
            query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length) VALUES (?, ?, ?)")
                .bind(torrent.info.md5sum.clone())
                .bind(torrent_id)
                .bind(length)
                .execute(&mut *tx)
                .await
                .map(|_| ())
                .map_err(|_| database::Error::Error)
        } else {
            let files = torrent.info.files.as_ref().unwrap();

            for file in files {
                let path = file.path.join("/");

                let _ = query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length, path) VALUES (?, ?, ?, ?)")
                    .bind(file.md5sum.clone())
                    .bind(torrent_id)
                    .bind(file.length)
                    .bind(path)
                    .execute(&mut *tx)
                    .await
                    .map_err(|_| database::Error::Error)?;
            }

            Ok(())
        };

        if let Err(e) = insert_torrent_files_result {
            drop(tx.rollback().await);
            return Err(e);
        }

        let insert_torrent_announce_urls_result: Result<(), database::Error> = if let Some(announce_urls) = &torrent.announce_list
        {
            let announce_urls = announce_urls.iter().flatten().collect::<Vec<&String>>();

            for tracker_url in &announce_urls {
                let () = query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
                    .bind(torrent_id)
                    .bind(tracker_url)
                    .execute(&mut *tx)
                    .await
                    .map(|_| ())
                    .map_err(|_| database::Error::Error)?;
            }

            Ok(())
        } else {
            let tracker_url = torrent.announce.as_ref().unwrap();

            query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
                .bind(torrent_id)
                .bind(tracker_url)
                .execute(&mut *tx)
                .await
                .map(|_| ())
                .map_err(|_| database::Error::Error)
        };

        if let Err(e) = insert_torrent_announce_urls_result {
            drop(tx.rollback().await);
            return Err(e);
        }

        let insert_torrent_http_seeds_result: Result<(), database::Error> = if let Some(http_seeds) = &torrent.httpseeds {
            for seed_url in http_seeds {
                let () = query("INSERT INTO torrust_torrent_http_seeds (torrent_id, seed_url) VALUES (?, ?)")
                    .bind(torrent_id)
                    .bind(seed_url)
                    .execute(&mut *tx)
                    .await
                    .map(|_| ())
                    .map_err(|_| database::Error::Error)?;
            }

            Ok(())
        } else {
            Ok(())
        };

        if let Err(e) = insert_torrent_http_seeds_result {
            drop(tx.rollback().await);
            return Err(e);
        }

        let insert_torrent_nodes_result: Result<(), database::Error> = if let Some(nodes) = &torrent.nodes {
            for node in nodes {
                let () = query("INSERT INTO torrust_torrent_nodes (torrent_id, node_ip, node_port) VALUES (?, ?, ?)")
                    .bind(torrent_id)
                    .bind(node.0.clone())
                    .bind(node.1)
                    .execute(&mut *tx)
                    .await
                    .map(|_| ())
                    .map_err(|_| database::Error::Error)?;
            }

            Ok(())
        } else {
            Ok(())
        };

        if let Err(e) = insert_torrent_nodes_result {
            drop(tx.rollback().await);
            return Err(e);
        }

        for tag_id in &metadata.tags {
            let insert_torrent_tag_result = query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
                .bind(torrent_id)
                .bind(tag_id)
                .execute(&mut *tx)
                .await
                .map_err(|err| database::Error::ErrorWithText(err.to_string()));

            if let Err(e) = insert_torrent_tag_result {
                drop(tx.rollback().await);
                return Err(e);
            }
        }

        let insert_torrent_info_result =
            query(r#"INSERT INTO torrust_torrent_info (torrent_id, title, description) VALUES (?, ?, NULLIF(?, ""))"#)
                .bind(torrent_id)
                .bind(metadata.title.clone())
                .bind(metadata.description.clone())
                .execute(&mut *tx)
                .await
                .map_err(|e| match e {
                    sqlx::Error::Database(err) => {
                        tracing::error!("DB error: {:?}", err);
                        if err.message().contains("Duplicate entry") && err.message().contains("title") {
                            database::Error::TorrentTitleAlreadyExists
                        } else {
                            database::Error::Error
                        }
                    }
                    _ => database::Error::Error,
                });

        match insert_torrent_info_result {
            Ok(_) => {
                drop(tx.commit().await);
                Ok(torrent_id)
            }
            Err(e) => {
                drop(tx.rollback().await);
                Err(e)
            }
        }
    }

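    // A "canonical info-hash group" collects every original info-hash that has been uploaded
    // for the same canonical torrent (the canonical hash is the one stored in `torrust_torrents`).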
    async fn get_torrent_canonical_info_hash_group(
        &self,
        canonical: &InfoHash,
    ) -> Result<CanonicalInfoHashGroup, database::Error> {
        let db_info_hashes = query_as::<_, DbTorrentInfoHash>(
            "SELECT info_hash, canonical_info_hash, original_is_known FROM torrust_torrent_info_hashes WHERE canonical_info_hash = ?",
        )
        .bind(canonical.to_hex_string())
        .fetch_all(&self.pool)
        .await
        .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;

        let info_hashes: Vec<InfoHash> = db_info_hashes
            .into_iter()
            .map(|db_info_hash| {
                InfoHash::from_str(&db_info_hash.info_hash)
                    .unwrap_or_else(|_| panic!("Invalid info-hash in database: {}", db_info_hash.info_hash))
            })
            .collect();

        Ok(CanonicalInfoHashGroup {
            canonical_info_hash: *canonical,
            original_info_hashes: info_hashes,
        })
    }

    async fn find_canonical_info_hash_for(&self, info_hash: &InfoHash) -> Result<Option<InfoHash>, database::Error> {
        let maybe_db_torrent_info_hash = query_as::<_, DbTorrentInfoHash>(
            "SELECT info_hash, canonical_info_hash, original_is_known FROM torrust_torrent_info_hashes WHERE info_hash = ?",
        )
        .bind(info_hash.to_hex_string())
        .fetch_optional(&self.pool)
        .await
        .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;

        match maybe_db_torrent_info_hash {
            Some(db_torrent_info_hash) => Ok(Some(
                InfoHash::from_str(&db_torrent_info_hash.canonical_info_hash)
                    .unwrap_or_else(|_| panic!("Invalid info-hash in database: {}", db_torrent_info_hash.canonical_info_hash)),
            )),
            None => Ok(None),
        }
    }

    async fn add_info_hash_to_canonical_info_hash_group(
        &self,
        info_hash: &InfoHash,
        canonical: &InfoHash,
    ) -> Result<(), database::Error> {
        query("INSERT INTO torrust_torrent_info_hashes (info_hash, canonical_info_hash, original_is_known) VALUES (?, ?, ?)")
            .bind(info_hash.to_hex_string())
            .bind(canonical.to_hex_string())
            .bind(true)
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
    }

    async fn get_torrent_info_from_id(&self, torrent_id: i64) -> Result<DbTorrent, database::Error> {
        query_as::<_, DbTorrent>("SELECT * FROM torrust_torrents WHERE torrent_id = ?")
            .bind(torrent_id)
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::TorrentNotFound)
    }

    async fn get_torrent_info_from_info_hash(&self, info_hash: &InfoHash) -> Result<DbTorrent, database::Error> {
        query_as::<_, DbTorrent>("SELECT * FROM torrust_torrents WHERE info_hash = ?")
            .bind(info_hash.to_hex_string().to_lowercase())
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::TorrentNotFound)
    }

    async fn get_torrent_files_from_id(&self, torrent_id: i64) -> Result<Vec<TorrentFile>, database::Error> {
        let db_torrent_files =
            query_as::<_, DbTorrentFile>("SELECT md5sum, length, path FROM torrust_torrent_files WHERE torrent_id = ?")
                .bind(torrent_id)
                .fetch_all(&self.pool)
                .await
                .map_err(|_| database::Error::TorrentNotFound)?;

        let torrent_files: Vec<TorrentFile> = db_torrent_files
            .into_iter()
            .map(|tf| TorrentFile {
                path: tf
                    .path
                    .unwrap_or_default()
                    .split('/')
                    .map(std::string::ToString::to_string)
                    .collect(),
                length: tf.length,
                md5sum: tf.md5sum,
            })
            .collect();

        Ok(torrent_files)
    }

    async fn get_torrent_announce_urls_from_id(&self, torrent_id: i64) -> Result<Vec<Vec<String>>, database::Error> {
        query_as::<_, DbTorrentAnnounceUrl>("SELECT tracker_url FROM torrust_torrent_announce_urls WHERE torrent_id = ?")
            .bind(torrent_id)
            .fetch_all(&self.pool)
            .await
            .map(|v| v.iter().map(|a| vec![a.tracker_url.to_string()]).collect())
            .map_err(|_| database::Error::TorrentNotFound)
    }

    async fn get_torrent_http_seed_urls_from_id(&self, torrent_id: i64) -> Result<Vec<String>, database::Error> {
        query_as::<_, DbTorrentHttpSeedUrl>("SELECT seed_url FROM torrust_torrent_http_seeds WHERE torrent_id = ?")
            .bind(torrent_id)
            .fetch_all(&self.pool)
            .await
            .map(|v| v.iter().map(|a| a.seed_url.to_string()).collect())
            .map_err(|_| database::Error::TorrentNotFound)
    }

    async fn get_torrent_nodes_from_id(&self, torrent_id: i64) -> Result<Vec<(String, i64)>, database::Error> {
        query_as::<_, DbTorrentNode>("SELECT node_ip, node_port FROM torrust_torrent_nodes WHERE torrent_id = ?")
            .bind(torrent_id)
            .fetch_all(&self.pool)
            .await
            .map(|v| v.iter().map(|a| (a.node_ip.to_string(), a.node_port)).collect())
            .map_err(|_| database::Error::TorrentNotFound)
    }

    async fn get_torrent_listing_from_id(&self, torrent_id: i64) -> Result<TorrentListing, database::Error> {
        query_as::<_, TorrentListing>(
            "SELECT
                tt.torrent_id,
                tp.username AS uploader,
                tt.info_hash,
                ti.title,
                ti.description,
                tt.category_id,
                DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded,
                tt.size AS file_size,
                tt.name,
                tt.comment,
                tt.creation_date,
                tt.created_by,
                tt.`encoding`,
                CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
                CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
            FROM torrust_torrents tt
            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
            WHERE tt.torrent_id = ?
            GROUP BY torrent_id",
        )
        .bind(torrent_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|_| database::Error::TorrentNotFound)
    }

    async fn get_torrent_listing_from_info_hash(&self, info_hash: &InfoHash) -> Result<TorrentListing, database::Error> {
        query_as::<_, TorrentListing>(
            "SELECT
                tt.torrent_id,
                tp.username AS uploader,
                tt.info_hash,
                ti.title,
                ti.description,
                tt.category_id,
                DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded,
                tt.size AS file_size,
                tt.name,
                tt.comment,
                tt.creation_date,
                tt.created_by,
                tt.`encoding`,
                CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
                CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
            FROM torrust_torrents tt
            INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
            INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
            LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
            WHERE tt.info_hash = ?
            GROUP BY torrent_id",
        )
        .bind(info_hash.to_hex_string().to_lowercase())
        .fetch_one(&self.pool)
        .await
        .map_err(|_| database::Error::TorrentNotFound)
    }

    async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, database::Error> {
        query_as::<_, TorrentCompact>("SELECT torrent_id, info_hash FROM torrust_torrents")
            .fetch_all(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
    }

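    // Returns torrents whose tracker stats are missing or older than `datetime`, oldest first,
    // e.g. so a background job can refresh them in batches of `limit`.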
    async fn get_torrents_with_stats_not_updated_since(
        &self,
        datetime: DateTime<Utc>,
        limit: i64,
    ) -> Result<Vec<TorrentCompact>, database::Error> {
        query_as::<_, TorrentCompact>(
            "SELECT tt.torrent_id, tt.info_hash
            FROM torrust_torrents tt
            LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
            WHERE tts.updated_at < ? OR tts.updated_at IS NULL
            ORDER BY tts.updated_at ASC
            LIMIT ?",
        )
        .bind(datetime.format(DATETIME_FORMAT).to_string())
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(|_| database::Error::Error)
    }

    async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
        query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?")
            .bind(title)
            .bind(torrent_id)
            .execute(&self.pool)
            .await
            .map_err(|e| match e {
                sqlx::Error::Database(err) => {
                    tracing::error!("DB error: {:?}", err);
                    if err.message().contains("Duplicate entry") && err.message().contains("title") {
                        database::Error::TorrentTitleAlreadyExists
                    } else {
                        database::Error::Error
                    }
                }
                _ => database::Error::Error,
            })
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::TorrentNotFound)
                }
            })
    }

    async fn update_torrent_description(&self, torrent_id: i64, description: &str) -> Result<(), database::Error> {
        query("UPDATE torrust_torrent_info SET description = ? WHERE torrent_id = ?")
            .bind(description)
            .bind(torrent_id)
            .execute(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::TorrentNotFound)
                }
            })
    }

    async fn update_torrent_category(&self, torrent_id: i64, category_id: CategoryId) -> Result<(), database::Error> {
        query("UPDATE torrust_torrents SET category_id = ? WHERE torrent_id = ?")
            .bind(category_id)
            .bind(torrent_id)
            .execute(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::TorrentNotFound)
                }
            })
    }

    async fn insert_tag_and_get_id(&self, name: &str) -> Result<i64, database::Error> {
        query("INSERT INTO torrust_torrent_tags (name) VALUES (?)")
            .bind(name)
            .execute(&self.pool)
            .await
            .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
            .map_err(|e| match e {
                sqlx::Error::Database(err) => {
                    tracing::error!("DB error: {:?}", err);
                    if err.message().contains("Duplicate entry") && err.message().contains("name") {
                        database::Error::TagAlreadyExists
                    } else {
                        database::Error::Error
                    }
                }
                _ => database::Error::Error,
            })
    }

    async fn delete_tag(&self, tag_id: TagId) -> Result<(), database::Error> {
        query("DELETE FROM torrust_torrent_tags WHERE tag_id = ?")
            .bind(tag_id)
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
    }

    async fn add_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
        query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
            .bind(torrent_id)
            .bind(tag_id)
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(|_| database::Error::Error)
    }

    async fn add_torrent_tag_links(&self, torrent_id: i64, tag_ids: &[TagId]) -> Result<(), database::Error> {
        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;

        for tag_id in tag_ids {
            query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
                .bind(torrent_id)
                .bind(tag_id)
                .execute(&mut *tx)
                .await
                .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
        }

        tx.commit()
            .await
            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
    }

    async fn delete_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ? AND tag_id = ?")
            .bind(torrent_id)
            .bind(tag_id)
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(|_| database::Error::Error)
    }

    async fn delete_all_torrent_tag_links(&self, torrent_id: i64) -> Result<(), database::Error> {
        query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ?")
            .bind(torrent_id)
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(|err| database::Error::ErrorWithText(err.to_string()))
    }

    async fn get_tag_from_name(&self, name: &str) -> Result<TorrentTag, database::Error> {
        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags WHERE name = ?")
            .bind(name)
            .fetch_one(&self.pool)
            .await
            .map_err(|_| database::Error::TagNotFound)
    }

    async fn get_tags(&self) -> Result<Vec<TorrentTag>, database::Error> {
        query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags")
            .fetch_all(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
    }

    async fn get_tags_for_torrent_id(&self, torrent_id: i64) -> Result<Vec<TorrentTag>, database::Error> {
        query_as::<_, TorrentTag>(
            "SELECT torrust_torrent_tags.tag_id, torrust_torrent_tags.name
            FROM torrust_torrent_tags
            JOIN torrust_torrent_tag_links ON torrust_torrent_tags.tag_id = torrust_torrent_tag_links.tag_id
            WHERE torrust_torrent_tag_links.torrent_id = ?",
        )
        .bind(torrent_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|_| database::Error::Error)
    }

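    // Upserts the torrent's tracker statistics: MySQL's `REPLACE INTO` removes any row that
    // conflicts on the table's unique key and inserts the fresh seeder/leecher counts with
    // the current timestamp.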
    async fn update_tracker_info(
        &self,
        torrent_id: i64,
        tracker_url: &Url,
        seeders: i64,
        leechers: i64,
    ) -> Result<(), database::Error> {
        query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES (?, ?, ?, ?, ?)")
            .bind(torrent_id)
            .bind(tracker_url.to_string())
            .bind(seeders)
            .bind(leechers)
            .bind(datetime_now())
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(|_| database::Error::TorrentNotFound)
    }

    async fn delete_torrent(&self, torrent_id: i64) -> Result<(), database::Error> {
        query("DELETE FROM torrust_torrents WHERE torrent_id = ?")
            .bind(torrent_id)
            .execute(&self.pool)
            .await
            .map_err(|_| database::Error::Error)
            .and_then(|v| {
                if v.rows_affected() > 0 {
                    Ok(())
                } else {
                    Err(database::Error::TorrentNotFound)
                }
            })
    }

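    // Empties every table listed in `TABLES_TO_TRUNCATE` with plain `DELETE` statements,
    // typically to reset state, e.g. between test runs.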
    async fn delete_all_database_rows(&self) -> Result<(), database::Error> {
        for table in TABLES_TO_TRUNCATE {
            query(&format!("DELETE FROM {table};"))
                .execute(&self.pool)
                .await
                .map_err(|_| database::Error::Error)?;
        }

        Ok(())
    }
}