Skip to main content

tuitbot_core/workflow/
queue.rs

1//! Queue step: validate replies, safety-check, then route to approval queue or execute.
2//!
3//! This is the third step in the reply pipeline: given draft text and candidates,
4//! either queue for human approval or execute directly via toolkit.
5//! All X API writes route through `toolkit::write::reply_to_tweet`.
6
7use std::sync::Arc;
8
9use crate::config::Config;
10use crate::llm::LlmProvider;
11use crate::safety::{contains_banned_phrase, DedupChecker};
12use crate::storage;
13use crate::storage::DbPool;
14use crate::toolkit;
15use crate::x_api::XApiClient;
16
17use super::{make_content_gen, ProposeResult, QueueItem, WorkflowError};
18
19/// Input for the queue step.
20#[derive(Debug, Clone)]
21pub struct QueueInput {
22    /// Items to process (each is a candidate + optional pre-drafted text).
23    pub items: Vec<QueueItem>,
24    /// Whether to mention the product in generated replies.
25    pub mention_product: bool,
26}
27
28/// Execute the queue step: validate, safety-check, route or execute.
29///
30/// When `approval_mode` is true, replies are queued for human review.
31/// When false, replies are executed immediately via toolkit.
32///
33/// All X API writes go through `toolkit::write::reply_to_tweet`.
34pub async fn execute(
35    db: &DbPool,
36    x_client: Option<&dyn XApiClient>,
37    llm: Option<&Arc<dyn LlmProvider>>,
38    config: &Config,
39    input: QueueInput,
40) -> Result<Vec<ProposeResult>, WorkflowError> {
41    if input.items.is_empty() {
42        return Err(WorkflowError::InvalidInput(
43            "items must not be empty.".to_string(),
44        ));
45    }
46
47    let approval_mode = config.effective_approval_mode();
48    let dedup = DedupChecker::new(db.clone());
49    let banned = &config.limits.banned_phrases;
50
51    // Build content generator if LLM is available (needed for auto-generation)
52    let gen = llm.map(|l| make_content_gen(l, &config.business));
53
54    let mut results = Vec::with_capacity(input.items.len());
55
56    for item in &input.items {
57        // Fetch tweet from DB
58        let tweet = match storage::tweets::get_tweet_by_id(db, &item.candidate_id).await {
59            Ok(Some(t)) => t,
60            Ok(None) => {
61                results.push(ProposeResult::Blocked {
62                    candidate_id: item.candidate_id.clone(),
63                    reason: format!("Tweet {} not found in discovery DB.", item.candidate_id),
64                });
65                continue;
66            }
67            Err(e) => {
68                results.push(ProposeResult::Blocked {
69                    candidate_id: item.candidate_id.clone(),
70                    reason: format!("DB error: {e}"),
71                });
72                continue;
73            }
74        };
75
76        // Determine reply text: pre-drafted or auto-generate
77        let reply_text = if let Some(text) = &item.pre_drafted_text {
78            text.clone()
79        } else {
80            let content_gen = match &gen {
81                Some(g) => g,
82                None => {
83                    results.push(ProposeResult::Blocked {
84                        candidate_id: item.candidate_id.clone(),
85                        reason: "LLM not configured for auto-generation.".to_string(),
86                    });
87                    continue;
88                }
89            };
90            match content_gen
91                .generate_reply(
92                    &tweet.content,
93                    &tweet.author_username,
94                    input.mention_product,
95                )
96                .await
97            {
98                Ok(output) => output.text,
99                Err(e) => {
100                    results.push(ProposeResult::Blocked {
101                        candidate_id: item.candidate_id.clone(),
102                        reason: format!("LLM generation failed: {e}"),
103                    });
104                    continue;
105                }
106            }
107        };
108
109        // Safety checks
110        if let Ok(true) = dedup.has_replied_to(&item.candidate_id).await {
111            results.push(ProposeResult::Blocked {
112                candidate_id: item.candidate_id.clone(),
113                reason: "Already replied to this tweet.".to_string(),
114            });
115            continue;
116        }
117
118        if let Some(phrase) = contains_banned_phrase(&reply_text, banned) {
119            results.push(ProposeResult::Blocked {
120                candidate_id: item.candidate_id.clone(),
121                reason: format!("Contains banned phrase: {phrase}"),
122            });
123            continue;
124        }
125
126        if let Ok(true) = dedup.is_phrasing_similar(&reply_text, 20).await {
127            results.push(ProposeResult::Blocked {
128                candidate_id: item.candidate_id.clone(),
129                reason: "Reply too similar to a recent reply.".to_string(),
130            });
131            continue;
132        }
133
134        // Route: approval queue or direct execution
135        if approval_mode {
136            match storage::approval_queue::enqueue(
137                db,
138                "reply",
139                &item.candidate_id,
140                &tweet.author_username,
141                &reply_text,
142                "composite",
143                "auto",
144                tweet.relevance_score.unwrap_or(0.0),
145                "[]",
146            )
147            .await
148            {
149                Ok(id) => {
150                    results.push(ProposeResult::Queued {
151                        candidate_id: item.candidate_id.clone(),
152                        approval_queue_id: id,
153                    });
154                }
155                Err(e) => {
156                    results.push(ProposeResult::Blocked {
157                        candidate_id: item.candidate_id.clone(),
158                        reason: format!("Failed to enqueue: {e}"),
159                    });
160                }
161            }
162        } else {
163            // Direct execution requires X client
164            let client = match x_client {
165                Some(c) => c,
166                None => {
167                    results.push(ProposeResult::Blocked {
168                        candidate_id: item.candidate_id.clone(),
169                        reason: "X API client not available.".to_string(),
170                    });
171                    continue;
172                }
173            };
174
175            // Route through toolkit, not direct XApiClient
176            match toolkit::write::reply_to_tweet(client, &reply_text, &item.candidate_id, None)
177                .await
178            {
179                Ok(posted) => {
180                    let _ = storage::tweets::mark_tweet_replied(db, &item.candidate_id).await;
181                    results.push(ProposeResult::Executed {
182                        candidate_id: item.candidate_id.clone(),
183                        reply_tweet_id: posted.id,
184                    });
185                }
186                Err(e) => {
187                    results.push(ProposeResult::Blocked {
188                        candidate_id: item.candidate_id.clone(),
189                        reason: format!("X API error: {e}"),
190                    });
191                }
192            }
193        }
194    }
195
196    Ok(results)
197}