1use super::{ApprovalItem, ApprovalRow, ApprovalStats, ReviewAction};
4use crate::error::StorageError;
5use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
6use crate::storage::DbPool;
7
/// Column list shared by every `SELECT` in this module whose rows are decoded
/// into an `ApprovalRow`.
///
/// Several columns are wrapped in `COALESCE` so that a `NULL` value is read back
/// as a default (`'[]'`, `'{}'` or `0`) instead of `NULL`.
const SELECT_COLS: &str = "id, action_type, target_tweet_id, target_author, \
    generated_content, topic, archetype, score, status, created_at, \
    COALESCE(media_paths, '[]') AS media_paths, reviewed_by, review_notes, reason, \
    COALESCE(detected_risks, '[]') AS detected_risks, COALESCE(qa_report, '{}') AS qa_report, \
    COALESCE(qa_hard_flags, '[]') AS qa_hard_flags, COALESCE(qa_soft_flags, '[]') AS qa_soft_flags, \
    COALESCE(qa_recommendations, '[]') AS qa_recommendations, COALESCE(qa_score, 0) AS qa_score, \
    COALESCE(qa_requires_override, 0) AS qa_requires_override, qa_override_by, qa_override_note, qa_override_at";
16
17#[allow(clippy::too_many_arguments)]
19pub async fn enqueue_for(
20 pool: &DbPool,
21 account_id: &str,
22 action_type: &str,
23 target_tweet_id: &str,
24 target_author: &str,
25 generated_content: &str,
26 topic: &str,
27 archetype: &str,
28 score: f64,
29 media_paths: &str,
30) -> Result<i64, StorageError> {
31 enqueue_with_context_for(
32 pool,
33 account_id,
34 action_type,
35 target_tweet_id,
36 target_author,
37 generated_content,
38 topic,
39 archetype,
40 score,
41 media_paths,
42 None,
43 None,
44 )
45 .await
46}
47
48#[allow(clippy::too_many_arguments)]
50pub async fn enqueue(
51 pool: &DbPool,
52 action_type: &str,
53 target_tweet_id: &str,
54 target_author: &str,
55 generated_content: &str,
56 topic: &str,
57 archetype: &str,
58 score: f64,
59 media_paths: &str,
60) -> Result<i64, StorageError> {
61 enqueue_for(
62 pool,
63 DEFAULT_ACCOUNT_ID,
64 action_type,
65 target_tweet_id,
66 target_author,
67 generated_content,
68 topic,
69 archetype,
70 score,
71 media_paths,
72 )
73 .await
74}
75
76#[allow(clippy::too_many_arguments)]
78pub async fn enqueue_with_context_for(
79 pool: &DbPool,
80 account_id: &str,
81 action_type: &str,
82 target_tweet_id: &str,
83 target_author: &str,
84 generated_content: &str,
85 topic: &str,
86 archetype: &str,
87 score: f64,
88 media_paths: &str,
89 reason: Option<&str>,
90 detected_risks: Option<&str>,
91) -> Result<i64, StorageError> {
92 let result = sqlx::query(
93 "INSERT INTO approval_queue (account_id, action_type, target_tweet_id, target_author, \
94 generated_content, topic, archetype, score, media_paths, reason, detected_risks) \
95 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
96 )
97 .bind(account_id)
98 .bind(action_type)
99 .bind(target_tweet_id)
100 .bind(target_author)
101 .bind(generated_content)
102 .bind(topic)
103 .bind(archetype)
104 .bind(score)
105 .bind(media_paths)
106 .bind(reason)
107 .bind(detected_risks.unwrap_or("[]"))
108 .execute(pool)
109 .await
110 .map_err(|e| StorageError::Query { source: e })?;
111
112 Ok(result.last_insert_rowid())
113}
114
115#[allow(clippy::too_many_arguments)]
117pub async fn enqueue_with_context(
118 pool: &DbPool,
119 action_type: &str,
120 target_tweet_id: &str,
121 target_author: &str,
122 generated_content: &str,
123 topic: &str,
124 archetype: &str,
125 score: f64,
126 media_paths: &str,
127 reason: Option<&str>,
128 detected_risks: Option<&str>,
129) -> Result<i64, StorageError> {
130 enqueue_with_context_for(
131 pool,
132 DEFAULT_ACCOUNT_ID,
133 action_type,
134 target_tweet_id,
135 target_author,
136 generated_content,
137 topic,
138 archetype,
139 score,
140 media_paths,
141 reason,
142 detected_risks,
143 )
144 .await
145}
146
147pub async fn get_pending_for(
149 pool: &DbPool,
150 account_id: &str,
151) -> Result<Vec<ApprovalItem>, StorageError> {
152 let sql = format!(
153 "SELECT {SELECT_COLS} FROM approval_queue \
154 WHERE status = 'pending' AND account_id = ? ORDER BY created_at ASC"
155 );
156 let rows: Vec<ApprovalRow> = sqlx::query_as(&sql)
157 .bind(account_id)
158 .fetch_all(pool)
159 .await
160 .map_err(|e| StorageError::Query { source: e })?;
161
162 Ok(rows.into_iter().map(ApprovalItem::from).collect())
163}
164
165pub async fn get_pending(pool: &DbPool) -> Result<Vec<ApprovalItem>, StorageError> {
167 get_pending_for(pool, DEFAULT_ACCOUNT_ID).await
168}
169
170pub async fn pending_count_for(pool: &DbPool, account_id: &str) -> Result<i64, StorageError> {
172 let row: (i64,) = sqlx::query_as(
173 "SELECT COUNT(*) FROM approval_queue WHERE status = 'pending' AND account_id = ?",
174 )
175 .bind(account_id)
176 .fetch_one(pool)
177 .await
178 .map_err(|e| StorageError::Query { source: e })?;
179
180 Ok(row.0)
181}
182
183pub async fn pending_count(pool: &DbPool) -> Result<i64, StorageError> {
185 pending_count_for(pool, DEFAULT_ACCOUNT_ID).await
186}
187
188pub async fn update_status_for(
190 pool: &DbPool,
191 account_id: &str,
192 id: i64,
193 status: &str,
194) -> Result<(), StorageError> {
195 sqlx::query(
196 "UPDATE approval_queue SET status = ?, \
197 reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ? AND account_id = ?",
198 )
199 .bind(status)
200 .bind(id)
201 .bind(account_id)
202 .execute(pool)
203 .await
204 .map_err(|e| StorageError::Query { source: e })?;
205
206 Ok(())
207}
208
209pub async fn update_status(pool: &DbPool, id: i64, status: &str) -> Result<(), StorageError> {
211 update_status_for(pool, DEFAULT_ACCOUNT_ID, id, status).await
212}
213
214pub async fn update_status_with_review_for(
216 pool: &DbPool,
217 account_id: &str,
218 id: i64,
219 status: &str,
220 review: &ReviewAction,
221) -> Result<(), StorageError> {
222 sqlx::query(
223 "UPDATE approval_queue SET status = ?, \
224 reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), \
225 reviewed_by = ?, review_notes = ? WHERE id = ? AND account_id = ?",
226 )
227 .bind(status)
228 .bind(&review.actor)
229 .bind(&review.notes)
230 .bind(id)
231 .bind(account_id)
232 .execute(pool)
233 .await
234 .map_err(|e| StorageError::Query { source: e })?;
235
236 Ok(())
237}
238
239pub async fn update_status_with_review(
241 pool: &DbPool,
242 id: i64,
243 status: &str,
244 review: &ReviewAction,
245) -> Result<(), StorageError> {
246 update_status_with_review_for(pool, DEFAULT_ACCOUNT_ID, id, status, review).await
247}
248
249pub async fn update_content_and_approve_for(
251 pool: &DbPool,
252 account_id: &str,
253 id: i64,
254 new_content: &str,
255) -> Result<(), StorageError> {
256 sqlx::query(
257 "UPDATE approval_queue SET generated_content = ?, status = 'approved', \
258 reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ? AND account_id = ?",
259 )
260 .bind(new_content)
261 .bind(id)
262 .bind(account_id)
263 .execute(pool)
264 .await
265 .map_err(|e| StorageError::Query { source: e })?;
266
267 Ok(())
268}
269
270pub async fn update_content_and_approve(
272 pool: &DbPool,
273 id: i64,
274 new_content: &str,
275) -> Result<(), StorageError> {
276 update_content_and_approve_for(pool, DEFAULT_ACCOUNT_ID, id, new_content).await
277}
278
279pub async fn get_by_id_for(
281 pool: &DbPool,
282 account_id: &str,
283 id: i64,
284) -> Result<Option<ApprovalItem>, StorageError> {
285 let sql = format!("SELECT {SELECT_COLS} FROM approval_queue WHERE id = ? AND account_id = ?");
286 let row: Option<ApprovalRow> = sqlx::query_as(&sql)
287 .bind(id)
288 .bind(account_id)
289 .fetch_optional(pool)
290 .await
291 .map_err(|e| StorageError::Query { source: e })?;
292
293 Ok(row.map(ApprovalItem::from))
294}
295
296pub async fn get_by_id(pool: &DbPool, id: i64) -> Result<Option<ApprovalItem>, StorageError> {
298 get_by_id_for(pool, DEFAULT_ACCOUNT_ID, id).await
299}
300
301pub async fn get_stats_for(pool: &DbPool, account_id: &str) -> Result<ApprovalStats, StorageError> {
303 let row: (i64, i64, i64) = sqlx::query_as(
304 "SELECT \
305 COALESCE(SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END), 0), \
306 COALESCE(SUM(CASE WHEN status = 'approved' THEN 1 ELSE 0 END), 0), \
307 COALESCE(SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END), 0) \
308 FROM approval_queue WHERE account_id = ?",
309 )
310 .bind(account_id)
311 .fetch_one(pool)
312 .await
313 .map_err(|e| StorageError::Query { source: e })?;
314
315 Ok(ApprovalStats {
316 pending: row.0,
317 approved: row.1,
318 rejected: row.2,
319 })
320}
321
322pub async fn get_stats(pool: &DbPool) -> Result<ApprovalStats, StorageError> {
324 get_stats_for(pool, DEFAULT_ACCOUNT_ID).await
325}
326
327pub async fn get_by_statuses_for(
330 pool: &DbPool,
331 account_id: &str,
332 statuses: &[&str],
333 action_type: Option<&str>,
334) -> Result<Vec<ApprovalItem>, StorageError> {
335 if statuses.is_empty() {
336 return Ok(Vec::new());
337 }
338
339 let placeholders: Vec<&str> = statuses.iter().map(|_| "?").collect();
340 let in_clause = placeholders.join(", ");
341
342 let query = if let Some(at) = action_type {
343 let sql = format!(
344 "SELECT {SELECT_COLS} FROM approval_queue \
345 WHERE account_id = ? AND status IN ({in_clause}) AND action_type = ? \
346 ORDER BY created_at ASC"
347 );
348 let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
349 q = q.bind(account_id);
350 for s in statuses {
351 q = q.bind(*s);
352 }
353 q = q.bind(at);
354 q.fetch_all(pool).await
355 } else {
356 let sql = format!(
357 "SELECT {SELECT_COLS} FROM approval_queue \
358 WHERE account_id = ? AND status IN ({in_clause}) \
359 ORDER BY created_at ASC"
360 );
361 let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
362 q = q.bind(account_id);
363 for s in statuses {
364 q = q.bind(*s);
365 }
366 q.fetch_all(pool).await
367 };
368
369 let rows = query.map_err(|e| StorageError::Query { source: e })?;
370 Ok(rows.into_iter().map(ApprovalItem::from).collect())
371}
372
373pub async fn get_by_statuses(
375 pool: &DbPool,
376 statuses: &[&str],
377 action_type: Option<&str>,
378) -> Result<Vec<ApprovalItem>, StorageError> {
379 get_by_statuses_for(pool, DEFAULT_ACCOUNT_ID, statuses, action_type).await
380}
381
382pub async fn get_filtered_for(
384 pool: &DbPool,
385 account_id: &str,
386 statuses: &[&str],
387 action_type: Option<&str>,
388 reviewed_by: Option<&str>,
389 since: Option<&str>,
390) -> Result<Vec<ApprovalItem>, StorageError> {
391 if statuses.is_empty() {
392 return Ok(Vec::new());
393 }
394
395 let placeholders: Vec<&str> = statuses.iter().map(|_| "?").collect();
396 let in_clause = placeholders.join(", ");
397
398 let mut sql = format!(
399 "SELECT {SELECT_COLS} FROM approval_queue \
400 WHERE account_id = ? AND status IN ({in_clause})"
401 );
402 if action_type.is_some() {
403 sql.push_str(" AND action_type = ?");
404 }
405 if reviewed_by.is_some() {
406 sql.push_str(" AND reviewed_by = ?");
407 }
408 if since.is_some() {
409 sql.push_str(" AND created_at >= ?");
410 }
411 sql.push_str(" ORDER BY created_at ASC");
412
413 let mut q = sqlx::query_as::<_, ApprovalRow>(&sql);
414 q = q.bind(account_id);
415 for s in statuses {
416 q = q.bind(*s);
417 }
418 if let Some(at) = action_type {
419 q = q.bind(at);
420 }
421 if let Some(rb) = reviewed_by {
422 q = q.bind(rb);
423 }
424 if let Some(s) = since {
425 q = q.bind(s);
426 }
427
428 let rows = q
429 .fetch_all(pool)
430 .await
431 .map_err(|e| StorageError::Query { source: e })?;
432 Ok(rows.into_iter().map(ApprovalItem::from).collect())
433}
434
435pub async fn get_filtered(
437 pool: &DbPool,
438 statuses: &[&str],
439 action_type: Option<&str>,
440 reviewed_by: Option<&str>,
441 since: Option<&str>,
442) -> Result<Vec<ApprovalItem>, StorageError> {
443 get_filtered_for(
444 pool,
445 DEFAULT_ACCOUNT_ID,
446 statuses,
447 action_type,
448 reviewed_by,
449 since,
450 )
451 .await
452}
453
454pub async fn update_content_for(
456 pool: &DbPool,
457 account_id: &str,
458 id: i64,
459 new_content: &str,
460) -> Result<(), StorageError> {
461 sqlx::query("UPDATE approval_queue SET generated_content = ? WHERE id = ? AND account_id = ?")
462 .bind(new_content)
463 .bind(id)
464 .bind(account_id)
465 .execute(pool)
466 .await
467 .map_err(|e| StorageError::Query { source: e })?;
468
469 Ok(())
470}
471
472pub async fn update_content(pool: &DbPool, id: i64, new_content: &str) -> Result<(), StorageError> {
474 update_content_for(pool, DEFAULT_ACCOUNT_ID, id, new_content).await
475}
476
477pub async fn update_media_paths_for(
479 pool: &DbPool,
480 account_id: &str,
481 id: i64,
482 media_paths: &str,
483) -> Result<(), StorageError> {
484 sqlx::query("UPDATE approval_queue SET media_paths = ? WHERE id = ? AND account_id = ?")
485 .bind(media_paths)
486 .bind(id)
487 .bind(account_id)
488 .execute(pool)
489 .await
490 .map_err(|e| StorageError::Query { source: e })?;
491
492 Ok(())
493}
494
495pub async fn update_media_paths(
497 pool: &DbPool,
498 id: i64,
499 media_paths: &str,
500) -> Result<(), StorageError> {
501 update_media_paths_for(pool, DEFAULT_ACCOUNT_ID, id, media_paths).await
502}
503
504#[allow(clippy::too_many_arguments)]
506pub async fn update_qa_fields_for(
507 pool: &DbPool,
508 account_id: &str,
509 id: i64,
510 qa_report: &str,
511 qa_hard_flags: &str,
512 qa_soft_flags: &str,
513 qa_recommendations: &str,
514 qa_score: f64,
515 qa_requires_override: bool,
516) -> Result<(), StorageError> {
517 sqlx::query(
518 "UPDATE approval_queue SET qa_report = ?, qa_hard_flags = ?, qa_soft_flags = ?, \
519 qa_recommendations = ?, qa_score = ?, qa_requires_override = ? \
520 WHERE id = ? AND account_id = ?",
521 )
522 .bind(qa_report)
523 .bind(qa_hard_flags)
524 .bind(qa_soft_flags)
525 .bind(qa_recommendations)
526 .bind(qa_score)
527 .bind(if qa_requires_override { 1 } else { 0 })
528 .bind(id)
529 .bind(account_id)
530 .execute(pool)
531 .await
532 .map_err(|e| StorageError::Query { source: e })?;
533
534 Ok(())
535}
536
537#[allow(clippy::too_many_arguments)]
539pub async fn update_qa_fields(
540 pool: &DbPool,
541 id: i64,
542 qa_report: &str,
543 qa_hard_flags: &str,
544 qa_soft_flags: &str,
545 qa_recommendations: &str,
546 qa_score: f64,
547 qa_requires_override: bool,
548) -> Result<(), StorageError> {
549 update_qa_fields_for(
550 pool,
551 DEFAULT_ACCOUNT_ID,
552 id,
553 qa_report,
554 qa_hard_flags,
555 qa_soft_flags,
556 qa_recommendations,
557 qa_score,
558 qa_requires_override,
559 )
560 .await
561}
562
563pub async fn set_qa_override_for(
565 pool: &DbPool,
566 account_id: &str,
567 id: i64,
568 actor: &str,
569 note: &str,
570) -> Result<(), StorageError> {
571 sqlx::query(
572 "UPDATE approval_queue SET qa_override_by = ?, qa_override_note = ?, \
573 qa_override_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
574 WHERE id = ? AND account_id = ?",
575 )
576 .bind(actor)
577 .bind(note)
578 .bind(id)
579 .bind(account_id)
580 .execute(pool)
581 .await
582 .map_err(|e| StorageError::Query { source: e })?;
583
584 Ok(())
585}
586
587pub async fn set_qa_override(
589 pool: &DbPool,
590 id: i64,
591 actor: &str,
592 note: &str,
593) -> Result<(), StorageError> {
594 set_qa_override_for(pool, DEFAULT_ACCOUNT_ID, id, actor, note).await
595}
596
597pub async fn clear_qa_override_for(
599 pool: &DbPool,
600 account_id: &str,
601 id: i64,
602) -> Result<(), StorageError> {
603 sqlx::query(
604 "UPDATE approval_queue SET qa_override_by = NULL, qa_override_note = NULL, \
605 qa_override_at = NULL WHERE id = ? AND account_id = ?",
606 )
607 .bind(id)
608 .bind(account_id)
609 .execute(pool)
610 .await
611 .map_err(|e| StorageError::Query { source: e })?;
612
613 Ok(())
614}
615
616pub async fn clear_qa_override(pool: &DbPool, id: i64) -> Result<(), StorageError> {
618 clear_qa_override_for(pool, DEFAULT_ACCOUNT_ID, id).await
619}
620
621pub async fn get_next_approved_for(
623 pool: &DbPool,
624 account_id: &str,
625) -> Result<Option<ApprovalItem>, StorageError> {
626 let sql = format!(
627 "SELECT {SELECT_COLS} FROM approval_queue \
628 WHERE status = 'approved' AND account_id = ? ORDER BY reviewed_at ASC LIMIT 1"
629 );
630 let row: Option<ApprovalRow> = sqlx::query_as(&sql)
631 .bind(account_id)
632 .fetch_optional(pool)
633 .await
634 .map_err(|e| StorageError::Query { source: e })?;
635
636 Ok(row.map(ApprovalItem::from))
637}
638
639pub async fn get_next_approved(pool: &DbPool) -> Result<Option<ApprovalItem>, StorageError> {
641 get_next_approved_for(pool, DEFAULT_ACCOUNT_ID).await
642}
643
644pub async fn mark_posted_for(
646 pool: &DbPool,
647 account_id: &str,
648 id: i64,
649 tweet_id: &str,
650) -> Result<(), StorageError> {
651 sqlx::query(
652 "UPDATE approval_queue SET status = 'posted', posted_tweet_id = ? \
653 WHERE id = ? AND account_id = ?",
654 )
655 .bind(tweet_id)
656 .bind(id)
657 .bind(account_id)
658 .execute(pool)
659 .await
660 .map_err(|e| StorageError::Query { source: e })?;
661
662 Ok(())
663}
664
665pub async fn mark_posted(pool: &DbPool, id: i64, tweet_id: &str) -> Result<(), StorageError> {
667 mark_posted_for(pool, DEFAULT_ACCOUNT_ID, id, tweet_id).await
668}
669
670pub async fn expire_old_items_for(
672 pool: &DbPool,
673 account_id: &str,
674 hours: u32,
675) -> Result<u64, StorageError> {
676 let result = sqlx::query(
677 "UPDATE approval_queue SET status = 'expired', \
678 reviewed_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
679 WHERE status = 'pending' AND account_id = ? \
680 AND created_at < strftime('%Y-%m-%dT%H:%M:%SZ', 'now', ?)",
681 )
682 .bind(account_id)
683 .bind(format!("-{hours} hours"))
684 .execute(pool)
685 .await
686 .map_err(|e| StorageError::Query { source: e })?;
687
688 Ok(result.rows_affected())
689}
690
691pub async fn expire_old_items(pool: &DbPool, hours: u32) -> Result<u64, StorageError> {
693 expire_old_items_for(pool, DEFAULT_ACCOUNT_ID, hours).await
694}
695
696pub async fn batch_approve_for(
698 pool: &DbPool,
699 account_id: &str,
700 max_batch: usize,
701 review: &ReviewAction,
702) -> Result<Vec<i64>, StorageError> {
703 let pending = get_pending_for(pool, account_id).await?;
704 let to_approve: Vec<&ApprovalItem> = pending.iter().take(max_batch).collect();
705 let mut approved_ids = Vec::with_capacity(to_approve.len());
706
707 for item in to_approve {
708 update_status_with_review_for(pool, account_id, item.id, "approved", review).await?;
709 approved_ids.push(item.id);
710 }
711
712 Ok(approved_ids)
713}
714
715pub async fn batch_approve(
717 pool: &DbPool,
718 max_batch: usize,
719 review: &ReviewAction,
720) -> Result<Vec<i64>, StorageError> {
721 batch_approve_for(pool, DEFAULT_ACCOUNT_ID, max_batch, review).await
722}