use super::{ApprovalItem, ApprovalRow, ApprovalStats, ReviewAction};
use crate::error::StorageError;
use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
use crate::storage::provenance::ProvenanceRef;
use crate::storage::DbPool;
const SELECT_COLS: &str = "id, action_type, target_tweet_id, target_author, \
generated_content, topic, archetype, score, status, created_at, \
COALESCE(media_paths, '[]') AS media_paths, reviewed_by, review_notes, reason, \
COALESCE(detected_risks, '[]') AS detected_risks, COALESCE(qa_report, '{}') AS qa_report, \
COALESCE(qa_hard_flags, '[]') AS qa_hard_flags, COALESCE(qa_soft_flags, '[]') AS qa_soft_flags, \
COALESCE(qa_recommendations, '[]') AS qa_recommendations, COALESCE(qa_score, 0) AS qa_score, \
COALESCE(qa_requires_override, 0) AS qa_requires_override, qa_override_by, qa_override_note, qa_override_at, \
source_node_id, source_seed_id, COALESCE(source_chunks_json, '[]') AS source_chunks_json, \
scheduled_for";
#[allow(clippy::too_many_arguments)]
pub async fn enqueue_for(
pool: &DbPool,
account_id: &str,
action_type: &str,
target_tweet_id: &str,
target_author: &str,
generated_content: &str,
topic: &str,
archetype: &str,
score: f64,
media_paths: &str,
) -> Result<i64, StorageError> {
enqueue_with_context_for(
pool,
account_id,
action_type,
target_tweet_id,
target_author,
generated_content,
topic,
archetype,
score,
media_paths,
None,
None,
None,
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn enqueue(
pool: &DbPool,
action_type: &str,
target_tweet_id: &str,
target_author: &str,
generated_content: &str,
topic: &str,
archetype: &str,
score: f64,
media_paths: &str,
) -> Result<i64, StorageError> {
enqueue_for(
pool,
DEFAULT_ACCOUNT_ID,
action_type,
target_tweet_id,
target_author,
generated_content,
topic,
archetype,
score,
media_paths,
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn enqueue_with_context_for(
pool: &DbPool,
account_id: &str,
action_type: &str,
target_tweet_id: &str,
target_author: &str,
generated_content: &str,
topic: &str,
archetype: &str,
score: f64,
media_paths: &str,
reason: Option<&str>,
detected_risks: Option<&str>,
scheduled_for: Option<&str>,
) -> Result<i64, StorageError> {
let result = sqlx::query(
"INSERT INTO approval_queue (account_id, action_type, target_tweet_id, target_author, \
generated_content, topic, archetype, score, media_paths, reason, detected_risks, \
scheduled_for) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(account_id)
.bind(action_type)
.bind(target_tweet_id)
.bind(target_author)
.bind(generated_content)
.bind(topic)
.bind(archetype)
.bind(score)
.bind(media_paths)
.bind(reason)
.bind(detected_risks.unwrap_or("[]"))
.bind(scheduled_for)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(result.last_insert_rowid())
}
#[allow(clippy::too_many_arguments)]
pub async fn enqueue_with_context(
pool: &DbPool,
action_type: &str,
target_tweet_id: &str,
target_author: &str,
generated_content: &str,
topic: &str,
archetype: &str,
score: f64,
media_paths: &str,
reason: Option<&str>,
detected_risks: Option<&str>,
) -> Result<i64, StorageError> {
enqueue_with_context_for(
pool,
DEFAULT_ACCOUNT_ID,
action_type,
target_tweet_id,
target_author,
generated_content,
topic,
archetype,
score,
media_paths,
reason,
detected_risks,
None,
)
.await
}
pub struct ProvenanceInput {
pub source_node_id: Option<i64>,
pub source_seed_id: Option<i64>,
pub source_chunks_json: String,
pub refs: Vec<ProvenanceRef>,
}
#[allow(clippy::too_many_arguments)]
pub async fn enqueue_with_provenance_for(
pool: &DbPool,
account_id: &str,
action_type: &str,
target_tweet_id: &str,
target_author: &str,
generated_content: &str,
topic: &str,
archetype: &str,
score: f64,
media_paths: &str,
reason: Option<&str>,
detected_risks: Option<&str>,
provenance: Option<&ProvenanceInput>,
scheduled_for: Option<&str>,
) -> Result<i64, StorageError> {
let (source_node_id, source_seed_id, source_chunks_json) = match provenance {
Some(p) => (
p.source_node_id,
p.source_seed_id,
p.source_chunks_json.as_str(),
),
None => (None, None, "[]"),
};
let result = sqlx::query(
"INSERT INTO approval_queue (account_id, action_type, target_tweet_id, target_author, \
generated_content, topic, archetype, score, media_paths, reason, detected_risks, \
source_node_id, source_seed_id, source_chunks_json, scheduled_for) \
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(account_id)
.bind(action_type)
.bind(target_tweet_id)
.bind(target_author)
.bind(generated_content)
.bind(topic)
.bind(archetype)
.bind(score)
.bind(media_paths)
.bind(reason)
.bind(detected_risks.unwrap_or("[]"))
.bind(source_node_id)
.bind(source_seed_id)
.bind(source_chunks_json)
.bind(scheduled_for)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
let id = result.last_insert_rowid();
if let Some(p) = provenance {
crate::storage::provenance::insert_links_for(
pool,
account_id,
"approval_queue",
id,
&p.refs,
)
.await?;
}
Ok(id)
}
pub async fn get_pending_for(
pool: &DbPool,
account_id: &str,
) -> Result<Vec<ApprovalItem>, StorageError> {
let sql = format!(
"SELECT {SELECT_COLS} FROM approval_queue \
WHERE status = 'pending' AND account_id = ? ORDER BY created_at ASC"
);
let rows: Vec<ApprovalRow> = sqlx::query_as(&sql)
.bind(account_id)
.fetch_all(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(rows.into_iter().map(ApprovalItem::from).collect())
}
pub async fn get_pending(pool: &DbPool) -> Result<Vec<ApprovalItem>, StorageError> {
get_pending_for(pool, DEFAULT_ACCOUNT_ID).await
}
pub async fn pending_count_for(pool: &DbPool, account_id: &str) -> Result<i64, StorageError> {
let row: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM approval_queue WHERE status = 'pending' AND account_id = ?",
)
.bind(account_id)
.fetch_one(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(row.0)
}
pub async fn pending_count(pool: &DbPool) -> Result<i64, StorageError> {
pending_count_for(pool, DEFAULT_ACCOUNT_ID).await
}
pub async fn update_status_for(
pool: &DbPool,
account_id: &str,
id: i64,
status: &str,
) -> Result<(), StorageError> {
let result = sqlx::query(
"UPDATE approval_queue SET status = ?, \
reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
WHERE id = ? AND account_id = ? AND status = 'pending'",
)
.bind(status)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
if result.rows_affected() == 0 {
if let Some(item) = get_by_id_for(pool, account_id, id).await? {
return Err(StorageError::AlreadyReviewed {
id,
current_status: item.status,
});
}
}
Ok(())
}
pub async fn update_status(pool: &DbPool, id: i64, status: &str) -> Result<(), StorageError> {
update_status_for(pool, DEFAULT_ACCOUNT_ID, id, status).await
}
pub async fn update_status_with_review_for(
pool: &DbPool,
account_id: &str,
id: i64,
status: &str,
review: &ReviewAction,
) -> Result<(), StorageError> {
let result = sqlx::query(
"UPDATE approval_queue SET status = ?, \
reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), \
reviewed_by = ?, review_notes = ? \
WHERE id = ? AND account_id = ? AND status = 'pending'",
)
.bind(status)
.bind(&review.actor)
.bind(&review.notes)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
if result.rows_affected() == 0 {
if let Some(item) = get_by_id_for(pool, account_id, id).await? {
return Err(StorageError::AlreadyReviewed {
id,
current_status: item.status,
});
}
}
Ok(())
}
pub async fn update_status_with_review(
pool: &DbPool,
id: i64,
status: &str,
review: &ReviewAction,
) -> Result<(), StorageError> {
update_status_with_review_for(pool, DEFAULT_ACCOUNT_ID, id, status, review).await
}
pub async fn update_content_and_approve_for(
pool: &DbPool,
account_id: &str,
id: i64,
new_content: &str,
) -> Result<(), StorageError> {
let result = sqlx::query(
"UPDATE approval_queue SET generated_content = ?, status = 'approved', \
reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
WHERE id = ? AND account_id = ? AND status = 'pending'",
)
.bind(new_content)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
if result.rows_affected() == 0 {
if let Some(item) = get_by_id_for(pool, account_id, id).await? {
return Err(StorageError::AlreadyReviewed {
id,
current_status: item.status,
});
}
}
Ok(())
}
pub async fn update_content_and_approve(
pool: &DbPool,
id: i64,
new_content: &str,
) -> Result<(), StorageError> {
update_content_and_approve_for(pool, DEFAULT_ACCOUNT_ID, id, new_content).await
}
pub async fn get_by_id_for(
pool: &DbPool,
account_id: &str,
id: i64,
) -> Result<Option<ApprovalItem>, StorageError> {
let sql = format!("SELECT {SELECT_COLS} FROM approval_queue WHERE id = ? AND account_id = ?");
let row: Option<ApprovalRow> = sqlx::query_as(&sql)
.bind(id)
.bind(account_id)
.fetch_optional(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(row.map(ApprovalItem::from))
}
pub async fn get_by_id(pool: &DbPool, id: i64) -> Result<Option<ApprovalItem>, StorageError> {
get_by_id_for(pool, DEFAULT_ACCOUNT_ID, id).await
}
pub async fn get_stats_for(pool: &DbPool, account_id: &str) -> Result<ApprovalStats, StorageError> {
let row: (i64, i64, i64, i64, i64) = sqlx::query_as(
"SELECT \
COALESCE(SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END), 0), \
COALESCE(SUM(CASE WHEN status = 'approved' THEN 1 ELSE 0 END), 0), \
COALESCE(SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END), 0), \
COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0), \
COALESCE(SUM(CASE WHEN status = 'scheduled' THEN 1 ELSE 0 END), 0) \
FROM approval_queue WHERE account_id = ?",
)
.bind(account_id)
.fetch_one(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(ApprovalStats {
pending: row.0,
approved: row.1,
rejected: row.2,
failed: row.3,
scheduled: row.4,
})
}
pub async fn get_stats(pool: &DbPool) -> Result<ApprovalStats, StorageError> {
get_stats_for(pool, DEFAULT_ACCOUNT_ID).await
}
pub async fn get_by_statuses_for(
pool: &DbPool,
account_id: &str,
statuses: &[&str],
action_type: Option<&str>,
) -> Result<Vec<ApprovalItem>, StorageError> {
if statuses.is_empty() {
return Ok(Vec::new());
}
let placeholders: Vec<&str> = statuses.iter().map(|_| "?").collect();
let in_clause = placeholders.join(", ");
let query = if let Some(at) = action_type {
let sql = format!(
"SELECT {SELECT_COLS} FROM approval_queue \
WHERE account_id = ? AND status IN ({in_clause}) AND action_type = ? \
ORDER BY created_at ASC"
);
let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
q = q.bind(account_id);
for s in statuses {
q = q.bind(*s);
}
q = q.bind(at);
q.fetch_all(pool).await
} else {
let sql = format!(
"SELECT {SELECT_COLS} FROM approval_queue \
WHERE account_id = ? AND status IN ({in_clause}) \
ORDER BY created_at ASC"
);
let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
q = q.bind(account_id);
for s in statuses {
q = q.bind(*s);
}
q.fetch_all(pool).await
};
let rows = query.map_err(|e| StorageError::Query { source: e })?;
Ok(rows.into_iter().map(ApprovalItem::from).collect())
}
pub async fn get_by_statuses(
pool: &DbPool,
statuses: &[&str],
action_type: Option<&str>,
) -> Result<Vec<ApprovalItem>, StorageError> {
get_by_statuses_for(pool, DEFAULT_ACCOUNT_ID, statuses, action_type).await
}
pub async fn get_filtered_for(
pool: &DbPool,
account_id: &str,
statuses: &[&str],
action_type: Option<&str>,
reviewed_by: Option<&str>,
since: Option<&str>,
) -> Result<Vec<ApprovalItem>, StorageError> {
if statuses.is_empty() {
return Ok(Vec::new());
}
let placeholders: Vec<&str> = statuses.iter().map(|_| "?").collect();
let in_clause = placeholders.join(", ");
let mut sql = format!(
"SELECT {SELECT_COLS} FROM approval_queue \
WHERE account_id = ? AND status IN ({in_clause})"
);
if action_type.is_some() {
sql.push_str(" AND action_type = ?");
}
if reviewed_by.is_some() {
sql.push_str(" AND reviewed_by = ?");
}
if since.is_some() {
sql.push_str(" AND created_at >= ?");
}
sql.push_str(" ORDER BY created_at ASC");
let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
q = q.bind(account_id);
for s in statuses {
q = q.bind(*s);
}
if let Some(at) = action_type {
q = q.bind(at);
}
if let Some(rb) = reviewed_by {
q = q.bind(rb);
}
if let Some(s) = since {
q = q.bind(s);
}
let rows = q
.fetch_all(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(rows.into_iter().map(ApprovalItem::from).collect())
}
pub async fn get_filtered(
pool: &DbPool,
statuses: &[&str],
action_type: Option<&str>,
reviewed_by: Option<&str>,
since: Option<&str>,
) -> Result<Vec<ApprovalItem>, StorageError> {
get_filtered_for(
pool,
DEFAULT_ACCOUNT_ID,
statuses,
action_type,
reviewed_by,
since,
)
.await
}
pub async fn update_content_for(
pool: &DbPool,
account_id: &str,
id: i64,
new_content: &str,
) -> Result<(), StorageError> {
sqlx::query("UPDATE approval_queue SET generated_content = ? WHERE id = ? AND account_id = ?")
.bind(new_content)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(())
}
pub async fn update_content(pool: &DbPool, id: i64, new_content: &str) -> Result<(), StorageError> {
update_content_for(pool, DEFAULT_ACCOUNT_ID, id, new_content).await
}
pub async fn update_media_paths_for(
pool: &DbPool,
account_id: &str,
id: i64,
media_paths: &str,
) -> Result<(), StorageError> {
sqlx::query("UPDATE approval_queue SET media_paths = ? WHERE id = ? AND account_id = ?")
.bind(media_paths)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(())
}
pub async fn update_media_paths(
pool: &DbPool,
id: i64,
media_paths: &str,
) -> Result<(), StorageError> {
update_media_paths_for(pool, DEFAULT_ACCOUNT_ID, id, media_paths).await
}
#[allow(clippy::too_many_arguments)]
pub async fn update_qa_fields_for(
pool: &DbPool,
account_id: &str,
id: i64,
qa_report: &str,
qa_hard_flags: &str,
qa_soft_flags: &str,
qa_recommendations: &str,
qa_score: f64,
qa_requires_override: bool,
) -> Result<(), StorageError> {
sqlx::query(
"UPDATE approval_queue SET qa_report = ?, qa_hard_flags = ?, qa_soft_flags = ?, \
qa_recommendations = ?, qa_score = ?, qa_requires_override = ? \
WHERE id = ? AND account_id = ?",
)
.bind(qa_report)
.bind(qa_hard_flags)
.bind(qa_soft_flags)
.bind(qa_recommendations)
.bind(qa_score)
.bind(if qa_requires_override { 1 } else { 0 })
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn update_qa_fields(
pool: &DbPool,
id: i64,
qa_report: &str,
qa_hard_flags: &str,
qa_soft_flags: &str,
qa_recommendations: &str,
qa_score: f64,
qa_requires_override: bool,
) -> Result<(), StorageError> {
update_qa_fields_for(
pool,
DEFAULT_ACCOUNT_ID,
id,
qa_report,
qa_hard_flags,
qa_soft_flags,
qa_recommendations,
qa_score,
qa_requires_override,
)
.await
}
pub async fn set_qa_override_for(
pool: &DbPool,
account_id: &str,
id: i64,
actor: &str,
note: &str,
) -> Result<(), StorageError> {
sqlx::query(
"UPDATE approval_queue SET qa_override_by = ?, qa_override_note = ?, \
qa_override_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
WHERE id = ? AND account_id = ?",
)
.bind(actor)
.bind(note)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(())
}
pub async fn set_qa_override(
pool: &DbPool,
id: i64,
actor: &str,
note: &str,
) -> Result<(), StorageError> {
set_qa_override_for(pool, DEFAULT_ACCOUNT_ID, id, actor, note).await
}
pub async fn clear_qa_override_for(
pool: &DbPool,
account_id: &str,
id: i64,
) -> Result<(), StorageError> {
sqlx::query(
"UPDATE approval_queue SET qa_override_by = NULL, qa_override_note = NULL, \
qa_override_at = NULL WHERE id = ? AND account_id = ?",
)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(())
}
pub async fn clear_qa_override(pool: &DbPool, id: i64) -> Result<(), StorageError> {
clear_qa_override_for(pool, DEFAULT_ACCOUNT_ID, id).await
}
pub async fn get_next_approved_for(
pool: &DbPool,
account_id: &str,
) -> Result<Option<ApprovalItem>, StorageError> {
let sql = format!(
"SELECT {SELECT_COLS} FROM approval_queue \
WHERE status = 'approved' AND account_id = ? ORDER BY reviewed_at ASC LIMIT 1"
);
let row: Option<ApprovalRow> = sqlx::query_as(&sql)
.bind(account_id)
.fetch_optional(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(row.map(ApprovalItem::from))
}
pub async fn get_next_approved(pool: &DbPool) -> Result<Option<ApprovalItem>, StorageError> {
get_next_approved_for(pool, DEFAULT_ACCOUNT_ID).await
}
pub async fn mark_posted_for(
pool: &DbPool,
account_id: &str,
id: i64,
tweet_id: &str,
) -> Result<(), StorageError> {
sqlx::query(
"UPDATE approval_queue SET status = 'posted', posted_tweet_id = ? \
WHERE id = ? AND account_id = ?",
)
.bind(tweet_id)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(())
}
pub async fn mark_posted(pool: &DbPool, id: i64, tweet_id: &str) -> Result<(), StorageError> {
mark_posted_for(pool, DEFAULT_ACCOUNT_ID, id, tweet_id).await
}
pub async fn mark_failed_for(
pool: &DbPool,
account_id: &str,
id: i64,
error_message: &str,
) -> Result<(), StorageError> {
sqlx::query(
"UPDATE approval_queue SET status = 'failed', review_notes = ? \
WHERE id = ? AND account_id = ? AND status = 'approved'",
)
.bind(error_message)
.bind(id)
.bind(account_id)
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(())
}
pub async fn mark_failed(pool: &DbPool, id: i64, error_message: &str) -> Result<(), StorageError> {
mark_failed_for(pool, DEFAULT_ACCOUNT_ID, id, error_message).await
}
pub async fn expire_old_items_for(
pool: &DbPool,
account_id: &str,
hours: u32,
) -> Result<u64, StorageError> {
let result = sqlx::query(
"UPDATE approval_queue SET status = 'expired', \
reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
WHERE status = 'pending' AND account_id = ? \
AND created_at < strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?)",
)
.bind(account_id)
.bind(format!("-{hours} hours"))
.execute(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(result.rows_affected())
}
pub async fn expire_old_items(pool: &DbPool, hours: u32) -> Result<u64, StorageError> {
expire_old_items_for(pool, DEFAULT_ACCOUNT_ID, hours).await
}
pub async fn batch_approve_for(
pool: &DbPool,
account_id: &str,
max_batch: usize,
review: &ReviewAction,
) -> Result<Vec<i64>, StorageError> {
let pending = get_pending_for(pool, account_id).await?;
let to_approve: Vec<&ApprovalItem> = pending.iter().take(max_batch).collect();
let mut approved_ids = Vec::with_capacity(to_approve.len());
for item in to_approve {
update_status_with_review_for(pool, account_id, item.id, "approved", review).await?;
approved_ids.push(item.id);
}
Ok(approved_ids)
}
pub async fn batch_approve(
pool: &DbPool,
max_batch: usize,
review: &ReviewAction,
) -> Result<Vec<i64>, StorageError> {
batch_approve_for(pool, DEFAULT_ACCOUNT_ID, max_batch, review).await
}