use crate::event::{flatten_dynamic, Event, FlattenStyle};
use crate::pipeline;
use once_cell::sync::Lazy;
use rhai::Dynamic;
use std::collections::HashMap;
use std::sync::Mutex;
/// Process-wide record of which formatter configurations have already emitted their header.
///
/// Keyed by `CsvFormatter::formatter_key`; a key maps to `true` once the header for that
/// configuration has been written, so only the first formatted event gets the header line.
static CSV_FORMATTER_HEADER_REGISTRY: Lazy<Mutex<HashMap<String, bool>>> =
    Lazy::new(Mutex::default);
/// Reports whether `value` must be wrapped in double quotes to survive a CSV/TSV round trip.
///
/// Quoting is required for the empty string (so it is distinguishable from a missing cell),
/// for values with a leading or trailing space, and for values containing the active
/// `delimiter`, a double quote, a line feed, or a carriage return.
pub(crate) fn needs_csv_quoting(value: &str, delimiter: char) -> bool {
    // Positional checks first: these look only at the ends of the string.
    if value.is_empty() || value.starts_with(' ') || value.ends_with(' ') {
        return true;
    }
    // One pass over the characters for everything that is special anywhere in the value.
    value
        .chars()
        .any(|ch| ch == delimiter || matches!(ch, '"' | '\n' | '\r'))
}
/// Returns `value` as a single CSV/TSV cell.
///
/// Values that `needs_csv_quoting` flags are wrapped in double quotes, with every embedded
/// `"` doubled (`"` -> `""`). All other values are returned unchanged.
pub(crate) fn escape_csv_value(value: &str, delimiter: char) -> String {
    if !needs_csv_quoting(value, delimiter) {
        return value.to_owned();
    }

    // Two extra bytes for the surrounding quotes; embedded quotes may grow it further.
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('"');
    for ch in value.chars() {
        if ch == '"' {
            // Double the quote so the reader sees a literal `"` rather than a field end.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Formats events as delimiter-separated rows (CSV or TSV) over a fixed list of field keys.
pub struct CsvFormatter {
    /// Cell separator; also the character that forces a cell to be quoted.
    delimiter: char,
    /// Whether a header row may be emitted before the first data row.
    include_header: bool,
    /// When set, `format` never emits a header row, regardless of `include_header`.
    worker_mode: bool,
    /// Event field names, in output column order.
    keys: Vec<String>,
    /// Lookup key into the process-wide header registry for this configuration.
    formatter_key: String,
}
impl CsvFormatter {
    /// Comma-delimited formatter that emits a header row before its first data row.
    ///
    /// The header is tracked in a process-wide registry shared by every formatter built with
    /// the same constructor and the same `keys`. Only the first event formatted by any of them
    /// is prefixed with the header.
    pub fn new(keys: Vec<String>) -> Self {
        Self::build_variant(',', "", keys, true, false)
    }

    /// Tab-delimited formatter that emits a header row before its first data row.
    ///
    /// Header tracking is shared process-wide, as described on [`CsvFormatter::new`].
    pub fn new_tsv(keys: Vec<String>) -> Self {
        Self::build_variant('\t', "", keys, true, false)
    }

    /// Comma-delimited formatter that never emits a header row.
    pub fn new_csv_no_header(keys: Vec<String>) -> Self {
        Self::build_variant(',', "noheader_", keys, false, false)
    }

    /// Tab-delimited formatter that never emits a header row.
    pub fn new_tsv_no_header(keys: Vec<String>) -> Self {
        Self::build_variant('\t', "noheader_", keys, false, false)
    }

    /// Comma-delimited formatter in worker mode; `format` emits data rows only.
    pub fn new_worker(keys: Vec<String>) -> Self {
        Self::build_variant(',', "worker_", keys, false, true)
    }

    /// Tab-delimited formatter in worker mode; `format` emits data rows only.
    pub fn new_tsv_worker(keys: Vec<String>) -> Self {
        Self::build_variant('\t', "worker_", keys, false, true)
    }

    /// Comma-delimited, header-less formatter in worker mode; `format` emits data rows only.
    pub fn new_csv_no_header_worker(keys: Vec<String>) -> Self {
        Self::build_variant(',', "noheader_worker_", keys, false, true)
    }

    /// Tab-delimited, header-less formatter in worker mode; `format` emits data rows only.
    pub fn new_tsv_no_header_worker(keys: Vec<String>) -> Self {
        Self::build_variant('\t', "noheader_worker_", keys, false, true)
    }

    /// Shared body of all public constructors.
    ///
    /// The header-registry key has the form `"{delimiter}_{variant_tag}{hash(keys)}"`, for
    /// example `",_noheader_<hash>"`. The `variant_tag` strings passed by the constructors keep
    /// these keys byte-identical to the per-constructor `format!` strings used previously.
    fn build_variant(
        delimiter: char,
        variant_tag: &str,
        keys: Vec<String>,
        include_header: bool,
        worker_mode: bool,
    ) -> Self {
        // Hash before `keys` is moved into the struct.
        let formatter_key = format!("{}_{}{}", delimiter, variant_tag, Self::keys_hash(&keys));
        Self {
            delimiter,
            keys,
            include_header,
            formatter_key,
            worker_mode,
        }
    }

    /// Hashes the ordered key list for use in the registry key.
    ///
    /// `DefaultHasher` output is not guaranteed stable across Rust releases. That is acceptable
    /// here because the registry only lives in memory for the current process.
    fn keys_hash(keys: &[String]) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        keys.hash(&mut hasher);
        hasher.finish()
    }

    /// Atomically claims the header for this formatter's configuration.
    ///
    /// Returns `true` exactly once per `formatter_key` per process (for the caller that should
    /// write the header) and `false` on every later call.
    fn mark_header_written_globally(&self) -> bool {
        // The map only ever gains `true` entries, so its contents stay consistent even if a
        // thread panicked while holding the lock. Recover the guard rather than turning one
        // panic into a panic on every subsequent event.
        let mut registry = CSV_FORMATTER_HEADER_REGISTRY
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if registry.get(&self.formatter_key) == Some(&true) {
            return false;
        }
        // Clone only on the first claim, not on every event.
        registry.insert(self.formatter_key.clone(), true);
        true
    }

    /// Returns the header line (escaped key names joined by the delimiter), without a newline.
    pub fn format_header(&self) -> String {
        self.join_cells(
            self.keys
                .iter()
                .map(|key| escape_csv_value(key, self.delimiter)),
        )
    }

    /// Renders one data row in `keys` order, without a trailing newline.
    ///
    /// A key missing from the event yields an empty, unquoted cell. An empty string value is
    /// quoted (`""`), so the two cases remain distinguishable.
    fn format_data_row(&self, event: &Event) -> String {
        self.join_cells(self.keys.iter().map(|key| {
            event
                .fields
                .get(key)
                .map(|value| escape_csv_value(&self.format_csv_value(value), self.delimiter))
                .unwrap_or_default()
        }))
    }

    /// Joins already-escaped cells with this formatter's delimiter.
    fn join_cells(&self, cells: impl Iterator<Item = String>) -> String {
        // Encode the delimiter once on the stack instead of allocating a `String` per row.
        let mut utf8 = [0u8; 4];
        let separator: &str = self.delimiter.encode_utf8(&mut utf8);
        cells.collect::<Vec<_>>().join(separator)
    }

    /// Converts a field value to its unescaped cell text.
    ///
    /// Scalars use their `Display` form. Maps and arrays are flattened with
    /// `FlattenStyle::Underscore`:
    /// - no leaves produce an empty string;
    /// - a single leaf produces that leaf's text;
    /// - several leaves produce `k:v` pairs joined by `,`.
    ///
    /// NOTE(review): the leaf order of the multi-leaf form depends on the map type that
    /// `flatten_dynamic` returns. Confirm that it is deterministic.
    fn format_csv_value(&self, value: &Dynamic) -> String {
        // `Dynamic::is` checks the runtime type without copying. The previous
        // `value.clone().try_cast()` deep-cloned every map/array just to test its type.
        if !(value.is::<rhai::Map>() || value.is::<rhai::Array>()) {
            return value.to_string();
        }

        let flattened = flatten_dynamic(value, FlattenStyle::Underscore, 0);
        match flattened.len() {
            0 => String::new(),
            1 => flattened
                .values()
                .next()
                .map(|leaf| leaf.to_string())
                .unwrap_or_default(),
            _ => flattened
                .iter()
                .map(|(k, v)| format!("{}:{}", k, v))
                .collect::<Vec<_>>()
                .join(","),
        }
    }
}
impl pipeline::Formatter for CsvFormatter {
    /// Renders `event` as one delimiter-separated row, without a trailing newline.
    ///
    /// When headers are enabled and not in worker mode, the first event formatted for this
    /// configuration in the process is prefixed with the header line and a `\n`.
    fn format(&self, event: &Event) -> String {
        // Claim the header before rendering the row, matching the original ordering of the
        // registry side effect.
        let header_due =
            self.include_header && !self.worker_mode && self.mark_header_written_globally();
        let row = self.format_data_row(event);
        if !header_due {
            return row;
        }
        format!("{}\n{}", self.format_header(), row)
    }
}