Skip to main content

usenet_dl/db/
migrations.rs

1//! Database lifecycle and schema migrations.
2
3use crate::error::DatabaseError;
4use crate::{Error, Result};
5use sqlx::SqliteConnection;
6use sqlx::sqlite::SqlitePool;
7use std::path::Path;
8
9use super::Database;
10
11impl Database {
12    /// Create a new database connection
13    ///
14    /// Creates the database file if it doesn't exist and runs migrations.
15    pub async fn new(path: &Path) -> Result<Self> {
16        // Create parent directory if it doesn't exist
17        if let Some(parent) = path.parent() {
18            tokio::fs::create_dir_all(parent).await.map_err(|e| {
19                Error::Database(DatabaseError::ConnectionFailed(format!(
20                    "Failed to create database directory: {}",
21                    e
22                )))
23            })?;
24        }
25
26        // Connect to database with foreign key enforcement and WAL mode
27        use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
28        use std::str::FromStr;
29
30        let options = SqliteConnectOptions::from_str(&format!("sqlite:{}", path.display()))
31            .map_err(|e| {
32                Error::Database(DatabaseError::ConnectionFailed(format!(
33                    "Failed to parse database path: {}",
34                    e
35                )))
36            })?
37            .create_if_missing(true)
38            .foreign_keys(true)
39            .journal_mode(SqliteJournalMode::Wal);
40
41        let pool = SqlitePool::connect_with(options).await.map_err(|e| {
42            Error::Database(DatabaseError::ConnectionFailed(format!(
43                "Failed to connect to database: {}",
44                e
45            )))
46        })?;
47
48        let db = Self { pool };
49
50        // Run migrations
51        db.run_migrations().await?;
52
53        Ok(db)
54    }
55
56    /// Run database migrations
57    async fn run_migrations(&self) -> Result<()> {
58        let mut conn = self.pool.acquire().await.map_err(|e| {
59            Error::Database(DatabaseError::ConnectionFailed(format!(
60                "Failed to acquire connection: {}",
61                e
62            )))
63        })?;
64
65        // Create schema version table
66        sqlx::query(
67            r#"
68            CREATE TABLE IF NOT EXISTS schema_version (
69                version INTEGER PRIMARY KEY,
70                applied_at INTEGER NOT NULL
71            )
72            "#,
73        )
74        .execute(&mut *conn)
75        .await
76        .map_err(|e| {
77            Error::Database(DatabaseError::MigrationFailed(format!(
78                "Failed to create schema_version table: {}",
79                e
80            )))
81        })?;
82
83        // Check current version
84        let current_version: Option<i64> =
85            sqlx::query_scalar("SELECT MAX(version) FROM schema_version")
86                .fetch_optional(&mut *conn)
87                .await
88                .map_err(|e| {
89                    Error::Database(DatabaseError::QueryFailed(format!(
90                        "Failed to query schema version: {}",
91                        e
92                    )))
93                })?;
94
95        let current_version = current_version.unwrap_or(0);
96
97        // Apply migrations
98        if current_version < 1 {
99            Self::migrate_v1(&mut conn).await?;
100        }
101        if current_version < 2 {
102            Self::migrate_v2(&mut conn).await?;
103        }
104        if current_version < 3 {
105            Self::migrate_v3(&mut conn).await?;
106        }
107        if current_version < 4 {
108            Self::migrate_v4(&mut conn).await?;
109        }
110        if current_version < 5 {
111            Self::migrate_v5(&mut conn).await?;
112        }
113        if current_version < 6 {
114            Self::migrate_v6(&mut conn).await?;
115        }
116        if current_version < 7 {
117            Self::migrate_v7(&mut conn).await?;
118        }
119
120        Ok(())
121    }
122
123    /// Migration v1: Create initial schema
124    async fn migrate_v1(conn: &mut SqliteConnection) -> Result<()> {
125        tracing::info!("Applying database migration v1");
126
127        // Wrap migration in a transaction so partial failures don't leave the DB in a broken state
128        sqlx::query("BEGIN")
129            .execute(&mut *conn)
130            .await
131            .map_err(|e| {
132                Error::Database(DatabaseError::MigrationFailed(format!(
133                    "Failed to begin transaction: {}",
134                    e
135                )))
136            })?;
137
138        let result = async {
139            Self::create_downloads_schema(conn).await?;
140            Self::create_articles_schema(conn).await?;
141            Self::create_passwords_table(conn).await?;
142            Self::create_processed_nzbs_table(conn).await?;
143            Self::create_history_schema(conn).await?;
144            Self::record_migration(conn, 1).await?;
145            Ok::<(), Error>(())
146        }
147        .await;
148
149        match result {
150            Ok(()) => {
151                sqlx::query("COMMIT")
152                    .execute(&mut *conn)
153                    .await
154                    .map_err(|e| {
155                        Error::Database(DatabaseError::MigrationFailed(format!(
156                            "Failed to commit migration v1: {}",
157                            e
158                        )))
159                    })?;
160            }
161            Err(e) => {
162                let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
163                return Err(e);
164            }
165        }
166
167        tracing::info!("Database migration v1 complete");
168        Ok(())
169    }
170
171    /// Create downloads table and its indexes
172    async fn create_downloads_schema(conn: &mut SqliteConnection) -> Result<()> {
173        sqlx::query(
174            r#"
175            CREATE TABLE downloads (
176                id INTEGER PRIMARY KEY AUTOINCREMENT,
177                name TEXT NOT NULL,
178                nzb_path TEXT NOT NULL,
179                nzb_meta_name TEXT,
180                nzb_hash TEXT,
181                job_name TEXT,
182                category TEXT,
183                destination TEXT NOT NULL,
184                post_process INTEGER NOT NULL,
185                priority INTEGER NOT NULL DEFAULT 0,
186                status INTEGER NOT NULL DEFAULT 0,
187                progress REAL DEFAULT 0.0,
188                speed_bps INTEGER DEFAULT 0,
189                size_bytes INTEGER DEFAULT 0,
190                downloaded_bytes INTEGER DEFAULT 0,
191                error_message TEXT,
192                created_at INTEGER NOT NULL,
193                started_at INTEGER,
194                completed_at INTEGER
195            )
196            "#,
197        )
198        .execute(&mut *conn)
199        .await
200        .map_err(|e| {
201            Error::Database(DatabaseError::MigrationFailed(format!(
202                "Failed to create downloads table: {}",
203                e
204            )))
205        })?;
206
207        // Create indexes
208        sqlx::query("CREATE INDEX idx_downloads_status ON downloads(status)")
209            .execute(&mut *conn)
210            .await
211            .map_err(|e| {
212                Error::Database(DatabaseError::MigrationFailed(format!(
213                    "Failed to create index: {}",
214                    e
215                )))
216            })?;
217
218        sqlx::query(
219            "CREATE INDEX idx_downloads_priority ON downloads(priority DESC, created_at ASC)",
220        )
221        .execute(&mut *conn)
222        .await
223        .map_err(|e| {
224            Error::Database(DatabaseError::MigrationFailed(format!(
225                "Failed to create index: {}",
226                e
227            )))
228        })?;
229
230        sqlx::query("CREATE INDEX idx_downloads_nzb_hash ON downloads(nzb_hash)")
231            .execute(&mut *conn)
232            .await
233            .map_err(|e| {
234                Error::Database(DatabaseError::MigrationFailed(format!(
235                    "Failed to create index: {}",
236                    e
237                )))
238            })?;
239
240        sqlx::query("CREATE INDEX idx_downloads_job_name ON downloads(job_name)")
241            .execute(&mut *conn)
242            .await
243            .map_err(|e| {
244                Error::Database(DatabaseError::MigrationFailed(format!(
245                    "Failed to create index: {}",
246                    e
247                )))
248            })?;
249
250        Ok(())
251    }
252
253    /// Create download_articles table and its indexes
254    async fn create_articles_schema(conn: &mut SqliteConnection) -> Result<()> {
255        sqlx::query(
256            r#"
257            CREATE TABLE download_articles (
258                id INTEGER PRIMARY KEY AUTOINCREMENT,
259                download_id INTEGER NOT NULL REFERENCES downloads(id) ON DELETE CASCADE,
260                message_id TEXT NOT NULL,
261                segment_number INTEGER NOT NULL,
262                size_bytes INTEGER NOT NULL,
263                status INTEGER NOT NULL DEFAULT 0,
264                downloaded_at INTEGER,
265                UNIQUE(download_id, message_id)
266            )
267            "#,
268        )
269        .execute(&mut *conn)
270        .await
271        .map_err(|e| {
272            Error::Database(DatabaseError::MigrationFailed(format!(
273                "Failed to create download_articles table: {}",
274                e
275            )))
276        })?;
277
278        // Create indexes
279        sqlx::query("CREATE INDEX idx_articles_download ON download_articles(download_id)")
280            .execute(&mut *conn)
281            .await
282            .map_err(|e| {
283                Error::Database(DatabaseError::MigrationFailed(format!(
284                    "Failed to create index: {}",
285                    e
286                )))
287            })?;
288
289        sqlx::query("CREATE INDEX idx_articles_status ON download_articles(download_id, status)")
290            .execute(&mut *conn)
291            .await
292            .map_err(|e| {
293                Error::Database(DatabaseError::MigrationFailed(format!(
294                    "Failed to create index: {}",
295                    e
296                )))
297            })?;
298
299        Ok(())
300    }
301
302    /// Create passwords table
303    async fn create_passwords_table(conn: &mut SqliteConnection) -> Result<()> {
304        sqlx::query(
305            r#"
306            CREATE TABLE passwords (
307                download_id INTEGER PRIMARY KEY REFERENCES downloads(id) ON DELETE CASCADE,
308                correct_password TEXT NOT NULL
309            )
310            "#,
311        )
312        .execute(&mut *conn)
313        .await
314        .map_err(|e| {
315            Error::Database(DatabaseError::MigrationFailed(format!(
316                "Failed to create passwords table: {}",
317                e
318            )))
319        })?;
320
321        Ok(())
322    }
323
324    /// Create processed_nzbs table
325    async fn create_processed_nzbs_table(conn: &mut SqliteConnection) -> Result<()> {
326        sqlx::query(
327            r#"
328            CREATE TABLE processed_nzbs (
329                path TEXT PRIMARY KEY,
330                processed_at INTEGER NOT NULL
331            )
332            "#,
333        )
334        .execute(&mut *conn)
335        .await
336        .map_err(|e| {
337            Error::Database(DatabaseError::MigrationFailed(format!(
338                "Failed to create processed_nzbs table: {}",
339                e
340            )))
341        })?;
342
343        Ok(())
344    }
345
346    /// Create history table and its index
347    async fn create_history_schema(conn: &mut SqliteConnection) -> Result<()> {
348        sqlx::query(
349            r#"
350            CREATE TABLE history (
351                id INTEGER PRIMARY KEY AUTOINCREMENT,
352                name TEXT NOT NULL,
353                category TEXT,
354                destination TEXT,
355                status INTEGER NOT NULL,
356                size_bytes INTEGER,
357                download_time_secs INTEGER,
358                completed_at INTEGER NOT NULL
359            )
360            "#,
361        )
362        .execute(&mut *conn)
363        .await
364        .map_err(|e| {
365            Error::Database(DatabaseError::MigrationFailed(format!(
366                "Failed to create history table: {}",
367                e
368            )))
369        })?;
370
371        // Create index
372        sqlx::query("CREATE INDEX idx_history_completed ON history(completed_at DESC)")
373            .execute(&mut *conn)
374            .await
375            .map_err(|e| {
376                Error::Database(DatabaseError::MigrationFailed(format!(
377                    "Failed to create index: {}",
378                    e
379                )))
380            })?;
381
382        Ok(())
383    }
384
385    /// Record a migration version
386    async fn record_migration(conn: &mut SqliteConnection, version: i32) -> Result<()> {
387        let now = chrono::Utc::now().timestamp();
388        sqlx::query("INSERT INTO schema_version (version, applied_at) VALUES (?, ?)")
389            .bind(version)
390            .bind(now)
391            .execute(&mut *conn)
392            .await
393            .map_err(|e| {
394                Error::Database(DatabaseError::MigrationFailed(format!(
395                    "Failed to record migration: {}",
396                    e
397                )))
398            })?;
399
400        Ok(())
401    }
402
403    /// Migration v2: Add runtime state table for shutdown tracking
404    async fn migrate_v2(conn: &mut SqliteConnection) -> Result<()> {
405        tracing::info!("Applying database migration v2");
406
407        sqlx::query("BEGIN")
408            .execute(&mut *conn)
409            .await
410            .map_err(|e| {
411                Error::Database(DatabaseError::MigrationFailed(format!(
412                    "Failed to begin transaction: {}",
413                    e
414                )))
415            })?;
416
417        let result = async {
418            // Runtime state table for tracking clean/unclean shutdown
419            sqlx::query(
420                r#"
421                CREATE TABLE IF NOT EXISTS runtime_state (
422                    key TEXT PRIMARY KEY,
423                    value TEXT NOT NULL,
424                    updated_at INTEGER NOT NULL
425                )
426                "#,
427            )
428            .execute(&mut *conn)
429            .await
430            .map_err(|e| {
431                Error::Database(DatabaseError::MigrationFailed(format!(
432                    "Failed to create runtime_state table: {}",
433                    e
434                )))
435            })?;
436
437            // Initialize shutdown state as unclean (will be set to clean on proper startup)
438            let now = chrono::Utc::now().timestamp();
439            sqlx::query(
440                r#"
441                INSERT INTO runtime_state (key, value, updated_at)
442                VALUES ('clean_shutdown', 'false', ?)
443                "#,
444            )
445            .bind(now)
446            .execute(&mut *conn)
447            .await
448            .map_err(|e| {
449                Error::Database(DatabaseError::MigrationFailed(format!(
450                    "Failed to initialize runtime_state: {}",
451                    e
452                )))
453            })?;
454
455            // Record migration
456            Self::record_migration(conn, 2).await?;
457            Ok::<(), Error>(())
458        }
459        .await;
460
461        match result {
462            Ok(()) => {
463                sqlx::query("COMMIT")
464                    .execute(&mut *conn)
465                    .await
466                    .map_err(|e| {
467                        Error::Database(DatabaseError::MigrationFailed(format!(
468                            "Failed to commit migration v2: {}",
469                            e
470                        )))
471                    })?;
472            }
473            Err(e) => {
474                let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
475                return Err(e);
476            }
477        }
478
479        tracing::info!("Database migration v2 complete");
480        Ok(())
481    }
482
483    /// Migration v3: Add RSS feed tables
484    async fn migrate_v3(conn: &mut SqliteConnection) -> Result<()> {
485        tracing::info!("Applying database migration v3");
486
487        sqlx::query("BEGIN")
488            .execute(&mut *conn)
489            .await
490            .map_err(|e| {
491                Error::Database(DatabaseError::MigrationFailed(format!(
492                    "Failed to begin transaction: {}",
493                    e
494                )))
495            })?;
496
497        let result = async {
498            // RSS feeds table
499            sqlx::query(
500                r#"
501                CREATE TABLE IF NOT EXISTS rss_feeds (
502                    id INTEGER PRIMARY KEY AUTOINCREMENT,
503                    name TEXT NOT NULL,
504                    url TEXT NOT NULL,
505                    check_interval_secs INTEGER NOT NULL DEFAULT 900,
506                    category TEXT,
507                    auto_download INTEGER NOT NULL DEFAULT 1,
508                    priority INTEGER NOT NULL DEFAULT 0,
509                    enabled INTEGER NOT NULL DEFAULT 1,
510                    last_check INTEGER,
511                    last_error TEXT,
512                    created_at INTEGER NOT NULL
513                )
514                "#,
515            )
516            .execute(&mut *conn)
517            .await
518            .map_err(|e| {
519                Error::Database(DatabaseError::MigrationFailed(format!(
520                    "Failed to create rss_feeds table: {}",
521                    e
522                )))
523            })?;
524
525            // RSS filters table (per feed)
526            sqlx::query(
527                r#"
528                CREATE TABLE IF NOT EXISTS rss_filters (
529                    id INTEGER PRIMARY KEY AUTOINCREMENT,
530                    feed_id INTEGER NOT NULL REFERENCES rss_feeds(id) ON DELETE CASCADE,
531                    name TEXT NOT NULL,
532                    include_patterns TEXT,
533                    exclude_patterns TEXT,
534                    min_size INTEGER,
535                    max_size INTEGER,
536                    max_age_secs INTEGER
537                )
538                "#,
539            )
540            .execute(&mut *conn)
541            .await
542            .map_err(|e| {
543                Error::Database(DatabaseError::MigrationFailed(format!(
544                    "Failed to create rss_filters table: {}",
545                    e
546                )))
547            })?;
548
549            // RSS seen items table (prevent re-downloading)
550            sqlx::query(
551                r#"
552                CREATE TABLE IF NOT EXISTS rss_seen (
553                    feed_id INTEGER NOT NULL REFERENCES rss_feeds(id) ON DELETE CASCADE,
554                    guid TEXT NOT NULL,
555                    seen_at INTEGER NOT NULL,
556                    PRIMARY KEY (feed_id, guid)
557                )
558                "#,
559            )
560            .execute(&mut *conn)
561            .await
562            .map_err(|e| {
563                Error::Database(DatabaseError::MigrationFailed(format!(
564                    "Failed to create rss_seen table: {}",
565                    e
566                )))
567            })?;
568
569            // Record migration
570            Self::record_migration(conn, 3).await?;
571            Ok::<(), Error>(())
572        }
573        .await;
574
575        match result {
576            Ok(()) => {
577                sqlx::query("COMMIT")
578                    .execute(&mut *conn)
579                    .await
580                    .map_err(|e| {
581                        Error::Database(DatabaseError::MigrationFailed(format!(
582                            "Failed to commit migration v3: {}",
583                            e
584                        )))
585                    })?;
586            }
587            Err(e) => {
588                let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
589                return Err(e);
590            }
591        }
592
593        tracing::info!("Database migration v3 complete");
594        Ok(())
595    }
596
597    /// Migration v4: Add download_files table and file_index column to download_articles
598    async fn migrate_v4(conn: &mut SqliteConnection) -> Result<()> {
599        tracing::info!("Applying database migration v4");
600
601        sqlx::query("BEGIN")
602            .execute(&mut *conn)
603            .await
604            .map_err(|e| {
605                Error::Database(DatabaseError::MigrationFailed(format!(
606                    "Failed to begin transaction: {}",
607                    e
608                )))
609            })?;
610
611        let result = async {
612            // File-level metadata table for NZB files
613            sqlx::query(
614                r#"
615                CREATE TABLE download_files (
616                    id INTEGER PRIMARY KEY AUTOINCREMENT,
617                    download_id INTEGER NOT NULL REFERENCES downloads(id) ON DELETE CASCADE,
618                    file_index INTEGER NOT NULL,
619                    filename TEXT NOT NULL,
620                    subject TEXT,
621                    total_segments INTEGER NOT NULL,
622                    UNIQUE(download_id, file_index)
623                )
624                "#,
625            )
626            .execute(&mut *conn)
627            .await
628            .map_err(|e| {
629                Error::Database(DatabaseError::MigrationFailed(format!(
630                    "Failed to create download_files table: {}",
631                    e
632                )))
633            })?;
634
635            sqlx::query("CREATE INDEX idx_download_files_download ON download_files(download_id)")
636                .execute(&mut *conn)
637                .await
638                .map_err(|e| {
639                    Error::Database(DatabaseError::MigrationFailed(format!(
640                        "Failed to create index: {}",
641                        e
642                    )))
643                })?;
644
645            // Add file_index column to existing download_articles table
646            sqlx::query(
647                "ALTER TABLE download_articles ADD COLUMN file_index INTEGER NOT NULL DEFAULT 0",
648            )
649            .execute(&mut *conn)
650            .await
651            .map_err(|e| {
652                Error::Database(DatabaseError::MigrationFailed(format!(
653                    "Failed to add file_index column: {}",
654                    e
655                )))
656            })?;
657
658            // Record migration
659            Self::record_migration(conn, 4).await?;
660            Ok::<(), Error>(())
661        }
662        .await;
663
664        match result {
665            Ok(()) => {
666                sqlx::query("COMMIT")
667                    .execute(&mut *conn)
668                    .await
669                    .map_err(|e| {
670                        Error::Database(DatabaseError::MigrationFailed(format!(
671                            "Failed to commit migration v4: {}",
672                            e
673                        )))
674                    })?;
675            }
676            Err(e) => {
677                let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
678                return Err(e);
679            }
680        }
681
682        tracing::info!("Database migration v4 complete");
683        Ok(())
684    }
685
686    /// Migration v5: Add DirectUnpack support columns
687    async fn migrate_v5(conn: &mut SqliteConnection) -> Result<()> {
688        tracing::info!("Applying database migration v5");
689
690        sqlx::query("BEGIN")
691            .execute(&mut *conn)
692            .await
693            .map_err(|e| {
694                Error::Database(DatabaseError::MigrationFailed(format!(
695                    "Failed to begin transaction: {}",
696                    e
697                )))
698            })?;
699
700        let result = async {
701            // Add direct_unpack_state to downloads table
702            sqlx::query(
703                "ALTER TABLE downloads ADD COLUMN direct_unpack_state INTEGER NOT NULL DEFAULT 0",
704            )
705            .execute(&mut *conn)
706            .await
707            .map_err(|e| {
708                Error::Database(DatabaseError::MigrationFailed(format!(
709                    "Failed to add direct_unpack_state column: {}",
710                    e
711                )))
712            })?;
713
714            // Add completed flag to download_files table
715            sqlx::query(
716                "ALTER TABLE download_files ADD COLUMN completed INTEGER NOT NULL DEFAULT 0",
717            )
718            .execute(&mut *conn)
719            .await
720            .map_err(|e| {
721                Error::Database(DatabaseError::MigrationFailed(format!(
722                    "Failed to add completed column: {}",
723                    e
724                )))
725            })?;
726
727            // Add original_filename to download_files table (for DirectRename)
728            sqlx::query("ALTER TABLE download_files ADD COLUMN original_filename TEXT")
729                .execute(&mut *conn)
730                .await
731                .map_err(|e| {
732                    Error::Database(DatabaseError::MigrationFailed(format!(
733                        "Failed to add original_filename column: {}",
734                        e
735                    )))
736                })?;
737
738            // Index for efficiently finding completed files per download
739            sqlx::query(
740                "CREATE INDEX idx_download_files_completed ON download_files(download_id, completed)",
741            )
742            .execute(&mut *conn)
743            .await
744            .map_err(|e| {
745                Error::Database(DatabaseError::MigrationFailed(format!(
746                    "Failed to create index: {}",
747                    e
748                )))
749            })?;
750
751            // Record migration
752            Self::record_migration(conn, 5).await?;
753            Ok::<(), Error>(())
754        }
755        .await;
756
757        match result {
758            Ok(()) => {
759                sqlx::query("COMMIT")
760                    .execute(&mut *conn)
761                    .await
762                    .map_err(|e| {
763                        Error::Database(DatabaseError::MigrationFailed(format!(
764                            "Failed to commit migration v5: {}",
765                            e
766                        )))
767                    })?;
768            }
769            Err(e) => {
770                let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
771                return Err(e);
772            }
773        }
774
775        tracing::info!("Database migration v5 complete");
776        Ok(())
777    }
778
779    /// Migration v6: Add direct_unpack_extracted_count column
780    async fn migrate_v6(conn: &mut SqliteConnection) -> Result<()> {
781        tracing::info!("Applying database migration v6");
782
783        sqlx::query("BEGIN")
784            .execute(&mut *conn)
785            .await
786            .map_err(|e| {
787                Error::Database(DatabaseError::MigrationFailed(format!(
788                    "Failed to begin transaction: {}",
789                    e
790                )))
791            })?;
792
793        let result = async {
794            sqlx::query(
795                "ALTER TABLE downloads ADD COLUMN direct_unpack_extracted_count INTEGER NOT NULL DEFAULT 0",
796            )
797            .execute(&mut *conn)
798            .await
799            .map_err(|e| {
800                Error::Database(DatabaseError::MigrationFailed(format!(
801                    "Failed to add direct_unpack_extracted_count column: {}",
802                    e
803                )))
804            })?;
805
806            Self::record_migration(conn, 6).await?;
807            Ok::<(), Error>(())
808        }
809        .await;
810
811        match result {
812            Ok(()) => {
813                sqlx::query("COMMIT")
814                    .execute(&mut *conn)
815                    .await
816                    .map_err(|e| {
817                        Error::Database(DatabaseError::MigrationFailed(format!(
818                            "Failed to commit migration v6: {}",
819                            e
820                        )))
821                    })?;
822            }
823            Err(e) => {
824                let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
825                return Err(e);
826            }
827        }
828
829        tracing::info!("Database migration v6 complete");
830        Ok(())
831    }
832
833    /// Migration v7: Persist per-file paused state.
834    async fn migrate_v7(conn: &mut SqliteConnection) -> Result<()> {
835        tracing::info!("Applying database migration v7");
836
837        sqlx::query("BEGIN")
838            .execute(&mut *conn)
839            .await
840            .map_err(|e| {
841                Error::Database(DatabaseError::MigrationFailed(format!(
842                    "Failed to begin transaction: {}",
843                    e
844                )))
845            })?;
846
847        let result = async {
848            sqlx::query("ALTER TABLE download_files ADD COLUMN paused INTEGER NOT NULL DEFAULT 0")
849                .execute(&mut *conn)
850                .await
851                .map_err(|e| {
852                    Error::Database(DatabaseError::MigrationFailed(format!(
853                        "Failed to add paused column: {}",
854                        e
855                    )))
856                })?;
857
858            sqlx::query(
859                "CREATE INDEX idx_download_files_paused ON download_files(download_id, paused)",
860            )
861            .execute(&mut *conn)
862            .await
863            .map_err(|e| {
864                Error::Database(DatabaseError::MigrationFailed(format!(
865                    "Failed to create paused index: {}",
866                    e
867                )))
868            })?;
869
870            Self::record_migration(conn, 7).await?;
871            Ok::<(), Error>(())
872        }
873        .await;
874
875        match result {
876            Ok(()) => {
877                sqlx::query("COMMIT")
878                    .execute(&mut *conn)
879                    .await
880                    .map_err(|e| {
881                        Error::Database(DatabaseError::MigrationFailed(format!(
882                            "Failed to commit migration v7: {}",
883                            e
884                        )))
885                    })?;
886            }
887            Err(e) => {
888                let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
889                return Err(e);
890            }
891        }
892
893        tracing::info!("Database migration v7 complete");
894        Ok(())
895    }
896
897    /// Close the database connection
898    pub async fn close(self) {
899        self.pool.close().await;
900    }
901
902    /// Get the underlying connection pool
903    pub fn pool(&self) -> &SqlitePool {
904        &self.pool
905    }
906}