use crate::context::AppContext;
use crate::errors::XmasterError;
use base64::Engine as _;
use reqwest::Method;
use reqwest_oauth1::OAuthClientProvider;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, OnceCell};
use tracing::warn;
const MAX_RETRIES: u32 = 3;
const BASE: &str = "https://api.x.com/2";
const UPLOAD_URL: &str = "https://upload.twitter.com/1.1/media/upload.json";
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct RateLimitInfo {
pub limit: u32,
pub remaining: u32,
pub reset: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TweetResponse {
pub id: String,
pub text: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TweetData {
pub id: String,
pub text: String,
#[serde(default)]
pub author_id: Option<String>,
#[serde(default)]
pub author_username: Option<String>,
#[serde(default)]
pub created_at: Option<String>,
#[serde(default)]
pub public_metrics: Option<TweetMetrics>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TweetMetrics {
#[serde(default)]
pub like_count: u64,
#[serde(default)]
pub retweet_count: u64,
#[serde(default)]
pub reply_count: u64,
#[serde(default)]
pub impression_count: u64,
#[serde(default)]
pub bookmark_count: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserResponse {
pub id: String,
pub name: String,
pub username: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub public_metrics: Option<UserMetrics>,
#[serde(default)]
pub profile_image_url: Option<String>,
#[serde(default)]
pub verified: Option<bool>,
#[serde(default)]
pub created_at: Option<String>,
}
pub type UserData = UserResponse;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserMetrics {
#[serde(default)]
pub followers_count: u64,
#[serde(default)]
pub following_count: u64,
#[serde(default)]
pub tweet_count: u64,
#[serde(default)]
pub listed_count: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DmConversation {
pub id: String,
#[serde(default)]
pub participant_ids: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DmMessage {
pub id: String,
#[serde(default)]
pub text: Option<String>,
#[serde(default)]
pub sender_id: Option<String>,
#[serde(default)]
pub created_at: Option<String>,
}
#[derive(Deserialize)]
struct ApiResponse<T> {
data: Option<T>,
#[serde(default)]
includes: Option<Value>,
#[serde(default)]
errors: Option<Vec<ApiErrorDetail>>,
}
#[derive(Deserialize)]
struct ApiErrorDetail {
#[serde(default)]
message: String,
#[serde(default)]
detail: Option<String>,
}
#[derive(Deserialize)]
struct MediaUploadResponse {
media_id_string: Option<String>,
media_id: Option<u64>,
}
pub struct XApi {
ctx: Arc<AppContext>,
cached_user_id: OnceCell<String>,
last_rate_limit: Mutex<Option<RateLimitInfo>>,
}
impl XApi {
pub fn new(ctx: Arc<AppContext>) -> Self {
Self {
ctx,
cached_user_id: OnceCell::new(),
last_rate_limit: Mutex::new(None),
}
}
pub async fn last_rate_limit(&self) -> Option<RateLimitInfo> {
*self.last_rate_limit.lock().await
}
pub async fn get_rate_limits(&self) -> Result<RateLimitInfo, XmasterError> {
self.request(Method::GET, &format!("{BASE}/users/me?user.fields=id"), None)
.await?;
self.last_rate_limit
.lock()
.await
.ok_or_else(|| XmasterError::Api {
provider: "x",
code: "no_rate_limit_headers",
message: "No rate limit headers in response".into(),
})
}
fn secrets(&self) -> reqwest_oauth1::Secrets<'_> {
let k = &self.ctx.config.keys;
reqwest_oauth1::Secrets::new(&k.api_key, &k.api_secret)
.token(&k.access_token, &k.access_token_secret)
}
fn require_auth(&self) -> Result<(), XmasterError> {
if !self.ctx.config.has_x_auth() {
return Err(XmasterError::AuthMissing {
provider: "x",
message: "X API credentials not configured".into(),
});
}
Ok(())
}
fn parse_rate_limit_headers(headers: &reqwest::header::HeaderMap) -> Option<RateLimitInfo> {
let limit = headers
.get("x-rate-limit-limit")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u32>().ok())?;
let remaining = headers
.get("x-rate-limit-remaining")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u32>().ok())?;
let reset = headers
.get("x-rate-limit-reset")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())?;
Some(RateLimitInfo { limit, remaining, reset })
}
async fn request(
&self,
method: Method,
url: &str,
body: Option<Value>,
) -> Result<Value, XmasterError> {
let mut last_err: Option<XmasterError> = None;
for attempt in 0..MAX_RETRIES {
match self.request_once(method.clone(), url, body.clone()).await {
Ok(val) => return Ok(val),
Err(e) if e.is_retryable() && attempt + 1 < MAX_RETRIES => {
let base_ms = 1000u64 * (1u64 << attempt);
let jitter_ms = rand::random::<u64>() % 500;
let mut delay = Duration::from_millis(base_ms + jitter_ms);
if let XmasterError::RateLimited { reset_at, .. } = &e {
if *reset_at > 0 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
if *reset_at > now {
let wait = (*reset_at - now).min(60) + (jitter_ms / 1000);
delay = Duration::from_secs(wait);
}
}
}
warn!(
attempt = attempt + 1,
max = MAX_RETRIES,
delay_ms = delay.as_millis() as u64,
error = %e,
"Retrying after transient error"
);
tokio::time::sleep(delay).await;
last_err = Some(e);
}
Err(e) => return Err(e),
}
}
Err(last_err.unwrap_or_else(|| XmasterError::Api {
provider: "x",
code: "retry_exhausted",
message: "All retry attempts failed".into(),
}))
}
async fn request_once(
&self,
method: Method,
url: &str,
body: Option<Value>,
) -> Result<Value, XmasterError> {
self.require_auth()?;
let resp = match method {
Method::GET => {
self.ctx.client.clone().oauth1(self.secrets())
.get(url)
.send().await?
}
Method::POST => {
let mut b = self.ctx.client.clone().oauth1(self.secrets()).post(url);
if let Some(ref json) = body {
b = b.header("Content-Type", "application/json")
.body(serde_json::to_string(json)?);
}
b.send().await?
}
Method::DELETE => {
self.ctx.client.clone().oauth1(self.secrets())
.delete(url)
.send().await?
}
Method::PUT => {
let mut b = self.ctx.client.clone().oauth1(self.secrets()).put(url);
if let Some(ref json) = body {
b = b.header("Content-Type", "application/json")
.body(serde_json::to_string(json)?);
}
b.send().await?
}
_ => {
return Err(XmasterError::Api {
provider: "x",
code: "unsupported_method",
message: format!("Unsupported HTTP method: {method}"),
});
}
};
let status = resp.status();
if let Some(rl) = Self::parse_rate_limit_headers(resp.headers()) {
*self.last_rate_limit.lock().await = Some(rl);
}
if status == 401 || status == 403 {
let text = resp.text().await.unwrap_or_default();
let message = if text.contains("oauth1-permissions") {
format!(
"HTTP {status} Forbidden: {text}. \
Fix: Your Access Token was likely generated before enabling Read+Write. \
Go to developer.x.com → your app → Keys and tokens → Regenerate Access Token and Secret, \
then run: xmaster config set keys.access_token NEW_TOKEN && \
xmaster config set keys.access_token_secret NEW_SECRET"
)
} else {
format!("HTTP {status}: {text}")
};
return Err(XmasterError::AuthMissing {
provider: "x",
message,
});
}
if status == 429 {
let reset_at = resp
.headers()
.get("x-rate-limit-reset")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.or_else(|| {
resp.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.map(|secs| {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
+ secs
})
})
.unwrap_or(0);
return Err(XmasterError::RateLimited {
provider: "x",
reset_at,
});
}
if status.as_u16() >= 500 {
return Err(XmasterError::ServerError {
status: status.as_u16(),
});
}
let text = resp.text().await?;
if text.is_empty() {
return Ok(Value::Null);
}
let val: Value = serde_json::from_str(&text).map_err(|_| XmasterError::Api {
provider: "x",
code: "json_parse",
message: format!("Failed to parse response: {}", &text[..text.len().min(200)]),
})?;
if !status.is_success() {
let msg = val["detail"]
.as_str()
.or_else(|| val["title"].as_str())
.unwrap_or("Unknown error");
return Err(XmasterError::Api {
provider: "x",
code: "api_error",
message: format!("HTTP {status}: {msg}"),
});
}
Ok(val)
}
async fn request_data<T: serde::de::DeserializeOwned>(
&self,
method: Method,
url: &str,
body: Option<Value>,
) -> Result<T, XmasterError> {
let val = self.request(method, url, body).await?;
let envelope: ApiResponse<T> = serde_json::from_value(val.clone())?;
if let Some(errors) = &envelope.errors {
if envelope.data.is_none() {
let msg = errors
.iter()
.map(|e| e.detail.as_deref().unwrap_or(&e.message))
.collect::<Vec<_>>()
.join("; ");
return Err(XmasterError::Api {
provider: "x",
code: "api_error",
message: msg,
});
}
}
envelope.data.ok_or_else(|| XmasterError::Api {
provider: "x",
code: "no_data",
message: "Response contained no data field".into(),
})
}
async fn request_list<T: serde::de::DeserializeOwned>(
&self,
method: Method,
url: &str,
body: Option<Value>,
) -> Result<(Vec<T>, Option<Value>), XmasterError> {
let val = self.request(method, url, body).await?;
let includes = val.get("includes").cloned();
let envelope: ApiResponse<Vec<T>> = serde_json::from_value(val)?;
Ok((envelope.data.unwrap_or_default(), includes))
}
pub async fn get_authenticated_user_id(&self) -> Result<String, XmasterError> {
self.cached_user_id
.get_or_try_init(|| async {
let user: UserResponse = self.request_data(
Method::GET,
&format!("{BASE}/users/me?user.fields=id"),
None,
).await?;
Ok(user.id)
})
.await
.cloned()
}
fn tweet_fields() -> &'static str {
"tweet.fields=created_at,public_metrics,author_id,conversation_id,entities,lang"
}
fn tweet_expansions() -> &'static str {
"expansions=author_id"
}
fn user_fields_param() -> &'static str {
"user.fields=created_at,description,public_metrics,verified,profile_image_url,username,name"
}
fn merge_authors(tweets: &mut [TweetData], includes: &Option<Value>) {
if let Some(inc) = includes {
if let Some(users) = inc.get("users").and_then(|u| u.as_array()) {
for tweet in tweets.iter_mut() {
if let Some(aid) = &tweet.author_id {
for user in users {
if user.get("id").and_then(|i| i.as_str()) == Some(aid) {
tweet.author_username =
user.get("username").and_then(|u| u.as_str()).map(String::from);
}
}
}
}
}
}
}
pub async fn create_tweet(
&self,
text: &str,
reply_to: Option<&str>,
quote_tweet_id: Option<&str>,
media_ids: Option<&[String]>,
poll_options: Option<&[String]>,
poll_duration: Option<u64>,
) -> Result<TweetResponse, XmasterError> {
let mut body = json!({ "text": text });
if let Some(reply_id) = reply_to {
body["reply"] = json!({ "in_reply_to_tweet_id": reply_id });
}
if let Some(qid) = quote_tweet_id {
body["quote_tweet_id"] = json!(qid);
}
if let Some(ids) = media_ids {
if !ids.is_empty() {
body["media"] = json!({ "media_ids": ids });
}
}
if let Some(opts) = poll_options {
if !opts.is_empty() {
body["poll"] = json!({
"options": opts,
"duration_minutes": poll_duration.unwrap_or(1440),
});
}
}
self.request_data(Method::POST, &format!("{BASE}/tweets"), Some(body))
.await
}
pub async fn delete_tweet(&self, id: &str) -> Result<(), XmasterError> {
self.request(Method::DELETE, &format!("{BASE}/tweets/{id}"), None)
.await?;
Ok(())
}
pub async fn like_tweet(&self, tweet_id: &str) -> Result<(), XmasterError> {
let uid = self.get_authenticated_user_id().await?;
self.request(
Method::POST,
&format!("{BASE}/users/{uid}/likes"),
Some(json!({ "tweet_id": tweet_id })),
)
.await?;
Ok(())
}
pub async fn unlike_tweet(&self, tweet_id: &str) -> Result<(), XmasterError> {
let uid = self.get_authenticated_user_id().await?;
self.request(
Method::DELETE,
&format!("{BASE}/users/{uid}/likes/{tweet_id}"),
None,
)
.await?;
Ok(())
}
pub async fn retweet(&self, tweet_id: &str) -> Result<(), XmasterError> {
let uid = self.get_authenticated_user_id().await?;
self.request(
Method::POST,
&format!("{BASE}/users/{uid}/retweets"),
Some(json!({ "tweet_id": tweet_id })),
)
.await?;
Ok(())
}
pub async fn unretweet(&self, tweet_id: &str) -> Result<(), XmasterError> {
let uid = self.get_authenticated_user_id().await?;
self.request(
Method::DELETE,
&format!("{BASE}/users/{uid}/retweets/{tweet_id}"),
None,
)
.await?;
Ok(())
}
pub async fn bookmark_tweet(&self, tweet_id: &str) -> Result<(), XmasterError> {
let uid = self.get_authenticated_user_id().await?;
self.request(
Method::POST,
&format!("{BASE}/users/{uid}/bookmarks"),
Some(json!({ "tweet_id": tweet_id })),
)
.await?;
Ok(())
}
pub async fn unbookmark_tweet(&self, tweet_id: &str) -> Result<(), XmasterError> {
let uid = self.get_authenticated_user_id().await?;
self.request(
Method::DELETE,
&format!("{BASE}/users/{uid}/bookmarks/{tweet_id}"),
None,
)
.await?;
Ok(())
}
pub async fn follow_user(&self, target_user_id: &str) -> Result<(), XmasterError> {
let uid = self.get_authenticated_user_id().await?;
self.request(
Method::POST,
&format!("{BASE}/users/{uid}/following"),
Some(json!({ "target_user_id": target_user_id })),
)
.await?;
Ok(())
}
pub async fn unfollow_user(&self, target_user_id: &str) -> Result<(), XmasterError> {
let uid = self.get_authenticated_user_id().await?;
self.request(
Method::DELETE,
&format!("{BASE}/users/{uid}/following/{target_user_id}"),
None,
)
.await?;
Ok(())
}
pub async fn get_user_by_username(&self, username: &str) -> Result<UserResponse, XmasterError> {
let url = format!(
"{BASE}/users/by/username/{username}?{fields}",
fields = Self::user_fields_param()
);
self.request_data(Method::GET, &url, None).await
}
pub async fn get_me(&self) -> Result<UserResponse, XmasterError> {
let url = format!("{BASE}/users/me?{fields}", fields = Self::user_fields_param());
self.request_data(Method::GET, &url, None).await
}
pub async fn get_user_tweets(
&self,
user_id: &str,
count: usize,
) -> Result<Vec<TweetData>, XmasterError> {
let max = count.clamp(5, 100);
let url = format!(
"{BASE}/users/{user_id}/tweets?max_results={max}&{tf}&{exp}&{uf}",
tf = Self::tweet_fields(),
exp = Self::tweet_expansions(),
uf = Self::user_fields_param(),
);
let (mut tweets, includes) =
self.request_list::<TweetData>(Method::GET, &url, None).await?;
Self::merge_authors(&mut tweets, &includes);
Ok(tweets)
}
pub async fn get_user_mentions(
&self,
user_id: &str,
count: usize,
) -> Result<Vec<TweetData>, XmasterError> {
self.get_user_mentions_since(user_id, count, None).await
}
pub async fn get_user_mentions_since(
&self,
user_id: &str,
count: usize,
since_id: Option<&str>,
) -> Result<Vec<TweetData>, XmasterError> {
let max = count.clamp(5, 100);
let since_param = since_id
.map(|id| format!("&since_id={id}"))
.unwrap_or_default();
let url = format!(
"{BASE}/users/{user_id}/mentions?max_results={max}&{tf}&{exp}&{uf}{since_param}",
tf = Self::tweet_fields(),
exp = Self::tweet_expansions(),
uf = Self::user_fields_param(),
);
let (mut tweets, includes) =
self.request_list::<TweetData>(Method::GET, &url, None).await?;
Self::merge_authors(&mut tweets, &includes);
Ok(tweets)
}
pub async fn get_home_timeline(
&self,
count: usize,
) -> Result<Vec<TweetData>, XmasterError> {
let user_id = self.get_authenticated_user_id().await?;
let max = count.clamp(1, 100);
let url = format!(
"{BASE}/users/{user_id}/reverse_chronological_timeline?max_results={max}&{tf}&{exp}&{uf}",
tf = Self::tweet_fields(),
exp = Self::tweet_expansions(),
uf = Self::user_fields_param(),
);
let (mut tweets, includes) =
self.request_list::<TweetData>(Method::GET, &url, None).await?;
Self::merge_authors(&mut tweets, &includes);
Ok(tweets)
}
pub async fn get_user_followers(
&self,
user_id: &str,
count: usize,
) -> Result<Vec<UserData>, XmasterError> {
let max = count.clamp(1, 1000);
let url = format!(
"{BASE}/users/{user_id}/followers?max_results={max}&{uf}",
uf = Self::user_fields_param(),
);
let (users, _) = self.request_list::<UserData>(Method::GET, &url, None).await?;
Ok(users)
}
pub async fn get_user_following(
&self,
user_id: &str,
count: usize,
) -> Result<Vec<UserData>, XmasterError> {
let max = count.clamp(1, 1000);
let url = format!(
"{BASE}/users/{user_id}/following?max_results={max}&{uf}",
uf = Self::user_fields_param(),
);
let (users, _) = self.request_list::<UserData>(Method::GET, &url, None).await?;
Ok(users)
}
pub async fn search_tweets(
&self,
query: &str,
mode: &str,
count: usize,
) -> Result<Vec<TweetData>, XmasterError> {
let max = count.clamp(10, 100);
let encoded_query = percent_encoding::utf8_percent_encode(
query,
percent_encoding::NON_ALPHANUMERIC,
);
let sort = match mode {
"relevancy" | "relevant" => "relevancy",
_ => "recency",
};
let url = format!(
"{BASE}/tweets/search/recent?query={encoded_query}&max_results={max}&sort_order={sort}&{tf}&{exp}&{uf}",
tf = Self::tweet_fields(),
exp = Self::tweet_expansions(),
uf = Self::user_fields_param(),
);
let (mut tweets, includes) =
self.request_list::<TweetData>(Method::GET, &url, None).await?;
Self::merge_authors(&mut tweets, &includes);
Ok(tweets)
}
pub async fn get_bookmarks(&self, count: usize) -> Result<Vec<TweetData>, XmasterError> {
let uid = self.get_authenticated_user_id().await?;
let max = count.clamp(1, 100);
let url = format!(
"{BASE}/users/{uid}/bookmarks?max_results={max}&{tf}&{exp}&{uf}",
tf = Self::tweet_fields(),
exp = Self::tweet_expansions(),
uf = Self::user_fields_param(),
);
let (mut tweets, includes) =
self.request_list::<TweetData>(Method::GET, &url, None).await?;
Self::merge_authors(&mut tweets, &includes);
Ok(tweets)
}
pub async fn send_dm(
&self,
participant_id: &str,
text: &str,
) -> Result<(), XmasterError> {
self.request(
Method::POST,
&format!("{BASE}/dm_conversations/with/{participant_id}/messages"),
Some(json!({ "text": text })),
)
.await?;
Ok(())
}
pub async fn get_dm_conversations(
&self,
count: usize,
) -> Result<Vec<DmConversation>, XmasterError> {
let max = count.clamp(1, 100);
let url = format!(
"{BASE}/dm_events?max_results={max}&event_types=MessageCreate&dm_event.fields=id,text,sender_id,created_at,dm_conversation_id,participant_ids"
);
let val = self.request(Method::GET, &url, None).await?;
let events = val
.get("data")
.and_then(|d| d.as_array())
.cloned()
.unwrap_or_default();
let mut seen = std::collections::HashSet::new();
let mut convos = Vec::new();
for event in &events {
if let Some(cid) = event.get("dm_conversation_id").and_then(|c| c.as_str()) {
if seen.insert(cid.to_string()) {
let participant_ids = event
.get("participant_ids")
.and_then(|p| p.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
convos.push(DmConversation {
id: cid.to_string(),
participant_ids,
});
}
}
}
Ok(convos)
}
pub async fn get_dm_messages(
&self,
conversation_id: &str,
count: usize,
) -> Result<Vec<DmMessage>, XmasterError> {
let max = count.clamp(1, 100);
let url = format!(
"{BASE}/dm_conversations/{conversation_id}/dm_events?max_results={max}&event_types=MessageCreate&dm_event.fields=id,text,sender_id,created_at"
);
let val = self.request(Method::GET, &url, None).await?;
let events = val
.get("data")
.and_then(|d| d.as_array())
.cloned()
.unwrap_or_default();
let messages: Vec<DmMessage> = events
.into_iter()
.filter_map(|e| serde_json::from_value(e).ok())
.collect();
Ok(messages)
}
pub async fn upload_media(&self, file_path: &str) -> Result<String, XmasterError> {
self.require_auth()?;
let path = Path::new(file_path);
if !path.exists() {
return Err(XmasterError::Media(format!("File not found: {file_path}")));
}
let file_bytes = tokio::fs::read(path).await?;
let file_name = path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "media".into());
let mime = match path.extension().and_then(|e| e.to_str()) {
Some("png") => "image/png",
Some("jpg" | "jpeg") => "image/jpeg",
Some("gif") => "image/gif",
Some("webp") => "image/webp",
Some("mp4") => "video/mp4",
Some("mov") => "video/quicktime",
_ => "application/octet-stream",
};
let is_video = mime.starts_with("video/");
let category = if is_video { "tweet_video" } else { "tweet_image" };
let max_size = if is_video {
512 * 1024 * 1024
} else if mime == "image/gif" {
15 * 1024 * 1024
} else {
5 * 1024 * 1024
};
if file_bytes.len() > max_size {
return Err(XmasterError::Media(format!(
"File too large: {}MB (max {}MB for {})",
file_bytes.len() / 1024 / 1024,
max_size / 1024 / 1024,
if is_video { "video" } else { "image" },
)));
}
if !is_video && file_bytes.len() < 5_000_000 {
return self.simple_upload(&file_bytes, &file_name).await;
}
self.chunked_upload(&file_bytes, mime, category).await
}
async fn simple_upload(
&self,
data: &[u8],
file_name: &str,
) -> Result<String, XmasterError> {
let part = reqwest::multipart::Part::bytes(data.to_vec())
.file_name(file_name.to_string());
let form = reqwest::multipart::Form::new().part("media", part);
let resp = self.ctx.client.clone().oauth1(self.secrets())
.post(UPLOAD_URL)
.multipart(form)
.send()
.await?;
let status = resp.status();
if !status.is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(XmasterError::Media(format!(
"Upload failed (HTTP {status}): {text}"
)));
}
let upload: MediaUploadResponse = resp.json().await?;
upload
.media_id_string
.or_else(|| upload.media_id.map(|id| id.to_string()))
.ok_or_else(|| XmasterError::Media("No media_id in upload response".into()))
}
async fn chunked_upload(
&self,
data: &[u8],
mime: &str,
category: &str,
) -> Result<String, XmasterError> {
let total = data.len().to_string();
let resp = self.ctx.client.clone().oauth1(self.secrets())
.post(UPLOAD_URL)
.form(&[
("command", "INIT"),
("media_type", mime),
("total_bytes", total.as_str()),
("media_category", category),
])
.send()
.await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(XmasterError::Media(format!("INIT failed: {text}")));
}
let init_resp: MediaUploadResponse = resp.json().await?;
let media_id = init_resp
.media_id_string
.or_else(|| init_resp.media_id.map(|id| id.to_string()))
.ok_or_else(|| XmasterError::Media("No media_id from INIT".into()))?;
let chunk_size = 1024 * 1024;
let total_chunks = (data.len() + chunk_size - 1) / chunk_size;
for (i, chunk) in data.chunks(chunk_size).enumerate() {
if data.len() > 5_000_000 {
eprintln!(" Uploading chunk {}/{} ...", i + 1, total_chunks);
}
let b64_chunk = base64::engine::general_purpose::STANDARD.encode(chunk);
let seg = i.to_string();
let resp = self.ctx.client.clone().oauth1(self.secrets())
.post(UPLOAD_URL)
.form(&[
("command", "APPEND"),
("media_id", media_id.as_str()),
("segment_index", seg.as_str()),
("media_data", b64_chunk.as_str()),
])
.send()
.await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(XmasterError::Media(format!(
"APPEND segment {i} failed: {text}"
)));
}
}
let resp = self.ctx.client.clone().oauth1(self.secrets())
.post(UPLOAD_URL)
.form(&[("command", "FINALIZE"), ("media_id", media_id.as_str())])
.send()
.await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(XmasterError::Media(format!("FINALIZE failed: {text}")));
}
let finalize: Value = resp.json().await?;
if let Some(info) = finalize.get("processing_info") {
self.wait_for_processing(&media_id, info).await?;
}
Ok(media_id)
}
async fn wait_for_processing(
&self,
media_id: &str,
initial_info: &Value,
) -> Result<(), XmasterError> {
let mut check_after = initial_info
.get("check_after_secs")
.and_then(|v| v.as_u64())
.unwrap_or(5);
const MAX_RETRIES: u32 = 30;
const MAX_TOTAL_SECS: u64 = 300; let mut attempts = 0u32;
let mut elapsed_secs = 0u64;
loop {
tokio::time::sleep(std::time::Duration::from_secs(check_after)).await;
elapsed_secs += check_after;
attempts += 1;
if attempts > MAX_RETRIES || elapsed_secs > MAX_TOTAL_SECS {
return Err(XmasterError::Media(
"Upload processing timed out".into(),
));
}
let url = format!("{UPLOAD_URL}?command=STATUS&media_id={media_id}");
let resp = self.ctx.client.clone().oauth1(self.secrets()).get(&url).send().await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(XmasterError::Media(format!(
"STATUS check failed: {text}"
)));
}
let status: Value = resp.json().await?;
let state = status
.get("processing_info")
.and_then(|p| p.get("state"))
.and_then(|s| s.as_str())
.unwrap_or("succeeded");
match state {
"succeeded" => return Ok(()),
"failed" => {
let error = status
.get("processing_info")
.and_then(|p| p.get("error"))
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
.unwrap_or("Unknown processing error");
return Err(XmasterError::Media(format!(
"Media processing failed: {error}"
)));
}
_ => {
check_after = status
.get("processing_info")
.and_then(|p| p.get("check_after_secs"))
.and_then(|v| v.as_u64())
.unwrap_or(5);
}
}
}
}
pub async fn set_media_alt_text(
&self,
media_id: &str,
alt_text: &str,
) -> Result<(), XmasterError> {
self.require_auth()?;
let body = json!({
"media_id": media_id,
"alt_text": { "text": alt_text }
});
let resp = self
.ctx
.client
.clone()
.oauth1(self.secrets())
.post("https://upload.twitter.com/1.1/media/metadata/create.json")
.header("Content-Type", "application/json")
.body(serde_json::to_string(&body)?)
.send()
.await?;
if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default();
return Err(XmasterError::Media(format!(
"Failed to set alt text: {text}"
)));
}
Ok(())
}
}