use mkt_core::error::{MktError, Result};
use mkt_core::http::{OpKind, RateLimiter, RetryPolicy, retry, retry_after_secs};
use reqwest::Client;
use secrecy::{ExposeSecret, SecretString};
use tracing::instrument;
use crate::error::GoogleApiErrorResponse;
const API_VERSION: &str = "v24";
const MAX_CONCURRENT: usize = 100;
#[derive(Debug)]
pub struct GoogleClient {
http: Client,
base_url: String,
access_token: SecretString,
developer_token: String,
customer_id: String,
login_customer_id: Option<String>,
rate_limiter: RateLimiter,
retry: RetryPolicy,
}
impl GoogleClient {
pub fn new(
access_token: SecretString,
developer_token: String,
customer_id: &str,
login_customer_id: Option<&str>,
) -> Result<Self> {
let base_url = format!("https://googleads.googleapis.com/{API_VERSION}/");
Ok(Self::new_with_base_url(
access_token,
developer_token,
customer_id,
login_customer_id,
base_url,
)?
.with_retry_policy(RetryPolicy::standard()))
}
pub fn new_with_base_url(
access_token: SecretString,
developer_token: String,
customer_id: &str,
login_customer_id: Option<&str>,
base_url: String,
) -> Result<Self> {
let http = mkt_core::http::build_http_client(None)?;
Ok(Self {
http,
base_url,
access_token,
developer_token,
customer_id: customer_id.replace('-', ""),
login_customer_id: login_customer_id.map(|id| id.replace('-', "")),
rate_limiter: RateLimiter::new(MAX_CONCURRENT),
retry: RetryPolicy::none(),
})
}
#[must_use]
pub const fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
self.retry = policy;
self
}
pub fn customer_id(&self) -> &str {
&self.customer_id
}
#[instrument(skip(self, query), fields(provider = "google"))]
pub async fn search(&self, query: &str) -> Result<serde_json::Value> {
let path = format!("customers/{}/googleAds:search", self.customer_id);
let body = serde_json::json!({ "query": query });
self.post(&path, &body, OpKind::Read).await
}
#[instrument(skip(self, operations), fields(provider = "google"))]
pub async fn mutate(
&self,
resource: &str,
operations: &serde_json::Value,
) -> Result<serde_json::Value> {
let path = format!("customers/{}/{resource}:mutate", self.customer_id);
let body = serde_json::json!({ "operations": operations });
self.post(&path, &body, OpKind::Write).await
}
async fn post(
&self,
path: &str,
body: &serde_json::Value,
kind: OpKind,
) -> Result<serde_json::Value> {
self.rate_limiter.acquire(1).await?;
let url = format!("{}{path}", self.base_url);
retry(&self.retry, kind, || async {
let mut request = self
.http
.post(&url)
.bearer_auth(self.access_token.expose_secret())
.header("developer-token", &self.developer_token)
.json(body);
if let Some(login_id) = &self.login_customer_id {
request = request.header("login-customer-id", login_id);
}
let response = request.send().await?;
Self::parse_response(response).await
})
.await
}
async fn parse_response(response: reqwest::Response) -> Result<serde_json::Value> {
let status = response.status().as_u16();
let retry_after = retry_after_secs(response.headers());
let body = response.text().await?;
if (200..300).contains(&status) {
let value: serde_json::Value = serde_json::from_str(&body)?;
return Ok(value);
}
if status == 429 {
return Err(MktError::RateLimited {
provider: "google".into(),
retry_after_secs: retry_after.unwrap_or(60),
});
}
if let Ok(api_err) = serde_json::from_str::<GoogleApiErrorResponse>(&body) {
return Err(api_err.into_mkt_error(status));
}
Err(MktError::ApiError {
provider: "google".into(),
status,
message: body,
retry_after,
})
}
}