tuitbot_core/x_api/client/
mod.rs1mod trait_impl;
8
9#[cfg(test)]
10mod tests;
11
12use std::sync::Arc;
13use tokio::sync::RwLock;
14
15use crate::error::XApiError;
16use crate::safety::redact::redact_secrets;
17use crate::storage::{self, DbPool};
18
19use super::types::{RateLimitInfo, XApiErrorResponse};
20
21const DEFAULT_BASE_URL: &str = "https://api.x.com/2";
23
24const DEFAULT_UPLOAD_BASE_URL: &str = "https://upload.twitter.com/1.1";
26
27pub(crate) const TWEET_FIELDS: &str = "public_metrics,created_at,author_id,conversation_id";
29
30pub(crate) const EXPANSIONS: &str = "author_id";
32
33pub(crate) const USER_FIELDS: &str =
35 "username,name,public_metrics,profile_image_url,description,location,url";
36
37pub struct XApiHttpClient {
43 pub(crate) client: reqwest::Client,
44 pub(crate) base_url: String,
45 pub(crate) upload_base_url: String,
46 pub(crate) access_token: Arc<RwLock<String>>,
47 pool: Arc<RwLock<Option<DbPool>>>,
48}
49
50impl XApiHttpClient {
51 pub fn new(access_token: String) -> Self {
53 Self {
54 client: reqwest::Client::new(),
55 base_url: DEFAULT_BASE_URL.to_string(),
56 upload_base_url: DEFAULT_UPLOAD_BASE_URL.to_string(),
57 access_token: Arc::new(RwLock::new(access_token)),
58 pool: Arc::new(RwLock::new(None)),
59 }
60 }
61
62 pub fn with_base_url(access_token: String, base_url: String) -> Self {
64 let upload_base_url = base_url.clone();
65 Self {
66 client: reqwest::Client::new(),
67 base_url,
68 upload_base_url,
69 access_token: Arc::new(RwLock::new(access_token)),
70 pool: Arc::new(RwLock::new(None)),
71 }
72 }
73
74 pub async fn set_pool(&self, pool: DbPool) {
79 let mut lock = self.pool.write().await;
80 *lock = Some(pool);
81 }
82
83 pub fn access_token_lock(&self) -> Arc<RwLock<String>> {
85 self.access_token.clone()
86 }
87
88 pub async fn set_access_token(&self, token: String) {
90 let mut lock = self.access_token.write().await;
91 *lock = token;
92 }
93
94 pub(crate) fn parse_rate_limit_headers(headers: &reqwest::header::HeaderMap) -> RateLimitInfo {
96 let remaining = headers
97 .get("x-rate-limit-remaining")
98 .and_then(|v| v.to_str().ok())
99 .and_then(|v| v.parse::<u64>().ok());
100
101 let reset_at = headers
102 .get("x-rate-limit-reset")
103 .and_then(|v| v.to_str().ok())
104 .and_then(|v| v.parse::<u64>().ok());
105
106 RateLimitInfo {
107 remaining,
108 reset_at,
109 }
110 }
111
112 pub(crate) async fn map_error_response(response: reqwest::Response) -> XApiError {
114 let status = response.status().as_u16();
115 let rate_info = Self::parse_rate_limit_headers(response.headers());
116
117 let raw_body = response.text().await.unwrap_or_default();
118 let error_detail = serde_json::from_str::<XApiErrorResponse>(&raw_body).ok();
119 let body = redact_secrets(&raw_body);
120
121 let message = error_detail
122 .as_ref()
123 .and_then(|e| e.detail.clone())
124 .unwrap_or_else(|| body.clone());
125 let message = redact_secrets(&message);
126
127 match status {
128 429 => {
129 let retry_after = rate_info.reset_at.and_then(|reset| {
130 let now = chrono::Utc::now().timestamp() as u64;
131 reset.checked_sub(now)
132 });
133 XApiError::RateLimited { retry_after }
134 }
135 401 => XApiError::AuthExpired,
136 403 if Self::is_scope_insufficient_message(&message) => {
137 XApiError::ScopeInsufficient { message }
138 }
139 403 => XApiError::Forbidden { message },
140 _ => XApiError::ApiError { status, message },
141 }
142 }
143
144 fn is_scope_insufficient_message(message: &str) -> bool {
145 let normalized = message.to_ascii_lowercase();
146 normalized.contains("scope")
147 && (normalized.contains("insufficient")
148 || normalized.contains("missing")
149 || normalized.contains("not granted")
150 || normalized.contains("required"))
151 }
152
153 pub(crate) fn record_usage(&self, path: &str, method: &str, status_code: u16) {
155 let pool_lock = self.pool.clone();
156 let endpoint = path.to_string();
157 let http_method = method.to_string();
158 let cost = storage::x_api_usage::estimate_cost(&endpoint, &http_method);
159 let final_cost = if status_code < 400 { cost } else { 0.0 };
161 tokio::spawn(async move {
162 if let Some(pool) = pool_lock.read().await.as_ref() {
163 if let Err(e) = storage::x_api_usage::insert_x_api_usage(
164 pool,
165 &endpoint,
166 &http_method,
167 status_code as i32,
168 final_cost,
169 )
170 .await
171 {
172 tracing::warn!(error = %e, "Failed to record X API usage");
173 }
174 }
175 });
176 }
177
178 pub(crate) async fn get(
180 &self,
181 path: &str,
182 query: &[(&str, &str)],
183 ) -> Result<reqwest::Response, XApiError> {
184 let token = self.access_token.read().await;
185 let url = format!("{}{}", self.base_url, path);
186
187 let response = self
188 .client
189 .get(&url)
190 .bearer_auth(&*token)
191 .query(query)
192 .send()
193 .await
194 .map_err(|e| XApiError::Network { source: e })?;
195
196 let status_code = response.status().as_u16();
197 let rate_info = Self::parse_rate_limit_headers(response.headers());
198 tracing::debug!(
199 path,
200 remaining = ?rate_info.remaining,
201 reset_at = ?rate_info.reset_at,
202 "X API response"
203 );
204
205 self.record_usage(path, "GET", status_code);
206
207 if response.status().is_success() {
208 Ok(response)
209 } else {
210 Err(Self::map_error_response(response).await)
211 }
212 }
213
214 pub(crate) async fn delete(&self, path: &str) -> Result<reqwest::Response, XApiError> {
216 let token = self.access_token.read().await;
217 let url = format!("{}{}", self.base_url, path);
218
219 let response = self
220 .client
221 .delete(&url)
222 .bearer_auth(&*token)
223 .send()
224 .await
225 .map_err(|e| XApiError::Network { source: e })?;
226
227 let status_code = response.status().as_u16();
228 let rate_info = Self::parse_rate_limit_headers(response.headers());
229 tracing::debug!(
230 path,
231 remaining = ?rate_info.remaining,
232 reset_at = ?rate_info.reset_at,
233 "X API response"
234 );
235
236 self.record_usage(path, "DELETE", status_code);
237
238 if response.status().is_success() {
239 Ok(response)
240 } else {
241 Err(Self::map_error_response(response).await)
242 }
243 }
244
245 pub(crate) async fn post_json<T: serde::Serialize>(
247 &self,
248 path: &str,
249 body: &T,
250 ) -> Result<reqwest::Response, XApiError> {
251 let token = self.access_token.read().await;
252 let url = format!("{}{}", self.base_url, path);
253
254 let response = self
255 .client
256 .post(&url)
257 .bearer_auth(&*token)
258 .json(body)
259 .send()
260 .await
261 .map_err(|e| XApiError::Network { source: e })?;
262
263 let status_code = response.status().as_u16();
264 let rate_info = Self::parse_rate_limit_headers(response.headers());
265 tracing::debug!(
266 path,
267 remaining = ?rate_info.remaining,
268 reset_at = ?rate_info.reset_at,
269 "X API response"
270 );
271
272 self.record_usage(path, "POST", status_code);
273
274 if response.status().is_success() {
275 Ok(response)
276 } else {
277 Err(Self::map_error_response(response).await)
278 }
279 }
280}