// tuitbot_core/storage/analytics/ancestors.rs

1use super::super::DbPool;
2use crate::error::StorageError;
3
4/// Update the archetype_vibe classification for a tweet performance record.
5pub async fn update_tweet_archetype(
6    pool: &DbPool,
7    tweet_id: &str,
8    archetype_vibe: &str,
9) -> Result<(), StorageError> {
10    sqlx::query("UPDATE tweet_performance SET archetype_vibe = ? WHERE tweet_id = ?")
11        .bind(archetype_vibe)
12        .bind(tweet_id)
13        .execute(pool)
14        .await
15        .map_err(|e| StorageError::Query { source: e })?;
16    Ok(())
17}
18
19/// Update the archetype_vibe classification for a reply performance record.
20pub async fn update_reply_archetype(
21    pool: &DbPool,
22    reply_id: &str,
23    archetype_vibe: &str,
24) -> Result<(), StorageError> {
25    sqlx::query("UPDATE reply_performance SET archetype_vibe = ? WHERE reply_id = ?")
26        .bind(archetype_vibe)
27        .bind(reply_id)
28        .execute(pool)
29        .await
30        .map_err(|e| StorageError::Query { source: e })?;
31    Ok(())
32}
33
34/// Update the engagement_score for a tweet performance record.
35pub async fn update_tweet_engagement_score(
36    pool: &DbPool,
37    tweet_id: &str,
38    score: f64,
39) -> Result<(), StorageError> {
40    sqlx::query("UPDATE tweet_performance SET engagement_score = ? WHERE tweet_id = ?")
41        .bind(score)
42        .bind(tweet_id)
43        .execute(pool)
44        .await
45        .map_err(|e| StorageError::Query { source: e })?;
46    Ok(())
47}
48
49/// Update the engagement_score for a reply performance record.
50pub async fn update_reply_engagement_score(
51    pool: &DbPool,
52    reply_id: &str,
53    score: f64,
54) -> Result<(), StorageError> {
55    sqlx::query("UPDATE reply_performance SET engagement_score = ? WHERE reply_id = ?")
56        .bind(score)
57        .bind(reply_id)
58        .execute(pool)
59        .await
60        .map_err(|e| StorageError::Query { source: e })?;
61    Ok(())
62}
63
64/// Get the maximum performance_score across all tweets and replies.
65///
66/// Returns 0.0 if no performance data exists. Used to normalize engagement scores.
67pub async fn get_max_performance_score(pool: &DbPool) -> Result<f64, StorageError> {
68    let row: (f64,) = sqlx::query_as(
69        "SELECT COALESCE(MAX(max_score), 0.0) FROM (\
70             SELECT MAX(performance_score) as max_score FROM tweet_performance \
71             UNION ALL \
72             SELECT MAX(performance_score) as max_score FROM reply_performance\
73         )",
74    )
75    .fetch_one(pool)
76    .await
77    .map_err(|e| StorageError::Query { source: e })?;
78
79    Ok(row.0)
80}
81
/// One scored ancestor (a past tweet or reply) returned by `get_scored_ancestors`.
///
/// `PartialEq` is derived so rows can be compared directly, e.g. in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct AncestorRow {
    /// `"tweet"` or `"reply"`.
    pub content_type: String,
    /// The tweet or reply ID.
    pub id: String,
    /// Content preview, truncated to at most 120 characters.
    pub content_preview: String,
    /// Archetype classification (`None` if not yet classified).
    pub archetype_vibe: Option<String>,
    /// Normalized engagement score (nominally 0.0-1.0).
    pub engagement_score: Option<f64>,
    /// Raw performance score.
    pub performance_score: f64,
    /// When the content was posted, taken from the source row's `created_at`
    /// column (stored format; presumably ISO-8601 — confirm against the writer).
    pub posted_at: String,
}

/// Positional tuple decoded from each row of the scored-ancestor UNION query.
///
/// Element order matches the SELECT column order of that query.
type AncestorQueryRow = (
    String,         // content_type: 'tweet' or 'reply'
    String,         // tweet_id / reply_id
    String,         // SUBSTR(content, 1, 120) preview
    Option<String>, // archetype_vibe
    Option<f64>,    // engagement_score
    f64,            // performance_score
    String,         // created_at
);

112/// Convert an ancestor query row tuple into an `AncestorRow` struct.
113fn ancestor_row_from_tuple(r: AncestorQueryRow) -> AncestorRow {
114    AncestorRow {
115        content_type: r.0,
116        id: r.1,
117        content_preview: r.2,
118        archetype_vibe: r.3,
119        engagement_score: r.4,
120        performance_score: r.5,
121        posted_at: r.6,
122    }
123}
124
125/// Query scored ancestors with engagement_score populated.
126///
127/// Returns ancestors where `engagement_score >= min_score`, ordered by
128/// engagement_score DESC. For topic matching, uses the `topic` column on
129/// original_tweets and LIKE-based content matching on replies.
130pub async fn get_scored_ancestors(
131    pool: &DbPool,
132    topic_keywords: &[String],
133    min_score: f64,
134    limit: u32,
135) -> Result<Vec<AncestorRow>, StorageError> {
136    if topic_keywords.is_empty() {
137        // No keywords: return top ancestors regardless of topic
138        let rows: Vec<AncestorQueryRow> = sqlx::query_as(
139            "SELECT 'tweet' as content_type, tp.tweet_id, \
140                        SUBSTR(ot.content, 1, 120), \
141                        tp.archetype_vibe, tp.engagement_score, tp.performance_score, \
142                        ot.created_at \
143                 FROM tweet_performance tp \
144                 JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
145                 WHERE tp.engagement_score IS NOT NULL \
146                   AND tp.engagement_score >= ? \
147                 UNION ALL \
148                 SELECT 'reply', rp.reply_id, SUBSTR(rs.reply_content, 1, 120), \
149                        rp.archetype_vibe, rp.engagement_score, rp.performance_score, \
150                        rs.created_at \
151                 FROM reply_performance rp \
152                 JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
153                 WHERE rp.engagement_score IS NOT NULL \
154                   AND rp.engagement_score >= ? \
155                 ORDER BY engagement_score DESC \
156                 LIMIT ?",
157        )
158        .bind(min_score)
159        .bind(min_score)
160        .bind(limit)
161        .fetch_all(pool)
162        .await
163        .map_err(|e| StorageError::Query { source: e })?;
164
165        return Ok(rows.into_iter().map(ancestor_row_from_tuple).collect());
166    }
167
168    // Build parameterized IN clause for tweet topics and LIKE clauses for replies.
169    // SQLx uses sequential `?` placeholders for SQLite.
170    let topic_placeholders: String = (0..topic_keywords.len())
171        .map(|_| "?".to_string())
172        .collect::<Vec<_>>()
173        .join(", ");
174
175    let like_conditions: Vec<String> = (0..topic_keywords.len())
176        .map(|_| "rs.reply_content LIKE '%' || ? || '%'".to_string())
177        .collect();
178    let like_clause = like_conditions.join(" OR ");
179
180    let query_str = format!(
181        "SELECT 'tweet' as content_type, tp.tweet_id, \
182                SUBSTR(ot.content, 1, 120), \
183                tp.archetype_vibe, tp.engagement_score, tp.performance_score, \
184                ot.created_at \
185         FROM tweet_performance tp \
186         JOIN original_tweets ot ON ot.tweet_id = tp.tweet_id \
187         WHERE tp.engagement_score IS NOT NULL \
188           AND tp.engagement_score >= ? \
189           AND (ot.topic IN ({topic_placeholders})) \
190         UNION ALL \
191         SELECT 'reply', rp.reply_id, SUBSTR(rs.reply_content, 1, 120), \
192                rp.archetype_vibe, rp.engagement_score, rp.performance_score, \
193                rs.created_at \
194         FROM reply_performance rp \
195         JOIN replies_sent rs ON rs.reply_tweet_id = rp.reply_id \
196         WHERE rp.engagement_score IS NOT NULL \
197           AND rp.engagement_score >= ? \
198           AND ({like_clause}) \
199         ORDER BY engagement_score DESC \
200         LIMIT ?"
201    );
202
203    let mut query = sqlx::query_as::<_, AncestorQueryRow>(&query_str);
204
205    // Bind: min_score for tweets, then topic keywords for IN clause
206    query = query.bind(min_score);
207    for kw in topic_keywords {
208        query = query.bind(kw);
209    }
210    // Bind: min_score for replies, then keywords for LIKE clauses, then limit
211    query = query.bind(min_score);
212    for kw in topic_keywords {
213        query = query.bind(kw);
214    }
215    query = query.bind(limit);
216
217    let rows = query
218        .fetch_all(pool)
219        .await
220        .map_err(|e| StorageError::Query { source: e })?;
221
222    Ok(rows.into_iter().map(ancestor_row_from_tuple).collect())
223}