use crate::UsenetDownloader;
use crate::config::{RssFeedConfig, RssFilter};
use crate::db::Database;
use crate::error::{Error, Result};
use chrono::{DateTime, Utc};
use regex::Regex;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// A single entry parsed from an RSS or Atom feed, normalized to a common
/// shape for filter matching and download dispatch.
#[derive(Clone, Debug)]
pub struct RssItem {
    /// Item title; empty string when the feed omits one.
    pub title: String,
    /// Entry link, if the feed provided one.
    pub link: Option<String>,
    /// Dedup key: explicit GUID, falling back to the link, then the title.
    pub guid: String,
    /// Publication time converted to UTC, when a parseable date was present.
    pub pub_date: Option<DateTime<Utc>>,
    /// Summary/description text, if present.
    pub description: Option<String>,
    /// Declared enclosure size in bytes, when the feed advertised one.
    pub size: Option<u64>,
    /// Direct URL to an NZB file, when one could be identified.
    pub nzb_url: Option<String>,
}
/// Fetches configured RSS/Atom feeds, filters their items, and hands
/// matching NZB URLs to the downloader.
pub struct RssManager {
    /// Shared HTTP client (30s timeout, custom user agent) for feed fetches.
    http_client: reqwest::Client,
    /// Persistence layer used for seen-item bookkeeping.
    db: Arc<Database>,
    /// Target for auto-downloaded NZB URLs.
    downloader: Arc<UsenetDownloader>,
    /// Feed configuration supplied at construction time.
    feeds: Vec<RssFeedConfig>,
}
impl RssManager {
/// Builds an `RssManager` with its own HTTP client.
///
/// # Errors
/// Returns [`Error::Other`] if the HTTP client cannot be constructed.
pub fn new(
    db: Arc<Database>,
    downloader: Arc<UsenetDownloader>,
    feeds: Vec<RssFeedConfig>,
) -> Result<Self> {
    // A 30-second timeout keeps a stalled feed server from hanging a poll
    // indefinitely; the user agent identifies us to feed providers.
    let client_builder = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(30))
        .user_agent("usenet-dl RSS Reader");
    let http_client = client_builder
        .build()
        .map_err(|e| Error::Other(format!("Failed to create HTTP client: {}", e)))?;

    Ok(Self { http_client, db, downloader, feeds })
}
/// Marks the manager as started. No background task is spawned here; feed
/// polling is driven by the caller.
pub fn start(&self) -> Result<()> {
    let feed_count = self.feeds.len();
    info!("RSS manager initialized with {} feeds", feed_count);
    Ok(())
}
/// Stops the manager. `start` spawns no background task, so there is
/// nothing to cancel — this only emits a log line.
pub async fn stop(&self) {
    info!("RSS manager stopped");
}
/// Fetches one feed over HTTP and parses its items.
///
/// The payload is parsed as RSS 2.0 first; if that fails, Atom is tried.
///
/// # Errors
/// Fails when the request errors, the server returns a non-success status,
/// the body cannot be read, or the payload parses as neither RSS nor Atom.
pub async fn check_feed(&self, feed_config: &RssFeedConfig) -> Result<Vec<RssItem>> {
    debug!("Checking RSS feed: {}", feed_config.url);

    let response = self
        .http_client
        .get(&feed_config.url)
        .send()
        .await
        .map_err(|e| Error::Other(format!("Failed to fetch RSS feed: {}", e)))?;

    let status = response.status();
    if !status.is_success() {
        return Err(Error::Other(format!(
            "RSS feed returned HTTP {}: {}",
            status.as_u16(),
            feed_config.url
        )));
    }

    let content = response
        .text()
        .await
        .map_err(|e| Error::Other(format!("Failed to read RSS feed content: {}", e)))?;

    // Try RSS first; keep the error so it can be reported alongside the
    // Atom error if both parsers reject the payload.
    let rss_err = match self.parse_as_rss(&content) {
        Ok(items) => {
            debug!("Successfully parsed as RSS, found {} items", items.len());
            return Ok(items);
        }
        Err(e) => e,
    };
    debug!("Failed to parse as RSS: {}, trying Atom", rss_err);

    match self.parse_as_atom(&content) {
        Ok(items) => {
            debug!("Successfully parsed as Atom, found {} items", items.len());
            Ok(items)
        }
        Err(atom_err) => Err(Error::Other(format!(
            "Failed to parse feed as RSS or Atom. RSS error: {}. Atom error: {}",
            rss_err, atom_err
        ))),
    }
}
/// Parses `content` as an RSS 2.0 channel into normalized [`RssItem`]s.
///
/// # Errors
/// Returns [`Error::Other`] when the payload is not valid RSS.
fn parse_as_rss(&self, content: &str) -> Result<Vec<RssItem>> {
    let channel = content
        .parse::<rss::Channel>()
        .map_err(|e| Error::Other(format!("RSS parse error: {}", e)))?;

    let mut items = Vec::with_capacity(channel.items().len());
    for item in channel.items() {
        // Every item needs a dedup key: prefer the GUID, then the link,
        // then the title as a last resort.
        let guid = match item.guid() {
            Some(g) => g.value().to_string(),
            None => match item.link() {
                Some(l) => l.to_string(),
                None => item.title().unwrap_or("").to_string(),
            },
        };

        // RSS publication dates are RFC 2822; unparseable dates become None.
        let pub_date = item.pub_date().and_then(|raw| {
            chrono::DateTime::parse_from_rfc2822(raw)
                .ok()
                .map(|dt| dt.with_timezone(&Utc))
        });

        // The NZB URL comes from the enclosure when present, otherwise from
        // a link pointing directly at a .nzb file.
        let enclosure = item.enclosure();
        let nzb_url = match enclosure {
            Some(enc) => Some(enc.url().to_string()),
            None => item
                .link()
                .filter(|link| link.ends_with(".nzb"))
                .map(str::to_string),
        };
        // Enclosure lengths are decimal strings; non-numeric values → None.
        let size = enclosure.and_then(|enc| enc.length().parse::<u64>().ok());

        items.push(RssItem {
            title: item.title().unwrap_or("").to_string(),
            link: item.link().map(str::to_string),
            guid,
            pub_date,
            description: item.description().map(str::to_string),
            size,
            nzb_url,
        });
    }
    Ok(items)
}
/// Parses `content` as an Atom feed into normalized [`RssItem`]s.
///
/// # Errors
/// Returns [`Error::Other`] when the payload is not valid Atom XML.
fn parse_as_atom(&self, content: &str) -> Result<Vec<RssItem>> {
    let feed = atom_syndication::Feed::read_from(content.as_bytes())
        .map_err(|e| Error::Other(format!("Atom parse error: {}", e)))?;
    let items = feed
        .entries()
        .iter()
        .map(|entry| {
            // Atom IDs are mandatory and already unique per entry.
            let guid = entry.id().to_string();
            // Prefer the optional `published` time, falling back to the
            // mandatory `updated` time. Both are already parsed
            // `DateTime<FixedOffset>` values, so convert to UTC directly
            // instead of round-tripping through an RFC 3339 string (the
            // old serialize-then-reparse dance was a needless fallible
            // detour that could never legitimately fail).
            let pub_date = Some(
                entry
                    .published()
                    .unwrap_or_else(|| entry.updated())
                    .with_timezone(&Utc),
            );
            // An NZB link is recognized by extension or MIME type.
            let nzb_url = entry
                .links()
                .iter()
                .find(|link| {
                    link.href().ends_with(".nzb")
                        || link.mime_type() == Some("application/x-nzb")
                })
                .map(|link| link.href().to_string());
            let link = entry.links().first().map(|link| link.href().to_string());
            // Sizes are only advertised on enclosure links, as decimal strings.
            let size = entry
                .links()
                .iter()
                .find(|link| link.rel() == "enclosure")
                .and_then(|link| link.length().and_then(|l| l.parse::<u64>().ok()));
            // Prefer the summary; fall back to inline content when absent.
            let description = entry.summary().map(|s| s.as_str().to_string()).or_else(|| {
                entry
                    .content()
                    .and_then(|c| c.value().map(|v| v.to_string()))
            });
            RssItem {
                title: entry.title().as_str().to_string(),
                link,
                guid,
                pub_date,
                description,
                size,
                nzb_url,
            }
        })
        .collect();
    Ok(items)
}
/// Compiles `patterns` into regexes, logging and dropping any invalid ones.
/// `kind` labels the pattern group (e.g. "include"/"exclude") in warnings.
fn compile_patterns(patterns: &[String], kind: &str) -> Vec<Regex> {
    let mut compiled = Vec::with_capacity(patterns.len());
    for pattern in patterns {
        // Cap the compiled program size so a pathological configured
        // pattern cannot consume unbounded memory.
        let built = regex::RegexBuilder::new(pattern)
            .size_limit(1024 * 1024)
            .build();
        match built {
            Ok(re) => compiled.push(re),
            Err(e) => warn!("Invalid {} regex pattern '{}': {}", kind, pattern, e),
        }
    }
    compiled
}
/// Returns `true` when `item` passes every check in `filter`.
///
/// Checks run in order: at least one include pattern must match (only when
/// includes are configured), no exclude pattern may match, size bounds
/// apply only when the item advertises a size, and the age bound applies
/// only when a publication date is known.
///
/// NOTE(review): patterns are recompiled on every call; if item volume
/// grows, consider caching compiled filters per feed.
pub fn matches_filters(&self, item: &RssItem, filter: &RssFilter) -> bool {
    // Patterns are matched against the title and description together.
    let haystack = format!(
        "{} {}",
        item.title,
        item.description.as_deref().unwrap_or("")
    );

    if !filter.include.is_empty() {
        let includes = Self::compile_patterns(&filter.include, "include");
        if !includes.iter().any(|re| re.is_match(&haystack)) {
            debug!(
                "Item '{}' rejected: no include patterns matched",
                item.title
            );
            return false;
        }
    }

    let excludes = Self::compile_patterns(&filter.exclude, "exclude");
    if let Some(re) = excludes.iter().find(|re| re.is_match(&haystack)) {
        debug!(
            "Item '{}' rejected: matched exclude pattern '{}'",
            item.title,
            re.as_str()
        );
        return false;
    }

    // Size bounds are only enforceable when the feed advertised a size.
    if let Some(size) = item.size {
        if let Some(min_size) = filter.min_size {
            if size < min_size {
                debug!(
                    "Item '{}' rejected: size {} < min {}",
                    item.title, size, min_size
                );
                return false;
            }
        }
        if let Some(max_size) = filter.max_size {
            if size > max_size {
                debug!(
                    "Item '{}' rejected: size {} > max {}",
                    item.title, size, max_size
                );
                return false;
            }
        }
    }

    if let (Some(max_age), Some(pub_date)) = (filter.max_age, item.pub_date) {
        let age = Utc::now().signed_duration_since(pub_date);
        // An unrepresentably large configured age degrades to "no limit".
        let limit = chrono::Duration::from_std(max_age).unwrap_or(chrono::Duration::MAX);
        if age > limit {
            debug!(
                "Item '{}' rejected: age {:?} > max {:?}",
                item.title, age, limit
            );
            return false;
        }
    }

    debug!("Item '{}' accepted: passed all filter checks", item.title);
    true
}
/// Processes freshly fetched `items` for one feed: de-duplicates against
/// the database, applies the feed's filters, and — when `auto_download` is
/// enabled — enqueues matching NZB URLs with the downloader.
///
/// Returns the number of items successfully handed to the downloader.
///
/// # Errors
/// Propagates database errors from seen-item bookkeeping; individual
/// download failures are logged and do not abort processing.
pub async fn process_feed_items(
    &self,
    feed_id: i64,
    feed_config: &RssFeedConfig,
    items: Vec<RssItem>,
) -> Result<usize> {
    let mut downloaded_count = 0;
    for item in items {
        // Skip anything this feed has already surfaced (keyed by GUID).
        if self.db.is_rss_item_seen(feed_id, &item.guid).await? {
            debug!("Skipping already seen item: {}", item.title);
            continue;
        }
        // No configured filters means "accept everything"; otherwise the
        // item must satisfy at least one filter.
        let matches = if feed_config.filters.is_empty() {
            true
        } else {
            feed_config
                .filters
                .iter()
                .any(|filter| self.matches_filters(&item, filter))
        };
        if !matches {
            debug!("Item '{}' did not match any filters, skipping", item.title);
            continue;
        }
        // Mark seen BEFORE attempting the download: a failed download below
        // is logged only and will not be retried on the next poll.
        self.db.mark_rss_item_seen(feed_id, &item.guid).await?;
        info!("New RSS item matched filters: {}", item.title);
        if feed_config.auto_download {
            if let Some(nzb_url) = &item.nzb_url {
                let options = crate::types::DownloadOptions {
                    category: feed_config.category.clone(),
                    destination: None,
                    post_process: None,
                    priority: feed_config.priority,
                    password: None,
                };
                // Download errors are deliberately non-fatal so one bad
                // item cannot stall the rest of the feed.
                match self.downloader.add_nzb_url(nzb_url, options).await {
                    Ok(download_id) => {
                        info!(
                            "Auto-downloaded '{}' from RSS feed (download_id: {})",
                            item.title, download_id
                        );
                        downloaded_count += 1;
                    }
                    Err(e) => {
                        warn!(
                            "Failed to auto-download '{}' from RSS feed: {}",
                            item.title, e
                        );
                    }
                }
            } else {
                debug!("Item '{}' has no NZB URL, cannot auto-download", item.title);
            }
        }
    }
    Ok(downloaded_count)
}
}
// Unit tests live in the sibling `tests` module file; unwrap/expect are
// permitted there since panicking on failure is the point of a test.
#[allow(clippy::unwrap_used, clippy::expect_used)]
#[cfg(test)]
mod tests;