1use super::DbPool;
7use crate::error::StorageError;
8use chrono::{NaiveDate, Utc};
9
10#[derive(Debug, Clone, serde::Serialize)]
16pub struct FollowerSnapshot {
17 pub snapshot_date: String,
18 pub follower_count: i64,
19 pub following_count: i64,
20 pub tweet_count: i64,
21}
22
23pub async fn upsert_follower_snapshot(
25 pool: &DbPool,
26 follower_count: i64,
27 following_count: i64,
28 tweet_count: i64,
29) -> Result<(), StorageError> {
30 sqlx::query(
31 "INSERT INTO follower_snapshots (snapshot_date, follower_count, following_count, tweet_count) \
32 VALUES (date('now'), ?, ?, ?) \
33 ON CONFLICT(snapshot_date) DO UPDATE SET \
34 follower_count = excluded.follower_count, \
35 following_count = excluded.following_count, \
36 tweet_count = excluded.tweet_count",
37 )
38 .bind(follower_count)
39 .bind(following_count)
40 .bind(tweet_count)
41 .execute(pool)
42 .await
43 .map_err(|e| StorageError::Query { source: e })?;
44 Ok(())
45}
46
47pub async fn get_follower_snapshots(
49 pool: &DbPool,
50 limit: u32,
51) -> Result<Vec<FollowerSnapshot>, StorageError> {
52 let rows: Vec<(String, i64, i64, i64)> = sqlx::query_as(
53 "SELECT snapshot_date, follower_count, following_count, tweet_count \
54 FROM follower_snapshots ORDER BY snapshot_date DESC LIMIT ?",
55 )
56 .bind(limit)
57 .fetch_all(pool)
58 .await
59 .map_err(|e| StorageError::Query { source: e })?;
60
61 Ok(rows
62 .into_iter()
63 .map(|r| FollowerSnapshot {
64 snapshot_date: r.0,
65 follower_count: r.1,
66 following_count: r.2,
67 tweet_count: r.3,
68 })
69 .collect())
70}
71
72pub async fn upsert_reply_performance(
78 pool: &DbPool,
79 reply_id: &str,
80 likes: i64,
81 replies: i64,
82 impressions: i64,
83 score: f64,
84) -> Result<(), StorageError> {
85 sqlx::query(
86 "INSERT INTO reply_performance (reply_id, likes_received, replies_received, impressions, performance_score) \
87 VALUES (?, ?, ?, ?, ?) \
88 ON CONFLICT(reply_id) DO UPDATE SET \
89 likes_received = excluded.likes_received, \
90 replies_received = excluded.replies_received, \
91 impressions = excluded.impressions, \
92 performance_score = excluded.performance_score, \
93 measured_at = datetime('now')",
94 )
95 .bind(reply_id)
96 .bind(likes)
97 .bind(replies)
98 .bind(impressions)
99 .bind(score)
100 .execute(pool)
101 .await
102 .map_err(|e| StorageError::Query { source: e })?;
103 Ok(())
104}
105
106pub async fn upsert_tweet_performance(
112 pool: &DbPool,
113 tweet_id: &str,
114 likes: i64,
115 retweets: i64,
116 replies: i64,
117 impressions: i64,
118 score: f64,
119) -> Result<(), StorageError> {
120 sqlx::query(
121 "INSERT INTO tweet_performance (tweet_id, likes_received, retweets_received, replies_received, impressions, performance_score) \
122 VALUES (?, ?, ?, ?, ?, ?) \
123 ON CONFLICT(tweet_id) DO UPDATE SET \
124 likes_received = excluded.likes_received, \
125 retweets_received = excluded.retweets_received, \
126 replies_received = excluded.replies_received, \
127 impressions = excluded.impressions, \
128 performance_score = excluded.performance_score, \
129 measured_at = datetime('now')",
130 )
131 .bind(tweet_id)
132 .bind(likes)
133 .bind(retweets)
134 .bind(replies)
135 .bind(impressions)
136 .bind(score)
137 .execute(pool)
138 .await
139 .map_err(|e| StorageError::Query { source: e })?;
140 Ok(())
141}
142
143#[derive(Debug, Clone, serde::Serialize)]
149pub struct ContentScore {
150 pub topic: String,
151 pub format: String,
152 pub total_posts: i64,
153 pub avg_performance: f64,
154}
155
156pub async fn update_content_score(
160 pool: &DbPool,
161 topic: &str,
162 format: &str,
163 new_score: f64,
164) -> Result<(), StorageError> {
165 sqlx::query(
167 "INSERT INTO content_scores (topic, format, total_posts, avg_performance) \
168 VALUES (?, ?, 1, ?) \
169 ON CONFLICT(topic, format) DO UPDATE SET \
170 total_posts = content_scores.total_posts + 1, \
171 avg_performance = content_scores.avg_performance + \
172 (? - content_scores.avg_performance) / (content_scores.total_posts + 1)",
173 )
174 .bind(topic)
175 .bind(format)
176 .bind(new_score)
177 .bind(new_score)
178 .execute(pool)
179 .await
180 .map_err(|e| StorageError::Query { source: e })?;
181 Ok(())
182}
183
184pub async fn get_top_topics(pool: &DbPool, limit: u32) -> Result<Vec<ContentScore>, StorageError> {
186 let rows: Vec<(String, String, i64, f64)> = sqlx::query_as(
187 "SELECT topic, format, total_posts, avg_performance \
188 FROM content_scores \
189 ORDER BY avg_performance DESC \
190 LIMIT ?",
191 )
192 .bind(limit)
193 .fetch_all(pool)
194 .await
195 .map_err(|e| StorageError::Query { source: e })?;
196
197 Ok(rows
198 .into_iter()
199 .map(|r| ContentScore {
200 topic: r.0,
201 format: r.1,
202 total_posts: r.2,
203 avg_performance: r.3,
204 })
205 .collect())
206}
207
208pub async fn get_avg_reply_engagement(pool: &DbPool) -> Result<f64, StorageError> {
210 let row: (f64,) =
211 sqlx::query_as("SELECT COALESCE(AVG(performance_score), 0.0) FROM reply_performance")
212 .fetch_one(pool)
213 .await
214 .map_err(|e| StorageError::Query { source: e })?;
215
216 Ok(row.0)
217}
218
219pub async fn get_avg_tweet_engagement(pool: &DbPool) -> Result<f64, StorageError> {
221 let row: (f64,) =
222 sqlx::query_as("SELECT COALESCE(AVG(performance_score), 0.0) FROM tweet_performance")
223 .fetch_one(pool)
224 .await
225 .map_err(|e| StorageError::Query { source: e })?;
226
227 Ok(row.0)
228}
229
230pub async fn get_performance_counts(pool: &DbPool) -> Result<(i64, i64), StorageError> {
232 let reply_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM reply_performance")
233 .fetch_one(pool)
234 .await
235 .map_err(|e| StorageError::Query { source: e })?;
236
237 let tweet_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tweet_performance")
238 .fetch_one(pool)
239 .await
240 .map_err(|e| StorageError::Query { source: e })?;
241
242 Ok((reply_count.0, tweet_count.0))
243}
244
245pub fn compute_performance_score(likes: i64, replies: i64, retweets: i64, impressions: i64) -> f64 {
249 let numerator = (likes * 3 + replies * 5 + retweets * 4) as f64;
250 let denominator = impressions.max(1) as f64;
251 numerator / denominator * 1000.0
252}
253
254#[derive(Debug, Clone, serde::Serialize)]
260pub struct FollowerSummary {
261 pub current: i64,
262 pub change_7d: i64,
263 pub change_30d: i64,
264}
265
266#[derive(Debug, Clone, serde::Serialize)]
268pub struct ActionsSummary {
269 pub replies: i64,
270 pub tweets: i64,
271 pub threads: i64,
272}
273
274#[derive(Debug, Clone, serde::Serialize)]
276pub struct EngagementSummary {
277 pub avg_reply_score: f64,
278 pub avg_tweet_score: f64,
279 pub total_replies_sent: i64,
280 pub total_tweets_posted: i64,
281}
282
283#[derive(Debug, Clone, serde::Serialize)]
285pub struct AnalyticsSummary {
286 pub followers: FollowerSummary,
287 pub actions_today: ActionsSummary,
288 pub engagement: EngagementSummary,
289 pub top_topics: Vec<ContentScore>,
290}
291
292pub async fn get_analytics_summary(pool: &DbPool) -> Result<AnalyticsSummary, StorageError> {
297 let snapshots = get_follower_snapshots(pool, 90).await?;
299 let current = snapshots.first().map_or(0, |s| s.follower_count);
300
301 let today = Utc::now().date_naive();
304 let follower_at_or_before = |days: i64| -> i64 {
305 snapshots
306 .iter()
307 .find(|s| {
308 NaiveDate::parse_from_str(&s.snapshot_date, "%Y-%m-%d")
309 .map(|d| (today - d).num_days() >= days)
310 .unwrap_or(false)
311 })
312 .map_or(current, |s| s.follower_count)
313 };
314
315 let change_7d = if snapshots.len() >= 2 {
316 current - follower_at_or_before(7)
317 } else {
318 0
319 };
320 let change_30d = if snapshots.len() >= 2 {
321 current - follower_at_or_before(30)
322 } else {
323 0
324 };
325
326 let today = Utc::now().format("%Y-%m-%dT00:00:00Z").to_string();
328 let counts = super::action_log::get_action_counts_since(pool, &today).await?;
329 let actions_today = ActionsSummary {
330 replies: *counts.get("reply").unwrap_or(&0),
331 tweets: *counts.get("tweet").unwrap_or(&0),
332 threads: *counts.get("thread").unwrap_or(&0),
333 };
334
335 let avg_reply_score = get_avg_reply_engagement(pool).await?;
337 let avg_tweet_score = get_avg_tweet_engagement(pool).await?;
338 let (total_replies_sent, total_tweets_posted) = get_performance_counts(pool).await?;
339
340 let top_topics = get_top_topics(pool, 5).await?;
342
343 Ok(AnalyticsSummary {
344 followers: FollowerSummary {
345 current,
346 change_7d,
347 change_30d,
348 },
349 actions_today,
350 engagement: EngagementSummary {
351 avg_reply_score,
352 avg_tweet_score,
353 total_replies_sent,
354 total_tweets_posted,
355 },
356 top_topics,
357 })
358}
359
360#[derive(Debug, Clone, serde::Serialize)]
366pub struct PerformanceItem {
367 pub content_type: String,
369 pub content_preview: String,
371 pub likes: i64,
373 pub replies_received: i64,
375 pub retweets: i64,
377 pub impressions: i64,
379 pub performance_score: f64,
381 pub posted_at: String,
383}
384
385type PerformanceRow = (String, String, i64, i64, i64, i64, f64, String);
387
388pub async fn get_recent_performance_items(
393 pool: &DbPool,
394 limit: u32,
395) -> Result<Vec<PerformanceItem>, StorageError> {
396 let rows: Vec<PerformanceRow> = sqlx::query_as(
397 "SELECT 'reply' as content_type, \
398 SUBSTR(rs.reply_content, 1, 120) as content_preview, \
399 rp.likes_received, rp.replies_received, 0 as retweets, \
400 rp.impressions, rp.performance_score, rs.created_at as posted_at \
401 FROM reply_performance rp \
402 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
403 UNION ALL \
404 SELECT 'tweet' as content_type, \
405 SUBSTR(ot.content, 1, 120) as content_preview, \
406 tp.likes_received, tp.replies_received, tp.retweets_received, \
407 tp.impressions, tp.performance_score, ot.created_at as posted_at \
408 FROM tweet_performance tp \
409 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
410 ORDER BY posted_at DESC \
411 LIMIT ?",
412 )
413 .bind(limit)
414 .fetch_all(pool)
415 .await
416 .map_err(|e| StorageError::Query { source: e })?;
417
418 Ok(rows
419 .into_iter()
420 .map(|r| PerformanceItem {
421 content_type: r.0,
422 content_preview: r.1,
423 likes: r.2,
424 replies_received: r.3,
425 retweets: r.4,
426 impressions: r.5,
427 performance_score: r.6,
428 posted_at: r.7,
429 })
430 .collect())
431}
432
433#[cfg(test)]
434mod tests {
435 use super::*;
436 use crate::storage::init_test_db;
437
438 #[tokio::test]
439 async fn upsert_and_get_follower_snapshot() {
440 let pool = init_test_db().await.expect("init db");
441
442 upsert_follower_snapshot(&pool, 1000, 200, 500)
443 .await
444 .expect("upsert");
445
446 let snapshots = get_follower_snapshots(&pool, 10).await.expect("get");
447 assert_eq!(snapshots.len(), 1);
448 assert_eq!(snapshots[0].follower_count, 1000);
449 assert_eq!(snapshots[0].following_count, 200);
450 assert_eq!(snapshots[0].tweet_count, 500);
451 }
452
453 #[tokio::test]
454 async fn upsert_follower_snapshot_updates_existing() {
455 let pool = init_test_db().await.expect("init db");
456
457 upsert_follower_snapshot(&pool, 1000, 200, 500)
458 .await
459 .expect("upsert");
460 upsert_follower_snapshot(&pool, 1050, 201, 510)
461 .await
462 .expect("upsert again");
463
464 let snapshots = get_follower_snapshots(&pool, 10).await.expect("get");
465 assert_eq!(snapshots.len(), 1);
466 assert_eq!(snapshots[0].follower_count, 1050);
467 }
468
469 #[tokio::test]
470 async fn upsert_reply_performance_works() {
471 let pool = init_test_db().await.expect("init db");
472
473 upsert_reply_performance(&pool, "r1", 5, 2, 100, 55.0)
474 .await
475 .expect("upsert");
476
477 upsert_reply_performance(&pool, "r1", 10, 3, 200, 75.0)
479 .await
480 .expect("update");
481 }
482
483 #[tokio::test]
484 async fn upsert_tweet_performance_works() {
485 let pool = init_test_db().await.expect("init db");
486
487 upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
488 .await
489 .expect("upsert");
490
491 upsert_tweet_performance(&pool, "tw1", 20, 10, 5, 1000, 95.0)
493 .await
494 .expect("update");
495 }
496
497 #[tokio::test]
498 async fn update_and_get_content_scores() {
499 let pool = init_test_db().await.expect("init db");
500
501 update_content_score(&pool, "rust", "tip", 80.0)
502 .await
503 .expect("update");
504 update_content_score(&pool, "rust", "tip", 90.0)
505 .await
506 .expect("update");
507 update_content_score(&pool, "python", "list", 60.0)
508 .await
509 .expect("update");
510
511 let top = get_top_topics(&pool, 10).await.expect("get");
512 assert_eq!(top.len(), 2);
513 assert_eq!(top[0].topic, "rust");
515 assert_eq!(top[0].total_posts, 2);
516 assert!(top[0].avg_performance > 80.0);
517 }
518
519 #[test]
520 fn compute_performance_score_basic() {
521 let score = compute_performance_score(10, 5, 3, 1000);
522 assert!((score - 67.0).abs() < 0.01);
524 }
525
526 #[test]
527 fn compute_performance_score_zero_impressions() {
528 let score = compute_performance_score(10, 5, 3, 0);
529 assert!((score - 67000.0).abs() < 0.01);
531 }
532
533 #[test]
534 fn compute_performance_score_all_zero() {
535 let score = compute_performance_score(0, 0, 0, 0);
536 assert!((score - 0.0).abs() < 0.01);
537 }
538
539 #[tokio::test]
540 async fn avg_reply_engagement_empty() {
541 let pool = init_test_db().await.expect("init db");
542 let avg = get_avg_reply_engagement(&pool).await.expect("avg");
543 assert!((avg - 0.0).abs() < 0.01);
544 }
545
546 #[tokio::test]
547 async fn avg_reply_engagement_with_data() {
548 let pool = init_test_db().await.expect("init db");
549 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
550 .await
551 .expect("upsert");
552 upsert_reply_performance(&pool, "r2", 20, 10, 2000, 80.0)
553 .await
554 .expect("upsert");
555
556 let avg = get_avg_reply_engagement(&pool).await.expect("avg");
557 assert!((avg - 73.5).abs() < 0.01);
559 }
560
561 #[tokio::test]
562 async fn avg_tweet_engagement_empty() {
563 let pool = init_test_db().await.expect("init db");
564 let avg = get_avg_tweet_engagement(&pool).await.expect("avg");
565 assert!((avg - 0.0).abs() < 0.01);
566 }
567
568 #[tokio::test]
569 async fn performance_counts_empty() {
570 let pool = init_test_db().await.expect("init db");
571 let (replies, tweets) = get_performance_counts(&pool).await.expect("counts");
572 assert_eq!(replies, 0);
573 assert_eq!(tweets, 0);
574 }
575
576 #[tokio::test]
577 async fn performance_counts_with_data() {
578 let pool = init_test_db().await.expect("init db");
579 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
580 .await
581 .expect("upsert");
582 upsert_tweet_performance(&pool, "tw1", 10, 5, 3, 500, 82.0)
583 .await
584 .expect("upsert");
585 upsert_tweet_performance(&pool, "tw2", 20, 10, 5, 1000, 95.0)
586 .await
587 .expect("upsert");
588
589 let (replies, tweets) = get_performance_counts(&pool).await.expect("counts");
590 assert_eq!(replies, 1);
591 assert_eq!(tweets, 2);
592 }
593
594 #[tokio::test]
595 async fn analytics_summary_empty() {
596 let pool = init_test_db().await.expect("init db");
597 let summary = get_analytics_summary(&pool).await.expect("summary");
598 assert_eq!(summary.followers.current, 0);
599 assert_eq!(summary.followers.change_7d, 0);
600 assert_eq!(summary.followers.change_30d, 0);
601 assert_eq!(summary.actions_today.replies, 0);
602 assert!((summary.engagement.avg_reply_score - 0.0).abs() < 0.01);
603 assert!(summary.top_topics.is_empty());
604 }
605
606 #[tokio::test]
607 async fn analytics_summary_with_data() {
608 let pool = init_test_db().await.expect("init db");
609
610 upsert_follower_snapshot(&pool, 1000, 200, 500)
612 .await
613 .expect("upsert");
614
615 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
617 .await
618 .expect("upsert");
619
620 update_content_score(&pool, "rust", "tip", 80.0)
622 .await
623 .expect("score");
624 update_content_score(&pool, "ai", "thread", 60.0)
625 .await
626 .expect("score");
627
628 let summary = get_analytics_summary(&pool).await.expect("summary");
629 assert_eq!(summary.followers.current, 1000);
630 assert!(summary.engagement.avg_reply_score > 0.0);
631 assert_eq!(summary.engagement.total_replies_sent, 1);
632 assert_eq!(summary.top_topics.len(), 2);
633 assert_eq!(summary.top_topics[0].topic, "rust");
634 }
635
636 #[tokio::test]
637 async fn recent_performance_items_empty() {
638 let pool = init_test_db().await.expect("init db");
639 let items = get_recent_performance_items(&pool, 10).await.expect("get");
640 assert!(items.is_empty());
641 }
642
643 #[tokio::test]
644 async fn recent_performance_items_with_data() {
645 let pool = init_test_db().await.expect("init db");
646
647 let reply = crate::storage::replies::ReplySent {
649 id: 0,
650 target_tweet_id: "t1".to_string(),
651 reply_tweet_id: Some("r1".to_string()),
652 reply_content: "Great point about testing!".to_string(),
653 llm_provider: Some("openai".to_string()),
654 llm_model: Some("gpt-4o".to_string()),
655 created_at: "2026-02-23T12:00:00Z".to_string(),
656 status: "sent".to_string(),
657 error_message: None,
658 };
659 crate::storage::replies::insert_reply(&pool, &reply)
660 .await
661 .expect("insert reply");
662 upsert_reply_performance(&pool, "r1", 10, 5, 1000, 67.0)
663 .await
664 .expect("upsert perf");
665
666 let items = get_recent_performance_items(&pool, 10).await.expect("get");
667 assert_eq!(items.len(), 1);
668 assert_eq!(items[0].content_type, "reply");
669 assert!(items[0].content_preview.contains("testing"));
670 assert_eq!(items[0].likes, 10);
671 }
672}