1use std::str::FromStr;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use chrono::NaiveDateTime;
6use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
7use sqlx::{query, query_as, Acquire, ConnectOptions, MySqlPool};
8
9use super::database::TABLES_TO_TRUNCATE;
10use crate::databases::database;
11use crate::databases::database::{Category, Database, Driver, Sorting, TorrentCompact};
12use crate::models::category::CategoryId;
13use crate::models::info_hash::InfoHash;
14use crate::models::response::TorrentsResponse;
15use crate::models::torrent::TorrentListing;
16use crate::models::torrent_file::{DbTorrentAnnounceUrl, DbTorrentFile, DbTorrentInfo, Torrent, TorrentFile};
17use crate::models::torrent_tag::{TagId, TorrentTag};
18use crate::models::tracker_key::TrackerKey;
19use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
20use crate::utils::clock;
21use crate::utils::hex::from_bytes;
22
23pub struct Mysql {
24 pub pool: MySqlPool,
25}
26
27#[async_trait]
28impl Database for Mysql {
29 fn get_database_driver(&self) -> Driver {
30 Driver::Mysql
31 }
32
33 async fn new(database_url: &str) -> Self {
34 let mut connection_options = MySqlConnectOptions::from_str(database_url).expect("Unable to create connection options.");
35 connection_options
36 .log_statements(log::LevelFilter::Error)
37 .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(1));
38
39 let db = MySqlPoolOptions::new()
40 .connect_with(connection_options)
41 .await
42 .expect("Unable to create database pool.");
43
44 sqlx::migrate!("migrations/mysql")
45 .run(&db)
46 .await
47 .expect("Could not run database migrations.");
48
49 Self { pool: db }
50 }
51
52 async fn insert_user_and_get_id(&self, username: &str, email: &str, password_hash: &str) -> Result<i64, database::Error> {
53 let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
55
56 let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
58
59 let user_id = query("INSERT INTO torrust_users (date_registered) VALUES (UTC_TIMESTAMP())")
61 .execute(&mut tx)
62 .await
63 .map(|v| v.last_insert_id())
64 .map_err(|_| database::Error::Error)?;
65
66 let insert_user_auth_result = query("INSERT INTO torrust_user_authentication (user_id, password_hash) VALUES (?, ?)")
68 .bind(user_id)
69 .bind(password_hash)
70 .execute(&mut tx)
71 .await
72 .map_err(|_| database::Error::Error);
73
74 if let Err(e) = insert_user_auth_result {
76 let _ = tx.rollback().await;
77 return Err(e);
78 }
79
80 let insert_user_profile_result = query(r#"INSERT INTO torrust_user_profiles (user_id, username, email, email_verified, bio, avatar) VALUES (?, ?, NULLIF(?, ""), 0, NULL, NULL)"#)
82 .bind(user_id)
83 .bind(username)
84 .bind(email)
85 .execute(&mut tx)
86 .await
87 .map_err(|e| match e {
88 sqlx::Error::Database(err) => {
89 if err.message().contains("username") {
90 database::Error::UsernameTaken
91 } else if err.message().contains("email") {
92 database::Error::EmailTaken
93 } else {
94 database::Error::Error
95 }
96 }
97 _ => database::Error::Error
98 });
99
100 match insert_user_profile_result {
102 Ok(_) => {
103 let _ = tx.commit().await;
104 Ok(i64::overflowing_add_unsigned(0, user_id).0)
105 }
106 Err(e) => {
107 let _ = tx.rollback().await;
108 Err(e)
109 }
110 }
111 }
112
113 async fn get_user_from_id(&self, user_id: i64) -> Result<User, database::Error> {
114 query_as::<_, User>("SELECT * FROM torrust_users WHERE user_id = ?")
115 .bind(user_id)
116 .fetch_one(&self.pool)
117 .await
118 .map_err(|_| database::Error::UserNotFound)
119 }
120
121 async fn get_user_authentication_from_id(&self, user_id: UserId) -> Result<UserAuthentication, database::Error> {
122 query_as::<_, UserAuthentication>("SELECT * FROM torrust_user_authentication WHERE user_id = ?")
123 .bind(user_id)
124 .fetch_one(&self.pool)
125 .await
126 .map_err(|_| database::Error::UserNotFound)
127 }
128
129 async fn get_user_profile_from_username(&self, username: &str) -> Result<UserProfile, database::Error> {
130 query_as::<_, UserProfile>(r#"SELECT user_id, username, COALESCE(email, "") as email, email_verified, COALESCE(bio, "") as bio, COALESCE(avatar, "") as avatar FROM torrust_user_profiles WHERE username = ?"#)
131 .bind(username)
132 .fetch_one(&self.pool)
133 .await
134 .map_err(|_| database::Error::UserNotFound)
135 }
136
137 async fn get_user_compact_from_id(&self, user_id: i64) -> Result<UserCompact, database::Error> {
138 query_as::<_, UserCompact>("SELECT tu.user_id, tp.username, tu.administrator FROM torrust_users tu INNER JOIN torrust_user_profiles tp ON tu.user_id = tp.user_id WHERE tu.user_id = ?")
139 .bind(user_id)
140 .fetch_one(&self.pool)
141 .await
142 .map_err(|_| database::Error::UserNotFound)
143 }
144
145 async fn get_user_tracker_key(&self, user_id: i64) -> Option<TrackerKey> {
152 const HOUR_IN_SECONDS: i64 = 3600;
153
154 let current_time_plus_hour = i64::try_from(clock::now()).unwrap().saturating_add(HOUR_IN_SECONDS);
155
156 query_as::<_, TrackerKey>("SELECT tracker_key AS 'key', date_expiry AS valid_until FROM torrust_tracker_keys WHERE user_id = ? AND date_expiry > ? ORDER BY date_expiry DESC")
158 .bind(user_id)
159 .bind(current_time_plus_hour)
160 .fetch_one(&self.pool)
161 .await
162 .ok()
163 }
164
165 async fn count_users(&self) -> Result<i64, database::Error> {
166 query_as("SELECT COUNT(*) FROM torrust_users")
167 .fetch_one(&self.pool)
168 .await
169 .map(|(v,)| v)
170 .map_err(|_| database::Error::Error)
171 }
172
173 async fn ban_user(&self, user_id: i64, reason: &str, date_expiry: NaiveDateTime) -> Result<(), database::Error> {
174 let date_expiry_string = date_expiry.format("%Y-%m-%d %H:%M:%S").to_string();
176
177 query("INSERT INTO torrust_user_bans (user_id, reason, date_expiry) VALUES (?, ?, ?)")
178 .bind(user_id)
179 .bind(reason)
180 .bind(date_expiry_string)
181 .execute(&self.pool)
182 .await
183 .map(|_| ())
184 .map_err(|_| database::Error::Error)
185 }
186
187 async fn grant_admin_role(&self, user_id: i64) -> Result<(), database::Error> {
188 query("UPDATE torrust_users SET administrator = TRUE WHERE user_id = ?")
189 .bind(user_id)
190 .execute(&self.pool)
191 .await
192 .map_err(|_| database::Error::Error)
193 .and_then(|v| {
194 if v.rows_affected() > 0 {
195 Ok(())
196 } else {
197 Err(database::Error::UserNotFound)
198 }
199 })
200 }
201
202 async fn verify_email(&self, user_id: i64) -> Result<(), database::Error> {
203 query("UPDATE torrust_user_profiles SET email_verified = TRUE WHERE user_id = ?")
204 .bind(user_id)
205 .execute(&self.pool)
206 .await
207 .map_err(|_| database::Error::Error)
208 .and_then(|v| {
209 if v.rows_affected() > 0 {
210 Ok(())
211 } else {
212 Err(database::Error::UserNotFound)
213 }
214 })
215 }
216
217 async fn add_tracker_key(&self, user_id: i64, tracker_key: &TrackerKey) -> Result<(), database::Error> {
218 let key = tracker_key.key.clone();
219
220 query("INSERT INTO torrust_tracker_keys (user_id, tracker_key, date_expiry) VALUES (?, ?, ?)")
221 .bind(user_id)
222 .bind(key)
223 .bind(tracker_key.valid_until)
224 .execute(&self.pool)
225 .await
226 .map(|_| ())
227 .map_err(|_| database::Error::Error)
228 }
229
230 async fn delete_user(&self, user_id: i64) -> Result<(), database::Error> {
231 query("DELETE FROM torrust_users WHERE user_id = ?")
232 .bind(user_id)
233 .execute(&self.pool)
234 .await
235 .map_err(|_| database::Error::Error)
236 .and_then(|v| {
237 if v.rows_affected() > 0 {
238 Ok(())
239 } else {
240 Err(database::Error::UserNotFound)
241 }
242 })
243 }
244
245 async fn insert_category_and_get_id(&self, category_name: &str) -> Result<i64, database::Error> {
246 query("INSERT INTO torrust_categories (name) VALUES (?)")
247 .bind(category_name)
248 .execute(&self.pool)
249 .await
250 .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
251 .map_err(|e| match e {
252 sqlx::Error::Database(err) => {
253 if err.message().contains("UNIQUE") {
254 database::Error::CategoryAlreadyExists
255 } else {
256 database::Error::Error
257 }
258 }
259 _ => database::Error::Error,
260 })
261 }
262
263 async fn get_category_from_id(&self, category_id: i64) -> Result<Category, database::Error> {
264 query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE category_id = ?")
265 .bind(category_id)
266 .fetch_one(&self.pool)
267 .await
268 .map_err(|_| database::Error::CategoryNotFound)
269 }
270
271 async fn get_category_from_name(&self, category_name: &str) -> Result<Category, database::Error> {
272 query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE name = ?")
273 .bind(category_name)
274 .fetch_one(&self.pool)
275 .await
276 .map_err(|_| database::Error::CategoryNotFound)
277 }
278
279 async fn get_categories(&self) -> Result<Vec<Category>, database::Error> {
280 query_as::<_, Category>("SELECT tc.category_id, tc.name, COUNT(tt.category_id) as num_torrents FROM torrust_categories tc LEFT JOIN torrust_torrents tt on tc.category_id = tt.category_id GROUP BY tc.name")
281 .fetch_all(&self.pool)
282 .await
283 .map_err(|_| database::Error::Error)
284 }
285
286 async fn delete_category(&self, category_name: &str) -> Result<(), database::Error> {
287 query("DELETE FROM torrust_categories WHERE name = ?")
288 .bind(category_name)
289 .execute(&self.pool)
290 .await
291 .map_err(|_| database::Error::Error)
292 .and_then(|v| {
293 if v.rows_affected() > 0 {
294 Ok(())
295 } else {
296 Err(database::Error::CategoryNotFound)
297 }
298 })
299 }
300
301 async fn get_torrents_search_sorted_paginated(
303 &self,
304 search: &Option<String>,
305 categories: &Option<Vec<String>>,
306 tags: &Option<Vec<String>>,
307 sort: &Sorting,
308 offset: u64,
309 limit: u8,
310 ) -> Result<TorrentsResponse, database::Error> {
311 let title = match search {
312 None => "%".to_string(),
313 Some(v) => format!("%{v}%"),
314 };
315
316 let sort_query: String = match sort {
317 Sorting::UploadedAsc => "date_uploaded ASC".to_string(),
318 Sorting::UploadedDesc => "date_uploaded DESC".to_string(),
319 Sorting::SeedersAsc => "seeders ASC".to_string(),
320 Sorting::SeedersDesc => "seeders DESC".to_string(),
321 Sorting::LeechersAsc => "leechers ASC".to_string(),
322 Sorting::LeechersDesc => "leechers DESC".to_string(),
323 Sorting::NameAsc => "title ASC".to_string(),
324 Sorting::NameDesc => "title DESC".to_string(),
325 Sorting::SizeAsc => "size ASC".to_string(),
326 Sorting::SizeDesc => "size DESC".to_string(),
327 };
328
329 let category_filter_query = if let Some(c) = categories {
330 let mut i = 0;
331 let mut category_filters = String::new();
332 for category in c {
333 if let Ok(sanitized_category) = self.get_category_from_name(category).await {
335 let mut str = format!("tc.name = '{}'", sanitized_category.name);
336 if i > 0 {
337 str = format!(" OR {str}");
338 }
339 category_filters.push_str(&str);
340 i += 1;
341 }
342 }
343 if category_filters.is_empty() {
344 String::new()
345 } else {
346 format!("INNER JOIN torrust_categories tc ON tt.category_id = tc.category_id AND ({category_filters}) ")
347 }
348 } else {
349 String::new()
350 };
351
352 let tag_filter_query = if let Some(t) = tags {
353 let mut i = 0;
354 let mut tag_filters = String::new();
355 for tag in t {
356 if let Ok(sanitized_tag) = self.get_tag_from_name(tag).await {
358 let mut str = format!("tl.tag_id = '{}'", sanitized_tag.tag_id);
359 if i > 0 {
360 str = format!(" OR {str}");
361 }
362 tag_filters.push_str(&str);
363 i += 1;
364 }
365 }
366 if tag_filters.is_empty() {
367 String::new()
368 } else {
369 format!("INNER JOIN torrust_torrent_tag_links tl ON tt.torrent_id = tl.torrent_id AND ({tag_filters}) ")
370 }
371 } else {
372 String::new()
373 };
374
375 let mut query_string = format!(
376 "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded, tt.size AS file_size,
377 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
378 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
379 FROM torrust_torrents tt
380 {category_filter_query}
381 {tag_filter_query}
382 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
383 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
384 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
385 WHERE title LIKE ?
386 GROUP BY tt.torrent_id"
387 );
388
389 let count_query = format!("SELECT COUNT(*) as count FROM ({query_string}) AS count_table");
390
391 let count_result: Result<i64, database::Error> = query_as(&count_query)
392 .bind(title.clone())
393 .fetch_one(&self.pool)
394 .await
395 .map(|(v,)| v)
396 .map_err(|_| database::Error::Error);
397
398 let count = count_result?;
399
400 query_string = format!("{query_string} ORDER BY {sort_query} LIMIT ?, ?");
401
402 let res: Vec<TorrentListing> = sqlx::query_as::<_, TorrentListing>(&query_string)
403 .bind(title)
404 .bind(i64::saturating_add_unsigned(0, offset))
405 .bind(limit)
406 .fetch_all(&self.pool)
407 .await
408 .map_err(|_| database::Error::Error)?;
409
410 Ok(TorrentsResponse {
411 total: u32::try_from(count).expect("variable `count` is larger than u32"),
412 results: res,
413 })
414 }
415
416 #[allow(clippy::too_many_lines)]
417 async fn insert_torrent_and_get_id(
418 &self,
419 torrent: &Torrent,
420 uploader_id: UserId,
421 category_id: i64,
422 title: &str,
423 description: &str,
424 ) -> Result<i64, database::Error> {
425 let info_hash = torrent.info_hash();
426
427 let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
429
430 let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
432
433 let (pieces, root_hash): (String, bool) = if let Some(pieces) = &torrent.info.pieces {
435 (from_bytes(pieces.as_ref()), false)
436 } else {
437 let root_hash = torrent.info.root_hash.as_ref().ok_or(database::Error::Error)?;
438 (root_hash.to_string(), true)
439 };
440
441 let private = torrent.info.private.unwrap_or(0);
442
443 let torrent_id = query("INSERT INTO torrust_torrents (uploader_id, category_id, info_hash, size, name, pieces, piece_length, private, root_hash, date_uploaded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, UTC_TIMESTAMP())")
445 .bind(uploader_id)
446 .bind(category_id)
447 .bind(info_hash.to_lowercase())
448 .bind(torrent.file_size())
449 .bind(torrent.info.name.to_string())
450 .bind(pieces)
451 .bind(torrent.info.piece_length)
452 .bind(private)
453 .bind(root_hash)
454 .execute(&self.pool)
455 .await
456 .map(|v| i64::try_from(v.last_insert_id()).expect("last ID is larger than i64"))
457 .map_err(|e| match e {
458 sqlx::Error::Database(err) => {
459 if err.message().contains("info_hash") {
460 database::Error::TorrentAlreadyExists
461 } else if err.message().contains("title") {
462 database::Error::TorrentTitleAlreadyExists
463 } else {
464 database::Error::Error
465 }
466 }
467 _ => database::Error::Error
468 })?;
469
470 let insert_torrent_files_result = if let Some(length) = torrent.info.length {
471 query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length) VALUES (?, ?, ?)")
472 .bind(torrent.info.md5sum.clone())
473 .bind(torrent_id)
474 .bind(length)
475 .execute(&mut tx)
476 .await
477 .map(|_| ())
478 .map_err(|_| database::Error::Error)
479 } else {
480 let files = torrent.info.files.as_ref().unwrap();
481
482 for file in files {
483 let path = file.path.join("/");
484
485 let _ = query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length, path) VALUES (?, ?, ?, ?)")
486 .bind(file.md5sum.clone())
487 .bind(torrent_id)
488 .bind(file.length)
489 .bind(path)
490 .execute(&mut tx)
491 .await
492 .map_err(|_| database::Error::Error)?;
493 }
494
495 Ok(())
496 };
497
498 if let Err(e) = insert_torrent_files_result {
500 let _ = tx.rollback().await;
501 return Err(e);
502 }
503
504 let insert_torrent_announce_urls_result: Result<(), database::Error> = if let Some(announce_urls) = &torrent.announce_list
505 {
506 let announce_urls = announce_urls.iter().flatten().collect::<Vec<&String>>();
508
509 for tracker_url in &announce_urls {
510 let _ = query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
511 .bind(torrent_id)
512 .bind(tracker_url)
513 .execute(&mut tx)
514 .await
515 .map(|_| ())
516 .map_err(|_| database::Error::Error)?;
517 }
518
519 Ok(())
520 } else {
521 let tracker_url = torrent.announce.as_ref().unwrap();
522
523 query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
524 .bind(torrent_id)
525 .bind(tracker_url)
526 .execute(&mut tx)
527 .await
528 .map(|_| ())
529 .map_err(|_| database::Error::Error)
530 };
531
532 if let Err(e) = insert_torrent_announce_urls_result {
534 let _ = tx.rollback().await;
535 return Err(e);
536 }
537
538 let insert_torrent_info_result =
539 query(r#"INSERT INTO torrust_torrent_info (torrent_id, title, description) VALUES (?, ?, NULLIF(?, ""))"#)
540 .bind(torrent_id)
541 .bind(title)
542 .bind(description)
543 .execute(&mut tx)
544 .await
545 .map_err(|e| match e {
546 sqlx::Error::Database(err) => {
547 if err.message().contains("info_hash") {
548 database::Error::TorrentAlreadyExists
549 } else if err.message().contains("title") {
550 database::Error::TorrentTitleAlreadyExists
551 } else {
552 database::Error::Error
553 }
554 }
555 _ => database::Error::Error,
556 });
557
558 match insert_torrent_info_result {
560 Ok(_) => {
561 let _ = tx.commit().await;
562 Ok(torrent_id)
563 }
564 Err(e) => {
565 let _ = tx.rollback().await;
566 Err(e)
567 }
568 }
569 }
570
571 async fn get_torrent_info_from_id(&self, torrent_id: i64) -> Result<DbTorrentInfo, database::Error> {
572 query_as::<_, DbTorrentInfo>(
573 "SELECT torrent_id, info_hash, name, pieces, piece_length, private, root_hash FROM torrust_torrents WHERE torrent_id = ?",
574 )
575 .bind(torrent_id)
576 .fetch_one(&self.pool)
577 .await
578 .map_err(|_| database::Error::TorrentNotFound)
579 }
580
581 async fn get_torrent_info_from_info_hash(&self, info_hash: &InfoHash) -> Result<DbTorrentInfo, database::Error> {
582 query_as::<_, DbTorrentInfo>(
583 "SELECT torrent_id, info_hash, name, pieces, piece_length, private, root_hash FROM torrust_torrents WHERE info_hash = ?",
584 )
585 .bind(info_hash.to_hex_string().to_lowercase())
586 .fetch_one(&self.pool)
587 .await
588 .map_err(|_| database::Error::TorrentNotFound)
589 }
590
591 async fn get_torrent_files_from_id(&self, torrent_id: i64) -> Result<Vec<TorrentFile>, database::Error> {
592 let db_torrent_files =
593 query_as::<_, DbTorrentFile>("SELECT md5sum, length, path FROM torrust_torrent_files WHERE torrent_id = ?")
594 .bind(torrent_id)
595 .fetch_all(&self.pool)
596 .await
597 .map_err(|_| database::Error::TorrentNotFound)?;
598
599 let torrent_files: Vec<TorrentFile> = db_torrent_files
600 .into_iter()
601 .map(|tf| TorrentFile {
602 path: tf
603 .path
604 .unwrap_or_default()
605 .split('/')
606 .map(std::string::ToString::to_string)
607 .collect(),
608 length: tf.length,
609 md5sum: tf.md5sum,
610 })
611 .collect();
612
613 Ok(torrent_files)
614 }
615
616 async fn get_torrent_announce_urls_from_id(&self, torrent_id: i64) -> Result<Vec<Vec<String>>, database::Error> {
617 query_as::<_, DbTorrentAnnounceUrl>("SELECT tracker_url FROM torrust_torrent_announce_urls WHERE torrent_id = ?")
618 .bind(torrent_id)
619 .fetch_all(&self.pool)
620 .await
621 .map(|v| v.iter().map(|a| vec![a.tracker_url.to_string()]).collect())
622 .map_err(|_| database::Error::TorrentNotFound)
623 }
624
625 async fn get_torrent_listing_from_id(&self, torrent_id: i64) -> Result<TorrentListing, database::Error> {
626 query_as::<_, TorrentListing>(
627 "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded, tt.size AS file_size,
628 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
629 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
630 FROM torrust_torrents tt
631 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
632 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
633 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
634 WHERE tt.torrent_id = ?
635 GROUP BY torrent_id"
636 )
637 .bind(torrent_id)
638 .fetch_one(&self.pool)
639 .await
640 .map_err(|_| database::Error::TorrentNotFound)
641 }
642
643 async fn get_torrent_listing_from_info_hash(&self, info_hash: &InfoHash) -> Result<TorrentListing, database::Error> {
644 query_as::<_, TorrentListing>(
645 "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, DATE_FORMAT(tt.date_uploaded, '%Y-%m-%d %H:%i:%s') AS date_uploaded, tt.size AS file_size,
646 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
647 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
648 FROM torrust_torrents tt
649 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
650 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
651 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
652 WHERE tt.info_hash = ?
653 GROUP BY torrent_id"
654 )
655 .bind(info_hash.to_hex_string().to_lowercase())
656 .fetch_one(&self.pool)
657 .await
658 .map_err(|_| database::Error::TorrentNotFound)
659 }
660
661 async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, database::Error> {
662 query_as::<_, TorrentCompact>("SELECT torrent_id, info_hash FROM torrust_torrents")
663 .fetch_all(&self.pool)
664 .await
665 .map_err(|_| database::Error::Error)
666 }
667
668 async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
669 query("UPDATE torrust_torrent_info SET title = ? WHERE torrent_id = ?")
670 .bind(title)
671 .bind(torrent_id)
672 .execute(&self.pool)
673 .await
674 .map_err(|e| match e {
675 sqlx::Error::Database(err) => {
676 if err.message().contains("UNIQUE") {
677 database::Error::TorrentTitleAlreadyExists
678 } else {
679 database::Error::Error
680 }
681 }
682 _ => database::Error::Error,
683 })
684 .and_then(|v| {
685 if v.rows_affected() > 0 {
686 Ok(())
687 } else {
688 Err(database::Error::TorrentNotFound)
689 }
690 })
691 }
692
693 async fn update_torrent_description(&self, torrent_id: i64, description: &str) -> Result<(), database::Error> {
694 query("UPDATE torrust_torrent_info SET description = ? WHERE torrent_id = ?")
695 .bind(description)
696 .bind(torrent_id)
697 .execute(&self.pool)
698 .await
699 .map_err(|_| database::Error::Error)
700 .and_then(|v| {
701 if v.rows_affected() > 0 {
702 Ok(())
703 } else {
704 Err(database::Error::TorrentNotFound)
705 }
706 })
707 }
708
709 async fn update_torrent_category(&self, torrent_id: i64, category_id: CategoryId) -> Result<(), database::Error> {
710 query("UPDATE torrust_torrents SET category_id = ? WHERE torrent_id = ?")
711 .bind(category_id)
712 .bind(torrent_id)
713 .execute(&self.pool)
714 .await
715 .map_err(|_| database::Error::Error)
716 .and_then(|v| {
717 if v.rows_affected() > 0 {
718 Ok(())
719 } else {
720 Err(database::Error::TorrentNotFound)
721 }
722 })
723 }
724
725 async fn add_tag(&self, name: &str) -> Result<(), database::Error> {
726 query("INSERT INTO torrust_torrent_tags (name) VALUES (?)")
727 .bind(name)
728 .execute(&self.pool)
729 .await
730 .map(|_| ())
731 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
732 }
733
734 async fn delete_tag(&self, tag_id: TagId) -> Result<(), database::Error> {
735 query("DELETE FROM torrust_torrent_tags WHERE tag_id = ?")
736 .bind(tag_id)
737 .execute(&self.pool)
738 .await
739 .map(|_| ())
740 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
741 }
742
743 async fn add_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
744 query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
745 .bind(torrent_id)
746 .bind(tag_id)
747 .execute(&self.pool)
748 .await
749 .map(|_| ())
750 .map_err(|_| database::Error::Error)
751 }
752
753 async fn add_torrent_tag_links(&self, torrent_id: i64, tag_ids: &[TagId]) -> Result<(), database::Error> {
754 let mut transaction = self
755 .pool
756 .begin()
757 .await
758 .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
759
760 for tag_id in tag_ids {
761 query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
762 .bind(torrent_id)
763 .bind(tag_id)
764 .execute(&mut transaction)
765 .await
766 .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
767 }
768
769 transaction
770 .commit()
771 .await
772 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
773 }
774
775 async fn delete_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
776 query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ? AND tag_id = ?")
777 .bind(torrent_id)
778 .bind(tag_id)
779 .execute(&self.pool)
780 .await
781 .map(|_| ())
782 .map_err(|_| database::Error::Error)
783 }
784
785 async fn delete_all_torrent_tag_links(&self, torrent_id: i64) -> Result<(), database::Error> {
786 query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ?")
787 .bind(torrent_id)
788 .execute(&self.pool)
789 .await
790 .map(|_| ())
791 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
792 }
793
794 async fn get_tag_from_name(&self, name: &str) -> Result<TorrentTag, database::Error> {
795 query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags WHERE name = ?")
796 .bind(name)
797 .fetch_one(&self.pool)
798 .await
799 .map_err(|_| database::Error::TagNotFound)
800 }
801
802 async fn get_tags(&self) -> Result<Vec<TorrentTag>, database::Error> {
803 query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags")
804 .fetch_all(&self.pool)
805 .await
806 .map_err(|_| database::Error::Error)
807 }
808
809 async fn get_tags_for_torrent_id(&self, torrent_id: i64) -> Result<Vec<TorrentTag>, database::Error> {
810 query_as::<_, TorrentTag>(
811 "SELECT torrust_torrent_tags.tag_id, torrust_torrent_tags.name
812 FROM torrust_torrent_tags
813 JOIN torrust_torrent_tag_links ON torrust_torrent_tags.tag_id = torrust_torrent_tag_links.tag_id
814 WHERE torrust_torrent_tag_links.torrent_id = ?",
815 )
816 .bind(torrent_id)
817 .fetch_all(&self.pool)
818 .await
819 .map_err(|_| database::Error::Error)
820 }
821
822 async fn update_tracker_info(
823 &self,
824 torrent_id: i64,
825 tracker_url: &str,
826 seeders: i64,
827 leechers: i64,
828 ) -> Result<(), database::Error> {
829 query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES (?, ?, ?, ?)")
830 .bind(torrent_id)
831 .bind(tracker_url)
832 .bind(seeders)
833 .bind(leechers)
834 .execute(&self.pool)
835 .await
836 .map(|_| ())
837 .map_err(|_| database::Error::TorrentNotFound)
838 }
839
840 async fn delete_torrent(&self, torrent_id: i64) -> Result<(), database::Error> {
841 query("DELETE FROM torrust_torrents WHERE torrent_id = ?")
842 .bind(torrent_id)
843 .execute(&self.pool)
844 .await
845 .map_err(|_| database::Error::Error)
846 .and_then(|v| {
847 if v.rows_affected() > 0 {
848 Ok(())
849 } else {
850 Err(database::Error::TorrentNotFound)
851 }
852 })
853 }
854
855 async fn delete_all_database_rows(&self) -> Result<(), database::Error> {
856 for table in TABLES_TO_TRUNCATE {
857 query(&format!("DELETE FROM {table};"))
858 .execute(&self.pool)
859 .await
860 .map_err(|_| database::Error::Error)?;
861 }
862
863 Ok(())
864 }
865}