Skip to main content

usenet_dl/db/
articles.rs

1//! Article-level tracking operations for download resume support.
2
3use crate::error::DatabaseError;
4use crate::types::DownloadId;
5use crate::{Error, Result};
6
7use super::{Article, Database, DownloadFile, NewArticle, article_status};
8
9impl Database {
10    /// Insert a single article
11    pub async fn insert_article(&self, article: &NewArticle) -> Result<i64> {
12        let result = sqlx::query(
13            r#"
14            INSERT INTO download_articles (
15                download_id, message_id, segment_number, file_index, size_bytes, status
16            ) VALUES (?, ?, ?, ?, ?, 0)
17            "#,
18        )
19        .bind(article.download_id)
20        .bind(&article.message_id)
21        .bind(article.segment_number)
22        .bind(article.file_index)
23        .bind(article.size_bytes)
24        .execute(&self.pool)
25        .await
26        .map_err(|e| {
27            Error::Database(DatabaseError::QueryFailed(format!(
28                "Failed to insert article: {}",
29                e
30            )))
31        })?;
32
33        Ok(result.last_insert_rowid())
34    }
35
36    /// Insert multiple articles in a batch (more efficient for large NZB files)
37    ///
38    /// Automatically chunks the input to stay within SQLite's bind variable limit
39    /// (5 variables per article, chunked to max 199 articles per INSERT).
40    pub async fn insert_articles_batch(&self, articles: &[NewArticle]) -> Result<()> {
41        if articles.is_empty() {
42            return Ok(());
43        }
44
45        // SQLite default SQLITE_MAX_VARIABLE_NUMBER is 999.
46        // Each article uses 6 bind variables, so max 166 articles per batch.
47        const MAX_ARTICLES_PER_BATCH: usize = 166;
48
49        for chunk in articles.chunks(MAX_ARTICLES_PER_BATCH) {
50            let mut query_builder = sqlx::QueryBuilder::new(
51                "INSERT INTO download_articles (download_id, message_id, segment_number, file_index, size_bytes, status) ",
52            );
53
54            query_builder.push_values(chunk, |mut b, article| {
55                b.push_bind(article.download_id)
56                    .push_bind(&article.message_id)
57                    .push_bind(article.segment_number)
58                    .push_bind(article.file_index)
59                    .push_bind(article.size_bytes)
60                    .push_bind(0); // status = PENDING
61            });
62
63            let query = query_builder.build();
64            query.execute(&self.pool).await.map_err(|e| {
65                Error::Database(DatabaseError::QueryFailed(format!(
66                    "Failed to insert articles batch: {}",
67                    e
68                )))
69            })?;
70        }
71
72        Ok(())
73    }
74
75    /// Update article status
76    pub async fn update_article_status(&self, article_id: i64, status: i32) -> Result<()> {
77        let now = chrono::Utc::now().timestamp();
78
79        sqlx::query(
80            r#"
81            UPDATE download_articles
82            SET status = ?, downloaded_at = ?
83            WHERE id = ?
84            "#,
85        )
86        .bind(status)
87        .bind(if status == article_status::DOWNLOADED {
88            Some(now)
89        } else {
90            None
91        })
92        .bind(article_id)
93        .execute(&self.pool)
94        .await
95        .map_err(|e| {
96            Error::Database(DatabaseError::QueryFailed(format!(
97                "Failed to update article status: {}",
98                e
99            )))
100        })?;
101
102        Ok(())
103    }
104
105    /// Update article status by message_id
106    pub async fn update_article_status_by_message_id(
107        &self,
108        download_id: DownloadId,
109        message_id: &str,
110        status: i32,
111    ) -> Result<()> {
112        let now = chrono::Utc::now().timestamp();
113
114        sqlx::query(
115            r#"
116            UPDATE download_articles
117            SET status = ?, downloaded_at = ?
118            WHERE download_id = ? AND message_id = ?
119            "#,
120        )
121        .bind(status)
122        .bind(if status == article_status::DOWNLOADED {
123            Some(now)
124        } else {
125            None
126        })
127        .bind(download_id)
128        .bind(message_id)
129        .execute(&self.pool)
130        .await
131        .map_err(|e| {
132            Error::Database(DatabaseError::QueryFailed(format!(
133                "Failed to update article status: {}",
134                e
135            )))
136        })?;
137
138        Ok(())
139    }
140
141    /// Update multiple article statuses in a single transaction (more efficient for batch operations)
142    ///
143    /// # Arguments
144    /// * `updates` - Vector of tuples containing (article_id, status)
145    ///
146    /// # Performance
147    /// This method uses a CASE-WHEN statement to update multiple rows in a single query,
148    /// which is significantly faster than individual UPDATE statements. With 100 updates,
149    /// this can be 50-100x faster than calling `update_article_status` 100 times.
150    ///
151    /// # Example
152    /// ```rust,ignore
153    /// let updates = vec![
154    ///     (123, article_status::DOWNLOADED),
155    ///     (124, article_status::DOWNLOADED),
156    ///     (125, article_status::FAILED),
157    /// ];
158    /// db.update_articles_status_batch(&updates).await?;
159    /// ```
160    /// Update multiple article statuses in a single transaction (more efficient for batch operations)
161    ///
162    /// Automatically chunks the input to stay within SQLite's bind variable limit.
163    /// Each update uses ~3-4 bind variables (article_id x3 + optional timestamp),
164    /// so we chunk to max 100 updates per query.
165    pub async fn update_articles_status_batch(&self, updates: &[(i64, i32)]) -> Result<()> {
166        if updates.is_empty() {
167            return Ok(());
168        }
169
170        // Each update uses up to 4 bind variables (id in status CASE, status, id in downloaded_at CASE,
171        // optional timestamp, id in WHERE IN). Conservative limit of 100 per batch.
172        const MAX_UPDATES_PER_BATCH: usize = 100;
173
174        let now = chrono::Utc::now().timestamp();
175
176        for chunk in updates.chunks(MAX_UPDATES_PER_BATCH) {
177            let mut query_builder =
178                sqlx::QueryBuilder::new("UPDATE download_articles SET status = CASE ");
179
180            // Build status CASE clause
181            for (article_id, status) in chunk {
182                query_builder.push("WHEN id = ");
183                query_builder.push_bind(*article_id);
184                query_builder.push(" THEN ");
185                query_builder.push_bind(*status);
186                query_builder.push(" ");
187            }
188            query_builder.push("END, downloaded_at = CASE ");
189
190            // Build downloaded_at CASE clause (only set timestamp for DOWNLOADED status)
191            for (article_id, status) in chunk {
192                query_builder.push("WHEN id = ");
193                query_builder.push_bind(*article_id);
194                if *status == article_status::DOWNLOADED {
195                    query_builder.push(" THEN ");
196                    query_builder.push_bind(now);
197                } else {
198                    query_builder.push(" THEN downloaded_at"); // Keep existing value
199                }
200                query_builder.push(" ");
201            }
202            query_builder.push("END WHERE id IN (");
203
204            // Build WHERE IN clause
205            let mut first = true;
206            for (article_id, _) in chunk {
207                if !first {
208                    query_builder.push(", ");
209                }
210                query_builder.push_bind(*article_id);
211                first = false;
212            }
213            query_builder.push(")");
214
215            let query = query_builder.build();
216            query.execute(&self.pool).await.map_err(|e| {
217                Error::Database(DatabaseError::QueryFailed(format!(
218                    "Failed to update articles status batch: {}",
219                    e
220                )))
221            })?;
222        }
223
224        Ok(())
225    }
226
227    /// Get all articles for a download
228    pub async fn get_articles(&self, download_id: DownloadId) -> Result<Vec<Article>> {
229        let rows = sqlx::query_as::<_, Article>(
230            r#"
231            SELECT id, download_id, message_id, segment_number, file_index, size_bytes, status, downloaded_at
232            FROM download_articles
233            WHERE download_id = ?
234            ORDER BY file_index ASC, segment_number ASC
235            "#,
236        )
237        .bind(download_id)
238        .fetch_all(&self.pool)
239        .await
240        .map_err(|e| {
241            Error::Database(DatabaseError::QueryFailed(format!(
242                "Failed to get articles: {}",
243                e
244            )))
245        })?;
246
247        Ok(rows)
248    }
249
250    /// Get pending articles for a download, excluding paused files.
251    pub async fn get_pending_articles(&self, download_id: DownloadId) -> Result<Vec<Article>> {
252        let rows = sqlx::query_as::<_, Article>(
253            r#"
254            SELECT da.id, da.download_id, da.message_id, da.segment_number, da.file_index, da.size_bytes, da.status, da.downloaded_at
255            FROM download_articles da
256            LEFT JOIN download_files df
257              ON df.download_id = da.download_id
258             AND df.file_index = da.file_index
259            WHERE da.download_id = ?
260              AND da.status = 0
261              AND COALESCE(df.paused, 0) = 0
262            ORDER BY da.file_index ASC, da.segment_number ASC
263            "#,
264        )
265        .bind(download_id)
266        .fetch_all(&self.pool)
267        .await
268        .map_err(|e| {
269            Error::Database(DatabaseError::QueryFailed(format!(
270                "Failed to get pending articles: {}",
271                e
272            )))
273        })?;
274
275        Ok(rows)
276    }
277
278    /// Get article by message_id
279    pub async fn get_article_by_message_id(
280        &self,
281        download_id: DownloadId,
282        message_id: &str,
283    ) -> Result<Option<Article>> {
284        let row = sqlx::query_as::<_, Article>(
285            r#"
286            SELECT id, download_id, message_id, segment_number, file_index, size_bytes, status, downloaded_at
287            FROM download_articles
288            WHERE download_id = ? AND message_id = ?
289            "#,
290        )
291        .bind(download_id)
292        .bind(message_id)
293        .fetch_optional(&self.pool)
294        .await
295        .map_err(|e| {
296            Error::Database(DatabaseError::QueryFailed(format!(
297                "Failed to get article: {}",
298                e
299            )))
300        })?;
301
302        Ok(row)
303    }
304
305    /// Count articles by status for a download
306    pub async fn count_articles_by_status(
307        &self,
308        download_id: DownloadId,
309        status: i32,
310    ) -> Result<i64> {
311        let count: i64 = sqlx::query_scalar(
312            "SELECT COUNT(*) FROM download_articles WHERE download_id = ? AND status = ?",
313        )
314        .bind(download_id)
315        .bind(status)
316        .fetch_one(&self.pool)
317        .await
318        .map_err(|e| {
319            Error::Database(DatabaseError::QueryFailed(format!(
320                "Failed to count articles: {}",
321                e
322            )))
323        })?;
324
325        Ok(count)
326    }
327
328    /// Get total article count for a download
329    pub async fn count_articles(&self, download_id: DownloadId) -> Result<i64> {
330        let count: i64 =
331            sqlx::query_scalar("SELECT COUNT(*) FROM download_articles WHERE download_id = ?")
332                .bind(download_id)
333                .fetch_one(&self.pool)
334                .await
335                .map_err(|e| {
336                    Error::Database(DatabaseError::QueryFailed(format!(
337                        "Failed to count articles: {}",
338                        e
339                    )))
340                })?;
341
342        Ok(count)
343    }
344
345    /// Delete all articles for a download (automatic via CASCADE, but explicit method for clarity)
346    pub async fn delete_articles(&self, download_id: DownloadId) -> Result<()> {
347        sqlx::query("DELETE FROM download_articles WHERE download_id = ?")
348            .bind(download_id)
349            .execute(&self.pool)
350            .await
351            .map_err(|e| {
352                Error::Database(DatabaseError::QueryFailed(format!(
353                    "Failed to delete articles: {}",
354                    e
355                )))
356            })?;
357
358        Ok(())
359    }
360
361    /// Insert multiple download files in a batch
362    pub async fn insert_files_batch(&self, files: &[super::NewDownloadFile]) -> Result<()> {
363        if files.is_empty() {
364            return Ok(());
365        }
366
367        // Each file uses 5 bind variables, max 199 per batch
368        const MAX_FILES_PER_BATCH: usize = 199;
369
370        for chunk in files.chunks(MAX_FILES_PER_BATCH) {
371            let mut query_builder = sqlx::QueryBuilder::new(
372                "INSERT INTO download_files (download_id, file_index, filename, subject, total_segments) ",
373            );
374
375            query_builder.push_values(chunk, |mut b, file| {
376                b.push_bind(file.download_id)
377                    .push_bind(file.file_index)
378                    .push_bind(&file.filename)
379                    .push_bind(&file.subject)
380                    .push_bind(file.total_segments);
381            });
382
383            let query = query_builder.build();
384            query.execute(&self.pool).await.map_err(|e| {
385                Error::Database(DatabaseError::QueryFailed(format!(
386                    "Failed to insert files batch: {}",
387                    e
388                )))
389            })?;
390        }
391
392        Ok(())
393    }
394
395    /// Get all download files for a download
396    pub async fn get_download_files(
397        &self,
398        download_id: DownloadId,
399    ) -> Result<Vec<super::DownloadFile>> {
400        let rows = sqlx::query_as::<_, super::DownloadFile>(
401            r#"
402            SELECT id, download_id, file_index, filename, subject, total_segments, paused, completed, original_filename
403            FROM download_files
404            WHERE download_id = ?
405            ORDER BY file_index ASC
406            "#,
407        )
408        .bind(download_id)
409        .fetch_all(&self.pool)
410        .await
411        .map_err(|e| {
412            Error::Database(DatabaseError::QueryFailed(format!(
413                "Failed to get download files: {}",
414                e
415            )))
416        })?;
417
418        Ok(rows)
419    }
420
421    /// Get newly completed files for DirectUnpack processing.
422    ///
423    /// Returns unpaused files where `completed=0` and all articles have been downloaded.
424    pub async fn get_newly_completed_files(
425        &self,
426        download_id: DownloadId,
427    ) -> Result<Vec<super::DownloadFile>> {
428        let rows = sqlx::query_as::<_, super::DownloadFile>(
429            r#"
430            SELECT df.id, df.download_id, df.file_index, df.filename, df.subject,
431                   df.total_segments, df.paused, df.completed, df.original_filename
432            FROM download_files df
433            WHERE df.download_id = ?
434              AND df.paused = 0
435              AND df.completed = 0
436              AND df.total_segments = (
437                SELECT COUNT(*) FROM download_articles da
438                WHERE da.download_id = df.download_id
439                  AND da.file_index = df.file_index
440                  AND da.status = 1
441              )
442            "#,
443        )
444        .bind(download_id)
445        .fetch_all(&self.pool)
446        .await
447        .map_err(|e| {
448            Error::Database(DatabaseError::QueryFailed(format!(
449                "Failed to get newly completed files: {}",
450                e
451            )))
452        })?;
453
454        Ok(rows)
455    }
456
457    /// Mark a file as completed (all segments downloaded)
458    pub async fn mark_file_completed(
459        &self,
460        download_id: DownloadId,
461        file_index: i32,
462    ) -> Result<()> {
463        sqlx::query(
464            "UPDATE download_files SET completed = 1 WHERE download_id = ? AND file_index = ?",
465        )
466        .bind(download_id)
467        .bind(file_index)
468        .execute(&self.pool)
469        .await
470        .map_err(|e| {
471            Error::Database(DatabaseError::QueryFailed(format!(
472                "Failed to mark file completed: {}",
473                e
474            )))
475        })?;
476
477        Ok(())
478    }
479
480    /// Update the DirectUnpack state for a download
481    pub async fn update_direct_unpack_state(
482        &self,
483        download_id: DownloadId,
484        state: i32,
485    ) -> Result<()> {
486        sqlx::query("UPDATE downloads SET direct_unpack_state = ? WHERE id = ?")
487            .bind(state)
488            .bind(download_id)
489            .execute(&self.pool)
490            .await
491            .map_err(|e| {
492                Error::Database(DatabaseError::QueryFailed(format!(
493                    "Failed to update direct_unpack_state: {}",
494                    e
495                )))
496            })?;
497
498        Ok(())
499    }
500
501    /// Get the DirectUnpack state for a download
502    pub async fn get_direct_unpack_state(&self, download_id: DownloadId) -> Result<i32> {
503        let state: i32 =
504            sqlx::query_scalar("SELECT direct_unpack_state FROM downloads WHERE id = ?")
505                .bind(download_id)
506                .fetch_one(&self.pool)
507                .await
508                .map_err(|e| {
509                    Error::Database(DatabaseError::QueryFailed(format!(
510                        "Failed to get direct_unpack_state: {}",
511                        e
512                    )))
513                })?;
514
515        Ok(state)
516    }
517
518    /// Rename a download file (for DirectRename), storing the original filename
519    pub async fn rename_download_file(
520        &self,
521        download_id: DownloadId,
522        file_index: i32,
523        new_filename: &str,
524    ) -> Result<()> {
525        sqlx::query(
526            r#"
527            UPDATE download_files
528            SET original_filename = CASE WHEN original_filename IS NULL THEN filename ELSE original_filename END,
529                filename = ?
530            WHERE download_id = ? AND file_index = ?
531            "#,
532        )
533        .bind(new_filename)
534        .bind(download_id)
535        .bind(file_index)
536        .execute(&self.pool)
537        .await
538        .map_err(|e| {
539            Error::Database(DatabaseError::QueryFailed(format!(
540                "Failed to rename download file: {}",
541                e
542            )))
543        })?;
544
545        Ok(())
546    }
547
548    /// Update the DirectUnpack extracted count for a download
549    pub async fn update_direct_unpack_extracted_count(
550        &self,
551        download_id: DownloadId,
552        count: i32,
553    ) -> Result<()> {
554        sqlx::query("UPDATE downloads SET direct_unpack_extracted_count = ? WHERE id = ?")
555            .bind(count)
556            .bind(download_id)
557            .execute(&self.pool)
558            .await
559            .map_err(|e| {
560                Error::Database(DatabaseError::QueryFailed(format!(
561                    "Failed to update direct_unpack_extracted_count: {}",
562                    e
563                )))
564            })?;
565
566        Ok(())
567    }
568
569    /// Get the DirectUnpack extracted count for a download
570    pub async fn get_direct_unpack_extracted_count(&self, download_id: DownloadId) -> Result<i32> {
571        let count: i32 =
572            sqlx::query_scalar("SELECT direct_unpack_extracted_count FROM downloads WHERE id = ?")
573                .bind(download_id)
574                .fetch_one(&self.pool)
575                .await
576                .map_err(|e| {
577                    Error::Database(DatabaseError::QueryFailed(format!(
578                        "Failed to get direct_unpack_extracted_count: {}",
579                        e
580                    )))
581                })?;
582
583        Ok(count)
584    }
585
586    /// Set a file's paused state.
587    pub async fn set_file_paused(
588        &self,
589        download_id: DownloadId,
590        file_index: i32,
591        paused: bool,
592    ) -> Result<()> {
593        sqlx::query(
594            "UPDATE download_files SET paused = ? WHERE download_id = ? AND file_index = ?",
595        )
596        .bind(if paused { 1 } else { 0 })
597        .bind(download_id)
598        .bind(file_index)
599        .execute(&self.pool)
600        .await
601        .map_err(|e| {
602            Error::Database(DatabaseError::QueryFailed(format!(
603                "Failed to update file paused state: {}",
604                e
605            )))
606        })?;
607
608        Ok(())
609    }
610
611    /// Get a single download file by download and file index.
612    pub async fn get_download_file(
613        &self,
614        download_id: DownloadId,
615        file_index: i32,
616    ) -> Result<Option<DownloadFile>> {
617        let row = sqlx::query_as::<_, DownloadFile>(
618            r#"
619            SELECT id, download_id, file_index, filename, subject, total_segments, paused, completed, original_filename
620            FROM download_files
621            WHERE download_id = ? AND file_index = ?
622            "#,
623        )
624        .bind(download_id)
625        .bind(file_index)
626        .fetch_optional(&self.pool)
627        .await
628        .map_err(|e| {
629            Error::Database(DatabaseError::QueryFailed(format!(
630                "Failed to get download file: {}",
631                e
632            )))
633        })?;
634
635        Ok(row)
636    }
637
638    /// Return true when a download still has unpaused pending articles.
639    pub async fn has_active_pending_articles(&self, download_id: DownloadId) -> Result<bool> {
640        let count: i64 = sqlx::query_scalar(
641            r#"
642            SELECT COUNT(*)
643            FROM download_articles da
644            LEFT JOIN download_files df
645              ON df.download_id = da.download_id
646             AND df.file_index = da.file_index
647            WHERE da.download_id = ?
648              AND da.status = 0
649              AND COALESCE(df.paused, 0) = 0
650            "#,
651        )
652        .bind(download_id)
653        .fetch_one(&self.pool)
654        .await
655        .map_err(|e| {
656            Error::Database(DatabaseError::QueryFailed(format!(
657                "Failed to count active pending articles: {}",
658                e
659            )))
660        })?;
661
662        Ok(count > 0)
663    }
664
665    /// Return true when a download still has any pending articles, including paused files.
666    pub async fn has_any_pending_articles(&self, download_id: DownloadId) -> Result<bool> {
667        let count: i64 = sqlx::query_scalar(
668            "SELECT COUNT(*) FROM download_articles WHERE download_id = ? AND status = 0",
669        )
670        .bind(download_id)
671        .fetch_one(&self.pool)
672        .await
673        .map_err(|e| {
674            Error::Database(DatabaseError::QueryFailed(format!(
675                "Failed to count pending articles: {}",
676                e
677            )))
678        })?;
679
680        Ok(count > 0)
681    }
682
683    /// Count failed articles for a download
684    pub async fn count_failed_articles(&self, download_id: DownloadId) -> Result<i64> {
685        let count: i64 = sqlx::query_scalar(
686            "SELECT COUNT(*) FROM download_articles WHERE download_id = ? AND status = 2",
687        )
688        .bind(download_id)
689        .fetch_one(&self.pool)
690        .await
691        .map_err(|e| {
692            Error::Database(DatabaseError::QueryFailed(format!(
693                "Failed to count failed articles: {}",
694                e
695            )))
696        })?;
697
698        Ok(count)
699    }
700}