use std::env;
use std::fs::File;
use std::io::{self, BufRead, BufReader, Write};
use std::process;
/// Result alias used across the tool; failures are plain, human-readable `String` messages.
type WsResult<T> = std::result::Result<T, String>;
/// One labelled value inside an [`Event`].
#[derive(Clone, Debug, Eq, PartialEq)]
struct Field {
    /// Column label (a `csv_in` label, or a generated `FIELDn` name).
    label: String,
    /// The field text as read from input.
    value: String,
}
/// One record flowing through the pipeline: an ordered list of labelled fields.
#[derive(Clone, Debug, Eq, PartialEq)]
struct Event {
    fields: Vec<Field>,
}

impl Event {
    /// Returns the value of the first field whose label equals `label`, ignoring ASCII case.
    fn get(&self, label: &str) -> Option<&str> {
        for field in &self.fields {
            if field.label.eq_ignore_ascii_case(label) {
                return Some(field.value.as_str());
            }
        }
        None
    }

    /// Renders every field as `[LABEL]value`, with `separator` between fields.
    fn verbose(&self, separator: &str) -> String {
        let mut out = String::new();
        for (position, field) in self.fields.iter().enumerate() {
            if position > 0 {
                out.push_str(separator);
            }
            out.push('[');
            out.push_str(&field.label);
            out.push(']');
            out.push_str(&field.value);
        }
        out
    }

    /// Renders only the field values, with `separator` between them.
    fn plain(&self, separator: &str) -> String {
        let mut out = String::new();
        for (position, field) in self.fields.iter().enumerate() {
            if position > 0 {
                out.push_str(separator);
            }
            out.push_str(&field.value);
        }
        out
    }

    /// Renders the event as a flat JSON object of string labels to string values.
    ///
    /// Fields keep their order; repeated labels are emitted as repeated keys.
    fn json(&self) -> String {
        let mut out = String::from("{");
        for (position, field) in self.fields.iter().enumerate() {
            if position > 0 {
                out.push(',');
            }
            out.push('"');
            out.push_str(&escape_json(&field.label));
            out.push_str("\":\"");
            out.push_str(&escape_json(&field.value));
            out.push('"');
        }
        out.push('}');
        out
    }
}
/// One parsed pipeline stage ("kid").
#[derive(Debug)]
enum Kid {
    /// `csv_in`: the source stage that reads delimited records.
    CsvIn(CsvInKid),
    /// `match`: keeps only events whose label equals a given value.
    Match(MatchKid),
    /// `noop`: leaves the events untouched.
    Noop,
    /// `print` (also `p` / `pmeta`): writes the current events to output.
    Print(PrintKid),
}
#[derive(Debug)]
struct Pipeline {
kids: Vec<Kid>,
}
impl Pipeline {
fn parse(spec: &str) -> WsResult<Self> {
let stages = split_pipeline(spec)?;
if stages.is_empty() {
return Err("pipeline is empty".to_string());
}
let mut kids = Vec::with_capacity(stages.len());
for stage in stages {
let tokens = tokenize(&stage)?;
let Some((name, args)) = tokens.split_first() else {
continue;
};
let kid = match name.as_str() {
"csv_in" => Kid::CsvIn(CsvInKid::parse(args)?),
"match" => Kid::Match(MatchKid::parse(args)?),
"noop" => Kid::Noop,
"print" | "p" | "pmeta" => Kid::Print(PrintKid::parse(args)?),
other => return Err(format!("unknown kid '{other}'")),
};
kids.push(kid);
}
Ok(Self { kids })
}
fn run<R: BufRead, W: Write>(&self, stdin: &mut R, stdout: &mut W) -> WsResult<()> {
let mut events = Vec::new();
let mut has_source = false;
for kid in &self.kids {
match kid {
Kid::CsvIn(csv_in) => {
if has_source {
return Err("this prototype supports one source kid per pipeline".to_string());
}
events = csv_in.read(stdin)?;
has_source = true;
}
Kid::Match(matcher) => {
require_source(has_source, "match")?;
events.retain(|event| matcher.matches(event));
}
Kid::Noop => {
require_source(has_source, "noop")?;
}
Kid::Print(print) => {
require_source(has_source, "print")?;
print.write(&events, stdout)?;
}
}
}
if !has_source {
return Err("pipeline needs a source kid, for example: csv_in -i NAME AGE".to_string());
}
Ok(())
}
}
/// Where `csv_in` reads its records from.
#[derive(Debug)]
enum CsvInput {
    /// Standard input (the default, or selected explicitly with `-i`).
    Stdin,
    /// A file path given with `-r`.
    File(String),
}

/// The `csv_in` source kid: reads delimited rows and labels their columns.
#[derive(Debug)]
struct CsvInKid {
    /// Column separator, set with `-d`/`-s` (default `,`).
    delimiter: char,
    /// Input source: `-i` for stdin, `-r PATH` for a file.
    input: CsvInput,
    /// Column labels, in column order (every non-option argument).
    labels: Vec<String>,
}

impl CsvInKid {
    /// Parses `csv_in` arguments: `-i`, `-r PATH`, `-d DELIM` / `-s DELIM`, then labels.
    ///
    /// Later options override earlier ones.
    ///
    /// # Errors
    /// Returns an error for a missing option value, an invalid delimiter, or any
    /// other dash-prefixed argument.
    fn parse(args: &[String]) -> WsResult<Self> {
        let mut kid = Self {
            delimiter: ',',
            input: CsvInput::Stdin,
            labels: Vec::new(),
        };
        let mut rest = args.iter();
        while let Some(arg) = rest.next() {
            match arg.as_str() {
                "-i" => kid.input = CsvInput::Stdin,
                "-d" | "-s" => {
                    let spec = rest
                        .next()
                        .ok_or_else(|| format!("{arg} expects a delimiter"))?;
                    kid.delimiter = parse_delimiter(spec)?;
                }
                "-r" => {
                    let path = rest
                        .next()
                        .ok_or_else(|| "-r expects a file path".to_string())?;
                    kid.input = CsvInput::File(path.clone());
                }
                option if option.starts_with('-') => {
                    return Err(format!("csv_in option '{option}' is not implemented yet"));
                }
                label => kid.labels.push(label.to_string()),
            }
        }
        Ok(kid)
    }

    /// Reads every event from the configured input; `stdin` is used only for [`CsvInput::Stdin`].
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or the input cannot be parsed.
    fn read<R: BufRead>(&self, stdin: &mut R) -> WsResult<Vec<Event>> {
        match &self.input {
            CsvInput::File(path) => {
                let handle =
                    File::open(path).map_err(|err| format!("failed to open {path}: {err}"))?;
                read_events(&mut BufReader::new(handle), self.delimiter, &self.labels)
            }
            CsvInput::Stdin => read_events(stdin, self.delimiter, &self.labels),
        }
    }
}
/// The `match` kid: keeps events whose `label` field equals `value`.
#[derive(Debug)]
struct MatchKid {
    /// Label to look up (compared case-insensitively by `Event::get`).
    label: String,
    /// Exact value the field must have.
    value: String,
}

impl MatchKid {
    /// Accepts either `LABEL VALUE` or a single `LABEL=VALUE` (split at the first `=`).
    ///
    /// # Errors
    /// Returns a usage error for any other argument shape.
    fn parse(args: &[String]) -> WsResult<Self> {
        const USAGE: &str = "match expects LABEL VALUE or LABEL=VALUE";
        let (label, value) = match args {
            [label, value] => (label.as_str(), value.as_str()),
            [pair] => pair.split_once('=').ok_or_else(|| USAGE.to_string())?,
            _ => return Err(USAGE.to_string()),
        };
        Ok(Self {
            label: label.to_string(),
            value: value.to_string(),
        })
    }

    /// True when the event has a field labelled `self.label` whose value equals `self.value`.
    fn matches(&self, event: &Event) -> bool {
        matches!(event.get(&self.label), Some(found) if found == self.value)
    }
}
/// The `print` kid: writes each event as one output line.
#[derive(Debug)]
struct PrintKid {
    /// `-J`: emit JSON objects (takes precedence over `verbose`).
    json: bool,
    /// `-s SEP`: text placed between fields (default `:`).
    separator: String,
    /// `-V` / `-VV`: prefix each value with its `[LABEL]`.
    verbose: bool,
}

impl PrintKid {
    /// Parses `print` options: `-J`, `-V`/`-VV`, and `-s SEPARATOR`.
    ///
    /// # Errors
    /// Returns an error when `-s` has no value or any other argument is given.
    fn parse(args: &[String]) -> WsResult<Self> {
        let mut kid = Self {
            json: false,
            separator: ":".to_string(),
            verbose: false,
        };
        let mut rest = args.iter();
        while let Some(arg) = rest.next() {
            match arg.as_str() {
                "-J" => kid.json = true,
                "-V" | "-VV" => kid.verbose = true,
                "-s" => {
                    let value = rest
                        .next()
                        .ok_or_else(|| "-s expects a separator".to_string())?;
                    kid.separator = value.clone();
                }
                option => return Err(format!("print option '{option}' is not implemented yet")),
            }
        }
        Ok(kid)
    }

    /// Writes one newline-terminated line per event to `stdout`.
    ///
    /// # Errors
    /// Returns an error on the first failed write.
    fn write<W: Write>(&self, events: &[Event], stdout: &mut W) -> WsResult<()> {
        events.iter().try_for_each(|event| {
            let line = match (self.json, self.verbose) {
                (true, _) => event.json(),
                (false, true) => event.verbose(&self.separator),
                (false, false) => event.plain(&self.separator),
            };
            writeln!(stdout, "{line}").map_err(|err| format!("failed to write output: {err}"))
        })
    }
}
/// Reads delimited records from `reader` and turns each one into an [`Event`].
///
/// Column `i` is labelled `labels[i]`, or `FIELD{i+1}` when fewer labels than columns
/// were given. Blank lines are skipped. A quoted field may contain line breaks (as
/// RFC 4180 allows). While the record read so far holds an odd number of `"` characters,
/// a quote is still open, so the next physical line (including its line break) is
/// appended before the record is split.
///
/// # Errors
/// Returns an error if reading fails, or if input ends while a quote is still open.
/// In the second case the message is prefixed with the line the record started on.
fn read_events<R: BufRead>(reader: &mut R, delimiter: char, labels: &[String]) -> WsResult<Vec<Event>> {
    /// Splits one complete logical record and pairs each value with its label.
    fn to_event(row: &str, delimiter: char, labels: &[String]) -> WsResult<Event> {
        let fields = split_delimited_row(row, delimiter)?
            .into_iter()
            .enumerate()
            .map(|(index, value)| Field {
                label: labels
                    .get(index)
                    .cloned()
                    .unwrap_or_else(|| format!("FIELD{}", index + 1)),
                value,
            })
            .collect();
        Ok(Event { fields })
    }

    let mut events = Vec::new();
    let mut line = String::new();
    // Physical lines gathered for the current logical record.
    let mut record = String::new();
    let mut quote_open = false;
    let mut line_number = 0usize;
    let mut record_start = 0usize;
    loop {
        line.clear();
        let bytes = reader
            .read_line(&mut line)
            .map_err(|err| format!("failed to read input: {err}"))?;
        if bytes == 0 {
            break;
        }
        line_number += 1;
        if record.is_empty() {
            record_start = line_number;
        }
        record.push_str(&line);
        // split_delimited_row flips its quote state on every `"` (an escaped `""` flips
        // twice), so the parity of the quote count tells whether a field is still open.
        if line.matches('"').count() % 2 == 1 {
            quote_open = !quote_open;
        }
        if quote_open {
            continue;
        }
        let row = record.trim_end_matches(|ch| ch == '\r' || ch == '\n');
        if !row.is_empty() {
            let event = to_event(row, delimiter, labels)
                .map_err(|err| format!("line {record_start}: {err}"))?;
            events.push(event);
        }
        record.clear();
    }
    if !record.is_empty() {
        // Input ended inside a quoted field; parsing the leftover reports the open quote.
        let row = record.trim_end_matches(|ch| ch == '\r' || ch == '\n');
        let event = to_event(row, delimiter, labels)
            .map_err(|err| format!("line {record_start}: {err}"))?;
        events.push(event);
    }
    Ok(events)
}
/// Fails with "`kid` needs a source kid before it" unless a source has already run.
fn require_source(has_source: bool, kid: &str) -> WsResult<()> {
    if !has_source {
        return Err(format!("{kid} needs a source kid before it"));
    }
    Ok(())
}
/// Parses a `csv_in -d`/`-s` argument into the single delimiter character.
///
/// The two-character spellings `\t`, `\n`, and `\r` map to tab, newline, and
/// carriage return. Any other spec must be exactly one character.
///
/// # Errors
/// Returns an error for an empty spec or a multi-character spec, which was previously
/// truncated silently to its first character. It also rejects `"`: the row splitter
/// always treats `"` as a quote, so it could never act as a delimiter.
fn parse_delimiter(spec: &str) -> WsResult<char> {
    let delimiter = match spec {
        "\\t" => '\t',
        "\\n" => '\n',
        "\\r" => '\r',
        _ => {
            let mut chars = spec.chars();
            match (chars.next(), chars.next()) {
                (None, _) => return Err("delimiter cannot be empty".to_string()),
                (Some(single), None) => single,
                (Some(_), Some(_)) => {
                    return Err(format!("delimiter '{spec}' must be a single character"));
                }
            }
        }
    };
    if delimiter == '"' {
        return Err("delimiter cannot be the quote character '\"'".to_string());
    }
    Ok(delimiter)
}
/// Splits one row on `delimiter`, honouring `"` quoting.
///
/// A `"` flips quoted mode wherever it appears. Inside quotes, `""` yields a literal
/// `"` and the delimiter is ordinary text. Quote characters themselves are not kept.
///
/// # Errors
/// Returns an error if the row ends while still inside quotes.
fn split_delimited_row(row: &str, delimiter: char) -> WsResult<Vec<String>> {
    let mut fields: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut quoted = false;
    let mut rest = row.chars().peekable();
    while let Some(c) = rest.next() {
        if c == '"' {
            if quoted && rest.peek() == Some(&'"') {
                rest.next();
                field.push('"');
            } else {
                quoted = !quoted;
            }
        } else if c == delimiter && !quoted {
            fields.push(std::mem::take(&mut field));
        } else {
            field.push(c);
        }
    }
    if quoted {
        return Err("unterminated quote in delimited input".to_string());
    }
    fields.push(field);
    Ok(fields)
}
/// Splits a pipeline spec into trimmed stage strings at every unquoted, unescaped `|`.
///
/// Quotes and backslash escapes follow the same rules as `tokenize`: a backslash
/// escapes the next character except inside single quotes. Quotes and escapes are kept
/// verbatim so each stage can be tokenized afterwards. An escaped `|` or quote therefore
/// no longer splits a stage or opens a quote. An escaped trailing character (such as
/// `\ `) is not removed by trimming. Blank stages are dropped.
///
/// # Errors
/// Returns an error when a single or double quote is left open.
fn split_pipeline(spec: &str) -> WsResult<Vec<String>> {
    /// Pushes the trimmed `current` onto `stages` (unless blank) and clears it.
    /// The first `keep` bytes are never trimmed from the end, because they end with
    /// an escaped character.
    fn flush(stages: &mut Vec<String>, current: &mut String, keep: usize) {
        let end = current.trim_end().len().max(keep);
        // A backslash is not whitespace, so trim_start cannot reach an escaped char.
        let stage = current[..end].trim_start();
        if !stage.is_empty() {
            stages.push(stage.to_string());
        }
        current.clear();
    }

    let mut stages = Vec::new();
    let mut current = String::new();
    let mut keep = 0usize;
    let mut in_single = false;
    let mut in_double = false;
    let mut chars = spec.chars();
    while let Some(ch) = chars.next() {
        match ch {
            '\\' if !in_single => {
                // Keep the escape for tokenize, but do not let the escaped character act
                // as a quote or stage separator here. A dangling backslash is left for
                // tokenize to report.
                current.push(ch);
                if let Some(escaped) = chars.next() {
                    current.push(escaped);
                }
                keep = current.len();
            }
            '\'' if !in_double => {
                in_single = !in_single;
                current.push(ch);
            }
            '"' if !in_single => {
                in_double = !in_double;
                current.push(ch);
            }
            '|' if !in_single && !in_double => {
                flush(&mut stages, &mut current, keep);
                keep = 0;
            }
            _ => current.push(ch),
        }
    }
    if in_single || in_double {
        return Err("unterminated quote in pipeline".to_string());
    }
    flush(&mut stages, &mut current, keep);
    Ok(stages)
}
/// Splits one pipeline stage into shell-like words.
///
/// Unquoted whitespace separates words. Single quotes keep everything literally.
/// Double quotes keep whitespace but still allow backslash escapes, as does unquoted
/// text. Quote characters are dropped, and a word that ends up empty (e.g. `''`) is
/// not emitted.
///
/// # Errors
/// Returns an error for a trailing backslash or an unclosed quote.
fn tokenize(segment: &str) -> WsResult<Vec<String>> {
    #[derive(Clone, Copy)]
    enum Quote {
        Bare,
        Single,
        Double,
    }

    let mut words: Vec<String> = Vec::new();
    let mut word = String::new();
    let mut quote = Quote::Bare;
    let mut rest = segment.chars();
    while let Some(c) = rest.next() {
        match (quote, c) {
            (Quote::Single, '\'') => quote = Quote::Bare,
            (Quote::Single, other) => word.push(other),
            (_, '\\') => {
                let escaped = rest
                    .next()
                    .ok_or_else(|| "dangling escape in pipeline".to_string())?;
                word.push(escaped);
            }
            (Quote::Bare, '\'') => quote = Quote::Single,
            (Quote::Bare, '"') => quote = Quote::Double,
            (Quote::Double, '"') => quote = Quote::Bare,
            (Quote::Bare, space) if space.is_whitespace() => {
                if !word.is_empty() {
                    words.push(std::mem::take(&mut word));
                }
            }
            (_, other) => word.push(other),
        }
    }
    if !matches!(quote, Quote::Bare) {
        return Err("unterminated quote in pipeline segment".to_string());
    }
    if !word.is_empty() {
        words.push(word);
    }
    Ok(words)
}
/// Escapes `value` for use inside a JSON string literal.
///
/// Quote and backslash get backslash escapes, `\n`/`\r`/`\t` their short forms, and any
/// other control character becomes a `\uXXXX` escape. All other characters pass through.
fn escape_json(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for c in value.chars() {
        let short_form = match c {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        match short_form {
            Some(escape) => out.push_str(escape),
            None if c.is_control() => out.push_str(&format!("\\u{:04x}", c as u32)),
            None => out.push(c),
        }
    }
    out
}
/// Returns the help text printed for `-h`, `--help`, or when no arguments are given.
///
/// Each line ends with `\n\` so the text stays flush-left regardless of how this
/// source is indented (the continuation skips the next line's leading whitespace).
fn usage() -> &'static str {
    "waterslide-rs 0.1.0\n\
     Usage:\n\
     waterslide-rs 'csv_in -i NAME AGE RIDE | print -V'\n\
     waterslide-rs --list-kids\n\
     Kids:\n\
     csv_in [-i] [-r file] [-d|-s delimiter] LABEL...\n\
     match LABEL VALUE\n\
     match LABEL=VALUE\n\
     noop\n\
     print|p|pmeta [-V] [-J] [-s separator]\n\
     Compatibility:\n\
     -A, -C, -D, and -P each take a path argument; the option and its path are\n\
     accepted and ignored so old WaterSlide-style invocations are easier to adapt.\n"
}
/// Process entry point: runs the tool, and on failure prints `ERROR: ...` to stderr and exits with status 1.
fn main() {
    real_main().unwrap_or_else(|err| {
        eprintln!("ERROR: {err}");
        process::exit(1)
    });
}
/// Interprets the command line and runs the requested pipeline over stdin/stdout.
///
/// Help is shown when there are no arguments or any argument is `-h`/`--help`.
/// `--list-kids` prints the available kid names. Otherwise the arguments are turned
/// into a pipeline spec, parsed, and run.
///
/// # Errors
/// Propagates argument, parse, and runtime errors.
fn real_main() -> WsResult<()> {
    let args: Vec<String> = env::args().skip(1).collect();
    let wants_help =
        args.is_empty() || args.iter().any(|arg| matches!(arg.as_str(), "-h" | "--help"));
    if wants_help {
        print!("{}", usage());
        return Ok(());
    }
    if args.iter().any(|arg| arg == "--list-kids") {
        for kid in ["csv_in", "match", "noop", "print"] {
            println!("{kid}");
        }
        return Ok(());
    }
    let pipeline = Pipeline::parse(&collect_pipeline_spec(&args)?)?;
    let mut input = io::stdin().lock();
    let mut output = io::stdout().lock();
    pipeline.run(&mut input, &mut output)
}
/// Builds the pipeline spec string from the command-line arguments.
///
/// The WaterSlide compatibility options `-A`, `-C`, `-D`, and `-P` are accepted
/// anywhere and skipped together with the path that follows each one. Another
/// dash-prefixed argument is rejected as an unknown top-level option only before the
/// first pipeline word. After that it belongs to the pipeline, so kid options survive
/// when the pipeline is given as separate words (e.g. `csv_in -i NAME '|' print -V`).
/// Words are joined with single spaces; quoting inside a word has to be part of the
/// word itself.
///
/// # Errors
/// Returns an error when a compatibility option lacks its path, when an unknown option
/// appears before the pipeline, or when no pipeline words are given.
fn collect_pipeline_spec(args: &[String]) -> WsResult<String> {
    let mut pipeline_parts: Vec<&str> = Vec::new();
    let mut rest = args.iter();
    while let Some(arg) = rest.next() {
        match arg.as_str() {
            "-A" | "-C" | "-D" | "-P" => {
                if rest.next().is_none() {
                    return Err(format!("{arg} expects a path"));
                }
            }
            flag if flag.starts_with('-') && pipeline_parts.is_empty() => {
                return Err(format!("unknown top-level option '{flag}'"));
            }
            word => pipeline_parts.push(word),
        }
    }
    if pipeline_parts.is_empty() {
        return Err("nothing to parse".to_string());
    }
    Ok(pipeline_parts.join(" "))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Quoted fields keep embedded delimiters, and `""` becomes a literal quote.
    #[test]
    fn parses_csv_quotes() {
        let line = r#"Ava,"twisty, fast","quote "" ok""#;
        let fields = split_delimited_row(line, ',').expect("row should parse");
        assert_eq!(fields, ["Ava", "twisty, fast", "quote \" ok"]);
    }

    /// Double-quoted option values keep their inner whitespace as one token.
    #[test]
    fn tokenizes_quoted_options() {
        let stage = r#"print -s " :: " -V"#;
        let tokens = tokenize(stage).expect("stage should tokenize");
        assert_eq!(tokens, ["print", "-s", " :: ", "-V"]);
    }

    /// End to end: read two rows, keep the `fast` one, print it verbosely.
    #[test]
    fn runs_demo_pipeline() {
        let spec = "csv_in -i NAME AGE RIDE | match RIDE fast | print -V";
        let pipeline = Pipeline::parse(spec).expect("demo pipeline should parse");
        let mut stdin = Cursor::new(b"Ava,8,twisty\nBen,7,fast\n".to_vec());
        let mut stdout: Vec<u8> = Vec::new();
        pipeline
            .run(&mut stdin, &mut stdout)
            .expect("demo pipeline should run");
        let printed = String::from_utf8(stdout).expect("output should be UTF-8");
        assert_eq!(printed, "[NAME]Ben:[AGE]7:[RIDE]fast\n");
    }
}