1use std::str::FromStr;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use chrono::{DateTime, NaiveDateTime, Utc};
6use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
7use sqlx::{query, query_as, Acquire, ConnectOptions, SqlitePool};
8use url::Url;
9
10use super::database::TABLES_TO_TRUNCATE;
11use crate::databases::database;
12use crate::databases::database::{Category, Database, Driver, Sorting, TorrentCompact};
13use crate::models::category::CategoryId;
14use crate::models::info_hash::InfoHash;
15use crate::models::response::TorrentsResponse;
16use crate::models::torrent::{Metadata, TorrentListing};
17use crate::models::torrent_file::{
18 DbTorrent, DbTorrentAnnounceUrl, DbTorrentFile, DbTorrentHttpSeedUrl, DbTorrentNode, Torrent, TorrentFile,
19};
20use crate::models::torrent_tag::{TagId, TorrentTag};
21use crate::models::tracker_key::TrackerKey;
22use crate::models::user::{User, UserAuthentication, UserCompact, UserId, UserProfile};
23use crate::services::torrent::{CanonicalInfoHashGroup, DbTorrentInfoHash};
24use crate::utils::clock::{self, datetime_now, DATETIME_FORMAT};
25use crate::utils::hex::from_bytes;
26
/// SQLite backend for the [`Database`] trait, built on an `sqlx` connection pool.
pub struct Sqlite {
    /// Connection pool through which the driver's queries are executed.
    pub pool: SqlitePool,
}
30
31#[async_trait]
32impl Database for Sqlite {
    /// Reports which driver this implementation represents: always [`Driver::Sqlite3`].
    fn get_database_driver(&self) -> Driver {
        Driver::Sqlite3
    }
36
37 async fn new(database_url: &str) -> Self {
38 let connection_options = SqliteConnectOptions::from_str(database_url)
39 .expect("Unable to create connection options.")
40 .log_statements(log::LevelFilter::Debug)
41 .log_slow_statements(log::LevelFilter::Info, Duration::from_secs(1));
42
43 let db = SqlitePoolOptions::new()
44 .connect_with(connection_options)
45 .await
46 .expect("Unable to create database pool.");
47
48 sqlx::migrate!("migrations/sqlite3")
49 .run(&db)
50 .await
51 .expect("Could not run database migrations.");
52
53 Self { pool: db }
54 }
55
56 async fn insert_user_and_get_id(&self, username: &str, email: &str, password_hash: &str) -> Result<i64, database::Error> {
57 let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
59
60 let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
62
63 let user_id =
65 query("INSERT INTO torrust_users (date_registered) VALUES (strftime('%Y-%m-%d %H:%M:%S',DATETIME('now', 'utc')))")
66 .execute(&mut *tx)
67 .await
68 .map(|v| v.last_insert_rowid())
69 .map_err(|_| database::Error::Error)?;
70
71 let insert_user_auth_result = query("INSERT INTO torrust_user_authentication (user_id, password_hash) VALUES (?, ?)")
73 .bind(user_id)
74 .bind(password_hash)
75 .execute(&mut *tx)
76 .await
77 .map_err(|_| database::Error::Error);
78
79 if let Err(e) = insert_user_auth_result {
81 drop(tx.rollback().await);
82 return Err(e);
83 }
84
85 let insert_user_profile_result = query(r#"INSERT INTO torrust_user_profiles (user_id, username, email, email_verified, bio, avatar) VALUES (?, ?, NULLIF(?, ""), 0, NULL, NULL)"#)
87 .bind(user_id)
88 .bind(username)
89 .bind(email)
90 .execute(&mut *tx)
91 .await
92 .map_err(|e| match e {
93 sqlx::Error::Database(err) => {
94 if err.message().contains("username") {
95 database::Error::UsernameTaken
96 } else if err.message().contains("email") {
97 database::Error::EmailTaken
98 } else {
99 database::Error::Error
100 }
101 }
102 _ => database::Error::Error
103 });
104
105 match insert_user_profile_result {
107 Ok(_) => {
108 drop(tx.commit().await);
109 Ok(user_id)
110 }
111 Err(e) => {
112 drop(tx.rollback().await);
113 Err(e)
114 }
115 }
116 }
117
118 async fn change_user_password(&self, user_id: i64, new_password: &str) -> Result<(), database::Error> {
120 query("UPDATE torrust_user_authentication SET password_hash = ? WHERE user_id = ?")
121 .bind(new_password)
122 .bind(user_id)
123 .execute(&self.pool)
124 .await
125 .map_err(|_| database::Error::Error)
126 .and_then(|v| {
127 if v.rows_affected() > 0 {
128 Ok(())
129 } else {
130 Err(database::Error::UserNotFound)
131 }
132 })
133 }
134
135 async fn get_user_from_id(&self, user_id: i64) -> Result<User, database::Error> {
136 query_as::<_, User>("SELECT * FROM torrust_users WHERE user_id = ?")
137 .bind(user_id)
138 .fetch_one(&self.pool)
139 .await
140 .map_err(|_| database::Error::UserNotFound)
141 }
142
143 async fn get_user_authentication_from_id(&self, user_id: UserId) -> Result<UserAuthentication, database::Error> {
144 query_as::<_, UserAuthentication>("SELECT * FROM torrust_user_authentication WHERE user_id = ?")
145 .bind(user_id)
146 .fetch_one(&self.pool)
147 .await
148 .map_err(|_| database::Error::UserNotFound)
149 }
150
151 async fn get_user_profile_from_username(&self, username: &str) -> Result<UserProfile, database::Error> {
152 query_as::<_, UserProfile>("SELECT * FROM torrust_user_profiles WHERE username = ?")
153 .bind(username)
154 .fetch_one(&self.pool)
155 .await
156 .map_err(|_| database::Error::UserNotFound)
157 }
158
159 async fn get_user_compact_from_id(&self, user_id: i64) -> Result<UserCompact, database::Error> {
160 query_as::<_, UserCompact>("SELECT tu.user_id, tp.username, tu.administrator FROM torrust_users tu INNER JOIN torrust_user_profiles tp ON tu.user_id = tp.user_id WHERE tu.user_id = ?")
161 .bind(user_id)
162 .fetch_one(&self.pool)
163 .await
164 .map_err(|_| database::Error::UserNotFound)
165 }
166
167 async fn get_user_tracker_key(&self, user_id: i64) -> Option<TrackerKey> {
168 const HOUR_IN_SECONDS: i64 = 3600;
169
170 let current_time_plus_hour = i64::try_from(clock::now()).unwrap().saturating_add(HOUR_IN_SECONDS);
172
173 query_as::<_, TrackerKey>("SELECT tracker_key AS key, date_expiry AS valid_until FROM torrust_tracker_keys WHERE user_id = $1 AND date_expiry > $2 ORDER BY date_expiry DESC")
175 .bind(user_id)
176 .bind(current_time_plus_hour)
177 .fetch_one(&self.pool)
178 .await
179 .ok()
180 }
181
182 async fn count_users(&self) -> Result<i64, database::Error> {
183 query_as("SELECT COUNT(*) FROM torrust_users")
184 .fetch_one(&self.pool)
185 .await
186 .map(|(v,)| v)
187 .map_err(|_| database::Error::Error)
188 }
189
190 async fn ban_user(&self, user_id: i64, reason: &str, date_expiry: NaiveDateTime) -> Result<(), database::Error> {
191 let date_expiry_string = date_expiry.format("%Y-%m-%d %H:%M:%S").to_string();
193
194 query("INSERT INTO torrust_user_bans (user_id, reason, date_expiry) VALUES ($1, $2, $3)")
195 .bind(user_id)
196 .bind(reason)
197 .bind(date_expiry_string)
198 .execute(&self.pool)
199 .await
200 .map(|_| ())
201 .map_err(|_| database::Error::Error)
202 }
203
204 async fn grant_admin_role(&self, user_id: i64) -> Result<(), database::Error> {
205 query("UPDATE torrust_users SET administrator = TRUE WHERE user_id = ?")
206 .bind(user_id)
207 .execute(&self.pool)
208 .await
209 .map_err(|_| database::Error::Error)
210 .and_then(|v| {
211 if v.rows_affected() > 0 {
212 Ok(())
213 } else {
214 Err(database::Error::UserNotFound)
215 }
216 })
217 }
218
219 async fn verify_email(&self, user_id: i64) -> Result<(), database::Error> {
220 query("UPDATE torrust_user_profiles SET email_verified = TRUE WHERE user_id = ?")
221 .bind(user_id)
222 .execute(&self.pool)
223 .await
224 .map(|_| ())
225 .map_err(|_| database::Error::Error)
226 }
227
228 async fn add_tracker_key(&self, user_id: i64, tracker_key: &TrackerKey) -> Result<(), database::Error> {
229 let key = tracker_key.key.clone();
230
231 query("INSERT INTO torrust_tracker_keys (user_id, tracker_key, date_expiry) VALUES ($1, $2, $3)")
232 .bind(user_id)
233 .bind(key)
234 .bind(tracker_key.valid_until)
235 .execute(&self.pool)
236 .await
237 .map(|_| ())
238 .map_err(|_| database::Error::Error)
239 }
240
241 async fn delete_user(&self, user_id: i64) -> Result<(), database::Error> {
242 query("DELETE FROM torrust_users WHERE user_id = ?")
243 .bind(user_id)
244 .execute(&self.pool)
245 .await
246 .map_err(|_| database::Error::Error)
247 .and_then(|v| {
248 if v.rows_affected() > 0 {
249 Ok(())
250 } else {
251 Err(database::Error::UserNotFound)
252 }
253 })
254 }
255
256 async fn insert_category_and_get_id(&self, category_name: &str) -> Result<i64, database::Error> {
257 query("INSERT INTO torrust_categories (name) VALUES (?)")
258 .bind(category_name)
259 .execute(&self.pool)
260 .await
261 .map(|v| v.last_insert_rowid())
262 .map_err(|_| database::Error::Error)
263 }
264
265 async fn get_category_from_id(&self, category_id: i64) -> Result<Category, database::Error> {
266 query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE category_id = ?")
267 .bind(category_id)
268 .fetch_one(&self.pool)
269 .await
270 .map_err(|_| database::Error::CategoryNotFound)
271 }
272
273 async fn get_category_from_name(&self, category_name: &str) -> Result<Category, database::Error> {
274 query_as::<_, Category>("SELECT category_id, name, (SELECT COUNT(*) FROM torrust_torrents WHERE torrust_torrents.category_id = torrust_categories.category_id) AS num_torrents FROM torrust_categories WHERE name = ?")
275 .bind(category_name)
276 .fetch_one(&self.pool)
277 .await
278 .map_err(|_| database::Error::CategoryNotFound)
279 }
280
281 async fn get_categories(&self) -> Result<Vec<Category>, database::Error> {
282 query_as::<_, Category>("SELECT tc.category_id, tc.name, COUNT(tt.category_id) as num_torrents FROM torrust_categories tc LEFT JOIN torrust_torrents tt on tc.category_id = tt.category_id GROUP BY tc.name;")
283 .fetch_all(&self.pool)
284 .await
285 .map_err(|_| database::Error::Error)
286 }
287
288 async fn delete_category(&self, category_name: &str) -> Result<(), database::Error> {
289 query("DELETE FROM torrust_categories WHERE name = ?")
290 .bind(category_name)
291 .execute(&self.pool)
292 .await
293 .map_err(|_| database::Error::Error)
294 .and_then(|v| {
295 if v.rows_affected() > 0 {
296 Ok(())
297 } else {
298 Err(database::Error::CategoryNotFound)
299 }
300 })
301 }
302
303 #[allow(clippy::too_many_lines)]
305 async fn get_torrents_search_sorted_paginated(
306 &self,
307 search: &Option<String>,
308 categories: &Option<Vec<String>>,
309 tags: &Option<Vec<String>>,
310 sort: &Sorting,
311 offset: u64,
312 limit: u8,
313 ) -> Result<TorrentsResponse, database::Error> {
314 let title = match search {
315 None => "%".to_string(),
316 Some(v) => format!("%{v}%"),
317 };
318
319 let sort_query: String = match sort {
320 Sorting::UploadedAsc => "date_uploaded ASC".to_string(),
321 Sorting::UploadedDesc => "date_uploaded DESC".to_string(),
322 Sorting::SeedersAsc => "seeders ASC".to_string(),
323 Sorting::SeedersDesc => "seeders DESC".to_string(),
324 Sorting::LeechersAsc => "leechers ASC".to_string(),
325 Sorting::LeechersDesc => "leechers DESC".to_string(),
326 Sorting::NameAsc => "title ASC".to_string(),
327 Sorting::NameDesc => "title DESC".to_string(),
328 Sorting::SizeAsc => "size ASC".to_string(),
329 Sorting::SizeDesc => "size DESC".to_string(),
330 };
331
332 let category_filter_query = if let Some(c) = categories {
333 let mut i = 0;
334 let mut category_filters = String::new();
335 for category in c {
336 if let Ok(sanitized_category) = self.get_category_from_name(category).await {
338 let mut str = format!("tc.name = '{}'", sanitized_category.name);
339 if i > 0 {
340 str = format!(" OR {str}");
341 }
342 category_filters.push_str(&str);
343 i += 1;
344 }
345 }
346 if category_filters.is_empty() {
347 String::new()
348 } else {
349 format!("INNER JOIN torrust_categories tc ON tt.category_id = tc.category_id AND ({category_filters}) ")
350 }
351 } else {
352 String::new()
353 };
354
355 let tag_filter_query = if let Some(t) = tags {
356 let mut i = 0;
357 let mut tag_filters = String::new();
358 for tag in t {
359 if let Ok(sanitized_tag) = self.get_tag_from_name(tag).await {
361 let mut str = format!("tl.tag_id = '{}'", sanitized_tag.tag_id);
362 if i > 0 {
363 str = format!(" OR {str}");
364 }
365 tag_filters.push_str(&str);
366 i += 1;
367 }
368 }
369 if tag_filters.is_empty() {
370 String::new()
371 } else {
372 format!("INNER JOIN torrust_torrent_tag_links tl ON tt.torrent_id = tl.torrent_id AND ({tag_filters}) ")
373 }
374 } else {
375 String::new()
376 };
377
378 let mut query_string = format!(
379 "SELECT
380 tt.torrent_id,
381 tp.username AS uploader,
382 tt.info_hash,
383 ti.title,
384 ti.description,
385 tt.category_id,
386 tt.date_uploaded,
387 tt.size AS file_size,
388 tt.name,
389 tt.comment,
390 tt.creation_date,
391 tt.created_by,
392 tt.`encoding`,
393 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
394 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
395 FROM torrust_torrents tt
396 {category_filter_query}
397 {tag_filter_query}
398 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
399 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
400 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
401 WHERE title LIKE ?
402 GROUP BY tt.torrent_id"
403 );
404
405 let count_query = format!("SELECT COUNT(*) as count FROM ({query_string}) AS count_table");
406
407 let count_result: Result<i64, database::Error> = query_as(&count_query)
408 .bind(title.clone())
409 .fetch_one(&self.pool)
410 .await
411 .map(|(v,)| v)
412 .map_err(|_| database::Error::Error);
413
414 let count = count_result?;
415
416 query_string = format!("{query_string} ORDER BY {sort_query} LIMIT ?, ?");
417
418 let res: Vec<TorrentListing> = sqlx::query_as::<_, TorrentListing>(&query_string)
419 .bind(title)
420 .bind(i64::saturating_add_unsigned(0, offset))
421 .bind(limit)
422 .fetch_all(&self.pool)
423 .await
424 .map_err(|_| database::Error::Error)?;
425
426 Ok(TorrentsResponse {
427 total: u32::try_from(count).expect("variable `count` is larger than u32"),
428 results: res,
429 })
430 }
431
432 #[allow(clippy::too_many_lines)]
433 async fn insert_torrent_and_get_id(
434 &self,
435 original_info_hash: &InfoHash,
436 torrent: &Torrent,
437 uploader_id: UserId,
438 metadata: &Metadata,
439 ) -> Result<i64, database::Error> {
440 let info_hash = torrent.canonical_info_hash_hex();
441 let canonical_info_hash = torrent.canonical_info_hash();
442
443 let mut conn = self.pool.acquire().await.map_err(|_| database::Error::Error)?;
445
446 let mut tx = conn.begin().await.map_err(|_| database::Error::Error)?;
448
449 let is_bep_30 = !matches!(&torrent.info.pieces, Some(_pieces));
452
453 let pieces = torrent.info.pieces.as_ref().map(|pieces| from_bytes(pieces.as_ref()));
454
455 let root_hash = torrent
456 .info
457 .root_hash
458 .as_ref()
459 .map(|root_hash| from_bytes(root_hash.as_ref()));
460
461 let torrent_id = query(
463 "INSERT INTO torrust_torrents (
464 uploader_id,
465 category_id,
466 info_hash,
467 size,
468 name,
469 pieces,
470 root_hash,
471 piece_length,
472 private,
473 is_bep_30,
474 `source`,
475 comment,
476 date_uploaded,
477 creation_date,
478 created_by,
479 `encoding`
480 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S',DATETIME('now', 'utc')), ?, ?, ?)",
481 )
482 .bind(uploader_id)
483 .bind(metadata.category_id)
484 .bind(info_hash.to_lowercase())
485 .bind(torrent.file_size())
486 .bind(torrent.info.name.to_string())
487 .bind(pieces)
488 .bind(root_hash)
489 .bind(torrent.info.piece_length)
490 .bind(torrent.info.private)
491 .bind(is_bep_30)
492 .bind(torrent.info.source.clone())
493 .bind(torrent.comment.clone())
494 .bind(torrent.creation_date)
495 .bind(torrent.created_by.clone())
496 .bind(torrent.encoding.clone())
497 .execute(&mut *tx)
498 .await
499 .map(|v| v.last_insert_rowid())
500 .map_err(|e| match e {
501 sqlx::Error::Database(err) => {
502 tracing::error!("DB error: {:?}", err);
503 if err.message().contains("UNIQUE") && err.message().contains("info_hash") {
504 database::Error::TorrentAlreadyExists
505 } else {
506 database::Error::Error
507 }
508 }
509 _ => database::Error::Error,
510 })?;
511
512 let insert_info_hash_result =
515 query("INSERT INTO torrust_torrent_info_hashes (info_hash, canonical_info_hash, original_is_known) VALUES (?, ?, ?)")
516 .bind(original_info_hash.to_hex_string())
517 .bind(canonical_info_hash.to_hex_string())
518 .bind(true)
519 .execute(&mut *tx)
520 .await
521 .map(|_| ())
522 .map_err(|err| {
523 tracing::error!("DB error: {:?}", err);
524 database::Error::Error
525 });
526
527 if let Err(e) = insert_info_hash_result {
529 drop(tx.rollback().await);
530 return Err(e);
531 }
532
533 let insert_torrent_files_result = if let Some(length) = torrent.info.length {
536 query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length) VALUES (?, ?, ?)")
537 .bind(torrent.info.md5sum.clone())
538 .bind(torrent_id)
539 .bind(length)
540 .execute(&mut *tx)
541 .await
542 .map(|_| ())
543 .map_err(|_| database::Error::Error)
544 } else {
545 let files = torrent.info.files.as_ref().unwrap();
546
547 for file in files {
548 let path = file.path.join("/");
549
550 let _ = query("INSERT INTO torrust_torrent_files (md5sum, torrent_id, length, path) VALUES (?, ?, ?, ?)")
551 .bind(file.md5sum.clone())
552 .bind(torrent_id)
553 .bind(file.length)
554 .bind(path)
555 .execute(&mut *tx)
556 .await
557 .map_err(|_| database::Error::Error)?;
558 }
559
560 Ok(())
561 };
562
563 if let Err(e) = insert_torrent_files_result {
565 drop(tx.rollback().await);
566 return Err(e);
567 }
568
569 let insert_torrent_announce_urls_result: Result<(), database::Error> = if let Some(announce_urls) = &torrent.announce_list
572 {
573 let announce_urls = announce_urls.iter().flatten().collect::<Vec<&String>>();
575
576 for tracker_url in &announce_urls {
577 let () = query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
578 .bind(torrent_id)
579 .bind(tracker_url)
580 .execute(&mut *tx)
581 .await
582 .map(|_| ())
583 .map_err(|_| database::Error::Error)?;
584 }
585
586 Ok(())
587 } else {
588 let tracker_url = torrent.announce.as_ref().unwrap();
589
590 query("INSERT INTO torrust_torrent_announce_urls (torrent_id, tracker_url) VALUES (?, ?)")
591 .bind(torrent_id)
592 .bind(tracker_url)
593 .execute(&mut *tx)
594 .await
595 .map(|_| ())
596 .map_err(|_| database::Error::Error)
597 };
598
599 if let Err(e) = insert_torrent_announce_urls_result {
601 drop(tx.rollback().await);
602 return Err(e);
603 }
604
605 let insert_torrent_http_seeds_result: Result<(), database::Error> = if let Some(http_seeds) = &torrent.httpseeds {
608 for seed_url in http_seeds {
609 let () = query("INSERT INTO torrust_torrent_http_seeds (torrent_id, seed_url) VALUES (?, ?)")
610 .bind(torrent_id)
611 .bind(seed_url)
612 .execute(&mut *tx)
613 .await
614 .map(|_| ())
615 .map_err(|_| database::Error::Error)?;
616 }
617
618 Ok(())
619 } else {
620 Ok(())
621 };
622
623 if let Err(e) = insert_torrent_http_seeds_result {
625 drop(tx.rollback().await);
626 return Err(e);
627 }
628
629 let insert_torrent_nodes_result: Result<(), database::Error> = if let Some(nodes) = &torrent.nodes {
632 for node in nodes {
633 let () = query("INSERT INTO torrust_torrent_nodes (torrent_id, node_ip, node_port) VALUES (?, ?, ?)")
634 .bind(torrent_id)
635 .bind(node.0.clone())
636 .bind(node.1)
637 .execute(&mut *tx)
638 .await
639 .map(|_| ())
640 .map_err(|_| database::Error::Error)?;
641 }
642
643 Ok(())
644 } else {
645 Ok(())
646 };
647
648 if let Err(e) = insert_torrent_nodes_result {
650 drop(tx.rollback().await);
651 return Err(e);
652 }
653
654 for tag_id in &metadata.tags {
657 let insert_torrent_tag_result = query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
658 .bind(torrent_id)
659 .bind(tag_id)
660 .execute(&mut *tx)
661 .await
662 .map_err(|err| database::Error::ErrorWithText(err.to_string()));
663
664 if let Err(e) = insert_torrent_tag_result {
666 drop(tx.rollback().await);
667 return Err(e);
668 }
669 }
670
671 let insert_torrent_info_result =
672 query(r#"INSERT INTO torrust_torrent_info (torrent_id, title, description) VALUES (?, ?, NULLIF(?, ""))"#)
673 .bind(torrent_id)
674 .bind(metadata.title.clone())
675 .bind(metadata.description.clone())
676 .execute(&mut *tx)
677 .await
678 .map_err(|e| match e {
679 sqlx::Error::Database(err) => {
680 tracing::error!("DB error: {:?}", err);
681 if err.message().contains("UNIQUE") && err.message().contains("title") {
682 database::Error::TorrentTitleAlreadyExists
683 } else {
684 database::Error::Error
685 }
686 }
687 _ => database::Error::Error,
688 });
689
690 match insert_torrent_info_result {
692 Ok(_) => {
693 drop(tx.commit().await);
694 Ok(torrent_id)
695 }
696 Err(e) => {
697 drop(tx.rollback().await);
698 Err(e)
699 }
700 }
701 }
702
703 async fn get_torrent_canonical_info_hash_group(
704 &self,
705 canonical: &InfoHash,
706 ) -> Result<CanonicalInfoHashGroup, database::Error> {
707 let db_info_hashes = query_as::<_, DbTorrentInfoHash>(
708 "SELECT info_hash, canonical_info_hash, original_is_known FROM torrust_torrent_info_hashes WHERE canonical_info_hash = ?",
709 )
710 .bind(canonical.to_hex_string())
711 .fetch_all(&self.pool)
712 .await
713 .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
714
715 let info_hashes: Vec<InfoHash> = db_info_hashes
716 .into_iter()
717 .map(|db_info_hash| {
718 InfoHash::from_str(&db_info_hash.info_hash)
719 .unwrap_or_else(|_| panic!("Invalid info-hash in database: {}", db_info_hash.info_hash))
720 })
721 .collect();
722
723 Ok(CanonicalInfoHashGroup {
724 canonical_info_hash: *canonical,
725 original_info_hashes: info_hashes,
726 })
727 }
728
729 async fn find_canonical_info_hash_for(&self, info_hash: &InfoHash) -> Result<Option<InfoHash>, database::Error> {
730 let maybe_db_torrent_info_hash = query_as::<_, DbTorrentInfoHash>(
731 "SELECT info_hash, canonical_info_hash, original_is_known FROM torrust_torrent_info_hashes WHERE info_hash = ?",
732 )
733 .bind(info_hash.to_hex_string())
734 .fetch_optional(&self.pool)
735 .await
736 .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
737
738 match maybe_db_torrent_info_hash {
739 Some(db_torrent_info_hash) => Ok(Some(
740 InfoHash::from_str(&db_torrent_info_hash.canonical_info_hash)
741 .unwrap_or_else(|_| panic!("Invalid info-hash in database: {}", db_torrent_info_hash.canonical_info_hash)),
742 )),
743 None => Ok(None),
744 }
745 }
746
747 async fn add_info_hash_to_canonical_info_hash_group(
748 &self,
749 original: &InfoHash,
750 canonical: &InfoHash,
751 ) -> Result<(), database::Error> {
752 query("INSERT INTO torrust_torrent_info_hashes (info_hash, canonical_info_hash, original_is_known) VALUES (?, ?, ?)")
753 .bind(original.to_hex_string())
754 .bind(canonical.to_hex_string())
755 .bind(true)
756 .execute(&self.pool)
757 .await
758 .map(|_| ())
759 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
760 }
761
762 async fn get_torrent_info_from_id(&self, torrent_id: i64) -> Result<DbTorrent, database::Error> {
763 query_as::<_, DbTorrent>("SELECT * FROM torrust_torrents WHERE torrent_id = ?")
764 .bind(torrent_id)
765 .fetch_one(&self.pool)
766 .await
767 .map_err(|_| database::Error::TorrentNotFound)
768 }
769
770 async fn get_torrent_info_from_info_hash(&self, info_hash: &InfoHash) -> Result<DbTorrent, database::Error> {
771 query_as::<_, DbTorrent>("SELECT * FROM torrust_torrents WHERE info_hash = ?")
772 .bind(info_hash.to_hex_string().to_lowercase())
773 .fetch_one(&self.pool)
774 .await
775 .map_err(|_| database::Error::TorrentNotFound)
776 }
777
778 async fn get_torrent_files_from_id(&self, torrent_id: i64) -> Result<Vec<TorrentFile>, database::Error> {
779 let db_torrent_files =
780 query_as::<_, DbTorrentFile>("SELECT md5sum, length, path FROM torrust_torrent_files WHERE torrent_id = ?")
781 .bind(torrent_id)
782 .fetch_all(&self.pool)
783 .await
784 .map_err(|_| database::Error::TorrentNotFound)?;
785
786 let torrent_files: Vec<TorrentFile> = db_torrent_files
787 .into_iter()
788 .map(|tf| TorrentFile {
789 path: tf
790 .path
791 .unwrap_or_default()
792 .split('/')
793 .map(std::string::ToString::to_string)
794 .collect(),
795 length: tf.length,
796 md5sum: tf.md5sum,
797 })
798 .collect();
799
800 Ok(torrent_files)
801 }
802
803 async fn get_torrent_announce_urls_from_id(&self, torrent_id: i64) -> Result<Vec<Vec<String>>, database::Error> {
804 query_as::<_, DbTorrentAnnounceUrl>("SELECT tracker_url FROM torrust_torrent_announce_urls WHERE torrent_id = ?")
805 .bind(torrent_id)
806 .fetch_all(&self.pool)
807 .await
808 .map(|v| v.iter().map(|a| vec![a.tracker_url.to_string()]).collect())
809 .map_err(|_| database::Error::TorrentNotFound)
810 }
811
812 async fn get_torrent_http_seed_urls_from_id(&self, torrent_id: i64) -> Result<Vec<String>, database::Error> {
813 query_as::<_, DbTorrentHttpSeedUrl>("SELECT seed_url FROM torrust_torrent_http_seeds WHERE torrent_id = ?")
814 .bind(torrent_id)
815 .fetch_all(&self.pool)
816 .await
817 .map(|v| v.iter().map(|a| a.seed_url.to_string()).collect())
818 .map_err(|_| database::Error::TorrentNotFound)
819 }
820
821 async fn get_torrent_nodes_from_id(&self, torrent_id: i64) -> Result<Vec<(String, i64)>, database::Error> {
822 query_as::<_, DbTorrentNode>("SELECT node_ip, node_port FROM torrust_torrent_nodes WHERE torrent_id = ?")
823 .bind(torrent_id)
824 .fetch_all(&self.pool)
825 .await
826 .map(|v| v.iter().map(|a| (a.node_ip.to_string(), a.node_port)).collect())
827 .map_err(|_| database::Error::TorrentNotFound)
828 }
829
830 async fn get_torrent_listing_from_id(&self, torrent_id: i64) -> Result<TorrentListing, database::Error> {
831 query_as::<_, TorrentListing>(
832 "SELECT
833 tt.torrent_id,
834 tp.username AS uploader,
835 tt.info_hash, ti.title,
836 ti.description,
837 tt.category_id,
838 tt.date_uploaded,
839 tt.size AS file_size,
840 tt.name,
841 tt.comment,
842 tt.creation_date,
843 tt.created_by,
844 tt.`encoding`,
845 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
846 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
847 FROM torrust_torrents tt
848 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
849 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
850 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
851 WHERE tt.torrent_id = ?
852 GROUP BY ts.torrent_id",
853 )
854 .bind(torrent_id)
855 .fetch_one(&self.pool)
856 .await
857 .map_err(|_| database::Error::TorrentNotFound)
858 }
859
860 async fn get_torrent_listing_from_info_hash(&self, info_hash: &InfoHash) -> Result<TorrentListing, database::Error> {
861 query_as::<_, TorrentListing>(
862 "SELECT
863 tt.torrent_id,
864 tp.username AS uploader,
865 tt.info_hash, ti.title,
866 ti.description,
867 tt.category_id,
868 tt.date_uploaded,
869 tt.size AS file_size,
870 tt.name,
871 tt.comment,
872 tt.creation_date,
873 tt.created_by,
874 tt.`encoding`,
875 CAST(COALESCE(sum(ts.seeders),0) as signed) as seeders,
876 CAST(COALESCE(sum(ts.leechers),0) as signed) as leechers
877 FROM torrust_torrents tt
878 INNER JOIN torrust_user_profiles tp ON tt.uploader_id = tp.user_id
879 INNER JOIN torrust_torrent_info ti ON tt.torrent_id = ti.torrent_id
880 LEFT JOIN torrust_torrent_tracker_stats ts ON tt.torrent_id = ts.torrent_id
881 WHERE tt.info_hash = ?
882 GROUP BY ts.torrent_id",
883 )
884 .bind(info_hash.to_string().to_lowercase())
885 .fetch_one(&self.pool)
886 .await
887 .map_err(|_| database::Error::TorrentNotFound)
888 }
889
890 async fn get_all_torrents_compact(&self) -> Result<Vec<TorrentCompact>, database::Error> {
891 query_as::<_, TorrentCompact>("SELECT torrent_id, info_hash FROM torrust_torrents")
892 .fetch_all(&self.pool)
893 .await
894 .map_err(|_| database::Error::Error)
895 }
896
897 async fn get_torrents_with_stats_not_updated_since(
898 &self,
899 datetime: DateTime<Utc>,
900 limit: i64,
901 ) -> Result<Vec<TorrentCompact>, database::Error> {
902 query_as::<_, TorrentCompact>(
903 "SELECT tt.torrent_id, tt.info_hash
904 FROM torrust_torrents tt
905 LEFT JOIN torrust_torrent_tracker_stats tts ON tt.torrent_id = tts.torrent_id
906 WHERE tts.updated_at < ? OR tts.updated_at IS NULL
907 ORDER BY tts.updated_at ASC
908 LIMIT ?
909 ",
910 )
911 .bind(datetime.format(DATETIME_FORMAT).to_string())
912 .bind(limit)
913 .fetch_all(&self.pool)
914 .await
915 .map_err(|_| database::Error::Error)
916 }
917
918 async fn update_torrent_title(&self, torrent_id: i64, title: &str) -> Result<(), database::Error> {
919 query("UPDATE torrust_torrent_info SET title = $1 WHERE torrent_id = $2")
920 .bind(title)
921 .bind(torrent_id)
922 .execute(&self.pool)
923 .await
924 .map_err(|e| match e {
925 sqlx::Error::Database(err) => {
926 tracing::error!("DB error: {:?}", err);
927 if err.message().contains("UNIQUE") && err.message().contains("title") {
928 database::Error::TorrentTitleAlreadyExists
929 } else {
930 database::Error::Error
931 }
932 }
933 _ => database::Error::Error,
934 })
935 .and_then(|v| {
936 if v.rows_affected() > 0 {
937 Ok(())
938 } else {
939 Err(database::Error::TorrentNotFound)
940 }
941 })
942 }
943
944 async fn update_torrent_description(&self, torrent_id: i64, description: &str) -> Result<(), database::Error> {
945 query("UPDATE torrust_torrent_info SET description = $1 WHERE torrent_id = $2")
946 .bind(description)
947 .bind(torrent_id)
948 .execute(&self.pool)
949 .await
950 .map_err(|_| database::Error::Error)
951 .and_then(|v| {
952 if v.rows_affected() > 0 {
953 Ok(())
954 } else {
955 Err(database::Error::TorrentNotFound)
956 }
957 })
958 }
959
960 async fn update_torrent_category(&self, torrent_id: i64, category_id: CategoryId) -> Result<(), database::Error> {
961 query("UPDATE torrust_torrents SET category_id = $1 WHERE torrent_id = $2")
962 .bind(category_id)
963 .bind(torrent_id)
964 .execute(&self.pool)
965 .await
966 .map_err(|_| database::Error::Error)
967 .and_then(|v| {
968 if v.rows_affected() > 0 {
969 Ok(())
970 } else {
971 Err(database::Error::TorrentNotFound)
972 }
973 })
974 }
975
976 async fn insert_tag_and_get_id(&self, tag_name: &str) -> Result<i64, database::Error> {
977 query("INSERT INTO torrust_torrent_tags (name) VALUES (?)")
978 .bind(tag_name)
979 .execute(&self.pool)
980 .await
981 .map(|v| v.last_insert_rowid())
982 .map_err(|e| match e {
983 sqlx::Error::Database(err) => {
984 tracing::error!("DB error: {:?}", err);
985 if err.message().contains("UNIQUE") && err.message().contains("name") {
986 database::Error::TagAlreadyExists
987 } else {
988 database::Error::Error
989 }
990 }
991 _ => database::Error::Error,
992 })
993 }
994
995 async fn delete_tag(&self, tag_id: TagId) -> Result<(), database::Error> {
996 query("DELETE FROM torrust_torrent_tags WHERE tag_id = ?")
997 .bind(tag_id)
998 .execute(&self.pool)
999 .await
1000 .map(|_| ())
1001 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1002 }
1003
1004 async fn add_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
1005 query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
1006 .bind(torrent_id)
1007 .bind(tag_id)
1008 .execute(&self.pool)
1009 .await
1010 .map(|_| ())
1011 .map_err(|_| database::Error::Error)
1012 }
1013
1014 async fn add_torrent_tag_links(&self, torrent_id: i64, tag_ids: &[TagId]) -> Result<(), database::Error> {
1015 let mut tx = self
1016 .pool
1017 .begin()
1018 .await
1019 .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
1020
1021 for tag_id in tag_ids {
1022 query("INSERT INTO torrust_torrent_tag_links (torrent_id, tag_id) VALUES (?, ?)")
1023 .bind(torrent_id)
1024 .bind(tag_id)
1025 .execute(&mut *tx)
1026 .await
1027 .map_err(|err| database::Error::ErrorWithText(err.to_string()))?;
1028 }
1029
1030 tx.commit()
1031 .await
1032 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1033 }
1034
1035 async fn delete_torrent_tag_link(&self, torrent_id: i64, tag_id: TagId) -> Result<(), database::Error> {
1036 query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ? AND tag_id = ?")
1037 .bind(torrent_id)
1038 .bind(tag_id)
1039 .execute(&self.pool)
1040 .await
1041 .map(|_| ())
1042 .map_err(|_| database::Error::Error)
1043 }
1044
1045 async fn delete_all_torrent_tag_links(&self, torrent_id: i64) -> Result<(), database::Error> {
1046 query("DELETE FROM torrust_torrent_tag_links WHERE torrent_id = ?")
1047 .bind(torrent_id)
1048 .execute(&self.pool)
1049 .await
1050 .map(|_| ())
1051 .map_err(|err| database::Error::ErrorWithText(err.to_string()))
1052 }
1053
1054 async fn get_tag_from_name(&self, name: &str) -> Result<TorrentTag, database::Error> {
1055 query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags WHERE name = ?")
1056 .bind(name)
1057 .fetch_one(&self.pool)
1058 .await
1059 .map_err(|_| database::Error::TagNotFound)
1060 }
1061
1062 async fn get_tags(&self) -> Result<Vec<TorrentTag>, database::Error> {
1063 query_as::<_, TorrentTag>("SELECT tag_id, name FROM torrust_torrent_tags")
1064 .fetch_all(&self.pool)
1065 .await
1066 .map_err(|_| database::Error::Error)
1067 }
1068
1069 async fn get_tags_for_torrent_id(&self, torrent_id: i64) -> Result<Vec<TorrentTag>, database::Error> {
1070 query_as::<_, TorrentTag>(
1071 "SELECT torrust_torrent_tags.tag_id, torrust_torrent_tags.name
1072 FROM torrust_torrent_tags
1073 JOIN torrust_torrent_tag_links ON torrust_torrent_tags.tag_id = torrust_torrent_tag_links.tag_id
1074 WHERE torrust_torrent_tag_links.torrent_id = ?",
1075 )
1076 .bind(torrent_id)
1077 .fetch_all(&self.pool)
1078 .await
1079 .map_err(|_| database::Error::Error)
1080 }
1081
1082 async fn update_tracker_info(
1083 &self,
1084 torrent_id: i64,
1085 tracker_url: &Url,
1086 seeders: i64,
1087 leechers: i64,
1088 ) -> Result<(), database::Error> {
1089 query("REPLACE INTO torrust_torrent_tracker_stats (torrent_id, tracker_url, seeders, leechers, updated_at) VALUES ($1, $2, $3, $4, $5)")
1090 .bind(torrent_id)
1091 .bind(tracker_url.to_string())
1092 .bind(seeders)
1093 .bind(leechers)
1094 .bind(datetime_now())
1095 .execute(&self.pool)
1096 .await
1097 .map(|_| ())
1098 .map_err(|_| database::Error::TorrentNotFound)
1099 }
1100
1101 async fn delete_torrent(&self, torrent_id: i64) -> Result<(), database::Error> {
1102 query("DELETE FROM torrust_torrents WHERE torrent_id = ?")
1103 .bind(torrent_id)
1104 .execute(&self.pool)
1105 .await
1106 .map_err(|_| database::Error::Error)
1107 .and_then(|v| {
1108 if v.rows_affected() > 0 {
1109 Ok(())
1110 } else {
1111 Err(database::Error::TorrentNotFound)
1112 }
1113 })
1114 }
1115
1116 async fn delete_all_database_rows(&self) -> Result<(), database::Error> {
1117 for table in TABLES_TO_TRUNCATE {
1118 query(&format!("DELETE FROM {table};"))
1119 .execute(&self.pool)
1120 .await
1121 .map_err(|_| database::Error::Error)?;
1122 }
1123
1124 Ok(())
1125 }
1126}