Skip to main content

floe_core/runner/
outcome.rs

//! Run-outcome helpers for connector-owned execution adapters.
//!
//! Connectors (Airflow, Dagster, Kubernetes, …) use these types and helpers
//! to map their execution results onto Floe's normalized [`RunStatus`] before
//! building a [`RunOutcome`] or writing a summary report.
6
7use crate::report::RunStatus;
8
// ---------------------------------------------------------------------------
// ConnectorRunStatus
// ---------------------------------------------------------------------------
12
/// Terminal state of a run as seen from the connector that launched it.
///
/// This is the connector's own view of its job/task, captured before it is
/// translated into the Floe-normalized [`RunStatus`].  The translation is
/// intentionally coarse: finer outcomes such as `Rejected` or `Aborted` are
/// not visible at this level and can only be recovered from the in-pod logs
/// with [`parse_run_status_from_logs`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectorRunStatus {
    /// The job/task ran to completion with no infrastructure error.
    ///
    /// This says nothing about the logical result (success, rejected, …);
    /// read that from the pod/task logs via [`parse_run_status_from_logs`].
    Succeeded,
    /// The job/task broke at the infrastructure level (non-zero exit, pod
    /// OOM, etc.), either before emitting a `run_finished` event or without
    /// emitting one at all.
    Failed,
    /// The connector stopped waiting: its timeout expired while the job/task
    /// had not yet reached a terminal state.
    Timeout,
}
33
34impl ConnectorRunStatus {
35    /// Map to a Floe [`RunStatus`] suitable for a summary report.
36    ///
37    /// Use [`parse_run_status_from_logs`] first to obtain a finer-grained
38    /// status from pod logs; only fall back to this mapping when logs are
39    /// unavailable or parsing fails.
40    pub fn to_run_status(self) -> RunStatus {
41        match self {
42            ConnectorRunStatus::Succeeded => RunStatus::Success,
43            ConnectorRunStatus::Failed | ConnectorRunStatus::Timeout => RunStatus::Failed,
44        }
45    }
46
47    /// Conventional exit code for use in summary reports.
48    pub fn exit_code(self) -> i32 {
49        match self {
50            ConnectorRunStatus::Succeeded => 0,
51            ConnectorRunStatus::Failed | ConnectorRunStatus::Timeout => 1,
52        }
53    }
54}
55
// ---------------------------------------------------------------------------
// parse_run_status_from_logs
// ---------------------------------------------------------------------------
59
60/// Parse a [`RunStatus`] from structured log output produced by
61/// `floe run --log-format json`.
62///
63/// Scans `logs` line-by-line for a JSON object with
64/// `"event": "run_finished"` and extracts the `"status"` field.
65/// Returns `None` if no such event is found or if the status string
66/// is not a recognized value.
67///
68/// # Example
69/// ```
70/// use floe_core::runner::parse_run_status_from_logs;
71/// use floe_core::report::RunStatus;
72///
73/// let logs = r#"{"schema":"floe/v0/log","level":"info","event":"run_finished","run_id":"r1","status":"success","exit_code":0,"files":1,"rows":10,"accepted":10,"rejected":0,"warnings":0,"errors":0,"summary_uri":null,"ts_ms":0}"#;
74/// assert_eq!(parse_run_status_from_logs(logs), Some(RunStatus::Success));
75/// ```
76pub fn parse_run_status_from_logs(logs: &str) -> Option<RunStatus> {
77    for line in logs.lines() {
78        let Ok(obj) = serde_json::from_str::<serde_json::Value>(line) else {
79            continue;
80        };
81        if obj.get("event").and_then(|v| v.as_str()) != Some("run_finished") {
82            continue;
83        }
84        let Some(status_str) = obj.get("status").and_then(|v| v.as_str()) else {
85            continue;
86        };
87        match status_str {
88            "success" => return Some(RunStatus::Success),
89            "success_with_warnings" => return Some(RunStatus::SuccessWithWarnings),
90            "rejected" => return Some(RunStatus::Rejected),
91            "aborted" => return Some(RunStatus::Aborted),
92            "failed" => return Some(RunStatus::Failed),
93            _ => continue,
94        }
95    }
96    None
97}
98
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::report::RunStatus;

    // ---- ConnectorRunStatus ------------------------------------------------

    #[test]
    fn succeeded_maps_to_run_status_success() {
        let mapped = ConnectorRunStatus::Succeeded.to_run_status();
        assert_eq!(mapped, RunStatus::Success);
    }

    #[test]
    fn failed_maps_to_run_status_failed() {
        let mapped = ConnectorRunStatus::Failed.to_run_status();
        assert_eq!(mapped, RunStatus::Failed);
    }

    #[test]
    fn timeout_maps_to_run_status_failed() {
        let mapped = ConnectorRunStatus::Timeout.to_run_status();
        assert_eq!(mapped, RunStatus::Failed);
    }

    #[test]
    fn succeeded_exit_code_is_zero() {
        let code = ConnectorRunStatus::Succeeded.exit_code();
        assert_eq!(code, 0);
    }

    #[test]
    fn failed_exit_code_is_one() {
        let code = ConnectorRunStatus::Failed.exit_code();
        assert_eq!(code, 1);
    }

    #[test]
    fn timeout_exit_code_is_one() {
        let code = ConnectorRunStatus::Timeout.exit_code();
        assert_eq!(code, 1);
    }

    // ---- parse_run_status_from_logs ----------------------------------------

    /// Render one complete `run_finished` JSON log line carrying `status`.
    fn make_run_finished_log(status: &str) -> String {
        format!(
            r#"{{"schema":"floe/v0/log","level":"info","event":"run_finished","run_id":"x","status":"{status}","exit_code":0,"files":1,"rows":10,"accepted":8,"rejected":2,"warnings":0,"errors":0,"summary_uri":null,"ts_ms":0}}"#
        )
    }

    /// Parse a log consisting of a single generated `run_finished` line.
    fn parse_finished(status: &str) -> Option<RunStatus> {
        let line = make_run_finished_log(status);
        parse_run_status_from_logs(&line)
    }

    #[test]
    fn parses_success_status() {
        assert_eq!(parse_finished("success"), Some(RunStatus::Success));
    }

    #[test]
    fn parses_success_with_warnings_status() {
        assert_eq!(
            parse_finished("success_with_warnings"),
            Some(RunStatus::SuccessWithWarnings)
        );
    }

    #[test]
    fn parses_rejected_status() {
        assert_eq!(parse_finished("rejected"), Some(RunStatus::Rejected));
    }

    #[test]
    fn parses_aborted_status() {
        assert_eq!(parse_finished("aborted"), Some(RunStatus::Aborted));
    }

    #[test]
    fn parses_failed_status() {
        assert_eq!(parse_finished("failed"), Some(RunStatus::Failed));
    }

    #[test]
    fn returns_none_for_empty_logs() {
        let parsed = parse_run_status_from_logs("");
        assert_eq!(parsed, None);
    }

    #[test]
    fn returns_none_for_non_json_logs() {
        let parsed = parse_run_status_from_logs("some plain text\nno json here");
        assert_eq!(parsed, None);
    }

    #[test]
    fn ignores_non_run_finished_events() {
        // Carries a valid status string, but on a different event type.
        let noise = r#"{"schema":"floe/v0/log","level":"info","event":"entity_finished","name":"foo","status":"rejected","ts_ms":0}"#;
        let parsed = parse_run_status_from_logs(noise);
        assert_eq!(parsed, None);
    }

    #[test]
    fn ignores_unknown_status_values() {
        assert_eq!(parse_finished("unknown_future_status"), None);
    }

    #[test]
    fn malformed_run_finished_missing_status_followed_by_valid_returns_valid() {
        // A run_finished record with no "status" field must be skipped, not
        // treated as a terminal None that hides the valid record below it.
        let malformed = r#"{"schema":"floe/v0/log","event":"run_finished","run_id":"x","ts_ms":0}"#;
        let valid = make_run_finished_log("rejected");
        let logs = [malformed, valid.as_str()].join("\n");
        let parsed = parse_run_status_from_logs(&logs);
        assert_eq!(
            parsed,
            Some(RunStatus::Rejected),
            "valid run_finished after malformed one must be found"
        );
    }

    #[test]
    fn malformed_run_finished_non_string_status_followed_by_valid_returns_valid() {
        // "status" present but not a string (e.g. a number) — still malformed.
        let malformed = r#"{"schema":"floe/v0/log","event":"run_finished","status":42,"ts_ms":0}"#;
        let valid = make_run_finished_log("success");
        let logs = [malformed, valid.as_str()].join("\n");
        let parsed = parse_run_status_from_logs(&logs);
        assert_eq!(
            parsed,
            Some(RunStatus::Success),
            "valid run_finished after non-string status must be found"
        );
    }

    #[test]
    fn only_malformed_run_finished_records_returns_none() {
        let malformed1 =
            r#"{"schema":"floe/v0/log","event":"run_finished","run_id":"x","ts_ms":0}"#;
        let malformed2 =
            r#"{"schema":"floe/v0/log","event":"run_finished","status":null,"ts_ms":0}"#;
        let logs = [malformed1, malformed2].join("\n");
        let parsed = parse_run_status_from_logs(&logs);
        assert_eq!(parsed, None, "no valid run_finished => None");
    }

    #[test]
    fn finds_event_among_mixed_log_lines() {
        // Plain text, then an unrelated JSON event, then the record we want.
        let other_event = r#"{"event":"other","status":"success"}"#;
        let finished = make_run_finished_log("rejected");
        let logs = ["not json", other_event, finished.as_str()].join("\n");
        assert_eq!(parse_run_status_from_logs(&logs), Some(RunStatus::Rejected));
    }
}