use regex::Regex;
use serde_json::Value;
/// Rows recovered from captured print-sink output.
///
/// Instances are produced by [`PrintSinkOutput::parse`], which keeps rows in the
/// same order as the lines they were parsed from.
#[derive(Clone, Debug)]
pub struct PrintSinkOutput {
    rows: Vec<PrintSinkRow>,
}
/// One record parsed from a single print-sink output line.
#[derive(Clone, Debug)]
pub struct PrintSinkRow {
    /// Text of the bracketed tag on the line; when built by
    /// [`PrintSinkOutput::parse`] this is `"Insert"`, `"Delete"` or `"Update"`.
    pub row_kind: String,
    /// JSON payload that followed the tag on the same line.
    pub data: Value,
}
impl PrintSinkOutput {
pub fn parse(output: &str) -> Self {
let re =
Regex::new(r"\d+ -> \(sample_every=\d+\) \[(Insert|Delete|Update)\] (.+)").unwrap();
let rows = output
.lines()
.filter_map(|line| {
re.captures(line).and_then(|caps| {
let row_kind = caps.get(1)?.as_str().to_string();
let json_str = caps.get(2)?.as_str();
let data = serde_json::from_str(json_str).ok()?;
Some(PrintSinkRow { row_kind, data })
})
})
.collect();
Self { rows }
}
pub fn has_column(&self, name: &str) -> bool {
self.rows.iter().any(|r| r.data.get(name).is_some())
}
pub fn column_count(&self, name: &str) -> usize {
self.rows
.iter()
.filter(|r| r.data.get(name).is_some())
.count()
}
pub fn column_values(&self, name: &str) -> Vec<&Value> {
self.rows.iter().filter_map(|r| r.data.get(name)).collect()
}
pub fn rows(&self) -> &[PrintSinkRow] {
&self.rows
}
pub fn len(&self) -> usize {
self.rows.len()
}
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
pub fn column_names(&self) -> Vec<String> {
self.rows
.first()
.and_then(|r| r.data.as_object())
.map(|obj| obj.keys().cloned().collect())
.unwrap_or_default()
}
pub fn get(&self, index: usize) -> Option<&PrintSinkRow> {
self.rows.get(index)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed log lines are parsed in order with their kinds and columns.
    #[test]
    fn test_parse_print_sink_output() {
        let output = r#"
2025-01-08T10:00:00.000Z INFO streamling: 1 -> (sample_every=1) [Insert] {"id":1,"data":"hello","_gs_op":"c"}
2025-01-08T10:00:00.001Z INFO streamling: 2 -> (sample_every=1) [Insert] {"id":2,"data":"world","_gs_op":"c"}
2025-01-08T10:00:00.002Z INFO streamling: 3 -> (sample_every=1) [Delete] {"id":1,"data":"hello","_gs_op":"d"}
"#;
        let parsed = PrintSinkOutput::parse(output);
        assert_eq!(parsed.len(), 3);
        assert!(parsed.has_column("id"));
        assert!(parsed.has_column("data"));
        assert!(parsed.has_column("_gs_op"));
        assert!(!parsed.has_column("nonexistent"));
        assert_eq!(parsed.column_count("id"), 3);
        assert_eq!(parsed.column_count("_gs_op"), 3);
        let column_names = parsed.column_names();
        assert!(column_names.contains(&"id".to_string()));
        assert!(column_names.contains(&"data".to_string()));
        assert!(column_names.contains(&"_gs_op".to_string()));
        assert_eq!(parsed.rows()[0].row_kind, "Insert");
        assert_eq!(parsed.rows()[1].row_kind, "Insert");
        assert_eq!(parsed.rows()[2].row_kind, "Delete");
    }

    /// Input without any print-sink records yields no rows and no columns.
    #[test]
    fn test_parse_empty_output() {
        let output = "Some other log line\nAnother line without print sink format";
        let parsed = PrintSinkOutput::parse(output);
        assert!(parsed.is_empty());
        assert_eq!(parsed.len(), 0);
        assert!(parsed.get(0).is_none());
        assert!(parsed.column_names().is_empty());
        assert!(PrintSinkOutput::parse("").is_empty());
    }

    /// Unrelated lines, unknown kinds and invalid JSON payloads are skipped
    /// without disturbing the rows around them.
    #[test]
    fn test_parse_skips_malformed_and_unrelated_lines() {
        let output = r#"startup complete
1 -> (sample_every=1) [Update] {"id":7,"data":"x"}
2 -> (sample_every=1) [Insert] {not json}
3 -> (sample_every=2) [Upsert] {"id":8}
4 -> (sample_every=2) [Delete] {"id":9}"#;
        let parsed = PrintSinkOutput::parse(output);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed.get(0).map(|r| r.row_kind.as_str()), Some("Update"));
        assert_eq!(parsed.get(1).map(|r| r.row_kind.as_str()), Some("Delete"));
        assert!(parsed.get(2).is_none());

        assert_eq!(parsed.column_count("data"), 1);
        assert!(!parsed.has_column("_gs_op"));
        let ids: Vec<i64> = parsed
            .column_values("id")
            .iter()
            .filter_map(|v| v.as_i64())
            .collect();
        assert_eq!(ids, vec![7, 9]);

        let mut names = parsed.column_names();
        names.sort();
        assert_eq!(names, vec!["data".to_string(), "id".to_string()]);
    }
}