Skip to main content

tuitbot_server/routes/content/
compose.rs

1//! Compose endpoints for tweets, threads, and unified compose.
2
3use std::sync::Arc;
4
5use axum::extract::State;
6use axum::Json;
7use serde::Deserialize;
8use serde_json::{json, Value};
9use tuitbot_core::content::{
10    serialize_blocks_for_storage, tweet_weighted_len, validate_thread_blocks, ThreadBlock,
11    MAX_TWEET_CHARS,
12};
13use tuitbot_core::storage::provenance::ProvenanceRef;
14use tuitbot_core::storage::{action_log, approval_queue, scheduled_content};
15use tuitbot_core::x_api::{XApiClient, XApiHttpClient};
16
17use crate::account::{require_mutate, AccountContext};
18use crate::error::ApiError;
19use crate::state::AppState;
20use crate::ws::{AccountWsEvent, WsEvent};
21
22use super::read_approval_mode;
23
24/// A single thread block in an API request payload.
25#[derive(Debug, Deserialize)]
26pub struct ThreadBlockRequest {
27    /// Client-generated stable UUID.
28    pub id: String,
29    /// Tweet text content.
30    pub text: String,
31    /// Per-block media file paths.
32    #[serde(default)]
33    pub media_paths: Vec<String>,
34    /// Zero-based ordering index.
35    pub order: u32,
36}
37
38impl ThreadBlockRequest {
39    /// Convert to the core domain type.
40    pub(crate) fn into_core(self) -> ThreadBlock {
41        ThreadBlock {
42            id: self.id,
43            text: self.text,
44            media_paths: self.media_paths,
45            order: self.order,
46        }
47    }
48}
49
/// Request body for composing a manual tweet (`POST /api/content/tweets`).
///
/// Consumed by `compose_tweet`, which trims `text` and rejects the request
/// when the trimmed text is empty.
#[derive(Deserialize)]
pub struct ComposeTweetRequest {
    /// The tweet text. Leading/trailing whitespace is trimmed by the handler.
    pub text: String,
    /// Optional ISO 8601 timestamp to schedule the tweet.
    ///
    /// `compose_tweet` does not parse or persist this value; it is only echoed
    /// back in the response when approval mode is off.
    pub scheduled_for: Option<String>,
    /// Optional provenance refs linking this content to vault source material.
    ///
    /// Forwarded to the approval queue when approval mode is on; an absent or
    /// empty list results in no provenance being recorded.
    #[serde(default)]
    pub provenance: Option<Vec<ProvenanceRef>>,
}
61
62/// `POST /api/content/tweets` — compose and queue a manual tweet.
63pub async fn compose_tweet(
64    State(state): State<Arc<AppState>>,
65    ctx: AccountContext,
66    Json(body): Json<ComposeTweetRequest>,
67) -> Result<Json<Value>, ApiError> {
68    require_mutate(&ctx)?;
69
70    let text = body.text.trim();
71    if text.is_empty() {
72        return Err(ApiError::BadRequest("text is required".to_string()));
73    }
74
75    // Check if approval mode is enabled.
76    let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
77
78    if approval_mode {
79        let prov_input = build_provenance_input(body.provenance.as_deref());
80
81        let id = approval_queue::enqueue_with_provenance_for(
82            &state.db,
83            &ctx.account_id,
84            "tweet",
85            "", // no target tweet
86            "", // no target author
87            text,
88            "", // no topic
89            "", // no archetype
90            0.0,
91            "[]",
92            None,
93            None,
94            prov_input.as_ref(),
95        )
96        .await?;
97
98        let _ = state.event_tx.send(AccountWsEvent {
99            account_id: ctx.account_id.clone(),
100            event: WsEvent::ApprovalQueued {
101                id,
102                action_type: "tweet".to_string(),
103                content: text.to_string(),
104                media_paths: vec![],
105            },
106        });
107
108        Ok(Json(json!({
109            "status": "queued_for_approval",
110            "id": id,
111        })))
112    } else {
113        // Without X API client in AppState, we can only acknowledge the intent.
114        Ok(Json(json!({
115            "status": "accepted",
116            "text": text,
117            "scheduled_for": body.scheduled_for,
118        })))
119    }
120}
121
/// Request body for composing a manual thread (`POST /api/content/threads`).
///
/// Consumed by `compose_thread`, which rejects an empty `tweets` array.
#[derive(Deserialize)]
pub struct ComposeThreadRequest {
    /// The tweets forming the thread, in posting order.
    ///
    /// The handler joins them with `"\n---\n"` before queueing for approval.
    pub tweets: Vec<String>,
    /// Optional ISO 8601 timestamp to schedule the thread.
    ///
    /// `compose_thread` does not parse or persist this value; it is only
    /// echoed back in the response when approval mode is off.
    pub scheduled_for: Option<String>,
}
130
131/// `POST /api/content/threads` — compose and queue a manual thread.
132pub async fn compose_thread(
133    State(state): State<Arc<AppState>>,
134    ctx: AccountContext,
135    Json(body): Json<ComposeThreadRequest>,
136) -> Result<Json<Value>, ApiError> {
137    require_mutate(&ctx)?;
138
139    if body.tweets.is_empty() {
140        return Err(ApiError::BadRequest(
141            "tweets array must not be empty".to_string(),
142        ));
143    }
144
145    let approval_mode = read_approval_mode(&state, &ctx.account_id).await?;
146    let combined = body.tweets.join("\n---\n");
147
148    if approval_mode {
149        let id = approval_queue::enqueue_for(
150            &state.db,
151            &ctx.account_id,
152            "thread",
153            "",
154            "",
155            &combined,
156            "",
157            "",
158            0.0,
159            "[]",
160        )
161        .await?;
162
163        let _ = state.event_tx.send(AccountWsEvent {
164            account_id: ctx.account_id.clone(),
165            event: WsEvent::ApprovalQueued {
166                id,
167                action_type: "thread".to_string(),
168                content: combined,
169                media_paths: vec![],
170            },
171        });
172
173        Ok(Json(json!({
174            "status": "queued_for_approval",
175            "id": id,
176        })))
177    } else {
178        Ok(Json(json!({
179            "status": "accepted",
180            "tweet_count": body.tweets.len(),
181            "scheduled_for": body.scheduled_for,
182        })))
183    }
184}
185
/// Request body for the unified compose endpoint (`POST /api/content/compose`).
///
/// Consumed by `compose`, which dispatches on `content_type` and rejects any
/// value other than `"tweet"` or `"thread"` with a bad-request error.
#[derive(Deserialize)]
pub struct ComposeRequest {
    /// Content type: "tweet" or "thread".
    pub content_type: String,
    /// Content text (string for tweet, JSON array string for thread).
    ///
    /// Trimmed before use. Not read for threads that supply `blocks`.
    pub content: String,
    /// Optional ISO 8601 timestamp to schedule the content.
    ///
    /// When present (and approval mode is off) the content is stored as
    /// scheduled content instead of being posted immediately. The value is
    /// passed through as-is; this file does not parse it.
    pub scheduled_for: Option<String>,
    /// Optional local media file paths to attach (top-level, used for tweets).
    ///
    /// In this file they are only recorded on approval-queue entries.
    #[serde(default)]
    pub media_paths: Option<Vec<String>>,
    /// Optional structured thread blocks. Takes precedence over `content` for threads.
    #[serde(default)]
    pub blocks: Option<Vec<ThreadBlockRequest>>,
    /// Optional provenance refs linking this content to vault source material.
    ///
    /// Forwarded to the approval queue when approval mode is on.
    #[serde(default)]
    pub provenance: Option<Vec<ProvenanceRef>>,
}
205
206/// `POST /api/content/compose` — compose manual content (tweet or thread).
207pub async fn compose(
208    State(state): State<Arc<AppState>>,
209    ctx: AccountContext,
210    Json(mut body): Json<ComposeRequest>,
211) -> Result<Json<Value>, ApiError> {
212    require_mutate(&ctx)?;
213
214    let blocks = body.blocks.take();
215
216    match body.content_type.as_str() {
217        "tweet" => compose_tweet_flow(&state, &ctx, &body).await,
218        "thread" => {
219            if let Some(blocks) = blocks {
220                compose_thread_blocks_flow(&state, &ctx, &body, blocks).await
221            } else {
222                compose_thread_legacy_flow(&state, &ctx, &body).await
223            }
224        }
225        _ => Err(ApiError::BadRequest(
226            "content_type must be 'tweet' or 'thread'".to_string(),
227        )),
228    }
229}
230
231/// Handle tweet compose via the unified endpoint.
232async fn compose_tweet_flow(
233    state: &AppState,
234    ctx: &AccountContext,
235    body: &ComposeRequest,
236) -> Result<Json<Value>, ApiError> {
237    let content = body.content.trim().to_string();
238    if content.is_empty() {
239        return Err(ApiError::BadRequest("content is required".to_string()));
240    }
241    if tweet_weighted_len(&content) > MAX_TWEET_CHARS {
242        return Err(ApiError::BadRequest(
243            "tweet content must not exceed 280 characters".to_string(),
244        ));
245    }
246
247    persist_content(state, ctx, body, &content).await
248}
249
250/// Handle legacy thread compose (content as JSON array of strings).
251async fn compose_thread_legacy_flow(
252    state: &AppState,
253    ctx: &AccountContext,
254    body: &ComposeRequest,
255) -> Result<Json<Value>, ApiError> {
256    let content = body.content.trim().to_string();
257    if content.is_empty() {
258        return Err(ApiError::BadRequest("content is required".to_string()));
259    }
260
261    let tweets: Vec<String> = serde_json::from_str(&content).map_err(|_| {
262        ApiError::BadRequest("thread content must be a JSON array of strings".to_string())
263    })?;
264
265    if tweets.is_empty() {
266        return Err(ApiError::BadRequest(
267            "thread must contain at least one tweet".to_string(),
268        ));
269    }
270
271    for (i, tweet) in tweets.iter().enumerate() {
272        if tweet_weighted_len(tweet) > MAX_TWEET_CHARS {
273            return Err(ApiError::BadRequest(format!(
274                "tweet {} exceeds 280 characters",
275                i + 1
276            )));
277        }
278    }
279
280    persist_content(state, ctx, body, &content).await
281}
282
283/// Handle structured thread blocks compose.
284async fn compose_thread_blocks_flow(
285    state: &AppState,
286    ctx: &AccountContext,
287    body: &ComposeRequest,
288    block_requests: Vec<ThreadBlockRequest>,
289) -> Result<Json<Value>, ApiError> {
290    let core_blocks: Vec<ThreadBlock> = block_requests.into_iter().map(|b| b.into_core()).collect();
291
292    validate_thread_blocks(&core_blocks).map_err(|e| ApiError::BadRequest(e.api_message()))?;
293
294    let block_ids: Vec<String> = {
295        let mut sorted = core_blocks.clone();
296        sorted.sort_by_key(|b| b.order);
297        sorted.iter().map(|b| b.id.clone()).collect()
298    };
299
300    let content = serialize_blocks_for_storage(&core_blocks);
301
302    // Collect per-block media into a flat list for approval queue storage.
303    let all_media: Vec<String> = {
304        let mut sorted = core_blocks.clone();
305        sorted.sort_by_key(|b| b.order);
306        sorted.iter().flat_map(|b| b.media_paths.clone()).collect()
307    };
308
309    let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
310
311    if approval_mode {
312        let media_json = serde_json::to_string(&all_media).unwrap_or_else(|_| "[]".to_string());
313        let prov_input = build_provenance_input(body.provenance.as_deref());
314
315        let id = approval_queue::enqueue_with_provenance_for(
316            &state.db,
317            &ctx.account_id,
318            "thread",
319            "",
320            "",
321            &content,
322            "",
323            "",
324            0.0,
325            &media_json,
326            None,
327            None,
328            prov_input.as_ref(),
329        )
330        .await?;
331
332        let _ = state.event_tx.send(AccountWsEvent {
333            account_id: ctx.account_id.clone(),
334            event: WsEvent::ApprovalQueued {
335                id,
336                action_type: "thread".to_string(),
337                content: content.clone(),
338                media_paths: all_media,
339            },
340        });
341
342        Ok(Json(json!({
343            "status": "queued_for_approval",
344            "id": id,
345            "block_ids": block_ids,
346        })))
347    } else if body.scheduled_for.is_some() {
348        // User explicitly chose a future time — save to calendar.
349        let id = scheduled_content::insert_for(
350            &state.db,
351            &ctx.account_id,
352            "thread",
353            &content,
354            body.scheduled_for.as_deref(),
355        )
356        .await?;
357
358        let _ = state.event_tx.send(AccountWsEvent {
359            account_id: ctx.account_id.clone(),
360            event: WsEvent::ContentScheduled {
361                id,
362                content_type: "thread".to_string(),
363                scheduled_for: body.scheduled_for.clone(),
364            },
365        });
366
367        Ok(Json(json!({
368            "status": "scheduled",
369            "id": id,
370            "block_ids": block_ids,
371        })))
372    } else {
373        // Immediate publish — try posting as a reply chain.
374        let can_post = super::can_post_for(state, &ctx.account_id).await;
375        if !can_post {
376            let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
377            let id = scheduled_content::insert_for(
378                &state.db,
379                &ctx.account_id,
380                "thread",
381                &content,
382                Some(&scheduled_for),
383            )
384            .await?;
385
386            let _ = state.event_tx.send(AccountWsEvent {
387                account_id: ctx.account_id.clone(),
388                event: WsEvent::ContentScheduled {
389                    id,
390                    content_type: "thread".to_string(),
391                    scheduled_for: Some(scheduled_for),
392                },
393            });
394
395            return Ok(Json(json!({
396                "status": "scheduled",
397                "id": id,
398                "block_ids": block_ids,
399            })));
400        }
401
402        try_post_thread_now(state, ctx, &core_blocks).await
403    }
404}
405
406/// Persist content via approval queue, scheduled content, or post directly.
407async fn persist_content(
408    state: &AppState,
409    ctx: &AccountContext,
410    body: &ComposeRequest,
411    content: &str,
412) -> Result<Json<Value>, ApiError> {
413    let approval_mode = read_approval_mode(state, &ctx.account_id).await?;
414
415    if approval_mode {
416        let media_paths = body.media_paths.as_deref().unwrap_or(&[]);
417        let media_json = serde_json::to_string(media_paths).unwrap_or_else(|_| "[]".to_string());
418
419        let prov_input = build_provenance_input(body.provenance.as_deref());
420
421        let id = approval_queue::enqueue_with_provenance_for(
422            &state.db,
423            &ctx.account_id,
424            &body.content_type,
425            "",
426            "",
427            content,
428            "",
429            "",
430            0.0,
431            &media_json,
432            None,
433            None,
434            prov_input.as_ref(),
435        )
436        .await?;
437
438        let _ = state.event_tx.send(AccountWsEvent {
439            account_id: ctx.account_id.clone(),
440            event: WsEvent::ApprovalQueued {
441                id,
442                action_type: body.content_type.clone(),
443                content: content.to_string(),
444                media_paths: media_paths.to_vec(),
445            },
446        });
447
448        Ok(Json(json!({
449            "status": "queued_for_approval",
450            "id": id,
451        })))
452    } else if body.scheduled_for.is_some() {
453        // User explicitly chose a future time — save to calendar.
454        let id = scheduled_content::insert_for(
455            &state.db,
456            &ctx.account_id,
457            &body.content_type,
458            content,
459            body.scheduled_for.as_deref(),
460        )
461        .await?;
462
463        let _ = state.event_tx.send(AccountWsEvent {
464            account_id: ctx.account_id.clone(),
465            event: WsEvent::ContentScheduled {
466                id,
467                content_type: body.content_type.clone(),
468                scheduled_for: body.scheduled_for.clone(),
469            },
470        });
471
472        Ok(Json(json!({
473            "status": "scheduled",
474            "id": id,
475        })))
476    } else {
477        // Immediate publish — try posting via X API directly.
478        // If not configured for direct posting, save to calendar instead.
479        let can_post = super::can_post_for(state, &ctx.account_id).await;
480        if !can_post {
481            let scheduled_for = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S").to_string();
482            let id = scheduled_content::insert_for(
483                &state.db,
484                &ctx.account_id,
485                &body.content_type,
486                content,
487                Some(&scheduled_for),
488            )
489            .await?;
490
491            let _ = state.event_tx.send(AccountWsEvent {
492                account_id: ctx.account_id.clone(),
493                event: WsEvent::ContentScheduled {
494                    id,
495                    content_type: body.content_type.clone(),
496                    scheduled_for: Some(scheduled_for),
497                },
498            });
499
500            return Ok(Json(json!({
501                "status": "scheduled",
502                "id": id,
503            })));
504        }
505
506        try_post_now(state, ctx, &body.content_type, content).await
507    }
508}
509
510/// Build an X API client for the given account based on the configured backend.
511///
512/// Returns `Box<dyn XApiClient>` so callers can use either scraper or OAuth
513/// without duplicating the construction logic.
514async fn build_x_client(
515    state: &AppState,
516    ctx: &AccountContext,
517) -> Result<Box<dyn XApiClient>, ApiError> {
518    let config = super::read_effective_config(state, &ctx.account_id).await?;
519
520    match config.x_api.provider_backend.as_str() {
521        "scraper" => {
522            let account_data =
523                tuitbot_core::storage::accounts::account_data_dir(&state.data_dir, &ctx.account_id);
524            let client = tuitbot_core::x_api::LocalModeXClient::with_session(
525                config.x_api.scraper_allow_mutations,
526                &account_data,
527            )
528            .await;
529            Ok(Box::new(client))
530        }
531        "x_api" => {
532            let token_path = tuitbot_core::storage::accounts::account_token_path(
533                &state.data_dir,
534                &ctx.account_id,
535            );
536            let access_token = state
537                .get_x_access_token(&token_path, &ctx.account_id)
538                .await
539                .map_err(|e| {
540                    ApiError::BadRequest(format!(
541                        "X API authentication failed — re-link your account in Settings. ({e})"
542                    ))
543                })?;
544            Ok(Box::new(XApiHttpClient::new(access_token)))
545        }
546        _ => Err(ApiError::BadRequest(
547            "Direct posting requires X API credentials or a browser session. \
548             Configure in Settings → X API."
549                .to_string(),
550        )),
551    }
552}
553
554/// Attempt to post a tweet directly via X API or cookie-auth transport.
555async fn try_post_now(
556    state: &AppState,
557    ctx: &AccountContext,
558    content_type: &str,
559    content: &str,
560) -> Result<Json<Value>, ApiError> {
561    let client = build_x_client(state, ctx).await?;
562
563    let posted = client
564        .post_tweet(content)
565        .await
566        .map_err(|e| ApiError::Internal(format!("Failed to post tweet: {e}")))?;
567
568    let metadata = json!({
569        "tweet_id": posted.id,
570        "content_type": content_type,
571        "source": "compose",
572    });
573    let _ = action_log::log_action_for(
574        &state.db,
575        &ctx.account_id,
576        "tweet_posted",
577        "success",
578        Some(&format!("Posted tweet {}", posted.id)),
579        Some(&metadata.to_string()),
580    )
581    .await;
582
583    Ok(Json(json!({
584        "status": "posted",
585        "tweet_id": posted.id,
586    })))
587}
588
589/// Post a thread as a reply chain: first tweet standalone, each subsequent
590/// tweet replying to the previous one. Returns all posted tweet IDs.
591async fn try_post_thread_now(
592    state: &AppState,
593    ctx: &AccountContext,
594    blocks: &[ThreadBlock],
595) -> Result<Json<Value>, ApiError> {
596    let client = build_x_client(state, ctx).await?;
597
598    let mut sorted: Vec<&ThreadBlock> = blocks.iter().collect();
599    sorted.sort_by_key(|b| b.order);
600
601    let mut tweet_ids: Vec<String> = Vec::with_capacity(sorted.len());
602
603    for (i, block) in sorted.iter().enumerate() {
604        let posted = if i == 0 {
605            client.post_tweet(&block.text).await
606        } else {
607            client.reply_to_tweet(&block.text, &tweet_ids[i - 1]).await
608        };
609
610        match posted {
611            Ok(p) => tweet_ids.push(p.id),
612            Err(e) => {
613                // Log partial failure with the IDs we did post.
614                let metadata = json!({
615                    "posted_tweet_ids": tweet_ids,
616                    "failed_at_index": i,
617                    "error": e.to_string(),
618                    "source": "compose",
619                });
620                let _ = action_log::log_action_for(
621                    &state.db,
622                    &ctx.account_id,
623                    "thread_posted",
624                    "partial_failure",
625                    Some(&format!(
626                        "Thread failed at tweet {}/{}: {e}",
627                        i + 1,
628                        sorted.len()
629                    )),
630                    Some(&metadata.to_string()),
631                )
632                .await;
633
634                return Err(ApiError::Internal(format!(
635                    "Thread failed at tweet {}/{}: {e}. \
636                     {} tweet(s) were posted and cannot be undone.",
637                    i + 1,
638                    sorted.len(),
639                    tweet_ids.len()
640                )));
641            }
642        }
643    }
644
645    let metadata = json!({
646        "tweet_ids": tweet_ids,
647        "content_type": "thread",
648        "source": "compose",
649    });
650    let _ = action_log::log_action_for(
651        &state.db,
652        &ctx.account_id,
653        "thread_posted",
654        "success",
655        Some(&format!("Posted thread ({} tweets)", tweet_ids.len())),
656        Some(&metadata.to_string()),
657    )
658    .await;
659
660    Ok(Json(json!({
661        "status": "posted",
662        "tweet_ids": tweet_ids,
663    })))
664}
665
666/// Build a `ProvenanceInput` from optional provenance refs.
667fn build_provenance_input(
668    provenance: Option<&[ProvenanceRef]>,
669) -> Option<approval_queue::ProvenanceInput> {
670    let refs = provenance?;
671    if refs.is_empty() {
672        return None;
673    }
674
675    let source_node_id = refs.iter().find_map(|r| r.node_id);
676    let source_seed_id = refs.iter().find_map(|r| r.seed_id);
677    let source_chunks_json = serde_json::to_string(refs).unwrap_or_else(|_| "[]".to_string());
678
679    Some(approval_queue::ProvenanceInput {
680        source_node_id,
681        source_seed_id,
682        source_chunks_json,
683        refs: refs.to_vec(),
684    })
685}