// tuitbot_core/storage/analytics.rs

//! CRUD operations for analytics tables.
//!
//! Manages follower snapshots, reply/tweet performance metrics,
//! and content score running averages.
5
use chrono::{NaiveDate, Utc};

use super::DbPool;
use crate::error::StorageError;
9
// ============================================================================
// Follower snapshots
// ============================================================================
13
/// A daily follower snapshot.
///
/// One row per calendar day (the upsert keys on SQLite's `date('now')`);
/// re-running the upsert on the same day overwrites the counts.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FollowerSnapshot {
    // Date string as produced by SQLite date('now'), i.e. "YYYY-MM-DD".
    pub snapshot_date: String,
    // Total followers recorded on that date.
    pub follower_count: i64,
    // Total accounts the bot follows on that date.
    pub following_count: i64,
    // Total tweets on the account on that date.
    pub tweet_count: i64,
}
22
23/// Upsert a follower snapshot for today.
24pub async fn upsert_follower_snapshot(
25    pool: &DbPool,
26    follower_count: i64,
27    following_count: i64,
28    tweet_count: i64,
29) -> Result<(), StorageError> {
30    sqlx::query(
31        "INSERT INTO follower_snapshots (snapshot_date, follower_count, following_count, tweet_count) \
32         VALUES (date('now'), ?, ?, ?) \
33         ON CONFLICT(snapshot_date) DO UPDATE SET \
34         follower_count = excluded.follower_count, \
35         following_count = excluded.following_count, \
36         tweet_count = excluded.tweet_count",
37    )
38    .bind(follower_count)
39    .bind(following_count)
40    .bind(tweet_count)
41    .execute(pool)
42    .await
43    .map_err(|e| StorageError::Query { source: e })?;
44    Ok(())
45}
46
47/// Get the most recent N follower snapshots, newest first.
48pub async fn get_follower_snapshots(
49    pool: &DbPool,
50    limit: u32,
51) -> Result<Vec<FollowerSnapshot>, StorageError> {
52    let rows: Vec<(String, i64, i64, i64)> = sqlx::query_as(
53        "SELECT snapshot_date, follower_count, following_count, tweet_count \
54         FROM follower_snapshots ORDER BY snapshot_date DESC LIMIT ?",
55    )
56    .bind(limit)
57    .fetch_all(pool)
58    .await
59    .map_err(|e| StorageError::Query { source: e })?;
60
61    Ok(rows
62        .into_iter()
63        .map(|r| FollowerSnapshot {
64            snapshot_date: r.0,
65            follower_count: r.1,
66            following_count: r.2,
67            tweet_count: r.3,
68        })
69        .collect())
70}
71
// ============================================================================
// Reply performance
// ============================================================================
75
76/// Store or update reply performance metrics.
77pub async fn upsert_reply_performance(
78    pool: &DbPool,
79    reply_id: &str,
80    likes: i64,
81    replies: i64,
82    impressions: i64,
83    score: f64,
84) -> Result<(), StorageError> {
85    sqlx::query(
86        "INSERT INTO reply_performance (reply_id, likes_received, replies_received, impressions, performance_score) \
87         VALUES (?, ?, ?, ?, ?) \
88         ON CONFLICT(reply_id) DO UPDATE SET \
89         likes_received = excluded.likes_received, \
90         replies_received = excluded.replies_received, \
91         impressions = excluded.impressions, \
92         performance_score = excluded.performance_score, \
93         measured_at = datetime('now')",
94    )
95    .bind(reply_id)
96    .bind(likes)
97    .bind(replies)
98    .bind(impressions)
99    .bind(score)
100    .execute(pool)
101    .await
102    .map_err(|e| StorageError::Query { source: e })?;
103    Ok(())
104}
105
// ============================================================================
// Tweet performance
// ============================================================================
109
110/// Store or update tweet performance metrics.
111pub async fn upsert_tweet_performance(
112    pool: &DbPool,
113    tweet_id: &str,
114    likes: i64,
115    retweets: i64,
116    replies: i64,
117    impressions: i64,
118    score: f64,
119) -> Result<(), StorageError> {
120    sqlx::query(
121        "INSERT INTO tweet_performance (tweet_id, likes_received, retweets_received, replies_received, impressions, performance_score) \
122         VALUES (?, ?, ?, ?, ?, ?) \
123         ON CONFLICT(tweet_id) DO UPDATE SET \
124         likes_received = excluded.likes_received, \
125         retweets_received = excluded.retweets_received, \
126         replies_received = excluded.replies_received, \
127         impressions = excluded.impressions, \
128         performance_score = excluded.performance_score, \
129         measured_at = datetime('now')",
130    )
131    .bind(tweet_id)
132    .bind(likes)
133    .bind(retweets)
134    .bind(replies)
135    .bind(impressions)
136    .bind(score)
137    .execute(pool)
138    .await
139    .map_err(|e| StorageError::Query { source: e })?;
140    Ok(())
141}
142
// ============================================================================
// Content scores
// ============================================================================
146
/// A topic/format performance score.
///
/// Holds a running mean of performance scores per (topic, format) pair,
/// maintained incrementally by `update_content_score`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ContentScore {
    // Content topic, e.g. "rust".
    pub topic: String,
    // Content format, e.g. "tip" or "thread".
    pub format: String,
    // Number of posts folded into the average so far.
    pub total_posts: i64,
    // Running mean of performance scores for this pair.
    pub avg_performance: f64,
}
155
/// Update the running average for a topic/format pair.
///
/// Uses incremental mean: new_avg = old_avg + (score - old_avg) / new_count.
pub async fn update_content_score(
    pool: &DbPool,
    topic: &str,
    format: &str,
    new_score: f64,
) -> Result<(), StorageError> {
    // Insert or update with incremental average.
    //
    // In the DO UPDATE branch, `content_scores.total_posts` and
    // `content_scores.avg_performance` refer to the PRE-update row values,
    // so `total_posts + 1` is the new count and the SET expression is exactly
    // the incremental-mean formula documented above — no read-modify-write
    // round-trip is needed on the Rust side.
    sqlx::query(
        "INSERT INTO content_scores (topic, format, total_posts, avg_performance) \
         VALUES (?, ?, 1, ?) \
         ON CONFLICT(topic, format) DO UPDATE SET \
         total_posts = content_scores.total_posts + 1, \
         avg_performance = content_scores.avg_performance + \
         (? - content_scores.avg_performance) / (content_scores.total_posts + 1)",
    )
    .bind(topic)
    .bind(format)
    // Bound twice: once for the VALUES clause (first post for this pair) and
    // once for the `?` inside the DO UPDATE incremental-mean expression.
    .bind(new_score)
    .bind(new_score)
    .execute(pool)
    .await
    .map_err(|e| StorageError::Query { source: e })?;
    Ok(())
}
183
184/// Get top-performing topics ordered by average performance descending.
185pub async fn get_top_topics(pool: &DbPool, limit: u32) -> Result<Vec<ContentScore>, StorageError> {
186    let rows: Vec<(String, String, i64, f64)> = sqlx::query_as(
187        "SELECT topic, format, total_posts, avg_performance \
188         FROM content_scores \
189         ORDER BY avg_performance DESC \
190         LIMIT ?",
191    )
192    .bind(limit)
193    .fetch_all(pool)
194    .await
195    .map_err(|e| StorageError::Query { source: e })?;
196
197    Ok(rows
198        .into_iter()
199        .map(|r| ContentScore {
200            topic: r.0,
201            format: r.1,
202            total_posts: r.2,
203            avg_performance: r.3,
204        })
205        .collect())
206}
207
208/// Get average reply engagement rate (avg performance_score across all measured replies).
209pub async fn get_avg_reply_engagement(pool: &DbPool) -> Result<f64, StorageError> {
210    let row: (f64,) =
211        sqlx::query_as("SELECT COALESCE(AVG(performance_score), 0.0) FROM reply_performance")
212            .fetch_one(pool)
213            .await
214            .map_err(|e| StorageError::Query { source: e })?;
215
216    Ok(row.0)
217}
218
219/// Get average tweet engagement rate (avg performance_score across all measured tweets).
220pub async fn get_avg_tweet_engagement(pool: &DbPool) -> Result<f64, StorageError> {
221    let row: (f64,) =
222        sqlx::query_as("SELECT COALESCE(AVG(performance_score), 0.0) FROM tweet_performance")
223            .fetch_one(pool)
224            .await
225            .map_err(|e| StorageError::Query { source: e })?;
226
227    Ok(row.0)
228}
229
230/// Get total count of measured replies and tweets.
231pub async fn get_performance_counts(pool: &DbPool) -> Result<(i64, i64), StorageError> {
232    let reply_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM reply_performance")
233        .fetch_one(pool)
234        .await
235        .map_err(|e| StorageError::Query { source: e })?;
236
237    let tweet_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tweet_performance")
238        .fetch_one(pool)
239        .await
240        .map_err(|e| StorageError::Query { source: e })?;
241
242    Ok((reply_count.0, tweet_count.0))
243}
244
/// Compute the performance score for a piece of content.
///
/// Formula: `(likes * 3 + replies * 5 + retweets * 4) / max(impressions, 1) * 1000`
///
/// Weights favour replies (5) over retweets (4) over likes (3); the result is
/// an engagement-per-impression rate scaled by 1000.
pub fn compute_performance_score(likes: i64, replies: i64, retweets: i64, impressions: i64) -> f64 {
    let weighted_engagement = 3 * likes + 5 * replies + 4 * retweets;
    // Clamp the denominator to at least 1 so zero impressions never divides by zero.
    let reach = if impressions < 1 { 1 } else { impressions };
    (weighted_engagement as f64 / reach as f64) * 1000.0
}
253
// ============================================================================
// Analytics summary (aggregated dashboard data)
// ============================================================================
257
/// Follower growth metrics.
#[derive(Debug, Clone, serde::Serialize)]
pub struct FollowerSummary {
    // Follower count from the newest snapshot (0 when none exist).
    pub current: i64,
    // Delta vs. the oldest-available snapshot at least 7 days old.
    pub change_7d: i64,
    // Delta vs. the oldest-available snapshot at least 30 days old.
    pub change_30d: i64,
}
265
/// Today's action breakdown.
///
/// Counts are sourced from the action log, bucketed by action type.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ActionsSummary {
    // Replies sent since midnight UTC.
    pub replies: i64,
    // Standalone tweets posted since midnight UTC.
    pub tweets: i64,
    // Threads posted since midnight UTC.
    pub threads: i64,
}
273
/// Engagement overview.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EngagementSummary {
    // Mean performance_score across all measured replies (0.0 when none).
    pub avg_reply_score: f64,
    // Mean performance_score across all measured tweets (0.0 when none).
    pub avg_tweet_score: f64,
    // Number of replies with performance rows recorded.
    pub total_replies_sent: i64,
    // Number of tweets with performance rows recorded.
    pub total_tweets_posted: i64,
}
282
/// Combined analytics summary for the dashboard.
///
/// Bundles follower growth, today's actions, engagement averages, and the
/// top topics into one serializable payload so the frontend needs a single
/// round-trip.
#[derive(Debug, Clone, serde::Serialize)]
pub struct AnalyticsSummary {
    pub followers: FollowerSummary,
    pub actions_today: ActionsSummary,
    pub engagement: EngagementSummary,
    // Up to 5 topics, best average performance first.
    pub top_topics: Vec<ContentScore>,
}
291
/// Get a combined analytics summary for the dashboard.
///
/// Aggregates follower deltas, today's action counts, and engagement stats
/// into a single struct to minimise round-trips from the frontend.
pub async fn get_analytics_summary(pool: &DbPool) -> Result<AnalyticsSummary, StorageError> {
    // --- Follower data ---
    // 90 days is enough history to compute both the 7d and 30d deltas.
    let snapshots = get_follower_snapshots(pool, 90).await?;
    let current = snapshots.first().map_or(0, |s| s.follower_count);

    // Find the first snapshot whose date is at least N days ago (handles gaps from
    // downtime or weekends).  Snapshots are ordered newest-first.
    let today = Utc::now().date_naive();
    let follower_at_or_before = |days: i64| -> i64 {
        snapshots
            .iter()
            // Unparseable dates are skipped (`unwrap_or(false)`) rather than
            // failing the whole summary.
            .find(|s| {
                NaiveDate::parse_from_str(&s.snapshot_date, "%Y-%m-%d")
                    .map(|d| (today - d).num_days() >= days)
                    .unwrap_or(false)
            })
            // No snapshot that old yet → baseline is `current`, so the delta
            // below evaluates to 0 rather than a misleading jump.
            .map_or(current, |s| s.follower_count)
    };

    // With fewer than 2 snapshots there is no baseline to diff against.
    let change_7d = if snapshots.len() >= 2 {
        current - follower_at_or_before(7)
    } else {
        0
    };
    let change_30d = if snapshots.len() >= 2 {
        current - follower_at_or_before(30)
    } else {
        0
    };

    // --- Today's actions (from action_log) ---
    // Shadows the NaiveDate above: from here on `today` is the ISO timestamp
    // for midnight UTC, used as the "since" cutoff for action counts.
    let today = Utc::now().format("%Y-%m-%dT00:00:00Z").to_string();
    let counts = super::action_log::get_action_counts_since(pool, &today).await?;
    let actions_today = ActionsSummary {
        replies: *counts.get("reply").unwrap_or(&0),
        tweets: *counts.get("tweet").unwrap_or(&0),
        threads: *counts.get("thread").unwrap_or(&0),
    };

    // --- Engagement ---
    let avg_reply_score = get_avg_reply_engagement(pool).await?;
    let avg_tweet_score = get_avg_tweet_engagement(pool).await?;
    let (total_replies_sent, total_tweets_posted) = get_performance_counts(pool).await?;

    // --- Top topics ---
    let top_topics = get_top_topics(pool, 5).await?;

    Ok(AnalyticsSummary {
        followers: FollowerSummary {
            current,
            change_7d,
            change_30d,
        },
        actions_today,
        engagement: EngagementSummary {
            avg_reply_score,
            avg_tweet_score,
            total_replies_sent,
            total_tweets_posted,
        },
        top_topics,
    })
}
359
// ============================================================================
// Recent performance (joined with content for preview)
// ============================================================================
363
/// A recent content item with performance metrics.
///
/// Produced by `get_recent_performance_items`, which unions reply and tweet
/// performance rows with a truncated preview of the original content.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PerformanceItem {
    /// "reply", "tweet", or "thread"
    pub content_type: String,
    /// Truncated content preview (first 120 characters)
    pub content_preview: String,
    /// Likes received
    pub likes: i64,
    /// Replies received
    pub replies_received: i64,
    /// Retweets (0 for replies)
    pub retweets: i64,
    /// Impressions
    pub impressions: i64,
    /// Computed performance score
    pub performance_score: f64,
    /// When the content was posted (ISO-8601)
    pub posted_at: String,
}
384
385/// Row type returned by the recent-performance UNION query.
386type PerformanceRow = (String, String, i64, i64, i64, i64, f64, String);
387
388/// Get recent content performance items, newest first.
389///
390/// Unions reply and tweet performance joined with their content tables
391/// so the dashboard can show a content preview alongside metrics.
392pub async fn get_recent_performance_items(
393    pool: &DbPool,
394    limit: u32,
395) -> Result<Vec<PerformanceItem>, StorageError> {
396    let rows: Vec<PerformanceRow> = sqlx::query_as(
397        "SELECT 'reply' as content_type, \
398                SUBSTR(rs.reply_content, 1, 120) as content_preview, \
399                rp.likes_received, rp.replies_received, 0 as retweets, \
400                rp.impressions, rp.performance_score, rs.created_at as posted_at \
401         FROM reply_performance rp \
402         JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
403         UNION ALL \
404         SELECT 'tweet' as content_type, \
405                SUBSTR(ot.content, 1, 120) as content_preview, \
406                tp.likes_received, tp.replies_received, tp.retweets_received, \
407                tp.impressions, tp.performance_score, ot.created_at as posted_at \
408         FROM tweet_performance tp \
409         JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
410         ORDER BY posted_at DESC \
411         LIMIT ?",
412    )
413    .bind(limit)
414    .fetch_all(pool)
415    .await
416    .map_err(|e| StorageError::Query { source: e })?;
417
418    Ok(rows
419        .into_iter()
420        .map(|r| PerformanceItem {
421            content_type: r.0,
422            content_preview: r.1,
423            likes: r.2,
424            replies_received: r.3,
425            retweets: r.4,
426            impressions: r.5,
427            performance_score: r.6,
428            posted_at: r.7,
429        })
430        .collect())
431}
432
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::init_test_db;

    // All async tests run against a fresh in-memory database from
    // `init_test_db`, so each test starts with empty tables.

    #[tokio::test]
    async fn upsert_and_get_follower_snapshot() {
        let pool = init_test_db().await.expect("init db");

        upsert_follower_snapshot(&pool, 1000, 200, 500)
            .await
            .expect("upsert");

        let snapshots = get_follower_snapshots(&pool, 10).await.expect("get");
        assert_eq!(snapshots.len(), 1);
        assert_eq!(snapshots[0].follower_count, 1000);
        assert_eq!(snapshots[0].following_count, 200);
        assert_eq!(snapshots[0].tweet_count, 500);
    }

    #[tokio::test]
    async fn upsert_follower_snapshot_updates_existing() {
        let pool = init_test_db().await.expect("init db");

        // Both upserts land on the same snapshot_date (today), so the second
        // must take the ON CONFLICT path and overwrite rather than add a row.
        upsert_follower_snapshot(&pool, 1000, 200, 500)
            .await
            .expect("upsert");
        upsert_follower_snapshot(&pool, 1050, 201, 510)
            .await
            .expect("upsert again");

        let snapshots = get_follower_snapshots(&pool, 10).await.expect("get");
        assert_eq!(snapshots.len(), 1);
        assert_eq!(snapshots[0].follower_count, 1050);
    }

    #[tokio::test]
    async fn upsert_reply_performance_works() {
        let pool = init_test_db().await.expect("init db");

        upsert_reply_performance(&pool, "r1", 5, 2, 100, 55.0)
            .await
            .expect("upsert");

        // Update: same reply_id must hit the conflict branch without error.
        upsert_reply_performance(&pool, "r1", 10, 3, 200, 75.0)
            .await
            .expect("update");
    }

    #[tokio::test]
    async fn upsert_tweet_performance_works() {
        let pool = init_test_db().await.expect("init db");

        upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
            .await
            .expect("upsert");

        // Update: same tweet_id must hit the conflict branch without error.
        upsert_tweet_performance(&pool, "tw1", 20, 10, 5, 1000, 95.0)
            .await
            .expect("update");
    }

    #[tokio::test]
    async fn update_and_get_content_scores() {
        let pool = init_test_db().await.expect("init db");

        update_content_score(&pool, "rust", "tip", 80.0)
            .await
            .expect("update");
        update_content_score(&pool, "rust", "tip", 90.0)
            .await
            .expect("update");
        update_content_score(&pool, "python", "list", 60.0)
            .await
            .expect("update");

        let top = get_top_topics(&pool, 10).await.expect("get");
        assert_eq!(top.len(), 2);
        // Rust should be higher (avg ~85) than Python (60)
        assert_eq!(top[0].topic, "rust");
        assert_eq!(top[0].total_posts, 2);
        assert!(top[0].avg_performance > 80.0);
    }

    #[test]
    fn compute_performance_score_basic() {
        let score = compute_performance_score(10, 5, 3, 1000);
        // (10*3 + 5*5 + 3*4) / 1000 * 1000 = (30 + 25 + 12) = 67
        assert!((score - 67.0).abs() < 0.01);
    }

    #[test]
    fn compute_performance_score_zero_impressions() {
        let score = compute_performance_score(10, 5, 3, 0);
        // Denominator clamped to 1: (30 + 25 + 12) / 1 * 1000 = 67000
        assert!((score - 67000.0).abs() < 0.01);
    }

    #[test]
    fn compute_performance_score_all_zero() {
        let score = compute_performance_score(0, 0, 0, 0);
        assert!((score - 0.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn avg_reply_engagement_empty() {
        // COALESCE in the query must yield 0.0 on an empty table, not an error.
        let pool = init_test_db().await.expect("init db");
        let avg = get_avg_reply_engagement(&pool).await.expect("avg");
        assert!((avg - 0.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn avg_reply_engagement_with_data() {
        let pool = init_test_db().await.expect("init db");
        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
            .await
            .expect("upsert");
        upsert_reply_performance(&pool, "r2", 20, 10, 2000, 80.0)
            .await
            .expect("upsert");

        let avg = get_avg_reply_engagement(&pool).await.expect("avg");
        // (67 + 80) / 2 = 73.5
        assert!((avg - 73.5).abs() < 0.01);
    }

    #[tokio::test]
    async fn avg_tweet_engagement_empty() {
        let pool = init_test_db().await.expect("init db");
        let avg = get_avg_tweet_engagement(&pool).await.expect("avg");
        assert!((avg - 0.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn performance_counts_empty() {
        let pool = init_test_db().await.expect("init db");
        let (replies, tweets) = get_performance_counts(&pool).await.expect("counts");
        assert_eq!(replies, 0);
        assert_eq!(tweets, 0);
    }

    #[tokio::test]
    async fn performance_counts_with_data() {
        let pool = init_test_db().await.expect("init db");
        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
            .await
            .expect("upsert");
        upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
            .await
            .expect("upsert");
        upsert_tweet_performance(&pool, "tw2", 20, 10, 5, 1000, 95.0)
            .await
            .expect("upsert");

        let (replies, tweets) = get_performance_counts(&pool).await.expect("counts");
        assert_eq!(replies, 1);
        assert_eq!(tweets, 2);
    }

    #[tokio::test]
    async fn analytics_summary_empty() {
        // Every aggregate must degrade gracefully on a fresh database.
        let pool = init_test_db().await.expect("init db");
        let summary = get_analytics_summary(&pool).await.expect("summary");
        assert_eq!(summary.followers.current, 0);
        assert_eq!(summary.followers.change_7d, 0);
        assert_eq!(summary.followers.change_30d, 0);
        assert_eq!(summary.actions_today.replies, 0);
        assert!((summary.engagement.avg_reply_score - 0.0).abs() < 0.01);
        assert!(summary.top_topics.is_empty());
    }

    #[tokio::test]
    async fn analytics_summary_with_data() {
        let pool = init_test_db().await.expect("init db");

        // Insert follower snapshot (only today since test db is in-memory)
        upsert_follower_snapshot(&pool, 1000, 200, 500)
            .await
            .expect("upsert");

        // Insert some performance data
        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
            .await
            .expect("upsert");

        // Insert content scores so top_topics is populated
        update_content_score(&pool, "rust", "tip", 80.0)
            .await
            .expect("score");
        update_content_score(&pool, "ai", "thread", 60.0)
            .await
            .expect("score");

        let summary = get_analytics_summary(&pool).await.expect("summary");
        assert_eq!(summary.followers.current, 1000);
        assert!(summary.engagement.avg_reply_score > 0.0);
        assert_eq!(summary.engagement.total_replies_sent, 1);
        assert_eq!(summary.top_topics.len(), 2);
        assert_eq!(summary.top_topics[0].topic, "rust");
    }

    #[tokio::test]
    async fn recent_performance_items_empty() {
        let pool = init_test_db().await.expect("init db");
        let items = get_recent_performance_items(&pool, 10).await.expect("get");
        assert!(items.is_empty());
    }

    #[tokio::test]
    async fn recent_performance_items_with_data() {
        let pool = init_test_db().await.expect("init db");

        // Insert a reply and its performance; the JOIN in
        // get_recent_performance_items requires a matching replies_sent row.
        let reply = crate::storage::replies::ReplySent {
            id: 0,
            target_tweet_id: "t1".to_string(),
            reply_tweet_id: Some("r1".to_string()),
            reply_content: "Great point about testing!".to_string(),
            llm_provider: Some("openai".to_string()),
            llm_model: Some("gpt-4o".to_string()),
            created_at: "2026-02-23T12:00:00Z".to_string(),
            status: "sent".to_string(),
            error_message: None,
        };
        crate::storage::replies::insert_reply(&pool, &reply)
            .await
            .expect("insert reply");
        upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
            .await
            .expect("upsert perf");

        let items = get_recent_performance_items(&pool, 10).await.expect("get");
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].content_type, "reply");
        assert!(items[0].content_preview.contains("testing"));
        assert_eq!(items[0].likes, 10);
    }
}