// tuitbot_core/storage/analytics.rs

1//! CRUD operations for analytics tables.
2//!
3//! Manages follower snapshots, reply/tweet performance metrics,
4//! and content score running averages.
5
6use super::accounts::DEFAULT_ACCOUNT_ID;
7use super::DbPool;
8use crate::error::StorageError;
9use chrono::{NaiveDate, Utc};
10
11// ============================================================================
12// Follower snapshots
13// ============================================================================
14
/// A daily follower snapshot.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FollowerSnapshot {
    /// Snapshot date as written by SQLite `date('now')` (`YYYY-MM-DD`).
    pub snapshot_date: String,
    /// Total followers on that date.
    pub follower_count: i64,
    /// Total accounts being followed on that date.
    pub following_count: i64,
    /// Total tweets on the account on that date.
    pub tweet_count: i64,
}
23
/// Upsert a follower snapshot for today for a specific account.
///
/// Inserts a row dated with SQLite's `date('now')`; if today's row already
/// exists, the counts are overwritten with the new values.
///
/// NOTE(review): the conflict target is `snapshot_date` alone and the
/// update clause rewrites `account_id`, so two different accounts taking a
/// snapshot on the same day would clobber each other's row. Confirm whether
/// the `follower_snapshots` unique index is on `snapshot_date` only or on
/// `(account_id, snapshot_date)` — if the latter, the conflict target here
/// should include `account_id`.
pub async fn upsert_follower_snapshot_for(
    pool: &DbPool,
    account_id: &str,
    follower_count: i64,
    following_count: i64,
    tweet_count: i64,
) -> Result<(), StorageError> {
    sqlx::query(
        "INSERT INTO follower_snapshots (account_id, snapshot_date, follower_count, following_count, tweet_count) \
         VALUES (?, date('now'), ?, ?, ?) \
         ON CONFLICT(snapshot_date) DO UPDATE SET \
         account_id = excluded.account_id, \
         follower_count = excluded.follower_count, \
         following_count = excluded.following_count, \
         tweet_count = excluded.tweet_count",
    )
    .bind(account_id)
    .bind(follower_count)
    .bind(following_count)
    .bind(tweet_count)
    .execute(pool)
    .await
    .map_err(|e| StorageError::Query { source: e })?;
    Ok(())
}
50
51/// Upsert a follower snapshot for today.
52pub async fn upsert_follower_snapshot(
53    pool: &DbPool,
54    follower_count: i64,
55    following_count: i64,
56    tweet_count: i64,
57) -> Result<(), StorageError> {
58    upsert_follower_snapshot_for(
59        pool,
60        DEFAULT_ACCOUNT_ID,
61        follower_count,
62        following_count,
63        tweet_count,
64    )
65    .await
66}
67
68/// Get the most recent N follower snapshots for a specific account, newest first.
69pub async fn get_follower_snapshots_for(
70    pool: &DbPool,
71    account_id: &str,
72    limit: u32,
73) -> Result<Vec<FollowerSnapshot>, StorageError> {
74    let rows: Vec<(String, i64, i64, i64)> = sqlx::query_as(
75        "SELECT snapshot_date, follower_count, following_count, tweet_count \
76         FROM follower_snapshots WHERE account_id = ? ORDER BY snapshot_date DESC LIMIT ?",
77    )
78    .bind(account_id)
79    .bind(limit)
80    .fetch_all(pool)
81    .await
82    .map_err(|e| StorageError::Query { source: e })?;
83
84    Ok(rows
85        .into_iter()
86        .map(|r| FollowerSnapshot {
87            snapshot_date: r.0,
88            follower_count: r.1,
89            following_count: r.2,
90            tweet_count: r.3,
91        })
92        .collect())
93}
94
95/// Get the most recent N follower snapshots, newest first.
96pub async fn get_follower_snapshots(
97    pool: &DbPool,
98    limit: u32,
99) -> Result<Vec<FollowerSnapshot>, StorageError> {
100    get_follower_snapshots_for(pool, DEFAULT_ACCOUNT_ID, limit).await
101}
102
103// ============================================================================
104// Reply performance
105// ============================================================================
106
107/// Store or update reply performance metrics for a specific account.
108pub async fn upsert_reply_performance_for(
109    pool: &DbPool,
110    account_id: &str,
111    reply_id: &str,
112    likes: i64,
113    replies: i64,
114    impressions: i64,
115    score: f64,
116) -> Result<(), StorageError> {
117    sqlx::query(
118        "INSERT INTO reply_performance (account_id, reply_id, likes_received, replies_received, impressions, performance_score) \
119         VALUES (?, ?, ?, ?, ?, ?) \
120         ON CONFLICT(reply_id) DO UPDATE SET \
121         likes_received = excluded.likes_received, \
122         replies_received = excluded.replies_received, \
123         impressions = excluded.impressions, \
124         performance_score = excluded.performance_score, \
125         measured_at = datetime('now')",
126    )
127    .bind(account_id)
128    .bind(reply_id)
129    .bind(likes)
130    .bind(replies)
131    .bind(impressions)
132    .bind(score)
133    .execute(pool)
134    .await
135    .map_err(|e| StorageError::Query { source: e })?;
136    Ok(())
137}
138
139/// Store or update reply performance metrics.
140pub async fn upsert_reply_performance(
141    pool: &DbPool,
142    reply_id: &str,
143    likes: i64,
144    replies: i64,
145    impressions: i64,
146    score: f64,
147) -> Result<(), StorageError> {
148    upsert_reply_performance_for(
149        pool,
150        DEFAULT_ACCOUNT_ID,
151        reply_id,
152        likes,
153        replies,
154        impressions,
155        score,
156    )
157    .await
158}
159
160// ============================================================================
161// Tweet performance
162// ============================================================================
163
/// Store or update tweet performance metrics for a specific account.
///
/// Inserts a row keyed by `tweet_id`; on conflict the engagement columns
/// and the `measured_at` timestamp are refreshed, while `account_id` is
/// left as originally inserted (it is absent from the update clause).
#[allow(clippy::too_many_arguments)]
pub async fn upsert_tweet_performance_for(
    pool: &DbPool,
    account_id: &str,
    tweet_id: &str,
    likes: i64,
    retweets: i64,
    replies: i64,
    impressions: i64,
    score: f64,
) -> Result<(), StorageError> {
    sqlx::query(
        "INSERT INTO tweet_performance (account_id, tweet_id, likes_received, retweets_received, replies_received, impressions, performance_score) \
         VALUES (?, ?, ?, ?, ?, ?, ?) \
         ON CONFLICT(tweet_id) DO UPDATE SET \
         likes_received = excluded.likes_received, \
         retweets_received = excluded.retweets_received, \
         replies_received = excluded.replies_received, \
         impressions = excluded.impressions, \
         performance_score = excluded.performance_score, \
         measured_at = datetime('now')",
    )
    .bind(account_id)
    .bind(tweet_id)
    .bind(likes)
    .bind(retweets)
    .bind(replies)
    .bind(impressions)
    .bind(score)
    .execute(pool)
    .await
    .map_err(|e| StorageError::Query { source: e })?;
    Ok(())
}
199
200/// Store or update tweet performance metrics.
201pub async fn upsert_tweet_performance(
202    pool: &DbPool,
203    tweet_id: &str,
204    likes: i64,
205    retweets: i64,
206    replies: i64,
207    impressions: i64,
208    score: f64,
209) -> Result<(), StorageError> {
210    upsert_tweet_performance_for(
211        pool,
212        DEFAULT_ACCOUNT_ID,
213        tweet_id,
214        likes,
215        retweets,
216        replies,
217        impressions,
218        score,
219    )
220    .await
221}
222
223// ============================================================================
224// Content scores
225// ============================================================================
226
/// A topic/format performance score.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ContentScore {
    /// Content topic label.
    pub topic: String,
    /// Content format label.
    pub format: String,
    /// Number of posts folded into the running average.
    pub total_posts: i64,
    /// Running (incremental) mean performance score.
    pub avg_performance: f64,
}
235
/// Update the running average for a topic/format pair for a specific account.
///
/// Uses incremental mean: new_avg = old_avg + (score - old_avg) / new_count.
///
/// `new_score` is bound twice because the SQL references it through two
/// separate `?` placeholders (once in VALUES, once in the update expression).
///
/// NOTE(review): the conflict target is `(topic, format)` without
/// `account_id`, and the update clause rewrites `account_id` — so scores for
/// the same topic/format pair would be shared (and reassigned) across
/// accounts. Confirm against the `content_scores` unique index.
pub async fn update_content_score_for(
    pool: &DbPool,
    account_id: &str,
    topic: &str,
    format: &str,
    new_score: f64,
) -> Result<(), StorageError> {
    // Insert or update with incremental average
    sqlx::query(
        "INSERT INTO content_scores (account_id, topic, format, total_posts, avg_performance) \
         VALUES (?, ?, ?, 1, ?) \
         ON CONFLICT(topic, format) DO UPDATE SET \
         account_id = excluded.account_id, \
         total_posts = content_scores.total_posts + 1, \
         avg_performance = content_scores.avg_performance + \
         (? - content_scores.avg_performance) / (content_scores.total_posts + 1)",
    )
    .bind(account_id)
    .bind(topic)
    .bind(format)
    .bind(new_score)
    .bind(new_score)
    .execute(pool)
    .await
    .map_err(|e| StorageError::Query { source: e })?;
    Ok(())
}
266
267/// Update the running average for a topic/format pair.
268///
269/// Uses incremental mean: new_avg = old_avg + (score - old_avg) / new_count.
270pub async fn update_content_score(
271    pool: &DbPool,
272    topic: &str,
273    format: &str,
274    new_score: f64,
275) -> Result<(), StorageError> {
276    update_content_score_for(pool, DEFAULT_ACCOUNT_ID, topic, format, new_score).await
277}
278
279/// Get top-performing topics for a specific account ordered by average performance descending.
280pub async fn get_top_topics_for(
281    pool: &DbPool,
282    account_id: &str,
283    limit: u32,
284) -> Result<Vec<ContentScore>, StorageError> {
285    let rows: Vec<(String, String, i64, f64)> = sqlx::query_as(
286        "SELECT topic, format, total_posts, avg_performance \
287         FROM content_scores \
288         WHERE account_id = ? \
289         ORDER BY avg_performance DESC \
290         LIMIT ?",
291    )
292    .bind(account_id)
293    .bind(limit)
294    .fetch_all(pool)
295    .await
296    .map_err(|e| StorageError::Query { source: e })?;
297
298    Ok(rows
299        .into_iter()
300        .map(|r| ContentScore {
301            topic: r.0,
302            format: r.1,
303            total_posts: r.2,
304            avg_performance: r.3,
305        })
306        .collect())
307}
308
309/// Get top-performing topics ordered by average performance descending.
310pub async fn get_top_topics(pool: &DbPool, limit: u32) -> Result<Vec<ContentScore>, StorageError> {
311    get_top_topics_for(pool, DEFAULT_ACCOUNT_ID, limit).await
312}
313
314/// Get average reply engagement rate for a specific account (avg performance_score across all measured replies).
315pub async fn get_avg_reply_engagement_for(
316    pool: &DbPool,
317    account_id: &str,
318) -> Result<f64, StorageError> {
319    let row: (f64,) = sqlx::query_as(
320        "SELECT COALESCE(AVG(performance_score), 0.0) FROM reply_performance WHERE account_id = ?",
321    )
322    .bind(account_id)
323    .fetch_one(pool)
324    .await
325    .map_err(|e| StorageError::Query { source: e })?;
326
327    Ok(row.0)
328}
329
330/// Get average reply engagement rate (avg performance_score across all measured replies).
331pub async fn get_avg_reply_engagement(pool: &DbPool) -> Result<f64, StorageError> {
332    get_avg_reply_engagement_for(pool, DEFAULT_ACCOUNT_ID).await
333}
334
335/// Get average tweet engagement rate for a specific account (avg performance_score across all measured tweets).
336pub async fn get_avg_tweet_engagement_for(
337    pool: &DbPool,
338    account_id: &str,
339) -> Result<f64, StorageError> {
340    let row: (f64,) = sqlx::query_as(
341        "SELECT COALESCE(AVG(performance_score), 0.0) FROM tweet_performance WHERE account_id = ?",
342    )
343    .bind(account_id)
344    .fetch_one(pool)
345    .await
346    .map_err(|e| StorageError::Query { source: e })?;
347
348    Ok(row.0)
349}
350
351/// Get average tweet engagement rate (avg performance_score across all measured tweets).
352pub async fn get_avg_tweet_engagement(pool: &DbPool) -> Result<f64, StorageError> {
353    get_avg_tweet_engagement_for(pool, DEFAULT_ACCOUNT_ID).await
354}
355
356/// Get total count of measured replies and tweets for a specific account.
357pub async fn get_performance_counts_for(
358    pool: &DbPool,
359    account_id: &str,
360) -> Result<(i64, i64), StorageError> {
361    let reply_count: (i64,) =
362        sqlx::query_as("SELECT COUNT(*) FROM reply_performance WHERE account_id = ?")
363            .bind(account_id)
364            .fetch_one(pool)
365            .await
366            .map_err(|e| StorageError::Query { source: e })?;
367
368    let tweet_count: (i64,) =
369        sqlx::query_as("SELECT COUNT(*) FROM tweet_performance WHERE account_id = ?")
370            .bind(account_id)
371            .fetch_one(pool)
372            .await
373            .map_err(|e| StorageError::Query { source: e })?;
374
375    Ok((reply_count.0, tweet_count.0))
376}
377
378/// Get total count of measured replies and tweets.
379pub async fn get_performance_counts(pool: &DbPool) -> Result<(i64, i64), StorageError> {
380    get_performance_counts_for(pool, DEFAULT_ACCOUNT_ID).await
381}
382
/// Compute the performance score for a piece of content.
///
/// Formula: `(likes * 3 + replies * 5 + retweets * 4) / max(impressions, 1) * 1000`
///
/// Impressions are clamped to at least 1 to avoid division by zero for
/// unmeasured content.
pub fn compute_performance_score(likes: i64, replies: i64, retweets: i64, impressions: i64) -> f64 {
    let weighted = 3 * likes + 5 * replies + 4 * retweets;
    let views = if impressions < 1 { 1 } else { impressions };
    weighted as f64 / views as f64 * 1000.0
}
391
392// ============================================================================
393// Analytics summary (aggregated dashboard data)
394// ============================================================================
395
/// Follower growth metrics.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FollowerSummary {
    /// Follower count from the most recent snapshot (0 if none exist).
    pub current: i64,
    /// Net change vs. the newest snapshot at least 7 days old (0 with <2 snapshots).
    pub change_7d: i64,
    /// Net change vs. the newest snapshot at least 30 days old (0 with <2 snapshots).
    pub change_30d: i64,
}
403
/// Today's action breakdown.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ActionsSummary {
    /// "reply" actions logged since midnight UTC.
    pub replies: i64,
    /// "tweet" actions logged since midnight UTC.
    pub tweets: i64,
    /// "thread" actions logged since midnight UTC.
    pub threads: i64,
}
411
/// Engagement overview.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EngagementSummary {
    /// Average performance_score over measured replies (0.0 when none).
    pub avg_reply_score: f64,
    /// Average performance_score over measured tweets (0.0 when none).
    pub avg_tweet_score: f64,
    /// Count of rows in reply_performance for the account.
    pub total_replies_sent: i64,
    /// Count of rows in tweet_performance for the account.
    pub total_tweets_posted: i64,
}
420
/// Combined analytics summary for the dashboard.
#[derive(Debug, Clone, serde::Serialize)]
pub struct AnalyticsSummary {
    /// Follower growth deltas.
    pub followers: FollowerSummary,
    /// Today's action counts from the action log.
    pub actions_today: ActionsSummary,
    /// Engagement averages and totals.
    pub engagement: EngagementSummary,
    /// Up to 5 best topic/format pairs by average performance.
    pub top_topics: Vec<ContentScore>,
}
429
/// Get a combined analytics summary for the dashboard for a specific account.
///
/// Aggregates follower deltas, today's action counts, and engagement stats
/// into a single struct to minimise round-trips from the frontend.
pub async fn get_analytics_summary_for(
    pool: &DbPool,
    account_id: &str,
) -> Result<AnalyticsSummary, StorageError> {
    // --- Follower data ---
    // 90 days of history is enough to compute both the 7- and 30-day deltas.
    let snapshots = get_follower_snapshots_for(pool, account_id, 90).await?;
    let current = snapshots.first().map_or(0, |s| s.follower_count);

    // Find the first snapshot whose date is at least N days ago (handles gaps from
    // downtime or weekends).  Snapshots are ordered newest-first.
    let today = Utc::now().date_naive();
    let follower_at_or_before = |days: i64| -> i64 {
        snapshots
            .iter()
            .find(|s| {
                // Unparseable dates are skipped rather than treated as matches.
                NaiveDate::parse_from_str(&s.snapshot_date, "%Y-%m-%d")
                    .map(|d| (today - d).num_days() >= days)
                    .unwrap_or(false)
            })
            // No snapshot that old yet: fall back to `current` so the delta is 0.
            .map_or(current, |s| s.follower_count)
    };

    // With fewer than two snapshots there is no meaningful delta to report.
    let change_7d = if snapshots.len() >= 2 {
        current - follower_at_or_before(7)
    } else {
        0
    };
    let change_30d = if snapshots.len() >= 2 {
        current - follower_at_or_before(30)
    } else {
        0
    };

    // --- Today's actions (from action_log) ---
    // Shadows the NaiveDate `today` above with the midnight-UTC timestamp
    // string expected by get_action_counts_since.
    let today = Utc::now().format("%Y-%m-%dT00:00:00Z").to_string();
    let counts = super::action_log::get_action_counts_since(pool, &today).await?;
    let actions_today = ActionsSummary {
        replies: *counts.get("reply").unwrap_or(&0),
        tweets: *counts.get("tweet").unwrap_or(&0),
        threads: *counts.get("thread").unwrap_or(&0),
    };

    // --- Engagement ---
    let avg_reply_score = get_avg_reply_engagement_for(pool, account_id).await?;
    let avg_tweet_score = get_avg_tweet_engagement_for(pool, account_id).await?;
    let (total_replies_sent, total_tweets_posted) =
        get_performance_counts_for(pool, account_id).await?;

    // --- Top topics ---
    let top_topics = get_top_topics_for(pool, account_id, 5).await?;

    Ok(AnalyticsSummary {
        followers: FollowerSummary {
            current,
            change_7d,
            change_30d,
        },
        actions_today,
        engagement: EngagementSummary {
            avg_reply_score,
            avg_tweet_score,
            total_replies_sent,
            total_tweets_posted,
        },
        top_topics,
    })
}
501
502/// Get a combined analytics summary for the dashboard.
503///
504/// Aggregates follower deltas, today's action counts, and engagement stats
505/// into a single struct to minimise round-trips from the frontend.
506pub async fn get_analytics_summary(pool: &DbPool) -> Result<AnalyticsSummary, StorageError> {
507    get_analytics_summary_for(pool, DEFAULT_ACCOUNT_ID).await
508}
509
510// ============================================================================
511// Recent performance (joined with content for preview)
512// ============================================================================
513
/// A recent content item with performance metrics.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PerformanceItem {
    /// "reply", "tweet", or "thread"
    pub content_type: String,
    /// Truncated content preview (first 120 characters).
    pub content_preview: String,
    /// Likes received
    pub likes: i64,
    /// Replies received
    pub replies_received: i64,
    /// Retweets (0 for replies)
    pub retweets: i64,
    /// Impressions
    pub impressions: i64,
    /// Computed performance score
    pub performance_score: f64,
    /// When the content was posted (ISO-8601)
    pub posted_at: String,
}
534
/// Row type returned by the recent-performance UNION query.
///
/// Column order: (content_type, content_preview, likes, replies_received,
/// retweets, impressions, performance_score, posted_at).
type PerformanceRow = (String, String, i64, i64, i64, i64, f64, String);
537
538/// Get recent content performance items for a specific account, newest first.
539///
540/// Unions reply and tweet performance joined with their content tables
541/// so the dashboard can show a content preview alongside metrics.
542pub async fn get_recent_performance_items_for(
543    pool: &DbPool,
544    account_id: &str,
545    limit: u32,
546) -> Result<Vec<PerformanceItem>, StorageError> {
547    let rows: Vec<PerformanceRow> = sqlx::query_as(
548        "SELECT 'reply' as content_type, \
549                SUBSTR(rs.reply_content, 1, 120) as content_preview, \
550                rp.likes_received, rp.replies_received, 0 as retweets, \
551                rp.impressions, rp.performance_score, rs.created_at as posted_at \
552         FROM reply_performance rp \
553         JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
554         WHERE rp.account_id = ? \
555         UNION ALL \
556         SELECT 'tweet' as content_type, \
557                SUBSTR(ot.content, 1, 120) as content_preview, \
558                tp.likes_received, tp.replies_received, tp.retweets_received, \
559                tp.impressions, tp.performance_score, ot.created_at as posted_at \
560         FROM tweet_performance tp \
561         JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
562         WHERE tp.account_id = ? \
563         ORDER BY posted_at DESC \
564         LIMIT ?",
565    )
566    .bind(account_id)
567    .bind(account_id)
568    .bind(limit)
569    .fetch_all(pool)
570    .await
571    .map_err(|e| StorageError::Query { source: e })?;
572
573    Ok(rows
574        .into_iter()
575        .map(|r| PerformanceItem {
576            content_type: r.0,
577            content_preview: r.1,
578            likes: r.2,
579            replies_received: r.3,
580            retweets: r.4,
581            impressions: r.5,
582            performance_score: r.6,
583            posted_at: r.7,
584        })
585        .collect())
586}
587
588/// Get recent content performance items, newest first.
589///
590/// Unions reply and tweet performance joined with their content tables
591/// so the dashboard can show a content preview alongside metrics.
592pub async fn get_recent_performance_items(
593    pool: &DbPool,
594    limit: u32,
595) -> Result<Vec<PerformanceItem>, StorageError> {
596    get_recent_performance_items_for(pool, DEFAULT_ACCOUNT_ID, limit).await
597}
598
/// Hourly posting performance data.
///
/// One row per distinct posting hour, aggregated from sent original tweets
/// joined with their measured performance.
#[derive(Debug, Clone, serde::Serialize)]
pub struct HourlyPerformance {
    /// Hour of day (0-23), taken from the tweet's `created_at` timestamp.
    pub hour: i64,
    /// Average engagement score for posts in this hour.
    pub avg_engagement: f64,
    /// Number of posts in this hour.
    pub post_count: i64,
}
609
610/// Get optimal posting times based on historical performance for a specific account.
611pub async fn get_optimal_posting_times_for(
612    pool: &DbPool,
613    account_id: &str,
614) -> Result<Vec<HourlyPerformance>, StorageError> {
615    let rows: Vec<(i64, f64, i64)> = sqlx::query_as(
616        "SELECT
617            CAST(strftime('%H', ot.created_at) AS INTEGER) as hour,
618            COALESCE(AVG(tp.performance_score), 0.0) as avg_engagement,
619            COUNT(*) as post_count
620         FROM original_tweets ot
621         LEFT JOIN tweet_performance tp ON tp.tweet_id = ot.tweet_id
622         WHERE ot.account_id = ? AND ot.status = 'sent' AND ot.tweet_id IS NOT NULL
623         GROUP BY hour
624         ORDER BY avg_engagement DESC",
625    )
626    .bind(account_id)
627    .fetch_all(pool)
628    .await
629    .map_err(|e| StorageError::Query { source: e })?;
630
631    Ok(rows
632        .into_iter()
633        .map(|(hour, avg_engagement, post_count)| HourlyPerformance {
634            hour,
635            avg_engagement,
636            post_count,
637        })
638        .collect())
639}
640
641/// Get optimal posting times based on historical performance.
642pub async fn get_optimal_posting_times(
643    pool: &DbPool,
644) -> Result<Vec<HourlyPerformance>, StorageError> {
645    get_optimal_posting_times_for(pool, DEFAULT_ACCOUNT_ID).await
646}
647
648// ============================================================================
649// Archetype & engagement score updates (Winning DNA pipeline)
650// ============================================================================
651
652/// Update the archetype_vibe classification for a tweet performance record.
653pub async fn update_tweet_archetype(
654    pool: &DbPool,
655    tweet_id: &str,
656    archetype_vibe: &str,
657) -> Result<(), StorageError> {
658    sqlx::query("UPDATE tweet_performance SET archetype_vibe = ? WHERE tweet_id = ?")
659        .bind(archetype_vibe)
660        .bind(tweet_id)
661        .execute(pool)
662        .await
663        .map_err(|e| StorageError::Query { source: e })?;
664    Ok(())
665}
666
667/// Update the archetype_vibe classification for a reply performance record.
668pub async fn update_reply_archetype(
669    pool: &DbPool,
670    reply_id: &str,
671    archetype_vibe: &str,
672) -> Result<(), StorageError> {
673    sqlx::query("UPDATE reply_performance SET archetype_vibe = ? WHERE reply_id = ?")
674        .bind(archetype_vibe)
675        .bind(reply_id)
676        .execute(pool)
677        .await
678        .map_err(|e| StorageError::Query { source: e })?;
679    Ok(())
680}
681
682/// Update the engagement_score for a tweet performance record.
683pub async fn update_tweet_engagement_score(
684    pool: &DbPool,
685    tweet_id: &str,
686    score: f64,
687) -> Result<(), StorageError> {
688    sqlx::query("UPDATE tweet_performance SET engagement_score = ? WHERE tweet_id = ?")
689        .bind(score)
690        .bind(tweet_id)
691        .execute(pool)
692        .await
693        .map_err(|e| StorageError::Query { source: e })?;
694    Ok(())
695}
696
697/// Update the engagement_score for a reply performance record.
698pub async fn update_reply_engagement_score(
699    pool: &DbPool,
700    reply_id: &str,
701    score: f64,
702) -> Result<(), StorageError> {
703    sqlx::query("UPDATE reply_performance SET engagement_score = ? WHERE reply_id = ?")
704        .bind(score)
705        .bind(reply_id)
706        .execute(pool)
707        .await
708        .map_err(|e| StorageError::Query { source: e })?;
709    Ok(())
710}
711
712/// Get the maximum performance_score across all tweets and replies.
713///
714/// Returns 0.0 if no performance data exists. Used to normalize engagement scores.
715pub async fn get_max_performance_score(pool: &DbPool) -> Result<f64, StorageError> {
716    let row: (f64,) = sqlx::query_as(
717        "SELECT COALESCE(MAX(max_score), 0.0) FROM (\
718             SELECT MAX(performance_score) as max_score FROM tweet_performance \
719             UNION ALL \
720             SELECT MAX(performance_score) as max_score FROM reply_performance\
721         )",
722    )
723    .fetch_one(pool)
724    .await
725    .map_err(|e| StorageError::Query { source: e })?;
726
727    Ok(row.0)
728}
729
/// Row type returned by the ancestor retrieval query.
#[derive(Debug, Clone)]
pub struct AncestorRow {
    /// "tweet" or "reply"
    pub content_type: String,
    /// The tweet or reply ID.
    pub id: String,
    /// Truncated content preview (up to 120 chars).
    pub content_preview: String,
    /// Archetype classification (may be None if not yet classified).
    pub archetype_vibe: Option<String>,
    /// Normalized engagement score (0.0-1.0).
    pub engagement_score: Option<f64>,
    /// Raw performance score.
    pub performance_score: f64,
    /// When the content was posted (ISO-8601).
    pub posted_at: String,
}
748
/// Row type returned by the ancestor retrieval UNION query.
///
/// Column order: (content_type, id, content_preview, archetype_vibe,
/// engagement_score, performance_score, posted_at) — must match the
/// SELECT lists in `get_scored_ancestors`.
type AncestorQueryRow = (
    String,
    String,
    String,
    Option<String>,
    Option<f64>,
    f64,
    String,
);
759
760/// Convert an ancestor query row tuple into an `AncestorRow` struct.
761fn ancestor_row_from_tuple(r: AncestorQueryRow) -> AncestorRow {
762    AncestorRow {
763        content_type: r.0,
764        id: r.1,
765        content_preview: r.2,
766        archetype_vibe: r.3,
767        engagement_score: r.4,
768        performance_score: r.5,
769        posted_at: r.6,
770    }
771}
772
/// Query scored ancestors with engagement_score populated.
///
/// Returns ancestors where `engagement_score >= min_score`, ordered by
/// engagement_score DESC. For topic matching, uses the `topic` column on
/// original_tweets and LIKE-based content matching on replies.
pub async fn get_scored_ancestors(
    pool: &DbPool,
    topic_keywords: &[String],
    min_score: f64,
    limit: u32,
) -> Result<Vec<AncestorRow>, StorageError> {
    if topic_keywords.is_empty() {
        // No keywords: return top ancestors regardless of topic
        let rows: Vec<AncestorQueryRow> = sqlx::query_as(
            "SELECT 'tweet' as content_type, tp.tweet_id, \
                        SUBSTR(ot.content, 1, 120), \
                        tp.archetype_vibe, tp.engagement_score, tp.performance_score, \
                        ot.created_at \
                 FROM tweet_performance tp \
                 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
                 WHERE tp.engagement_score IS NOT NULL \
                   AND tp.engagement_score >= ? \
                 UNION ALL \
                 SELECT 'reply', rp.reply_id, SUBSTR(rs.reply_content, 1, 120), \
                        rp.archetype_vibe, rp.engagement_score, rp.performance_score, \
                        rs.created_at \
                 FROM reply_performance rp \
                 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
                 WHERE rp.engagement_score IS NOT NULL \
                   AND rp.engagement_score >= ? \
                 ORDER BY engagement_score DESC \
                 LIMIT ?",
        )
        .bind(min_score)
        .bind(min_score)
        .bind(limit)
        .fetch_all(pool)
        .await
        .map_err(|e| StorageError::Query { source: e })?;

        return Ok(rows.into_iter().map(ancestor_row_from_tuple).collect());
    }

    // Build parameterized IN clause for tweet topics and LIKE clauses for replies.
    // SQLx uses sequential `?` placeholders for SQLite.
    let topic_placeholders: String = (0..topic_keywords.len())
        .map(|_| "?".to_string())
        .collect::<Vec<_>>()
        .join(", ");

    // One `LIKE '%' || ? || '%'` test per keyword, OR-ed together; the `||`
    // concatenation keeps each keyword a bound parameter (no SQL injection).
    let like_conditions: Vec<String> = (0..topic_keywords.len())
        .map(|_| "rs.reply_content LIKE '%' || ? || '%'".to_string())
        .collect();
    let like_clause = like_conditions.join(" OR ");

    let query_str = format!(
        "SELECT 'tweet' as content_type, tp.tweet_id, \
                SUBSTR(ot.content, 1, 120), \
                tp.archetype_vibe, tp.engagement_score, tp.performance_score, \
                ot.created_at \
         FROM tweet_performance tp \
         JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
         WHERE tp.engagement_score IS NOT NULL \
           AND tp.engagement_score >= ? \
           AND (ot.topic IN ({topic_placeholders})) \
         UNION ALL \
         SELECT 'reply', rp.reply_id, SUBSTR(rs.reply_content, 1, 120), \
                rp.archetype_vibe, rp.engagement_score, rp.performance_score, \
                rs.created_at \
         FROM reply_performance rp \
         JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
         WHERE rp.engagement_score IS NOT NULL \
           AND rp.engagement_score >= ? \
           AND ({like_clause}) \
         ORDER BY engagement_score DESC \
         LIMIT ?"
    );

    let mut query = sqlx::query_as::<_, AncestorQueryRow>(&query_str);

    // Bind order below must mirror the placeholder order in `query_str`.
    // Bind: min_score for tweets, then topic keywords for IN clause
    query = query.bind(min_score);
    for kw in topic_keywords {
        query = query.bind(kw);
    }
    // Bind: min_score for replies, then keywords for LIKE clauses, then limit
    query = query.bind(min_score);
    for kw in topic_keywords {
        query = query.bind(kw);
    }
    query = query.bind(limit);

    let rows = query
        .fetch_all(pool)
        .await
        .map_err(|e| StorageError::Query { source: e })?;

    Ok(rows.into_iter().map(ancestor_row_from_tuple).collect())
}
872
873#[cfg(test)]
874mod tests {
875    use super::*;
876    use crate::storage::init_test_db;
877
    #[tokio::test]
    async fn upsert_and_get_follower_snapshot() {
        let pool = init_test_db().await.expect("init db");

        upsert_follower_snapshot(&pool, 1000, 200, 500)
            .await
            .expect("upsert");

        // A single upsert should produce exactly one row with the same counts.
        let snapshots = get_follower_snapshots(&pool, 10).await.expect("get");
        assert_eq!(snapshots.len(), 1);
        assert_eq!(snapshots[0].follower_count, 1000);
        assert_eq!(snapshots[0].following_count, 200);
        assert_eq!(snapshots[0].tweet_count, 500);
    }
892
    #[tokio::test]
    async fn upsert_follower_snapshot_updates_existing() {
        let pool = init_test_db().await.expect("init db");

        upsert_follower_snapshot(&pool, 1000, 200, 500)
            .await
            .expect("upsert");
        // Same snapshot date, so this must hit the ON CONFLICT update path
        // instead of inserting a second row.
        upsert_follower_snapshot(&pool, 1050, 201, 510)
            .await
            .expect("upsert again");

        let snapshots = get_follower_snapshots(&pool, 10).await.expect("get");
        assert_eq!(snapshots.len(), 1);
        assert_eq!(snapshots[0].follower_count, 1050);
    }
908
909    #[tokio::test]
910    async fn upsert_reply_performance_works() {
911        let pool = init_test_db().await.expect("init db");
912
913        upsert_reply_performance(&pool, "r1", 5, 2, 100, 55.0)
914            .await
915            .expect("upsert");
916
917        // Update
918        upsert_reply_performance(&pool, "r1", 10, 3, 200, 75.0)
919            .await
920            .expect("update");
921    }
922
923    #[tokio::test]
924    async fn upsert_tweet_performance_works() {
925        let pool = init_test_db().await.expect("init db");
926
927        upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
928            .await
929            .expect("upsert");
930
931        // Update
932        upsert_tweet_performance(&pool, "tw1", 20, 10, 5, 1000, 95.0)
933            .await
934            .expect("update");
935    }
936
937    #[tokio::test]
938    async fn update_and_get_content_scores() {
939        let pool = init_test_db().await.expect("init db");
940
941        update_content_score(&pool, "rust", "tip", 80.0)
942            .await
943            .expect("update");
944        update_content_score(&pool, "rust", "tip", 90.0)
945            .await
946            .expect("update");
947        update_content_score(&pool, "python", "list", 60.0)
948            .await
949            .expect("update");
950
951        let top = get_top_topics(&pool, 10).await.expect("get");
952        assert_eq!(top.len(), 2);
953        // Rust should be higher (avg ~85) than Python (60)
954        assert_eq!(top[0].topic, "rust");
955        assert_eq!(top[0].total_posts, 2);
956        assert!(top[0].avg_performance > 80.0);
957    }
958
959    #[test]
960    fn compute_performance_score_basic() {
961        let score = compute_performance_score(10, 5, 3, 1000);
962        // (10*3 + 5*5 + 3*4) / 1000 * 1000 = (30 + 25 + 12) = 67
963        assert!((score - 67.0).abs() < 0.01);
964    }
965
966    #[test]
967    fn compute_performance_score_zero_impressions() {
968        let score = compute_performance_score(10, 5, 3, 0);
969        // Denominator clamped to 1: (30 + 25 + 12) / 1 * 1000 = 67000
970        assert!((score - 67000.0).abs() < 0.01);
971    }
972
973    #[test]
974    fn compute_performance_score_all_zero() {
975        let score = compute_performance_score(0, 0, 0, 0);
976        assert!((score - 0.0).abs() < 0.01);
977    }
978
979    #[tokio::test]
980    async fn avg_reply_engagement_empty() {
981        let pool = init_test_db().await.expect("init db");
982        let avg = get_avg_reply_engagement(&pool).await.expect("avg");
983        assert!((avg - 0.0).abs() < 0.01);
984    }
985
986    #[tokio::test]
987    async fn avg_reply_engagement_with_data() {
988        let pool = init_test_db().await.expect("init db");
989        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
990            .await
991            .expect("upsert");
992        upsert_reply_performance(&pool, "r2", 20, 10, 2000, 80.0)
993            .await
994            .expect("upsert");
995
996        let avg = get_avg_reply_engagement(&pool).await.expect("avg");
997        // (67 + 80) / 2 = 73.5
998        assert!((avg - 73.5).abs() < 0.01);
999    }
1000
1001    #[tokio::test]
1002    async fn avg_tweet_engagement_empty() {
1003        let pool = init_test_db().await.expect("init db");
1004        let avg = get_avg_tweet_engagement(&pool).await.expect("avg");
1005        assert!((avg - 0.0).abs() < 0.01);
1006    }
1007
1008    #[tokio::test]
1009    async fn performance_counts_empty() {
1010        let pool = init_test_db().await.expect("init db");
1011        let (replies, tweets) = get_performance_counts(&pool).await.expect("counts");
1012        assert_eq!(replies, 0);
1013        assert_eq!(tweets, 0);
1014    }
1015
1016    #[tokio::test]
1017    async fn performance_counts_with_data() {
1018        let pool = init_test_db().await.expect("init db");
1019        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
1020            .await
1021            .expect("upsert");
1022        upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1023            .await
1024            .expect("upsert");
1025        upsert_tweet_performance(&pool, "tw2", 20, 10, 5, 1000, 95.0)
1026            .await
1027            .expect("upsert");
1028
1029        let (replies, tweets) = get_performance_counts(&pool).await.expect("counts");
1030        assert_eq!(replies, 1);
1031        assert_eq!(tweets, 2);
1032    }
1033
1034    #[tokio::test]
1035    async fn analytics_summary_empty() {
1036        let pool = init_test_db().await.expect("init db");
1037        let summary = get_analytics_summary(&pool).await.expect("summary");
1038        assert_eq!(summary.followers.current, 0);
1039        assert_eq!(summary.followers.change_7d, 0);
1040        assert_eq!(summary.followers.change_30d, 0);
1041        assert_eq!(summary.actions_today.replies, 0);
1042        assert!((summary.engagement.avg_reply_score - 0.0).abs() < 0.01);
1043        assert!(summary.top_topics.is_empty());
1044    }
1045
1046    #[tokio::test]
1047    async fn analytics_summary_with_data() {
1048        let pool = init_test_db().await.expect("init db");
1049
1050        // Insert follower snapshot (only today since test db is in-memory)
1051        upsert_follower_snapshot(&pool, 1000, 200, 500)
1052            .await
1053            .expect("upsert");
1054
1055        // Insert some performance data
1056        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
1057            .await
1058            .expect("upsert");
1059
1060        // Insert content scores so top_topics is populated
1061        update_content_score(&pool, "rust", "tip", 80.0)
1062            .await
1063            .expect("score");
1064        update_content_score(&pool, "ai", "thread", 60.0)
1065            .await
1066            .expect("score");
1067
1068        let summary = get_analytics_summary(&pool).await.expect("summary");
1069        assert_eq!(summary.followers.current, 1000);
1070        assert!(summary.engagement.avg_reply_score > 0.0);
1071        assert_eq!(summary.engagement.total_replies_sent, 1);
1072        assert_eq!(summary.top_topics.len(), 2);
1073        assert_eq!(summary.top_topics[0].topic, "rust");
1074    }
1075
1076    #[tokio::test]
1077    async fn recent_performance_items_empty() {
1078        let pool = init_test_db().await.expect("init db");
1079        let items = get_recent_performance_items(&pool, 10).await.expect("get");
1080        assert!(items.is_empty());
1081    }
1082
1083    #[tokio::test]
1084    async fn recent_performance_items_with_data() {
1085        let pool = init_test_db().await.expect("init db");
1086
1087        // Insert a reply and its performance
1088        let reply = crate::storage::replies::ReplySent {
1089            id: 0,
1090            target_tweet_id: "t1".to_string(),
1091            reply_tweet_id: Some("r1".to_string()),
1092            reply_content: "Great point about testing!".to_string(),
1093            llm_provider: Some("openai".to_string()),
1094            llm_model: Some("gpt-4o".to_string()),
1095            created_at: "2026-02-23T12:00:00Z".to_string(),
1096            status: "sent".to_string(),
1097            error_message: None,
1098        };
1099        crate::storage::replies::insert_reply(&pool, &reply)
1100            .await
1101            .expect("insert reply");
1102        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
1103            .await
1104            .expect("upsert perf");
1105
1106        let items = get_recent_performance_items(&pool, 10).await.expect("get");
1107        assert_eq!(items.len(), 1);
1108        assert_eq!(items[0].content_type, "reply");
1109        assert!(items[0].content_preview.contains("testing"));
1110        assert_eq!(items[0].likes, 10);
1111    }
1112
1113    // ============================================================================
1114    // Winning DNA storage tests
1115    // ============================================================================
1116
1117    #[tokio::test]
1118    async fn update_and_get_tweet_archetype() {
1119        let pool = init_test_db().await.expect("init db");
1120
1121        upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1122            .await
1123            .expect("upsert");
1124
1125        update_tweet_archetype(&pool, "tw1", "list")
1126            .await
1127            .expect("update");
1128
1129        let row: (Option<String>,) =
1130            sqlx::query_as("SELECT archetype_vibe FROM tweet_performance WHERE tweet_id = ?")
1131                .bind("tw1")
1132                .fetch_one(&pool)
1133                .await
1134                .expect("query");
1135        assert_eq!(row.0.as_deref(), Some("list"));
1136    }
1137
1138    #[tokio::test]
1139    async fn update_and_get_reply_archetype() {
1140        let pool = init_test_db().await.expect("init db");
1141
1142        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
1143            .await
1144            .expect("upsert");
1145
1146        update_reply_archetype(&pool, "r1", "ask_question")
1147            .await
1148            .expect("update");
1149
1150        let row: (Option<String>,) =
1151            sqlx::query_as("SELECT archetype_vibe FROM reply_performance WHERE reply_id = ?")
1152                .bind("r1")
1153                .fetch_one(&pool)
1154                .await
1155                .expect("query");
1156        assert_eq!(row.0.as_deref(), Some("ask_question"));
1157    }
1158
1159    #[tokio::test]
1160    async fn update_and_get_engagement_score() {
1161        let pool = init_test_db().await.expect("init db");
1162
1163        upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1164            .await
1165            .expect("upsert");
1166
1167        update_tweet_engagement_score(&pool, "tw1", 0.85)
1168            .await
1169            .expect("update");
1170
1171        let row: (Option<f64>,) =
1172            sqlx::query_as("SELECT engagement_score FROM tweet_performance WHERE tweet_id = ?")
1173                .bind("tw1")
1174                .fetch_one(&pool)
1175                .await
1176                .expect("query");
1177        assert!((row.0.unwrap() - 0.85).abs() < 0.001);
1178    }
1179
1180    #[tokio::test]
1181    async fn get_max_performance_score_empty() {
1182        let pool = init_test_db().await.expect("init db");
1183        let max = get_max_performance_score(&pool).await.expect("max");
1184        assert!((max - 0.0).abs() < 0.01);
1185    }
1186
1187    #[tokio::test]
1188    async fn get_max_performance_score_with_data() {
1189        let pool = init_test_db().await.expect("init db");
1190
1191        upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1192            .await
1193            .expect("upsert");
1194        upsert_reply_performance(&pool, "r1", 20, 10, 2000, 95.0)
1195            .await
1196            .expect("upsert");
1197
1198        let max = get_max_performance_score(&pool).await.expect("max");
1199        assert!((max - 95.0).abs() < 0.01);
1200    }
1201
1202    #[tokio::test]
1203    async fn get_scored_ancestors_empty() {
1204        let pool = init_test_db().await.expect("init db");
1205        let ancestors = get_scored_ancestors(&pool, &[], 0.1, 10)
1206            .await
1207            .expect("query");
1208        assert!(ancestors.is_empty());
1209    }
1210
1211    #[tokio::test]
1212    async fn get_scored_ancestors_returns_scored_items() {
1213        let pool = init_test_db().await.expect("init db");
1214
1215        // Insert a tweet with performance + engagement_score
1216        sqlx::query(
1217            "INSERT INTO original_tweets (account_id, tweet_id, content, topic, status, created_at) \
1218             VALUES ('00000000-0000-0000-0000-000000000000', 'tw1', 'Great Rust testing tips', 'rust', 'sent', '2026-02-27T10:00:00Z')",
1219        )
1220        .execute(&pool)
1221        .await
1222        .expect("insert tweet");
1223
1224        upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
1225            .await
1226            .expect("upsert perf");
1227        update_tweet_engagement_score(&pool, "tw1", 0.85)
1228            .await
1229            .expect("update score");
1230
1231        let ancestors = get_scored_ancestors(&pool, &[], 0.1, 10)
1232            .await
1233            .expect("query");
1234        assert_eq!(ancestors.len(), 1);
1235        assert_eq!(ancestors[0].content_type, "tweet");
1236        assert_eq!(ancestors[0].id, "tw1");
1237        assert!((ancestors[0].engagement_score.unwrap() - 0.85).abs() < 0.001);
1238    }
1239
1240    #[tokio::test]
1241    async fn get_scored_ancestors_filters_low_engagement() {
1242        let pool = init_test_db().await.expect("init db");
1243
1244        sqlx::query(
1245            "INSERT INTO original_tweets (account_id, tweet_id, content, topic, status, created_at) \
1246             VALUES ('00000000-0000-0000-0000-000000000000', 'tw1', 'Low performer', 'rust', 'sent', '2026-02-27T10:00:00Z')",
1247        )
1248        .execute(&pool)
1249        .await
1250        .expect("insert tweet");
1251
1252        upsert_tweet_performance(&pool, "tw1", 1, 0, 0, 500, 5.0)
1253            .await
1254            .expect("upsert perf");
1255        update_tweet_engagement_score(&pool, "tw1", 0.05)
1256            .await
1257            .expect("update score");
1258
1259        // min_score = 0.1, so this ancestor should be filtered out
1260        let ancestors = get_scored_ancestors(&pool, &[], 0.1, 10)
1261            .await
1262            .expect("query");
1263        assert!(ancestors.is_empty());
1264    }
1265}