1use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub enum FlakeCategory {
12 OracleTimeout,
14 ResourceExhaustion,
16 FsContention,
18 PortConflict,
20 TmpdirRace,
22 JsGcPressure,
24}
25
26impl FlakeCategory {
27 #[must_use]
29 pub const fn all() -> &'static [Self] {
30 &[
31 Self::OracleTimeout,
32 Self::ResourceExhaustion,
33 Self::FsContention,
34 Self::PortConflict,
35 Self::TmpdirRace,
36 Self::JsGcPressure,
37 ]
38 }
39
40 #[must_use]
42 pub const fn label(self) -> &'static str {
43 match self {
44 Self::OracleTimeout => "TS oracle timeout",
45 Self::ResourceExhaustion => "resource exhaustion",
46 Self::FsContention => "filesystem contention",
47 Self::PortConflict => "port conflict",
48 Self::TmpdirRace => "temp directory race",
49 Self::JsGcPressure => "QuickJS GC pressure",
50 }
51 }
52}
53
54impl std::fmt::Display for FlakeCategory {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.write_str(self.label())
57 }
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum FlakeClassification {
64 Transient {
66 category: FlakeCategory,
67 matched_line: String,
68 },
69 Deterministic,
71}
72
73impl FlakeClassification {
74 #[must_use]
76 pub const fn is_retriable(&self) -> bool {
77 matches!(self, Self::Transient { .. })
78 }
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct FlakeEvent {
84 pub target: String,
85 pub classification: FlakeClassification,
86 pub attempt: u32,
87 pub timestamp: String,
88}
89
90const MAX_CLASSIFY_INPUT_SIZE: usize = 1024 * 1024; #[must_use]
101pub fn classify_failure(output: &str) -> FlakeClassification {
102 let bounded_output = if output.len() > MAX_CLASSIFY_INPUT_SIZE {
105 &output[..MAX_CLASSIFY_INPUT_SIZE]
106 } else {
107 output
108 };
109
110 let lower = bounded_output.to_lowercase();
113
114 for line in lower.lines() {
115 let trimmed = line.trim();
116
117 if (trimmed.contains("oracle") || trimmed.contains("bun"))
119 && (trimmed.contains("timed out") || trimmed.contains("timeout"))
120 {
121 return FlakeClassification::Transient {
122 category: FlakeCategory::OracleTimeout,
123 matched_line: trimmed.to_string(),
124 };
125 }
126
127 if trimmed.contains("out of memory")
129 || trimmed.contains("enomem")
130 || trimmed.contains("cannot allocate")
131 {
132 let category = if trimmed.contains("quickjs") || trimmed.contains("allocation failed") {
134 FlakeCategory::JsGcPressure
135 } else {
136 FlakeCategory::ResourceExhaustion
137 };
138 return FlakeClassification::Transient {
139 category,
140 matched_line: trimmed.to_string(),
141 };
142 }
143
144 if trimmed.contains("ebusy")
146 || trimmed.contains("etxtbsy")
147 || trimmed.contains("resource busy")
148 {
149 return FlakeClassification::Transient {
150 category: FlakeCategory::FsContention,
151 matched_line: trimmed.to_string(),
152 };
153 }
154
155 if trimmed.contains("eaddrinuse") || trimmed.contains("address already in use") {
157 return FlakeClassification::Transient {
158 category: FlakeCategory::PortConflict,
159 matched_line: trimmed.to_string(),
160 };
161 }
162
163 if (trimmed.contains("no such file or directory") || trimmed.contains("enoent"))
165 && (trimmed.contains("/tmp") || trimmed.contains("\\tmp") || trimmed.contains("tmpdir"))
166 {
167 return FlakeClassification::Transient {
168 category: FlakeCategory::TmpdirRace,
169 matched_line: trimmed.to_string(),
170 };
171 }
172
173 if trimmed.contains("quickjs") && trimmed.contains("allocation failed") {
175 return FlakeClassification::Transient {
176 category: FlakeCategory::JsGcPressure,
177 matched_line: trimmed.to_string(),
178 };
179 }
180 }
181
182 FlakeClassification::Deterministic
183}
184
185#[derive(Debug, Clone)]
187pub struct RetryPolicy {
188 pub max_retries: u32,
190 pub retry_delay_secs: u32,
192 pub flake_budget: u32,
194}
195
196impl Default for RetryPolicy {
197 fn default() -> Self {
198 Self::from_env(|key| std::env::var(key))
199 }
200}
201
202impl RetryPolicy {
203 fn from_env<F>(get_env: F) -> Self
204 where
205 F: Fn(&str) -> std::result::Result<String, std::env::VarError>,
206 {
207 Self {
208 max_retries: get_env("PI_CONFORMANCE_MAX_RETRIES")
209 .ok()
210 .and_then(|v| v.parse::<u32>().ok())
211 .map_or(1, |v| v.min(100)), retry_delay_secs: get_env("PI_CONFORMANCE_RETRY_DELAY")
213 .ok()
214 .and_then(|v| v.parse::<u32>().ok())
215 .map_or(5, |v| v.min(3600)), flake_budget: get_env("PI_CONFORMANCE_FLAKE_BUDGET")
217 .ok()
218 .and_then(|v| v.parse::<u32>().ok())
219 .map_or(3, |v| v.min(1000)), }
221 }
222
223 #[must_use]
225 pub const fn should_retry(&self, classification: &FlakeClassification, attempt: u32) -> bool {
226 classification.is_retriable() && attempt < self.max_retries
227 }
228}
229
230#[cfg(test)]
235mod tests {
236 use super::*;
237
238 #[test]
239 fn classify_oracle_timeout() {
240 let output = "error: TS oracle process timed out after 30s";
241 let result = classify_failure(output);
242 assert!(matches!(
243 result,
244 FlakeClassification::Transient {
245 category: FlakeCategory::OracleTimeout,
246 ..
247 }
248 ));
249 }
250
251 #[test]
252 fn classify_bun_timeout() {
253 let output = "bun process timed out waiting for response";
254 let result = classify_failure(output);
255 assert!(matches!(
256 result,
257 FlakeClassification::Transient {
258 category: FlakeCategory::OracleTimeout,
259 ..
260 }
261 ));
262 }
263
264 #[test]
265 fn classify_oom() {
266 let output = "fatal: out of memory (allocator returned null)";
267 let result = classify_failure(output);
268 assert!(matches!(
269 result,
270 FlakeClassification::Transient {
271 category: FlakeCategory::ResourceExhaustion,
272 ..
273 }
274 ));
275 }
276
277 #[test]
278 fn classify_enomem() {
279 let output = "error: ENOMEM: not enough memory";
280 let result = classify_failure(output);
281 assert!(matches!(
282 result,
283 FlakeClassification::Transient {
284 category: FlakeCategory::ResourceExhaustion,
285 ..
286 }
287 ));
288 }
289
290 #[test]
291 fn classify_quickjs_gc() {
292 let output = "quickjs runtime: allocation failed, out of memory";
293 let result = classify_failure(output);
294 assert!(matches!(
295 result,
296 FlakeClassification::Transient {
297 category: FlakeCategory::JsGcPressure,
298 ..
299 }
300 ));
301 }
302
303 #[test]
304 fn classify_ebusy() {
305 let output = "error: EBUSY: resource busy or locked";
306 let result = classify_failure(output);
307 assert!(matches!(
308 result,
309 FlakeClassification::Transient {
310 category: FlakeCategory::FsContention,
311 ..
312 }
313 ));
314 }
315
316 #[test]
317 fn classify_port_conflict() {
318 let output = "listen EADDRINUSE: address already in use :::8080";
319 let result = classify_failure(output);
320 assert!(matches!(
321 result,
322 FlakeClassification::Transient {
323 category: FlakeCategory::PortConflict,
324 ..
325 }
326 ));
327 }
328
329 #[test]
330 fn classify_tmpdir_race() {
331 let output = "error: No such file or directory (os error 2), path: /tmp/pi-test-abc123";
332 let result = classify_failure(output);
333 assert!(matches!(
334 result,
335 FlakeClassification::Transient {
336 category: FlakeCategory::TmpdirRace,
337 ..
338 }
339 ));
340 }
341
342 #[test]
343 fn classify_deterministic() {
344 let output = "assertion failed: expected PASS but got FAIL\nnote: left == right";
345 let result = classify_failure(output);
346 assert_eq!(result, FlakeClassification::Deterministic);
347 }
348
349 #[test]
350 fn classify_empty_output() {
351 assert_eq!(classify_failure(""), FlakeClassification::Deterministic);
352 }
353
354 #[test]
355 fn classification_is_retriable() {
356 let transient = FlakeClassification::Transient {
357 category: FlakeCategory::OracleTimeout,
358 matched_line: "timeout".into(),
359 };
360 assert!(transient.is_retriable());
361 assert!(!FlakeClassification::Deterministic.is_retriable());
362 }
363
364 #[test]
365 fn retry_policy_default() {
366 let policy = RetryPolicy {
367 max_retries: 1,
368 retry_delay_secs: 5,
369 flake_budget: 3,
370 };
371 let transient = FlakeClassification::Transient {
372 category: FlakeCategory::OracleTimeout,
373 matched_line: "x".into(),
374 };
375 assert!(policy.should_retry(&transient, 0));
376 assert!(!policy.should_retry(&transient, 1));
377 assert!(!policy.should_retry(&FlakeClassification::Deterministic, 0));
378 }
379
380 #[test]
381 fn flake_event_serde_roundtrip() {
382 let event = FlakeEvent {
383 target: "ext_conformance".into(),
384 classification: FlakeClassification::Transient {
385 category: FlakeCategory::OracleTimeout,
386 matched_line: "oracle timed out".into(),
387 },
388 attempt: 1,
389 timestamp: "2026-02-08T03:00:00Z".into(),
390 };
391 let json = serde_json::to_string(&event).unwrap();
392 let back: FlakeEvent = serde_json::from_str(&json).unwrap();
393 assert_eq!(back.target, "ext_conformance");
394 assert!(back.classification.is_retriable());
395 }
396
397 #[test]
398 fn retry_policy_bounds_environment_variables() {
399 let policy = RetryPolicy::from_env(|key| match key {
400 "PI_CONFORMANCE_MAX_RETRIES"
401 | "PI_CONFORMANCE_RETRY_DELAY"
402 | "PI_CONFORMANCE_FLAKE_BUDGET" => Ok("999999999".to_string()),
403 _ => Err(std::env::VarError::NotPresent),
404 });
405
406 assert_eq!(policy.max_retries, 100);
408 assert_eq!(policy.retry_delay_secs, 3600);
409 assert_eq!(policy.flake_budget, 1000);
410 }
411
412 #[test]
413 fn flake_category_all_covered() {
414 assert_eq!(FlakeCategory::all().len(), 6);
415 for cat in FlakeCategory::all() {
416 assert!(!cat.label().is_empty());
417 assert!(!cat.to_string().is_empty());
418 }
419 }
420
421 #[test]
422 fn multiline_output_matches_first_pattern() {
423 let output = "starting test...\ncompiling extensions...\nerror: bun process timed out\nassert failed";
424 let result = classify_failure(output);
425 assert!(matches!(
426 result,
427 FlakeClassification::Transient {
428 category: FlakeCategory::OracleTimeout,
429 ..
430 }
431 ));
432 }
433
434 #[test]
435 fn case_insensitive_matching() {
436 let output = "ERROR: OUT OF MEMORY";
437 let result = classify_failure(output);
438 assert!(result.is_retriable());
439 }
440
441 #[test]
442 fn bounded_input_prevents_dos() {
443 let large_prefix = "x".repeat(MAX_CLASSIFY_INPUT_SIZE + 1000);
446 let pattern = "oracle timed out";
447 let large_input = format!("{large_prefix}\n{pattern}");
448
449 let result = classify_failure(&large_input);
452
453 assert_eq!(result, FlakeClassification::Deterministic);
455
456 let bounded_input = format!("{pattern}\n{}", "y".repeat(MAX_CLASSIFY_INPUT_SIZE));
458 let result = classify_failure(&bounded_input);
459 assert!(matches!(
460 result,
461 FlakeClassification::Transient {
462 category: FlakeCategory::OracleTimeout,
463 ..
464 }
465 ));
466 }
467
468 mod proptest_flake_classifier {
469 use super::*;
470 use proptest::prelude::*;
471
472 fn arb_transient_line() -> impl Strategy<Value = (String, FlakeCategory)> {
474 prop_oneof![
475 Just((
476 "oracle process timed out".to_string(),
477 FlakeCategory::OracleTimeout
478 )),
479 Just((
480 "bun timed out waiting".to_string(),
481 FlakeCategory::OracleTimeout
482 )),
483 Just((
484 "fatal: out of memory".to_string(),
485 FlakeCategory::ResourceExhaustion
486 )),
487 Just((
488 "error: ENOMEM".to_string(),
489 FlakeCategory::ResourceExhaustion
490 )),
491 Just((
492 "cannot allocate 4 GB".to_string(),
493 FlakeCategory::ResourceExhaustion
494 )),
495 Just((
496 "quickjs runtime: allocation failed, out of memory".to_string(),
497 FlakeCategory::JsGcPressure
498 )),
499 Just((
500 "EBUSY: resource busy".to_string(),
501 FlakeCategory::FsContention
502 )),
503 Just(("ETXTBSY".to_string(), FlakeCategory::FsContention)),
504 Just((
505 "resource busy or locked".to_string(),
506 FlakeCategory::FsContention
507 )),
508 Just((
509 "EADDRINUSE on port 8080".to_string(),
510 FlakeCategory::PortConflict
511 )),
512 Just((
513 "address already in use".to_string(),
514 FlakeCategory::PortConflict
515 )),
516 Just((
517 "ENOENT: no such file or directory /tmp/pi-test".to_string(),
518 FlakeCategory::TmpdirRace
519 )),
520 ]
521 }
522
523 proptest! {
524 #[test]
525 fn classify_failure_never_panics(s in ".*") {
526 let _ = classify_failure(&s);
527 }
528
529 #[test]
530 fn deterministic_is_not_retriable(s in "[a-zA-Z0-9 ]{0,200}") {
531 let result = classify_failure(&s);
532 if result == FlakeClassification::Deterministic {
533 assert!(!result.is_retriable());
534 }
535 }
536
537 #[test]
538 fn transient_is_always_retriable(s in ".*") {
539 let result = classify_failure(&s);
540 if let FlakeClassification::Transient { .. } = &result {
541 assert!(result.is_retriable());
542 }
543 }
544
545 #[test]
546 fn known_transient_lines_classify_correctly(
547 (line, expected_cat) in arb_transient_line()
548 ) {
549 let result = classify_failure(&line);
550 match result {
551 FlakeClassification::Transient { category, .. } => {
552 assert_eq!(
553 category, expected_cat,
554 "line {line:?} got {category:?} expected {expected_cat:?}"
555 );
556 }
557 FlakeClassification::Deterministic => {
558 assert!(false, "expected Transient for {line:?}, got Deterministic");
559 }
560 }
561 }
562
563 #[test]
564 fn classify_is_case_insensitive(
565 (line, expected_cat) in arb_transient_line()
566 ) {
567 let upper = classify_failure(&line.to_uppercase());
568 let lower = classify_failure(&line.to_lowercase());
569 match (&upper, &lower) {
570 (
571 FlakeClassification::Transient { category: cu, .. },
572 FlakeClassification::Transient { category: cl, .. },
573 ) => {
574 assert_eq!(*cu, expected_cat);
575 assert_eq!(*cl, expected_cat);
576 }
577 _ => assert!(false, "expected both Transient for line {line:?}"),
578 }
579 }
580
581 #[test]
582 fn noise_prefix_preserves_classification(
583 noise in "[a-zA-Z0-9 ]{0,50}",
584 (line, expected_cat) in arb_transient_line(),
585 ) {
586 let input = format!("{noise}\n{line}");
587 let result = classify_failure(&input);
588 match result {
589 FlakeClassification::Transient { category, .. } => {
590 assert_eq!(category, expected_cat);
591 }
592 FlakeClassification::Deterministic => {
593 assert!(false, "expected Transient for input with line {line:?}");
594 }
595 }
596 }
597
598 #[test]
599 fn whitespace_only_is_deterministic(s in "[ \\t\\n]{0,100}") {
600 assert_eq!(classify_failure(&s), FlakeClassification::Deterministic);
601 }
602
603 #[test]
604 fn serde_roundtrip_transient((line, _cat) in arb_transient_line()) {
605 let result = classify_failure(&line);
606 let json = serde_json::to_string(&result).unwrap();
607 let back: FlakeClassification = serde_json::from_str(&json).unwrap();
608 assert_eq!(result, back);
609 }
610
611 #[test]
612 fn serde_roundtrip_category(idx in 0..6usize) {
613 let cat = FlakeCategory::all()[idx];
614 let json = serde_json::to_string(&cat).unwrap();
615 let back: FlakeCategory = serde_json::from_str(&json).unwrap();
616 assert_eq!(cat, back);
617 }
618
619 #[test]
620 fn all_categories_have_nonempty_labels(idx in 0..6usize) {
621 let cat = FlakeCategory::all()[idx];
622 assert!(!cat.label().is_empty());
623 assert!(!cat.to_string().is_empty());
624 assert_eq!(cat.label(), cat.to_string());
625 }
626
627 #[test]
628 fn retry_policy_respects_attempt_bound(
629 max_retries in 0..10u32,
630 attempt in 0..20u32,
631 ) {
632 let policy = RetryPolicy {
633 max_retries,
634 retry_delay_secs: 1,
635 flake_budget: 3,
636 };
637 let transient = FlakeClassification::Transient {
638 category: FlakeCategory::OracleTimeout,
639 matched_line: "x".into(),
640 };
641 let should = policy.should_retry(&transient, attempt);
642 assert_eq!(should, attempt < max_retries);
643 }
644
645 #[test]
646 fn retry_policy_never_retries_deterministic(
647 max_retries in 0..10u32,
648 attempt in 0..20u32,
649 ) {
650 let policy = RetryPolicy {
651 max_retries,
652 retry_delay_secs: 1,
653 flake_budget: 3,
654 };
655 assert!(!policy.should_retry(&FlakeClassification::Deterministic, attempt));
656 }
657
658 #[test]
659 fn flake_event_serde_roundtrip_prop(
660 target in "[a-z_]{1,20}",
661 attempt in 0..100u32,
662 idx in 0..6usize,
663 ) {
664 let cat = FlakeCategory::all()[idx];
665 let event = FlakeEvent {
666 target: target.clone(),
667 classification: FlakeClassification::Transient {
668 category: cat,
669 matched_line: "matched".into(),
670 },
671 attempt,
672 timestamp: "2026-01-01T00:00:00Z".into(),
673 };
674 let json = serde_json::to_string(&event).unwrap();
675 let back: FlakeEvent = serde_json::from_str(&json).unwrap();
676 assert_eq!(back.target, target);
677 assert_eq!(back.attempt, attempt);
678 assert!(back.classification.is_retriable());
679 }
680 }
681 }
682}