Skip to main content

tuitbot_core/x_api/client/
mod.rs

1//! Reqwest-based X API v2 HTTP client implementation.
2//!
3//! Provides `XApiHttpClient` which implements the `XApiClient` trait
4//! using reqwest for HTTP requests with proper error mapping and
5//! rate limit header parsing.
6
7mod trait_impl;
8
9#[cfg(test)]
10mod tests;
11
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15use crate::error::XApiError;
16use crate::safety::redact::redact_secrets;
17use crate::storage::{self, DbPool};
18
19use super::types::{RateLimitInfo, XApiErrorResponse};
20
21/// Default X API v2 base URL.
22const DEFAULT_BASE_URL: &str = "https://api.x.com/2";
23
24/// Default X API v1.1 media upload base URL.
25const DEFAULT_UPLOAD_BASE_URL: &str = "https://upload.twitter.com/1.1";
26
27/// Standard tweet fields requested on every query.
28pub(crate) const TWEET_FIELDS: &str = "public_metrics,created_at,author_id,conversation_id";
29
30/// Standard expansions requested on every query.
31pub(crate) const EXPANSIONS: &str = "author_id";
32
33/// Standard user fields requested on every query.
34pub(crate) const USER_FIELDS: &str =
35    "username,name,public_metrics,profile_image_url,description,location,url";
36
37/// HTTP client for the X API v2.
38///
39/// Uses reqwest with Bearer token authentication. The access token
40/// is stored behind an `Arc<RwLock>` so the token manager can
41/// update it transparently after a refresh.
42pub struct XApiHttpClient {
43    pub(crate) client: reqwest::Client,
44    pub(crate) base_url: String,
45    pub(crate) upload_base_url: String,
46    pub(crate) access_token: Arc<RwLock<String>>,
47    pool: Arc<RwLock<Option<DbPool>>>,
48}
49
50impl XApiHttpClient {
51    /// Create a new X API HTTP client with the given access token.
52    pub fn new(access_token: String) -> Self {
53        Self {
54            client: reqwest::Client::new(),
55            base_url: DEFAULT_BASE_URL.to_string(),
56            upload_base_url: DEFAULT_UPLOAD_BASE_URL.to_string(),
57            access_token: Arc::new(RwLock::new(access_token)),
58            pool: Arc::new(RwLock::new(None)),
59        }
60    }
61
62    /// Create a new client with a custom base URL (for testing with wiremock).
63    pub fn with_base_url(access_token: String, base_url: String) -> Self {
64        let upload_base_url = base_url.clone();
65        Self {
66            client: reqwest::Client::new(),
67            base_url,
68            upload_base_url,
69            access_token: Arc::new(RwLock::new(access_token)),
70            pool: Arc::new(RwLock::new(None)),
71        }
72    }
73
74    /// Set the database pool for usage tracking.
75    ///
76    /// Called after DB initialization to enable fire-and-forget recording
77    /// of every X API call.
78    pub async fn set_pool(&self, pool: DbPool) {
79        let mut lock = self.pool.write().await;
80        *lock = Some(pool);
81    }
82
83    /// Get a shared reference to the access token lock for token manager integration.
84    pub fn access_token_lock(&self) -> Arc<RwLock<String>> {
85        self.access_token.clone()
86    }
87
88    /// Update the access token (used by token manager after refresh).
89    pub async fn set_access_token(&self, token: String) {
90        let mut lock = self.access_token.write().await;
91        *lock = token;
92    }
93
94    /// Parse rate limit headers from an X API response.
95    pub(crate) fn parse_rate_limit_headers(headers: &reqwest::header::HeaderMap) -> RateLimitInfo {
96        let remaining = headers
97            .get("x-rate-limit-remaining")
98            .and_then(|v| v.to_str().ok())
99            .and_then(|v| v.parse::<u64>().ok());
100
101        let reset_at = headers
102            .get("x-rate-limit-reset")
103            .and_then(|v| v.to_str().ok())
104            .and_then(|v| v.parse::<u64>().ok());
105
106        RateLimitInfo {
107            remaining,
108            reset_at,
109        }
110    }
111
112    /// Map an HTTP error response to a typed `XApiError`.
113    pub(crate) async fn map_error_response(response: reqwest::Response) -> XApiError {
114        let status = response.status().as_u16();
115        let rate_info = Self::parse_rate_limit_headers(response.headers());
116
117        let raw_body = response.text().await.unwrap_or_default();
118        let error_detail = serde_json::from_str::<XApiErrorResponse>(&raw_body).ok();
119        let body = redact_secrets(&raw_body);
120
121        let message = error_detail
122            .as_ref()
123            .and_then(|e| e.detail.clone())
124            .unwrap_or_else(|| body.clone());
125        let message = redact_secrets(&message);
126
127        match status {
128            429 => {
129                let retry_after = rate_info.reset_at.and_then(|reset| {
130                    let now = chrono::Utc::now().timestamp() as u64;
131                    reset.checked_sub(now)
132                });
133                XApiError::RateLimited { retry_after }
134            }
135            401 => XApiError::AuthExpired,
136            403 if Self::is_scope_insufficient_message(&message) => {
137                XApiError::ScopeInsufficient { message }
138            }
139            403 => XApiError::Forbidden { message },
140            _ => XApiError::ApiError { status, message },
141        }
142    }
143
144    fn is_scope_insufficient_message(message: &str) -> bool {
145        let normalized = message.to_ascii_lowercase();
146        normalized.contains("scope")
147            && (normalized.contains("insufficient")
148                || normalized.contains("missing")
149                || normalized.contains("not granted")
150                || normalized.contains("required"))
151    }
152
153    /// Record an API call in the usage tracking table (fire-and-forget).
154    pub(crate) fn record_usage(&self, path: &str, method: &str, status_code: u16) {
155        let pool_lock = self.pool.clone();
156        let endpoint = path.to_string();
157        let http_method = method.to_string();
158        let cost = storage::x_api_usage::estimate_cost(&endpoint, &http_method);
159        // Only record successful calls for cost (failed requests don't incur charges per X docs).
160        let final_cost = if status_code < 400 { cost } else { 0.0 };
161        tokio::spawn(async move {
162            if let Some(pool) = pool_lock.read().await.as_ref() {
163                if let Err(e) = storage::x_api_usage::insert_x_api_usage(
164                    pool,
165                    &endpoint,
166                    &http_method,
167                    status_code as i32,
168                    final_cost,
169                )
170                .await
171                {
172                    tracing::warn!(error = %e, "Failed to record X API usage");
173                }
174            }
175        });
176    }
177
178    /// Send a GET request and handle common error patterns.
179    pub(crate) async fn get(
180        &self,
181        path: &str,
182        query: &[(&str, &str)],
183    ) -> Result<reqwest::Response, XApiError> {
184        let token = self.access_token.read().await;
185        let url = format!("{}{}", self.base_url, path);
186
187        let response = self
188            .client
189            .get(&url)
190            .bearer_auth(&*token)
191            .query(query)
192            .send()
193            .await
194            .map_err(|e| XApiError::Network { source: e })?;
195
196        let status_code = response.status().as_u16();
197        let rate_info = Self::parse_rate_limit_headers(response.headers());
198        tracing::debug!(
199            path,
200            remaining = ?rate_info.remaining,
201            reset_at = ?rate_info.reset_at,
202            "X API response"
203        );
204
205        self.record_usage(path, "GET", status_code);
206
207        if response.status().is_success() {
208            Ok(response)
209        } else {
210            Err(Self::map_error_response(response).await)
211        }
212    }
213
214    /// Send a DELETE request and handle common error patterns.
215    pub(crate) async fn delete(&self, path: &str) -> Result<reqwest::Response, XApiError> {
216        let token = self.access_token.read().await;
217        let url = format!("{}{}", self.base_url, path);
218
219        let response = self
220            .client
221            .delete(&url)
222            .bearer_auth(&*token)
223            .send()
224            .await
225            .map_err(|e| XApiError::Network { source: e })?;
226
227        let status_code = response.status().as_u16();
228        let rate_info = Self::parse_rate_limit_headers(response.headers());
229        tracing::debug!(
230            path,
231            remaining = ?rate_info.remaining,
232            reset_at = ?rate_info.reset_at,
233            "X API response"
234        );
235
236        self.record_usage(path, "DELETE", status_code);
237
238        if response.status().is_success() {
239            Ok(response)
240        } else {
241            Err(Self::map_error_response(response).await)
242        }
243    }
244
245    /// Send a POST request with JSON body and handle common error patterns.
246    pub(crate) async fn post_json<T: serde::Serialize>(
247        &self,
248        path: &str,
249        body: &T,
250    ) -> Result<reqwest::Response, XApiError> {
251        let token = self.access_token.read().await;
252        let url = format!("{}{}", self.base_url, path);
253
254        let response = self
255            .client
256            .post(&url)
257            .bearer_auth(&*token)
258            .json(body)
259            .send()
260            .await
261            .map_err(|e| XApiError::Network { source: e })?;
262
263        let status_code = response.status().as_u16();
264        let rate_info = Self::parse_rate_limit_headers(response.headers());
265        tracing::debug!(
266            path,
267            remaining = ?rate_info.remaining,
268            reset_at = ?rate_info.reset_at,
269            "X API response"
270        );
271
272        self.record_usage(path, "POST", status_code);
273
274        if response.status().is_success() {
275            Ok(response)
276        } else {
277            Err(Self::map_error_response(response).await)
278        }
279    }
280}