Skip to main content

tuitbot_core/x_api/client/
mod.rs

1//! Reqwest-based X API v2 HTTP client implementation.
2//!
3//! Provides `XApiHttpClient` which implements the `XApiClient` trait
4//! using reqwest for HTTP requests with proper error mapping and
5//! rate limit header parsing.
6
7mod trait_impl;
8
9#[cfg(test)]
10mod tests;
11
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15use crate::error::XApiError;
16use crate::safety::redact::redact_secrets;
17use crate::storage::{self, DbPool};
18
19use super::types::{RateLimitInfo, XApiErrorResponse};
20
21/// Default X API v2 base URL.
22const DEFAULT_BASE_URL: &str = "https://api.x.com/2";
23
24/// Default X API v1.1 media upload base URL.
25const DEFAULT_UPLOAD_BASE_URL: &str = "https://upload.twitter.com/1.1";
26
27/// Standard tweet fields requested on every query.
28pub(crate) const TWEET_FIELDS: &str = "public_metrics,created_at,author_id,conversation_id";
29
30/// Standard expansions requested on every query.
31pub(crate) const EXPANSIONS: &str = "author_id";
32
33/// Standard user fields requested on every query.
34pub(crate) const USER_FIELDS: &str = "username,public_metrics";
35
36/// HTTP client for the X API v2.
37///
38/// Uses reqwest with Bearer token authentication. The access token
39/// is stored behind an `Arc<RwLock>` so the token manager can
40/// update it transparently after a refresh.
41pub struct XApiHttpClient {
42    pub(crate) client: reqwest::Client,
43    pub(crate) base_url: String,
44    pub(crate) upload_base_url: String,
45    pub(crate) access_token: Arc<RwLock<String>>,
46    pool: Arc<RwLock<Option<DbPool>>>,
47}
48
49impl XApiHttpClient {
50    /// Create a new X API HTTP client with the given access token.
51    pub fn new(access_token: String) -> Self {
52        Self {
53            client: reqwest::Client::new(),
54            base_url: DEFAULT_BASE_URL.to_string(),
55            upload_base_url: DEFAULT_UPLOAD_BASE_URL.to_string(),
56            access_token: Arc::new(RwLock::new(access_token)),
57            pool: Arc::new(RwLock::new(None)),
58        }
59    }
60
61    /// Create a new client with a custom base URL (for testing with wiremock).
62    pub fn with_base_url(access_token: String, base_url: String) -> Self {
63        let upload_base_url = base_url.clone();
64        Self {
65            client: reqwest::Client::new(),
66            base_url,
67            upload_base_url,
68            access_token: Arc::new(RwLock::new(access_token)),
69            pool: Arc::new(RwLock::new(None)),
70        }
71    }
72
73    /// Set the database pool for usage tracking.
74    ///
75    /// Called after DB initialization to enable fire-and-forget recording
76    /// of every X API call.
77    pub async fn set_pool(&self, pool: DbPool) {
78        let mut lock = self.pool.write().await;
79        *lock = Some(pool);
80    }
81
82    /// Get a shared reference to the access token lock for token manager integration.
83    pub fn access_token_lock(&self) -> Arc<RwLock<String>> {
84        self.access_token.clone()
85    }
86
87    /// Update the access token (used by token manager after refresh).
88    pub async fn set_access_token(&self, token: String) {
89        let mut lock = self.access_token.write().await;
90        *lock = token;
91    }
92
93    /// Parse rate limit headers from an X API response.
94    pub(crate) fn parse_rate_limit_headers(headers: &reqwest::header::HeaderMap) -> RateLimitInfo {
95        let remaining = headers
96            .get("x-rate-limit-remaining")
97            .and_then(|v| v.to_str().ok())
98            .and_then(|v| v.parse::<u64>().ok());
99
100        let reset_at = headers
101            .get("x-rate-limit-reset")
102            .and_then(|v| v.to_str().ok())
103            .and_then(|v| v.parse::<u64>().ok());
104
105        RateLimitInfo {
106            remaining,
107            reset_at,
108        }
109    }
110
111    /// Map an HTTP error response to a typed `XApiError`.
112    pub(crate) async fn map_error_response(response: reqwest::Response) -> XApiError {
113        let status = response.status().as_u16();
114        let rate_info = Self::parse_rate_limit_headers(response.headers());
115
116        let raw_body = response.text().await.unwrap_or_default();
117        let error_detail = serde_json::from_str::<XApiErrorResponse>(&raw_body).ok();
118        let body = redact_secrets(&raw_body);
119
120        let message = error_detail
121            .as_ref()
122            .and_then(|e| e.detail.clone())
123            .unwrap_or_else(|| body.clone());
124        let message = redact_secrets(&message);
125
126        match status {
127            429 => {
128                let retry_after = rate_info.reset_at.and_then(|reset| {
129                    let now = chrono::Utc::now().timestamp() as u64;
130                    reset.checked_sub(now)
131                });
132                XApiError::RateLimited { retry_after }
133            }
134            401 => XApiError::AuthExpired,
135            403 if Self::is_scope_insufficient_message(&message) => {
136                XApiError::ScopeInsufficient { message }
137            }
138            403 => XApiError::Forbidden { message },
139            _ => XApiError::ApiError { status, message },
140        }
141    }
142
143    fn is_scope_insufficient_message(message: &str) -> bool {
144        let normalized = message.to_ascii_lowercase();
145        normalized.contains("scope")
146            && (normalized.contains("insufficient")
147                || normalized.contains("missing")
148                || normalized.contains("not granted")
149                || normalized.contains("required"))
150    }
151
152    /// Record an API call in the usage tracking table (fire-and-forget).
153    pub(crate) fn record_usage(&self, path: &str, method: &str, status_code: u16) {
154        let pool_lock = self.pool.clone();
155        let endpoint = path.to_string();
156        let http_method = method.to_string();
157        let cost = storage::x_api_usage::estimate_cost(&endpoint, &http_method);
158        // Only record successful calls for cost (failed requests don't incur charges per X docs).
159        let final_cost = if status_code < 400 { cost } else { 0.0 };
160        tokio::spawn(async move {
161            if let Some(pool) = pool_lock.read().await.as_ref() {
162                if let Err(e) = storage::x_api_usage::insert_x_api_usage(
163                    pool,
164                    &endpoint,
165                    &http_method,
166                    status_code as i32,
167                    final_cost,
168                )
169                .await
170                {
171                    tracing::warn!(error = %e, "Failed to record X API usage");
172                }
173            }
174        });
175    }
176
177    /// Send a GET request and handle common error patterns.
178    pub(crate) async fn get(
179        &self,
180        path: &str,
181        query: &[(&str, &str)],
182    ) -> Result<reqwest::Response, XApiError> {
183        let token = self.access_token.read().await;
184        let url = format!("{}{}", self.base_url, path);
185
186        let response = self
187            .client
188            .get(&url)
189            .bearer_auth(&*token)
190            .query(query)
191            .send()
192            .await
193            .map_err(|e| XApiError::Network { source: e })?;
194
195        let status_code = response.status().as_u16();
196        let rate_info = Self::parse_rate_limit_headers(response.headers());
197        tracing::debug!(
198            path,
199            remaining = ?rate_info.remaining,
200            reset_at = ?rate_info.reset_at,
201            "X API response"
202        );
203
204        self.record_usage(path, "GET", status_code);
205
206        if response.status().is_success() {
207            Ok(response)
208        } else {
209            Err(Self::map_error_response(response).await)
210        }
211    }
212
213    /// Send a DELETE request and handle common error patterns.
214    pub(crate) async fn delete(&self, path: &str) -> Result<reqwest::Response, XApiError> {
215        let token = self.access_token.read().await;
216        let url = format!("{}{}", self.base_url, path);
217
218        let response = self
219            .client
220            .delete(&url)
221            .bearer_auth(&*token)
222            .send()
223            .await
224            .map_err(|e| XApiError::Network { source: e })?;
225
226        let status_code = response.status().as_u16();
227        let rate_info = Self::parse_rate_limit_headers(response.headers());
228        tracing::debug!(
229            path,
230            remaining = ?rate_info.remaining,
231            reset_at = ?rate_info.reset_at,
232            "X API response"
233        );
234
235        self.record_usage(path, "DELETE", status_code);
236
237        if response.status().is_success() {
238            Ok(response)
239        } else {
240            Err(Self::map_error_response(response).await)
241        }
242    }
243
244    /// Send a POST request with JSON body and handle common error patterns.
245    pub(crate) async fn post_json<T: serde::Serialize>(
246        &self,
247        path: &str,
248        body: &T,
249    ) -> Result<reqwest::Response, XApiError> {
250        let token = self.access_token.read().await;
251        let url = format!("{}{}", self.base_url, path);
252
253        let response = self
254            .client
255            .post(&url)
256            .bearer_auth(&*token)
257            .json(body)
258            .send()
259            .await
260            .map_err(|e| XApiError::Network { source: e })?;
261
262        let status_code = response.status().as_u16();
263        let rate_info = Self::parse_rate_limit_headers(response.headers());
264        tracing::debug!(
265            path,
266            remaining = ?rate_info.remaining,
267            reset_at = ?rate_info.reset_at,
268            "X API response"
269        );
270
271        self.record_usage(path, "POST", status_code);
272
273        if response.status().is_success() {
274            Ok(response)
275        } else {
276            Err(Self::map_error_response(response).await)
277        }
278    }
279}