// tweet_scraper/scraper.rs

use std::collections::{BTreeMap, VecDeque};
use std::path::PathBuf;
use std::time::Duration;

use chromiumoxide::cdp::browser_protocol::network::{Cookie, SetUserAgentOverrideParams};
use chromiumoxide::{Browser, BrowserConfig};
use futures_util::stream::StreamExt;
use futures_util::Stream;
use once_cell::sync::Lazy;
use reqwest::header::{self, HeaderMap, HeaderValue};
use reqwest::{Client, StatusCode};
use serde::Deserialize;
use serde_json::Value;
use url::Url;

use crate::error::Error;
use crate::header_persist::{load_headers, save_headers, PersistHeadersError};

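/// Scrapes tweets from Twitter's unofficial adaptive-search API.
///
/// A minimal usage sketch, assuming this crate re-exports `TweetScraper`,
/// `HeaderPersist`, and `Error` at its root ("from:rustlang" is just an
/// illustrative query):
///
/// ```no_run
/// use futures_util::{pin_mut, StreamExt};
/// use tweet_scraper::{HeaderPersist, TweetScraper};
///
/// # async fn run() -> Result<(), tweet_scraper::Error> {
/// let mut scraper = TweetScraper::initialize(HeaderPersist::None).await?;
/// let tweets = scraper.tweets("from:rustlang", Some(20), None).await;
/// pin_mut!(tweets);
/// while let Some(tweet) = tweets.next().await {
///     // Each item is the raw serde_json::Value of one tweet
///     println!("{}", tweet?);
/// }
/// # Ok(())
/// # }
/// ```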
pub struct TweetScraper {
    client: Client,
    fetch_state: FetchState,
}

/// How request headers are obtained and whether they are persisted
pub enum HeaderPersist {
    /// Load previously saved headers from this file (no browser is launched)
    Load(PathBuf),
    /// Obtain headers via a headless browser, then save them to this file
    Save(PathBuf),
    /// Obtain headers via a headless browser without persisting them
    None,
}

// State carried across stream iterations
#[derive(Default)]
struct FetchState {
    tweets: VecDeque<Value>,
    query: String,
    limit: Option<usize>,
    min_id: Option<u128>,
    tweets_count: usize,
    cursor: Option<String>,
    errored: bool,
}

#[derive(Debug)]
struct BrowserData {
    cookies: Vec<Cookie>,
}

static USER_AGENT: &str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36";
static ACCEPT_VALUE: &str = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9";
// The long-public bearer token used by Twitter's web client
static AUTHORIZATION_VALUE: &str = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA";

impl TweetScraper {
    pub async fn initialize(header_persist: HeaderPersist) -> Result<Self, Error> {
        // Load headers from a file if requested; otherwise obtain them via a
        // headless chromium
        let headers = if let HeaderPersist::Load(p) = &header_persist {
            // Load headers from file
            load_headers(p)
                .await
                .map_err(|e| Error::PersistHeaders(PersistHeadersError::Load(e, p.clone())))?
        } else {
            // Obtain headers via chromium
            let browser_data = browser_data().await?;
            let mut headers = HeaderMap::new();
            headers.insert(header::ACCEPT, HeaderValue::from_static(ACCEPT_VALUE));
            headers.insert(
                header::ACCEPT_ENCODING,
                HeaderValue::from_static("gzip, deflate, br"),
            );
            headers.insert(
                header::ACCEPT_LANGUAGE,
                HeaderValue::from_static("en-US,en;q=0.9"),
            );
            headers.insert(
                header::UPGRADE_INSECURE_REQUESTS,
                HeaderValue::from_static("1"),
            );
            headers.insert(
                header::AUTHORIZATION,
                HeaderValue::from_static(AUTHORIZATION_VALUE),
            );
            // The "gt" cookie set by twitter.com holds the guest token the API expects
            let guest_token = &browser_data
                .cookies
                .iter()
                .find(|c| c.name == "gt")
                .ok_or(Error::NoGuestToken)?
                .value;
            headers.insert(
                "x-guest-token",
                HeaderValue::from_str(guest_token).map_err(|_| Error::InvalidGuestToken)?,
            );
            headers.insert(header::USER_AGENT, HeaderValue::from_static(USER_AGENT));
            headers
        };

        // Persist headers if requested
        if let HeaderPersist::Save(p) = &header_persist {
            save_headers(&headers, p)
                .await
                .map_err(|e| Error::PersistHeaders(PersistHeadersError::Save(e, p.clone())))?;
        }

        let client = Client::builder()
            .default_headers(headers)
            .gzip(true)
            .brotli(true)
            .deflate(true)
            .build()
            .map_err(|_| Error::Internal("could not build reqwest client".into()))?;

        Ok(Self {
            client,
            fetch_state: Default::default(),
        })
    }

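    /// Stream tweets matching `query`, newest first.
    ///
    /// Pages of results are fetched lazily as the stream is polled. The
    /// stream ends once `limit` tweets have been yielded (if given), once a
    /// tweet with id below `min_id` is seen (if given), or after the first
    /// error.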
    pub async fn tweets(
        &mut self,
        query: impl AsRef<str>,
        limit: Option<usize>,
        min_id: Option<u128>,
    ) -> impl Stream<Item = Result<Value, Error>> + '_ {
        // Reset internal state
        self.fetch_state = FetchState {
            query: query.as_ref().to_owned(),
            limit,
            min_id,
            ..Default::default()
        };

        futures_util::stream::unfold(self, |state| async {
            // Stop if a previous iteration errored
            if state.fetch_state.errored {
                return None;
            }

            // Stop once the requested number of tweets has been yielded
            if let Some(limit) = state.fetch_state.limit {
                if state.fetch_state.tweets_count >= limit {
                    return None;
                }
            }

            let mut should_return_tweet = |tweet: Value| {
                // Stop once a tweet older than the minimum id is seen
                if let Some(min_id) = state.fetch_state.min_id {
                    let parse_id = |tweet: &Value| -> Result<u128, Error> {
                        let id = tweet["id_str"]
                            .as_str()
                            .ok_or_else(|| Error::TweetParse("no id_str key".into()))?
                            .parse()
                            .map_err(|e| Error::TweetParse(format!("invalid id_str: {e}")))?;
                        Ok(id)
                    };
                    match parse_id(&tweet) {
                        Ok(id) => {
                            if id < min_id {
                                return None;
                            }
                        }
                        Err(e) => {
                            state.fetch_state.errored = true;
                            return Some(Err(e));
                        }
                    }
                }

                // Yield the tweet
                state.fetch_state.tweets_count += 1;
                Some(Ok(tweet))
            };

            // Yield the next buffered tweet if available
            if let Some(tweet) = state.fetch_state.tweets.pop_front() {
                if let Some(r) = should_return_tweet(tweet) {
                    return Some((r, state));
                }
            }

            // Fetch the next page from Twitter
            match query_twitter(
                &state.client,
                state.fetch_state.query.as_str(),
                state.fetch_state.cursor.as_deref(),
            )
            .await
            {
                Ok((tweets, cursor)) => {
                    state.fetch_state.tweets.extend(tweets);
                    state.fetch_state.cursor = Some(cursor);
                }
                Err(e) => {
                    state.fetch_state.errored = true;
                    return Some((Err(e), state));
                }
            }

            // Yield the first tweet of the new page if available
            if let Some(tweet) = state.fetch_state.tweets.pop_front() {
                if let Some(r) = should_return_tweet(tweet) {
                    return Some((r, state));
                }
            }

            None
        })
    }
}

/// Get cookies for twitter.com by driving a headless chromium instance
async fn browser_data() -> Result<BrowserData, Error> {
    let (mut browser, mut handler) = Browser::launch(
        BrowserConfig::builder()
            // Sometimes the Twitter page hangs if the window is larger (unclear why)
            .window_size(800, 600)
            .build()
            .map_err(Error::Internal)?,
    )
    .await
    .map_err(Error::Cdp)?;

    // Drive the browser's event loop until the browser closes
    let browser_handler =
        tokio::task::spawn(async move { while handler.next().await.is_some() {} });

    let page = browser
        .start_incognito_context()
        .await
        .map_err(Error::Cdp)?
        .new_page("about:blank")
        .await
        .map_err(Error::Cdp)?;

    page.set_user_agent(SetUserAgentOverrideParams::new(USER_AGENT))
        .await
        .map_err(Error::Cdp)?;

    // Navigate to the site so it sets its cookies (including the guest token)
    page.goto("https://twitter.com/explore")
        .await
        .map_err(Error::Cdp)?;
    page.wait_for_navigation().await.map_err(Error::Cdp)?;

    let cookies = page.get_cookies().await.map_err(Error::Cdp)?;

    browser.close().await.map_err(Error::Cdp)?;
    _ = browser_handler.await;

    Ok(BrowserData { cookies })
}

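/// Fetch a single page (up to 20 tweets) from the adaptive search endpoint,
/// returning the page's tweets plus the "scroll" cursor for the next page.
/// Retries with a 60-second pause on rate limiting, request timeouts, and
/// server errors.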
async fn query_twitter(
    client: &Client,
    query: impl AsRef<str>,
    cursor: Option<&str>,
) -> Result<(Vec<Value>, String), Error> {
    static URL: &str = "https://api.twitter.com/2/search/adaptive.json";

    let mut url = Url::parse(URL).map_err(|_| Error::Internal("could not parse api url".into()))?;
    url.query_pairs_mut()
        .clear()
        .append_pair("include_profile_interstitial_type", "1")
        .append_pair("include_blocking", "1")
        .append_pair("include_blocked_by", "1")
        .append_pair("include_followed_by", "1")
        .append_pair("include_want_retweets", "1")
        .append_pair("include_mute_edge", "1")
        .append_pair("include_can_dm", "1")
        .append_pair("include_can_media_tag", "1")
        .append_pair("skip_status", "1")
        .append_pair("cards_platform", "Web-12")
        .append_pair("include_cards", "1")
        .append_pair("include_ext_alt_text", "true")
        .append_pair("include_quote_count", "true")
        .append_pair("include_reply_count", "1")
        .append_pair("tweet_mode", "extended")
        .append_pair("include_entities", "true")
        .append_pair("include_user_entities", "true")
        .append_pair("include_ext_media_color", "true")
        .append_pair("include_ext_media_availability", "true")
        .append_pair("send_error_codes", "true")
        .append_pair("simple_quoted_tweet", "true")
        .append_pair("query_source", "typed_query")
        .append_pair("pc", "1")
        .append_pair("spelling_corrections", "1")
        // `append_pair` percent-encodes its arguments, so pass the raw comma;
        // a literal "%2C" here would be double-encoded to "%252C"
        .append_pair("ext", "mediaStats,highlightedLabel")
        .append_pair("count", "20")
        .append_pair("tweet_search_mode", "live")
        .append_pair("q", query.as_ref());

    if let Some(cursor) = cursor {
        url.query_pairs_mut().append_pair("cursor", cursor);
    }

    static RETRY_STATUS: [StatusCode; 2] =
        [StatusCode::TOO_MANY_REQUESTS, StatusCode::REQUEST_TIMEOUT];
    let json = loop {
        let response = client
            .get(url.as_str())
            .send()
            .await
            .map_err(|e| Error::Network(e.to_string()))?;
        if response.status().is_success() {
            break response
                .json::<Value>()
                .await
                .map_err(|e| Error::TweetParse(e.to_string()))?;
        }

        if response.status().is_server_error() || RETRY_STATUS.contains(&response.status()) {
            eprintln!(
                "received response status code: {}, waiting 60 seconds",
                response.status().as_u16()
            );
            tokio::time::sleep(Duration::from_secs(60)).await;
        } else {
            return Err(Error::BadStatus(response.status().as_u16()));
        }
    };

    parse_tweets(json)
}

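/// Split an adaptive-search response into tweet objects and the pagination
/// cursor. Tweets and users arrive as parallel maps under `globalObjects`;
/// each tweet is joined with its author's user object, and the `"scroll:..."`
/// cursor is pulled out of the `timeline` section by regex.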
fn parse_tweets(json: Value) -> Result<(Vec<Value>, String), Error> {
    #[derive(Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct Root {
        global_objects: GlobalObjects,
        timeline: Value,
    }

    #[derive(Deserialize)]
    struct GlobalObjects {
        tweets: BTreeMap<String, Value>,
        users: BTreeMap<String, Value>,
    }

    let root: Root = serde_json::from_value(json).map_err(|e| Error::TweetParse(e.to_string()))?;

    // Attach the author's user object to every tweet
    let mut tweets = root.global_objects.tweets;
    let users = root.global_objects.users;
    for tweet in tweets.values_mut() {
        if let Some(tweet) = tweet.as_object_mut() {
            // Use `get` rather than indexing: indexing a serde_json::Map
            // panics if the key is missing
            if let Some(user_id_str) = tweet.get("user_id_str").and_then(Value::as_str) {
                if let Some(user) = users.get(user_id_str) {
                    tweet.insert("user".to_owned(), user.clone());
                }
            }
        }
    }
    // The map is keyed by id_str in ascending order, so reverse for newest first
    let tweets: Vec<_> = tweets.into_values().rev().collect();

    // Parse the pagination cursor out of the serialized timeline
    let timeline_str =
        serde_json::to_string(&root.timeline).map_err(|e| Error::TweetParse(e.to_string()))?;
    static CURSOR_RE: Lazy<regex::Regex> =
        Lazy::new(|| regex::Regex::new(r#""(scroll:.+?)""#).expect("cursor regex is valid"));
    let cursor = CURSOR_RE
        .captures(&timeline_str)
        .ok_or_else(|| Error::TweetParse("could not find cursor".into()))?
        .get(1)
        .unwrap()
        .as_str()
        .to_owned();

    Ok((tweets, cursor))
}