Skip to main content

nzb_core/
db.rs

1use std::path::Path;
2
3use chrono::Utc;
4use rusqlite::{Connection, params};
5use tracing::info;
6
7use crate::error::NzbError;
8use crate::models::*;
9
// Schema version marker.
//
// `Database::migrate` compares the stored version against literal numbers and
// never reads this constant; presumably that is why the dead-code lint is
// silenced here.
// NOTE(review): the value 1 lags behind the highest migration `migrate`
// applies (5, or 6 with the `groups-db` feature) -- confirm whether this
// constant is still needed or should be bumped.
#[allow(dead_code)]
const SCHEMA_VERSION: u32 = 1;
12
/// Database handle for queue and history persistence.
///
/// Wraps a single `rusqlite` [`Connection`]. Use [`Database::open`] for an
/// on-disk database or [`Database::open_memory`] for an in-memory one; both
/// run the schema migrations before returning.
pub struct Database {
    /// Underlying SQLite connection, visible to the rest of the crate.
    pub(crate) conn: Connection,
}
17
18impl Database {
19    /// Open (or create) the database at the given path.
20    pub fn open(path: &Path) -> Result<Self, NzbError> {
21        let conn = Connection::open(path)?;
22
23        // Enable WAL mode for concurrent reads during downloads
24        conn.execute_batch("PRAGMA journal_mode=WAL;")?;
25        conn.execute_batch("PRAGMA foreign_keys=ON;")?;
26
27        let db = Self { conn };
28        db.migrate()?;
29        Ok(db)
30    }
31
32    /// Open an in-memory database (for testing).
33    pub fn open_memory() -> Result<Self, NzbError> {
34        let conn = Connection::open_in_memory()?;
35        conn.execute_batch("PRAGMA foreign_keys=ON;")?;
36        let db = Self { conn };
37        db.migrate()?;
38        Ok(db)
39    }
40
41    fn migrate(&self) -> Result<(), NzbError> {
42        self.conn.execute_batch(
43            "CREATE TABLE IF NOT EXISTS schema_version (
44                version INTEGER NOT NULL
45            );",
46        )?;
47
48        let version: u32 = self
49            .conn
50            .query_row(
51                "SELECT COALESCE(MAX(version), 0) FROM schema_version",
52                [],
53                |row| row.get(0),
54            )
55            .unwrap_or(0);
56
57        if version < 1 {
58            info!("Applying database migration v1");
59            self.conn.execute_batch(
60                "
61                -- Active download queue
62                CREATE TABLE IF NOT EXISTS queue (
63                    id TEXT PRIMARY KEY,
64                    name TEXT NOT NULL,
65                    category TEXT NOT NULL DEFAULT 'Default',
66                    status TEXT NOT NULL DEFAULT 'queued',
67                    priority INTEGER NOT NULL DEFAULT 1,
68                    total_bytes INTEGER NOT NULL DEFAULT 0,
69                    downloaded_bytes INTEGER NOT NULL DEFAULT 0,
70                    file_count INTEGER NOT NULL DEFAULT 0,
71                    files_completed INTEGER NOT NULL DEFAULT 0,
72                    article_count INTEGER NOT NULL DEFAULT 0,
73                    articles_downloaded INTEGER NOT NULL DEFAULT 0,
74                    articles_failed INTEGER NOT NULL DEFAULT 0,
75                    added_at TEXT NOT NULL,
76                    completed_at TEXT,
77                    work_dir TEXT NOT NULL,
78                    output_dir TEXT NOT NULL,
79                    password TEXT,
80                    error_message TEXT,
81                    -- Serialized NzbFile/Article data (bincode)
82                    job_data BLOB
83                );
84
85                CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(status);
86                CREATE INDEX IF NOT EXISTS idx_queue_priority ON queue(priority DESC, added_at ASC);
87
88                -- Completed/failed job history
89                CREATE TABLE IF NOT EXISTS history (
90                    id TEXT PRIMARY KEY,
91                    name TEXT NOT NULL,
92                    category TEXT NOT NULL DEFAULT 'Default',
93                    status TEXT NOT NULL,
94                    total_bytes INTEGER NOT NULL DEFAULT 0,
95                    downloaded_bytes INTEGER NOT NULL DEFAULT 0,
96                    added_at TEXT NOT NULL,
97                    completed_at TEXT NOT NULL,
98                    output_dir TEXT NOT NULL,
99                    stages TEXT, -- JSON array of StageResult
100                    error_message TEXT
101                );
102
103                CREATE INDEX IF NOT EXISTS idx_history_completed ON history(completed_at DESC);
104                CREATE INDEX IF NOT EXISTS idx_history_status ON history(status);
105
106                -- Server configuration (persisted separately from TOML for runtime changes)
107                CREATE TABLE IF NOT EXISTS servers (
108                    id TEXT PRIMARY KEY,
109                    config TEXT NOT NULL -- JSON ServerConfig
110                );
111
112                INSERT INTO schema_version (version) VALUES (1);
113                ",
114            )?;
115        }
116
117        if version < 2 {
118            info!("Applying database migration v2");
119            self.conn.execute_batch(
120                "
121                -- Add NZB data storage and server stats to history
122                ALTER TABLE history ADD COLUMN nzb_data BLOB;
123                ALTER TABLE history ADD COLUMN server_stats TEXT DEFAULT '[]';
124
125                -- Add server stats to queue
126                ALTER TABLE queue ADD COLUMN server_stats TEXT DEFAULT '[]';
127
128                -- Add NZB data to queue for preservation
129                ALTER TABLE queue ADD COLUMN nzb_raw BLOB;
130
131                UPDATE schema_version SET version = 2;
132                ",
133            )?;
134        }
135
136        if version < 3 {
137            info!("Applying database migration v3");
138            self.conn.execute_batch(
139                "
140                -- Per-job log storage for history
141                ALTER TABLE history ADD COLUMN job_logs TEXT DEFAULT '[]';
142
143                UPDATE schema_version SET version = 3;
144                ",
145            )?;
146        }
147
148        if version < 4 {
149            info!("Applying database migration v4");
150            self.conn.execute_batch(
151                "
152                -- RSS feed items (persistent feed cache)
153                CREATE TABLE IF NOT EXISTS rss_items (
154                    id TEXT PRIMARY KEY,
155                    feed_name TEXT NOT NULL,
156                    title TEXT NOT NULL,
157                    url TEXT,
158                    published_at TEXT,
159                    first_seen_at TEXT NOT NULL,
160                    downloaded INTEGER NOT NULL DEFAULT 0,
161                    downloaded_at TEXT,
162                    category TEXT,
163                    size_bytes INTEGER DEFAULT 0
164                );
165
166                CREATE INDEX IF NOT EXISTS idx_rss_items_feed ON rss_items(feed_name);
167                CREATE INDEX IF NOT EXISTS idx_rss_items_seen ON rss_items(first_seen_at DESC);
168
169                -- RSS download rules
170                CREATE TABLE IF NOT EXISTS rss_rules (
171                    id TEXT PRIMARY KEY,
172                    name TEXT NOT NULL,
173                    feed_name TEXT NOT NULL,
174                    category TEXT,
175                    priority INTEGER NOT NULL DEFAULT 1,
176                    match_regex TEXT NOT NULL,
177                    enabled INTEGER NOT NULL DEFAULT 1
178                );
179
180                UPDATE schema_version SET version = 4;
181                ",
182            )?;
183        }
184
185        if version < 5 {
186            info!("Applying database migration v5: settings table");
187            self.conn.execute_batch(
188                "
189                CREATE TABLE IF NOT EXISTS settings (
190                    key TEXT PRIMARY KEY,
191                    value TEXT NOT NULL
192                );
193
194                UPDATE schema_version SET version = 5;
195                ",
196            )?;
197        }
198
199        // Ensure settings table exists for databases that jumped to v5
200        // with groups tables but without settings (pre-extraction rustnzbd)
201        self.conn.execute_batch(
202            "CREATE TABLE IF NOT EXISTS settings (
203                key TEXT PRIMARY KEY,
204                value TEXT NOT NULL
205            );",
206        )?;
207
208        #[cfg(feature = "groups-db")]
209        if version < 6 {
210            info!("Applying database migration v6: newsgroup browsing");
211            self.conn.execute_batch(
212                "
213                CREATE TABLE IF NOT EXISTS groups (
214                    id INTEGER PRIMARY KEY AUTOINCREMENT,
215                    name TEXT NOT NULL UNIQUE,
216                    description TEXT,
217                    subscribed INTEGER NOT NULL DEFAULT 0,
218                    article_count INTEGER NOT NULL DEFAULT 0,
219                    first_article INTEGER NOT NULL DEFAULT 0,
220                    last_article INTEGER NOT NULL DEFAULT 0,
221                    last_scanned INTEGER NOT NULL DEFAULT 0,
222                    last_updated TEXT,
223                    created_at TEXT NOT NULL DEFAULT (datetime('now'))
224                );
225                CREATE INDEX IF NOT EXISTS idx_groups_subscribed ON groups(subscribed);
226                CREATE INDEX IF NOT EXISTS idx_groups_name ON groups(name);
227
228                CREATE TABLE IF NOT EXISTS headers (
229                    id INTEGER PRIMARY KEY AUTOINCREMENT,
230                    group_id INTEGER NOT NULL REFERENCES groups(id) ON DELETE CASCADE,
231                    article_num INTEGER NOT NULL,
232                    subject TEXT NOT NULL,
233                    author TEXT NOT NULL,
234                    date TEXT NOT NULL,
235                    message_id TEXT NOT NULL,
236                    references_ TEXT NOT NULL DEFAULT '',
237                    bytes INTEGER NOT NULL DEFAULT 0,
238                    lines INTEGER NOT NULL DEFAULT 0,
239                    read INTEGER NOT NULL DEFAULT 0,
240                    downloaded_at TEXT NOT NULL DEFAULT (datetime('now'))
241                );
242                CREATE INDEX IF NOT EXISTS idx_headers_group ON headers(group_id);
243                CREATE INDEX IF NOT EXISTS idx_headers_msgid ON headers(message_id);
244                CREATE INDEX IF NOT EXISTS idx_headers_artnum ON headers(group_id, article_num);
245
246                CREATE VIRTUAL TABLE IF NOT EXISTS headers_fts USING fts5(
247                    subject, author, content='headers', content_rowid='id',
248                    tokenize='porter unicode61'
249                );
250                CREATE TRIGGER IF NOT EXISTS headers_fts_ins AFTER INSERT ON headers BEGIN
251                    INSERT INTO headers_fts(rowid, subject, author) VALUES (new.id, new.subject, new.author);
252                END;
253                CREATE TRIGGER IF NOT EXISTS headers_fts_del AFTER DELETE ON headers BEGIN
254                    INSERT INTO headers_fts(headers_fts, rowid, subject, author) VALUES ('delete', old.id, old.subject, old.author);
255                END;
256
257                UPDATE schema_version SET version = 6;
258                ",
259            )?;
260        }
261
262        Ok(())
263    }
264
265    /// Read a setting by key.
266    pub fn get_setting(&self, key: &str) -> Option<String> {
267        self.conn
268            .query_row("SELECT value FROM settings WHERE key = ?1", [key], |row| {
269                row.get(0)
270            })
271            .ok()
272    }
273
274    /// Write a setting (upsert).
275    pub fn set_setting(&self, key: &str, value: &str) {
276        let _ = self.conn.execute(
277            "INSERT INTO settings (key, value) VALUES (?1, ?2)
278             ON CONFLICT(key) DO UPDATE SET value = ?2",
279            [key, value],
280        );
281    }
282
283    // -----------------------------------------------------------------------
284    // Queue operations
285    // -----------------------------------------------------------------------
286
287    /// Insert a new job into the queue.
288    pub fn queue_insert(&self, job: &NzbJob) -> Result<(), NzbError> {
289        self.conn.execute(
290            "INSERT INTO queue (id, name, category, status, priority, total_bytes,
291             downloaded_bytes, file_count, files_completed, article_count,
292             articles_downloaded, articles_failed, added_at, work_dir, output_dir, password)
293             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
294            params![
295                job.id,
296                job.name,
297                job.category,
298                job.status.to_string(),
299                job.priority as i32,
300                job.total_bytes as i64,
301                job.downloaded_bytes as i64,
302                job.file_count as i64,
303                job.files_completed as i64,
304                job.article_count as i64,
305                job.articles_downloaded as i64,
306                job.articles_failed as i64,
307                job.added_at.to_rfc3339(),
308                job.work_dir.to_string_lossy().to_string(),
309                job.output_dir.to_string_lossy().to_string(),
310                job.password,
311            ],
312        )?;
313        Ok(())
314    }
315
316    /// Update job progress in the queue.
317    pub fn queue_update_progress(
318        &self,
319        id: &str,
320        status: JobStatus,
321        downloaded_bytes: u64,
322        articles_downloaded: usize,
323        articles_failed: usize,
324        files_completed: usize,
325    ) -> Result<(), NzbError> {
326        self.conn.execute(
327            "UPDATE queue SET status=?2, downloaded_bytes=?3, articles_downloaded=?4,
328             articles_failed=?5, files_completed=?6 WHERE id=?1",
329            params![
330                id,
331                status.to_string(),
332                downloaded_bytes as i64,
333                articles_downloaded as i64,
334                articles_failed as i64,
335                files_completed as i64,
336            ],
337        )?;
338        Ok(())
339    }
340
341    /// Update job priority in the queue.
342    pub fn queue_update_priority(&self, id: &str, priority: i32) -> Result<(), NzbError> {
343        self.conn.execute(
344            "UPDATE queue SET priority = ?2 WHERE id = ?1",
345            params![id, priority],
346        )?;
347        Ok(())
348    }
349
350    /// Remove a job from the queue.
351    pub fn queue_remove(&self, id: &str) -> Result<(), NzbError> {
352        self.conn
353            .execute("DELETE FROM queue WHERE id=?1", params![id])?;
354        Ok(())
355    }
356
357    /// List all jobs in the queue, ordered by priority then add time.
358    pub fn queue_list(&self) -> Result<Vec<NzbJob>, NzbError> {
359        let mut stmt = self.conn.prepare(
360            "SELECT id, name, category, status, priority, total_bytes, downloaded_bytes,
361             file_count, files_completed, article_count, articles_downloaded, articles_failed,
362             added_at, completed_at, work_dir, output_dir, password, error_message
363             FROM queue ORDER BY priority DESC, added_at ASC",
364        )?;
365
366        let jobs = stmt
367            .query_map([], |row| {
368                Ok(NzbJob {
369                    id: row.get(0)?,
370                    name: row.get(1)?,
371                    category: row.get(2)?,
372                    status: parse_status(&row.get::<_, String>(3)?),
373                    priority: parse_priority(row.get::<_, i32>(4)?),
374                    total_bytes: row.get::<_, i64>(5)? as u64,
375                    downloaded_bytes: row.get::<_, i64>(6)? as u64,
376                    file_count: row.get::<_, i64>(7)? as usize,
377                    files_completed: row.get::<_, i64>(8)? as usize,
378                    article_count: row.get::<_, i64>(9)? as usize,
379                    articles_downloaded: row.get::<_, i64>(10)? as usize,
380                    articles_failed: row.get::<_, i64>(11)? as usize,
381                    added_at: parse_datetime(&row.get::<_, String>(12)?),
382                    completed_at: row
383                        .get::<_, Option<String>>(13)?
384                        .map(|s| parse_datetime(&s)),
385                    work_dir: row.get::<_, String>(14)?.into(),
386                    output_dir: row.get::<_, String>(15)?.into(),
387                    password: row.get(16)?,
388                    error_message: row.get(17)?,
389                    speed_bps: 0,
390                    server_stats: Vec::new(),
391                    files: Vec::new(), // Loaded separately
392                })
393            })?
394            .collect::<Result<Vec<_>, _>>()?;
395
396        Ok(jobs)
397    }
398
399    // -----------------------------------------------------------------------
400    // History operations
401    // -----------------------------------------------------------------------
402
403    /// Move a completed/failed job to history.
404    pub fn history_insert(&self, entry: &HistoryEntry) -> Result<(), NzbError> {
405        let stages_json = serde_json::to_string(&entry.stages).unwrap_or_default();
406        let server_stats_json = serde_json::to_string(&entry.server_stats).unwrap_or_default();
407        self.conn.execute(
408            "INSERT INTO history (id, name, category, status, total_bytes, downloaded_bytes,
409             added_at, completed_at, output_dir, stages, error_message, nzb_data, server_stats)
410             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
411            params![
412                entry.id,
413                entry.name,
414                entry.category,
415                entry.status.to_string(),
416                entry.total_bytes as i64,
417                entry.downloaded_bytes as i64,
418                entry.added_at.to_rfc3339(),
419                entry.completed_at.to_rfc3339(),
420                entry.output_dir.to_string_lossy().to_string(),
421                stages_json,
422                entry.error_message,
423                entry.nzb_data,
424                server_stats_json,
425            ],
426        )?;
427        Ok(())
428    }
429
430    /// List history entries, most recent first.
431    pub fn history_list(&self, limit: usize) -> Result<Vec<HistoryEntry>, NzbError> {
432        let mut stmt = self.conn.prepare(
433            "SELECT id, name, category, status, total_bytes, downloaded_bytes,
434             added_at, completed_at, output_dir, stages, error_message, server_stats,
435             CASE WHEN nzb_data IS NOT NULL THEN 1 ELSE 0 END as has_nzb
436             FROM history ORDER BY completed_at DESC LIMIT ?1",
437        )?;
438
439        let entries = stmt
440            .query_map(params![limit as i64], |row| {
441                let stages_json: String = row.get::<_, Option<String>>(9)?.unwrap_or_default();
442                let stages: Vec<StageResult> =
443                    serde_json::from_str(&stages_json).unwrap_or_default();
444                let stats_json: String = row.get::<_, Option<String>>(11)?.unwrap_or_default();
445                let server_stats: Vec<ServerArticleStats> =
446                    serde_json::from_str(&stats_json).unwrap_or_default();
447                let has_nzb: i64 = row.get(12)?;
448
449                Ok(HistoryEntry {
450                    id: row.get(0)?,
451                    name: row.get(1)?,
452                    category: row.get(2)?,
453                    status: parse_status(&row.get::<_, String>(3)?),
454                    total_bytes: row.get::<_, i64>(4)? as u64,
455                    downloaded_bytes: row.get::<_, i64>(5)? as u64,
456                    added_at: parse_datetime(&row.get::<_, String>(6)?),
457                    completed_at: parse_datetime(&row.get::<_, String>(7)?),
458                    output_dir: row.get::<_, String>(8)?.into(),
459                    stages,
460                    error_message: row.get(10)?,
461                    server_stats,
462                    // Don't load actual blob in list - just note if it exists
463                    nzb_data: if has_nzb != 0 { Some(Vec::new()) } else { None },
464                })
465            })?
466            .collect::<Result<Vec<_>, _>>()?;
467
468        Ok(entries)
469    }
470
471    /// Get the raw NZB data for a history entry (for retry).
472    pub fn history_get_nzb_data(&self, id: &str) -> Result<Option<Vec<u8>>, NzbError> {
473        let result = self.conn.query_row(
474            "SELECT nzb_data FROM history WHERE id = ?1",
475            params![id],
476            |row| row.get::<_, Option<Vec<u8>>>(0),
477        );
478        match result {
479            Ok(data) => Ok(data),
480            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
481            Err(e) => Err(NzbError::Database(e)),
482        }
483    }
484
485    /// Enforce history retention limit by deleting oldest entries.
486    pub fn history_enforce_retention(&self, max_entries: usize) -> Result<(), NzbError> {
487        self.conn.execute(
488            "DELETE FROM history WHERE id NOT IN (
489                SELECT id FROM history ORDER BY completed_at DESC LIMIT ?1
490            )",
491            params![max_entries as i64],
492        )?;
493        Ok(())
494    }
495
496    /// Get a single history entry by ID.
497    pub fn history_get(&self, id: &str) -> Result<Option<HistoryEntry>, NzbError> {
498        let mut stmt = self.conn.prepare(
499            "SELECT id, name, category, status, total_bytes, downloaded_bytes,
500             added_at, completed_at, output_dir, stages, error_message, server_stats
501             FROM history WHERE id = ?1",
502        )?;
503
504        let result = stmt.query_row(params![id], |row| {
505            let stages_json: String = row.get::<_, Option<String>>(9)?.unwrap_or_default();
506            let stages: Vec<StageResult> = serde_json::from_str(&stages_json).unwrap_or_default();
507            let stats_json: String = row.get::<_, Option<String>>(11)?.unwrap_or_default();
508            let server_stats: Vec<ServerArticleStats> =
509                serde_json::from_str(&stats_json).unwrap_or_default();
510
511            Ok(HistoryEntry {
512                id: row.get(0)?,
513                name: row.get(1)?,
514                category: row.get(2)?,
515                status: parse_status(&row.get::<_, String>(3)?),
516                total_bytes: row.get::<_, i64>(4)? as u64,
517                downloaded_bytes: row.get::<_, i64>(5)? as u64,
518                added_at: parse_datetime(&row.get::<_, String>(6)?),
519                completed_at: parse_datetime(&row.get::<_, String>(7)?),
520                output_dir: row.get::<_, String>(8)?.into(),
521                stages,
522                error_message: row.get(10)?,
523                server_stats,
524                nzb_data: None,
525            })
526        });
527
528        match result {
529            Ok(entry) => Ok(Some(entry)),
530            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
531            Err(e) => Err(NzbError::Database(e)),
532        }
533    }
534
535    /// Store serialized job file/article state for resume support.
536    pub fn queue_store_job_data(&self, id: &str, data: &[u8]) -> Result<(), NzbError> {
537        self.conn.execute(
538            "UPDATE queue SET job_data = ?2 WHERE id = ?1",
539            params![id, data],
540        )?;
541        Ok(())
542    }
543
544    /// Load serialized job file/article state for resume.
545    pub fn queue_load_job_data(&self, id: &str) -> Result<Option<Vec<u8>>, NzbError> {
546        let mut stmt = self
547            .conn
548            .prepare("SELECT job_data FROM queue WHERE id = ?1")?;
549        let result = stmt.query_row(params![id], |row| row.get::<_, Option<Vec<u8>>>(0))?;
550        Ok(result)
551    }
552
553    /// Store raw NZB data for a queue job.
554    pub fn queue_store_nzb_data(&self, id: &str, nzb_data: &[u8]) -> Result<(), NzbError> {
555        self.conn.execute(
556            "UPDATE queue SET nzb_raw = ?2 WHERE id = ?1",
557            params![id, nzb_data],
558        )?;
559        Ok(())
560    }
561
562    /// Get raw NZB data from a queue job.
563    pub fn queue_get_nzb_data(&self, id: &str) -> Result<Option<Vec<u8>>, NzbError> {
564        let result = self.conn.query_row(
565            "SELECT nzb_raw FROM queue WHERE id = ?1",
566            params![id],
567            |row| row.get::<_, Option<Vec<u8>>>(0),
568        );
569        match result {
570            Ok(data) => Ok(data),
571            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
572            Err(e) => Err(NzbError::Database(e)),
573        }
574    }
575
576    /// Count history entries.
577    pub fn history_count(&self) -> Result<usize, NzbError> {
578        let count: i64 = self
579            .conn
580            .query_row("SELECT COUNT(*) FROM history", [], |row| row.get(0))?;
581        Ok(count as usize)
582    }
583
584    /// Remove a history entry.
585    pub fn history_remove(&self, id: &str) -> Result<(), NzbError> {
586        self.conn
587            .execute("DELETE FROM history WHERE id=?1", params![id])?;
588        Ok(())
589    }
590
591    /// Clear all history.
592    pub fn history_clear(&self) -> Result<(), NzbError> {
593        self.conn.execute("DELETE FROM history", [])?;
594        Ok(())
595    }
596
597    /// Store per-job logs for a history entry.
598    pub fn history_store_logs(&self, id: &str, logs_json: &str) -> Result<(), NzbError> {
599        self.conn.execute(
600            "UPDATE history SET job_logs = ?2 WHERE id = ?1",
601            params![id, logs_json],
602        )?;
603        Ok(())
604    }
605
606    /// Get per-job logs for a history entry.
607    pub fn history_get_logs(&self, id: &str) -> Result<Option<String>, NzbError> {
608        let result = self.conn.query_row(
609            "SELECT job_logs FROM history WHERE id = ?1",
610            params![id],
611            |row| row.get::<_, Option<String>>(0),
612        );
613        match result {
614            Ok(data) => Ok(data),
615            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
616            Err(e) => Err(NzbError::Database(e)),
617        }
618    }
619
620    // -----------------------------------------------------------------------
621    // RSS item operations
622    // -----------------------------------------------------------------------
623
624    /// Upsert an RSS feed item (insert or ignore if already exists).
625    pub fn rss_item_upsert(&self, item: &RssItem) -> Result<(), NzbError> {
626        self.conn.execute(
627            "INSERT OR IGNORE INTO rss_items (id, feed_name, title, url, published_at,
628             first_seen_at, downloaded, downloaded_at, category, size_bytes)
629             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
630            params![
631                item.id,
632                item.feed_name,
633                item.title,
634                item.url,
635                item.published_at.map(|d| d.to_rfc3339()),
636                item.first_seen_at.to_rfc3339(),
637                item.downloaded as i32,
638                item.downloaded_at.map(|d| d.to_rfc3339()),
639                item.category,
640                item.size_bytes as i64,
641            ],
642        )?;
643        Ok(())
644    }
645
646    /// Batch upsert RSS feed items in a single transaction.
647    /// Returns the number of newly inserted items.
648    pub fn rss_items_batch_upsert(&self, items: &[RssItem]) -> Result<usize, NzbError> {
649        let tx = self.conn.unchecked_transaction()?;
650        let mut inserted = 0usize;
651        {
652            let mut stmt = tx.prepare_cached(
653                "INSERT OR IGNORE INTO rss_items (id, feed_name, title, url, published_at,
654                 first_seen_at, downloaded, downloaded_at, category, size_bytes)
655                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
656            )?;
657            for item in items {
658                let rows = stmt.execute(params![
659                    item.id,
660                    item.feed_name,
661                    item.title,
662                    item.url,
663                    item.published_at.map(|d| d.to_rfc3339()),
664                    item.first_seen_at.to_rfc3339(),
665                    item.downloaded as i32,
666                    item.downloaded_at.map(|d| d.to_rfc3339()),
667                    item.category,
668                    item.size_bytes as i64,
669                ])?;
670                if rows > 0 {
671                    inserted += 1;
672                }
673            }
674        }
675        tx.commit()?;
676        Ok(inserted)
677    }
678
679    /// Check if an RSS item ID already exists in the database.
680    pub fn rss_item_exists(&self, id: &str) -> Result<bool, NzbError> {
681        let count: i64 = self.conn.query_row(
682            "SELECT COUNT(*) FROM rss_items WHERE id = ?1",
683            params![id],
684            |row| row.get(0),
685        )?;
686        Ok(count > 0)
687    }
688
689    /// List RSS items, optionally filtered by feed name, ordered by first_seen_at DESC.
690    pub fn rss_items_list(
691        &self,
692        feed_name: Option<&str>,
693        limit: usize,
694    ) -> Result<Vec<RssItem>, NzbError> {
695        let (sql, limit_val) = if let Some(name) = feed_name {
696            let mut stmt = self.conn.prepare(
697                "SELECT id, feed_name, title, url, published_at, first_seen_at,
698                 downloaded, downloaded_at, category, size_bytes
699                 FROM rss_items WHERE feed_name = ?1
700                 ORDER BY first_seen_at DESC LIMIT ?2",
701            )?;
702            let items = stmt
703                .query_map(params![name, limit as i64], |row| self.map_rss_item(row))?
704                .collect::<Result<Vec<_>, _>>()?;
705            return Ok(items);
706        } else {
707            (
708                "SELECT id, feed_name, title, url, published_at, first_seen_at,
709                 downloaded, downloaded_at, category, size_bytes
710                 FROM rss_items ORDER BY first_seen_at DESC LIMIT ?1",
711                limit,
712            )
713        };
714        let mut stmt = self.conn.prepare(sql)?;
715        let items = stmt
716            .query_map(params![limit_val as i64], |row| self.map_rss_item(row))?
717            .collect::<Result<Vec<_>, _>>()?;
718        Ok(items)
719    }
720
721    /// Get a single RSS item by ID.
722    pub fn rss_item_get(&self, id: &str) -> Result<Option<RssItem>, NzbError> {
723        let mut stmt = self.conn.prepare(
724            "SELECT id, feed_name, title, url, published_at, first_seen_at,
725             downloaded, downloaded_at, category, size_bytes
726             FROM rss_items WHERE id = ?1",
727        )?;
728        let result = stmt.query_row(params![id], |row| self.map_rss_item(row));
729        match result {
730            Ok(item) => Ok(Some(item)),
731            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
732            Err(e) => Err(NzbError::Database(e)),
733        }
734    }
735
736    /// Mark an RSS item as downloaded.
737    pub fn rss_item_mark_downloaded(
738        &self,
739        id: &str,
740        category: Option<&str>,
741    ) -> Result<(), NzbError> {
742        self.conn.execute(
743            "UPDATE rss_items SET downloaded = 1, downloaded_at = ?2, category = ?3 WHERE id = ?1",
744            params![id, Utc::now().to_rfc3339(), category],
745        )?;
746        Ok(())
747    }
748
749    /// Count total RSS items.
750    pub fn rss_item_count(&self) -> Result<usize, NzbError> {
751        let count: i64 = self
752            .conn
753            .query_row("SELECT COUNT(*) FROM rss_items", [], |row| row.get(0))?;
754        Ok(count as usize)
755    }
756
757    /// Prune RSS items to keep only the N most recent (by first_seen_at).
758    pub fn rss_items_prune(&self, keep: usize) -> Result<usize, NzbError> {
759        let deleted = self.conn.execute(
760            "DELETE FROM rss_items WHERE id NOT IN (
761                SELECT id FROM rss_items ORDER BY first_seen_at DESC LIMIT ?1
762            )",
763            params![keep as i64],
764        )?;
765        Ok(deleted)
766    }
767
768    fn map_rss_item(&self, row: &rusqlite::Row<'_>) -> rusqlite::Result<RssItem> {
769        Ok(RssItem {
770            id: row.get(0)?,
771            feed_name: row.get(1)?,
772            title: row.get(2)?,
773            url: row.get(3)?,
774            published_at: row.get::<_, Option<String>>(4)?.map(|s| parse_datetime(&s)),
775            first_seen_at: parse_datetime(&row.get::<_, String>(5)?),
776            downloaded: row.get::<_, i32>(6)? != 0,
777            downloaded_at: row.get::<_, Option<String>>(7)?.map(|s| parse_datetime(&s)),
778            category: row.get(8)?,
779            size_bytes: row.get::<_, i64>(9)? as u64,
780        })
781    }
782
783    // -----------------------------------------------------------------------
784    // RSS rule operations
785    // -----------------------------------------------------------------------
786
787    /// Insert a new RSS download rule.
788    /// feed_names is stored as comma-separated string in the DB.
789    pub fn rss_rule_insert(&self, rule: &RssRule) -> Result<(), NzbError> {
790        let feed_names_str = rule.feed_names.join(",");
791        self.conn.execute(
792            "INSERT INTO rss_rules (id, name, feed_name, category, priority, match_regex, enabled)
793             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
794            params![
795                rule.id,
796                rule.name,
797                feed_names_str,
798                rule.category,
799                rule.priority,
800                rule.match_regex,
801                rule.enabled as i32,
802            ],
803        )?;
804        Ok(())
805    }
806
807    /// List all RSS download rules.
808    pub fn rss_rule_list(&self) -> Result<Vec<RssRule>, NzbError> {
809        let mut stmt = self.conn.prepare(
810            "SELECT id, name, feed_name, category, priority, match_regex, enabled
811             FROM rss_rules ORDER BY name ASC",
812        )?;
813        let rules = stmt
814            .query_map([], |row| {
815                let feed_names_str: String = row.get(2)?;
816                let feed_names: Vec<String> = feed_names_str
817                    .split(',')
818                    .map(|s| s.trim().to_string())
819                    .filter(|s| !s.is_empty())
820                    .collect();
821                Ok(RssRule {
822                    id: row.get(0)?,
823                    name: row.get(1)?,
824                    feed_names,
825                    category: row.get(3)?,
826                    priority: row.get(4)?,
827                    match_regex: row.get(5)?,
828                    enabled: row.get::<_, i32>(6)? != 0,
829                })
830            })?
831            .collect::<Result<Vec<_>, _>>()?;
832        Ok(rules)
833    }
834
835    /// Update an RSS download rule.
836    pub fn rss_rule_update(&self, rule: &RssRule) -> Result<(), NzbError> {
837        let feed_names_str = rule.feed_names.join(",");
838        self.conn.execute(
839            "UPDATE rss_rules SET name=?2, feed_name=?3, category=?4, priority=?5,
840             match_regex=?6, enabled=?7 WHERE id=?1",
841            params![
842                rule.id,
843                rule.name,
844                feed_names_str,
845                rule.category,
846                rule.priority,
847                rule.match_regex,
848                rule.enabled as i32,
849            ],
850        )?;
851        Ok(())
852    }
853
854    /// Delete an RSS download rule.
855    pub fn rss_rule_delete(&self, id: &str) -> Result<(), NzbError> {
856        self.conn
857            .execute("DELETE FROM rss_rules WHERE id=?1", params![id])?;
858        Ok(())
859    }
860}
861
862// ---------------------------------------------------------------------------
863// Parse helpers
864// ---------------------------------------------------------------------------
865
866fn parse_status(s: &str) -> JobStatus {
867    match s.to_lowercase().as_str() {
868        "queued" => JobStatus::Queued,
869        "downloading" => JobStatus::Downloading,
870        "paused" => JobStatus::Paused,
871        "verifying" => JobStatus::Verifying,
872        "repairing" => JobStatus::Repairing,
873        "extracting" => JobStatus::Extracting,
874        "postprocessing" => JobStatus::PostProcessing,
875        "completed" => JobStatus::Completed,
876        "failed" => JobStatus::Failed,
877        _ => JobStatus::Queued,
878    }
879}
880
881fn parse_priority(v: i32) -> Priority {
882    match v {
883        0 => Priority::Low,
884        1 => Priority::Normal,
885        2 => Priority::High,
886        3 => Priority::Force,
887        _ => Priority::Normal,
888    }
889}
890
891fn parse_datetime(s: &str) -> chrono::DateTime<Utc> {
892    chrono::DateTime::parse_from_rfc3339(s)
893        .map(|dt| dt.with_timezone(&Utc))
894        .unwrap_or_else(|_| Utc::now())
895}
896
#[cfg(test)]
mod tests {
    use super::*;

    // -----------------------------------------------------------------------
    // Fixtures
    // -----------------------------------------------------------------------

    /// Open a fresh in-memory database for a single test.
    fn mem_db() -> Database {
        Database::open_memory().unwrap()
    }

    /// Build a queued, normal-priority job; only `id` and `name` vary.
    fn make_job(id: &str, name: &str) -> NzbJob {
        NzbJob {
            id: id.into(),
            name: name.into(),
            category: "Default".into(),
            status: JobStatus::Queued,
            priority: Priority::Normal,
            total_bytes: 1_000_000,
            downloaded_bytes: 0,
            file_count: 3,
            files_completed: 0,
            article_count: 30,
            articles_downloaded: 0,
            articles_failed: 0,
            added_at: Utc::now(),
            completed_at: None,
            work_dir: "/tmp/test".into(),
            output_dir: "/downloads/test".into(),
            password: None,
            error_message: None,
            speed_bps: 0,
            server_stats: Vec::new(),
            files: Vec::new(),
        }
    }

    /// Build a completed history entry carrying one successful "Verify" stage.
    fn make_history(id: &str, name: &str) -> HistoryEntry {
        let verify_stage = StageResult {
            name: "Verify".into(),
            status: StageStatus::Success,
            message: None,
            duration_secs: 2.5,
        };
        HistoryEntry {
            id: id.into(),
            name: name.into(),
            category: "movies".into(),
            status: JobStatus::Completed,
            total_bytes: 5_000_000,
            downloaded_bytes: 5_000_000,
            added_at: Utc::now(),
            completed_at: Utc::now(),
            output_dir: "/downloads/complete".into(),
            stages: vec![verify_stage],
            error_message: None,
            server_stats: Vec::new(),
            nzb_data: None,
        }
    }

    /// Build an RSS item that has not been downloaded yet.
    fn make_rss_item(id: &str, feed: &str, title: &str) -> RssItem {
        RssItem {
            id: id.into(),
            feed_name: feed.into(),
            title: title.into(),
            url: Some("https://example.com/nzb".into()),
            published_at: Some(Utc::now()),
            first_seen_at: Utc::now(),
            downloaded: false,
            downloaded_at: None,
            category: None,
            size_bytes: 1_000_000,
        }
    }

    // -----------------------------------------------------------------------
    // Schema & migration
    // -----------------------------------------------------------------------

    #[test]
    fn test_db_create_and_migrate() {
        let db = mem_db();
        // A freshly migrated database starts with an empty queue.
        assert!(db.queue_list().unwrap().is_empty());
    }

    // -----------------------------------------------------------------------
    // Queue operations
    // -----------------------------------------------------------------------

    #[test]
    fn test_queue_insert_and_list() {
        let db = mem_db();
        db.queue_insert(&make_job("test-123", "Test Download"))
            .unwrap();

        let listed = db.queue_list().unwrap();
        assert_eq!(listed.len(), 1);
        let only = &listed[0];
        assert_eq!(only.name, "Test Download");
        assert_eq!(only.total_bytes, 1_000_000);
    }

    #[test]
    fn test_queue_update_progress() {
        let db = mem_db();
        db.queue_insert(&make_job("q1", "Job 1")).unwrap();

        db.queue_update_progress("q1", JobStatus::Downloading, 500_000, 15, 2, 1)
            .unwrap();

        let listed = db.queue_list().unwrap();
        let job = &listed[0];
        assert_eq!(job.status, JobStatus::Downloading);
        assert_eq!(job.downloaded_bytes, 500_000);
        assert_eq!(job.articles_downloaded, 15);
        assert_eq!(job.articles_failed, 2);
        assert_eq!(job.files_completed, 1);
    }

    #[test]
    fn test_queue_update_priority() {
        let db = mem_db();
        db.queue_insert(&make_job("q2", "Job 2")).unwrap();

        db.queue_update_priority("q2", 3).unwrap();

        let listed = db.queue_list().unwrap();
        assert_eq!(listed[0].priority, Priority::Force);
    }

    #[test]
    fn test_queue_remove() {
        let db = mem_db();
        db.queue_insert(&make_job("q3", "Job 3")).unwrap();
        assert_eq!(db.queue_list().unwrap().len(), 1);

        db.queue_remove("q3").unwrap();
        assert!(db.queue_list().unwrap().is_empty());
    }

    #[test]
    fn test_queue_ordering() {
        let db = mem_db();

        let mut low = make_job("low", "Low Priority");
        low.priority = Priority::Low;
        let mut high = make_job("high", "High Priority");
        high.priority = Priority::High;
        let normal = make_job("normal", "Normal Priority");

        // Inserted out of priority order on purpose.
        for job in [&low, &high, &normal] {
            db.queue_insert(job).unwrap();
        }

        let listed = db.queue_list().unwrap();
        for (position, expected_id) in ["high", "normal", "low"].iter().enumerate() {
            assert_eq!(listed[position].id, *expected_id);
        }
    }

    #[test]
    fn test_queue_store_and_load_job_data() {
        let db = mem_db();
        db.queue_insert(&make_job("jd1", "Job Data")).unwrap();

        let payload: Vec<u8> = (1..=8).collect();
        db.queue_store_job_data("jd1", &payload).unwrap();

        let round_tripped = db.queue_load_job_data("jd1").unwrap();
        assert_eq!(round_tripped, Some(payload));
    }

    #[test]
    fn test_queue_load_job_data_empty() {
        let db = mem_db();
        db.queue_insert(&make_job("jd2", "No Data")).unwrap();

        assert!(db.queue_load_job_data("jd2").unwrap().is_none());
    }

    #[test]
    fn test_queue_store_and_get_nzb_data() {
        let db = mem_db();
        db.queue_insert(&make_job("nzb1", "NZB Store")).unwrap();

        let document = b"<nzb>...</nzb>".to_vec();
        db.queue_store_nzb_data("nzb1", &document).unwrap();

        let round_tripped = db.queue_get_nzb_data("nzb1").unwrap();
        assert_eq!(round_tripped, Some(document));
    }

    #[test]
    fn test_queue_get_nzb_data_nonexistent() {
        let db = mem_db();
        assert!(db.queue_get_nzb_data("nonexistent").unwrap().is_none());
    }

    // -----------------------------------------------------------------------
    // History operations
    // -----------------------------------------------------------------------

    #[test]
    fn test_history_insert_and_list() {
        let db = mem_db();
        db.history_insert(&make_history("hist-1", "Completed Job"))
            .unwrap();

        let entries = db.history_list(10).unwrap();
        assert_eq!(entries.len(), 1);
        let only = &entries[0];
        assert_eq!(only.name, "Completed Job");
        assert_eq!(only.stages.len(), 1);
    }

    #[test]
    fn test_history_get_by_id() {
        let db = mem_db();
        db.history_insert(&make_history("h1", "History 1")).unwrap();

        let found = db.history_get("h1").unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().name, "History 1");
    }

    #[test]
    fn test_history_get_nonexistent() {
        let db = mem_db();
        assert!(db.history_get("nonexistent").unwrap().is_none());
    }

    #[test]
    fn test_history_get_nzb_data() {
        let db = mem_db();
        let mut with_nzb = make_history("h-nzb", "With NZB");
        with_nzb.nzb_data = Some(b"<nzb>data</nzb>".to_vec());
        db.history_insert(&with_nzb).unwrap();

        let stored = db.history_get_nzb_data("h-nzb").unwrap();
        assert!(stored.is_some());
        assert_eq!(stored.unwrap(), b"<nzb>data</nzb>");
    }

    #[test]
    fn test_history_get_nzb_data_nonexistent() {
        let db = mem_db();
        assert!(db.history_get_nzb_data("missing").unwrap().is_none());
    }

    #[test]
    fn test_history_count() {
        let db = mem_db();
        assert_eq!(db.history_count().unwrap(), 0);

        for (id, name) in [("hc1", "Job 1"), ("hc2", "Job 2"), ("hc3", "Job 3")] {
            db.history_insert(&make_history(id, name)).unwrap();
        }
        assert_eq!(db.history_count().unwrap(), 3);
    }

    #[test]
    fn test_history_remove() {
        let db = mem_db();
        db.history_insert(&make_history("hr1", "To Remove"))
            .unwrap();
        assert_eq!(db.history_count().unwrap(), 1);

        db.history_remove("hr1").unwrap();
        assert_eq!(db.history_count().unwrap(), 0);
    }

    #[test]
    fn test_history_clear() {
        let db = mem_db();
        for (id, name) in [("hcl1", "Job 1"), ("hcl2", "Job 2")] {
            db.history_insert(&make_history(id, name)).unwrap();
        }
        assert_eq!(db.history_count().unwrap(), 2);

        db.history_clear().unwrap();
        assert_eq!(db.history_count().unwrap(), 0);
    }

    #[test]
    fn test_history_enforce_retention() {
        let db = mem_db();
        (0..5).for_each(|i| {
            db.history_insert(&make_history(&format!("ret-{i}"), &format!("Job {i}")))
                .unwrap();
        });
        assert_eq!(db.history_count().unwrap(), 5);

        db.history_enforce_retention(3).unwrap();
        assert_eq!(db.history_count().unwrap(), 3);
    }

    #[test]
    fn test_history_store_and_get_logs() {
        let db = mem_db();
        db.history_insert(&make_history("hl1", "With Logs"))
            .unwrap();

        let log_json = r#"[{"ts":"2024-01-01","msg":"Started"}]"#;
        db.history_store_logs("hl1", log_json).unwrap();

        let round_tripped = db.history_get_logs("hl1").unwrap();
        assert_eq!(round_tripped.as_deref(), Some(log_json));
    }

    #[test]
    fn test_history_get_logs_nonexistent() {
        let db = mem_db();
        assert!(db.history_get_logs("missing").unwrap().is_none());
    }

    #[test]
    fn test_history_with_server_stats() {
        let db = mem_db();
        let mut with_stats = make_history("hss", "Stats Job");
        with_stats.server_stats = vec![ServerArticleStats {
            server_id: "srv-1".into(),
            server_name: "Provider".into(),
            articles_downloaded: 100,
            articles_failed: 5,
            bytes_downloaded: 75_000_000,
        }];
        db.history_insert(&with_stats).unwrap();

        let entries = db.history_list(10).unwrap();
        let stats = &entries[0].server_stats;
        assert_eq!(stats.len(), 1);
        assert_eq!(stats[0].server_id, "srv-1");
        assert_eq!(stats[0].articles_downloaded, 100);
    }

    // -----------------------------------------------------------------------
    // RSS item operations
    // -----------------------------------------------------------------------

    #[test]
    fn test_rss_item_upsert_and_get() {
        let db = mem_db();
        db.rss_item_upsert(&make_rss_item("rss-1", "feed-a", "Test Title"))
            .unwrap();

        let fetched = db.rss_item_get("rss-1").unwrap();
        assert!(fetched.is_some());
        let fetched = fetched.unwrap();
        assert_eq!(fetched.title, "Test Title");
        assert_eq!(fetched.feed_name, "feed-a");
        assert!(!fetched.downloaded);
    }

    #[test]
    fn test_rss_item_upsert_ignores_duplicate() {
        let db = mem_db();
        db.rss_item_upsert(&make_rss_item("dup-1", "feed-a", "Original"))
            .unwrap();

        // Same id, different title: the second write must not replace the first.
        db.rss_item_upsert(&make_rss_item("dup-1", "feed-a", "Updated"))
            .unwrap();

        let fetched = db.rss_item_get("dup-1").unwrap().unwrap();
        assert_eq!(fetched.title, "Original");
    }

    #[test]
    fn test_rss_item_exists() {
        let db = mem_db();
        assert!(!db.rss_item_exists("rss-x").unwrap());

        db.rss_item_upsert(&make_rss_item("rss-x", "feed", "Title"))
            .unwrap();
        assert!(db.rss_item_exists("rss-x").unwrap());
    }

    #[test]
    fn test_rss_items_batch_upsert() {
        let db = mem_db();
        let batch = vec![
            make_rss_item("b1", "feed-a", "Title 1"),
            make_rss_item("b2", "feed-a", "Title 2"),
            make_rss_item("b3", "feed-b", "Title 3"),
        ];

        let first_pass = db.rss_items_batch_upsert(&batch).unwrap();
        assert_eq!(first_pass, 3);
        assert_eq!(db.rss_item_count().unwrap(), 3);

        // Re-submitting the same batch inserts nothing new.
        let second_pass = db.rss_items_batch_upsert(&batch).unwrap();
        assert_eq!(second_pass, 0);
        assert_eq!(db.rss_item_count().unwrap(), 3);
    }

    #[test]
    fn test_rss_items_list_all() {
        let db = mem_db();
        db.rss_item_upsert(&make_rss_item("la1", "feed-a", "A1"))
            .unwrap();
        db.rss_item_upsert(&make_rss_item("la2", "feed-b", "B1"))
            .unwrap();

        let listed = db.rss_items_list(None, 100).unwrap();
        assert_eq!(listed.len(), 2);
    }

    #[test]
    fn test_rss_items_list_filtered() {
        let db = mem_db();
        db.rss_item_upsert(&make_rss_item("lf1", "feed-a", "A Item"))
            .unwrap();
        db.rss_item_upsert(&make_rss_item("lf2", "feed-b", "B Item"))
            .unwrap();
        db.rss_item_upsert(&make_rss_item("lf3", "feed-a", "A Item 2"))
            .unwrap();

        let listed = db.rss_items_list(Some("feed-a"), 100).unwrap();
        assert_eq!(listed.len(), 2);
        assert!(listed.iter().all(|item| item.feed_name == "feed-a"));
    }

    #[test]
    fn test_rss_item_mark_downloaded() {
        let db = mem_db();
        db.rss_item_upsert(&make_rss_item("md1", "feed", "Title"))
            .unwrap();

        db.rss_item_mark_downloaded("md1", Some("movies")).unwrap();

        let fetched = db.rss_item_get("md1").unwrap().unwrap();
        assert!(fetched.downloaded);
        assert!(fetched.downloaded_at.is_some());
        assert_eq!(fetched.category.as_deref(), Some("movies"));
    }

    #[test]
    fn test_rss_item_count() {
        let db = mem_db();
        assert_eq!(db.rss_item_count().unwrap(), 0);

        db.rss_item_upsert(&make_rss_item("c1", "f", "T1")).unwrap();
        db.rss_item_upsert(&make_rss_item("c2", "f", "T2")).unwrap();
        assert_eq!(db.rss_item_count().unwrap(), 2);
    }

    #[test]
    fn test_rss_items_prune() {
        let db = mem_db();
        (0..10).for_each(|i| {
            db.rss_item_upsert(&make_rss_item(&format!("pr-{i}"), "f", &format!("T{i}")))
                .unwrap();
        });
        assert_eq!(db.rss_item_count().unwrap(), 10);

        let removed = db.rss_items_prune(5).unwrap();
        assert_eq!(removed, 5);
        assert_eq!(db.rss_item_count().unwrap(), 5);
    }

    // -----------------------------------------------------------------------
    // RSS rule operations
    // -----------------------------------------------------------------------

    #[test]
    fn test_rss_rule_insert_and_list() {
        let db = mem_db();
        let movies_rule = RssRule {
            id: "rule-1".into(),
            name: "Movies".into(),
            feed_names: vec!["feed-a".into(), "feed-b".into()],
            category: Some("movies".into()),
            priority: 2,
            match_regex: ".*1080p.*".into(),
            enabled: true,
        };

        db.rss_rule_insert(&movies_rule).unwrap();

        let listed = db.rss_rule_list().unwrap();
        assert_eq!(listed.len(), 1);
        let stored = &listed[0];
        assert_eq!(stored.name, "Movies");
        assert_eq!(stored.feed_names, vec!["feed-a", "feed-b"]);
        assert_eq!(stored.match_regex, ".*1080p.*");
        assert!(stored.enabled);
    }

    #[test]
    fn test_rss_rule_update() {
        let db = mem_db();
        let before = RssRule {
            id: "rule-u".into(),
            name: "Original".into(),
            feed_names: vec!["feed-a".into()],
            category: None,
            priority: 1,
            match_regex: ".*".into(),
            enabled: true,
        };
        db.rss_rule_insert(&before).unwrap();

        // Same id, every other field changed.
        let after = RssRule {
            id: "rule-u".into(),
            name: "Updated".into(),
            feed_names: vec!["feed-a".into(), "feed-c".into()],
            category: Some("tv".into()),
            priority: 3,
            match_regex: ".*2160p.*".into(),
            enabled: false,
        };
        db.rss_rule_update(&after).unwrap();

        let listed = db.rss_rule_list().unwrap();
        let stored = &listed[0];
        assert_eq!(stored.name, "Updated");
        assert_eq!(stored.feed_names, vec!["feed-a", "feed-c"]);
        assert_eq!(stored.category.as_deref(), Some("tv"));
        assert!(!stored.enabled);
    }

    #[test]
    fn test_rss_rule_delete() {
        let db = mem_db();
        let doomed = RssRule {
            id: "rule-d".into(),
            name: "Delete Me".into(),
            feed_names: vec!["f".into()],
            category: None,
            priority: 1,
            match_regex: ".*".into(),
            enabled: true,
        };
        db.rss_rule_insert(&doomed).unwrap();
        assert_eq!(db.rss_rule_list().unwrap().len(), 1);

        db.rss_rule_delete("rule-d").unwrap();
        assert!(db.rss_rule_list().unwrap().is_empty());
    }

    // -----------------------------------------------------------------------
    // Settings
    // -----------------------------------------------------------------------

    #[test]
    fn test_settings_get_set() {
        let db = mem_db();
        db.set_setting("theme", "dark");
        assert_eq!(db.get_setting("theme"), Some(String::from("dark")));
    }

    #[test]
    fn test_settings_upsert() {
        let db = mem_db();
        db.set_setting("speed", "100");
        // Writing the same key again replaces the earlier value.
        db.set_setting("speed", "200");
        assert_eq!(db.get_setting("speed"), Some(String::from("200")));
    }

    #[test]
    fn test_settings_missing_key() {
        let db = mem_db();
        assert_eq!(db.get_setting("nonexistent"), None);
    }
}