use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use arc_swap::ArcSwap;
use chrono::Utc;
use tracing::{info, warn};
use crate::nzb_core::config::{AppConfig, RssFeedConfig};
use crate::nzb_core::models::{Priority, RssItem};
use crate::queue_manager::QueueManager;
/// Background RSS feed monitor that polls configured feeds for NZB links,
/// persists all discovered items to the database, and automatically enqueues
/// items that match download rules.
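///
/// A minimal usage sketch, assuming a Tokio runtime; how `config`,
/// `queue_manager`, and `data_dir` are constructed is application-specific,
/// and the names here are illustrative:
///
/// ```ignore
/// let monitor = RssMonitor::new(config, queue_manager, data_dir);
/// // `run` consumes the monitor and loops forever, so drive it as a
/// // background task.
/// tokio::spawn(monitor.run());
/// ```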
pub struct RssMonitor {
    config: Arc<ArcSwap<AppConfig>>,
    queue_manager: Arc<QueueManager>,
    data_dir: PathBuf,
}
impl RssMonitor {
    pub fn new(
        config: Arc<ArcSwap<AppConfig>>,
        queue_manager: Arc<QueueManager>,
        data_dir: PathBuf,
    ) -> Self {
        Self {
            config,
            queue_manager,
            data_dir,
        }
    }
    /// Migrate legacy rss_seen.json entries into the database on first run.
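    ///
    /// The legacy file deserializes into a `HashSet<String>`, i.e. a flat
    /// JSON array of item IDs (the IDs below are illustrative):
    ///
    /// ```json
    /// ["item-id-1", "item-id-2"]
    /// ```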
    fn migrate_seen_json(&self) {
        let seen_file = self.data_dir.join("rss_seen.json");
        if !seen_file.exists() {
            return;
        }
        let seen: HashSet<String> = std::fs::read_to_string(&seen_file)
            .ok()
            .and_then(|s| serde_json::from_str(&s).ok())
            .unwrap_or_default();
        if seen.is_empty() {
            let _ = std::fs::remove_file(&seen_file);
            return;
        }
        info!(
            count = seen.len(),
            "Migrating legacy rss_seen.json to database"
        );
        for id in &seen {
            let item = RssItem {
                id: id.clone(),
                feed_name: "migrated".into(),
                title: id.clone(),
                url: None,
                published_at: None,
                first_seen_at: Utc::now(),
                downloaded: true,
                downloaded_at: Some(Utc::now()),
                category: None,
                size_bytes: 0,
            };
            let _ = self.queue_manager.rss_item_upsert(&item);
        }
        // Remove the legacy file after migration
        let _ = std::fs::remove_file(&seen_file);
        info!("Legacy rss_seen.json migrated and removed");
    }
    /// Run the monitor loop forever, polling feeds at their configured intervals.
    /// Reads feed config from the shared ArcSwap on each iteration so that
    /// feeds added/removed/toggled via the API take effect without a restart.
    pub async fn run(self) {
        info!("RSS monitor started");
        // Migrate legacy seen file on first run
        self.migrate_seen_json();
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .expect("Failed to create HTTP client");
        loop {
            let cfg = self.config.load();
            let feeds = &cfg.rss_feeds;
            for feed in feeds {
                if !feed.enabled {
                    continue;
                }
                if let Err(e) = self.check_feed(&client, feed).await {
                    warn!(feed = %feed.name, error = %e, "RSS feed check failed");
                }
            }
            // Prune old items based on config
            let limit = cfg.general.rss_history_limit.unwrap_or(500);
            if let Ok(count) = self.queue_manager.rss_item_count()
                && count > limit
                && let Ok(pruned) = self.queue_manager.rss_items_prune(limit)
                && pruned > 0
            {
                info!(pruned, "Pruned old RSS items");
            }
            // Use the minimum poll interval across all enabled feeds, defaulting to 15 min
            let interval = feeds
                .iter()
                .filter(|f| f.enabled)
                .map(|f| f.poll_interval_secs)
                .min()
                .unwrap_or(900);
            drop(cfg);
            tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
        }
    }
    /// Fetch and parse a single feed, persist every discovered item, and
    /// auto-download entries that match the feed filter or a download rule.
    async fn check_feed(
        &self,
        client: &reqwest::Client,
        feed: &RssFeedConfig,
    ) -> anyhow::Result<()> {
        info!(feed = %feed.name, url = %feed.url, "Checking RSS feed");
        let response = client.get(&feed.url).send().await?;
        let body = response.bytes().await?;
        let parsed = feed_rs::parser::parse(&body[..])?;
        // Compile filter regex if provided
        let filter = feed
            .filter_regex
            .as_ref()
            .and_then(|r| regex::Regex::new(r).ok());
        // Load download rules for this feed
        let rules = self
            .queue_manager
            .rss_rule_list()
            .unwrap_or_default()
            .into_iter()
            .filter(|r| r.enabled && r.feed_names.iter().any(|n| n == &feed.name))
            .collect::<Vec<_>>();
        // Collect all items for batch insert (single DB lock)
        struct PendingItem {
            item: RssItem,
            title: String,
            nzb_url: Option<String>,
        }
        let now = Utc::now();
        let mut pending: Vec<PendingItem> = Vec::new();
        for entry in &parsed.entries {
            let title = entry
                .title
                .as_ref()
                .map(|t| t.content.clone())
                .unwrap_or_default();
            let nzb_url = Self::extract_nzb_url(entry);
            let size_bytes = entry
                .media
                .iter()
                .flat_map(|m| &m.content)
                .filter_map(|c| c.size)
                .next()
                .unwrap_or(0);
            let published_at = entry.published.or(entry.updated);
            pending.push(PendingItem {
                item: RssItem {
                    id: entry.id.clone(),
                    feed_name: feed.name.clone(),
                    title: title.clone(),
                    url: nzb_url.clone(),
                    published_at,
                    first_seen_at: now,
                    downloaded: false,
                    downloaded_at: None,
                    category: feed.category.clone(),
                    size_bytes,
                },
                title,
                nzb_url,
            });
        }
        // Batch insert all items in one transaction (single DB lock)
        let items_for_insert: Vec<RssItem> = pending.iter().map(|p| p.item.clone()).collect();
        let new_items = self
            .queue_manager
            .rss_items_batch_upsert(&items_for_insert)
            .unwrap_or(0);
        // Process auto-download candidates. The batch upsert uses INSERT OR
        // IGNORE, so items seen on earlier runs keep their state; the
        // downloaded-flag check below prevents re-enqueueing those.
        for p in &pending {
            let Some(ref url) = p.nzb_url else { continue };
            // Feed-level filter must pass (if set)
            let passes_filter = match filter {
                Some(ref re) => re.is_match(&p.title),
                None => true,
            };
            if !passes_filter {
                continue;
            }
            // Check download rules
            let matched_rule = rules.iter().find(|r| {
                regex::Regex::new(&r.match_regex)
                    .map(|re| re.is_match(&p.title))
                    .unwrap_or(false)
            });
            // Auto-download logic:
            // 1. If a download rule matches → download with rule's category/priority
            // 2. If feed has auto_download enabled (and no filter_regex) → download all
            // 3. Otherwise → don't auto-download
            let (should_download, category, priority) = if let Some(rule) = matched_rule {
                (
                    true,
                    rule.category.clone().or_else(|| feed.category.clone()),
                    rule.priority,
                )
            } else if feed.auto_download && feed.filter_regex.is_none() {
                (true, feed.category.clone(), 1)
            } else {
                (false, None, 1)
            };
            if !should_download {
                continue;
            }
            // Skip items already marked downloaded on a previous run. After
            // the batch upsert every pending item exists in the DB, so the
            // downloaded flag is the only meaningful guard here.
            if let Ok(Some(existing)) = self.queue_manager.rss_item_get(&p.item.id)
                && existing.downloaded
            {
                continue;
            }
            info!(feed = %feed.name, title = %p.title, url = %url, "Auto-downloading RSS item");
            match self
                .fetch_and_enqueue(client, url, &p.title, feed, category.as_deref(), priority)
                .await
            {
                Ok(()) => {
                    let _ = self
                        .queue_manager
                        .rss_item_mark_downloaded(&p.item.id, category.as_deref());
                    info!(title = %p.title, "RSS item enqueued successfully");
                }
                Err(e) => {
                    warn!(title = %p.title, error = %e, "Failed to enqueue RSS item");
                }
            }
        }
        if new_items > 0 {
            info!(feed = %feed.name, new_items, "RSS feed check complete");
        }
        Ok(())
    }
    /// Extract an NZB URL from a feed entry, preferring links that end in
    /// `.nzb` or carry the `application/x-nzb` media type, then media content
    /// URLs, then falling back to the entry's first link.
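    ///
    /// A minimal sketch of the precedence (the XML and URL are illustrative):
    ///
    /// ```ignore
    /// let rss = br#"<rss version="2.0"><channel><item>
    ///     <title>example</title>
    ///     <link>https://indexer.example/get/123.nzb</link>
    /// </item></channel></rss>"#;
    /// let feed = feed_rs::parser::parse(&rss[..]).unwrap();
    /// let url = RssMonitor::extract_nzb_url(&feed.entries[0]);
    /// assert_eq!(url.as_deref(), Some("https://indexer.example/get/123.nzb"));
    /// ```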
    fn extract_nzb_url(entry: &feed_rs::model::Entry) -> Option<String> {
        entry
            .links
            .iter()
            .find(|l| {
                l.href.ends_with(".nzb")
                    || l.media_type
                        .as_deref()
                        .is_some_and(|mt| mt == "application/x-nzb")
            })
            .map(|l| l.href.clone())
            .or_else(|| {
                // Check media content for NZB URLs
                entry
                    .media
                    .iter()
                    .flat_map(|m| &m.content)
                    .find(|c| c.url.as_ref().is_some_and(|u| u.as_str().ends_with(".nzb")))
                    .and_then(|c| c.url.as_ref().map(|u| u.to_string()))
            })
            .or_else(|| {
                // Fall back to first link
                entry.links.first().map(|l| l.href.clone())
            })
    }
    /// Download the NZB at `url`, parse it, and enqueue the resulting job
    /// with the given category and priority.
    async fn fetch_and_enqueue(
        &self,
        client: &reqwest::Client,
        url: &str,
        name: &str,
        feed: &RssFeedConfig,
        category: Option<&str>,
        priority: i32,
    ) -> anyhow::Result<()> {
        let response = client.get(url).send().await?;
        if !response.status().is_success() {
            anyhow::bail!("HTTP {}", response.status());
        }
        let data = response.bytes().await?;
        let mut job = crate::nzb_core::nzb_parser::parse_nzb(name, &data)?;
        if let Some(cat) = category {
            job.category = cat.to_string();
        } else if let Some(ref cat) = feed.category {
            job.category = cat.clone();
        }
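        // Map the numeric rule priority onto the queue's Priority enum;
        // unrecognized values fall back to Normal.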
        job.priority = match priority {
            0 => Priority::Low,
            2 => Priority::High,
            3 => Priority::Force,
            _ => Priority::Normal,
        };
        job.work_dir = self.queue_manager.incomplete_dir().join(&job.id);
        job.output_dir = if !job.category.is_empty() && job.category != "Default" {
            self.queue_manager
                .complete_dir()
                .join(&job.category)
                .join(&job.name)
        } else {
            self.queue_manager.complete_dir().join(&job.name)
        };
        std::fs::create_dir_all(&job.work_dir)?;
        self.queue_manager.add_job(job, Some(data.to_vec()))?;
        Ok(())
    }
}