Skip to main content

tuitbot_core/automation/analytics_loop/
mod.rs

1//! Analytics loop for tracking content performance.
2//!
3//! Runs periodically to:
4//! 1. Snapshot follower counts via the X API.
5//! 2. Fetch engagement metrics on content posted ~24h ago.
6//! 3. Compute performance scores and update running averages.
7//! 4. Alert on significant follower drops.
8
9mod collector;
10mod reporter;
11#[cfg(test)]
12mod tests;
13
14pub use collector::{
15    AnalyticsError, AnalyticsStorage, EngagementFetcher, ForgeSyncResult, ProfileFetcher,
16    ProfileMetrics, TweetMetrics,
17};
18pub use reporter::{compute_performance_score, AnalyticsSummary};
19
20use super::loop_helpers::ConsecutiveErrorTracker;
21use super::scheduler::LoopScheduler;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio_util::sync::CancellationToken;
25
26/// Analytics loop that tracks content performance and follower trends.
27pub struct AnalyticsLoop {
28    profile_fetcher: Arc<dyn ProfileFetcher>,
29    engagement_fetcher: Arc<dyn EngagementFetcher>,
30    storage: Arc<dyn AnalyticsStorage>,
31}
32
33impl AnalyticsLoop {
34    /// Create a new analytics loop.
35    pub fn new(
36        profile_fetcher: Arc<dyn ProfileFetcher>,
37        engagement_fetcher: Arc<dyn EngagementFetcher>,
38        storage: Arc<dyn AnalyticsStorage>,
39    ) -> Self {
40        Self {
41            profile_fetcher,
42            engagement_fetcher,
43            storage,
44        }
45    }
46
47    /// Run the continuous analytics loop until cancellation.
48    pub async fn run(&self, cancel: CancellationToken, scheduler: LoopScheduler) {
49        tracing::info!("Analytics loop started");
50
51        let mut error_tracker = ConsecutiveErrorTracker::new(5, Duration::from_secs(600));
52
53        loop {
54            if cancel.is_cancelled() {
55                break;
56            }
57
58            match self.run_iteration().await {
59                Ok(summary) => {
60                    error_tracker.record_success();
61                    tracing::info!(
62                        followers = summary.follower_count,
63                        replies_measured = summary.replies_measured,
64                        tweets_measured = summary.tweets_measured,
65                        "Analytics iteration complete"
66                    );
67                }
68                Err(e) => {
69                    let should_pause = error_tracker.record_error();
70                    tracing::warn!(error = %e, "Analytics iteration failed");
71
72                    if should_pause {
73                        tracing::warn!(
74                            pause_secs = error_tracker.pause_duration().as_secs(),
75                            "Pausing analytics loop due to consecutive errors"
76                        );
77                        tokio::select! {
78                            _ = cancel.cancelled() => break,
79                            _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
80                        }
81                        error_tracker.reset();
82                        continue;
83                    }
84                }
85            }
86
87            tokio::select! {
88                _ = cancel.cancelled() => break,
89                _ = scheduler.tick() => {},
90            }
91        }
92
93        tracing::info!("Analytics loop stopped");
94    }
95
96    /// Run a single analytics iteration.
97    pub async fn run_iteration(&self) -> Result<AnalyticsSummary, AnalyticsError> {
98        let mut summary = AnalyticsSummary::default();
99
100        // 1. Snapshot follower count
101        let metrics = self.profile_fetcher.get_profile_metrics().await?;
102        summary.follower_count = metrics.follower_count;
103
104        tracing::info!(
105            followers = metrics.follower_count,
106            "Follower snapshot: {} followers",
107            metrics.follower_count,
108        );
109
110        self.storage
111            .store_follower_snapshot(
112                metrics.follower_count,
113                metrics.following_count,
114                metrics.tweet_count,
115            )
116            .await?;
117
118        // Check for significant follower drop
119        if let Ok(Some(yesterday)) = self.storage.get_yesterday_followers().await {
120            if yesterday > 0 {
121                let drop_pct =
122                    (yesterday - metrics.follower_count) as f64 / yesterday as f64 * 100.0;
123                if drop_pct > 2.0 {
124                    tracing::warn!(
125                        yesterday = yesterday,
126                        today = metrics.follower_count,
127                        drop_pct = format!("{:.1}%", drop_pct),
128                        "Significant follower drop detected"
129                    );
130
131                    let _ = self
132                        .storage
133                        .log_action(
134                            "analytics",
135                            "alert",
136                            &format!(
137                                "Follower drop: {} -> {} ({:.1}%)",
138                                yesterday, metrics.follower_count, drop_pct
139                            ),
140                        )
141                        .await;
142                }
143            }
144        }
145
146        // 2. Measure reply performance
147        let reply_ids = self.storage.get_replies_needing_measurement().await?;
148        for reply_id in &reply_ids {
149            match self.engagement_fetcher.get_tweet_metrics(reply_id).await {
150                Ok(m) => {
151                    let score =
152                        compute_performance_score(m.likes, m.replies, m.retweets, m.impressions);
153                    let _ = self
154                        .storage
155                        .store_reply_performance(reply_id, m.likes, m.replies, m.impressions, score)
156                        .await;
157                    summary.replies_measured += 1;
158                }
159                Err(e) => {
160                    tracing::debug!(reply_id = %reply_id, error = %e, "Failed to fetch reply metrics");
161                }
162            }
163        }
164
165        // 3. Measure tweet performance
166        let tweet_ids = self.storage.get_tweets_needing_measurement().await?;
167        for tweet_id in &tweet_ids {
168            match self.engagement_fetcher.get_tweet_metrics(tweet_id).await {
169                Ok(m) => {
170                    let score =
171                        compute_performance_score(m.likes, m.replies, m.retweets, m.impressions);
172                    let _ = self
173                        .storage
174                        .store_tweet_performance(
175                            tweet_id,
176                            m.likes,
177                            m.retweets,
178                            m.replies,
179                            m.impressions,
180                            score,
181                        )
182                        .await;
183                    summary.tweets_measured += 1;
184                }
185                Err(e) => {
186                    tracing::debug!(tweet_id = %tweet_id, error = %e, "Failed to fetch tweet metrics");
187                }
188            }
189        }
190
191        // 4. Forge sync (if enabled)
192        match self.storage.run_forge_sync_if_enabled().await {
193            Ok(Some(forge_result)) => {
194                tracing::info!(
195                    tweets_synced = forge_result.tweets_synced,
196                    threads_synced = forge_result.threads_synced,
197                    "Forge sync complete"
198                );
199                summary.forge_synced = true;
200            }
201            Ok(None) => {
202                // Forge sync not enabled — no action
203            }
204            Err(e) => {
205                // Forge sync failure is non-fatal
206                tracing::warn!(error = %e, "Forge sync failed");
207            }
208        }
209
210        // 5. Run background aggregations (best-times heatmap, reach snapshots)
211        match self.storage.run_aggregations().await {
212            Ok(()) => {
213                tracing::debug!("Background aggregations complete");
214            }
215            Err(e) => {
216                // Aggregation failure is non-fatal
217                tracing::warn!(error = %e, "Background aggregation failed");
218            }
219        }
220
221        let _ = self
222            .storage
223            .log_action(
224                "analytics",
225                "success",
226                &format!(
227                    "Followers: {}, replies measured: {}, tweets measured: {}",
228                    summary.follower_count, summary.replies_measured, summary.tweets_measured,
229                ),
230            )
231            .await;
232
233        Ok(summary)
234    }
235}