tuitbot_core/automation/analytics_loop/
mod.rs1mod collector;
10mod reporter;
11#[cfg(test)]
12mod tests;
13
14pub use collector::{
15 AnalyticsError, AnalyticsStorage, EngagementFetcher, ForgeSyncResult, ProfileFetcher,
16 ProfileMetrics, TweetMetrics,
17};
18pub use reporter::{compute_performance_score, AnalyticsSummary};
19
20use super::loop_helpers::ConsecutiveErrorTracker;
21use super::scheduler::LoopScheduler;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio_util::sync::CancellationToken;
25
26pub struct AnalyticsLoop {
28 profile_fetcher: Arc<dyn ProfileFetcher>,
29 engagement_fetcher: Arc<dyn EngagementFetcher>,
30 storage: Arc<dyn AnalyticsStorage>,
31}
32
33impl AnalyticsLoop {
34 pub fn new(
36 profile_fetcher: Arc<dyn ProfileFetcher>,
37 engagement_fetcher: Arc<dyn EngagementFetcher>,
38 storage: Arc<dyn AnalyticsStorage>,
39 ) -> Self {
40 Self {
41 profile_fetcher,
42 engagement_fetcher,
43 storage,
44 }
45 }
46
47 pub async fn run(&self, cancel: CancellationToken, scheduler: LoopScheduler) {
49 tracing::info!("Analytics loop started");
50
51 let mut error_tracker = ConsecutiveErrorTracker::new(5, Duration::from_secs(600));
52
53 loop {
54 if cancel.is_cancelled() {
55 break;
56 }
57
58 match self.run_iteration().await {
59 Ok(summary) => {
60 error_tracker.record_success();
61 tracing::info!(
62 followers = summary.follower_count,
63 replies_measured = summary.replies_measured,
64 tweets_measured = summary.tweets_measured,
65 "Analytics iteration complete"
66 );
67 }
68 Err(e) => {
69 let should_pause = error_tracker.record_error();
70 tracing::warn!(error = %e, "Analytics iteration failed");
71
72 if should_pause {
73 tracing::warn!(
74 pause_secs = error_tracker.pause_duration().as_secs(),
75 "Pausing analytics loop due to consecutive errors"
76 );
77 tokio::select! {
78 _ = cancel.cancelled() => break,
79 _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
80 }
81 error_tracker.reset();
82 continue;
83 }
84 }
85 }
86
87 tokio::select! {
88 _ = cancel.cancelled() => break,
89 _ = scheduler.tick() => {},
90 }
91 }
92
93 tracing::info!("Analytics loop stopped");
94 }
95
96 pub async fn run_iteration(&self) -> Result<AnalyticsSummary, AnalyticsError> {
98 let mut summary = AnalyticsSummary::default();
99
100 let metrics = self.profile_fetcher.get_profile_metrics().await?;
102 summary.follower_count = metrics.follower_count;
103
104 tracing::info!(
105 followers = metrics.follower_count,
106 "Follower snapshot: {} followers",
107 metrics.follower_count,
108 );
109
110 self.storage
111 .store_follower_snapshot(
112 metrics.follower_count,
113 metrics.following_count,
114 metrics.tweet_count,
115 )
116 .await?;
117
118 if let Ok(Some(yesterday)) = self.storage.get_yesterday_followers().await {
120 if yesterday > 0 {
121 let drop_pct =
122 (yesterday - metrics.follower_count) as f64 / yesterday as f64 * 100.0;
123 if drop_pct > 2.0 {
124 tracing::warn!(
125 yesterday = yesterday,
126 today = metrics.follower_count,
127 drop_pct = format!("{:.1}%", drop_pct),
128 "Significant follower drop detected"
129 );
130
131 let _ = self
132 .storage
133 .log_action(
134 "analytics",
135 "alert",
136 &format!(
137 "Follower drop: {} -> {} ({:.1}%)",
138 yesterday, metrics.follower_count, drop_pct
139 ),
140 )
141 .await;
142 }
143 }
144 }
145
146 let reply_ids = self.storage.get_replies_needing_measurement().await?;
148 for reply_id in &reply_ids {
149 match self.engagement_fetcher.get_tweet_metrics(reply_id).await {
150 Ok(m) => {
151 let score =
152 compute_performance_score(m.likes, m.replies, m.retweets, m.impressions);
153 let _ = self
154 .storage
155 .store_reply_performance(reply_id, m.likes, m.replies, m.impressions, score)
156 .await;
157 summary.replies_measured += 1;
158 }
159 Err(e) => {
160 tracing::debug!(reply_id = %reply_id, error = %e, "Failed to fetch reply metrics");
161 }
162 }
163 }
164
165 let tweet_ids = self.storage.get_tweets_needing_measurement().await?;
167 for tweet_id in &tweet_ids {
168 match self.engagement_fetcher.get_tweet_metrics(tweet_id).await {
169 Ok(m) => {
170 let score =
171 compute_performance_score(m.likes, m.replies, m.retweets, m.impressions);
172 let _ = self
173 .storage
174 .store_tweet_performance(
175 tweet_id,
176 m.likes,
177 m.retweets,
178 m.replies,
179 m.impressions,
180 score,
181 )
182 .await;
183 summary.tweets_measured += 1;
184 }
185 Err(e) => {
186 tracing::debug!(tweet_id = %tweet_id, error = %e, "Failed to fetch tweet metrics");
187 }
188 }
189 }
190
191 match self.storage.run_forge_sync_if_enabled().await {
193 Ok(Some(forge_result)) => {
194 tracing::info!(
195 tweets_synced = forge_result.tweets_synced,
196 threads_synced = forge_result.threads_synced,
197 "Forge sync complete"
198 );
199 summary.forge_synced = true;
200 }
201 Ok(None) => {
202 }
204 Err(e) => {
205 tracing::warn!(error = %e, "Forge sync failed");
207 }
208 }
209
210 match self.storage.run_aggregations().await {
212 Ok(()) => {
213 tracing::debug!("Background aggregations complete");
214 }
215 Err(e) => {
216 tracing::warn!(error = %e, "Background aggregation failed");
218 }
219 }
220
221 let _ = self
222 .storage
223 .log_action(
224 "analytics",
225 "success",
226 &format!(
227 "Followers: {}, replies measured: {}, tweets measured: {}",
228 summary.follower_count, summary.replies_measured, summary.tweets_measured,
229 ),
230 )
231 .await;
232
233 Ok(summary)
234 }
235}