1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
//! Download lifecycle control — pause, resume, cancel, priority, reprocess.
use crate::error::{DatabaseError, DownloadError, Error, Result};
use crate::types::{DownloadId, Event, Priority, Stage, Status};
use std::path::PathBuf;
use super::UsenetDownloader;
impl UsenetDownloader {
/// Pause a download
///
/// This method pauses a download without removing it from the queue.
/// If the download is currently downloading, it will be stopped gracefully
/// (after completing the current article). The download will be marked as
/// Paused in the database and can be resumed later with `resume()`.
///
/// # Arguments
///
/// * `id` - The download ID to pause
///
/// # Returns
///
/// Returns Ok(()) if the download was successfully paused, or an error if:
/// - The download doesn't exist
/// - The download is already paused, complete, or failed
/// - Database update fails
///
/// # Examples
///
/// ```no_run
/// # use usenet_dl::*;
/// # async fn example(downloader: UsenetDownloader, id: DownloadId) -> Result<()> {
/// // Pause a download
/// downloader.pause(id).await?;
/// # Ok(())
/// # }
/// ```
pub async fn pause(&self, id: DownloadId) -> Result<()> {
// Fetch download from database
let download = self.db.get_download(id).await?.ok_or_else(|| {
Error::Database(DatabaseError::NotFound(format!(
"Download {} not found",
id
)))
})?;
let current_status = Status::from_i32(download.status);
// Check if download can be paused
match current_status {
Status::Paused => {
// Already paused, nothing to do
return Ok(());
}
Status::Complete | Status::Failed => {
return Err(Error::Download(DownloadError::InvalidState {
id: id.into(),
operation: "pause".to_string(),
current_state: format!("{:?}", current_status),
}));
}
Status::Queued | Status::Downloading | Status::Processing => {
// Can be paused
}
}
// If download is actively running, cancel its task
let mut active_downloads = self.queue_state.active_downloads.lock().await;
if let Some(cancel_token) = active_downloads.get(&id) {
// Signal the download task to stop
cancel_token.cancel();
// Remove from active downloads (task will clean up)
active_downloads.remove(&id);
}
drop(active_downloads); // Release lock
// Remove from queue if it's still queued (not yet started)
self.remove_from_queue(id).await;
// Update status to Paused in database
self.db.update_status(id, Status::Paused.to_i32()).await?;
Ok(())
}
/// Resume a paused download
///
/// This method restarts a paused download by changing its status back to Queued
/// and adding it to the priority queue. The queue processor will automatically
/// pick it up and continue downloading from where it left off.
///
/// Downloads resume at the article level - any articles that were already
/// downloaded are skipped, and only pending articles are fetched.
///
/// # Arguments
///
/// * `id` - The download ID to resume
///
/// # Returns
///
/// Returns Ok(()) if the download was successfully resumed, or an error if:
/// - The download doesn't exist
/// - The download is not paused (already queued, downloading, complete, or failed)
/// - Database update fails
/// - Queue insertion fails
///
/// # Examples
///
/// ```no_run
/// # use usenet_dl::*;
/// # async fn example(downloader: UsenetDownloader, id: DownloadId) -> Result<()> {
/// // Resume a paused download
/// downloader.resume(id).await?;
/// # Ok(())
/// # }
/// ```
pub async fn resume(&self, id: DownloadId) -> Result<()> {
// Fetch download from database
let download = self.db.get_download(id).await?.ok_or_else(|| {
Error::Database(DatabaseError::NotFound(format!(
"Download {} not found",
id
)))
})?;
let current_status = Status::from_i32(download.status);
// Check if download can be resumed
match current_status {
Status::Paused => {
// Can be resumed
}
Status::Queued | Status::Downloading | Status::Processing => {
// Already active, nothing to do (idempotent)
return Ok(());
}
Status::Complete | Status::Failed => {
return Err(Error::Download(DownloadError::InvalidState {
id: id.into(),
operation: "resume".to_string(),
current_state: format!("{:?}", current_status),
}));
}
}
// Update status back to Queued
self.db.update_status(id, Status::Queued.to_i32()).await?;
// Add back to priority queue for processing
// The queue processor will automatically pick it up
// Article-level tracking ensures only pending articles are downloaded
self.add_to_queue(id).await?;
Ok(())
}
/// Resume a partially downloaded job from where it left off
///
/// This method is the low-level resume operation that queries pending articles
/// and adds the download back to the queue for processing. It checks if there are
/// any pending articles remaining - if none, it proceeds directly to post-processing.
/// If articles remain, it re-queues the download for the queue processor to continue.
///
/// This method is primarily used internally by restore_queue() during startup to
/// resume interrupted downloads, but can also be called directly for explicit resume operations.
pub async fn resume_download(&self, id: DownloadId) -> Result<()> {
// Get pending articles for this download, excluding paused files.
let pending_articles = self.db.get_pending_articles(id).await?;
if !pending_articles.is_empty() {
tracing::info!(
download_id = id.0,
pending_articles = pending_articles.len(),
"Resuming download with pending articles"
);
self.db.update_status(id, Status::Queued.to_i32()).await?;
self.add_to_queue(id).await?;
return Ok(());
}
if self.db.has_any_pending_articles(id).await? {
tracing::info!(
download_id = id.0,
"Only paused file articles remain - keeping download paused"
);
self.db.update_status(id, Status::Paused.to_i32()).await?;
return Ok(());
}
tracing::info!(
download_id = id.0,
"No pending articles - marking as Processing"
);
self.db
.update_status(id, Status::Processing.to_i32())
.await?;
Ok(())
}
/// Cancel a download and delete its files
///
/// This method removes a download from the queue, stops it if actively running,
/// deletes all downloaded files from the temp directory, and removes it from the database.
///
/// # Arguments
///
/// * `id` - The download ID to cancel
///
/// # Examples
///
/// ```no_run
/// # use usenet_dl::*;
/// # async fn example(downloader: UsenetDownloader, id: DownloadId) -> Result<()> {
/// // Cancel a download and remove all files
/// downloader.cancel(id).await?;
/// # Ok(())
/// # }
/// ```
pub async fn cancel(&self, id: DownloadId) -> Result<()> {
// Verify download exists
let _download = self.db.get_download(id).await?.ok_or_else(|| {
Error::Database(DatabaseError::NotFound(format!(
"Download {} not found",
id
)))
})?;
// If download is actively running, cancel its task
let mut active_downloads = self.queue_state.active_downloads.lock().await;
if let Some(cancel_token) = active_downloads.get(&id) {
// Signal the download task to stop
cancel_token.cancel();
// Remove from active downloads
active_downloads.remove(&id);
}
drop(active_downloads); // Release lock
// Remove from queue if it's still queued (not yet started)
self.remove_from_queue(id).await;
// Delete downloaded files from temp directory
let download_temp_dir = self
.config
.download
.temp_dir
.join(format!("download_{}", id.0));
if download_temp_dir.exists()
&& let Err(e) = tokio::fs::remove_dir_all(&download_temp_dir).await
{
tracing::warn!(
download_id = id.0,
path = ?download_temp_dir,
error = %e,
"Failed to delete download temp directory"
);
// Continue anyway - database deletion is more important
}
// Delete download from database (cascades to articles, passwords)
self.db.delete_download(id).await?;
// Emit Removed event
self.emit_event(crate::types::Event::Removed { id });
Ok(())
}
/// Set the priority of a download
///
/// This method changes the priority of a download. If the download is queued,
/// it will be re-queued with the new priority. Active downloads keep running
/// but the priority is saved for when they're queued again.
///
/// # Arguments
///
/// * `id` - The download ID to update
/// * `priority` - The new priority level (Low, Normal, High, or Force)
///
/// # Examples
///
/// ```no_run
/// # use usenet_dl::*;
/// # async fn example(downloader: UsenetDownloader, id: DownloadId) -> Result<()> {
/// // Set download to high priority
/// downloader.set_priority(id, Priority::High).await?;
/// # Ok(())
/// # }
/// ```
pub async fn set_priority(&self, id: DownloadId, priority: Priority) -> Result<()> {
// Verify download exists
let download = self.db.get_download(id).await?.ok_or_else(|| {
Error::Database(DatabaseError::NotFound(format!(
"Download {} not found",
id
)))
})?;
let current_status = Status::from_i32(download.status);
// Update priority in database
self.db.update_priority(id, priority as i32).await?;
// If download is queued (not actively downloading), reorder the queue
// by removing and re-adding with new priority
if current_status == Status::Queued {
// Remove from queue
self.remove_from_queue(id).await;
// Re-add with new priority
// We need to fetch the download again to get updated priority
self.add_to_queue(id).await?;
}
Ok(())
}
/// Re-run post-processing on a completed or failed download
///
/// This method allows re-running the post-processing pipeline on a download.
/// This is useful when:
/// - Extraction failed due to missing password (now added)
/// - Post-processing settings changed
/// - Files were manually repaired
///
/// The download files must still exist in the temp directory for reprocessing to work.
pub async fn reprocess(&self, id: DownloadId) -> Result<()> {
// Get download from database
let _download = self
.db
.get_download(id)
.await?
.ok_or_else(|| Error::NotFound(format!("Download {} not found", id.0)))?;
// Determine download path (temp directory)
let download_path = self
.config
.download
.temp_dir
.join(format!("download_{}", id.0));
// Verify download files still exist
if !download_path.exists() {
return Err(Error::NotFound(format!(
"Download files not found at {}. Cannot reprocess.",
download_path.display()
)));
}
tracing::info!(
download_id = id.0,
path = %download_path.display(),
"Starting reprocessing"
);
// Reset status and re-queue for post-processing
self.db
.update_status(id, Status::Processing.to_i32())
.await?;
// Clear any previous error message
self.db.set_error(id, "").await?;
// Emit Verifying event to indicate post-processing is starting
self.emit_event(Event::Verifying { id });
// Start post-processing pipeline
// This will run asynchronously
let downloader = self.clone();
tokio::spawn(async move {
if let Err(e) = downloader.start_post_processing(id).await {
tracing::error!(
download_id = id.0,
error = %e,
"Reprocessing failed"
);
}
});
Ok(())
}
/// Re-run extraction only (skip verify/repair)
///
/// This method re-runs the extraction stage for a download that has already been downloaded.
/// Unlike `reprocess()`, this skips PAR2 verification and repair stages and goes straight
/// to archive extraction. This is useful when:
/// - Extraction failed due to missing password (now added)
/// - Extraction settings changed
/// - User wants to re-extract without re-downloading
pub async fn reextract(&self, id: DownloadId) -> Result<()> {
// Get download from database
let download = self
.db
.get_download(id)
.await?
.ok_or_else(|| Error::NotFound(format!("Download {} not found", id.0)))?;
// Determine download path (temp directory)
let download_path = self
.config
.download
.temp_dir
.join(format!("download_{}", id.0));
// Verify download files still exist
if !download_path.exists() {
return Err(Error::NotFound(format!(
"Download files not found at {}. Cannot re-extract.",
download_path.display()
)));
}
tracing::info!(
download_id = id.0,
path = %download_path.display(),
"Starting re-extraction (skip verify/repair)"
);
// Reset status to processing
self.db
.update_status(id, Status::Processing.to_i32())
.await?;
// Clear any previous error message
self.db.set_error(id, "").await?;
// Emit Extracting event to indicate extraction is starting
self.emit_event(Event::Extracting {
id,
archive: String::new(),
percent: 0.0,
});
// Run extraction stage only (skip verify/repair)
// This will run asynchronously
let downloader = self.clone();
let destination = PathBuf::from(download.destination.clone());
let post_processor = self.processing.post_processor.clone();
tokio::spawn(async move {
// Run re-extraction (extract + move, skip verify/repair)
match post_processor
.reextract(id, download_path, destination)
.await
{
Ok(final_path) => {
downloader
.handle_reextract_success(id, final_path, download)
.await;
}
Err(e) => {
downloader.handle_reextract_failure(id, e, download).await;
}
}
});
Ok(())
}
/// Handle successful re-extraction completion
async fn handle_reextract_success(
&self,
id: DownloadId,
final_path: PathBuf,
download: crate::db::Download,
) {
tracing::info!(download_id = id.0, ?final_path, "Re-extraction complete");
// Update status to complete
if let Err(e) = self.db.update_status(id, Status::Complete.to_i32()).await {
tracing::error!(
download_id = id.0,
error = %e,
"Failed to update status to complete"
);
}
// Emit Complete event
self.emit_event(Event::Complete {
id,
path: final_path.clone(),
});
// Trigger webhooks for complete event
self.trigger_webhooks(super::webhooks::TriggerWebhooksParams {
event_type: crate::config::WebhookEvent::OnComplete,
download_id: id,
name: download.name.clone(),
category: download.category.clone(),
status: "complete".to_string(),
destination: Some(final_path.clone()),
error: None,
});
// Trigger scripts for complete event
self.trigger_scripts(super::webhooks::TriggerScriptsParams {
event_type: crate::config::ScriptEvent::OnComplete,
download_id: id,
name: download.name.clone(),
category: download.category.clone(),
status: "complete".to_string(),
destination: Some(final_path),
error: None,
size_bytes: download.size_bytes as u64,
});
}
/// Handle re-extraction failure
async fn handle_reextract_failure(
&self,
id: DownloadId,
error: crate::error::Error,
download: crate::db::Download,
) {
// Convert error to string once, reuse throughout
let error_msg = error.to_string();
tracing::error!(
download_id = id.0,
error = %error_msg,
"Re-extraction failed"
);
// Update status to failed
if let Err(db_err) = self.db.update_status(id, Status::Failed.to_i32()).await {
tracing::error!(
download_id = id.0,
error = %db_err,
"Failed to update status to failed"
);
}
// Set error message
if let Err(db_err) = self.db.set_error(id, &error_msg).await {
tracing::error!(
download_id = id.0,
error = %db_err,
"Failed to set error message"
);
}
// Emit Failed event
self.emit_event(Event::Failed {
id,
stage: Stage::Extract,
error: error_msg.clone(),
files_kept: true,
});
// Trigger webhooks for failed event
self.trigger_webhooks(super::webhooks::TriggerWebhooksParams {
event_type: crate::config::WebhookEvent::OnFailed,
download_id: id,
name: download.name.clone(),
category: download.category.clone(),
status: "failed".to_string(),
destination: None,
error: Some(error_msg.clone()),
});
// Trigger scripts for failed event
self.trigger_scripts(super::webhooks::TriggerScriptsParams {
event_type: crate::config::ScriptEvent::OnFailed,
download_id: id,
name: download.name.clone(),
category: download.category.clone(),
status: "failed".to_string(),
destination: None,
error: Some(error_msg),
size_bytes: download.size_bytes as u64,
});
}
/// Pause all active downloads
///
/// This method pauses all downloads that are currently queued, downloading, or processing.
/// Already paused, completed, or failed downloads are not affected.
pub async fn pause_all(&self) -> Result<()> {
// Get all downloads that can be paused (Queued, Downloading, Processing)
let all_downloads = self.db.list_downloads().await?;
let mut paused_count = 0;
for download in all_downloads {
let status = Status::from_i32(download.status);
// Only pause active downloads
match status {
Status::Queued | Status::Downloading | Status::Processing => {
if let Err(e) = self.pause(DownloadId(download.id)).await {
tracing::warn!(
download_id = download.id,
error = %e,
"Failed to pause download during pause_all"
);
// Continue with other downloads
} else {
paused_count += 1;
}
}
Status::Paused | Status::Complete | Status::Failed => {
// Skip already paused/finished downloads
}
}
}
tracing::info!(paused_count = paused_count, "Paused all active downloads");
// Emit global QueuePaused event
self.emit_event(crate::types::Event::QueuePaused);
Ok(())
}
/// Resume all paused downloads
///
/// This method resumes all downloads that are currently paused.
/// Downloads in other states (queued, downloading, complete, failed) are not affected.
pub async fn resume_all(&self) -> Result<()> {
// Get all paused downloads
let paused_downloads = self
.db
.list_downloads_by_status(Status::Paused.to_i32())
.await?;
let mut resumed_count = 0;
for download in paused_downloads {
if let Err(e) = self.resume(DownloadId(download.id)).await {
tracing::warn!(
download_id = download.id,
error = %e,
"Failed to resume download during resume_all"
);
// Continue with other downloads
} else {
resumed_count += 1;
}
}
tracing::info!(
resumed_count = resumed_count,
"Resumed all paused downloads"
);
// Emit global QueueResumed event
self.emit_event(crate::types::Event::QueueResumed);
Ok(())
}
}