floe_core/runner/
outcome.rs1use crate::report::RunStatus;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub enum ConnectorRunStatus {
21 Succeeded,
26 Failed,
29 Timeout,
32}
33
34impl ConnectorRunStatus {
35 pub fn to_run_status(self) -> RunStatus {
41 match self {
42 ConnectorRunStatus::Succeeded => RunStatus::Success,
43 ConnectorRunStatus::Failed | ConnectorRunStatus::Timeout => RunStatus::Failed,
44 }
45 }
46
47 pub fn exit_code(self) -> i32 {
49 match self {
50 ConnectorRunStatus::Succeeded => 0,
51 ConnectorRunStatus::Failed | ConnectorRunStatus::Timeout => 1,
52 }
53 }
54}
55
56pub fn parse_run_status_from_logs(logs: &str) -> Option<RunStatus> {
77 for line in logs.lines() {
78 let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else {
79 continue;
80 };
81 if obj.get("event").and_then(|v| v.as_str()) != Some("run_finished") {
82 continue;
83 }
84 let Some(status_str) = obj.get("status").and_then(|v| v.as_str()) else {
85 continue;
86 };
87 match status_str {
88 "success" => return Some(RunStatus::Success),
89 "success_with_warnings" => return Some(RunStatus::SuccessWithWarnings),
90 "rejected" => return Some(RunStatus::Rejected),
91 "aborted" => return Some(RunStatus::Aborted),
92 "failed" => return Some(RunStatus::Failed),
93 _ => continue,
94 }
95 }
96 None
97}
98
99#[cfg(test)]
104mod tests {
105 use super::*;
106 use crate::report::RunStatus;
107
108 #[test]
111 fn succeeded_maps_to_run_status_success() {
112 assert_eq!(
113 ConnectorRunStatus::Succeeded.to_run_status(),
114 RunStatus::Success
115 );
116 }
117
118 #[test]
119 fn failed_maps_to_run_status_failed() {
120 assert_eq!(
121 ConnectorRunStatus::Failed.to_run_status(),
122 RunStatus::Failed
123 );
124 }
125
126 #[test]
127 fn timeout_maps_to_run_status_failed() {
128 assert_eq!(
129 ConnectorRunStatus::Timeout.to_run_status(),
130 RunStatus::Failed
131 );
132 }
133
134 #[test]
135 fn succeeded_exit_code_is_zero() {
136 assert_eq!(ConnectorRunStatus::Succeeded.exit_code(), 0);
137 }
138
139 #[test]
140 fn failed_exit_code_is_one() {
141 assert_eq!(ConnectorRunStatus::Failed.exit_code(), 1);
142 }
143
144 #[test]
145 fn timeout_exit_code_is_one() {
146 assert_eq!(ConnectorRunStatus::Timeout.exit_code(), 1);
147 }
148
149 fn make_run_finished_log(status: &str) -> String {
152 format!(
153 r#"{{"schema":"floe/v0/log","level":"info","event":"run_finished","run_id":"x","status":"{status}","exit_code":0,"files":1,"rows":10,"accepted":8,"rejected":2,"warnings":0,"errors":0,"summary_uri":null,"ts_ms":0}}"#
154 )
155 }
156
157 #[test]
158 fn parses_success_status() {
159 assert_eq!(
160 parse_run_status_from_logs(&make_run_finished_log("success")),
161 Some(RunStatus::Success)
162 );
163 }
164
165 #[test]
166 fn parses_success_with_warnings_status() {
167 assert_eq!(
168 parse_run_status_from_logs(&make_run_finished_log("success_with_warnings")),
169 Some(RunStatus::SuccessWithWarnings)
170 );
171 }
172
173 #[test]
174 fn parses_rejected_status() {
175 assert_eq!(
176 parse_run_status_from_logs(&make_run_finished_log("rejected")),
177 Some(RunStatus::Rejected)
178 );
179 }
180
181 #[test]
182 fn parses_aborted_status() {
183 assert_eq!(
184 parse_run_status_from_logs(&make_run_finished_log("aborted")),
185 Some(RunStatus::Aborted)
186 );
187 }
188
189 #[test]
190 fn parses_failed_status() {
191 assert_eq!(
192 parse_run_status_from_logs(&make_run_finished_log("failed")),
193 Some(RunStatus::Failed)
194 );
195 }
196
197 #[test]
198 fn returns_none_for_empty_logs() {
199 assert_eq!(parse_run_status_from_logs(""), None);
200 }
201
202 #[test]
203 fn returns_none_for_non_json_logs() {
204 assert_eq!(
205 parse_run_status_from_logs("some plain text\nno json here"),
206 None
207 );
208 }
209
210 #[test]
211 fn ignores_non_run_finished_events() {
212 let noise = r#"{"schema":"floe/v0/log","level":"info","event":"entity_finished","name":"foo","status":"rejected","ts_ms":0}"#;
213 assert_eq!(parse_run_status_from_logs(noise), None);
214 }
215
216 #[test]
217 fn ignores_unknown_status_values() {
218 assert_eq!(
219 parse_run_status_from_logs(&make_run_finished_log("unknown_future_status")),
220 None
221 );
222 }
223
224 #[test]
225 fn malformed_run_finished_missing_status_followed_by_valid_returns_valid() {
226 let malformed = r#"{"schema":"floe/v0/log","event":"run_finished","run_id":"x","ts_ms":0}"#;
229 let valid = make_run_finished_log("rejected");
230 let logs = format!("{malformed}\n{valid}");
231 assert_eq!(
232 parse_run_status_from_logs(&logs),
233 Some(RunStatus::Rejected),
234 "valid run_finished after malformed one must be found"
235 );
236 }
237
238 #[test]
239 fn malformed_run_finished_non_string_status_followed_by_valid_returns_valid() {
240 let malformed = r#"{"schema":"floe/v0/log","event":"run_finished","status":42,"ts_ms":0}"#;
242 let valid = make_run_finished_log("success");
243 let logs = format!("{malformed}\n{valid}");
244 assert_eq!(
245 parse_run_status_from_logs(&logs),
246 Some(RunStatus::Success),
247 "valid run_finished after non-string status must be found"
248 );
249 }
250
251 #[test]
252 fn only_malformed_run_finished_records_returns_none() {
253 let malformed1 =
254 r#"{"schema":"floe/v0/log","event":"run_finished","run_id":"x","ts_ms":0}"#;
255 let malformed2 =
256 r#"{"schema":"floe/v0/log","event":"run_finished","status":null,"ts_ms":0}"#;
257 let logs = format!("{malformed1}\n{malformed2}");
258 assert_eq!(
259 parse_run_status_from_logs(&logs),
260 None,
261 "no valid run_finished => None"
262 );
263 }
264
265 #[test]
266 fn finds_event_among_mixed_log_lines() {
267 let logs = format!(
268 "not json\n{}\n{}",
269 r#"{"event":"other","status":"success"}"#,
270 make_run_finished_log("rejected")
271 );
272 assert_eq!(parse_run_status_from_logs(&logs), Some(RunStatus::Rejected));
273 }
274}