Skip to main content

modde_sources/
queue.rs

1//! Per-download state machine backed by `.meta` sidecars, used to resume,
2//! pause, and report progress for individual download tasks.
3
4use std::path::{Path, PathBuf};
5
6use anyhow::{Context, Result};
7
8use crate::meta::{DownloadMeta, meta_path};
9
/// State machine for a single download task.
///
/// Transitions driven by [`DownloadQueue`]:
/// `Queued` → `Active` (`take_next`), `Active` → `Paused` (`pause`) and
/// `Paused` → `Queued` (`resume`). `Complete` and `Failed` are never set by
/// the queue's own methods; they are assigned by the caller through
/// `get_mut`, or reconstructed from a sidecar's status string on load.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DownloadState {
    /// Waiting to be handed out by the queue.
    Queued,
    /// Handed out to the caller and counted against the concurrency limit.
    Active {
        /// Bytes received so far.
        bytes_downloaded: u64,
        /// Total size, if known.
        total_bytes: Option<u64>,
    },
    /// Suspended; keeps the progress the task had while it was `Active`.
    Paused {
        /// Bytes received before the pause.
        bytes_downloaded: u64,
        /// Total size, if known.
        total_bytes: Option<u64>,
    },
    /// Finished download.
    Complete {
        /// Location of the downloaded file.
        path: PathBuf,
        /// Hash recorded for the finished file; persisted into the sidecar's
        /// `expected_hash` field.
        hash: u64,
    },
    /// Download failed; persisted as the status string `failed: <error>`.
    Failed {
        /// Human-readable failure description.
        error: String,
    },
}
30
/// A single download in the queue.
///
/// Created by `DownloadQueue::enqueue`, or rebuilt from a `.meta` sidecar by
/// `DownloadQueue::load_from_sidecars`.
pub struct DownloadTask {
    /// Queue-assigned identifier, taken from the queue's incrementing counter.
    pub id: usize,
    /// URL string supplied at enqueue time, or copied from the sidecar's
    /// `url` field on load.
    pub url: String,
    /// Destination file path; the sidecar is written to `meta_path(&dest)`.
    pub dest: PathBuf,
    /// Expected hash supplied by the caller, if any. When present it takes
    /// precedence over the hash already stored in `meta`.
    pub expected_hash: Option<u64>,
    /// Current position in the download state machine.
    pub state: DownloadState,
    /// Sidecar metadata; refreshed from `state` before it is persisted.
    pub meta: DownloadMeta,
}
40
/// Synchronous download queue that tracks tasks and enforces concurrency limits.
///
/// The queue does not perform downloads itself. The caller is
/// responsible for taking tasks via [`take_next`](DownloadQueue::take_next), spawning async downloads,
/// and updating task state when downloads progress or complete.
///
/// Note that the sidecar helpers (`load_from_sidecars`, `save_sidecars`) do
/// read and write the filesystem.
pub struct DownloadQueue {
    /// Every tracked task, in insertion order.
    tasks: Vec<DownloadTask>,
    /// Maximum number of tasks allowed in the `Active` state at once.
    max_concurrent: usize,
    /// ID handed to the next task added; incremented after each assignment.
    next_id: usize,
}
51
52impl DownloadQueue {
53    /// Create a new queue with the given concurrency limit.
54    #[must_use]
55    pub fn new(max_concurrent: usize) -> Self {
56        Self {
57            tasks: Vec::new(),
58            max_concurrent,
59            next_id: 0,
60        }
61    }
62
63    /// Add a task to the queue. Returns the assigned task ID.
64    pub fn enqueue(
65        &mut self,
66        url: String,
67        dest: PathBuf,
68        expected_hash: Option<u64>,
69        meta: DownloadMeta,
70    ) -> usize {
71        let id = self.next_id;
72        self.next_id += 1;
73        let mut meta = meta;
74        meta.expected_hash = expected_hash.or(meta.expected_hash);
75        self.tasks.push(DownloadTask {
76            id,
77            url,
78            dest,
79            expected_hash,
80            state: DownloadState::Queued,
81            meta,
82        });
83        id
84    }
85
86    /// Rebuild a queue from persisted `.meta` sidecars in a download directory.
87    ///
88    /// Active downloads are restored as paused because no transport is running
89    /// after process startup. Complete entries are restored only when the
90    /// downloaded file still exists.
91    pub fn load_from_sidecars(download_dir: &Path, max_concurrent: usize) -> Result<Self> {
92        let mut queue = Self::new(max_concurrent);
93        if !download_dir.exists() {
94            return Ok(queue);
95        }
96
97        let mut sidecars = std::fs::read_dir(download_dir)
98            .with_context(|| format!("reading {}", download_dir.display()))?
99            .collect::<std::result::Result<Vec<_>, _>>()?;
100        sidecars.sort_by_key(std::fs::DirEntry::path);
101
102        for entry in sidecars {
103            let path = entry.path();
104            if path.extension().and_then(|ext| ext.to_str()) != Some("meta") {
105                continue;
106            }
107            let meta = DownloadMeta::load(&path)?;
108            let Some(dest) = download_path_from_meta_path(&path) else {
109                continue;
110            };
111            let state = state_from_meta(&meta, &dest);
112            let id = queue.next_id;
113            queue.next_id += 1;
114            queue.tasks.push(DownloadTask {
115                id,
116                url: meta.url.clone(),
117                dest,
118                expected_hash: meta.expected_hash,
119                state,
120                meta,
121            });
122        }
123
124        Ok(queue)
125    }
126
127    /// Persist every task's `.meta` sidecar beside its destination file.
128    pub fn save_sidecars(&mut self) -> Result<()> {
129        for task in &mut self.tasks {
130            sync_meta_from_state(task);
131            task.meta.save(&meta_path(&task.dest))?;
132        }
133        Ok(())
134    }
135
136    /// Pause an active download. No-op if the task is not `Active`.
137    pub fn pause(&mut self, id: usize) {
138        if let Some(task) = self.tasks.iter_mut().find(|t| t.id == id)
139            && let DownloadState::Active {
140                bytes_downloaded,
141                total_bytes,
142            } = task.state
143        {
144            task.state = DownloadState::Paused {
145                bytes_downloaded,
146                total_bytes,
147            };
148            sync_meta_from_state(task);
149        }
150    }
151
152    /// Resume a paused download by moving it back to `Queued`.
153    pub fn resume(&mut self, id: usize) {
154        if let Some(task) = self.tasks.iter_mut().find(|t| t.id == id)
155            && matches!(task.state, DownloadState::Paused { .. })
156        {
157            task.state = DownloadState::Queued;
158            sync_meta_from_state(task);
159        }
160    }
161
162    /// Remove a task from the queue entirely.
163    pub fn cancel(&mut self, id: usize) {
164        self.tasks.retain(|t| t.id != id);
165    }
166
167    /// Number of currently active downloads.
168    #[must_use]
169    pub fn active_count(&self) -> usize {
170        self.tasks
171            .iter()
172            .filter(|t| matches!(t.state, DownloadState::Active { .. }))
173            .count()
174    }
175
176    /// All tasks in the `Queued` state, ready to be started.
177    #[must_use]
178    pub fn pending(&self) -> Vec<&DownloadTask> {
179        self.tasks
180            .iter()
181            .filter(|t| matches!(t.state, DownloadState::Queued))
182            .collect()
183    }
184
185    /// Get the next `Queued` task if the concurrency limit has not been reached.
186    ///
187    /// Transitions the task to `Active` and returns a mutable reference so the
188    /// caller can spawn the download.
189    pub fn take_next(&mut self) -> Option<&mut DownloadTask> {
190        if self.active_count() >= self.max_concurrent {
191            return None;
192        }
193
194        // Find the index of the first Queued task.
195        let idx = self
196            .tasks
197            .iter()
198            .position(|t| matches!(t.state, DownloadState::Queued))?;
199
200        self.tasks[idx].state = DownloadState::Active {
201            bytes_downloaded: 0,
202            total_bytes: None,
203        };
204        sync_meta_from_state(&mut self.tasks[idx]);
205        Some(&mut self.tasks[idx])
206    }
207
208    /// Look up a task by ID.
209    #[must_use]
210    pub fn get(&self, id: usize) -> Option<&DownloadTask> {
211        self.tasks.iter().find(|t| t.id == id)
212    }
213
214    /// Look up a task by ID (mutable).
215    pub fn get_mut(&mut self, id: usize) -> Option<&mut DownloadTask> {
216        self.tasks.iter_mut().find(|t| t.id == id)
217    }
218
219    /// View all tracked tasks in insertion order.
220    #[must_use]
221    pub fn all(&self) -> &[DownloadTask] {
222        &self.tasks
223    }
224
225    /// Total number of tasks in the queue (all states).
226    #[must_use]
227    pub fn len(&self) -> usize {
228        self.tasks.len()
229    }
230
231    /// Whether the queue is empty.
232    #[must_use]
233    pub fn is_empty(&self) -> bool {
234        self.tasks.is_empty()
235    }
236}
237
238fn sync_meta_from_state(task: &mut DownloadTask) {
239    task.meta.expected_hash = task.expected_hash.or(task.meta.expected_hash);
240    match &task.state {
241        DownloadState::Queued => {
242            task.meta.status = "queued".to_string();
243        }
244        DownloadState::Active {
245            bytes_downloaded,
246            total_bytes,
247        } => {
248            task.meta.status = "downloading".to_string();
249            task.meta.bytes_downloaded = *bytes_downloaded;
250            task.meta.total_bytes = *total_bytes;
251        }
252        DownloadState::Paused {
253            bytes_downloaded,
254            total_bytes,
255        } => {
256            task.meta.status = "paused".to_string();
257            task.meta.bytes_downloaded = *bytes_downloaded;
258            task.meta.total_bytes = *total_bytes;
259        }
260        DownloadState::Complete { hash, .. } => {
261            task.meta.status = "complete".to_string();
262            task.meta.expected_hash = Some(*hash);
263            if let Ok(metadata) = std::fs::metadata(&task.dest) {
264                task.meta.bytes_downloaded = metadata.len();
265                task.meta.total_bytes = Some(metadata.len());
266            }
267        }
268        DownloadState::Failed { error } => {
269            task.meta.status = format!("failed: {error}");
270        }
271    }
272}
273
274fn state_from_meta(meta: &DownloadMeta, dest: &Path) -> DownloadState {
275    if meta.status == "complete" && dest.exists() {
276        return DownloadState::Complete {
277            path: dest.to_path_buf(),
278            hash: meta.expected_hash.unwrap_or_default(),
279        };
280    }
281    if meta.status == "paused" || meta.status == "downloading" {
282        return DownloadState::Paused {
283            bytes_downloaded: meta.bytes_downloaded,
284            total_bytes: meta.total_bytes,
285        };
286    }
287    if let Some(error) = meta.status.strip_prefix("failed: ") {
288        return DownloadState::Failed {
289            error: error.to_string(),
290        };
291    }
292    DownloadState::Queued
293}
294
/// Map a sidecar path to the download it describes by dropping the trailing
/// `.meta` from the file name.
///
/// Returns `None` when the path has no file name, the name is not valid
/// UTF-8, or the name does not end in `.meta`.
fn download_path_from_meta_path(path: &Path) -> Option<PathBuf> {
    path.file_name()
        .and_then(|name| name.to_str())
        .and_then(|name| name.strip_suffix(".meta"))
        .map(|download_name| path.with_file_name(download_name))
}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline sidecar metadata for a fresh, not-yet-started download.
    fn test_meta() -> DownloadMeta {
        DownloadMeta {
            url: String::from("https://example.com/mod.zip"),
            expected_hash: None,
            bytes_downloaded: 0,
            total_bytes: None,
            nexus_mod_id: None,
            nexus_file_id: None,
            game_domain: None,
            mod_name: None,
            version: None,
            status: String::from("queued"),
        }
    }

    /// Enqueue a hash-less task whose URL is `https://<name>` and whose
    /// destination is `/<name>`.
    fn enqueue_plain(queue: &mut DownloadQueue, name: &str) -> usize {
        queue.enqueue(
            format!("https://{name}"),
            PathBuf::from(format!("/{name}")),
            None,
            test_meta(),
        )
    }

    #[test]
    fn test_enqueue_and_take_next() {
        let mut queue = DownloadQueue::new(2);
        let id = queue.enqueue(
            String::from("https://example.com/a.zip"),
            PathBuf::from("/tmp/a.zip"),
            Some(123),
            test_meta(),
        );
        assert_eq!(id, 0);
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pending().len(), 1);

        let started = queue.take_next().unwrap();
        assert_eq!(started.id, 0);
        assert!(matches!(started.state, DownloadState::Active { .. }));

        // Nothing is left in the Queued state.
        assert!(queue.pending().is_empty());
        assert!(queue.take_next().is_none());
    }

    #[test]
    fn test_concurrency_limit() {
        let mut queue = DownloadQueue::new(2);
        for name in ["a", "b", "c"] {
            enqueue_plain(&mut queue, name);
        }

        // The first two tasks fit under the limit.
        assert!(queue.take_next().is_some());
        assert!(queue.take_next().is_some());

        // The third is held back by the concurrency limit.
        assert_eq!(queue.active_count(), 2);
        assert!(queue.take_next().is_none());

        // It is still waiting in the queue.
        assert_eq!(queue.pending().len(), 1);
    }

    #[test]
    fn test_pause_resume() {
        let mut queue = DownloadQueue::new(2);
        let id = enqueue_plain(&mut queue, "a");

        // Start the task so it becomes Active.
        let _ = queue.take_next();
        assert_eq!(queue.active_count(), 1);

        // Pausing frees the active slot.
        queue.pause(id);
        assert_eq!(queue.active_count(), 0);
        let paused = &queue.get(id).unwrap().state;
        assert!(matches!(paused, DownloadState::Paused { .. }));

        // Resuming puts the task back in line.
        queue.resume(id);
        assert_eq!(queue.get(id).unwrap().state, DownloadState::Queued);
        assert_eq!(queue.pending().len(), 1);

        // It can be started again.
        let restarted = queue.take_next().unwrap();
        assert!(matches!(restarted.state, DownloadState::Active { .. }));
    }

    #[test]
    fn test_cancel() {
        let mut queue = DownloadQueue::new(2);
        let first = enqueue_plain(&mut queue, "a");
        let second = enqueue_plain(&mut queue, "b");

        assert_eq!(queue.len(), 2);

        queue.cancel(first);
        assert_eq!(queue.len(), 1);
        assert!(queue.get(first).is_none());
        assert!(queue.get(second).is_some());
    }

    #[test]
    fn test_queue_sidecar_roundtrip_restores_active_as_paused() {
        let dir = tempfile::tempdir().unwrap();
        let dest = dir.path().join("mod.zip");
        std::fs::write(&dest, b"partial").unwrap();

        let mut queue = DownloadQueue::new(2);
        let id = queue.enqueue(
            String::from("https://example.com/mod.zip"),
            dest.clone(),
            Some(123),
            test_meta(),
        );
        let in_flight = DownloadState::Active {
            bytes_downloaded: 7,
            total_bytes: Some(100),
        };
        queue.get_mut(id).unwrap().state = in_flight;

        queue.save_sidecars().unwrap();
        let restored = DownloadQueue::load_from_sidecars(dir.path(), 2).unwrap();
        let task = restored.all().first().unwrap();

        assert_eq!(task.url, "https://example.com/mod.zip");
        assert_eq!(task.dest, dest);
        assert_eq!(task.expected_hash, Some(123));
        assert_eq!(
            task.state,
            DownloadState::Paused {
                bytes_downloaded: 7,
                total_bytes: Some(100),
            }
        );
    }
}
435}