use futures::StreamExt as _;
use futures::TryStreamExt as _;
/// Shared context used to evaluate rules against torrents and to group them.
pub struct Global<Q>
where
    Q: crate::Qbt,
{
    /// Configuration: per-setting defaults, the rule list and the
    /// parallelism used while grouping.
    pub config: crate::Config,
    /// Reference "current" time; used to compute the time since a torrent's
    /// last activity and handed to the rule matchers.
    pub now: std::time::SystemTime,
    /// Client used to list torrents and to fetch their files and trackers.
    pub qbt: Q,
}
impl<Q: crate::Qbt> Global<Q> {
/// Evaluates the configured rules against torrent `t` and reports whether it
/// is pinned (and why) and whether it is included in the score.
///
/// Rules from `self.config.rules` are visited in reverse order, in up to two
/// passes:
///
/// 1. A fast pass that relies only on `match_fast`. Rules that `match_fast`
///    cannot decide (it returns `None`) are recorded as *possible* matches.
///    If the outcome is already determined, it is returned without querying
///    the client.
/// 2. Otherwise all state is reset, the torrent's trackers are fetched with
///    `self.qbt.torrent_trackers`, and the rules are evaluated again, this
///    time using `match_slow` for the rules `match_fast` could not decide.
///
/// When no explicit pin applies, the torrent is pinned by the first failing
/// requirement in this order: category, state, seed time, last activity,
/// seeder count.
///
/// # Arguments
///
/// * `t` - the torrent to evaluate.
/// * `force_pinned` - when `true`, the explicit-pin requirement is seeded
///   with a definite `false` before each pass. NOTE(review): the name
///   suggests the opposite; `group` passes `true` once the torrent's group is
///   already pinned — confirm the intended meaning against
///   `crate::Requirement`.
///
/// # Errors
///
/// Returns an error if fetching the trackers fails.
///
/// # Panics
///
/// The second pass unwraps `get_definite()` on every requirement.
/// NOTE(review): this presumably cannot fail once only definite matches have
/// been added — confirm against `crate::Requirement`.
pub async fn rules(
&self,
t: &crate::Torrent,
force_pinned: bool,
) -> anyhow::Result<crate::RuleResult> {
// Time elapsed since the torrent's last activity; zero if that timestamp
// lies after `self.now`.
let last_activity = self.now.duration_since(t.last_activity)
.unwrap_or(std::time::Duration::ZERO);
// Each `Requirement` pairs a predicate over a setting's value with the
// global default for that setting. NOTE(review): how defaults and rule
// matches combine is defined in `crate::Requirement`, outside this block.
let mut categories_allowed = crate::Requirement::new(
|c| c.contains(&t.category),
&self.config.categories_allowed);
let mut last_activity_min = crate::Requirement::new(
|&l| last_activity >= l,
&self.config.last_activity_min);
// Explicit pin setting; the predicate passes the value through unchanged
// and the default is `None` (no explicit pin).
let mut pinned = crate::Requirement::new(
|&v| v,
&None);
// Outer `None`: no rule has supplied a pin reason yet. Inner value: the
// (optional) `pin_reason` of the rule for which `add_definite_match`
// returned `true`.
let mut pin_reason: Option<Option<String>> = None;
// NOTE(review): strict `>` here, while the other minimums use `>=` —
// confirm this is intended.
let mut seed_time_min = crate::Requirement::new(
|&c| t.seeding_time > c,
&self.config.seed_time_min);
let mut seeder_count_min = crate::Requirement::new(
|&s| t.seeders >= s,
&self.config.seeder_count_min);
// An empty list of allowed states accepts every state.
let mut state_allowed = crate::Requirement::new(
|s| s.is_empty() || s.contains(&t.state),
&self.config.state_allowed);
// Seed the explicit pin with a definite `false` when requested.
if force_pinned {
pinned.add_definite_match(&Some(Some(false)));
}
let mut include_in_score = crate::Requirement::new(
|&v| v,
&true);
// First pass: no tracker data. Rules are visited last-to-first.
for rule in self.config.rules.iter().rev() {
// A rule matters for pinning if its own `pin` is still relevant, or if an
// outcome without an explicit pin (`None`) is still possible and one of
// the rule's requirement settings is still relevant.
let pin_relevant = pinned.is_relevant(&rule.pin.map(Some))
|| (pinned.is_possible(None) && (
categories_allowed.is_relevant(&rule.categories_allowed)
|| last_activity_min.is_relevant(&rule.last_activity_min)
|| seed_time_min.is_relevant(&rule.seed_time_min)
|| seeder_count_min.is_relevant(&rule.seeder_count_min)
|| state_allowed.is_relevant(&rule.state_allowed)));
// Skip rules that cannot influence the result, without evaluating their
// matcher.
if !(pin_relevant || include_in_score.is_relevant(&rule.include_in_score)) {
continue
}
// `match_fast` returns `None` when it cannot decide; in that case record
// everything this rule sets as a possible match and move on.
let Some(r) = rule.match_.match_fast(self.now, t) else {
categories_allowed.add_possible_match(&rule.categories_allowed);
last_activity_min.add_possible_match(&rule.last_activity_min);
pinned.add_possible_match(&rule.pin.map(Some));
seed_time_min.add_possible_match(&rule.seed_time_min);
seeder_count_min.add_possible_match(&rule.seeder_count_min);
state_allowed.add_possible_match(&rule.state_allowed);
include_in_score.add_possible_match(&rule.include_in_score);
continue;
};
// The rule definitely does not apply to this torrent.
if !r { continue }
// The rule definitely applies: record its settings as definite matches.
categories_allowed.add_definite_match(&rule.categories_allowed);
last_activity_min.add_definite_match(&rule.last_activity_min);
// Keep the reason text of the rule whenever `add_definite_match` reports
// `true` for its pin.
if pinned.add_definite_match(&rule.pin.map(Some)) {
pin_reason = Some(rule.pin_reason.clone());
}
seed_time_min.add_definite_match(&rule.seed_time_min);
seeder_count_min.add_definite_match(&rule.seeder_count_min);
state_allowed.add_definite_match(&rule.state_allowed);
include_in_score.add_definite_match(&rule.include_in_score);
// Stop early once both the explicit pin and the score inclusion are
// finalized. A finalized pin of `None` (no explicit pin) does not
// qualify, because the pattern requires `Some(Some(_))`.
if let Some(Some(pinned)) = pinned.get_finalized()
&& let Some(include_in_score) = include_in_score.get_finalized()
{
return Ok(crate::RuleResult {
include_in_score,
pin_reason: pin_reason.clone().flatten(),
pinned: pinned.then_some(crate::PinReason::Explicit),
})
}
}
// The fast pass is sufficient when `include_in_score` is definite and the
// pin outcome can be derived. Every `None` produced by the chain below
// means "undetermined": fall through to the slow pass.
if let Some(include_in_score) = include_in_score.get_definite()
&& let Some((pinned, pin_reason)) =
if let Some(Some(pinned)) = pinned.get_definite() {
// A definite explicit pin (true or false) decides the outcome.
Some((pinned.then_some(crate::PinReason::Explicit), pin_reason.clone().flatten()))
} else if pinned.is_possible(Some(false)) {
// An explicit "do not pin" might still apply.
None
} else if categories_allowed.get_definite() == Some(false) {
// From here on, the first requirement that definitely fails pins the
// torrent; no rule-supplied reason text accompanies these pins.
Some((Some(crate::PinReason::Category(t.category.clone())), None))
} else if state_allowed.get_definite() == Some(false) {
Some((Some(crate::PinReason::State(t.state)), None))
} else if seed_time_min.get_definite() == Some(false) {
Some((Some(crate::PinReason::SeedTime(t.seeding_time)), None))
} else if last_activity_min.get_definite() == Some(false) {
Some((Some(crate::PinReason::Activity(last_activity)), None))
} else if seeder_count_min.get_definite() == Some(false) {
Some((Some(crate::PinReason::Seeders(t.seeders)), None))
} else if pinned.is_possible(Some(true)) {
// An explicit pin might still apply.
None
} else if categories_allowed.is_possible(false)
|| state_allowed.is_possible(false)
|| seed_time_min.is_possible(false)
|| last_activity_min.is_possible(false)
|| seeder_count_min.is_possible(false)
{
// Some requirement might still turn out to fail.
None
} else {
// Nothing can pin this torrent.
Some((None, None))
}
{
return Ok(crate::RuleResult {
include_in_score,
pin_reason,
pinned,
})
}
// Undetermined: discard the fast-pass state and start over, re-applying
// the `force_pinned` seed.
categories_allowed.reset();
last_activity_min.reset();
pinned.reset();
pin_reason = None;
seed_time_min.reset();
seeder_count_min.reset();
state_allowed.reset();
if force_pinned {
pinned.add_definite_match(&Some(Some(false)));
}
include_in_score.reset();
// The only client call in this function; it is skipped entirely when the
// fast pass was conclusive.
let trackers = self.qbt.torrent_trackers(t.hash).await?;
// Second pass: every visited rule is decided, using `match_slow` with the
// trackers whenever `match_fast` returns `None`.
for rule in self.config.rules.iter().rev() {
let pin_relevant = pinned.is_relevant(&rule.pin.map(Some))
|| (pinned.is_possible(None) && (
categories_allowed.is_relevant(&rule.categories_allowed)
|| last_activity_min.is_relevant(&rule.last_activity_min)
|| seed_time_min.is_relevant(&rule.seed_time_min)
|| seeder_count_min.is_relevant(&rule.seeder_count_min)
|| state_allowed.is_relevant(&rule.state_allowed)));
if !(pin_relevant || include_in_score.is_relevant(&rule.include_in_score)) {
continue
}
if let Some(r) = rule.match_.match_fast(self.now, t) {
if !r {
continue
}
} else if !rule.match_.match_slow(&trackers) {
continue
}
categories_allowed.add_definite_match(&rule.categories_allowed);
last_activity_min.add_definite_match(&rule.last_activity_min);
if pinned.add_definite_match(&rule.pin.map(Some)) {
pin_reason = Some(rule.pin_reason.clone());
}
seed_time_min.add_definite_match(&rule.seed_time_min);
seeder_count_min.add_definite_match(&rule.seeder_count_min);
state_allowed.add_definite_match(&rule.state_allowed);
include_in_score.add_definite_match(&rule.include_in_score);
}
// Same precedence as after the fast pass: an explicit pin first, then the
// first failing requirement. A requirement-based pin discards any
// rule-supplied reason text. NOTE(review): the `unwrap()` calls assume
// `get_definite()` is always `Some` at this point — confirm.
let pinned = if let Some(pinned) = pinned.get_definite().unwrap() {
pinned.then_some(crate::PinReason::Explicit)
} else if !categories_allowed.get_definite().unwrap() {
pin_reason = None;
Some(crate::PinReason::Category(t.category.clone()))
} else if !state_allowed.get_definite().unwrap() {
pin_reason = None;
Some(crate::PinReason::State(t.state))
} else if !seed_time_min.get_definite().unwrap() {
pin_reason = None;
Some(crate::PinReason::SeedTime(t.seeding_time))
} else if !last_activity_min.get_definite().unwrap() {
pin_reason = None;
Some(crate::PinReason::Activity(last_activity))
} else if !seeder_count_min.get_definite().unwrap() {
pin_reason = None;
Some(crate::PinReason::Seeders(t.seeders))
} else {
None
};
Ok(crate::RuleResult {
include_in_score: include_in_score.get_definite().unwrap(),
pin_reason: pin_reason.flatten(),
pinned,
})
}
pub async fn group(
&self,
filter_name: Option<®ex::Regex>,
) -> anyhow::Result<impl Iterator<Item=crate::Group>> {
let mut torrents = self.qbt.torrents_info().await?;
torrents.retain(|t| {
let Some(filter_name) = filter_name else { return true };
filter_name.is_match(&t.name)
});
let groups = std::sync::Mutex::<
std::collections::HashMap::<
u64,
std::sync::Arc<
tokio::sync::Mutex<crate::Group>>>>::default();
let () = futures::stream::iter(torrents)
.map(async |t| -> anyhow::Result<()> {
if t.state.is_downloading_metadata() {
return Ok(())
}
let files = self.qbt.torrent_files(t.hash).await?;
let total_size = u64::try_from(t.total_size)
.map_err(|_| anyhow::format_err!(
"Torrent {} {:?} has negative size {}.",
t.hash,
t.name,
t.total_size))?;
let average_size = total_size / u64::try_from(files.len())?;
let cutoff = average_size / 2;
let mut counted_size = 0;
for f in files {
if f.size >= cutoff {
counted_size += f.size;
}
}
let group = {
groups
.lock().unwrap()
.entry(counted_size)
.or_insert_with(||
std::sync::Arc::new(
crate::Group::new(counted_size).into()))
.clone()
};
let mut group = group.lock().await;
let rules = self.rules(&t, group.pinned.is_some()).await?;
if group.pinned.is_none() {
group.pinned = rules.pinned.clone();
}
group.torrents.push(crate::GroupTorrent {
include_in_score: rules.include_in_score,
pin_reason: rules.pin_reason,
pinned: rules.pinned,
torrent: t,
});
Ok(())
})
.buffer_unordered(self.config.parallelism)
.try_collect().await?;
Ok(groups.into_inner()?
.into_values()
.map(|g| {
std::sync::Arc::into_inner(g).unwrap()
.into_inner()
}))
}
}