tuitbot_core/x_api/client/
mod.rs1mod trait_impl;
8
9#[cfg(test)]
10mod tests;
11
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15use crate::error::XApiError;
16use crate::safety::redact::redact_secrets;
17use crate::storage::{self, DbPool};
18
19use super::types::{RateLimitInfo, XApiErrorResponse};
20
21const DEFAULT_BASE_URL: &str = "https://api.x.com/2";
23
24const DEFAULT_UPLOAD_BASE_URL: &str = "https://upload.twitter.com/1.1";
26
27pub(crate) const TWEET_FIELDS: &str = "public_metrics,created_at,author_id,conversation_id";
29
30pub(crate) const EXPANSIONS: &str = "author_id";
32
33pub(crate) const USER_FIELDS: &str = "username,name,public_metrics,profile_image_url";
35
36pub struct XApiHttpClient {
42 pub(crate) client: reqwest::Client,
43 pub(crate) base_url: String,
44 pub(crate) upload_base_url: String,
45 pub(crate) access_token: Arc<RwLock<String>>,
46 pool: Arc<RwLock<Option<DbPool>>>,
47}
48
49impl XApiHttpClient {
50 pub fn new(access_token: String) -> Self {
52 Self {
53 client: reqwest::Client::new(),
54 base_url: DEFAULT_BASE_URL.to_string(),
55 upload_base_url: DEFAULT_UPLOAD_BASE_URL.to_string(),
56 access_token: Arc::new(RwLock::new(access_token)),
57 pool: Arc::new(RwLock::new(None)),
58 }
59 }
60
61 pub fn with_base_url(access_token: String, base_url: String) -> Self {
63 let upload_base_url = base_url.clone();
64 Self {
65 client: reqwest::Client::new(),
66 base_url,
67 upload_base_url,
68 access_token: Arc::new(RwLock::new(access_token)),
69 pool: Arc::new(RwLock::new(None)),
70 }
71 }
72
73 pub async fn set_pool(&self, pool: DbPool) {
78 let mut lock = self.pool.write().await;
79 *lock = Some(pool);
80 }
81
82 pub fn access_token_lock(&self) -> Arc<RwLock<String>> {
84 self.access_token.clone()
85 }
86
87 pub async fn set_access_token(&self, token: String) {
89 let mut lock = self.access_token.write().await;
90 *lock = token;
91 }
92
93 pub(crate) fn parse_rate_limit_headers(headers: &reqwest::header::HeaderMap) -> RateLimitInfo {
95 let remaining = headers
96 .get("x-rate-limit-remaining")
97 .and_then(|v| v.to_str().ok())
98 .and_then(|v| v.parse::<u64>().ok());
99
100 let reset_at = headers
101 .get("x-rate-limit-reset")
102 .and_then(|v| v.to_str().ok())
103 .and_then(|v| v.parse::<u64>().ok());
104
105 RateLimitInfo {
106 remaining,
107 reset_at,
108 }
109 }
110
111 pub(crate) async fn map_error_response(response: reqwest::Response) -> XApiError {
113 let status = response.status().as_u16();
114 let rate_info = Self::parse_rate_limit_headers(response.headers());
115
116 let raw_body = response.text().await.unwrap_or_default();
117 let error_detail = serde_json::from_str::<XApiErrorResponse>(&raw_body).ok();
118 let body = redact_secrets(&raw_body);
119
120 let message = error_detail
121 .as_ref()
122 .and_then(|e| e.detail.clone())
123 .unwrap_or_else(|| body.clone());
124 let message = redact_secrets(&message);
125
126 match status {
127 429 => {
128 let retry_after = rate_info.reset_at.and_then(|reset| {
129 let now = chrono::Utc::now().timestamp() as u64;
130 reset.checked_sub(now)
131 });
132 XApiError::RateLimited { retry_after }
133 }
134 401 => XApiError::AuthExpired,
135 403 if Self::is_scope_insufficient_message(&message) => {
136 XApiError::ScopeInsufficient { message }
137 }
138 403 => XApiError::Forbidden { message },
139 _ => XApiError::ApiError { status, message },
140 }
141 }
142
143 fn is_scope_insufficient_message(message: &str) -> bool {
144 let normalized = message.to_ascii_lowercase();
145 normalized.contains("scope")
146 && (normalized.contains("insufficient")
147 || normalized.contains("missing")
148 || normalized.contains("not granted")
149 || normalized.contains("required"))
150 }
151
152 pub(crate) fn record_usage(&self, path: &str, method: &str, status_code: u16) {
154 let pool_lock = self.pool.clone();
155 let endpoint = path.to_string();
156 let http_method = method.to_string();
157 let cost = storage::x_api_usage::estimate_cost(&endpoint, &http_method);
158 let final_cost = if status_code < 400 { cost } else { 0.0 };
160 tokio::spawn(async move {
161 if let Some(pool) = pool_lock.read().await.as_ref() {
162 if let Err(e) = storage::x_api_usage::insert_x_api_usage(
163 pool,
164 &endpoint,
165 &http_method,
166 status_code as i32,
167 final_cost,
168 )
169 .await
170 {
171 tracing::warn!(error = %e, "Failed to record X API usage");
172 }
173 }
174 });
175 }
176
177 pub(crate) async fn get(
179 &self,
180 path: &str,
181 query: &[(&str, &str)],
182 ) -> Result<reqwest::Response, XApiError> {
183 let token = self.access_token.read().await;
184 let url = format!("{}{}", self.base_url, path);
185
186 let response = self
187 .client
188 .get(&url)
189 .bearer_auth(&*token)
190 .query(query)
191 .send()
192 .await
193 .map_err(|e| XApiError::Network { source: e })?;
194
195 let status_code = response.status().as_u16();
196 let rate_info = Self::parse_rate_limit_headers(response.headers());
197 tracing::debug!(
198 path,
199 remaining = ?rate_info.remaining,
200 reset_at = ?rate_info.reset_at,
201 "X API response"
202 );
203
204 self.record_usage(path, "GET", status_code);
205
206 if response.status().is_success() {
207 Ok(response)
208 } else {
209 Err(Self::map_error_response(response).await)
210 }
211 }
212
213 pub(crate) async fn delete(&self, path: &str) -> Result<reqwest::Response, XApiError> {
215 let token = self.access_token.read().await;
216 let url = format!("{}{}", self.base_url, path);
217
218 let response = self
219 .client
220 .delete(&url)
221 .bearer_auth(&*token)
222 .send()
223 .await
224 .map_err(|e| XApiError::Network { source: e })?;
225
226 let status_code = response.status().as_u16();
227 let rate_info = Self::parse_rate_limit_headers(response.headers());
228 tracing::debug!(
229 path,
230 remaining = ?rate_info.remaining,
231 reset_at = ?rate_info.reset_at,
232 "X API response"
233 );
234
235 self.record_usage(path, "DELETE", status_code);
236
237 if response.status().is_success() {
238 Ok(response)
239 } else {
240 Err(Self::map_error_response(response).await)
241 }
242 }
243
244 pub(crate) async fn post_json<T: serde::Serialize>(
246 &self,
247 path: &str,
248 body: &T,
249 ) -> Result<reqwest::Response, XApiError> {
250 let token = self.access_token.read().await;
251 let url = format!("{}{}", self.base_url, path);
252
253 let response = self
254 .client
255 .post(&url)
256 .bearer_auth(&*token)
257 .json(body)
258 .send()
259 .await
260 .map_err(|e| XApiError::Network { source: e })?;
261
262 let status_code = response.status().as_u16();
263 let rate_info = Self::parse_rate_limit_headers(response.headers());
264 tracing::debug!(
265 path,
266 remaining = ?rate_info.remaining,
267 reset_at = ?rate_info.reset_at,
268 "X API response"
269 );
270
271 self.record_usage(path, "POST", status_code);
272
273 if response.status().is_success() {
274 Ok(response)
275 } else {
276 Err(Self::map_error_response(response).await)
277 }
278 }
279}