qbt-clean 0.122.0

Automated rules-based cleaning of qBittorrent torrents.
use futures::StreamExt as _;
use futures::TryStreamExt as _;

/// Shared context for evaluating cleaning rules against qBittorrent torrents.
pub struct Global<Q>
where
	Q: crate::Qbt,
{
	/// Configuration: supplies the global requirement values, the rule list
	/// and the parallelism used when grouping torrents.
	pub config: crate::Config,
	/// Reference "current time": used to compute the time since a torrent's
	/// last activity and passed to the rules' fast matcher.
	pub now: std::time::SystemTime,
	/// qBittorrent client used to list torrents and to fetch a torrent's
	/// files and trackers.
	pub qbt: Q,
}

impl<Q: crate::Qbt> Global<Q> {
	/// Evaluates the configured rules against torrent `t` and reports whether
	/// it is pinned (and why) and whether it is included in the score.
	///
	/// Evaluation happens in up to two passes over `self.config.rules`, both
	/// walking the rules in reverse order:
	///
	/// 1. A fast pass that only uses `match_fast`. Rules for which the fast
	///    matcher is inconclusive (`None`) are recorded as *possible* matches.
	///    If the outcome can be decided despite them, it is returned without
	///    querying qBittorrent.
	/// 2. Otherwise all requirement state is reset, the torrent's trackers are
	///    fetched through `self.qbt`, and the rules are re-evaluated using
	///    `match_slow` wherever `match_fast` is inconclusive.
	///
	/// `force_pinned` registers an explicit pin before any rule is considered
	/// (`group` passes `true` when the torrent's group is already pinned).
	///
	/// # Errors
	///
	/// Returns an error if fetching the torrent's trackers fails.
	///
	/// # Panics
	///
	/// Panics if, after the second pass, any requirement has no definite
	/// value (the `unwrap()` calls at the end).
	/// NOTE(review): presumably unreachable because the second pass only
	/// records definite matches — confirm against `crate::Requirement`.
	pub async fn rules(
		&self,
		t: &crate::Torrent,
		force_pinned: bool,
	) -> anyhow::Result<crate::RuleResult> {
		// Time elapsed since the torrent's last activity; clamped to zero when
		// `t.last_activity` lies after `self.now`.
		let last_activity = self.now.duration_since(t.last_activity)
			.unwrap_or(std::time::Duration::ZERO);

		// One `Requirement` per criterion: a predicate that tests a configured
		// value against this torrent, plus the value from the global config.
		// NOTE(review): presumably the second argument is the baseline used
		// when no rule overrides it — confirm against `crate::Requirement::new`.
		let mut categories_allowed = crate::Requirement::new(
			|c| c.contains(&t.category),
			&self.config.categories_allowed);
		let mut last_activity_min = crate::Requirement::new(
			|&l| last_activity >= l,
			&self.config.last_activity_min);
		// Explicit pin setting; the baseline is `None` (no explicit setting).
		let mut pinned = crate::Requirement::new(
			|&v| v,
			&None);
		// NOTE(review): this minimum uses a strict `>` while the other two
		// minimums use `>=`; confirm whether a seeding time exactly equal to
		// the minimum is meant to fail the requirement.
		let mut seed_time_min = crate::Requirement::new(
			|&c| t.seeding_time > c,
			&self.config.seed_time_min);
		let mut seeder_count_min = crate::Requirement::new(
			|&s| t.seeders >= s,
			&self.config.seeder_count_min);
		// An empty list of allowed states accepts every state.
		let mut state_allowed = crate::Requirement::new(
			|s| s.is_empty() || s.contains(&t.state),
			&self.config.state_allowed);

		if force_pinned {
			pinned.add_definite_match(&Some(Some(true)));
		}

		// Whether the torrent counts towards the score; baseline is `true`.
		let mut include_in_score = crate::Requirement::new(
			|&v| v,
			&true);

		// ---- Pass 1: fast matchers only. ----
		// Rules are walked last-to-first.
		// NOTE(review): presumably so that later rules take precedence over
		// earlier ones — confirm against `crate::Requirement`.
		for rule in self.config.rules.iter().rev() {
			// A rule matters for pinning if its explicit `pin` value is
			// relevant, or — while "no explicit pin" is still a possible
			// outcome — if any of its criterion overrides is relevant.
			let pin_relevant = pinned.is_relevant(&rule.pin.map(Some))
				|| (pinned.is_possible(None) && (
					categories_allowed.is_relevant(&rule.categories_allowed)
					|| last_activity_min.is_relevant(&rule.last_activity_min)
					|| seed_time_min.is_relevant(&rule.seed_time_min)
					|| seeder_count_min.is_relevant(&rule.seeder_count_min)
					|| state_allowed.is_relevant(&rule.state_allowed)));

			// Skip rules that can influence neither the pin decision nor
			// `include_in_score`, without evaluating their matcher.
			if !(pin_relevant || include_in_score.is_relevant(&rule.include_in_score)) {
				continue
			}

			// `None` means the fast matcher could not decide (pass 2 falls
			// back to `match_slow` in that case): record the rule's values
			// as merely possible.
			let Some(r) = rule.match_.match_fast(self.now, &t) else {
				categories_allowed.add_possible_match(&rule.categories_allowed);
				last_activity_min.add_possible_match(&rule.last_activity_min);
				pinned.add_possible_match(&rule.pin.map(Some));
				seed_time_min.add_possible_match(&rule.seed_time_min);
				seeder_count_min.add_possible_match(&rule.seeder_count_min);
				state_allowed.add_possible_match(&rule.state_allowed);

				include_in_score.add_possible_match(&rule.include_in_score);

				continue;
			};

			// The rule definitely does not match this torrent.
			if !r { continue }

			// The rule definitely matches: record its values as definite.
			categories_allowed.add_definite_match(&rule.categories_allowed);
			last_activity_min.add_definite_match(&rule.last_activity_min);
			pinned.add_definite_match(&rule.pin.map(Some));
			seed_time_min.add_definite_match(&rule.seed_time_min);
			seeder_count_min.add_definite_match(&rule.seeder_count_min);
			state_allowed.add_definite_match(&rule.state_allowed);

			include_in_score.add_definite_match(&rule.include_in_score);

			// Stop early once both an explicit pin setting and
			// `include_in_score` are finalized.
			// NOTE(review): presumably "finalized" means no remaining rule
			// can change the value — confirm against `crate::Requirement`.
			if let Some(Some(pinned)) = pinned.get_finalized()
			&& let Some(include_in_score) = include_in_score.get_finalized()
			{
				return Ok(crate::RuleResult {
					include_in_score,
					pinned: pinned.then_some(crate::PinReason::Explicit),
				})
			}
		}

		// Try to settle the result from what pass 1 established. The inner
		// `if` chain yields `Some(pin reason or None)` when the pin outcome is
		// known and `None` when it is still ambiguous.
		if let Some(include_in_score) = include_in_score.get_definite()
		&& let Some(pinned) =
			if let Some(Some(pinned)) = pinned.get_definite() {
				// An explicit pin setting is definitely known.
				Some(pinned.then_some(crate::PinReason::Explicit))
			} else if pinned.is_possible(Some(false)) {
				// An explicit unpin is still possible: undecided.
				None
			} else if categories_allowed.get_definite() == Some(false) {
				// From here on, a definitely failed criterion pins the torrent.
				Some(Some(crate::PinReason::Category(t.category.clone())))
			} else if state_allowed.get_definite() == Some(false) {
				Some(Some(crate::PinReason::State(t.state)))
			} else if seed_time_min.get_definite() == Some(false) {
				Some(Some(crate::PinReason::SeedTime(t.seeding_time)))
			} else if last_activity_min.get_definite() == Some(false) {
				Some(Some(crate::PinReason::Activity(last_activity)))
			} else if seeder_count_min.get_definite() == Some(false) {
				Some(Some(crate::PinReason::Seeders(t.seeders)))
			} else if pinned.is_possible(Some(true)) {
				// An explicit pin is still possible: undecided.
				None
			} else if categories_allowed.is_possible(false)
				|| state_allowed.is_possible(false)
				|| seed_time_min.is_possible(false)
				|| last_activity_min.is_possible(false)
				|| seeder_count_min.is_possible(false)
			{
				// Some criterion might still fail: undecided.
				None
			} else {
				// Nothing can pin the torrent: definitely not pinned.
				Some(None)
			}
		{
			return Ok(crate::RuleResult {
				include_in_score,
				pinned,
			})
		}

		// ---- Pass 2: pass 1 was inconclusive. ----
		// Discard its state and redo the evaluation with tracker information.
		categories_allowed.reset();
		last_activity_min.reset();
		pinned.reset();
		seed_time_min.reset();
		seeder_count_min.reset();
		state_allowed.reset();

		if force_pinned {
			pinned.add_definite_match(&Some(Some(true)));
		}

		include_in_score.reset();

		// Only fetched on this path, i.e. when the fast pass could not decide.
		let trackers = self.qbt.torrent_trackers(t.hash).await?;

		for rule in self.config.rules.iter().rev() {
			// Same relevance test as in pass 1.
			let pin_relevant = pinned.is_relevant(&rule.pin.map(Some))
				|| (pinned.is_possible(None) && (
					categories_allowed.is_relevant(&rule.categories_allowed)
					|| last_activity_min.is_relevant(&rule.last_activity_min)
					|| seed_time_min.is_relevant(&rule.seed_time_min)
					|| seeder_count_min.is_relevant(&rule.seeder_count_min)
					|| state_allowed.is_relevant(&rule.state_allowed)));

			if !(pin_relevant || include_in_score.is_relevant(&rule.include_in_score)) {
				continue
			}

			// Use the fast matcher when it is conclusive; otherwise fall back
			// to the slow matcher, which also receives the trackers.
			if let Some(r) = rule.match_.match_fast(self.now, &t) {
				if !r {
					continue
				}
			} else if !rule.match_.match_slow(&t, &trackers) {
				continue
			}

			categories_allowed.add_definite_match(&rule.categories_allowed);
			last_activity_min.add_definite_match(&rule.last_activity_min);
			pinned.add_definite_match(&rule.pin.map(Some));
			seed_time_min.add_definite_match(&rule.seed_time_min);
			seeder_count_min.add_definite_match(&rule.seeder_count_min);
			state_allowed.add_definite_match(&rule.state_allowed);

			include_in_score.add_definite_match(&rule.include_in_score);
		}

		// An explicit pin setting wins. Otherwise the first failing criterion
		// (checked in the order category, state, seed time, activity, seeders)
		// determines the pin reason; if none fails the torrent is not pinned.
		let pinned = if let Some(pinned) = pinned.get_definite().unwrap() {
			pinned.then_some(crate::PinReason::Explicit)
		} else if categories_allowed.get_definite().unwrap() == false {
			Some(crate::PinReason::Category(t.category.clone()))
		} else if state_allowed.get_definite().unwrap() == false {
			Some(crate::PinReason::State(t.state))
		} else if seed_time_min.get_definite().unwrap() == false {
			Some(crate::PinReason::SeedTime(t.seeding_time))
		} else if last_activity_min.get_definite().unwrap() == false {
			Some(crate::PinReason::Activity(last_activity))
		} else if seeder_count_min.get_definite().unwrap() == false {
			Some(crate::PinReason::Seeders(t.seeders))
		} else {
			None
		};

		Ok(crate::RuleResult {
			include_in_score: include_in_score.get_definite().unwrap(),
			pinned,
		})
	}

	/// Fetches all torrents from qBittorrent, evaluates the rules for each of
	/// them and buckets them into groups.
	///
	/// When `filter_name` is given, only torrents whose name matches the regex
	/// are considered. Torrents whose state reports that metadata is still
	/// being downloaded are skipped.
	///
	/// Each remaining torrent is keyed by its "counted size": the sum of the
	/// sizes of those files that are at least half of the torrent's average
	/// file size (total size divided by file count). Torrents with the same
	/// counted size end up in the same `crate::Group`.
	///
	/// A group takes the pin reason of the first processed torrent that
	/// reports one; torrents processed afterwards are evaluated with
	/// `force_pinned` set. Torrents are processed concurrently, up to
	/// `self.config.parallelism` at a time, so processing order is not fixed.
	///
	/// # Errors
	///
	/// Returns an error if a qBittorrent request fails, if a torrent reports
	/// a negative total size, if a torrent reports no files, or if rule
	/// evaluation fails.
	pub async fn group(
		&self,
		filter_name: Option<&regex::Regex>,
	) -> anyhow::Result<impl Iterator<Item=crate::Group>> {
		let mut torrents = self.qbt.torrents_info().await?;

		// Without a filter every torrent is kept.
		if let Some(filter_name) = filter_name {
			torrents.retain(|t| filter_name.is_match(&t.name));
		}

		// Groups keyed by counted size. The outer (std) mutex is only held
		// while looking up or inserting an entry; the inner (tokio) mutex is
		// held across the rule evaluation of a torrent in that group.
		let groups = std::sync::Mutex::<
			std::collections::HashMap::<
				u64,
				std::sync::Arc<
					tokio::sync::Mutex<crate::Group>>>>::default();

		let () = futures::stream::iter(torrents)
			.map(async |t| -> anyhow::Result<()> {
				if t.state.is_downloading_metadata() {
					return Ok(())
				}

				let files = self.qbt.torrent_files(t.hash).await?;

				let total_size = u64::try_from(t.total_size)
					.map_err(|_| anyhow::format_err!(
						"Torrent {} {:?} has negative size {}.",
						t.hash,
						t.name,
						t.total_size))?;

				// An empty file list would otherwise cause a division by zero
				// (a panic) below; report it as a regular error instead.
				let file_count = u64::try_from(files.len())?;
				if file_count == 0 {
					anyhow::bail!(
						"Torrent {} {:?} has no files.",
						t.hash,
						t.name);
				}

				let average_size = total_size / file_count;
				let cutoff = average_size / 2;

				// Only files of at least half the average size are counted.
				let counted_size: u64 = files.into_iter()
					.map(|f| f.size)
					.filter(|&size| size >= cutoff)
					.sum();

				// The map guard is a temporary that is released before the
				// awaits below.
				let group = {
					groups
						.lock().unwrap()
						.entry(counted_size)
						.or_insert_with(||
							std::sync::Arc::new(
								crate::Group::new(counted_size).into()))
						.clone()
				};
				// Serializes the torrents of one group so each one sees the
				// pin state left by the torrents processed before it.
				let mut group = group.lock().await;

				let rules = self.rules(&t, group.pinned.is_some()).await?;
				if group.pinned.is_none() {
					group.pinned = rules.pinned;
				}

				group.torrents.push(crate::GroupTorrent {
					include_in_score: rules.include_in_score,
					torrent: t,
				});

				Ok(())
			})
			.buffer_unordered(self.config.parallelism)
			.try_collect().await?;

		Ok(groups.into_inner()?
			.into_values()
			.map(|g| {
				std::sync::Arc::into_inner(g)
					.expect("all per-torrent futures have completed, so no other references remain")
					.into_inner()
			}))
	}
}