1use std::str::FromStr;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use chrono::NaiveDateTime;
6use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
7use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool};
8
9use super::database::TABLES_TO_TRUNCATE;
10use crate::databases::database;
11use crate::databases::database::{Category, Database, Driver, Sorting, TorrentCompact};
12use crate::models::category::CategoryId;
13use crate::models::info_hash::InfoHash;
14use crate::models::response::TorrentsResponse;
15use crate::models::torrent::TorrentListing;
16use crate::models::torrent_file::{DbTorrentAnnounceUrl, DbTorrentFile, DbTorrentInfo, Torrent, TorrentFile};
17use crate::models::torrent_tag::{TagId, TorrentTag};
18use crate::models::tracker_key::TrackerKey;
19use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
20use crate::utils::clock;
21use crate::utils::hex::from_bytes;
22
23pub struct Sqlite {
24 pub pool: SqlitePool,
25}
26
27#[async_trait]
28impl Database for Sqlite {
29 fn get_database_driver(&self) -> Driver {
30 Driver::Sqlite3
31 }
32
33 async fn new(database_url: &str) -> Self {
34 let mut connection_options = SqliteConnectOptions::from_str(database_url).expect("Unable to create connection options.");
35 connection_options
36 .log_statements(log::LevelFilter::Error)
37 .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(1));
38
39 let db = SqlitePoolOptions::new()
40 .connect_with(connection_options)
41 .await
42 .expect("Unable to create database pool.");
43
44 sqlx::migrate!("migrations/sqlite3")
45 .run(&db)
46 .await
47 .expect("Could not run database migrations.");
48
49 Self { pool: db }
50 }
51
52 async fn insert_user_and_get_id(&self, username: &str, email: &str, password_hash: &str) -> Result<i64, database::Error> {
53 let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
55
56 let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
58
59 let user_id =
61 query("INSERT INTO torrust_users (date_registered) VALUES (strftime('%Y-%m-%d %H:%M:%S',DATETIME('now', 'utc')))")
62 .execute(&mut tx)
63 .await
64 .map(|v| v.last_insert_rowid())
65 .map_err(|_| database::Error::Error)?;
66
67 let insert_user_auth_result = query("INSERT INTO torrust_user_authentication (user_id, password_hash) VALUES (?, ?)")
69 .bind(user_id)
70 .bind(password_hash)
71 .execute(&mut tx)
72 .await
73 .map_err(|_| database::Error::Error);
74
75 if let Err(e) = insert_user_auth_result {
77 let _ = tx.rollback().await;
78 return Err(e);
79 }
80
81 let insert_user_profile_result = query(r#"INSERT INTO torrust_user_profiles (user_id, username, email, email_verified, bio, avatar) VALUES (?, ?, NULLIF(?, ""), 0, NULL, NULL)"#)
83 .bind(user_id)
84 .bind(username)
85 .bind(email)
86 .execute(&mut tx)
87 .await
88 .map_err(|e| match e {
89 sqlx::Error::Database(err) => {
90 if err.message().contains("username") {
91 database::Error::UsernameTaken
92 } else if err.message().contains("email") {
93 database::Error::EmailTaken
94 } else {
95 database::Error::Error
96 }
97 }
98 _ => database::Error::Error
99 });
100
101 match insert_user_profile_result {
103 Ok(_) => {
104 let _ = tx.commit().await;
105 Ok(user_id)
106 }
107 Err(e) => {
108 let _ = tx.rollback().await;
109 Err(e)
110 }
111 }
112 }
113
114 async fn get_user_from_id(&self, user_id: i64) -> Result<User, database::Error> {
115 query_as::<_, User>("SELECT * FROM torrust_users WHERE user_id = ?")
116 .bind(user_id)
117 .fetch_one(&self.pool)
118 .await
119 .map_err(|_| database::Error::UserNotFound)
120 }
121
122 async fn get_user_authentication_from_id(&self, user_id: UserId) -> Result<UserAuthentication, database::Error> {
123 query_as::<_, UserAuthentication>("SELECT * FROM torrust_user_authentication WHERE user_id = ?")
124 .bind(user_id)
125 .fetch_one(&self.pool)
126 .await
127 .map_err(|_| database::Error::UserNotFound)
128 }
129
130 async fn get_user_profile_from_username(&self, username: &str) -> Result<UserProfile, database::Error> {
131 query_as::<_, UserProfile>("SELECT * FROM torrust_user_profiles WHERE username = ?")
132 .bind(username)
133 .fetch_one(&self.pool)
134 .await
135 .map_err(|_| database::Error::UserNotFound)
136 }
137
138 async fn get_user_compact_from_id(&self, user_id: i64) -> Result<UserCompact, database::Error> {
139 query_as::<_, UserCompact>("SELECT tu.user_id, tp.username, tu.administrator FROM torrust_users tu INNER JOIN torrust_user_profiles tp ON tu.user_id = tp.user_id WHERE tu.user_id = ?")
140 .bind(user_id)
141 .fetch_one(&self.pool)
142 .await
143 .map_err(|_| database::Error::UserNotFound)
144 }
145
146 async fn get_user_tracker_key(&self, user_id: i64) -> Option<TrackerKey> {
147 const HOUR_IN_SECONDS: i64 = 3600;
148
149 let current_time_plus_hour = i64::try_from(clock::now()).unwrap().saturating_add(HOUR_IN_SECONDS);
151
152 query_as::<_, TrackerKey>("SELECT tracker_key AS key, date_expiry AS valid_until FROM torrust_tracker_keys WHERE user_id = $1 AND date_expiry > $2 ORDER BY date_expiry DESC")
154 .bind(user_id)
155 .bind(current_time_plus_hour)
156 .fetch_one(&self.pool)
157 .await
158 .ok()
159 }
160
161 async fn count_users(&self) -> Result<i64, database::Error> {
162 query_as("SELECT COUNT(*) FROM torrust_users")
163 .fetch_one(&self.pool)
164 .await
165 .map(|(v,)| v)
166 .map_err(|_| database::Error::Error)
167 }
168
169 async fn ban_user(&self, user_id: i64, reason: &str, date_expiry: NaiveDateTime) -> Result<(), database::Error> {
170 let date_expiry_string = date_expiry.format("%Y-%m-%d %H:%M:%S").to_string();
172
173 query("INSERT INTO torrust_user_bans (user_id, reason, date_expiry) VALUES ($1, $2, $3)")
174 .bind(user_id)
175 .bind(reason)
176 .bind(date_expiry_string)
177 .execute(&self.pool)
178 .await
179 .map(|_| ())
180 .map_err(|_| database::Error::Error)
181 }
182
183 async fn grant_admin_role(&self, user_id: i64) -> Result<(), database::Error> {
184 query("UPDATE torrust_users SET administrator = TRUE WHERE user_id = ?")
185 .bind(user_id)
186 .execute(&self.pool)
187 .await
188 .map_err(|_| database::Error::Error)
189 .and_then(|v| {
190 if v.rows_affected() > 0 {
191 Ok(())
192 } else {
193 Err(database::Error::UserNotFound)
194 }
195 })
196 }
197
198 async fn verify_email(&self, user_id: i64) -> Result<(), database::Error> {
199 query("UPDATE torrust_user_profiles SET email_verified = TRUE WHERE user_id = ?")
200 .bind(user_id)
201 .execute(&self.pool)
202 .await
203 .map(|_| ())
204 .map_err(|_| database::Error::Error)
205 }
206
207 async fn add_tracker_key(&self, user_id: i64, tracker_key: &TrackerKey) -> Result<(), database::Error> {
208 let key = tracker_key.key.clone();
209
210 query("INSERT INTO torrust_tracker_keys (user_id, tracker_key, date_expiry) VALUES ($1, $2, $3)")
211 .bind(user_id)
212 .bind(key)
213 .bind(tracker_key.valid_until)
214 .execute(&self.pool)
215 .await
216 .map(|_| ())
217 .map_err(|_| database::Error::Error)
218 }
219
220 async fn delete_user(&self, user_id: i64) -> Result<(), database::Error> {
221 query("DELETE FROM torrust_users WHERE user_id = ?")
222 .bind(user_id)
223 .execute(&self.pool)
224 .await
225 .map_err(|_| database::Error::Error)
226 .and_then(|v| {
227 if v.rows_affected() > 0 {
228 Ok(())
229 } else {
230 Err(database::Error::UserNotFound)
231 }
232 })
233 }
234
235 async fn insert_category_and_get_id(&self, category_name: &str) -> Result<i64, database::Error> {
236 query("INSERT INTO torrust_categories (name) VALUES (?)")
237 .bind(category_name)
238 .execute(&self.pool)
239 .await
240 .map(|v| v.last_insert_rowid())
241 .map_err(|e| match e {
242 sqlx::Error::Database(err) => {
243 if err.message().contains("UNIQUE") {
244 database::Error::CategoryAlreadyExists
245 } else {
246 database::Error::Error
247 }
248 }
249 _ => database::Error::Error,
250 })
251 }
252
253 async fn get_category_from_id(&self, category_id: i64) -> Result<Category, database::Error> {
254 query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE category_id = ?")
255 .bind(category_id)
256 .fetch_one(&self.pool)
257 .await
258 .map_err(|_| database::Error::CategoryNotFound)
259 }
260
261 async fn get_category_from_name(&self, category_name: &str) -> Result<Category, database::Error> {
262 query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE name = ?")
263 .bind(category_name)
264 .fetch_one(&self.pool)
265 .await
266 .map_err(|_| database::Error::CategoryNotFound)
267 }
268
269 async fn get_categories(&self) -> Result<Vec<Category>, database::Error> {
270 query_as::<_, Category>("SELECT tc.category_id, tc.name, COUNT(tt.category_id) as num_torrents FROM torrust_categories tc LEFT JOIN torrust_torrents tt on tc.category_id = tt.category_id GROUP BY tc.name")
271 .fetch_all(&self.pool)
272 .await
273 .map_err(|_| database::Error::Error)
274 }
275
276 async fn delete_category(&self, category_name: &str) -> Result<(), database::Error> {
277 query("DELETE FROM torrust_categories WHERE name = ?")
278 .bind(category_name)
279 .execute(&self.pool)
280 .await
281 .map_err(|_| database::Error::Error)
282 .and_then(|v| {
283 if v.rows_affected() > 0 {
284 Ok(())
285 } else {
286 Err(database::Error::CategoryNotFound)
287 }
288 })
289 }
290
291 async fn get_torrents_search_sorted_paginated(
293 &self,
294 search: &Option<String>,
295 categories: &Option<Vec<String>>,
296 tags: &Option<Vec<String>>,
297 sort: &Sorting,
298 offset: u64,
299 limit: u8,
300 ) -> Result<TorrentsResponse, database::Error> {
301 let title = match search {
302 None => "%".to_string(),
303 Some(v) => format!("%{v}%"),
304 };
305
306 let sort_query: String = match sort {
307 Sorting::UploadedAsc => "date_uploaded ASC".to_string(),
308 Sorting::UploadedDesc => "date_uploaded DESC".to_string(),
309 Sorting::SeedersAsc => "seeders ASC".to_string(),
310 Sorting::SeedersDesc => "seeders DESC".to_string(),
311 Sorting::LeechersAsc => "leechers ASC".to_string(),
312 Sorting::LeechersDesc => "leechers DESC".to_string(),
313 Sorting::NameAsc => "title ASC".to_string(),
314 Sorting::NameDesc => "title DESC".to_string(),
315 Sorting::SizeAsc => "size ASC".to_string(),
316 Sorting::SizeDesc => "size DESC".to_string(),
317 };
318
319 let category_filter_query = if let Some(c) = categories {
320 let mut i = 0;
321 let mut category_filters = String::new();
322 for category in c {
323 if let Ok(sanitized_category) = self.get_category_from_name(category).await {
325 let mut str = format!("tc.name = '{}'", sanitized_category.name);
326 if i > 0 {
327 str = format!(" OR {str}");
328 }
329 category_filters.push_str(&str);
330 i += 1;
331 }
332 }
333 if category_filters.is_empty() {
334 String::new()
335 } else {
336 format!("INNER JOIN torrust_categories tc ON tt.category_id = tc.category_id AND ({category_filters}) ")
337 }
338 } else {
339 String::new()
340 };
341
342 let tag_filter_query = if let Some(t) = tags {
343 let mut i = 0;
344 let mut tag_filters = String::new();
345 for tag in t {
346 if let Ok(sanitized_tag) = self.get_tag_from_name(tag).await {
348 let mut str = format!("tl.tag_id = '{}'", sanitized_tag.tag_id);
349 if i > 0 {
350 str = format!(" OR {str}");
351 }
352 tag_filters.push_str(&str);
353 i += 1;
354 }
355 }
356 if tag_filters.is_empty() {
357 String::new()
358 } else {
359 format!("INNER JOIN torrust_torrent_tag_links tl ON tt.torrent_id = tl.torrent_id AND ({tag_filters}) ")
360 }
361 } else {
362 String::new()
363 };
364
365 let mut query_string = format!(
366 "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, tt.date_uploaded, tt.size AS file_size,
367 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
368 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
369 FROM torrust_torrents tt
370 {category_filter_query}
371 {tag_filter_query}
372 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
373 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
374 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
375 WHERE title LIKE ?
376 GROUP BY tt.torrent_id"
377 );
378
379 let count_query = format!("SELECT COUNT(*) as count FROM ({query_string}) AS count_table");
380
381 let count_result: Result<i64, database::Error> = query_as(&count_query)
382 .bind(title.clone())
383 .fetch_one(&self.pool)
384 .await
385 .map(|(v,)| v)
386 .map_err(|_| database::Error::Error);
387
388 let count = count_result?;
389
390 query_string = format!("{query_string} ORDER BY {sort_query} LIMIT ?, ?");
391
392 let res: Vec<TorrentListing> = sqlx::query_as::<_, TorrentListing>(&query_string)
393 .bind(title)
394 .bind(i64::saturating_add_unsigned(0, offset))
395 .bind(limit)
396 .fetch_all(&self.pool)
397 .await
398 .map_err(|_| database::Error::Error)?;
399
400 Ok(TorrentsResponse {
401 total: u32::try_from(count).expect("variable `count` is larger than u32"),
402 results: res,
403 })
404 }
405
406 #[allow(clippy::too_many_lines)]
407 async fn insert_torrent_and_get_id(
408 &self,
409 torrent: &Torrent,
410 uploader_id: UserId,
411 category_id: i64,
412 title: &str,
413 description: &str,
414 ) -> Result<i64, database::Error> {
415 let info_hash = torrent.info_hash();
416
417 let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
419
420 let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
422
423 let (pieces, root_hash): (String, bool) = if let Some(pieces) = &torrent.info.pieces {
425 (from_bytes(pieces.as_ref()), false)
426 } else {
427 let root_hash = torrent.info.root_hash.as_ref().ok_or(database::Error::Error)?;
428 (root_hash.to_string(), true)
429 };
430
431 let private = torrent.info.private.unwrap_or(0);
432
433 let torrent_id = query("INSERT INTO torrust_torrents (uploader_id, category_id, info_hash, size, name, pieces, piece_length, private, root_hash, date_uploaded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S',DATETIME('now', 'utc')))")
435 .bind(uploader_id)
436 .bind(category_id)
437 .bind(info_hash.to_lowercase())
438 .bind(torrent.file_size())
439 .bind(torrent.info.name.to_string())
440 .bind(pieces)
441 .bind(torrent.info.piece_length)
442 .bind(private)
443 .bind(root_hash)
444 .execute(&self.pool)
445 .await
446 .map(|v| v.last_insert_rowid())
447 .map_err(|e| match e {
448 sqlx::Error::Database(err) => {
449 if err.message().contains("info_hash") {
450 database::Error::TorrentAlreadyExists
451 } else if err.message().contains("title") {
452 database::Error::TorrentTitleAlreadyExists
453 } else {
454 database::Error::Error
455 }
456 }
457 _ => database::Error::Error
458 })?;
459
460 let insert_torrent_files_result = if let Some(length) = torrent.info.length {
461 query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length) VALUES (?, ?, ?)")
462 .bind(torrent.info.md5sum.clone())
463 .bind(torrent_id)
464 .bind(length)
465 .execute(&mut tx)
466 .await
467 .map(|_| ())
468 .map_err(|_| database::Error::Error)
469 } else {
470 let files = torrent.info.files.as_ref().unwrap();
471
472 for file in files {
473 let path = file.path.join("/");
474
475 let _ = query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length, path) VALUES (?, ?, ?, ?)")
476 .bind(file.md5sum.clone())
477 .bind(torrent_id)
478 .bind(file.length)
479 .bind(path)
480 .execute(&mut tx)
481 .await
482 .map_err(|_| database::Error::Error)?;
483 }
484
485 Ok(())
486 };
487
488 if let Err(e) = insert_torrent_files_result {
490 let _ = tx.rollback().await;
491 return Err(e);
492 }
493
494 let insert_torrent_announce_urls_result: Result<(), database::Error> = if let Some(announce_urls) = &torrent.announce_list
495 {
496 let announce_urls = announce_urls.iter().flatten().collect::<Vec<&String>>();
498
499 for tracker_url in &announce_urls {
500 let _ = query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
501 .bind(torrent_id)
502 .bind(tracker_url)
503 .execute(&mut tx)
504 .await
505 .map(|_| ())
506 .map_err(|_| database::Error::Error)?;
507 }
508
509 Ok(())
510 } else {
511 let tracker_url = torrent.announce.as_ref().unwrap();
512
513 query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
514 .bind(torrent_id)
515 .bind(tracker_url)
516 .execute(&mut tx)
517 .await
518 .map(|_| ())
519 .map_err(|_| database::Error::Error)
520 };
521
522 if let Err(e) = insert_torrent_announce_urls_result {
524 let _ = tx.rollback().await;
525 return Err(e);
526 }
527
528 let insert_torrent_info_result =
529 query(r#"INSERT INTO torrust_torrent_info (torrent_id, title, description) VALUES (?, ?, NULLIF(?, ""))"#)
530 .bind(torrent_id)
531 .bind(title)
532 .bind(description)
533 .execute(&mut tx)
534 .await
535 .map_err(|e| match e {
536 sqlx::Error::Database(err) => {
537 if err.message().contains("info_hash") {
538 database::Error::TorrentAlreadyExists
539 } else if err.message().contains("title") {
540 database::Error::TorrentTitleAlreadyExists
541 } else {
542 database::Error::Error
543 }
544 }
545 _ => database::Error::Error,
546 });
547
548 match insert_torrent_info_result {
550 Ok(_) => {
551 let _ = tx.commit().await;
552 Ok(torrent_id)
553 }
554 Err(e) => {
555 let _ = tx.rollback().await;
556 Err(e)
557 }
558 }
559 }
560
561 async fn get_torrent_info_from_id(&self, torrent_id: i64) -> Result<DbTorrentInfo, database::Error> {
562 query_as::<_, DbTorrentInfo>(
563 "SELECT torrent_id, info_hash, name, pieces, piece_length, private, root_hash FROM torrust_torrents WHERE torrent_id = ?",
564 )
565 .bind(torrent_id)
566 .fetch_one(&self.pool)
567 .await
568 .map_err(|_| database::Error::TorrentNotFound)
569 }
570
571 async fn get_torrent_info_from_info_hash(&self, info_hash: &InfoHash) -> Result<DbTorrentInfo, database::Error> {
572 query_as::<_, DbTorrentInfo>(
573 "SELECT torrent_id, info_hash, name, pieces, piece_length, private, root_hash FROM torrust_torrents WHERE info_hash = ?",
574 )
575 .bind(info_hash.to_hex_string().to_lowercase())
576 .fetch_one(&self.pool)
577 .await
578 .map_err(|_| database::Error::TorrentNotFound)
579 }
580
581 async fn get_torrent_files_from_id(&self, torrent_id: i64) -> Result<Vec<TorrentFile>, database::Error> {
582 let db_torrent_files =
583 query_as::<_, DbTorrentFile>("SELECT md5sum, length, path FROM torrust_torrent_files WHERE torrent_id = ?")
584 .bind(torrent_id)
585 .fetch_all(&self.pool)
586 .await
587 .map_err(|_| database::Error::TorrentNotFound)?;
588
589 let torrent_files: Vec<TorrentFile> = db_torrent_files
590 .into_iter()
591 .map(|tf| TorrentFile {
592 path: tf
593 .path
594 .unwrap_or_default()
595 .split('/')
596 .map(std::string::ToString::to_string)
597 .collect(),
598 length: tf.length,
599 md5sum: tf.md5sum,
600 })
601 .collect();
602
603 Ok(torrent_files)
604 }
605
606 async fn get_torrent_announce_urls_from_id(&self, torrent_id: i64) -> Result<Vec<Vec<String>>, database::Error> {
607 query_as::<_, DbTorrentAnnounceUrl>("SELECT tracker_url FROM torrust_torrent_announce_urls WHERE torrent_id = ?")
608 .bind(torrent_id)
609 .fetch_all(&self.pool)
610 .await
611 .map(|v| v.iter().map(|a| vec![a.tracker_url.to_string()]).collect())
612 .map_err(|_| database::Error::TorrentNotFound)
613 }
614
615 async fn get_torrent_listing_from_id(&self, torrent_id: i64) -> Result<TorrentListing, database::Error> {
616 query_as::<_, TorrentListing>(
617 "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, tt.date_uploaded, tt.size AS file_size,
618 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
619 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
620 FROM torrust_torrents tt
621 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
622 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
623 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
624 WHERE tt.torrent_id = ?
625 GROUP BY ts.torrent_id"
626 )
627 .bind(torrent_id)
628 .fetch_one(&self.pool)
629 .await
630 .map_err(|_| database::Error::TorrentNotFound)
631 }
632
633 async fn get_torrent_listing_from_info_hash(&self, info_hash: &InfoHash) -> Result<TorrentListing, database::Error> {
634 query_as::<_, TorrentListing>(
635 "SELECT tt.torrent_id, tp.username AS uploader, tt.info_hash, ti.title, ti.description, tt.category_id, tt.date_uploaded, tt.size AS file_size,
636 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
637 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
638 FROM torrust_torrents tt
639 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
640 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
641 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
642 WHERE tt.info_hash = ?
643 GROUP BY ts.torrent_id"
644 )
645 .bind(info_hash.to_string().to_lowercase())
646 .fetch_one(&self.pool)
647 .await
648 .map_err(|_| database::Error::TorrentNotFound)
649 }
650
651 async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, database::Error> {
652 query_as::<_, TorrentCompact>("SELECT torrent_id, info_hash FROM torrust_torrents")
653 .fetch_all(&self.pool)
654 .await
655 .map_err(|_| database::Error::Error)
656 }
657
658 async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
659 query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2")
660 .bind(title)
661 .bind(torrent_id)
662 .execute(&self.pool)
663 .await
664 .map_err(|e| match e {
665 sqlx::Error::Database(err) => {
666 if err.message().contains("UNIQUE") {
667 database::Error::TorrentTitleAlreadyExists
668 } else {
669 database::Error::Error
670 }
671 }
672 _ => database::Error::Error,
673 })
674 .and_then(|v| {
675 if v.rows_affected() > 0 {
676 Ok(())
677 } else {
678 Err(database::Error::TorrentNotFound)
679 }
680 })
681 }
682
683 async fn update_torrent_description(&self, torrent_id: i64, description: &str) -> Result<(), database::Error> {
684 query("UPDATE torrust_torrent_info SET description = $1 WHERE torrent_id = $2")
685 .bind(description)
686 .bind(torrent_id)
687 .execute(&self.pool)
688 .await
689 .map_err(|_| database::Error::Error)
690 .and_then(|v| {
691 if v.rows_affected() > 0 {
692 Ok(())
693 } else {
694 Err(database::Error::TorrentNotFound)
695 }
696 })
697 }
698
699 async fn update_torrent_category(&self, torrent_id: i64, category_id: CategoryId) -> Result<(), database::Error> {
700 query("UPDATE torrust_torrents SET category_id = $1 WHERE torrent_id = $2")
701 .bind(category_id)
702 .bind(torrent_id)
703 .execute(&self.pool)
704 .await
705 .map_err(|_| database::Error::Error)
706 .and_then(|v| {
707 if v.rows_affected() > 0 {
708 Ok(())
709 } else {
710 Err(database::Error::TorrentNotFound)
711 }
712 })
713 }
714
715 async fn add_tag(&self, name: &str) -> Result<(), database::Error> {
716 query("INSERT INTO torrust_torrent_tags (name) VALUES (?)")
717 .bind(name)
718 .execute(&self.pool)
719 .await
720 .map(|_| ())
721 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
722 }
723
724 async fn delete_tag(&self, tag_id: TagId) -> Result<(), database::Error> {
725 query("DELETE FROM torrust_torrent_tags WHERE tag_id = ?")
726 .bind(tag_id)
727 .execute(&self.pool)
728 .await
729 .map(|_| ())
730 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
731 }
732
733 async fn add_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
734 query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
735 .bind(torrent_id)
736 .bind(tag_id)
737 .execute(&self.pool)
738 .await
739 .map(|_| ())
740 .map_err(|_| database::Error::Error)
741 }
742
743 async fn add_torrent_tag_links(&self, torrent_id: i64, tag_ids: &[TagId]) -> Result<(), database::Error> {
744 let mut transaction = self
745 .pool
746 .begin()
747 .await
748 .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
749
750 for tag_id in tag_ids {
751 query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
752 .bind(torrent_id)
753 .bind(tag_id)
754 .execute(&mut transaction)
755 .await
756 .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
757 }
758
759 transaction
760 .commit()
761 .await
762 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
763 }
764
765 async fn delete_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
766 query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ? AND tag_id = ?")
767 .bind(torrent_id)
768 .bind(tag_id)
769 .execute(&self.pool)
770 .await
771 .map(|_| ())
772 .map_err(|_| database::Error::Error)
773 }
774
775 async fn delete_all_torrent_tag_links(&self, torrent_id: i64) -> Result<(), database::Error> {
776 query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ?")
777 .bind(torrent_id)
778 .execute(&self.pool)
779 .await
780 .map(|_| ())
781 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
782 }
783
784 async fn get_tag_from_name(&self, name: &str) -> Result<TorrentTag, database::Error> {
785 query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags WHERE name = ?")
786 .bind(name)
787 .fetch_one(&self.pool)
788 .await
789 .map_err(|_| database::Error::TagNotFound)
790 }
791
792 async fn get_tags(&self) -> Result<Vec<TorrentTag>, database::Error> {
793 query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags")
794 .fetch_all(&self.pool)
795 .await
796 .map_err(|_| database::Error::Error)
797 }
798
799 async fn get_tags_for_torrent_id(&self, torrent_id: i64) -> Result<Vec<TorrentTag>, database::Error> {
800 query_as::<_, TorrentTag>(
801 "SELECT torrust_torrent_tags.tag_id, torrust_torrent_tags.name
802 FROM torrust_torrent_tags
803 JOIN torrust_torrent_tag_links ON torrust_torrent_tags.tag_id = torrust_torrent_tag_links.tag_id
804 WHERE torrust_torrent_tag_links.torrent_id = ?",
805 )
806 .bind(torrent_id)
807 .fetch_all(&self.pool)
808 .await
809 .map_err(|_| database::Error::Error)
810 }
811
812 async fn update_tracker_info(
813 &self,
814 torrent_id: i64,
815 tracker_url: &str,
816 seeders: i64,
817 leechers: i64,
818 ) -> Result<(), database::Error> {
819 query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers) VALUES ($1, $2, $3, $4)")
820 .bind(torrent_id)
821 .bind(tracker_url)
822 .bind(seeders)
823 .bind(leechers)
824 .execute(&self.pool)
825 .await
826 .map(|_| ())
827 .map_err(|_| database::Error::TorrentNotFound)
828 }
829
830 async fn delete_torrent(&self, torrent_id: i64) -> Result<(), database::Error> {
831 query("DELETE FROM torrust_torrents WHERE torrent_id = ?")
832 .bind(torrent_id)
833 .execute(&self.pool)
834 .await
835 .map_err(|_| database::Error::Error)
836 .and_then(|v| {
837 if v.rows_affected() > 0 {
838 Ok(())
839 } else {
840 Err(database::Error::TorrentNotFound)
841 }
842 })
843 }
844
845 async fn delete_all_database_rows(&self) -> Result<(), database::Error> {
846 for table in TABLES_TO_TRUNCATE {
847 query(&format!("DELETE FROM {table};"))
848 .execute(&self.pool)
849 .await
850 .map_err(|_| database::Error::Error)?;
851 }
852
853 Ok(())
854 }
855}