use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FlakeCategory {
OracleTimeout,
ResourceExhaustion,
FsContention,
PortConflict,
TmpdirRace,
JsGcPressure,
}
impl FlakeCategory {
#[must_use]
pub const fn all() -> &'static [Self] {
&[
Self::OracleTimeout,
Self::ResourceExhaustion,
Self::FsContention,
Self::PortConflict,
Self::TmpdirRace,
Self::JsGcPressure,
]
}
#[must_use]
pub const fn label(self) -> &'static str {
match self {
Self::OracleTimeout => "TS oracle timeout",
Self::ResourceExhaustion => "resource exhaustion",
Self::FsContention => "filesystem contention",
Self::PortConflict => "port conflict",
Self::TmpdirRace => "temp directory race",
Self::JsGcPressure => "QuickJS GC pressure",
}
}
}
impl std::fmt::Display for FlakeCategory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.label())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FlakeClassification {
Transient {
category: FlakeCategory,
matched_line: String,
},
Deterministic,
}
impl FlakeClassification {
#[must_use]
pub const fn is_retriable(&self) -> bool {
matches!(self, Self::Transient { .. })
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlakeEvent {
pub target: String,
pub classification: FlakeClassification,
pub attempt: u32,
pub timestamp: String,
}
#[must_use]
pub fn classify_failure(output: &str) -> FlakeClassification {
let lower = output.to_lowercase();
for line in lower.lines() {
let trimmed = line.trim();
if (trimmed.contains("oracle") || trimmed.contains("bun"))
&& (trimmed.contains("timed out") || trimmed.contains("timeout"))
{
return FlakeClassification::Transient {
category: FlakeCategory::OracleTimeout,
matched_line: trimmed.to_string(),
};
}
if trimmed.contains("out of memory")
|| trimmed.contains("enomem")
|| trimmed.contains("cannot allocate")
{
let category = if trimmed.contains("quickjs") || trimmed.contains("allocation failed") {
FlakeCategory::JsGcPressure
} else {
FlakeCategory::ResourceExhaustion
};
return FlakeClassification::Transient {
category,
matched_line: trimmed.to_string(),
};
}
if trimmed.contains("ebusy")
|| trimmed.contains("etxtbsy")
|| trimmed.contains("resource busy")
{
return FlakeClassification::Transient {
category: FlakeCategory::FsContention,
matched_line: trimmed.to_string(),
};
}
if trimmed.contains("eaddrinuse") || trimmed.contains("address already in use") {
return FlakeClassification::Transient {
category: FlakeCategory::PortConflict,
matched_line: trimmed.to_string(),
};
}
if (trimmed.contains("no such file or directory") || trimmed.contains("enoent"))
&& trimmed.contains("tmp")
{
return FlakeClassification::Transient {
category: FlakeCategory::TmpdirRace,
matched_line: trimmed.to_string(),
};
}
if trimmed.contains("quickjs") && trimmed.contains("allocation failed") {
return FlakeClassification::Transient {
category: FlakeCategory::JsGcPressure,
matched_line: trimmed.to_string(),
};
}
}
FlakeClassification::Deterministic
}
#[derive(Debug, Clone)]
pub struct RetryPolicy {
pub max_retries: u32,
pub retry_delay_secs: u32,
pub flake_budget: u32,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_retries: std::env::var("PI_CONFORMANCE_MAX_RETRIES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(1),
retry_delay_secs: std::env::var("PI_CONFORMANCE_RETRY_DELAY")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(5),
flake_budget: std::env::var("PI_CONFORMANCE_FLAKE_BUDGET")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(3),
}
}
}
impl RetryPolicy {
#[must_use]
pub const fn should_retry(&self, classification: &FlakeClassification, attempt: u32) -> bool {
classification.is_retriable() && attempt < self.max_retries
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn classify_oracle_timeout() {
let output = "error: TS oracle process timed out after 30s";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::OracleTimeout,
..
}
));
}
#[test]
fn classify_bun_timeout() {
let output = "bun process timed out waiting for response";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::OracleTimeout,
..
}
));
}
#[test]
fn classify_oom() {
let output = "fatal: out of memory (allocator returned null)";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::ResourceExhaustion,
..
}
));
}
#[test]
fn classify_enomem() {
let output = "error: ENOMEM: not enough memory";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::ResourceExhaustion,
..
}
));
}
#[test]
fn classify_quickjs_gc() {
let output = "quickjs runtime: allocation failed, out of memory";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::JsGcPressure,
..
}
));
}
#[test]
fn classify_ebusy() {
let output = "error: EBUSY: resource busy or locked";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::FsContention,
..
}
));
}
#[test]
fn classify_port_conflict() {
let output = "listen EADDRINUSE: address already in use :::8080";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::PortConflict,
..
}
));
}
#[test]
fn classify_tmpdir_race() {
let output = "error: No such file or directory (os error 2), path: /tmp/pi-test-abc123";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::TmpdirRace,
..
}
));
}
#[test]
fn classify_deterministic() {
let output = "assertion failed: expected PASS but got FAIL\nnote: left == right";
let result = classify_failure(output);
assert_eq!(result, FlakeClassification::Deterministic);
}
#[test]
fn classify_empty_output() {
assert_eq!(classify_failure(""), FlakeClassification::Deterministic);
}
#[test]
fn classification_is_retriable() {
let transient = FlakeClassification::Transient {
category: FlakeCategory::OracleTimeout,
matched_line: "timeout".into(),
};
assert!(transient.is_retriable());
assert!(!FlakeClassification::Deterministic.is_retriable());
}
#[test]
fn retry_policy_default() {
let policy = RetryPolicy {
max_retries: 1,
retry_delay_secs: 5,
flake_budget: 3,
};
let transient = FlakeClassification::Transient {
category: FlakeCategory::OracleTimeout,
matched_line: "x".into(),
};
assert!(policy.should_retry(&transient, 0));
assert!(!policy.should_retry(&transient, 1));
assert!(!policy.should_retry(&FlakeClassification::Deterministic, 0));
}
#[test]
fn flake_event_serde_roundtrip() {
let event = FlakeEvent {
target: "ext_conformance".into(),
classification: FlakeClassification::Transient {
category: FlakeCategory::OracleTimeout,
matched_line: "oracle timed out".into(),
},
attempt: 1,
timestamp: "2026-02-08T03:00:00Z".into(),
};
let json = serde_json::to_string(&event).unwrap();
let back: FlakeEvent = serde_json::from_str(&json).unwrap();
assert_eq!(back.target, "ext_conformance");
assert!(back.classification.is_retriable());
}
#[test]
fn flake_category_all_covered() {
assert_eq!(FlakeCategory::all().len(), 6);
for cat in FlakeCategory::all() {
assert!(!cat.label().is_empty());
assert!(!cat.to_string().is_empty());
}
}
#[test]
fn multiline_output_matches_first_pattern() {
let output = "starting test...\ncompiling extensions...\nerror: bun process timed out\nassert failed";
let result = classify_failure(output);
assert!(matches!(
result,
FlakeClassification::Transient {
category: FlakeCategory::OracleTimeout,
..
}
));
}
#[test]
fn case_insensitive_matching() {
let output = "ERROR: OUT OF MEMORY";
let result = classify_failure(output);
assert!(result.is_retriable());
}
mod proptest_flake_classifier {
use super::*;
use proptest::prelude::*;
fn arb_transient_line() -> impl Strategy<Value = (String, FlakeCategory)> {
prop_oneof![
Just((
"oracle process timed out".to_string(),
FlakeCategory::OracleTimeout
)),
Just((
"bun timed out waiting".to_string(),
FlakeCategory::OracleTimeout
)),
Just((
"fatal: out of memory".to_string(),
FlakeCategory::ResourceExhaustion
)),
Just((
"error: ENOMEM".to_string(),
FlakeCategory::ResourceExhaustion
)),
Just((
"cannot allocate 4 GB".to_string(),
FlakeCategory::ResourceExhaustion
)),
Just((
"quickjs runtime: allocation failed, out of memory".to_string(),
FlakeCategory::JsGcPressure
)),
Just((
"EBUSY: resource busy".to_string(),
FlakeCategory::FsContention
)),
Just(("ETXTBSY".to_string(), FlakeCategory::FsContention)),
Just((
"resource busy or locked".to_string(),
FlakeCategory::FsContention
)),
Just((
"EADDRINUSE on port 8080".to_string(),
FlakeCategory::PortConflict
)),
Just((
"address already in use".to_string(),
FlakeCategory::PortConflict
)),
Just((
"ENOENT: no such file or directory /tmp/pi-test".to_string(),
FlakeCategory::TmpdirRace
)),
]
}
proptest! {
#[test]
fn classify_failure_never_panics(s in ".*") {
let _ = classify_failure(&s);
}
#[test]
fn deterministic_is_not_retriable(s in "[a-zA-Z0-9 ]{0,200}") {
let result = classify_failure(&s);
if result == FlakeClassification::Deterministic {
assert!(!result.is_retriable());
}
}
#[test]
fn transient_is_always_retriable(s in ".*") {
let result = classify_failure(&s);
if let FlakeClassification::Transient { .. } = &result {
assert!(result.is_retriable());
}
}
#[test]
fn known_transient_lines_classify_correctly(
(line, expected_cat) in arb_transient_line()
) {
let result = classify_failure(&line);
match result {
FlakeClassification::Transient { category, .. } => {
assert_eq!(
category, expected_cat,
"line {line:?} got {category:?} expected {expected_cat:?}"
);
}
FlakeClassification::Deterministic => {
panic!("expected Transient for {line:?}, got Deterministic");
}
}
}
#[test]
fn classify_is_case_insensitive(
(line, expected_cat) in arb_transient_line()
) {
let upper = classify_failure(&line.to_uppercase());
let lower = classify_failure(&line.to_lowercase());
match (&upper, &lower) {
(
FlakeClassification::Transient { category: cu, .. },
FlakeClassification::Transient { category: cl, .. },
) => {
assert_eq!(*cu, expected_cat);
assert_eq!(*cl, expected_cat);
}
_ => panic!("expected both Transient for line {line:?}"),
}
}
#[test]
fn noise_prefix_preserves_classification(
noise in "[a-zA-Z0-9 ]{0,50}",
(line, expected_cat) in arb_transient_line(),
) {
let input = format!("{noise}\n{line}");
let result = classify_failure(&input);
match result {
FlakeClassification::Transient { category, .. } => {
assert_eq!(category, expected_cat);
}
FlakeClassification::Deterministic => {
panic!("expected Transient for input with line {line:?}");
}
}
}
#[test]
fn whitespace_only_is_deterministic(s in "[ \\t\\n]{0,100}") {
assert_eq!(classify_failure(&s), FlakeClassification::Deterministic);
}
#[test]
fn serde_roundtrip_transient((line, _cat) in arb_transient_line()) {
let result = classify_failure(&line);
let json = serde_json::to_string(&result).unwrap();
let back: FlakeClassification = serde_json::from_str(&json).unwrap();
assert_eq!(result, back);
}
#[test]
fn serde_roundtrip_category(idx in 0..6usize) {
let cat = FlakeCategory::all()[idx];
let json = serde_json::to_string(&cat).unwrap();
let back: FlakeCategory = serde_json::from_str(&json).unwrap();
assert_eq!(cat, back);
}
#[test]
fn all_categories_have_nonempty_labels(idx in 0..6usize) {
let cat = FlakeCategory::all()[idx];
assert!(!cat.label().is_empty());
assert!(!cat.to_string().is_empty());
assert_eq!(cat.label(), cat.to_string());
}
#[test]
fn retry_policy_respects_attempt_bound(
max_retries in 0..10u32,
attempt in 0..20u32,
) {
let policy = RetryPolicy {
max_retries,
retry_delay_secs: 1,
flake_budget: 3,
};
let transient = FlakeClassification::Transient {
category: FlakeCategory::OracleTimeout,
matched_line: "x".into(),
};
let should = policy.should_retry(&transient, attempt);
assert_eq!(should, attempt < max_retries);
}
#[test]
fn retry_policy_never_retries_deterministic(
max_retries in 0..10u32,
attempt in 0..20u32,
) {
let policy = RetryPolicy {
max_retries,
retry_delay_secs: 1,
flake_budget: 3,
};
assert!(!policy.should_retry(&FlakeClassification::Deterministic, attempt));
}
#[test]
fn flake_event_serde_roundtrip_prop(
target in "[a-z_]{1,20}",
attempt in 0..100u32,
idx in 0..6usize,
) {
let cat = FlakeCategory::all()[idx];
let event = FlakeEvent {
target: target.clone(),
classification: FlakeClassification::Transient {
category: cat,
matched_line: "matched".into(),
},
attempt,
timestamp: "2026-01-01T00:00:00Z".into(),
};
let json = serde_json::to_string(&event).unwrap();
let back: FlakeEvent = serde_json::from_str(&json).unwrap();
assert_eq!(back.target, target);
assert_eq!(back.attempt, attempt);
assert!(back.classification.is_retriable());
}
}
}
}