use bigml::wait::{wait, WaitStatus};
use hyper::StatusCode;
use super::{
super::{client::original_http_error, percent_encode, Client, NoQuery},
gcs_write_access_denied_wait_options, ls, parse_gs_url,
};
use crate::tokio_glue::ConsumeWithParallelism;
use crate::{clouds::gcloud::ClientError, common::*};
/// Parallelism value handed to `consume_with_parallelism` by `rm_r` when it
/// drives the per-object deletion futures.
const PARALLEL_DELETIONS: usize = 10;
/// Delete every object that `ls` reports for `url`.
///
/// Each listed item is turned back into a URL, split into bucket and object
/// with `parse_gs_url`, and removed by issuing a `delete` request against the
/// `storage/v1/b/{bucket}/o/{object}` endpoint on `storage.googleapis.com`.
/// The per-object futures are consumed with a parallelism of
/// `PARALLEL_DELETIONS`.
///
/// # Errors
///
/// Returns an error if the listing fails, if a listed URL cannot be parsed,
/// if a client cannot be created, or if `wait` gives up on a deletion.
#[instrument(level = "trace", skip(ctx))]
pub(crate) async fn rm_r(ctx: &Context, url: &Url) -> Result<()> {
    debug!("deleting existing {}", url);

    // Build one boxed deletion future per listed item. These are only
    // constructed here; they are driven when the stream is consumed below.
    let listing = ls(ctx, url).await?;
    let deletions: BoxStream<BoxFuture<()>> = listing
        .map_ok(move |entry| {
            async move {
                let target = entry.to_url_string();
                trace!("deleting {}", target);
                let target = target.parse::<Url>()?;
                let (bucket, object) = parse_gs_url(&target)?;
                let endpoint = format!(
                    "https://storage.googleapis.com/storage/v1/b/{}/o/{}",
                    percent_encode(&bucket),
                    percent_encode(&object),
                );

                // NOTE(review): a fresh client is created for every object;
                // confirm whether sharing one client would be preferable.
                let client = Client::new().await?;
                let wait_options = gcs_write_access_denied_wait_options();
                wait(&wait_options, || async {
                    match client.delete(&endpoint, NoQuery).await {
                        Ok(()) => WaitStatus::Finished(()),
                        Err(err) => {
                            // Classify the failure so `wait` can tell
                            // temporary failures from permanent ones.
                            if should_retry_delete(&err) {
                                WaitStatus::FailedTemporarily(err)
                            } else {
                                WaitStatus::FailedPermanently(err)
                            }
                        }
                    }
                })
                .await?;
                Ok(())
            }
            .boxed()
        })
        .boxed();

    deletions
        .consume_with_parallelism(PARALLEL_DELETIONS)
        .await?;
    Ok(())
}
/// Decide whether a failed delete should be reported as a temporary failure.
///
/// Returns `true` only for a `ClientError::Other` whose underlying HTTP error
/// (as found by `original_http_error`) carries status `403 Forbidden`.
/// `ClientError::NotFound`, and errors with no underlying HTTP error, return
/// `false`.
fn should_retry_delete(err: &ClientError) -> bool {
    match err {
        ClientError::NotFound { .. } => false,
        ClientError::Other(inner) => original_http_error(inner)
            .map_or(false, |http_err| {
                http_err.status() == Some(StatusCode::FORBIDDEN)
            }),
    }
}