1use crate::error::DatabaseError;
4use crate::types::DownloadId;
5use crate::{Error, Result};
6
7use super::{Article, Database, DownloadFile, NewArticle, article_status};
8
9impl Database {
10 pub async fn insert_article(&self, article: &NewArticle) -> Result<i64> {
12 let result = sqlx::query(
13 r#"
14 INSERT INTO download_articles (
15 download_id, message_id, segment_number, file_index, size_bytes, status
16 ) VALUES (?, ?, ?, ?, ?, 0)
17 "#,
18 )
19 .bind(article.download_id)
20 .bind(&article.message_id)
21 .bind(article.segment_number)
22 .bind(article.file_index)
23 .bind(article.size_bytes)
24 .execute(&self.pool)
25 .await
26 .map_err(|e| {
27 Error::Database(DatabaseError::QueryFailed(format!(
28 "Failed to insert article: {}",
29 e
30 )))
31 })?;
32
33 Ok(result.last_insert_rowid())
34 }
35
36 pub async fn insert_articles_batch(&self, articles: &[NewArticle]) -> Result<()> {
41 if articles.is_empty() {
42 return Ok(());
43 }
44
45 const MAX_ARTICLES_PER_BATCH: usize = 166;
48
49 for chunk in articles.chunks(MAX_ARTICLES_PER_BATCH) {
50 let mut query_builder = sqlx::QueryBuilder::new(
51 "INSERT INTO download_articles (download_id, message_id, segment_number, file_index, size_bytes, status) ",
52 );
53
54 query_builder.push_values(chunk, |mut b, article| {
55 b.push_bind(article.download_id)
56 .push_bind(&article.message_id)
57 .push_bind(article.segment_number)
58 .push_bind(article.file_index)
59 .push_bind(article.size_bytes)
60 .push_bind(0); });
62
63 let query = query_builder.build();
64 query.execute(&self.pool).await.map_err(|e| {
65 Error::Database(DatabaseError::QueryFailed(format!(
66 "Failed to insert articles batch: {}",
67 e
68 )))
69 })?;
70 }
71
72 Ok(())
73 }
74
75 pub async fn update_article_status(&self, article_id: i64, status: i32) -> Result<()> {
77 let now = chrono::Utc::now().timestamp();
78
79 sqlx::query(
80 r#"
81 UPDATE download_articles
82 SET status = ?, downloaded_at = ?
83 WHERE id = ?
84 "#,
85 )
86 .bind(status)
87 .bind(if status == article_status::DOWNLOADED {
88 Some(now)
89 } else {
90 None
91 })
92 .bind(article_id)
93 .execute(&self.pool)
94 .await
95 .map_err(|e| {
96 Error::Database(DatabaseError::QueryFailed(format!(
97 "Failed to update article status: {}",
98 e
99 )))
100 })?;
101
102 Ok(())
103 }
104
105 pub async fn update_article_status_by_message_id(
107 &self,
108 download_id: DownloadId,
109 message_id: &str,
110 status: i32,
111 ) -> Result<()> {
112 let now = chrono::Utc::now().timestamp();
113
114 sqlx::query(
115 r#"
116 UPDATE download_articles
117 SET status = ?, downloaded_at = ?
118 WHERE download_id = ? AND message_id = ?
119 "#,
120 )
121 .bind(status)
122 .bind(if status == article_status::DOWNLOADED {
123 Some(now)
124 } else {
125 None
126 })
127 .bind(download_id)
128 .bind(message_id)
129 .execute(&self.pool)
130 .await
131 .map_err(|e| {
132 Error::Database(DatabaseError::QueryFailed(format!(
133 "Failed to update article status: {}",
134 e
135 )))
136 })?;
137
138 Ok(())
139 }
140
141 pub async fn update_articles_status_batch(&self, updates: &[(i64, i32)]) -> Result<()> {
166 if updates.is_empty() {
167 return Ok(());
168 }
169
170 const MAX_UPDATES_PER_BATCH: usize = 100;
173
174 let now = chrono::Utc::now().timestamp();
175
176 for chunk in updates.chunks(MAX_UPDATES_PER_BATCH) {
177 let mut query_builder =
178 sqlx::QueryBuilder::new("UPDATE download_articles SET status = CASE ");
179
180 for (article_id, status) in chunk {
182 query_builder.push("WHEN id = ");
183 query_builder.push_bind(*article_id);
184 query_builder.push(" THEN ");
185 query_builder.push_bind(*status);
186 query_builder.push(" ");
187 }
188 query_builder.push("END, downloaded_at = CASE ");
189
190 for (article_id, status) in chunk {
192 query_builder.push("WHEN id = ");
193 query_builder.push_bind(*article_id);
194 if *status == article_status::DOWNLOADED {
195 query_builder.push(" THEN ");
196 query_builder.push_bind(now);
197 } else {
198 query_builder.push(" THEN downloaded_at"); }
200 query_builder.push(" ");
201 }
202 query_builder.push("END WHERE id IN (");
203
204 let mut first = true;
206 for (article_id, _) in chunk {
207 if !first {
208 query_builder.push(", ");
209 }
210 query_builder.push_bind(*article_id);
211 first = false;
212 }
213 query_builder.push(")");
214
215 let query = query_builder.build();
216 query.execute(&self.pool).await.map_err(|e| {
217 Error::Database(DatabaseError::QueryFailed(format!(
218 "Failed to update articles status batch: {}",
219 e
220 )))
221 })?;
222 }
223
224 Ok(())
225 }
226
227 pub async fn get_articles(&self, download_id: DownloadId) -> Result<Vec<Article>> {
229 let rows = sqlx::query_as::<_, Article>(
230 r#"
231 SELECT id, download_id, message_id, segment_number, file_index, size_bytes, status, downloaded_at
232 FROM download_articles
233 WHERE download_id = ?
234 ORDER BY file_index ASC, segment_number ASC
235 "#,
236 )
237 .bind(download_id)
238 .fetch_all(&self.pool)
239 .await
240 .map_err(|e| {
241 Error::Database(DatabaseError::QueryFailed(format!(
242 "Failed to get articles: {}",
243 e
244 )))
245 })?;
246
247 Ok(rows)
248 }
249
250 pub async fn get_pending_articles(&self, download_id: DownloadId) -> Result<Vec<Article>> {
252 let rows = sqlx::query_as::<_, Article>(
253 r#"
254 SELECT da.id, da.download_id, da.message_id, da.segment_number, da.file_index, da.size_bytes, da.status, da.downloaded_at
255 FROM download_articles da
256 LEFT JOIN download_files df
257 ON df.download_id = da.download_id
258 AND df.file_index = da.file_index
259 WHERE da.download_id = ?
260 AND da.status = 0
261 AND COALESCE(df.paused, 0) = 0
262 ORDER BY da.file_index ASC, da.segment_number ASC
263 "#,
264 )
265 .bind(download_id)
266 .fetch_all(&self.pool)
267 .await
268 .map_err(|e| {
269 Error::Database(DatabaseError::QueryFailed(format!(
270 "Failed to get pending articles: {}",
271 e
272 )))
273 })?;
274
275 Ok(rows)
276 }
277
278 pub async fn get_article_by_message_id(
280 &self,
281 download_id: DownloadId,
282 message_id: &str,
283 ) -> Result<Option<Article>> {
284 let row = sqlx::query_as::<_, Article>(
285 r#"
286 SELECT id, download_id, message_id, segment_number, file_index, size_bytes, status, downloaded_at
287 FROM download_articles
288 WHERE download_id = ? AND message_id = ?
289 "#,
290 )
291 .bind(download_id)
292 .bind(message_id)
293 .fetch_optional(&self.pool)
294 .await
295 .map_err(|e| {
296 Error::Database(DatabaseError::QueryFailed(format!(
297 "Failed to get article: {}",
298 e
299 )))
300 })?;
301
302 Ok(row)
303 }
304
305 pub async fn count_articles_by_status(
307 &self,
308 download_id: DownloadId,
309 status: i32,
310 ) -> Result<i64> {
311 let count: i64 = sqlx::query_scalar(
312 "SELECT COUNT(*) FROM download_articles WHERE download_id = ? AND status = ?",
313 )
314 .bind(download_id)
315 .bind(status)
316 .fetch_one(&self.pool)
317 .await
318 .map_err(|e| {
319 Error::Database(DatabaseError::QueryFailed(format!(
320 "Failed to count articles: {}",
321 e
322 )))
323 })?;
324
325 Ok(count)
326 }
327
328 pub async fn count_articles(&self, download_id: DownloadId) -> Result<i64> {
330 let count: i64 =
331 sqlx::query_scalar("SELECT COUNT(*) FROM download_articles WHERE download_id = ?")
332 .bind(download_id)
333 .fetch_one(&self.pool)
334 .await
335 .map_err(|e| {
336 Error::Database(DatabaseError::QueryFailed(format!(
337 "Failed to count articles: {}",
338 e
339 )))
340 })?;
341
342 Ok(count)
343 }
344
345 pub async fn delete_articles(&self, download_id: DownloadId) -> Result<()> {
347 sqlx::query("DELETE FROM download_articles WHERE download_id = ?")
348 .bind(download_id)
349 .execute(&self.pool)
350 .await
351 .map_err(|e| {
352 Error::Database(DatabaseError::QueryFailed(format!(
353 "Failed to delete articles: {}",
354 e
355 )))
356 })?;
357
358 Ok(())
359 }
360
361 pub async fn insert_files_batch(&self, files: &[super::NewDownloadFile]) -> Result<()> {
363 if files.is_empty() {
364 return Ok(());
365 }
366
367 const MAX_FILES_PER_BATCH: usize = 199;
369
370 for chunk in files.chunks(MAX_FILES_PER_BATCH) {
371 let mut query_builder = sqlx::QueryBuilder::new(
372 "INSERT INTO download_files (download_id, file_index, filename, subject, total_segments) ",
373 );
374
375 query_builder.push_values(chunk, |mut b, file| {
376 b.push_bind(file.download_id)
377 .push_bind(file.file_index)
378 .push_bind(&file.filename)
379 .push_bind(&file.subject)
380 .push_bind(file.total_segments);
381 });
382
383 let query = query_builder.build();
384 query.execute(&self.pool).await.map_err(|e| {
385 Error::Database(DatabaseError::QueryFailed(format!(
386 "Failed to insert files batch: {}",
387 e
388 )))
389 })?;
390 }
391
392 Ok(())
393 }
394
395 pub async fn get_download_files(
397 &self,
398 download_id: DownloadId,
399 ) -> Result<Vec<super::DownloadFile>> {
400 let rows = sqlx::query_as::<_, super::DownloadFile>(
401 r#"
402 SELECT id, download_id, file_index, filename, subject, total_segments, paused, completed, original_filename
403 FROM download_files
404 WHERE download_id = ?
405 ORDER BY file_index ASC
406 "#,
407 )
408 .bind(download_id)
409 .fetch_all(&self.pool)
410 .await
411 .map_err(|e| {
412 Error::Database(DatabaseError::QueryFailed(format!(
413 "Failed to get download files: {}",
414 e
415 )))
416 })?;
417
418 Ok(rows)
419 }
420
421 pub async fn get_newly_completed_files(
425 &self,
426 download_id: DownloadId,
427 ) -> Result<Vec<super::DownloadFile>> {
428 let rows = sqlx::query_as::<_, super::DownloadFile>(
429 r#"
430 SELECT df.id, df.download_id, df.file_index, df.filename, df.subject,
431 df.total_segments, df.paused, df.completed, df.original_filename
432 FROM download_files df
433 WHERE df.download_id = ?
434 AND df.paused = 0
435 AND df.completed = 0
436 AND df.total_segments = (
437 SELECT COUNT(*) FROM download_articles da
438 WHERE da.download_id = df.download_id
439 AND da.file_index = df.file_index
440 AND da.status = 1
441 )
442 "#,
443 )
444 .bind(download_id)
445 .fetch_all(&self.pool)
446 .await
447 .map_err(|e| {
448 Error::Database(DatabaseError::QueryFailed(format!(
449 "Failed to get newly completed files: {}",
450 e
451 )))
452 })?;
453
454 Ok(rows)
455 }
456
457 pub async fn mark_file_completed(
459 &self,
460 download_id: DownloadId,
461 file_index: i32,
462 ) -> Result<()> {
463 sqlx::query(
464 "UPDATE download_files SET completed = 1 WHERE download_id = ? AND file_index = ?",
465 )
466 .bind(download_id)
467 .bind(file_index)
468 .execute(&self.pool)
469 .await
470 .map_err(|e| {
471 Error::Database(DatabaseError::QueryFailed(format!(
472 "Failed to mark file completed: {}",
473 e
474 )))
475 })?;
476
477 Ok(())
478 }
479
480 pub async fn update_direct_unpack_state(
482 &self,
483 download_id: DownloadId,
484 state: i32,
485 ) -> Result<()> {
486 sqlx::query("UPDATE downloads SET direct_unpack_state = ? WHERE id = ?")
487 .bind(state)
488 .bind(download_id)
489 .execute(&self.pool)
490 .await
491 .map_err(|e| {
492 Error::Database(DatabaseError::QueryFailed(format!(
493 "Failed to update direct_unpack_state: {}",
494 e
495 )))
496 })?;
497
498 Ok(())
499 }
500
501 pub async fn get_direct_unpack_state(&self, download_id: DownloadId) -> Result<i32> {
503 let state: i32 =
504 sqlx::query_scalar("SELECT direct_unpack_state FROM downloads WHERE id = ?")
505 .bind(download_id)
506 .fetch_one(&self.pool)
507 .await
508 .map_err(|e| {
509 Error::Database(DatabaseError::QueryFailed(format!(
510 "Failed to get direct_unpack_state: {}",
511 e
512 )))
513 })?;
514
515 Ok(state)
516 }
517
518 pub async fn rename_download_file(
520 &self,
521 download_id: DownloadId,
522 file_index: i32,
523 new_filename: &str,
524 ) -> Result<()> {
525 sqlx::query(
526 r#"
527 UPDATE download_files
528 SET original_filename = CASE WHEN original_filename IS NULL THEN filename ELSE original_filename END,
529 filename = ?
530 WHERE download_id = ? AND file_index = ?
531 "#,
532 )
533 .bind(new_filename)
534 .bind(download_id)
535 .bind(file_index)
536 .execute(&self.pool)
537 .await
538 .map_err(|e| {
539 Error::Database(DatabaseError::QueryFailed(format!(
540 "Failed to rename download file: {}",
541 e
542 )))
543 })?;
544
545 Ok(())
546 }
547
548 pub async fn update_direct_unpack_extracted_count(
550 &self,
551 download_id: DownloadId,
552 count: i32,
553 ) -> Result<()> {
554 sqlx::query("UPDATE downloads SET direct_unpack_extracted_count = ? WHERE id = ?")
555 .bind(count)
556 .bind(download_id)
557 .execute(&self.pool)
558 .await
559 .map_err(|e| {
560 Error::Database(DatabaseError::QueryFailed(format!(
561 "Failed to update direct_unpack_extracted_count: {}",
562 e
563 )))
564 })?;
565
566 Ok(())
567 }
568
569 pub async fn get_direct_unpack_extracted_count(&self, download_id: DownloadId) -> Result<i32> {
571 let count: i32 =
572 sqlx::query_scalar("SELECT direct_unpack_extracted_count FROM downloads WHERE id = ?")
573 .bind(download_id)
574 .fetch_one(&self.pool)
575 .await
576 .map_err(|e| {
577 Error::Database(DatabaseError::QueryFailed(format!(
578 "Failed to get direct_unpack_extracted_count: {}",
579 e
580 )))
581 })?;
582
583 Ok(count)
584 }
585
586 pub async fn set_file_paused(
588 &self,
589 download_id: DownloadId,
590 file_index: i32,
591 paused: bool,
592 ) -> Result<()> {
593 sqlx::query(
594 "UPDATE download_files SET paused = ? WHERE download_id = ? AND file_index = ?",
595 )
596 .bind(if paused { 1 } else { 0 })
597 .bind(download_id)
598 .bind(file_index)
599 .execute(&self.pool)
600 .await
601 .map_err(|e| {
602 Error::Database(DatabaseError::QueryFailed(format!(
603 "Failed to update file paused state: {}",
604 e
605 )))
606 })?;
607
608 Ok(())
609 }
610
611 pub async fn get_download_file(
613 &self,
614 download_id: DownloadId,
615 file_index: i32,
616 ) -> Result<Option<DownloadFile>> {
617 let row = sqlx::query_as::<_, DownloadFile>(
618 r#"
619 SELECT id, download_id, file_index, filename, subject, total_segments, paused, completed, original_filename
620 FROM download_files
621 WHERE download_id = ? AND file_index = ?
622 "#,
623 )
624 .bind(download_id)
625 .bind(file_index)
626 .fetch_optional(&self.pool)
627 .await
628 .map_err(|e| {
629 Error::Database(DatabaseError::QueryFailed(format!(
630 "Failed to get download file: {}",
631 e
632 )))
633 })?;
634
635 Ok(row)
636 }
637
638 pub async fn has_active_pending_articles(&self, download_id: DownloadId) -> Result<bool> {
640 let count: i64 = sqlx::query_scalar(
641 r#"
642 SELECT COUNT(*)
643 FROM download_articles da
644 LEFT JOIN download_files df
645 ON df.download_id = da.download_id
646 AND df.file_index = da.file_index
647 WHERE da.download_id = ?
648 AND da.status = 0
649 AND COALESCE(df.paused, 0) = 0
650 "#,
651 )
652 .bind(download_id)
653 .fetch_one(&self.pool)
654 .await
655 .map_err(|e| {
656 Error::Database(DatabaseError::QueryFailed(format!(
657 "Failed to count active pending articles: {}",
658 e
659 )))
660 })?;
661
662 Ok(count > 0)
663 }
664
665 pub async fn has_any_pending_articles(&self, download_id: DownloadId) -> Result<bool> {
667 let count: i64 = sqlx::query_scalar(
668 "SELECT COUNT(*) FROM download_articles WHERE download_id = ? AND status = 0",
669 )
670 .bind(download_id)
671 .fetch_one(&self.pool)
672 .await
673 .map_err(|e| {
674 Error::Database(DatabaseError::QueryFailed(format!(
675 "Failed to count pending articles: {}",
676 e
677 )))
678 })?;
679
680 Ok(count > 0)
681 }
682
683 pub async fn count_failed_articles(&self, download_id: DownloadId) -> Result<i64> {
685 let count: i64 = sqlx::query_scalar(
686 "SELECT COUNT(*) FROM download_articles WHERE download_id = ? AND status = 2",
687 )
688 .bind(download_id)
689 .fetch_one(&self.pool)
690 .await
691 .map_err(|e| {
692 Error::Database(DatabaseError::QueryFailed(format!(
693 "Failed to count failed articles: {}",
694 e
695 )))
696 })?;
697
698 Ok(count)
699 }
700}