#![allow(unused_assignments)]
static USAGE: &str = r#"
Fetches data from web services for every row using HTTP GET.
Fetch is integrated with `jql` to directly parse out values from an API JSON response.
<url-column> needs to be a fully qualified URL path. Alternatively, you can dynamically
construct URLs for each CSV record with the --url-template option (see Examples below).
To use a proxy, please set env vars HTTP_PROXY and HTTPS_PROXY
(e.g. export HTTPS_PROXY=socks5://127.0.0.1:1086).
Fetch caches responses to minimize traffic and maximize performance. By default, it uses
a non-persistent memoized cache for each fetch session.
For persistent, inter-session caching, Redis is supported with the --redis flag.
By default, it will connect to a local Redis instance at redis://127.0.0.1:6379/1,
with a cache expiry Time-to-Live (TTL) of 2,419,200 seconds (28 days),
and cache hits NOT refreshing the TTL of cached values.
Set the environment variables QSV_REDIS_CONNSTR, QSV_REDIS_TTL_SECS and
QSV_REDIS_TTL_REFRESH to change default Redis settings.
Supports brotli, gzip and deflate automatic decompression for improved throughput and
performance, preferring brotli over gzip over deflate.
It also automatically upgrades connections to HTTP/2 with adaptive flow control
if the server supports it.
See https://www.cloudflare.com/learning/performance/http2-vs-http1.1/ and
https://medium.com/coderscorner/http-2-flow-control-77e54f7fd518 for more info.
EXAMPLES USING THE URL-COLUMN ARGUMENT:
data.csv
URL
https://api.zippopotam.us/us/90210
https://api.zippopotam.us/us/94105
https://api.zippopotam.us/us/92802
Given the data.csv above, fetch the JSON response.
$ qsv fetch URL data.csv
Note the output will be a JSONL file - with a minified JSON response per line, not a CSV file.
Now, if we want to generate a CSV file with the parsed City and State, we use the
new-column and jql options. (See https://github.com/yamafaktory/jql#jql for more info on
how to use the jql JSON Query Language)
$ qsv fetch URL --new-column CityState --jql '"places"[0]."place name","places"[0]."state abbreviation"'
data.csv > data_with_CityState.csv
data_with_CityState.csv
URL, CityState,
https://api.zippopotam.us/us/90210, "Beverly Hills, CA"
https://api.zippopotam.us/us/94105, "San Francisco, CA"
https://api.zippopotam.us/us/92802, "Anaheim, CA"
As you can see, entering jql selectors on the command line is error-prone and can quickly become cumbersome.
Alternatively, the jql selector can be saved and loaded from a file using the --jqlfile option.
$ qsv fetch URL --new-column CityState --jqlfile places.jql data.csv > datatest.csv
EXAMPLES USING THE --URL-TEMPLATE OPTION:
Instead of using hardcoded URLs, you can also dynamically construct the URL for each CSV row using CSV column
values in that row.
Example 1:
For example, we have a CSV with four columns and we want to geocode against the geocode.earth API that expects
latitude and longitude passed as URL parameters.
addr_data.csv
location, description, latitude, longitude
Home, "house is not a home when there's no one there", 40.68889829703977, -73.99589368107037
X, "marks the spot", 40.78576117777992, -73.96279560368552
work, "moolah", 40.70692672280804, -74.0112264146281
school, "exercise brain", 40.72916494539206, -73.99624185993626
gym, "exercise muscles", 40.73947342617386, -73.99039923885411
Geocode addresses in addr_data.csv by passing the latitude and longitude fields, and store
the response in a new column named response in enriched_addr_data.csv.
$ qsv fetch --url-template "https://api.geocode.earth/v1/reverse?point.lat={latitude}&point.lon={longitude}"
addr_data.csv -c response > enriched_addr_data.csv
Example 2:
Geocode addresses in addresses.csv, pass the "street address" and "zip-code" fields
and use jql to parse placename from the JSON response into a new column in addresses_with_placename.csv.
Note how the non-alphanumeric characters in the field names (space and hyphen) are replaced with _ in the url-template.
$ qsv fetch --jql '"features"[0]."properties","name"' addresses.csv -c placename --url-template
"https://api.geocode.earth/v1/search/structured?address={street_address}&postalcode={zip_code}"
> addresses_with_placename.csv
USING THE HTTP-HEADER OPTION:
The --http-header option allows you to append arbitrary key-value pairs (a valid pair is a key and value separated by a colon)
to the HTTP header (to authenticate against an API, pass custom header fields, etc.). Note that you can
pass as many key-value pairs as you need by using the --http-header option repeatedly. For example:
$ qsv fetch URL data.csv --http-header "X-Api-Key:TEST_KEY" -H "X-Api-Secret:ABC123XYZ" -H "Accept-Language: fr-FR"
For more extensive examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_fetch.rs.
Usage:
qsv fetch [<url-column> | --url-template <template>] [--jql <selector> | --jqlfile <file>] [--http-header <k:v>...] [options] [<input>]
qsv fetch --help
Fetch options:
<url-column> Name of the column with the URL.
Mutually exclusive with --url-template.
--url-template <template> URL template to use. Use column names enclosed with
curly braces to insert the CSV data for a record.
Mutually exclusive with url-column.
-c, --new-column <name> Put the fetched values in a new column. Specifying this option
results in a CSV. Otherwise, the output is in JSONL format.
--jql <selector> Apply jql selector to API returned JSON value.
Mutually exclusive with --jqlfile.
--jqlfile <file> Load jql selector from file instead.
Mutually exclusive with --jql.
--pretty Prettify JSON responses. Otherwise, they're minified.
If the response is not in JSON format, it's passed through.
Note that --pretty requires the --new-column option.
--rate-limit <qps> Rate Limit in Queries Per Second (max: 1000). Note that fetch
dynamically throttles as well based on rate-limit and
retry-after response headers.
Set to 0 to go as fast as possible, automatically throttling as required.
CAUTION: Only use zero for APIs that use RateLimit and/or Retry-After headers,
otherwise your fetch job may look like a Denial Of Service attack.
Even though zero is the default, this is mitigated by --max-errors having a
default of 10.
[default: 0 ]
--timeout <seconds> Timeout for each URL request.
[default: 15 ]
-H, --http-header <k:v> Append custom header(s) to the HTTP header. Pass multiple key-value pairs
by adding this option multiple times, once for each pair. The key and value
should be separated by a colon.
--max-retries <count> Maximum number of retries per record before an error is raised.
[default: 5]
--max-errors <count> Maximum number of errors before aborting.
Set to zero (0) to continue despite errors.
[default: 10 ]
--store-error On error, store error code/message instead of blank value.
--cache-error Cache error responses even if a request fails. If an identical URL is requested,
the cached error is returned. Otherwise, the fetch is attempted again
for --max-retries.
--cookies Allow cookies.
--user-agent <agent> Specify a custom user agent. Try to follow the syntax here -
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
--report <d|s> Creates a report of the fetch job. The report has the same name as the input file
with the ".fetch-report.tsv" suffix.
There are two kinds of report - d for "detailed" & s for "short". The detailed
report has the same columns as the input CSV with six additional columns -
qsv_fetch_url, qsv_fetch_status, qsv_fetch_cache_hit, qsv_fetch_retries,
qsv_fetch_elapsed_ms & qsv_fetch_response.
The short report only has the six columns without the "qsv_fetch_" prefix.
[default: none]
--redis Use Redis to cache responses. It connects to "redis://127.0.0.1:6379/1"
with a connection pool size of 20, with a TTL of 28 days, and a cache hit
NOT renewing an entry's TTL.
Set the QSV_REDIS_CONNSTR, QSV_REDIS_MAX_POOL_SIZE, QSV_REDIS_TTL_SECS &
QSV_REDIS_TTL_REFRESH environment variables respectively to change these settings.
--flushdb Flush all the keys in the current Redis database on startup.
This option is ignored if the --redis option is NOT enabled.
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-n, --no-headers When set, the first row will not be interpreted
as headers. Namely, it will be processed with the rest
of the rows. Otherwise, the first row will always
appear as the header row in the output.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character. (default: ,)
-p, --progressbar Show progress bars. Not valid for stdin.
"#;
use std::{fs, num::NonZeroU32, thread, time};
use cached::{
proc_macro::{cached, io_cached},
Cached, IOCached, RedisCache, Return,
};
use dynfmt::Format;
use governor::{
clock::DefaultClock,
middleware::NoOpMiddleware,
state::{direct::NotKeyed, InMemoryState},
Quota, RateLimiter,
};
use indicatif::{HumanCount, MultiProgress, ProgressBar, ProgressDrawTarget};
use log::{
debug, error, info, log_enabled, warn,
Level::{Debug, Info, Trace, Warn},
};
use once_cell::sync::{Lazy, OnceCell};
use rand::Rng;
use redis;
use regex::Regex;
use reqwest::{
blocking::Client,
header::{HeaderMap, HeaderName, HeaderValue},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use url::Url;
use crate::{
config::{Config, Delimiter},
regex_once_cell,
select::SelectColumns,
util, CliError, CliResult,
};
#[derive(Deserialize, Debug)]
struct Args {
flag_url_template: Option<String>,
flag_new_column: Option<String>,
flag_jql: Option<String>,
flag_jqlfile: Option<String>,
flag_pretty: bool,
flag_rate_limit: u32,
flag_timeout: u64,
flag_http_header: Vec<String>,
flag_max_retries: u8,
flag_max_errors: u64,
flag_store_error: bool,
flag_cache_error: bool,
flag_cookies: bool,
flag_user_agent: Option<String>,
flag_report: String,
flag_redis: bool,
flag_flushdb: bool,
flag_output: Option<String>,
flag_no_headers: bool,
flag_delimiter: Option<Delimiter>,
flag_progressbar: bool,
arg_url_column: SelectColumns,
arg_input: Option<String>,
}
static DEFAULT_REDIS_CONN_STR: &str = "redis://127.0.0.1:6379/1";
static DEFAULT_REDIS_TTL_SECS: u64 = 60 * 60 * 24 * 28;
static DEFAULT_REDIS_POOL_SIZE: u32 = 20;
static TIMEOUT_SECS: OnceCell<u64> = OnceCell::new();
const FETCH_REPORT_PREFIX: &str = "qsv_fetch_";
const FETCH_REPORT_SUFFIX: &str = ".fetch-report.tsv";
static DEFAULT_ACCEPT_ENCODING: &str = "br;q=1.0, gzip;q=0.6, deflate;q=0.4, *;q=0.2";
const MINIMUM_WAIT_MS: u64 = 10;
const MIN_WAIT: time::Duration = time::Duration::from_millis(MINIMUM_WAIT_MS);
#[derive(PartialEq)]
enum ReportKind {
Detailed,
Short,
None,
}
impl From<reqwest::Error> for CliError {
fn from(err: reqwest::Error) -> CliError {
CliError::Other(err.to_string())
}
}
struct RedisConfig {
conn_str: String,
max_pool_size: u32,
ttl_secs: u64,
ttl_refresh: bool,
}
impl RedisConfig {
fn load() -> Self {
Self {
conn_str: std::env::var("QSV_REDIS_CONNSTR")
.unwrap_or_else(|_| DEFAULT_REDIS_CONN_STR.to_string()),
max_pool_size: std::env::var("QSV_REDIS_MAX_POOL_SIZE")
.unwrap_or_else(|_| DEFAULT_REDIS_POOL_SIZE.to_string())
.parse()
.unwrap_or(DEFAULT_REDIS_POOL_SIZE),
ttl_secs: std::env::var("QSV_REDIS_TTL_SECS")
.unwrap_or_else(|_| DEFAULT_REDIS_TTL_SECS.to_string())
.parse()
.unwrap_or(DEFAULT_REDIS_TTL_SECS),
ttl_refresh: std::env::var("QSV_REDIS_TTL_REFRESH").is_ok(),
}
}
}
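/*
The lookup pattern in RedisConfig::load (read a setting, fall back to a default
when it is unset or does not parse) can be sketched in isolation. This is only a
minimal sketch under stated assumptions, not qsv's code: `parse_or_default` is a
hypothetical helper, and it takes the raw value as an `Option<&str>` so the
example does not depend on the process environment.

```rust
use std::str::FromStr;

/// Hypothetical helper: parse `raw` into `T`, falling back to `default`
/// when the value is missing or does not parse.
fn parse_or_default<T: FromStr>(raw: Option<&str>, default: T) -> T {
    raw.and_then(|s| s.parse().ok()).unwrap_or(default)
}
```

For instance, `parse_or_default(std::env::var("QSV_REDIS_MAX_POOL_SIZE").ok().as_deref(), 20_u32)`
would mirror the pool-size lookup above.
*/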
#[derive(Serialize, Deserialize, Debug, Clone)]
struct FetchResponse {
response: String,
status_code: u16,
retries: u8,
}
static REDISCONFIG: Lazy<RedisConfig> = Lazy::new(RedisConfig::load);
static JQL_GROUPS: OnceCell<Vec<jql::Group>> = OnceCell::new();
pub fn run(argv: &[&str]) -> CliResult<()> {
let args: Args = util::get_args(USAGE, argv)?;
if args.flag_timeout > 3_600 {
return fail!("Timeout cannot be more than 3,600 seconds (1 hour).");
} else if args.flag_timeout == 0 {
return fail!("Timeout cannot be zero.");
}
info!("TIMEOUT: {} secs", args.flag_timeout);
TIMEOUT_SECS.set(args.flag_timeout).unwrap();
if args.flag_redis {
let conn_str = &REDISCONFIG.conn_str;
let redis_client = match redis::Client::open(conn_str.to_string()) {
Ok(rc) => rc,
Err(e) => {
return fail_clierror!(r#"Invalid Redis connection string "{conn_str}": {e:?}"#)
}
};
let mut redis_conn;
match redis_client.get_connection() {
Err(e) => {
return fail_clierror!(r#"Cannot connect to Redis using "{conn_str}": {e:?}"#)
}
Ok(x) => redis_conn = x,
}
if args.flag_flushdb {
redis::cmd("FLUSHDB").execute(&mut redis_conn);
info!("flushed Redis database.");
}
}
let mut rconfig = Config::new(&args.arg_input)
.delimiter(args.flag_delimiter)
.trim(csv::Trim::All)
.checkutf8(false)
.no_headers(args.flag_no_headers);
let mut rdr = rconfig.reader()?;
let mut wtr = if args.flag_new_column.is_some() {
Config::new(&args.flag_output).writer()?
} else {
Config::new(&args.flag_output)
.quote_style(csv::QuoteStyle::Never)
.flexible(true)
.writer()?
};
let mut headers = rdr.byte_headers()?.clone();
let include_existing_columns = if let Some(name) = args.flag_new_column {
headers.push_field(name.as_bytes());
wtr.write_byte_record(&headers)?;
true
} else {
if args.flag_pretty {
return fail!("The --pretty option requires the --new-column option.");
}
false
};
let mut column_index = 0_usize;
if args.flag_url_template.is_none() {
rconfig = rconfig.select(args.arg_url_column);
let sel = rconfig.selection(&headers)?;
// validate the selection before indexing into it, so an empty selection cannot panic
if sel.len() != 1 {
return fail!("Only a single URL column may be selected.");
}
column_index = *sel.iter().next().unwrap();
}
let mut dynfmt_url_template = String::new();
if let Some(ref url_template) = args.flag_url_template {
if args.flag_no_headers {
return fail!("--url-template option requires column headers.");
}
let str_headers = rdr.headers()?.clone();
let mut dynfmt_fields = Vec::with_capacity(10);
dynfmt_url_template = url_template.to_string();
let (safe_headers, _) =
util::safe_header_names(&str_headers, false, false, &Vec::new(), "");
let formatstr_re: &'static Regex = regex_once_cell!(r"\{(?P<key>\w+)?\}");
for format_fields in formatstr_re.captures_iter(url_template) {
dynfmt_fields.push(format_fields.name("key").unwrap().as_str());
}
dynfmt_fields.sort_unstable();
for (i, field) in safe_headers.into_iter().enumerate() {
if dynfmt_fields.binary_search(&field.as_str()).is_ok() {
let field_with_curly = format!("{{{field}}}");
let field_index = format!("{{{i}}}");
dynfmt_url_template = dynfmt_url_template.replace(&field_with_curly, &field_index);
}
}
debug!("dynfmt_fields: {dynfmt_fields:?} url_template: {dynfmt_url_template}");
}
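/*
The placeholder rewrite above (named `{column}` fields become positional `{index}`
fields before formatting) can be sketched with the standard library alone. This is
a minimal sketch, not the code path used here: `to_positional` is a hypothetical
name, and `headers` is assumed to already hold "safe" column names.

```rust
/// Rewrite `{column_name}` placeholders to positional `{index}` placeholders.
/// `headers` are assumed to be "safe" names (non-alphanumeric characters
/// already replaced with `_`). Unknown placeholders are left untouched.
fn to_positional(template: &str, headers: &[&str]) -> String {
    let mut out = template.to_string();
    for (i, name) in headers.iter().enumerate() {
        let named = format!("{{{name}}}");
        let positional = format!("{{{i}}}");
        out = out.replace(&named, &positional);
    }
    out
}
```

One caveat of this simple approach: a column whose name is itself a number could
be rewritten twice, so a real implementation may want to guard against that.
*/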
let rate_limit = match args.flag_rate_limit {
0 => NonZeroU32::new(u32::MAX).unwrap(),
1..=1000 => NonZeroU32::new(args.flag_rate_limit).unwrap(),
_ => return fail!("Rate Limit should be between 0 and 1000 queries per second."),
};
info!("RATE LIMIT: {rate_limit}");
let user_agent = match args.flag_user_agent {
Some(ua) => match HeaderValue::from_str(ua.as_str()) {
Ok(_) => ua,
Err(e) => return fail_clierror!("Invalid user-agent value: {e}"),
},
None => util::DEFAULT_USER_AGENT.to_string(),
};
info!("USER-AGENT: {user_agent}");
let http_headers: HeaderMap = {
let mut map = HeaderMap::with_capacity(args.flag_http_header.len() + 1);
for header in args.flag_http_header {
// split on the first colon only, so header values may themselves contain colons
let vals: Vec<&str> = header.splitn(2, ':').collect();
if vals.len() != 2 {
return fail_clierror!(
"{vals:?} is not a valid key-value pair. Expecting a key and a value \
separated by a colon."
);
}
let k: String = String::from(vals[0].trim());
let header_name: HeaderName =
match HeaderName::from_lowercase(k.to_lowercase().as_bytes()) {
Ok(h) => h,
Err(e) => return fail_clierror!("Invalid header name: {e}"),
};
let v: String = String::from(vals[1].trim());
let header_val: HeaderValue = match HeaderValue::from_str(v.as_str()) {
Ok(v) => v,
Err(e) => return fail_clierror!("Invalid header value: {e}"),
};
map.append(header_name, header_val);
}
map.append(
reqwest::header::ACCEPT_ENCODING,
HeaderValue::from_str(DEFAULT_ACCEPT_ENCODING).unwrap(),
);
map
};
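/*
Parsing a single --http-header argument can be sketched without reqwest. This is a
minimal sketch under stated assumptions: `split_header` is a hypothetical helper,
it splits on the first colon only (so values such as URLs keep their colons), and
it skips the HeaderName/HeaderValue validation that real code would still need.

```rust
/// Split a `key:value` argument on the first colon and trim both halves.
/// Returns `None` when there is no colon or the key is empty.
fn split_header(arg: &str) -> Option<(String, String)> {
    let (key, value) = arg.split_once(':')?;
    let key = key.trim();
    if key.is_empty() {
        return None;
    }
    // Header names are case-insensitive; lowercase them for a canonical form.
    Some((key.to_lowercase(), value.trim().to_string()))
}
```

With this shape, "Accept-Language: fr-FR" yields the pair ("accept-language", "fr-FR"),
and an argument without a colon is rejected.
*/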
debug!("HTTP Header: {http_headers:?}");
let client_timeout = time::Duration::from_secs(*TIMEOUT_SECS.get().unwrap_or(&30));
let client = Client::builder()
.user_agent(user_agent)
.default_headers(http_headers)
.cookie_store(args.flag_cookies)
.brotli(true)
.gzip(true)
.deflate(true)
.use_rustls_tls()
.http2_adaptive_window(true)
.connection_verbose(log_enabled!(Debug) || log_enabled!(Trace))
.timeout(client_timeout)
.build()?;
let limiter =
RateLimiter::direct(Quota::per_second(rate_limit).allow_burst(NonZeroU32::new(1).unwrap()));
let show_progress =
(args.flag_progressbar || std::env::var("QSV_PROGRESSBAR").is_ok()) && !rconfig.is_stdin();
let multi_progress = MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz(5));
let progress = multi_progress.add(ProgressBar::new(0));
let mut record_count = 0;
let error_progress = multi_progress.add(ProgressBar::new(args.flag_max_errors));
if args.flag_max_errors > 0 && show_progress {
console::set_colors_enabled(true);
error_progress.set_style(
indicatif::ProgressStyle::default_bar()
.template("{bar:37.red/white} {percent}%{msg} ({per_sec:7})")
.unwrap(),
);
error_progress.set_message(format!(
" of {} max errors",
HumanCount(args.flag_max_errors)
));
} else {
error_progress.set_draw_target(ProgressDrawTarget::hidden());
}
if show_progress {
record_count = util::count_rows(&rconfig)?;
util::prep_progress(&progress, record_count);
} else {
multi_progress.set_draw_target(ProgressDrawTarget::hidden());
}
let jql_selector: Option<String> = match args.flag_jqlfile {
Some(ref jql_file) => Some(fs::read_to_string(jql_file)?),
None => args.flag_jql.as_ref().map(std::string::ToString::to_string),
};
let report = if args.flag_report.to_lowercase().starts_with('d') {
ReportKind::Detailed
} else if args.flag_report.to_lowercase().starts_with('s') {
ReportKind::Short
} else {
ReportKind::None
};
let mut report_wtr;
let report_path;
if report == ReportKind::None {
report_wtr = Config::new(&Some("sink".to_string())).writer()?;
report_path = String::new();
} else {
report_path = args
.arg_input
.clone()
.unwrap_or_else(|| "stdin.csv".to_string());
report_wtr = Config::new(&Some(report_path.clone() + FETCH_REPORT_SUFFIX)).writer()?;
let mut report_headers = if report == ReportKind::Detailed {
headers.clone()
} else {
csv::ByteRecord::new()
};
let rptcol_prefix = if report == ReportKind::Detailed {
FETCH_REPORT_PREFIX
} else {
""
};
report_headers.push_field(format!("{rptcol_prefix}url").as_bytes());
report_headers.push_field(format!("{rptcol_prefix}status").as_bytes());
report_headers.push_field(format!("{rptcol_prefix}cache_hit").as_bytes());
report_headers.push_field(format!("{rptcol_prefix}retries").as_bytes());
report_headers.push_field(format!("{rptcol_prefix}elapsed_ms").as_bytes());
report_headers.push_field(format!("{rptcol_prefix}response").as_bytes());
report_wtr.write_byte_record(&report_headers)?;
}
let mut record = csv::ByteRecord::new();
let mut jsonl_record = csv::ByteRecord::new();
let mut report_record = csv::ByteRecord::new();
let mut url = String::with_capacity(100);
let mut record_vec: Vec<String> = Vec::with_capacity(headers.len());
let mut redis_cache_hits: u64 = 0;
let mut intermediate_redis_value: Return<String> = Return {
was_cached: false,
value: String::new(),
};
let mut intermediate_value: Return<FetchResponse> = Return {
was_cached: false,
value: FetchResponse {
response: String::new(),
status_code: 0_u16,
retries: 0_u8,
},
};
let mut final_value = String::with_capacity(150);
let mut final_response = FetchResponse {
response: String::new(),
status_code: 0_u16,
retries: 0_u8,
};
let empty_response = FetchResponse {
response: String::new(),
status_code: 0_u16,
retries: 0_u8,
};
let mut running_error_count = 0_u64;
let mut running_success_count = 0_u64;
let mut was_cached;
let mut now = time::Instant::now();
while rdr.read_byte_record(&mut record)? {
if show_progress {
progress.inc(1);
}
if report != ReportKind::None {
now = time::Instant::now();
};
if args.flag_url_template.is_some() {
record_vec.clear();
for field in &record {
record_vec.push(unsafe { std::str::from_utf8_unchecked(field).to_owned() });
}
if let Ok(formatted) =
dynfmt::SimpleCurlyFormat.format(&dynfmt_url_template, &*record_vec)
{
url = formatted.into_owned();
}
} else if let Ok(s) = std::str::from_utf8(&record[column_index]) {
s.clone_into(&mut url);
} else {
url = String::new();
}
if url.is_empty() {
final_response.clone_from(&empty_response);
was_cached = false;
} else if args.flag_redis {
intermediate_redis_value = get_redis_response(
&url,
&client,
&limiter,
&jql_selector,
args.flag_store_error,
args.flag_pretty,
include_existing_columns,
args.flag_max_retries,
)?;
was_cached = intermediate_redis_value.was_cached;
if was_cached {
redis_cache_hits += 1;
}
final_response = match serde_json::from_str(&intermediate_redis_value) {
Ok(r) => r,
Err(e) => {
return fail_clierror!(
"Cannot deserialize Redis cache value. Try flushing the Redis cache with \
--flushdb: {e}"
)
}
};
if !args.flag_cache_error && final_response.status_code != 200 {
let key = format!(
"{}{:?}{}{}{}",
url,
jql_selector,
args.flag_store_error,
args.flag_pretty,
include_existing_columns
);
if GET_REDIS_RESPONSE.cache_remove(&key).is_err() && log_enabled!(Warn) {
warn!(r#"Cannot remove Redis key "{key}""#);
};
}
} else {
intermediate_value = get_cached_response(
&url,
&client,
&limiter,
&jql_selector,
args.flag_store_error,
args.flag_pretty,
include_existing_columns,
args.flag_max_retries,
);
final_response = intermediate_value.value;
was_cached = intermediate_value.was_cached;
if !args.flag_cache_error && final_response.status_code != 200 {
let mut cache = GET_CACHED_RESPONSE.lock().unwrap();
cache.cache_remove(&url);
}
};
if final_response.status_code == 200 {
running_success_count += 1;
} else {
running_error_count += 1;
error_progress.inc(1);
}
final_value.clone_from(&final_response.response);
if include_existing_columns {
record.push_field(final_value.as_bytes());
wtr.write_byte_record(&record)?;
} else {
jsonl_record.clear();
if final_value.is_empty() {
jsonl_record.push_field(b"{}");
} else {
jsonl_record.push_field(final_value.as_bytes());
}
wtr.write_byte_record(&jsonl_record)?;
}
if report != ReportKind::None {
if report == ReportKind::Detailed {
report_record.clone_from(&record);
} else {
report_record.clear();
}
report_record.push_field(url.as_bytes());
report_record.push_field(final_response.status_code.to_string().as_bytes());
report_record.push_field(if was_cached { b"1" } else { b"0" });
report_record.push_field(final_response.retries.to_string().as_bytes());
report_record.push_field(now.elapsed().as_millis().to_string().as_bytes());
if include_existing_columns {
report_record.push_field(final_value.as_bytes());
} else {
report_record.push_field(jsonl_record.as_slice());
}
report_wtr.write_byte_record(&report_record)?;
}
if args.flag_max_errors > 0 && running_error_count >= args.flag_max_errors {
break;
}
}
report_wtr.flush()?;
if show_progress {
if args.flag_redis {
util::update_cache_info!(progress, redis_cache_hits, record_count);
} else {
util::update_cache_info!(progress, GET_CACHED_RESPONSE);
}
util::finish_progress(&progress);
if running_error_count == 0 {
error_progress.finish_and_clear();
} else if args.flag_max_errors > 0 && running_error_count >= args.flag_max_errors {
error_progress.finish();
thread::sleep(time::Duration::from_nanos(10));
let abort_msg = format!(
"{} max errors. Fetch aborted.",
HumanCount(args.flag_max_errors)
);
winfo!("{abort_msg}");
} else {
error_progress.abandon();
}
}
let mut end_msg = format!(
"{} records successfully fetched as {}. {} errors.",
HumanCount(running_success_count),
if include_existing_columns {
"CSV"
} else {
"JSONL"
},
HumanCount(running_error_count)
);
if report != ReportKind::None {
use std::fmt::Write;
write!(
&mut end_msg,
" {} report created: \"{}{FETCH_REPORT_SUFFIX}\"",
if report == ReportKind::Detailed {
"Detailed"
} else {
"Short"
},
report_path
)
.unwrap();
}
winfo!("{end_msg}");
Ok(wtr.flush()?)
}
#[cached(
size = 2_000_000,
key = "String",
convert = r#"{ format!("{}", url) }"#,
with_cached_flag = true
)]
fn get_cached_response(
url: &str,
client: &reqwest::blocking::Client,
limiter: &governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>,
flag_jql: &Option<String>,
flag_store_error: bool,
flag_pretty: bool,
include_existing_columns: bool,
flag_max_retries: u8,
) -> cached::Return<FetchResponse> {
Return::new(get_response(
url,
client,
limiter,
flag_jql,
flag_store_error,
flag_pretty,
include_existing_columns,
flag_max_retries,
))
}
#[io_cached(
type = "cached::RedisCache<String, String>",
key = "String",
convert = r#"{ format!("{}{:?}{}{}{}", url, flag_jql, flag_store_error, flag_pretty, include_existing_columns) }"#,
create = r##" {
RedisCache::new("f", REDISCONFIG.ttl_secs)
.set_namespace("q")
.set_refresh(REDISCONFIG.ttl_refresh)
.set_connection_string(&REDISCONFIG.conn_str)
.set_connection_pool_max_size(REDISCONFIG.max_pool_size)
.build()
.expect("error building redis cache")
} "##,
map_error = r##"|e| CliError::Other(format!("Redis Error: {:?}", e))"##,
with_cached_flag = true
)]
fn get_redis_response(
url: &str,
client: &reqwest::blocking::Client,
limiter: &governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>,
flag_jql: &Option<String>,
flag_store_error: bool,
flag_pretty: bool,
include_existing_columns: bool,
flag_max_retries: u8,
) -> Result<cached::Return<String>, CliError> {
Ok(Return::new({
serde_json::to_string(&get_response(
url,
client,
limiter,
flag_jql,
flag_store_error,
flag_pretty,
include_existing_columns,
flag_max_retries,
))
.unwrap()
}))
}
#[inline]
fn get_response(
url: &str,
client: &reqwest::blocking::Client,
limiter: &governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>,
flag_jql: &Option<String>,
flag_store_error: bool,
flag_pretty: bool,
include_existing_columns: bool,
flag_max_retries: u8,
) -> FetchResponse {
let valid_url = match Url::parse(url) {
Ok(valid) => valid.to_string(),
Err(e) => {
let url_invalid_err = if flag_store_error {
if include_existing_columns {
format!("Invalid URL: {e}")
} else {
let json_error = json!({
"errors": [{
"title": "Invalid URL",
"detail": e.to_string()
}]
});
format!("{json_error}")
}
} else {
String::new()
};
error!("Invalid URL: Store_error: {flag_store_error} - {url_invalid_err}");
return FetchResponse {
response: url_invalid_err,
status_code: reqwest::StatusCode::NOT_FOUND.as_u16(),
retries: 0_u8,
};
}
};
info!("Using URL: {valid_url}");
let mut limiter_total_wait: u64;
let timeout_secs = unsafe { *TIMEOUT_SECS.get_unchecked() };
let governor_timeout_ms = timeout_secs * 1_000;
let mut retries = 0_u8;
let mut error_flag;
let mut final_value = String::new();
let mut api_status;
let mut api_respheader = HeaderMap::new();
'retry: loop {
limiter_total_wait = 0;
while limiter.check().is_err() {
limiter_total_wait += MINIMUM_WAIT_MS;
thread::sleep(MIN_WAIT);
if limiter_total_wait > governor_timeout_ms {
info!("rate limit timed out after {limiter_total_wait} ms");
break;
} else if limiter_total_wait == MINIMUM_WAIT_MS {
info!("throttling...");
}
}
if log_enabled!(Info) && limiter_total_wait > 0 && limiter_total_wait <= governor_timeout_ms
{
info!("throttled for {limiter_total_wait} ms");
}
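/*
The throttling wait above is a poll-and-sleep loop with an upper bound. A minimal
sketch of that idea, independent of the governor crate: `wait_until` is a
hypothetical helper, and the `ready` closure stands in for the rate limiter check.

```rust
use std::{thread, time::Duration};

/// Poll `ready` every `step` until it returns true or more than `timeout`
/// has been spent sleeping. Returns the time waited and whether it timed out.
fn wait_until(
    mut ready: impl FnMut() -> bool,
    step: Duration,
    timeout: Duration,
) -> (Duration, bool) {
    let mut waited = Duration::ZERO;
    while !ready() {
        thread::sleep(step);
        waited += step;
        if waited > timeout {
            return (waited, true);
        }
    }
    (waited, false)
}
```

The returned duration counts only the requested sleep steps, not wall-clock time,
which matches how the loop above accumulates its total wait.
*/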
if let Ok(resp) = client.get(&valid_url).send() {
api_respheader.clone_from(resp.headers());
api_status = resp.status();
let api_value: String = resp.text().unwrap_or_default();
if api_status.is_client_error() || api_status.is_server_error() {
error_flag = true;
error!(
"HTTP error. url: {valid_url:?}, error: {:?}",
api_status.canonical_reason().unwrap_or("unknown error")
);
if flag_store_error {
final_value = format!(
"HTTP ERROR {} - {}",
api_status.as_str(),
api_status.canonical_reason().unwrap_or("unknown error")
);
} else {
final_value = String::new();
}
} else {
error_flag = false;
if let Some(selectors) = flag_jql {
let jql_groups =
JQL_GROUPS.get_or_init(|| jql::selectors_parser(selectors).unwrap());
match apply_jql(&api_value, jql_groups) {
Ok(s) => {
final_value = s;
}
Err(e) => {
error!(
"jql error. json: {api_value:?}, selectors: {selectors:?}, error: \
{e:?}"
);
if flag_store_error {
final_value = e.to_string();
} else {
final_value = String::new();
}
error_flag = true;
}
}
} else if flag_pretty {
if let Ok(pretty_json) = jsonxf::pretty_print(&api_value) {
final_value = pretty_json;
} else {
final_value = api_value;
}
} else if let Ok(minimized_json) = jsonxf::minimize(&api_value) {
final_value = minimized_json;
} else {
final_value = api_value;
}
}
} else {
error_flag = true;
api_respheader.clear();
api_status = reqwest::StatusCode::BAD_REQUEST;
}
if error_flag
|| (!api_respheader.is_empty()
&& (api_respheader.contains_key("ratelimit-limit")
|| api_respheader.contains_key("x-ratelimit-limit")
|| api_respheader.contains_key("retry-after")))
{
let mut ratelimit_remaining = api_respheader.get("ratelimit-remaining");
if ratelimit_remaining.is_none() {
let temp_var = api_respheader.get("x-ratelimit-remaining");
if temp_var.is_some() {
ratelimit_remaining = temp_var;
}
}
let mut ratelimit_reset = api_respheader.get("ratelimit-reset");
if ratelimit_reset.is_none() {
let temp_var = api_respheader.get("x-ratelimit-reset");
if temp_var.is_some() {
ratelimit_reset = temp_var;
}
}
let mut ratelimit_remaining_sec = api_respheader.get("ratelimit-remaining-second");
if ratelimit_remaining_sec.is_none() {
let temp_var = api_respheader.get("x-ratelimit-remaining-second");
if temp_var.is_some() {
ratelimit_remaining_sec = temp_var;
}
}
let mut ratelimit_reset_sec = api_respheader.get("ratelimit-reset-second");
if ratelimit_reset_sec.is_none() {
let temp_var = api_respheader.get("x-ratelimit-reset-second");
if temp_var.is_some() {
ratelimit_reset_sec = temp_var;
}
}
let retry_after = api_respheader.get("retry-after");
if log_enabled!(Debug) {
let rapidapi_proxy_response = api_respheader.get("X-RapidAPI-Proxy-Response");
debug!(
"api_status:{api_status:?} rate_limit_remaining:{ratelimit_remaining:?} \
{ratelimit_remaining_sec:?} ratelimit_reset:{ratelimit_reset:?} \
{ratelimit_reset_sec:?} retry_after:{retry_after:?} \
rapid_api_proxy_response:{rapidapi_proxy_response:?}"
);
}
// header values that are not visible ASCII fall back to an empty string,
// and then to the parse defaults below, instead of panicking
let remaining = ratelimit_remaining.map_or_else(
|| {
if let Some(ratelimit_remaining_sec) = ratelimit_remaining_sec {
let remaining_sec_str = ratelimit_remaining_sec.to_str().unwrap_or_default();
remaining_sec_str.parse::<u64>().unwrap_or(1)
} else {
9999_u64
}
},
|ratelimit_remaining| {
let remaining_str = ratelimit_remaining.to_str().unwrap_or_default();
remaining_str.parse::<u64>().unwrap_or(1)
},
);
let mut reset_secs = ratelimit_reset.map_or_else(
|| {
if let Some(ratelimit_reset_sec) = ratelimit_reset_sec {
let reset_sec_str = ratelimit_reset_sec.to_str().unwrap_or_default();
reset_sec_str.parse::<u64>().unwrap_or(1)
} else {
u64::from(error_flag)
}
},
|ratelimit_reset| {
let reset_str = ratelimit_reset.to_str().unwrap_or_default();
reset_str.parse::<u64>().unwrap_or(1)
},
);
if let Some(retry_after) = retry_after {
let retry_str = retry_after.to_str().unwrap_or_default();
reset_secs = retry_str.parse::<u64>().unwrap_or(timeout_secs);
}
if reset_secs > timeout_secs {
warn!("Reset_secs {reset_secs} > timeout_secs {timeout_secs}.");
break 'retry;
}
if remaining <= 1 || reset_secs >= 1 {
let pause_time =
(reset_secs * 1001) + (retries as u64 * rand::thread_rng().gen_range(10..30));
info!(
"sleeping for {pause_time} ms until ratelimit is reset/retry_after has elapsed"
);
thread::sleep(time::Duration::from_millis(pause_time));
}
if retries >= flag_max_retries {
warn!("{flag_max_retries} max-retries reached.");
break 'retry;
}
retries += 1;
info!("retrying {retries}...");
} else {
break 'retry;
}
}
if error_flag {
if flag_store_error && !include_existing_columns {
let json_error = json!({
"errors": [{
"title": "HTTP ERROR",
"detail": final_value
}]
});
FetchResponse {
response: format!("{json_error}"),
status_code: api_status.as_u16(),
retries,
}
} else {
FetchResponse {
response: String::new(),
status_code: api_status.as_u16(),
retries,
}
}
} else {
FetchResponse {
response: final_value,
status_code: api_status.as_u16(),
retries,
}
}
}
use jql::groups_walker;
use serde_json::{Deserializer, Value};
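/// Applies the parsed jql selector `groups` to the `json` string and returns
/// the selection as a string.
///
/// Strings are returned without surrounding quotes, arrays are joined with ", ",
/// and objects are rendered with their `Debug` formatting.
/// Returns `Err` if `json` is not valid JSON or if a selector does not match.
#[inline]
pub fn apply_jql(json: &str, groups: &[jql::Group]) -> Result<String, String> {
// fail fast on input that is not valid JSON (e.g. an HTML error page)
if let Err(error) = serde_json::from_str::<Value>(json) {
return fail_format!("Invalid json: {error:?}");
}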
let mut result: Result<String, _> = Ok(String::default());
Deserializer::from_str(json)
.into_iter::<Value>()
.for_each(|value| match value {
Ok(valid_json) => {
match groups_walker(&valid_json, groups) {
Ok(selection) => {
// Renders a JSON value as plain text: strings lose their quotes,
// floats use Rust's f64 formatting, everything else uses serde_json's Display.
fn get_value_string(v: &Value) -> String {
match v {
Value::Null => "null".to_string(),
Value::Bool(b) => b.to_string(),
Value::String(s) => s.clone(),
Value::Number(n) if n.is_f64() => {
n.as_f64().map_or_else(|| n.to_string(), |f| f.to_string())
}
_ => v.to_string(),
}
}
match &selection {
Value::Array(array) => {
// join the array elements with ", "
let concat_string = array
.iter()
.map(get_value_string)
.collect::<Vec<String>>()
.join(", ");
result = Ok(concat_string);
}
Value::Object(object) => {
result = Ok(format!("{object:?}"));
}
_ => {
result = Ok(get_value_string(&selection));
}
}
}
Err(error) => {
result = Err(format!("{error:?}"));
}
}
}
Err(error) => {
result = Err(format!("Invalid json: {error:?}"));
}
});
result
}
#[test]
fn test_apply_jql_invalid_json() {
let json =
r#"<!doctype html><html lang="en"><meta charset=utf-8><title>shortest html5</title>"#;
let selectors = r#"."places"[0]."place name""#;
let jql_groups = jql::selectors_parser(selectors).unwrap();
let value: String = apply_jql(json, &jql_groups).unwrap_err();
assert_eq!(
"Invalid json: Error(\"expected value\", line: 1, column: 1)",
value
);
}
#[test]
fn test_apply_jql_invalid_selector() {
let json = r#"{"post code": "90210", "country": "United States", "country abbreviation": "US", "places": [{"place name": "Beverly Hills", "longitude": "-118.4065", "state": "California", "state abbreviation": "CA", "latitude": "34.0901"}]}"#;
let selectors = r#"."place"[0]."place name""#;
let jql_groups = jql::selectors_parser(selectors).unwrap();
let value = apply_jql(json, &jql_groups).unwrap_err();
assert_eq!(r#""Node \"place\" not found on the parent element""#, value);
}
#[test]
fn test_apply_jql_string() {
let json = r#"{"post code": "90210", "country": "United States", "country abbreviation": "US", "places": [{"place name": "Beverly Hills", "longitude": "-118.4065", "state": "California", "state abbreviation": "CA", "latitude": "34.0901"}]}"#;
let selectors = r#"."places"[0]."place name""#;
let jql_groups = jql::selectors_parser(selectors).unwrap();
let value: String = apply_jql(json, &jql_groups).unwrap();
assert_eq!("Beverly Hills", value);
}
#[test]
fn test_apply_jql_number() {
let json = r#"{"post code": "90210", "country": "United States", "country abbreviation": "US", "places": [{"place name": "Beverly Hills", "longitude": -118.4065, "state": "California", "state abbreviation": "CA", "latitude": 34.0901}]}"#;
let selectors = r#"."places"[0]."longitude""#;
let jql_groups = jql::selectors_parser(selectors).unwrap();
let value: String = apply_jql(json, &jql_groups).unwrap();
assert_eq!("-118.4065", value);
}
#[test]
fn test_apply_jql_bool() {
let json = r#"{"post code": "90210", "country": "United States", "country abbreviation": "US", "places": [{"place name": "Beverly Hills", "longitude": -118.4065, "state": "California", "state abbreviation": "CA", "latitude": 34.0901, "expensive": true}]}"#;
let selectors = r#"."places"[0]."expensive""#;
let jql_groups = jql::selectors_parser(selectors).unwrap();
let value: String = apply_jql(json, &jql_groups).unwrap();
assert_eq!("true", value);
}
#[test]
fn test_apply_jql_null() {
let json = r#"{"post code": "90210", "country": "United States", "country abbreviation": "US", "places": [{"place name": "Beverly Hills", "longitude": -118.4065, "state": "California", "state abbreviation": "CA", "latitude": 34.0901, "university":null}]}"#;
let selectors = r#"."places"[0]."university""#;
let jql_groups = jql::selectors_parser(selectors).unwrap();
let value: String = apply_jql(json, &jql_groups).unwrap();
assert_eq!("null", value);
}
#[test]
fn test_apply_jql_array() {
let json = r#"{"post code": "90210", "country": "United States", "country abbreviation": "US", "places": [{"place name": "Beverly Hills", "longitude": -118.4065, "state": "California", "state abbreviation": "CA", "latitude": 34.0901}]}"#;
let selectors = r#"."places"[0]."longitude",."places"[0]."latitude""#;
let jql_groups = jql::selectors_parser(selectors).unwrap();
let value: String = apply_jql(json, &jql_groups).unwrap();
assert_eq!("-118.4065, 34.0901", value);
}
#[test]
fn test_root_out_of_bounds() {
let json = r#"[{"page":1,"pages":1,"per_page":"50","total":1},[{"id":"BRA","iso2Code":"BR","name":"Brazil","region":{"id":"LCN","iso2code":"ZJ","value":"Latin America & Caribbean (all income levels)"},"adminregion":{"id":"LAC","iso2code":"XJ","value":"Latin America & Caribbean (developing only)"},"incomeLevel":{"id":"UMC","iso2code":"XT","value":"Upper middle income"},"lendingType":{"id":"IBD","iso2code":"XF","value":"IBRD"},"capitalCity":"Brasilia","longitude":"-47.9292","latitude":"-15.7801"}]]"#;
let selectors = r#"[2].[0]."incomeLevel"."value""#;
let jql_groups = jql::selectors_parser(selectors).unwrap();
let value = apply_jql(json, &jql_groups).unwrap_err();
assert_eq!(
r#""Index [2] is out of bound, root element has a length of 2""#,
value
);
}