1use std::collections::{BTreeMap, VecDeque};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use chromiumoxide::cdp::browser_protocol::network::{Cookie, SetUserAgentOverrideParams};
7use chromiumoxide::{Browser, BrowserConfig};
8use futures_util::stream::StreamExt;
9use futures_util::Stream;
10use once_cell::sync::Lazy;
11use reqwest::header::{self, HeaderMap, HeaderValue};
12use reqwest::{Client, StatusCode};
13use serde::Deserialize;
14use serde_json::Value;
15use url::Url;
16
17use crate::error::Error;
18use crate::header_persist::{load_headers, save_headers, PersistHeadersError};
19
20pub struct TweetScraper {
21 client: Client,
22 fetch_state: FetchState,
23}
24
/// Tells [`TweetScraper::initialize`] whether request headers are read from
/// or written to disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeaderPersist {
    /// Load the headers from this path instead of building fresh ones.
    Load(PathBuf),
    /// Build fresh headers and save them to this path.
    Save(PathBuf),
    /// Build fresh headers and do not persist them.
    None,
}
30
31#[derive(Default)]
33struct FetchState {
34 tweets: VecDeque<Value>,
35 query: String,
36 limit: Option<usize>,
37 min_id: Option<u128>,
38 tweets_count: usize,
39 cursor: Option<String>,
40 errored: bool,
41}
42
43#[derive(Debug)]
44struct BrowserData {
45 cookies: Vec<Cookie>,
46}
47
/// User agent used both for the browser page override and for API requests.
static USER_AGENT: &str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36";
/// Value of the `Accept` header sent with API requests.
static ACCEPT_VALUE: &str = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9";
/// Value of the `Authorization` header sent with API requests.
static AUTHORIZATION_VALUE: &str = "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA";
51
52impl TweetScraper {
53 pub async fn initialize(header_persist: HeaderPersist) -> Result<Self, Error> {
54 let headers = if let HeaderPersist::Load(p) = &header_persist {
56 load_headers(&p)
58 .await
59 .map_err(|e| Error::PersistHeaders(PersistHeadersError::Load(e, p.clone())))?
60 } else {
61 let browser_data = browser_data().await?;
63 let mut headers = HeaderMap::new();
64 headers.insert(header::ACCEPT, HeaderValue::from_static(ACCEPT_VALUE));
65 headers.insert(
66 header::ACCEPT_ENCODING,
67 HeaderValue::from_static("gzip, deflate, br"),
68 );
69 headers.insert(
70 header::ACCEPT_LANGUAGE,
71 HeaderValue::from_static("en-US,en;q=0.9"),
72 );
73 headers.insert(
74 header::UPGRADE_INSECURE_REQUESTS,
75 HeaderValue::from_static("1"),
76 );
77 headers.insert(
78 header::AUTHORIZATION,
79 HeaderValue::from_static(AUTHORIZATION_VALUE),
80 );
81 let guest_token = &browser_data
82 .cookies
83 .iter()
84 .find(|c| c.name == "gt")
85 .ok_or_else(|| Error::NoGuestToken)?
86 .value;
87 headers.insert(
88 "x-guest-token",
89 HeaderValue::from_str(guest_token).map_err(|_| Error::InvalidGuestToken)?,
90 );
91 headers.insert(header::USER_AGENT, HeaderValue::from_static(USER_AGENT));
92 headers
93 };
94
95 if let HeaderPersist::Save(p) = &header_persist {
97 save_headers(&headers, &p)
98 .await
99 .map_err(|e| Error::PersistHeaders(PersistHeadersError::Save(e, p.clone())))?;
100 }
101
102 let client = Client::builder()
103 .default_headers(headers)
104 .gzip(true)
105 .brotli(true)
106 .deflate(true)
107 .build()
108 .map_err(|_| Error::Internal("could not build reqwest client".into()))?;
109
110 Ok(Self {
111 client,
112 fetch_state: Default::default(),
113 })
114 }
115
116 pub async fn tweets(
117 &mut self,
118 query: impl AsRef<str>,
119 limit: Option<usize>,
120 min_id: Option<u128>,
121 ) -> impl Stream<Item = Result<Value, Error>> + '_ {
122 self.fetch_state = FetchState {
124 query: query.as_ref().to_owned(),
125 limit,
126 min_id,
127 ..Default::default()
128 };
129
130 futures_util::stream::unfold(self, |state| async {
131 if state.fetch_state.errored {
133 return None;
134 }
135
136 if let Some(limit) = state.fetch_state.limit {
138 if state.fetch_state.tweets_count >= limit {
139 return None;
140 }
141 }
142
143 let mut should_return_tweet = |tweet| {
144 if let Some(min_id) = state.fetch_state.min_id {
146 let parse_id = |tweet: &Value| -> Result<u128, Error> {
147 let id = tweet["id_str"]
148 .as_str()
149 .ok_or_else(|| Error::TweetParse("no id_str key".into()))?
150 .parse()
151 .map_err(|e| Error::TweetParse(format!("invalid id_str: {e}")))?;
152 Ok(id)
153 };
154 match parse_id(&tweet) {
155 Ok(id) => {
156 if id < min_id {
157 return None;
158 }
159 }
160 Err(e) => {
161 state.fetch_state.errored = true;
162 return Some(Err(e));
163 }
164 }
165 }
166
167 state.fetch_state.tweets_count += 1;
169 Some(Ok(tweet))
170 };
171
172 if let Some(tweet) = state.fetch_state.tweets.pop_front() {
174 if let Some(r) = should_return_tweet(tweet) {
175 return Some((r, state));
176 }
177 }
178
179 match query_twitter(
181 &state.client,
182 state.fetch_state.query.as_str(),
183 state.fetch_state.cursor.as_deref(),
184 )
185 .await
186 {
187 Ok((tweets, cursor)) => {
188 state.fetch_state.tweets.extend(tweets.into_iter());
189 state.fetch_state.cursor = Some(cursor);
190 }
191 Err(e) => {
192 state.fetch_state.errored = true;
193 return Some((Err(e), state));
194 }
195 }
196
197 if let Some(tweet) = state.fetch_state.tweets.pop_front() {
199 if let Some(r) = should_return_tweet(tweet) {
200 return Some((r, state));
201 }
202 }
203
204 None
205 })
206 }
207}
208
209async fn browser_data() -> Result<BrowserData, Error> {
211 let (mut browser, mut handler) = Browser::launch(
212 BrowserConfig::builder()
213 .window_size(800, 600)
215 .build()
216 .map_err(Error::Internal)?,
217 )
218 .await
219 .map_err(Error::Cdp)?;
220
221 let browser_handler =
222 tokio::task::spawn(async move { while handler.next().await.is_some() {} });
223
224 let page = Arc::new(
225 browser
226 .start_incognito_context()
227 .await
228 .map_err(Error::Cdp)?
229 .new_page("about:blank")
230 .await
231 .map_err(Error::Cdp)?,
232 );
233
234 page.set_user_agent(SetUserAgentOverrideParams::new(USER_AGENT))
235 .await
236 .map_err(Error::Cdp)?;
237
238 page.goto("https://twitter.com/explore")
240 .await
241 .map_err(Error::Cdp)?;
242 page.wait_for_navigation().await.map_err(Error::Cdp)?;
243
244 let cookies = page.get_cookies().await.map_err(Error::Cdp)?;
245
246 browser.close().await.map_err(Error::Cdp)?;
247 _ = browser_handler.await;
248
249 Ok(BrowserData { cookies })
250}
251
252async fn query_twitter(
253 client: &Client,
254 query: impl AsRef<str>,
255 cursor: Option<&str>,
256) -> Result<(Vec<Value>, String), Error> {
257 static URL: &str = "https://api.twitter.com/2/search/adaptive.json";
258
259 let mut url = Url::parse(URL).map_err(|_| Error::Internal("could not parse api url".into()))?;
260 url.query_pairs_mut()
261 .clear()
262 .append_pair("include_profile_interstitial_type", "1")
263 .append_pair("include_blocking", "1")
264 .append_pair("include_blocked_by", "1")
265 .append_pair("include_followed_by", "1")
266 .append_pair("include_want_retweets", "1")
267 .append_pair("include_mute_edge", "1")
268 .append_pair("include_can_dm", "1")
269 .append_pair("include_can_media_tag", "1")
270 .append_pair("skip_status", "1")
271 .append_pair("cards_platform", "Web-12")
272 .append_pair("include_cards", "1")
273 .append_pair("include_ext_alt_text", "true")
274 .append_pair("include_quote_count", "true")
275 .append_pair("include_reply_count", "1")
276 .append_pair("tweet_mode", "extended")
277 .append_pair("include_entities", "true")
278 .append_pair("include_user_entities", "true")
279 .append_pair("include_ext_media_color", "true")
280 .append_pair("include_ext_media_availability", "true")
281 .append_pair("send_error_codes", "true")
282 .append_pair("simple_quoted_tweet", "true")
283 .append_pair("query_source", "typed_query")
284 .append_pair("pc", "1")
285 .append_pair("spelling_corrections", "1")
286 .append_pair("ext", "mediaStats%2ChighlightedLabel")
287 .append_pair("count", "20")
288 .append_pair("tweet_search_mode", "live")
289 .append_pair("q", query.as_ref());
290
291 if let Some(cursor) = cursor {
292 url.query_pairs_mut().append_pair("cursor", cursor);
293 }
294
295 static RETRY_STATUS: Lazy<Vec<StatusCode>> =
296 Lazy::new(|| [StatusCode::TOO_MANY_REQUESTS, StatusCode::REQUEST_TIMEOUT].into());
297 let json = loop {
298 let response = client
299 .get(url.as_str())
300 .send()
301 .await
302 .map_err(|e| Error::Network(e.to_string()))?;
303 if response.status().is_success() {
304 break response
305 .json::<Value>()
306 .await
307 .map_err(|e| Error::TweetParse(e.to_string()))?;
308 }
309
310 if response.status().is_server_error() || RETRY_STATUS.contains(&response.status()) {
311 eprintln!(
312 "received response status code: {}, waiting 60 seconds",
313 response.status().as_u16()
314 );
315 tokio::time::sleep(Duration::from_secs(60)).await;
316 } else {
317 return Err(Error::BadStatus(response.status().as_u16()));
318 }
319 };
320
321 parse_tweets(json)
322}
323
324fn parse_tweets(json: Value) -> Result<(Vec<Value>, String), Error> {
325 #[derive(Deserialize)]
326 #[serde(rename_all = "camelCase")]
327 struct Root {
328 global_objects: GlobalObjects,
329 timeline: Value,
330 }
331
332 #[derive(Deserialize)]
333 struct GlobalObjects {
334 tweets: BTreeMap<String, Value>,
335 users: BTreeMap<String, Value>,
336 }
337
338 let root: Root = serde_json::from_value(json).map_err(|e| Error::TweetParse(e.to_string()))?;
339
340 let mut tweets = root.global_objects.tweets;
342 let users = root.global_objects.users;
343 for (_, tweet) in tweets.iter_mut() {
344 if let Some(tweet) = tweet.as_object_mut() {
345 if let Some(user_id_str) = tweet["user_id_str"].as_str() {
346 if let Some(user) = users.get(user_id_str) {
347 tweet.insert("user".to_owned(), user.clone());
348 }
349 }
350 }
351 }
352 let tweets: Vec<_> = tweets.into_values().rev().collect();
353
354 let timeline_str =
356 serde_json::to_string(&root.timeline).map_err(|e| Error::TweetParse(e.to_string()))?;
357 let cursor_re = regex::Regex::new(r#""(scroll:.+?)""#)
358 .map_err(|_| Error::Internal("could not build regex".into()))?;
359 let cursor = cursor_re
360 .captures(&timeline_str)
361 .ok_or_else(|| Error::TweetParse("could not find cursor".into()))?
362 .get(1)
363 .unwrap()
364 .as_str()
365 .to_owned();
366
367 Ok((tweets, cursor))
368}