1use std::sync::Arc;
4
5use axum::extract::State;
6use axum::Json;
7use serde::Deserialize;
8use serde_json::{json, Value};
9use tuitbot_core::content::{
10 serialize_blocks_for_storage, tweet_weighted_len, validate_thread_blocks, ThreadBlock,
11 MAX_TWEET_CHARS,
12};
13use tuitbot_core::storage::provenance::ProvenanceRef;
14use tuitbot_core::storage::{action_log, approval_queue, scheduled_content};
15use tuitbot_core::x_api::{XApiClient, XApiHttpClient};
16
17use crate::account::{require_mutate, AccountContext};
18use crate::error::ApiError;
19use crate::state::AppState;
20use crate::ws::{AccountWsEvent, WsEvent};
21
22use super::read_approval_mode;
23
24#[derive(Debug, Deserialize)]
26pub struct ThreadBlockRequest {
27 pub id: String,
29 pub text: String,
31 #[serde(default)]
33 pub media_paths: Vec<String>,
34 pub order: u32,
36}
37
38impl ThreadBlockRequest {
39 pub(crate) fn into_core(self) -> ThreadBlock {
41 ThreadBlock {
42 id: self.id,
43 text: self.text,
44 media_paths: self.media_paths,
45 order: self.order,
46 }
47 }
48}
49
50#[derive(Deserialize)]
52pub struct ComposeTweetRequest {
53 pub text: String,
55 pub scheduled_for: Option<String>,
57 #[serde(default)]
59 pub provenance: Option<Vec<ProvenanceRef>>,
60}
61
62pub async fn compose_tweet(
64 State(state): State<Arc<AppState>>,
65 ctx: AccountContext,
66 Json(body): Json<ComposeTweetRequest>,
67) -> Result<Json<Value>, ApiError> {
68 require_mutate(&ctx)?;
69
70 let text = body.text.trim();
71 if text.is_empty() {
72 return Err(ApiError::BadRequest("text is required".to_string()));
73 }
74
75 let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
77
78 if approval_mode {
79 let prov_input = build_provenance_input(body.provenance.as_deref());
80
81 let id = approval_queue::enqueue_with_provenance_for(
82 &state.db,
83 &ctx.account_id,
84 "tweet",
85 "", "", text,
88 "", "", 0.0,
91 "[]",
92 None,
93 None,
94 prov_input.as_ref(),
95 )
96 .await?;
97
98 let _ = state.event_tx.send(AccountWsEvent {
99 account_id: ctx.account_id.clone(),
100 event: WsEvent::ApprovalQueued {
101 id,
102 action_type: "tweet".to_string(),
103 content: text.to_string(),
104 media_paths: vec![],
105 },
106 });
107
108 Ok(Json(json!({
109 "status": "queued_for_approval",
110 "id": id,
111 })))
112 } else {
113 Ok(Json(json!({
115 "status": "accepted",
116 "text": text,
117 "scheduled_for": body.scheduled_for,
118 })))
119 }
120}
121
122#[derive(Deserialize)]
124pub struct ComposeThreadRequest {
125 pub tweets: Vec<String>,
127 pub scheduled_for: Option<String>,
129}
130
131pub async fn compose_thread(
133 State(state): State<Arc<AppState>>,
134 ctx: AccountContext,
135 Json(body): Json<ComposeThreadRequest>,
136) -> Result<Json<Value>, ApiError> {
137 require_mutate(&ctx)?;
138
139 if body.tweets.is_empty() {
140 return Err(ApiError::BadRequest(
141 "tweets array must not be empty".to_string(),
142 ));
143 }
144
145 let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
146 let combined = body.tweets.join("\n---\n");
147
148 if approval_mode {
149 let id = approval_queue::enqueue_for(
150 &state.db,
151 &ctx.account_id,
152 "thread",
153 "",
154 "",
155 &combined,
156 "",
157 "",
158 0.0,
159 "[]",
160 )
161 .await?;
162
163 let _ = state.event_tx.send(AccountWsEvent {
164 account_id: ctx.account_id.clone(),
165 event: WsEvent::ApprovalQueued {
166 id,
167 action_type: "thread".to_string(),
168 content: combined,
169 media_paths: vec![],
170 },
171 });
172
173 Ok(Json(json!({
174 "status": "queued_for_approval",
175 "id": id,
176 })))
177 } else {
178 Ok(Json(json!({
179 "status": "accepted",
180 "tweet_count": body.tweets.len(),
181 "scheduled_for": body.scheduled_for,
182 })))
183 }
184}
185
186#[derive(Deserialize)]
188pub struct ComposeRequest {
189 pub content_type: String,
191 pub content: String,
193 pub scheduled_for: Option<String>,
195 #[serde(default)]
197 pub media_paths: Option<Vec<String>>,
198 #[serde(default)]
200 pub blocks: Option<Vec<ThreadBlockRequest>>,
201 #[serde(default)]
203 pub provenance: Option<Vec<ProvenanceRef>>,
204}
205
206pub async fn compose(
208 State(state): State<Arc<AppState>>,
209 ctx: AccountContext,
210 Json(mut body): Json<ComposeRequest>,
211) -> Result<Json<Value>, ApiError> {
212 require_mutate(&ctx)?;
213
214 let blocks = body.blocks.take();
215
216 match body.content_type.as_str() {
217 "tweet" => compose_tweet_flow(&state, &ctx, &body).await,
218 "thread" => {
219 if let Some(blocks) = blocks {
220 compose_thread_blocks_flow(&state, &ctx, &body, blocks).await
221 } else {
222 compose_thread_legacy_flow(&state, &ctx, &body).await
223 }
224 }
225 _ => Err(ApiError::BadRequest(
226 "content_type must be 'tweet' or 'thread'".to_string(),
227 )),
228 }
229}
230
231async fn compose_tweet_flow(
233 state: &AppState,
234 ctx: &AccountContext,
235 body: &ComposeRequest,
236) -> Result<Json<Value>, ApiError> {
237 let content = body.content.trim().to_string();
238 if content.is_empty() {
239 return Err(ApiError::BadRequest("content is required".to_string()));
240 }
241 if tweet_weighted_len(&content) > MAX_TWEET_CHARS {
242 return Err(ApiError::BadRequest(
243 "tweet content must not exceed 280 characters".to_string(),
244 ));
245 }
246
247 persist_content(state, ctx, body, &content).await
248}
249
250async fn compose_thread_legacy_flow(
252 state: &AppState,
253 ctx: &AccountContext,
254 body: &ComposeRequest,
255) -> Result<Json<Value>, ApiError> {
256 let content = body.content.trim().to_string();
257 if content.is_empty() {
258 return Err(ApiError::BadRequest("content is required".to_string()));
259 }
260
261 let tweets: Vec<String> = serde_json::from_str(&content).map_err(|_| {
262 ApiError::BadRequest("thread content must be a JSON array of strings".to_string())
263 })?;
264
265 if tweets.is_empty() {
266 return Err(ApiError::BadRequest(
267 "thread must contain at least one tweet".to_string(),
268 ));
269 }
270
271 for (i, tweet) in tweets.iter().enumerate() {
272 if tweet_weighted_len(tweet) > MAX_TWEET_CHARS {
273 return Err(ApiError::BadRequest(format!(
274 "tweet {} exceeds 280 characters",
275 i + 1
276 )));
277 }
278 }
279
280 persist_content(state, ctx, body, &content).await
281}
282
283async fn compose_thread_blocks_flow(
285 state: &AppState,
286 ctx: &AccountContext,
287 body: &ComposeRequest,
288 block_requests: Vec<ThreadBlockRequest>,
289) -> Result<Json<Value>, ApiError> {
290 let core_blocks: Vec<ThreadBlock> = block_requests.into_iter().map(|b| b.into_core()).collect();
291
292 validate_thread_blocks(&core_blocks).map_err(|e| ApiError::BadRequest(e.api_message()))?;
293
294 let block_ids: Vec<String> = {
295 let mut sorted = core_blocks.clone();
296 sorted.sort_by_key(|b| b.order);
297 sorted.iter().map(|b| b.id.clone()).collect()
298 };
299
300 let content = serialize_blocks_for_storage(&core_blocks);
301
302 let all_media: Vec<String> = {
304 let mut sorted = core_blocks.clone();
305 sorted.sort_by_key(|b| b.order);
306 sorted.iter().flat_map(|b| b.media_paths.clone()).collect()
307 };
308
309 let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
310
311 if approval_mode {
312 let media_json = serde_json::to_string(&all_media).unwrap_or_else(|_| "[]".to_string());
313 let prov_input = build_provenance_input(body.provenance.as_deref());
314
315 let id = approval_queue::enqueue_with_provenance_for(
316 &state.db,
317 &ctx.account_id,
318 "thread",
319 "",
320 "",
321 &content,
322 "",
323 "",
324 0.0,
325 &media_json,
326 None,
327 None,
328 prov_input.as_ref(),
329 )
330 .await?;
331
332 let _ = state.event_tx.send(AccountWsEvent {
333 account_id: ctx.account_id.clone(),
334 event: WsEvent::ApprovalQueued {
335 id,
336 action_type: "thread".to_string(),
337 content: content.clone(),
338 media_paths: all_media,
339 },
340 });
341
342 Ok(Json(json!({
343 "status": "queued_for_approval",
344 "id": id,
345 "block_ids": block_ids,
346 })))
347 } else if body.scheduled_for.is_some() {
348 let id = scheduled_content::insert_for(
350 &state.db,
351 &ctx.account_id,
352 "thread",
353 &content,
354 body.scheduled_for.as_deref(),
355 )
356 .await?;
357
358 let _ = state.event_tx.send(AccountWsEvent {
359 account_id: ctx.account_id.clone(),
360 event: WsEvent::ContentScheduled {
361 id,
362 content_type: "thread".to_string(),
363 scheduled_for: body.scheduled_for.clone(),
364 },
365 });
366
367 Ok(Json(json!({
368 "status": "scheduled",
369 "id": id,
370 "block_ids": block_ids,
371 })))
372 } else {
373 let can_post = super::can_post_for(state, &ctx.account_id).await;
375 if !can_post {
376 let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
377 let id = scheduled_content::insert_for(
378 &state.db,
379 &ctx.account_id,
380 "thread",
381 &content,
382 Some(&scheduled_for),
383 )
384 .await?;
385
386 let _ = state.event_tx.send(AccountWsEvent {
387 account_id: ctx.account_id.clone(),
388 event: WsEvent::ContentScheduled {
389 id,
390 content_type: "thread".to_string(),
391 scheduled_for: Some(scheduled_for),
392 },
393 });
394
395 return Ok(Json(json!({
396 "status": "scheduled",
397 "id": id,
398 "block_ids": block_ids,
399 })));
400 }
401
402 try_post_thread_now(state, ctx, &core_blocks).await
403 }
404}
405
406async fn persist_content(
408 state: &AppState,
409 ctx: &AccountContext,
410 body: &ComposeRequest,
411 content: &str,
412) -> Result<Json<Value>, ApiError> {
413 let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
414
415 if approval_mode {
416 let media_paths = body.media_paths.as_deref().unwrap_or(&[]);
417 let media_json = serde_json::to_string(media_paths).unwrap_or_else(|_| "[]".to_string());
418
419 let prov_input = build_provenance_input(body.provenance.as_deref());
420
421 let id = approval_queue::enqueue_with_provenance_for(
422 &state.db,
423 &ctx.account_id,
424 &body.content_type,
425 "",
426 "",
427 content,
428 "",
429 "",
430 0.0,
431 &media_json,
432 None,
433 None,
434 prov_input.as_ref(),
435 )
436 .await?;
437
438 let _ = state.event_tx.send(AccountWsEvent {
439 account_id: ctx.account_id.clone(),
440 event: WsEvent::ApprovalQueued {
441 id,
442 action_type: body.content_type.clone(),
443 content: content.to_string(),
444 media_paths: media_paths.to_vec(),
445 },
446 });
447
448 Ok(Json(json!({
449 "status": "queued_for_approval",
450 "id": id,
451 })))
452 } else if body.scheduled_for.is_some() {
453 let id = scheduled_content::insert_for(
455 &state.db,
456 &ctx.account_id,
457 &body.content_type,
458 content,
459 body.scheduled_for.as_deref(),
460 )
461 .await?;
462
463 let _ = state.event_tx.send(AccountWsEvent {
464 account_id: ctx.account_id.clone(),
465 event: WsEvent::ContentScheduled {
466 id,
467 content_type: body.content_type.clone(),
468 scheduled_for: body.scheduled_for.clone(),
469 },
470 });
471
472 Ok(Json(json!({
473 "status": "scheduled",
474 "id": id,
475 })))
476 } else {
477 let can_post = super::can_post_for(state, &ctx.account_id).await;
480 if !can_post {
481 let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
482 let id = scheduled_content::insert_for(
483 &state.db,
484 &ctx.account_id,
485 &body.content_type,
486 content,
487 Some(&scheduled_for),
488 )
489 .await?;
490
491 let _ = state.event_tx.send(AccountWsEvent {
492 account_id: ctx.account_id.clone(),
493 event: WsEvent::ContentScheduled {
494 id,
495 content_type: body.content_type.clone(),
496 scheduled_for: Some(scheduled_for),
497 },
498 });
499
500 return Ok(Json(json!({
501 "status": "scheduled",
502 "id": id,
503 })));
504 }
505
506 try_post_now(state, ctx, &body.content_type, content).await
507 }
508}
509
510async fn build_x_client(
515 state: &AppState,
516 ctx: &AccountContext,
517) -> Result<Box<dyn XApiClient>, ApiError> {
518 let config = super::read_effective_config(state, &ctx.account_id).await?;
519
520 match config.x_api.provider_backend.as_str() {
521 "scraper" => {
522 let account_data =
523 tuitbot_core::storage::accounts::account_data_dir(&state.data_dir, &ctx.account_id);
524 let client = tuitbot_core::x_api::LocalModeXClient::with_session(
525 config.x_api.scraper_allow_mutations,
526 &account_data,
527 )
528 .await;
529 Ok(Box::new(client))
530 }
531 "x_api" => {
532 let token_path = tuitbot_core::storage::accounts::account_token_path(
533 &state.data_dir,
534 &ctx.account_id,
535 );
536 let access_token = state
537 .get_x_access_token(&token_path, &ctx.account_id)
538 .await
539 .map_err(|e| {
540 ApiError::BadRequest(format!(
541 "X API authentication failed — re-link your account in Settings. ({e})"
542 ))
543 })?;
544 Ok(Box::new(XApiHttpClient::new(access_token)))
545 }
546 _ => Err(ApiError::BadRequest(
547 "Direct posting requires X API credentials or a browser session. \
548 Configure in Settings → X API."
549 .to_string(),
550 )),
551 }
552}
553
554async fn try_post_now(
556 state: &AppState,
557 ctx: &AccountContext,
558 content_type: &str,
559 content: &str,
560) -> Result<Json<Value>, ApiError> {
561 let client = build_x_client(state, ctx).await?;
562
563 let posted = client
564 .post_tweet(content)
565 .await
566 .map_err(|e| ApiError::Internal(format!("Failed to post tweet: {e}")))?;
567
568 let metadata = json!({
569 "tweet_id": posted.id,
570 "content_type": content_type,
571 "source": "compose",
572 });
573 let _ = action_log::log_action_for(
574 &state.db,
575 &ctx.account_id,
576 "tweet_posted",
577 "success",
578 Some(&format!("Posted tweet {}", posted.id)),
579 Some(&metadata.to_string()),
580 )
581 .await;
582
583 Ok(Json(json!({
584 "status": "posted",
585 "tweet_id": posted.id,
586 })))
587}
588
589async fn try_post_thread_now(
592 state: &AppState,
593 ctx: &AccountContext,
594 blocks: &[ThreadBlock],
595) -> Result<Json<Value>, ApiError> {
596 let client = build_x_client(state, ctx).await?;
597
598 let mut sorted: Vec<&ThreadBlock> = blocks.iter().collect();
599 sorted.sort_by_key(|b| b.order);
600
601 let mut tweet_ids: Vec<String> = Vec::with_capacity(sorted.len());
602
603 for (i, block) in sorted.iter().enumerate() {
604 let posted = if i == 0 {
605 client.post_tweet(&block.text).await
606 } else {
607 client.reply_to_tweet(&block.text, &tweet_ids[i - 1]).await
608 };
609
610 match posted {
611 Ok(p) => tweet_ids.push(p.id),
612 Err(e) => {
613 let metadata = json!({
615 "posted_tweet_ids": tweet_ids,
616 "failed_at_index": i,
617 "error": e.to_string(),
618 "source": "compose",
619 });
620 let _ = action_log::log_action_for(
621 &state.db,
622 &ctx.account_id,
623 "thread_posted",
624 "partial_failure",
625 Some(&format!(
626 "Thread failed at tweet {}/{}: {e}",
627 i + 1,
628 sorted.len()
629 )),
630 Some(&metadata.to_string()),
631 )
632 .await;
633
634 return Err(ApiError::Internal(format!(
635 "Thread failed at tweet {}/{}: {e}. \
636 {} tweet(s) were posted and cannot be undone.",
637 i + 1,
638 sorted.len(),
639 tweet_ids.len()
640 )));
641 }
642 }
643 }
644
645 let metadata = json!({
646 "tweet_ids": tweet_ids,
647 "content_type": "thread",
648 "source": "compose",
649 });
650 let _ = action_log::log_action_for(
651 &state.db,
652 &ctx.account_id,
653 "thread_posted",
654 "success",
655 Some(&format!("Posted thread ({} tweets)", tweet_ids.len())),
656 Some(&metadata.to_string()),
657 )
658 .await;
659
660 Ok(Json(json!({
661 "status": "posted",
662 "tweet_ids": tweet_ids,
663 })))
664}
665
666fn build_provenance_input(
668 provenance: Option<&[ProvenanceRef]>,
669) -> Option<approval_queue::ProvenanceInput> {
670 let refs = provenance?;
671 if refs.is_empty() {
672 return None;
673 }
674
675 let source_node_id = refs.iter().find_map(|r| r.node_id);
676 let source_seed_id = refs.iter().find_map(|r| r.seed_id);
677 let source_chunks_json = serde_json::to_string(refs).unwrap_or_else(|_| "[]".to_string());
678
679 Some(approval_queue::ProvenanceInput {
680 source_node_id,
681 source_seed_id,
682 source_chunks_json,
683 refs: refs.to_vec(),
684 })
685}