google_cloud_storage/http/
resumable_upload_client.rs

1use std::fmt;
2
3use reqwest::header::{CONTENT_LENGTH, CONTENT_RANGE, RANGE};
4use reqwest::{Body, Response};
5use reqwest_middleware::ClientWithMiddleware as Client;
6
7use crate::http::{check_response_status, objects::Object, Error};
8
9#[derive(thiserror::Error, Debug)]
10pub enum ChunkError {
11    #[error("invalid range: first={0} last={1}")]
12    InvalidRange(u64, u64),
13    #[error("total object size must not be zero")]
14    ZeroTotalObjectSize,
15    #[error("last byte must be less than total object size: last={0} total={1}")]
16    InvalidLastBytes(u64, u64),
17}
18
19#[derive(PartialEq, Debug)]
20#[allow(clippy::large_enum_variant)]
21pub enum UploadStatus {
22    Ok(Object),
23    NotStarted,
24    ResumeIncomplete(UploadedRange),
25}
26
27#[derive(PartialEq, Debug)]
28pub struct UploadedRange {
29    pub first_byte: u64,
30    pub last_byte: u64,
31}
32
33#[derive(Clone, Debug)]
34pub struct ChunkSize {
35    first_byte: u64,
36    last_byte: u64,
37    total_object_size: Option<u64>,
38}
39
40impl fmt::Display for ChunkSize {
41    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
42        if self.total_object_size == Some(self.first_byte) {
43            write!(f, "bytes */")?;
44        } else {
45            write!(f, "bytes {}-{}/", self.first_byte, self.last_byte)?;
46        }
47
48        match self.total_object_size {
49            Some(total_object_size) => write!(f, "{total_object_size}"),
50            None => write!(f, "*"),
51        }
52    }
53}
54
55impl ChunkSize {
56    pub fn new(first_byte: u64, last_byte: u64, total_object_size: Option<u64>) -> ChunkSize {
57        Self {
58            first_byte,
59            last_byte,
60            total_object_size,
61        }
62    }
63
64    pub fn size(&self) -> u64 {
65        if self.total_object_size == Some(self.first_byte) {
66            0
67        } else {
68            self.last_byte - self.first_byte + 1
69        }
70    }
71}
72
73#[derive(Clone)]
74pub struct ResumableUploadClient {
75    session_url: String,
76    http: Client,
77}
78
79impl ResumableUploadClient {
80    pub fn url(&self) -> &str {
81        self.session_url.as_str()
82    }
83
84    pub fn new(session_url: String, http: Client) -> Self {
85        Self { session_url, http }
86    }
87
88    /// https://cloud.google.com/storage/docs/performing-resumable-uploads#single-chunk-upload
89    pub async fn upload_single_chunk<T: Into<Body>>(&self, data: T, size: usize) -> Result<(), Error> {
90        let response = self
91            .http
92            .put(&self.session_url)
93            .header(CONTENT_LENGTH, size)
94            .body(data)
95            .send()
96            .await?;
97        check_response_status(response).await?;
98        Ok(())
99    }
100
101    /// https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
102    /// https://cloud.google.com/storage/docs/performing-resumable-uploads#resume-upload
103    pub async fn upload_multiple_chunk<T: Into<Body>>(&self, data: T, size: &ChunkSize) -> Result<UploadStatus, Error> {
104        let response = self
105            .http
106            .put(&self.session_url)
107            .header(CONTENT_RANGE, size.to_string())
108            .header(CONTENT_LENGTH, size.size())
109            .body(data)
110            .send()
111            .await?;
112        Self::map_resume_response(response).await
113    }
114
115    /// https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
116    pub async fn status(&self, object_size: Option<u64>) -> Result<UploadStatus, Error> {
117        let mut content_range = "bytes */".to_owned();
118        match object_size {
119            Some(object_size) => content_range.push_str(&object_size.to_string()),
120            None => content_range.push('*'),
121        };
122        let response = self
123            .http
124            .put(&self.session_url)
125            .header(CONTENT_RANGE, content_range)
126            .header(CONTENT_LENGTH, 0)
127            .body(Vec::new())
128            .send()
129            .await?;
130        Self::map_resume_response(response).await
131    }
132
133    /// https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload
134    pub async fn cancel(self) -> Result<(), Error> {
135        let response = self
136            .http
137            .delete(&self.session_url)
138            .header(CONTENT_LENGTH, 0)
139            .send()
140            .await?;
141        if response.status() == 499 {
142            Ok(())
143        } else {
144            check_response_status(response).await?;
145            Ok(())
146        }
147    }
148
149    async fn map_resume_response(response: Response) -> Result<UploadStatus, Error> {
150        if response.status() != 308 {
151            let response = check_response_status(response).await?;
152            return Ok(UploadStatus::Ok(response.json::<Object>().await?));
153        }
154
155        let range = response.headers().get(RANGE);
156
157        if range.is_none() {
158            return Ok(UploadStatus::NotStarted);
159        }
160
161        let range = range
162            .unwrap()
163            .to_str()
164            .map_err(|error| Error::InvalidRangeHeader(error.to_string()))?;
165
166        let range = range
167            .split('=')
168            .nth(1)
169            .ok_or_else(|| Error::InvalidRangeHeader(range.to_string()))?;
170
171        let start_end: Vec<&str> = range.split('-').collect();
172        let first_byte = start_end
173            .first()
174            .unwrap_or(&"0")
175            .parse::<u64>()
176            .map_err(|_| Error::InvalidRangeHeader(range.to_string()))?;
177
178        let last_byte = start_end
179            .get(1)
180            .ok_or_else(|| Error::InvalidRangeHeader(range.to_string()))?
181            .parse::<u64>()
182            .map_err(|_| Error::InvalidRangeHeader(range.to_string()))?;
183
184        Ok(UploadStatus::ResumeIncomplete(UploadedRange { first_byte, last_byte }))
185    }
186}