tuitbot_core/automation/watchtower/loopback/sync/
mod.rs1use std::io;
8use std::path::Path;
9
10use super::{
11 parse_tuitbot_metadata, serialize_frontmatter_to_file, split_front_matter, TuitbotFrontMatter,
12};
13
14#[cfg(test)]
15mod tests;
16
/// Outcome of attempting to write analytics into a note's `tuitbot` front matter.
#[derive(PartialEq, Eq, Debug)]
pub enum UpdateResult {
    /// The matching entry was rewritten and the file saved.
    Updated,
    /// The file exists but holds no entry for the requested tweet id
    /// (this includes files without front matter).
    EntryNotFound,
    /// The file could not be found at the given path.
    FileNotFound,
}
24
/// Analytics snapshot copied into one `tuitbot` front-matter entry.
///
/// Producers in this module fill `engagement_rate` as
/// `(likes + retweets + replies) / impressions * 100` and format `synced_at`
/// as a UTC `%Y-%m-%dT%H:%M:%SZ` timestamp.
#[derive(Clone, Debug)]
pub struct EntryAnalytics {
    /// Total impressions.
    pub impressions: i64,
    /// Total likes.
    pub likes: i64,
    /// Total retweets.
    pub retweets: i64,
    /// Total replies.
    pub replies: i64,
    /// Engagement percentage, `None` when there were no impressions.
    pub engagement_rate: Option<f64>,
    /// Performance score; for threads an impression-weighted mean.
    pub performance_score: Option<f64>,
    /// When this snapshot was taken.
    pub synced_at: String,
}
36
37pub use crate::storage::analytics::PerformancePercentiles;
39
/// Per-tweet performance counters used as input to thread aggregation.
#[derive(Clone, Debug)]
pub struct TweetPerformanceRow {
    /// Id of the tweet these counters belong to.
    pub tweet_id: String,
    /// Likes received.
    pub likes_received: i64,
    /// Retweets received.
    pub retweets_received: i64,
    /// Replies received.
    pub replies_received: i64,
    /// Impressions; rows with zero impressions carry no weight in aggregation.
    pub impressions: i64,
    /// Stored performance score for this tweet.
    pub performance_score: f64,
}
50
/// Counters describing what a forge sync run did.
#[derive(Default, Debug)]
pub struct ForgeSyncSummary {
    /// Standalone tweets whose front-matter entry was updated.
    pub tweets_synced: usize,
    /// Thread roots whose entry was updated with aggregated metrics.
    pub threads_synced: usize,
    /// Source files that had no entry for the tweet.
    pub entries_not_found: usize,
    /// Source files that were missing. The sync loop also counts other
    /// file I/O failures here.
    pub files_not_found: usize,
    /// Tweets whose primary source is not a local filesystem source.
    pub non_local_skipped: usize,
}
60
61pub fn update_entry_analytics(
66 path: &Path,
67 tweet_id: &str,
68 analytics: &EntryAnalytics,
69 percentiles: &PerformancePercentiles,
70) -> Result<UpdateResult, io::Error> {
71 let content = match std::fs::read_to_string(path) {
72 Ok(c) => c,
73 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(UpdateResult::FileNotFound),
74 Err(e) => return Err(e),
75 };
76
77 let existing = parse_tuitbot_metadata(&content);
78 if !existing.iter().any(|e| e.tweet_id == tweet_id) {
79 return Ok(UpdateResult::EntryNotFound);
80 }
81
82 let (yaml_str, body) = split_front_matter(&content);
83
84 let mut fm: TuitbotFrontMatter = match yaml_str {
85 Some(y) => serde_yaml::from_str(y).unwrap_or_default(),
86 None => return Ok(UpdateResult::EntryNotFound),
87 };
88
89 let mut found = false;
91 for entry in &mut fm.tuitbot {
92 if entry.tweet_id == tweet_id {
93 entry.impressions = Some(analytics.impressions);
94 entry.likes = Some(analytics.likes);
95 entry.retweets = Some(analytics.retweets);
96 entry.replies = Some(analytics.replies);
97 entry.engagement_rate = analytics.engagement_rate;
98 entry.performance_score = analytics.performance_score;
99 entry.synced_at = Some(analytics.synced_at.clone());
100 found = true;
101 break;
102 }
103 }
104
105 if !found {
106 return Ok(UpdateResult::EntryNotFound);
107 }
108
109 recompute_summaries(&mut fm, percentiles);
110 serialize_frontmatter_to_file(path, &fm, body)?;
111 Ok(UpdateResult::Updated)
112}
113
114pub fn recompute_summaries(fm: &mut TuitbotFrontMatter, percentiles: &PerformancePercentiles) {
120 let key = |s: &str| serde_yaml::Value::String(s.to_string());
121 let str_val = |s: &str| serde_yaml::Value::String(s.to_string());
122
123 let summary_keys = [
125 "tuitbot_social_performance",
126 "tuitbot_best_post_impressions",
127 "tuitbot_best_post_url",
128 "tuitbot_last_synced_at",
129 ];
130 for k in &summary_keys {
131 fm.other.remove(key(k));
132 }
133
134 let best = fm
136 .tuitbot
137 .iter()
138 .filter(|e| e.impressions.is_some())
139 .max_by(|a, b| {
140 let imp_cmp = a.impressions.unwrap().cmp(&b.impressions.unwrap());
141 if imp_cmp == std::cmp::Ordering::Equal {
142 a.published_at.cmp(&b.published_at)
143 } else {
144 imp_cmp
145 }
146 });
147
148 let best = match best {
149 Some(b) => b,
150 None => {
151 fm.other
152 .insert(key("tuitbot_social_performance"), str_val("none"));
153 return;
154 }
155 };
156
157 let best_impressions = best.impressions.unwrap();
158 let best_url = best.url.clone();
159
160 let tier = if !percentiles.has_sufficient_data {
162 "none"
163 } else if best_impressions >= percentiles.p90_impressions {
164 "high"
165 } else if best_impressions >= percentiles.p50_impressions {
166 "medium"
167 } else {
168 "low"
169 };
170
171 fm.other
172 .insert(key("tuitbot_social_performance"), str_val(tier));
173 fm.other.insert(
174 key("tuitbot_best_post_impressions"),
175 serde_yaml::Value::Number(serde_yaml::Number::from(best_impressions)),
176 );
177 fm.other
178 .insert(key("tuitbot_best_post_url"), str_val(&best_url));
179
180 let last_synced = fm
182 .tuitbot
183 .iter()
184 .filter_map(|e| e.synced_at.as_deref())
185 .max();
186
187 if let Some(synced) = last_synced {
188 fm.other
189 .insert(key("tuitbot_last_synced_at"), str_val(synced));
190 }
191}
192
193pub fn aggregate_thread_metrics(performances: &[TweetPerformanceRow]) -> Option<EntryAnalytics> {
199 if performances.is_empty() {
200 return None;
201 }
202
203 let total_impressions: i64 = performances.iter().map(|p| p.impressions).sum();
204 let total_likes: i64 = performances.iter().map(|p| p.likes_received).sum();
205 let total_retweets: i64 = performances.iter().map(|p| p.retweets_received).sum();
206 let total_replies: i64 = performances.iter().map(|p| p.replies_received).sum();
207
208 let engagement_rate = if total_impressions > 0 {
209 let engagements = (total_likes + total_retweets + total_replies) as f64;
210 Some((engagements / total_impressions as f64) * 100.0)
211 } else {
212 None
213 };
214
215 let weighted_sum: f64 = performances
217 .iter()
218 .filter(|p| p.impressions > 0)
219 .map(|p| p.performance_score * p.impressions as f64)
220 .sum();
221 let total_weight: i64 = performances
222 .iter()
223 .filter(|p| p.impressions > 0)
224 .map(|p| p.impressions)
225 .sum();
226
227 let performance_score = if total_weight > 0 {
228 Some(weighted_sum / total_weight as f64)
229 } else {
230 None
231 };
232
233 let synced_at = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
234
235 Some(EntryAnalytics {
236 impressions: total_impressions,
237 likes: total_likes,
238 retweets: total_retweets,
239 replies: total_replies,
240 engagement_rate,
241 performance_score,
242 synced_at,
243 })
244}
245
246pub async fn run_forge_sync(
252 pool: &crate::storage::DbPool,
253 account_id: &str,
254 analytics_sync_enabled: bool,
255 percentiles: &PerformancePercentiles,
256) -> Result<ForgeSyncSummary, crate::error::StorageError> {
257 if !analytics_sync_enabled {
258 return Ok(ForgeSyncSummary::default());
259 }
260
261 let performances =
262 crate::storage::analytics::get_all_tweet_performances_for(pool, account_id).await?;
263
264 let mut summary = ForgeSyncSummary::default();
265
266 for perf in &performances {
267 let ot_id = match crate::storage::threads::get_original_tweet_id_by_tweet_id(
269 pool,
270 account_id,
271 &perf.tweet_id,
272 )
273 .await?
274 {
275 Some(id) => id,
276 None => continue,
277 };
278
279 let (source_path, source_type, base_path) =
281 match crate::storage::provenance::get_primary_source_for_tweet(pool, account_id, ot_id)
282 .await?
283 {
284 Some(info) => info,
285 None => continue,
286 };
287
288 if source_type != "local_fs" {
290 summary.non_local_skipped += 1;
291 continue;
292 }
293
294 let expanded = crate::storage::expand_tilde(&base_path);
295 let full_path = std::path::PathBuf::from(expanded).join(&source_path);
296
297 let child_ids = crate::storage::threads::get_thread_tweet_ids_by_root_for(
299 pool,
300 account_id,
301 &perf.tweet_id,
302 )
303 .await
304 .unwrap_or_default();
305
306 let is_thread = !child_ids.is_empty();
307 let analytics = if !is_thread {
308 EntryAnalytics {
310 impressions: perf.impressions,
311 likes: perf.likes_received,
312 retweets: perf.retweets_received,
313 replies: perf.replies_received,
314 engagement_rate: if perf.impressions > 0 {
315 let eng = (perf.likes_received + perf.retweets_received + perf.replies_received)
316 as f64;
317 Some((eng / perf.impressions as f64) * 100.0)
318 } else {
319 None
320 },
321 performance_score: Some(perf.performance_score),
322 synced_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
323 }
324 } else {
325 let mut all_ids = vec![perf.tweet_id.clone()];
327 all_ids.extend(child_ids);
328
329 let all_performances =
330 crate::storage::analytics::get_tweet_performances_for(pool, account_id, &all_ids)
331 .await?;
332
333 let converted: Vec<TweetPerformanceRow> = all_performances
334 .into_iter()
335 .map(|p| TweetPerformanceRow {
336 tweet_id: p.tweet_id,
337 likes_received: p.likes_received,
338 retweets_received: p.retweets_received,
339 replies_received: p.replies_received,
340 impressions: p.impressions,
341 performance_score: p.performance_score,
342 })
343 .collect();
344
345 match aggregate_thread_metrics(&converted) {
346 Some(a) => a,
347 None => continue,
348 }
349 };
350
351 match update_entry_analytics(&full_path, &perf.tweet_id, &analytics, percentiles) {
352 Ok(UpdateResult::Updated) => {
353 if !is_thread {
354 summary.tweets_synced += 1;
355 } else {
356 summary.threads_synced += 1;
357 }
358 }
359 Ok(UpdateResult::EntryNotFound) => {
360 summary.entries_not_found += 1;
361 }
362 Ok(UpdateResult::FileNotFound) => {
363 summary.files_not_found += 1;
364 }
365 Err(e) => {
366 tracing::warn!(
367 tweet_id = %perf.tweet_id,
368 path = %full_path.display(),
369 error = %e,
370 "Forge sync: file write failed"
371 );
372 summary.files_not_found += 1;
373 }
374 }
375 }
376
377 Ok(summary)
378}