1use super::accounts::DEFAULT_ACCOUNT_ID;
7use super::DbPool;
8use crate::error::StorageError;
9
/// A row of the `original_tweets` table.
///
/// Loadable through `sqlx::FromRow` (the range/recent queries in this module
/// use `SELECT *`) and serializable through `serde::Serialize`.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
pub struct OriginalTweet {
    /// Database row ID. Not bound on insert; the insert helpers return the
    /// rowid assigned by the database.
    pub id: i64,
    /// ID of the posted tweet, if any. The module's tests store `None` for a
    /// tweet whose status is `"failed"`.
    pub tweet_id: Option<String>,
    /// Text of the tweet.
    pub content: String,
    /// Optional topic label for the tweet.
    pub topic: Option<String>,
    /// Optional LLM provider name — presumably the provider that generated
    /// `content`; confirm against callers.
    pub llm_provider: Option<String>,
    /// Creation timestamp stored as text. Code in this module writes it as
    /// UTC `%Y-%m-%dT%H:%M:%SZ`, and queries order/compare it as a string.
    pub created_at: String,
    /// Status string. Queries in this module filter on `'sent'`; the tests
    /// also use `"failed"`.
    pub status: String,
    /// Optional error description (set alongside a `"failed"` status in tests).
    pub error_message: Option<String>,
}
30
/// A row of the `threads` table.
///
/// Loadable through `sqlx::FromRow` and serializable through `serde::Serialize`.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
pub struct Thread {
    /// Database row ID. Not bound on insert; `insert_thread_for` returns the
    /// rowid assigned by the database.
    pub id: i64,
    /// Topic of the thread.
    pub topic: String,
    /// Number of tweets in the thread.
    pub tweet_count: i64,
    /// Tweet ID of the thread's root tweet, if any. Used as the lookup key by
    /// `get_thread_tweet_ids_by_root_for`.
    pub root_tweet_id: Option<String>,
    /// Creation timestamp stored as text (UTC `%Y-%m-%dT%H:%M:%SZ` when written
    /// by `persist_thread_records`).
    pub created_at: String,
    /// Status string. Queries filter on `'sent'`; `persist_thread_records`
    /// also accepts `"partial"`.
    pub status: String,
}
47
/// A row of the `thread_tweets` table: one tweet inside a thread.
///
/// Loadable through `sqlx::FromRow` and serializable through `serde::Serialize`.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
pub struct ThreadTweet {
    /// Database row ID. Not bound on insert.
    pub id: i64,
    /// ID of the owning `threads` row. Note that `insert_thread_tweets_for`
    /// binds its own `thread_id` argument rather than this field.
    pub thread_id: i64,
    /// Zero-based position within the thread; position 0 is treated as the
    /// root tweet by `get_thread_tweet_ids_by_root_for`.
    pub position: i64,
    /// ID of the posted tweet, if any.
    pub tweet_id: Option<String>,
    /// Text of this tweet.
    pub content: String,
    /// Creation timestamp stored as text.
    pub created_at: String,
}
64
65pub async fn insert_original_tweet_for(
67 pool: &DbPool,
68 account_id: &str,
69 tweet: &OriginalTweet,
70) -> Result<i64, StorageError> {
71 let result = sqlx::query(
72 "INSERT INTO original_tweets \
73 (account_id, tweet_id, content, topic, llm_provider, created_at, status, error_message) \
74 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
75 )
76 .bind(account_id)
77 .bind(&tweet.tweet_id)
78 .bind(&tweet.content)
79 .bind(&tweet.topic)
80 .bind(&tweet.llm_provider)
81 .bind(&tweet.created_at)
82 .bind(&tweet.status)
83 .bind(&tweet.error_message)
84 .execute(pool)
85 .await
86 .map_err(|e| StorageError::Query { source: e })?;
87
88 Ok(result.last_insert_rowid())
89}
90
91pub async fn insert_original_tweet(
93 pool: &DbPool,
94 tweet: &OriginalTweet,
95) -> Result<i64, StorageError> {
96 insert_original_tweet_for(pool, DEFAULT_ACCOUNT_ID, tweet).await
97}
98
99pub async fn set_original_tweet_source_node_for(
103 pool: &DbPool,
104 account_id: &str,
105 id: i64,
106 source_node_id: i64,
107) -> Result<(), StorageError> {
108 sqlx::query("UPDATE original_tweets SET source_node_id = ? WHERE id = ? AND account_id = ?")
109 .bind(source_node_id)
110 .bind(id)
111 .bind(account_id)
112 .execute(pool)
113 .await
114 .map_err(|e| StorageError::Query { source: e })?;
115
116 Ok(())
117}
118
119pub async fn insert_original_tweet_with_provenance_for(
123 pool: &DbPool,
124 account_id: &str,
125 tweet: &OriginalTweet,
126 refs: &[super::provenance::ProvenanceRef],
127) -> Result<i64, StorageError> {
128 let id = insert_original_tweet_for(pool, account_id, tweet).await?;
129
130 if !refs.is_empty() {
131 super::provenance::insert_links_for(pool, account_id, "original_tweet", id, refs).await?;
132 }
133
134 Ok(id)
135}
136
137pub async fn get_original_tweet_id_by_tweet_id(
139 pool: &DbPool,
140 account_id: &str,
141 tweet_id: &str,
142) -> Result<Option<i64>, StorageError> {
143 let row: Option<(i64,)> = sqlx::query_as(
144 "SELECT id FROM original_tweets WHERE account_id = ? AND tweet_id = ? LIMIT 1",
145 )
146 .bind(account_id)
147 .bind(tweet_id)
148 .fetch_optional(pool)
149 .await
150 .map_err(|e| StorageError::Query { source: e })?;
151
152 Ok(row.map(|r| r.0))
153}
154
155pub async fn get_last_original_tweet_time_for(
157 pool: &DbPool,
158 account_id: &str,
159) -> Result<Option<String>, StorageError> {
160 let row: Option<(String,)> = sqlx::query_as(
161 "SELECT created_at FROM original_tweets WHERE account_id = ? AND status = 'sent' \
162 ORDER BY created_at DESC LIMIT 1",
163 )
164 .bind(account_id)
165 .fetch_optional(pool)
166 .await
167 .map_err(|e| StorageError::Query { source: e })?;
168
169 Ok(row.map(|r| r.0))
170}
171
172pub async fn get_last_original_tweet_time(pool: &DbPool) -> Result<Option<String>, StorageError> {
174 get_last_original_tweet_time_for(pool, DEFAULT_ACCOUNT_ID).await
175}
176
177pub async fn count_tweets_today_for(pool: &DbPool, account_id: &str) -> Result<i64, StorageError> {
179 let row: (i64,) = sqlx::query_as(
180 "SELECT COUNT(*) FROM original_tweets WHERE account_id = ? AND date(created_at) = date('now')",
181 )
182 .bind(account_id)
183 .fetch_one(pool)
184 .await
185 .map_err(|e| StorageError::Query { source: e })?;
186
187 Ok(row.0)
188}
189
190pub async fn count_tweets_today(pool: &DbPool) -> Result<i64, StorageError> {
192 count_tweets_today_for(pool, DEFAULT_ACCOUNT_ID).await
193}
194
195pub async fn insert_thread_for(
197 pool: &DbPool,
198 account_id: &str,
199 thread: &Thread,
200) -> Result<i64, StorageError> {
201 let result = sqlx::query(
202 "INSERT INTO threads (account_id, topic, tweet_count, root_tweet_id, created_at, status) \
203 VALUES (?, ?, ?, ?, ?, ?)",
204 )
205 .bind(account_id)
206 .bind(&thread.topic)
207 .bind(thread.tweet_count)
208 .bind(&thread.root_tweet_id)
209 .bind(&thread.created_at)
210 .bind(&thread.status)
211 .execute(pool)
212 .await
213 .map_err(|e| StorageError::Query { source: e })?;
214
215 Ok(result.last_insert_rowid())
216}
217
218pub async fn insert_thread(pool: &DbPool, thread: &Thread) -> Result<i64, StorageError> {
220 insert_thread_for(pool, DEFAULT_ACCOUNT_ID, thread).await
221}
222
223pub async fn insert_thread_tweets_for(
227 pool: &DbPool,
228 account_id: &str,
229 thread_id: i64,
230 tweets: &[ThreadTweet],
231) -> Result<(), StorageError> {
232 let mut tx = pool
233 .begin()
234 .await
235 .map_err(|e| StorageError::Connection { source: e })?;
236
237 for tweet in tweets {
238 sqlx::query(
239 "INSERT INTO thread_tweets \
240 (account_id, thread_id, position, tweet_id, content, created_at) \
241 VALUES (?, ?, ?, ?, ?, ?)",
242 )
243 .bind(account_id)
244 .bind(thread_id)
245 .bind(tweet.position)
246 .bind(&tweet.tweet_id)
247 .bind(&tweet.content)
248 .bind(&tweet.created_at)
249 .execute(&mut *tx)
250 .await
251 .map_err(|e| StorageError::Query { source: e })?;
252 }
253
254 tx.commit()
255 .await
256 .map_err(|e| StorageError::Connection { source: e })?;
257
258 Ok(())
259}
260
261pub async fn insert_thread_tweets(
265 pool: &DbPool,
266 thread_id: i64,
267 tweets: &[ThreadTweet],
268) -> Result<(), StorageError> {
269 insert_thread_tweets_for(pool, DEFAULT_ACCOUNT_ID, thread_id, tweets).await
270}
271
272pub async fn get_last_thread_time_for(
274 pool: &DbPool,
275 account_id: &str,
276) -> Result<Option<String>, StorageError> {
277 let row: Option<(String,)> = sqlx::query_as(
278 "SELECT created_at FROM threads WHERE account_id = ? AND status = 'sent' \
279 ORDER BY created_at DESC LIMIT 1",
280 )
281 .bind(account_id)
282 .fetch_optional(pool)
283 .await
284 .map_err(|e| StorageError::Query { source: e })?;
285
286 Ok(row.map(|r| r.0))
287}
288
289pub async fn get_last_thread_time(pool: &DbPool) -> Result<Option<String>, StorageError> {
291 get_last_thread_time_for(pool, DEFAULT_ACCOUNT_ID).await
292}
293
294pub async fn get_todays_tweet_times_for(
296 pool: &DbPool,
297 account_id: &str,
298) -> Result<Vec<String>, StorageError> {
299 let rows: Vec<(String,)> = sqlx::query_as(
300 "SELECT created_at FROM original_tweets \
301 WHERE account_id = ? AND status = 'sent' AND date(created_at) = date('now') \
302 ORDER BY created_at ASC",
303 )
304 .bind(account_id)
305 .fetch_all(pool)
306 .await
307 .map_err(|e| StorageError::Query { source: e })?;
308
309 Ok(rows.into_iter().map(|r| r.0).collect())
310}
311
312pub async fn get_todays_tweet_times(pool: &DbPool) -> Result<Vec<String>, StorageError> {
314 get_todays_tweet_times_for(pool, DEFAULT_ACCOUNT_ID).await
315}
316
317pub async fn count_threads_this_week_for(
319 pool: &DbPool,
320 account_id: &str,
321) -> Result<i64, StorageError> {
322 let row: (i64,) = sqlx::query_as(
323 "SELECT COUNT(*) FROM threads \
324 WHERE account_id = ? AND strftime('%Y-%W', created_at) = strftime('%Y-%W', 'now')",
325 )
326 .bind(account_id)
327 .fetch_one(pool)
328 .await
329 .map_err(|e| StorageError::Query { source: e })?;
330
331 Ok(row.0)
332}
333
334pub async fn count_threads_this_week(pool: &DbPool) -> Result<i64, StorageError> {
336 count_threads_this_week_for(pool, DEFAULT_ACCOUNT_ID).await
337}
338
339pub async fn get_tweets_in_range_for(
341 pool: &DbPool,
342 account_id: &str,
343 from: &str,
344 to: &str,
345) -> Result<Vec<OriginalTweet>, StorageError> {
346 sqlx::query_as::<_, OriginalTweet>(
347 "SELECT * FROM original_tweets \
348 WHERE account_id = ? AND created_at BETWEEN ? AND ? \
349 ORDER BY created_at ASC",
350 )
351 .bind(account_id)
352 .bind(from)
353 .bind(to)
354 .fetch_all(pool)
355 .await
356 .map_err(|e| StorageError::Query { source: e })
357}
358
359pub async fn get_tweets_in_range(
361 pool: &DbPool,
362 from: &str,
363 to: &str,
364) -> Result<Vec<OriginalTweet>, StorageError> {
365 get_tweets_in_range_for(pool, DEFAULT_ACCOUNT_ID, from, to).await
366}
367
368pub async fn get_threads_in_range_for(
370 pool: &DbPool,
371 account_id: &str,
372 from: &str,
373 to: &str,
374) -> Result<Vec<Thread>, StorageError> {
375 sqlx::query_as::<_, Thread>(
376 "SELECT * FROM threads \
377 WHERE account_id = ? AND created_at BETWEEN ? AND ? \
378 ORDER BY created_at ASC",
379 )
380 .bind(account_id)
381 .bind(from)
382 .bind(to)
383 .fetch_all(pool)
384 .await
385 .map_err(|e| StorageError::Query { source: e })
386}
387
388pub async fn get_threads_in_range(
390 pool: &DbPool,
391 from: &str,
392 to: &str,
393) -> Result<Vec<Thread>, StorageError> {
394 get_threads_in_range_for(pool, DEFAULT_ACCOUNT_ID, from, to).await
395}
396
397pub async fn get_recent_original_tweets_for(
399 pool: &DbPool,
400 account_id: &str,
401 limit: u32,
402) -> Result<Vec<OriginalTweet>, StorageError> {
403 sqlx::query_as::<_, OriginalTweet>(
404 "SELECT * FROM original_tweets WHERE account_id = ? ORDER BY created_at DESC LIMIT ?",
405 )
406 .bind(account_id)
407 .bind(limit)
408 .fetch_all(pool)
409 .await
410 .map_err(|e| StorageError::Query { source: e })
411}
412
413pub async fn get_recent_original_tweets(
415 pool: &DbPool,
416 limit: u32,
417) -> Result<Vec<OriginalTweet>, StorageError> {
418 get_recent_original_tweets_for(pool, DEFAULT_ACCOUNT_ID, limit).await
419}
420
421pub async fn get_recent_threads_for(
423 pool: &DbPool,
424 account_id: &str,
425 limit: u32,
426) -> Result<Vec<Thread>, StorageError> {
427 sqlx::query_as::<_, Thread>(
428 "SELECT * FROM threads WHERE account_id = ? ORDER BY created_at DESC LIMIT ?",
429 )
430 .bind(account_id)
431 .bind(limit)
432 .fetch_all(pool)
433 .await
434 .map_err(|e| StorageError::Query { source: e })
435}
436
437pub async fn get_recent_threads(pool: &DbPool, limit: u32) -> Result<Vec<Thread>, StorageError> {
439 get_recent_threads_for(pool, DEFAULT_ACCOUNT_ID, limit).await
440}
441
442pub async fn get_thread_tweet_ids_by_root_for(
447 pool: &DbPool,
448 account_id: &str,
449 root_tweet_id: &str,
450) -> Result<Vec<String>, StorageError> {
451 let rows: Vec<(String,)> = sqlx::query_as(
452 "SELECT tt.tweet_id FROM thread_tweets tt \
453 JOIN threads t ON tt.thread_id = t.id \
454 WHERE t.account_id = ? AND t.root_tweet_id = ? AND tt.position > 0 \
455 ORDER BY tt.position ASC",
456 )
457 .bind(account_id)
458 .bind(root_tweet_id)
459 .fetch_all(pool)
460 .await
461 .map_err(|e| StorageError::Query { source: e })?;
462
463 Ok(rows
464 .into_iter()
465 .filter_map(|r| if r.0.is_empty() { None } else { Some(r.0) })
466 .collect())
467}
468
469pub async fn persist_thread_records(
477 pool: &DbPool,
478 account_id: &str,
479 topic: &str,
480 tweet_ids: &[String],
481 tweet_contents: &[String],
482 status: &str,
483) -> Result<(i64, i64), StorageError> {
484 let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
485 let root_tweet_id = tweet_ids.first().map(|s| s.as_str()).unwrap_or("");
486
487 let thread = Thread {
489 id: 0,
490 topic: topic.to_string(),
491 tweet_count: tweet_ids.len() as i64,
492 root_tweet_id: Some(root_tweet_id.to_string()),
493 created_at: now.clone(),
494 status: status.to_string(),
495 };
496 let thread_id = insert_thread_for(pool, account_id, &thread).await?;
497
498 let thread_tweets: Vec<ThreadTweet> = tweet_ids
500 .iter()
501 .zip(tweet_contents.iter())
502 .enumerate()
503 .map(|(i, (tid, content))| ThreadTweet {
504 id: 0,
505 thread_id,
506 position: i as i64,
507 tweet_id: Some(tid.clone()),
508 content: content.clone(),
509 created_at: now.clone(),
510 })
511 .collect();
512 insert_thread_tweets_for(pool, account_id, thread_id, &thread_tweets).await?;
513
514 let ot = OriginalTweet {
516 id: 0,
517 tweet_id: Some(root_tweet_id.to_string()),
518 content: tweet_contents.first().cloned().unwrap_or_default(),
519 topic: if topic.is_empty() {
520 None
521 } else {
522 Some(topic.to_string())
523 },
524 llm_provider: None,
525 created_at: now,
526 status: if status == "partial" {
527 "sent".to_string()
528 } else {
529 status.to_string()
530 },
531 error_message: None,
532 };
533 let ot_id = insert_original_tweet_for(pool, account_id, &ot).await?;
534
535 Ok((thread_id, ot_id))
536}
537
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::init_test_db;

    /// Timestamp layout shared by every fixture in this module.
    const ISO_FORMAT: &str = "%Y-%m-%dT%H:%M:%SZ";

    /// Current UTC time rendered with [`ISO_FORMAT`].
    fn now_iso() -> String {
        chrono::Utc::now().format(ISO_FORMAT).to_string()
    }

    /// A sent original tweet created "now".
    fn sample_original_tweet() -> OriginalTweet {
        OriginalTweet {
            id: 0,
            tweet_id: Some(String::from("ot_123")),
            content: String::from("Educational tweet about Rust"),
            topic: Some(String::from("rust")),
            llm_provider: Some(String::from("openai")),
            created_at: now_iso(),
            status: String::from("sent"),
            error_message: None,
        }
    }

    /// The sample tweet with a fixed creation time and tweet ID.
    fn original_tweet_at(created_at: &str, tweet_id: &str) -> OriginalTweet {
        OriginalTweet {
            created_at: created_at.to_owned(),
            tweet_id: Some(tweet_id.to_owned()),
            ..sample_original_tweet()
        }
    }

    /// A sent three-tweet thread created "now".
    fn sample_thread() -> Thread {
        Thread {
            id: 0,
            topic: String::from("Rust async patterns"),
            tweet_count: 3,
            root_tweet_id: Some(String::from("root_456")),
            created_at: now_iso(),
            status: String::from("sent"),
        }
    }

    /// The sample thread with a fixed creation time.
    fn thread_at(created_at: &str) -> Thread {
        Thread {
            created_at: created_at.to_owned(),
            ..sample_thread()
        }
    }

    /// One thread tweet created "now".
    fn thread_tweet(thread_id: i64, position: i64, tweet_id: &str, content: &str) -> ThreadTweet {
        ThreadTweet {
            id: 0,
            thread_id,
            position,
            tweet_id: Some(tweet_id.to_owned()),
            content: content.to_owned(),
            created_at: now_iso(),
        }
    }

    /// Three thread tweets at positions 0, 1 and 2.
    fn sample_thread_tweets(thread_id: i64) -> Vec<ThreadTweet> {
        (0..3)
            .map(|i| {
                thread_tweet(
                    thread_id,
                    i,
                    &format!("tt_{i}"),
                    &format!("Thread tweet {i}"),
                )
            })
            .collect()
    }

    #[tokio::test]
    async fn insert_and_query_original_tweet() {
        let pool = init_test_db().await.expect("init db");

        let row_id = insert_original_tweet(&pool, &sample_original_tweet())
            .await
            .expect("insert");
        assert!(row_id > 0);

        let last_sent = get_last_original_tweet_time(&pool).await.expect("get time");
        assert!(last_sent.is_some());
    }

    #[tokio::test]
    async fn count_tweets_today_works() {
        let pool = init_test_db().await.expect("init db");

        insert_original_tweet(&pool, &sample_original_tweet())
            .await
            .expect("insert");

        let todays_total = count_tweets_today(&pool).await.expect("count");
        assert_eq!(todays_total, 1);
    }

    #[tokio::test]
    async fn insert_thread_with_tweets() {
        let pool = init_test_db().await.expect("init db");

        let thread_id = insert_thread(&pool, &sample_thread())
            .await
            .expect("insert thread");
        insert_thread_tweets(&pool, thread_id, &sample_thread_tweets(thread_id))
            .await
            .expect("insert tweets");

        // Read the stored positions back in order.
        let positions: Vec<i64> = sqlx::query_as::<_, (i64,)>(
            "SELECT position FROM thread_tweets WHERE thread_id = ? ORDER BY position",
        )
        .bind(thread_id)
        .fetch_all(&pool)
        .await
        .expect("query")
        .into_iter()
        .map(|(position,)| position)
        .collect();

        assert_eq!(positions, vec![0, 1, 2]);
    }

    #[tokio::test]
    async fn thread_tweet_duplicate_position_fails() {
        let pool = init_test_db().await.expect("init db");

        let thread_id = insert_thread(&pool, &sample_thread())
            .await
            .expect("insert thread");

        // Both tweets claim position 0, so the second insert must be rejected.
        let clashing = [
            thread_tweet(thread_id, 0, "a", "First"),
            thread_tweet(thread_id, 0, "b", "Second"),
        ];
        let outcome = insert_thread_tweets(&pool, thread_id, &clashing).await;
        assert!(outcome.is_err());

        let (stored,): (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM thread_tweets WHERE thread_id = ?")
                .bind(thread_id)
                .fetch_one(&pool)
                .await
                .expect("query");

        assert_eq!(stored, 0, "transaction should have rolled back");
    }

    #[tokio::test]
    async fn count_threads_this_week_works() {
        let pool = init_test_db().await.expect("init db");

        insert_thread(&pool, &sample_thread()).await.expect("insert");

        let weekly_total = count_threads_this_week(&pool).await.expect("count");
        assert_eq!(weekly_total, 1);
    }

    #[tokio::test]
    async fn last_thread_time_empty() {
        let pool = init_test_db().await.expect("init db");

        let last_sent = get_last_thread_time(&pool).await.expect("get time");
        assert!(last_sent.is_none());
    }

    #[tokio::test]
    async fn get_tweets_in_range_filters() {
        let pool = init_test_db().await.expect("init db");

        let fixtures = [
            original_tweet_at("2026-02-20T10:00:00Z", "ot_123"),
            original_tweet_at("2026-02-25T10:00:00Z", "ot_456"),
        ];
        for fixture in &fixtures {
            insert_original_tweet(&pool, fixture).await.expect("insert");
        }

        let narrow = get_tweets_in_range(&pool, "2026-02-19T00:00:00Z", "2026-02-21T00:00:00Z")
            .await
            .expect("range");
        assert_eq!(narrow.len(), 1);
        assert_eq!(narrow[0].tweet_id, Some("ot_123".to_string()));

        let whole_month = get_tweets_in_range(&pool, "2026-02-01T00:00:00Z", "2026-02-28T00:00:00Z")
            .await
            .expect("range");
        assert_eq!(whole_month.len(), 2);
    }

    #[tokio::test]
    async fn get_recent_original_tweets_returns_newest_first() {
        let pool = init_test_db().await.expect("init db");

        let schedule = [
            ("2026-02-20T10:00:00Z", "ot_1"),
            ("2026-02-21T10:00:00Z", "ot_2"),
            ("2026-02-22T10:00:00Z", "ot_3"),
        ];
        for (created_at, tweet_id) in schedule {
            insert_original_tweet(&pool, &original_tweet_at(created_at, tweet_id))
                .await
                .expect("insert");
        }

        let recent = get_recent_original_tweets(&pool, 2).await.expect("recent");
        let recent_ids: Vec<Option<&str>> =
            recent.iter().map(|tweet| tweet.tweet_id.as_deref()).collect();
        assert_eq!(recent_ids, vec![Some("ot_3"), Some("ot_2")]);
    }

    #[tokio::test]
    async fn get_recent_original_tweets_empty() {
        let pool = init_test_db().await.expect("init db");

        let recent = get_recent_original_tweets(&pool, 10).await.expect("recent");
        assert!(recent.is_empty());
    }

    #[tokio::test]
    async fn get_recent_threads_returns_newest_first() {
        let pool = init_test_db().await.expect("init db");

        for created_at in ["2026-02-20T10:00:00Z", "2026-02-21T10:00:00Z"] {
            insert_thread(&pool, &thread_at(created_at))
                .await
                .expect("insert");
        }

        let recent = get_recent_threads(&pool, 1).await.expect("recent");
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].created_at, "2026-02-21T10:00:00Z");
    }

    #[tokio::test]
    async fn get_recent_threads_empty() {
        let pool = init_test_db().await.expect("init db");

        let recent = get_recent_threads(&pool, 10).await.expect("recent");
        assert!(recent.is_empty());
    }

    #[tokio::test]
    async fn get_todays_tweet_times_returns_today_only() {
        let pool = init_test_db().await.expect("init db");

        // One tweet from today...
        insert_original_tweet(&pool, &sample_original_tweet())
            .await
            .expect("insert");

        // ...and one from years ago that must not be reported.
        let stale = original_tweet_at("2020-01-01T10:00:00Z", "ot_old");
        insert_original_tweet(&pool, &stale)
            .await
            .expect("insert old");

        let todays_times = get_todays_tweet_times(&pool).await.expect("times");
        assert_eq!(todays_times.len(), 1);
    }

    #[tokio::test]
    async fn get_last_thread_time_returns_latest() {
        let pool = init_test_db().await.expect("init db");

        for created_at in ["2026-02-20T10:00:00Z", "2026-02-22T10:00:00Z"] {
            insert_thread(&pool, &thread_at(created_at))
                .await
                .expect("insert");
        }

        let last_sent = get_last_thread_time(&pool).await.expect("get time");
        assert_eq!(last_sent.as_deref(), Some("2026-02-22T10:00:00Z"));
    }

    #[tokio::test]
    async fn insert_original_tweet_failed_status() {
        let pool = init_test_db().await.expect("init db");

        let failed = OriginalTweet {
            status: "failed".to_string(),
            error_message: Some("API error".to_string()),
            tweet_id: None,
            ..sample_original_tweet()
        };

        let row_id = insert_original_tweet(&pool, &failed).await.expect("insert");
        assert!(row_id > 0);

        // A failed tweet must not count as the last sent tweet.
        let last_sent = get_last_original_tweet_time(&pool).await.expect("get time");
        assert!(last_sent.is_none());
    }

    #[tokio::test]
    async fn get_threads_in_range_filters() {
        let pool = init_test_db().await.expect("init db");

        for created_at in ["2026-02-20T10:00:00Z", "2026-02-25T10:00:00Z"] {
            insert_thread(&pool, &thread_at(created_at))
                .await
                .expect("insert");
        }

        let narrow = get_threads_in_range(&pool, "2026-02-19T00:00:00Z", "2026-02-21T00:00:00Z")
            .await
            .expect("range");
        assert_eq!(narrow.len(), 1);

        let whole_month = get_threads_in_range(&pool, "2026-02-01T00:00:00Z", "2026-02-28T00:00:00Z")
            .await
            .expect("range");
        assert_eq!(whole_month.len(), 2);
    }

    #[tokio::test]
    async fn get_thread_tweet_ids_by_root_excludes_root() {
        let pool = init_test_db().await.expect("init db");
        let account_id = DEFAULT_ACCOUNT_ID;

        let thread_id = insert_thread_for(&pool, account_id, &sample_thread())
            .await
            .expect("insert thread");

        // Position 0 carries the root tweet; positions 1 and 2 are its children.
        let members = [
            thread_tweet(thread_id, 0, "root_456", "Root tweet"),
            thread_tweet(thread_id, 1, "child_1", "Child 1"),
            thread_tweet(thread_id, 2, "child_2", "Child 2"),
        ];
        insert_thread_tweets_for(&pool, account_id, thread_id, &members)
            .await
            .expect("insert tweets");

        let child_ids = get_thread_tweet_ids_by_root_for(&pool, account_id, "root_456")
            .await
            .expect("query");

        assert_eq!(child_ids, vec!["child_1", "child_2"]);
    }

    #[tokio::test]
    async fn get_thread_tweet_ids_by_root_empty_when_no_children() {
        let pool = init_test_db().await.expect("init db");

        let child_ids =
            get_thread_tweet_ids_by_root_for(&pool, DEFAULT_ACCOUNT_ID, "nonexistent_root")
                .await
                .expect("query");
        assert!(child_ids.is_empty());
    }

    #[tokio::test]
    async fn persist_thread_records_creates_all_rows() {
        let pool = init_test_db().await.expect("init db");
        let account_id = DEFAULT_ACCOUNT_ID;

        let tweet_ids: Vec<String> = ["root_t1", "child_t2", "child_t3"]
            .iter()
            .map(|id| id.to_string())
            .collect();
        let tweet_contents: Vec<String> = ["Root content", "Child 2 content", "Child 3 content"]
            .iter()
            .map(|text| text.to_string())
            .collect();

        let (thread_id, ot_id) = persist_thread_records(
            &pool,
            account_id,
            "test topic",
            &tweet_ids,
            &tweet_contents,
            "sent",
        )
        .await
        .expect("persist");

        assert!(thread_id > 0);
        assert!(ot_id > 0);

        // Exactly one thread row, pointing at the root and counting all tweets.
        let thread_rows: Vec<(String, i64)> =
            sqlx::query_as("SELECT root_tweet_id, tweet_count FROM threads WHERE id = ?")
                .bind(thread_id)
                .fetch_all(&pool)
                .await
                .expect("query threads");
        assert_eq!(thread_rows, vec![("root_t1".to_string(), 3)]);

        // One thread_tweets row per tweet.
        let (thread_tweet_total,): (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM thread_tweets WHERE thread_id = ?")
                .bind(thread_id)
                .fetch_one(&pool)
                .await
                .expect("count");
        assert_eq!(thread_tweet_total, 3);

        // Exactly one original tweet row, carrying the root tweet ID.
        let original_rows: Vec<(Option<String>,)> =
            sqlx::query_as("SELECT tweet_id FROM original_tweets WHERE id = ?")
                .bind(ot_id)
                .fetch_all(&pool)
                .await
                .expect("query ot");
        assert_eq!(original_rows, vec![(Some("root_t1".to_string()),)]);

        // The children are discoverable through the root tweet ID.
        let children = get_thread_tweet_ids_by_root_for(&pool, account_id, "root_t1")
            .await
            .expect("children");
        assert_eq!(children, vec!["child_t2", "child_t3"]);
    }

    #[tokio::test]
    async fn persist_thread_records_partial_status() {
        let pool = init_test_db().await.expect("init db");

        let tweet_ids = [String::from("partial_root"), String::from("partial_child")];
        let tweet_contents = [String::from("Root"), String::from("Child")];

        let (thread_id, _ot_id) = persist_thread_records(
            &pool,
            DEFAULT_ACCOUNT_ID,
            "partial topic",
            &tweet_ids,
            &tweet_contents,
            "partial",
        )
        .await
        .expect("persist partial");

        // The thread keeps the "partial" status...
        let (thread_status,): (String,) =
            sqlx::query_as("SELECT status FROM threads WHERE id = ?")
                .bind(thread_id)
                .fetch_one(&pool)
                .await
                .expect("query");
        assert_eq!(thread_status, "partial");

        // ...while the root's original tweet is recorded as sent.
        let (original_status,): (String,) =
            sqlx::query_as("SELECT status FROM original_tweets WHERE tweet_id = ?")
                .bind("partial_root")
                .fetch_one(&pool)
                .await
                .expect("query ot");
        assert_eq!(original_status, "sent");
    }
}