use std::sync::Arc;
use crate::config::Config;
use crate::llm::LlmProvider;
use crate::safety::{contains_banned_phrase, DedupChecker};
use crate::storage;
use crate::storage::DbPool;
use crate::toolkit;
use crate::x_api::XApiClient;
use super::{make_content_gen, ProposeResult, QueueItem, WorkflowError};
#[derive(Debug, Clone)]
pub struct QueueInput {
pub items: Vec<QueueItem>,
pub mention_product: bool,
}
pub async fn execute(
db: &DbPool,
x_client: Option<&dyn XApiClient>,
llm: Option<&Arc<dyn LlmProvider>>,
config: &Config,
input: QueueInput,
) -> Result<Vec<ProposeResult>, WorkflowError> {
if input.items.is_empty() {
return Err(WorkflowError::InvalidInput(
"items must not be empty.".to_string(),
));
}
let approval_mode = config.effective_approval_mode();
let dedup = DedupChecker::new(db.clone());
let banned = &config.limits.banned_phrases;
let gen = llm.map(|l| make_content_gen(l, &config.business));
let mut results = Vec::with_capacity(input.items.len());
for item in &input.items {
let tweet = match storage::tweets::get_tweet_by_id(db, &item.candidate_id).await {
Ok(Some(t)) => t,
Ok(None) => {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: format!("Tweet {} not found in discovery DB.", item.candidate_id),
});
continue;
}
Err(e) => {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: format!("DB error: {e}"),
});
continue;
}
};
let reply_text = if let Some(text) = &item.pre_drafted_text {
text.clone()
} else {
let content_gen = match &gen {
Some(g) => g,
None => {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: "LLM not configured for auto-generation.".to_string(),
});
continue;
}
};
match content_gen
.generate_reply(
&tweet.content,
&tweet.author_username,
input.mention_product,
)
.await
{
Ok(output) => output.text,
Err(e) => {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: format!("LLM generation failed: {e}"),
});
continue;
}
}
};
if let Ok(true) = dedup.has_replied_to(&item.candidate_id).await {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: "Already replied to this tweet.".to_string(),
});
continue;
}
if let Some(phrase) = contains_banned_phrase(&reply_text, banned) {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: format!("Contains banned phrase: {phrase}"),
});
continue;
}
if let Ok(true) = dedup.is_phrasing_similar(&reply_text, 20).await {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: "Reply too similar to a recent reply.".to_string(),
});
continue;
}
if approval_mode {
match storage::approval_queue::enqueue(
db,
"reply",
&item.candidate_id,
&tweet.author_username,
&reply_text,
"composite",
"auto",
tweet.relevance_score.unwrap_or(0.0),
"[]",
)
.await
{
Ok(id) => {
results.push(ProposeResult::Queued {
candidate_id: item.candidate_id.clone(),
approval_queue_id: id,
});
}
Err(e) => {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: format!("Failed to enqueue: {e}"),
});
}
}
} else {
let client = match x_client {
Some(c) => c,
None => {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: "X API client not available.".to_string(),
});
continue;
}
};
match toolkit::write::reply_to_tweet(client, &reply_text, &item.candidate_id, None)
.await
{
Ok(posted) => {
let _ = storage::tweets::mark_tweet_replied(db, &item.candidate_id).await;
results.push(ProposeResult::Executed {
candidate_id: item.candidate_id.clone(),
reply_tweet_id: posted.id,
});
}
Err(e) => {
results.push(ProposeResult::Blocked {
candidate_id: item.candidate_id.clone(),
reason: format!("X API error: {e}"),
});
}
}
}
}
Ok(results)
}