1use super::accounts::DEFAULT_ACCOUNT_ID;
7use super::DbPool;
8use crate::error::StorageError;
9use chrono::{NaiveDate, Utc};
10
11#[derive(Debug, Clone, serde::Serialize)]
17pub struct FollowerSnapshot {
18 pub snapshot_date: String,
19 pub follower_count: i64,
20 pub following_count: i64,
21 pub tweet_count: i64,
22}
23
24pub async fn upsert_follower_snapshot_for(
26 pool: &DbPool,
27 account_id: &str,
28 follower_count: i64,
29 following_count: i64,
30 tweet_count: i64,
31) -> Result<(), StorageError> {
32 sqlx::query(
33 "INSERT INTO follower_snapshots (account_id, snapshot_date, follower_count, following_count, tweet_count) \
34 VALUES (?, date('now'), ?, ?, ?) \
35 ON CONFLICT(snapshot_date) DO UPDATE SET \
36 account_id = excluded.account_id, \
37 follower_count = excluded.follower_count, \
38 following_count = excluded.following_count, \
39 tweet_count = excluded.tweet_count",
40 )
41 .bind(account_id)
42 .bind(follower_count)
43 .bind(following_count)
44 .bind(tweet_count)
45 .execute(pool)
46 .await
47 .map_err(|e| StorageError::Query { source: e })?;
48 Ok(())
49}
50
51pub async fn upsert_follower_snapshot(
53 pool: &DbPool,
54 follower_count: i64,
55 following_count: i64,
56 tweet_count: i64,
57) -> Result<(), StorageError> {
58 upsert_follower_snapshot_for(
59 pool,
60 DEFAULT_ACCOUNT_ID,
61 follower_count,
62 following_count,
63 tweet_count,
64 )
65 .await
66}
67
68pub async fn get_follower_snapshots_for(
70 pool: &DbPool,
71 account_id: &str,
72 limit: u32,
73) -> Result<Vec<FollowerSnapshot>, StorageError> {
74 let rows: Vec<(String, i64, i64, i64)> = sqlx::query_as(
75 "SELECT snapshot_date, follower_count, following_count, tweet_count \
76 FROM follower_snapshots WHERE account_id = ? ORDER BY snapshot_date DESC LIMIT ?",
77 )
78 .bind(account_id)
79 .bind(limit)
80 .fetch_all(pool)
81 .await
82 .map_err(|e| StorageError::Query { source: e })?;
83
84 Ok(rows
85 .into_iter()
86 .map(|r| FollowerSnapshot {
87 snapshot_date: r.0,
88 follower_count: r.1,
89 following_count: r.2,
90 tweet_count: r.3,
91 })
92 .collect())
93}
94
95pub async fn get_follower_snapshots(
97 pool: &DbPool,
98 limit: u32,
99) -> Result<Vec<FollowerSnapshot>, StorageError> {
100 get_follower_snapshots_for(pool, DEFAULT_ACCOUNT_ID, limit).await
101}
102
103pub async fn upsert_reply_performance_for(
109 pool: &DbPool,
110 account_id: &str,
111 reply_id: &str,
112 likes: i64,
113 replies: i64,
114 impressions: i64,
115 score: f64,
116) -> Result<(), StorageError> {
117 sqlx::query(
118 "INSERT INTO reply_performance (account_id, reply_id, likes_received, replies_received, impressions, performance_score) \
119 VALUES (?, ?, ?, ?, ?, ?) \
120 ON CONFLICT(reply_id) DO UPDATE SET \
121 likes_received = excluded.likes_received, \
122 replies_received = excluded.replies_received, \
123 impressions = excluded.impressions, \
124 performance_score = excluded.performance_score, \
125 measured_at = datetime('now')",
126 )
127 .bind(account_id)
128 .bind(reply_id)
129 .bind(likes)
130 .bind(replies)
131 .bind(impressions)
132 .bind(score)
133 .execute(pool)
134 .await
135 .map_err(|e| StorageError::Query { source: e })?;
136 Ok(())
137}
138
139pub async fn upsert_reply_performance(
141 pool: &DbPool,
142 reply_id: &str,
143 likes: i64,
144 replies: i64,
145 impressions: i64,
146 score: f64,
147) -> Result<(), StorageError> {
148 upsert_reply_performance_for(
149 pool,
150 DEFAULT_ACCOUNT_ID,
151 reply_id,
152 likes,
153 replies,
154 impressions,
155 score,
156 )
157 .await
158}
159
160#[allow(clippy::too_many_arguments)]
166pub async fn upsert_tweet_performance_for(
167 pool: &DbPool,
168 account_id: &str,
169 tweet_id: &str,
170 likes: i64,
171 retweets: i64,
172 replies: i64,
173 impressions: i64,
174 score: f64,
175) -> Result<(), StorageError> {
176 sqlx::query(
177 "INSERT INTO tweet_performance (account_id, tweet_id, likes_received, retweets_received, replies_received, impressions, performance_score) \
178 VALUES (?, ?, ?, ?, ?, ?, ?) \
179 ON CONFLICT(tweet_id) DO UPDATE SET \
180 likes_received = excluded.likes_received, \
181 retweets_received = excluded.retweets_received, \
182 replies_received = excluded.replies_received, \
183 impressions = excluded.impressions, \
184 performance_score = excluded.performance_score, \
185 measured_at = datetime('now')",
186 )
187 .bind(account_id)
188 .bind(tweet_id)
189 .bind(likes)
190 .bind(retweets)
191 .bind(replies)
192 .bind(impressions)
193 .bind(score)
194 .execute(pool)
195 .await
196 .map_err(|e| StorageError::Query { source: e })?;
197 Ok(())
198}
199
200pub async fn upsert_tweet_performance(
202 pool: &DbPool,
203 tweet_id: &str,
204 likes: i64,
205 retweets: i64,
206 replies: i64,
207 impressions: i64,
208 score: f64,
209) -> Result<(), StorageError> {
210 upsert_tweet_performance_for(
211 pool,
212 DEFAULT_ACCOUNT_ID,
213 tweet_id,
214 likes,
215 retweets,
216 replies,
217 impressions,
218 score,
219 )
220 .await
221}
222
223#[derive(Debug, Clone, serde::Serialize)]
229pub struct ContentScore {
230 pub topic: String,
231 pub format: String,
232 pub total_posts: i64,
233 pub avg_performance: f64,
234}
235
236pub async fn update_content_score_for(
240 pool: &DbPool,
241 account_id: &str,
242 topic: &str,
243 format: &str,
244 new_score: f64,
245) -> Result<(), StorageError> {
246 sqlx::query(
248 "INSERT INTO content_scores (account_id, topic, format, total_posts, avg_performance) \
249 VALUES (?, ?, ?, 1, ?) \
250 ON CONFLICT(topic, format) DO UPDATE SET \
251 account_id = excluded.account_id, \
252 total_posts = content_scores.total_posts + 1, \
253 avg_performance = content_scores.avg_performance + \
254 (? - content_scores.avg_performance) / (content_scores.total_posts + 1)",
255 )
256 .bind(account_id)
257 .bind(topic)
258 .bind(format)
259 .bind(new_score)
260 .bind(new_score)
261 .execute(pool)
262 .await
263 .map_err(|e| StorageError::Query { source: e })?;
264 Ok(())
265}
266
267pub async fn update_content_score(
271 pool: &DbPool,
272 topic: &str,
273 format: &str,
274 new_score: f64,
275) -> Result<(), StorageError> {
276 update_content_score_for(pool, DEFAULT_ACCOUNT_ID, topic, format, new_score).await
277}
278
279pub async fn get_top_topics_for(
281 pool: &DbPool,
282 account_id: &str,
283 limit: u32,
284) -> Result<Vec<ContentScore>, StorageError> {
285 let rows: Vec<(String, String, i64, f64)> = sqlx::query_as(
286 "SELECT topic, format, total_posts, avg_performance \
287 FROM content_scores \
288 WHERE account_id = ? \
289 ORDER BY avg_performance DESC \
290 LIMIT ?",
291 )
292 .bind(account_id)
293 .bind(limit)
294 .fetch_all(pool)
295 .await
296 .map_err(|e| StorageError::Query { source: e })?;
297
298 Ok(rows
299 .into_iter()
300 .map(|r| ContentScore {
301 topic: r.0,
302 format: r.1,
303 total_posts: r.2,
304 avg_performance: r.3,
305 })
306 .collect())
307}
308
309pub async fn get_top_topics(pool: &DbPool, limit: u32) -> Result<Vec<ContentScore>, StorageError> {
311 get_top_topics_for(pool, DEFAULT_ACCOUNT_ID, limit).await
312}
313
314pub async fn get_avg_reply_engagement_for(
316 pool: &DbPool,
317 account_id: &str,
318) -> Result<f64, StorageError> {
319 let row: (f64,) = sqlx::query_as(
320 "SELECT COALESCE(AVG(performance_score), 0.0) FROM reply_performance WHERE account_id = ?",
321 )
322 .bind(account_id)
323 .fetch_one(pool)
324 .await
325 .map_err(|e| StorageError::Query { source: e })?;
326
327 Ok(row.0)
328}
329
330pub async fn get_avg_reply_engagement(pool: &DbPool) -> Result<f64, StorageError> {
332 get_avg_reply_engagement_for(pool, DEFAULT_ACCOUNT_ID).await
333}
334
335pub async fn get_avg_tweet_engagement_for(
337 pool: &DbPool,
338 account_id: &str,
339) -> Result<f64, StorageError> {
340 let row: (f64,) = sqlx::query_as(
341 "SELECT COALESCE(AVG(performance_score), 0.0) FROM tweet_performance WHERE account_id = ?",
342 )
343 .bind(account_id)
344 .fetch_one(pool)
345 .await
346 .map_err(|e| StorageError::Query { source: e })?;
347
348 Ok(row.0)
349}
350
351pub async fn get_avg_tweet_engagement(pool: &DbPool) -> Result<f64, StorageError> {
353 get_avg_tweet_engagement_for(pool, DEFAULT_ACCOUNT_ID).await
354}
355
356pub async fn get_performance_counts_for(
358 pool: &DbPool,
359 account_id: &str,
360) -> Result<(i64, i64), StorageError> {
361 let reply_count: (i64,) =
362 sqlx::query_as("SELECT COUNT(*) FROM reply_performance WHERE account_id = ?")
363 .bind(account_id)
364 .fetch_one(pool)
365 .await
366 .map_err(|e| StorageError::Query { source: e })?;
367
368 let tweet_count: (i64,) =
369 sqlx::query_as("SELECT COUNT(*) FROM tweet_performance WHERE account_id = ?")
370 .bind(account_id)
371 .fetch_one(pool)
372 .await
373 .map_err(|e| StorageError::Query { source: e })?;
374
375 Ok((reply_count.0, tweet_count.0))
376}
377
378pub async fn get_performance_counts(pool: &DbPool) -> Result<(i64, i64), StorageError> {
380 get_performance_counts_for(pool, DEFAULT_ACCOUNT_ID).await
381}
382
383pub fn compute_performance_score(likes: i64, replies: i64, retweets: i64, impressions: i64) -> f64 {
387 let numerator = (likes * 3 + replies * 5 + retweets * 4) as f64;
388 let denominator = impressions.max(1) as f64;
389 numerator / denominator * 1000.0
390}
391
392#[derive(Debug, Clone, serde::Serialize)]
398pub struct FollowerSummary {
399 pub current: i64,
400 pub change_7d: i64,
401 pub change_30d: i64,
402}
403
404#[derive(Debug, Clone, serde::Serialize)]
406pub struct ActionsSummary {
407 pub replies: i64,
408 pub tweets: i64,
409 pub threads: i64,
410}
411
412#[derive(Debug, Clone, serde::Serialize)]
414pub struct EngagementSummary {
415 pub avg_reply_score: f64,
416 pub avg_tweet_score: f64,
417 pub total_replies_sent: i64,
418 pub total_tweets_posted: i64,
419}
420
421#[derive(Debug, Clone, serde::Serialize)]
423pub struct AnalyticsSummary {
424 pub followers: FollowerSummary,
425 pub actions_today: ActionsSummary,
426 pub engagement: EngagementSummary,
427 pub top_topics: Vec<ContentScore>,
428}
429
430pub async fn get_analytics_summary_for(
435 pool: &DbPool,
436 account_id: &str,
437) -> Result<AnalyticsSummary, StorageError> {
438 let snapshots = get_follower_snapshots_for(pool, account_id, 90).await?;
440 let current = snapshots.first().map_or(0, |s| s.follower_count);
441
442 let today = Utc::now().date_naive();
445 let follower_at_or_before = |days: i64| -> i64 {
446 snapshots
447 .iter()
448 .find(|s| {
449 NaiveDate::parse_from_str(&s.snapshot_date, "%Y-%m-%d")
450 .map(|d| (today - d).num_days() >= days)
451 .unwrap_or(false)
452 })
453 .map_or(current, |s| s.follower_count)
454 };
455
456 let change_7d = if snapshots.len() >= 2 {
457 current - follower_at_or_before(7)
458 } else {
459 0
460 };
461 let change_30d = if snapshots.len() >= 2 {
462 current - follower_at_or_before(30)
463 } else {
464 0
465 };
466
467 let today = Utc::now().format("%Y-%m-%dT00:00:00Z").to_string();
469 let counts = super::action_log::get_action_counts_since(pool, &today).await?;
470 let actions_today = ActionsSummary {
471 replies: *counts.get("reply").unwrap_or(&0),
472 tweets: *counts.get("tweet").unwrap_or(&0),
473 threads: *counts.get("thread").unwrap_or(&0),
474 };
475
476 let avg_reply_score = get_avg_reply_engagement_for(pool, account_id).await?;
478 let avg_tweet_score = get_avg_tweet_engagement_for(pool, account_id).await?;
479 let (total_replies_sent, total_tweets_posted) =
480 get_performance_counts_for(pool, account_id).await?;
481
482 let top_topics = get_top_topics_for(pool, account_id, 5).await?;
484
485 Ok(AnalyticsSummary {
486 followers: FollowerSummary {
487 current,
488 change_7d,
489 change_30d,
490 },
491 actions_today,
492 engagement: EngagementSummary {
493 avg_reply_score,
494 avg_tweet_score,
495 total_replies_sent,
496 total_tweets_posted,
497 },
498 top_topics,
499 })
500}
501
502pub async fn get_analytics_summary(pool: &DbPool) -> Result<AnalyticsSummary, StorageError> {
507 get_analytics_summary_for(pool, DEFAULT_ACCOUNT_ID).await
508}
509
510#[derive(Debug, Clone, serde::Serialize)]
516pub struct PerformanceItem {
517 pub content_type: String,
519 pub content_preview: String,
521 pub likes: i64,
523 pub replies_received: i64,
525 pub retweets: i64,
527 pub impressions: i64,
529 pub performance_score: f64,
531 pub posted_at: String,
533}
534
535type PerformanceRow = (String, String, i64, i64, i64, i64, f64, String);
537
538pub async fn get_recent_performance_items_for(
543 pool: &DbPool,
544 account_id: &str,
545 limit: u32,
546) -> Result<Vec<PerformanceItem>, StorageError> {
547 let rows: Vec<PerformanceRow> = sqlx::query_as(
548 "SELECT 'reply' as content_type, \
549 SUBSTR(rs.reply_content, 1, 120) as content_preview, \
550 rp.likes_received, rp.replies_received, 0 as retweets, \
551 rp.impressions, rp.performance_score, rs.created_at as posted_at \
552 FROM reply_performance rp \
553 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
554 WHERE rp.account_id = ? \
555 UNION ALL \
556 SELECT 'tweet' as content_type, \
557 SUBSTR(ot.content, 1, 120) as content_preview, \
558 tp.likes_received, tp.replies_received, tp.retweets_received, \
559 tp.impressions, tp.performance_score, ot.created_at as posted_at \
560 FROM tweet_performance tp \
561 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
562 WHERE tp.account_id = ? \
563 ORDER BY posted_at DESC \
564 LIMIT ?",
565 )
566 .bind(account_id)
567 .bind(account_id)
568 .bind(limit)
569 .fetch_all(pool)
570 .await
571 .map_err(|e| StorageError::Query { source: e })?;
572
573 Ok(rows
574 .into_iter()
575 .map(|r| PerformanceItem {
576 content_type: r.0,
577 content_preview: r.1,
578 likes: r.2,
579 replies_received: r.3,
580 retweets: r.4,
581 impressions: r.5,
582 performance_score: r.6,
583 posted_at: r.7,
584 })
585 .collect())
586}
587
588pub async fn get_recent_performance_items(
593 pool: &DbPool,
594 limit: u32,
595) -> Result<Vec<PerformanceItem>, StorageError> {
596 get_recent_performance_items_for(pool, DEFAULT_ACCOUNT_ID, limit).await
597}
598
599#[derive(Debug, Clone, serde::Serialize)]
601pub struct HourlyPerformance {
602 pub hour: i64,
604 pub avg_engagement: f64,
606 pub post_count: i64,
608}
609
610pub async fn get_optimal_posting_times_for(
612 pool: &DbPool,
613 account_id: &str,
614) -> Result<Vec<HourlyPerformance>, StorageError> {
615 let rows: Vec<(i64, f64, i64)> = sqlx::query_as(
616 "SELECT
617 CAST(strftime('%H', ot.created_at) AS INTEGER) as hour,
618 COALESCE(AVG(tp.performance_score), 0.0) as avg_engagement,
619 COUNT(*) as post_count
620 FROM original_tweets ot
621 LEFT JOIN tweet_performance tp ON tp.tweet_id = ot.tweet_id
622 WHERE ot.account_id = ? AND ot.status = 'sent' AND ot.tweet_id IS NOT NULL
623 GROUP BY hour
624 ORDER BY avg_engagement DESC",
625 )
626 .bind(account_id)
627 .fetch_all(pool)
628 .await
629 .map_err(|e| StorageError::Query { source: e })?;
630
631 Ok(rows
632 .into_iter()
633 .map(|(hour, avg_engagement, post_count)| HourlyPerformance {
634 hour,
635 avg_engagement,
636 post_count,
637 })
638 .collect())
639}
640
641pub async fn get_optimal_posting_times(
643 pool: &DbPool,
644) -> Result<Vec<HourlyPerformance>, StorageError> {
645 get_optimal_posting_times_for(pool, DEFAULT_ACCOUNT_ID).await
646}
647
648pub async fn update_tweet_archetype(
654 pool: &DbPool,
655 tweet_id: &str,
656 archetype_vibe: &str,
657) -> Result<(), StorageError> {
658 sqlx::query("UPDATE tweet_performance SET archetype_vibe = ? WHERE tweet_id = ?")
659 .bind(archetype_vibe)
660 .bind(tweet_id)
661 .execute(pool)
662 .await
663 .map_err(|e| StorageError::Query { source: e })?;
664 Ok(())
665}
666
667pub async fn update_reply_archetype(
669 pool: &DbPool,
670 reply_id: &str,
671 archetype_vibe: &str,
672) -> Result<(), StorageError> {
673 sqlx::query("UPDATE reply_performance SET archetype_vibe = ? WHERE reply_id = ?")
674 .bind(archetype_vibe)
675 .bind(reply_id)
676 .execute(pool)
677 .await
678 .map_err(|e| StorageError::Query { source: e })?;
679 Ok(())
680}
681
682pub async fn update_tweet_engagement_score(
684 pool: &DbPool,
685 tweet_id: &str,
686 score: f64,
687) -> Result<(), StorageError> {
688 sqlx::query("UPDATE tweet_performance SET engagement_score = ? WHERE tweet_id = ?")
689 .bind(score)
690 .bind(tweet_id)
691 .execute(pool)
692 .await
693 .map_err(|e| StorageError::Query { source: e })?;
694 Ok(())
695}
696
697pub async fn update_reply_engagement_score(
699 pool: &DbPool,
700 reply_id: &str,
701 score: f64,
702) -> Result<(), StorageError> {
703 sqlx::query("UPDATE reply_performance SET engagement_score = ? WHERE reply_id = ?")
704 .bind(score)
705 .bind(reply_id)
706 .execute(pool)
707 .await
708 .map_err(|e| StorageError::Query { source: e })?;
709 Ok(())
710}
711
712pub async fn get_max_performance_score(pool: &DbPool) -> Result<f64, StorageError> {
716 let row: (f64,) = sqlx::query_as(
717 "SELECT COALESCE(MAX(max_score), 0.0) FROM (\
718 SELECT MAX(performance_score) as max_score FROM tweet_performance \
719 UNION ALL \
720 SELECT MAX(performance_score) as max_score FROM reply_performance\
721 )",
722 )
723 .fetch_one(pool)
724 .await
725 .map_err(|e| StorageError::Query { source: e })?;
726
727 Ok(row.0)
728}
729
730#[derive(Debug, Clone)]
732pub struct AncestorRow {
733 pub content_type: String,
735 pub id: String,
737 pub content_preview: String,
739 pub archetype_vibe: Option<String>,
741 pub engagement_score: Option<f64>,
743 pub performance_score: f64,
745 pub posted_at: String,
747}
748
749type AncestorQueryRow = (
751 String,
752 String,
753 String,
754 Option<String>,
755 Option<f64>,
756 f64,
757 String,
758);
759
760fn ancestor_row_from_tuple(r: AncestorQueryRow) -> AncestorRow {
762 AncestorRow {
763 content_type: r.0,
764 id: r.1,
765 content_preview: r.2,
766 archetype_vibe: r.3,
767 engagement_score: r.4,
768 performance_score: r.5,
769 posted_at: r.6,
770 }
771}
772
773pub async fn get_scored_ancestors(
779 pool: &DbPool,
780 topic_keywords: &[String],
781 min_score: f64,
782 limit: u32,
783) -> Result<Vec<AncestorRow>, StorageError> {
784 if topic_keywords.is_empty() {
785 let rows: Vec<AncestorQueryRow> = sqlx::query_as(
787 "SELECT 'tweet' as content_type, tp.tweet_id, \
788 SUBSTR(ot.content, 1, 120), \
789 tp.archetype_vibe, tp.engagement_score, tp.performance_score, \
790 ot.created_at \
791 FROM tweet_performance tp \
792 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
793 WHERE tp.engagement_score IS NOT NULL \
794 AND tp.engagement_score >= ? \
795 UNION ALL \
796 SELECT 'reply', rp.reply_id, SUBSTR(rs.reply_content, 1, 120), \
797 rp.archetype_vibe, rp.engagement_score, rp.performance_score, \
798 rs.created_at \
799 FROM reply_performance rp \
800 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
801 WHERE rp.engagement_score IS NOT NULL \
802 AND rp.engagement_score >= ? \
803 ORDER BY engagement_score DESC \
804 LIMIT ?",
805 )
806 .bind(min_score)
807 .bind(min_score)
808 .bind(limit)
809 .fetch_all(pool)
810 .await
811 .map_err(|e| StorageError::Query { source: e })?;
812
813 return Ok(rows.into_iter().map(ancestor_row_from_tuple).collect());
814 }
815
816 let topic_placeholders: String = (0..topic_keywords.len())
819 .map(|_| "?".to_string())
820 .collect::<Vec<_>>()
821 .join(", ");
822
823 let like_conditions: Vec<String> = (0..topic_keywords.len())
824 .map(|_| "rs.reply_content LIKE '%' || ? || '%'".to_string())
825 .collect();
826 let like_clause = like_conditions.join(" OR ");
827
828 let query_str = format!(
829 "SELECT 'tweet' as content_type, tp.tweet_id, \
830 SUBSTR(ot.content, 1, 120), \
831 tp.archetype_vibe, tp.engagement_score, tp.performance_score, \
832 ot.created_at \
833 FROM tweet_performance tp \
834 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
835 WHERE tp.engagement_score IS NOT NULL \
836 AND tp.engagement_score >= ? \
837 AND (ot.topic IN ({topic_placeholders})) \
838 UNION ALL \
839 SELECT 'reply', rp.reply_id, SUBSTR(rs.reply_content, 1, 120), \
840 rp.archetype_vibe, rp.engagement_score, rp.performance_score, \
841 rs.created_at \
842 FROM reply_performance rp \
843 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
844 WHERE rp.engagement_score IS NOT NULL \
845 AND rp.engagement_score >= ? \
846 AND ({like_clause}) \
847 ORDER BY engagement_score DESC \
848 LIMIT ?"
849 );
850
851 let mut query = sqlx::query_as::<_, AncestorQueryRow>(&query_str);
852
853 query = query.bind(min_score);
855 for kw in topic_keywords {
856 query = query.bind(kw);
857 }
858 query = query.bind(min_score);
860 for kw in topic_keywords {
861 query = query.bind(kw);
862 }
863 query = query.bind(limit);
864
865 let rows = query
866 .fetch_all(pool)
867 .await
868 .map_err(|e| StorageError::Query { source: e })?;
869
870 Ok(rows.into_iter().map(ancestor_row_from_tuple).collect())
871}
872
873#[cfg(test)]
874mod tests {
875 use super::*;
876 use crate::storage::init_test_db;
877
878 #[tokio::test]
879 async fn upsert_and_get_follower_snapshot() {
880 let pool = init_test_db().await.expect("init db");
881
882 upsert_follower_snapshot(&pool, 1000, 200, 500)
883 .await
884 .expect("upsert");
885
886 let snapshots = get_follower_snapshots(&pool, 10).await.expect("get");
887 assert_eq!(snapshots.len(), 1);
888 assert_eq!(snapshots[0].follower_count, 1000);
889 assert_eq!(snapshots[0].following_count, 200);
890 assert_eq!(snapshots[0].tweet_count, 500);
891 }
892
893 #[tokio::test]
894 async fn upsert_follower_snapshot_updates_existing() {
895 let pool = init_test_db().await.expect("init db");
896
897 upsert_follower_snapshot(&pool, 1000, 200, 500)
898 .await
899 .expect("upsert");
900 upsert_follower_snapshot(&pool, 1050, 201, 510)
901 .await
902 .expect("upsert again");
903
904 let snapshots = get_follower_snapshots(&pool, 10).await.expect("get");
905 assert_eq!(snapshots.len(), 1);
906 assert_eq!(snapshots[0].follower_count, 1050);
907 }
908
909 #[tokio::test]
910 async fn upsert_reply_performance_works() {
911 let pool = init_test_db().await.expect("init db");
912
913 upsert_reply_performance(&pool, "r1", 5, 2, 100, 55.0)
914 .await
915 .expect("upsert");
916
917 upsert_reply_performance(&pool, "r1", 10, 3, 200, 75.0)
919 .await
920 .expect("update");
921 }
922
923 #[tokio::test]
924 async fn upsert_tweet_performance_works() {
925 let pool = init_test_db().await.expect("init db");
926
927 upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
928 .await
929 .expect("upsert");
930
931 upsert_tweet_performance(&pool, "tw1", 20, 10, 5, 1000, 95.0)
933 .await
934 .expect("update");
935 }
936
937 #[tokio::test]
938 async fn update_and_get_content_scores() {
939 let pool = init_test_db().await.expect("init db");
940
941 update_content_score(&pool, "rust", "tip", 80.0)
942 .await
943 .expect("update");
944 update_content_score(&pool, "rust", "tip", 90.0)
945 .await
946 .expect("update");
947 update_content_score(&pool, "python", "list", 60.0)
948 .await
949 .expect("update");
950
951 let top = get_top_topics(&pool, 10).await.expect("get");
952 assert_eq!(top.len(), 2);
953 assert_eq!(top[0].topic, "rust");
955 assert_eq!(top[0].total_posts, 2);
956 assert!(top[0].avg_performance > 80.0);
957 }
958
959 #[test]
960 fn compute_performance_score_basic() {
961 let score = compute_performance_score(10, 5, 3, 1000);
962 assert!((score - 67.0).abs() < 0.01);
964 }
965
966 #[test]
967 fn compute_performance_score_zero_impressions() {
968 let score = compute_performance_score(10, 5, 3, 0);
969 assert!((score - 67000.0).abs() < 0.01);
971 }
972
973 #[test]
974 fn compute_performance_score_all_zero() {
975 let score = compute_performance_score(0, 0, 0, 0);
976 assert!((score - 0.0).abs() < 0.01);
977 }
978
979 #[tokio::test]
980 async fn avg_reply_engagement_empty() {
981 let pool = init_test_db().await.expect("init db");
982 let avg = get_avg_reply_engagement(&pool).await.expect("avg");
983 assert!((avg - 0.0).abs() < 0.01);
984 }
985
986 #[tokio::test]
987 async fn avg_reply_engagement_with_data() {
988 let pool = init_test_db().await.expect("init db");
989 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
990 .await
991 .expect("upsert");
992 upsert_reply_performance(&pool, "r2", 20, 10, 2000, 80.0)
993 .await
994 .expect("upsert");
995
996 let avg = get_avg_reply_engagement(&pool).await.expect("avg");
997 assert!((avg - 73.5).abs() < 0.01);
999 }
1000
1001 #[tokio::test]
1002 async fn avg_tweet_engagement_empty() {
1003 let pool = init_test_db().await.expect("init db");
1004 let avg = get_avg_tweet_engagement(&pool).await.expect("avg");
1005 assert!((avg - 0.0).abs() < 0.01);
1006 }
1007
1008 #[tokio::test]
1009 async fn performance_counts_empty() {
1010 let pool = init_test_db().await.expect("init db");
1011 let (replies, tweets) = get_performance_counts(&pool).await.expect("counts");
1012 assert_eq!(replies, 0);
1013 assert_eq!(tweets, 0);
1014 }
1015
1016 #[tokio::test]
1017 async fn performance_counts_with_data() {
1018 let pool = init_test_db().await.expect("init db");
1019 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
1020 .await
1021 .expect("upsert");
1022 upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1023 .await
1024 .expect("upsert");
1025 upsert_tweet_performance(&pool, "tw2", 20, 10, 5, 1000, 95.0)
1026 .await
1027 .expect("upsert");
1028
1029 let (replies, tweets) = get_performance_counts(&pool).await.expect("counts");
1030 assert_eq!(replies, 1);
1031 assert_eq!(tweets, 2);
1032 }
1033
1034 #[tokio::test]
1035 async fn analytics_summary_empty() {
1036 let pool = init_test_db().await.expect("init db");
1037 let summary = get_analytics_summary(&pool).await.expect("summary");
1038 assert_eq!(summary.followers.current, 0);
1039 assert_eq!(summary.followers.change_7d, 0);
1040 assert_eq!(summary.followers.change_30d, 0);
1041 assert_eq!(summary.actions_today.replies, 0);
1042 assert!((summary.engagement.avg_reply_score - 0.0).abs() < 0.01);
1043 assert!(summary.top_topics.is_empty());
1044 }
1045
1046 #[tokio::test]
1047 async fn analytics_summary_with_data() {
1048 let pool = init_test_db().await.expect("init db");
1049
1050 upsert_follower_snapshot(&pool, 1000, 200, 500)
1052 .await
1053 .expect("upsert");
1054
1055 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
1057 .await
1058 .expect("upsert");
1059
1060 update_content_score(&pool, "rust", "tip", 80.0)
1062 .await
1063 .expect("score");
1064 update_content_score(&pool, "ai", "thread", 60.0)
1065 .await
1066 .expect("score");
1067
1068 let summary = get_analytics_summary(&pool).await.expect("summary");
1069 assert_eq!(summary.followers.current, 1000);
1070 assert!(summary.engagement.avg_reply_score > 0.0);
1071 assert_eq!(summary.engagement.total_replies_sent, 1);
1072 assert_eq!(summary.top_topics.len(), 2);
1073 assert_eq!(summary.top_topics[0].topic, "rust");
1074 }
1075
1076 #[tokio::test]
1077 async fn recent_performance_items_empty() {
1078 let pool = init_test_db().await.expect("init db");
1079 let items = get_recent_performance_items(&pool, 10).await.expect("get");
1080 assert!(items.is_empty());
1081 }
1082
1083 #[tokio::test]
1084 async fn recent_performance_items_with_data() {
1085 let pool = init_test_db().await.expect("init db");
1086
1087 let reply = crate::storage::replies::ReplySent {
1089 id: 0,
1090 target_tweet_id: "t1".to_string(),
1091 reply_tweet_id: Some("r1".to_string()),
1092 reply_content: "Great point about testing!".to_string(),
1093 llm_provider: Some("openai".to_string()),
1094 llm_model: Some("gpt-4o".to_string()),
1095 created_at: "2026-02-23T12:00:00Z".to_string(),
1096 status: "sent".to_string(),
1097 error_message: None,
1098 };
1099 crate::storage::replies::insert_reply(&pool, &reply)
1100 .await
1101 .expect("insert reply");
1102 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
1103 .await
1104 .expect("upsert perf");
1105
1106 let items = get_recent_performance_items(&pool, 10).await.expect("get");
1107 assert_eq!(items.len(), 1);
1108 assert_eq!(items[0].content_type, "reply");
1109 assert!(items[0].content_preview.contains("testing"));
1110 assert_eq!(items[0].likes, 10);
1111 }
1112
1113 #[tokio::test]
1118 async fn update_and_get_tweet_archetype() {
1119 let pool = init_test_db().await.expect("init db");
1120
1121 upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1122 .await
1123 .expect("upsert");
1124
1125 update_tweet_archetype(&pool, "tw1", "list")
1126 .await
1127 .expect("update");
1128
1129 let row: (Option<String>,) =
1130 sqlx::query_as("SELECT archetype_vibe FROM tweet_performance WHERE tweet_id = ?")
1131 .bind("tw1")
1132 .fetch_one(&pool)
1133 .await
1134 .expect("query");
1135 assert_eq!(row.0.as_deref(), Some("list"));
1136 }
1137
1138 #[tokio::test]
1139 async fn update_and_get_reply_archetype() {
1140 let pool = init_test_db().await.expect("init db");
1141
1142 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
1143 .await
1144 .expect("upsert");
1145
1146 update_reply_archetype(&pool, "r1", "ask_question")
1147 .await
1148 .expect("update");
1149
1150 let row: (Option<String>,) =
1151 sqlx::query_as("SELECT archetype_vibe FROM reply_performance WHERE reply_id = ?")
1152 .bind("r1")
1153 .fetch_one(&pool)
1154 .await
1155 .expect("query");
1156 assert_eq!(row.0.as_deref(), Some("ask_question"));
1157 }
1158
1159 #[tokio::test]
1160 async fn update_and_get_engagement_score() {
1161 let pool = init_test_db().await.expect("init db");
1162
1163 upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1164 .await
1165 .expect("upsert");
1166
1167 update_tweet_engagement_score(&pool, "tw1", 0.85)
1168 .await
1169 .expect("update");
1170
1171 let row: (Option<f64>,) =
1172 sqlx::query_as("SELECT engagement_score FROM tweet_performance WHERE tweet_id = ?")
1173 .bind("tw1")
1174 .fetch_one(&pool)
1175 .await
1176 .expect("query");
1177 assert!((row.0.unwrap() - 0.85).abs() < 0.001);
1178 }
1179
1180 #[tokio::test]
1181 async fn get_max_performance_score_empty() {
1182 let pool = init_test_db().await.expect("init db");
1183 let max = get_max_performance_score(&pool).await.expect("max");
1184 assert!((max - 0.0).abs() < 0.01);
1185 }
1186
1187 #[tokio::test]
1188 async fn get_max_performance_score_with_data() {
1189 let pool = init_test_db().await.expect("init db");
1190
1191 upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1192 .await
1193 .expect("upsert");
1194 upsert_reply_performance(&pool, "r1", 20, 10, 2000, 95.0)
1195 .await
1196 .expect("upsert");
1197
1198 let max = get_max_performance_score(&pool).await.expect("max");
1199 assert!((max - 95.0).abs() < 0.01);
1200 }
1201
1202 #[tokio::test]
1203 async fn get_scored_ancestors_empty() {
1204 let pool = init_test_db().await.expect("init db");
1205 let ancestors = get_scored_ancestors(&pool, &[], 0.1, 10)
1206 .await
1207 .expect("query");
1208 assert!(ancestors.is_empty());
1209 }
1210
1211 #[tokio::test]
1212 async fn get_scored_ancestors_returns_scored_items() {
1213 let pool = init_test_db().await.expect("init db");
1214
1215 sqlx::query(
1217 "INSERT INTO original_tweets (account_id, tweet_id, content, topic, status, created_at) \
1218 VALUES ('00000000-0000-0000-0000-000000000000', 'tw1', 'Great Rust testing tips', 'rust', 'sent', '2026-02-27T10:00:00Z')",
1219 )
1220 .execute(&pool)
1221 .await
1222 .expect("insert tweet");
1223
1224 upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1225 .await
1226 .expect("upsert perf");
1227 update_tweet_engagement_score(&pool, "tw1", 0.85)
1228 .await
1229 .expect("update score");
1230
1231 let ancestors = get_scored_ancestors(&pool, &[], 0.1, 10)
1232 .await
1233 .expect("query");
1234 assert_eq!(ancestors.len(), 1);
1235 assert_eq!(ancestors[0].content_type, "tweet");
1236 assert_eq!(ancestors[0].id, "tw1");
1237 assert!((ancestors[0].engagement_score.unwrap() - 0.85).abs() < 0.001);
1238 }
1239
1240 #[tokio::test]
1241 async fn get_scored_ancestors_filters_low_engagement() {
1242 let pool = init_test_db().await.expect("init db");
1243
1244 sqlx::query(
1245 "INSERT INTO original_tweets (account_id, tweet_id, content, topic, status, created_at) \
1246 VALUES ('00000000-0000-0000-0000-000000000000', 'tw1', 'Low performer', 'rust', 'sent', '2026-02-27T10:00:00Z')",
1247 )
1248 .execute(&pool)
1249 .await
1250 .expect("insert tweet");
1251
1252 upsert_tweet_performance(&pool, "tw1", 1, 0, 0, 500, 5.0)
1253 .await
1254 .expect("upsert perf");
1255 update_tweet_engagement_score(&pool, "tw1", 0.05)
1256 .await
1257 .expect("update score");
1258
1259 let ancestors = get_scored_ancestors(&pool, &[], 0.1, 10)
1261 .await
1262 .expect("query");
1263 assert!(ancestors.is_empty());
1264 }
1265}