1use crate::twitter_client::{api, PagedResult, TwitterClient};
2use crate::user_config::UserConfig;
3use anyhow::{anyhow, Context, Result};
4use itertools::Itertools;
5use std::collections::HashMap;
6use std::fs;
7use std::future::Future;
8use std::sync::{Arc, Mutex};
9use tokio::sync::Mutex as AsyncMutex;
10
11#[derive(Debug)]
16pub struct Store {
17 pub twitter_client: TwitterClient,
18 pub twitter_user: api::User,
19 pub tweets: Arc<Mutex<HashMap<String, api::Tweet>>>,
20 pub tweets_feed: Arc<Mutex<Vec<String>>>,
21 pub tweets_feed_page_token: Arc<AsyncMutex<Option<String>>>,
22 pub user_config: Arc<Mutex<UserConfig>>,
23}
24
25impl Store {
26 pub fn new(
27 twitter_client: TwitterClient,
28 twitter_user: &api::User,
29 user_config: &UserConfig,
30 ) -> Self {
31 Self {
32 twitter_client,
33 twitter_user: twitter_user.clone(),
34 tweets: Arc::new(Mutex::new(HashMap::new())),
35 tweets_feed: Arc::new(Mutex::new(Vec::new())),
36 tweets_feed_page_token: Arc::new(AsyncMutex::new(None)),
37 user_config: Arc::new(Mutex::new(user_config.clone())),
38 }
39 }
40
41 pub fn save_user_config(&self) -> Result<()> {
42 let user_config = self.user_config.lock().unwrap();
43 let user_config = serde_json::to_string(&*user_config)?;
44 fs::write("./var/.user_config", user_config)?;
45 Ok(())
46 }
47
48 pub async fn load_tweets_feed<
54 F: Future<Output = PagedResult<Vec<api::Tweet>>>,
55 G: Fn(Option<String>) -> F,
56 >(
57 &self,
58 g: G,
59 restart: bool,
60 ) -> Result<()> {
61 let mut tweets_page_token = self
62 .tweets_feed_page_token
63 .try_lock()
64 .with_context(|| anyhow!("Already in-flight"))?;
65
66 let mut maybe_page_token = None;
67 if !restart {
69 let next_page_token = tweets_page_token.as_ref().ok_or(anyhow!("No more pages"))?;
70 maybe_page_token = Some(next_page_token.clone());
71 }
72
73 let (new_tweets, page_token) = g(maybe_page_token).await?;
74 let mut new_tweets_reverse_chronological: Vec<String> = Vec::new();
75
76 *tweets_page_token = page_token;
77
78 {
79 let mut tweets = self.tweets.lock().unwrap();
80 for tweet in new_tweets {
81 new_tweets_reverse_chronological.push(tweet.id.clone());
82 tweets.insert(tweet.id.clone(), tweet);
83 }
84 }
85 {
86 let mut tweets_reverse_chronological = self.tweets_feed.lock().unwrap();
87 if restart {
88 *tweets_reverse_chronological = new_tweets_reverse_chronological;
89 } else {
90 tweets_reverse_chronological.append(&mut new_tweets_reverse_chronological);
91 }
92 }
93
94 Ok(())
95 }
96
97 pub async fn load_tweets_reverse_chronological(&self, restart: bool) -> Result<()> {
98 self.load_tweets_feed(
99 move |maybe_page_token| async move {
100 self.twitter_client
101 .timeline_reverse_chronological(&self.twitter_user.id, maybe_page_token)
102 .await
103 },
104 restart,
105 )
106 .await
107 }
108
109 pub async fn load_user_tweets(&self, user_id: &str, restart: bool) -> Result<()> {
110 self.load_tweets_feed(
111 move |maybe_page_token| async move {
112 self.twitter_client
113 .user_tweets(user_id, maybe_page_token)
114 .await
115 },
116 restart,
117 )
118 .await
119 }
120
121 pub async fn load_search_tweets(&self, query: &str, restart: bool) -> Result<()> {
122 self.load_tweets_feed(
123 move |_maybe_page_token| {
124 let query = query.clone();
125 async move { self.twitter_client.search_tweets(&query).await }
126 },
127 restart,
128 )
129 .await
130 }
131}