1use std::sync::Arc;
4
5use axum::extract::State;
6use axum::Json;
7use serde::Deserialize;
8use serde_json::{json, Value};
9use tuitbot_core::content::{
10 serialize_blocks_for_storage, tweet_weighted_len, validate_thread_blocks, ThreadBlock,
11 MAX_TWEET_CHARS,
12};
13use tuitbot_core::storage::{action_log, approval_queue, scheduled_content};
14use tuitbot_core::x_api::{XApiClient, XApiHttpClient};
15
16use crate::account::{require_mutate, AccountContext};
17use crate::error::ApiError;
18use crate::state::AppState;
19use crate::ws::{AccountWsEvent, WsEvent};
20
21use super::read_approval_mode;
22
23#[derive(Debug, Deserialize)]
25pub struct ThreadBlockRequest {
26 pub id: String,
28 pub text: String,
30 #[serde(default)]
32 pub media_paths: Vec<String>,
33 pub order: u32,
35}
36
37impl ThreadBlockRequest {
38 pub(crate) fn into_core(self) -> ThreadBlock {
40 ThreadBlock {
41 id: self.id,
42 text: self.text,
43 media_paths: self.media_paths,
44 order: self.order,
45 }
46 }
47}
48
49#[derive(Deserialize)]
51pub struct ComposeTweetRequest {
52 pub text: String,
54 pub scheduled_for: Option<String>,
56}
57
58pub async fn compose_tweet(
60 State(state): State<Arc<AppState>>,
61 ctx: AccountContext,
62 Json(body): Json<ComposeTweetRequest>,
63) -> Result<Json<Value>, ApiError> {
64 require_mutate(&ctx)?;
65
66 let text = body.text.trim();
67 if text.is_empty() {
68 return Err(ApiError::BadRequest("text is required".to_string()));
69 }
70
71 let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
73
74 if approval_mode {
75 let id = approval_queue::enqueue_for(
76 &state.db,
77 &ctx.account_id,
78 "tweet",
79 "", "", text,
82 "", "", 0.0,
85 "[]",
86 )
87 .await?;
88
89 let _ = state.event_tx.send(AccountWsEvent {
90 account_id: ctx.account_id.clone(),
91 event: WsEvent::ApprovalQueued {
92 id,
93 action_type: "tweet".to_string(),
94 content: text.to_string(),
95 media_paths: vec![],
96 },
97 });
98
99 Ok(Json(json!({
100 "status": "queued_for_approval",
101 "id": id,
102 })))
103 } else {
104 Ok(Json(json!({
106 "status": "accepted",
107 "text": text,
108 "scheduled_for": body.scheduled_for,
109 })))
110 }
111}
112
113#[derive(Deserialize)]
115pub struct ComposeThreadRequest {
116 pub tweets: Vec<String>,
118 pub scheduled_for: Option<String>,
120}
121
122pub async fn compose_thread(
124 State(state): State<Arc<AppState>>,
125 ctx: AccountContext,
126 Json(body): Json<ComposeThreadRequest>,
127) -> Result<Json<Value>, ApiError> {
128 require_mutate(&ctx)?;
129
130 if body.tweets.is_empty() {
131 return Err(ApiError::BadRequest(
132 "tweets array must not be empty".to_string(),
133 ));
134 }
135
136 let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
137 let combined = body.tweets.join("\n---\n");
138
139 if approval_mode {
140 let id = approval_queue::enqueue_for(
141 &state.db,
142 &ctx.account_id,
143 "thread",
144 "",
145 "",
146 &combined,
147 "",
148 "",
149 0.0,
150 "[]",
151 )
152 .await?;
153
154 let _ = state.event_tx.send(AccountWsEvent {
155 account_id: ctx.account_id.clone(),
156 event: WsEvent::ApprovalQueued {
157 id,
158 action_type: "thread".to_string(),
159 content: combined,
160 media_paths: vec![],
161 },
162 });
163
164 Ok(Json(json!({
165 "status": "queued_for_approval",
166 "id": id,
167 })))
168 } else {
169 Ok(Json(json!({
170 "status": "accepted",
171 "tweet_count": body.tweets.len(),
172 "scheduled_for": body.scheduled_for,
173 })))
174 }
175}
176
177#[derive(Deserialize)]
179pub struct ComposeRequest {
180 pub content_type: String,
182 pub content: String,
184 pub scheduled_for: Option<String>,
186 #[serde(default)]
188 pub media_paths: Option<Vec<String>>,
189 #[serde(default)]
191 pub blocks: Option<Vec<ThreadBlockRequest>>,
192}
193
194pub async fn compose(
196 State(state): State<Arc<AppState>>,
197 ctx: AccountContext,
198 Json(mut body): Json<ComposeRequest>,
199) -> Result<Json<Value>, ApiError> {
200 require_mutate(&ctx)?;
201
202 let blocks = body.blocks.take();
203
204 match body.content_type.as_str() {
205 "tweet" => compose_tweet_flow(&state, &ctx, &body).await,
206 "thread" => {
207 if let Some(blocks) = blocks {
208 compose_thread_blocks_flow(&state, &ctx, &body, blocks).await
209 } else {
210 compose_thread_legacy_flow(&state, &ctx, &body).await
211 }
212 }
213 _ => Err(ApiError::BadRequest(
214 "content_type must be 'tweet' or 'thread'".to_string(),
215 )),
216 }
217}
218
219async fn compose_tweet_flow(
221 state: &AppState,
222 ctx: &AccountContext,
223 body: &ComposeRequest,
224) -> Result<Json<Value>, ApiError> {
225 let content = body.content.trim().to_string();
226 if content.is_empty() {
227 return Err(ApiError::BadRequest("content is required".to_string()));
228 }
229 if tweet_weighted_len(&content) > MAX_TWEET_CHARS {
230 return Err(ApiError::BadRequest(
231 "tweet content must not exceed 280 characters".to_string(),
232 ));
233 }
234
235 persist_content(state, ctx, body, &content).await
236}
237
238async fn compose_thread_legacy_flow(
240 state: &AppState,
241 ctx: &AccountContext,
242 body: &ComposeRequest,
243) -> Result<Json<Value>, ApiError> {
244 let content = body.content.trim().to_string();
245 if content.is_empty() {
246 return Err(ApiError::BadRequest("content is required".to_string()));
247 }
248
249 let tweets: Vec<String> = serde_json::from_str(&content).map_err(|_| {
250 ApiError::BadRequest("thread content must be a JSON array of strings".to_string())
251 })?;
252
253 if tweets.is_empty() {
254 return Err(ApiError::BadRequest(
255 "thread must contain at least one tweet".to_string(),
256 ));
257 }
258
259 for (i, tweet) in tweets.iter().enumerate() {
260 if tweet_weighted_len(tweet) > MAX_TWEET_CHARS {
261 return Err(ApiError::BadRequest(format!(
262 "tweet {} exceeds 280 characters",
263 i + 1
264 )));
265 }
266 }
267
268 persist_content(state, ctx, body, &content).await
269}
270
271async fn compose_thread_blocks_flow(
273 state: &AppState,
274 ctx: &AccountContext,
275 body: &ComposeRequest,
276 block_requests: Vec<ThreadBlockRequest>,
277) -> Result<Json<Value>, ApiError> {
278 let core_blocks: Vec<ThreadBlock> = block_requests.into_iter().map(|b| b.into_core()).collect();
279
280 validate_thread_blocks(&core_blocks).map_err(|e| ApiError::BadRequest(e.api_message()))?;
281
282 let block_ids: Vec<String> = {
283 let mut sorted = core_blocks.clone();
284 sorted.sort_by_key(|b| b.order);
285 sorted.iter().map(|b| b.id.clone()).collect()
286 };
287
288 let content = serialize_blocks_for_storage(&core_blocks);
289
290 let all_media: Vec<String> = {
292 let mut sorted = core_blocks.clone();
293 sorted.sort_by_key(|b| b.order);
294 sorted.iter().flat_map(|b| b.media_paths.clone()).collect()
295 };
296
297 let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
298
299 if approval_mode {
300 let media_json = serde_json::to_string(&all_media).unwrap_or_else(|_| "[]".to_string());
301 let id = approval_queue::enqueue_for(
302 &state.db,
303 &ctx.account_id,
304 "thread",
305 "",
306 "",
307 &content,
308 "",
309 "",
310 0.0,
311 &media_json,
312 )
313 .await?;
314
315 let _ = state.event_tx.send(AccountWsEvent {
316 account_id: ctx.account_id.clone(),
317 event: WsEvent::ApprovalQueued {
318 id,
319 action_type: "thread".to_string(),
320 content: content.clone(),
321 media_paths: all_media,
322 },
323 });
324
325 Ok(Json(json!({
326 "status": "queued_for_approval",
327 "id": id,
328 "block_ids": block_ids,
329 })))
330 } else if body.scheduled_for.is_some() {
331 let id = scheduled_content::insert_for(
333 &state.db,
334 &ctx.account_id,
335 "thread",
336 &content,
337 body.scheduled_for.as_deref(),
338 )
339 .await?;
340
341 let _ = state.event_tx.send(AccountWsEvent {
342 account_id: ctx.account_id.clone(),
343 event: WsEvent::ContentScheduled {
344 id,
345 content_type: "thread".to_string(),
346 scheduled_for: body.scheduled_for.clone(),
347 },
348 });
349
350 Ok(Json(json!({
351 "status": "scheduled",
352 "id": id,
353 "block_ids": block_ids,
354 })))
355 } else {
356 let can_post = super::can_post_for(state, &ctx.account_id).await;
358 if !can_post {
359 let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
360 let id = scheduled_content::insert_for(
361 &state.db,
362 &ctx.account_id,
363 "thread",
364 &content,
365 Some(&scheduled_for),
366 )
367 .await?;
368
369 let _ = state.event_tx.send(AccountWsEvent {
370 account_id: ctx.account_id.clone(),
371 event: WsEvent::ContentScheduled {
372 id,
373 content_type: "thread".to_string(),
374 scheduled_for: Some(scheduled_for),
375 },
376 });
377
378 return Ok(Json(json!({
379 "status": "scheduled",
380 "id": id,
381 "block_ids": block_ids,
382 })));
383 }
384
385 try_post_thread_now(state, ctx, &core_blocks).await
386 }
387}
388
389async fn persist_content(
391 state: &AppState,
392 ctx: &AccountContext,
393 body: &ComposeRequest,
394 content: &str,
395) -> Result<Json<Value>, ApiError> {
396 let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
397
398 if approval_mode {
399 let media_paths = body.media_paths.as_deref().unwrap_or(&[]);
400 let media_json = serde_json::to_string(media_paths).unwrap_or_else(|_| "[]".to_string());
401 let id = approval_queue::enqueue_for(
402 &state.db,
403 &ctx.account_id,
404 &body.content_type,
405 "",
406 "",
407 content,
408 "",
409 "",
410 0.0,
411 &media_json,
412 )
413 .await?;
414
415 let _ = state.event_tx.send(AccountWsEvent {
416 account_id: ctx.account_id.clone(),
417 event: WsEvent::ApprovalQueued {
418 id,
419 action_type: body.content_type.clone(),
420 content: content.to_string(),
421 media_paths: media_paths.to_vec(),
422 },
423 });
424
425 Ok(Json(json!({
426 "status": "queued_for_approval",
427 "id": id,
428 })))
429 } else if body.scheduled_for.is_some() {
430 let id = scheduled_content::insert_for(
432 &state.db,
433 &ctx.account_id,
434 &body.content_type,
435 content,
436 body.scheduled_for.as_deref(),
437 )
438 .await?;
439
440 let _ = state.event_tx.send(AccountWsEvent {
441 account_id: ctx.account_id.clone(),
442 event: WsEvent::ContentScheduled {
443 id,
444 content_type: body.content_type.clone(),
445 scheduled_for: body.scheduled_for.clone(),
446 },
447 });
448
449 Ok(Json(json!({
450 "status": "scheduled",
451 "id": id,
452 })))
453 } else {
454 let can_post = super::can_post_for(state, &ctx.account_id).await;
457 if !can_post {
458 let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
459 let id = scheduled_content::insert_for(
460 &state.db,
461 &ctx.account_id,
462 &body.content_type,
463 content,
464 Some(&scheduled_for),
465 )
466 .await?;
467
468 let _ = state.event_tx.send(AccountWsEvent {
469 account_id: ctx.account_id.clone(),
470 event: WsEvent::ContentScheduled {
471 id,
472 content_type: body.content_type.clone(),
473 scheduled_for: Some(scheduled_for),
474 },
475 });
476
477 return Ok(Json(json!({
478 "status": "scheduled",
479 "id": id,
480 })));
481 }
482
483 try_post_now(state, ctx, &body.content_type, content).await
484 }
485}
486
487async fn build_x_client(
492 state: &AppState,
493 ctx: &AccountContext,
494) -> Result<Box<dyn XApiClient>, ApiError> {
495 let config = super::read_effective_config(state, &ctx.account_id).await?;
496
497 match config.x_api.provider_backend.as_str() {
498 "scraper" => {
499 let account_data =
500 tuitbot_core::storage::accounts::account_data_dir(&state.data_dir, &ctx.account_id);
501 let client = tuitbot_core::x_api::LocalModeXClient::with_session(
502 config.x_api.scraper_allow_mutations,
503 &account_data,
504 )
505 .await;
506 Ok(Box::new(client))
507 }
508 "x_api" => {
509 let token_path = tuitbot_core::storage::accounts::account_token_path(
510 &state.data_dir,
511 &ctx.account_id,
512 );
513 let access_token = state
514 .get_x_access_token(&token_path, &ctx.account_id)
515 .await
516 .map_err(|e| {
517 ApiError::BadRequest(format!(
518 "X API authentication failed — re-link your account in Settings. ({e})"
519 ))
520 })?;
521 Ok(Box::new(XApiHttpClient::new(access_token)))
522 }
523 _ => Err(ApiError::BadRequest(
524 "Direct posting requires X API credentials or a browser session. \
525 Configure in Settings → X API."
526 .to_string(),
527 )),
528 }
529}
530
531async fn try_post_now(
533 state: &AppState,
534 ctx: &AccountContext,
535 content_type: &str,
536 content: &str,
537) -> Result<Json<Value>, ApiError> {
538 let client = build_x_client(state, ctx).await?;
539
540 let posted = client
541 .post_tweet(content)
542 .await
543 .map_err(|e| ApiError::Internal(format!("Failed to post tweet: {e}")))?;
544
545 let metadata = json!({
546 "tweet_id": posted.id,
547 "content_type": content_type,
548 "source": "compose",
549 });
550 let _ = action_log::log_action_for(
551 &state.db,
552 &ctx.account_id,
553 "tweet_posted",
554 "success",
555 Some(&format!("Posted tweet {}", posted.id)),
556 Some(&metadata.to_string()),
557 )
558 .await;
559
560 Ok(Json(json!({
561 "status": "posted",
562 "tweet_id": posted.id,
563 })))
564}
565
566async fn try_post_thread_now(
569 state: &AppState,
570 ctx: &AccountContext,
571 blocks: &[ThreadBlock],
572) -> Result<Json<Value>, ApiError> {
573 let client = build_x_client(state, ctx).await?;
574
575 let mut sorted: Vec<&ThreadBlock> = blocks.iter().collect();
576 sorted.sort_by_key(|b| b.order);
577
578 let mut tweet_ids: Vec<String> = Vec::with_capacity(sorted.len());
579
580 for (i, block) in sorted.iter().enumerate() {
581 let posted = if i == 0 {
582 client.post_tweet(&block.text).await
583 } else {
584 client.reply_to_tweet(&block.text, &tweet_ids[i - 1]).await
585 };
586
587 match posted {
588 Ok(p) => tweet_ids.push(p.id),
589 Err(e) => {
590 let metadata = json!({
592 "posted_tweet_ids": tweet_ids,
593 "failed_at_index": i,
594 "error": e.to_string(),
595 "source": "compose",
596 });
597 let _ = action_log::log_action_for(
598 &state.db,
599 &ctx.account_id,
600 "thread_posted",
601 "partial_failure",
602 Some(&format!(
603 "Thread failed at tweet {}/{}: {e}",
604 i + 1,
605 sorted.len()
606 )),
607 Some(&metadata.to_string()),
608 )
609 .await;
610
611 return Err(ApiError::Internal(format!(
612 "Thread failed at tweet {}/{}: {e}. \
613 {} tweet(s) were posted and cannot be undone.",
614 i + 1,
615 sorted.len(),
616 tweet_ids.len()
617 )));
618 }
619 }
620 }
621
622 let metadata = json!({
623 "tweet_ids": tweet_ids,
624 "content_type": "thread",
625 "source": "compose",
626 });
627 let _ = action_log::log_action_for(
628 &state.db,
629 &ctx.account_id,
630 "thread_posted",
631 "success",
632 Some(&format!("Posted thread ({} tweets)", tweet_ids.len())),
633 Some(&metadata.to_string()),
634 )
635 .await;
636
637 Ok(Json(json!({
638 "status": "posted",
639 "tweet_ids": tweet_ids,
640 })))
641}