tuitbot_core/workflow/
queue.rs1use std::sync::Arc;
8
9use crate::config::Config;
10use crate::llm::LlmProvider;
11use crate::safety::{contains_banned_phrase, DedupChecker};
12use crate::storage;
13use crate::storage::DbPool;
14use crate::toolkit;
15use crate::x_api::XApiClient;
16
17use super::{make_content_gen, ProposeResult, QueueItem, WorkflowError};
18
/// Input for the queue workflow step run by [`execute`].
#[derive(Debug, Clone)]
pub struct QueueInput {
    /// Candidates to process. Each item identifies a tweet by `candidate_id`
    /// and may carry `pre_drafted_text` to use instead of a generated reply.
    pub items: Vec<QueueItem>,
    /// Forwarded to the content generator's `generate_reply` call when a
    /// reply has to be generated for an item without pre-drafted text.
    pub mention_product: bool,
}
27
28pub async fn execute(
35 db: &DbPool,
36 x_client: Option<&dyn XApiClient>,
37 llm: Option<&Arc<dyn LlmProvider>>,
38 config: &Config,
39 input: QueueInput,
40) -> Result<Vec<ProposeResult>, WorkflowError> {
41 if input.items.is_empty() {
42 return Err(WorkflowError::InvalidInput(
43 "items must not be empty.".to_string(),
44 ));
45 }
46
47 let approval_mode = config.effective_approval_mode();
48 let dedup = DedupChecker::new(db.clone());
49 let banned = &config.limits.banned_phrases;
50
51 let gen = llm.map(|l| make_content_gen(l, &config.business));
53
54 let mut results = Vec::with_capacity(input.items.len());
55
56 for item in &input.items {
57 let tweet = match storage::tweets::get_tweet_by_id(db, &item.candidate_id).await {
59 Ok(Some(t)) => t,
60 Ok(None) => {
61 results.push(ProposeResult::Blocked {
62 candidate_id: item.candidate_id.clone(),
63 reason: format!("Tweet {} not found in discovery DB.", item.candidate_id),
64 });
65 continue;
66 }
67 Err(e) => {
68 results.push(ProposeResult::Blocked {
69 candidate_id: item.candidate_id.clone(),
70 reason: format!("DB error: {e}"),
71 });
72 continue;
73 }
74 };
75
76 let reply_text = if let Some(text) = &item.pre_drafted_text {
78 text.clone()
79 } else {
80 let content_gen = match &gen {
81 Some(g) => g,
82 None => {
83 results.push(ProposeResult::Blocked {
84 candidate_id: item.candidate_id.clone(),
85 reason: "LLM not configured for auto-generation.".to_string(),
86 });
87 continue;
88 }
89 };
90 match content_gen
91 .generate_reply(
92 &tweet.content,
93 &tweet.author_username,
94 input.mention_product,
95 )
96 .await
97 {
98 Ok(output) => output.text,
99 Err(e) => {
100 results.push(ProposeResult::Blocked {
101 candidate_id: item.candidate_id.clone(),
102 reason: format!("LLM generation failed: {e}"),
103 });
104 continue;
105 }
106 }
107 };
108
109 if let Ok(true) = dedup.has_replied_to(&item.candidate_id).await {
111 results.push(ProposeResult::Blocked {
112 candidate_id: item.candidate_id.clone(),
113 reason: "Already replied to this tweet.".to_string(),
114 });
115 continue;
116 }
117
118 if let Some(phrase) = contains_banned_phrase(&reply_text, banned) {
119 results.push(ProposeResult::Blocked {
120 candidate_id: item.candidate_id.clone(),
121 reason: format!("Contains banned phrase: {phrase}"),
122 });
123 continue;
124 }
125
126 if let Ok(true) = dedup.is_phrasing_similar(&reply_text, 20).await {
127 results.push(ProposeResult::Blocked {
128 candidate_id: item.candidate_id.clone(),
129 reason: "Reply too similar to a recent reply.".to_string(),
130 });
131 continue;
132 }
133
134 if approval_mode {
136 match storage::approval_queue::enqueue(
137 db,
138 "reply",
139 &item.candidate_id,
140 &tweet.author_username,
141 &reply_text,
142 "composite",
143 "auto",
144 tweet.relevance_score.unwrap_or(0.0),
145 "[]",
146 )
147 .await
148 {
149 Ok(id) => {
150 results.push(ProposeResult::Queued {
151 candidate_id: item.candidate_id.clone(),
152 approval_queue_id: id,
153 });
154 }
155 Err(e) => {
156 results.push(ProposeResult::Blocked {
157 candidate_id: item.candidate_id.clone(),
158 reason: format!("Failed to enqueue: {e}"),
159 });
160 }
161 }
162 } else {
163 let client = match x_client {
165 Some(c) => c,
166 None => {
167 results.push(ProposeResult::Blocked {
168 candidate_id: item.candidate_id.clone(),
169 reason: "X API client not available.".to_string(),
170 });
171 continue;
172 }
173 };
174
175 match toolkit::write::reply_to_tweet(client, &reply_text, &item.candidate_id, None)
177 .await
178 {
179 Ok(posted) => {
180 let _ = storage::tweets::mark_tweet_replied(db, &item.candidate_id).await;
181 results.push(ProposeResult::Executed {
182 candidate_id: item.candidate_id.clone(),
183 reply_tweet_id: posted.id,
184 });
185 }
186 Err(e) => {
187 results.push(ProposeResult::Blocked {
188 candidate_id: item.candidate_id.clone(),
189 reason: format!("X API error: {e}"),
190 });
191 }
192 }
193 }
194 }
195
196 Ok(results)
197}