1use super::accounts::DEFAULT_ACCOUNT_ID;
7use super::DbPool;
8use crate::error::StorageError;
9
10#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
12pub struct OriginalTweet {
13 pub id: i64,
15 pub tweet_id: Option<String>,
17 pub content: String,
19 pub topic: Option<String>,
21 pub llm_provider: Option<String>,
23 pub created_at: String,
25 pub status: String,
27 pub error_message: Option<String>,
29}
30
31#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
33pub struct Thread {
34 pub id: i64,
36 pub topic: String,
38 pub tweet_count: i64,
40 pub root_tweet_id: Option<String>,
42 pub created_at: String,
44 pub status: String,
46}
47
48#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize)]
50pub struct ThreadTweet {
51 pub id: i64,
53 pub thread_id: i64,
55 pub position: i64,
57 pub tweet_id: Option<String>,
59 pub content: String,
61 pub created_at: String,
63}
64
65pub async fn insert_original_tweet_for(
67 pool: &DbPool,
68 account_id: &str,
69 tweet: &OriginalTweet,
70) -> Result<i64, StorageError> {
71 let result = sqlx::query(
72 "INSERT INTO original_tweets \
73 (account_id, tweet_id, content, topic, llm_provider, created_at, status, error_message) \
74 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
75 )
76 .bind(account_id)
77 .bind(&tweet.tweet_id)
78 .bind(&tweet.content)
79 .bind(&tweet.topic)
80 .bind(&tweet.llm_provider)
81 .bind(&tweet.created_at)
82 .bind(&tweet.status)
83 .bind(&tweet.error_message)
84 .execute(pool)
85 .await
86 .map_err(|e| StorageError::Query { source: e })?;
87
88 Ok(result.last_insert_rowid())
89}
90
91pub async fn insert_original_tweet(
93 pool: &DbPool,
94 tweet: &OriginalTweet,
95) -> Result<i64, StorageError> {
96 insert_original_tweet_for(pool, DEFAULT_ACCOUNT_ID, tweet).await
97}
98
99pub async fn set_original_tweet_source_node_for(
103 pool: &DbPool,
104 account_id: &str,
105 id: i64,
106 source_node_id: i64,
107) -> Result<(), StorageError> {
108 sqlx::query("UPDATE original_tweets SET source_node_id = ? WHERE id = ? AND account_id = ?")
109 .bind(source_node_id)
110 .bind(id)
111 .bind(account_id)
112 .execute(pool)
113 .await
114 .map_err(|e| StorageError::Query { source: e })?;
115
116 Ok(())
117}
118
119pub async fn insert_original_tweet_with_provenance_for(
123 pool: &DbPool,
124 account_id: &str,
125 tweet: &OriginalTweet,
126 refs: &[super::provenance::ProvenanceRef],
127) -> Result<i64, StorageError> {
128 let id = insert_original_tweet_for(pool, account_id, tweet).await?;
129
130 if !refs.is_empty() {
131 super::provenance::insert_links_for(pool, account_id, "original_tweet", id, refs).await?;
132 }
133
134 Ok(id)
135}
136
137pub async fn get_last_original_tweet_time_for(
139 pool: &DbPool,
140 account_id: &str,
141) -> Result<Option<String>, StorageError> {
142 let row: Option<(String,)> = sqlx::query_as(
143 "SELECT created_at FROM original_tweets WHERE account_id = ? AND status = 'sent' \
144 ORDER BY created_at DESC LIMIT 1",
145 )
146 .bind(account_id)
147 .fetch_optional(pool)
148 .await
149 .map_err(|e| StorageError::Query { source: e })?;
150
151 Ok(row.map(|r| r.0))
152}
153
154pub async fn get_last_original_tweet_time(pool: &DbPool) -> Result<Option<String>, StorageError> {
156 get_last_original_tweet_time_for(pool, DEFAULT_ACCOUNT_ID).await
157}
158
159pub async fn count_tweets_today_for(pool: &DbPool, account_id: &str) -> Result<i64, StorageError> {
161 let row: (i64,) = sqlx::query_as(
162 "SELECT COUNT(*) FROM original_tweets WHERE account_id = ? AND date(created_at) = date('now')",
163 )
164 .bind(account_id)
165 .fetch_one(pool)
166 .await
167 .map_err(|e| StorageError::Query { source: e })?;
168
169 Ok(row.0)
170}
171
172pub async fn count_tweets_today(pool: &DbPool) -> Result<i64, StorageError> {
174 count_tweets_today_for(pool, DEFAULT_ACCOUNT_ID).await
175}
176
177pub async fn insert_thread_for(
179 pool: &DbPool,
180 account_id: &str,
181 thread: &Thread,
182) -> Result<i64, StorageError> {
183 let result = sqlx::query(
184 "INSERT INTO threads (account_id, topic, tweet_count, root_tweet_id, created_at, status) \
185 VALUES (?, ?, ?, ?, ?, ?)",
186 )
187 .bind(account_id)
188 .bind(&thread.topic)
189 .bind(thread.tweet_count)
190 .bind(&thread.root_tweet_id)
191 .bind(&thread.created_at)
192 .bind(&thread.status)
193 .execute(pool)
194 .await
195 .map_err(|e| StorageError::Query { source: e })?;
196
197 Ok(result.last_insert_rowid())
198}
199
200pub async fn insert_thread(pool: &DbPool, thread: &Thread) -> Result<i64, StorageError> {
202 insert_thread_for(pool, DEFAULT_ACCOUNT_ID, thread).await
203}
204
205pub async fn insert_thread_tweets_for(
209 pool: &DbPool,
210 account_id: &str,
211 thread_id: i64,
212 tweets: &[ThreadTweet],
213) -> Result<(), StorageError> {
214 let mut tx = pool
215 .begin()
216 .await
217 .map_err(|e| StorageError::Connection { source: e })?;
218
219 for tweet in tweets {
220 sqlx::query(
221 "INSERT INTO thread_tweets \
222 (account_id, thread_id, position, tweet_id, content, created_at) \
223 VALUES (?, ?, ?, ?, ?, ?)",
224 )
225 .bind(account_id)
226 .bind(thread_id)
227 .bind(tweet.position)
228 .bind(&tweet.tweet_id)
229 .bind(&tweet.content)
230 .bind(&tweet.created_at)
231 .execute(&mut *tx)
232 .await
233 .map_err(|e| StorageError::Query { source: e })?;
234 }
235
236 tx.commit()
237 .await
238 .map_err(|e| StorageError::Connection { source: e })?;
239
240 Ok(())
241}
242
243pub async fn insert_thread_tweets(
247 pool: &DbPool,
248 thread_id: i64,
249 tweets: &[ThreadTweet],
250) -> Result<(), StorageError> {
251 insert_thread_tweets_for(pool, DEFAULT_ACCOUNT_ID, thread_id, tweets).await
252}
253
254pub async fn get_last_thread_time_for(
256 pool: &DbPool,
257 account_id: &str,
258) -> Result<Option<String>, StorageError> {
259 let row: Option<(String,)> = sqlx::query_as(
260 "SELECT created_at FROM threads WHERE account_id = ? AND status = 'sent' \
261 ORDER BY created_at DESC LIMIT 1",
262 )
263 .bind(account_id)
264 .fetch_optional(pool)
265 .await
266 .map_err(|e| StorageError::Query { source: e })?;
267
268 Ok(row.map(|r| r.0))
269}
270
271pub async fn get_last_thread_time(pool: &DbPool) -> Result<Option<String>, StorageError> {
273 get_last_thread_time_for(pool, DEFAULT_ACCOUNT_ID).await
274}
275
276pub async fn get_todays_tweet_times_for(
278 pool: &DbPool,
279 account_id: &str,
280) -> Result<Vec<String>, StorageError> {
281 let rows: Vec<(String,)> = sqlx::query_as(
282 "SELECT created_at FROM original_tweets \
283 WHERE account_id = ? AND status = 'sent' AND date(created_at) = date('now') \
284 ORDER BY created_at ASC",
285 )
286 .bind(account_id)
287 .fetch_all(pool)
288 .await
289 .map_err(|e| StorageError::Query { source: e })?;
290
291 Ok(rows.into_iter().map(|r| r.0).collect())
292}
293
294pub async fn get_todays_tweet_times(pool: &DbPool) -> Result<Vec<String>, StorageError> {
296 get_todays_tweet_times_for(pool, DEFAULT_ACCOUNT_ID).await
297}
298
299pub async fn count_threads_this_week_for(
301 pool: &DbPool,
302 account_id: &str,
303) -> Result<i64, StorageError> {
304 let row: (i64,) = sqlx::query_as(
305 "SELECT COUNT(*) FROM threads \
306 WHERE account_id = ? AND strftime('%Y-%W', created_at) = strftime('%Y-%W', 'now')",
307 )
308 .bind(account_id)
309 .fetch_one(pool)
310 .await
311 .map_err(|e| StorageError::Query { source: e })?;
312
313 Ok(row.0)
314}
315
316pub async fn count_threads_this_week(pool: &DbPool) -> Result<i64, StorageError> {
318 count_threads_this_week_for(pool, DEFAULT_ACCOUNT_ID).await
319}
320
321pub async fn get_tweets_in_range_for(
323 pool: &DbPool,
324 account_id: &str,
325 from: &str,
326 to: &str,
327) -> Result<Vec<OriginalTweet>, StorageError> {
328 sqlx::query_as::<_, OriginalTweet>(
329 "SELECT * FROM original_tweets \
330 WHERE account_id = ? AND created_at BETWEEN ? AND ? \
331 ORDER BY created_at ASC",
332 )
333 .bind(account_id)
334 .bind(from)
335 .bind(to)
336 .fetch_all(pool)
337 .await
338 .map_err(|e| StorageError::Query { source: e })
339}
340
341pub async fn get_tweets_in_range(
343 pool: &DbPool,
344 from: &str,
345 to: &str,
346) -> Result<Vec<OriginalTweet>, StorageError> {
347 get_tweets_in_range_for(pool, DEFAULT_ACCOUNT_ID, from, to).await
348}
349
350pub async fn get_threads_in_range_for(
352 pool: &DbPool,
353 account_id: &str,
354 from: &str,
355 to: &str,
356) -> Result<Vec<Thread>, StorageError> {
357 sqlx::query_as::<_, Thread>(
358 "SELECT * FROM threads \
359 WHERE account_id = ? AND created_at BETWEEN ? AND ? \
360 ORDER BY created_at ASC",
361 )
362 .bind(account_id)
363 .bind(from)
364 .bind(to)
365 .fetch_all(pool)
366 .await
367 .map_err(|e| StorageError::Query { source: e })
368}
369
370pub async fn get_threads_in_range(
372 pool: &DbPool,
373 from: &str,
374 to: &str,
375) -> Result<Vec<Thread>, StorageError> {
376 get_threads_in_range_for(pool, DEFAULT_ACCOUNT_ID, from, to).await
377}
378
379pub async fn get_recent_original_tweets_for(
381 pool: &DbPool,
382 account_id: &str,
383 limit: u32,
384) -> Result<Vec<OriginalTweet>, StorageError> {
385 sqlx::query_as::<_, OriginalTweet>(
386 "SELECT * FROM original_tweets WHERE account_id = ? ORDER BY created_at DESC LIMIT ?",
387 )
388 .bind(account_id)
389 .bind(limit)
390 .fetch_all(pool)
391 .await
392 .map_err(|e| StorageError::Query { source: e })
393}
394
395pub async fn get_recent_original_tweets(
397 pool: &DbPool,
398 limit: u32,
399) -> Result<Vec<OriginalTweet>, StorageError> {
400 get_recent_original_tweets_for(pool, DEFAULT_ACCOUNT_ID, limit).await
401}
402
403pub async fn get_recent_threads_for(
405 pool: &DbPool,
406 account_id: &str,
407 limit: u32,
408) -> Result<Vec<Thread>, StorageError> {
409 sqlx::query_as::<_, Thread>(
410 "SELECT * FROM threads WHERE account_id = ? ORDER BY created_at DESC LIMIT ?",
411 )
412 .bind(account_id)
413 .bind(limit)
414 .fetch_all(pool)
415 .await
416 .map_err(|e| StorageError::Query { source: e })
417}
418
419pub async fn get_recent_threads(pool: &DbPool, limit: u32) -> Result<Vec<Thread>, StorageError> {
421 get_recent_threads_for(pool, DEFAULT_ACCOUNT_ID, limit).await
422}
423
424#[cfg(test)]
425mod tests {
426 use super::*;
427 use crate::storage::init_test_db;
428
429 fn now_iso() -> String {
430 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
431 }
432
433 fn sample_original_tweet() -> OriginalTweet {
434 OriginalTweet {
435 id: 0,
436 tweet_id: Some("ot_123".to_string()),
437 content: "Educational tweet about Rust".to_string(),
438 topic: Some("rust".to_string()),
439 llm_provider: Some("openai".to_string()),
440 created_at: now_iso(),
441 status: "sent".to_string(),
442 error_message: None,
443 }
444 }
445
446 fn sample_thread() -> Thread {
447 Thread {
448 id: 0,
449 topic: "Rust async patterns".to_string(),
450 tweet_count: 3,
451 root_tweet_id: Some("root_456".to_string()),
452 created_at: now_iso(),
453 status: "sent".to_string(),
454 }
455 }
456
457 fn sample_thread_tweets(thread_id: i64) -> Vec<ThreadTweet> {
458 (0..3)
459 .map(|i| ThreadTweet {
460 id: 0,
461 thread_id,
462 position: i,
463 tweet_id: Some(format!("tt_{i}")),
464 content: format!("Thread tweet {i}"),
465 created_at: now_iso(),
466 })
467 .collect()
468 }
469
470 #[tokio::test]
471 async fn insert_and_query_original_tweet() {
472 let pool = init_test_db().await.expect("init db");
473 let tweet = sample_original_tweet();
474
475 let id = insert_original_tweet(&pool, &tweet).await.expect("insert");
476 assert!(id > 0);
477
478 let time = get_last_original_tweet_time(&pool).await.expect("get time");
479 assert!(time.is_some());
480 }
481
482 #[tokio::test]
483 async fn count_tweets_today_works() {
484 let pool = init_test_db().await.expect("init db");
485 let tweet = sample_original_tweet();
486
487 insert_original_tweet(&pool, &tweet).await.expect("insert");
488 let count = count_tweets_today(&pool).await.expect("count");
489 assert_eq!(count, 1);
490 }
491
492 #[tokio::test]
493 async fn insert_thread_with_tweets() {
494 let pool = init_test_db().await.expect("init db");
495 let thread = sample_thread();
496
497 let thread_id = insert_thread(&pool, &thread).await.expect("insert thread");
498 let tweets = sample_thread_tweets(thread_id);
499 insert_thread_tweets(&pool, thread_id, &tweets)
500 .await
501 .expect("insert tweets");
502
503 let rows: Vec<(i64,)> = sqlx::query_as(
505 "SELECT position FROM thread_tweets WHERE thread_id = ? ORDER BY position",
506 )
507 .bind(thread_id)
508 .fetch_all(&pool)
509 .await
510 .expect("query");
511
512 assert_eq!(rows.len(), 3);
513 assert_eq!(rows[0].0, 0);
514 assert_eq!(rows[1].0, 1);
515 assert_eq!(rows[2].0, 2);
516 }
517
518 #[tokio::test]
519 async fn thread_tweet_duplicate_position_fails() {
520 let pool = init_test_db().await.expect("init db");
521 let thread = sample_thread();
522
523 let thread_id = insert_thread(&pool, &thread).await.expect("insert thread");
524
525 let duplicate_tweets = vec![
527 ThreadTweet {
528 id: 0,
529 thread_id,
530 position: 0,
531 tweet_id: Some("a".to_string()),
532 content: "First".to_string(),
533 created_at: now_iso(),
534 },
535 ThreadTweet {
536 id: 0,
537 thread_id,
538 position: 0, tweet_id: Some("b".to_string()),
540 content: "Second".to_string(),
541 created_at: now_iso(),
542 },
543 ];
544
545 let result = insert_thread_tweets(&pool, thread_id, &duplicate_tweets).await;
546 assert!(result.is_err());
547
548 let rows: Vec<(i64,)> =
550 sqlx::query_as("SELECT COUNT(*) FROM thread_tweets WHERE thread_id = ?")
551 .bind(thread_id)
552 .fetch_all(&pool)
553 .await
554 .expect("query");
555
556 assert_eq!(rows[0].0, 0, "transaction should have rolled back");
557 }
558
559 #[tokio::test]
560 async fn count_threads_this_week_works() {
561 let pool = init_test_db().await.expect("init db");
562 let thread = sample_thread();
563
564 insert_thread(&pool, &thread).await.expect("insert");
565 let count = count_threads_this_week(&pool).await.expect("count");
566 assert_eq!(count, 1);
567 }
568
569 #[tokio::test]
570 async fn last_thread_time_empty() {
571 let pool = init_test_db().await.expect("init db");
572 let time = get_last_thread_time(&pool).await.expect("get time");
573 assert!(time.is_none());
574 }
575
576 #[tokio::test]
577 async fn get_tweets_in_range_filters() {
578 let pool = init_test_db().await.expect("init db");
579
580 let mut tweet = sample_original_tweet();
581 tweet.created_at = "2026-02-20T10:00:00Z".to_string();
582 insert_original_tweet(&pool, &tweet).await.expect("insert");
583
584 let mut tweet2 = sample_original_tweet();
585 tweet2.created_at = "2026-02-25T10:00:00Z".to_string();
586 tweet2.tweet_id = Some("ot_456".to_string());
587 insert_original_tweet(&pool, &tweet2).await.expect("insert");
588
589 let in_range = get_tweets_in_range(&pool, "2026-02-19T00:00:00Z", "2026-02-21T00:00:00Z")
590 .await
591 .expect("range");
592 assert_eq!(in_range.len(), 1);
593 assert_eq!(in_range[0].tweet_id, Some("ot_123".to_string()));
594
595 let all = get_tweets_in_range(&pool, "2026-02-01T00:00:00Z", "2026-02-28T00:00:00Z")
596 .await
597 .expect("range");
598 assert_eq!(all.len(), 2);
599 }
600
601 #[tokio::test]
602 async fn get_recent_original_tweets_returns_newest_first() {
603 let pool = init_test_db().await.expect("init db");
604
605 let mut tweet1 = sample_original_tweet();
606 tweet1.created_at = "2026-02-20T10:00:00Z".to_string();
607 tweet1.tweet_id = Some("ot_1".to_string());
608 insert_original_tweet(&pool, &tweet1).await.expect("insert");
609
610 let mut tweet2 = sample_original_tweet();
611 tweet2.created_at = "2026-02-21T10:00:00Z".to_string();
612 tweet2.tweet_id = Some("ot_2".to_string());
613 insert_original_tweet(&pool, &tweet2).await.expect("insert");
614
615 let mut tweet3 = sample_original_tweet();
616 tweet3.created_at = "2026-02-22T10:00:00Z".to_string();
617 tweet3.tweet_id = Some("ot_3".to_string());
618 insert_original_tweet(&pool, &tweet3).await.expect("insert");
619
620 let recent = get_recent_original_tweets(&pool, 2).await.expect("recent");
621 assert_eq!(recent.len(), 2);
622 assert_eq!(recent[0].tweet_id, Some("ot_3".to_string()));
623 assert_eq!(recent[1].tweet_id, Some("ot_2".to_string()));
624 }
625
626 #[tokio::test]
627 async fn get_recent_original_tweets_empty() {
628 let pool = init_test_db().await.expect("init db");
629 let recent = get_recent_original_tweets(&pool, 10).await.expect("recent");
630 assert!(recent.is_empty());
631 }
632
633 #[tokio::test]
634 async fn get_recent_threads_returns_newest_first() {
635 let pool = init_test_db().await.expect("init db");
636
637 let mut thread1 = sample_thread();
638 thread1.created_at = "2026-02-20T10:00:00Z".to_string();
639 insert_thread(&pool, &thread1).await.expect("insert");
640
641 let mut thread2 = sample_thread();
642 thread2.created_at = "2026-02-21T10:00:00Z".to_string();
643 insert_thread(&pool, &thread2).await.expect("insert");
644
645 let recent = get_recent_threads(&pool, 1).await.expect("recent");
646 assert_eq!(recent.len(), 1);
647 assert_eq!(recent[0].created_at, "2026-02-21T10:00:00Z");
648 }
649
650 #[tokio::test]
651 async fn get_recent_threads_empty() {
652 let pool = init_test_db().await.expect("init db");
653 let recent = get_recent_threads(&pool, 10).await.expect("recent");
654 assert!(recent.is_empty());
655 }
656
657 #[tokio::test]
658 async fn get_todays_tweet_times_returns_today_only() {
659 let pool = init_test_db().await.expect("init db");
660
661 let tweet = sample_original_tweet();
663 insert_original_tweet(&pool, &tweet).await.expect("insert");
664
665 let mut old_tweet = sample_original_tweet();
667 old_tweet.created_at = "2020-01-01T10:00:00Z".to_string();
668 old_tweet.tweet_id = Some("ot_old".to_string());
669 insert_original_tweet(&pool, &old_tweet)
670 .await
671 .expect("insert old");
672
673 let times = get_todays_tweet_times(&pool).await.expect("times");
674 assert_eq!(times.len(), 1);
676 }
677
678 #[tokio::test]
679 async fn get_last_thread_time_returns_latest() {
680 let pool = init_test_db().await.expect("init db");
681
682 let mut thread1 = sample_thread();
683 thread1.created_at = "2026-02-20T10:00:00Z".to_string();
684 insert_thread(&pool, &thread1).await.expect("insert");
685
686 let mut thread2 = sample_thread();
687 thread2.created_at = "2026-02-22T10:00:00Z".to_string();
688 insert_thread(&pool, &thread2).await.expect("insert");
689
690 let time = get_last_thread_time(&pool).await.expect("get time");
691 assert_eq!(time, Some("2026-02-22T10:00:00Z".to_string()));
692 }
693
694 #[tokio::test]
695 async fn insert_original_tweet_failed_status() {
696 let pool = init_test_db().await.expect("init db");
697
698 let mut tweet = sample_original_tweet();
699 tweet.status = "failed".to_string();
700 tweet.error_message = Some("API error".to_string());
701 tweet.tweet_id = None;
702
703 let id = insert_original_tweet(&pool, &tweet).await.expect("insert");
704 assert!(id > 0);
705
706 let time = get_last_original_tweet_time(&pool).await.expect("get time");
708 assert!(time.is_none());
709 }
710
711 #[tokio::test]
712 async fn get_threads_in_range_filters() {
713 let pool = init_test_db().await.expect("init db");
714
715 let mut thread = sample_thread();
716 thread.created_at = "2026-02-20T10:00:00Z".to_string();
717 insert_thread(&pool, &thread).await.expect("insert");
718
719 let mut thread2 = sample_thread();
720 thread2.created_at = "2026-02-25T10:00:00Z".to_string();
721 insert_thread(&pool, &thread2).await.expect("insert");
722
723 let in_range = get_threads_in_range(&pool, "2026-02-19T00:00:00Z", "2026-02-21T00:00:00Z")
724 .await
725 .expect("range");
726 assert_eq!(in_range.len(), 1);
727
728 let all = get_threads_in_range(&pool, "2026-02-01T00:00:00Z", "2026-02-28T00:00:00Z")
729 .await
730 .expect("range");
731 assert_eq!(all.len(), 2);
732 }
733}