use std::sync::Arc;
use std::time::Instant;
use chrono::{DateTime, Utc};
use text_colorizer::Colorize;
use tracing::{debug, error, info};
use url::Url;
use super::service::{Service, TorrentInfo, TrackerAPIError};
use crate::config::Configuration;
use crate::databases::database::{self, Database};
const LOG_TARGET: &str = "Tracker Stats Importer";
pub struct StatisticsImporter {
database: Arc<Box<dyn Database>>,
tracker_service: Arc<Service>,
tracker_url: Url,
}
impl StatisticsImporter {
pub async fn new(cfg: Arc<Configuration>, tracker_service: Arc<Service>, database: Arc<Box<dyn Database>>) -> Self {
let settings = cfg.settings.read().await;
let tracker_url = settings.tracker.url.clone();
drop(settings);
Self {
database,
tracker_service,
tracker_url,
}
}
pub async fn import_all_torrents_statistics(&self) -> Result<(), database::Error> {
let torrents = self.database.get_all_torrents_compact().await?;
if torrents.is_empty() {
return Ok(());
}
info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.to_string().yellow());
let start_time = Instant::now();
for torrent in torrents {
info!(target: LOG_TARGET, "Importing torrent #{} statistics ...", torrent.torrent_id.to_string().yellow());
let ret = self.import_torrent_statistics(torrent.torrent_id, &torrent.info_hash).await;
if let Some(err) = ret.err() {
if err != TrackerAPIError::TorrentNotFound {
let message = format!(
"Error updating torrent tracker stats for torrent. Torrent: id {}; infohash {}. Error: {:?}",
torrent.torrent_id, torrent.info_hash, err
);
error!(target: "statistics_importer", "{}", message);
}
}
}
let elapsed_time = start_time.elapsed();
info!(target: LOG_TARGET, "Statistics import completed in {:.2?}", elapsed_time);
Ok(())
}
pub async fn import_torrents_statistics_not_updated_since(
&self,
datetime: DateTime<Utc>,
limit: i64,
) -> Result<(), database::Error> {
debug!(target: LOG_TARGET, "Importing torrents statistics not updated since {} limited to a maximum of {} torrents ...", datetime.to_string().yellow(), limit.to_string().yellow());
let torrents = self
.database
.get_torrents_with_stats_not_updated_since(datetime, limit)
.await?;
if torrents.is_empty() {
return Ok(());
}
info!(target: LOG_TARGET, "Importing {} torrents statistics from tracker {} ...", torrents.len().to_string().yellow(), self.tracker_url.to_string().yellow());
let info_hashes: Vec<String> = torrents.iter().map(|t| t.info_hash.clone()).collect();
let torrent_info_vec = match self.tracker_service.get_torrents_info(&info_hashes).await {
Ok(torrents_info) => torrents_info,
Err(err) => {
let message = format!("Error getting torrents tracker stats. Error: {err:?}");
error!(target: LOG_TARGET, "{}", message);
return Ok(());
}
};
for torrent in torrents {
match torrent_info_vec.iter().find(|t| t.info_hash == torrent.info_hash) {
None => {
drop(
self.database
.update_tracker_info(torrent.torrent_id, &self.tracker_url, 0, 0)
.await,
);
}
Some(torrent_info) => {
drop(
self.database
.update_tracker_info(
torrent.torrent_id,
&self.tracker_url,
torrent_info.seeders,
torrent_info.leechers,
)
.await,
);
}
}
}
Ok(())
}
pub async fn import_torrent_statistics(&self, torrent_id: i64, info_hash: &str) -> Result<TorrentInfo, TrackerAPIError> {
match self.tracker_service.get_torrent_info(info_hash).await {
Ok(torrent_info) => {
drop(
self.database
.update_tracker_info(torrent_id, &self.tracker_url, torrent_info.seeders, torrent_info.leechers)
.await,
);
Ok(torrent_info)
}
Err(err) => {
drop(self.database.update_tracker_info(torrent_id, &self.tracker_url, 0, 0).await);
Err(err)
}
}
}
}