Skip to main content

takanawa_http/
state.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3
4use takanawa_core::PartMetadata;
5
6#[repr(u32)]
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum DownloadPhase {
9    Created = 0,
10    Running = 1,
11    Paused = 2,
12    Cancelled = 3,
13    Completed = 4,
14    Failed = 5,
15    Pausing = 6,
16    Cancelling = 7,
17}
18
19#[derive(Debug, Clone)]
20pub struct DownloadSnapshot {
21    pub phase: DownloadPhase,
22    pub content_len: u64,
23    pub downloaded_bytes: u64,
24    pub chunk_size: u64,
25    pub chunk_count: u64,
26    pub completed_chunks: u64,
27    pub active_io: usize,
28    pub last_error: Option<String>,
29}
30
31#[derive(Debug, Clone)]
32pub(crate) struct SharedState {
33    inner: Arc<Inner>,
34}
35
36#[derive(Debug)]
37struct Inner {
38    progress: Mutex<Progress>,
39    active_io: AtomicUsize,
40}
41
42#[derive(Debug, Clone)]
43struct Progress {
44    lifecycle: DownloadLifecycle,
45    content_len: u64,
46    downloaded_bytes: u64,
47    chunk_size: u64,
48    chunk_count: u64,
49    completed_chunks: u64,
50    bitmap: Vec<u8>,
51    last_error: Option<String>,
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55enum DownloadLifecycle {
56    Created,
57    Running,
58    Pausing,
59    Paused,
60    Cancelling,
61    Cancelled,
62    Completed,
63    Failed,
64}
65
66impl DownloadLifecycle {
67    const fn phase(self) -> DownloadPhase {
68        match self {
69            Self::Created => DownloadPhase::Created,
70            Self::Running => DownloadPhase::Running,
71            Self::Pausing => DownloadPhase::Pausing,
72            Self::Paused => DownloadPhase::Paused,
73            Self::Cancelling => DownloadPhase::Cancelling,
74            Self::Cancelled => DownloadPhase::Cancelled,
75            Self::Completed => DownloadPhase::Completed,
76            Self::Failed => DownloadPhase::Failed,
77        }
78    }
79
80    const fn start(self) -> Self {
81        match self {
82            Self::Cancelling => Self::Cancelling,
83            Self::Running | Self::Pausing => self,
84            Self::Created | Self::Paused | Self::Cancelled | Self::Completed | Self::Failed => {
85                Self::Running
86            }
87        }
88    }
89
90    const fn request_pause(self) -> Self {
91        match self {
92            Self::Running | Self::Pausing => Self::Pausing,
93            _ => self,
94        }
95    }
96
97    const fn mark_paused(self) -> Self {
98        match self {
99            Self::Running | Self::Pausing => Self::Paused,
100            _ => self,
101        }
102    }
103
104    const fn request_cancel(self) -> Self {
105        match self {
106            Self::Created => Self::Cancelled,
107            Self::Running | Self::Pausing | Self::Paused => Self::Cancelling,
108            _ => self,
109        }
110    }
111
112    const fn mark_cancelled(self) -> Self {
113        match self {
114            Self::Created | Self::Running | Self::Pausing | Self::Paused | Self::Cancelling => {
115                Self::Cancelled
116            }
117            _ => self,
118        }
119    }
120
121    const fn mark_completed(self) -> Self {
122        match self {
123            Self::Running | Self::Pausing | Self::Paused => Self::Completed,
124            _ => self,
125        }
126    }
127
128    const fn mark_failed(self) -> Self {
129        match self {
130            Self::Created
131            | Self::Running
132            | Self::Pausing
133            | Self::Paused
134            | Self::Cancelling
135            | Self::Cancelled
136            | Self::Completed
137            | Self::Failed => Self::Failed,
138        }
139    }
140}
141
142impl SharedState {
143    #[must_use]
144    pub fn new() -> Self {
145        Self {
146            inner: Arc::new(Inner {
147                progress: Mutex::new(Progress {
148                    lifecycle: DownloadLifecycle::Created,
149                    content_len: 0,
150                    downloaded_bytes: 0,
151                    chunk_size: 0,
152                    chunk_count: 0,
153                    completed_chunks: 0,
154                    bitmap: Vec::new(),
155                    last_error: None,
156                }),
157                active_io: AtomicUsize::new(0),
158            }),
159        }
160    }
161
162    pub fn mark_running(&self) {
163        self.transition(DownloadLifecycle::start);
164    }
165
166    pub fn request_pause(&self) {
167        self.transition(DownloadLifecycle::request_pause);
168    }
169
170    pub fn mark_paused(&self) {
171        self.transition(DownloadLifecycle::mark_paused);
172    }
173
174    pub fn request_cancel(&self) {
175        self.transition(DownloadLifecycle::request_cancel);
176    }
177
178    pub fn mark_cancelled(&self) {
179        self.transition(DownloadLifecycle::mark_cancelled);
180    }
181
182    pub fn mark_completed(&self) {
183        self.transition(DownloadLifecycle::mark_completed);
184    }
185
186    fn transition(&self, transition: impl FnOnce(DownloadLifecycle) -> DownloadLifecycle) {
187        let mut progress = self
188            .inner
189            .progress
190            .lock()
191            .expect("download state mutex poisoned");
192        progress.lifecycle = transition(progress.lifecycle);
193    }
194
195    pub fn mark_failed(&self, message: impl Into<String>) {
196        let mut progress = self
197            .inner
198            .progress
199            .lock()
200            .expect("download state mutex poisoned");
201        progress.lifecycle = progress.lifecycle.mark_failed();
202        progress.last_error = Some(message.into());
203    }
204
205    pub fn clear_error(&self) {
206        self.inner
207            .progress
208            .lock()
209            .expect("download state mutex poisoned")
210            .last_error = None;
211    }
212
213    pub fn update_from_metadata(&self, metadata: &PartMetadata) {
214        let mut progress = self
215            .inner
216            .progress
217            .lock()
218            .expect("download state mutex poisoned");
219        progress.content_len = metadata.content_len;
220        progress.downloaded_bytes = metadata.completed_bytes();
221        progress.chunk_size = metadata.chunk_size;
222        progress.chunk_count = metadata.chunk_count;
223        progress.completed_chunks = metadata.completed_chunks();
224        progress.bitmap = metadata.bitmap.as_bytes().to_vec();
225    }
226
227    pub fn increment_active_io(&self) {
228        self.inner.active_io.fetch_add(1, Ordering::Relaxed);
229    }
230
231    pub fn decrement_active_io(&self) {
232        self.inner.active_io.fetch_sub(1, Ordering::Relaxed);
233    }
234
235    #[must_use]
236    pub fn snapshot(&self) -> DownloadSnapshot {
237        let progress = self
238            .inner
239            .progress
240            .lock()
241            .expect("download state mutex poisoned")
242            .clone();
243        DownloadSnapshot {
244            phase: progress.lifecycle.phase(),
245            content_len: progress.content_len,
246            downloaded_bytes: progress.downloaded_bytes,
247            chunk_size: progress.chunk_size,
248            chunk_count: progress.chunk_count,
249            completed_chunks: progress.completed_chunks,
250            active_io: self.inner.active_io.load(Ordering::Relaxed),
251            last_error: progress.last_error,
252        }
253    }
254
255    #[must_use]
256    pub fn bitmap(&self) -> Vec<u8> {
257        self.inner
258            .progress
259            .lock()
260            .expect("download state mutex poisoned")
261            .bitmap
262            .clone()
263    }
264}
265
266#[cfg(test)]
267mod tests {
268    use super::*;
269
270    #[test]
271    fn lifecycle_reports_transitional_pause_and_cancel_phases() {
272        let state = SharedState::new();
273
274        assert_eq!(state.snapshot().phase, DownloadPhase::Created);
275        state.mark_running();
276        assert_eq!(state.snapshot().phase, DownloadPhase::Running);
277        state.request_pause();
278        assert_eq!(state.snapshot().phase, DownloadPhase::Pausing);
279        state.mark_paused();
280        assert_eq!(state.snapshot().phase, DownloadPhase::Paused);
281        state.mark_running();
282        assert_eq!(state.snapshot().phase, DownloadPhase::Running);
283        state.request_cancel();
284        assert_eq!(state.snapshot().phase, DownloadPhase::Cancelling);
285        state.mark_cancelled();
286        assert_eq!(state.snapshot().phase, DownloadPhase::Cancelled);
287    }
288
289    #[test]
290    fn lifecycle_keeps_terminal_states_stable_for_late_events() {
291        let state = SharedState::new();
292
293        state.mark_running();
294        state.mark_completed();
295        state.request_pause();
296        state.request_cancel();
297
298        assert_eq!(state.snapshot().phase, DownloadPhase::Completed);
299    }
300}