1use mkt_core::error::{MktError, Result};
4use mkt_core::models::{
5 Campaign, CampaignFilters, CampaignId, CreateCampaignInput, InsightsQuery, InsightsReport,
6 Paginated, ProviderHealth, UpdateCampaignInput,
7};
8use mkt_core::provider::{MarketingProvider, ProviderCapabilities};
9use tracing::instrument;
10
11use crate::client::GoogleClient;
12
/// Upper bound on the number of result pages `get_insights` requests in a single call.
const MAX_PAGES: usize = 100;
16use crate::mapping;
17
18#[derive(Debug)]
23pub struct GoogleProvider {
24 client: GoogleClient,
25}
26
27impl GoogleProvider {
28 pub const fn new(client: GoogleClient) -> Self {
30 Self { client }
31 }
32
33 async fn fetch_campaign(&self, id: &str) -> Result<Campaign> {
35 let query = format!(
36 "SELECT {} FROM campaign WHERE campaign.id = {id}",
37 mapping::CAMPAIGN_GAQL_FIELDS
38 );
39 let resp = self.client.search(&query).await?;
40 let row = resp["results"]
41 .as_array()
42 .and_then(|rows| rows.first())
43 .ok_or_else(|| MktError::ApiError {
44 provider: "google".into(),
45 status: 404,
46 message: format!("campaign {id} not found"),
47 retry_after: None,
48 })?;
49 mapping::google_row_to_campaign(row)
50 }
51}
52
53impl MarketingProvider for GoogleProvider {
54 fn name(&self) -> &'static str {
55 "google"
56 }
57
58 fn display_name(&self) -> &'static str {
59 "Google Ads"
60 }
61
62 fn capabilities(&self) -> ProviderCapabilities {
63 ProviderCapabilities {
64 campaigns: true,
65 adsets: false,
66 ads: false,
67 creatives: false,
68 audiences: false,
69 insights: true,
70 organic_posts: false,
71 dark_posts: false,
72 video_upload: false,
73 image_upload: false,
74 workflow_templates: false,
75 }
76 }
77
78 #[instrument(skip(self))]
81 fn list_campaigns(
82 &self,
83 filters: &CampaignFilters,
84 ) -> impl std::future::Future<Output = Result<Paginated<Campaign>>> + Send {
85 async move {
86 let mut query = format!("SELECT {} FROM campaign", mapping::CAMPAIGN_GAQL_FIELDS);
87
88 let mut conditions: Vec<String> = Vec::new();
89 if let Some(status) = &filters.status {
90 conditions.push(format!(
91 "campaign.status = '{}'",
92 mapping::domain_status_to_google(status)
93 ));
94 }
95 if let Some(name) = &filters.name_contains {
96 let escaped = mapping::escape_gaql_like(name);
99 conditions.push(format!("campaign.name LIKE '%{escaped}%'"));
100 }
101 if !conditions.is_empty() {
102 query.push_str(" WHERE ");
103 query.push_str(&conditions.join(" AND "));
104 }
105 if let Some(limit) = filters.limit {
106 query = format!("{query} LIMIT {limit}");
107 }
108
109 let resp = self
110 .client
111 .search_page(&query, filters.cursor.as_deref())
112 .await?;
113
114 let items = resp["results"]
115 .as_array()
116 .unwrap_or(&Vec::new())
117 .iter()
118 .map(mapping::google_row_to_campaign)
119 .collect::<Result<Vec<_>>>()?;
120
121 let next_cursor = resp["nextPageToken"].as_str().map(String::from);
122
123 Ok(Paginated {
124 data: items,
125 next_cursor,
126 total: None,
127 })
128 }
129 }
130
131 #[instrument(skip(self))]
132 fn get_campaign(
133 &self,
134 id: &CampaignId,
135 ) -> impl std::future::Future<Output = Result<Campaign>> + Send {
136 async move { self.fetch_campaign(&id.0).await }
137 }
138
139 #[instrument(skip(self, input))]
140 fn create_campaign(
141 &self,
142 input: &CreateCampaignInput,
143 ) -> impl std::future::Future<Output = Result<Campaign>> + Send {
144 async move {
145 let ops = mapping::atomic_create_operations(input, self.client.customer_id())?;
150 let resp = self.client.mutate_atomic(&ops).await?;
151 let new_id = mapping::campaign_id_from_atomic_mutate(&resp)?;
152
153 self.fetch_campaign(&new_id).await
154 }
155 }
156
157 #[instrument(skip(self, input))]
158 fn update_campaign(
159 &self,
160 id: &CampaignId,
161 input: &UpdateCampaignInput,
162 ) -> impl std::future::Future<Output = Result<Campaign>> + Send {
163 async move {
164 let ops = mapping::campaign_update_operation(self.client.customer_id(), &id.0, input);
165 let _resp = self.client.mutate("campaigns", &ops).await?;
166 self.fetch_campaign(&id.0).await
167 }
168 }
169
170 #[instrument(skip(self))]
171 fn delete_campaign(
172 &self,
173 id: &CampaignId,
174 ) -> impl std::future::Future<Output = Result<()>> + Send {
175 async move {
176 let resource = mapping::campaign_resource_name(self.client.customer_id(), &id.0);
177 let ops = serde_json::json!([{ "remove": resource }]);
178 let _resp = self.client.mutate("campaigns", &ops).await?;
179 Ok(())
180 }
181 }
182
183 #[instrument(skip(self, query))]
186 fn get_insights(
187 &self,
188 query: &InsightsQuery,
189 ) -> impl std::future::Future<Output = Result<InsightsReport>> + Send {
190 async move {
191 let metrics = if query.metrics.is_empty() {
192 "metrics.impressions, metrics.clicks, metrics.cost_micros, metrics.ctr".to_string()
193 } else {
194 query
195 .metrics
196 .iter()
197 .map(|m| format!("metrics.{m}"))
198 .collect::<Vec<_>>()
199 .join(", ")
200 };
201
202 let gaql = format!(
203 "SELECT campaign.id, campaign.name, segments.date, {metrics} FROM campaign"
204 );
205
206 let gaql = query.date_range.as_ref().map_or_else(
209 || format!("{gaql} WHERE segments.date DURING LAST_30_DAYS"),
210 |range| {
211 format!(
212 "{gaql} WHERE segments.date BETWEEN '{}' AND '{}'",
213 range.start.format("%Y-%m-%d"),
214 range.end.format("%Y-%m-%d")
215 )
216 },
217 );
218
219 let mut report: Option<InsightsReport> = None;
222 let mut page_token: Option<String> = None;
223 for _ in 0..MAX_PAGES {
224 let resp = self
225 .client
226 .search_page(&gaql, page_token.as_deref())
227 .await?;
228 let page = mapping::google_insights_to_domain(&resp)?;
229 report = Some(match report.take() {
230 None => page,
231 Some(mut acc) => {
232 acc.rows.extend(page.rows);
233 acc
234 }
235 });
236 page_token = resp["nextPageToken"].as_str().map(String::from);
237 if page_token.is_none() {
238 break;
239 }
240 }
241 report.ok_or_else(|| MktError::ApiError {
242 provider: "google".into(),
243 status: 0,
244 message: "insights search returned no pages".into(),
245 retry_after: None,
246 })
247 }
248 }
249
250 async fn health_check(&self) -> Result<ProviderHealth> {
253 let start = std::time::Instant::now();
254 let result = self
255 .client
256 .search("SELECT customer.id FROM customer LIMIT 1")
257 .await;
258 #[allow(clippy::cast_possible_truncation)] let latency = start.elapsed().as_millis() as u64;
260
261 match result {
262 Ok(_) => Ok(ProviderHealth {
263 provider: "google".into(),
264 healthy: true,
265 latency_ms: latency,
266 message: None,
267 }),
268 Err(e) => Ok(ProviderHealth {
269 provider: "google".into(),
270 healthy: false,
271 latency_ms: latency,
272 message: Some(e.to_string()),
273 }),
274 }
275 }
276}