docker_image_pusher/upload/
streaming.rs

1//! Streaming upload implementation for very large files
2
3use crate::error::{Result, PusherError};
4use crate::error::handlers::NetworkErrorHandler;
5use crate::output::OutputManager;
6use reqwest::{Client, header::CONTENT_TYPE, Body};
7use std::path::Path;
8use std::io::SeekFrom;
9use std::time::Duration;
10use tokio::time::sleep;
11use tokio::io::{AsyncReadExt, AsyncSeekExt};
12use tokio_util::io::ReaderStream;
13
14pub struct StreamingUploader {
15    client: Client,
16    max_retries: usize,
17    retry_delay: Duration,
18    timeout: Duration,
19    output: OutputManager,
20}
21
22impl StreamingUploader {
23    pub fn new(
24        client: Client,
25        max_retries: usize,
26        timeout_seconds: u64,
27        output: OutputManager,
28    ) -> Self {
29        Self {
30            client,
31            max_retries,
32            retry_delay: Duration::from_secs(10),
33            timeout: Duration::from_secs(timeout_seconds),
34            output,
35        }
36    }
37
38    pub async fn upload_from_tar_entry<P>(
39        &self,
40        tar_path: &Path,
41        _entry_path: &str,
42        entry_offset: u64,
43        entry_size: u64,
44        upload_url: &str,
45        digest: &str,
46        token: &Option<String>,
47        progress_callback: P,
48    ) -> Result<()>
49    where
50        P: Fn(u64, u64) + Send + Sync + 'static,
51    {
52        self.output.step(&format!("Starting streaming upload for {} ({})", 
53                 &digest[..16], self.output.format_size(entry_size)));
54
55        for attempt in 1..=self.max_retries {
56            self.output.detail(&format!("Streaming upload attempt {} of {}", attempt, self.max_retries));
57            
58            match self.try_streaming_upload(
59                tar_path, 
60                entry_offset, 
61                entry_size, 
62                upload_url, 
63                digest, 
64                token, 
65                &progress_callback
66            ).await {
67                Ok(_) => {
68                    progress_callback(entry_size, entry_size);
69                    self.output.success(&format!("Streaming upload completed successfully on attempt {}", attempt));
70                    return Ok(());
71                }
72                Err(e) if attempt < self.max_retries => {
73                    self.output.warning(&format!("Streaming attempt {} failed: {}", attempt, e));
74                    self.output.info(&format!("Waiting {}s before retry...", self.retry_delay.as_secs()));
75                    sleep(self.retry_delay).await;
76                }
77                Err(e) => {
78                    self.output.error(&format!("All {} streaming attempts failed. Last error: {}", self.max_retries, e));
79                    return Err(e);
80                }
81            }
82        }
83        
84        Err(PusherError::Upload("All streaming upload attempts failed".to_string()))
85    }
86
87    async fn try_streaming_upload<P>(
88        &self,
89        tar_path: &Path,
90        entry_offset: u64,
91        entry_size: u64,
92        upload_url: &str,
93        digest: &str,
94        token: &Option<String>,
95        progress_callback: &P,
96    ) -> Result<()>
97    where
98        P: Fn(u64, u64) + Send + Sync + 'static,
99    {
100        // Fix URL construction to match the chunked uploader
101        let url = if upload_url.contains('?') {
102            format!("{}&digest={}", upload_url, digest)
103        } else {
104            format!("{}?digest={}", upload_url, digest)
105        };
106
107        // Open async file and seek to the correct position
108        let mut async_file = tokio::fs::File::open(tar_path).await
109            .map_err(|e| PusherError::Io(e.to_string()))?;
110        
111        async_file.seek(SeekFrom::Start(entry_offset)).await
112            .map_err(|e| PusherError::Io(e.to_string()))?;
113        
114        // Create a limited async reader
115        let limited_async_reader = async_file.take(entry_size);
116        let stream = ReaderStream::new(limited_async_reader);
117        let body = Body::wrap_stream(stream);
118
119        let mut request = self.client
120            .put(&url)
121            .header(CONTENT_TYPE, "application/octet-stream")
122            .header("Content-Length", entry_size.to_string())
123            .timeout(self.timeout)
124            .body(body);
125
126        if let Some(token) = token {
127            request = request.bearer_auth(token);
128        }
129
130        progress_callback(0, entry_size);
131        
132        self.output.progress(&format!("Streaming {}", self.output.format_size(entry_size)));
133        let start_time = std::time::Instant::now();
134
135        let response = request.send().await            .map_err(|e| {
136                self.output.error(&format!("Network error during streaming upload: {}", e));
137                NetworkErrorHandler::handle_network_error(&e, "streaming upload")
138            })?;
139
140        let elapsed = start_time.elapsed();
141        let speed = if elapsed.as_secs() > 0 {
142            entry_size / elapsed.as_secs()
143        } else {
144            entry_size
145        };
146
147        self.output.progress_done();
148        self.output.info(&format!("Streaming completed in {} (avg speed: {})", 
149                 self.output.format_duration(elapsed), self.output.format_size(speed)));
150
151        if response.status().is_success() {
152            Ok(())
153        } else {
154            let status = response.status();
155            let error_text = response.text().await
156                .unwrap_or_else(|_| "Failed to read error response".to_string());
157            
158            let error_msg = match status.as_u16() {
159                413 => "File too large for registry".to_string(),
160                507 => "Insufficient storage space on registry".to_string(),
161                401 => "Authentication failed during upload".to_string(),
162                403 => "Permission denied for upload".to_string(),
163                408 | 504 => "Streaming upload timeout".to_string(),
164                _ => format!("Streaming upload failed (status {}): {}", status, error_text)
165            };
166            
167            self.output.error(&error_msg);
168            Err(PusherError::Upload(error_msg))
169        }
170    }
171}