Skip to main content

usenet_dl/db/
downloads.rs

1//! Download queue CRUD operations.
2
3use crate::error::DatabaseError;
4use crate::types::DownloadId;
5use crate::{Error, Result};
6
7use super::{Database, Download, NewDownload};
8
9impl Database {
10    /// Insert a new download record
11    pub async fn insert_download(&self, download: &NewDownload) -> Result<DownloadId> {
12        let now = chrono::Utc::now().timestamp();
13
14        let result = sqlx::query(
15            r#"
16            INSERT INTO downloads (
17                name, nzb_path, nzb_meta_name, nzb_hash, job_name,
18                category, destination, post_process, priority, status,
19                progress, speed_bps, size_bytes, downloaded_bytes,
20                created_at
21            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
22            "#,
23        )
24        .bind(&download.name)
25        .bind(&download.nzb_path)
26        .bind(&download.nzb_meta_name)
27        .bind(&download.nzb_hash)
28        .bind(&download.job_name)
29        .bind(&download.category)
30        .bind(&download.destination)
31        .bind(download.post_process)
32        .bind(download.priority)
33        .bind(download.status)
34        .bind(0.0f32) // progress
35        .bind(0i64) // speed_bps
36        .bind(download.size_bytes)
37        .bind(0i64) // downloaded_bytes
38        .bind(now)
39        .execute(&self.pool)
40        .await
41        .map_err(|e| {
42            Error::Database(DatabaseError::QueryFailed(format!(
43                "Failed to insert download: {}",
44                e
45            )))
46        })?;
47
48        Ok(DownloadId(result.last_insert_rowid()))
49    }
50
51    /// Get a download by ID
52    pub async fn get_download(&self, id: DownloadId) -> Result<Option<Download>> {
53        let row = sqlx::query_as::<_, Download>(
54            r#"
55            SELECT
56                id, name, nzb_path, nzb_meta_name, nzb_hash, job_name,
57                category, destination, post_process, priority, status,
58                progress, speed_bps, size_bytes, downloaded_bytes,
59                error_message, created_at, started_at, completed_at,
60                direct_unpack_state, direct_unpack_extracted_count
61            FROM downloads
62            WHERE id = ?
63            "#,
64        )
65        .bind(id)
66        .fetch_optional(&self.pool)
67        .await
68        .map_err(|e| {
69            Error::Database(DatabaseError::QueryFailed(format!(
70                "Failed to get download: {}",
71                e
72            )))
73        })?;
74
75        Ok(row)
76    }
77
78    /// List all downloads
79    pub async fn list_downloads(&self) -> Result<Vec<Download>> {
80        let rows = sqlx::query_as::<_, Download>(
81            r#"
82            SELECT
83                id, name, nzb_path, nzb_meta_name, nzb_hash, job_name,
84                category, destination, post_process, priority, status,
85                progress, speed_bps, size_bytes, downloaded_bytes,
86                error_message, created_at, started_at, completed_at,
87                direct_unpack_state, direct_unpack_extracted_count
88            FROM downloads
89            ORDER BY priority DESC, created_at ASC
90            "#,
91        )
92        .fetch_all(&self.pool)
93        .await
94        .map_err(|e| {
95            Error::Database(DatabaseError::QueryFailed(format!(
96                "Failed to list downloads: {}",
97                e
98            )))
99        })?;
100
101        Ok(rows)
102    }
103
104    /// List downloads with a specific status
105    pub async fn list_downloads_by_status(&self, status: i32) -> Result<Vec<Download>> {
106        let rows = sqlx::query_as::<_, Download>(
107            r#"
108            SELECT
109                id, name, nzb_path, nzb_meta_name, nzb_hash, job_name,
110                category, destination, post_process, priority, status,
111                progress, speed_bps, size_bytes, downloaded_bytes,
112                error_message, created_at, started_at, completed_at,
113                direct_unpack_state, direct_unpack_extracted_count
114            FROM downloads
115            WHERE status = ?
116            ORDER BY priority DESC, created_at ASC
117            "#,
118        )
119        .bind(status)
120        .fetch_all(&self.pool)
121        .await
122        .map_err(|e| {
123            Error::Database(DatabaseError::QueryFailed(format!(
124                "Failed to list downloads by status: {}",
125                e
126            )))
127        })?;
128
129        Ok(rows)
130    }
131
132    /// Update download status
133    pub async fn update_status(&self, id: DownloadId, status: i32) -> Result<()> {
134        sqlx::query("UPDATE downloads SET status = ? WHERE id = ?")
135            .bind(status)
136            .bind(id)
137            .execute(&self.pool)
138            .await
139            .map_err(|e| {
140                Error::Database(DatabaseError::QueryFailed(format!(
141                    "Failed to update status: {}",
142                    e
143                )))
144            })?;
145
146        Ok(())
147    }
148
149    /// Update download progress
150    pub async fn update_progress(
151        &self,
152        id: DownloadId,
153        progress: f32,
154        speed_bps: u64,
155        downloaded_bytes: u64,
156    ) -> Result<()> {
157        sqlx::query(
158            "UPDATE downloads SET progress = ?, speed_bps = ?, downloaded_bytes = ? WHERE id = ?",
159        )
160        .bind(progress)
161        .bind(speed_bps as i64)
162        .bind(downloaded_bytes as i64)
163        .bind(id)
164        .execute(&self.pool)
165        .await
166        .map_err(|e| {
167            Error::Database(DatabaseError::QueryFailed(format!(
168                "Failed to update progress: {}",
169                e
170            )))
171        })?;
172
173        Ok(())
174    }
175
176    /// Update download priority
177    pub async fn update_priority(&self, id: DownloadId, priority: i32) -> Result<()> {
178        sqlx::query("UPDATE downloads SET priority = ? WHERE id = ?")
179            .bind(priority)
180            .bind(id)
181            .execute(&self.pool)
182            .await
183            .map_err(|e| {
184                Error::Database(DatabaseError::QueryFailed(format!(
185                    "Failed to update priority: {}",
186                    e
187                )))
188            })?;
189
190        Ok(())
191    }
192
193    /// Set download error message
194    pub async fn set_error(&self, id: DownloadId, error: &str) -> Result<()> {
195        sqlx::query("UPDATE downloads SET error_message = ? WHERE id = ?")
196            .bind(error)
197            .bind(id)
198            .execute(&self.pool)
199            .await
200            .map_err(|e| {
201                Error::Database(DatabaseError::QueryFailed(format!(
202                    "Failed to set error: {}",
203                    e
204                )))
205            })?;
206
207        Ok(())
208    }
209
210    /// Set download started timestamp
211    pub async fn set_started(&self, id: DownloadId) -> Result<()> {
212        let now = chrono::Utc::now().timestamp();
213        sqlx::query("UPDATE downloads SET started_at = ? WHERE id = ?")
214            .bind(now)
215            .bind(id)
216            .execute(&self.pool)
217            .await
218            .map_err(|e| {
219                Error::Database(DatabaseError::QueryFailed(format!(
220                    "Failed to set started timestamp: {}",
221                    e
222                )))
223            })?;
224
225        Ok(())
226    }
227
228    /// Set download completed timestamp
229    pub async fn set_completed(&self, id: DownloadId) -> Result<()> {
230        let now = chrono::Utc::now().timestamp();
231        sqlx::query("UPDATE downloads SET completed_at = ? WHERE id = ?")
232            .bind(now)
233            .bind(id)
234            .execute(&self.pool)
235            .await
236            .map_err(|e| {
237                Error::Database(DatabaseError::QueryFailed(format!(
238                    "Failed to set completed timestamp: {}",
239                    e
240                )))
241            })?;
242
243        Ok(())
244    }
245
246    /// Delete a download
247    pub async fn delete_download(&self, id: DownloadId) -> Result<()> {
248        sqlx::query("DELETE FROM downloads WHERE id = ?")
249            .bind(id)
250            .execute(&self.pool)
251            .await
252            .map_err(|e| {
253                Error::Database(DatabaseError::QueryFailed(format!(
254                    "Failed to delete download: {}",
255                    e
256                )))
257            })?;
258
259        Ok(())
260    }
261
262    /// Get incomplete downloads (for resume on startup)
263    pub async fn get_incomplete_downloads(&self) -> Result<Vec<Download>> {
264        let rows = sqlx::query_as::<_, Download>(
265            r#"
266            SELECT
267                id, name, nzb_path, nzb_meta_name, nzb_hash, job_name,
268                category, destination, post_process, priority, status,
269                progress, speed_bps, size_bytes, downloaded_bytes,
270                error_message, created_at, started_at, completed_at,
271                direct_unpack_state, direct_unpack_extracted_count
272            FROM downloads
273            WHERE status IN (0, 1, 3)
274            ORDER BY priority DESC, created_at ASC
275            "#,
276        )
277        .fetch_all(&self.pool)
278        .await
279        .map_err(|e| {
280            Error::Database(DatabaseError::QueryFailed(format!(
281                "Failed to get incomplete downloads: {}",
282                e
283            )))
284        })?;
285
286        Ok(rows)
287    }
288
289    /// Get all downloads (for state persistence during shutdown)
290    pub async fn get_all_downloads(&self) -> Result<Vec<Download>> {
291        let rows = sqlx::query_as::<_, Download>(
292            r#"
293            SELECT
294                id, name, nzb_path, nzb_meta_name, nzb_hash, job_name,
295                category, destination, post_process, priority, status,
296                progress, speed_bps, size_bytes, downloaded_bytes,
297                error_message, created_at, started_at, completed_at,
298                direct_unpack_state, direct_unpack_extracted_count
299            FROM downloads
300            ORDER BY created_at ASC
301            "#,
302        )
303        .fetch_all(&self.pool)
304        .await
305        .map_err(|e| {
306            Error::Database(DatabaseError::QueryFailed(format!(
307                "Failed to get all downloads: {}",
308                e
309            )))
310        })?;
311
312        Ok(rows)
313    }
314}