use futures::StreamExt as _;
use futures::TryStreamExt as _;
/// Shared state used while evaluating rules and grouping torrents.
pub struct Global {
    /// Crate configuration: default requirements, the rule list, and the
    /// request parallelism used by [`Global::group`].
    pub config: crate::Config,

    /// HTTP client used for every request sent to the server at `qbt`.
    pub http: reqwest::Client,

    /// Reference point in time: used to compute how long ago a torrent was
    /// last active, and handed to the rules' fast matcher.
    pub now: std::time::SystemTime,

    /// Base URL that the `/api/v2/torrents/...` endpoint paths are joined
    /// onto (presumably a qBittorrent Web API endpoint — confirm).
    pub qbt: url::Url,
}
impl Global {
/// Evaluates the configured rules against torrent `t` and reports whether
/// it is pinned (and why) and whether it is included in the score.
///
/// The evaluation runs in up to two passes over `self.config.rules`, both
/// in reverse order:
///
/// 1. A fast pass that only uses `match_fast`. Rules for which
///    `match_fast` returns `None` are recorded as *possible* matches. If
///    the outcome can already be decided, it is returned without any
///    network request.
/// 2. Otherwise all requirements are reset, the torrent's trackers are
///    fetched from `/api/v2/torrents/trackers`, and the rules are
///    evaluated again, falling back to `match_slow` wherever `match_fast`
///    returns `None`.
///
/// # Parameters
///
/// * `t` - the torrent to evaluate.
/// * `force_pinned` - when `true`, a definite `Some(Some(true))` match is
///   added to the `pinned` requirement before any rule is looked at (in
///   both passes).
///
/// # Errors
///
/// Returns an error if the trackers URL cannot be built, the HTTP request
/// fails, the server answers with an error status, or the response body
/// cannot be decoded as a list of `crate::Tracker`.
///
/// # Panics
///
/// The `unwrap()` calls after the second pass panic if a requirement has
/// no definite value at that point. Presumably this cannot happen because
/// the second pass only ever adds definite matches — TODO confirm against
/// `crate::Requirement`.
pub async fn rules(
&self,
t: &crate::Torrent,
force_pinned: bool,
) -> anyhow::Result<crate::RuleResult> {
// Time elapsed since the torrent's last activity. `duration_since` fails
// when `t.last_activity` is later than `self.now`; that case is clamped
// to zero.
let last_activity = self.now.duration_since(t.last_activity)
.unwrap_or(std::time::Duration::ZERO);
// One `Requirement` per setting a rule can override. Each is built from a
// predicate over the setting's value and the default taken from the
// top-level configuration.
let mut categories_allowed = crate::Requirement::new(
|c| c.contains(&t.category),
&self.config.categories_allowed);
let mut last_activity_min = crate::Requirement::new(
|&l| last_activity >= l,
&self.config.last_activity_min);
// Explicit pin override; the default is `None` (no explicit decision).
let mut pinned = crate::Requirement::new(
|&v| v,
&None);
// NOTE(review): this comparison is strict (`>`) while the other minimums
// use `>=` — confirm that this is intentional.
let mut seed_time_min = crate::Requirement::new(
|&c| t.seeding_time > c,
&self.config.seed_time_min);
let mut seeder_count_min = crate::Requirement::new(
|&s| t.seeders >= s,
&self.config.seeder_count_min);
// An empty state list accepts every state.
let mut state_allowed = crate::Requirement::new(
|s| s.is_empty() || s.contains(&t.state),
&self.config.state_allowed);
if force_pinned {
pinned.add_definite_match(&Some(Some(true)));
}
// Torrents are included in the score unless a rule says otherwise.
let mut include_in_score = crate::Requirement::new(
|&v| v,
&true);
// ---- Pass 1: fast matching only, no network access. ----
// Rules are visited last-to-first (presumably so that later rules take
// precedence — confirm against `crate::Requirement`).
for rule in self.config.rules.iter().rev() {
// A rule matters for the pin decision if it sets `pin` itself, or if
// no explicit pin decision is certain yet (`is_possible(None)`) and it
// touches one of the requirements the pin is derived from.
let pin_relevant = pinned.is_relevant(&rule.pin.map(Some))
|| (pinned.is_possible(None) && (
categories_allowed.is_relevant(&rule.categories_allowed)
|| last_activity_min.is_relevant(&rule.last_activity_min)
|| seed_time_min.is_relevant(&rule.seed_time_min)
|| seeder_count_min.is_relevant(&rule.seeder_count_min)
|| state_allowed.is_relevant(&rule.state_allowed)));
// Skip rules that cannot influence the result; this avoids running
// their matcher at all.
if !(pin_relevant || include_in_score.is_relevant(&rule.include_in_score)) {
continue
}
// `match_fast` returns `None` when it cannot decide without the slow
// matcher; such a rule is only recorded as a possible match.
let Some(r) = rule.match_.match_fast(self.now, &t) else {
categories_allowed.add_possible_match(&rule.categories_allowed);
last_activity_min.add_possible_match(&rule.last_activity_min);
pinned.add_possible_match(&rule.pin.map(Some));
seed_time_min.add_possible_match(&rule.seed_time_min);
seeder_count_min.add_possible_match(&rule.seeder_count_min);
state_allowed.add_possible_match(&rule.state_allowed);
include_in_score.add_possible_match(&rule.include_in_score);
continue;
};
// The rule definitely does not match.
if !r { continue }
// The rule definitely matches: apply all of its overrides.
categories_allowed.add_definite_match(&rule.categories_allowed);
last_activity_min.add_definite_match(&rule.last_activity_min);
pinned.add_definite_match(&rule.pin.map(Some));
seed_time_min.add_definite_match(&rule.seed_time_min);
seeder_count_min.add_definite_match(&rule.seeder_count_min);
state_allowed.add_definite_match(&rule.state_allowed);
include_in_score.add_definite_match(&rule.include_in_score);
// Early exit: once both the explicit pin decision and the score
// inclusion are finalized, the remaining rules need not be visited.
if let Some(Some(pinned)) = pinned.get_finalized()
&& let Some(include_in_score) = include_in_score.get_finalized()
{
return Ok(crate::RuleResult {
include_in_score,
pinned: pinned.then_some(crate::PinReason::Explicit),
})
}
}
// Try to decide from the fast pass alone. The inner `if` chain yields
// `Some(result)` when the pin outcome is certain and `None` when it still
// depends on rules that could only be recorded as possible matches.
if let Some(include_in_score) = include_in_score.get_definite()
&& let Some(pinned) =
// A definite explicit decision wins: pinned explicitly, or explicitly
// not pinned.
if let Some(Some(pinned)) = pinned.get_definite() {
Some(pinned.then_some(crate::PinReason::Explicit))
// An explicit "not pinned" is still possible, so no derived reason can
// be trusted yet.
} else if pinned.is_possible(Some(false)) {
None
// A requirement that definitely fails pins the torrent. The checks run
// in a fixed order: category, state, seed time, activity, seeders.
} else if categories_allowed.get_definite() == Some(false) {
Some(Some(crate::PinReason::Category(t.category.clone())))
} else if state_allowed.get_definite() == Some(false) {
Some(Some(crate::PinReason::State(t.state)))
} else if seed_time_min.get_definite() == Some(false) {
Some(Some(crate::PinReason::SeedTime(t.seeding_time)))
} else if last_activity_min.get_definite() == Some(false) {
Some(Some(crate::PinReason::Activity(last_activity)))
} else if seeder_count_min.get_definite() == Some(false) {
Some(Some(crate::PinReason::Seeders(t.seeders)))
// No requirement definitely fails, but an explicit pin is still possible.
} else if pinned.is_possible(Some(true)) {
None
// No requirement definitely fails, but at least one might.
} else if categories_allowed.is_possible(false)
|| state_allowed.is_possible(false)
|| seed_time_min.is_possible(false)
|| last_activity_min.is_possible(false)
|| seeder_count_min.is_possible(false)
{
None
// Nothing can pin the torrent.
} else {
Some(None)
}
{
return Ok(crate::RuleResult {
include_in_score,
pinned,
})
}
// ---- Pass 2: start over, this time with tracker data available. ----
categories_allowed.reset();
last_activity_min.reset();
pinned.reset();
seed_time_min.reset();
seeder_count_min.reset();
state_allowed.reset();
// Re-apply the forced pin after the reset.
if force_pinned {
pinned.add_definite_match(&Some(Some(true)));
}
include_in_score.reset();
// Fetch the torrent's trackers; they are the extra input `match_slow`
// takes.
let trackers = self.http.get(self.qbt.join("/api/v2/torrents/trackers")?)
.query(&[
("hash", t.hash),
])
.send().await?
.error_for_status()?
.json::<Vec<crate::Tracker>>().await?;
for rule in self.config.rules.iter().rev() {
// Same relevance test as in the first pass.
let pin_relevant = pinned.is_relevant(&rule.pin.map(Some))
|| (pinned.is_possible(None) && (
categories_allowed.is_relevant(&rule.categories_allowed)
|| last_activity_min.is_relevant(&rule.last_activity_min)
|| seed_time_min.is_relevant(&rule.seed_time_min)
|| seeder_count_min.is_relevant(&rule.seeder_count_min)
|| state_allowed.is_relevant(&rule.state_allowed)));
if !(pin_relevant || include_in_score.is_relevant(&rule.include_in_score)) {
continue
}
// Prefer the fast matcher; only when it cannot decide is the slow,
// tracker-aware matcher consulted.
if let Some(r) = rule.match_.match_fast(self.now, &t) {
if !r {
continue
}
} else if !rule.match_.match_slow(&t, &trackers) {
continue
}
// Every match in this pass is definite.
categories_allowed.add_definite_match(&rule.categories_allowed);
last_activity_min.add_definite_match(&rule.last_activity_min);
pinned.add_definite_match(&rule.pin.map(Some));
seed_time_min.add_definite_match(&rule.seed_time_min);
seeder_count_min.add_definite_match(&rule.seeder_count_min);
state_allowed.add_definite_match(&rule.state_allowed);
include_in_score.add_definite_match(&rule.include_in_score);
}
// Final decision, using the same precedence as after the first pass: an
// explicit pin setting first, then the first failing requirement.
// The `unwrap()`s assume every requirement is definite by now (see
// `# Panics` above).
let pinned = if let Some(pinned) = pinned.get_definite().unwrap() {
pinned.then_some(crate::PinReason::Explicit)
} else if categories_allowed.get_definite().unwrap() == false {
Some(crate::PinReason::Category(t.category.clone()))
} else if state_allowed.get_definite().unwrap() == false {
Some(crate::PinReason::State(t.state))
} else if seed_time_min.get_definite().unwrap() == false {
Some(crate::PinReason::SeedTime(t.seeding_time))
} else if last_activity_min.get_definite().unwrap() == false {
Some(crate::PinReason::Activity(last_activity))
} else if seeder_count_min.get_definite().unwrap() == false {
Some(crate::PinReason::Seeders(t.seeders))
} else {
None
};
Ok(crate::RuleResult {
include_in_score: include_in_score.get_definite().unwrap(),
pinned,
})
}
pub async fn group(
&self,
filter_name: Option<®ex::Regex>,
) -> anyhow::Result<impl Iterator<Item=crate::Group>> {
let mut torrents = self.http.get(self.qbt.join("/api/v2/torrents/info")?)
.send().await?
.error_for_status()?
.json::<Vec<crate::Torrent>>().await?;
torrents.retain(|t| {
let Some(filter_name) = filter_name else { return true };
filter_name.is_match(&t.name)
});
let groups = std::sync::Mutex::<
std::collections::HashMap::<
u64,
std::sync::Arc<
tokio::sync::Mutex<crate::Group>>>>::default();
#[derive(Debug,serde::Deserialize)]
struct File {
size: u64,
}
let () = futures::stream::iter(torrents)
.map(async |t| -> anyhow::Result<()> {
if t.state.is_downloading_metadata() {
return Ok(())
}
let files = self.http.get(self.qbt.join("/api/v2/torrents/files")?)
.query(&[
("hash", t.hash),
])
.send().await?
.error_for_status()?
.json::<Vec<File>>().await?;
let total_size = u64::try_from(t.total_size)
.map_err(|_| anyhow::format_err!(
"Torrent {} {:?} has negative size {}.",
t.hash,
t.name,
t.total_size))?;
let average_size = total_size / u64::try_from(files.len())?;
let cutoff = average_size / 2;
let mut counted_size = 0;
for f in files {
if f.size >= cutoff {
counted_size += f.size;
}
}
let group = {
groups
.lock().unwrap()
.entry(counted_size)
.or_insert_with(||
std::sync::Arc::new(
crate::Group::new(counted_size).into()))
.clone()
};
let mut group = group.lock().await;
let rules = self.rules(&t, group.pinned.is_some()).await?;
if group.pinned.is_none() {
group.pinned = rules.pinned;
}
group.torrents.push(crate::GroupTorrent {
include_in_score: rules.include_in_score,
torrent: t,
});
Ok(())
})
.buffer_unordered(self.config.parallelism)
.try_collect().await?;
Ok(groups.into_inner()?
.into_iter()
.map(|(_, g)| {
std::sync::Arc::into_inner(g).unwrap()
.into_inner()
}))
}
}