Skip to main content

hegel/
runner.rs

1use crate::antithesis::{TestLocation, is_running_in_antithesis};
2use crate::control::{currently_in_test_context, with_test_context};
3use crate::protocol::{Channel, Connection, HANDSHAKE_STRING, SERVER_CRASHED_MESSAGE};
4use crate::test_case::{ASSUME_FAIL_STRING, STOP_TEST_STRING, TestCase};
5use ciborium::Value;
6
7use crate::cbor_utils::{as_bool, as_text, as_u64, cbor_map, map_get};
8use std::backtrace::{Backtrace, BacktraceStatus};
9use std::cell::RefCell;
10use std::fs::{File, OpenOptions};
11use std::panic::{self, AssertUnwindSafe, catch_unwind};
12use std::process::{Command, Stdio};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::{Arc, Mutex, Once};
15
/// Inclusive range `(lo, hi)` of protocol versions accepted from the server
/// during the handshake in `HegelSession::init`.
const SUPPORTED_PROTOCOL_VERSIONS: (f64, f64) = (0.6, 0.7);
/// Version of the `hegel-core` package installed with `uv` when
/// `HEGEL_SERVER_COMMAND` is not set.
const HEGEL_SERVER_VERSION: &str = "0.2.3";
/// Environment variable that, when set, names the hegel server binary to run
/// instead of installing one.
const HEGEL_SERVER_COMMAND_ENV: &str = "HEGEL_SERVER_COMMAND";
/// Relative directory holding the managed venv, the install log and the
/// server log.
const HEGEL_SERVER_DIR: &str = ".hegel";
/// Error returned by `ensure_hegel_installed` when `uv` is not on the PATH.
const UV_NOT_FOUND_MESSAGE: &str = "\
You are seeing this error message because hegel-rust tried to use `uv` to install \
hegel-core, but could not find uv on the PATH.

Hegel uses a Python server component called `hegel-core` to share core property-based \
testing functionality across languages. There are two ways for Hegel to get hegel-core:

* By default, Hegel looks for uv (https://docs.astral.sh/uv/) on the PATH, and \
  uses uv to install hegel-core to a local `.hegel/venv` directory. We recommend this \
  option. To continue, install uv: https://docs.astral.sh/uv/getting-started/installation/.
* Alternatively, you can manage the installation of hegel-core yourself. After installing, \
  setting the HEGEL_SERVER_COMMAND environment variable to your hegel-core binary path tells \
  hegel-rust to use that hegel-core instead.

See https://hegel.dev/reference/installation for more details.";
/// Path of the managed hegel binary, cached after the first successful
/// `ensure_hegel_installed` call.
static HEGEL_SERVER_COMMAND: std::sync::OnceLock<String> = std::sync::OnceLock::new();
/// Shared append-mode handle to `.hegel/server.log`; `server_log_file` hands
/// out clones of it (used as the server's stderr).
static SERVER_LOG_FILE: std::sync::OnceLock<Mutex<File>> = std::sync::OnceLock::new();
/// The process-wide server session, created lazily by `HegelSession::get`.
static SESSION: std::sync::OnceLock<HegelSession> = std::sync::OnceLock::new();

/// Guards the one-time installation of Hegel's panic hook.
static PANIC_HOOK_INIT: Once = Once::new();
40
/// A persistent connection to the hegel server subprocess.
///
/// Created once per process on first use. The subprocess and connection
/// are reused across all `Hegel::run()` calls. The Python server supports
/// multiple sequential `run_test` commands over a single connection.
struct HegelSession {
    /// Connection over the server's stdin/stdout; per-test channels are
    /// created from it in `Hegel::run`.
    connection: Arc<Connection>,
    /// The control channel is shared across threads, so it's behind a Mutex
    /// because Channel is not thread-safe. The lock is only held for the
    /// brief run_test send/receive; test execution runs concurrently on
    /// per-test channels.
    control: Mutex<Channel>,
}
54
55impl HegelSession {
56    fn get() -> &'static HegelSession {
57        SESSION.get_or_init(|| {
58            init_panic_hook();
59            HegelSession::init()
60        })
61    }
62
63    fn init() -> HegelSession {
64        let hegel_binary_path = find_hegel();
65        let mut cmd = Command::new(&hegel_binary_path);
66        cmd.arg("--stdio").arg("--verbosity").arg("normal");
67
68        cmd.env("PYTHONUNBUFFERED", "1");
69        let log_file = server_log_file();
70        cmd.stdin(Stdio::piped());
71        cmd.stdout(Stdio::piped());
72        cmd.stderr(Stdio::from(log_file));
73
74        #[allow(clippy::expect_fun_call)]
75        let mut child = cmd
76            .spawn()
77            .expect(format!("Failed to spawn hegel at path {}", hegel_binary_path).as_str());
78
79        let child_stdin = child.stdin.take().expect("Failed to take child stdin");
80        let child_stdout = child.stdout.take().expect("Failed to take child stdout");
81
82        let connection = Connection::new(Box::new(child_stdout), Box::new(child_stdin));
83        let mut control = connection.control_channel();
84
85        // Handshake
86        let req_id = control
87            .send_request(HANDSHAKE_STRING.to_vec())
88            .expect("Failed to send version negotiation");
89        let response = control
90            .receive_reply(req_id)
91            .expect("Failed to receive version response");
92
93        let decoded = String::from_utf8_lossy(&response);
94        let server_version = match decoded.strip_prefix("Hegel/") {
95            Some(v) => v,
96            None => {
97                let _ = child.kill();
98                panic!("Bad handshake response: {decoded:?}");
99            }
100        };
101        let version: f64 = server_version.parse().unwrap_or_else(|_| {
102            let _ = child.kill();
103            panic!("Bad version number: {server_version}");
104        });
105
106        let (lo, hi) = SUPPORTED_PROTOCOL_VERSIONS;
107        if !(lo <= version && version <= hi) {
108            let _ = child.kill();
109            panic!(
110                "hegel-rust supports protocol versions {lo} through {hi}, but \
111                 the connected server is using protocol version {version}. Upgrading \
112                 hegel-rust or downgrading hegel-core might help."
113            );
114        }
115
116        // Monitor thread: detects server crash. The pipe close from
117        // the child exiting will unblock any pending reads.
118        let conn_for_monitor = Arc::clone(&connection);
119        std::thread::spawn(move || {
120            let _ = child.wait();
121            conn_for_monitor.mark_server_exited();
122        });
123
124        HegelSession {
125            connection,
126            control: Mutex::new(control),
127        }
128    }
129}
130
thread_local! {
    /// Details of the most recent panic on this thread. The Hegel panic hook
    /// records them while in a test context, and `take_panic_info` consumes them.
    /// Tuple layout: (thread_name, thread_id, location, backtrace).
    static LAST_PANIC_INFO: RefCell<Option<(String, String, String, Backtrace)>> = const { RefCell::new(None) };
}
135
136/// (thread_name, thread_id, location, backtrace).
137fn take_panic_info() -> Option<(String, String, String, Backtrace)> {
138    LAST_PANIC_INFO.with(|info| info.borrow_mut().take())
139}
140
/// Returns true if `line` starts a frame in `Backtrace`'s display output.
///
/// Such lines have an ASCII digit as their first non-whitespace character,
/// e.g. `"  12: some::function"`.
fn is_frame_line(line: &str) -> bool {
    line.trim_start()
        .chars()
        .next()
        .is_some_and(|c| c.is_ascii_digit())
}

/// Format a backtrace, optionally filtering to "short" format.
///
/// With `full == true` the backtrace's display output is returned unchanged.
///
/// Short format works as follows:
/// - It keeps only the frames strictly after the `__rust_end_short_backtrace`
///   marker frame and before the `__rust_begin_short_backtrace` marker frame,
///   matching the default Rust panic handler.
/// - Kept frames are renumbered to start at 0.
/// - If the markers appear in an unexpected order, all frames are kept rather
///   than panicking while a failure is being reported.
fn format_backtrace(bt: &Backtrace, full: bool) -> String {
    let backtrace_str = bt.to_string();

    if full {
        return backtrace_str;
    }

    // Frame groups look like:
    //    N: function::name
    //              at /path/to/file.rs:123:45
    let lines: Vec<&str> = backtrace_str.lines().collect();
    let mut start_idx = 0;
    let mut end_idx = lines.len();

    for (i, line) in lines.iter().enumerate() {
        if line.contains("__rust_end_short_backtrace") {
            // Interesting frames begin at the next frame after this marker.
            if let Some(offset) = lines[i + 1..].iter().position(|l| is_frame_line(l)) {
                start_idx = i + 1 + offset;
            }
        }
        if line.contains("__rust_begin_short_backtrace") {
            // Cut at the numbered line that starts the marker's own frame.
            if let Some(j) = lines[..=i].iter().rposition(|l| is_frame_line(l)) {
                end_idx = j;
            }
            break;
        }
    }

    if start_idx > end_idx {
        start_idx = 0;
        end_idx = lines.len();
    }

    // Renumber frames starting at 0, right-aligned like the std output.
    let mut new_frame_num = 0usize;
    let mut result = Vec::with_capacity(end_idx - start_idx);
    for line in &lines[start_idx..end_idx] {
        let trimmed = line.trim_start();
        match trimmed.find(':') {
            Some(colon_pos) if is_frame_line(trimmed) => {
                result.push(format!("{:>4}{}", new_frame_num, &trimmed[colon_pos..]));
                new_frame_num += 1;
            }
            _ => result.push((*line).to_string()),
        }
    }

    result.join("\n")
}
232
233// Panic unconditionally prints to stderr, even if it's caught later. This results in
234// messy output during shrinking. To avoid this, we replace the panic hook with our
235// own that suppresses the printing except for the final replay.
236//
237// This is called once per process, the first time any hegel test runs.
238fn init_panic_hook() {
239    PANIC_HOOK_INIT.call_once(|| {
240        let prev_hook = panic::take_hook();
241        panic::set_hook(Box::new(move |info| {
242            if !currently_in_test_context() {
243                // use actual panic hook outside of tests
244                prev_hook(info);
245                return;
246            }
247
248            let thread = std::thread::current();
249            let thread_name = thread.name().unwrap_or("<unnamed>").to_string();
250            // ThreadId's debug output is ThreadId(N)
251            let thread_id = format!("{:?}", thread.id())
252                .trim_start_matches("ThreadId(")
253                .trim_end_matches(')')
254                .to_string();
255            let location = info
256                .location()
257                .map(|loc| format!("{}:{}:{}", loc.file(), loc.line(), loc.column()))
258                .unwrap_or_else(|| "<unknown>".to_string());
259
260            let backtrace = Backtrace::capture();
261
262            LAST_PANIC_INFO
263                .with(|l| *l.borrow_mut() = Some((thread_name, thread_id, location, backtrace)));
264        }));
265    });
266}
267
268fn ensure_hegel_installed() -> Result<String, String> {
269    let venv_dir = format!("{HEGEL_SERVER_DIR}/venv");
270    let version_file = format!("{venv_dir}/hegel-version");
271    let hegel_bin = format!("{venv_dir}/bin/hegel");
272    let install_log = format!("{HEGEL_SERVER_DIR}/install.log");
273
274    // Check cached version
275    if let Ok(cached) = std::fs::read_to_string(&version_file) {
276        if cached.trim() == HEGEL_SERVER_VERSION && std::path::Path::new(&hegel_bin).is_file() {
277            return Ok(hegel_bin);
278        }
279    }
280
281    std::fs::create_dir_all(HEGEL_SERVER_DIR)
282        .map_err(|e| format!("Failed to create {HEGEL_SERVER_DIR}: {e}"))?;
283
284    let log_file = std::fs::File::create(&install_log)
285        .map_err(|e| format!("Failed to create install log: {e}"))?;
286
287    let status = std::process::Command::new("uv")
288        .args(["venv", "--clear", &venv_dir])
289        .stderr(log_file.try_clone().unwrap())
290        .stdout(log_file.try_clone().unwrap())
291        .status();
292    match &status {
293        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
294            return Err(UV_NOT_FOUND_MESSAGE.to_string());
295        }
296        Err(e) => {
297            return Err(format!("Failed to run `uv venv`: {e}"));
298        }
299        Ok(s) if !s.success() => {
300            let log = std::fs::read_to_string(&install_log).unwrap_or_default();
301            return Err(format!("uv venv failed. Install log:\n{log}"));
302        }
303        Ok(_) => {}
304    }
305
306    let python_path = format!("{venv_dir}/bin/python");
307    let status = std::process::Command::new("uv")
308        .args([
309            "pip",
310            "install",
311            "--python",
312            &python_path,
313            &format!("hegel-core=={HEGEL_SERVER_VERSION}"),
314        ])
315        .stderr(log_file.try_clone().unwrap())
316        .stdout(log_file)
317        .status()
318        .map_err(|e| format!("Failed to run `uv pip install`: {e}"))?;
319    if !status.success() {
320        let log = std::fs::read_to_string(&install_log).unwrap_or_default();
321        return Err(format!(
322            "Failed to install hegel-core (version: {HEGEL_SERVER_VERSION}). \
323             Set {HEGEL_SERVER_COMMAND_ENV} to a hegel binary path to skip installation.\n\
324             Install log:\n{log}"
325        ));
326    }
327
328    if !std::path::Path::new(&hegel_bin).is_file() {
329        return Err(format!("hegel not found at {hegel_bin} after installation"));
330    }
331
332    std::fs::write(&version_file, HEGEL_SERVER_VERSION)
333        .map_err(|e| format!("Failed to write version file: {e}"))?;
334
335    Ok(hegel_bin)
336}
337
338fn server_log_file() -> File {
339    let file = SERVER_LOG_FILE.get_or_init(|| {
340        std::fs::create_dir_all(HEGEL_SERVER_DIR).ok();
341        let file = OpenOptions::new()
342            .create(true)
343            .append(true)
344            .open(format!("{HEGEL_SERVER_DIR}/server.log"))
345            .expect("Failed to open server log file");
346        Mutex::new(file)
347    });
348    file.lock()
349        .unwrap()
350        .try_clone()
351        .expect("Failed to clone server log file handle")
352}
353
354fn find_hegel() -> String {
355    if let Ok(override_path) = std::env::var(HEGEL_SERVER_COMMAND_ENV) {
356        return override_path;
357    }
358    HEGEL_SERVER_COMMAND
359        .get_or_init(|| ensure_hegel_installed().unwrap_or_else(|e| panic!("{e}")))
360        .clone()
361}
362
/// Health checks that can be suppressed during test execution.
///
/// Health checks detect common issues with test configuration that would
/// otherwise cause tests to run inefficiently or not at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HealthCheck {
    /// Too many test cases are being filtered out via `assume()`.
    FilterTooMuch,
    /// Test execution is too slow.
    TooSlow,
    /// Generated test cases are too large.
    TestCasesTooLarge,
    /// The smallest natural input is very large.
    LargeInitialTestCase,
}

impl HealthCheck {
    /// Returns all health check variants.
    ///
    /// Useful for suppressing all health checks at once:
    ///
    /// ```no_run
    /// use hegel::HealthCheck;
    ///
    /// #[hegel::test(suppress_health_check = HealthCheck::all())]
    /// fn my_test(tc: hegel::TestCase) {
    ///     // ...
    /// }
    /// ```
    pub const fn all() -> [HealthCheck; 4] {
        [
            Self::FilterTooMuch,
            Self::TooSlow,
            Self::TestCasesTooLarge,
            Self::LargeInitialTestCase,
        ]
    }

    /// Name of this check as sent to the server in `suppress_health_check`.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::FilterTooMuch => "filter_too_much",
            Self::TooSlow => "too_slow",
            Self::TestCasesTooLarge => "test_cases_too_large",
            Self::LargeInitialTestCase => "large_initial_test_case",
        }
    }
}
410
/// Controls how much output Hegel produces during test runs.
///
/// [`Settings::new`] defaults to [`Verbosity::Normal`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verbosity {
    /// Suppress all output.
    Quiet,
    /// Default output level.
    Normal,
    /// Show more detail about the test run.
    Verbose,
    /// Show protocol-level debug information.
    Debug,
}
425
426// internal use only
427#[doc(hidden)]
428pub fn hegel<F>(test_fn: F)
429where
430    F: FnMut(TestCase),
431{
432    Hegel::new(test_fn).run();
433}
434
/// Heuristically detect whether we are running under a CI system.
///
/// Each entry names an environment variable. `None` means the variable only
/// has to be present; `Some(v)` means it must be valid Unicode equal to `v`.
fn is_in_ci() -> bool {
    const CI_VARS: &[(&str, Option<&str>)] = &[
        ("CI", None),
        ("TF_BUILD", Some("true")),
        ("BUILDKITE", Some("true")),
        ("CIRCLECI", Some("true")),
        ("CIRRUS_CI", Some("true")),
        ("CODEBUILD_BUILD_ID", None),
        ("GITHUB_ACTIONS", Some("true")),
        ("GITLAB_CI", None),
        ("HEROKU_TEST_RUN_ID", None),
        ("TEAMCITY_VERSION", None),
        ("bamboo.buildKey", None),
    ];

    for &(name, required) in CI_VARS {
        let matched = match required {
            Some(wanted) => std::env::var(name).is_ok_and(|actual| actual == wanted),
            None => std::env::var_os(name).is_some(),
        };
        if matched {
            return true;
        }
    }
    false
}
455
/// How the failing-example database is configured for a run.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Database {
    /// No preference: the `database` key is omitted from `run_test`.
    Unset,
    /// Explicitly disabled: `database` is sent as null.
    Disabled,
    /// Use this path: sent as text under `database`.
    Path(String),
}
462
463/// Configuration for a Hegel test run.
464///
465/// Use builder methods to customize, then pass to [`Hegel::settings`] or
466/// the `settings` parameter of `#[hegel::test]`.
467///
468/// In CI environments (detected automatically), the database is disabled
469/// and tests are derandomized by default.
470#[derive(Debug, Clone)]
471pub struct Settings {
472    test_cases: u64,
473    verbosity: Verbosity,
474    seed: Option<u64>,
475    derandomize: bool,
476    database: Database,
477    suppress_health_check: Vec<HealthCheck>,
478}
479
480impl Settings {
481    /// Create settings with defaults. Detects CI environments automatically.
482    pub fn new() -> Self {
483        let in_ci = is_in_ci();
484        Self {
485            test_cases: 100,
486            verbosity: Verbosity::Normal,
487            seed: None,
488            derandomize: in_ci,
489            database: if in_ci {
490                Database::Disabled
491            } else {
492                Database::Unset
493            },
494            suppress_health_check: Vec::new(),
495        }
496    }
497
498    /// Set the number of test cases to run (default: 100).
499    pub fn test_cases(mut self, n: u64) -> Self {
500        self.test_cases = n;
501        self
502    }
503
504    /// Set the verbosity level.
505    pub fn verbosity(mut self, verbosity: Verbosity) -> Self {
506        self.verbosity = verbosity;
507        self
508    }
509
510    /// Set a fixed seed for reproducibility, or `None` for random.
511    pub fn seed(mut self, seed: Option<u64>) -> Self {
512        self.seed = seed;
513        self
514    }
515
516    /// When true, use a fixed seed derived from the test name. Enabled by default in CI.
517    pub fn derandomize(mut self, derandomize: bool) -> Self {
518        self.derandomize = derandomize;
519        self
520    }
521
522    /// Set the database path for storing failing examples, or `None` to disable.
523    pub fn database(mut self, database: Option<String>) -> Self {
524        self.database = match database {
525            None => Database::Disabled,
526            Some(path) => Database::Path(path),
527        };
528        self
529    }
530
531    /// Suppress one or more health checks so they do not cause test failure.
532    ///
533    /// Health checks detect common issues like excessive filtering or slow
534    /// tests. Use this to suppress specific checks when they are expected.
535    ///
536    /// # Example
537    ///
538    /// ```no_run
539    /// use hegel::{HealthCheck, Verbosity};
540    /// use hegel::generators as gs;
541    ///
542    /// #[hegel::test(suppress_health_check = [HealthCheck::FilterTooMuch, HealthCheck::TooSlow])]
543    /// fn my_test(tc: hegel::TestCase) {
544    ///     let n: i32 = tc.draw(gs::integers());
545    ///     tc.assume(n > 0);
546    /// }
547    /// ```
548    pub fn suppress_health_check(mut self, checks: impl IntoIterator<Item = HealthCheck>) -> Self {
549        self.suppress_health_check.extend(checks);
550        self
551    }
552}
553
554impl Default for Settings {
555    fn default() -> Self {
556        Self::new()
557    }
558}
559
// internal use only
#[doc(hidden)]
pub struct Hegel<F> {
    /// The property, invoked once per generated test case.
    test_fn: F,
    /// Optional key forwarded to the server as `database_key` (as bytes).
    database_key: Option<String>,
    /// Test location used for the Antithesis assertion, if set.
    test_location: Option<TestLocation>,
    /// Run configuration; most fields are sent with the `run_test` command.
    settings: Settings,
}
568
569impl<F> Hegel<F>
570where
571    F: FnMut(TestCase),
572{
573    /// Create a new test builder with default settings.
574    pub fn new(test_fn: F) -> Self {
575        Self {
576            test_fn,
577            database_key: None,
578            settings: Settings::new(),
579            test_location: None,
580        }
581    }
582
583    /// Override the default settings.
584    pub fn settings(mut self, settings: Settings) -> Self {
585        self.settings = settings;
586        self
587    }
588
589    #[doc(hidden)]
590    pub fn __database_key(mut self, key: String) -> Self {
591        self.database_key = Some(key);
592        self
593    }
594
595    #[doc(hidden)]
596    pub fn test_location(mut self, location: TestLocation) -> Self {
597        self.test_location = Some(location);
598        self
599    }
600
601    /// Run the property-based tests.
602    ///
603    /// Connects to the shared hegel server (spawning it on first use),
604    /// sends a `run_test` command, processes test cases, and reports results.
605    /// Panics if any test case fails.
606    pub fn run(self) {
607        let session = HegelSession::get();
608        let connection = &session.connection;
609
610        let mut test_fn = self.test_fn;
611        let verbosity = self.settings.verbosity;
612        let got_interesting = Arc::new(AtomicBool::new(false));
613        let mut test_channel = connection.new_channel();
614
615        let suppress_names: Vec<Value> = self
616            .settings
617            .suppress_health_check
618            .iter()
619            .map(|c| Value::Text(c.as_str().to_string()))
620            .collect();
621
622        let database_key_bytes = self
623            .database_key
624            .map_or(Value::Null, |k| Value::Bytes(k.into_bytes()));
625
626        let mut run_test_msg = cbor_map! {
627            "command" => "run_test",
628            "test_cases" => self.settings.test_cases,
629            "seed" => self.settings.seed.map_or(Value::Null, Value::from),
630            "channel_id" => test_channel.channel_id,
631            "database_key" => database_key_bytes,
632            "derandomize" => self.settings.derandomize
633        };
634        let db_value = match &self.settings.database {
635            Database::Unset => Option::None,
636            Database::Disabled => Some(Value::Null),
637            Database::Path(s) => Some(Value::Text(s.clone())),
638        };
639        if let Some(db) = db_value {
640            if let Value::Map(ref mut map) = run_test_msg {
641                map.push((Value::Text("database".to_string()), db));
642            }
643        }
644        if !suppress_names.is_empty() {
645            if let Value::Map(ref mut map) = run_test_msg {
646                map.push((
647                    Value::Text("suppress_health_check".to_string()),
648                    Value::Array(suppress_names),
649                ));
650            }
651        }
652
653        // The control channel is behind a Mutex because Channel requires &mut self.
654        // This only serializes the brief run_test send/receive — actual test
655        // execution happens on per-test channels without holding this lock.
656        {
657            let mut control = session.control.lock().unwrap();
658            let run_test_id = control
659                .send_request(cbor_encode(&run_test_msg))
660                .expect("Failed to send run_test");
661
662            let run_test_response = control
663                .receive_reply(run_test_id)
664                .expect("Failed to receive run_test response");
665            let _run_test_result: Value = cbor_decode(&run_test_response);
666        }
667
668        if verbosity == Verbosity::Debug {
669            eprintln!("run_test response received");
670        }
671
672        let result_data: Value;
673        let ack_null = cbor_map! {"result" => Value::Null};
674        loop {
675            // Handle the server dying between events: receive_request will
676            // fail with RecvError once the background reader clears the senders.
677            let (event_id, event_payload) = match test_channel.receive_request() {
678                Ok(event) => event,
679                Err(_) if connection.server_has_exited() => {
680                    panic!("{}", SERVER_CRASHED_MESSAGE);
681                }
682                Err(e) => unreachable!("Failed to receive event (server still running): {}", e),
683            };
684
685            let event: Value = cbor_decode(&event_payload);
686            let event_type = map_get(&event, "event")
687                .and_then(as_text)
688                .expect("Expected event in payload");
689
690            if verbosity == Verbosity::Debug {
691                eprintln!("Received event: {:?}", event);
692            }
693
694            match event_type {
695                "test_case" => {
696                    let channel_id = map_get(&event, "channel_id")
697                        .and_then(as_u64)
698                        .expect("Missing channel id") as u32;
699
700                    let test_case_channel = connection.connect_channel(channel_id);
701
702                    // Ack the test_case event BEFORE running the test (prevents deadlock)
703                    test_channel
704                        .write_reply(event_id, cbor_encode(&ack_null))
705                        .expect("Failed to ack test_case");
706
707                    run_test_case(
708                        connection,
709                        test_case_channel,
710                        &mut test_fn,
711                        false,
712                        verbosity,
713                        &got_interesting,
714                    );
715                }
716                "test_done" => {
717                    let ack_true = cbor_map! {"result" => true};
718                    test_channel
719                        .write_reply(event_id, cbor_encode(&ack_true))
720                        .expect("Failed to ack test_done");
721                    result_data = map_get(&event, "results").cloned().unwrap_or(Value::Null);
722                    break;
723                }
724                _ => {
725                    panic!("unknown event: {}", event_type);
726                }
727            }
728        }
729
730        // Check for server-side errors before processing results
731        if let Some(error_msg) = map_get(&result_data, "error").and_then(as_text) {
732            panic!("Server error: {}", error_msg);
733        }
734
735        // Check for health check failure before processing results
736        if let Some(failure_msg) = map_get(&result_data, "health_check_failure").and_then(as_text) {
737            panic!("Health check failure:\n{}", failure_msg);
738        }
739
740        // Check for flaky test detection
741        if let Some(flaky_msg) = map_get(&result_data, "flaky").and_then(as_text) {
742            panic!("Flaky test detected: {}", flaky_msg);
743        }
744
745        let n_interesting = map_get(&result_data, "interesting_test_cases")
746            .and_then(as_u64)
747            .unwrap_or(0);
748
749        if verbosity == Verbosity::Debug {
750            eprintln!("Test done. interesting_test_cases={}", n_interesting);
751        }
752
753        // Process final replay test cases (one per interesting example)
754        let mut final_result: Option<TestCaseResult> = None;
755        for _ in 0..n_interesting {
756            let (event_id, event_payload) = test_channel
757                .receive_request()
758                .expect("Failed to receive final test_case");
759
760            let event: Value = cbor_decode(&event_payload);
761            let event_type = map_get(&event, "event").and_then(as_text);
762            assert_eq!(event_type, Some("test_case"));
763
764            let channel_id = map_get(&event, "channel_id")
765                .and_then(as_u64)
766                .expect("Missing channel id") as u32;
767
768            let test_case_channel = connection.connect_channel(channel_id);
769
770            test_channel
771                .write_reply(event_id, cbor_encode(&ack_null))
772                .expect("Failed to ack final test_case");
773
774            let tc_result = run_test_case(
775                connection,
776                test_case_channel,
777                &mut test_fn,
778                true,
779                verbosity,
780                &got_interesting,
781            );
782
783            if matches!(&tc_result, TestCaseResult::Interesting { .. }) {
784                final_result = Some(tc_result);
785            }
786
787            if connection.server_has_exited() {
788                panic!("{}", SERVER_CRASHED_MESSAGE);
789            }
790        }
791
792        let passed = map_get(&result_data, "passed")
793            .and_then(as_bool)
794            .unwrap_or(true);
795
796        let test_failed = !passed || got_interesting.load(Ordering::SeqCst);
797
798        if is_running_in_antithesis() {
799            #[cfg(not(feature = "antithesis"))]
800            panic!(
801                "When Hegel is run inside of Antithesis, it requires the `antithesis` feature. \
802                You can add it with {{ features = [\"antithesis\"] }}."
803            );
804
805            #[cfg(feature = "antithesis")]
806            if let Some(ref loc) = self.test_location {
807                crate::antithesis::emit_assertion(loc, !test_failed);
808            }
809        }
810
811        if test_failed {
812            let msg = match &final_result {
813                Some(TestCaseResult::Interesting { panic_message }) => panic_message.as_str(),
814                _ => "unknown",
815            };
816            panic!("Property test failed: {}", msg);
817        }
818    }
819}
820
/// Outcome of running the test function on a single test case.
enum TestCaseResult {
    /// The test returned normally (reported as `VALID`).
    Valid,
    /// The test panicked with the assume-fail or stop-test sentinel (reported as `INVALID`).
    Invalid,
    /// The test panicked for any other reason (reported as `INTERESTING`).
    Interesting { panic_message: String },
}
826
827fn run_test_case<F: FnMut(TestCase)>(
828    connection: &Arc<Connection>,
829    test_channel: Channel,
830    test_fn: &mut F,
831    is_final: bool,
832    verbosity: Verbosity,
833    got_interesting: &Arc<AtomicBool>,
834) -> TestCaseResult {
835    // Create TestCase. The test function gets a clone (cheap Rc bump),
836    // so we retain access to the same underlying TestCaseData after the test runs.
837    let tc = TestCase::new(Arc::clone(connection), test_channel, verbosity, is_final);
838
839    let result = with_test_context(|| catch_unwind(AssertUnwindSafe(|| test_fn(tc.clone()))));
840
841    let (tc_result, origin) = match &result {
842        Ok(()) => (TestCaseResult::Valid, None),
843        Err(e) => {
844            let msg = panic_message(e);
845            if msg == ASSUME_FAIL_STRING || msg == STOP_TEST_STRING {
846                (TestCaseResult::Invalid, None)
847            } else {
848                got_interesting.store(true, Ordering::SeqCst);
849
850                // Take panic info - we need location for origin, and print details on final
851                let (thread_name, thread_id, location, backtrace) = take_panic_info()
852                    .unwrap_or_else(|| {
853                        (
854                            "<unknown>".to_string(),
855                            "?".to_string(),
856                            "<unknown>".to_string(),
857                            Backtrace::disabled(),
858                        )
859                    });
860
861                if is_final {
862                    eprintln!(
863                        "thread '{}' ({}) panicked at {}:",
864                        thread_name, thread_id, location
865                    );
866                    eprintln!("{}", msg);
867
868                    if backtrace.status() == BacktraceStatus::Captured {
869                        let is_full = std::env::var("RUST_BACKTRACE")
870                            .map(|v| v == "full")
871                            .unwrap_or(false);
872                        let formatted = format_backtrace(&backtrace, is_full);
873                        eprintln!("stack backtrace:\n{}", formatted);
874                        if !is_full {
875                            eprintln!(
876                                "note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace."
877                            );
878                        }
879                    }
880                }
881
882                let origin = format!("Panic at {}", location);
883                (
884                    TestCaseResult::Interesting { panic_message: msg },
885                    Some(origin),
886                )
887            }
888        }
889    };
890
891    // Send mark_complete using the same channel that generators used.
892    // Skip if test was aborted (StopTest) - server already closed the channel.
893    if !tc.test_aborted() {
894        let status = match &tc_result {
895            TestCaseResult::Valid => "VALID",
896            TestCaseResult::Invalid => "INVALID",
897            TestCaseResult::Interesting { .. } => "INTERESTING",
898        };
899        let origin_value = match &origin {
900            Some(s) => Value::Text(s.clone()),
901            None => Value::Null,
902        };
903        let mark_complete = cbor_map! {
904            "command" => "mark_complete",
905            "status" => status,
906            "origin" => origin_value
907        };
908        tc.send_mark_complete(&mark_complete);
909    }
910
911    tc_result
912}
913
/// Extract a human-readable message from a panic payload.
///
/// `panic!` payloads are either a `&'static str` (literal message) or a
/// `String` (formatted message). Anything else yields `"Unknown panic"`.
///
/// Takes `&Box<..>` on purpose: with `&dyn Any`, passing a `&Box<dyn Any>`
/// would unsize the box itself and every downcast would fail.
fn panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
    let payload: &(dyn std::any::Any + Send) = &**payload;
    if let Some(&text) = payload.downcast_ref::<&str>() {
        return text.to_string();
    }
    payload
        .downcast_ref::<String>()
        .cloned()
        .unwrap_or_else(|| "Unknown panic".to_string())
}
924
925/// Encode a ciborium::Value to CBOR bytes.
926fn cbor_encode(value: &Value) -> Vec<u8> {
927    let mut bytes = Vec::new();
928    ciborium::into_writer(value, &mut bytes).expect("CBOR encoding failed");
929    bytes
930}
931
932/// Decode CBOR bytes to a ciborium::Value.
933fn cbor_decode(bytes: &[u8]) -> Value {
934    ciborium::from_reader(bytes).expect("CBOR decoding failed")
935}