1use super::{Segment, SegmentState, Storage};
6use crate::error::{EngineError, Result};
7use crate::types::{
8 DownloadId, DownloadKind, DownloadMetadata, DownloadProgress, DownloadState, DownloadStatus,
9};
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12use rusqlite::{params, Connection, OptionalExtension};
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use tokio::sync::Mutex;
16
17pub struct SqliteStorage {
19 conn: Arc<Mutex<Connection>>,
20}
21
22impl SqliteStorage {
23 pub async fn new(path: impl AsRef<Path>) -> Result<Self> {
25 let path = path.as_ref();
26
27 if let Some(parent) = path.parent() {
29 if !parent.exists() {
30 tokio::fs::create_dir_all(parent).await.map_err(|e| {
31 EngineError::Database(format!("Failed to create database directory: {}", e))
32 })?;
33 }
34 }
35
36 let path = path.to_path_buf();
37 let conn = tokio::task::spawn_blocking(move || -> Result<Connection> {
38 let conn = Connection::open(&path)?;
39
40 conn.pragma_update(None, "journal_mode", "WAL")?;
42 conn.pragma_update(None, "synchronous", "NORMAL")?;
43 conn.pragma_update(None, "foreign_keys", "ON")?;
44
45 conn.execute_batch(SCHEMA)?;
47
48 Ok(conn)
49 })
50 .await
51 .map_err(|e| EngineError::Database(format!("Failed to initialize database: {}", e)))??;
52
53 Ok(Self {
54 conn: Arc::new(Mutex::new(conn)),
55 })
56 }
57
58 pub async fn in_memory() -> Result<Self> {
60 let conn = tokio::task::spawn_blocking(move || -> Result<Connection> {
61 let conn = Connection::open_in_memory()?;
62 conn.pragma_update(None, "foreign_keys", "ON")?;
63 conn.execute_batch(SCHEMA)?;
64 Ok(conn)
65 })
66 .await
67 .map_err(|e| {
68 EngineError::Database(format!("Failed to create in-memory database: {}", e))
69 })??;
70
71 Ok(Self {
72 conn: Arc::new(Mutex::new(conn)),
73 })
74 }
75}
76
77const SCHEMA: &str = r#"
79-- Downloads table
80CREATE TABLE IF NOT EXISTS downloads (
81 id TEXT PRIMARY KEY,
82 kind TEXT NOT NULL,
83 state TEXT NOT NULL,
84 state_error_kind TEXT,
85 state_error_message TEXT,
86 state_error_retryable INTEGER,
87
88 -- Progress
89 total_size INTEGER,
90 completed_size INTEGER NOT NULL DEFAULT 0,
91 download_speed INTEGER NOT NULL DEFAULT 0,
92 upload_speed INTEGER NOT NULL DEFAULT 0,
93 connections INTEGER NOT NULL DEFAULT 0,
94 seeders INTEGER NOT NULL DEFAULT 0,
95 peers INTEGER NOT NULL DEFAULT 0,
96
97 -- Priority
98 priority TEXT NOT NULL DEFAULT 'normal',
99
100 -- Metadata
101 name TEXT NOT NULL,
102 url TEXT,
103 magnet_uri TEXT,
104 info_hash TEXT,
105 save_dir TEXT NOT NULL,
106 filename TEXT,
107 user_agent TEXT,
108 referer TEXT,
109 headers_json TEXT,
110 cookies_json TEXT,
111 checksum_json TEXT,
112 mirrors_json TEXT,
113
114 -- Resume validation (HTTP)
115 etag TEXT,
116 last_modified TEXT,
117
118 -- Timestamps
119 created_at TEXT NOT NULL,
120 completed_at TEXT
121);
122
123-- Segments table for HTTP multi-connection downloads
124CREATE TABLE IF NOT EXISTS segments (
125 id INTEGER PRIMARY KEY AUTOINCREMENT,
126 download_id TEXT NOT NULL,
127 segment_index INTEGER NOT NULL,
128 start_byte INTEGER NOT NULL,
129 end_byte INTEGER NOT NULL,
130 downloaded INTEGER NOT NULL DEFAULT 0,
131 state TEXT NOT NULL,
132 error_message TEXT,
133 error_retries INTEGER DEFAULT 0,
134
135 FOREIGN KEY (download_id) REFERENCES downloads(id) ON DELETE CASCADE,
136 UNIQUE (download_id, segment_index)
137);
138
139-- Indexes for common queries
140CREATE INDEX IF NOT EXISTS idx_downloads_state ON downloads(state);
141CREATE INDEX IF NOT EXISTS idx_downloads_kind ON downloads(kind);
142CREATE INDEX IF NOT EXISTS idx_segments_download ON segments(download_id);
143"#;
144
145#[async_trait]
146impl Storage for SqliteStorage {
147 async fn save_download(&self, status: &DownloadStatus) -> Result<()> {
148 let conn = self.conn.clone();
149 let status = status.clone();
150
151 tokio::task::spawn_blocking(move || -> Result<()> {
152 let conn = conn.blocking_lock();
153
154 let (state_str, error_kind, error_msg, error_retryable) = match &status.state {
156 DownloadState::Queued => ("queued", None, None, None),
157 DownloadState::Connecting => ("connecting", None, None, None),
158 DownloadState::Downloading => ("downloading", None, None, None),
159 DownloadState::Seeding => ("seeding", None, None, None),
160 DownloadState::Paused => ("paused", None, None, None),
161 DownloadState::Completed => ("completed", None, None, None),
162 DownloadState::Error {
163 kind,
164 message,
165 retryable,
166 } => ("error", Some(kind.clone()), Some(message.clone()), Some(*retryable)),
167 };
168
169 let kind_str = match status.kind {
171 DownloadKind::Http => "http",
172 DownloadKind::Torrent => "torrent",
173 DownloadKind::Magnet => "magnet",
174 };
175
176 let priority_str = status.priority.to_string();
178
179 let headers_json = serde_json::to_string(&status.metadata.headers)
181 .unwrap_or_else(|_| "[]".to_string());
182
183 let cookies_json = serde_json::to_string(&status.metadata.cookies)
185 .unwrap_or_else(|_| "[]".to_string());
186
187 let checksum_json = status
189 .metadata
190 .checksum
191 .as_ref()
192 .and_then(|c| serde_json::to_string(c).ok());
193
194 let mirrors_json = serde_json::to_string(&status.metadata.mirrors)
196 .unwrap_or_else(|_| "[]".to_string());
197
198 conn.execute(
199 r#"
200 INSERT INTO downloads (
201 id, kind, state, state_error_kind, state_error_message, state_error_retryable,
202 total_size, completed_size, download_speed, upload_speed, connections, seeders, peers,
203 priority,
204 name, url, magnet_uri, info_hash, save_dir, filename, user_agent, referer,
205 headers_json, cookies_json, checksum_json, mirrors_json,
206 etag, last_modified, created_at, completed_at
207 ) VALUES (
208 ?1, ?2, ?3, ?4, ?5, ?6,
209 ?7, ?8, ?9, ?10, ?11, ?12, ?13,
210 ?14,
211 ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22,
212 ?23, ?24, ?25, ?26,
213 ?27, ?28, ?29, ?30
214 )
215 ON CONFLICT(id) DO UPDATE SET
216 state = excluded.state,
217 state_error_kind = excluded.state_error_kind,
218 state_error_message = excluded.state_error_message,
219 state_error_retryable = excluded.state_error_retryable,
220 total_size = excluded.total_size,
221 completed_size = excluded.completed_size,
222 download_speed = excluded.download_speed,
223 upload_speed = excluded.upload_speed,
224 connections = excluded.connections,
225 seeders = excluded.seeders,
226 peers = excluded.peers,
227 priority = excluded.priority,
228 filename = excluded.filename,
229 cookies_json = excluded.cookies_json,
230 checksum_json = excluded.checksum_json,
231 mirrors_json = excluded.mirrors_json,
232 etag = excluded.etag,
233 last_modified = excluded.last_modified,
234 completed_at = excluded.completed_at
235 "#,
236 params![
237 status.id.as_uuid().to_string(),
238 kind_str,
239 state_str,
240 error_kind,
241 error_msg,
242 error_retryable,
243 status.progress.total_size.map(|s| s as i64),
244 status.progress.completed_size as i64,
245 status.progress.download_speed as i64,
246 status.progress.upload_speed as i64,
247 status.progress.connections as i64,
248 status.progress.seeders as i64,
249 status.progress.peers as i64,
250 priority_str,
251 status.metadata.name,
252 status.metadata.url,
253 status.metadata.magnet_uri,
254 status.metadata.info_hash,
255 status.metadata.save_dir.to_string_lossy().to_string(),
256 status.metadata.filename,
257 status.metadata.user_agent,
258 status.metadata.referer,
259 headers_json,
260 cookies_json,
261 checksum_json,
262 mirrors_json,
263 status.metadata.etag,
264 status.metadata.last_modified,
265 status.created_at.to_rfc3339(),
266 status.completed_at.map(|t| t.to_rfc3339()),
267 ],
268 )?;
269
270 Ok(())
271 })
272 .await
273 .map_err(|e| EngineError::Database(format!("Failed to save download: {}", e)))?
274 }
275
276 async fn load_download(&self, id: DownloadId) -> Result<Option<DownloadStatus>> {
277 let conn = self.conn.clone();
278 let id_str = id.as_uuid().to_string();
279
280 tokio::task::spawn_blocking(move || -> Result<Option<DownloadStatus>> {
281 let conn = conn.blocking_lock();
282
283 let result: Option<DownloadStatus> = conn
284 .query_row(
285 r#"
286 SELECT
287 id, kind, state, state_error_kind, state_error_message, state_error_retryable,
288 total_size, completed_size, download_speed, upload_speed, connections, seeders, peers,
289 priority,
290 name, url, magnet_uri, info_hash, save_dir, filename, user_agent, referer,
291 headers_json, cookies_json, checksum_json, mirrors_json,
292 etag, last_modified, created_at, completed_at
293 FROM downloads
294 WHERE id = ?1
295 "#,
296 params![id_str],
297 |row| {
298 row_to_status(row)
299 },
300 )
301 .optional()?;
302
303 Ok(result)
304 })
305 .await
306 .map_err(|e| EngineError::Database(format!("Failed to load download: {}", e)))?
307 }
308
309 async fn load_all(&self) -> Result<Vec<DownloadStatus>> {
310 let conn = self.conn.clone();
311
312 tokio::task::spawn_blocking(move || -> Result<Vec<DownloadStatus>> {
313 let conn = conn.blocking_lock();
314
315 let mut stmt = conn.prepare(
316 r#"
317 SELECT
318 id, kind, state, state_error_kind, state_error_message, state_error_retryable,
319 total_size, completed_size, download_speed, upload_speed, connections, seeders, peers,
320 priority,
321 name, url, magnet_uri, info_hash, save_dir, filename, user_agent, referer,
322 headers_json, cookies_json, checksum_json, mirrors_json,
323 etag, last_modified, created_at, completed_at
324 FROM downloads
325 ORDER BY created_at DESC
326 "#,
327 )?;
328
329 let iter = stmt.query_map([], row_to_status)?;
330
331 let mut results = Vec::new();
332 for status in iter {
333 results.push(status?);
334 }
335
336 Ok(results)
337 })
338 .await
339 .map_err(|e| EngineError::Database(format!("Failed to load all downloads: {}", e)))?
340 }
341
342 async fn delete_download(&self, id: DownloadId) -> Result<()> {
343 let conn = self.conn.clone();
344 let id_str = id.as_uuid().to_string();
345
346 tokio::task::spawn_blocking(move || -> Result<()> {
347 let conn = conn.blocking_lock();
348 conn.execute("DELETE FROM downloads WHERE id = ?1", params![id_str])?;
349 Ok(())
350 })
351 .await
352 .map_err(|e| EngineError::Database(format!("Failed to delete download: {}", e)))?
353 }
354
355 async fn save_segments(&self, id: DownloadId, segments: &[Segment]) -> Result<()> {
356 let conn = self.conn.clone();
357 let id_str = id.as_uuid().to_string();
358 let segments = segments.to_vec();
359
360 tokio::task::spawn_blocking(move || -> Result<()> {
361 let conn = conn.blocking_lock();
362
363 conn.execute(
365 "DELETE FROM segments WHERE download_id = ?1",
366 params![id_str],
367 )?;
368
369 let mut stmt = conn.prepare(
371 r#"
372 INSERT INTO segments (download_id, segment_index, start_byte, end_byte, downloaded, state, error_message, error_retries)
373 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
374 "#,
375 )?;
376
377 for segment in &segments {
378 let (state_str, error_msg, retries) = match &segment.state {
379 SegmentState::Pending => ("pending", None, 0),
380 SegmentState::Downloading => ("downloading", None, 0),
381 SegmentState::Completed => ("completed", None, 0),
382 SegmentState::Failed { error, retries } => {
383 ("failed", Some(error.clone()), *retries)
384 }
385 };
386
387 stmt.execute(params![
388 id_str,
389 segment.index as i64,
390 segment.start as i64,
391 segment.end as i64,
392 segment.downloaded as i64,
393 state_str,
394 error_msg,
395 retries as i64,
396 ])?;
397 }
398
399 Ok(())
400 })
401 .await
402 .map_err(|e| EngineError::Database(format!("Failed to save segments: {}", e)))?
403 }
404
405 async fn load_segments(&self, id: DownloadId) -> Result<Vec<Segment>> {
406 let conn = self.conn.clone();
407 let id_str = id.as_uuid().to_string();
408
409 tokio::task::spawn_blocking(move || -> Result<Vec<Segment>> {
410 let conn = conn.blocking_lock();
411
412 let mut stmt = conn.prepare(
413 r#"
414 SELECT segment_index, start_byte, end_byte, downloaded, state, error_message, error_retries
415 FROM segments
416 WHERE download_id = ?1
417 ORDER BY segment_index
418 "#,
419 )?;
420
421 let iter = stmt.query_map(params![id_str], |row| {
422 let index: i64 = row.get(0)?;
423 let start: i64 = row.get(1)?;
424 let end: i64 = row.get(2)?;
425 let downloaded: i64 = row.get(3)?;
426 let state_str: String = row.get(4)?;
427 let error_msg: Option<String> = row.get(5)?;
428 let retries: i64 = row.get(6)?;
429
430 let state = match state_str.as_str() {
445 "pending" => SegmentState::Pending,
446 "downloading" => SegmentState::Pending,
447 "completed" => SegmentState::Completed,
448 "failed" => SegmentState::Failed {
449 error: error_msg.unwrap_or_default(),
450 retries: retries as u32,
451 },
452 _ => SegmentState::Pending,
453 };
454
455 Ok(Segment {
456 index: index as usize,
457 start: start as u64,
458 end: end as u64,
459 downloaded: downloaded as u64,
460 state,
461 })
462 })?;
463
464 let mut segments = Vec::new();
465 for segment in iter {
466 segments.push(segment?);
467 }
468
469 Ok(segments)
470 })
471 .await
472 .map_err(|e| EngineError::Database(format!("Failed to load segments: {}", e)))?
473 }
474
475 async fn delete_segments(&self, id: DownloadId) -> Result<()> {
476 let conn = self.conn.clone();
477 let id_str = id.as_uuid().to_string();
478
479 tokio::task::spawn_blocking(move || -> Result<()> {
480 let conn = conn.blocking_lock();
481 conn.execute(
482 "DELETE FROM segments WHERE download_id = ?1",
483 params![id_str],
484 )?;
485 Ok(())
486 })
487 .await
488 .map_err(|e| EngineError::Database(format!("Failed to delete segments: {}", e)))?
489 }
490
491 async fn health_check(&self) -> Result<()> {
492 let conn = self.conn.clone();
493
494 tokio::task::spawn_blocking(move || -> Result<()> {
495 let conn = conn.blocking_lock();
496 let _: i64 = conn.query_row("SELECT 1", [], |row| row.get(0))?;
498 Ok(())
499 })
500 .await
501 .map_err(|e| EngineError::Database(format!("Health check failed: {}", e)))?
502 }
503
504 async fn compact(&self) -> Result<()> {
505 let conn = self.conn.clone();
506
507 tokio::task::spawn_blocking(move || -> Result<()> {
508 let conn = conn.blocking_lock();
509 conn.execute("VACUUM", [])?;
510 Ok(())
511 })
512 .await
513 .map_err(|e| EngineError::Database(format!("Compact failed: {}", e)))?
514 }
515}
516
517fn row_to_status(row: &rusqlite::Row<'_>) -> rusqlite::Result<DownloadStatus> {
519 let id_str: String = row.get(0)?;
520 let kind_str: String = row.get(1)?;
521 let state_str: String = row.get(2)?;
522 let error_kind: Option<String> = row.get(3)?;
523 let error_msg: Option<String> = row.get(4)?;
524 let error_retryable: Option<bool> = row.get(5)?;
525
526 let total_size: Option<i64> = row.get(6)?;
527 let completed_size: i64 = row.get(7)?;
528 let download_speed: i64 = row.get(8)?;
529 let upload_speed: i64 = row.get(9)?;
530 let connections: i64 = row.get(10)?;
531 let seeders: i64 = row.get(11)?;
532 let peers: i64 = row.get(12)?;
533
534 let priority_str: String = row.get(13)?;
535
536 let name: String = row.get(14)?;
537 let url: Option<String> = row.get(15)?;
538 let magnet_uri: Option<String> = row.get(16)?;
539 let info_hash: Option<String> = row.get(17)?;
540 let save_dir: String = row.get(18)?;
541 let filename: Option<String> = row.get(19)?;
542 let user_agent: Option<String> = row.get(20)?;
543 let referer: Option<String> = row.get(21)?;
544 let headers_json: Option<String> = row.get(22)?;
545 let cookies_json: Option<String> = row.get(23)?;
546 let checksum_json: Option<String> = row.get(24)?;
547 let mirrors_json: Option<String> = row.get(25)?;
548
549 let etag: Option<String> = row.get(26)?;
550 let last_modified: Option<String> = row.get(27)?;
551 let created_at_str: String = row.get(28)?;
552 let completed_at_str: Option<String> = row.get(29)?;
553
554 let uuid = uuid::Uuid::parse_str(&id_str).map_err(|e| {
556 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
557 })?;
558 let id = DownloadId::from_uuid(uuid);
559
560 let kind = match kind_str.as_str() {
565 "http" => DownloadKind::Http,
566 "torrent" => DownloadKind::Torrent,
567 "magnet" => DownloadKind::Magnet,
568 _ => {
569 tracing::warn!(
570 "Unknown download kind '{}' for download {}, defaulting to Http",
571 kind_str,
572 id_str
573 );
574 DownloadKind::Http
575 }
576 };
577
578 let state = match state_str.as_str() {
583 "queued" => DownloadState::Queued,
584 "connecting" => DownloadState::Connecting,
585 "downloading" => DownloadState::Downloading,
586 "seeding" => DownloadState::Seeding,
587 "paused" => DownloadState::Paused,
588 "completed" => DownloadState::Completed,
589 "error" => DownloadState::Error {
590 kind: error_kind.unwrap_or_default(),
591 message: error_msg.unwrap_or_default(),
592 retryable: error_retryable.unwrap_or(false),
593 },
594 _ => {
595 tracing::warn!(
596 "Unknown download state '{}' for download {}, defaulting to Queued",
597 state_str,
598 id_str
599 );
600 DownloadState::Queued
601 }
602 };
603
604 let priority = priority_str
606 .parse::<crate::priority_queue::DownloadPriority>()
607 .unwrap_or_default();
608
609 let headers: Vec<(String, String)> = headers_json
611 .and_then(|s| serde_json::from_str(&s).ok())
612 .unwrap_or_default();
613
614 let cookies: Vec<String> = cookies_json
616 .and_then(|s| serde_json::from_str(&s).ok())
617 .unwrap_or_default();
618
619 let checksum: Option<crate::http::ExpectedChecksum> =
621 checksum_json.and_then(|s| serde_json::from_str(&s).ok());
622
623 let mirrors: Vec<String> = mirrors_json
625 .and_then(|s| serde_json::from_str(&s).ok())
626 .unwrap_or_default();
627
628 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
630 .map(|dt| dt.with_timezone(&Utc))
631 .unwrap_or_else(|_| Utc::now());
632
633 let completed_at = completed_at_str.and_then(|s| {
634 DateTime::parse_from_rfc3339(&s)
635 .ok()
636 .map(|dt| dt.with_timezone(&Utc))
637 });
638
639 Ok(DownloadStatus {
640 id,
641 kind,
642 state,
643 priority,
644 progress: DownloadProgress {
645 total_size: total_size.map(|n| n as u64),
646 completed_size: completed_size as u64,
647 download_speed: download_speed as u64,
648 upload_speed: upload_speed as u64,
649 connections: connections as u32,
650 seeders: seeders as u32,
651 peers: peers as u32,
652 eta_seconds: None,
653 },
654 metadata: DownloadMetadata {
655 name,
656 url,
657 magnet_uri,
658 info_hash,
659 save_dir: PathBuf::from(save_dir),
660 filename,
661 user_agent,
662 referer,
663 headers,
664 cookies,
665 checksum,
666 mirrors,
667 etag,
668 last_modified,
669 },
670 torrent_info: None,
671 peers: None,
672 created_at,
673 completed_at,
674 })
675}
676
677#[cfg(test)]
678mod tests {
679 use super::*;
680
681 fn create_test_status() -> DownloadStatus {
682 DownloadStatus {
683 id: DownloadId::new(),
684 kind: DownloadKind::Http,
685 state: DownloadState::Downloading,
686 priority: crate::priority_queue::DownloadPriority::Normal,
687 progress: DownloadProgress {
688 total_size: Some(1000),
689 completed_size: 500,
690 download_speed: 100,
691 upload_speed: 0,
692 connections: 4,
693 seeders: 0,
694 peers: 0,
695 eta_seconds: Some(5),
696 },
697 metadata: DownloadMetadata {
698 name: "test.zip".to_string(),
699 url: Some("https://example.com/test.zip".to_string()),
700 magnet_uri: None,
701 info_hash: None,
702 save_dir: PathBuf::from("/tmp/downloads"),
703 filename: Some("test.zip".to_string()),
704 user_agent: Some("gosh-dl/0.1.0".to_string()),
705 referer: None,
706 headers: vec![("X-Custom".to_string(), "value".to_string())],
707 cookies: Vec::new(),
708 checksum: None,
709 mirrors: Vec::new(),
710 etag: None,
711 last_modified: None,
712 },
713 torrent_info: None,
714 peers: None,
715 created_at: Utc::now(),
716 completed_at: None,
717 }
718 }
719
720 #[tokio::test]
721 async fn test_sqlite_save_load() {
722 let storage = SqliteStorage::in_memory().await.unwrap();
723 let status = create_test_status();
724 let id = status.id;
725
726 storage.save_download(&status).await.unwrap();
728
729 let loaded = storage.load_download(id).await.unwrap().unwrap();
731 assert_eq!(loaded.id, id);
732 assert_eq!(loaded.metadata.name, "test.zip");
733 assert_eq!(loaded.progress.completed_size, 500);
734 }
735
736 #[tokio::test]
737 async fn test_sqlite_load_all() {
738 let storage = SqliteStorage::in_memory().await.unwrap();
739
740 for i in 0..5 {
742 let mut status = create_test_status();
743 status.metadata.name = format!("file{}.zip", i);
744 storage.save_download(&status).await.unwrap();
745 }
746
747 let all = storage.load_all().await.unwrap();
749 assert_eq!(all.len(), 5);
750 }
751
752 #[tokio::test]
753 async fn test_sqlite_delete() {
754 let storage = SqliteStorage::in_memory().await.unwrap();
755 let status = create_test_status();
756 let id = status.id;
757
758 storage.save_download(&status).await.unwrap();
759 storage.delete_download(id).await.unwrap();
760
761 let loaded = storage.load_download(id).await.unwrap();
762 assert!(loaded.is_none());
763 }
764
765 #[tokio::test]
766 async fn test_sqlite_segments() {
767 let storage = SqliteStorage::in_memory().await.unwrap();
768
769 let status = create_test_status();
771 let id = status.id;
772 storage.save_download(&status).await.unwrap();
773
774 let segments = vec![
775 Segment::new(0, 0, 999),
776 Segment {
777 index: 1,
778 start: 1000,
779 end: 1999,
780 downloaded: 500,
781 state: SegmentState::Downloading,
782 },
783 Segment {
784 index: 2,
785 start: 2000,
786 end: 2999,
787 downloaded: 1000,
788 state: SegmentState::Completed,
789 },
790 ];
791
792 storage.save_segments(id, &segments).await.unwrap();
794
795 let loaded = storage.load_segments(id).await.unwrap();
797 assert_eq!(loaded.len(), 3);
798 assert_eq!(loaded[0].start, 0);
799 assert_eq!(loaded[1].downloaded, 500);
800 assert!(matches!(loaded[2].state, SegmentState::Completed));
801 }
802
803 #[tokio::test]
804 async fn test_sqlite_update() {
805 let storage = SqliteStorage::in_memory().await.unwrap();
806 let mut status = create_test_status();
807 let id = status.id;
808
809 storage.save_download(&status).await.unwrap();
811
812 status.progress.completed_size = 800;
814 status.state = DownloadState::Completed;
815 status.completed_at = Some(Utc::now());
816 storage.save_download(&status).await.unwrap();
817
818 let loaded = storage.load_download(id).await.unwrap().unwrap();
820 assert_eq!(loaded.progress.completed_size, 800);
821 assert!(matches!(loaded.state, DownloadState::Completed));
822 assert!(loaded.completed_at.is_some());
823 }
824
825 #[tokio::test]
826 async fn test_sqlite_health_check() {
827 let storage = SqliteStorage::in_memory().await.unwrap();
828 storage.health_check().await.unwrap();
829 }
830
831 #[tokio::test]
832 async fn test_sqlite_priority_persistence() {
833 let storage = SqliteStorage::in_memory().await.unwrap();
834 let mut status = create_test_status();
835 status.priority = crate::priority_queue::DownloadPriority::High;
836 let id = status.id;
837
838 storage.save_download(&status).await.unwrap();
839 let loaded = storage.load_download(id).await.unwrap().unwrap();
840
841 assert_eq!(
842 loaded.priority,
843 crate::priority_queue::DownloadPriority::High
844 );
845 }
846
847 #[tokio::test]
848 async fn test_sqlite_cookies_persistence() {
849 let storage = SqliteStorage::in_memory().await.unwrap();
850 let mut status = create_test_status();
851 status.metadata.cookies = vec!["session=abc123".to_string(), "user=test".to_string()];
852 let id = status.id;
853
854 storage.save_download(&status).await.unwrap();
855 let loaded = storage.load_download(id).await.unwrap().unwrap();
856
857 assert_eq!(loaded.metadata.cookies.len(), 2);
858 assert!(loaded
859 .metadata
860 .cookies
861 .contains(&"session=abc123".to_string()));
862 assert!(loaded.metadata.cookies.contains(&"user=test".to_string()));
863 }
864
865 #[tokio::test]
866 async fn test_sqlite_checksum_persistence() {
867 let storage = SqliteStorage::in_memory().await.unwrap();
868 let mut status = create_test_status();
869 status.metadata.checksum = Some(crate::http::ExpectedChecksum::sha256(
870 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
871 ));
872 let id = status.id;
873
874 storage.save_download(&status).await.unwrap();
875 let loaded = storage.load_download(id).await.unwrap().unwrap();
876
877 assert!(loaded.metadata.checksum.is_some());
878 let checksum = loaded.metadata.checksum.unwrap();
879 assert_eq!(checksum.algorithm, crate::http::ChecksumAlgorithm::Sha256);
880 assert_eq!(
881 checksum.value,
882 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
883 );
884 }
885
886 #[tokio::test]
887 async fn test_sqlite_mirrors_persistence() {
888 let storage = SqliteStorage::in_memory().await.unwrap();
889 let mut status = create_test_status();
890 status.metadata.mirrors = vec![
891 "https://mirror1.example.com/file.zip".to_string(),
892 "https://mirror2.example.com/file.zip".to_string(),
893 ];
894 let id = status.id;
895
896 storage.save_download(&status).await.unwrap();
897 let loaded = storage.load_download(id).await.unwrap().unwrap();
898
899 assert_eq!(loaded.metadata.mirrors.len(), 2);
900 assert!(loaded
901 .metadata
902 .mirrors
903 .contains(&"https://mirror1.example.com/file.zip".to_string()));
904 }
905
906 #[tokio::test]
907 async fn test_sqlite_full_metadata_persistence() {
908 let storage = SqliteStorage::in_memory().await.unwrap();
910 let mut status = create_test_status();
911
912 status.priority = crate::priority_queue::DownloadPriority::Critical;
913 status.metadata.cookies = vec!["auth=token".to_string()];
914 status.metadata.checksum = Some(crate::http::ExpectedChecksum::md5(
915 "d41d8cd98f00b204e9800998ecf8427e",
916 ));
917 status.metadata.mirrors = vec!["https://backup.example.com/file.zip".to_string()];
918
919 let id = status.id;
920 storage.save_download(&status).await.unwrap();
921
922 let loaded = storage.load_download(id).await.unwrap().unwrap();
923
924 assert_eq!(
925 loaded.priority,
926 crate::priority_queue::DownloadPriority::Critical
927 );
928 assert_eq!(loaded.metadata.cookies, vec!["auth=token".to_string()]);
929 assert!(loaded.metadata.checksum.is_some());
930 assert_eq!(loaded.metadata.mirrors.len(), 1);
931 }
932}