1use async_compression::tokio::bufread::GzipDecoder;
2use async_trait::async_trait;
3use chrono::{NaiveDate, Utc};
4use fluxus::sources::Source;
5use fluxus::utils::models::{Record, StreamError, StreamResult};
6use futures::TryStreamExt;
7use futures::future::Either;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::io::{Error, ErrorKind};
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::fs::File;
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio_util::io::StreamReader;
16use url::Url;
17
/// One GitHub event as serialized in a GH Archive JSON-lines dump.
///
/// Mirrors the archive's event layout; `payload` is kept as raw JSON
/// because its schema varies per `event_type`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Event {
    /// Unique event identifier.
    pub id: String,
    /// Event kind (e.g. "PushEvent"); renamed because `type` is a Rust keyword.
    #[serde(rename = "type")]
    pub event_type: String,
    /// Whether the event is public.
    pub public: bool,
    /// Event-type-specific payload, left untyped on purpose.
    pub payload: Value,
    /// Repository the event occurred on.
    pub repo: Repo,
    /// User that triggered the event.
    pub actor: Actor,
    /// Organization info; absent in the archive data for some events.
    pub org: Option<Org>,
    /// Event timestamp (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
}
30
/// Repository reference embedded in an archived event.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Repo {
    /// Numeric repository id; optional because older archive records may omit it.
    pub id: Option<i64>,
    /// "owner/name" slug of the repository.
    pub name: String,
    /// API URL of the repository.
    pub url: String,
}
37
/// User that triggered an archived event.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Actor {
    /// Numeric user id; optional for older archive records.
    pub id: Option<i64>,
    /// Login name; optional for older archive records.
    pub login: Option<String>,
    /// Legacy gravatar id; often empty or missing.
    pub gravatar_id: Option<String>,
    /// Avatar image URL.
    pub avatar_url: String,
    /// API URL of the user.
    pub url: String,
}
46
/// Organization info embedded in an archived event (same shape as [`Actor`]).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Org {
    /// Numeric organization id; optional for older archive records.
    pub id: Option<i64>,
    /// Organization login name; optional for older archive records.
    pub login: Option<String>,
    /// Legacy gravatar id; often empty or missing.
    pub gravatar_id: Option<String>,
    /// Avatar image URL.
    pub avatar_url: String,
    /// API URL of the organization.
    pub url: String,
}
55
/// Streaming source over GitHub Archive (gharchive.org) hourly event dumps.
///
/// Reads one hourly `.json.gz` dataset at a time, advancing hour by hour
/// (and day by day) from the start position until the end date — if any —
/// is exhausted.
pub struct GithubArchiveSource {
    /// URL (`http(s)` or `file://`) of the dataset currently being read.
    uri: Url,
    /// Line reader over the (possibly gzip-decompressed) current dataset;
    /// `None` until `init` succeeds or after the dataset is exhausted.
    reader: Option<Box<dyn tokio::io::AsyncBufRead + Unpin + Send + Sync>>,
    /// Per-request timeout for HTTP fetches; `None` for local files.
    io_timeout: Option<Duration>,
    /// First date of the range.
    start_date: NaiveDate,
    /// Last date of the range (intended inclusive); `None` = unbounded.
    end_date: Option<NaiveDate>,
    /// Date of the dataset currently being read.
    cur_date: NaiveDate,
    /// Hour (0-23) of the dataset currently being read.
    cur_hour: u32,
}
66
67impl GithubArchiveSource {
68 pub fn new<T: TryInto<Url>>(uri: T) -> Option<Self> {
70 let uri = uri.try_into().ok()?;
71 Some(Self {
72 uri,
73 reader: None,
74 io_timeout: Some(Duration::from_secs(10)),
75 start_date: Utc::now().date_naive(),
76 end_date: None,
77 cur_date: Utc::now().date_naive(),
78 cur_hour: 0,
79 })
80 }
81
82 pub fn from_date(start_date: &str) -> StreamResult<Self> {
87 let start_date = NaiveDate::parse_from_str(start_date, "%Y-%m-%d")
88 .map_err(|e| StreamError::Config(format!("Invalid date format: {}", e)))?;
89
90 let fetch_url = format!("https://data.gharchive.org/{}-{}.json.gz", start_date, 0);
91 let uri = Url::parse(&fetch_url)
92 .map_err(|e| StreamError::Config(format!("Failed to construct URL: {}", e)))?;
93
94 Ok(Self {
95 uri,
96 reader: None,
97 io_timeout: Some(Duration::from_secs(10)),
98 start_date,
99 end_date: None,
100 cur_date: start_date,
101 cur_hour: 0,
102 })
103 }
104
105 pub fn from_hour(start_date: &str, hour: u32) -> StreamResult<Self> {
111 let start_date = NaiveDate::parse_from_str(start_date, "%Y-%m-%d")
112 .map_err(|e| StreamError::Config(format!("Invalid date format: {}", e)))?;
113
114 if hour > 23 {
115 return Err(StreamError::Config(
116 "Hour must be between 0 and 23".to_string(),
117 ));
118 }
119
120 let fetch_url = format!("https://data.gharchive.org/{}-{}.json.gz", start_date, hour);
121 let uri = Url::parse(&fetch_url)
122 .map_err(|e| StreamError::Config(format!("Failed to construct URL: {}", e)))?;
123
124 Ok(Self {
125 uri,
126 reader: None,
127 io_timeout: Some(Duration::from_secs(10)),
128 start_date,
129 end_date: None,
130 cur_date: start_date,
131 cur_hour: hour,
132 })
133 }
134
135 pub fn from_file<P: Into<PathBuf>>(path: P) -> Option<Self> {
136 let path = path.into();
137 let uri_info = if path.is_absolute() {
138 Url::from_file_path(path).ok()
139 } else {
140 Url::from_file_path(std::env::current_dir().ok()?.join(path)).ok()
141 };
142
143 let start_date = Utc::now().date_naive();
144
145 uri_info.map(|uri| Self {
146 uri,
147 reader: None,
148 io_timeout: None,
149 start_date,
150 end_date: None,
151 cur_date: start_date,
152 cur_hour: 0,
153 })
154 }
155
156 pub fn set_io_timeout(&mut self, io_timeout: Duration) {
157 self.io_timeout = Some(io_timeout);
158 }
159
160 pub fn set_start_date(&mut self, start_date: &str) -> StreamResult<()> {
164 self.start_date = NaiveDate::parse_from_str(start_date, "%Y-%m-%d")
165 .map_err(|e| StreamError::Config(format!("Invalid start date format: {}", e)))?;
166
167 self.cur_date = self.start_date;
168 self.cur_hour = 0;
169 Ok(())
170 }
171
172 pub fn set_end_date(&mut self, end_date: &str) -> StreamResult<()> {
176 let end_date = NaiveDate::parse_from_str(end_date, "%Y-%m-%d")
177 .map_err(|e| StreamError::Config(format!("Invalid end date format: {}", e)))?;
178
179 if end_date < self.start_date {
180 return Err(StreamError::Config(
181 "End date cannot be earlier than start date".to_string(),
182 ));
183 }
184
185 self.end_date = Some(end_date);
186 Ok(())
187 }
188
189 fn build_date_url(&self, date: NaiveDate, hour: u32) -> Url {
191 let date_str = date.format("%Y-%m-%d").to_string();
192 let url_str = format!("https://data.gharchive.org/{}-{}.json.gz", date_str, hour);
193 Url::parse(&url_str).expect("Failed to construct URL")
194 }
195
196 fn is_past_end_date(&self) -> bool {
198 match self.end_date {
199 Some(end_date) => {
200 self.cur_date > end_date || (self.cur_date == end_date && self.cur_hour >= 23)
201 }
202 None => false,
203 }
204 }
205}
206
207impl GithubArchiveSource {
208 async fn init_file(&mut self) -> StreamResult<()> {
209 let file = File::open(self.uri.path()).await?;
210 self.reader = Some(if self.uri.path().ends_with(".gz") {
211 let buf_reader = BufReader::new(file);
212 let decompressed = BufReader::new(GzipDecoder::new(buf_reader));
213 Box::new(decompressed)
214 } else {
215 Box::new(BufReader::new(file))
216 });
217
218 Ok(())
219 }
220
221 async fn init_http(&mut self) -> StreamResult<()> {
222 let client = reqwest::Client::builder()
223 .timeout(self.io_timeout.unwrap_or(Duration::from_secs(10)))
224 .build()
225 .map_err(|_e| {
226 StreamError::Io(Error::new(ErrorKind::Other, "create http client error"))
227 })?;
228
229 let request = client.get(self.uri.clone());
230
231 let response = request.send().await.map_err(|e| {
232 StreamError::Io(Error::new(
233 ErrorKind::Other,
234 format!("HTTP request failed:: {}", e),
235 ))
236 })?;
237
238 if !response.status().is_success() {
239 return Err(StreamError::Io(Error::new(
240 ErrorKind::Other,
241 format!(
242 "gharchive request failed, http status is {}",
243 response.status()
244 ),
245 )));
246 }
247
248 let is_gzip = response
249 .headers()
250 .get(reqwest::header::CONTENT_TYPE)
251 .is_some_and(|content_type| content_type == "application/gzip");
252
253 let async_read = response
254 .bytes_stream()
255 .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e));
256
257 let stream_reader = StreamReader::new(async_read);
258 self.reader = Some(Box::new(BufReader::new(if is_gzip {
259 Box::new(GzipDecoder::new(BufReader::new(stream_reader)))
260 as Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync>
261 } else {
262 Box::new(stream_reader) as Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync>
263 })));
264 Ok(())
265 }
266
267 async fn advance_next(&mut self) -> StreamResult<bool> {
269 self.reader = None;
270
271 self.cur_hour = match self.cur_hour {
272 hour if hour < 23 => hour + 1,
273 _ => {
274 self.cur_date = self.cur_date.succ_opt().unwrap();
275
276 if self.is_past_end_date() {
277 return Ok(false);
278 }
279
280 0
281 }
282 };
283
284 self.uri = self.build_date_url(self.cur_date, self.cur_hour);
285
286 match self.init().await {
287 Ok(_) => Ok(true),
288 Err(e) => {
289 tracing::warn!("Failed to initialize next dataset: {}", e);
290 Box::pin(self.advance_next()).await
291 }
292 }
293 }
294}
295
296#[async_trait]
297impl Source<Event> for GithubArchiveSource {
298 async fn init(&mut self) -> StreamResult<()> {
299 match self.uri.scheme() {
300 "http" | "https" => self.init_http().await,
301 "file" => self.init_file().await,
302 _ => Err(StreamError::Io(Error::new(
303 ErrorKind::Other,
304 "not support scheme",
305 ))),
306 }
307 }
308
309 async fn next(&mut self) -> StreamResult<Option<Record<Event>>> {
313 let result = self
314 .reader
315 .as_mut()
316 .map_or_else(
317 || Either::Left(async { Ok(None) }),
318 |reader| {
319 Either::Right(async {
320 let mut line = String::new();
321 let read_result = reader.read_line(&mut line).await?;
322 if read_result == 0 {
323 return Ok(None);
324 }
325 let event: Event = serde_json::from_str(&line)?;
326 Ok(Some(Record::new(event)))
327 })
328 },
329 )
330 .await;
331
332 match result {
333 Ok(Some(record)) => Ok(Some(record)),
334 Ok(None) => {
335 let advanced = self.advance_next().await?;
336 if advanced {
337 self.next().await
338 } else {
339 Ok(None)
340 }
341 }
342 Err(e) => Err(e),
343 }
344 }
345
346 async fn close(&mut self) -> StreamResult<()> {
347 self.reader = None;
348 Ok(())
349 }
350}