Skip to main content

tuitbot_server/routes/content/
compose.rs

1//! Compose endpoints for tweets, threads, and unified compose.
2
3use std::sync::Arc;
4
5use axum::extract::State;
6use axum::Json;
7use serde::Deserialize;
8use serde_json::{json, Value};
9use tuitbot_core::content::{
10    serialize_blocks_for_storage, tweet_weighted_len, validate_thread_blocks, ThreadBlock,
11    MAX_TWEET_CHARS,
12};
13use tuitbot_core::storage::provenance::ProvenanceRef;
14use tuitbot_core::storage::{action_log, approval_queue, scheduled_content};
15use tuitbot_core::x_api::{XApiClient, XApiHttpClient};
16
17use crate::account::{require_mutate, AccountContext};
18use crate::error::ApiError;
19use crate::state::AppState;
20use crate::ws::{AccountWsEvent, WsEvent};
21
22use super::read_approval_mode;
23
24/// A single thread block in an API request payload.
25#[derive(Debug, Deserialize)]
26pub struct ThreadBlockRequest {
27    /// Client-generated stable UUID.
28    pub id: String,
29    /// Tweet text content.
30    pub text: String,
31    /// Per-block media file paths.
32    #[serde(default)]
33    pub media_paths: Vec<String>,
34    /// Zero-based ordering index.
35    pub order: u32,
36}
37
38impl ThreadBlockRequest {
39    /// Convert to the core domain type.
40    pub(crate) fn into_core(self) -> ThreadBlock {
41        ThreadBlock {
42            id: self.id,
43            text: self.text,
44            media_paths: self.media_paths,
45            order: self.order,
46        }
47    }
48}
49
50/// Request body for composing a manual tweet.
51#[derive(Deserialize)]
52pub struct ComposeTweetRequest {
53    /// The tweet text.
54    pub text: String,
55    /// Optional ISO 8601 timestamp to schedule the tweet.
56    pub scheduled_for: Option<String>,
57    /// Optional provenance refs linking this content to vault source material.
58    #[serde(default)]
59    pub provenance: Option<Vec<ProvenanceRef>>,
60}
61
62/// `POST /api/content/tweets` — compose and queue a manual tweet.
63pub async fn compose_tweet(
64    State(state): State<Arc<AppState>>,
65    ctx: AccountContext,
66    Json(body): Json<ComposeTweetRequest>,
67) -> Result<Json<Value>, ApiError> {
68    require_mutate(&ctx)?;
69
70    let text = body.text.trim();
71    if text.is_empty() {
72        return Err(ApiError::BadRequest("text is required".to_string()));
73    }
74
75    // Check if approval mode is enabled.
76    let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
77
78    if approval_mode {
79        let prov_input = build_provenance_input(body.provenance.as_deref());
80
81        let id = approval_queue::enqueue_with_provenance_for(
82            &state.db,
83            &ctx.account_id,
84            "tweet",
85            "", // no target tweet
86            "", // no target author
87            text,
88            "", // no topic
89            "", // no archetype
90            0.0,
91            "[]",
92            None,
93            None,
94            prov_input.as_ref(),
95            body.scheduled_for.as_deref(),
96        )
97        .await?;
98
99        let _ = state.event_tx.send(AccountWsEvent {
100            account_id: ctx.account_id.clone(),
101            event: WsEvent::ApprovalQueued {
102                id,
103                action_type: "tweet".to_string(),
104                content: text.to_string(),
105                media_paths: vec![],
106            },
107        });
108
109        Ok(Json(json!({
110            "status": "queued_for_approval",
111            "id": id,
112            "scheduled_for": body.scheduled_for,
113        })))
114    } else {
115        // Without X API client in AppState, we can only acknowledge the intent.
116        Ok(Json(json!({
117            "status": "accepted",
118            "text": text,
119            "scheduled_for": body.scheduled_for,
120        })))
121    }
122}
123
124/// Request body for composing a manual thread.
125#[derive(Deserialize)]
126pub struct ComposeThreadRequest {
127    /// The tweets forming the thread.
128    pub tweets: Vec<String>,
129    /// Optional ISO 8601 timestamp to schedule the thread.
130    pub scheduled_for: Option<String>,
131}
132
133/// `POST /api/content/threads` — compose and queue a manual thread.
134pub async fn compose_thread(
135    State(state): State<Arc<AppState>>,
136    ctx: AccountContext,
137    Json(body): Json<ComposeThreadRequest>,
138) -> Result<Json<Value>, ApiError> {
139    require_mutate(&ctx)?;
140
141    if body.tweets.is_empty() {
142        return Err(ApiError::BadRequest(
143            "tweets array must not be empty".to_string(),
144        ));
145    }
146
147    let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
148    let combined = body.tweets.join("\n---\n");
149
150    if approval_mode {
151        let id = approval_queue::enqueue_with_context_for(
152            &state.db,
153            &ctx.account_id,
154            "thread",
155            "",
156            "",
157            &combined,
158            "",
159            "",
160            0.0,
161            "[]",
162            None,
163            None,
164            body.scheduled_for.as_deref(),
165        )
166        .await?;
167
168        let _ = state.event_tx.send(AccountWsEvent {
169            account_id: ctx.account_id.clone(),
170            event: WsEvent::ApprovalQueued {
171                id,
172                action_type: "thread".to_string(),
173                content: combined,
174                media_paths: vec![],
175            },
176        });
177
178        Ok(Json(json!({
179            "status": "queued_for_approval",
180            "id": id,
181            "scheduled_for": body.scheduled_for,
182        })))
183    } else {
184        Ok(Json(json!({
185            "status": "accepted",
186            "tweet_count": body.tweets.len(),
187            "scheduled_for": body.scheduled_for,
188        })))
189    }
190}
191
192/// Request body for the unified compose endpoint.
193#[derive(Deserialize)]
194pub struct ComposeRequest {
195    /// Content type: "tweet" or "thread".
196    pub content_type: String,
197    /// Content text (string for tweet, JSON array string for thread).
198    pub content: String,
199    /// Optional ISO 8601 timestamp to schedule the content.
200    pub scheduled_for: Option<String>,
201    /// Optional local media file paths to attach (top-level, used for tweets).
202    #[serde(default)]
203    pub media_paths: Option<Vec<String>>,
204    /// Optional structured thread blocks. Takes precedence over `content` for threads.
205    #[serde(default)]
206    pub blocks: Option<Vec<ThreadBlockRequest>>,
207    /// Optional provenance refs linking this content to vault source material.
208    #[serde(default)]
209    pub provenance: Option<Vec<ProvenanceRef>>,
210}
211
212/// `POST /api/content/compose` — compose manual content (tweet or thread).
213pub async fn compose(
214    State(state): State<Arc<AppState>>,
215    ctx: AccountContext,
216    Json(mut body): Json<ComposeRequest>,
217) -> Result<Json<Value>, ApiError> {
218    require_mutate(&ctx)?;
219
220    let blocks = body.blocks.take();
221
222    match body.content_type.as_str() {
223        "tweet" => compose_tweet_flow(&state, &ctx, &body).await,
224        "thread" => {
225            if let Some(blocks) = blocks {
226                compose_thread_blocks_flow(&state, &ctx, &body, blocks).await
227            } else {
228                compose_thread_legacy_flow(&state, &ctx, &body).await
229            }
230        }
231        _ => Err(ApiError::BadRequest(
232            "content_type must be 'tweet' or 'thread'".to_string(),
233        )),
234    }
235}
236
237/// Handle tweet compose via the unified endpoint.
238async fn compose_tweet_flow(
239    state: &AppState,
240    ctx: &AccountContext,
241    body: &ComposeRequest,
242) -> Result<Json<Value>, ApiError> {
243    let content = body.content.trim().to_string();
244    if content.is_empty() {
245        return Err(ApiError::BadRequest("content is required".to_string()));
246    }
247    if tweet_weighted_len(&content) > MAX_TWEET_CHARS {
248        return Err(ApiError::BadRequest(
249            "tweet content must not exceed 280 characters".to_string(),
250        ));
251    }
252
253    persist_content(state, ctx, body, &content).await
254}
255
256/// Handle legacy thread compose (content as JSON array of strings).
257async fn compose_thread_legacy_flow(
258    state: &AppState,
259    ctx: &AccountContext,
260    body: &ComposeRequest,
261) -> Result<Json<Value>, ApiError> {
262    let content = body.content.trim().to_string();
263    if content.is_empty() {
264        return Err(ApiError::BadRequest("content is required".to_string()));
265    }
266
267    let tweets: Vec<String> = serde_json::from_str(&content).map_err(|_| {
268        ApiError::BadRequest("thread content must be a JSON array of strings".to_string())
269    })?;
270
271    if tweets.is_empty() {
272        return Err(ApiError::BadRequest(
273            "thread must contain at least one tweet".to_string(),
274        ));
275    }
276
277    for (i, tweet) in tweets.iter().enumerate() {
278        if tweet_weighted_len(tweet) > MAX_TWEET_CHARS {
279            return Err(ApiError::BadRequest(format!(
280                "tweet {} exceeds 280 characters",
281                i + 1
282            )));
283        }
284    }
285
286    persist_content(state, ctx, body, &content).await
287}
288
289/// Handle structured thread blocks compose.
290async fn compose_thread_blocks_flow(
291    state: &AppState,
292    ctx: &AccountContext,
293    body: &ComposeRequest,
294    block_requests: Vec<ThreadBlockRequest>,
295) -> Result<Json<Value>, ApiError> {
296    let core_blocks: Vec<ThreadBlock> = block_requests.into_iter().map(|b| b.into_core()).collect();
297
298    validate_thread_blocks(&core_blocks).map_err(|e| ApiError::BadRequest(e.api_message()))?;
299
300    let block_ids: Vec<String> = {
301        let mut sorted = core_blocks.clone();
302        sorted.sort_by_key(|b| b.order);
303        sorted.iter().map(|b| b.id.clone()).collect()
304    };
305
306    let content = serialize_blocks_for_storage(&core_blocks);
307
308    // Collect per-block media into a flat list for approval queue storage.
309    let all_media: Vec<String> = {
310        let mut sorted = core_blocks.clone();
311        sorted.sort_by_key(|b| b.order);
312        sorted.iter().flat_map(|b| b.media_paths.clone()).collect()
313    };
314
315    // Validate scheduled_for early, before any branching logic
316    let normalized_schedule = match &body.scheduled_for {
317        Some(raw) => Some(
318            tuitbot_core::scheduling::validate_and_normalize(
319                raw,
320                tuitbot_core::scheduling::DEFAULT_GRACE_SECONDS,
321            )
322            .map_err(|e| ApiError::BadRequest(e.to_string()))?,
323        ),
324        None => None,
325    };
326
327    let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
328
329    if approval_mode {
330        let media_json = serde_json::to_string(&all_media).unwrap_or_else(|_| "[]".to_string());
331        let prov_input = build_provenance_input(body.provenance.as_deref());
332
333        let id = approval_queue::enqueue_with_provenance_for(
334            &state.db,
335            &ctx.account_id,
336            "thread",
337            "",
338            "",
339            &content,
340            "",
341            "",
342            0.0,
343            &media_json,
344            None,
345            None,
346            prov_input.as_ref(),
347            normalized_schedule.as_deref(),
348        )
349        .await?;
350
351        let _ = state.event_tx.send(AccountWsEvent {
352            account_id: ctx.account_id.clone(),
353            event: WsEvent::ApprovalQueued {
354                id,
355                action_type: "thread".to_string(),
356                content: content.clone(),
357                media_paths: all_media,
358            },
359        });
360
361        Ok(Json(json!({
362            "status": "queued_for_approval",
363            "id": id,
364            "block_ids": block_ids,
365            "scheduled_for": normalized_schedule,
366        })))
367    } else if let Some(ref normalized) = normalized_schedule {
368        // User explicitly chose a future time — already validated above.
369        let id = scheduled_content::insert_for(
370            &state.db,
371            &ctx.account_id,
372            "thread",
373            &content,
374            Some(normalized),
375        )
376        .await?;
377
378        let _ = state.event_tx.send(AccountWsEvent {
379            account_id: ctx.account_id.clone(),
380            event: WsEvent::ContentScheduled {
381                id,
382                content_type: "thread".to_string(),
383                scheduled_for: Some(normalized.clone()),
384            },
385        });
386
387        Ok(Json(json!({
388            "status": "scheduled",
389            "id": id,
390            "block_ids": block_ids,
391        })))
392    } else {
393        // Immediate publish — try posting as a reply chain.
394        let can_post = super::can_post_for(state, &ctx.account_id).await;
395        if !can_post {
396            let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
397            let id = scheduled_content::insert_for(
398                &state.db,
399                &ctx.account_id,
400                "thread",
401                &content,
402                Some(&scheduled_for),
403            )
404            .await?;
405
406            let _ = state.event_tx.send(AccountWsEvent {
407                account_id: ctx.account_id.clone(),
408                event: WsEvent::ContentScheduled {
409                    id,
410                    content_type: "thread".to_string(),
411                    scheduled_for: Some(scheduled_for),
412                },
413            });
414
415            return Ok(Json(json!({
416                "status": "scheduled",
417                "id": id,
418                "block_ids": block_ids,
419            })));
420        }
421
422        try_post_thread_now(state, ctx, &core_blocks).await
423    }
424}
425
426/// Persist content via approval queue, scheduled content, or post directly.
427async fn persist_content(
428    state: &AppState,
429    ctx: &AccountContext,
430    body: &ComposeRequest,
431    content: &str,
432) -> Result<Json<Value>, ApiError> {
433    // Validate scheduled_for early, before any branching logic
434    let normalized_schedule = match &body.scheduled_for {
435        Some(raw) => Some(
436            tuitbot_core::scheduling::validate_and_normalize(
437                raw,
438                tuitbot_core::scheduling::DEFAULT_GRACE_SECONDS,
439            )
440            .map_err(|e| ApiError::BadRequest(e.to_string()))?,
441        ),
442        None => None,
443    };
444
445    let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
446
447    if approval_mode {
448        let media_paths = body.media_paths.as_deref().unwrap_or(&[]);
449        let media_json = serde_json::to_string(media_paths).unwrap_or_else(|_| "[]".to_string());
450
451        let prov_input = build_provenance_input(body.provenance.as_deref());
452
453        let id = approval_queue::enqueue_with_provenance_for(
454            &state.db,
455            &ctx.account_id,
456            &body.content_type,
457            "",
458            "",
459            content,
460            "",
461            "",
462            0.0,
463            &media_json,
464            None,
465            None,
466            prov_input.as_ref(),
467            normalized_schedule.as_deref(),
468        )
469        .await?;
470
471        let _ = state.event_tx.send(AccountWsEvent {
472            account_id: ctx.account_id.clone(),
473            event: WsEvent::ApprovalQueued {
474                id,
475                action_type: body.content_type.clone(),
476                content: content.to_string(),
477                media_paths: media_paths.to_vec(),
478            },
479        });
480
481        Ok(Json(json!({
482            "status": "queued_for_approval",
483            "id": id,
484            "scheduled_for": normalized_schedule,
485        })))
486    } else if let Some(ref normalized) = normalized_schedule {
487        // User explicitly chose a future time — already validated above.
488        let id = scheduled_content::insert_for(
489            &state.db,
490            &ctx.account_id,
491            &body.content_type,
492            content,
493            Some(normalized),
494        )
495        .await?;
496
497        let _ = state.event_tx.send(AccountWsEvent {
498            account_id: ctx.account_id.clone(),
499            event: WsEvent::ContentScheduled {
500                id,
501                content_type: body.content_type.clone(),
502                scheduled_for: Some(normalized.clone()),
503            },
504        });
505
506        Ok(Json(json!({
507            "status": "scheduled",
508            "id": id,
509        })))
510    } else {
511        // Immediate publish — try posting via X API directly.
512        // If not configured for direct posting, save to calendar instead.
513        let can_post = super::can_post_for(state, &ctx.account_id).await;
514        if !can_post {
515            let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
516            let id = scheduled_content::insert_for(
517                &state.db,
518                &ctx.account_id,
519                &body.content_type,
520                content,
521                Some(&scheduled_for),
522            )
523            .await?;
524
525            let _ = state.event_tx.send(AccountWsEvent {
526                account_id: ctx.account_id.clone(),
527                event: WsEvent::ContentScheduled {
528                    id,
529                    content_type: body.content_type.clone(),
530                    scheduled_for: Some(scheduled_for),
531                },
532            });
533
534            return Ok(Json(json!({
535                "status": "scheduled",
536                "id": id,
537            })));
538        }
539
540        try_post_now(state, ctx, &body.content_type, content).await
541    }
542}
543
544/// Build an X API client for the given account based on the configured backend.
545///
546/// Returns `Box<dyn XApiClient>` so callers can use either scraper or OAuth
547/// without duplicating the construction logic.
548async fn build_x_client(
549    state: &AppState,
550    ctx: &AccountContext,
551) -> Result<Box<dyn XApiClient>, ApiError> {
552    let config = super::read_effective_config(state, &ctx.account_id).await?;
553
554    match config.x_api.provider_backend.as_str() {
555        "scraper" => {
556            let account_data =
557                tuitbot_core::storage::accounts::account_data_dir(&state.data_dir, &ctx.account_id);
558            let client = tuitbot_core::x_api::LocalModeXClient::with_session(
559                config.x_api.scraper_allow_mutations,
560                &account_data,
561            )
562            .await;
563            Ok(Box::new(client))
564        }
565        "x_api" => {
566            let token_path = tuitbot_core::storage::accounts::account_token_path(
567                &state.data_dir,
568                &ctx.account_id,
569            );
570            let access_token = state
571                .get_x_access_token(&token_path, &ctx.account_id)
572                .await
573                .map_err(|e| {
574                    ApiError::BadRequest(format!(
575                        "X API authentication failed — re-link your account in Settings. ({e})"
576                    ))
577                })?;
578            Ok(Box::new(XApiHttpClient::new(access_token)))
579        }
580        _ => Err(ApiError::BadRequest(
581            "Direct posting requires X API credentials or a browser session. \
582             Configure in Settings → X API."
583                .to_string(),
584        )),
585    }
586}
587
588/// Attempt to post a tweet directly via X API or cookie-auth transport.
589async fn try_post_now(
590    state: &AppState,
591    ctx: &AccountContext,
592    content_type: &str,
593    content: &str,
594) -> Result<Json<Value>, ApiError> {
595    let client = build_x_client(state, ctx).await?;
596
597    let posted = client
598        .post_tweet(content)
599        .await
600        .map_err(|e| ApiError::Internal(format!("Failed to post tweet: {e}")))?;
601
602    let metadata = json!({
603        "tweet_id": posted.id,
604        "content_type": content_type,
605        "source": "compose",
606    });
607    let _ = action_log::log_action_for(
608        &state.db,
609        &ctx.account_id,
610        "tweet_posted",
611        "success",
612        Some(&format!("Posted tweet {}", posted.id)),
613        Some(&metadata.to_string()),
614    )
615    .await;
616
617    Ok(Json(json!({
618        "status": "posted",
619        "tweet_id": posted.id,
620    })))
621}
622
623/// Post a thread as a reply chain: first tweet standalone, each subsequent
624/// tweet replying to the previous one. Returns all posted tweet IDs.
625async fn try_post_thread_now(
626    state: &AppState,
627    ctx: &AccountContext,
628    blocks: &[ThreadBlock],
629) -> Result<Json<Value>, ApiError> {
630    let client = build_x_client(state, ctx).await?;
631
632    let mut sorted: Vec<&ThreadBlock> = blocks.iter().collect();
633    sorted.sort_by_key(|b| b.order);
634
635    let mut tweet_ids: Vec<String> = Vec::with_capacity(sorted.len());
636
637    for (i, block) in sorted.iter().enumerate() {
638        let posted = if i == 0 {
639            client.post_tweet(&block.text).await
640        } else {
641            client.reply_to_tweet(&block.text, &tweet_ids[i - 1]).await
642        };
643
644        match posted {
645            Ok(p) => tweet_ids.push(p.id),
646            Err(e) => {
647                // Log partial failure with the IDs we did post.
648                let metadata = json!({
649                    "posted_tweet_ids": tweet_ids,
650                    "failed_at_index": i,
651                    "error": e.to_string(),
652                    "source": "compose",
653                });
654                let _ = action_log::log_action_for(
655                    &state.db,
656                    &ctx.account_id,
657                    "thread_posted",
658                    "partial_failure",
659                    Some(&format!(
660                        "Thread failed at tweet {}/{}: {e}",
661                        i + 1,
662                        sorted.len()
663                    )),
664                    Some(&metadata.to_string()),
665                )
666                .await;
667
668                return Err(ApiError::Internal(format!(
669                    "Thread failed at tweet {}/{}: {e}. \
670                     {} tweet(s) were posted and cannot be undone.",
671                    i + 1,
672                    sorted.len(),
673                    tweet_ids.len()
674                )));
675            }
676        }
677    }
678
679    let metadata = json!({
680        "tweet_ids": tweet_ids,
681        "content_type": "thread",
682        "source": "compose",
683    });
684    let _ = action_log::log_action_for(
685        &state.db,
686        &ctx.account_id,
687        "thread_posted",
688        "success",
689        Some(&format!("Posted thread ({} tweets)", tweet_ids.len())),
690        Some(&metadata.to_string()),
691    )
692    .await;
693
694    Ok(Json(json!({
695        "status": "posted",
696        "tweet_ids": tweet_ids,
697    })))
698}
699
700/// Build a `ProvenanceInput` from optional provenance refs.
701fn build_provenance_input(
702    provenance: Option<&[ProvenanceRef]>,
703) -> Option<approval_queue::ProvenanceInput> {
704    let refs = provenance?;
705    if refs.is_empty() {
706        return None;
707    }
708
709    let source_node_id = refs.iter().find_map(|r| r.node_id);
710    let source_seed_id = refs.iter().find_map(|r| r.seed_id);
711    let source_chunks_json = serde_json::to_string(refs).unwrap_or_else(|_| "[]".to_string());
712
713    Some(approval_queue::ProvenanceInput {
714        source_node_id,
715        source_seed_id,
716        source_chunks_json,
717        refs: refs.to_vec(),
718    })
719}