1use std::sync::Arc;
4
5use axum::extract::State;
6use axum::Json;
7use serde::Deserialize;
8use serde_json::{json, Value};
9use tuitbot_core::content::{
10 serialize_blocks_for_storage, tweet_weighted_len, validate_thread_blocks, ThreadBlock,
11 MAX_TWEET_CHARS,
12};
13use tuitbot_core::storage::provenance::ProvenanceRef;
14use tuitbot_core::storage::{action_log, approval_queue, scheduled_content};
15use tuitbot_core::x_api::{XApiClient, XApiHttpClient};
16
17use crate::account::{require_mutate, AccountContext};
18use crate::error::ApiError;
19use crate::state::AppState;
20use crate::ws::{AccountWsEvent, WsEvent};
21
22use super::read_approval_mode;
23
24#[derive(Debug, Deserialize)]
26pub struct ThreadBlockRequest {
27 pub id: String,
29 pub text: String,
31 #[serde(default)]
33 pub media_paths: Vec<String>,
34 pub order: u32,
36}
37
38impl ThreadBlockRequest {
39 pub(crate) fn into_core(self) -> ThreadBlock {
41 ThreadBlock {
42 id: self.id,
43 text: self.text,
44 media_paths: self.media_paths,
45 order: self.order,
46 }
47 }
48}
49
50#[derive(Deserialize)]
52pub struct ComposeTweetRequest {
53 pub text: String,
55 pub scheduled_for: Option<String>,
57 #[serde(default)]
59 pub provenance: Option<Vec<ProvenanceRef>>,
60}
61
62pub async fn compose_tweet(
64 State(state): State<Arc<AppState>>,
65 ctx: AccountContext,
66 Json(body): Json<ComposeTweetRequest>,
67) -> Result<Json<Value>, ApiError> {
68 require_mutate(&ctx)?;
69
70 let text = body.text.trim();
71 if text.is_empty() {
72 return Err(ApiError::BadRequest("text is required".to_string()));
73 }
74
75 let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
77
78 if approval_mode {
79 let prov_input = build_provenance_input(body.provenance.as_deref());
80
81 let id = approval_queue::enqueue_with_provenance_for(
82 &state.db,
83 &ctx.account_id,
84 "tweet",
85 "", "", text,
88 "", "", 0.0,
91 "[]",
92 None,
93 None,
94 prov_input.as_ref(),
95 body.scheduled_for.as_deref(),
96 )
97 .await?;
98
99 let _ = state.event_tx.send(AccountWsEvent {
100 account_id: ctx.account_id.clone(),
101 event: WsEvent::ApprovalQueued {
102 id,
103 action_type: "tweet".to_string(),
104 content: text.to_string(),
105 media_paths: vec![],
106 },
107 });
108
109 Ok(Json(json!({
110 "status": "queued_for_approval",
111 "id": id,
112 "scheduled_for": body.scheduled_for,
113 })))
114 } else {
115 Ok(Json(json!({
117 "status": "accepted",
118 "text": text,
119 "scheduled_for": body.scheduled_for,
120 })))
121 }
122}
123
124#[derive(Deserialize)]
126pub struct ComposeThreadRequest {
127 pub tweets: Vec<String>,
129 pub scheduled_for: Option<String>,
131}
132
133pub async fn compose_thread(
135 State(state): State<Arc<AppState>>,
136 ctx: AccountContext,
137 Json(body): Json<ComposeThreadRequest>,
138) -> Result<Json<Value>, ApiError> {
139 require_mutate(&ctx)?;
140
141 if body.tweets.is_empty() {
142 return Err(ApiError::BadRequest(
143 "tweets array must not be empty".to_string(),
144 ));
145 }
146
147 let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
148 let combined = body.tweets.join("\n---\n");
149
150 if approval_mode {
151 let id = approval_queue::enqueue_with_context_for(
152 &state.db,
153 &ctx.account_id,
154 "thread",
155 "",
156 "",
157 &combined,
158 "",
159 "",
160 0.0,
161 "[]",
162 None,
163 None,
164 body.scheduled_for.as_deref(),
165 )
166 .await?;
167
168 let _ = state.event_tx.send(AccountWsEvent {
169 account_id: ctx.account_id.clone(),
170 event: WsEvent::ApprovalQueued {
171 id,
172 action_type: "thread".to_string(),
173 content: combined,
174 media_paths: vec![],
175 },
176 });
177
178 Ok(Json(json!({
179 "status": "queued_for_approval",
180 "id": id,
181 "scheduled_for": body.scheduled_for,
182 })))
183 } else {
184 Ok(Json(json!({
185 "status": "accepted",
186 "tweet_count": body.tweets.len(),
187 "scheduled_for": body.scheduled_for,
188 })))
189 }
190}
191
192#[derive(Deserialize)]
194pub struct ComposeRequest {
195 pub content_type: String,
197 pub content: String,
199 pub scheduled_for: Option<String>,
201 #[serde(default)]
203 pub media_paths: Option<Vec<String>>,
204 #[serde(default)]
206 pub blocks: Option<Vec<ThreadBlockRequest>>,
207 #[serde(default)]
209 pub provenance: Option<Vec<ProvenanceRef>>,
210}
211
212pub async fn compose(
214 State(state): State<Arc<AppState>>,
215 ctx: AccountContext,
216 Json(mut body): Json<ComposeRequest>,
217) -> Result<Json<Value>, ApiError> {
218 require_mutate(&ctx)?;
219
220 let blocks = body.blocks.take();
221
222 match body.content_type.as_str() {
223 "tweet" => compose_tweet_flow(&state, &ctx, &body).await,
224 "thread" => {
225 if let Some(blocks) = blocks {
226 compose_thread_blocks_flow(&state, &ctx, &body, blocks).await
227 } else {
228 compose_thread_legacy_flow(&state, &ctx, &body).await
229 }
230 }
231 _ => Err(ApiError::BadRequest(
232 "content_type must be 'tweet' or 'thread'".to_string(),
233 )),
234 }
235}
236
237async fn compose_tweet_flow(
239 state: &AppState,
240 ctx: &AccountContext,
241 body: &ComposeRequest,
242) -> Result<Json<Value>, ApiError> {
243 let content = body.content.trim().to_string();
244 if content.is_empty() {
245 return Err(ApiError::BadRequest("content is required".to_string()));
246 }
247 if tweet_weighted_len(&content) > MAX_TWEET_CHARS {
248 return Err(ApiError::BadRequest(
249 "tweet content must not exceed 280 characters".to_string(),
250 ));
251 }
252
253 persist_content(state, ctx, body, &content).await
254}
255
256async fn compose_thread_legacy_flow(
258 state: &AppState,
259 ctx: &AccountContext,
260 body: &ComposeRequest,
261) -> Result<Json<Value>, ApiError> {
262 let content = body.content.trim().to_string();
263 if content.is_empty() {
264 return Err(ApiError::BadRequest("content is required".to_string()));
265 }
266
267 let tweets: Vec<String> = serde_json::from_str(&content).map_err(|_| {
268 ApiError::BadRequest("thread content must be a JSON array of strings".to_string())
269 })?;
270
271 if tweets.is_empty() {
272 return Err(ApiError::BadRequest(
273 "thread must contain at least one tweet".to_string(),
274 ));
275 }
276
277 for (i, tweet) in tweets.iter().enumerate() {
278 if tweet_weighted_len(tweet) > MAX_TWEET_CHARS {
279 return Err(ApiError::BadRequest(format!(
280 "tweet {} exceeds 280 characters",
281 i + 1
282 )));
283 }
284 }
285
286 persist_content(state, ctx, body, &content).await
287}
288
289async fn compose_thread_blocks_flow(
291 state: &AppState,
292 ctx: &AccountContext,
293 body: &ComposeRequest,
294 block_requests: Vec<ThreadBlockRequest>,
295) -> Result<Json<Value>, ApiError> {
296 let core_blocks: Vec<ThreadBlock> = block_requests.into_iter().map(|b| b.into_core()).collect();
297
298 validate_thread_blocks(&core_blocks).map_err(|e| ApiError::BadRequest(e.api_message()))?;
299
300 let block_ids: Vec<String> = {
301 let mut sorted = core_blocks.clone();
302 sorted.sort_by_key(|b| b.order);
303 sorted.iter().map(|b| b.id.clone()).collect()
304 };
305
306 let content = serialize_blocks_for_storage(&core_blocks);
307
308 let all_media: Vec<String> = {
310 let mut sorted = core_blocks.clone();
311 sorted.sort_by_key(|b| b.order);
312 sorted.iter().flat_map(|b| b.media_paths.clone()).collect()
313 };
314
315 let normalized_schedule = match &body.scheduled_for {
317 Some(raw) => Some(
318 tuitbot_core::scheduling::validate_and_normalize(
319 raw,
320 tuitbot_core::scheduling::DEFAULT_GRACE_SECONDS,
321 )
322 .map_err(|e| ApiError::BadRequest(e.to_string()))?,
323 ),
324 None => None,
325 };
326
327 let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
328
329 if approval_mode {
330 let media_json = serde_json::to_string(&all_media).unwrap_or_else(|_| "[]".to_string());
331 let prov_input = build_provenance_input(body.provenance.as_deref());
332
333 let id = approval_queue::enqueue_with_provenance_for(
334 &state.db,
335 &ctx.account_id,
336 "thread",
337 "",
338 "",
339 &content,
340 "",
341 "",
342 0.0,
343 &media_json,
344 None,
345 None,
346 prov_input.as_ref(),
347 normalized_schedule.as_deref(),
348 )
349 .await?;
350
351 let _ = state.event_tx.send(AccountWsEvent {
352 account_id: ctx.account_id.clone(),
353 event: WsEvent::ApprovalQueued {
354 id,
355 action_type: "thread".to_string(),
356 content: content.clone(),
357 media_paths: all_media,
358 },
359 });
360
361 Ok(Json(json!({
362 "status": "queued_for_approval",
363 "id": id,
364 "block_ids": block_ids,
365 "scheduled_for": normalized_schedule,
366 })))
367 } else if let Some(ref normalized) = normalized_schedule {
368 let id = scheduled_content::insert_for(
370 &state.db,
371 &ctx.account_id,
372 "thread",
373 &content,
374 Some(normalized),
375 )
376 .await?;
377
378 let _ = state.event_tx.send(AccountWsEvent {
379 account_id: ctx.account_id.clone(),
380 event: WsEvent::ContentScheduled {
381 id,
382 content_type: "thread".to_string(),
383 scheduled_for: Some(normalized.clone()),
384 },
385 });
386
387 Ok(Json(json!({
388 "status": "scheduled",
389 "id": id,
390 "block_ids": block_ids,
391 })))
392 } else {
393 let can_post = super::can_post_for(state, &ctx.account_id).await;
395 if !can_post {
396 let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
397 let id = scheduled_content::insert_for(
398 &state.db,
399 &ctx.account_id,
400 "thread",
401 &content,
402 Some(&scheduled_for),
403 )
404 .await?;
405
406 let _ = state.event_tx.send(AccountWsEvent {
407 account_id: ctx.account_id.clone(),
408 event: WsEvent::ContentScheduled {
409 id,
410 content_type: "thread".to_string(),
411 scheduled_for: Some(scheduled_for),
412 },
413 });
414
415 return Ok(Json(json!({
416 "status": "scheduled",
417 "id": id,
418 "block_ids": block_ids,
419 })));
420 }
421
422 try_post_thread_now(state, ctx, &core_blocks).await
423 }
424}
425
426async fn persist_content(
428 state: &AppState,
429 ctx: &AccountContext,
430 body: &ComposeRequest,
431 content: &str,
432) -> Result<Json<Value>, ApiError> {
433 let normalized_schedule = match &body.scheduled_for {
435 Some(raw) => Some(
436 tuitbot_core::scheduling::validate_and_normalize(
437 raw,
438 tuitbot_core::scheduling::DEFAULT_GRACE_SECONDS,
439 )
440 .map_err(|e| ApiError::BadRequest(e.to_string()))?,
441 ),
442 None => None,
443 };
444
445 let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
446
447 if approval_mode {
448 let media_paths = body.media_paths.as_deref().unwrap_or(&[]);
449 let media_json = serde_json::to_string(media_paths).unwrap_or_else(|_| "[]".to_string());
450
451 let prov_input = build_provenance_input(body.provenance.as_deref());
452
453 let id = approval_queue::enqueue_with_provenance_for(
454 &state.db,
455 &ctx.account_id,
456 &body.content_type,
457 "",
458 "",
459 content,
460 "",
461 "",
462 0.0,
463 &media_json,
464 None,
465 None,
466 prov_input.as_ref(),
467 normalized_schedule.as_deref(),
468 )
469 .await?;
470
471 let _ = state.event_tx.send(AccountWsEvent {
472 account_id: ctx.account_id.clone(),
473 event: WsEvent::ApprovalQueued {
474 id,
475 action_type: body.content_type.clone(),
476 content: content.to_string(),
477 media_paths: media_paths.to_vec(),
478 },
479 });
480
481 Ok(Json(json!({
482 "status": "queued_for_approval",
483 "id": id,
484 "scheduled_for": normalized_schedule,
485 })))
486 } else if let Some(ref normalized) = normalized_schedule {
487 let id = scheduled_content::insert_for(
489 &state.db,
490 &ctx.account_id,
491 &body.content_type,
492 content,
493 Some(normalized),
494 )
495 .await?;
496
497 let _ = state.event_tx.send(AccountWsEvent {
498 account_id: ctx.account_id.clone(),
499 event: WsEvent::ContentScheduled {
500 id,
501 content_type: body.content_type.clone(),
502 scheduled_for: Some(normalized.clone()),
503 },
504 });
505
506 Ok(Json(json!({
507 "status": "scheduled",
508 "id": id,
509 })))
510 } else {
511 let can_post = super::can_post_for(state, &ctx.account_id).await;
514 if !can_post {
515 let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
516 let id = scheduled_content::insert_for(
517 &state.db,
518 &ctx.account_id,
519 &body.content_type,
520 content,
521 Some(&scheduled_for),
522 )
523 .await?;
524
525 let _ = state.event_tx.send(AccountWsEvent {
526 account_id: ctx.account_id.clone(),
527 event: WsEvent::ContentScheduled {
528 id,
529 content_type: body.content_type.clone(),
530 scheduled_for: Some(scheduled_for),
531 },
532 });
533
534 return Ok(Json(json!({
535 "status": "scheduled",
536 "id": id,
537 })));
538 }
539
540 try_post_now(state, ctx, &body.content_type, content).await
541 }
542}
543
544async fn build_x_client(
549 state: &AppState,
550 ctx: &AccountContext,
551) -> Result<Box<dyn XApiClient>, ApiError> {
552 let config = super::read_effective_config(state, &ctx.account_id).await?;
553
554 match config.x_api.provider_backend.as_str() {
555 "scraper" => {
556 let account_data =
557 tuitbot_core::storage::accounts::account_data_dir(&state.data_dir, &ctx.account_id);
558 let client = tuitbot_core::x_api::LocalModeXClient::with_session(
559 config.x_api.scraper_allow_mutations,
560 &account_data,
561 )
562 .await;
563 Ok(Box::new(client))
564 }
565 "x_api" => {
566 let token_path = tuitbot_core::storage::accounts::account_token_path(
567 &state.data_dir,
568 &ctx.account_id,
569 );
570 let access_token = state
571 .get_x_access_token(&token_path, &ctx.account_id)
572 .await
573 .map_err(|e| {
574 ApiError::BadRequest(format!(
575 "X API authentication failed — re-link your account in Settings. ({e})"
576 ))
577 })?;
578 Ok(Box::new(XApiHttpClient::new(access_token)))
579 }
580 _ => Err(ApiError::BadRequest(
581 "Direct posting requires X API credentials or a browser session. \
582 Configure in Settings → X API."
583 .to_string(),
584 )),
585 }
586}
587
588async fn try_post_now(
590 state: &AppState,
591 ctx: &AccountContext,
592 content_type: &str,
593 content: &str,
594) -> Result<Json<Value>, ApiError> {
595 let client = build_x_client(state, ctx).await?;
596
597 let posted = client
598 .post_tweet(content)
599 .await
600 .map_err(|e| ApiError::Internal(format!("Failed to post tweet: {e}")))?;
601
602 let metadata = json!({
603 "tweet_id": posted.id,
604 "content_type": content_type,
605 "source": "compose",
606 });
607 let _ = action_log::log_action_for(
608 &state.db,
609 &ctx.account_id,
610 "tweet_posted",
611 "success",
612 Some(&format!("Posted tweet {}", posted.id)),
613 Some(&metadata.to_string()),
614 )
615 .await;
616
617 Ok(Json(json!({
618 "status": "posted",
619 "tweet_id": posted.id,
620 })))
621}
622
623async fn try_post_thread_now(
626 state: &AppState,
627 ctx: &AccountContext,
628 blocks: &[ThreadBlock],
629) -> Result<Json<Value>, ApiError> {
630 let client = build_x_client(state, ctx).await?;
631
632 let mut sorted: Vec<&ThreadBlock> = blocks.iter().collect();
633 sorted.sort_by_key(|b| b.order);
634
635 let mut tweet_ids: Vec<String> = Vec::with_capacity(sorted.len());
636
637 for (i, block) in sorted.iter().enumerate() {
638 let posted = if i == 0 {
639 client.post_tweet(&block.text).await
640 } else {
641 client.reply_to_tweet(&block.text, &tweet_ids[i - 1]).await
642 };
643
644 match posted {
645 Ok(p) => tweet_ids.push(p.id),
646 Err(e) => {
647 let metadata = json!({
649 "posted_tweet_ids": tweet_ids,
650 "failed_at_index": i,
651 "error": e.to_string(),
652 "source": "compose",
653 });
654 let _ = action_log::log_action_for(
655 &state.db,
656 &ctx.account_id,
657 "thread_posted",
658 "partial_failure",
659 Some(&format!(
660 "Thread failed at tweet {}/{}: {e}",
661 i + 1,
662 sorted.len()
663 )),
664 Some(&metadata.to_string()),
665 )
666 .await;
667
668 return Err(ApiError::Internal(format!(
669 "Thread failed at tweet {}/{}: {e}. \
670 {} tweet(s) were posted and cannot be undone.",
671 i + 1,
672 sorted.len(),
673 tweet_ids.len()
674 )));
675 }
676 }
677 }
678
679 let metadata = json!({
680 "tweet_ids": tweet_ids,
681 "content_type": "thread",
682 "source": "compose",
683 });
684 let _ = action_log::log_action_for(
685 &state.db,
686 &ctx.account_id,
687 "thread_posted",
688 "success",
689 Some(&format!("Posted thread ({} tweets)", tweet_ids.len())),
690 Some(&metadata.to_string()),
691 )
692 .await;
693
694 Ok(Json(json!({
695 "status": "posted",
696 "tweet_ids": tweet_ids,
697 })))
698}
699
700fn build_provenance_input(
702 provenance: Option<&[ProvenanceRef]>,
703) -> Option<approval_queue::ProvenanceInput> {
704 let refs = provenance?;
705 if refs.is_empty() {
706 return None;
707 }
708
709 let source_node_id = refs.iter().find_map(|r| r.node_id);
710 let source_seed_id = refs.iter().find_map(|r| r.seed_id);
711 let source_chunks_json = serde_json::to_string(refs).unwrap_or_else(|_| "[]".to_string());
712
713 Some(approval_queue::ProvenanceInput {
714 source_node_id,
715 source_seed_id,
716 source_chunks_json,
717 refs: refs.to_vec(),
718 })
719}