mod poster;
mod queue;
mod tests;
pub use poster::parse_thread_content;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::storage::{self, DbPool};
use crate::x_api::XApiClient;
pub async fn run_approval_poster(
pool: DbPool,
x_client: Arc<dyn XApiClient>,
account_id: String,
min_delay: Duration,
max_delay: Duration,
cancel: CancellationToken,
) {
tracing::info!(account_id = %account_id, "Approval poster loop started");
let idle_interval = Duration::from_secs(15);
loop {
tokio::select! {
biased;
() = cancel.cancelled() => {
tracing::info!("Approval poster received cancellation");
break;
}
() = tokio::time::sleep(idle_interval) => {}
}
match storage::approval_queue::get_next_approved_for(&pool, &account_id).await {
Ok(Some(item)) => {
tracing::info!(
id = item.id,
account_id = %account_id,
action_type = %item.action_type,
"Posting approved item"
);
let media_paths: Vec<String> =
serde_json::from_str(&item.media_paths).unwrap_or_default();
let media_ids = if media_paths.is_empty() {
vec![]
} else {
match poster::upload_media(&*x_client, &media_paths).await {
Ok(ids) => ids,
Err(e) => {
tracing::warn!(
id = item.id,
error = %e,
"Failed to upload media for approved item, posting without media"
);
vec![]
}
}
};
if item.action_type == "thread" {
match poster::post_thread_and_persist(
&pool,
&*x_client,
&account_id,
&item,
&media_ids,
)
.await
{
Ok(root_tweet_id) => {
tracing::info!(
id = item.id,
root_tweet_id = %root_tweet_id,
"Approved thread posted successfully"
);
if let Err(e) = storage::approval_queue::mark_posted_for(
&pool,
&account_id,
item.id,
&root_tweet_id,
)
.await
{
tracing::warn!(id = item.id, error = %e, "Failed to mark thread as posted");
}
let _ = storage::action_log::log_action_for(
&pool,
&account_id,
"thread_posted",
"success",
Some(&format!("Posted approved thread {}", item.id)),
None,
)
.await;
}
Err(e) => {
tracing::warn!(id = item.id, error = %e, "Failed to post approved thread");
let _ = storage::approval_queue::mark_failed_for(
&pool,
&account_id,
item.id,
&format!("Thread posting failed: {e}"),
)
.await;
let _ = storage::action_log::log_action_for(
&pool,
&account_id,
"thread_posted",
"error",
Some(&format!(
"Failed to post approved thread {}: {}",
item.id, e
)),
None,
)
.await;
}
}
} else {
let result = match item.action_type.as_str() {
"reply" if !item.target_tweet_id.is_empty() => {
poster::post_reply(
&*x_client,
&item.target_tweet_id,
&item.generated_content,
&media_ids,
)
.await
}
_ => {
poster::post_tweet(&*x_client, &item.generated_content, &media_ids)
.await
}
};
match result {
Ok(tweet_id) => {
tracing::info!(
id = item.id,
tweet_id = %tweet_id,
"Approved item posted successfully"
);
if let Err(e) = storage::approval_queue::mark_posted_for(
&pool,
&account_id,
item.id,
&tweet_id,
)
.await
{
tracing::warn!(
id = item.id,
error = %e,
"Failed to mark approved item as posted"
);
}
queue::propagate_provenance(&pool, &account_id, &item, &tweet_id).await;
queue::execute_loopback_for_provenance(
&pool,
&account_id,
&item,
&tweet_id,
)
.await;
let _ = storage::action_log::log_action_for(
&pool,
&account_id,
&format!("{}_posted", item.action_type),
"success",
Some(&format!("Posted approved item {}", item.id)),
None,
)
.await;
}
Err(e) => {
tracing::warn!(
id = item.id,
error = %e,
"Failed to post approved item"
);
let _ = storage::approval_queue::mark_failed_for(
&pool,
&account_id,
item.id,
&format!("Posting failed: {e}"),
)
.await;
let _ = storage::action_log::log_action_for(
&pool,
&account_id,
&format!("{}_posted", item.action_type),
"error",
Some(&format!("Failed to post approved item {}: {}", item.id, e)),
None,
)
.await;
}
}
}
let delay = queue::randomized_delay(min_delay, max_delay);
if !delay.is_zero() {
tokio::time::sleep(delay).await;
}
}
Ok(None) => {
}
Err(e) => {
tracing::warn!(error = %e, "Failed to query approved items");
}
}
}
tracing::info!(account_id = %account_id, "Approval poster loop stopped");
}