Skip to main content

mkt_google/
provider.rs

1//! [`MarketingProvider`] implementation for Google Ads.
2
3use mkt_core::error::{MktError, Result};
4use mkt_core::models::{
5    Campaign, CampaignFilters, CampaignId, CreateCampaignInput, InsightsQuery, InsightsReport,
6    Paginated, ProviderHealth, UpdateCampaignInput,
7};
8use mkt_core::provider::{MarketingProvider, ProviderCapabilities};
9use tracing::instrument;
10
11use crate::client::GoogleClient;
12
/// Upper bound on insights pages fetched per query (defense against
/// runaway cursors; each page holds thousands of rows).
///
/// The insights pagination loop issues at most this many `search_page`
/// calls for a single query.
const MAX_PAGES: usize = 100;
16use crate::mapping;
17
/// Google Ads marketing provider.
///
/// Wraps a [`GoogleClient`] and implements [`MarketingProvider`] using the
/// Google Ads REST API: GAQL for reads, `:mutate` endpoints for writes.
#[derive(Debug)]
pub struct GoogleProvider {
    /// Client through which every GAQL search and mutate request is sent.
    client: GoogleClient,
}
26
27impl GoogleProvider {
28    /// Create a new Google Ads provider.
29    pub const fn new(client: GoogleClient) -> Self {
30        Self { client }
31    }
32
33    /// Fetch one campaign by numeric ID via GAQL.
34    async fn fetch_campaign(&self, id: &str) -> Result<Campaign> {
35        let query = format!(
36            "SELECT {} FROM campaign WHERE campaign.id = {id}",
37            mapping::CAMPAIGN_GAQL_FIELDS
38        );
39        let resp = self.client.search(&query).await?;
40        let row = resp["results"]
41            .as_array()
42            .and_then(|rows| rows.first())
43            .ok_or_else(|| MktError::ApiError {
44                provider: "google".into(),
45                status: 404,
46                message: format!("campaign {id} not found"),
47                retry_after: None,
48            })?;
49        mapping::google_row_to_campaign(row)
50    }
51}
52
53impl MarketingProvider for GoogleProvider {
    /// Machine-readable identifier of this provider (`"google"`), the same
    /// string this file uses for the `provider` field of errors and health
    /// reports.
    fn name(&self) -> &'static str {
        "google"
    }
57
    /// Human-readable name of this provider.
    fn display_name(&self) -> &'static str {
        "Google Ads"
    }
61
62    fn capabilities(&self) -> ProviderCapabilities {
63        ProviderCapabilities {
64            campaigns: true,
65            adsets: false,
66            ads: false,
67            creatives: false,
68            audiences: false,
69            insights: true,
70            organic_posts: false,
71            dark_posts: false,
72            video_upload: false,
73            image_upload: false,
74            workflow_templates: false,
75        }
76    }
77
78    // ── Campaigns ──────────────────────────────────────
79
80    #[instrument(skip(self))]
81    fn list_campaigns(
82        &self,
83        filters: &CampaignFilters,
84    ) -> impl std::future::Future<Output = Result<Paginated<Campaign>>> + Send {
85        async move {
86            let mut query = format!("SELECT {} FROM campaign", mapping::CAMPAIGN_GAQL_FIELDS);
87
88            let mut conditions: Vec<String> = Vec::new();
89            if let Some(status) = &filters.status {
90                conditions.push(format!(
91                    "campaign.status = '{}'",
92                    mapping::domain_status_to_google(status)
93                ));
94            }
95            if let Some(name) = &filters.name_contains {
96                // GAQL LIKE: quotes escaped, %/_/[ neutralized so user
97                // input matches literally.
98                let escaped = mapping::escape_gaql_like(name);
99                conditions.push(format!("campaign.name LIKE '%{escaped}%'"));
100            }
101            if !conditions.is_empty() {
102                query.push_str(" WHERE ");
103                query.push_str(&conditions.join(" AND "));
104            }
105            if let Some(limit) = filters.limit {
106                query = format!("{query} LIMIT {limit}");
107            }
108
109            let resp = self
110                .client
111                .search_page(&query, filters.cursor.as_deref())
112                .await?;
113
114            let items = resp["results"]
115                .as_array()
116                .unwrap_or(&Vec::new())
117                .iter()
118                .map(mapping::google_row_to_campaign)
119                .collect::<Result<Vec<_>>>()?;
120
121            let next_cursor = resp["nextPageToken"].as_str().map(String::from);
122
123            Ok(Paginated {
124                data: items,
125                next_cursor,
126                total: None,
127            })
128        }
129    }
130
131    #[instrument(skip(self))]
132    fn get_campaign(
133        &self,
134        id: &CampaignId,
135    ) -> impl std::future::Future<Output = Result<Campaign>> + Send {
136        async move { self.fetch_campaign(&id.0).await }
137    }
138
139    #[instrument(skip(self, input))]
140    fn create_campaign(
141        &self,
142        input: &CreateCampaignInput,
143    ) -> impl std::future::Future<Output = Result<Campaign>> + Send {
144        async move {
145            // Budget and campaign go out in ONE atomic googleAds:mutate
146            // request: the budget is created under a temporary negative ID
147            // and referenced by the campaign, so a campaign failure can no
148            // longer leave an orphaned budget behind.
149            let ops = mapping::atomic_create_operations(input, self.client.customer_id())?;
150            let resp = self.client.mutate_atomic(&ops).await?;
151            let new_id = mapping::campaign_id_from_atomic_mutate(&resp)?;
152
153            self.fetch_campaign(&new_id).await
154        }
155    }
156
157    #[instrument(skip(self, input))]
158    fn update_campaign(
159        &self,
160        id: &CampaignId,
161        input: &UpdateCampaignInput,
162    ) -> impl std::future::Future<Output = Result<Campaign>> + Send {
163        async move {
164            let ops = mapping::campaign_update_operation(self.client.customer_id(), &id.0, input);
165            let _resp = self.client.mutate("campaigns", &ops).await?;
166            self.fetch_campaign(&id.0).await
167        }
168    }
169
170    #[instrument(skip(self))]
171    fn delete_campaign(
172        &self,
173        id: &CampaignId,
174    ) -> impl std::future::Future<Output = Result<()>> + Send {
175        async move {
176            let resource = mapping::campaign_resource_name(self.client.customer_id(), &id.0);
177            let ops = serde_json::json!([{ "remove": resource }]);
178            let _resp = self.client.mutate("campaigns", &ops).await?;
179            Ok(())
180        }
181    }
182
183    // ── Insights ─────────────────────────────────────────
184
185    #[instrument(skip(self, query))]
186    fn get_insights(
187        &self,
188        query: &InsightsQuery,
189    ) -> impl std::future::Future<Output = Result<InsightsReport>> + Send {
190        async move {
191            let metrics = if query.metrics.is_empty() {
192                "metrics.impressions, metrics.clicks, metrics.cost_micros, metrics.ctr".to_string()
193            } else {
194                query
195                    .metrics
196                    .iter()
197                    .map(|m| format!("metrics.{m}"))
198                    .collect::<Vec<_>>()
199                    .join(", ")
200            };
201
202            let gaql = format!(
203                "SELECT campaign.id, campaign.name, segments.date, {metrics} FROM campaign"
204            );
205
206            // GAQL requires a finite date range whenever a core date
207            // segment is selected, so an unbounded query must not be sent.
208            let gaql = query.date_range.as_ref().map_or_else(
209                || format!("{gaql} WHERE segments.date DURING LAST_30_DAYS"),
210                |range| {
211                    format!(
212                        "{gaql} WHERE segments.date BETWEEN '{}' AND '{}'",
213                        range.start.format("%Y-%m-%d"),
214                        range.end.format("%Y-%m-%d")
215                    )
216                },
217            );
218
219            // Aggregate every page: a truncated report silently loses
220            // spend data. Page count is bounded by the API's 10k rows/page.
221            let mut report: Option<InsightsReport> = None;
222            let mut page_token: Option<String> = None;
223            for _ in 0..MAX_PAGES {
224                let resp = self
225                    .client
226                    .search_page(&gaql, page_token.as_deref())
227                    .await?;
228                let page = mapping::google_insights_to_domain(&resp)?;
229                report = Some(match report.take() {
230                    None => page,
231                    Some(mut acc) => {
232                        acc.rows.extend(page.rows);
233                        acc
234                    }
235                });
236                page_token = resp["nextPageToken"].as_str().map(String::from);
237                if page_token.is_none() {
238                    break;
239                }
240            }
241            report.ok_or_else(|| MktError::ApiError {
242                provider: "google".into(),
243                status: 0,
244                message: "insights search returned no pages".into(),
245                retry_after: None,
246            })
247        }
248    }
249
250    // ── Health check ───────────────────────────────────
251
252    async fn health_check(&self) -> Result<ProviderHealth> {
253        let start = std::time::Instant::now();
254        let result = self
255            .client
256            .search("SELECT customer.id FROM customer LIMIT 1")
257            .await;
258        #[allow(clippy::cast_possible_truncation)] // health-check latency fits in u64
259        let latency = start.elapsed().as_millis() as u64;
260
261        match result {
262            Ok(_) => Ok(ProviderHealth {
263                provider: "google".into(),
264                healthy: true,
265                latency_ms: latency,
266                message: None,
267            }),
268            Err(e) => Ok(ProviderHealth {
269                provider: "google".into(),
270                healthy: false,
271                latency_ms: latency,
272                message: Some(e.to_string()),
273            }),
274        }
275    }
276}