docker_image_pusher/upload/
streaming.rs1use crate::error::{Result, PusherError};
4use crate::error::handlers::NetworkErrorHandler;
5use crate::output::OutputManager;
6use reqwest::{Client, header::CONTENT_TYPE, Body};
7use std::path::Path;
8use std::io::SeekFrom;
9use std::time::Duration;
10use tokio::time::sleep;
11use tokio::io::{AsyncReadExt, AsyncSeekExt};
12use tokio_util::io::ReaderStream;
13
14pub struct StreamingUploader {
15 client: Client,
16 max_retries: usize,
17 retry_delay: Duration,
18 timeout: Duration,
19 output: OutputManager,
20}
21
22impl StreamingUploader {
23 pub fn new(
24 client: Client,
25 max_retries: usize,
26 timeout_seconds: u64,
27 output: OutputManager,
28 ) -> Self {
29 Self {
30 client,
31 max_retries,
32 retry_delay: Duration::from_secs(10),
33 timeout: Duration::from_secs(timeout_seconds),
34 output,
35 }
36 }
37
38 pub async fn upload_from_tar_entry<P>(
39 &self,
40 tar_path: &Path,
41 _entry_path: &str,
42 entry_offset: u64,
43 entry_size: u64,
44 upload_url: &str,
45 digest: &str,
46 token: &Option<String>,
47 progress_callback: P,
48 ) -> Result<()>
49 where
50 P: Fn(u64, u64) + Send + Sync + 'static,
51 {
52 self.output.step(&format!("Starting streaming upload for {} ({})",
53 &digest[..16], self.output.format_size(entry_size)));
54
55 for attempt in 1..=self.max_retries {
56 self.output.detail(&format!("Streaming upload attempt {} of {}", attempt, self.max_retries));
57
58 match self.try_streaming_upload(
59 tar_path,
60 entry_offset,
61 entry_size,
62 upload_url,
63 digest,
64 token,
65 &progress_callback
66 ).await {
67 Ok(_) => {
68 progress_callback(entry_size, entry_size);
69 self.output.success(&format!("Streaming upload completed successfully on attempt {}", attempt));
70 return Ok(());
71 }
72 Err(e) if attempt < self.max_retries => {
73 self.output.warning(&format!("Streaming attempt {} failed: {}", attempt, e));
74 self.output.info(&format!("Waiting {}s before retry...", self.retry_delay.as_secs()));
75 sleep(self.retry_delay).await;
76 }
77 Err(e) => {
78 self.output.error(&format!("All {} streaming attempts failed. Last error: {}", self.max_retries, e));
79 return Err(e);
80 }
81 }
82 }
83
84 Err(PusherError::Upload("All streaming upload attempts failed".to_string()))
85 }
86
87 async fn try_streaming_upload<P>(
88 &self,
89 tar_path: &Path,
90 entry_offset: u64,
91 entry_size: u64,
92 upload_url: &str,
93 digest: &str,
94 token: &Option<String>,
95 progress_callback: &P,
96 ) -> Result<()>
97 where
98 P: Fn(u64, u64) + Send + Sync + 'static,
99 {
100 let url = if upload_url.contains('?') {
102 format!("{}&digest={}", upload_url, digest)
103 } else {
104 format!("{}?digest={}", upload_url, digest)
105 };
106
107 let mut async_file = tokio::fs::File::open(tar_path).await
109 .map_err(|e| PusherError::Io(e.to_string()))?;
110
111 async_file.seek(SeekFrom::Start(entry_offset)).await
112 .map_err(|e| PusherError::Io(e.to_string()))?;
113
114 let limited_async_reader = async_file.take(entry_size);
116 let stream = ReaderStream::new(limited_async_reader);
117 let body = Body::wrap_stream(stream);
118
119 let mut request = self.client
120 .put(&url)
121 .header(CONTENT_TYPE, "application/octet-stream")
122 .header("Content-Length", entry_size.to_string())
123 .timeout(self.timeout)
124 .body(body);
125
126 if let Some(token) = token {
127 request = request.bearer_auth(token);
128 }
129
130 progress_callback(0, entry_size);
131
132 self.output.progress(&format!("Streaming {}", self.output.format_size(entry_size)));
133 let start_time = std::time::Instant::now();
134
135 let response = request.send().await .map_err(|e| {
136 self.output.error(&format!("Network error during streaming upload: {}", e));
137 NetworkErrorHandler::handle_network_error(&e, "streaming upload")
138 })?;
139
140 let elapsed = start_time.elapsed();
141 let speed = if elapsed.as_secs() > 0 {
142 entry_size / elapsed.as_secs()
143 } else {
144 entry_size
145 };
146
147 self.output.progress_done();
148 self.output.info(&format!("Streaming completed in {} (avg speed: {})",
149 self.output.format_duration(elapsed), self.output.format_size(speed)));
150
151 if response.status().is_success() {
152 Ok(())
153 } else {
154 let status = response.status();
155 let error_text = response.text().await
156 .unwrap_or_else(|_| "Failed to read error response".to_string());
157
158 let error_msg = match status.as_u16() {
159 413 => "File too large for registry".to_string(),
160 507 => "Insufficient storage space on registry".to_string(),
161 401 => "Authentication failed during upload".to_string(),
162 403 => "Permission denied for upload".to_string(),
163 408 | 504 => "Streaming upload timeout".to_string(),
164 _ => format!("Streaming upload failed (status {}): {}", status, error_text)
165 };
166
167 self.output.error(&error_msg);
168 Err(PusherError::Upload(error_msg))
169 }
170 }
171}