use crate::{GcpError, GcpHttpClient, Result};
use serde::{Deserialize, Serialize};
use std::time::{Duration, Instant};
/// Result of interpreting one poll response for a long-running operation.
///
/// Produced by [`OperationPoller::parse_status`] and consumed by `poll_operation`.
#[derive(Debug)]
enum PollStatus {
    /// The operation has not finished; `poll_operation` sleeps and polls again.
    InProgress,
    /// The operation finished without an error; `poll_operation` returns `Ok(())`.
    Done,
    /// The operation finished with an error; `poll_operation` converts this into
    /// `GcpError::OperationFailed`.
    Failed {
        /// Error message extracted from the response.
        message: String,
        /// Error code extracted from the response, when one was present.
        code: Option<String>,
    },
}
/// Service-specific pieces needed by `poll_operation` to poll one operation.
trait OperationPoller {
    /// Returns the URL that is fetched on every poll. `poll_operation` calls this
    /// once, before the polling loop starts.
    fn build_poll_url(&self, client: &GcpHttpClient) -> String;
    /// Interprets the raw body of one poll response.
    fn parse_status(&self, response: &[u8]) -> Result<PollStatus>;
    /// Identifier placed in the `operation` field of failure and timeout errors.
    fn operation_id(&self) -> &str;
}
/// Polls an operation until it finishes, fails, or the timeout elapses.
///
/// The poll URL is built once, then fetched in a loop. The first request is sent
/// immediately (even when `timeout` is zero). After every response that is still
/// in progress the function sleeps, starting at `initial_interval` and growing
/// by a factor of 1.5 per iteration up to a cap of 5 seconds.
///
/// The sleep is clamped to the time left before the deadline, so the total wait
/// does not overshoot `timeout` by a whole backoff interval; one final poll is
/// made once the deadline is reached.
///
/// # Errors
///
/// * Any error returned by `client.get` or by `poller.parse_status`.
/// * `GcpError::OperationFailed` when the poller reports `PollStatus::Failed`.
/// * `GcpError::OperationTimeout` when the operation is still in progress at or
///   after the deadline.
async fn poll_operation<P: OperationPoller>(
    poller: &P,
    client: &GcpHttpClient,
    initial_interval: Duration,
    timeout: Duration,
) -> Result<()> {
    let max_interval = Duration::from_secs(5);
    let backoff_multiplier = 1.5;

    // `Instant + Duration` panics on overflow (e.g. `Duration::MAX` used as
    // "wait forever"). `None` means the deadline is not representable, which is
    // treated as "no deadline".
    let deadline = Instant::now().checked_add(timeout);
    let url = poller.build_poll_url(client);
    let mut interval = initial_interval;

    loop {
        let response = client.get(&url).await?;
        match poller.parse_status(&response)? {
            PollStatus::Done => return Ok(()),
            PollStatus::Failed { message, code } => {
                return Err(GcpError::OperationFailed {
                    operation: poller.operation_id().to_string(),
                    message,
                    code,
                });
            }
            PollStatus::InProgress => {
                let delay = match deadline {
                    Some(deadline) => {
                        // Zero remaining time is equivalent to `now >= deadline`.
                        let remaining = deadline.saturating_duration_since(Instant::now());
                        if remaining.is_zero() {
                            return Err(GcpError::OperationTimeout {
                                operation: poller.operation_id().to_string(),
                                timeout,
                            });
                        }
                        // Never sleep past the deadline.
                        interval.min(remaining)
                    }
                    None => interval,
                };
                tokio::time::sleep(delay).await;
                // Back off from the nominal interval, not from the clamped delay.
                interval = Duration::from_secs_f64(
                    (interval.as_secs_f64() * backoff_multiplier).min(max_interval.as_secs_f64()),
                );
            }
        }
    }
}
/// Handle to an operation that is polled at a caller-supplied URL.
///
/// Poll responses are decoded as `ComputeOperationStatus` (see the
/// `OperationPoller` implementation for this type).
pub struct Operation<'a> {
    /// HTTP client used for the poll requests.
    client: &'a GcpHttpClient,
    /// URL polled verbatim; also reported as the operation identifier in errors.
    operation_url: String,
    /// Delay before the second poll; later delays back off from this value.
    initial_interval: Duration,
    /// Maximum time `wait` keeps polling.
    timeout: Duration,
    /// When `true`, `wait` returns `Ok(())` without polling.
    initially_done: bool,
}
impl<'a> Operation<'a> {
pub fn new(
client: &'a GcpHttpClient,
operation_url: String,
initial_interval: Duration,
timeout: Duration,
initially_done: bool,
) -> Self {
Self {
client,
operation_url,
initial_interval,
timeout,
initially_done,
}
}
pub async fn wait(self) -> Result<()> {
if self.initially_done {
return Ok(());
}
poll_operation(&self, self.client, self.initial_interval, self.timeout).await
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}
impl<'a> OperationPoller for Operation<'a> {
    /// The stored operation URL is polled as-is; the client is not consulted.
    fn build_poll_url(&self, _client: &GcpHttpClient) -> String {
        self.operation_url.to_owned()
    }

    /// Decodes the body as `ComputeOperationStatus`.
    ///
    /// Any status other than `"DONE"` is in progress. A `"DONE"` status with an
    /// `error` object is a failure described by the first entry of its `errors`
    /// list (message falls back to `"Unknown error"`); without one it is success.
    fn parse_status(&self, response: &[u8]) -> Result<PollStatus> {
        let parsed = serde_json::from_slice::<ComputeOperationStatus>(response).map_err(|err| {
            GcpError::InvalidResponse {
                message: format!("Failed to parse operation status: {}", err),
                body: Some(String::from_utf8_lossy(response).to_string()),
            }
        })?;

        if parsed.status != "DONE" {
            return Ok(PollStatus::InProgress);
        }

        let failure = match parsed.error {
            Some(failure) => failure,
            None => return Ok(PollStatus::Done),
        };

        // Only the first reported error detail is surfaced.
        let (message, code) = match failure.errors.into_iter().next() {
            Some(detail) => (detail.message, detail.code),
            None => (None, None),
        };
        Ok(PollStatus::Failed {
            message: message.unwrap_or_else(|| "Unknown error".to_string()),
            code,
        })
    }

    /// The operation URL doubles as the identifier used in errors.
    fn operation_id(&self) -> &str {
        self.operation_url.as_str()
    }
}
/// Handle to an operation polled at
/// `https://serviceusage.googleapis.com/v1/{operation_name}`.
///
/// Poll responses are decoded as `LroStatus`.
pub struct ServiceUsageOperation<'a> {
    /// HTTP client used for the poll requests.
    client: &'a GcpHttpClient,
    /// Operation name appended to the `/v1/` path; also reported in errors.
    operation_name: String,
    /// Delay before the second poll; later delays back off from this value.
    initial_interval: Duration,
    /// Maximum time `wait` keeps polling.
    timeout: Duration,
    /// When `true`, `wait` returns `Ok(())` without polling.
    initially_done: bool,
}
impl<'a> ServiceUsageOperation<'a> {
pub fn new(
client: &'a GcpHttpClient,
operation_name: String,
initial_interval: Duration,
timeout: Duration,
initially_done: bool,
) -> Self {
Self {
client,
operation_name,
initial_interval,
timeout,
initially_done,
}
}
pub async fn wait(self) -> Result<()> {
if self.initially_done {
return Ok(());
}
poll_operation(&self, self.client, self.initial_interval, self.timeout).await
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
fn build_url(&self, path: &str) -> String {
#[cfg(any(test, feature = "test-support"))]
{
if let Some(ref base) = self.client.base_url {
return format!("{}{}", base.trim_end_matches('/'), path);
}
}
format!("https://serviceusage.googleapis.com{}", path)
}
}
impl<'a> OperationPoller for ServiceUsageOperation<'a> {
    /// Polls `/v1/{operation_name}` on the host chosen by `build_url`.
    fn build_poll_url(&self, _client: &GcpHttpClient) -> String {
        let path = format!("/v1/{}", self.operation_name);
        self.build_url(&path)
    }

    /// Decodes the body as `LroStatus`.
    ///
    /// Not `done` means in progress. `done` with an `error` value is a failure
    /// whose message comes from `error.message` (falling back to
    /// `"Unknown error"`) and whose code comes from `error.code`, accepted as a
    /// string or an integer. `done` without `error` is success.
    fn parse_status(&self, response: &[u8]) -> Result<PollStatus> {
        let lro = serde_json::from_slice::<LroStatus>(response).map_err(|err| {
            GcpError::InvalidResponse {
                message: format!("Failed to parse operation status: {}", err),
                body: Some(String::from_utf8_lossy(response).to_string()),
            }
        })?;

        if !lro.done {
            return Ok(PollStatus::InProgress);
        }

        let outcome = match lro.error {
            None => PollStatus::Done,
            Some(details) => {
                let message = match details.get("message").and_then(|m| m.as_str()) {
                    Some(text) => text.to_string(),
                    None => "Unknown error".to_string(),
                };
                let code = match details.get("code") {
                    Some(raw) => match raw.as_str() {
                        Some(text) => Some(String::from(text)),
                        None => raw.as_i64().map(|n| n.to_string()),
                    },
                    None => None,
                };
                PollStatus::Failed { message, code }
            }
        };
        Ok(outcome)
    }

    /// The operation name is the identifier used in errors.
    fn operation_id(&self) -> &str {
        self.operation_name.as_str()
    }
}
/// Handle to an operation polled at
/// `https://cloudresourcemanager.googleapis.com/v3/{operation_name}`.
///
/// Poll responses are decoded as `LroStatus`.
pub struct ResourceManagerOperation<'a> {
    /// HTTP client used for the poll requests.
    client: &'a GcpHttpClient,
    /// Operation name appended to the `/v3/` path; also reported in errors.
    operation_name: String,
    /// Delay before the second poll; later delays back off from this value.
    initial_interval: Duration,
    /// Maximum time `wait` keeps polling.
    timeout: Duration,
    /// When `true`, `wait` returns `Ok(())` without polling.
    initially_done: bool,
}
impl<'a> ResourceManagerOperation<'a> {
pub fn new(
client: &'a GcpHttpClient,
operation_name: String,
initial_interval: Duration,
timeout: Duration,
initially_done: bool,
) -> Self {
Self {
client,
operation_name,
initial_interval,
timeout,
initially_done,
}
}
pub async fn wait(self) -> Result<()> {
if self.initially_done {
return Ok(());
}
poll_operation(&self, self.client, self.initial_interval, self.timeout).await
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
fn build_url(&self, path: &str) -> String {
#[cfg(any(test, feature = "test-support"))]
{
if let Some(ref base) = self.client.base_url {
return format!("{}{}", base.trim_end_matches('/'), path);
}
}
format!("https://cloudresourcemanager.googleapis.com{}", path)
}
}
impl<'a> OperationPoller for ResourceManagerOperation<'a> {
    /// Polls `/v3/{operation_name}` on the host chosen by `build_url`.
    fn build_poll_url(&self, _client: &GcpHttpClient) -> String {
        let path = format!("/v3/{}", self.operation_name);
        self.build_url(&path)
    }

    /// Decodes the body as `LroStatus`.
    ///
    /// Not `done` means in progress. `done` with an `error` value is a failure
    /// whose message comes from `error.message` (falling back to
    /// `"Unknown error"`) and whose code comes from `error.code`, accepted as a
    /// string or an integer. `done` without `error` is success.
    fn parse_status(&self, response: &[u8]) -> Result<PollStatus> {
        let lro = serde_json::from_slice::<LroStatus>(response).map_err(|err| {
            GcpError::InvalidResponse {
                message: format!("Failed to parse operation status: {}", err),
                body: Some(String::from_utf8_lossy(response).to_string()),
            }
        })?;

        if !lro.done {
            return Ok(PollStatus::InProgress);
        }

        let outcome = match lro.error {
            None => PollStatus::Done,
            Some(details) => {
                let message = match details.get("message").and_then(|m| m.as_str()) {
                    Some(text) => text.to_string(),
                    None => "Unknown error".to_string(),
                };
                let code = match details.get("code") {
                    Some(raw) => match raw.as_str() {
                        Some(text) => Some(String::from(text)),
                        None => raw.as_i64().map(|n| n.to_string()),
                    },
                    None => None,
                };
                PollStatus::Failed { message, code }
            }
        };
        Ok(outcome)
    }

    /// The operation name is the identifier used in errors.
    fn operation_id(&self) -> &str {
        self.operation_name.as_str()
    }
}
/// Handle to an operation that is polled at a caller-supplied URL.
///
/// Poll responses are decoded as `ContainerOperationStatus`.
pub struct ContainerOperation<'a> {
    /// HTTP client used for the poll requests.
    client: &'a GcpHttpClient,
    /// URL polled verbatim; also reported as the operation identifier in errors.
    operation_url: String,
    /// Delay before the second poll; later delays back off from this value.
    initial_interval: Duration,
    /// Maximum time `wait` keeps polling.
    timeout: Duration,
    /// When `true`, `wait` returns `Ok(())` without polling.
    initially_done: bool,
}
impl<'a> ContainerOperation<'a> {
pub fn new(
client: &'a GcpHttpClient,
operation_url: String,
initial_interval: Duration,
timeout: Duration,
initially_done: bool,
) -> Self {
Self {
client,
operation_url,
initial_interval,
timeout,
initially_done,
}
}
pub async fn wait(self) -> Result<()> {
if self.initially_done {
return Ok(());
}
poll_operation(&self, self.client, self.initial_interval, self.timeout).await
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}
impl<'a> OperationPoller for ContainerOperation<'a> {
    /// The stored operation URL is polled as-is; the client is not consulted.
    fn build_poll_url(&self, _client: &GcpHttpClient) -> String {
        self.operation_url.clone()
    }

    /// Decodes the body as `ContainerOperationStatus`.
    ///
    /// Any status other than `"DONE"` is in progress. A `"DONE"` status with an
    /// `error` value is a failure whose message comes from `error.message`
    /// (falling back to `"Unknown error"`); without `error` it is success.
    ///
    /// The error code is accepted as an integer or as a string, matching the
    /// other pollers in this file that read `error.code` from a JSON value, so
    /// a string-typed code is no longer silently dropped.
    ///
    /// # Errors
    ///
    /// Returns `GcpError::InvalidResponse` (with the lossy-decoded body) when the
    /// response cannot be deserialized.
    fn parse_status(&self, response: &[u8]) -> Result<PollStatus> {
        let status: ContainerOperationStatus =
            serde_json::from_slice(response).map_err(|e| GcpError::InvalidResponse {
                message: format!("Failed to parse operation status: {}", e),
                body: Some(String::from_utf8_lossy(response).to_string()),
            })?;

        if status.status != "DONE" {
            return Ok(PollStatus::InProgress);
        }

        match status.error {
            Some(error) => {
                let message = error
                    .get("message")
                    .and_then(|v| v.as_str())
                    .unwrap_or("Unknown error")
                    .to_string();
                // Integer first (the form handled originally), then string.
                let code = error.get("code").and_then(|v| {
                    v.as_i64()
                        .map(|n| n.to_string())
                        .or_else(|| v.as_str().map(String::from))
                });
                Ok(PollStatus::Failed { message, code })
            }
            None => Ok(PollStatus::Done),
        }
    }

    /// The operation URL doubles as the identifier used in errors.
    fn operation_id(&self) -> &str {
        &self.operation_url
    }
}
/// Handle to an operation polled at
/// `https://gkebackup.googleapis.com/v1/{operation_name}`.
///
/// Poll responses are decoded as `LroStatus`.
pub struct GkeBackupOperation<'a> {
    /// HTTP client used for the poll requests.
    client: &'a GcpHttpClient,
    /// Operation name appended to the `/v1/` path; also reported in errors.
    operation_name: String,
    /// Delay before the second poll; later delays back off from this value.
    initial_interval: Duration,
    /// Maximum time `wait` keeps polling.
    timeout: Duration,
    /// When `true`, `wait` returns `Ok(())` without polling.
    initially_done: bool,
}
impl<'a> GkeBackupOperation<'a> {
pub fn new(
client: &'a GcpHttpClient,
operation_name: String,
initial_interval: Duration,
timeout: Duration,
initially_done: bool,
) -> Self {
Self {
client,
operation_name,
initial_interval,
timeout,
initially_done,
}
}
pub async fn wait(self) -> Result<()> {
if self.initially_done {
return Ok(());
}
poll_operation(&self, self.client, self.initial_interval, self.timeout).await
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
fn build_url(&self, path: &str) -> String {
#[cfg(any(test, feature = "test-support"))]
{
if let Some(ref base) = self.client.base_url {
return format!("{}{}", base.trim_end_matches('/'), path);
}
}
format!("https://gkebackup.googleapis.com{}", path)
}
}
impl<'a> OperationPoller for GkeBackupOperation<'a> {
    /// Polls `/v1/{operation_name}` on the host chosen by `build_url`.
    fn build_poll_url(&self, _client: &GcpHttpClient) -> String {
        let path = format!("/v1/{}", self.operation_name);
        self.build_url(&path)
    }

    /// Decodes the body as `LroStatus`.
    ///
    /// Not `done` means in progress. `done` with an `error` value is a failure
    /// whose message comes from `error.message` (falling back to
    /// `"Unknown error"`) and whose code comes from `error.code`, accepted as a
    /// string or an integer. `done` without `error` is success.
    fn parse_status(&self, response: &[u8]) -> Result<PollStatus> {
        let lro = serde_json::from_slice::<LroStatus>(response).map_err(|err| {
            GcpError::InvalidResponse {
                message: format!("Failed to parse operation status: {}", err),
                body: Some(String::from_utf8_lossy(response).to_string()),
            }
        })?;

        if !lro.done {
            return Ok(PollStatus::InProgress);
        }

        let outcome = match lro.error {
            None => PollStatus::Done,
            Some(details) => {
                let message = match details.get("message").and_then(|m| m.as_str()) {
                    Some(text) => text.to_string(),
                    None => "Unknown error".to_string(),
                };
                let code = match details.get("code") {
                    Some(raw) => match raw.as_str() {
                        Some(text) => Some(String::from(text)),
                        None => raw.as_i64().map(|n| n.to_string()),
                    },
                    None => None,
                };
                PollStatus::Failed { message, code }
            }
        };
        Ok(outcome)
    }

    /// The operation name is the identifier used in errors.
    fn operation_id(&self) -> &str {
        self.operation_name.as_str()
    }
}
/// Handle to an operation polled at
/// `https://sqladmin.googleapis.com/v1/projects/{project}/operations/{operation_name}`.
///
/// Poll responses are decoded as `ComputeOperationStatus`.
pub struct SqlOperation<'a> {
    /// HTTP client used for the poll requests.
    client: &'a GcpHttpClient,
    /// Project segment of the poll URL.
    project: String,
    /// Operation segment of the poll URL; also reported in errors.
    operation_name: String,
    /// Delay before the second poll; later delays back off from this value.
    initial_interval: Duration,
    /// Maximum time `wait` keeps polling.
    timeout: Duration,
    /// When `true`, `wait` returns `Ok(())` without polling.
    initially_done: bool,
}
impl<'a> SqlOperation<'a> {
pub fn new(
client: &'a GcpHttpClient,
project: String,
operation_name: String,
initial_interval: Duration,
timeout: Duration,
initially_done: bool,
) -> Self {
Self {
client,
project,
operation_name,
initial_interval,
timeout,
initially_done,
}
}
pub async fn wait(self) -> Result<()> {
if self.initially_done {
return Ok(());
}
poll_operation(&self, self.client, self.initial_interval, self.timeout).await
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
fn build_url(&self, path: &str) -> String {
#[cfg(any(test, feature = "test-support"))]
{
if let Some(ref base) = self.client.base_url {
return format!("{}{}", base.trim_end_matches('/'), path);
}
}
format!("https://sqladmin.googleapis.com{}", path)
}
}
impl<'a> OperationPoller for SqlOperation<'a> {
    /// Polls `/v1/projects/{project}/operations/{operation_name}` on the host
    /// chosen by `build_url`.
    fn build_poll_url(&self, _client: &GcpHttpClient) -> String {
        let path = format!(
            "/v1/projects/{}/operations/{}",
            self.project, self.operation_name
        );
        self.build_url(&path)
    }

    /// Decodes the body as `ComputeOperationStatus`.
    ///
    /// Any status other than `"DONE"` is in progress. A `"DONE"` status with an
    /// `error` object is a failure described by the first entry of its `errors`
    /// list (message falls back to `"Unknown error"`); without one it is success.
    fn parse_status(&self, response: &[u8]) -> Result<PollStatus> {
        let parsed = serde_json::from_slice::<ComputeOperationStatus>(response).map_err(|err| {
            GcpError::InvalidResponse {
                message: format!("Failed to parse operation status: {}", err),
                body: Some(String::from_utf8_lossy(response).to_string()),
            }
        })?;

        if parsed.status != "DONE" {
            return Ok(PollStatus::InProgress);
        }

        let failure = match parsed.error {
            Some(failure) => failure,
            None => return Ok(PollStatus::Done),
        };

        // Only the first reported error detail is surfaced.
        let (message, code) = match failure.errors.into_iter().next() {
            Some(detail) => (detail.message, detail.code),
            None => (None, None),
        };
        Ok(PollStatus::Failed {
            message: message.unwrap_or_else(|| "Unknown error".to_string()),
            code,
        })
    }

    /// The operation name is the identifier used in errors.
    fn operation_id(&self) -> &str {
        self.operation_name.as_str()
    }
}
/// Poll response shape used by the `Operation` and `SqlOperation` pollers.
#[derive(Debug, Deserialize, Serialize)]
struct ComputeOperationStatus {
    /// Operation name; required for deserialization to succeed.
    name: String,
    /// Status string; the pollers treat exactly `"DONE"` as finished.
    status: String,
    /// Error details; a finished operation carrying this is reported as failed.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<ComputeOperationError>,
}
/// Error object nested in a `ComputeOperationStatus`.
#[derive(Debug, Deserialize, Serialize)]
struct ComputeOperationError {
    /// Individual error details; the pollers only read the first entry.
    /// The field is required: a missing `errors` key fails deserialization.
    errors: Vec<ComputeOperationErrorDetail>,
}
/// One entry of `ComputeOperationError::errors`.
#[derive(Debug, Deserialize, Serialize)]
struct ComputeOperationErrorDetail {
    /// Error code, if present.
    code: Option<String>,
    /// Error message, if present; the pollers substitute `"Unknown error"` when absent.
    message: Option<String>,
}
/// Poll response shape used by the `ContainerOperation` poller.
#[derive(Debug, Deserialize)]
struct ContainerOperationStatus {
    // Never read, but still required for deserialization to succeed.
    #[allow(dead_code)]
    name: String,
    /// Status string; the poller treats exactly `"DONE"` as finished.
    status: String,
    /// Untyped error value; the poller reads its `message` and `code` keys.
    error: Option<serde_json::Value>,
}
/// Poll response shape used by the `ServiceUsageOperation`,
/// `ResourceManagerOperation` and `GkeBackupOperation` pollers.
#[derive(Debug, Deserialize)]
struct LroStatus {
    /// Whether the operation has finished; treated as `false` when the key is absent.
    #[serde(default)]
    done: bool,
    /// Untyped error value; the pollers read its `message` and `code` keys.
    error: Option<serde_json::Value>,
}
/// Polling parameters for a long-running operation: the first delay between
/// polls and the overall timeout.
///
/// Instances are obtained from the named presets below and read back through
/// [`initial_interval`](Self::initial_interval) and [`timeout`](Self::timeout).
/// The type is a small plain value, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PollConfig {
    /// Delay used before the second poll.
    initial_interval: Duration,
    /// Maximum total time to keep polling.
    timeout: Duration,
}

impl PollConfig {
    /// Shared constructor behind all presets.
    fn preset(initial_interval: Duration, timeout: Duration) -> Self {
        Self {
            initial_interval,
            timeout,
        }
    }

    /// Preset for disk operations: 1 s initial interval, 300 s timeout.
    pub fn disk_operation() -> Self {
        Self::preset(Duration::from_secs(1), Duration::from_secs(300))
    }

    /// Preset for service-usage operations: 500 ms initial interval, 120 s timeout.
    pub fn service_usage_operation() -> Self {
        Self::preset(Duration::from_millis(500), Duration::from_secs(120))
    }

    /// Preset for project operations: 2 s initial interval, 600 s timeout.
    pub fn project_operation() -> Self {
        Self::preset(Duration::from_secs(2), Duration::from_secs(600))
    }

    /// Preset for container operations: 2 s initial interval, 600 s timeout.
    pub fn container_operation() -> Self {
        Self::preset(Duration::from_secs(2), Duration::from_secs(600))
    }

    /// Preset for GKE backup operations: 2 s initial interval, 600 s timeout.
    pub fn gke_backup_operation() -> Self {
        Self::preset(Duration::from_secs(2), Duration::from_secs(600))
    }

    /// Preset for SQL operations: 2 s initial interval, 900 s timeout.
    pub fn sql_operation() -> Self {
        Self::preset(Duration::from_secs(2), Duration::from_secs(900))
    }

    /// Preset for network operations: 1 s initial interval, 180 s timeout.
    pub fn network_operation() -> Self {
        Self::preset(Duration::from_secs(1), Duration::from_secs(180))
    }

    /// Preset for instance operations: 1 s initial interval, 300 s timeout.
    pub fn instance_operation() -> Self {
        Self::preset(Duration::from_secs(1), Duration::from_secs(300))
    }

    /// Returns the delay used before the second poll.
    pub fn initial_interval(&self) -> Duration {
        self.initial_interval
    }

    /// Returns the maximum total time to keep polling.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }
}