streamling_e2e/resources/
print_sink.rs1use regex::Regex;
9use serde_json::Value;
10
11#[derive(Debug, Clone)]
13pub struct PrintSinkOutput {
14 rows: Vec<PrintSinkRow>,
15}
16
17#[derive(Debug, Clone)]
19pub struct PrintSinkRow {
20 pub row_kind: String,
22 pub data: Value,
24}
25
26impl PrintSinkOutput {
27 pub fn parse(output: &str) -> Self {
31 let re =
32 Regex::new(r"\d+ -> \(sample_every=\d+\) \[(Insert|Delete|Update)\] (.+)").unwrap();
33
34 let rows = output
35 .lines()
36 .filter_map(|line| {
37 re.captures(line).and_then(|caps| {
38 let row_kind = caps.get(1)?.as_str().to_string();
39 let json_str = caps.get(2)?.as_str();
40 let data = serde_json::from_str(json_str).ok()?;
41 Some(PrintSinkRow { row_kind, data })
42 })
43 })
44 .collect();
45
46 Self { rows }
47 }
48
49 pub fn has_column(&self, name: &str) -> bool {
51 self.rows.iter().any(|r| r.data.get(name).is_some())
52 }
53
54 pub fn column_count(&self, name: &str) -> usize {
56 self.rows
57 .iter()
58 .filter(|r| r.data.get(name).is_some())
59 .count()
60 }
61
62 pub fn column_values(&self, name: &str) -> Vec<&Value> {
64 self.rows.iter().filter_map(|r| r.data.get(name)).collect()
65 }
66
67 pub fn rows(&self) -> &[PrintSinkRow] {
69 &self.rows
70 }
71
72 pub fn len(&self) -> usize {
74 self.rows.len()
75 }
76
77 pub fn is_empty(&self) -> bool {
79 self.rows.is_empty()
80 }
81
82 pub fn column_names(&self) -> Vec<String> {
84 self.rows
85 .first()
86 .and_then(|r| r.data.as_object())
87 .map(|obj| obj.keys().cloned().collect())
88 .unwrap_or_default()
89 }
90
91 pub fn get(&self, index: usize) -> Option<&PrintSinkRow> {
93 self.rows.get(index)
94 }
95}
96
#[cfg(test)]
mod tests {
    use super::*;

    /// Three well-formed print sink lines: two inserts followed by a delete.
    const SAMPLE_OUTPUT: &str = r#"
2025-01-08T10:00:00.000Z INFO streamling: 1 -> (sample_every=1) [Insert] {"id":1,"data":"hello","_gs_op":"c"}
2025-01-08T10:00:00.001Z INFO streamling: 2 -> (sample_every=1) [Insert] {"id":2,"data":"world","_gs_op":"c"}
2025-01-08T10:00:00.002Z INFO streamling: 3 -> (sample_every=1) [Delete] {"id":1,"data":"hello","_gs_op":"d"}
"#;

    #[test]
    fn test_parse_print_sink_output() {
        let parsed = PrintSinkOutput::parse(SAMPLE_OUTPUT);

        assert_eq!(parsed.len(), 3);
        assert!(parsed.has_column("id"));
        assert!(parsed.has_column("data"));
        assert!(parsed.has_column("_gs_op"));
        assert!(!parsed.has_column("nonexistent"));

        assert_eq!(parsed.column_count("id"), 3);
        assert_eq!(parsed.column_count("_gs_op"), 3);

        let column_names = parsed.column_names();
        assert!(column_names.contains(&"id".to_string()));
        assert!(column_names.contains(&"data".to_string()));
        assert!(column_names.contains(&"_gs_op".to_string()));

        assert_eq!(parsed.rows()[0].row_kind, "Insert");
        assert_eq!(parsed.rows()[1].row_kind, "Insert");
        assert_eq!(parsed.rows()[2].row_kind, "Delete");
    }

    #[test]
    fn test_parse_empty_output() {
        let output = "Some other log line\nAnother line without print sink format";
        let parsed = PrintSinkOutput::parse(output);
        assert!(parsed.is_empty());
    }

    #[test]
    fn test_row_and_value_accessors() {
        let parsed = PrintSinkOutput::parse(SAMPLE_OUTPUT);
        assert!(!parsed.is_empty());

        let ids: Vec<Option<i64>> = parsed
            .column_values("id")
            .into_iter()
            .map(|value| value.as_i64())
            .collect();
        assert_eq!(ids, vec![Some(1), Some(2), Some(1)]);

        assert!(parsed.column_values("nonexistent").is_empty());
        assert_eq!(parsed.column_count("nonexistent"), 0);

        let first = parsed.get(0).expect("sample output has three rows");
        assert_eq!(first.data["data"].as_str(), Some("hello"));
        assert!(parsed.get(3).is_none());
    }

    #[test]
    fn test_parse_skips_rows_with_invalid_json() {
        // The first line matches the row pattern but its payload is not JSON,
        // so only the second line yields a row.
        let output = r#"1 -> (sample_every=1) [Update] not-json
2 -> (sample_every=1) [Update] {"id":7}"#;

        let parsed = PrintSinkOutput::parse(output);

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed.rows()[0].row_kind, "Update");
        assert_eq!(parsed.column_names(), vec!["id".to_string()]);
    }
}