Skip to main content

tuitbot_core/storage/
threads.rs

1//! CRUD operations for original tweets and educational threads.
2//!
3//! Provides functions to insert and query original tweets and threads,
4//! supporting the content and thread automation loops.
5
6use super::accounts::DEFAULT_ACCOUNT_ID;
7use super::DbPool;
8use crate::error::StorageError;
9
10/// An educational tweet generated and posted by the agent.
11#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
12pub struct OriginalTweet {
13    /// Internal auto-generated ID.
14    pub id: i64,
15    /// X tweet ID after posting (None if failed).
16    pub tweet_id: Option<String>,
17    /// Tweet text.
18    pub content: String,
19    /// Industry topic this covers.
20    pub topic: Option<String>,
21    /// Which LLM generated this.
22    pub llm_provider: Option<String>,
23    /// ISO-8601 UTC timestamp when tweet was posted.
24    pub created_at: String,
25    /// Status: sent or failed.
26    pub status: String,
27    /// Error details if failed.
28    pub error_message: Option<String>,
29}
30
31/// A series of connected tweets posted as a thread.
32#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
33pub struct Thread {
34    /// Internal auto-generated ID.
35    pub id: i64,
36    /// Thread topic.
37    pub topic: String,
38    /// Number of tweets in thread.
39    pub tweet_count: i64,
40    /// X tweet ID of first tweet.
41    pub root_tweet_id: Option<String>,
42    /// ISO-8601 UTC timestamp when thread was posted.
43    pub created_at: String,
44    /// Status: sent, partial, or failed.
45    pub status: String,
46}
47
48/// An individual tweet within a thread.
49#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
50pub struct ThreadTweet {
51    /// Internal auto-generated ID.
52    pub id: i64,
53    /// Parent thread ID.
54    pub thread_id: i64,
55    /// 0-indexed position in thread.
56    pub position: i64,
57    /// X tweet ID after posting.
58    pub tweet_id: Option<String>,
59    /// Tweet text.
60    pub content: String,
61    /// ISO-8601 UTC timestamp.
62    pub created_at: String,
63}
64
65/// Insert a new original tweet for a specific account. Returns the auto-generated ID.
66pub async fn insert_original_tweet_for(
67    pool: &DbPool,
68    account_id: &str,
69    tweet: &OriginalTweet,
70) -> Result<i64, StorageError> {
71    let result = sqlx::query(
72        "INSERT INTO original_tweets \
73         (account_id, tweet_id, content, topic, llm_provider, created_at, status, error_message) \
74         VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
75    )
76    .bind(account_id)
77    .bind(&tweet.tweet_id)
78    .bind(&tweet.content)
79    .bind(&tweet.topic)
80    .bind(&tweet.llm_provider)
81    .bind(&tweet.created_at)
82    .bind(&tweet.status)
83    .bind(&tweet.error_message)
84    .execute(pool)
85    .await
86    .map_err(|e| StorageError::Query { source: e })?;
87
88    Ok(result.last_insert_rowid())
89}
90
91/// Insert a new original tweet. Returns the auto-generated ID.
92pub async fn insert_original_tweet(
93    pool: &DbPool,
94    tweet: &OriginalTweet,
95) -> Result<i64, StorageError> {
96    insert_original_tweet_for(pool, DEFAULT_ACCOUNT_ID, tweet).await
97}
98
99/// Set the `source_node_id` on an existing original tweet for a specific account.
100///
101/// Used by the approval poster to propagate vault provenance after posting.
102pub async fn set_original_tweet_source_node_for(
103    pool: &DbPool,
104    account_id: &str,
105    id: i64,
106    source_node_id: i64,
107) -> Result<(), StorageError> {
108    sqlx::query("UPDATE original_tweets SET source_node_id = ? WHERE id = ? AND account_id = ?")
109        .bind(source_node_id)
110        .bind(id)
111        .bind(account_id)
112        .execute(pool)
113        .await
114        .map_err(|e| StorageError::Query { source: e })?;
115
116    Ok(())
117}
118
119/// Insert an original tweet with provenance for a specific account.
120///
121/// Creates the tweet row and then inserts provenance link rows.
122pub async fn insert_original_tweet_with_provenance_for(
123    pool: &DbPool,
124    account_id: &str,
125    tweet: &OriginalTweet,
126    refs: &[super::provenance::ProvenanceRef],
127) -> Result<i64, StorageError> {
128    let id = insert_original_tweet_for(pool, account_id, tweet).await?;
129
130    if !refs.is_empty() {
131        super::provenance::insert_links_for(pool, account_id, "original_tweet", id, refs).await?;
132    }
133
134    Ok(id)
135}
136
137/// Get original_tweet row ID by tweet_id for a specific account.
138pub async fn get_original_tweet_id_by_tweet_id(
139    pool: &DbPool,
140    account_id: &str,
141    tweet_id: &str,
142) -> Result<Option<i64>, StorageError> {
143    let row: Option<(i64,)> = sqlx::query_as(
144        "SELECT id FROM original_tweets WHERE account_id = ? AND tweet_id = ? LIMIT 1",
145    )
146    .bind(account_id)
147    .bind(tweet_id)
148    .fetch_optional(pool)
149    .await
150    .map_err(|e| StorageError::Query { source: e })?;
151
152    Ok(row.map(|r| r.0))
153}
154
155/// Get the timestamp of the most recent successfully posted original tweet for a specific account.
156pub async fn get_last_original_tweet_time_for(
157    pool: &DbPool,
158    account_id: &str,
159) -> Result<Option<String>, StorageError> {
160    let row: Option<(String,)> = sqlx::query_as(
161        "SELECT created_at FROM original_tweets WHERE account_id = ? AND status = 'sent' \
162         ORDER BY created_at DESC LIMIT 1",
163    )
164    .bind(account_id)
165    .fetch_optional(pool)
166    .await
167    .map_err(|e| StorageError::Query { source: e })?;
168
169    Ok(row.map(|r| r.0))
170}
171
172/// Get the timestamp of the most recent successfully posted original tweet.
173pub async fn get_last_original_tweet_time(pool: &DbPool) -> Result<Option<String>, StorageError> {
174    get_last_original_tweet_time_for(pool, DEFAULT_ACCOUNT_ID).await
175}
176
177/// Count original tweets posted today (UTC) for a specific account.
178pub async fn count_tweets_today_for(pool: &DbPool, account_id: &str) -> Result<i64, StorageError> {
179    let row: (i64,) = sqlx::query_as(
180        "SELECT COUNT(*) FROM original_tweets WHERE account_id = ? AND date(created_at) = date('now')",
181    )
182    .bind(account_id)
183    .fetch_one(pool)
184    .await
185    .map_err(|e| StorageError::Query { source: e })?;
186
187    Ok(row.0)
188}
189
190/// Count original tweets posted today (UTC).
191pub async fn count_tweets_today(pool: &DbPool) -> Result<i64, StorageError> {
192    count_tweets_today_for(pool, DEFAULT_ACCOUNT_ID).await
193}
194
195/// Insert a new thread record for a specific account. Returns the auto-generated ID.
196pub async fn insert_thread_for(
197    pool: &DbPool,
198    account_id: &str,
199    thread: &Thread,
200) -> Result<i64, StorageError> {
201    let result = sqlx::query(
202        "INSERT INTO threads (account_id, topic, tweet_count, root_tweet_id, created_at, status) \
203         VALUES (?, ?, ?, ?, ?, ?)",
204    )
205    .bind(account_id)
206    .bind(&thread.topic)
207    .bind(thread.tweet_count)
208    .bind(&thread.root_tweet_id)
209    .bind(&thread.created_at)
210    .bind(&thread.status)
211    .execute(pool)
212    .await
213    .map_err(|e| StorageError::Query { source: e })?;
214
215    Ok(result.last_insert_rowid())
216}
217
218/// Insert a new thread record. Returns the auto-generated ID.
219pub async fn insert_thread(pool: &DbPool, thread: &Thread) -> Result<i64, StorageError> {
220    insert_thread_for(pool, DEFAULT_ACCOUNT_ID, thread).await
221}
222
223/// Insert all tweets for a thread atomically using a transaction for a specific account.
224///
225/// Either all tweets are inserted or none are (rollback on failure).
226pub async fn insert_thread_tweets_for(
227    pool: &DbPool,
228    account_id: &str,
229    thread_id: i64,
230    tweets: &[ThreadTweet],
231) -> Result<(), StorageError> {
232    let mut tx = pool
233        .begin()
234        .await
235        .map_err(|e| StorageError::Connection { source: e })?;
236
237    for tweet in tweets {
238        sqlx::query(
239            "INSERT INTO thread_tweets \
240             (account_id, thread_id, position, tweet_id, content, created_at) \
241             VALUES (?, ?, ?, ?, ?, ?)",
242        )
243        .bind(account_id)
244        .bind(thread_id)
245        .bind(tweet.position)
246        .bind(&tweet.tweet_id)
247        .bind(&tweet.content)
248        .bind(&tweet.created_at)
249        .execute(&mut *tx)
250        .await
251        .map_err(|e| StorageError::Query { source: e })?;
252    }
253
254    tx.commit()
255        .await
256        .map_err(|e| StorageError::Connection { source: e })?;
257
258    Ok(())
259}
260
261/// Insert all tweets for a thread atomically using a transaction.
262///
263/// Either all tweets are inserted or none are (rollback on failure).
264pub async fn insert_thread_tweets(
265    pool: &DbPool,
266    thread_id: i64,
267    tweets: &[ThreadTweet],
268) -> Result<(), StorageError> {
269    insert_thread_tweets_for(pool, DEFAULT_ACCOUNT_ID, thread_id, tweets).await
270}
271
272/// Get the timestamp of the most recent successfully posted thread for a specific account.
273pub async fn get_last_thread_time_for(
274    pool: &DbPool,
275    account_id: &str,
276) -> Result<Option<String>, StorageError> {
277    let row: Option<(String,)> = sqlx::query_as(
278        "SELECT created_at FROM threads WHERE account_id = ? AND status = 'sent' \
279         ORDER BY created_at DESC LIMIT 1",
280    )
281    .bind(account_id)
282    .fetch_optional(pool)
283    .await
284    .map_err(|e| StorageError::Query { source: e })?;
285
286    Ok(row.map(|r| r.0))
287}
288
289/// Get the timestamp of the most recent successfully posted thread.
290pub async fn get_last_thread_time(pool: &DbPool) -> Result<Option<String>, StorageError> {
291    get_last_thread_time_for(pool, DEFAULT_ACCOUNT_ID).await
292}
293
294/// Get the timestamps of all successfully posted original tweets today (UTC) for a specific account.
295pub async fn get_todays_tweet_times_for(
296    pool: &DbPool,
297    account_id: &str,
298) -> Result<Vec<String>, StorageError> {
299    let rows: Vec<(String,)> = sqlx::query_as(
300        "SELECT created_at FROM original_tweets \
301         WHERE account_id = ? AND status = 'sent' AND date(created_at) = date('now') \
302         ORDER BY created_at ASC",
303    )
304    .bind(account_id)
305    .fetch_all(pool)
306    .await
307    .map_err(|e| StorageError::Query { source: e })?;
308
309    Ok(rows.into_iter().map(|r| r.0).collect())
310}
311
312/// Get the timestamps of all successfully posted original tweets today (UTC).
313pub async fn get_todays_tweet_times(pool: &DbPool) -> Result<Vec<String>, StorageError> {
314    get_todays_tweet_times_for(pool, DEFAULT_ACCOUNT_ID).await
315}
316
317/// Count threads posted in the current ISO week (Monday-Sunday, UTC) for a specific account.
318pub async fn count_threads_this_week_for(
319    pool: &DbPool,
320    account_id: &str,
321) -> Result<i64, StorageError> {
322    let row: (i64,) = sqlx::query_as(
323        "SELECT COUNT(*) FROM threads \
324         WHERE account_id = ? AND strftime('%Y-%W', created_at) = strftime('%Y-%W', 'now')",
325    )
326    .bind(account_id)
327    .fetch_one(pool)
328    .await
329    .map_err(|e| StorageError::Query { source: e })?;
330
331    Ok(row.0)
332}
333
334/// Count threads posted in the current ISO week (Monday-Sunday, UTC).
335pub async fn count_threads_this_week(pool: &DbPool) -> Result<i64, StorageError> {
336    count_threads_this_week_for(pool, DEFAULT_ACCOUNT_ID).await
337}
338
339/// Get original tweets within a date range for a specific account, ordered by creation time.
340pub async fn get_tweets_in_range_for(
341    pool: &DbPool,
342    account_id: &str,
343    from: &str,
344    to: &str,
345) -> Result<Vec<OriginalTweet>, StorageError> {
346    sqlx::query_as::<_, OriginalTweet>(
347        "SELECT * FROM original_tweets \
348         WHERE account_id = ? AND created_at BETWEEN ? AND ? \
349         ORDER BY created_at ASC",
350    )
351    .bind(account_id)
352    .bind(from)
353    .bind(to)
354    .fetch_all(pool)
355    .await
356    .map_err(|e| StorageError::Query { source: e })
357}
358
359/// Get original tweets within a date range, ordered by creation time.
360pub async fn get_tweets_in_range(
361    pool: &DbPool,
362    from: &str,
363    to: &str,
364) -> Result<Vec<OriginalTweet>, StorageError> {
365    get_tweets_in_range_for(pool, DEFAULT_ACCOUNT_ID, from, to).await
366}
367
368/// Get threads within a date range for a specific account, ordered by creation time.
369pub async fn get_threads_in_range_for(
370    pool: &DbPool,
371    account_id: &str,
372    from: &str,
373    to: &str,
374) -> Result<Vec<Thread>, StorageError> {
375    sqlx::query_as::<_, Thread>(
376        "SELECT * FROM threads \
377         WHERE account_id = ? AND created_at BETWEEN ? AND ? \
378         ORDER BY created_at ASC",
379    )
380    .bind(account_id)
381    .bind(from)
382    .bind(to)
383    .fetch_all(pool)
384    .await
385    .map_err(|e| StorageError::Query { source: e })
386}
387
388/// Get threads within a date range, ordered by creation time.
389pub async fn get_threads_in_range(
390    pool: &DbPool,
391    from: &str,
392    to: &str,
393) -> Result<Vec<Thread>, StorageError> {
394    get_threads_in_range_for(pool, DEFAULT_ACCOUNT_ID, from, to).await
395}
396
397/// Get the most recent original tweets for a specific account, newest first.
398pub async fn get_recent_original_tweets_for(
399    pool: &DbPool,
400    account_id: &str,
401    limit: u32,
402) -> Result<Vec<OriginalTweet>, StorageError> {
403    sqlx::query_as::<_, OriginalTweet>(
404        "SELECT * FROM original_tweets WHERE account_id = ? ORDER BY created_at DESC LIMIT ?",
405    )
406    .bind(account_id)
407    .bind(limit)
408    .fetch_all(pool)
409    .await
410    .map_err(|e| StorageError::Query { source: e })
411}
412
413/// Get the most recent original tweets, newest first.
414pub async fn get_recent_original_tweets(
415    pool: &DbPool,
416    limit: u32,
417) -> Result<Vec<OriginalTweet>, StorageError> {
418    get_recent_original_tweets_for(pool, DEFAULT_ACCOUNT_ID, limit).await
419}
420
421/// Get the most recent threads for a specific account, newest first.
422pub async fn get_recent_threads_for(
423    pool: &DbPool,
424    account_id: &str,
425    limit: u32,
426) -> Result<Vec<Thread>, StorageError> {
427    sqlx::query_as::<_, Thread>(
428        "SELECT * FROM threads WHERE account_id = ? ORDER BY created_at DESC LIMIT ?",
429    )
430    .bind(account_id)
431    .bind(limit)
432    .fetch_all(pool)
433    .await
434    .map_err(|e| StorageError::Query { source: e })
435}
436
437/// Get the most recent threads, newest first.
438pub async fn get_recent_threads(pool: &DbPool, limit: u32) -> Result<Vec<Thread>, StorageError> {
439    get_recent_threads_for(pool, DEFAULT_ACCOUNT_ID, limit).await
440}
441
442/// Get child tweet IDs for a thread by root tweet ID (excludes root, position > 0).
443///
444/// Used by Forge sync as a fallback when `child_tweet_ids` is not available
445/// in the frontmatter entry.
446pub async fn get_thread_tweet_ids_by_root_for(
447    pool: &DbPool,
448    account_id: &str,
449    root_tweet_id: &str,
450) -> Result<Vec<String>, StorageError> {
451    let rows: Vec<(String,)> = sqlx::query_as(
452        "SELECT tt.tweet_id FROM thread_tweets tt \
453         JOIN threads t ON tt.thread_id = t.id \
454         WHERE t.account_id = ? AND t.root_tweet_id = ? AND tt.position > 0 \
455         ORDER BY tt.position ASC",
456    )
457    .bind(account_id)
458    .bind(root_tweet_id)
459    .fetch_all(pool)
460    .await
461    .map_err(|e| StorageError::Query { source: e })?;
462
463    Ok(rows
464        .into_iter()
465        .filter_map(|r| if r.0.is_empty() { None } else { Some(r.0) })
466        .collect())
467}
468
469/// Persist thread records atomically: one `threads` row, N `thread_tweets` rows,
470/// and one `original_tweets` row for the root tweet.
471///
472/// Returns `(thread_id, original_tweet_id)` for provenance linking.
473///
474/// `tweet_ids` must have root at index 0 and children at 1..N.
475/// `tweet_contents` must be parallel to `tweet_ids`.
476pub async fn persist_thread_records(
477    pool: &DbPool,
478    account_id: &str,
479    topic: &str,
480    tweet_ids: &[String],
481    tweet_contents: &[String],
482    status: &str,
483) -> Result<(i64, i64), StorageError> {
484    let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
485    let root_tweet_id = tweet_ids.first().map(|s| s.as_str()).unwrap_or("");
486
487    // 1. Insert thread row.
488    let thread = Thread {
489        id: 0,
490        topic: topic.to_string(),
491        tweet_count: tweet_ids.len() as i64,
492        root_tweet_id: Some(root_tweet_id.to_string()),
493        created_at: now.clone(),
494        status: status.to_string(),
495    };
496    let thread_id = insert_thread_for(pool, account_id, &thread).await?;
497
498    // 2. Insert thread_tweets rows.
499    let thread_tweets: Vec<ThreadTweet> = tweet_ids
500        .iter()
501        .zip(tweet_contents.iter())
502        .enumerate()
503        .map(|(i, (tid, content))| ThreadTweet {
504            id: 0,
505            thread_id,
506            position: i as i64,
507            tweet_id: Some(tid.clone()),
508            content: content.clone(),
509            created_at: now.clone(),
510        })
511        .collect();
512    insert_thread_tweets_for(pool, account_id, thread_id, &thread_tweets).await?;
513
514    // 3. Insert original_tweets row for root tweet (analytics compatibility).
515    let ot = OriginalTweet {
516        id: 0,
517        tweet_id: Some(root_tweet_id.to_string()),
518        content: tweet_contents.first().cloned().unwrap_or_default(),
519        topic: if topic.is_empty() {
520            None
521        } else {
522            Some(topic.to_string())
523        },
524        llm_provider: None,
525        created_at: now,
526        status: if status == "partial" {
527            "sent".to_string()
528        } else {
529            status.to_string()
530        },
531        error_message: None,
532    };
533    let ot_id = insert_original_tweet_for(pool, account_id, &ot).await?;
534
535    Ok((thread_id, ot_id))
536}
537
538#[cfg(test)]
539mod tests {
540    use super::*;
541    use crate::storage::init_test_db;
542
543    fn now_iso() -> String {
544        chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
545    }
546
547    fn sample_original_tweet() -> OriginalTweet {
548        OriginalTweet {
549            id: 0,
550            tweet_id: Some("ot_123".to_string()),
551            content: "Educational tweet about Rust".to_string(),
552            topic: Some("rust".to_string()),
553            llm_provider: Some("openai".to_string()),
554            created_at: now_iso(),
555            status: "sent".to_string(),
556            error_message: None,
557        }
558    }
559
560    fn sample_thread() -> Thread {
561        Thread {
562            id: 0,
563            topic: "Rust async patterns".to_string(),
564            tweet_count: 3,
565            root_tweet_id: Some("root_456".to_string()),
566            created_at: now_iso(),
567            status: "sent".to_string(),
568        }
569    }
570
571    fn sample_thread_tweets(thread_id: i64) -> Vec<ThreadTweet> {
572        (0..3)
573            .map(|i| ThreadTweet {
574                id: 0,
575                thread_id,
576                position: i,
577                tweet_id: Some(format!("tt_{i}")),
578                content: format!("Thread tweet {i}"),
579                created_at: now_iso(),
580            })
581            .collect()
582    }
583
584    #[tokio::test]
585    async fn insert_and_query_original_tweet() {
586        let pool = init_test_db().await.expect("init db");
587        let tweet = sample_original_tweet();
588
589        let id = insert_original_tweet(&pool, &tweet).await.expect("insert");
590        assert!(id > 0);
591
592        let time = get_last_original_tweet_time(&pool).await.expect("get time");
593        assert!(time.is_some());
594    }
595
596    #[tokio::test]
597    async fn count_tweets_today_works() {
598        let pool = init_test_db().await.expect("init db");
599        let tweet = sample_original_tweet();
600
601        insert_original_tweet(&pool, &tweet).await.expect("insert");
602        let count = count_tweets_today(&pool).await.expect("count");
603        assert_eq!(count, 1);
604    }
605
606    #[tokio::test]
607    async fn insert_thread_with_tweets() {
608        let pool = init_test_db().await.expect("init db");
609        let thread = sample_thread();
610
611        let thread_id = insert_thread(&pool, &thread).await.expect("insert thread");
612        let tweets = sample_thread_tweets(thread_id);
613        insert_thread_tweets(&pool, thread_id, &tweets)
614            .await
615            .expect("insert tweets");
616
617        // Verify all tweets were inserted
618        let rows: Vec<(i64,)> = sqlx::query_as(
619            "SELECT position FROM thread_tweets WHERE thread_id = ? ORDER BY position",
620        )
621        .bind(thread_id)
622        .fetch_all(&pool)
623        .await
624        .expect("query");
625
626        assert_eq!(rows.len(), 3);
627        assert_eq!(rows[0].0, 0);
628        assert_eq!(rows[1].0, 1);
629        assert_eq!(rows[2].0, 2);
630    }
631
632    #[tokio::test]
633    async fn thread_tweet_duplicate_position_fails() {
634        let pool = init_test_db().await.expect("init db");
635        let thread = sample_thread();
636
637        let thread_id = insert_thread(&pool, &thread).await.expect("insert thread");
638
639        // Two tweets with same position should fail the UNIQUE constraint
640        let duplicate_tweets = vec![
641            ThreadTweet {
642                id: 0,
643                thread_id,
644                position: 0,
645                tweet_id: Some("a".to_string()),
646                content: "First".to_string(),
647                created_at: now_iso(),
648            },
649            ThreadTweet {
650                id: 0,
651                thread_id,
652                position: 0, // duplicate position
653                tweet_id: Some("b".to_string()),
654                content: "Second".to_string(),
655                created_at: now_iso(),
656            },
657        ];
658
659        let result = insert_thread_tweets(&pool, thread_id, &duplicate_tweets).await;
660        assert!(result.is_err());
661
662        // Verify transaction rolled back (no partial data)
663        let rows: Vec<(i64,)> =
664            sqlx::query_as("SELECT COUNT(*) FROM thread_tweets WHERE thread_id = ?")
665                .bind(thread_id)
666                .fetch_all(&pool)
667                .await
668                .expect("query");
669
670        assert_eq!(rows[0].0, 0, "transaction should have rolled back");
671    }
672
673    #[tokio::test]
674    async fn count_threads_this_week_works() {
675        let pool = init_test_db().await.expect("init db");
676        let thread = sample_thread();
677
678        insert_thread(&pool, &thread).await.expect("insert");
679        let count = count_threads_this_week(&pool).await.expect("count");
680        assert_eq!(count, 1);
681    }
682
683    #[tokio::test]
684    async fn last_thread_time_empty() {
685        let pool = init_test_db().await.expect("init db");
686        let time = get_last_thread_time(&pool).await.expect("get time");
687        assert!(time.is_none());
688    }
689
690    #[tokio::test]
691    async fn get_tweets_in_range_filters() {
692        let pool = init_test_db().await.expect("init db");
693
694        let mut tweet = sample_original_tweet();
695        tweet.created_at = "2026-02-20T10:00:00Z".to_string();
696        insert_original_tweet(&pool, &tweet).await.expect("insert");
697
698        let mut tweet2 = sample_original_tweet();
699        tweet2.created_at = "2026-02-25T10:00:00Z".to_string();
700        tweet2.tweet_id = Some("ot_456".to_string());
701        insert_original_tweet(&pool, &tweet2).await.expect("insert");
702
703        let in_range = get_tweets_in_range(&pool, "2026-02-19T00:00:00Z", "2026-02-21T00:00:00Z")
704            .await
705            .expect("range");
706        assert_eq!(in_range.len(), 1);
707        assert_eq!(in_range[0].tweet_id, Some("ot_123".to_string()));
708
709        let all = get_tweets_in_range(&pool, "2026-02-01T00:00:00Z", "2026-02-28T00:00:00Z")
710            .await
711            .expect("range");
712        assert_eq!(all.len(), 2);
713    }
714
715    #[tokio::test]
716    async fn get_recent_original_tweets_returns_newest_first() {
717        let pool = init_test_db().await.expect("init db");
718
719        let mut tweet1 = sample_original_tweet();
720        tweet1.created_at = "2026-02-20T10:00:00Z".to_string();
721        tweet1.tweet_id = Some("ot_1".to_string());
722        insert_original_tweet(&pool, &tweet1).await.expect("insert");
723
724        let mut tweet2 = sample_original_tweet();
725        tweet2.created_at = "2026-02-21T10:00:00Z".to_string();
726        tweet2.tweet_id = Some("ot_2".to_string());
727        insert_original_tweet(&pool, &tweet2).await.expect("insert");
728
729        let mut tweet3 = sample_original_tweet();
730        tweet3.created_at = "2026-02-22T10:00:00Z".to_string();
731        tweet3.tweet_id = Some("ot_3".to_string());
732        insert_original_tweet(&pool, &tweet3).await.expect("insert");
733
734        let recent = get_recent_original_tweets(&pool, 2).await.expect("recent");
735        assert_eq!(recent.len(), 2);
736        assert_eq!(recent[0].tweet_id, Some("ot_3".to_string()));
737        assert_eq!(recent[1].tweet_id, Some("ot_2".to_string()));
738    }
739
740    #[tokio::test]
741    async fn get_recent_original_tweets_empty() {
742        let pool = init_test_db().await.expect("init db");
743        let recent = get_recent_original_tweets(&pool, 10).await.expect("recent");
744        assert!(recent.is_empty());
745    }
746
747    #[tokio::test]
748    async fn get_recent_threads_returns_newest_first() {
749        let pool = init_test_db().await.expect("init db");
750
751        let mut thread1 = sample_thread();
752        thread1.created_at = "2026-02-20T10:00:00Z".to_string();
753        insert_thread(&pool, &thread1).await.expect("insert");
754
755        let mut thread2 = sample_thread();
756        thread2.created_at = "2026-02-21T10:00:00Z".to_string();
757        insert_thread(&pool, &thread2).await.expect("insert");
758
759        let recent = get_recent_threads(&pool, 1).await.expect("recent");
760        assert_eq!(recent.len(), 1);
761        assert_eq!(recent[0].created_at, "2026-02-21T10:00:00Z");
762    }
763
764    #[tokio::test]
765    async fn get_recent_threads_empty() {
766        let pool = init_test_db().await.expect("init db");
767        let recent = get_recent_threads(&pool, 10).await.expect("recent");
768        assert!(recent.is_empty());
769    }
770
771    #[tokio::test]
772    async fn get_todays_tweet_times_returns_today_only() {
773        let pool = init_test_db().await.expect("init db");
774
775        // Insert a tweet with today's date
776        let tweet = sample_original_tweet();
777        insert_original_tweet(&pool, &tweet).await.expect("insert");
778
779        // Insert a tweet from a different day
780        let mut old_tweet = sample_original_tweet();
781        old_tweet.created_at = "2020-01-01T10:00:00Z".to_string();
782        old_tweet.tweet_id = Some("ot_old".to_string());
783        insert_original_tweet(&pool, &old_tweet)
784            .await
785            .expect("insert old");
786
787        let times = get_todays_tweet_times(&pool).await.expect("times");
788        // Should only include today's tweet
789        assert_eq!(times.len(), 1);
790    }
791
792    #[tokio::test]
793    async fn get_last_thread_time_returns_latest() {
794        let pool = init_test_db().await.expect("init db");
795
796        let mut thread1 = sample_thread();
797        thread1.created_at = "2026-02-20T10:00:00Z".to_string();
798        insert_thread(&pool, &thread1).await.expect("insert");
799
800        let mut thread2 = sample_thread();
801        thread2.created_at = "2026-02-22T10:00:00Z".to_string();
802        insert_thread(&pool, &thread2).await.expect("insert");
803
804        let time = get_last_thread_time(&pool).await.expect("get time");
805        assert_eq!(time, Some("2026-02-22T10:00:00Z".to_string()));
806    }
807
808    #[tokio::test]
809    async fn insert_original_tweet_failed_status() {
810        let pool = init_test_db().await.expect("init db");
811
812        let mut tweet = sample_original_tweet();
813        tweet.status = "failed".to_string();
814        tweet.error_message = Some("API error".to_string());
815        tweet.tweet_id = None;
816
817        let id = insert_original_tweet(&pool, &tweet).await.expect("insert");
818        assert!(id > 0);
819
820        // Failed tweets should NOT appear in last original tweet time (status != 'sent')
821        let time = get_last_original_tweet_time(&pool).await.expect("get time");
822        assert!(time.is_none());
823    }
824
825    #[tokio::test]
826    async fn get_threads_in_range_filters() {
827        let pool = init_test_db().await.expect("init db");
828
829        let mut thread = sample_thread();
830        thread.created_at = "2026-02-20T10:00:00Z".to_string();
831        insert_thread(&pool, &thread).await.expect("insert");
832
833        let mut thread2 = sample_thread();
834        thread2.created_at = "2026-02-25T10:00:00Z".to_string();
835        insert_thread(&pool, &thread2).await.expect("insert");
836
837        let in_range = get_threads_in_range(&pool, "2026-02-19T00:00:00Z", "2026-02-21T00:00:00Z")
838            .await
839            .expect("range");
840        assert_eq!(in_range.len(), 1);
841
842        let all = get_threads_in_range(&pool, "2026-02-01T00:00:00Z", "2026-02-28T00:00:00Z")
843            .await
844            .expect("range");
845        assert_eq!(all.len(), 2);
846    }
847
848    #[tokio::test]
849    async fn get_thread_tweet_ids_by_root_excludes_root() {
850        let pool = init_test_db().await.expect("init db");
851        let account_id = DEFAULT_ACCOUNT_ID;
852
853        let thread = sample_thread(); // root_tweet_id = "root_456"
854        let thread_id = insert_thread_for(&pool, account_id, &thread)
855            .await
856            .expect("insert thread");
857
858        // Position 0 = root, positions 1-2 = children
859        let tweets = vec![
860            ThreadTweet {
861                id: 0,
862                thread_id,
863                position: 0,
864                tweet_id: Some("root_456".to_string()),
865                content: "Root tweet".to_string(),
866                created_at: now_iso(),
867            },
868            ThreadTweet {
869                id: 0,
870                thread_id,
871                position: 1,
872                tweet_id: Some("child_1".to_string()),
873                content: "Child 1".to_string(),
874                created_at: now_iso(),
875            },
876            ThreadTweet {
877                id: 0,
878                thread_id,
879                position: 2,
880                tweet_id: Some("child_2".to_string()),
881                content: "Child 2".to_string(),
882                created_at: now_iso(),
883            },
884        ];
885        insert_thread_tweets_for(&pool, account_id, thread_id, &tweets)
886            .await
887            .expect("insert tweets");
888
889        let child_ids = get_thread_tweet_ids_by_root_for(&pool, account_id, "root_456")
890            .await
891            .expect("query");
892
893        assert_eq!(child_ids.len(), 2);
894        assert_eq!(child_ids[0], "child_1");
895        assert_eq!(child_ids[1], "child_2");
896    }
897
898    #[tokio::test]
899    async fn get_thread_tweet_ids_by_root_empty_when_no_children() {
900        let pool = init_test_db().await.expect("init db");
901        let account_id = DEFAULT_ACCOUNT_ID;
902
903        let ids = get_thread_tweet_ids_by_root_for(&pool, account_id, "nonexistent_root")
904            .await
905            .expect("query");
906        assert!(ids.is_empty());
907    }
908
909    #[tokio::test]
910    async fn persist_thread_records_creates_all_rows() {
911        let pool = init_test_db().await.expect("init db");
912        let account_id = DEFAULT_ACCOUNT_ID;
913
914        let tweet_ids = vec![
915            "root_t1".to_string(),
916            "child_t2".to_string(),
917            "child_t3".to_string(),
918        ];
919        let tweet_contents = vec![
920            "Root content".to_string(),
921            "Child 2 content".to_string(),
922            "Child 3 content".to_string(),
923        ];
924
925        let (thread_id, ot_id) = persist_thread_records(
926            &pool,
927            account_id,
928            "test topic",
929            &tweet_ids,
930            &tweet_contents,
931            "sent",
932        )
933        .await
934        .expect("persist");
935
936        assert!(thread_id > 0);
937        assert!(ot_id > 0);
938
939        // Verify thread row
940        let threads: Vec<(String, i64)> =
941            sqlx::query_as("SELECT root_tweet_id, tweet_count FROM threads WHERE id = ?")
942                .bind(thread_id)
943                .fetch_all(&pool)
944                .await
945                .expect("query threads");
946        assert_eq!(threads.len(), 1);
947        assert_eq!(threads[0].0, "root_t1");
948        assert_eq!(threads[0].1, 3);
949
950        // Verify thread_tweets rows
951        let tt_count: (i64,) =
952            sqlx::query_as("SELECT COUNT(*) FROM thread_tweets WHERE thread_id = ?")
953                .bind(thread_id)
954                .fetch_one(&pool)
955                .await
956                .expect("count");
957        assert_eq!(tt_count.0, 3);
958
959        // Verify original_tweets row for root
960        let ot: Vec<(Option<String>,)> =
961            sqlx::query_as("SELECT tweet_id FROM original_tweets WHERE id = ?")
962                .bind(ot_id)
963                .fetch_all(&pool)
964                .await
965                .expect("query ot");
966        assert_eq!(ot.len(), 1);
967        assert_eq!(ot[0].0.as_deref(), Some("root_t1"));
968
969        // Verify child IDs via the query helper
970        let children = get_thread_tweet_ids_by_root_for(&pool, account_id, "root_t1")
971            .await
972            .expect("children");
973        assert_eq!(children, vec!["child_t2", "child_t3"]);
974    }
975
976    #[tokio::test]
977    async fn persist_thread_records_partial_status() {
978        let pool = init_test_db().await.expect("init db");
979        let account_id = DEFAULT_ACCOUNT_ID;
980
981        // Simulate a 4-tweet thread where only 2 posted
982        let tweet_ids = vec!["partial_root".to_string(), "partial_child".to_string()];
983        let tweet_contents = vec!["Root".to_string(), "Child".to_string()];
984
985        let (thread_id, _ot_id) = persist_thread_records(
986            &pool,
987            account_id,
988            "partial topic",
989            &tweet_ids,
990            &tweet_contents,
991            "partial",
992        )
993        .await
994        .expect("persist partial");
995
996        let status: (String,) = sqlx::query_as("SELECT status FROM threads WHERE id = ?")
997            .bind(thread_id)
998            .fetch_one(&pool)
999            .await
1000            .expect("query");
1001        assert_eq!(status.0, "partial");
1002
1003        // OT status should be "sent" even for partial threads (root was posted)
1004        let ot_status: (String,) =
1005            sqlx::query_as("SELECT status FROM original_tweets WHERE tweet_id = ?")
1006                .bind("partial_root")
1007                .fetch_one(&pool)
1008                .await
1009                .expect("query ot");
1010        assert_eq!(ot_status.0, "sent");
1011    }
1012}