use crate::report::RunStatus;
/// Three-way outcome that can be converted into a [`RunStatus`] via
/// `to_run_status` and into a numeric exit code via `exit_code`.
///
/// `Failed` and `Timeout` are distinct variants here, but both conversions
/// treat them identically.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectorRunStatus {
/// Converts to `RunStatus::Success`; exit code `0`.
Succeeded,
/// Converts to `RunStatus::Failed`; exit code `1`.
Failed,
/// Converts to `RunStatus::Failed` (same as `Failed`); exit code `1`.
Timeout,
}
impl ConnectorRunStatus {
    /// Converts this status into the report-level [`RunStatus`].
    ///
    /// `Succeeded` becomes `RunStatus::Success`; `Failed` and `Timeout` both
    /// become `RunStatus::Failed`.
    pub fn to_run_status(self) -> RunStatus {
        // One arm per variant, no wildcard: adding a variant to the enum
        // forces this mapping to be revisited at compile time.
        match self {
            Self::Succeeded => RunStatus::Success,
            Self::Failed => RunStatus::Failed,
            Self::Timeout => RunStatus::Failed,
        }
    }

    /// Returns the numeric exit code for this status: `0` for `Succeeded`,
    /// `1` for `Failed` and `Timeout`.
    pub fn exit_code(self) -> i32 {
        match self {
            Self::Succeeded => 0,
            Self::Failed => 1,
            Self::Timeout => 1,
        }
    }
}
pub fn parse_run_status_from_logs(logs: &str) -> Option<RunStatus> {
for line in logs.lines() {
let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else {
continue;
};
if obj.get("event").and_then(|v| v.as_str()) != Some("run_finished") {
continue;
}
let Some(status_str) = obj.get("status").and_then(|v| v.as_str()) else {
continue;
};
match status_str {
"success" => return Some(RunStatus::Success),
"success_with_warnings" => return Some(RunStatus::SuccessWithWarnings),
"rejected" => return Some(RunStatus::Rejected),
"aborted" => return Some(RunStatus::Aborted),
"failed" => return Some(RunStatus::Failed),
_ => continue,
}
}
None
}
// Unit tests for `ConnectorRunStatus` conversions and for
// `parse_run_status_from_logs`.
#[cfg(test)]
mod tests {
use super::*;
use crate::report::RunStatus;

// --- ConnectorRunStatus::to_run_status ---

#[test]
fn succeeded_maps_to_run_status_success() {
assert_eq!(
ConnectorRunStatus::Succeeded.to_run_status(),
RunStatus::Success
);
}
#[test]
fn failed_maps_to_run_status_failed() {
assert_eq!(
ConnectorRunStatus::Failed.to_run_status(),
RunStatus::Failed
);
}
// Timeout has no dedicated RunStatus; it is reported as Failed.
#[test]
fn timeout_maps_to_run_status_failed() {
assert_eq!(
ConnectorRunStatus::Timeout.to_run_status(),
RunStatus::Failed
);
}

// --- ConnectorRunStatus::exit_code ---

#[test]
fn succeeded_exit_code_is_zero() {
assert_eq!(ConnectorRunStatus::Succeeded.exit_code(), 0);
}
#[test]
fn failed_exit_code_is_one() {
assert_eq!(ConnectorRunStatus::Failed.exit_code(), 1);
}
#[test]
fn timeout_exit_code_is_one() {
assert_eq!(ConnectorRunStatus::Timeout.exit_code(), 1);
}

// --- parse_run_status_from_logs: recognised status strings ---

// Builds a single-line `run_finished` JSON record with the given `status`.
// The braces are doubled because the literal is a `format!` template; only
// `status` is interpolated, every other field is a fixed value.
fn make_run_finished_log(status: &str) -> String {
format!(
r#"{{"schema":"floe/v0/log","level":"info","event":"run_finished","run_id":"x","status":"{status}","exit_code":0,"files":1,"rows":10,"accepted":8,"rejected":2,"warnings":0,"errors":0,"summary_uri":null,"ts_ms":0}}"#
)
}
#[test]
fn parses_success_status() {
assert_eq!(
parse_run_status_from_logs(&make_run_finished_log("success")),
Some(RunStatus::Success)
);
}
#[test]
fn parses_success_with_warnings_status() {
assert_eq!(
parse_run_status_from_logs(&make_run_finished_log("success_with_warnings")),
Some(RunStatus::SuccessWithWarnings)
);
}
#[test]
fn parses_rejected_status() {
assert_eq!(
parse_run_status_from_logs(&make_run_finished_log("rejected")),
Some(RunStatus::Rejected)
);
}
#[test]
fn parses_aborted_status() {
assert_eq!(
parse_run_status_from_logs(&make_run_finished_log("aborted")),
Some(RunStatus::Aborted)
);
}
#[test]
fn parses_failed_status() {
assert_eq!(
parse_run_status_from_logs(&make_run_finished_log("failed")),
Some(RunStatus::Failed)
);
}

// --- parse_run_status_from_logs: inputs with no usable record ---

#[test]
fn returns_none_for_empty_logs() {
assert_eq!(parse_run_status_from_logs(""), None);
}
#[test]
fn returns_none_for_non_json_logs() {
assert_eq!(
parse_run_status_from_logs("some plain text\nno json here"),
None
);
}
// The record carries a recognised status ("rejected") but a different event
// name, so it must not be picked up.
#[test]
fn ignores_non_run_finished_events() {
let noise = r#"{"schema":"floe/v0/log","level":"info","event":"entity_finished","name":"foo","status":"rejected","ts_ms":0}"#;
assert_eq!(parse_run_status_from_logs(noise), None);
}
// A `run_finished` record whose status string is not one of the known values
// yields no result rather than a guessed status.
#[test]
fn ignores_unknown_status_values() {
assert_eq!(
parse_run_status_from_logs(&make_run_finished_log("unknown_future_status")),
None
);
}

// --- parse_run_status_from_logs: malformed records must not stop the scan ---

// First record is `run_finished` but has no `status` key at all.
#[test]
fn malformed_run_finished_missing_status_followed_by_valid_returns_valid() {
let malformed = r#"{"schema":"floe/v0/log","event":"run_finished","run_id":"x","ts_ms":0}"#;
let valid = make_run_finished_log("rejected");
let logs = format!("{malformed}\n{valid}");
assert_eq!(
parse_run_status_from_logs(&logs),
Some(RunStatus::Rejected),
"valid run_finished after malformed one must be found"
);
}
// First record is `run_finished` but its `status` is a number, not a string.
#[test]
fn malformed_run_finished_non_string_status_followed_by_valid_returns_valid() {
let malformed = r#"{"schema":"floe/v0/log","event":"run_finished","status":42,"ts_ms":0}"#;
let valid = make_run_finished_log("success");
let logs = format!("{malformed}\n{valid}");
assert_eq!(
parse_run_status_from_logs(&logs),
Some(RunStatus::Success),
"valid run_finished after non-string status must be found"
);
}
// Two `run_finished` records, one without `status` and one with a null
// `status`; neither is usable, so the overall result is None.
#[test]
fn only_malformed_run_finished_records_returns_none() {
let malformed1 =
r#"{"schema":"floe/v0/log","event":"run_finished","run_id":"x","ts_ms":0}"#;
let malformed2 =
r#"{"schema":"floe/v0/log","event":"run_finished","status":null,"ts_ms":0}"#;
let logs = format!("{malformed1}\n{malformed2}");
assert_eq!(
parse_run_status_from_logs(&logs),
None,
"no valid run_finished => None"
);
}
// Input order: a plain-text line, a JSON record for a different event (whose
// "success" status must be ignored), then the real `run_finished` record.
#[test]
fn finds_event_among_mixed_log_lines() {
let logs = format!(
"not json\n{}\n{}",
r#"{"event":"other","status":"success"}"#,
make_run_finished_log("rejected")
);
assert_eq!(parse_run_status_from_logs(&logs), Some(RunStatus::Rejected));
}
}