// tuitbot_core/automation/watchtower/loopback/sync/mod.rs
//! Forge sync engine — enriches source-note frontmatter with analytics.
//!
//! The sync path is separate from the publish writeback path. It never creates
//! entries; it only updates analytics fields on entries that already exist
//! (created by `write_metadata_to_file` during the publish step).

7use std::io;
8use std::path::Path;
9
10use super::{
11    parse_tuitbot_metadata, serialize_frontmatter_to_file, split_front_matter, TuitbotFrontMatter,
12};
13
14#[cfg(test)]
15mod tests;
16
/// Result of an analytics update attempt on a single entry.
#[derive(Debug, PartialEq, Eq)]
pub enum UpdateResult {
    /// The entry was found and the updated frontmatter was written back.
    Updated,
    /// The file was read, but no frontmatter entry matched the tweet_id.
    EntryNotFound,
    /// The source note file does not exist on disk.
    FileNotFound,
}
24
/// Analytics data to write into a frontmatter entry.
#[derive(Debug, Clone)]
pub struct EntryAnalytics {
    /// Total impression count.
    pub impressions: i64,
    /// Total like count.
    pub likes: i64,
    /// Total retweet count.
    pub retweets: i64,
    /// Total reply count.
    pub replies: i64,
    /// Engagement rate as a percentage; `None` when impressions are zero.
    pub engagement_rate: Option<f64>,
    /// Performance score; `None` when no weighted score could be computed.
    pub performance_score: Option<f64>,
    /// Sync timestamp formatted as `%Y-%m-%dT%H:%M:%SZ` (UTC).
    pub synced_at: String,
}
36
37// Re-export from storage so callers that import from sync continue to work.
38pub use crate::storage::analytics::PerformancePercentiles;
39
/// Metrics from the tweet_performance table for Forge sync.
#[derive(Debug, Clone)]
pub struct TweetPerformanceRow {
    /// Tweet identifier this row describes.
    pub tweet_id: String,
    /// Likes received by this tweet.
    pub likes_received: i64,
    /// Retweets received by this tweet.
    pub retweets_received: i64,
    /// Replies received by this tweet.
    pub replies_received: i64,
    /// Impression count for this tweet.
    pub impressions: i64,
    /// Per-tweet performance score (scale defined by the analytics layer).
    pub performance_score: f64,
}
50
/// Result summary of a Forge sync iteration.
#[derive(Debug, Default)]
pub struct ForgeSyncSummary {
    /// Single (non-thread) tweets whose frontmatter was updated.
    pub tweets_synced: usize,
    /// Thread roots whose frontmatter was updated with aggregated metrics.
    pub threads_synced: usize,
    /// Notes found on disk that had no matching frontmatter entry.
    pub entries_not_found: usize,
    /// Notes missing on disk, plus notes whose write failed.
    pub files_not_found: usize,
    /// Tweets skipped because their provenance source is not `local_fs`.
    pub non_local_skipped: usize,
}
60
61/// Update analytics fields on an existing frontmatter entry identified by tweet_id.
62///
63/// This is the Forge sync path — it never creates entries, only updates them.
64/// After updating, recomputes note-level summary fields.
65pub fn update_entry_analytics(
66    path: &Path,
67    tweet_id: &str,
68    analytics: &EntryAnalytics,
69    percentiles: &PerformancePercentiles,
70) -> Result<UpdateResult, io::Error> {
71    let content = match std::fs::read_to_string(path) {
72        Ok(c) => c,
73        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(UpdateResult::FileNotFound),
74        Err(e) => return Err(e),
75    };
76
77    let existing = parse_tuitbot_metadata(&content);
78    if !existing.iter().any(|e| e.tweet_id == tweet_id) {
79        return Ok(UpdateResult::EntryNotFound);
80    }
81
82    let (yaml_str, body) = split_front_matter(&content);
83
84    let mut fm: TuitbotFrontMatter = match yaml_str {
85        Some(y) => serde_yaml::from_str(y).unwrap_or_default(),
86        None => return Ok(UpdateResult::EntryNotFound),
87    };
88
89    // Find and update the matching entry (first match only).
90    let mut found = false;
91    for entry in &mut fm.tuitbot {
92        if entry.tweet_id == tweet_id {
93            entry.impressions = Some(analytics.impressions);
94            entry.likes = Some(analytics.likes);
95            entry.retweets = Some(analytics.retweets);
96            entry.replies = Some(analytics.replies);
97            entry.engagement_rate = analytics.engagement_rate;
98            entry.performance_score = analytics.performance_score;
99            entry.synced_at = Some(analytics.synced_at.clone());
100            found = true;
101            break;
102        }
103    }
104
105    if !found {
106        return Ok(UpdateResult::EntryNotFound);
107    }
108
109    recompute_summaries(&mut fm, percentiles);
110    serialize_frontmatter_to_file(path, &fm, body)?;
111    Ok(UpdateResult::Updated)
112}
113
114/// Recompute note-level summary fields from the tuitbot entries.
115///
116/// Sets `tuitbot_social_performance`, `tuitbot_best_post_impressions`,
117/// `tuitbot_best_post_url`, and `tuitbot_last_synced_at` in the
118/// `TuitbotFrontMatter.other` mapping.
119pub fn recompute_summaries(fm: &mut TuitbotFrontMatter, percentiles: &PerformancePercentiles) {
120    let key = |s: &str| serde_yaml::Value::String(s.to_string());
121    let str_val = |s: &str| serde_yaml::Value::String(s.to_string());
122
123    // Remove existing summary keys first (clean slate).
124    let summary_keys = [
125        "tuitbot_social_performance",
126        "tuitbot_best_post_impressions",
127        "tuitbot_best_post_url",
128        "tuitbot_last_synced_at",
129    ];
130    for k in &summary_keys {
131        fm.other.remove(key(k));
132    }
133
134    // Find the entry with the highest impressions (tie-break by latest published_at).
135    let best = fm
136        .tuitbot
137        .iter()
138        .filter(|e| e.impressions.is_some())
139        .max_by(|a, b| {
140            let imp_cmp = a.impressions.unwrap().cmp(&b.impressions.unwrap());
141            if imp_cmp == std::cmp::Ordering::Equal {
142                a.published_at.cmp(&b.published_at)
143            } else {
144                imp_cmp
145            }
146        });
147
148    let best = match best {
149        Some(b) => b,
150        None => {
151            fm.other
152                .insert(key("tuitbot_social_performance"), str_val("none"));
153            return;
154        }
155    };
156
157    let best_impressions = best.impressions.unwrap();
158    let best_url = best.url.clone();
159
160    // Determine performance tier.
161    let tier = if !percentiles.has_sufficient_data {
162        "none"
163    } else if best_impressions >= percentiles.p90_impressions {
164        "high"
165    } else if best_impressions >= percentiles.p50_impressions {
166        "medium"
167    } else {
168        "low"
169    };
170
171    fm.other
172        .insert(key("tuitbot_social_performance"), str_val(tier));
173    fm.other.insert(
174        key("tuitbot_best_post_impressions"),
175        serde_yaml::Value::Number(serde_yaml::Number::from(best_impressions)),
176    );
177    fm.other
178        .insert(key("tuitbot_best_post_url"), str_val(&best_url));
179
180    // Find the most recent synced_at across all entries.
181    let last_synced = fm
182        .tuitbot
183        .iter()
184        .filter_map(|e| e.synced_at.as_deref())
185        .max();
186
187    if let Some(synced) = last_synced {
188        fm.other
189            .insert(key("tuitbot_last_synced_at"), str_val(synced));
190    }
191}
192
193/// Aggregate tweet_performance metrics for a thread.
194///
195/// Sums counts (impressions, likes, retweets, replies) across all tweet IDs.
196/// Computes engagement_rate from totals and performance_score as
197/// impression-weighted average.
198pub fn aggregate_thread_metrics(performances: &[TweetPerformanceRow]) -> Option<EntryAnalytics> {
199    if performances.is_empty() {
200        return None;
201    }
202
203    let total_impressions: i64 = performances.iter().map(|p| p.impressions).sum();
204    let total_likes: i64 = performances.iter().map(|p| p.likes_received).sum();
205    let total_retweets: i64 = performances.iter().map(|p| p.retweets_received).sum();
206    let total_replies: i64 = performances.iter().map(|p| p.replies_received).sum();
207
208    let engagement_rate = if total_impressions > 0 {
209        let engagements = (total_likes + total_retweets + total_replies) as f64;
210        Some((engagements / total_impressions as f64) * 100.0)
211    } else {
212        None
213    };
214
215    // Impression-weighted average performance score.
216    let weighted_sum: f64 = performances
217        .iter()
218        .filter(|p| p.impressions > 0)
219        .map(|p| p.performance_score * p.impressions as f64)
220        .sum();
221    let total_weight: i64 = performances
222        .iter()
223        .filter(|p| p.impressions > 0)
224        .map(|p| p.impressions)
225        .sum();
226
227    let performance_score = if total_weight > 0 {
228        Some(weighted_sum / total_weight as f64)
229    } else {
230        None
231    };
232
233    let synced_at = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
234
235    Some(EntryAnalytics {
236        impressions: total_impressions,
237        likes: total_likes,
238        retweets: total_retweets,
239        replies: total_replies,
240        engagement_rate,
241        performance_score,
242        synced_at,
243    })
244}
245
/// Run a single Forge sync iteration.
///
/// Finds all tweets with measured performance that have provenance
/// to local_fs source notes, and writes aggregated analytics into
/// those notes' frontmatter.
///
/// Returns a [`ForgeSyncSummary`] of per-outcome counts. Short-circuits to an
/// empty summary when `analytics_sync_enabled` is false.
///
/// # Errors
/// Propagates `StorageError` from the performance, thread, and provenance
/// lookups. Per-file write failures are logged and counted in
/// `files_not_found` rather than aborting the iteration.
pub async fn run_forge_sync(
    pool: &crate::storage::DbPool,
    account_id: &str,
    analytics_sync_enabled: bool,
    percentiles: &PerformancePercentiles,
) -> Result<ForgeSyncSummary, crate::error::StorageError> {
    // Feature gate: sync is opt-in; disabled means a no-op with empty counts.
    if !analytics_sync_enabled {
        return Ok(ForgeSyncSummary::default());
    }

    let performances =
        crate::storage::analytics::get_all_tweet_performances_for(pool, account_id).await?;

    let mut summary = ForgeSyncSummary::default();

    for perf in &performances {
        // Look up original_tweets row for this tweet_id.
        // No row means the tweet isn't one of ours to sync — skip silently.
        let ot_id = match crate::storage::threads::get_original_tweet_id_by_tweet_id(
            pool,
            account_id,
            &perf.tweet_id,
        )
        .await?
        {
            Some(id) => id,
            None => continue,
        };

        // Look up provenance to find source path.
        // Tweets without recorded provenance have no note to update — skip.
        let (source_path, source_type, base_path) =
            match crate::storage::provenance::get_primary_source_for_tweet(pool, account_id, ot_id)
                .await?
            {
                Some(info) => info,
                None => continue,
            };

        // Gate on source type — only local_fs is writable.
        if source_type != "local_fs" {
            summary.non_local_skipped += 1;
            continue;
        }

        // base_path may be tilde-relative; source_path is joined beneath it.
        let expanded = crate::storage::expand_tilde(&base_path);
        let full_path = std::path::PathBuf::from(expanded).join(&source_path);

        // Check if this is a thread root by looking for children.
        // NOTE(review): unwrap_or_default() swallows lookup errors here,
        // treating a failed child query as "no children" — presumably a
        // deliberate best-effort choice; confirm this is intended.
        let child_ids = crate::storage::threads::get_thread_tweet_ids_by_root_for(
            pool,
            account_id,
            &perf.tweet_id,
        )
        .await
        .unwrap_or_default();

        let is_thread = !child_ids.is_empty();
        let analytics = if !is_thread {
            // Single tweet — use metrics directly.
            EntryAnalytics {
                impressions: perf.impressions,
                likes: perf.likes_received,
                retweets: perf.retweets_received,
                replies: perf.replies_received,
                // Engagement rate is a percentage of impressions; undefined
                // (None) when there are no impressions to divide by.
                engagement_rate: if perf.impressions > 0 {
                    let eng = (perf.likes_received + perf.retweets_received + perf.replies_received)
                        as f64;
                    Some((eng / perf.impressions as f64) * 100.0)
                } else {
                    None
                },
                performance_score: Some(perf.performance_score),
                synced_at: chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
            }
        } else {
            // Thread — gather all tweet IDs and aggregate.
            let mut all_ids = vec![perf.tweet_id.clone()];
            all_ids.extend(child_ids);

            let all_performances =
                crate::storage::analytics::get_tweet_performances_for(pool, account_id, &all_ids)
                    .await?;

            // Convert the storage rows into this module's row type for
            // aggregate_thread_metrics.
            let converted: Vec<TweetPerformanceRow> = all_performances
                .into_iter()
                .map(|p| TweetPerformanceRow {
                    tweet_id: p.tweet_id,
                    likes_received: p.likes_received,
                    retweets_received: p.retweets_received,
                    replies_received: p.replies_received,
                    impressions: p.impressions,
                    performance_score: p.performance_score,
                })
                .collect();

            // None only when `converted` is empty (no performance rows found).
            match aggregate_thread_metrics(&converted) {
                Some(a) => a,
                None => continue,
            }
        };

        match update_entry_analytics(&full_path, &perf.tweet_id, &analytics, percentiles) {
            Ok(UpdateResult::Updated) => {
                if !is_thread {
                    summary.tweets_synced += 1;
                } else {
                    summary.threads_synced += 1;
                }
            }
            Ok(UpdateResult::EntryNotFound) => {
                summary.entries_not_found += 1;
            }
            Ok(UpdateResult::FileNotFound) => {
                summary.files_not_found += 1;
            }
            // I/O failures are logged and folded into files_not_found so one
            // bad note doesn't abort the whole sync pass.
            Err(e) => {
                tracing::warn!(
                    tweet_id = %perf.tweet_id,
                    path = %full_path.display(),
                    error = %e,
                    "Forge sync: file write failed"
                );
                summary.files_not_found += 1;
            }
        }
    }

    Ok(summary)
}