Skip to main content

streamling_e2e/resources/
print_sink.rs

1//! Print sink output parser for capturing and verifying pipeline output.
2//!
3//! The print sink outputs JSON rows to tracing logs with format:
4//! `{row_count} -> (sample_every={n}) [{row_kind}] {json}`
5//!
6//! This module parses that output to enable assertions on the pipeline's output data.
7
8use regex::Regex;
9use serde_json::Value;
10
11/// Parsed output from a print sink
12#[derive(Debug, Clone)]
13pub struct PrintSinkOutput {
14    rows: Vec<PrintSinkRow>,
15}
16
17/// A single row from the print sink output
18#[derive(Debug, Clone)]
19pub struct PrintSinkRow {
20    /// Row kind: "Insert", "Delete", or "Update"
21    pub row_kind: String,
22    /// Parsed JSON data
23    pub data: Value,
24}
25
26impl PrintSinkOutput {
27    /// Parse print sink output from captured stderr/stdout
28    ///
29    /// Format: "{row_count} -> (sample_every={n}) [{row_kind}] {json}"
30    pub fn parse(output: &str) -> Self {
31        let re =
32            Regex::new(r"\d+ -> \(sample_every=\d+\) \[(Insert|Delete|Update)\] (.+)").unwrap();
33
34        let rows = output
35            .lines()
36            .filter_map(|line| {
37                re.captures(line).and_then(|caps| {
38                    let row_kind = caps.get(1)?.as_str().to_string();
39                    let json_str = caps.get(2)?.as_str();
40                    let data = serde_json::from_str(json_str).ok()?;
41                    Some(PrintSinkRow { row_kind, data })
42                })
43            })
44            .collect();
45
46        Self { rows }
47    }
48
49    /// Check if a column exists in any row
50    pub fn has_column(&self, name: &str) -> bool {
51        self.rows.iter().any(|r| r.data.get(name).is_some())
52    }
53
54    /// Count how many rows have a given column
55    pub fn column_count(&self, name: &str) -> usize {
56        self.rows
57            .iter()
58            .filter(|r| r.data.get(name).is_some())
59            .count()
60    }
61
62    /// Get all values for a column across all rows
63    pub fn column_values(&self, name: &str) -> Vec<&Value> {
64        self.rows.iter().filter_map(|r| r.data.get(name)).collect()
65    }
66
67    /// Get all rows
68    pub fn rows(&self) -> &[PrintSinkRow] {
69        &self.rows
70    }
71
72    /// Get the number of rows
73    pub fn len(&self) -> usize {
74        self.rows.len()
75    }
76
77    /// Check if there are no rows
78    pub fn is_empty(&self) -> bool {
79        self.rows.is_empty()
80    }
81
82    /// Get column names from the first row
83    pub fn column_names(&self) -> Vec<String> {
84        self.rows
85            .first()
86            .and_then(|r| r.data.as_object())
87            .map(|obj| obj.keys().cloned().collect())
88            .unwrap_or_default()
89    }
90
91    /// Get a specific row by index
92    pub fn get(&self, index: usize) -> Option<&PrintSinkRow> {
93        self.rows.get(index)
94    }
95}
96
#[cfg(test)]
mod tests {
    use super::*;

    /// Three well-formed rows behind log prefixes are all recovered, with the
    /// expected columns and row kinds.
    #[test]
    fn test_parse_print_sink_output() {
        let captured = r#"
2025-01-08T10:00:00.000Z  INFO streamling: 1 -> (sample_every=1) [Insert] {"id":1,"data":"hello","_gs_op":"c"}
2025-01-08T10:00:00.001Z  INFO streamling: 2 -> (sample_every=1) [Insert] {"id":2,"data":"world","_gs_op":"c"}
2025-01-08T10:00:00.002Z  INFO streamling: 3 -> (sample_every=1) [Delete] {"id":1,"data":"hello","_gs_op":"d"}
"#;

        let sink = PrintSinkOutput::parse(captured);

        assert_eq!(sink.len(), 3);

        // Column presence across rows.
        for column in ["id", "data", "_gs_op"] {
            assert!(sink.has_column(column));
        }
        assert!(!sink.has_column("nonexistent"));

        assert_eq!(sink.column_count("id"), 3);
        assert_eq!(sink.column_count("_gs_op"), 3);

        // Column names reported from the first row.
        let names = sink.column_names();
        for column in ["id", "data", "_gs_op"] {
            assert!(names.contains(&column.to_string()));
        }

        // Row kinds, in input order.
        let kinds: Vec<&str> = sink
            .rows()
            .iter()
            .map(|row| row.row_kind.as_str())
            .collect();
        assert_eq!(kinds, ["Insert", "Insert", "Delete"]);
    }

    /// Log lines that do not follow the print sink format yield no rows.
    #[test]
    fn test_parse_empty_output() {
        let unrelated = "Some other log line\nAnother line without print sink format";
        assert!(PrintSinkOutput::parse(unrelated).is_empty());
    }
}