use serde::Serialize;
use super::{
super::{crc32c_stream::Crc32cStream, percent_encode, Client, NoQuery},
parse_gs_url, StorageObject,
};
use crate::common::*;
use crate::tokio_glue::idiomatic_bytes_stream;
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct UploadQuery {
upload_type: &'static str,
if_generation_match: i64,
name: String,
}
#[instrument(level = "trace", skip(ctx, data))]
pub(crate) async fn upload_file<'a>(
ctx: &'a Context,
data: BoxStream<BytesMut>,
file_url: &'a Url,
) -> Result<StorageObject> {
debug!("streaming to {}", file_url);
let (bucket, object) = parse_gs_url(file_url)?;
let (stream, crc32c_reciever) = Crc32cStream::new(data);
let url = format!(
"https://storage.googleapis.com/upload/storage/v1/b/{}/o",
percent_encode(&bucket),
);
let query = UploadQuery {
upload_type: "media",
if_generation_match: 0,
name: object.clone(),
};
let client = Client::new().await?;
client
.post_stream(&url, query, idiomatic_bytes_stream(ctx, stream.boxed()))
.await?;
let hasher = crc32c_reciever
.await
.map_err(|_| format_err!("error waiting for checksum"))?;
let crc32c = hasher.finish_encoded();
let obj_url = format!(
"https://storage.googleapis.com/storage/v1/b/{}/o/{}",
percent_encode(&bucket),
percent_encode(&object),
);
let obj: StorageObject = client.get(&obj_url, NoQuery).await?;
if obj.crc32c == crc32c {
Ok(obj)
} else {
Err(format_err!(
"{} does not have the expected checksum, did it change?",
file_url,
))
}
}