1use super::{ApprovalItem, ApprovalRow, ApprovalStats, ReviewAction};
4use crate::error::StorageError;
5use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
6use crate::storage::provenance::ProvenanceRef;
7use crate::storage::DbPool;
8
/// Column list used by the row-returning `SELECT` queries in this module.
///
/// Several nullable columns are wrapped in `COALESCE` so that a NULL reads back
/// as `'[]'`, `'{}'` or `0` rather than NULL.
// NOTE(review): presumably these names line up with the fields of `ApprovalRow` — confirm.
const SELECT_COLS: &str = "id, action_type, target_tweet_id, target_author, \
     generated_content, topic, archetype, score, status, created_at, \
     COALESCE(media_paths, '[]') AS media_paths, reviewed_by, review_notes, reason, \
     COALESCE(detected_risks, '[]') AS detected_risks, COALESCE(qa_report, '{}') AS qa_report, \
     COALESCE(qa_hard_flags, '[]') AS qa_hard_flags, COALESCE(qa_soft_flags, '[]') AS qa_soft_flags, \
     COALESCE(qa_recommendations, '[]') AS qa_recommendations, COALESCE(qa_score, 0) AS qa_score, \
     COALESCE(qa_requires_override, 0) AS qa_requires_override, qa_override_by, qa_override_note, qa_override_at, \
     source_node_id, source_seed_id, COALESCE(source_chunks_json, '[]') AS source_chunks_json, \
     scheduled_for";
19
20#[allow(clippy::too_many_arguments)]
22pub async fn enqueue_for(
23 pool: &DbPool,
24 account_id: &str,
25 action_type: &str,
26 target_tweet_id: &str,
27 target_author: &str,
28 generated_content: &str,
29 topic: &str,
30 archetype: &str,
31 score: f64,
32 media_paths: &str,
33) -> Result<i64, StorageError> {
34 enqueue_with_context_for(
35 pool,
36 account_id,
37 action_type,
38 target_tweet_id,
39 target_author,
40 generated_content,
41 topic,
42 archetype,
43 score,
44 media_paths,
45 None,
46 None,
47 None,
48 )
49 .await
50}
51
52#[allow(clippy::too_many_arguments)]
54pub async fn enqueue(
55 pool: &DbPool,
56 action_type: &str,
57 target_tweet_id: &str,
58 target_author: &str,
59 generated_content: &str,
60 topic: &str,
61 archetype: &str,
62 score: f64,
63 media_paths: &str,
64) -> Result<i64, StorageError> {
65 enqueue_for(
66 pool,
67 DEFAULT_ACCOUNT_ID,
68 action_type,
69 target_tweet_id,
70 target_author,
71 generated_content,
72 topic,
73 archetype,
74 score,
75 media_paths,
76 )
77 .await
78}
79
80#[allow(clippy::too_many_arguments)]
83pub async fn enqueue_with_context_for(
84 pool: &DbPool,
85 account_id: &str,
86 action_type: &str,
87 target_tweet_id: &str,
88 target_author: &str,
89 generated_content: &str,
90 topic: &str,
91 archetype: &str,
92 score: f64,
93 media_paths: &str,
94 reason: Option<&str>,
95 detected_risks: Option<&str>,
96 scheduled_for: Option<&str>,
97) -> Result<i64, StorageError> {
98 let result = sqlx::query(
99 "INSERT INTO approval_queue (account_id, action_type, target_tweet_id, target_author, \
100 generated_content, topic, archetype, score, media_paths, reason, detected_risks, \
101 scheduled_for) \
102 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
103 )
104 .bind(account_id)
105 .bind(action_type)
106 .bind(target_tweet_id)
107 .bind(target_author)
108 .bind(generated_content)
109 .bind(topic)
110 .bind(archetype)
111 .bind(score)
112 .bind(media_paths)
113 .bind(reason)
114 .bind(detected_risks.unwrap_or("[]"))
115 .bind(scheduled_for)
116 .execute(pool)
117 .await
118 .map_err(|e| StorageError::Query { source: e })?;
119
120 Ok(result.last_insert_rowid())
121}
122
123#[allow(clippy::too_many_arguments)]
125pub async fn enqueue_with_context(
126 pool: &DbPool,
127 action_type: &str,
128 target_tweet_id: &str,
129 target_author: &str,
130 generated_content: &str,
131 topic: &str,
132 archetype: &str,
133 score: f64,
134 media_paths: &str,
135 reason: Option<&str>,
136 detected_risks: Option<&str>,
137) -> Result<i64, StorageError> {
138 enqueue_with_context_for(
139 pool,
140 DEFAULT_ACCOUNT_ID,
141 action_type,
142 target_tweet_id,
143 target_author,
144 generated_content,
145 topic,
146 archetype,
147 score,
148 media_paths,
149 reason,
150 detected_risks,
151 None,
152 )
153 .await
154}
155
/// Provenance data attached to a queued item by
/// `enqueue_with_provenance_for`.
pub struct ProvenanceInput {
    /// Stored in the `source_node_id` column of the queued row.
    pub source_node_id: Option<i64>,
    /// Stored in the `source_seed_id` column of the queued row.
    pub source_seed_id: Option<i64>,
    /// Stored verbatim in the `source_chunks_json` column.
    // NOTE(review): presumably a JSON array string (the no-provenance default is "[]") — confirm.
    pub source_chunks_json: String,
    /// References passed to `provenance::insert_links_for` after the row is inserted.
    pub refs: Vec<ProvenanceRef>,
}
163
164#[allow(clippy::too_many_arguments)]
169pub async fn enqueue_with_provenance_for(
170 pool: &DbPool,
171 account_id: &str,
172 action_type: &str,
173 target_tweet_id: &str,
174 target_author: &str,
175 generated_content: &str,
176 topic: &str,
177 archetype: &str,
178 score: f64,
179 media_paths: &str,
180 reason: Option<&str>,
181 detected_risks: Option<&str>,
182 provenance: Option<&ProvenanceInput>,
183 scheduled_for: Option<&str>,
184) -> Result<i64, StorageError> {
185 let (source_node_id, source_seed_id, source_chunks_json) = match provenance {
186 Some(p) => (
187 p.source_node_id,
188 p.source_seed_id,
189 p.source_chunks_json.as_str(),
190 ),
191 None => (None, None, "[]"),
192 };
193
194 let result = sqlx::query(
195 "INSERT INTO approval_queue (account_id, action_type, target_tweet_id, target_author, \
196 generated_content, topic, archetype, score, media_paths, reason, detected_risks, \
197 source_node_id, source_seed_id, source_chunks_json, scheduled_for) \
198 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
199 )
200 .bind(account_id)
201 .bind(action_type)
202 .bind(target_tweet_id)
203 .bind(target_author)
204 .bind(generated_content)
205 .bind(topic)
206 .bind(archetype)
207 .bind(score)
208 .bind(media_paths)
209 .bind(reason)
210 .bind(detected_risks.unwrap_or("[]"))
211 .bind(source_node_id)
212 .bind(source_seed_id)
213 .bind(source_chunks_json)
214 .bind(scheduled_for)
215 .execute(pool)
216 .await
217 .map_err(|e| StorageError::Query { source: e })?;
218
219 let id = result.last_insert_rowid();
220
221 if let Some(p) = provenance {
223 crate::storage::provenance::insert_links_for(
224 pool,
225 account_id,
226 "approval_queue",
227 id,
228 &p.refs,
229 )
230 .await?;
231 }
232
233 Ok(id)
234}
235
236pub async fn get_pending_for(
238 pool: &DbPool,
239 account_id: &str,
240) -> Result<Vec<ApprovalItem>, StorageError> {
241 let sql = format!(
242 "SELECT {SELECT_COLS} FROM approval_queue \
243 WHERE status = 'pending' AND account_id = ? ORDER BY created_at ASC"
244 );
245 let rows: Vec<ApprovalRow> = sqlx::query_as(&sql)
246 .bind(account_id)
247 .fetch_all(pool)
248 .await
249 .map_err(|e| StorageError::Query { source: e })?;
250
251 Ok(rows.into_iter().map(ApprovalItem::from).collect())
252}
253
254pub async fn get_pending(pool: &DbPool) -> Result<Vec<ApprovalItem>, StorageError> {
256 get_pending_for(pool, DEFAULT_ACCOUNT_ID).await
257}
258
259pub async fn pending_count_for(pool: &DbPool, account_id: &str) -> Result<i64, StorageError> {
261 let row: (i64,) = sqlx::query_as(
262 "SELECT COUNT(*) FROM approval_queue WHERE status = 'pending' AND account_id = ?",
263 )
264 .bind(account_id)
265 .fetch_one(pool)
266 .await
267 .map_err(|e| StorageError::Query { source: e })?;
268
269 Ok(row.0)
270}
271
272pub async fn pending_count(pool: &DbPool) -> Result<i64, StorageError> {
274 pending_count_for(pool, DEFAULT_ACCOUNT_ID).await
275}
276
277pub async fn update_status_for(
282 pool: &DbPool,
283 account_id: &str,
284 id: i64,
285 status: &str,
286) -> Result<(), StorageError> {
287 let result = sqlx::query(
288 "UPDATE approval_queue SET status = ?, \
289 reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
290 WHERE id = ? AND account_id = ? AND status = 'pending'",
291 )
292 .bind(status)
293 .bind(id)
294 .bind(account_id)
295 .execute(pool)
296 .await
297 .map_err(|e| StorageError::Query { source: e })?;
298
299 if result.rows_affected() == 0 {
300 if let Some(item) = get_by_id_for(pool, account_id, id).await? {
301 return Err(StorageError::AlreadyReviewed {
302 id,
303 current_status: item.status,
304 });
305 }
306 }
307
308 Ok(())
309}
310
311pub async fn update_status(pool: &DbPool, id: i64, status: &str) -> Result<(), StorageError> {
313 update_status_for(pool, DEFAULT_ACCOUNT_ID, id, status).await
314}
315
316pub async fn update_status_with_review_for(
321 pool: &DbPool,
322 account_id: &str,
323 id: i64,
324 status: &str,
325 review: &ReviewAction,
326) -> Result<(), StorageError> {
327 let result = sqlx::query(
328 "UPDATE approval_queue SET status = ?, \
329 reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), \
330 reviewed_by = ?, review_notes = ? \
331 WHERE id = ? AND account_id = ? AND status = 'pending'",
332 )
333 .bind(status)
334 .bind(&review.actor)
335 .bind(&review.notes)
336 .bind(id)
337 .bind(account_id)
338 .execute(pool)
339 .await
340 .map_err(|e| StorageError::Query { source: e })?;
341
342 if result.rows_affected() == 0 {
343 if let Some(item) = get_by_id_for(pool, account_id, id).await? {
344 return Err(StorageError::AlreadyReviewed {
345 id,
346 current_status: item.status,
347 });
348 }
349 }
350
351 Ok(())
352}
353
354pub async fn update_status_with_review(
356 pool: &DbPool,
357 id: i64,
358 status: &str,
359 review: &ReviewAction,
360) -> Result<(), StorageError> {
361 update_status_with_review_for(pool, DEFAULT_ACCOUNT_ID, id, status, review).await
362}
363
364pub async fn update_content_and_approve_for(
369 pool: &DbPool,
370 account_id: &str,
371 id: i64,
372 new_content: &str,
373) -> Result<(), StorageError> {
374 let result = sqlx::query(
375 "UPDATE approval_queue SET generated_content = ?, status = 'approved', \
376 reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
377 WHERE id = ? AND account_id = ? AND status = 'pending'",
378 )
379 .bind(new_content)
380 .bind(id)
381 .bind(account_id)
382 .execute(pool)
383 .await
384 .map_err(|e| StorageError::Query { source: e })?;
385
386 if result.rows_affected() == 0 {
387 if let Some(item) = get_by_id_for(pool, account_id, id).await? {
388 return Err(StorageError::AlreadyReviewed {
389 id,
390 current_status: item.status,
391 });
392 }
393 }
394
395 Ok(())
396}
397
398pub async fn update_content_and_approve(
400 pool: &DbPool,
401 id: i64,
402 new_content: &str,
403) -> Result<(), StorageError> {
404 update_content_and_approve_for(pool, DEFAULT_ACCOUNT_ID, id, new_content).await
405}
406
407pub async fn get_by_id_for(
409 pool: &DbPool,
410 account_id: &str,
411 id: i64,
412) -> Result<Option<ApprovalItem>, StorageError> {
413 let sql = format!("SELECT {SELECT_COLS} FROM approval_queue WHERE id = ? AND account_id = ?");
414 let row: Option<ApprovalRow> = sqlx::query_as(&sql)
415 .bind(id)
416 .bind(account_id)
417 .fetch_optional(pool)
418 .await
419 .map_err(|e| StorageError::Query { source: e })?;
420
421 Ok(row.map(ApprovalItem::from))
422}
423
424pub async fn get_by_id(pool: &DbPool, id: i64) -> Result<Option<ApprovalItem>, StorageError> {
426 get_by_id_for(pool, DEFAULT_ACCOUNT_ID, id).await
427}
428
429pub async fn get_stats_for(pool: &DbPool, account_id: &str) -> Result<ApprovalStats, StorageError> {
431 let row: (i64, i64, i64, i64, i64) = sqlx::query_as(
432 "SELECT \
433 COALESCE(SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END), 0), \
434 COALESCE(SUM(CASE WHEN status = 'approved' THEN 1 ELSE 0 END), 0), \
435 COALESCE(SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END), 0), \
436 COALESCE(SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), 0), \
437 COALESCE(SUM(CASE WHEN status = 'scheduled' THEN 1 ELSE 0 END), 0) \
438 FROM approval_queue WHERE account_id = ?",
439 )
440 .bind(account_id)
441 .fetch_one(pool)
442 .await
443 .map_err(|e| StorageError::Query { source: e })?;
444
445 Ok(ApprovalStats {
446 pending: row.0,
447 approved: row.1,
448 rejected: row.2,
449 failed: row.3,
450 scheduled: row.4,
451 })
452}
453
454pub async fn get_stats(pool: &DbPool) -> Result<ApprovalStats, StorageError> {
456 get_stats_for(pool, DEFAULT_ACCOUNT_ID).await
457}
458
459pub async fn get_by_statuses_for(
462 pool: &DbPool,
463 account_id: &str,
464 statuses: &[&str],
465 action_type: Option<&str>,
466) -> Result<Vec<ApprovalItem>, StorageError> {
467 if statuses.is_empty() {
468 return Ok(Vec::new());
469 }
470
471 let placeholders: Vec<&str> = statuses.iter().map(|_| "?").collect();
472 let in_clause = placeholders.join(", ");
473
474 let query = if let Some(at) = action_type {
475 let sql = format!(
476 "SELECT {SELECT_COLS} FROM approval_queue \
477 WHERE account_id = ? AND status IN ({in_clause}) AND action_type = ? \
478 ORDER BY created_at ASC"
479 );
480 let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
481 q = q.bind(account_id);
482 for s in statuses {
483 q = q.bind(*s);
484 }
485 q = q.bind(at);
486 q.fetch_all(pool).await
487 } else {
488 let sql = format!(
489 "SELECT {SELECT_COLS} FROM approval_queue \
490 WHERE account_id = ? AND status IN ({in_clause}) \
491 ORDER BY created_at ASC"
492 );
493 let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
494 q = q.bind(account_id);
495 for s in statuses {
496 q = q.bind(*s);
497 }
498 q.fetch_all(pool).await
499 };
500
501 let rows = query.map_err(|e| StorageError::Query { source: e })?;
502 Ok(rows.into_iter().map(ApprovalItem::from).collect())
503}
504
505pub async fn get_by_statuses(
507 pool: &DbPool,
508 statuses: &[&str],
509 action_type: Option<&str>,
510) -> Result<Vec<ApprovalItem>, StorageError> {
511 get_by_statuses_for(pool, DEFAULT_ACCOUNT_ID, statuses, action_type).await
512}
513
514pub async fn get_filtered_for(
516 pool: &DbPool,
517 account_id: &str,
518 statuses: &[&str],
519 action_type: Option<&str>,
520 reviewed_by: Option<&str>,
521 since: Option<&str>,
522) -> Result<Vec<ApprovalItem>, StorageError> {
523 if statuses.is_empty() {
524 return Ok(Vec::new());
525 }
526
527 let placeholders: Vec<&str> = statuses.iter().map(|_| "?").collect();
528 let in_clause = placeholders.join(", ");
529
530 let mut sql = format!(
531 "SELECT {SELECT_COLS} FROM approval_queue \
532 WHERE account_id = ? AND status IN ({in_clause})"
533 );
534 if action_type.is_some() {
535 sql.push_str(" AND action_type = ?");
536 }
537 if reviewed_by.is_some() {
538 sql.push_str(" AND reviewed_by = ?");
539 }
540 if since.is_some() {
541 sql.push_str(" AND created_at >= ?");
542 }
543 sql.push_str(" ORDER BY created_at ASC");
544
545 let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
546 q = q.bind(account_id);
547 for s in statuses {
548 q = q.bind(*s);
549 }
550 if let Some(at) = action_type {
551 q = q.bind(at);
552 }
553 if let Some(rb) = reviewed_by {
554 q = q.bind(rb);
555 }
556 if let Some(s) = since {
557 q = q.bind(s);
558 }
559
560 let rows = q
561 .fetch_all(pool)
562 .await
563 .map_err(|e| StorageError::Query { source: e })?;
564 Ok(rows.into_iter().map(ApprovalItem::from).collect())
565}
566
567pub async fn get_filtered(
569 pool: &DbPool,
570 statuses: &[&str],
571 action_type: Option<&str>,
572 reviewed_by: Option<&str>,
573 since: Option<&str>,
574) -> Result<Vec<ApprovalItem>, StorageError> {
575 get_filtered_for(
576 pool,
577 DEFAULT_ACCOUNT_ID,
578 statuses,
579 action_type,
580 reviewed_by,
581 since,
582 )
583 .await
584}
585
586pub async fn update_content_for(
588 pool: &DbPool,
589 account_id: &str,
590 id: i64,
591 new_content: &str,
592) -> Result<(), StorageError> {
593 sqlx::query("UPDATE approval_queue SET generated_content = ? WHERE id = ? AND account_id = ?")
594 .bind(new_content)
595 .bind(id)
596 .bind(account_id)
597 .execute(pool)
598 .await
599 .map_err(|e| StorageError::Query { source: e })?;
600
601 Ok(())
602}
603
604pub async fn update_content(pool: &DbPool, id: i64, new_content: &str) -> Result<(), StorageError> {
606 update_content_for(pool, DEFAULT_ACCOUNT_ID, id, new_content).await
607}
608
609pub async fn update_media_paths_for(
611 pool: &DbPool,
612 account_id: &str,
613 id: i64,
614 media_paths: &str,
615) -> Result<(), StorageError> {
616 sqlx::query("UPDATE approval_queue SET media_paths = ? WHERE id = ? AND account_id = ?")
617 .bind(media_paths)
618 .bind(id)
619 .bind(account_id)
620 .execute(pool)
621 .await
622 .map_err(|e| StorageError::Query { source: e })?;
623
624 Ok(())
625}
626
627pub async fn update_media_paths(
629 pool: &DbPool,
630 id: i64,
631 media_paths: &str,
632) -> Result<(), StorageError> {
633 update_media_paths_for(pool, DEFAULT_ACCOUNT_ID, id, media_paths).await
634}
635
636#[allow(clippy::too_many_arguments)]
638pub async fn update_qa_fields_for(
639 pool: &DbPool,
640 account_id: &str,
641 id: i64,
642 qa_report: &str,
643 qa_hard_flags: &str,
644 qa_soft_flags: &str,
645 qa_recommendations: &str,
646 qa_score: f64,
647 qa_requires_override: bool,
648) -> Result<(), StorageError> {
649 sqlx::query(
650 "UPDATE approval_queue SET qa_report = ?, qa_hard_flags = ?, qa_soft_flags = ?, \
651 qa_recommendations = ?, qa_score = ?, qa_requires_override = ? \
652 WHERE id = ? AND account_id = ?",
653 )
654 .bind(qa_report)
655 .bind(qa_hard_flags)
656 .bind(qa_soft_flags)
657 .bind(qa_recommendations)
658 .bind(qa_score)
659 .bind(if qa_requires_override { 1 } else { 0 })
660 .bind(id)
661 .bind(account_id)
662 .execute(pool)
663 .await
664 .map_err(|e| StorageError::Query { source: e })?;
665
666 Ok(())
667}
668
669#[allow(clippy::too_many_arguments)]
671pub async fn update_qa_fields(
672 pool: &DbPool,
673 id: i64,
674 qa_report: &str,
675 qa_hard_flags: &str,
676 qa_soft_flags: &str,
677 qa_recommendations: &str,
678 qa_score: f64,
679 qa_requires_override: bool,
680) -> Result<(), StorageError> {
681 update_qa_fields_for(
682 pool,
683 DEFAULT_ACCOUNT_ID,
684 id,
685 qa_report,
686 qa_hard_flags,
687 qa_soft_flags,
688 qa_recommendations,
689 qa_score,
690 qa_requires_override,
691 )
692 .await
693}
694
695pub async fn set_qa_override_for(
697 pool: &DbPool,
698 account_id: &str,
699 id: i64,
700 actor: &str,
701 note: &str,
702) -> Result<(), StorageError> {
703 sqlx::query(
704 "UPDATE approval_queue SET qa_override_by = ?, qa_override_note = ?, \
705 qa_override_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
706 WHERE id = ? AND account_id = ?",
707 )
708 .bind(actor)
709 .bind(note)
710 .bind(id)
711 .bind(account_id)
712 .execute(pool)
713 .await
714 .map_err(|e| StorageError::Query { source: e })?;
715
716 Ok(())
717}
718
719pub async fn set_qa_override(
721 pool: &DbPool,
722 id: i64,
723 actor: &str,
724 note: &str,
725) -> Result<(), StorageError> {
726 set_qa_override_for(pool, DEFAULT_ACCOUNT_ID, id, actor, note).await
727}
728
729pub async fn clear_qa_override_for(
731 pool: &DbPool,
732 account_id: &str,
733 id: i64,
734) -> Result<(), StorageError> {
735 sqlx::query(
736 "UPDATE approval_queue SET qa_override_by = NULL, qa_override_note = NULL, \
737 qa_override_at = NULL WHERE id = ? AND account_id = ?",
738 )
739 .bind(id)
740 .bind(account_id)
741 .execute(pool)
742 .await
743 .map_err(|e| StorageError::Query { source: e })?;
744
745 Ok(())
746}
747
748pub async fn clear_qa_override(pool: &DbPool, id: i64) -> Result<(), StorageError> {
750 clear_qa_override_for(pool, DEFAULT_ACCOUNT_ID, id).await
751}
752
753pub async fn get_next_approved_for(
755 pool: &DbPool,
756 account_id: &str,
757) -> Result<Option<ApprovalItem>, StorageError> {
758 let sql = format!(
759 "SELECT {SELECT_COLS} FROM approval_queue \
760 WHERE status = 'approved' AND account_id = ? ORDER BY reviewed_at ASC LIMIT 1"
761 );
762 let row: Option<ApprovalRow> = sqlx::query_as(&sql)
763 .bind(account_id)
764 .fetch_optional(pool)
765 .await
766 .map_err(|e| StorageError::Query { source: e })?;
767
768 Ok(row.map(ApprovalItem::from))
769}
770
771pub async fn get_next_approved(pool: &DbPool) -> Result<Option<ApprovalItem>, StorageError> {
773 get_next_approved_for(pool, DEFAULT_ACCOUNT_ID).await
774}
775
776pub async fn mark_posted_for(
778 pool: &DbPool,
779 account_id: &str,
780 id: i64,
781 tweet_id: &str,
782) -> Result<(), StorageError> {
783 sqlx::query(
784 "UPDATE approval_queue SET status = 'posted', posted_tweet_id = ? \
785 WHERE id = ? AND account_id = ?",
786 )
787 .bind(tweet_id)
788 .bind(id)
789 .bind(account_id)
790 .execute(pool)
791 .await
792 .map_err(|e| StorageError::Query { source: e })?;
793
794 Ok(())
795}
796
797pub async fn mark_posted(pool: &DbPool, id: i64, tweet_id: &str) -> Result<(), StorageError> {
799 mark_posted_for(pool, DEFAULT_ACCOUNT_ID, id, tweet_id).await
800}
801
802pub async fn mark_failed_for(
804 pool: &DbPool,
805 account_id: &str,
806 id: i64,
807 error_message: &str,
808) -> Result<(), StorageError> {
809 sqlx::query(
810 "UPDATE approval_queue SET status = 'failed', review_notes = ? \
811 WHERE id = ? AND account_id = ? AND status = 'approved'",
812 )
813 .bind(error_message)
814 .bind(id)
815 .bind(account_id)
816 .execute(pool)
817 .await
818 .map_err(|e| StorageError::Query { source: e })?;
819
820 Ok(())
821}
822
823pub async fn mark_failed(pool: &DbPool, id: i64, error_message: &str) -> Result<(), StorageError> {
825 mark_failed_for(pool, DEFAULT_ACCOUNT_ID, id, error_message).await
826}
827
828pub async fn expire_old_items_for(
830 pool: &DbPool,
831 account_id: &str,
832 hours: u32,
833) -> Result<u64, StorageError> {
834 let result = sqlx::query(
835 "UPDATE approval_queue SET status = 'expired', \
836 reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
837 WHERE status = 'pending' AND account_id = ? \
838 AND created_at < strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?)",
839 )
840 .bind(account_id)
841 .bind(format!("-{hours} hours"))
842 .execute(pool)
843 .await
844 .map_err(|e| StorageError::Query { source: e })?;
845
846 Ok(result.rows_affected())
847}
848
849pub async fn expire_old_items(pool: &DbPool, hours: u32) -> Result<u64, StorageError> {
851 expire_old_items_for(pool, DEFAULT_ACCOUNT_ID, hours).await
852}
853
854pub async fn batch_approve_for(
856 pool: &DbPool,
857 account_id: &str,
858 max_batch: usize,
859 review: &ReviewAction,
860) -> Result<Vec<i64>, StorageError> {
861 let pending = get_pending_for(pool, account_id).await?;
862 let to_approve: Vec<&ApprovalItem> = pending.iter().take(max_batch).collect();
863 let mut approved_ids = Vec::with_capacity(to_approve.len());
864
865 for item in to_approve {
866 update_status_with_review_for(pool, account_id, item.id, "approved", review).await?;
867 approved_ids.push(item.id);
868 }
869
870 Ok(approved_ids)
871}
872
873pub async fn batch_approve(
875 pool: &DbPool,
876 max_batch: usize,
877 review: &ReviewAction,
878) -> Result<Vec<i64>, StorageError> {
879 batch_approve_for(pool, DEFAULT_ACCOUNT_ID, max_batch, review).await
880}