#![allow(dead_code)]
use crate::column::{ColumnFun, ColumnSingle};
use crate::comp::{Comp, CompMaker};
use crate::prelude::*;
use crate::util::split_plain;
use lazy_static::lazy_static;
use std::cell::RefCell;
use std::cmp;
use std::rc::Rc;
use std::sync::Mutex;
/// An aggregator that is fed a sequence of byte-slice values and later emits a result.
pub trait Agg {
/// Feed one more value into the aggregation.
fn add(&mut self, data: &[u8]);
/// Write the current result to `w`; `fmt` is the number format handed to implementors.
fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()>;
/// Return the aggregator to its initial state.
fn reset(&mut self);
/// The current result as a number; implementors that do not override this report `0.0`.
fn value(&self) -> f64 {
0.0
}
}
/// Shared, interior-mutable handle to a dynamically typed [`Agg`].
type AggRef = Rc<RefCell<dyn Agg>>;
/// An [`Agg`] bundled with the spec it was built from, an output name and a number format.
///
/// `Clone` is derived, so a clone shares the same underlying aggregator through the `Rc`.
#[derive(Clone)]
pub struct Agger {
/// The spec text this value was created from.
pub spec: String,
/// The shared aggregator.
pub agg: AggRef,
/// Name pushed as the output column header.
pub out: String,
/// Number format handed to the aggregator when writing.
pub fmt: NumFormat,
}
/// Debug output is `<spec> to <out>`; the aggregator itself is not printed.
impl fmt::Debug for Agger {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { spec, out, .. } = self;
        write!(f, "{} to {}", spec, out)
    }
}
/// An aggregator that is fed whole input lines rather than single values.
pub trait LineAgg {
/// Feed one more input line into the aggregation.
fn add(&mut self, data: &TextLine);
/// Write the current result to `w`; `fmt` is the number format handed to implementors.
fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()>;
/// Return the aggregator to its initial state.
fn reset(&mut self);
/// The current result as a number; implementors that do not override this report `0.0`.
fn value(&self) -> f64 {
0.0
}
/// Resolve whatever column references the aggregator holds against `fieldnames`.
fn lookup(&mut self, fieldnames: &[&str]) -> Result<()>;
/// The header text to use when this aggregator's output replaces an input column.
fn replace<'a>(&self, head: &'a StringLine) -> &'a str;
/// Whether this aggregator can stand in for an input column.
fn can_replace(&self) -> bool;
/// True if this aggregator is tied to input column number `col`.
fn is_col(&self, col: usize) -> bool;
}
/// Shared, interior-mutable handle to a dynamically typed [`LineAgg`].
type LineAggRef = Rc<RefCell<dyn LineAgg>>;
/// A [`LineAgg`] bundled with its spec, its output placement and a number format.
///
/// `Clone` is derived, so a clone shares the same underlying aggregator through the `Rc`.
#[derive(Clone)]
pub struct LineAgger {
/// The spec text this value was created from.
pub spec: String,
/// The shared line aggregator.
pub agg: LineAggRef,
/// Where the output column goes: replacing a column, prefixed or appended.
pub out: AggType,
/// Number format handed to the aggregator when writing.
pub fmt: NumFormat,
}
/// Debug output is `<spec> to <placement>`; the aggregator itself is not printed.
impl fmt::Debug for LineAgger {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { spec, out, .. } = self;
        write!(f, "{} to {:?}", spec, out)
    }
}
impl LineAgger {
fn is_col(&self, col: usize) -> bool {
return self.agg.borrow().is_col(col);
}
pub fn add(&mut self, data: &TextLine) {
self.agg.borrow_mut().add(data)
}
fn result(&mut self, w: &mut dyn Write) -> Result<()> {
self.agg.borrow_mut().result(w, self.fmt)
}
fn reset(&mut self) {
self.agg.borrow_mut().reset()
}
fn value(&self) -> f64 {
self.agg.borrow().value()
}
fn lookup(&mut self, fieldnames: &[&str]) -> Result<()> {
self.agg.borrow_mut().lookup(fieldnames)
}
fn replace<'a>(&self, head: &'a StringLine) -> &'a str {
self.agg.borrow().replace(head)
}
fn can_replace(&self) -> bool {
self.agg.borrow().can_replace()
}
pub fn new(spec: &str) -> Result<Self> {
if let Some((a, b)) = spec.split_once(',') {
if a.eq_ignore_ascii_case("replace") {
Self::new_replace2(b, spec)
} else if a.eq_ignore_ascii_case("prefix") || a.eq_ignore_ascii_case("prepend") {
Self::new_prefix2(spec, spec)
} else if a.eq_ignore_ascii_case("suffix") || a.eq_ignore_ascii_case("append") {
Self::new_append2(spec, spec)
} else {
err!("Unknown placement type '{}' in LineAgg spec '{}'", a, spec)
}
} else {
err!("No comma in LineAgg spec '{}'", spec)
}
}
pub fn new_replace2(rep: &str, spec: &str) -> Result<Self> {
Self::new2(AggType::Replace, rep, spec)
}
pub fn new_replace(spec: &str) -> Result<Self> {
Self::new_replace2(spec, &format!("replace,{}", spec))
}
pub fn new_prefix(spec: &str) -> Result<Self> {
Self::new_prefix2(spec, &format!("prefix,{}", spec))
}
pub fn new_prefix2(rep: &str, spec: &str) -> Result<Self> {
if let Some((a, b)) = rep.split_once(',') {
Self::new2(AggType::Prefix(a.to_string()), b, spec)
} else {
err!("Prefix format is NewName,Column,Spec : {}", spec)
}
}
pub fn new_append(spec: &str) -> Result<Self> {
Self::new_append2(spec, &format!("append,{}", spec))
}
pub fn new_append2(rep: &str, spec: &str) -> Result<Self> {
if let Some((a, b)) = rep.split_once(',') {
Self::new2(AggType::Append(a.to_string()), b, spec)
} else {
err!("Append format is NewName,Column,Spec : {}", spec)
}
}
pub fn new2(out: AggType, rep: &str, spec: &str) -> Result<Self> {
Ok(Self {
spec: spec.to_string(),
agg: AggMaker::make_line(rep)?,
out,
fmt: NumFormat::default(),
})
}
}
/// Line aggregator that re-evaluates an expression per line and feeds the result back
/// into one of the expression's own variables.
struct ExprAgg {
// Name of the accumulator variable, as given in the spec.
var: String,
// The expression evaluated for every line.
expr: Expr,
// Handle returned by `expr.find_var` for the accumulator variable.
which: usize,
// Initial value of the accumulator; restored by `reset`.
start: f64,
// Most recent value of the accumulator.
val: f64,
}
impl LineAgg for ExprAgg {
    /// Evaluate the expression on `data`, then store the result both here and in the
    /// expression's accumulator variable.
    fn add(&mut self, data: &TextLine) {
        let next = self.expr.eval(data);
        self.val = next;
        self.expr.set_var(self.which, next);
    }

    /// Print the current accumulator value with `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        fmt.print(self.val, w)
    }

    /// Restore the accumulator, and the expression's variable, to the start value.
    fn reset(&mut self) {
        let start = self.start;
        self.val = start;
        self.expr.set_var(self.which, start);
    }

    /// The current accumulator value.
    fn value(&self) -> f64 {
        self.val
    }

    /// Resolve the expression's column references against `fieldnames`.
    fn lookup(&mut self, fieldnames: &[&str]) -> Result<()> {
        self.expr.lookup(fieldnames)
    }

    /// An expression has no source column, so there is no header text to offer.
    fn replace<'a>(&self, _head: &'a StringLine) -> &'a str {
        ""
    }

    /// Expressions never replace an input column.
    fn can_replace(&self) -> bool {
        false
    }

    /// Expressions are not tied to any single column.
    fn is_col(&self, _col: usize) -> bool {
        false
    }
}
impl ExprAgg {
    /// Parse `Var,Init,Expr`: the accumulator variable name, its initial value and the
    /// expression text (which may itself contain commas).
    ///
    /// # Errors
    /// Fails if fewer than three comma-separated pieces are present, if `Init` is not a
    /// number, or if the expression does not parse.
    fn new(spec: &str) -> Result<Self> {
        let mut pieces = spec.splitn(3, ',');
        let (name, init, body) = match (pieces.next(), pieces.next(), pieces.next()) {
            (Some(name), Some(init), Some(body)) => (name, init, body),
            _ => return err!("Expr Agg format is Var,Init,Expr '{}'", spec),
        };
        let start = init.to_f64_whole(spec.as_bytes(), "Var,Init,Expr")?;
        let mut expr = Expr::new(body)?;
        let which = expr.find_var(name);
        // Seed the variable so the first evaluation sees the initial value.
        expr.set_var(which, start);
        Ok(Self {
            var: name.to_string(),
            expr,
            which,
            start,
            val: start,
        })
    }
}
/// Something that turns a byte slice into a count.
pub trait Counter {
/// Return the count for `data`.
fn counter(&mut self, data: &[u8]) -> usize;
}
/// Counter for characters: bytes in plain mode, Unicode scalar values in UTF-8 mode.
struct Chars {
    utf8: bool,
}

impl Chars {
    /// Create a character counter; `utf8` selects Unicode-aware counting.
    const fn new(utf8: bool) -> Self {
        Self { utf8 }
    }
}

impl Counter for Chars {
    /// In plain mode this is the byte length. In UTF-8 mode the input is decoded
    /// lossily and the resulting `char`s are counted.
    fn counter(&mut self, data: &[u8]) -> usize {
        if !self.utf8 {
            return data.len();
        }
        let text = String::from_utf8_lossy(data);
        text.chars().count()
    }
}
/// Counter for words, where a word is a maximal run of non-space characters.
struct Swords {
    utf8: bool,
}

impl Swords {
    /// Create a word counter; `utf8` selects Unicode-aware whitespace detection.
    const fn new(utf8: bool) -> Self {
        Self { utf8 }
    }
}

impl Counter for Swords {
    /// Count the runs of non-space characters in `data`.
    ///
    /// In UTF-8 mode the input is decoded lossily and split on Unicode whitespace.
    /// In plain mode every byte `<= b' '` is a separator.
    fn counter(&mut self, data: &[u8]) -> usize {
        if self.utf8 {
            String::from_utf8_lossy(data).split_whitespace().count()
        } else {
            data.split(|byte| *byte <= b' ')
                .filter(|run| !run.is_empty())
                .count()
        }
    }
}
/// Counter for words, where a word is a maximal run of alphanumeric characters.
struct Awords {
    utf8: bool,
}

impl Awords {
    /// Create a word counter; `utf8` selects Unicode-aware alphanumeric detection.
    const fn new(utf8: bool) -> Self {
        Self { utf8 }
    }
}

impl Counter for Awords {
    /// Count the runs of alphanumeric characters in `data`.
    ///
    /// In UTF-8 mode the input is decoded lossily and `char::is_alphanumeric` decides.
    /// In plain mode only ASCII letters and digits count as word bytes.
    fn counter(&mut self, data: &[u8]) -> usize {
        if self.utf8 {
            String::from_utf8_lossy(data)
                .split(|ch: char| !ch.is_alphanumeric())
                .filter(|run| !run.is_empty())
                .count()
        } else {
            data.split(|byte| !byte.is_ascii_alphanumeric())
                .filter(|run| !run.is_empty())
                .count()
        }
    }
}
/// Where a line aggregator's output column is placed.
#[derive(Clone, Debug)]
pub enum AggType {
    /// The output takes the place of an input column.
    Replace,
    /// The output is a new column, with this name, emitted before the input columns.
    Prefix(String),
    /// The output is a new column, with this name, emitted after the input columns.
    Append(String),
}

impl Default for AggType {
    /// The default placement is `Replace`.
    fn default() -> Self {
        AggType::Replace
    }
}
/// Line aggregator that feeds one column of every line into a value aggregator.
#[derive(Clone, Debug)]
struct AggCol {
// The input column whose value is aggregated.
src: NamedCol,
// The value aggregator receiving that column.
agg: Agger,
}
impl LineAgg for AggCol {
    /// Hand the source column of `data` to the wrapped aggregator.
    fn add(&mut self, data: &TextLine) {
        let column = self.src.num;
        self.agg.add(data.get(column));
    }

    /// Write the wrapped aggregator's result to `w` using `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        self.agg.result(w, fmt)
    }

    /// Reset the wrapped aggregator.
    fn reset(&mut self) {
        self.agg.reset();
    }

    /// The wrapped aggregator's numeric value.
    fn value(&self) -> f64 {
        self.agg.value()
    }

    /// Resolve the source column against `fieldnames`.
    fn lookup(&mut self, fieldnames: &[&str]) -> Result<()> {
        self.src.lookup(fieldnames)
    }

    /// The header text of the source column.
    fn replace<'a>(&self, head: &'a StringLine) -> &'a str {
        let column = self.src.num;
        head.get(column)
    }

    /// A column aggregator can always stand in for its source column.
    fn can_replace(&self) -> bool {
        true
    }

    /// True if `col` is the source column.
    fn is_col(&self, col: usize) -> bool {
        col == self.src.num
    }
}
impl AggCol {
    /// Build from a spec whose text before the first comma names the source column.
    /// The whole spec is also passed on to `Agger::new`.
    ///
    /// # Errors
    /// Fails if the spec has no comma or either part is invalid.
    fn new(spec: &str) -> Result<Self> {
        match spec.split_once(',') {
            Some((column, _)) => Self::new2(column, spec),
            None => err!("No comma in AggCol spec"),
        }
    }

    /// Build from an explicit source column `src` and an aggregator `spec`.
    fn new2(src: &str, spec: &str) -> Result<Self> {
        let src = NamedCol::new_from(src)?;
        let agg = Agger::new(spec)?;
        Ok(Self { src, agg })
    }
}
impl ColumnFun for LineAgger {
    /// Push this column's header: the replaced column's name for `Replace`, otherwise
    /// the name carried by the placement.
    fn add_names(&self, w: &mut ColumnHeader, head: &StringLine) -> Result<()> {
        match &self.out {
            AggType::Prefix(name) | AggType::Append(name) => w.push(name),
            AggType::Replace => {
                let agg = self.agg.borrow();
                w.push(agg.replace(head))
            }
        }
    }

    /// Write the aggregated result; the current line and text mode are not consulted.
    fn write(&mut self, w: &mut dyn Write, _line: &TextLine, _text: &TextFileMode) -> Result<()> {
        let fmt = self.fmt;
        self.agg.borrow_mut().result(w, fmt)
    }

    /// Resolve the aggregator's column references against `fieldnames`.
    fn lookup(&mut self, fieldnames: &[&str]) -> Result<()> {
        let mut agg = self.agg.borrow_mut();
        agg.lookup(fieldnames)
    }
}
impl ColumnFun for Agger {
    /// Push this aggregator's output name as the column header.
    fn add_names(&self, w: &mut ColumnHeader, _head: &StringLine) -> Result<()> {
        let name: &str = &self.out;
        w.push(name)
    }

    /// Write the aggregated result; the current line and text mode are not consulted.
    fn write(&mut self, w: &mut dyn Write, _line: &TextLine, _text: &TextFileMode) -> Result<()> {
        let fmt = self.fmt;
        self.agg.borrow_mut().result(w, fmt)
    }

    /// A value aggregator holds no column references, so there is nothing to resolve.
    fn lookup(&mut self, _fieldnames: &[&str]) -> Result<()> {
        Ok(())
    }
}
impl Agger {
    /// Feed one value to the wrapped aggregator.
    pub fn add(&mut self, data: &[u8]) {
        let mut agg = self.agg.borrow_mut();
        agg.add(data)
    }

    /// Write the wrapped aggregator's result to `w` using `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        let mut agg = self.agg.borrow_mut();
        agg.result(w, fmt)
    }

    /// Return the wrapped aggregator to its initial state.
    fn reset(&mut self) {
        let mut agg = self.agg.borrow_mut();
        agg.reset()
    }

    /// The wrapped aggregator's current numeric value.
    fn value(&self) -> f64 {
        let agg = self.agg.borrow();
        agg.value()
    }

    /// Build from `Name,AggSpec`. Without a comma the whole text is the aggregator spec
    /// and the output name is empty.
    pub fn new(spec: &str) -> Result<Self> {
        let (name, agg_spec) = spec.split_once(',').unwrap_or(("", spec));
        Self::new2(name, agg_spec, spec)
    }

    /// Build from an output `name`, an aggregator `spec` and the original text `orig`,
    /// which is kept for display and for `deep_clone`.
    pub fn new2(name: &str, spec: &str, orig: &str) -> Result<Self> {
        let agg = AggMaker::make(spec)?;
        Ok(Self {
            spec: orig.to_string(),
            agg,
            out: name.to_string(),
            fmt: NumFormat::default(),
        })
    }

    /// Build a fresh, independent aggregator from the stored spec.
    ///
    /// The result has the default number format, not this value's `fmt`.
    ///
    /// # Panics
    /// Panics if the stored spec fails to parse.
    pub fn deep_clone(&self) -> Self {
        let rebuilt = Self::new(&self.spec);
        rebuilt.unwrap()
    }
}
/// An ordered collection of value aggregators that are all fed the same data.
#[derive(Clone, Default, Debug)]
pub struct AggList {
// The aggregators, in insertion order.
v: Vec<Agger>,
// Number format most recently applied through `fmt()`.
fmt: NumFormat,
}
impl AggList {
    /// Create an empty list.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of aggregators in the list.
    pub fn len(&self) -> usize {
        self.v.len()
    }

    /// Record `fmt` and apply it to every aggregator currently in the list.
    pub fn fmt(&mut self, fmt: NumFormat) {
        self.fmt = fmt;
        self.v.iter_mut().for_each(|agger| agger.fmt = fmt);
    }

    /// Rebuild every aggregator from its spec, so the copy shares no state with `self`,
    /// then apply this list's number format.
    ///
    /// # Panics
    /// Panics if a stored spec fails to parse.
    pub fn deep_clone(&self) -> Self {
        let mut copy = Self::new();
        for agger in self.v.iter() {
            copy.push(&agger.spec).unwrap();
        }
        copy.fmt(self.fmt);
        copy
    }

    /// The aggregator at position `pos`.
    ///
    /// # Panics
    /// Panics if `pos` is out of range.
    pub fn get(&self, pos: usize) -> &Agger {
        &self.v[pos]
    }

    /// Reset every aggregator.
    pub fn reset(&mut self) {
        self.v
            .iter_mut()
            .for_each(|agger| agger.agg.borrow_mut().reset());
    }

    /// True if the list holds no aggregators.
    pub fn is_empty(&self) -> bool {
        self.v.is_empty()
    }

    /// Parse `spec` and append the resulting aggregator.
    pub fn push(&mut self, spec: &str) -> Result<()> {
        let agger = Agger::new(spec)?;
        self.do_push(agger)
    }

    /// Append an already constructed aggregator; always succeeds.
    pub fn do_push(&mut self, item: Agger) -> Result<()> {
        self.v.push(item);
        Ok(())
    }

    /// Feed `data` to every aggregator.
    pub fn add(&mut self, data: &[u8]) {
        self.v
            .iter_mut()
            .for_each(|agger| agger.agg.borrow_mut().add(data));
    }

    /// Push a clone of every aggregator onto `w` as an output column.
    pub fn fill(&self, w: &mut Writer) {
        for agger in self.v.iter() {
            w.push(Box::new(agger.clone()));
        }
    }
}
/// An ordered collection of line aggregators that are all fed the same lines.
#[derive(Clone, Default, Debug)]
pub struct LineAggList {
// The aggregators, in insertion order (except for `push_first_prefix`).
v: Vec<LineAgger>,
}
impl LineAggList {
    /// Create an empty list.
    pub fn new() -> Self {
        Self::default()
    }

    /// Reset every aggregator.
    pub fn reset(&mut self) {
        self.v.iter_mut().for_each(|agger| agger.reset());
    }

    /// Apply `fmt` to every aggregator currently in the list.
    pub fn fmt(&mut self, fmt: NumFormat) {
        self.v.iter_mut().for_each(|agger| agger.fmt = fmt);
    }

    /// True if the list holds no aggregators.
    pub fn is_empty(&self) -> bool {
        self.v.is_empty()
    }

    /// Parse `spec` as a replacing aggregator and append it.
    pub fn push_replace(&mut self, spec: &str) -> Result<()> {
        let agger = LineAgger::new_replace(spec)?;
        self.push(agger)
    }

    /// Parse `spec` as a prefixed aggregator and append it.
    pub fn push_prefix(&mut self, spec: &str) -> Result<()> {
        let agger = LineAgger::new_prefix(spec)?;
        self.push(agger)
    }

    /// Parse `spec` as a prefixed aggregator and insert it at the front of the list.
    pub fn push_first_prefix(&mut self, spec: &str) -> Result<()> {
        let agger = LineAgger::new_prefix(spec)?;
        self.v.insert(0, agger);
        Ok(())
    }

    /// Parse `spec` as an appended aggregator and append it.
    pub fn push_append(&mut self, spec: &str) -> Result<()> {
        let agger = LineAgger::new_append(spec)?;
        self.push(agger)
    }

    /// Append an already constructed aggregator; always succeeds.
    pub fn push(&mut self, item: LineAgger) -> Result<()> {
        self.v.push(item);
        Ok(())
    }

    /// Feed `data` to every aggregator.
    pub fn add(&mut self, data: &TextLine) {
        self.v.iter_mut().for_each(|agger| agger.add(data));
    }

    /// Resolve every aggregator's column references, stopping at the first failure.
    pub fn lookup(&mut self, fieldnames: &[&str]) -> Result<()> {
        for agger in self.v.iter_mut() {
            agger.lookup(fieldnames)?;
        }
        Ok(())
    }

    /// Lay out the output columns on `w`: all `Prefix` aggregators, then one column per
    /// header field, then all `Append` aggregators.
    ///
    /// For each header field, the first `Replace` aggregator tied to that column is
    /// used; otherwise the field is passed through as a plain column.
    ///
    /// # Panics
    /// Panics if a pass-through column cannot be built from its header name.
    pub fn fill(&self, w: &mut Writer, head: &StringLine) {
        for agger in self.v.iter() {
            if matches!(agger.out, AggType::Prefix(_)) {
                w.push(Box::new(agger.clone()));
            }
        }
        for (idx, name) in head.iter().enumerate() {
            let replacement = self
                .v
                .iter()
                .find(|agger| matches!(agger.out, AggType::Replace) && agger.is_col(idx));
            match replacement {
                Some(agger) => {
                    w.push(Box::new(agger.clone()));
                }
                None => {
                    w.push(Box::new(ColumnSingle::with_name(name).unwrap()));
                }
            }
        }
        for agger in self.v.iter() {
            if matches!(agger.out, AggType::Append(_)) {
                w.push(Box::new(agger.clone()));
            }
        }
    }
}
/// Aggregator that joins all values into one delimited list, with optional sorting,
/// de-duplication, length filtering and counting.
#[derive(Debug)]
struct Merge {
    /// Delimiter used while accumulating and when splitting into parts.
    delim: u8,
    /// Delimiter written between parts in the output.
    out_delim: u8,
    /// Upper bound on the number of parts written.
    max_parts: usize,
    /// Comparator used for sorting and for de-duplication.
    comp: Comp,
    /// Sort the parts before output.
    do_sort: bool,
    /// Drop adjacent equal parts before output.
    do_uniq: bool,
    /// Parts shorter than this are not written.
    min_len: usize,
    /// Parts longer than this are not written.
    max_len: usize,
    /// Output the number of parts instead of the parts.
    do_count: bool,
    /// Accumulated raw data and its split parts.
    data: TextLine,
}

impl Default for Merge {
    /// Comma for both delimiters, no sorting, de-duplication or counting, and no limits
    /// on part length or part count.
    fn default() -> Self {
        Merge {
            // delimiters
            delim: b',',
            out_delim: b',',
            // switches
            do_sort: false,
            do_uniq: false,
            do_count: false,
            // limits
            min_len: 0,
            max_len: usize::MAX,
            max_parts: usize::MAX,
            // helpers
            comp: Comp::default(),
            data: TextLine::default(),
        }
    }
}
impl Merge {
    /// Parse a period-delimited merge spec such as `sort.uniq.D;:.max_parts:3`.
    ///
    /// Two parts get special handling because they may themselves contain a period:
    /// * `Dxy` is always exactly three bytes, so it is cut by length, not at a period.
    /// * `comp:...` consumes the rest of the spec and must therefore come last.
    ///
    /// # Errors
    /// Fails on an empty or unrecognised part, a malformed delimiter part, or an invalid
    /// number or comparator.
    fn new(spec: &str) -> Result<Self> {
        let mut m = Self::default();
        let mut sp = spec;
        while !sp.is_empty() {
            if sp.starts_with("comp:") {
                m.add_one(sp)?;
                break;
            }
            if sp.starts_with('D') {
                // A checked slice: a short spec or a cut inside a multi-byte character
                // becomes an error rather than a panic.
                let delims = match sp.get(..3) {
                    Some(delims) => delims,
                    None => return err!("Merge Delim Spec must be three bytes, e.g. 'D.,'"),
                };
                m.add_one(delims)?;
                let rest = &sp[3..];
                sp = match rest.strip_prefix('.') {
                    Some(tail) => tail,
                    None if rest.is_empty() => rest,
                    None => return err!("Merge Delim Spec must be three bytes, e.g. 'D.,'"),
                };
                // Start over with the remainder. Falling through would reject a spec
                // that ends with the delimiter part and would mis-split a following
                // `comp:` or second `D` part.
                continue;
            }
            match sp.split_once('.') {
                Some((part, rest)) => {
                    m.add_one(part)?;
                    sp = rest;
                }
                None => {
                    m.add_one(sp)?;
                    break;
                }
            }
        }
        Ok(m)
    }

    /// Apply one already separated part of a merge spec to `self`.
    ///
    /// # Errors
    /// Fails if the part is empty, unrecognised, or carries an invalid value.
    fn add_one(&mut self, spec: &str) -> Result<()> {
        if spec.is_empty() {
            return err!("Invalid empty Merge Part");
        } else if spec.as_bytes()[0] == b'D' {
            if spec.len() != 3 {
                return err!("Merge Delim Spec must be three bytes, e.g. 'D.,'");
            }
            self.delim = spec.as_bytes()[1];
            self.out_delim = spec.as_bytes()[2];
        } else if spec.eq_ignore_ascii_case("sort") {
            self.do_sort = true;
        } else if spec.eq_ignore_ascii_case("uniq") {
            self.do_uniq = true;
        } else if spec.eq_ignore_ascii_case("count") {
            self.do_count = true;
        } else if let Some(val) = spec.strip_prefix("comp:") {
            self.comp = CompMaker::make_comp(val)?;
        } else if let Some(val) = spec.strip_prefix("min_len:") {
            self.min_len = val.to_usize_whole(spec.as_bytes(), "merge")?;
        } else if let Some(val) = spec.strip_prefix("max_len:") {
            self.max_len = val.to_usize_whole(spec.as_bytes(), "merge")?;
        } else if let Some(val) = spec.strip_prefix("max_parts:") {
            self.max_parts = val.to_usize_whole(spec.as_bytes(), "merge")?;
        } else {
            return err!("Unrecognized Merge Part '{}'", spec);
        }
        Ok(())
    }
}
impl Agg for Merge {
    /// Append `data` to the accumulated buffer, preceded by the input delimiter when
    /// the buffer already has content. Empty values are skipped.
    fn add(&mut self, data: &[u8]) {
        if data.is_empty() {
            return;
        }
        if !self.data.line().is_empty() {
            self.data.raw().push(self.delim);
        }
        self.data.raw().extend_from_slice(data);
    }

    /// Split the buffer on the input delimiter, optionally sort and de-duplicate the
    /// parts, then write either the part count or the parts joined by the output
    /// delimiter.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        split_plain(&mut self.data.parts, &self.data.line, self.delim);
        if self.do_sort {
            self.data.parts.sort_by(|a, b| {
                self.comp
                    .comp(a.get(&self.data.line), b.get(&self.data.line))
            });
        }
        if self.do_uniq {
            self.data.parts.dedup_by(|a, b| {
                self.comp
                    .equal(a.get(&self.data.line), b.get(&self.data.line))
            });
        }
        if self.do_count {
            return fmt.print(self.data.parts().len() as f64, w);
        }
        let mut emitted = 0;
        for part in &self.data.parts {
            let len = part.len();
            // Length filter: skip parts outside [min_len, max_len].
            if len < self.min_len || len > self.max_len {
                continue;
            }
            if emitted > 0 {
                w.write_all(&[self.out_delim])?;
            }
            w.write_all(part.get(self.data.line()))?;
            emitted += 1;
            if emitted >= self.max_parts {
                break;
            }
        }
        Ok(())
    }

    /// Discard everything accumulated so far.
    fn reset(&mut self) {
        self.data.clear();
    }
}
/// Aggregator that keeps the smallest value seen, as ordered by a comparator.
struct Min {
    /// Comparator that defines the ordering.
    comp: Comp,
    /// The smallest value seen so far.
    val: Vec<u8>,
    /// True until the first value has been added.
    empty: bool,
}

impl Min {
    /// Create a minimum aggregator; `spec` is the comparator spec.
    fn new(spec: &str) -> Result<Self> {
        let comp = CompMaker::make_comp(spec)?;
        Ok(Self {
            comp,
            val: Vec::new(),
            empty: true,
        })
    }
}
impl Agg for Min {
    /// Keep `data` if it is the first value or compares below the current minimum.
    fn add(&mut self, data: &[u8]) {
        // The comparator is only consulted once a first value is in place.
        let take = self.empty || self.comp.comp.comp(&self.val, data) == Ordering::Greater;
        self.empty = false;
        if take {
            self.val.clear();
            self.val.extend_from_slice(data);
        }
    }

    /// Write the stored bytes as they are; the number format is not used.
    fn result(&mut self, w: &mut dyn Write, _fmt: NumFormat) -> Result<()> {
        w.write_all(&self.val)?;
        Ok(())
    }

    /// Forget the stored value.
    fn reset(&mut self) {
        self.empty = true;
        self.val.clear();
    }

    /// The stored value parsed leniently as a number.
    fn value(&self) -> f64 {
        self.val.to_f64_lossy()
    }
}
/// Aggregator computing the arithmetic mean of the values parsed as numbers.
struct Mean {
    /// Running sum.
    val: f64,
    /// Number of values added.
    cnt: f64,
}

impl Mean {
    /// Create a mean aggregator; it accepts no pattern, so `spec` must be empty.
    fn new(spec: &str) -> Result<Self> {
        if !spec.is_empty() {
            return err!("Unexpected pattern with 'Mean' aggregator : '{}'", spec);
        }
        Ok(Self { val: 0.0, cnt: 0.0 })
    }
}
impl Agg for Mean {
    /// Add the leniently parsed number to the sum and bump the count.
    fn add(&mut self, data: &[u8]) {
        let x = data.to_f64_lossy();
        self.val += x;
        self.cnt += 1.0;
    }

    /// Print the mean with `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        let mean = self.value();
        fmt.print(mean, w)
    }

    /// The mean so far, or `0.0` when nothing has been added.
    fn value(&self) -> f64 {
        if self.cnt > 0.0 {
            return self.val / self.cnt;
        }
        0.0
    }

    /// Clear the sum and the count.
    fn reset(&mut self) {
        self.cnt = 0.0;
        self.val = 0.0;
    }
}
/// Aggregator computing the sum of the values parsed as numbers.
struct Sum {
    /// Running sum.
    val: f64,
}

impl Sum {
    /// Create a sum aggregator; it accepts no pattern, so `spec` must be empty.
    fn new(spec: &str) -> Result<Self> {
        if !spec.is_empty() {
            return err!("Unexpected pattern with 'Sum' aggregator : '{}'", spec);
        }
        Ok(Self { val: 0.0 })
    }
}
impl Agg for Sum {
    /// Add the leniently parsed number to the sum.
    fn add(&mut self, data: &[u8]) {
        let x = data.to_f64_lossy();
        self.val += x;
    }

    /// Print the sum with `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        fmt.print(self.val, w)
    }

    /// The sum so far.
    fn value(&self) -> f64 {
        self.val
    }

    /// Clear the sum.
    fn reset(&mut self) {
        self.val = 0.0;
    }
}
/// Aggregator summing a counter's result over all values.
struct ASum {
    /// Running total.
    val: usize,
    /// Counter applied to each value.
    cnt: Box<dyn Counter>,
}

impl ASum {
    /// Create the aggregator; `spec` selects the counter.
    fn new(spec: &str) -> Result<Self> {
        let cnt = AggMaker::make_counter(spec)?;
        Ok(Self { val: 0, cnt })
    }
}
impl Agg for ASum {
    /// Add the counter's result for `data` to the total.
    fn add(&mut self, data: &[u8]) {
        let seen = self.cnt.counter(data);
        self.val += seen;
    }

    /// Print the total with `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        fmt.print(self.value(), w)
    }

    /// The total so far.
    fn value(&self) -> f64 {
        self.val as f64
    }

    /// Clear the total.
    fn reset(&mut self) {
        self.val = 0;
    }
}
/// Aggregator keeping the smallest counter result over all values.
struct AMin {
    /// Smallest count so far; `usize::MAX` until a value has been added.
    val: usize,
    /// Counter applied to each value.
    cnt: Box<dyn Counter>,
}

impl AMin {
    /// Create the aggregator; `spec` selects the counter.
    fn new(spec: &str) -> Result<Self> {
        let cnt = AggMaker::make_counter(spec)?;
        Ok(Self {
            val: usize::MAX,
            cnt,
        })
    }
}
impl Agg for AMin {
    /// Lower the stored minimum if the counter's result for `data` is smaller.
    fn add(&mut self, data: &[u8]) {
        let seen = self.cnt.counter(data);
        if seen < self.val {
            self.val = seen;
        }
    }

    /// Print the minimum with `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        fmt.print(self.value(), w)
    }

    /// The minimum so far.
    fn value(&self) -> f64 {
        self.val as f64
    }

    /// Restore the "nothing seen" state.
    fn reset(&mut self) {
        self.val = usize::MAX;
    }
}
/// Aggregator keeping the largest counter result over all values.
struct AMax {
    /// Largest count so far.
    val: usize,
    /// Counter applied to each value.
    cnt: Box<dyn Counter>,
}

impl AMax {
    /// Create the aggregator; `spec` selects the counter.
    fn new(spec: &str) -> Result<Self> {
        let cnt = AggMaker::make_counter(spec)?;
        Ok(Self { val: 0, cnt })
    }
}
impl Agg for AMax {
    /// Raise the stored maximum if the counter's result for `data` is larger.
    fn add(&mut self, data: &[u8]) {
        let seen = self.cnt.counter(data);
        if seen > self.val {
            self.val = seen;
        }
    }

    /// Print the maximum with `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        fmt.print(self.value(), w)
    }

    /// The maximum so far.
    fn value(&self) -> f64 {
        self.val as f64
    }

    /// Clear the maximum.
    fn reset(&mut self) {
        self.val = 0;
    }
}
/// Aggregator computing the mean of a counter's result over all values.
struct AMean {
    /// Running total of the counts.
    val: usize,
    /// Number of values added.
    num: usize,
    /// Counter applied to each value.
    cnt: Box<dyn Counter>,
}

impl AMean {
    /// Create the aggregator; `spec` selects the counter.
    fn new(spec: &str) -> Result<Self> {
        let cnt = AggMaker::make_counter(spec)?;
        Ok(Self {
            val: 0,
            num: 0,
            cnt,
        })
    }
}
impl Agg for AMean {
    /// The mean count so far, or `0.0` when nothing has been added.
    fn value(&self) -> f64 {
        if self.num > 0 {
            self.val as f64 / self.num as f64
        } else {
            0.0
        }
    }

    /// Add the counter's result for `data` to the total and bump the number of values.
    fn add(&mut self, data: &[u8]) {
        self.val += self.cnt.counter(data);
        self.num += 1;
    }

    /// Print the mean with `fmt`.
    ///
    /// This must go through `value()`: `self.val` on its own is the running total, not
    /// the mean.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        fmt.print(self.value(), w)
    }

    /// Clear the total and the number of values.
    fn reset(&mut self) {
        self.val = 0;
        self.num = 0;
    }
}
/// Aggregator that keeps the largest value seen, as ordered by a comparator.
struct Max {
    /// Comparator that defines the ordering.
    comp: Comp,
    /// The largest value seen so far.
    val: Vec<u8>,
    /// True until the first value has been added.
    empty: bool,
}

impl Max {
    /// Create a maximum aggregator; `spec` is the comparator spec.
    fn new(spec: &str) -> Result<Self> {
        Ok(Self {
            comp: CompMaker::make_comp(spec)?,
            val: Vec::new(),
            empty: true,
        })
    }
}

impl Agg for Max {
    /// Keep `data` if it is the first value or compares above the current maximum.
    ///
    /// The first value is taken unconditionally, as `Min` does. Comparing it against
    /// the initially empty buffer would wrongly reject any value the comparator does
    /// not rank above empty input.
    fn add(&mut self, data: &[u8]) {
        if self.empty {
            self.empty = false;
            self.val.extend_from_slice(data);
            return;
        }
        if self.comp.comp.comp(&self.val, data) == Ordering::Less {
            self.val.clear();
            self.val.extend_from_slice(data);
        }
    }

    /// Write the stored bytes as they are; the number format is not used.
    fn result(&mut self, w: &mut dyn Write, _fmt: NumFormat) -> Result<()> {
        w.write_all(&self.val)?;
        Ok(())
    }

    /// Forget the stored value.
    fn reset(&mut self) {
        self.val.clear();
        self.empty = true;
    }

    /// The stored value parsed leniently as a number.
    fn value(&self) -> f64 {
        self.val.to_f64_lossy()
    }
}
/// Aggregator computing the longest common prefix of all values.
struct Prefix {
    /// The common prefix so far.
    val: Vec<u8>,
    /// True until the first value has been added.
    empty: bool,
}

impl Prefix {
    /// Create the aggregator; it accepts no pattern, so `spec` must be empty.
    fn new(spec: &str) -> Result<Self> {
        if !spec.is_empty() {
            return err!(
                "Unexpected non-empty pattern passed to Prefix agregator : '{}'",
                spec
            );
        }
        Ok(Self {
            val: Vec::new(),
            empty: true,
        })
    }
}
/// Shorten `val` in place to the longest prefix it shares with `data`.
fn common_prefix(val: &mut Vec<u8>, data: &[u8]) {
    // `zip` stops at the shorter input, so `shared` never exceeds either length.
    let shared = val
        .iter()
        .zip(data.iter())
        .take_while(|(mine, theirs)| mine == theirs)
        .count();
    val.truncate(shared);
}
impl Agg for Prefix {
    /// The first value is stored whole; every later value trims the stored prefix to
    /// what the two have in common.
    fn add(&mut self, data: &[u8]) {
        let first = std::mem::replace(&mut self.empty, false);
        if first {
            self.val.extend_from_slice(data);
        } else {
            common_prefix(&mut self.val, data);
        }
    }

    /// Write the common prefix as it is; the number format is not used.
    fn result(&mut self, w: &mut dyn Write, _fmt: NumFormat) -> Result<()> {
        w.write_all(&self.val)?;
        Ok(())
    }

    /// Forget everything seen so far.
    fn reset(&mut self) {
        self.empty = true;
        self.val.clear();
    }
}
/// Aggregator computing the longest common suffix of all values.
struct Suffix {
    /// The common suffix so far.
    val: Vec<u8>,
    /// True until the first value has been added.
    empty: bool,
}

impl Suffix {
    /// Create the aggregator; it accepts no pattern, so `spec` must be empty.
    fn new(spec: &str) -> Result<Self> {
        if !spec.is_empty() {
            return err!(
                "Unexpected non-empty pattern passed to Sufffix agregator : '{}'",
                spec
            );
        }
        Ok(Self {
            val: Vec::new(),
            empty: true,
        })
    }
}
/// Shorten `val` in place to the longest suffix it shares with `data`.
fn common_suffix(val: &mut Vec<u8>, data: &[u8]) {
    // Walk both inputs from the back and count the bytes that agree.
    let shared = val
        .iter()
        .rev()
        .zip(data.iter().rev())
        .take_while(|(mine, theirs)| mine == theirs)
        .count();
    // Drop everything in front of the shared tail (a no-op when all of `val` matches).
    let surplus = val.len() - shared;
    val.drain(..surplus);
}
impl Agg for Suffix {
    /// The first value is stored whole; every later value trims the stored suffix to
    /// what the two have in common.
    fn add(&mut self, data: &[u8]) {
        let first = std::mem::replace(&mut self.empty, false);
        if first {
            self.val.extend_from_slice(data);
        } else {
            common_suffix(&mut self.val, data);
        }
    }

    /// Write the common suffix as it is; the number format is not used.
    fn result(&mut self, w: &mut dyn Write, _fmt: NumFormat) -> Result<()> {
        w.write_all(&self.val)?;
        Ok(())
    }

    /// Forget everything seen so far.
    fn reset(&mut self) {
        self.empty = true;
        self.val.clear();
    }
}
/// Aggregator counting how many values were added, starting from a configurable value.
struct Count {
    /// Current count.
    val: isize,
    /// Starting value, restored by `reset`.
    init: isize,
}

impl Count {
    /// Create the counter. An empty `spec` starts at zero; otherwise `spec` is parsed as
    /// the starting value.
    fn new(spec: &str) -> Result<Self> {
        let init = if spec.is_empty() {
            0
        } else {
            spec.to_isize_whole(spec.as_bytes(), "count")?
        };
        Ok(Self { val: init, init })
    }
}
impl Agg for Count {
    /// Count one more value; the content is not inspected.
    fn add(&mut self, _data: &[u8]) {
        self.val += 1;
    }

    /// Print the count with `fmt`.
    fn result(&mut self, w: &mut dyn Write, fmt: NumFormat) -> Result<()> {
        let count = self.val as f64;
        fmt.print(count, w)
    }

    /// The count so far.
    fn value(&self) -> f64 {
        self.val as f64
    }

    /// Go back to the starting value.
    fn reset(&mut self) {
        self.val = self.init;
    }
}
/// Boxed factory that builds an aggregator from a pattern string.
type MakerBox = Box<dyn Fn(&str) -> Result<Rc<RefCell<dyn Agg>>> + Send>;
/// One registered aggregator kind.
struct AggMakerItem {
// Name the aggregator is selected by.
tag: &'static str,
// One-line description printed by `AggMaker::help`.
help: &'static str,
// Factory called with the pattern.
maker: MakerBox,
}
/// Boxed factory that builds a counter from a `utf8` flag and a pattern string.
type CounterBox = Box<dyn Fn(bool, &str) -> Result<Box<dyn Counter>> + Send>;
/// One registered counter kind.
struct CounterMakerItem {
// Name the counter is selected by.
tag: &'static str,
// One-line description printed by `AggMaker::help`.
help: &'static str,
// Factory called with the `utf8` flag and the pattern.
maker: CounterBox,
}
/// One alias: looking up `new_name` resolves to `old_name`.
struct AggMakerAlias {
// The name the alias resolves to.
old_name: &'static str,
// The alternative spelling accepted in specs.
new_name: &'static str,
}
lazy_static! {
// Registered counter factories, searched by tag.
static ref COUNTER_MAKER: Mutex<Vec<CounterMakerItem>> = Mutex::new(Vec::new());
// Registered aggregator factories, searched by tag.
static ref AGG_MAKER: Mutex<Vec<AggMakerItem>> = Mutex::new(Vec::new());
// Alias table consulted by `AggMaker::resolve_alias`.
static ref AGG_ALIAS: Mutex<Vec<AggMakerAlias>> = Mutex::new(Vec::new());
// Names the registration functions refuse because they are reserved for modifiers.
// NOTE(review): the list is empty, so those checks can never fire, although
// `make_counter2` treats "utf8" as a modifier. Confirm whether "utf8" belongs here.
static ref MODIFIERS: Vec<&'static str> = vec![];
}
/// Entry point to the global registries of aggregators, counters and aliases.
#[derive(Debug, Clone, Default)]
pub struct AggMaker {}

impl AggMaker {
    /// Fill the registries with the built-in aliases, counters and aggregators.
    ///
    /// Does nothing when the aggregator registry already holds at least one entry.
    fn init() -> Result<()> {
        let already_filled = !AGG_MAKER.lock().unwrap().is_empty();
        if already_filled {
            return Ok(());
        }

        // Aliases, as (canonical name, alternative spelling).
        let builtin_aliases = [
            ("min", "minimum"),
            ("max", "maximum"),
            ("mean", "avg"),
            ("amean", "aavg"),
        ];
        for &(old_name, new_name) in &builtin_aliases {
            Self::do_add_alias(old_name, new_name)?;
        }

        // Counters.
        Self::do_push_counter("chars", "Count the characters", |utf8, _p| {
            Ok(Box::new(Chars::new(utf8)))
        })?;
        Self::do_push_counter("swords", "Count words, meaning non-space", |utf8, _p| {
            Ok(Box::new(Swords::new(utf8)))
        })?;
        Self::do_push_counter("awords", "Count words, meaning alphanumeric", |utf8, _p| {
            Ok(Box::new(Awords::new(utf8)))
        })?;

        // Aggregators built on a counter.
        Self::do_push("asum", "Sum of the given counter", |p| {
            Ok(Rc::new(RefCell::new(ASum::new(p)?)))
        })?;
        Self::do_push("amin", "Min of the given counter", |p| {
            Ok(Rc::new(RefCell::new(AMin::new(p)?)))
        })?;
        Self::do_push("amax", "Max of the given counter", |p| {
            Ok(Rc::new(RefCell::new(AMax::new(p)?)))
        })?;
        Self::do_push("amean", "Mean of the given counter", |p| {
            Ok(Rc::new(RefCell::new(AMean::new(p)?)))
        })?;

        // Aggregators working on the values themselves.
        Self::do_push("merge", "Merge into a delimited list of values", |p| {
            Ok(Rc::new(RefCell::new(Merge::new(p)?)))
        })?;
        Self::do_push("count", "The number of things aggregated", |p| {
            Ok(Rc::new(RefCell::new(Count::new(p)?)))
        })?;
        Self::do_push("prefix", "The longest common prefix of the input values", |p| {
            Ok(Rc::new(RefCell::new(Prefix::new(p)?)))
        })?;
        Self::do_push("suffix", "The longest common suffix of the input values", |p| {
            Ok(Rc::new(RefCell::new(Suffix::new(p)?)))
        })?;
        Self::do_push(
            "min",
            "Keep the minimum value, Pattern is the associated Comparator",
            |p| Ok(Rc::new(RefCell::new(Min::new(p)?))),
        )?;
        Self::do_push(
            "max",
            "Keep the maximum value, Pattern is the associated Comparator",
            |p| Ok(Rc::new(RefCell::new(Max::new(p)?))),
        )?;
        Self::do_push("mean", "The arithmetic mean of the values", |p| {
            Ok(Rc::new(RefCell::new(Mean::new(p)?)))
        })?;
        Self::do_push("sum", "The sum of the values", |p| {
            Ok(Rc::new(RefCell::new(Sum::new(p)?)))
        })?;
        Ok(())
    }

    /// Register an aggregator factory under `tag`, replacing any existing entry with
    /// that tag. The built-ins are registered first if they have not been yet.
    pub fn push<F: 'static>(tag: &'static str, help: &'static str, maker: F) -> Result<()>
    where
        F: Fn(&str) -> Result<Rc<RefCell<dyn Agg>>> + Send,
    {
        Self::init().and_then(|()| Self::do_push(tag, help, maker))
    }

    /// Register a counter factory under `tag`, replacing any existing entry with that
    /// tag. The built-ins are registered first if they have not been yet.
    pub fn push_counter<F: 'static>(tag: &'static str, help: &'static str, maker: F) -> Result<()>
    where
        F: Fn(bool, &str) -> Result<Box<dyn Counter>> + Send,
    {
        Self::init().and_then(|()| Self::do_push_counter(tag, help, maker))
    }

    /// Make `new_name` an alias for `old_name`. The built-ins are registered first if
    /// they have not been yet.
    pub fn add_alias(old_name: &'static str, new_name: &'static str) -> Result<()> {
        Self::init().and_then(|()| Self::do_add_alias(old_name, new_name))
    }

    /// Translate an alias to the name it stands for; other names come back unchanged.
    fn resolve_alias(name: &str) -> &str {
        let aliases = AGG_ALIAS.lock().unwrap();
        let resolved = aliases
            .iter()
            .find(|alias| alias.new_name == name)
            .map_or(name, |alias| alias.old_name);
        resolved
    }

    /// Insert or overwrite the alias `new_name`, unless the name is reserved.
    fn do_add_alias(old_name: &'static str, new_name: &'static str) -> Result<()> {
        if MODIFIERS.contains(&new_name) {
            return err!(
                "You can't add an alias named {new_name} because that is reserved for a modifier"
            );
        }
        let entry = AggMakerAlias { old_name, new_name };
        let mut aliases = AGG_ALIAS.lock().unwrap();
        match aliases.iter().position(|alias| alias.new_name == new_name) {
            Some(slot) => aliases[slot] = entry,
            None => aliases.push(entry),
        }
        Ok(())
    }

    /// Insert or overwrite the aggregator `tag`, unless the name is reserved.
    fn do_push<F: 'static>(tag: &'static str, help: &'static str, maker: F) -> Result<()>
    where
        F: Fn(&str) -> Result<Rc<RefCell<dyn Agg>>> + Send,
    {
        if MODIFIERS.contains(&tag) {
            return err!("You can't add a agg named {tag} because that is reserved for a modifier");
        }
        let entry = AggMakerItem {
            tag,
            help,
            maker: Box::new(maker),
        };
        let mut makers = AGG_MAKER.lock().unwrap();
        match makers.iter().position(|item| item.tag == tag) {
            Some(slot) => makers[slot] = entry,
            None => makers.push(entry),
        }
        Ok(())
    }

    /// Insert or overwrite the counter `tag`, unless the name is reserved.
    fn do_push_counter<F: 'static>(tag: &'static str, help: &'static str, maker: F) -> Result<()>
    where
        F: Fn(bool, &str) -> Result<Box<dyn Counter>> + Send,
    {
        if MODIFIERS.contains(&tag) {
            return err!(
                "You can't add a counter named {tag} because that is reserved for a modifier"
            );
        }
        let entry = CounterMakerItem {
            tag,
            help,
            maker: Box::new(maker),
        };
        let mut counters = COUNTER_MAKER.lock().unwrap();
        match counters.iter().position(|item| item.tag == tag) {
            Some(slot) => counters[slot] = entry,
            None => counters.push(entry),
        }
        Ok(())
    }

    /// Print the list of modifiers, aggregators, counters and `merge` options to stdout.
    ///
    /// # Panics
    /// Panics if the built-ins cannot be registered.
    pub fn help() {
        Self::init().unwrap();
        println!("Modifers :");
        println!("utf8 : do the unicode thing, rather than the ascii thing.");
        println!("Methods :");
        let makers = AGG_MAKER.lock().unwrap();
        for item in makers.iter() {
            println!("{:12}{}", item.tag, item.help);
        }
        println!("Counters :");
        let counters = COUNTER_MAKER.lock().unwrap();
        for item in counters.iter() {
            println!("{:12}{}", item.tag, item.help);
        }
        println!();
        let merge_help = [
            "'merge' pattern is a period delimited set of any of the following :",
            "sort -- sort the parts",
            "uniq -- Remove any adjacent equal parts. Happens after sorting.",
            "count -- Display the count of parts, not the parts themselves.",
            "max_len:N -- Discard any parts longer than N bytes.",
            "min_len:N -- Discard any parts shorter than N bytes.",
            "max_parts:N -- Display only the first N parts.",
            "Dxy -- 'x' is the input delimiter and 'y' is the output delimiter. Default is comma for both.",
            "comp:spec -- Spec for comparator for sort or uniq. Must be last piece.",
        ];
        for line in &merge_help {
            println!("{}", line);
        }
        println!();
        println!("See also https://avjewe.github.io/cdxdoc/Aggregator.html.");
    }

    /// Build a line aggregator: `expr,...` gives an expression aggregator, anything
    /// else a column aggregator.
    pub fn make_line(spec: &str) -> Result<LineAggRef> {
        if let Some(rest) = spec.strip_prefix("expr,") {
            return Ok(Rc::new(RefCell::new(ExprAgg::new(rest)?)));
        }
        Ok(Rc::new(RefCell::new(AggCol::new(spec)?)))
    }

    /// Build an aggregator from its name `spec` and its `pattern`.
    ///
    /// Only the last period-separated piece of `spec` is used as the name, after alias
    /// resolution.
    pub fn make2(spec: &str, pattern: &str) -> Result<AggRef> {
        Self::init()?;
        let name = if spec.is_empty() {
            ""
        } else {
            let last = spec.rsplit('.').next().unwrap_or("");
            Self::resolve_alias(last)
        };
        let makers = AGG_MAKER.lock().unwrap();
        let found = makers.iter().find(|item| item.tag == name);
        match found {
            Some(item) => (item.maker)(pattern),
            None => err!("No Agg found with name '{}'", name),
        }
    }

    /// Build a counter from its name `spec` and its `pattern`.
    ///
    /// `spec` is period-separated: a `utf8` piece (any case) switches on UTF-8 mode and
    /// the last other piece is the counter name, after alias resolution.
    pub fn make_counter2(spec: &str, pattern: &str) -> Result<Box<dyn Counter>> {
        let mut utf8 = false;
        let mut name = "";
        if !spec.is_empty() {
            for piece in spec.split('.') {
                if piece.eq_ignore_ascii_case("utf8") {
                    utf8 = true;
                    continue;
                }
                name = piece;
            }
            name = Self::resolve_alias(name);
        }
        let counters = COUNTER_MAKER.lock().unwrap();
        let found = counters.iter().find(|item| item.tag == name);
        match found {
            Some(item) => (item.maker)(utf8, pattern),
            None => err!("No Counter found with name '{}'", name),
        }
    }

    /// Build an aggregator from `Name,Pattern`; without a comma the pattern is empty.
    pub fn make(spec: &str) -> Result<AggRef> {
        let (name, pattern) = spec.split_once(',').unwrap_or((spec, ""));
        Self::make2(name, pattern)
    }

    /// Build a counter from `Name,Pattern`; without a comma the pattern is empty.
    pub fn make_counter(spec: &str) -> Result<Box<dyn Counter>> {
        let (name, pattern) = spec.split_once(',').unwrap_or((spec, ""));
        Self::make_counter2(name, pattern)
    }
}
// Unit tests for the prefix/suffix trimming helpers.
#[cfg(test)]
mod tests {
use super::*;
// Each call trims the same buffer further, so the assertions depend on their order.
#[test]
fn suffix() {
let mut v = b"123456789".to_vec();
// Identical input leaves the buffer untouched.
common_suffix(&mut v, b"123456789");
assert_eq!(&v, b"123456789");
// Shorter input that is a suffix of the buffer.
common_suffix(&mut v, b"23456789");
assert_eq!(&v, b"23456789");
// Longer input sharing only a tail.
common_suffix(&mut v, b"dsjfldkjsfaslkjfaslkfjadflkj3456789");
assert_eq!(&v, b"3456789");
// Empty input empties the buffer, and it stays empty afterwards.
common_suffix(&mut v, b"");
assert_eq!(&v, b"");
common_suffix(&mut v, b"");
assert_eq!(&v, b"");
common_suffix(&mut v, b"sadfddsafasdgfg");
assert_eq!(&v, b"");
}
// Each call trims the same buffer further, so the assertions depend on their order.
#[test]
fn prefix() {
let mut v = b"123456789".to_vec();
// Identical input leaves the buffer untouched.
common_prefix(&mut v, b"123456789");
assert_eq!(&v, b"123456789");
// Shorter input that is a prefix of the buffer.
common_prefix(&mut v, b"12345678");
assert_eq!(&v, b"12345678");
// Longer input sharing only a head.
common_prefix(&mut v, b"1234567sdfhasflhasflasflaksjfasdkhj");
assert_eq!(&v, b"1234567");
// Nothing in common empties the buffer, and it stays empty afterwards.
common_prefix(&mut v, b"sdkjadflkjafdakjdsf");
assert_eq!(&v, b"");
common_prefix(&mut v, b"sdkjadflkjafdakjdsf");
assert_eq!(&v, b"");
common_prefix(&mut v, b"");
assert_eq!(&v, b"");
}
}