mod client;
mod input;
mod output;
use clap::Parser;
use client::Auth;
use fluent_uri::UriRef;
use input::Input;
use output::{BulkAction, ElasticsearchOutputConfig, Output, OutputPreflightConfig};
use std::{path::PathBuf, process::ExitCode};
// Command-line interface definition. clap derives the argument parser from this
// struct, so plain `//` comments are used here on purpose: `///` doc comments
// would be picked up by the derive and could alter the generated help output.
#[derive(Parser)]
#[command(version)]
struct Cli {
// Positional URIs. At least two values are required (`num_args = 2..`); per the
// help text the input URI(s) come first and the output URI comes last.
#[arg(
help = "Input URI(s) followed by the output URI",
required = true,
num_args = 2..
)]
paths: Vec<UriRef<String>>,
// `--content`: subfield name used for file imports; defaults to "body".
#[arg(
help = "Content subfield name for file imports",
long,
default_value = "body"
)]
content: String,
// `--insecure` / `-k`: skip certificate validation; off unless given.
#[arg(
help = "Ignore certificate validation",
long,
short = 'k',
default_value = "false"
)]
insecure: bool,
// `--apikey`: API key sent via HTTP header. The short flag is left for clap to
// derive from the field name.
#[arg(help = "Apikey to authenticate via http header", long, short)]
apikey: Option<String>,
// `--username`: basic-auth user. Cannot be combined with `apikey` and must be
// accompanied by `password`.
#[arg(
help = "Username for basic authentication",
long,
short,
conflicts_with = "apikey",
requires = "password"
)]
username: Option<String>,
// `--password`: basic-auth password. Cannot be combined with `apikey` and must
// be accompanied by `username`.
#[arg(
help = "Password for basic authentication",
long,
short,
conflicts_with = "apikey",
requires = "username"
)]
password: Option<String>,
// `--quiet` / `-q`: suppress the runtime summary line.
#[arg(
help = "Quiet mode, don't print runtime summary",
long,
short = 'q',
default_value = "false"
)]
quiet: bool,
// `--uncompressed` / `-z`: turn off gzip compression of request bodies.
#[arg(
help = "Disable request body gzip compression",
long,
short = 'z',
default_value = "false"
)]
uncompressed: bool,
// `--action`: bulk action for Elasticsearch outputs, parsed as a value enum;
// defaults to `BulkAction::Create`.
#[arg(
help = "Bulk action for Elasticsearch outputs",
long,
value_enum,
default_value_t = BulkAction::Create
)]
action: BulkAction,
// `--batch-size`: documents per bulk request. Parsed with `parse_nonzero_usize`,
// so zero is rejected at argument-parsing time.
#[arg(
help = "Documents per Elasticsearch bulk request",
long,
default_value_t = ElasticsearchOutputConfig::DEFAULT_BATCH_SIZE,
value_parser = parse_nonzero_usize
)]
batch_size: usize,
// `--max-requests`: cap on concurrent bulk requests. Parsed with
// `parse_nonzero_usize`, so zero is rejected at argument-parsing time.
#[arg(
help = "Maximum concurrent Elasticsearch bulk requests",
long,
default_value_t = ElasticsearchOutputConfig::DEFAULT_MAX_INFLIGHT_REQUESTS,
value_parser = parse_nonzero_usize
)]
max_requests: usize,
// `--pipeline`: optional path to an ingest pipeline definition file.
#[arg(help = "Elasticsearch ingest pipeline JSON or YAML file", long)]
pipeline: Option<PathBuf>,
// `--pipeline-name`: optional ingest pipeline name.
#[arg(help = "Elasticsearch ingest pipeline name", long)]
pipeline_name: Option<String>,
// `--template`: optional path to a composable index template file; the help
// text lists which extensions select which parser.
#[arg(
help = "Composable index template file for Elasticsearch outputs; .json, .jsonc, .json5, .yml, and .yaml are detected by extension, and other extensions are parsed as strict JSON",
long
)]
template: Option<PathBuf>,
// `--template-name`: optional override for the template name.
#[arg(help = "Composable index template name override", long)]
template_name: Option<String>,
// `--template-overwrite`: whether an existing template may be overwritten.
// NOTE(review): as an `Option<bool>` this presumably expects an explicit
// true/false value rather than acting as a bare switch, with `None` meaning
// "not specified" — confirm against the clap version in use.
#[arg(help = "Overwrite an existing composable index template", long)]
template_overwrite: Option<bool>,
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> ExitCode {
let start_time = std::time::Instant::now();
let env = env_logger::Env::default().filter_or("LOG_LEVEL", "warn");
env_logger::Builder::from_env(env)
.format_timestamp_millis()
.init();
let args = Cli::parse();
let Cli {
mut paths,
content,
quiet,
insecure,
apikey,
password,
username,
uncompressed,
action,
batch_size,
max_requests,
pipeline,
pipeline_name,
template,
template_name,
template_overwrite,
} = args;
let output = paths.pop().expect("clap requires at least two paths");
let inputs = paths;
if let Err(err) = validate_multi_input_output(&inputs, &output) {
return exit_with_error(err);
}
let auth = match Auth::try_new(apikey, username, password) {
Ok(auth) => auth,
Err(err) => return exit_with_error(err),
};
let elasticsearch_config = match ElasticsearchOutputConfig::try_new(batch_size, max_requests) {
Ok(config) => config,
Err(err) => return exit_with_error(err),
};
let preflight = OutputPreflightConfig {
pipeline,
pipeline_name,
template,
template_name,
template_overwrite,
};
if let Err(err) = preflight.validate() {
return exit_with_error(err);
}
let (mut input, mut output) = if preflight.has_elasticsearch_options() {
let output = match Output::try_new(
insecure,
auth,
output,
action,
!uncompressed,
elasticsearch_config,
preflight,
)
.await
{
Ok(output) => output,
Err(err) => return exit_with_error(err),
};
log::debug!("output: {output}");
let input = match Input::try_new(inputs, content).await {
Ok(input) => input,
Err(err) => return exit_with_error(err),
};
log::debug!("input: {input}");
(input, output)
} else {
let input = match Input::try_new(inputs, content).await {
Ok(input) => input,
Err(err) => return exit_with_error(err),
};
log::debug!("input: {input}");
let output = match Output::try_new(
insecure,
auth,
output,
action,
!uncompressed,
elasticsearch_config,
preflight,
)
.await
{
Ok(output) => output,
Err(err) => return exit_with_error(err),
};
log::debug!("output: {output}");
(input, output)
};
let mut input_line: usize = 0;
let mut output_line: usize = 0;
let output_name = output.to_string();
let mut line_buffer = String::with_capacity(1024);
loop {
let line = match input.read_next(&mut line_buffer) {
Ok(Some(line)) => line,
Ok(None) => break,
Err(err) => return exit_with_error(err),
};
input_line += 1;
match output.send(line).await {
Ok(sent) => output_line += sent,
Err(err) => return exit_with_error(err),
}
line_buffer.clear();
}
output_line += match output.close().await {
Ok(sent) => sent,
Err(err) => return exit_with_error(err),
};
if !quiet {
println!(
"Piped {} of {} docs to {output_name} in {:.3} seconds",
comma_formatted(output_line),
comma_formatted(input_line),
start_time.elapsed().as_secs_f32()
);
}
ExitCode::SUCCESS
}
/// Formats `number` in decimal with a comma between every group of three
/// digits, counted from the right (for example `1234567` becomes `1,234,567`).
fn comma_formatted(number: usize) -> String {
    let digits = number.to_string();
    let total = digits.len();

    // The leading group takes whatever is left after the full triples, which
    // is between one and three digits.
    let lead = match total % 3 {
        0 => 3,
        remainder => remainder,
    };
    let (head, rest) = digits.split_at(lead);

    let mut grouped = String::with_capacity(total + total / 3);
    grouped.push_str(head);
    // `rest` consists solely of ASCII digits and its length is a multiple of
    // three, so byte chunks line up exactly with the digit groups.
    for triple in rest.as_bytes().chunks(3) {
        grouped.push(',');
        grouped.extend(triple.iter().map(|&digit| char::from(digit)));
    }
    grouped
}
/// Writes the display form of `err` to standard error and returns the failure
/// exit code, so callers can `return exit_with_error(err)` directly.
fn exit_with_error(err: eyre::Report) -> ExitCode {
    let message = err.to_string();
    eprintln!("{message}");
    ExitCode::FAILURE
}
/// Checks that the output is compatible with the given inputs.
///
/// The check applies only when there is more than one input and every input
/// is a local file (see `is_local_file_input`). In that case an output that
/// is itself a file (a `file` scheme, or no scheme with a path other than
/// `-`) must have a path accepted by `is_ndjson_file_output`.
///
/// # Errors
///
/// Returns an error when several local file inputs are combined with a file
/// output whose path does not end in `.ndjson` or `.ndjson.gz`.
fn validate_multi_input_output(
    inputs: &[UriRef<String>],
    output: &UriRef<String>,
) -> eyre::Result<()> {
    let several_local_files = inputs.len() > 1 && inputs.iter().all(is_local_file_input);
    if !several_local_files {
        return Ok(());
    }

    let target_path = output.path().as_str();
    let writes_to_file = match output.scheme().map(|scheme| scheme.as_str()) {
        Some("file") => true,
        Some(_) => false,
        // Without a scheme the path is a file unless it is the "-" placeholder.
        None => target_path != "-",
    };

    if writes_to_file && !is_ndjson_file_output(target_path) {
        return Err(eyre::eyre!(
            "multiple file inputs require a file output path ending in .ndjson or .ndjson.gz"
        ));
    }
    Ok(())
}
/// Returns `true` when `path` ends in `.ndjson` or `.ndjson.gz`, comparing
/// ASCII letters case-insensitively.
fn is_ndjson_file_output(path: &str) -> bool {
    const NDJSON_SUFFIXES: [&str; 2] = [".ndjson", ".ndjson.gz"];

    let normalized = path.to_ascii_lowercase();
    NDJSON_SUFFIXES
        .iter()
        .any(|&suffix| normalized.ends_with(suffix))
}
/// Returns `true` when `input` refers to a local file: its path is not the
/// `-` placeholder and its scheme is either absent or exactly `file`.
fn is_local_file_input(input: &UriRef<String>) -> bool {
    if input.path().as_str() == "-" {
        return false;
    }
    match input.scheme().map(|scheme| scheme.as_str()) {
        None | Some("file") => true,
        Some(_) => false,
    }
}
/// Parses `value` as a `usize` that must be at least 1.
///
/// # Errors
///
/// Returns the integer parser's message when `value` is not a valid `usize`,
/// or `"value must be at least 1"` when it parses to zero.
fn parse_nonzero_usize(value: &str) -> Result<usize, String> {
    match value.parse::<usize>() {
        Ok(0) => Err("value must be at least 1".to_string()),
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(err.to_string()),
    }
}