1use crate::error::StorageError;
8use crate::storage::DbPool;
9
10#[derive(Debug, Clone, Default, serde::Serialize)]
12pub struct ActionCounts {
13 pub replies: i64,
14 pub tweets: i64,
15 pub threads: i64,
16 pub target_replies: i64,
17}
18
19#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
21pub struct TopicPerformance {
22 pub topic: String,
23 pub format: String,
24 pub avg_score: f64,
25 pub post_count: i64,
26}
27
28#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
30pub struct ContentHighlight {
31 pub content_type: String,
32 pub content_preview: String,
33 pub performance_score: f64,
34 pub likes: i64,
35 pub replies_received: i64,
36}
37
38pub async fn count_actions_in_range(
42 pool: &DbPool,
43 start: &str,
44 end: &str,
45) -> Result<ActionCounts, StorageError> {
46 let rows: Vec<(String, i64)> = sqlx::query_as(
47 "SELECT action_type, COUNT(*) as cnt FROM action_log \
48 WHERE created_at >= ? AND created_at < ? AND status = 'success' \
49 GROUP BY action_type",
50 )
51 .bind(start)
52 .bind(end)
53 .fetch_all(pool)
54 .await
55 .map_err(|e| StorageError::Query { source: e })?;
56
57 let mut counts = ActionCounts::default();
58 for (action_type, count) in rows {
59 match action_type.as_str() {
60 "reply" => counts.replies = count,
61 "tweet" => counts.tweets = count,
62 "thread" => counts.threads = count,
63 "target_reply" => counts.target_replies = count,
64 _ => {}
65 }
66 }
67 Ok(counts)
68}
69
70pub async fn get_follower_at_date(pool: &DbPool, date: &str) -> Result<Option<i64>, StorageError> {
74 let row: Option<(i64,)> = sqlx::query_as(
75 "SELECT follower_count FROM follower_snapshots \
76 WHERE snapshot_date <= ? ORDER BY snapshot_date DESC LIMIT 1",
77 )
78 .bind(date)
79 .fetch_optional(pool)
80 .await
81 .map_err(|e| StorageError::Query { source: e })?;
82
83 Ok(row.map(|r| r.0))
84}
85
86pub async fn avg_reply_score_in_range(
88 pool: &DbPool,
89 start: &str,
90 end: &str,
91) -> Result<f64, StorageError> {
92 let row: (f64,) = sqlx::query_as(
93 "SELECT COALESCE(AVG(rp.performance_score), 0.0) \
94 FROM reply_performance rp \
95 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
96 WHERE rs.created_at >= ? AND rs.created_at < ?",
97 )
98 .bind(start)
99 .bind(end)
100 .fetch_one(pool)
101 .await
102 .map_err(|e| StorageError::Query { source: e })?;
103
104 Ok(row.0)
105}
106
107pub async fn avg_tweet_score_in_range(
109 pool: &DbPool,
110 start: &str,
111 end: &str,
112) -> Result<f64, StorageError> {
113 let row: (f64,) = sqlx::query_as(
114 "SELECT COALESCE(AVG(tp.performance_score), 0.0) \
115 FROM tweet_performance tp \
116 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
117 WHERE ot.created_at >= ? AND ot.created_at < ?",
118 )
119 .bind(start)
120 .bind(end)
121 .fetch_one(pool)
122 .await
123 .map_err(|e| StorageError::Query { source: e })?;
124
125 Ok(row.0)
126}
127
128pub async fn reply_acceptance_rate(
130 pool: &DbPool,
131 start: &str,
132 end: &str,
133) -> Result<f64, StorageError> {
134 let row: (i64, i64) = sqlx::query_as(
135 "SELECT \
136 COUNT(*) as total, \
137 SUM(CASE WHEN rp.replies_received > 0 THEN 1 ELSE 0 END) as accepted \
138 FROM replies_sent rs \
139 JOIN reply_performance rp ON rp.reply_id = rs.reply_tweet_id \
140 WHERE rs.created_at >= ? AND rs.created_at < ?",
141 )
142 .bind(start)
143 .bind(end)
144 .fetch_one(pool)
145 .await
146 .map_err(|e| StorageError::Query { source: e })?;
147
148 if row.0 == 0 {
149 return Ok(0.0);
150 }
151 Ok(row.1 as f64 / row.0 as f64)
152}
153
154pub async fn top_topics_in_range(
156 pool: &DbPool,
157 start: &str,
158 end: &str,
159 limit: u32,
160) -> Result<Vec<TopicPerformance>, StorageError> {
161 let rows: Vec<(String, String, f64, i64)> = sqlx::query_as(
162 "SELECT ot.topic, COALESCE(ot.topic, '') as format, \
163 AVG(tp.performance_score) as avg_score, COUNT(*) as post_count \
164 FROM tweet_performance tp \
165 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
166 WHERE ot.created_at >= ? AND ot.created_at < ? AND ot.topic IS NOT NULL \
167 GROUP BY ot.topic \
168 HAVING post_count >= 1 \
169 ORDER BY avg_score DESC \
170 LIMIT ?",
171 )
172 .bind(start)
173 .bind(end)
174 .bind(limit)
175 .fetch_all(pool)
176 .await
177 .map_err(|e| StorageError::Query { source: e })?;
178
179 Ok(rows
180 .into_iter()
181 .map(|r| TopicPerformance {
182 topic: r.0,
183 format: r.1,
184 avg_score: r.2,
185 post_count: r.3,
186 })
187 .collect())
188}
189
190pub async fn bottom_topics_in_range(
192 pool: &DbPool,
193 start: &str,
194 end: &str,
195 limit: u32,
196) -> Result<Vec<TopicPerformance>, StorageError> {
197 let rows: Vec<(String, String, f64, i64)> = sqlx::query_as(
198 "SELECT ot.topic, COALESCE(ot.topic, '') as format, \
199 AVG(tp.performance_score) as avg_score, COUNT(*) as post_count \
200 FROM tweet_performance tp \
201 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
202 WHERE ot.created_at >= ? AND ot.created_at < ? AND ot.topic IS NOT NULL \
203 GROUP BY ot.topic \
204 HAVING post_count >= 3 \
205 ORDER BY avg_score ASC \
206 LIMIT ?",
207 )
208 .bind(start)
209 .bind(end)
210 .bind(limit)
211 .fetch_all(pool)
212 .await
213 .map_err(|e| StorageError::Query { source: e })?;
214
215 Ok(rows
216 .into_iter()
217 .map(|r| TopicPerformance {
218 topic: r.0,
219 format: r.1,
220 avg_score: r.2,
221 post_count: r.3,
222 })
223 .collect())
224}
225
226pub async fn top_content_in_range(
228 pool: &DbPool,
229 start: &str,
230 end: &str,
231 limit: u32,
232) -> Result<Vec<ContentHighlight>, StorageError> {
233 let rows: Vec<(String, String, f64, i64, i64)> = sqlx::query_as(
234 "SELECT content_type, content_preview, performance_score, likes, replies_received FROM ( \
235 SELECT 'reply' as content_type, \
236 SUBSTR(rs.reply_content, 1, 120) as content_preview, \
237 rp.performance_score, rp.likes_received as likes, \
238 rp.replies_received, rs.created_at as posted_at \
239 FROM reply_performance rp \
240 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
241 WHERE rs.created_at >= ? AND rs.created_at < ? \
242 UNION ALL \
243 SELECT 'tweet' as content_type, \
244 SUBSTR(ot.content, 1, 120) as content_preview, \
245 tp.performance_score, tp.likes_received as likes, \
246 tp.replies_received, ot.created_at as posted_at \
247 FROM tweet_performance tp \
248 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
249 WHERE ot.created_at >= ? AND ot.created_at < ? \
250 ) ORDER BY performance_score DESC LIMIT ?",
251 )
252 .bind(start)
253 .bind(end)
254 .bind(start)
255 .bind(end)
256 .bind(limit)
257 .fetch_all(pool)
258 .await
259 .map_err(|e| StorageError::Query { source: e })?;
260
261 Ok(rows
262 .into_iter()
263 .map(|r| ContentHighlight {
264 content_type: r.0,
265 content_preview: r.1,
266 performance_score: r.2,
267 likes: r.3,
268 replies_received: r.4,
269 })
270 .collect())
271}
272
273pub async fn distinct_topic_count(
275 pool: &DbPool,
276 start: &str,
277 end: &str,
278) -> Result<i64, StorageError> {
279 let row: (i64,) = sqlx::query_as(
280 "SELECT COUNT(DISTINCT topic) FROM original_tweets \
281 WHERE created_at >= ? AND created_at < ? AND topic IS NOT NULL AND topic != ''",
282 )
283 .bind(start)
284 .bind(end)
285 .fetch_one(pool)
286 .await
287 .map_err(|e| StorageError::Query { source: e })?;
288
289 Ok(row.0)
290}
291
292pub async fn count_actions_in_range_for(
299 pool: &DbPool,
300 account_id: &str,
301 start: &str,
302 end: &str,
303) -> Result<ActionCounts, StorageError> {
304 let rows: Vec<(String, i64)> = sqlx::query_as(
305 "SELECT action_type, COUNT(*) as cnt FROM action_log \
306 WHERE created_at >= ? AND created_at < ? AND status = 'success' \
307 AND account_id = ? \
308 GROUP BY action_type",
309 )
310 .bind(start)
311 .bind(end)
312 .bind(account_id)
313 .fetch_all(pool)
314 .await
315 .map_err(|e| StorageError::Query { source: e })?;
316
317 let mut counts = ActionCounts::default();
318 for (action_type, count) in rows {
319 match action_type.as_str() {
320 "reply" => counts.replies = count,
321 "tweet" => counts.tweets = count,
322 "thread" => counts.threads = count,
323 "target_reply" => counts.target_replies = count,
324 _ => {}
325 }
326 }
327 Ok(counts)
328}
329
330pub async fn get_follower_at_date_for(
332 pool: &DbPool,
333 account_id: &str,
334 date: &str,
335) -> Result<Option<i64>, StorageError> {
336 let row: Option<(i64,)> = sqlx::query_as(
337 "SELECT follower_count FROM follower_snapshots \
338 WHERE snapshot_date <= ? AND account_id = ? \
339 ORDER BY snapshot_date DESC LIMIT 1",
340 )
341 .bind(date)
342 .bind(account_id)
343 .fetch_optional(pool)
344 .await
345 .map_err(|e| StorageError::Query { source: e })?;
346
347 Ok(row.map(|r| r.0))
348}
349
350pub async fn avg_reply_score_in_range_for(
352 pool: &DbPool,
353 account_id: &str,
354 start: &str,
355 end: &str,
356) -> Result<f64, StorageError> {
357 let row: (f64,) = sqlx::query_as(
358 "SELECT COALESCE(AVG(rp.performance_score), 0.0) \
359 FROM reply_performance rp \
360 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
361 WHERE rs.created_at >= ? AND rs.created_at < ? AND rs.account_id = ?",
362 )
363 .bind(start)
364 .bind(end)
365 .bind(account_id)
366 .fetch_one(pool)
367 .await
368 .map_err(|e| StorageError::Query { source: e })?;
369
370 Ok(row.0)
371}
372
373pub async fn avg_tweet_score_in_range_for(
375 pool: &DbPool,
376 account_id: &str,
377 start: &str,
378 end: &str,
379) -> Result<f64, StorageError> {
380 let row: (f64,) = sqlx::query_as(
381 "SELECT COALESCE(AVG(tp.performance_score), 0.0) \
382 FROM tweet_performance tp \
383 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
384 WHERE ot.created_at >= ? AND ot.created_at < ? AND ot.account_id = ?",
385 )
386 .bind(start)
387 .bind(end)
388 .bind(account_id)
389 .fetch_one(pool)
390 .await
391 .map_err(|e| StorageError::Query { source: e })?;
392
393 Ok(row.0)
394}
395
396pub async fn reply_acceptance_rate_for(
398 pool: &DbPool,
399 account_id: &str,
400 start: &str,
401 end: &str,
402) -> Result<f64, StorageError> {
403 let row: (i64, i64) = sqlx::query_as(
404 "SELECT \
405 COUNT(*) as total, \
406 SUM(CASE WHEN rp.replies_received > 0 THEN 1 ELSE 0 END) as accepted \
407 FROM replies_sent rs \
408 JOIN reply_performance rp ON rp.reply_id = rs.reply_tweet_id \
409 WHERE rs.created_at >= ? AND rs.created_at < ? AND rs.account_id = ?",
410 )
411 .bind(start)
412 .bind(end)
413 .bind(account_id)
414 .fetch_one(pool)
415 .await
416 .map_err(|e| StorageError::Query { source: e })?;
417
418 if row.0 == 0 {
419 return Ok(0.0);
420 }
421 Ok(row.1 as f64 / row.0 as f64)
422}
423
424pub async fn top_topics_in_range_for(
426 pool: &DbPool,
427 account_id: &str,
428 start: &str,
429 end: &str,
430 limit: u32,
431) -> Result<Vec<TopicPerformance>, StorageError> {
432 let rows: Vec<(String, String, f64, i64)> = sqlx::query_as(
433 "SELECT ot.topic, COALESCE(ot.topic, '') as format, \
434 AVG(tp.performance_score) as avg_score, COUNT(*) as post_count \
435 FROM tweet_performance tp \
436 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
437 WHERE ot.created_at >= ? AND ot.created_at < ? AND ot.topic IS NOT NULL \
438 AND ot.account_id = ? \
439 GROUP BY ot.topic \
440 HAVING post_count >= 1 \
441 ORDER BY avg_score DESC \
442 LIMIT ?",
443 )
444 .bind(start)
445 .bind(end)
446 .bind(account_id)
447 .bind(limit)
448 .fetch_all(pool)
449 .await
450 .map_err(|e| StorageError::Query { source: e })?;
451
452 Ok(rows
453 .into_iter()
454 .map(|r| TopicPerformance {
455 topic: r.0,
456 format: r.1,
457 avg_score: r.2,
458 post_count: r.3,
459 })
460 .collect())
461}
462
463pub async fn bottom_topics_in_range_for(
465 pool: &DbPool,
466 account_id: &str,
467 start: &str,
468 end: &str,
469 limit: u32,
470) -> Result<Vec<TopicPerformance>, StorageError> {
471 let rows: Vec<(String, String, f64, i64)> = sqlx::query_as(
472 "SELECT ot.topic, COALESCE(ot.topic, '') as format, \
473 AVG(tp.performance_score) as avg_score, COUNT(*) as post_count \
474 FROM tweet_performance tp \
475 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
476 WHERE ot.created_at >= ? AND ot.created_at < ? AND ot.topic IS NOT NULL \
477 AND ot.account_id = ? \
478 GROUP BY ot.topic \
479 HAVING post_count >= 3 \
480 ORDER BY avg_score ASC \
481 LIMIT ?",
482 )
483 .bind(start)
484 .bind(end)
485 .bind(account_id)
486 .bind(limit)
487 .fetch_all(pool)
488 .await
489 .map_err(|e| StorageError::Query { source: e })?;
490
491 Ok(rows
492 .into_iter()
493 .map(|r| TopicPerformance {
494 topic: r.0,
495 format: r.1,
496 avg_score: r.2,
497 post_count: r.3,
498 })
499 .collect())
500}
501
502pub async fn top_content_in_range_for(
504 pool: &DbPool,
505 account_id: &str,
506 start: &str,
507 end: &str,
508 limit: u32,
509) -> Result<Vec<ContentHighlight>, StorageError> {
510 let rows: Vec<(String, String, f64, i64, i64)> = sqlx::query_as(
511 "SELECT content_type, content_preview, performance_score, likes, replies_received FROM ( \
512 SELECT 'reply' as content_type, \
513 SUBSTR(rs.reply_content, 1, 120) as content_preview, \
514 rp.performance_score, rp.likes_received as likes, \
515 rp.replies_received, rs.created_at as posted_at \
516 FROM reply_performance rp \
517 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
518 WHERE rs.created_at >= ? AND rs.created_at < ? AND rs.account_id = ? \
519 UNION ALL \
520 SELECT 'tweet' as content_type, \
521 SUBSTR(ot.content, 1, 120) as content_preview, \
522 tp.performance_score, tp.likes_received as likes, \
523 tp.replies_received, ot.created_at as posted_at \
524 FROM tweet_performance tp \
525 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
526 WHERE ot.created_at >= ? AND ot.created_at < ? AND ot.account_id = ? \
527 ) ORDER BY performance_score DESC LIMIT ?",
528 )
529 .bind(start)
530 .bind(end)
531 .bind(account_id)
532 .bind(start)
533 .bind(end)
534 .bind(account_id)
535 .bind(limit)
536 .fetch_all(pool)
537 .await
538 .map_err(|e| StorageError::Query { source: e })?;
539
540 Ok(rows
541 .into_iter()
542 .map(|r| ContentHighlight {
543 content_type: r.0,
544 content_preview: r.1,
545 performance_score: r.2,
546 likes: r.3,
547 replies_received: r.4,
548 })
549 .collect())
550}
551
552pub async fn distinct_topic_count_for(
554 pool: &DbPool,
555 account_id: &str,
556 start: &str,
557 end: &str,
558) -> Result<i64, StorageError> {
559 let row: (i64,) = sqlx::query_as(
560 "SELECT COUNT(DISTINCT topic) FROM original_tweets \
561 WHERE created_at >= ? AND created_at < ? AND topic IS NOT NULL \
562 AND topic != '' AND account_id = ?",
563 )
564 .bind(start)
565 .bind(end)
566 .bind(account_id)
567 .fetch_one(pool)
568 .await
569 .map_err(|e| StorageError::Query { source: e })?;
570
571 Ok(row.0)
572}
573
#[cfg(test)]
mod tests {
    //! Smoke tests that run the unscoped queries against the database returned by
    //! `init_test_db` and check the "no matching rows" results. They assume that database
    //! holds no rows in the queried tables.

    use super::*;
    use crate::storage::init_test_db;

    /// Inclusive lower bound of the window used by every range query below.
    const RANGE_START: &str = "2026-01-01T00:00:00Z";
    /// Exclusive upper bound of the window used by every range query below.
    const RANGE_END: &str = "2026-12-31T23:59:59Z";

    #[tokio::test]
    async fn count_actions_empty() {
        let pool = init_test_db().await.expect("init db");
        let totals = count_actions_in_range(&pool, RANGE_START, RANGE_END)
            .await
            .expect("count");
        assert_eq!(totals.replies, 0);
        assert_eq!(totals.tweets, 0);
    }

    #[tokio::test]
    async fn follower_at_date_empty() {
        let pool = init_test_db().await.expect("init db");
        let followers = get_follower_at_date(&pool, "2026-12-31")
            .await
            .expect("get");
        assert!(followers.is_none());
    }

    #[tokio::test]
    async fn avg_reply_score_empty() {
        let pool = init_test_db().await.expect("init db");
        let average = avg_reply_score_in_range(&pool, RANGE_START, RANGE_END)
            .await
            .expect("avg");
        assert!((average - 0.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn reply_acceptance_rate_empty() {
        let pool = init_test_db().await.expect("init db");
        let ratio = reply_acceptance_rate(&pool, RANGE_START, RANGE_END)
            .await
            .expect("rate");
        assert!((ratio - 0.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn top_topics_empty() {
        let pool = init_test_db().await.expect("init db");
        let ranked = top_topics_in_range(&pool, RANGE_START, RANGE_END, 5)
            .await
            .expect("topics");
        assert!(ranked.is_empty());
    }

    #[tokio::test]
    async fn top_content_empty() {
        let pool = init_test_db().await.expect("init db");
        let highlights = top_content_in_range(&pool, RANGE_START, RANGE_END, 5)
            .await
            .expect("content");
        assert!(highlights.is_empty());
    }

    #[tokio::test]
    async fn distinct_topic_count_empty() {
        let pool = init_test_db().await.expect("init db");
        let topics = distinct_topic_count(&pool, RANGE_START, RANGE_END)
            .await
            .expect("count");
        assert_eq!(topics, 0);
    }
}