1use mkt_core::error::{MktError, Result};
9use mkt_core::http::{OpKind, RateLimiter, RetryPolicy, retry, retry_after_secs};
10use reqwest::Client;
11use secrecy::{ExposeSecret, SecretString};
12use tracing::instrument;
13
14use crate::error::GoogleApiErrorResponse;
15
/// Google Ads API version segment inserted into the production base URL
/// (`https://googleads.googleapis.com/{API_VERSION}/`).
const API_VERSION: &str = "v24";

/// Capacity handed to `RateLimiter::new` whenever a client is constructed.
const MAX_CONCURRENT: usize = 100;
21
22#[derive(Debug)]
24pub struct GoogleClient {
25 http: Client,
26 base_url: String,
27 access_token: SecretString,
28 developer_token: String,
29 customer_id: String,
30 login_customer_id: Option<String>,
31 rate_limiter: RateLimiter,
32 retry: RetryPolicy,
33}
34
35impl GoogleClient {
36 pub fn new(
45 access_token: SecretString,
46 developer_token: String,
47 customer_id: &str,
48 login_customer_id: Option<&str>,
49 ) -> Result<Self> {
50 let base_url = format!("https://googleads.googleapis.com/{API_VERSION}/");
51 Ok(Self::new_with_base_url(
52 access_token,
53 developer_token,
54 customer_id,
55 login_customer_id,
56 base_url,
57 )?
58 .with_retry_policy(RetryPolicy::standard()))
59 }
60
61 pub fn new_with_base_url(
69 access_token: SecretString,
70 developer_token: String,
71 customer_id: &str,
72 login_customer_id: Option<&str>,
73 base_url: String,
74 ) -> Result<Self> {
75 let http = mkt_core::http::build_http_client(None)?;
76 Ok(Self {
77 http,
78 base_url,
79 access_token,
80 developer_token,
81 customer_id: customer_id.replace('-', ""),
82 login_customer_id: login_customer_id.map(|id| id.replace('-', "")),
83 rate_limiter: RateLimiter::new(MAX_CONCURRENT),
84 retry: RetryPolicy::none(),
85 })
86 }
87
88 #[must_use]
91 pub const fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
92 self.retry = policy;
93 self
94 }
95
96 pub fn customer_id(&self) -> &str {
98 &self.customer_id
99 }
100
101 #[instrument(skip(self, query), fields(provider = "google"))]
108 pub async fn search(&self, query: &str) -> Result<serde_json::Value> {
109 let path = format!("customers/{}/googleAds:search", self.customer_id);
110 let body = serde_json::json!({ "query": query });
111 self.post(&path, &body, OpKind::Read).await
112 }
113
114 #[instrument(skip(self, operations), fields(provider = "google"))]
122 pub async fn mutate(
123 &self,
124 resource: &str,
125 operations: &serde_json::Value,
126 ) -> Result<serde_json::Value> {
127 let path = format!("customers/{}/{resource}:mutate", self.customer_id);
128 let body = serde_json::json!({ "operations": operations });
129 self.post(&path, &body, OpKind::Write).await
130 }
131
132 async fn post(
134 &self,
135 path: &str,
136 body: &serde_json::Value,
137 kind: OpKind,
138 ) -> Result<serde_json::Value> {
139 self.rate_limiter.acquire(1).await?;
140 let url = format!("{}{path}", self.base_url);
141
142 retry(&self.retry, kind, || async {
143 let mut request = self
144 .http
145 .post(&url)
146 .bearer_auth(self.access_token.expose_secret())
147 .header("developer-token", &self.developer_token)
148 .json(body);
149
150 if let Some(login_id) = &self.login_customer_id {
151 request = request.header("login-customer-id", login_id);
152 }
153
154 let response = request.send().await?;
155 Self::parse_response(response).await
156 })
157 .await
158 }
159
160 async fn parse_response(response: reqwest::Response) -> Result<serde_json::Value> {
162 let status = response.status().as_u16();
163 let retry_after = retry_after_secs(response.headers());
164 let body = response.text().await?;
165
166 if (200..300).contains(&status) {
167 let value: serde_json::Value = serde_json::from_str(&body)?;
168 return Ok(value);
169 }
170
171 if status == 429 {
172 return Err(MktError::RateLimited {
173 provider: "google".into(),
174 retry_after_secs: retry_after.unwrap_or(60),
175 });
176 }
177
178 if let Ok(api_err) = serde_json::from_str::<GoogleApiErrorResponse>(&body) {
179 return Err(api_err.into_mkt_error(status));
180 }
181
182 Err(MktError::ApiError {
183 provider: "google".into(),
184 status,
185 message: body,
186 retry_after,
187 })
188 }
189}