google_cloud_storage/http/
resumable_upload_client.rs1use std::fmt;
2
3use reqwest::header::{CONTENT_LENGTH, CONTENT_RANGE, RANGE};
4use reqwest::{Body, Response};
5use reqwest_middleware::ClientWithMiddleware as Client;
6
7use crate::http::{check_response_status, objects::Object, Error};
8
9#[derive(thiserror::Error, Debug)]
10pub enum ChunkError {
11 #[error("invalid range: first={0} last={1}")]
12 InvalidRange(u64, u64),
13 #[error("total object size must not be zero")]
14 ZeroTotalObjectSize,
15 #[error("last byte must be less than total object size: last={0} total={1}")]
16 InvalidLastBytes(u64, u64),
17}
18
19#[derive(PartialEq, Debug)]
20#[allow(clippy::large_enum_variant)]
21pub enum UploadStatus {
22 Ok(Object),
23 NotStarted,
24 ResumeIncomplete(UploadedRange),
25}
26
27#[derive(PartialEq, Debug)]
28pub struct UploadedRange {
29 pub first_byte: u64,
30 pub last_byte: u64,
31}
32
33#[derive(Clone, Debug)]
34pub struct ChunkSize {
35 first_byte: u64,
36 last_byte: u64,
37 total_object_size: Option<u64>,
38}
39
40impl fmt::Display for ChunkSize {
41 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
42 if self.total_object_size == Some(self.first_byte) {
43 write!(f, "bytes */")?;
44 } else {
45 write!(f, "bytes {}-{}/", self.first_byte, self.last_byte)?;
46 }
47
48 match self.total_object_size {
49 Some(total_object_size) => write!(f, "{total_object_size}"),
50 None => write!(f, "*"),
51 }
52 }
53}
54
55impl ChunkSize {
56 pub fn new(first_byte: u64, last_byte: u64, total_object_size: Option<u64>) -> ChunkSize {
57 Self {
58 first_byte,
59 last_byte,
60 total_object_size,
61 }
62 }
63
64 pub fn size(&self) -> u64 {
65 if self.total_object_size == Some(self.first_byte) {
66 0
67 } else {
68 self.last_byte - self.first_byte + 1
69 }
70 }
71}
72
73#[derive(Clone)]
74pub struct ResumableUploadClient {
75 session_url: String,
76 http: Client,
77}
78
79impl ResumableUploadClient {
80 pub fn url(&self) -> &str {
81 self.session_url.as_str()
82 }
83
84 pub fn new(session_url: String, http: Client) -> Self {
85 Self { session_url, http }
86 }
87
88 pub async fn upload_single_chunk<T: Into<Body>>(&self, data: T, size: usize) -> Result<(), Error> {
90 let response = self
91 .http
92 .put(&self.session_url)
93 .header(CONTENT_LENGTH, size)
94 .body(data)
95 .send()
96 .await?;
97 check_response_status(response).await?;
98 Ok(())
99 }
100
101 pub async fn upload_multiple_chunk<T: Into<Body>>(&self, data: T, size: &ChunkSize) -> Result<UploadStatus, Error> {
104 let response = self
105 .http
106 .put(&self.session_url)
107 .header(CONTENT_RANGE, size.to_string())
108 .header(CONTENT_LENGTH, size.size())
109 .body(data)
110 .send()
111 .await?;
112 Self::map_resume_response(response).await
113 }
114
115 pub async fn status(&self, object_size: Option<u64>) -> Result<UploadStatus, Error> {
117 let mut content_range = "bytes */".to_owned();
118 match object_size {
119 Some(object_size) => content_range.push_str(&object_size.to_string()),
120 None => content_range.push('*'),
121 };
122 let response = self
123 .http
124 .put(&self.session_url)
125 .header(CONTENT_RANGE, content_range)
126 .header(CONTENT_LENGTH, 0)
127 .body(Vec::new())
128 .send()
129 .await?;
130 Self::map_resume_response(response).await
131 }
132
133 pub async fn cancel(self) -> Result<(), Error> {
135 let response = self
136 .http
137 .delete(&self.session_url)
138 .header(CONTENT_LENGTH, 0)
139 .send()
140 .await?;
141 if response.status() == 499 {
142 Ok(())
143 } else {
144 check_response_status(response).await?;
145 Ok(())
146 }
147 }
148
149 async fn map_resume_response(response: Response) -> Result<UploadStatus, Error> {
150 if response.status() != 308 {
151 let response = check_response_status(response).await?;
152 return Ok(UploadStatus::Ok(response.json::<Object>().await?));
153 }
154
155 let range = response.headers().get(RANGE);
156
157 if range.is_none() {
158 return Ok(UploadStatus::NotStarted);
159 }
160
161 let range = range
162 .unwrap()
163 .to_str()
164 .map_err(|error| Error::InvalidRangeHeader(error.to_string()))?;
165
166 let range = range
167 .split('=')
168 .nth(1)
169 .ok_or_else(|| Error::InvalidRangeHeader(range.to_string()))?;
170
171 let start_end: Vec<&str> = range.split('-').collect();
172 let first_byte = start_end
173 .first()
174 .unwrap_or(&"0")
175 .parse::<u64>()
176 .map_err(|_| Error::InvalidRangeHeader(range.to_string()))?;
177
178 let last_byte = start_end
179 .get(1)
180 .ok_or_else(|| Error::InvalidRangeHeader(range.to_string()))?
181 .parse::<u64>()
182 .map_err(|_| Error::InvalidRangeHeader(range.to_string()))?;
183
184 Ok(UploadStatus::ResumeIncomplete(UploadedRange { first_byte, last_byte }))
185 }
186}