Skip to main content

gosh_dl/http/
segment.rs

1//! Segmented Download Support
2//!
3//! This module provides multi-connection segmented downloads for faster
4//! HTTP/HTTPS transfers. It splits files into segments and downloads
5//! them in parallel using multiple connections.
6
7use super::connection::RetryPolicy;
8use super::resume::{
9    should_restart_without_ranges, validate_ranged_response, RangedResponseContext,
10};
11use super::ACCEPT_ENCODING_IDENTITY;
12use crate::error::{EngineError, NetworkErrorKind, ProtocolErrorKind, Result, StorageErrorKind};
13use crate::storage::Segment;
14use crate::types::DownloadProgress;
15
16use bytes::Bytes;
17use futures::stream::StreamExt;
18use parking_lot::RwLock;
19use reqwest::Client;
20use std::path::PathBuf;
21use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::fs::{File, OpenOptions};
25use tokio::io::{AsyncSeekExt, AsyncWriteExt, SeekFrom};
26use tokio::sync::Semaphore;
27use tokio_util::sync::CancellationToken;
28
/// Minimum segment size (1 MiB)
pub const MIN_SEGMENT_SIZE: u64 = 1024 * 1024;

/// Default number of connections per download
pub const DEFAULT_CONNECTIONS: usize = 16;

/// Progress update interval: the minimum time (250 ms) that must elapse
/// between two progress callback invocations while data is streaming.
const PROGRESS_INTERVAL: Duration = Duration::from_millis(250);

/// Persistence interval for segment state: the minimum time (5 s) that must
/// elapse before `should_persist` reports that a save is due again.
const PERSISTENCE_INTERVAL: Duration = Duration::from_secs(5);
40
41fn log_progress_invariant(context: &str, progress: &DownloadProgress) {
42    if let Some(total_size) = progress.total_size {
43        if progress.completed_size > total_size {
44            debug_assert!(
45                progress.completed_size <= total_size,
46                "{} progress exceeded total size: {} > {}",
47                context,
48                progress.completed_size,
49                total_size
50            );
51            tracing::warn!(
52                "{} progress exceeded total size: {} > {}",
53                context,
54                progress.completed_size,
55                total_size
56            );
57        }
58    }
59}
60
/// Shared state for a segmented download
///
/// One instance is created per `SegmentedDownload` and handed (via `Arc`) to
/// every segment task spawned by `start`.
struct SharedState {
    /// Total bytes downloaded across all segments. Seeded from saved segments
    /// by `restore_segments` and incremented for every chunk written.
    downloaded: AtomicU64,
    /// Current download speed (bytes/sec), as last computed by a segment task
    /// in `start`; read when building progress reports.
    speed: AtomicU64,
    /// Number of active connections, i.e. segment tasks that hold a permit
    /// and have not finished yet.
    active_connections: AtomicU64,
    /// Whether download is paused. Set by `pause`; segment tasks poll it
    /// before starting and between chunks.
    paused: AtomicBool,
    /// Per-segment downloaded bytes (for tracking progress), indexed by
    /// segment index; the source for `segments_with_progress` snapshots.
    segment_progress: RwLock<Vec<u64>>,
    /// Last persistence time, used by `should_persist` / `mark_persisted`.
    last_persistence: RwLock<Instant>,
}
76
/// Segmented download manager
///
/// Holds the static description of one download (URL, size, destination,
/// segment layout, validators) plus the shared runtime counters that the
/// segment tasks update while `start` is running.
pub struct SegmentedDownload {
    /// URL to download from
    url: String,
    /// Total file size in bytes; segment ranges are carved out of
    /// `0..total_size`.
    total_size: u64,
    /// Path to save the file. Data is written to a sibling `.part` file
    /// (see `part_path`) and renamed to this path by `finalize`.
    save_path: PathBuf,
    /// Segments, as laid out by `init_segments` or supplied to
    /// `restore_segments`.
    segments: Vec<Segment>,
    /// Whether server supports range requests (stored for resume validation)
    #[allow(dead_code)]
    supports_range: bool,
    /// ETag for validation; preferred over `last_modified` as the `If-Range`
    /// value on segment requests.
    etag: Option<String>,
    /// Last-Modified for validation; used as the `If-Range` value only when
    /// no ETag is available.
    last_modified: Option<String>,
    /// Shared state (wrapped in Arc for task sharing)
    state: Arc<SharedState>,
}
97
/// Server capabilities determined from HEAD request
///
/// Plain data carrier; every field is optional or a flag because a server
/// may omit any of the corresponding headers.
#[derive(Debug, Clone)]
pub struct ServerCapabilities {
    /// Content-Length header value, if one was available
    pub content_length: Option<u64>,
    /// Whether server supports Range requests
    pub supports_range: bool,
    /// ETag header for validation, if one was available
    pub etag: Option<String>,
    /// Last-Modified header for validation, if one was available
    pub last_modified: Option<String>,
    /// Suggested filename from Content-Disposition, if one was available
    pub suggested_filename: Option<String>,
}
112
113impl SegmentedDownload {
114    /// Create a new segmented download
115    pub fn new(
116        url: String,
117        total_size: u64,
118        save_path: PathBuf,
119        supports_range: bool,
120        etag: Option<String>,
121        last_modified: Option<String>,
122    ) -> Self {
123        Self {
124            url,
125            total_size,
126            save_path,
127            segments: Vec::new(),
128            supports_range,
129            etag,
130            last_modified,
131            state: Arc::new(SharedState {
132                downloaded: AtomicU64::new(0),
133                speed: AtomicU64::new(0),
134                active_connections: AtomicU64::new(0),
135                paused: AtomicBool::new(false),
136                segment_progress: RwLock::new(Vec::new()),
137                last_persistence: RwLock::new(Instant::now()),
138            }),
139        }
140    }
141
142    /// Initialize segments for a new download
143    pub fn init_segments(&mut self, max_connections: usize, min_segment_size: u64) {
144        let num_segments =
145            calculate_segment_count(self.total_size, max_connections, min_segment_size);
146        let segment_size = self.total_size / num_segments as u64;
147
148        let mut segments = Vec::with_capacity(num_segments);
149        for i in 0..num_segments {
150            let start = i as u64 * segment_size;
151            let end = if i == num_segments - 1 {
152                self.total_size - 1
153            } else {
154                (i as u64 + 1) * segment_size - 1
155            };
156            segments.push(Segment::new(i, start, end));
157        }
158
159        // Initialize segment progress tracking
160        *self.state.segment_progress.write() = vec![0u64; num_segments];
161
162        self.segments = segments;
163    }
164
165    /// Restore segments from saved state
166    pub fn restore_segments(&mut self, saved_segments: Vec<Segment>) {
167        // Calculate total already downloaded
168        let downloaded: u64 = saved_segments.iter().map(|s| s.downloaded).sum();
169        self.state.downloaded.store(downloaded, Ordering::Relaxed);
170
171        // Initialize segment progress tracking with saved values
172        let progress: Vec<u64> = saved_segments.iter().map(|s| s.downloaded).collect();
173        *self.state.segment_progress.write() = progress;
174
175        self.segments = saved_segments;
176    }
177
178    /// Get current segments
179    pub fn segments(&self) -> &[Segment] {
180        &self.segments
181    }
182
183    /// Get segments with current progress updated
184    ///
185    /// This creates a snapshot of the current segment state for persistence.
186    pub fn segments_with_progress(&self) -> Vec<Segment> {
187        let progress = self.state.segment_progress.read();
188        self.segments
189            .iter()
190            .enumerate()
191            .map(|(idx, s)| {
192                let mut segment = s.clone();
193                if let Some(&downloaded) = progress.get(idx) {
194                    segment.downloaded = downloaded;
195                    if segment.downloaded >= segment.size() {
196                        segment.state = crate::storage::SegmentState::Completed;
197                    } else if segment.downloaded > 0 {
198                        segment.state = crate::storage::SegmentState::Downloading;
199                    }
200                }
201                segment
202            })
203            .collect()
204    }
205
206    /// Start the segmented download
207    #[allow(clippy::too_many_arguments)]
208    pub async fn start<F>(
209        &self,
210        client: &Client,
211        user_agent: &str,
212        headers: &[(String, String)],
213        max_connections: usize,
214        retry_policy: &RetryPolicy,
215        cancel_token: CancellationToken,
216        progress_callback: F,
217    ) -> Result<()>
218    where
219        F: Fn(DownloadProgress) + Send + Sync + 'static,
220    {
221        // Create/open the file and pre-allocate space
222        let file = self.prepare_file().await?;
223        let file = Arc::new(tokio::sync::Mutex::new(file));
224
225        // Create semaphore for connection limiting
226        let semaphore = Arc::new(Semaphore::new(max_connections));
227
228        // Child cancel token: cancelled on fatal (non-retryable) segment errors
229        // so sibling segments stop promptly instead of wasting bandwidth
230        let fatal_cancel = cancel_token.child_token();
231
232        // Shared state for progress tracking
233        let progress_callback = Arc::new(progress_callback);
234        let last_progress = Arc::new(RwLock::new(Instant::now()));
235        let bytes_since_progress = Arc::new(AtomicU64::new(0));
236
237        // Clone segments data for tasks
238        let segments_data: Vec<_> = self
239            .segments
240            .iter()
241            .enumerate()
242            .filter(|(_, s)| !s.is_complete())
243            .map(|(idx, s)| (idx, s.start, s.end, s.downloaded))
244            .collect();
245
246        // Spawn tasks for each pending segment
247        let mut handles = Vec::new();
248
249        for (segment_idx, start, end, already_downloaded) in segments_data {
250            let client = client.clone();
251            let url = self.url.clone();
252            let user_agent = user_agent.to_string();
253            let headers = headers.to_vec();
254            let file = Arc::clone(&file);
255            let semaphore = Arc::clone(&semaphore);
256            let cancel_token = fatal_cancel.clone();
257            let etag = self.etag.clone();
258            let last_modified = self.last_modified.clone();
259            let state = Arc::clone(&self.state);
260            let progress_callback = Arc::clone(&progress_callback);
261            let last_progress = Arc::clone(&last_progress);
262            let bytes_since_progress = Arc::clone(&bytes_since_progress);
263            let total_size = self.total_size;
264            let retry_policy = retry_policy.clone();
265
266            let handle = tokio::spawn(async move {
267                // Acquire permit
268                let _permit = semaphore
269                    .acquire()
270                    .await
271                    .map_err(|_| EngineError::Shutdown)?;
272
273                // Check cancellation
274                if cancel_token.is_cancelled() {
275                    return Ok(());
276                }
277
278                // Check if paused
279                if state.paused.load(Ordering::Relaxed) {
280                    return Ok(());
281                }
282
283                state.active_connections.fetch_add(1, Ordering::Relaxed);
284
285                // Persistent state across retries
286                let mut segment_bytes: u64 = already_downloaded;
287                let expected_segment_size = end - start + 1;
288                let mut last_speed_update = Instant::now();
289                let mut bytes_for_speed: u64 = 0;
290                let mut attempt = 0u32;
291
292                // Check if already complete before entering retry loop
293                if start + segment_bytes > end {
294                    state.active_connections.fetch_sub(1, Ordering::Relaxed);
295                    return Ok(());
296                }
297
298                let result: Result<()> = 'retry: loop {
299                    // Check cancellation between retries
300                    if cancel_token.is_cancelled() {
301                        break 'retry Ok(());
302                    }
303
304                    // Calculate resume position from current progress
305                    let resume_start = start + segment_bytes;
306                    if resume_start > end {
307                        break 'retry Ok(());
308                    }
309
310                    // Build request with Range header
311                    let mut request = client.get(&url);
312                    request = request.header("User-Agent", &user_agent);
313                    request = request.header("Range", format!("bytes={}-{}", resume_start, end));
314
315                    // Add ETag for validation if available
316                    if let Some(if_range_val) = etag.as_deref().or(last_modified.as_deref()) {
317                        request = request.header("If-Range", if_range_val);
318                    }
319
320                    // Add custom headers
321                    for (name, value) in &headers {
322                        request = request.header(name.as_str(), value.as_str());
323                    }
324                    request = request.header("Accept-Encoding", ACCEPT_ENCODING_IDENTITY);
325
326                    // Send request
327                    let response = match request.send().await {
328                        Ok(r) => r,
329                        Err(e) => {
330                            let err: EngineError = e.into();
331                            attempt += 1;
332                            if retry_policy.should_retry(attempt - 1, &err) {
333                                tracing::warn!(
334                                    "Segment {} request failed (attempt {}/{}), retrying: {}",
335                                    segment_idx,
336                                    attempt,
337                                    retry_policy.max_attempts,
338                                    err
339                                );
340                                let delay = retry_policy.delay_for_attempt(attempt - 1);
341                                tokio::time::sleep(delay).await;
342                                continue 'retry;
343                            }
344                            if !err.is_retryable() {
345                                cancel_token.cancel();
346                            }
347                            break 'retry Err(err);
348                        }
349                    };
350
351                    let status = response.status();
352
353                    // Handle 416 Range Not Satisfiable — not retryable
354                    if status == reqwest::StatusCode::RANGE_NOT_SATISFIABLE {
355                        cancel_token.cancel();
356                        break 'retry Err(EngineError::network(
357                            NetworkErrorKind::HttpStatus(416),
358                            format!(
359                                "Segment {} range not satisfiable (file may have changed on server)",
360                                segment_idx
361                            ),
362                        ));
363                    }
364
365                    // Handle server errors (5xx) with retry
366                    if status.is_server_error() {
367                        let err = EngineError::network(
368                            NetworkErrorKind::HttpStatus(status.as_u16()),
369                            format!("Segment {} server error: {}", segment_idx, status),
370                        );
371                        attempt += 1;
372                        if retry_policy.should_retry(attempt - 1, &err) {
373                            tracing::warn!(
374                                "Segment {} server error (attempt {}/{}), retrying: {}",
375                                segment_idx,
376                                attempt,
377                                retry_policy.max_attempts,
378                                status
379                            );
380                            let delay = retry_policy.delay_for_attempt(attempt - 1);
381                            tokio::time::sleep(delay).await;
382                            continue 'retry;
383                        }
384                        break 'retry Err(err);
385                    }
386
387                    if !status.is_success() && status != reqwest::StatusCode::PARTIAL_CONTENT {
388                        cancel_token.cancel();
389                        break 'retry Err(EngineError::network(
390                            NetworkErrorKind::HttpStatus(status.as_u16()),
391                            format!("Segment {} HTTP error: {}", segment_idx, status),
392                        ));
393                    }
394
395                    if let Err(e) = validate_ranged_response(
396                        resume_start,
397                        Some(end),
398                        status,
399                        response
400                            .headers()
401                            .get("content-range")
402                            .and_then(|v| v.to_str().ok()),
403                        RangedResponseContext {
404                            sent_if_range: etag.is_some() || last_modified.is_some(),
405                            expected_etag: etag.as_deref(),
406                            expected_last_modified: last_modified.as_deref(),
407                            response_etag: response
408                                .headers()
409                                .get("etag")
410                                .and_then(|v| v.to_str().ok()),
411                            response_last_modified: response
412                                .headers()
413                                .get("last-modified")
414                                .and_then(|v| v.to_str().ok()),
415                        },
416                    ) {
417                        break 'retry Err(e);
418                    }
419
420                    // Stream data to file
421                    let mut stream = response.bytes_stream();
422                    let mut stream_failed = false;
423
424                    while let Some(chunk_result) = tokio::select! {
425                        chunk = stream.next() => chunk,
426                        _ = cancel_token.cancelled() => None,
427                    } {
428                        // Check pause
429                        if state.paused.load(Ordering::Relaxed) {
430                            break;
431                        }
432
433                        let chunk: Bytes = match chunk_result {
434                            Ok(c) => c,
435                            Err(e) => {
436                                let err: EngineError = e.into();
437                                attempt += 1;
438                                if retry_policy.should_retry(attempt - 1, &err) {
439                                    tracing::warn!(
440                                        "Segment {} stream error (attempt {}/{}), retrying from byte {}: {}",
441                                        segment_idx, attempt, retry_policy.max_attempts, segment_bytes, err
442                                    );
443                                    stream_failed = true;
444                                    break;
445                                }
446                                if !err.is_retryable() {
447                                    cancel_token.cancel();
448                                }
449                                break 'retry Err(err);
450                            }
451                        };
452
453                        let chunk_len = chunk.len() as u64;
454
455                        // Write to file at correct offset
456                        {
457                            let mut file = file.lock().await;
458                            file.seek(SeekFrom::Start(start + segment_bytes))
459                                .await
460                                .map_err(|e| {
461                                    EngineError::storage(
462                                        StorageErrorKind::Io,
463                                        PathBuf::new(),
464                                        format!("Seek failed: {}", e),
465                                    )
466                                })?;
467                            file.write_all(&chunk).await.map_err(|e| {
468                                EngineError::storage(
469                                    StorageErrorKind::Io,
470                                    PathBuf::new(),
471                                    format!("Write failed: {}", e),
472                                )
473                            })?;
474                        }
475
476                        segment_bytes += chunk_len;
477                        if segment_bytes > expected_segment_size {
478                            break 'retry Err(EngineError::protocol(
479                                ProtocolErrorKind::InvalidResponse,
480                                format!(
481                                    "Segment {} exceeded expected size: received {} bytes, expected {} bytes",
482                                    segment_idx, segment_bytes, expected_segment_size
483                                ),
484                            ));
485                        }
486
487                        // Update segment progress for persistence
488                        {
489                            let mut progress = state.segment_progress.write();
490                            if let Some(p) = progress.get_mut(segment_idx) {
491                                *p = segment_bytes;
492                            }
493                        }
494
495                        // Update global counters
496                        let total_downloaded =
497                            state.downloaded.fetch_add(chunk_len, Ordering::Relaxed) + chunk_len;
498                        if total_downloaded > total_size {
499                            break 'retry Err(EngineError::protocol(
500                                ProtocolErrorKind::InvalidResponse,
501                                format!(
502                                    "Download exceeded expected size: received {} bytes, expected {} bytes",
503                                    total_downloaded, total_size
504                                ),
505                            ));
506                        }
507                        bytes_since_progress.fetch_add(chunk_len, Ordering::Relaxed);
508                        bytes_for_speed += chunk_len;
509
510                        // Update speed calculation
511                        let now = Instant::now();
512                        let speed_elapsed = now.duration_since(last_speed_update);
513                        if speed_elapsed >= Duration::from_millis(500) {
514                            let current_speed =
515                                (bytes_for_speed as f64 / speed_elapsed.as_secs_f64()) as u64;
516                            state.speed.store(current_speed, Ordering::Relaxed);
517                            bytes_for_speed = 0;
518                            last_speed_update = now;
519                        }
520
521                        // Emit progress at intervals
522                        let should_emit = {
523                            let mut last = last_progress.write();
524                            if now.duration_since(*last) >= PROGRESS_INTERVAL {
525                                *last = now;
526                                bytes_since_progress.store(0, Ordering::Relaxed);
527                                true
528                            } else {
529                                false
530                            }
531                        };
532
533                        if should_emit {
534                            let current_speed = state.speed.load(Ordering::Relaxed);
535                            let connections =
536                                state.active_connections.load(Ordering::Relaxed) as u32;
537
538                            let progress = DownloadProgress {
539                                total_size: Some(total_size),
540                                completed_size: total_downloaded,
541                                download_speed: current_speed,
542                                upload_speed: 0,
543                                connections,
544                                seeders: 0,
545                                peers: 0,
546                                eta_seconds: if current_speed > 0 {
547                                    Some(
548                                        (total_size.saturating_sub(total_downloaded))
549                                            / current_speed,
550                                    )
551                                } else {
552                                    None
553                                },
554                            };
555                            log_progress_invariant("segmented http download", &progress);
556                            progress_callback(progress);
557                        }
558                    }
559
560                    if stream_failed {
561                        let delay = retry_policy.delay_for_attempt(attempt - 1);
562                        tokio::time::sleep(delay).await;
563                        continue 'retry;
564                    }
565
566                    // Stream completed successfully (or was paused/cancelled)
567                    break 'retry Ok(());
568                };
569
570                state.active_connections.fetch_sub(1, Ordering::Relaxed);
571
572                // Return the result from the retry loop
573                result
574            });
575
576            handles.push(handle);
577        }
578
579        // Wait for all segment tasks to complete and collect errors
580        let mut segment_errors: Vec<String> = Vec::new();
581        let mut any_retryable = false;
582        let mut restart_without_ranges_reason: Option<String> = None;
583        for (idx, handle) in handles.into_iter().enumerate() {
584            match handle.await {
585                Err(e) => {
586                    // Task panicked
587                    tracing::error!("Segment {} task panicked: {:?}", idx, e);
588                    segment_errors.push(format!("Segment {} panicked: {:?}", idx, e));
589                }
590                Ok(Err(e)) => {
591                    // Task returned an error
592                    tracing::error!("Segment {} failed: {:?}", idx, e);
593                    if e.is_retryable() {
594                        any_retryable = true;
595                    }
596                    if restart_without_ranges_reason.is_none() && should_restart_without_ranges(&e)
597                    {
598                        restart_without_ranges_reason = Some(e.to_string());
599                    }
600                    segment_errors.push(format!("Segment {} failed: {}", idx, e));
601                }
602                Ok(Ok(())) => {
603                    // Task completed successfully
604                }
605            }
606        }
607
608        // If any segments failed, return error
609        if !segment_errors.is_empty() {
610            if let Some(reason) = restart_without_ranges_reason {
611                return Err(EngineError::protocol(
612                    ProtocolErrorKind::RangeNotSupported,
613                    format!(
614                        "Segmented download requires restart without ranges: {}",
615                        reason
616                    ),
617                ));
618            }
619            // Preserve retryability: if any segment had a retryable error,
620            // the aggregate should also be retryable so the engine can retry
621            let kind = if any_retryable {
622                NetworkErrorKind::ConnectionReset
623            } else {
624                NetworkErrorKind::Other
625            };
626            return Err(EngineError::network(
627                kind,
628                format!(
629                    "Download failed: {} segment(s) failed: {}",
630                    segment_errors.len(),
631                    segment_errors.join("; ")
632                ),
633            ));
634        }
635
636        // Sync file to disk
637        {
638            let mut file = file.lock().await;
639            file.flush().await.map_err(|e| {
640                EngineError::storage(
641                    StorageErrorKind::Io,
642                    &self.save_path,
643                    format!("Flush failed: {}", e),
644                )
645            })?;
646            file.sync_all().await.map_err(|e| {
647                EngineError::storage(
648                    StorageErrorKind::Io,
649                    &self.save_path,
650                    format!("Sync failed: {}", e),
651                )
652            })?;
653        }
654
655        // Final progress update
656        let total_downloaded = self.state.downloaded.load(Ordering::Relaxed);
657        if total_downloaded != self.total_size {
658            return Err(EngineError::protocol(
659                ProtocolErrorKind::InvalidResponse,
660                format!(
661                    "Segmented download size mismatch: received {} bytes, expected {} bytes",
662                    total_downloaded, self.total_size
663                ),
664            ));
665        }
666        let progress = DownloadProgress {
667            total_size: Some(self.total_size),
668            completed_size: total_downloaded,
669            download_speed: 0,
670            upload_speed: 0,
671            connections: 0,
672            seeders: 0,
673            peers: 0,
674            eta_seconds: None,
675        };
676        log_progress_invariant("segmented http download", &progress);
677        progress_callback(progress);
678
679        // Check if complete
680        if total_downloaded >= self.total_size {
681            // Rename from .part to final name
682            self.finalize().await?;
683        }
684
685        Ok(())
686    }
687
688    /// Check if persistence is due based on the time interval.
689    ///
690    /// Returns true if enough time has passed since the last persistence,
691    /// and resets the timer if so.
692    pub fn should_persist(&self) -> bool {
693        let mut last = self.state.last_persistence.write();
694        let now = Instant::now();
695        if now.duration_since(*last) >= PERSISTENCE_INTERVAL {
696            *last = now;
697            true
698        } else {
699            false
700        }
701    }
702
703    /// Force mark persistence as done (call after successful save).
704    pub fn mark_persisted(&self) {
705        *self.state.last_persistence.write() = Instant::now();
706    }
707
708    /// Prepare the output file
709    async fn prepare_file(&self) -> Result<File> {
710        // Use .part extension during download
711        let part_path = self.part_path();
712
713        // Ensure parent directory exists
714        if let Some(parent) = part_path.parent() {
715            tokio::fs::create_dir_all(parent).await.map_err(|e| {
716                EngineError::storage(
717                    StorageErrorKind::Io,
718                    parent,
719                    format!("Create dir failed: {}", e),
720                )
721            })?;
722        }
723
724        // Check if file exists (for resume)
725        let file = if part_path.exists() {
726            OpenOptions::new()
727                .write(true)
728                .read(true)
729                .open(&part_path)
730                .await
731                .map_err(|e| {
732                    EngineError::storage(
733                        StorageErrorKind::Io,
734                        &part_path,
735                        format!("Open failed: {}", e),
736                    )
737                })?
738        } else {
739            // Create new file and pre-allocate
740            let file = File::create(&part_path).await.map_err(|e| {
741                EngineError::storage(
742                    StorageErrorKind::Io,
743                    &part_path,
744                    format!("Create failed: {}", e),
745                )
746            })?;
747
748            // Pre-allocate space
749            file.set_len(self.total_size).await.map_err(|e| {
750                EngineError::storage(
751                    StorageErrorKind::Io,
752                    &part_path,
753                    format!("Pre-allocate failed: {}", e),
754                )
755            })?;
756
757            file
758        };
759
760        Ok(file)
761    }
762
763    /// Get the .part file path
764    fn part_path(&self) -> PathBuf {
765        let ext = self
766            .save_path
767            .extension()
768            .map(|e| format!("{}.part", e.to_string_lossy()))
769            .unwrap_or_else(|| "part".to_string());
770        self.save_path.with_extension(ext)
771    }
772
773    /// Rename .part file to final name
774    async fn finalize(&self) -> Result<()> {
775        let part_path = self.part_path();
776        if part_path.exists() {
777            tokio::fs::rename(&part_path, &self.save_path)
778                .await
779                .map_err(|e| {
780                    EngineError::storage(
781                        StorageErrorKind::Io,
782                        &self.save_path,
783                        format!("Rename failed: {}", e),
784                    )
785                })?;
786        }
787        Ok(())
788    }
789
790    /// Pause the download
791    pub fn pause(&self) {
792        self.state.paused.store(true, Ordering::Relaxed);
793    }
794
795    /// Check if download is complete
796    pub fn is_complete(&self) -> bool {
797        self.state.downloaded.load(Ordering::Relaxed) >= self.total_size
798    }
799
800    /// Get current progress
801    pub fn progress(&self) -> DownloadProgress {
802        let progress = DownloadProgress {
803            total_size: Some(self.total_size),
804            completed_size: self.state.downloaded.load(Ordering::Relaxed),
805            download_speed: self.state.speed.load(Ordering::Relaxed),
806            upload_speed: 0,
807            connections: self.state.active_connections.load(Ordering::Relaxed) as u32,
808            seeders: 0,
809            peers: 0,
810            eta_seconds: {
811                let speed = self.state.speed.load(Ordering::Relaxed);
812                let remaining = self
813                    .total_size
814                    .saturating_sub(self.state.downloaded.load(Ordering::Relaxed));
815                if speed > 0 {
816                    Some(remaining / speed)
817                } else {
818                    None
819                }
820            },
821        };
822        log_progress_invariant("segmented http download", &progress);
823        progress
824    }
825}
826
/// Calculate how many segments a download should be split into.
///
/// The result is the smaller of `max_connections` and the number of
/// `min_segment_size`-sized pieces that fit into `total_size`, and is never
/// less than 1.
///
/// # Arguments
///
/// * `total_size` - total number of bytes to download; `0` yields 1 segment.
/// * `max_connections` - upper bound on the segment count; `0` yields 1 segment.
/// * `min_segment_size` - smallest allowed segment in bytes; `0` is treated
///   as `1` (no effective size constraint) instead of dividing by zero.
pub fn calculate_segment_count(
    total_size: u64,
    max_connections: usize,
    min_segment_size: u64,
) -> usize {
    if total_size == 0 {
        return 1;
    }

    // A zero minimum would divide by zero; clamp it to one byte.
    let max_segments_by_size = total_size / min_segment_size.max(1);

    // Cap in the u64 domain first: the capped value is <= max_connections and
    // therefore always fits in usize, so the cast below cannot truncate
    // (a bare `as usize` on the quotient could on 32-bit targets).
    let capped = max_segments_by_size.min(max_connections as u64);

    // Ensure at least 1 segment
    (capped as usize).max(1)
}
846
847/// Probe server capabilities with a HEAD request
848pub async fn probe_server(
849    client: &Client,
850    url: &str,
851    user_agent: &str,
852) -> Result<ServerCapabilities> {
853    let response = client
854        .head(url)
855        .header("User-Agent", user_agent)
856        .header("Accept-Encoding", ACCEPT_ENCODING_IDENTITY)
857        .send()
858        .await
859        .map_err(EngineError::from)?;
860
861    if !response.status().is_success() {
862        return Err(EngineError::network(
863            NetworkErrorKind::HttpStatus(response.status().as_u16()),
864            format!("HEAD request returned: {}", response.status()),
865        ));
866    }
867
868    let headers = response.headers();
869
870    let content_length = headers
871        .get("content-length")
872        .and_then(|v| v.to_str().ok())
873        .and_then(|s| s.parse::<u64>().ok());
874
875    let supports_range = headers
876        .get("accept-ranges")
877        .and_then(|v| v.to_str().ok())
878        .map(|v| v.contains("bytes"))
879        .unwrap_or(false);
880
881    let etag = headers
882        .get("etag")
883        .and_then(|v| v.to_str().ok())
884        .map(|s| s.to_string());
885
886    let last_modified = headers
887        .get("last-modified")
888        .and_then(|v| v.to_str().ok())
889        .map(|s| s.to_string());
890
891    let suggested_filename = headers
892        .get("content-disposition")
893        .and_then(|v| v.to_str().ok())
894        .and_then(parse_content_disposition);
895
896    Ok(ServerCapabilities {
897        content_length,
898        supports_range,
899        etag,
900        last_modified,
901        suggested_filename,
902    })
903}
904
905/// Parse filename from Content-Disposition header
906fn parse_content_disposition(header: &str) -> Option<String> {
907    // Look for filename="..." or filename*=UTF-8''...
908    if let Some(start) = header.find("filename=") {
909        let rest = &header[start + 9..];
910        if let Some(stripped) = rest.strip_prefix('"') {
911            let end = stripped.find('"')?;
912            return Some(stripped[..end].to_string());
913        } else {
914            let end = rest.find(';').unwrap_or(rest.len());
915            return Some(rest[..end].trim().to_string());
916        }
917    }
918
919    if let Some(start) = header.find("filename*=") {
920        let rest = &header[start + 10..];
921        if let Some(quote_start) = rest.find("''") {
922            let encoded = &rest[quote_start + 2..];
923            let end = encoded.find(';').unwrap_or(encoded.len());
924            if let Ok(decoded) = urlencoding::decode(&encoded[..end]) {
925                return Some(decoded.to_string());
926            }
927        }
928    }
929
930    None
931}
932
#[cfg(test)]
mod tests {
    use super::*;

    /// One mebibyte, used to keep the size tables readable.
    const MIB: u64 = 1024 * 1024;

    /// Segment count is bounded by both the connection limit and the
    /// minimum segment size, and is never below one.
    #[test]
    fn test_calculate_segment_count() {
        // (total_size, max_connections, min_segment_size, expected)
        let cases: [(u64, usize, u64, usize); 5] = [
            // 100MB file, 16 connections, 1MB min
            (100 * MIB, 16, MIB, 16),
            // 10MB file, 16 connections, 1MB min -> only 10 segments
            (10 * MIB, 16, MIB, 10),
            // 500KB file, 16 connections, 1MB min -> 1 segment
            (512 * 1024, 16, MIB, 1),
            // Empty file
            (0, 16, MIB, 1),
            // Very large file
            (10 * 1024 * MIB, 16, MIB, 16),
        ];

        for &(total_size, max_connections, min_segment_size, expected) in &cases {
            assert_eq!(
                calculate_segment_count(total_size, max_connections, min_segment_size),
                expected
            );
        }
    }

    /// A 100MB download split 16 ways covers the whole byte range with
    /// back-to-back segments.
    #[test]
    fn test_segment_init() {
        let total_size = 100 * MIB; // 100MB
        let mut download = SegmentedDownload::new(
            "https://example.com/file.zip".to_string(),
            total_size,
            PathBuf::from("/tmp/file.zip"),
            true,
            None,
            None,
        );

        download.init_segments(16, MIB);

        let segments = download.segments();
        assert_eq!(segments.len(), 16);

        // First segment starts at byte 0, last one ends at the final byte.
        assert_eq!(segments[0].start, 0);
        assert_eq!(segments[15].end, total_size - 1);

        // Each segment begins right after its predecessor ends.
        for i in 1..segments.len() {
            assert_eq!(segments[i - 1].end + 1, segments[i].start);
        }
    }

    /// Quoted, bare and RFC 5987-style extended filenames are all extracted.
    #[test]
    fn test_parse_content_disposition() {
        // (header value, expected filename)
        let cases = [
            ("attachment; filename=\"test.zip\"", "test.zip"),
            ("attachment; filename=test.zip", "test.zip"),
            (
                "attachment; filename*=UTF-8''test%20file.zip",
                "test file.zip",
            ),
        ];

        for &(header, expected) in &cases {
            assert_eq!(
                parse_content_disposition(header),
                Some(expected.to_string())
            );
        }
    }
}