#![doc = include_str!("../docs/pull_request.md")]
use std::collections::HashSet;
use std::fmt::Write as _;
use std::io::Write;
use std::iter::FromIterator as _;
use std::path::Path;
use crate::utils::csv::*;
use crate::utils::dataframes::u32;
use crate::utils::fs::*;
use crate::utils::github::*;
use crate::utils::github_api::*;
use crate::utils::json::*;
use crate::utils::logger::{log_seed, Logger};
use anyhow::{bail, Context, Error, Result};
use clap::ArgAction;
use clap::{Arg, Command};
use indicatif::ProgressBar;
use json::JsonValue;
use polars::frame::DataFrame;
use polars::prelude::*;
use rand::rngs::StdRng;
use rand::seq::SliceRandom as _;
use rand::SeedableRng;
use tracing::info;
/// Builds the command-line definition of the `pr` subcommand.
///
/// Each argument is declared as a named local before being attached to the
/// command, so the option list reads as a table. Argument order (and thus
/// the `--help` layout) is unchanged.
pub fn cli() -> Command {
    let output_arg = Arg::new("output")
        .short('o')
        .long("output")
        .value_name("OUTPUT_FILE.csv")
        .help(
            "Path to the output csv file to store the metadata. \
             By default, the name of the output file is the same as the input file with the suffix '.pulls.csv'.",
        )
        .required(false);
    let input_arg = Arg::new("input")
        .short('i')
        .long("input")
        .value_name("INPUT_FILE.csv")
        .help("Path to the input csv file to use. One of the columns must contain the full names of the projects. ")
        .required(true);
    let tokens_arg = Arg::new("tokens")
        .short('t')
        .long("tokens")
        .value_name("TOKENS_FILE.csv")
        .help(
            "Path to the file containing the GitHub tokens to use. It must be a valid CSV file with one column named 'token' and where every line is a \
             valid GitHub token (e.g ghp_Ab0C1D2eFg3hIjk4LM56oPqRsTuvWX7yZa8B).",
        )
        .required(true);
    let dest_arg = Arg::new("dest")
        .short('d')
        .long("dest")
        .aliases(["target", "destination"])
        .value_name("DESTINATION")
        .help("Path to the directory where to store the pull request comments.")
        .required(true);
    let seed_arg = Arg::new("seed")
        .short('s')
        .long("seed")
        .value_name("SEED")
        .help("Seed used to randomly shuffle the input data.")
        .default_value("9990520807055774474")
        .value_parser(clap::value_parser!(u64));
    let force_arg = Arg::new("force")
        .short('f')
        .long("force")
        .help("Override the output file if it already exists.")
        .action(ArgAction::SetTrue);
    let ids_arg = Arg::new("ids")
        .long("ids")
        .help("Name of the column containing the ids of the projects.")
        .value_name("COLUMN_NAME")
        .default_value("id");
    let names_arg = Arg::new("names")
        .long("names")
        .help("Name of the column containing the full names of the projects.")
        .value_name("COLUMN_NAME")
        .default_value("name");
    let sub_arg = Arg::new("sub")
        .long("sub")
        .value_name("NUMBER_OF_PROJECTS")
        .help(
            "Number of projects to sample from the input file. \
             If not specified, all remaining projects in the input file are used.",
        );
    Command::new("pr")
        .about("Collect pull requests of GitHub projects")
        .long_about(include_str!("../docs/pull_request.md"))
        .author("Andrea Gilot <andrea.gilot@it.uu.se>")
        .disable_version_flag(true)
        .arg(output_arg)
        .arg(input_arg)
        .arg(tokens_arg)
        .arg(dest_arg)
        .arg(seed_arg)
        .arg(force_arg)
        .arg(ids_arg)
        .arg(names_arg)
        .arg(sub_arg)
}
/// Collects the pull requests of every project listed in the input CSV and
/// appends one row per pull request to the output CSV; each PR's comments are
/// written to a separate file under `target` (see [`scrape_pr_comments`]).
///
/// * `input_path` — input CSV; must contain the `ids` (u32) and `names` (string) columns.
/// * `output_path` — output CSV; defaults to `<input_path>.pulls.csv`.
/// * `tokens` — path to the CSV file of GitHub API tokens.
/// * `seed` — seed of the deterministic shuffle of the processing order.
/// * `force` — when true, overwrite any existing output instead of resuming.
/// * `ids` / `names` — column names of the project ids / full names.
/// * `target` — directory where per-PR comment files are stored.
/// * `sub` — optional cap on the number of projects to query this run.
/// * `logger` — task logger used to report progress.
///
/// # Errors
/// Fails if the input cannot be read, a row does not have the expected
/// (u32, string) shape, or writing to the output file fails.
pub fn run(
    input_path: &str,
    output_path: Option<&String>,
    tokens: &str,
    seed: u64,
    force: bool,
    ids: &str,
    names: &str,
    target: &str,
    sub: Option<usize>,
    logger: &Logger,
) -> Result<()> {
    logger.log_tokens(tokens)?;
    let input_file: DataFrame = logger.run_task("Loading input file", || {
        open_csv(
            input_path,
            Some(Schema::from_iter(vec![
                Field::new(ids.into(), DataType::UInt32),
                Field::new(names.into(), DataType::String),
            ])),
            Some(vec![ids, names]),
        )
    })?;
    log_seed(seed);
    // Deterministic shuffle: the same seed reproduces the same order, which
    // is what makes interrupted runs resumable.
    let mut shuffled_idx: Vec<usize> = (0..input_file.height()).collect();
    logger.run_task("Loading project IDs in random order", || {
        let mut rng: StdRng = SeedableRng::seed_from_u64(seed);
        shuffled_idx.shuffle(&mut rng);
        Ok(())
    })?;
    // Lazily decode each row into (id, full_name); a row with unexpected
    // types yields Err(row index) and aborts the run below.
    let shuffled_rows = shuffled_idx.into_iter().map(|idx| {
        let row = input_file.get_row(idx).unwrap().0;
        match (row[0].clone(), row[1].clone()) {
            (AnyValue::UInt32(id), AnyValue::String(name)) => Ok((id, name)),
            _ => Err(idx),
        }
    });
    let n_pr: usize = input_file.height();
    info!(" {} projects found.", n_pr);
    let default_output_path: String = format!("{}.pulls.csv", &input_path);
    let output_file_path: &str = output_path.unwrap_or(&default_output_path);
    // Ids already present in the output file; those projects are skipped so a
    // previous partial run can be resumed. `--force` starts from scratch.
    let previous_results: HashSet<u32> = if force {
        HashSet::new()
    } else {
        logger.run_task("Resuming progress", || {
            Ok(if Path::new(output_file_path).exists() {
                let df_res: DataFrame = open_csv(
                    output_file_path,
                    Some(Schema::from_iter(vec![Field::new(
                        ids.into(),
                        DataType::UInt32,
                    )])),
                    Some(vec![ids]),
                )?;
                u32(&df_res, ids)?.into_iter().collect()
            } else {
                HashSet::new()
            })
        })?
    };
    if !previous_results.is_empty() {
        info!(
            " the metadata of {} projects have already been queried",
            previous_results.len()
        );
    }
    let mut output_file: CSVFile = CSVFile::new(
        output_file_path,
        if force {
            FileMode::Overwrite
        } else {
            FileMode::Append
        },
    )?;
    output_file.write_header(PRMetadata::header())?;
    let gh = Github::new(tokens);
    info!("Starting to query the GitHub API...");
    // Number of projects still to query this run. `saturating_sub` guards
    // against an output file that contains more ids than the input (which
    // would previously panic on underflow).
    let mut n: usize = match sub {
        Some(m) => m,
        None => n_pr.saturating_sub(previous_results.len()),
    };
    // The bar length is the number of projects that will actually be queried:
    // `inc(1)` only fires for not-yet-queried projects, so sizing the bar with
    // `n_pr` (as before) left resumed runs stuck short of 100%.
    let progress_bar: ProgressBar = ProgressBar::new(n as u64);
    progress_bar.set_style(
        indicatif::ProgressStyle::default_bar()
            .template("{elapsed} {wide_bar} {percent}%")
            .unwrap(),
    );
    for row in shuffled_rows {
        if n == 0 {
            break;
        }
        match row {
            Ok((id, full_name)) => {
                if !previous_results.contains(&id) {
                    let mut pull_requests: String = String::new();
                    if let Ok(pages) = scrape_pages(
                        &gh,
                        &|per_page, page| {
                            format!("https://api.github.com/repositories/{id}/pulls?state=all&per_page={per_page}&page={page}")
                        },
                        &|json| {
                            let mut pr_metadata: PRMetadata =
                                PRMetadata::parse_json(&json, (id, target.to_string()))?;
                            // A failed comment scrape clears the file path but
                            // keeps the PR metadata row itself.
                            scrape_pr_comments(&gh, id, &pr_metadata).unwrap_or_else(|_| {
                                pr_metadata.file_path = String::new();
                            });
                            Ok(pr_metadata)
                        },
                    ) {
                        for pr_res in pages {
                            let obj: PRMetadata = pr_res.unwrap_or_default();
                            writeln!(
                                &mut pull_requests,
                                "{}",
                                obj.to_csv((id, full_name.to_string()))
                            )?;
                        }
                        // One buffered write per project keeps partial rows out
                        // of the output if the process is interrupted.
                        write!(&mut output_file, "{pull_requests}")?;
                    }
                    progress_bar.inc(1);
                    n -= 1;
                }
            }
            Err(idx) => {
                bail!("Could not parse row {idx} in the input file")
            }
        }
    }
    Ok(())
}
/// Metadata of a single pull request, one row of the output CSV
/// (serialized by the `ToCSV` impl below).
#[derive(Debug, Clone, Eq, PartialEq, Default)]
struct PRMetadata {
    /// Pull-request number within its repository (GitHub `number` field).
    pr_number: u32,
    /// Path of the CSV file holding this PR's comments
    /// (`<target>/<id % 10000>/<id>/<id>_<pr_number>.csv`);
    /// cleared by `run` when comment scraping fails.
    file_path: String,
    /// Login of the PR author (GitHub `user.login`).
    user: String,
    /// Numeric id of the PR author (GitHub `user.id`).
    user_id: u64,
    /// Creation timestamp; 0 when the JSON field is null.
    /// Presumably epoch seconds from `parse_date_time` — TODO confirm.
    created_at: u64,
    /// Last-update timestamp; 0 when null.
    updated_at: u64,
    /// Close timestamp; 0 when null (i.e. still open).
    closed_at: u64,
    /// Merge timestamp; 0 when null (i.e. not merged).
    merged_at: u64,
    /// Whether the PR is a draft.
    draft: bool,
    /// PR state as reported by the API (e.g. "open"/"closed" — from the
    /// `state` field).
    state: String,
    /// PR description text, already CSV-cleaned; written to the comments
    /// file, not to the metadata CSV (absent from `header()`).
    body: String,
}
impl ToCSV for PRMetadata {
    type Key = (u32, String);

    /// Column names of the pull-request metadata CSV. Note that `body` is
    /// deliberately not part of the metadata output.
    fn header() -> &'static [&'static str] {
        &[
            "id",
            "name",
            "pr_number",
            "file_path",
            "user",
            "user_id",
            "created_at",
            "updated_at",
            "closed_at",
            "merged_at",
            "draft",
            "state",
        ]
    }

    /// Serializes one metadata row; `key` carries the repository id and its
    /// full name, which prefix every row.
    fn to_csv(&self, key: Self::Key) -> String {
        let (repo_id, repo_name) = key;
        let columns = [
            repo_id.to_string(),
            repo_name,
            self.pr_number.to_string(),
            self.file_path.clone(),
            self.user.clone(),
            self.user_id.to_string(),
            self.created_at.to_string(),
            self.updated_at.to_string(),
            self.closed_at.to_string(),
            self.merged_at.to_string(),
            // Booleans are stored as 1/0.
            u8::from(self.draft).to_string(),
            self.state.clone(),
        ];
        columns.join(",")
    }
}
impl FromGitHub for PRMetadata {
type Complement = (u32, String);
fn parse_json(json: &JsonValue, complement: Self::Complement) -> Result<Self, Error> {
let pr_number: u32 = get_field::<u32>(json, "number")?;
let created_at: i64 = if field_is_null(json, "created_at")? {
0
} else {
Self::parse_date_time(json, "created_at")?
};
let updated_at: i64 = if field_is_null(json, "updated_at")? {
0
} else {
Self::parse_date_time(json, "updated_at")?
};
let closed_at: i64 = if field_is_null(json, "closed_at")? {
0
} else {
Self::parse_date_time(json, "closed_at")?
};
let merged_at: i64 = if field_is_null(json, "merged_at")? {
0
} else {
Self::parse_date_time(json, "merged_at")?
};
let draft: bool = get_field::<bool>(json, "draft")?;
let state: String = get_field::<String>(json, "state")?;
let user_json: &JsonValue = &json["user"];
let user: String = get_field::<String>(user_json, "login")?;
let user_id: u64 = get_field::<u64>(user_json, "id")?;
let path: String = format!(
"{}/{}/{}/{}_{}.csv",
complement.1,
complement.0 % 10000,
complement.0,
complement.0,
pr_number
);
let body: String = if field_is_null(json, "body")? {
"".to_string()
} else {
clean_string_to_csv(&get_field::<String>(json, "body")?)
};
Ok(Self {
file_path: path,
pr_number,
created_at: created_at as u64,
updated_at: updated_at as u64,
closed_at: closed_at as u64,
merged_at: merged_at as u64,
draft,
state,
user,
user_id,
body,
})
}
}
/// Walks the pages of a paginated GitHub API endpoint and collects the
/// per-element results of `func`.
///
/// * `request` — builds the URL for a given `(per_page, page)` pair.
/// * `func` — converts each JSON array element into a `T`; its failures are
///   collected as `Err` entries rather than aborting the scrape.
///
/// Pagination stops at the first page whose payload is empty or contributes
/// no element.
///
/// # Errors
/// Only a failed HTTP request aborts with an error.
fn scrape_pages<T>(
    gh: &Github,
    request: &dyn Fn(usize, usize) -> String,
    func: &dyn Fn(JsonValue) -> Result<T, Error>,
) -> Result<Vec<Result<T, Error>>, Error> {
    const PER_PAGE: usize = 100;
    let mut items: Vec<Result<T, Error>> = Vec::new();
    let mut page: usize = 1;
    loop {
        let url = request(PER_PAGE, page);
        let json: JsonValue = gh
            .request(&url)
            .with_context(|| format!("Error during GitHub request {url}"))?;
        if json.is_empty() {
            break;
        }
        // Stop when THIS page contributes nothing. The previous code tested
        // the cumulative `items.is_empty()`, so once any page had produced an
        // item, a later non-empty non-array payload (e.g. a rate-limit error
        // object, whose `members()` iterator is empty) kept incrementing
        // `page` forever — an infinite request loop.
        let before = items.len();
        items.extend(json.members().map(|item| func(item.clone())));
        if items.len() == before {
            break;
        }
        page += 1;
    }
    Ok(items)
}
/// Kind of pull-request comment, matching the API endpoint it was fetched
/// from (see the endpoint table in `scrape_pr_comments`).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum PRCommentType {
    /// A review, from `pulls/{pr}/reviews` (timestamp field: `submitted_at`).
    Review,
    /// An inline code comment, from `pulls/{pr}/comments`.
    Code,
    /// A discussion comment, from `issues/{pr}/comments`.
    Discussion,
    /// The pull-request description itself, stored as the first row of the
    /// comments file.
    Body,
    /// Sentinel used by `Default` for rows that could not be parsed.
    Error,
}
/// A single pull-request comment, one row of a per-PR comments CSV file.
#[derive(Debug)]
struct PRComment {
    /// GitHub comment id; 0 for the PR body row, -1 for the error sentinel.
    id: i64,
    /// Login of the comment author (GitHub `user.login`).
    user: String,
    /// Numeric id of the comment author (GitHub `user.id`).
    user_id: u64,
    /// Which endpoint/kind this comment came from.
    comment_type: PRCommentType,
    /// Creation timestamp; 0 when the JSON field is null.
    /// Presumably epoch seconds from `parse_date_time` — TODO confirm.
    created_at: u64,
    /// Raw comment text; CSV-cleaned only at serialization time (`to_csv`).
    body: String,
}
impl ToCSV for PRComment {
    type Key = ();

    /// Column names of a per-PR comments CSV file.
    fn header() -> &'static [&'static str] {
        &["id", "user", "user_id", "type", "created_at", "body"]
    }

    /// Serializes one comment row. The body is the only quoted column and is
    /// cleaned for CSV embedding; no key is needed.
    fn to_csv(&self, _key: Self::Key) -> String {
        // Human-readable tag for the comment kind.
        let kind: &str = match self.comment_type {
            PRCommentType::Review => "review",
            PRCommentType::Code => "code",
            PRCommentType::Discussion => "discussion",
            PRCommentType::Body => "body",
            PRCommentType::Error => "error",
        };
        let cleaned_body = clean_string_to_csv(&self.body);
        format!(
            "{},{},{},{},{},\"{}\"",
            self.id, self.user, self.user_id, kind, self.created_at, cleaned_body
        )
    }
}
impl Default for PRComment {
fn default() -> Self {
Self {
id: -1,
user: String::new(),
user_id: 0,
comment_type: PRCommentType::Error,
created_at: 0,
body: String::new(),
}
}
}
impl FromGitHub for PRComment {
    type Complement = PRCommentType;

    /// Parses one comment from a GitHub API JSON object.
    ///
    /// `complement` tells which endpoint the object came from: reviews carry
    /// their timestamp in `submitted_at`, every other kind in `created_at`.
    /// A null timestamp or body maps to 0 / empty rather than an error.
    fn parse_json(json: &JsonValue, complement: PRCommentType) -> Result<Self, Error> {
        let id: u64 = get_field::<u64>(json, "id")?;
        let user_json = &json["user"];
        let user: String = get_field::<String>(user_json, "login")?;
        let user_id: u64 = get_field::<u64>(user_json, "id")?;
        // Pick the timestamp field once instead of duplicating the
        // null-check branches per comment kind.
        let date_field: &str = if complement == PRCommentType::Review {
            "submitted_at"
        } else {
            "created_at"
        };
        let created_at: i64 = if field_is_null(json, date_field)? {
            0
        } else {
            PRMetadata::parse_date_time(json, date_field)?
        };
        let body: String = if field_is_null(json, "body")? {
            "".to_string()
        } else {
            get_field::<String>(json, "body")?
        };
        Ok(Self {
            id: id as i64,
            user,
            user_id,
            comment_type: complement,
            // NOTE(review): `as u64` wraps for pre-epoch dates — presumed
            // non-negative, as in the original code.
            created_at: created_at as u64,
            body,
        })
    }
}
/// Downloads every comment attached to `pr` — discussion comments, inline
/// code comments and reviews — and writes them, together with the PR
/// description itself, as one CSV file at `pr.file_path`.
fn scrape_pr_comments(gh: &Github, repo_id: u32, pr: &PRMetadata) -> Result<()> {
    let mut csv_out: CSVFile = CSVFile::new(&pr.file_path, FileMode::Overwrite)?;
    // Everything is staged in memory and flushed in a single write at the end.
    let mut buffer: String = String::new();
    writeln!(&mut buffer, "{}", PRComment::header().join(","))?;
    // The PR description is stored as the first "comment" of the file.
    let description = PRComment {
        id: 0,
        user: pr.user.clone(),
        user_id: pr.user_id,
        comment_type: PRCommentType::Body,
        created_at: pr.created_at,
        body: pr.body.clone(),
    };
    writeln!(&mut buffer, "{}", description.to_csv(()))?;
    // (comment kind, API family, resource) for each endpoint to scrape.
    let endpoints = [
        (PRCommentType::Discussion, "issues", "comments"),
        (PRCommentType::Code, "pulls", "comments"),
        (PRCommentType::Review, "pulls", "reviews"),
    ];
    for (kind, api, resource) in endpoints {
        let rows = scrape_pages(
            gh,
            &|per_page, page| {
                format!(
                    "https://api.github.com/repositories/{}/{}/{}/{}?per_page={}&page={}",
                    repo_id, api, pr.pr_number, resource, per_page, page
                )
            },
            &|json| Ok(PRComment::parse_json(&json, kind)?.to_csv(())),
        )?;
        for row_res in rows {
            // Rows that failed to parse degrade to the "error" sentinel row.
            let line = row_res.unwrap_or_else(|_| PRComment::default().to_csv(()));
            writeln!(&mut buffer, "{line}")?;
        }
    }
    write!(&mut csv_out, "{buffer}")?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::logger::test_logger;
    use anyhow::ensure;

    /// Root directory of the fixtures used by the pull-request phase tests.
    const TEST_DATA: &str = "tests/data/phases/pull_request";

    /// Runs the full `pr` phase on `input_file`, then checks that the output
    /// CSV and every per-PR comment file in `pr_paths` match their
    /// `.expected` counterparts; generated files are deleted on success.
    ///
    /// Takes `&[String]` instead of `&Vec<String>` (clippy::ptr_arg) so
    /// callers can pass array literals directly.
    fn test_phase_pull_request(
        input_file: &str,
        output_file: &str,
        target: &str,
        pr_paths: &[String],
    ) -> Result<()> {
        ensure!(std::path::Path::new(input_file).exists());
        run(
            input_file,
            Some(&output_file.to_string()),
            "ghtokens.csv",
            0,
            false,
            "id",
            "name",
            target,
            None,
            test_logger(),
        )?;
        for pr_path in pr_paths {
            let pr_discussion = open_csv(pr_path, None, None)?;
            let pr_discussion_expected = open_csv(&format!("{pr_path}.expected"), None, None)?;
            assert_eq!(pr_discussion, pr_discussion_expected);
            delete_file(pr_path, false)?;
        }
        let output_df = open_csv(output_file, None, None)?;
        let expected_df = open_csv(&format!("{output_file}.expected"), None, None)?;
        assert_eq!(expected_df, output_df);
        delete_file(output_file, false)
    }

    #[test]
    fn test_pr_empty_output() -> Result<()> {
        test_phase_pull_request(
            &format!("{TEST_DATA}/repos.csv"),
            &format!("{TEST_DATA}/repos.csv.pulls.csv"),
            &format!("{TEST_DATA}/prs"),
            &[
                format!("{TEST_DATA}/prs/5983/1128315983/1128315983_1.csv"),
                format!("{TEST_DATA}/prs/5983/1128315983/1128315983_2.csv"),
            ],
        )
    }

    #[test]
    fn test_pr_with_output() -> Result<()> {
        let input_path: String = format!("{TEST_DATA}/repos2.csv");
        // Seed a complete output file so the run skips every project.
        std::fs::copy(
            format!("{TEST_DATA}/repos_complete.csv.expected"),
            format!("{TEST_DATA}/repos_complete.csv"),
        )?;
        test_phase_pull_request(
            &input_path,
            &format!("{TEST_DATA}/repos_complete.csv"),
            &format!("{TEST_DATA}/prs2"),
            &[],
        )
    }

    #[test]
    fn test_pr_with_partial_output() -> Result<()> {
        let input_path: String = format!("{TEST_DATA}/repos3.csv");
        let output_path: String = format!("{TEST_DATA}/repos_partial_output.csv.temp");
        // Seed a partially-filled output file so the run resumes from it.
        std::fs::copy(
            format!("{TEST_DATA}/repos_partial_output.csv"),
            &output_path,
        )?;
        ensure!(std::path::Path::new(&output_path).exists());
        test_phase_pull_request(
            &input_path,
            &output_path,
            &format!("{TEST_DATA}/prs3"),
            &[],
        )
    }

    #[test]
    fn test_language_scraper_inexistent() -> Result<()> {
        test_phase_pull_request(
            &format!("{TEST_DATA}/invalid.csv"),
            &format!("{TEST_DATA}/invalid.csv.pulls.csv"),
            &format!("{TEST_DATA}/prs_invalid"),
            &[],
        )
    }
}