Skip to main content

tuitbot_core/storage/approval_queue/
queries.rs

1//! Query functions for the approval queue.
2
3use super::{ApprovalItem, ApprovalRow, ApprovalStats, ReviewAction};
4use crate::error::StorageError;
5use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
6use crate::storage::provenance::ProvenanceRef;
7use crate::storage::DbPool;
8
/// Standard SELECT columns for approval queue queries.
///
/// Interpolated into the `SELECT ... FROM approval_queue` statements in this
/// module whose rows are decoded as `ApprovalRow`. Several columns are wrapped
/// in `COALESCE` so that a NULL is read back as `'[]'`, `'{}'`, or `0`
/// (depending on the column) instead of NULL.
const SELECT_COLS: &str = "id, action_type, target_tweet_id, target_author, \
    generated_content, topic, archetype, score, status, created_at, \
    COALESCE(media_paths, '[]') AS media_paths, reviewed_by, review_notes, reason, \
    COALESCE(detected_risks, '[]') AS detected_risks, COALESCE(qa_report, '{}') AS qa_report, \
    COALESCE(qa_hard_flags, '[]') AS qa_hard_flags, COALESCE(qa_soft_flags, '[]') AS qa_soft_flags, \
    COALESCE(qa_recommendations, '[]') AS qa_recommendations, COALESCE(qa_score, 0) AS qa_score, \
    COALESCE(qa_requires_override, 0) AS qa_requires_override, qa_override_by, qa_override_note, qa_override_at, \
    source_node_id, source_seed_id, COALESCE(source_chunks_json, '[]') AS source_chunks_json, \
    scheduled_for";
19
20/// Insert a new item into the approval queue for a specific account.
21#[allow(clippy::too_many_arguments)]
22pub async fn enqueue_for(
23    pool: &DbPool,
24    account_id: &str,
25    action_type: &str,
26    target_tweet_id: &str,
27    target_author: &str,
28    generated_content: &str,
29    topic: &str,
30    archetype: &str,
31    score: f64,
32    media_paths: &str,
33) -> Result<i64, StorageError> {
34    enqueue_with_context_for(
35        pool,
36        account_id,
37        action_type,
38        target_tweet_id,
39        target_author,
40        generated_content,
41        topic,
42        archetype,
43        score,
44        media_paths,
45        None,
46        None,
47        None,
48    )
49    .await
50}
51
52/// Insert a new item into the approval queue.
53#[allow(clippy::too_many_arguments)]
54pub async fn enqueue(
55    pool: &DbPool,
56    action_type: &str,
57    target_tweet_id: &str,
58    target_author: &str,
59    generated_content: &str,
60    topic: &str,
61    archetype: &str,
62    score: f64,
63    media_paths: &str,
64) -> Result<i64, StorageError> {
65    enqueue_for(
66        pool,
67        DEFAULT_ACCOUNT_ID,
68        action_type,
69        target_tweet_id,
70        target_author,
71        generated_content,
72        topic,
73        archetype,
74        score,
75        media_paths,
76    )
77    .await
78}
79
80/// Insert a new item into the approval queue with optional reason, risks, and scheduling intent
81/// for a specific account.
82#[allow(clippy::too_many_arguments)]
83pub async fn enqueue_with_context_for(
84    pool: &DbPool,
85    account_id: &str,
86    action_type: &str,
87    target_tweet_id: &str,
88    target_author: &str,
89    generated_content: &str,
90    topic: &str,
91    archetype: &str,
92    score: f64,
93    media_paths: &str,
94    reason: Option<&str>,
95    detected_risks: Option<&str>,
96    scheduled_for: Option<&str>,
97) -> Result<i64, StorageError> {
98    let result = sqlx::query(
99        "INSERT INTO approval_queue (account_id, action_type, target_tweet_id, target_author, \
100         generated_content, topic, archetype, score, media_paths, reason, detected_risks, \
101         scheduled_for) \
102         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
103    )
104    .bind(account_id)
105    .bind(action_type)
106    .bind(target_tweet_id)
107    .bind(target_author)
108    .bind(generated_content)
109    .bind(topic)
110    .bind(archetype)
111    .bind(score)
112    .bind(media_paths)
113    .bind(reason)
114    .bind(detected_risks.unwrap_or("[]"))
115    .bind(scheduled_for)
116    .execute(pool)
117    .await
118    .map_err(|e| StorageError::Query { source: e })?;
119
120    Ok(result.last_insert_rowid())
121}
122
123/// Insert a new item into the approval queue with optional reason and risks.
124#[allow(clippy::too_many_arguments)]
125pub async fn enqueue_with_context(
126    pool: &DbPool,
127    action_type: &str,
128    target_tweet_id: &str,
129    target_author: &str,
130    generated_content: &str,
131    topic: &str,
132    archetype: &str,
133    score: f64,
134    media_paths: &str,
135    reason: Option<&str>,
136    detected_risks: Option<&str>,
137) -> Result<i64, StorageError> {
138    enqueue_with_context_for(
139        pool,
140        DEFAULT_ACCOUNT_ID,
141        action_type,
142        target_tweet_id,
143        target_author,
144        generated_content,
145        topic,
146        archetype,
147        score,
148        media_paths,
149        reason,
150        detected_risks,
151        None,
152    )
153    .await
154}
155
/// Bundled provenance input for `enqueue_with_provenance_for`.
///
/// The first three fields are written to the same-named columns of the
/// `approval_queue` row; `refs` is handed to `provenance::insert_links_for`.
pub struct ProvenanceInput {
    /// Value for the `source_node_id` column (`None` is bound as NULL).
    pub source_node_id: Option<i64>,
    /// Value for the `source_seed_id` column (`None` is bound as NULL).
    pub source_seed_id: Option<i64>,
    /// Value for the `source_chunks_json` column, bound as-is.
    /// NOTE(review): presumably a JSON array string (the no-provenance default is `[]`) — confirm.
    pub source_chunks_json: String,
    /// Provenance references passed to `insert_links_for` for the new queue row.
    pub refs: Vec<ProvenanceRef>,
}
163
164/// Insert a new item into the approval queue with provenance for a specific account.
165///
166/// Populates the inline provenance columns (`source_node_id`, `source_seed_id`,
167/// `source_chunks_json`) and inserts rows into `vault_provenance_links`.
168#[allow(clippy::too_many_arguments)]
169pub async fn enqueue_with_provenance_for(
170    pool: &DbPool,
171    account_id: &str,
172    action_type: &str,
173    target_tweet_id: &str,
174    target_author: &str,
175    generated_content: &str,
176    topic: &str,
177    archetype: &str,
178    score: f64,
179    media_paths: &str,
180    reason: Option<&str>,
181    detected_risks: Option<&str>,
182    provenance: Option<&ProvenanceInput>,
183    scheduled_for: Option<&str>,
184) -> Result<i64, StorageError> {
185    let (source_node_id, source_seed_id, source_chunks_json) = match provenance {
186        Some(p) => (
187            p.source_node_id,
188            p.source_seed_id,
189            p.source_chunks_json.as_str(),
190        ),
191        None => (None, None, "[]"),
192    };
193
194    let result = sqlx::query(
195        "INSERT INTO approval_queue (account_id, action_type, target_tweet_id, target_author, \
196         generated_content, topic, archetype, score, media_paths, reason, detected_risks, \
197         source_node_id, source_seed_id, source_chunks_json, scheduled_for) \
198         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
199    )
200    .bind(account_id)
201    .bind(action_type)
202    .bind(target_tweet_id)
203    .bind(target_author)
204    .bind(generated_content)
205    .bind(topic)
206    .bind(archetype)
207    .bind(score)
208    .bind(media_paths)
209    .bind(reason)
210    .bind(detected_risks.unwrap_or("[]"))
211    .bind(source_node_id)
212    .bind(source_seed_id)
213    .bind(source_chunks_json)
214    .bind(scheduled_for)
215    .execute(pool)
216    .await
217    .map_err(|e| StorageError::Query { source: e })?;
218
219    let id = result.last_insert_rowid();
220
221    // Insert provenance link rows.
222    if let Some(p) = provenance {
223        crate::storage::provenance::insert_links_for(
224            pool,
225            account_id,
226            "approval_queue",
227            id,
228            &p.refs,
229        )
230        .await?;
231    }
232
233    Ok(id)
234}
235
236/// Get all pending approval items for a specific account, ordered by creation time (oldest first).
237pub async fn get_pending_for(
238    pool: &DbPool,
239    account_id: &str,
240) -> Result<Vec<ApprovalItem>, StorageError> {
241    let sql = format!(
242        "SELECT {SELECT_COLS} FROM approval_queue \
243         WHERE status = 'pending' AND account_id = ? ORDER BY created_at ASC"
244    );
245    let rows: Vec<ApprovalRow> = sqlx::query_as(&sql)
246        .bind(account_id)
247        .fetch_all(pool)
248        .await
249        .map_err(|e| StorageError::Query { source: e })?;
250
251    Ok(rows.into_iter().map(ApprovalItem::from).collect())
252}
253
254/// Get all pending approval items, ordered by creation time (oldest first).
255pub async fn get_pending(pool: &DbPool) -> Result<Vec<ApprovalItem>, StorageError> {
256    get_pending_for(pool, DEFAULT_ACCOUNT_ID).await
257}
258
259/// Get the count of pending items for a specific account.
260pub async fn pending_count_for(pool: &DbPool, account_id: &str) -> Result<i64, StorageError> {
261    let row: (i64,) = sqlx::query_as(
262        "SELECT COUNT(*) FROM approval_queue WHERE status = 'pending' AND account_id = ?",
263    )
264    .bind(account_id)
265    .fetch_one(pool)
266    .await
267    .map_err(|e| StorageError::Query { source: e })?;
268
269    Ok(row.0)
270}
271
272/// Get the count of pending items.
273pub async fn pending_count(pool: &DbPool) -> Result<i64, StorageError> {
274    pending_count_for(pool, DEFAULT_ACCOUNT_ID).await
275}
276
277/// Update the status of an approval item for a specific account.
278///
279/// Only items with `status = 'pending'` can be reviewed. Returns
280/// `StorageError::AlreadyReviewed` if the item has already left pending.
281pub async fn update_status_for(
282    pool: &DbPool,
283    account_id: &str,
284    id: i64,
285    status: &str,
286) -> Result<(), StorageError> {
287    let result = sqlx::query(
288        "UPDATE approval_queue SET status = ?, \
289         reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
290         WHERE id = ? AND account_id = ? AND status = 'pending'",
291    )
292    .bind(status)
293    .bind(id)
294    .bind(account_id)
295    .execute(pool)
296    .await
297    .map_err(|e| StorageError::Query { source: e })?;
298
299    if result.rows_affected() == 0 {
300        if let Some(item) = get_by_id_for(pool, account_id, id).await? {
301            return Err(StorageError::AlreadyReviewed {
302                id,
303                current_status: item.status,
304            });
305        }
306    }
307
308    Ok(())
309}
310
311/// Update the status of an approval item.
312pub async fn update_status(pool: &DbPool, id: i64, status: &str) -> Result<(), StorageError> {
313    update_status_for(pool, DEFAULT_ACCOUNT_ID, id, status).await
314}
315
316/// Update the status of an approval item with review metadata for a specific account.
317///
318/// Only items with `status = 'pending'` can be reviewed. Returns
319/// `StorageError::AlreadyReviewed` if the item has already left pending.
320pub async fn update_status_with_review_for(
321    pool: &DbPool,
322    account_id: &str,
323    id: i64,
324    status: &str,
325    review: &ReviewAction,
326) -> Result<(), StorageError> {
327    let result = sqlx::query(
328        "UPDATE approval_queue SET status = ?, \
329         reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), \
330         reviewed_by = ?, review_notes = ? \
331         WHERE id = ? AND account_id = ? AND status = 'pending'",
332    )
333    .bind(status)
334    .bind(&review.actor)
335    .bind(&review.notes)
336    .bind(id)
337    .bind(account_id)
338    .execute(pool)
339    .await
340    .map_err(|e| StorageError::Query { source: e })?;
341
342    if result.rows_affected() == 0 {
343        if let Some(item) = get_by_id_for(pool, account_id, id).await? {
344            return Err(StorageError::AlreadyReviewed {
345                id,
346                current_status: item.status,
347            });
348        }
349    }
350
351    Ok(())
352}
353
354/// Update the status of an approval item with review metadata.
355pub async fn update_status_with_review(
356    pool: &DbPool,
357    id: i64,
358    status: &str,
359    review: &ReviewAction,
360) -> Result<(), StorageError> {
361    update_status_with_review_for(pool, DEFAULT_ACCOUNT_ID, id, status, review).await
362}
363
364/// Update the content and status of an approval item for a specific account (for edit-then-approve).
365///
366/// Only items with `status = 'pending'` can be approved. Returns
367/// `StorageError::AlreadyReviewed` if the item has already left pending.
368pub async fn update_content_and_approve_for(
369    pool: &DbPool,
370    account_id: &str,
371    id: i64,
372    new_content: &str,
373) -> Result<(), StorageError> {
374    let result = sqlx::query(
375        "UPDATE approval_queue SET generated_content = ?, status = 'approved', \
376         reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
377         WHERE id = ? AND account_id = ? AND status = 'pending'",
378    )
379    .bind(new_content)
380    .bind(id)
381    .bind(account_id)
382    .execute(pool)
383    .await
384    .map_err(|e| StorageError::Query { source: e })?;
385
386    if result.rows_affected() == 0 {
387        if let Some(item) = get_by_id_for(pool, account_id, id).await? {
388            return Err(StorageError::AlreadyReviewed {
389                id,
390                current_status: item.status,
391            });
392        }
393    }
394
395    Ok(())
396}
397
398/// Update the content and status of an approval item (for edit-then-approve).
399pub async fn update_content_and_approve(
400    pool: &DbPool,
401    id: i64,
402    new_content: &str,
403) -> Result<(), StorageError> {
404    update_content_and_approve_for(pool, DEFAULT_ACCOUNT_ID, id, new_content).await
405}
406
407/// Get a single approval item by ID for a specific account.
408pub async fn get_by_id_for(
409    pool: &DbPool,
410    account_id: &str,
411    id: i64,
412) -> Result<Option<ApprovalItem>, StorageError> {
413    let sql = format!("SELECT {SELECT_COLS} FROM approval_queue WHERE id = ? AND account_id = ?");
414    let row: Option<ApprovalRow> = sqlx::query_as(&sql)
415        .bind(id)
416        .bind(account_id)
417        .fetch_optional(pool)
418        .await
419        .map_err(|e| StorageError::Query { source: e })?;
420
421    Ok(row.map(ApprovalItem::from))
422}
423
424/// Get a single approval item by ID.
425pub async fn get_by_id(pool: &DbPool, id: i64) -> Result<Option<ApprovalItem>, StorageError> {
426    get_by_id_for(pool, DEFAULT_ACCOUNT_ID, id).await
427}
428
429/// Get counts of items grouped by status for a specific account.
430pub async fn get_stats_for(pool: &DbPool, account_id: &str) -> Result<ApprovalStats, StorageError> {
431    let row: (i64, i64, i64, i64, i64) = sqlx::query_as(
432        "SELECT \
433            COALESCE(SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END), 0), \
434            COALESCE(SUM(CASE WHEN status = 'approved' THEN 1 ELSE 0 END), 0), \
435            COALESCE(SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END), 0), \
436            COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0), \
437            COALESCE(SUM(CASE WHEN status = 'scheduled' THEN 1 ELSE 0 END), 0) \
438         FROM approval_queue WHERE account_id = ?",
439    )
440    .bind(account_id)
441    .fetch_one(pool)
442    .await
443    .map_err(|e| StorageError::Query { source: e })?;
444
445    Ok(ApprovalStats {
446        pending: row.0,
447        approved: row.1,
448        rejected: row.2,
449        failed: row.3,
450        scheduled: row.4,
451    })
452}
453
454/// Get counts of items grouped by status.
455pub async fn get_stats(pool: &DbPool) -> Result<ApprovalStats, StorageError> {
456    get_stats_for(pool, DEFAULT_ACCOUNT_ID).await
457}
458
459/// Get approval items filtered by one or more statuses for a specific account,
460/// with optional action type filter.
461pub async fn get_by_statuses_for(
462    pool: &DbPool,
463    account_id: &str,
464    statuses: &[&str],
465    action_type: Option<&str>,
466) -> Result<Vec<ApprovalItem>, StorageError> {
467    if statuses.is_empty() {
468        return Ok(Vec::new());
469    }
470
471    let placeholders: Vec<&str> = statuses.iter().map(|_| "?").collect();
472    let in_clause = placeholders.join(", ");
473
474    let query = if let Some(at) = action_type {
475        let sql = format!(
476            "SELECT {SELECT_COLS} FROM approval_queue \
477             WHERE account_id = ? AND status IN ({in_clause}) AND action_type = ? \
478             ORDER BY created_at ASC"
479        );
480        let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
481        q = q.bind(account_id);
482        for s in statuses {
483            q = q.bind(*s);
484        }
485        q = q.bind(at);
486        q.fetch_all(pool).await
487    } else {
488        let sql = format!(
489            "SELECT {SELECT_COLS} FROM approval_queue \
490             WHERE account_id = ? AND status IN ({in_clause}) \
491             ORDER BY created_at ASC"
492        );
493        let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
494        q = q.bind(account_id);
495        for s in statuses {
496            q = q.bind(*s);
497        }
498        q.fetch_all(pool).await
499    };
500
501    let rows = query.map_err(|e| StorageError::Query { source: e })?;
502    Ok(rows.into_iter().map(ApprovalItem::from).collect())
503}
504
505/// Get approval items filtered by one or more statuses, with optional action type filter.
506pub async fn get_by_statuses(
507    pool: &DbPool,
508    statuses: &[&str],
509    action_type: Option<&str>,
510) -> Result<Vec<ApprovalItem>, StorageError> {
511    get_by_statuses_for(pool, DEFAULT_ACCOUNT_ID, statuses, action_type).await
512}
513
514/// Get approval items with optional filters for a specific account.
515pub async fn get_filtered_for(
516    pool: &DbPool,
517    account_id: &str,
518    statuses: &[&str],
519    action_type: Option<&str>,
520    reviewed_by: Option<&str>,
521    since: Option<&str>,
522) -> Result<Vec<ApprovalItem>, StorageError> {
523    if statuses.is_empty() {
524        return Ok(Vec::new());
525    }
526
527    let placeholders: Vec<&str> = statuses.iter().map(|_| "?").collect();
528    let in_clause = placeholders.join(", ");
529
530    let mut sql = format!(
531        "SELECT {SELECT_COLS} FROM approval_queue \
532         WHERE account_id = ? AND status IN ({in_clause})"
533    );
534    if action_type.is_some() {
535        sql.push_str(" AND action_type = ?");
536    }
537    if reviewed_by.is_some() {
538        sql.push_str(" AND reviewed_by = ?");
539    }
540    if since.is_some() {
541        sql.push_str(" AND created_at >= ?");
542    }
543    sql.push_str(" ORDER BY created_at ASC");
544
545    let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
546    q = q.bind(account_id);
547    for s in statuses {
548        q = q.bind(*s);
549    }
550    if let Some(at) = action_type {
551        q = q.bind(at);
552    }
553    if let Some(rb) = reviewed_by {
554        q = q.bind(rb);
555    }
556    if let Some(s) = since {
557        q = q.bind(s);
558    }
559
560    let rows = q
561        .fetch_all(pool)
562        .await
563        .map_err(|e| StorageError::Query { source: e })?;
564    Ok(rows.into_iter().map(ApprovalItem::from).collect())
565}
566
567/// Get approval items with optional filters for reviewer, date range, statuses, and action type.
568pub async fn get_filtered(
569    pool: &DbPool,
570    statuses: &[&str],
571    action_type: Option<&str>,
572    reviewed_by: Option<&str>,
573    since: Option<&str>,
574) -> Result<Vec<ApprovalItem>, StorageError> {
575    get_filtered_for(
576        pool,
577        DEFAULT_ACCOUNT_ID,
578        statuses,
579        action_type,
580        reviewed_by,
581        since,
582    )
583    .await
584}
585
586/// Update the generated content of an item for a specific account without changing its status.
587pub async fn update_content_for(
588    pool: &DbPool,
589    account_id: &str,
590    id: i64,
591    new_content: &str,
592) -> Result<(), StorageError> {
593    sqlx::query("UPDATE approval_queue SET generated_content = ? WHERE id = ? AND account_id = ?")
594        .bind(new_content)
595        .bind(id)
596        .bind(account_id)
597        .execute(pool)
598        .await
599        .map_err(|e| StorageError::Query { source: e })?;
600
601    Ok(())
602}
603
604/// Update the generated content of an item without changing its status.
605pub async fn update_content(pool: &DbPool, id: i64, new_content: &str) -> Result<(), StorageError> {
606    update_content_for(pool, DEFAULT_ACCOUNT_ID, id, new_content).await
607}
608
609/// Update the media paths of an approval item for a specific account.
610pub async fn update_media_paths_for(
611    pool: &DbPool,
612    account_id: &str,
613    id: i64,
614    media_paths: &str,
615) -> Result<(), StorageError> {
616    sqlx::query("UPDATE approval_queue SET media_paths = ? WHERE id = ? AND account_id = ?")
617        .bind(media_paths)
618        .bind(id)
619        .bind(account_id)
620        .execute(pool)
621        .await
622        .map_err(|e| StorageError::Query { source: e })?;
623
624    Ok(())
625}
626
627/// Update the media paths of an approval item.
628pub async fn update_media_paths(
629    pool: &DbPool,
630    id: i64,
631    media_paths: &str,
632) -> Result<(), StorageError> {
633    update_media_paths_for(pool, DEFAULT_ACCOUNT_ID, id, media_paths).await
634}
635
636/// Update QA fields for an approval item for a specific account.
637#[allow(clippy::too_many_arguments)]
638pub async fn update_qa_fields_for(
639    pool: &DbPool,
640    account_id: &str,
641    id: i64,
642    qa_report: &str,
643    qa_hard_flags: &str,
644    qa_soft_flags: &str,
645    qa_recommendations: &str,
646    qa_score: f64,
647    qa_requires_override: bool,
648) -> Result<(), StorageError> {
649    sqlx::query(
650        "UPDATE approval_queue SET qa_report = ?, qa_hard_flags = ?, qa_soft_flags = ?, \
651         qa_recommendations = ?, qa_score = ?, qa_requires_override = ? \
652         WHERE id = ? AND account_id = ?",
653    )
654    .bind(qa_report)
655    .bind(qa_hard_flags)
656    .bind(qa_soft_flags)
657    .bind(qa_recommendations)
658    .bind(qa_score)
659    .bind(if qa_requires_override { 1 } else { 0 })
660    .bind(id)
661    .bind(account_id)
662    .execute(pool)
663    .await
664    .map_err(|e| StorageError::Query { source: e })?;
665
666    Ok(())
667}
668
669/// Update QA fields for an approval item.
670#[allow(clippy::too_many_arguments)]
671pub async fn update_qa_fields(
672    pool: &DbPool,
673    id: i64,
674    qa_report: &str,
675    qa_hard_flags: &str,
676    qa_soft_flags: &str,
677    qa_recommendations: &str,
678    qa_score: f64,
679    qa_requires_override: bool,
680) -> Result<(), StorageError> {
681    update_qa_fields_for(
682        pool,
683        DEFAULT_ACCOUNT_ID,
684        id,
685        qa_report,
686        qa_hard_flags,
687        qa_soft_flags,
688        qa_recommendations,
689        qa_score,
690        qa_requires_override,
691    )
692    .await
693}
694
695/// Record an explicit QA override action for a specific account.
696pub async fn set_qa_override_for(
697    pool: &DbPool,
698    account_id: &str,
699    id: i64,
700    actor: &str,
701    note: &str,
702) -> Result<(), StorageError> {
703    sqlx::query(
704        "UPDATE approval_queue SET qa_override_by = ?, qa_override_note = ?, \
705         qa_override_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
706         WHERE id = ? AND account_id = ?",
707    )
708    .bind(actor)
709    .bind(note)
710    .bind(id)
711    .bind(account_id)
712    .execute(pool)
713    .await
714    .map_err(|e| StorageError::Query { source: e })?;
715
716    Ok(())
717}
718
719/// Record an explicit QA override action.
720pub async fn set_qa_override(
721    pool: &DbPool,
722    id: i64,
723    actor: &str,
724    note: &str,
725) -> Result<(), StorageError> {
726    set_qa_override_for(pool, DEFAULT_ACCOUNT_ID, id, actor, note).await
727}
728
729/// Clear QA override metadata for a specific account (used when content changes and QA is re-run).
730pub async fn clear_qa_override_for(
731    pool: &DbPool,
732    account_id: &str,
733    id: i64,
734) -> Result<(), StorageError> {
735    sqlx::query(
736        "UPDATE approval_queue SET qa_override_by = NULL, qa_override_note = NULL, \
737         qa_override_at = NULL WHERE id = ? AND account_id = ?",
738    )
739    .bind(id)
740    .bind(account_id)
741    .execute(pool)
742    .await
743    .map_err(|e| StorageError::Query { source: e })?;
744
745    Ok(())
746}
747
748/// Clear QA override metadata (used when content changes and QA is re-run).
749pub async fn clear_qa_override(pool: &DbPool, id: i64) -> Result<(), StorageError> {
750    clear_qa_override_for(pool, DEFAULT_ACCOUNT_ID, id).await
751}
752
753/// Fetch the next approved item ready for posting for a specific account.
754pub async fn get_next_approved_for(
755    pool: &DbPool,
756    account_id: &str,
757) -> Result<Option<ApprovalItem>, StorageError> {
758    let sql = format!(
759        "SELECT {SELECT_COLS} FROM approval_queue \
760         WHERE status = 'approved' AND account_id = ? ORDER BY reviewed_at ASC LIMIT 1"
761    );
762    let row: Option<ApprovalRow> = sqlx::query_as(&sql)
763        .bind(account_id)
764        .fetch_optional(pool)
765        .await
766        .map_err(|e| StorageError::Query { source: e })?;
767
768    Ok(row.map(ApprovalItem::from))
769}
770
771/// Fetch the next approved item ready for posting.
772pub async fn get_next_approved(pool: &DbPool) -> Result<Option<ApprovalItem>, StorageError> {
773    get_next_approved_for(pool, DEFAULT_ACCOUNT_ID).await
774}
775
776/// Mark an approved item as posted for a specific account, storing the returned tweet ID.
777pub async fn mark_posted_for(
778    pool: &DbPool,
779    account_id: &str,
780    id: i64,
781    tweet_id: &str,
782) -> Result<(), StorageError> {
783    sqlx::query(
784        "UPDATE approval_queue SET status = 'posted', posted_tweet_id = ? \
785         WHERE id = ? AND account_id = ?",
786    )
787    .bind(tweet_id)
788    .bind(id)
789    .bind(account_id)
790    .execute(pool)
791    .await
792    .map_err(|e| StorageError::Query { source: e })?;
793
794    Ok(())
795}
796
797/// Mark an approved item as posted, storing the returned tweet ID.
798pub async fn mark_posted(pool: &DbPool, id: i64, tweet_id: &str) -> Result<(), StorageError> {
799    mark_posted_for(pool, DEFAULT_ACCOUNT_ID, id, tweet_id).await
800}
801
802/// Mark an approved item as failed for a specific account, storing the error message.
803pub async fn mark_failed_for(
804    pool: &DbPool,
805    account_id: &str,
806    id: i64,
807    error_message: &str,
808) -> Result<(), StorageError> {
809    sqlx::query(
810        "UPDATE approval_queue SET status = 'failed', review_notes = ? \
811         WHERE id = ? AND account_id = ? AND status = 'approved'",
812    )
813    .bind(error_message)
814    .bind(id)
815    .bind(account_id)
816    .execute(pool)
817    .await
818    .map_err(|e| StorageError::Query { source: e })?;
819
820    Ok(())
821}
822
823/// Mark an approved item as failed, storing the error message.
824pub async fn mark_failed(pool: &DbPool, id: i64, error_message: &str) -> Result<(), StorageError> {
825    mark_failed_for(pool, DEFAULT_ACCOUNT_ID, id, error_message).await
826}
827
828/// Expire old pending items for a specific account (older than the specified hours).
829pub async fn expire_old_items_for(
830    pool: &DbPool,
831    account_id: &str,
832    hours: u32,
833) -> Result<u64, StorageError> {
834    let result = sqlx::query(
835        "UPDATE approval_queue SET status = 'expired', \
836         reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
837         WHERE status = 'pending' AND account_id = ? \
838         AND created_at < strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?)",
839    )
840    .bind(account_id)
841    .bind(format!("-{hours} hours"))
842    .execute(pool)
843    .await
844    .map_err(|e| StorageError::Query { source: e })?;
845
846    Ok(result.rows_affected())
847}
848
849/// Expire old pending items (older than the specified hours).
850pub async fn expire_old_items(pool: &DbPool, hours: u32) -> Result<u64, StorageError> {
851    expire_old_items_for(pool, DEFAULT_ACCOUNT_ID, hours).await
852}
853
854/// Batch-approve the oldest N pending items for a specific account, returning their IDs.
855pub async fn batch_approve_for(
856    pool: &DbPool,
857    account_id: &str,
858    max_batch: usize,
859    review: &ReviewAction,
860) -> Result<Vec<i64>, StorageError> {
861    let pending = get_pending_for(pool, account_id).await?;
862    let to_approve: Vec<&ApprovalItem> = pending.iter().take(max_batch).collect();
863    let mut approved_ids = Vec::with_capacity(to_approve.len());
864
865    for item in to_approve {
866        update_status_with_review_for(pool, account_id, item.id, "approved", review).await?;
867        approved_ids.push(item.id);
868    }
869
870    Ok(approved_ids)
871}
872
873/// Batch-approve the oldest N pending items, returning their IDs.
874pub async fn batch_approve(
875    pool: &DbPool,
876    max_batch: usize,
877    review: &ReviewAction,
878) -> Result<Vec<i64>, StorageError> {
879    batch_approve_for(pool, DEFAULT_ACCOUNT_ID, max_batch, review).await
880}