1use super::connection::RetryPolicy;
8use super::resume::{
9 should_restart_without_ranges, validate_ranged_response, RangedResponseContext,
10};
11use super::ACCEPT_ENCODING_IDENTITY;
12use crate::error::{EngineError, NetworkErrorKind, ProtocolErrorKind, Result, StorageErrorKind};
13use crate::storage::Segment;
14use crate::types::DownloadProgress;
15
16use bytes::Bytes;
17use futures::stream::StreamExt;
18use parking_lot::RwLock;
19use reqwest::Client;
20use std::path::PathBuf;
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::fs::{File, OpenOptions};
25use tokio::io::{AsyncSeekExt, AsyncWriteExt, SeekFrom};
26use tokio::sync::Semaphore;
27use tokio_util::sync::CancellationToken;
28
/// Smallest byte range worth downloading as its own segment (1 MiB);
/// smaller files are fetched with fewer (or a single) connection.
pub const MIN_SEGMENT_SIZE: u64 = 1024 * 1024;

/// Default maximum number of parallel connections per download.
pub const DEFAULT_CONNECTIONS: usize = 16;

/// Minimum interval between progress-callback emissions.
const PROGRESS_INTERVAL: Duration = Duration::from_millis(250);

/// Minimum interval between segment-state persistence writes.
const PERSISTENCE_INTERVAL: Duration = Duration::from_secs(5);
40
41fn log_progress_invariant(context: &str, progress: &DownloadProgress) {
42 if let Some(total_size) = progress.total_size {
43 if progress.completed_size > total_size {
44 debug_assert!(
45 progress.completed_size <= total_size,
46 "{} progress exceeded total size: {} > {}",
47 context,
48 progress.completed_size,
49 total_size
50 );
51 tracing::warn!(
52 "{} progress exceeded total size: {} > {}",
53 context,
54 progress.completed_size,
55 total_size
56 );
57 }
58 }
59}
60
/// Mutable download state shared (via `Arc`) between the owning
/// `SegmentedDownload` and its spawned segment worker tasks.
struct SharedState {
    // Total bytes downloaded across all segments so far.
    downloaded: AtomicU64,
    // Most recently measured download speed, in bytes per second.
    speed: AtomicU64,
    // Number of segment tasks currently counted as active.
    active_connections: AtomicU64,
    // Cooperative pause flag; workers observe it and stop streaming.
    paused: AtomicBool,
    // Bytes downloaded per segment, indexed by segment id.
    segment_progress: RwLock<Vec<u64>>,
    // When segment state was last persisted (throttles `should_persist`).
    last_persistence: RwLock<Instant>,
}
76
/// A multi-connection HTTP download that splits a file of known size into
/// byte-range segments and fetches them concurrently into a `.part` file
/// next to the final save path.
pub struct SegmentedDownload {
    // Source URL the ranged GET requests are issued against.
    url: String,
    // Total file size in bytes, known up front.
    total_size: u64,
    // Final destination; data lands in a sibling `.part` file until done.
    save_path: PathBuf,
    // The byte-range segments the file is split into.
    segments: Vec<Segment>,
    // Whether the server advertised byte-range support at probe time.
    #[allow(dead_code)]
    supports_range: bool,
    // ETag captured at probe time; sent as `If-Range` on segment requests.
    etag: Option<String>,
    // Last-Modified captured at probe time; `If-Range` fallback when no ETag.
    last_modified: Option<String>,
    // State shared with the spawned segment worker tasks.
    state: Arc<SharedState>,
}
97
/// What a HEAD probe revealed about a server/resource (see `probe_server`).
#[derive(Debug, Clone)]
pub struct ServerCapabilities {
    /// `Content-Length` header value, when present and parseable.
    pub content_length: Option<u64>,
    /// True when the server advertised `Accept-Ranges: bytes`.
    pub supports_range: bool,
    /// `ETag` header value, later used for `If-Range` validation.
    pub etag: Option<String>,
    /// `Last-Modified` header value; `If-Range` fallback when no ETag.
    pub last_modified: Option<String>,
    /// Filename parsed from `Content-Disposition`, if any.
    pub suggested_filename: Option<String>,
}
112
113impl SegmentedDownload {
114 pub fn new(
116 url: String,
117 total_size: u64,
118 save_path: PathBuf,
119 supports_range: bool,
120 etag: Option<String>,
121 last_modified: Option<String>,
122 ) -> Self {
123 Self {
124 url,
125 total_size,
126 save_path,
127 segments: Vec::new(),
128 supports_range,
129 etag,
130 last_modified,
131 state: Arc::new(SharedState {
132 downloaded: AtomicU64::new(0),
133 speed: AtomicU64::new(0),
134 active_connections: AtomicU64::new(0),
135 paused: AtomicBool::new(false),
136 segment_progress: RwLock::new(Vec::new()),
137 last_persistence: RwLock::new(Instant::now()),
138 }),
139 }
140 }
141
142 pub fn init_segments(&mut self, max_connections: usize, min_segment_size: u64) {
144 let num_segments =
145 calculate_segment_count(self.total_size, max_connections, min_segment_size);
146 let segment_size = self.total_size / num_segments as u64;
147
148 let mut segments = Vec::with_capacity(num_segments);
149 for i in 0..num_segments {
150 let start = i as u64 * segment_size;
151 let end = if i == num_segments - 1 {
152 self.total_size - 1
153 } else {
154 (i as u64 + 1) * segment_size - 1
155 };
156 segments.push(Segment::new(i, start, end));
157 }
158
159 *self.state.segment_progress.write() = vec![0u64; num_segments];
161
162 self.segments = segments;
163 }
164
165 pub fn restore_segments(&mut self, saved_segments: Vec<Segment>) {
167 let downloaded: u64 = saved_segments.iter().map(|s| s.downloaded).sum();
169 self.state.downloaded.store(downloaded, Ordering::Relaxed);
170
171 let progress: Vec<u64> = saved_segments.iter().map(|s| s.downloaded).collect();
173 *self.state.segment_progress.write() = progress;
174
175 self.segments = saved_segments;
176 }
177
    /// The current segment layout. `downloaded` values here may be stale;
    /// use `segments_with_progress` for a snapshot with live progress.
    pub fn segments(&self) -> &[Segment] {
        &self.segments
    }
182
183 pub fn segments_with_progress(&self) -> Vec<Segment> {
187 let progress = self.state.segment_progress.read();
188 self.segments
189 .iter()
190 .enumerate()
191 .map(|(idx, s)| {
192 let mut segment = s.clone();
193 if let Some(&downloaded) = progress.get(idx) {
194 segment.downloaded = downloaded;
195 if segment.downloaded >= segment.size() {
196 segment.state = crate::storage::SegmentState::Completed;
197 } else if segment.downloaded > 0 {
198 segment.state = crate::storage::SegmentState::Downloading;
199 }
200 }
201 segment
202 })
203 .collect()
204 }
205
    /// Download every incomplete segment concurrently, then finalize the file.
    ///
    /// One task is spawned per incomplete segment; a semaphore caps how many
    /// run at once at `max_connections`. Each task issues ranged GETs
    /// (resuming from its recorded offset), retries per `retry_policy`, and
    /// writes through a shared file handle. Progress is reported through
    /// `progress_callback`, throttled to `PROGRESS_INTERVAL`.
    ///
    /// # Errors
    /// Fails when any segment ultimately fails, when a ranged response does
    /// not validate (`If-Range` / Content-Range mismatch), on file I/O
    /// errors, or when the final byte count does not match `total_size`.
    #[allow(clippy::too_many_arguments)]
    pub async fn start<F>(
        &self,
        client: &Client,
        user_agent: &str,
        headers: &[(String, String)],
        max_connections: usize,
        retry_policy: &RetryPolicy,
        cancel_token: CancellationToken,
        progress_callback: F,
    ) -> Result<()>
    where
        F: Fn(DownloadProgress) + Send + Sync + 'static,
    {
        // Open (or create + pre-allocate) the .part file; all segment tasks
        // share it behind an async mutex, seeking to their offset per chunk.
        let file = self.prepare_file().await?;
        let file = Arc::new(tokio::sync::Mutex::new(file));

        // Caps the number of segment tasks actively downloading at once.
        let semaphore = Arc::new(Semaphore::new(max_connections));

        // Child token: a fatal segment error cancels sibling workers through
        // this without cancelling the caller's `cancel_token`; it still
        // observes cancellation of the parent.
        let fatal_cancel = cancel_token.child_token();

        let progress_callback = Arc::new(progress_callback);
        let last_progress = Arc::new(RwLock::new(Instant::now()));
        let bytes_since_progress = Arc::new(AtomicU64::new(0));

        // Snapshot (index, range, resume offset) for segments needing work.
        let segments_data: Vec<_> = self
            .segments
            .iter()
            .enumerate()
            .filter(|(_, s)| !s.is_complete())
            .map(|(idx, s)| (idx, s.start, s.end, s.downloaded))
            .collect();

        let mut handles = Vec::new();

        for (segment_idx, start, end, already_downloaded) in segments_data {
            // Clone everything the task must own (the future is 'static).
            let client = client.clone();
            let url = self.url.clone();
            let user_agent = user_agent.to_string();
            let headers = headers.to_vec();
            let file = Arc::clone(&file);
            let semaphore = Arc::clone(&semaphore);
            let cancel_token = fatal_cancel.clone();
            let etag = self.etag.clone();
            let last_modified = self.last_modified.clone();
            let state = Arc::clone(&self.state);
            let progress_callback = Arc::clone(&progress_callback);
            let last_progress = Arc::clone(&last_progress);
            let bytes_since_progress = Arc::clone(&bytes_since_progress);
            let total_size = self.total_size;
            let retry_policy = retry_policy.clone();

            let handle = tokio::spawn(async move {
                // Wait for a connection slot; the semaphore is only closed
                // on shutdown, hence the Shutdown mapping.
                let _permit = semaphore
                    .acquire()
                    .await
                    .map_err(|_| EngineError::Shutdown)?;

                // Bail out quietly if cancelled while waiting for a slot.
                if cancel_token.is_cancelled() {
                    return Ok(());
                }

                // Paused before we even started: leave progress as-is.
                if state.paused.load(Ordering::Relaxed) {
                    return Ok(());
                }

                state.active_connections.fetch_add(1, Ordering::Relaxed);

                // Bytes of this segment already on disk (resume offset).
                let mut segment_bytes: u64 = already_downloaded;
                let expected_segment_size = end - start + 1;
                let mut last_speed_update = Instant::now();
                let mut bytes_for_speed: u64 = 0;
                let mut attempt = 0u32;

                // Segment already fully downloaded (offset past the
                // inclusive end): nothing to fetch.
                if start + segment_bytes > end {
                    state.active_connections.fetch_sub(1, Ordering::Relaxed);
                    return Ok(());
                }

                // Retry loop: each iteration issues one ranged GET resuming
                // from `start + segment_bytes` and streams its body.
                let result: Result<()> = 'retry: loop {
                    if cancel_token.is_cancelled() {
                        break 'retry Ok(());
                    }

                    let resume_start = start + segment_bytes;
                    if resume_start > end {
                        break 'retry Ok(());
                    }

                    let mut request = client.get(&url);
                    request = request.header("User-Agent", &user_agent);
                    request = request.header("Range", format!("bytes={}-{}", resume_start, end));

                    // If-Range makes the server ignore the Range header (and
                    // send the whole file) when the resource has changed.
                    if let Some(if_range_val) = etag.as_deref().or(last_modified.as_deref()) {
                        request = request.header("If-Range", if_range_val);
                    }

                    // Caller-supplied headers, then force identity encoding
                    // so byte offsets match the on-the-wire stream.
                    for (name, value) in &headers {
                        request = request.header(name.as_str(), value.as_str());
                    }
                    request = request.header("Accept-Encoding", ACCEPT_ENCODING_IDENTITY);

                    let response = match request.send().await {
                        Ok(r) => r,
                        Err(e) => {
                            let err: EngineError = e.into();
                            attempt += 1;
                            // Retry transient request failures with backoff.
                            if retry_policy.should_retry(attempt - 1, &err) {
                                tracing::warn!(
                                    "Segment {} request failed (attempt {}/{}), retrying: {}",
                                    segment_idx,
                                    attempt,
                                    retry_policy.max_attempts,
                                    err
                                );
                                let delay = retry_policy.delay_for_attempt(attempt - 1);
                                tokio::time::sleep(delay).await;
                                continue 'retry;
                            }
                            // Non-retryable: stop the sibling segments too.
                            if !err.is_retryable() {
                                cancel_token.cancel();
                            }
                            break 'retry Err(err);
                        }
                    };

                    let status = response.status();

                    // 416 means our recorded range no longer matches the
                    // resource — fatal for every segment.
                    if status == reqwest::StatusCode::RANGE_NOT_SATISFIABLE {
                        cancel_token.cancel();
                        break 'retry Err(EngineError::network(
                            NetworkErrorKind::HttpStatus(416),
                            format!(
                                "Segment {} range not satisfiable (file may have changed on server)",
                                segment_idx
                            ),
                        ));
                    }

                    // 5xx responses are retried under the same policy.
                    if status.is_server_error() {
                        let err = EngineError::network(
                            NetworkErrorKind::HttpStatus(status.as_u16()),
                            format!("Segment {} server error: {}", segment_idx, status),
                        );
                        attempt += 1;
                        if retry_policy.should_retry(attempt - 1, &err) {
                            tracing::warn!(
                                "Segment {} server error (attempt {}/{}), retrying: {}",
                                segment_idx,
                                attempt,
                                retry_policy.max_attempts,
                                status
                            );
                            let delay = retry_policy.delay_for_attempt(attempt - 1);
                            tokio::time::sleep(delay).await;
                            continue 'retry;
                        }
                        break 'retry Err(err);
                    }

                    // Any other non-success status is fatal for all segments.
                    if !status.is_success() && status != reqwest::StatusCode::PARTIAL_CONTENT {
                        cancel_token.cancel();
                        break 'retry Err(EngineError::network(
                            NetworkErrorKind::HttpStatus(status.as_u16()),
                            format!("Segment {} HTTP error: {}", segment_idx, status),
                        ));
                    }

                    // Verify the server honored our range (status and
                    // Content-Range) and that validators still match.
                    if let Err(e) = validate_ranged_response(
                        resume_start,
                        Some(end),
                        status,
                        response
                            .headers()
                            .get("content-range")
                            .and_then(|v| v.to_str().ok()),
                        RangedResponseContext {
                            sent_if_range: etag.is_some() || last_modified.is_some(),
                            expected_etag: etag.as_deref(),
                            expected_last_modified: last_modified.as_deref(),
                            response_etag: response
                                .headers()
                                .get("etag")
                                .and_then(|v| v.to_str().ok()),
                            response_last_modified: response
                                .headers()
                                .get("last-modified")
                                .and_then(|v| v.to_str().ok()),
                        },
                    ) {
                        break 'retry Err(e);
                    }

                    let mut stream = response.bytes_stream();
                    // Set when the body stream errors but is retryable; the
                    // retry loop then reconnects from the current offset.
                    let mut stream_failed = false;

                    // Stream body chunks; cancellation yields None and ends
                    // the loop cleanly.
                    while let Some(chunk_result) = tokio::select! {
                        chunk = stream.next() => chunk,
                        _ = cancel_token.cancelled() => None,
                    } {
                        // Pause: stop mid-stream, keeping progress on disk.
                        if state.paused.load(Ordering::Relaxed) {
                            break;
                        }

                        let chunk: Bytes = match chunk_result {
                            Ok(c) => c,
                            Err(e) => {
                                let err: EngineError = e.into();
                                attempt += 1;
                                if retry_policy.should_retry(attempt - 1, &err) {
                                    tracing::warn!(
                                        "Segment {} stream error (attempt {}/{}), retrying from byte {}: {}",
                                        segment_idx, attempt, retry_policy.max_attempts, segment_bytes, err
                                    );
                                    stream_failed = true;
                                    break;
                                }
                                if !err.is_retryable() {
                                    cancel_token.cancel();
                                }
                                break 'retry Err(err);
                            }
                        };

                        let chunk_len = chunk.len() as u64;

                        // Write the chunk at this segment's absolute offset;
                        // the shared handle is locked per chunk so segments
                        // interleave without clobbering each other's seeks.
                        {
                            let mut file = file.lock().await;
                            file.seek(SeekFrom::Start(start + segment_bytes))
                                .await
                                .map_err(|e| {
                                    EngineError::storage(
                                        StorageErrorKind::Io,
                                        PathBuf::new(),
                                        format!("Seek failed: {}", e),
                                    )
                                })?;
                            file.write_all(&chunk).await.map_err(|e| {
                                EngineError::storage(
                                    StorageErrorKind::Io,
                                    PathBuf::new(),
                                    format!("Write failed: {}", e),
                                )
                            })?;
                        }

                        segment_bytes += chunk_len;
                        // Server sent more than the requested range: abort.
                        if segment_bytes > expected_segment_size {
                            break 'retry Err(EngineError::protocol(
                                ProtocolErrorKind::InvalidResponse,
                                format!(
                                    "Segment {} exceeded expected size: received {} bytes, expected {} bytes",
                                    segment_idx, segment_bytes, expected_segment_size
                                ),
                            ));
                        }

                        // Publish this segment's new progress.
                        {
                            let mut progress = state.segment_progress.write();
                            if let Some(p) = progress.get_mut(segment_idx) {
                                *p = segment_bytes;
                            }
                        }

                        // Bump the global counter and sanity-check the total.
                        let total_downloaded =
                            state.downloaded.fetch_add(chunk_len, Ordering::Relaxed) + chunk_len;
                        if total_downloaded > total_size {
                            break 'retry Err(EngineError::protocol(
                                ProtocolErrorKind::InvalidResponse,
                                format!(
                                    "Download exceeded expected size: received {} bytes, expected {} bytes",
                                    total_downloaded, total_size
                                ),
                            ));
                        }
                        bytes_since_progress.fetch_add(chunk_len, Ordering::Relaxed);
                        bytes_for_speed += chunk_len;

                        // Refresh the speed estimate about twice a second.
                        let now = Instant::now();
                        let speed_elapsed = now.duration_since(last_speed_update);
                        if speed_elapsed >= Duration::from_millis(500) {
                            let current_speed =
                                (bytes_for_speed as f64 / speed_elapsed.as_secs_f64()) as u64;
                            state.speed.store(current_speed, Ordering::Relaxed);
                            bytes_for_speed = 0;
                            last_speed_update = now;
                        }

                        // Throttle callbacks: only the task that wins the
                        // PROGRESS_INTERVAL window emits.
                        let should_emit = {
                            let mut last = last_progress.write();
                            if now.duration_since(*last) >= PROGRESS_INTERVAL {
                                *last = now;
                                bytes_since_progress.store(0, Ordering::Relaxed);
                                true
                            } else {
                                false
                            }
                        };

                        if should_emit {
                            let current_speed = state.speed.load(Ordering::Relaxed);
                            let connections =
                                state.active_connections.load(Ordering::Relaxed) as u32;

                            let progress = DownloadProgress {
                                total_size: Some(total_size),
                                completed_size: total_downloaded,
                                download_speed: current_speed,
                                upload_speed: 0,
                                connections,
                                seeders: 0,
                                peers: 0,
                                eta_seconds: if current_speed > 0 {
                                    Some(
                                        (total_size.saturating_sub(total_downloaded))
                                            / current_speed,
                                    )
                                } else {
                                    None
                                },
                            };
                            log_progress_invariant("segmented http download", &progress);
                            progress_callback(progress);
                        }
                    }

                    // Retryable stream error: back off, then reconnect from
                    // the current offset. `attempt` is >= 1 here.
                    if stream_failed {
                        let delay = retry_policy.delay_for_attempt(attempt - 1);
                        tokio::time::sleep(delay).await;
                        continue 'retry;
                    }

                    // Stream ended (complete, paused, or cancelled).
                    break 'retry Ok(());
                };

                state.active_connections.fetch_sub(1, Ordering::Relaxed);

                result
            });

            handles.push(handle);
        }

        // Join every segment task, collecting failures so the error message
        // can summarize all of them (not just the first).
        let mut segment_errors: Vec<String> = Vec::new();
        let mut any_retryable = false;
        let mut restart_without_ranges_reason: Option<String> = None;
        for (idx, handle) in handles.into_iter().enumerate() {
            match handle.await {
                Err(e) => {
                    tracing::error!("Segment {} task panicked: {:?}", idx, e);
                    segment_errors.push(format!("Segment {} panicked: {:?}", idx, e));
                }
                Ok(Err(e)) => {
                    tracing::error!("Segment {} failed: {:?}", idx, e);
                    if e.is_retryable() {
                        any_retryable = true;
                    }
                    // Remember the first error that mandates falling back to
                    // a single-connection, non-ranged download.
                    if restart_without_ranges_reason.is_none() && should_restart_without_ranges(&e)
                    {
                        restart_without_ranges_reason = Some(e.to_string());
                    }
                    segment_errors.push(format!("Segment {} failed: {}", idx, e));
                }
                Ok(Ok(())) => {}
            }
        }

        if !segment_errors.is_empty() {
            // Range-restart errors take priority: the caller should retry
            // the whole download without segmentation.
            if let Some(reason) = restart_without_ranges_reason {
                return Err(EngineError::protocol(
                    ProtocolErrorKind::RangeNotSupported,
                    format!(
                        "Segmented download requires restart without ranges: {}",
                        reason
                    ),
                ));
            }
            // Surface retryability so upstream retry logic can kick in.
            let kind = if any_retryable {
                NetworkErrorKind::ConnectionReset
            } else {
                NetworkErrorKind::Other
            };
            return Err(EngineError::network(
                kind,
                format!(
                    "Download failed: {} segment(s) failed: {}",
                    segment_errors.len(),
                    segment_errors.join("; ")
                ),
            ));
        }

        // Flush buffered writes and fsync before trusting the byte count.
        {
            let mut file = file.lock().await;
            file.flush().await.map_err(|e| {
                EngineError::storage(
                    StorageErrorKind::Io,
                    &self.save_path,
                    format!("Flush failed: {}", e),
                )
            })?;
            file.sync_all().await.map_err(|e| {
                EngineError::storage(
                    StorageErrorKind::Io,
                    &self.save_path,
                    format!("Sync failed: {}", e),
                )
            })?;
        }

        // Paused/cancelled runs end with fewer bytes; exact match required.
        let total_downloaded = self.state.downloaded.load(Ordering::Relaxed);
        if total_downloaded != self.total_size {
            return Err(EngineError::protocol(
                ProtocolErrorKind::InvalidResponse,
                format!(
                    "Segmented download size mismatch: received {} bytes, expected {} bytes",
                    total_downloaded, self.total_size
                ),
            ));
        }
        // Final 100% progress report (speed/connections reported as zero).
        let progress = DownloadProgress {
            total_size: Some(self.total_size),
            completed_size: total_downloaded,
            download_speed: 0,
            upload_speed: 0,
            connections: 0,
            seeders: 0,
            peers: 0,
            eta_seconds: None,
        };
        log_progress_invariant("segmented http download", &progress);
        progress_callback(progress);

        // Rename the .part file into place now that everything checks out.
        if total_downloaded >= self.total_size {
            self.finalize().await?;
        }

        Ok(())
    }
687
688 pub fn should_persist(&self) -> bool {
693 let mut last = self.state.last_persistence.write();
694 let now = Instant::now();
695 if now.duration_since(*last) >= PERSISTENCE_INTERVAL {
696 *last = now;
697 true
698 } else {
699 false
700 }
701 }
702
    /// Record that segment state was just persisted, resetting the
    /// `should_persist` throttle window.
    pub fn mark_persisted(&self) {
        *self.state.last_persistence.write() = Instant::now();
    }
707
708 async fn prepare_file(&self) -> Result<File> {
710 let part_path = self.part_path();
712
713 if let Some(parent) = part_path.parent() {
715 tokio::fs::create_dir_all(parent).await.map_err(|e| {
716 EngineError::storage(
717 StorageErrorKind::Io,
718 parent,
719 format!("Create dir failed: {}", e),
720 )
721 })?;
722 }
723
724 let file = if part_path.exists() {
726 OpenOptions::new()
727 .write(true)
728 .read(true)
729 .open(&part_path)
730 .await
731 .map_err(|e| {
732 EngineError::storage(
733 StorageErrorKind::Io,
734 &part_path,
735 format!("Open failed: {}", e),
736 )
737 })?
738 } else {
739 let file = File::create(&part_path).await.map_err(|e| {
741 EngineError::storage(
742 StorageErrorKind::Io,
743 &part_path,
744 format!("Create failed: {}", e),
745 )
746 })?;
747
748 file.set_len(self.total_size).await.map_err(|e| {
750 EngineError::storage(
751 StorageErrorKind::Io,
752 &part_path,
753 format!("Pre-allocate failed: {}", e),
754 )
755 })?;
756
757 file
758 };
759
760 Ok(file)
761 }
762
763 fn part_path(&self) -> PathBuf {
765 let ext = self
766 .save_path
767 .extension()
768 .map(|e| format!("{}.part", e.to_string_lossy()))
769 .unwrap_or_else(|| "part".to_string());
770 self.save_path.with_extension(ext)
771 }
772
773 async fn finalize(&self) -> Result<()> {
775 let part_path = self.part_path();
776 if part_path.exists() {
777 tokio::fs::rename(&part_path, &self.save_path)
778 .await
779 .map_err(|e| {
780 EngineError::storage(
781 StorageErrorKind::Io,
782 &self.save_path,
783 format!("Rename failed: {}", e),
784 )
785 })?;
786 }
787 Ok(())
788 }
789
    /// Ask all segment workers to stop streaming at the next chunk boundary;
    /// progress already written to disk is kept for later resumption.
    pub fn pause(&self) {
        self.state.paused.store(true, Ordering::Relaxed);
    }
794
    /// True once the downloaded byte counter has reached the expected size.
    pub fn is_complete(&self) -> bool {
        self.state.downloaded.load(Ordering::Relaxed) >= self.total_size
    }
799
800 pub fn progress(&self) -> DownloadProgress {
802 let progress = DownloadProgress {
803 total_size: Some(self.total_size),
804 completed_size: self.state.downloaded.load(Ordering::Relaxed),
805 download_speed: self.state.speed.load(Ordering::Relaxed),
806 upload_speed: 0,
807 connections: self.state.active_connections.load(Ordering::Relaxed) as u32,
808 seeders: 0,
809 peers: 0,
810 eta_seconds: {
811 let speed = self.state.speed.load(Ordering::Relaxed);
812 let remaining = self
813 .total_size
814 .saturating_sub(self.state.downloaded.load(Ordering::Relaxed));
815 if speed > 0 {
816 Some(remaining / speed)
817 } else {
818 None
819 }
820 },
821 };
822 log_progress_invariant("segmented http download", &progress);
823 progress
824 }
825}
826
/// Choose how many segments to split a download of `total_size` bytes into.
///
/// The count is capped by `max_connections` and by how many segments of at
/// least `min_segment_size` bytes fit in the file; the result is always at
/// least 1. A `min_segment_size` of 0 is treated as 1 byte — the original
/// divided by it directly and would panic on a zero minimum.
pub fn calculate_segment_count(
    total_size: u64,
    max_connections: usize,
    min_segment_size: u64,
) -> usize {
    // Empty (or unknown) size: a single plain segment.
    if total_size == 0 {
        return 1;
    }

    // Guard against division by zero for a degenerate minimum.
    let min_segment_size = min_segment_size.max(1);

    // How many segments of at least `min_segment_size` fit in the file.
    let max_segments_by_size = (total_size / min_segment_size) as usize;

    // Respect the connection cap, but never drop below one segment.
    max_connections.min(max_segments_by_size.max(1)).max(1)
}
846
847pub async fn probe_server(
849 client: &Client,
850 url: &str,
851 user_agent: &str,
852) -> Result<ServerCapabilities> {
853 let response = client
854 .head(url)
855 .header("User-Agent", user_agent)
856 .header("Accept-Encoding", ACCEPT_ENCODING_IDENTITY)
857 .send()
858 .await
859 .map_err(EngineError::from)?;
860
861 if !response.status().is_success() {
862 return Err(EngineError::network(
863 NetworkErrorKind::HttpStatus(response.status().as_u16()),
864 format!("HEAD request returned: {}", response.status()),
865 ));
866 }
867
868 let headers = response.headers();
869
870 let content_length = headers
871 .get("content-length")
872 .and_then(|v| v.to_str().ok())
873 .and_then(|s| s.parse::<u64>().ok());
874
875 let supports_range = headers
876 .get("accept-ranges")
877 .and_then(|v| v.to_str().ok())
878 .map(|v| v.contains("bytes"))
879 .unwrap_or(false);
880
881 let etag = headers
882 .get("etag")
883 .and_then(|v| v.to_str().ok())
884 .map(|s| s.to_string());
885
886 let last_modified = headers
887 .get("last-modified")
888 .and_then(|v| v.to_str().ok())
889 .map(|s| s.to_string());
890
891 let suggested_filename = headers
892 .get("content-disposition")
893 .and_then(|v| v.to_str().ok())
894 .and_then(parse_content_disposition);
895
896 Ok(ServerCapabilities {
897 content_length,
898 supports_range,
899 etag,
900 last_modified,
901 suggested_filename,
902 })
903}
904
905fn parse_content_disposition(header: &str) -> Option<String> {
907 if let Some(start) = header.find("filename=") {
909 let rest = &header[start + 9..];
910 if let Some(stripped) = rest.strip_prefix('"') {
911 let end = stripped.find('"')?;
912 return Some(stripped[..end].to_string());
913 } else {
914 let end = rest.find(';').unwrap_or(rest.len());
915 return Some(rest[..end].trim().to_string());
916 }
917 }
918
919 if let Some(start) = header.find("filename*=") {
920 let rest = &header[start + 10..];
921 if let Some(quote_start) = rest.find("''") {
922 let encoded = &rest[quote_start + 2..];
923 let end = encoded.find(';').unwrap_or(encoded.len());
924 if let Ok(decoded) = urlencoding::decode(&encoded[..end]) {
925 return Some(decoded.to_string());
926 }
927 }
928 }
929
930 None
931}
932
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_calculate_segment_count() {
        // Large file: the connection cap is the limiting factor.
        assert_eq!(
            calculate_segment_count(100 * 1024 * 1024, 16, 1024 * 1024),
            16
        );

        // Mid-size file: the 1 MiB minimum segment size is the limit.
        assert_eq!(
            calculate_segment_count(10 * 1024 * 1024, 16, 1024 * 1024),
            10
        );

        // Smaller than the minimum segment size: one segment.
        assert_eq!(calculate_segment_count(512 * 1024, 16, 1024 * 1024), 1);

        // Zero-length file: still one segment, never zero.
        assert_eq!(calculate_segment_count(0, 16, 1024 * 1024), 1);

        // Very large file: still capped by the connection count.
        assert_eq!(
            calculate_segment_count(10 * 1024 * 1024 * 1024, 16, 1024 * 1024),
            16
        );
    }

    #[test]
    fn test_segment_init() {
        let mut download = SegmentedDownload::new(
            "https://example.com/file.zip".to_string(),
            // 100 MiB total size.
            100 * 1024 * 1024,
            PathBuf::from("/tmp/file.zip"),
            true,
            None,
            None,
        );

        download.init_segments(16, 1024 * 1024);

        let segments = download.segments();
        assert_eq!(segments.len(), 16);

        // The segments must span the whole file, first byte to last byte.
        assert_eq!(segments[0].start, 0);
        assert_eq!(segments[15].end, 100 * 1024 * 1024 - 1);

        // Adjacent segments must be contiguous: no gaps, no overlaps.
        for i in 0..15 {
            assert_eq!(segments[i].end + 1, segments[i + 1].start);
        }
    }

    #[test]
    fn test_parse_content_disposition() {
        // Quoted filename parameter.
        assert_eq!(
            parse_content_disposition("attachment; filename=\"test.zip\""),
            Some("test.zip".to_string())
        );

        // Unquoted filename parameter.
        assert_eq!(
            parse_content_disposition("attachment; filename=test.zip"),
            Some("test.zip".to_string())
        );

        // RFC 5987 extended syntax with percent-encoding.
        assert_eq!(
            parse_content_disposition("attachment; filename*=UTF-8''test%20file.zip"),
            Some("test file.zip".to_string())
        );
    }
}