// fluxus_source_gharchive/gharchive.rs

1use async_compression::tokio::bufread::GzipDecoder;
2use async_trait::async_trait;
3use chrono::{NaiveDate, Utc};
4use fluxus::sources::Source;
5use fluxus::utils::models::{Record, StreamError, StreamResult};
6use futures::TryStreamExt;
7use futures::future::Either;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::io::{Error, ErrorKind};
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::fs::File;
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio_util::io::StreamReader;
16use url::Url;
17
/// A single GitHub event as serialized in a GH Archive dump.
///
/// Field layout mirrors the JSON objects in the hourly `*.json.gz` files:
/// one event per line, deserialized with serde.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Event {
    /// Unique event id (a numeric string in the archive data).
    pub id: String,
    /// Event kind, e.g. `PushEvent`; the JSON key is `type`, which is a
    /// Rust keyword, hence the rename.
    #[serde(rename = "type")]
    pub event_type: String,
    /// Whether the event was public.
    pub public: bool,
    /// Event-type-specific payload, kept as raw JSON since its schema
    /// varies per `event_type`.
    pub payload: Value,
    /// Repository the event occurred in.
    pub repo: Repo,
    /// Account that triggered the event.
    pub actor: Actor,
    /// Owning organization, when the repository belongs to one.
    pub org: Option<Org>,
    /// Event timestamp (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
}
30
/// Repository reference embedded in an [`Event`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Repo {
    /// Numeric repository id; optional because very old archive records
    /// may omit it.
    pub id: Option<i64>,
    /// Full name in `owner/name` form.
    pub name: String,
    /// API URL of the repository.
    pub url: String,
}
37
/// User account that triggered an [`Event`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Actor {
    /// Numeric account id; optional for older archive records.
    pub id: Option<i64>,
    /// Login name; optional for older archive records.
    pub login: Option<String>,
    /// Legacy gravatar id; often empty or absent in the data.
    pub gravatar_id: Option<String>,
    /// Avatar image URL.
    pub avatar_url: String,
    /// API URL of the account.
    pub url: String,
}
46
/// Organization reference embedded in an [`Event`] when the repository
/// belongs to an organization. Shape matches [`Actor`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Org {
    /// Numeric organization id; optional for older archive records.
    pub id: Option<i64>,
    /// Organization login name; optional for older archive records.
    pub login: Option<String>,
    /// Legacy gravatar id; often empty or absent in the data.
    pub gravatar_id: Option<String>,
    /// Avatar image URL.
    pub avatar_url: String,
    /// API URL of the organization.
    pub url: String,
}
55
/// A source that reads GitHub archive source files
///
/// Streams newline-delimited JSON events from hourly GH Archive datasets,
/// either over HTTP(S) from data.gharchive.org or from a local file, and
/// automatically advances hour-by-hour (and day-by-day) as each dataset
/// is exhausted.
pub struct GithubArchiveSource {
    // URL of the dataset currently being (or about to be) read.
    uri: Url,
    // Line reader over the (possibly gzip-decompressed) current dataset;
    // `None` until `init` succeeds or after the dataset is exhausted.
    reader: Option<Box<dyn tokio::io::AsyncBufRead + Unpin + Send + Sync>>,
    // HTTP timeout; `None` for local-file sources.
    io_timeout: Option<Duration>,
    // First date to process (hour 0 unless overridden at construction).
    start_date: NaiveDate,
    // Last date to process, inclusive; `None` means open-ended.
    end_date: Option<NaiveDate>,
    // Cursor: date of the dataset currently pointed at.
    cur_date: NaiveDate,
    // Cursor: hour (0-23) of the dataset currently pointed at.
    cur_hour: u32,
}
66
67impl GithubArchiveSource {
68    /// Create a new GithubArchiveSource with a specific GitHub Archive URL
69    pub fn new<T: TryInto<Url>>(uri: T) -> Option<Self> {
70        let uri = uri.try_into().ok()?;
71        Some(Self {
72            uri,
73            reader: None,
74            io_timeout: Some(Duration::from_secs(10)),
75            start_date: Utc::now().date_naive(),
76            end_date: None,
77            cur_date: Utc::now().date_naive(),
78            cur_hour: 0,
79        })
80    }
81
82    /// Create a new GithubArchiveSource with a given start date
83    /// The source will start processing from 00:00 of the specified date
84    ///
85    /// Date format should be YYYY-MM-DD
86    pub fn from_date(start_date: &str) -> StreamResult<Self> {
87        let start_date = NaiveDate::parse_from_str(start_date, "%Y-%m-%d")
88            .map_err(|e| StreamError::Config(format!("Invalid date format: {}", e)))?;
89
90        let fetch_url = format!("https://data.gharchive.org/{}-{}.json.gz", start_date, 0);
91        let uri = Url::parse(&fetch_url)
92            .map_err(|e| StreamError::Config(format!("Failed to construct URL: {}", e)))?;
93
94        Ok(Self {
95            uri,
96            reader: None,
97            io_timeout: Some(Duration::from_secs(10)),
98            start_date,
99            end_date: None,
100            cur_date: start_date,
101            cur_hour: 0,
102        })
103    }
104
105    /// Create a new GithubArchiveSource with a specific date and hour
106    /// Use this when you need precise control over the starting hour
107    ///
108    /// Date format should be YYYY-MM-DD
109    /// Hour must be between 0 and 23
110    pub fn from_hour(start_date: &str, hour: u32) -> StreamResult<Self> {
111        let start_date = NaiveDate::parse_from_str(start_date, "%Y-%m-%d")
112            .map_err(|e| StreamError::Config(format!("Invalid date format: {}", e)))?;
113
114        if hour > 23 {
115            return Err(StreamError::Config(
116                "Hour must be between 0 and 23".to_string(),
117            ));
118        }
119
120        let fetch_url = format!("https://data.gharchive.org/{}-{}.json.gz", start_date, hour);
121        let uri = Url::parse(&fetch_url)
122            .map_err(|e| StreamError::Config(format!("Failed to construct URL: {}", e)))?;
123
124        Ok(Self {
125            uri,
126            reader: None,
127            io_timeout: Some(Duration::from_secs(10)),
128            start_date,
129            end_date: None,
130            cur_date: start_date,
131            cur_hour: hour,
132        })
133    }
134
135    pub fn from_file<P: Into<PathBuf>>(path: P) -> Option<Self> {
136        let path = path.into();
137        let uri_info = if path.is_absolute() {
138            Url::from_file_path(path).ok()
139        } else {
140            Url::from_file_path(std::env::current_dir().ok()?.join(path)).ok()
141        };
142
143        let start_date = Utc::now().date_naive();
144
145        uri_info.map(|uri| Self {
146            uri,
147            reader: None,
148            io_timeout: None,
149            start_date,
150            end_date: None,
151            cur_date: start_date,
152            cur_hour: 0,
153        })
154    }
155
156    pub fn set_io_timeout(&mut self, io_timeout: Duration) {
157        self.io_timeout = Some(io_timeout);
158    }
159
160    /// Set the start date for data analysis
161    ///
162    /// Date format should be YYYY-MM-DD
163    pub fn set_start_date(&mut self, start_date: &str) -> StreamResult<()> {
164        self.start_date = NaiveDate::parse_from_str(start_date, "%Y-%m-%d")
165            .map_err(|e| StreamError::Config(format!("Invalid start date format: {}", e)))?;
166
167        self.cur_date = self.start_date;
168        self.cur_hour = 0;
169        Ok(())
170    }
171
172    /// Set the end date for data analysis (optional)
173    ///
174    /// Date format should be YYYY-MM-DD
175    pub fn set_end_date(&mut self, end_date: &str) -> StreamResult<()> {
176        let end_date = NaiveDate::parse_from_str(end_date, "%Y-%m-%d")
177            .map_err(|e| StreamError::Config(format!("Invalid end date format: {}", e)))?;
178
179        if end_date < self.start_date {
180            return Err(StreamError::Config(
181                "End date cannot be earlier than start date".to_string(),
182            ));
183        }
184
185        self.end_date = Some(end_date);
186        Ok(())
187    }
188
189    /// Create a URL for a specific date and hour
190    fn build_date_url(&self, date: NaiveDate, hour: u32) -> Url {
191        let date_str = date.format("%Y-%m-%d").to_string();
192        let url_str = format!("https://data.gharchive.org/{}-{}.json.gz", date_str, hour);
193        Url::parse(&url_str).expect("Failed to construct URL")
194    }
195
196    /// Check if we have reached the end date
197    fn is_past_end_date(&self) -> bool {
198        match self.end_date {
199            Some(end_date) => {
200                self.cur_date > end_date || (self.cur_date == end_date && self.cur_hour >= 23)
201            }
202            None => false,
203        }
204    }
205}
206
207impl GithubArchiveSource {
208    async fn init_file(&mut self) -> StreamResult<()> {
209        let file = File::open(self.uri.path()).await?;
210        self.reader = Some(if self.uri.path().ends_with(".gz") {
211            let buf_reader = BufReader::new(file);
212            let decompressed = BufReader::new(GzipDecoder::new(buf_reader));
213            Box::new(decompressed)
214        } else {
215            Box::new(BufReader::new(file))
216        });
217
218        Ok(())
219    }
220
221    async fn init_http(&mut self) -> StreamResult<()> {
222        let client = reqwest::Client::builder()
223            .timeout(self.io_timeout.unwrap_or(Duration::from_secs(10)))
224            .build()
225            .map_err(|_e| {
226                StreamError::Io(Error::new(ErrorKind::Other, "create http client error"))
227            })?;
228
229        let request = client.get(self.uri.clone());
230
231        let response = request.send().await.map_err(|e| {
232            StreamError::Io(Error::new(
233                ErrorKind::Other,
234                format!("HTTP request failed:: {}", e),
235            ))
236        })?;
237
238        if !response.status().is_success() {
239            return Err(StreamError::Io(Error::new(
240                ErrorKind::Other,
241                format!(
242                    "gharchive request failed, http status is {}",
243                    response.status()
244                ),
245            )));
246        }
247
248        let is_gzip = response
249            .headers()
250            .get(reqwest::header::CONTENT_TYPE)
251            .is_some_and(|content_type| content_type == "application/gzip");
252
253        let async_read = response
254            .bytes_stream()
255            .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e));
256
257        let stream_reader = StreamReader::new(async_read);
258        self.reader = Some(Box::new(BufReader::new(if is_gzip {
259            Box::new(GzipDecoder::new(BufReader::new(stream_reader)))
260                as Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync>
261        } else {
262            Box::new(stream_reader) as Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync>
263        })));
264        Ok(())
265    }
266
267    /// Advance to the next hour or day
268    async fn advance_next(&mut self) -> StreamResult<bool> {
269        self.reader = None;
270
271        self.cur_hour = match self.cur_hour {
272            hour if hour < 23 => hour + 1,
273            _ => {
274                self.cur_date = self.cur_date.succ_opt().unwrap();
275
276                if self.is_past_end_date() {
277                    return Ok(false);
278                }
279
280                0
281            }
282        };
283
284        self.uri = self.build_date_url(self.cur_date, self.cur_hour);
285
286        match self.init().await {
287            Ok(_) => Ok(true),
288            Err(e) => {
289                tracing::warn!("Failed to initialize next dataset: {}", e);
290                Box::pin(self.advance_next()).await
291            }
292        }
293    }
294}
295
296#[async_trait]
297impl Source<Event> for GithubArchiveSource {
298    async fn init(&mut self) -> StreamResult<()> {
299        match self.uri.scheme() {
300            "http" | "https" => self.init_http().await,
301            "file" => self.init_file().await,
302            _ => Err(StreamError::Io(Error::new(
303                ErrorKind::Other,
304                "not support scheme",
305            ))),
306        }
307    }
308
309    /*
310    archive file downloaded from https://data.gharchive.org/ must be split by CRLF
311    */
312    async fn next(&mut self) -> StreamResult<Option<Record<Event>>> {
313        let result = self
314            .reader
315            .as_mut()
316            .map_or_else(
317                || Either::Left(async { Ok(None) }),
318                |reader| {
319                    Either::Right(async {
320                        let mut line = String::new();
321                        let read_result = reader.read_line(&mut line).await?;
322                        if read_result == 0 {
323                            return Ok(None);
324                        }
325                        let event: Event = serde_json::from_str(&line)?;
326                        Ok(Some(Record::new(event)))
327                    })
328                },
329            )
330            .await;
331
332        match result {
333            Ok(Some(record)) => Ok(Some(record)),
334            Ok(None) => {
335                let advanced = self.advance_next().await?;
336                if advanced {
337                    self.next().await
338                } else {
339                    Ok(None)
340                }
341            }
342            Err(e) => Err(e),
343        }
344    }
345
346    async fn close(&mut self) -> StreamResult<()> {
347        self.reader = None;
348        Ok(())
349    }
350}