1use super::loop_helpers::ConsecutiveErrorTracker;
10use super::scheduler::LoopScheduler;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio_util::sync::CancellationToken;
14
15#[async_trait::async_trait]
21pub trait ProfileFetcher: Send + Sync {
22 async fn get_profile_metrics(&self) -> Result<ProfileMetrics, AnalyticsError>;
24}
25
26#[async_trait::async_trait]
28pub trait EngagementFetcher: Send + Sync {
29 async fn get_tweet_metrics(&self, tweet_id: &str) -> Result<TweetMetrics, AnalyticsError>;
31}
32
33#[async_trait::async_trait]
35pub trait AnalyticsStorage: Send + Sync {
36 async fn store_follower_snapshot(
38 &self,
39 followers: i64,
40 following: i64,
41 tweets: i64,
42 ) -> Result<(), AnalyticsError>;
43
44 async fn get_yesterday_followers(&self) -> Result<Option<i64>, AnalyticsError>;
46
47 async fn get_replies_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError>;
49
50 async fn get_tweets_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError>;
52
53 async fn store_reply_performance(
55 &self,
56 reply_id: &str,
57 likes: i64,
58 replies: i64,
59 impressions: i64,
60 score: f64,
61 ) -> Result<(), AnalyticsError>;
62
63 async fn store_tweet_performance(
65 &self,
66 tweet_id: &str,
67 likes: i64,
68 retweets: i64,
69 replies: i64,
70 impressions: i64,
71 score: f64,
72 ) -> Result<(), AnalyticsError>;
73
74 async fn update_content_score(
76 &self,
77 topic: &str,
78 format: &str,
79 score: f64,
80 ) -> Result<(), AnalyticsError>;
81
82 async fn log_action(
84 &self,
85 action_type: &str,
86 status: &str,
87 message: &str,
88 ) -> Result<(), AnalyticsError>;
89}
90
91#[derive(Debug, Clone)]
97pub struct ProfileMetrics {
98 pub follower_count: i64,
99 pub following_count: i64,
100 pub tweet_count: i64,
101}
102
103#[derive(Debug, Clone)]
105pub struct TweetMetrics {
106 pub likes: i64,
107 pub retweets: i64,
108 pub replies: i64,
109 pub impressions: i64,
110}
111
112#[derive(Debug)]
114pub enum AnalyticsError {
115 ApiError(String),
117 StorageError(String),
119 Other(String),
121}
122
123impl std::fmt::Display for AnalyticsError {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 match self {
126 Self::ApiError(msg) => write!(f, "API error: {msg}"),
127 Self::StorageError(msg) => write!(f, "storage error: {msg}"),
128 Self::Other(msg) => write!(f, "{msg}"),
129 }
130 }
131}
132
133impl std::error::Error for AnalyticsError {}
134
135pub struct AnalyticsLoop {
141 profile_fetcher: Arc<dyn ProfileFetcher>,
142 engagement_fetcher: Arc<dyn EngagementFetcher>,
143 storage: Arc<dyn AnalyticsStorage>,
144}
145
146impl AnalyticsLoop {
147 pub fn new(
149 profile_fetcher: Arc<dyn ProfileFetcher>,
150 engagement_fetcher: Arc<dyn EngagementFetcher>,
151 storage: Arc<dyn AnalyticsStorage>,
152 ) -> Self {
153 Self {
154 profile_fetcher,
155 engagement_fetcher,
156 storage,
157 }
158 }
159
160 pub async fn run(&self, cancel: CancellationToken, scheduler: LoopScheduler) {
162 tracing::info!("Analytics loop started");
163
164 let mut error_tracker = ConsecutiveErrorTracker::new(5, Duration::from_secs(600));
165
166 loop {
167 if cancel.is_cancelled() {
168 break;
169 }
170
171 match self.run_iteration().await {
172 Ok(summary) => {
173 error_tracker.record_success();
174 tracing::info!(
175 followers = summary.follower_count,
176 replies_measured = summary.replies_measured,
177 tweets_measured = summary.tweets_measured,
178 "Analytics iteration complete"
179 );
180 }
181 Err(e) => {
182 let should_pause = error_tracker.record_error();
183 tracing::warn!(error = %e, "Analytics iteration failed");
184
185 if should_pause {
186 tracing::warn!(
187 pause_secs = error_tracker.pause_duration().as_secs(),
188 "Pausing analytics loop due to consecutive errors"
189 );
190 tokio::select! {
191 _ = cancel.cancelled() => break,
192 _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
193 }
194 error_tracker.reset();
195 continue;
196 }
197 }
198 }
199
200 tokio::select! {
201 _ = cancel.cancelled() => break,
202 _ = scheduler.tick() => {},
203 }
204 }
205
206 tracing::info!("Analytics loop stopped");
207 }
208
209 pub async fn run_iteration(&self) -> Result<AnalyticsSummary, AnalyticsError> {
211 let mut summary = AnalyticsSummary::default();
212
213 let metrics = self.profile_fetcher.get_profile_metrics().await?;
215 summary.follower_count = metrics.follower_count;
216
217 tracing::info!(
218 followers = metrics.follower_count,
219 "Follower snapshot: {} followers",
220 metrics.follower_count,
221 );
222
223 self.storage
224 .store_follower_snapshot(
225 metrics.follower_count,
226 metrics.following_count,
227 metrics.tweet_count,
228 )
229 .await?;
230
231 if let Ok(Some(yesterday)) = self.storage.get_yesterday_followers().await {
233 if yesterday > 0 {
234 let drop_pct =
235 (yesterday - metrics.follower_count) as f64 / yesterday as f64 * 100.0;
236 if drop_pct > 2.0 {
237 tracing::warn!(
238 yesterday = yesterday,
239 today = metrics.follower_count,
240 drop_pct = format!("{:.1}%", drop_pct),
241 "Significant follower drop detected"
242 );
243
244 let _ = self
245 .storage
246 .log_action(
247 "analytics",
248 "alert",
249 &format!(
250 "Follower drop: {} -> {} ({:.1}%)",
251 yesterday, metrics.follower_count, drop_pct
252 ),
253 )
254 .await;
255 }
256 }
257 }
258
259 let reply_ids = self.storage.get_replies_needing_measurement().await?;
261 for reply_id in &reply_ids {
262 match self.engagement_fetcher.get_tweet_metrics(reply_id).await {
263 Ok(m) => {
264 let score =
265 compute_performance_score(m.likes, m.replies, m.retweets, m.impressions);
266 let _ = self
267 .storage
268 .store_reply_performance(reply_id, m.likes, m.replies, m.impressions, score)
269 .await;
270 summary.replies_measured += 1;
271 }
272 Err(e) => {
273 tracing::debug!(reply_id = %reply_id, error = %e, "Failed to fetch reply metrics");
274 }
275 }
276 }
277
278 let tweet_ids = self.storage.get_tweets_needing_measurement().await?;
280 for tweet_id in &tweet_ids {
281 match self.engagement_fetcher.get_tweet_metrics(tweet_id).await {
282 Ok(m) => {
283 let score =
284 compute_performance_score(m.likes, m.replies, m.retweets, m.impressions);
285 let _ = self
286 .storage
287 .store_tweet_performance(
288 tweet_id,
289 m.likes,
290 m.retweets,
291 m.replies,
292 m.impressions,
293 score,
294 )
295 .await;
296 summary.tweets_measured += 1;
297 }
298 Err(e) => {
299 tracing::debug!(tweet_id = %tweet_id, error = %e, "Failed to fetch tweet metrics");
300 }
301 }
302 }
303
304 let _ = self
305 .storage
306 .log_action(
307 "analytics",
308 "success",
309 &format!(
310 "Followers: {}, replies measured: {}, tweets measured: {}",
311 summary.follower_count, summary.replies_measured, summary.tweets_measured,
312 ),
313 )
314 .await;
315
316 Ok(summary)
317 }
318}
319
320#[derive(Debug, Default)]
322pub struct AnalyticsSummary {
323 pub follower_count: i64,
324 pub replies_measured: usize,
325 pub tweets_measured: usize,
326}
327
328pub fn compute_performance_score(likes: i64, replies: i64, retweets: i64, impressions: i64) -> f64 {
332 let numerator = (likes * 3 + replies * 5 + retweets * 4) as f64;
333 let denominator = impressions.max(1) as f64;
334 numerator / denominator * 1000.0
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340 use std::sync::Mutex;
341
342 struct MockProfileFetcher {
345 metrics: ProfileMetrics,
346 }
347
348 #[async_trait::async_trait]
349 impl ProfileFetcher for MockProfileFetcher {
350 async fn get_profile_metrics(&self) -> Result<ProfileMetrics, AnalyticsError> {
351 Ok(self.metrics.clone())
352 }
353 }
354
355 struct MockEngagementFetcher {
356 metrics: TweetMetrics,
357 }
358
359 #[async_trait::async_trait]
360 impl EngagementFetcher for MockEngagementFetcher {
361 async fn get_tweet_metrics(&self, _tweet_id: &str) -> Result<TweetMetrics, AnalyticsError> {
362 Ok(self.metrics.clone())
363 }
364 }
365
366 struct MockAnalyticsStorage {
367 snapshots: Mutex<Vec<(i64, i64, i64)>>,
368 yesterday_followers: Option<i64>,
369 reply_ids: Vec<String>,
370 tweet_ids: Vec<String>,
371 reply_perfs: Mutex<Vec<(String, f64)>>,
372 tweet_perfs: Mutex<Vec<(String, f64)>>,
373 }
374
375 impl MockAnalyticsStorage {
376 fn new() -> Self {
377 Self {
378 snapshots: Mutex::new(Vec::new()),
379 yesterday_followers: None,
380 reply_ids: Vec::new(),
381 tweet_ids: Vec::new(),
382 reply_perfs: Mutex::new(Vec::new()),
383 tweet_perfs: Mutex::new(Vec::new()),
384 }
385 }
386
387 fn with_yesterday(mut self, followers: i64) -> Self {
388 self.yesterday_followers = Some(followers);
389 self
390 }
391
392 fn with_replies(mut self, ids: Vec<String>) -> Self {
393 self.reply_ids = ids;
394 self
395 }
396
397 fn with_tweets(mut self, ids: Vec<String>) -> Self {
398 self.tweet_ids = ids;
399 self
400 }
401 }
402
403 #[async_trait::async_trait]
404 impl AnalyticsStorage for MockAnalyticsStorage {
405 async fn store_follower_snapshot(
406 &self,
407 followers: i64,
408 following: i64,
409 tweets: i64,
410 ) -> Result<(), AnalyticsError> {
411 self.snapshots
412 .lock()
413 .expect("lock")
414 .push((followers, following, tweets));
415 Ok(())
416 }
417
418 async fn get_yesterday_followers(&self) -> Result<Option<i64>, AnalyticsError> {
419 Ok(self.yesterday_followers)
420 }
421
422 async fn get_replies_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
423 Ok(self.reply_ids.clone())
424 }
425
426 async fn get_tweets_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
427 Ok(self.tweet_ids.clone())
428 }
429
430 async fn store_reply_performance(
431 &self,
432 reply_id: &str,
433 _likes: i64,
434 _replies: i64,
435 _impressions: i64,
436 score: f64,
437 ) -> Result<(), AnalyticsError> {
438 self.reply_perfs
439 .lock()
440 .expect("lock")
441 .push((reply_id.to_string(), score));
442 Ok(())
443 }
444
445 async fn store_tweet_performance(
446 &self,
447 tweet_id: &str,
448 _likes: i64,
449 _retweets: i64,
450 _replies: i64,
451 _impressions: i64,
452 score: f64,
453 ) -> Result<(), AnalyticsError> {
454 self.tweet_perfs
455 .lock()
456 .expect("lock")
457 .push((tweet_id.to_string(), score));
458 Ok(())
459 }
460
461 async fn update_content_score(
462 &self,
463 _topic: &str,
464 _format: &str,
465 _score: f64,
466 ) -> Result<(), AnalyticsError> {
467 Ok(())
468 }
469
470 async fn log_action(
471 &self,
472 _action_type: &str,
473 _status: &str,
474 _message: &str,
475 ) -> Result<(), AnalyticsError> {
476 Ok(())
477 }
478 }
479
480 fn default_profile() -> ProfileMetrics {
481 ProfileMetrics {
482 follower_count: 1000,
483 following_count: 200,
484 tweet_count: 500,
485 }
486 }
487
488 fn default_tweet_metrics() -> TweetMetrics {
489 TweetMetrics {
490 likes: 10,
491 retweets: 3,
492 replies: 5,
493 impressions: 1000,
494 }
495 }
496
497 #[tokio::test]
500 async fn iteration_snapshots_followers() {
501 let storage = Arc::new(MockAnalyticsStorage::new());
502 let analytics = AnalyticsLoop::new(
503 Arc::new(MockProfileFetcher {
504 metrics: default_profile(),
505 }),
506 Arc::new(MockEngagementFetcher {
507 metrics: default_tweet_metrics(),
508 }),
509 storage.clone(),
510 );
511
512 let summary = analytics.run_iteration().await.expect("iteration");
513 assert_eq!(summary.follower_count, 1000);
514 assert_eq!(storage.snapshots.lock().expect("lock").len(), 1);
515 }
516
517 #[tokio::test]
518 async fn iteration_measures_replies() {
519 let storage = Arc::new(
520 MockAnalyticsStorage::new().with_replies(vec!["r1".to_string(), "r2".to_string()]),
521 );
522 let analytics = AnalyticsLoop::new(
523 Arc::new(MockProfileFetcher {
524 metrics: default_profile(),
525 }),
526 Arc::new(MockEngagementFetcher {
527 metrics: default_tweet_metrics(),
528 }),
529 storage.clone(),
530 );
531
532 let summary = analytics.run_iteration().await.expect("iteration");
533 assert_eq!(summary.replies_measured, 2);
534 assert_eq!(storage.reply_perfs.lock().expect("lock").len(), 2);
535 }
536
537 #[tokio::test]
538 async fn iteration_measures_tweets() {
539 let storage = Arc::new(MockAnalyticsStorage::new().with_tweets(vec!["tw1".to_string()]));
540 let analytics = AnalyticsLoop::new(
541 Arc::new(MockProfileFetcher {
542 metrics: default_profile(),
543 }),
544 Arc::new(MockEngagementFetcher {
545 metrics: default_tweet_metrics(),
546 }),
547 storage.clone(),
548 );
549
550 let summary = analytics.run_iteration().await.expect("iteration");
551 assert_eq!(summary.tweets_measured, 1);
552 assert_eq!(storage.tweet_perfs.lock().expect("lock").len(), 1);
553 }
554
555 #[tokio::test]
556 async fn iteration_detects_follower_drop() {
557 let storage = Arc::new(MockAnalyticsStorage::new().with_yesterday(1000));
559 let analytics = AnalyticsLoop::new(
560 Arc::new(MockProfileFetcher {
561 metrics: ProfileMetrics {
562 follower_count: 970,
563 following_count: 200,
564 tweet_count: 500,
565 },
566 }),
567 Arc::new(MockEngagementFetcher {
568 metrics: default_tweet_metrics(),
569 }),
570 storage,
571 );
572
573 let summary = analytics.run_iteration().await.expect("iteration");
575 assert_eq!(summary.follower_count, 970);
576 }
577
578 #[tokio::test]
579 async fn iteration_no_drop_alert_when_stable() {
580 let storage = Arc::new(MockAnalyticsStorage::new().with_yesterday(1000));
582 let analytics = AnalyticsLoop::new(
583 Arc::new(MockProfileFetcher {
584 metrics: ProfileMetrics {
585 follower_count: 999,
586 following_count: 200,
587 tweet_count: 500,
588 },
589 }),
590 Arc::new(MockEngagementFetcher {
591 metrics: default_tweet_metrics(),
592 }),
593 storage,
594 );
595
596 let summary = analytics.run_iteration().await.expect("iteration");
597 assert_eq!(summary.follower_count, 999);
598 }
599
600 #[test]
601 fn performance_score_basic() {
602 let score = compute_performance_score(10, 5, 3, 1000);
603 assert!((score - 67.0).abs() < 0.01);
605 }
606
607 #[test]
608 fn performance_score_zero_impressions() {
609 let score = compute_performance_score(10, 5, 3, 0);
610 assert!((score - 67000.0).abs() < 0.01);
611 }
612
613 #[test]
614 fn analytics_error_display() {
615 let err = AnalyticsError::ApiError("timeout".to_string());
616 assert_eq!(err.to_string(), "API error: timeout");
617
618 let err = AnalyticsError::StorageError("disk full".to_string());
619 assert_eq!(err.to_string(), "storage error: disk full");
620 }
621
622 #[test]
623 fn analytics_error_display_other() {
624 let err = AnalyticsError::Other("unexpected".to_string());
625 assert_eq!(err.to_string(), "unexpected");
626 }
627
628 #[test]
629 fn analytics_error_is_std_error() {
630 let err = AnalyticsError::ApiError("test".to_string());
631 let _: &dyn std::error::Error = &err;
633 }
634
635 #[test]
636 fn analytics_summary_default() {
637 let summary = AnalyticsSummary::default();
638 assert_eq!(summary.follower_count, 0);
639 assert_eq!(summary.replies_measured, 0);
640 assert_eq!(summary.tweets_measured, 0);
641 }
642
643 #[test]
644 fn profile_metrics_debug_and_clone() {
645 let m = ProfileMetrics {
646 follower_count: 500,
647 following_count: 100,
648 tweet_count: 200,
649 };
650 let m2 = m.clone();
651 assert_eq!(m2.follower_count, 500);
652 let debug = format!("{m:?}");
653 assert!(debug.contains("500"));
654 }
655
656 #[test]
657 fn tweet_metrics_debug_and_clone() {
658 let m = TweetMetrics {
659 likes: 5,
660 retweets: 2,
661 replies: 3,
662 impressions: 100,
663 };
664 let m2 = m.clone();
665 assert_eq!(m2.likes, 5);
666 let debug = format!("{m:?}");
667 assert!(debug.contains("100"));
668 }
669
670 #[test]
671 fn performance_score_all_zeros() {
672 let score = compute_performance_score(0, 0, 0, 0);
673 assert!((score - 0.0).abs() < 0.01);
674 }
675
676 #[test]
677 fn performance_score_high_engagement() {
678 let score = compute_performance_score(100, 50, 30, 500);
679 assert!((score - 1340.0).abs() < 0.01);
681 }
682
683 #[test]
684 fn performance_score_only_likes() {
685 let score = compute_performance_score(10, 0, 0, 100);
686 assert!((score - 300.0).abs() < 0.01);
688 }
689
690 #[test]
691 fn performance_score_only_replies() {
692 let score = compute_performance_score(0, 10, 0, 100);
693 assert!((score - 500.0).abs() < 0.01);
695 }
696
697 #[test]
698 fn performance_score_only_retweets() {
699 let score = compute_performance_score(0, 0, 10, 100);
700 assert!((score - 400.0).abs() < 0.01);
702 }
703
704 #[test]
705 fn performance_score_negative_impressions_clamped() {
706 let score = compute_performance_score(1, 1, 1, -5);
708 assert!((score - 12000.0).abs() < 0.01);
710 }
711
712 #[test]
713 fn analytics_error_debug() {
714 let err = AnalyticsError::ApiError("timeout".to_string());
715 let debug = format!("{err:?}");
716 assert!(debug.contains("ApiError"));
717 assert!(debug.contains("timeout"));
718
719 let err = AnalyticsError::StorageError("disk full".to_string());
720 let debug = format!("{err:?}");
721 assert!(debug.contains("StorageError"));
722
723 let err = AnalyticsError::Other("unexpected".to_string());
724 let debug = format!("{err:?}");
725 assert!(debug.contains("Other"));
726 }
727
728 #[test]
729 fn analytics_summary_debug() {
730 let summary = AnalyticsSummary {
731 follower_count: 500,
732 replies_measured: 3,
733 tweets_measured: 2,
734 };
735 let debug = format!("{summary:?}");
736 assert!(debug.contains("500"));
737 assert!(debug.contains("3"));
738 assert!(debug.contains("2"));
739 }
740
741 #[test]
742 fn analytics_error_source_is_none() {
743 let err = AnalyticsError::ApiError("test".to_string());
744 assert!(std::error::Error::source(&err).is_none());
746 }
747
748 #[tokio::test]
749 async fn iteration_with_both_replies_and_tweets() {
750 let storage = Arc::new(
751 MockAnalyticsStorage::new()
752 .with_replies(vec!["r1".to_string()])
753 .with_tweets(vec!["t1".to_string(), "t2".to_string()]),
754 );
755 let analytics = AnalyticsLoop::new(
756 Arc::new(MockProfileFetcher {
757 metrics: default_profile(),
758 }),
759 Arc::new(MockEngagementFetcher {
760 metrics: default_tweet_metrics(),
761 }),
762 storage.clone(),
763 );
764
765 let summary = analytics.run_iteration().await.expect("iteration");
766 assert_eq!(summary.replies_measured, 1);
767 assert_eq!(summary.tweets_measured, 2);
768 assert_eq!(summary.follower_count, 1000);
769 }
770
771 #[tokio::test]
772 async fn iteration_follower_growth_no_alert() {
773 let storage = Arc::new(MockAnalyticsStorage::new().with_yesterday(1000));
775 let analytics = AnalyticsLoop::new(
776 Arc::new(MockProfileFetcher {
777 metrics: ProfileMetrics {
778 follower_count: 1050,
779 following_count: 200,
780 tweet_count: 500,
781 },
782 }),
783 Arc::new(MockEngagementFetcher {
784 metrics: default_tweet_metrics(),
785 }),
786 storage,
787 );
788
789 let summary = analytics.run_iteration().await.expect("iteration");
790 assert_eq!(summary.follower_count, 1050);
791 }
792
793 #[tokio::test]
794 async fn iteration_no_yesterday_data() {
795 let storage = Arc::new(MockAnalyticsStorage::new());
796 let analytics = AnalyticsLoop::new(
798 Arc::new(MockProfileFetcher {
799 metrics: default_profile(),
800 }),
801 Arc::new(MockEngagementFetcher {
802 metrics: default_tweet_metrics(),
803 }),
804 storage,
805 );
806
807 let summary = analytics.run_iteration().await.expect("iteration");
808 assert_eq!(summary.follower_count, 1000);
809 }
810
811 struct FailingEngagementFetcher;
814
815 #[async_trait::async_trait]
816 impl EngagementFetcher for FailingEngagementFetcher {
817 async fn get_tweet_metrics(&self, _tweet_id: &str) -> Result<TweetMetrics, AnalyticsError> {
818 Err(AnalyticsError::ApiError("rate limited".to_string()))
819 }
820 }
821
822 #[tokio::test]
823 async fn iteration_engagement_fetch_failure_continues() {
824 let storage = Arc::new(
825 MockAnalyticsStorage::new()
826 .with_replies(vec!["r1".to_string()])
827 .with_tweets(vec!["t1".to_string()]),
828 );
829 let analytics = AnalyticsLoop::new(
830 Arc::new(MockProfileFetcher {
831 metrics: default_profile(),
832 }),
833 Arc::new(FailingEngagementFetcher),
834 storage.clone(),
835 );
836
837 let summary = analytics.run_iteration().await.expect("iteration");
838 assert_eq!(summary.replies_measured, 0);
840 assert_eq!(summary.tweets_measured, 0);
841 }
842}