Skip to main content

hegel/
runner.rs

1use crate::antithesis::{TestLocation, is_running_in_antithesis};
2use crate::backend::{DataSource, DataSourceError, TestCaseResult, TestRunResult, TestRunner};
3use crate::cbor_utils::{as_bool, as_text, as_u64, cbor_map, map_get, map_insert};
4use crate::control::{currently_in_test_context, with_test_context};
5use crate::protocol::{Connection, HANDSHAKE_STRING, Stream};
6use crate::test_case::{ASSUME_FAIL_STRING, LOOP_DONE_STRING, STOP_TEST_STRING, TestCase};
7use ciborium::Value;
8
9use std::backtrace::{Backtrace, BacktraceStatus};
10use std::cell::RefCell;
11use std::fs::{File, OpenOptions};
12use std::panic::{self, AssertUnwindSafe, catch_unwind};
13use std::process::{Command, Stdio};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, LazyLock, Mutex, Once};
16use std::time::{Duration, Instant};
17
/// Inclusive `(lowest, highest)` protocol versions accepted during the
/// handshake in `HegelSession::init`; anything outside this range panics.
const SUPPORTED_PROTOCOL_VERSIONS: (&str, &str) = ("0.10", "0.10");
/// Version of the `hegel-core` package requested when the server is launched
/// through `uv` (see `hegel_command`).
const HEGEL_SERVER_VERSION: &str = "0.4.7";
/// Environment variable that, when set, overrides the command used to start
/// the server.
const HEGEL_SERVER_COMMAND_ENV: &str = "HEGEL_SERVER_COMMAND";
/// Directory (relative to the working directory) that holds server log files.
const HEGEL_SERVER_DIR: &str = ".hegel";
/// Path of the most recently created server log file, recorded by
/// `server_log_file`.
static SERVER_LOG_PATH: Mutex<Option<String>> = Mutex::new(None);
/// Per-process counter that keeps successive server log file names unique.
static LOG_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
/// The cached server session, reused by `HegelSession::get` while the server
/// is still running.
static SESSION: Mutex<Option<Arc<HegelSession>>> = Mutex::new(None);

/// Guards the one-time installation of the panic hook in `init_panic_hook`.
static PANIC_HOOK_INIT: Once = Once::new();
27
28// ─── ServerDataSource ──────────────────────────────────────────────────────────
29
/// Whether protocol-level debug logging was requested through the
/// `HEGEL_PROTOCOL_DEBUG` environment variable.
///
/// The variable is read once, on first use, and compared case-insensitively
/// against `1` and `true`; an unset or unreadable variable counts as disabled.
static PROTOCOL_DEBUG: LazyLock<bool> = LazyLock::new(|| {
    let raw = std::env::var("HEGEL_PROTOCOL_DEBUG").unwrap_or_default();
    let normalized = raw.to_lowercase();
    normalized == "1" || normalized == "true"
});
39
/// Backend implementation that communicates with the hegel-core server
/// over a multiplexed stream.
pub(crate) struct ServerDataSource {
    /// Shared connection; consulted to find out whether the server process
    /// has exited when a request fails.
    connection: Arc<Connection>,
    /// Stream that carries this data source's requests. It is locked for the
    /// duration of each request.
    stream: Mutex<Stream>,
    /// Set once a request fails with a StopTest/overflow/closed-stream or
    /// flaky error; every later request short-circuits with `StopTest`.
    aborted: AtomicBool,
    /// `Verbosity::Debug` enables REQUEST/RESPONSE logging to stderr.
    verbosity: Verbosity,
}
48
49impl ServerDataSource {
50    pub(crate) fn new(connection: Arc<Connection>, stream: Stream, verbosity: Verbosity) -> Self {
51        ServerDataSource {
52            connection,
53            stream: Mutex::new(stream),
54            aborted: AtomicBool::new(false),
55            verbosity,
56        }
57    }
58
59    fn send_request(&self, command: &str, payload: &Value) -> Result<Value, DataSourceError> {
60        if self.aborted.load(Ordering::SeqCst) {
61            return Err(DataSourceError::StopTest);
62        }
63        let debug = *PROTOCOL_DEBUG || self.verbosity == Verbosity::Debug;
64
65        let mut entries = vec![(
66            Value::Text("command".to_string()),
67            Value::Text(command.to_string()),
68        )];
69
70        if let Value::Map(map) = payload {
71            for (k, v) in map {
72                entries.push((k.clone(), v.clone()));
73            }
74        }
75
76        let request = Value::Map(entries);
77
78        if debug {
79            eprintln!("REQUEST: {:?}", request);
80        }
81
82        let result = self
83            .stream
84            .lock()
85            .unwrap_or_else(|e| e.into_inner())
86            .request_cbor(&request);
87
88        match result {
89            Ok(response) => {
90                if debug {
91                    eprintln!("RESPONSE: {:?}", response);
92                }
93                Ok(response)
94            }
95            Err(e) => {
96                let error_msg = e.to_string();
97                if error_msg.contains("UnsatisfiedAssumption") {
98                    // nocov start
99                    if debug {
100                        eprintln!("RESPONSE: UnsatisfiedAssumption");
101                    }
102                    Err(DataSourceError::Assume)
103                    // nocov end
104                } else if error_msg.contains("overflow")
105                    || error_msg.contains("StopTest")
106                    || error_msg.contains("stream is closed")
107                {
108                    if debug {
109                        eprintln!("RESPONSE: StopTest/overflow"); // nocov
110                    }
111                    self.stream
112                        .lock()
113                        .unwrap_or_else(|e| e.into_inner())
114                        .mark_closed();
115                    self.aborted.store(true, Ordering::SeqCst);
116                    Err(DataSourceError::StopTest)
117                // nocov start
118                } else if error_msg.contains("FlakyStrategyDefinition")
119                    || error_msg.contains("FlakyReplay")
120                // nocov end
121                {
122                    self.stream
123                        .lock()
124                        .unwrap_or_else(|e| e.into_inner())
125                        .mark_closed();
126                    self.aborted.store(true, Ordering::SeqCst);
127                    Err(DataSourceError::StopTest)
128                } else if self.connection.server_has_exited() {
129                    panic!("{}", server_crash_message()); // nocov
130                } else {
131                    Err(DataSourceError::ServerError(e.to_string()))
132                }
133            }
134        }
135    }
136}
137
138impl DataSource for ServerDataSource {
139    fn generate(&self, schema: &Value) -> Result<Value, DataSourceError> {
140        self.send_request("generate", &cbor_map! {"schema" => schema.clone()})
141    }
142
143    fn start_span(&self, label: u64) -> Result<(), DataSourceError> {
144        self.send_request("start_span", &cbor_map! {"label" => label})?;
145        Ok(())
146    }
147
148    fn stop_span(&self, discard: bool) -> Result<(), DataSourceError> {
149        self.send_request("stop_span", &cbor_map! {"discard" => discard})?;
150        Ok(())
151    }
152
153    fn new_collection(
154        &self,
155        min_size: u64,
156        max_size: Option<u64>,
157    ) -> Result<String, DataSourceError> {
158        let mut payload = cbor_map! {
159            "min_size" => min_size
160        };
161        if let Some(max) = max_size {
162            map_insert(&mut payload, "max_size", max);
163        }
164        let response = self.send_request("new_collection", &payload)?;
165        match response {
166            Value::Integer(i) => {
167                let n: i128 = i.into();
168                Ok(n.to_string())
169            }
170            // nocov start
171            _ => panic!(
172                "Expected integer response from new_collection, got {:?}",
173                response
174            ),
175            // nocov end
176        }
177    }
178
179    fn collection_more(&self, collection: &str) -> Result<bool, DataSourceError> {
180        let collection_id: i64 = collection.parse().unwrap();
181        let response = self.send_request(
182            "collection_more",
183            &cbor_map! { "collection_id" => collection_id },
184        )?;
185        match response {
186            Value::Bool(b) => Ok(b),
187            _ => panic!("Expected bool from collection_more, got {:?}", response), // nocov
188        }
189    }
190
191    // nocov start
192    fn collection_reject(
193        &self,
194        collection: &str,
195        why: Option<&str>,
196    ) -> Result<(), DataSourceError> {
197        let collection_id: i64 = collection.parse().unwrap();
198        let mut payload = cbor_map! {
199            "collection_id" => collection_id
200        };
201        if let Some(reason) = why {
202            map_insert(&mut payload, "why", reason.to_string());
203        }
204        self.send_request("collection_reject", &payload)?;
205        Ok(())
206        // nocov end
207    }
208
209    fn new_pool(&self) -> Result<i128, DataSourceError> {
210        let response = self.send_request("new_pool", &cbor_map! {})?;
211        match response {
212            Value::Integer(i) => Ok(i.into()),
213            other => panic!("Expected integer response for pool id, got {:?}", other), // nocov
214        }
215    }
216
217    fn pool_add(&self, pool_id: i128) -> Result<i128, DataSourceError> {
218        let response = self.send_request("pool_add", &cbor_map! {"pool_id" => pool_id})?;
219        match response {
220            Value::Integer(i) => Ok(i.into()),
221            other => panic!("Expected integer response for variable id, got {:?}", other), // nocov
222        }
223    }
224
225    fn pool_generate(&self, pool_id: i128, consume: bool) -> Result<i128, DataSourceError> {
226        let response = self.send_request(
227            "pool_generate",
228            &cbor_map! {
229                "pool_id" => pool_id,
230                "consume" => consume,
231            },
232        )?;
233        match response {
234            Value::Integer(i) => Ok(i.into()),
235            other => panic!("Expected integer response for variable id, got {:?}", other), // nocov
236        }
237    }
238
239    fn mark_complete(&self, status: &str, origin: Option<&str>) {
240        let origin_value = match origin {
241            Some(s) => Value::Text(s.to_string()),
242            None => Value::Null,
243        };
244        let mark_complete = cbor_map! {
245            "command" => "mark_complete",
246            "status" => status,
247            "origin" => origin_value
248        };
249        let mut stream = self.stream.lock().unwrap_or_else(|e| e.into_inner());
250        let _ = stream.request_cbor(&mark_complete);
251        let _ = stream.close();
252    }
253
254    fn test_aborted(&self) -> bool {
255        self.aborted.load(Ordering::SeqCst)
256    }
257}
258
259// ─── HegelSession ───────────────────────────────────────────────────────────
260
/// Parse a "major.minor" version string into a comparable tuple.
///
/// # Panics
/// Panics when `s` does not consist of exactly two dot-separated components,
/// or when either component is not a valid `u32`.
fn parse_version(s: &str) -> (u32, u32) {
    let mut components = s.split('.');
    let (major_text, minor_text) = match (components.next(), components.next(), components.next())
    {
        (Some(major), Some(minor), None) => (major, minor),
        _ => panic!("invalid version string '{s}': expected 'major.minor' format"),
    };
    let major = match major_text.parse() {
        Ok(value) => value,
        Err(_) => panic!("invalid major version in '{s}'"),
    };
    let minor = match minor_text.parse() {
        Ok(value) => value,
        Err(_) => panic!("invalid minor version in '{s}'"),
    };
    (major, minor)
}
275
/// A persistent connection to the hegel server subprocess.
///
/// A new session is created on first use and whenever the previous server
/// process has exited (crash or explicit kill). The Python server supports
/// multiple sequential `run_test` commands over a single connection.
struct HegelSession {
    /// Connection built over the subprocess's stdout/stdin. It hands out the
    /// per-test streams and records whether the server has exited.
    connection: Arc<Connection>,
    /// The control stream is shared across threads, so it's behind a Mutex
    /// because Stream is not thread-safe. The lock is only held for the
    /// brief run_test send/receive; test execution runs concurrently on
    /// per-test streams.
    control: Mutex<Stream>,
    /// The server subprocess. Shared with the monitor thread so that
    /// `__test_kill_server` can call `child.kill()` directly rather than
    /// shelling out to the OS `kill` command.
    child: Arc<Mutex<std::process::Child>>,
}
293
294impl HegelSession {
295    /// Return the current live session, or create a new one if the server has
296    /// exited (either crashed or been killed since the last call).
297    fn get() -> Arc<HegelSession> {
298        let mut guard = SESSION.lock().unwrap_or_else(|e| e.into_inner());
299        if let Some(ref s) = *guard {
300            if !s.connection.server_has_exited() {
301                return Arc::clone(s);
302            }
303        }
304        init_panic_hook();
305        let session = Arc::new(HegelSession::init());
306        *guard = Some(Arc::clone(&session));
307        session
308    }
309
310    fn init() -> HegelSession {
311        let mut cmd = hegel_command();
312        cmd.arg("--stdio").arg("--verbosity").arg("normal");
313
314        cmd.env("PYTHONUNBUFFERED", "1");
315        let log_file = server_log_file();
316        cmd.stdin(Stdio::piped());
317        cmd.stdout(Stdio::piped());
318        cmd.stderr(Stdio::from(log_file));
319
320        let mut child = match cmd.spawn() {
321            Ok(child) => child,
322            Err(e) => panic!("Failed to spawn hegel server: {e}"), // nocov
323        };
324
325        let child_stdin = child.stdin.take().expect("Failed to take child stdin");
326        let child_stdout = child.stdout.take().expect("Failed to take child stdout");
327
328        let connection = Connection::new(Box::new(child_stdout), Box::new(child_stdin));
329        let mut control = connection.control_stream();
330
331        // Derive the binary path before the handshake so it's available for error messages.
332        let binary_path = std::env::var(HEGEL_SERVER_COMMAND_ENV).ok();
333
334        // Handshake
335        let handshake_result = control
336            .send_request(HANDSHAKE_STRING.to_vec())
337            .and_then(|req_id| control.receive_reply(req_id));
338
339        let response = match handshake_result {
340            Ok(r) => r,
341            Err(e) => handle_handshake_failure(&mut child, binary_path.as_deref(), e), // nocov
342        };
343
344        let decoded = String::from_utf8_lossy(&response);
345        let server_version = match decoded.strip_prefix("Hegel/") {
346            Some(v) => v,
347            None => {
348                let _ = child.kill(); // nocov
349                panic!("Bad handshake response: {decoded:?}"); // nocov
350            }
351        };
352        let (lo, hi) = SUPPORTED_PROTOCOL_VERSIONS;
353        let version = parse_version(server_version);
354        if version < parse_version(lo) || version > parse_version(hi) {
355            // nocov start
356            let _ = child.kill();
357            panic!(
358                "hegel-rust supports protocol versions {lo} through {hi}, but \
359                 the connected server is using protocol version {server_version}. Upgrading \
360                 hegel-rust or downgrading hegel-core might help."
361            );
362            // nocov end
363        }
364
365        let child_arc = Arc::new(Mutex::new(child));
366        let child_for_monitor = Arc::clone(&child_arc);
367
368        // Monitor thread: reaps the subprocess when it exits and notifies the
369        // connection. Polls try_wait() so the lock is not held while waiting,
370        // leaving it available for __test_kill_server to call kill().
371        let conn_for_monitor = Arc::clone(&connection);
372        std::thread::spawn(move || {
373            loop {
374                {
375                    let mut guard = child_for_monitor.lock().unwrap();
376                    if matches!(guard.try_wait(), Ok(Some(_))) {
377                        drop(guard);
378                        conn_for_monitor.mark_server_exited();
379                        return;
380                    }
381                }
382                std::thread::sleep(Duration::from_millis(10));
383            }
384        });
385
386        HegelSession {
387            connection,
388            control: Mutex::new(control),
389            child: child_arc,
390        }
391    }
392}
393
394// ─── ServerTestRunner ───────────────────────────────────────────────────────
395
396fn receive_event(test_stream: &mut Stream, connection: &Connection) -> (u32, Vec<u8>) {
397    match test_stream.receive_request() {
398        Ok(event) => event,
399        // nocov start
400        Err(_) if connection.server_has_exited() => {
401            panic!("{}", server_crash_message());
402            // nocov end
403        }
404        Err(e) => unreachable!("Failed to receive event (server still running): {}", e),
405    }
406}
407
/// Test runner that communicates with the hegel-core server.
///
/// The runner is a stateless unit struct: each run obtains the shared server
/// session through `HegelSession::get()`.
pub(crate) struct ServerTestRunner;
410
411impl ServerTestRunner {
412    fn run_single_test_case(
413        &self,
414        settings: &Settings,
415        run_case: &mut dyn FnMut(Box<dyn DataSource>, bool) -> TestCaseResult,
416    ) -> TestRunResult {
417        let session = HegelSession::get();
418        let connection = &session.connection;
419        let verbosity = settings.verbosity;
420
421        let mut test_stream = connection.new_stream();
422
423        let mut msg = cbor_map! {
424            "command" => "single_test_case",
425            "stream_id" => test_stream.stream_id
426        };
427        if let Some(seed) = settings.seed {
428            map_insert(&mut msg, "seed", seed);
429        }
430
431        let response = {
432            let mut control = session.control.lock().unwrap_or_else(|e| e.into_inner());
433            let send_id = control.send_request(cbor_encode(&msg));
434            send_id.and_then(|id| control.receive_reply(id))
435        }
436        .unwrap_or_else(|e| handle_channel_error(e));
437        let _: Value = cbor_decode(&response);
438
439        if verbosity == Verbosity::Debug {
440            eprintln!("single_test_case response received");
441        }
442
443        let ack_null = cbor_map! {"result" => Value::Null};
444        let mut failure_message: Option<String> = None;
445        let mut passed = true;
446
447        loop {
448            let (event_id, event_payload) = receive_event(&mut test_stream, connection);
449
450            let event: Value = cbor_decode(&event_payload);
451            let event_type = map_get(&event, "event")
452                .and_then(as_text)
453                .expect("Expected event in payload");
454
455            if verbosity == Verbosity::Debug {
456                eprintln!("Received event: {:?}", event);
457            }
458
459            match event_type {
460                "test_case" => {
461                    let stream_id = map_get(&event, "stream_id")
462                        .and_then(as_u64)
463                        .expect("Missing stream id") as u32;
464
465                    let test_case_stream = connection.connect_stream(stream_id);
466
467                    test_stream
468                        .write_reply(event_id, cbor_encode(&ack_null))
469                        .expect("Failed to ack test_case");
470
471                    let backend = Box::new(ServerDataSource::new(
472                        Arc::clone(connection),
473                        test_case_stream,
474                        verbosity,
475                    ));
476                    let tc_result = run_case(backend, true);
477
478                    if let TestCaseResult::Interesting { panic_message } = tc_result {
479                        passed = false;
480                        failure_message = Some(panic_message);
481                    }
482                }
483                "test_done" => {
484                    let ack_true = cbor_map! {"result" => true};
485                    test_stream
486                        .write_reply(event_id, cbor_encode(&ack_true))
487                        .expect("Failed to ack test_done");
488                    break;
489                }
490                _ => panic!("unknown event: {}", event_type), // nocov
491            }
492        }
493
494        TestRunResult {
495            passed,
496            failure_message,
497        }
498    }
499}
500
501impl TestRunner for ServerTestRunner {
502    fn run(
503        &self,
504        settings: &Settings,
505        database_key: Option<&str>,
506        run_case: &mut dyn FnMut(Box<dyn DataSource>, bool) -> TestCaseResult,
507    ) -> TestRunResult {
508        if settings.mode == Mode::SingleTestCase {
509            return self.run_single_test_case(settings, run_case);
510        }
511
512        let session = HegelSession::get();
513        let connection = &session.connection;
514        let verbosity = settings.verbosity;
515
516        let mut test_stream = connection.new_stream();
517
518        let suppress_names: Vec<Value> = settings
519            .suppress_health_check
520            .iter()
521            .map(|c| Value::Text(c.as_str().to_string()))
522            .collect();
523
524        let database_key_bytes =
525            database_key.map_or(Value::Null, |k| Value::Bytes(k.as_bytes().to_vec()));
526
527        let mut run_test_msg = cbor_map! {
528            "command" => "run_test",
529            "test_cases" => settings.test_cases,
530            "seed" => settings.seed.map_or(Value::Null, Value::from),
531            "stream_id" => test_stream.stream_id,
532            "database_key" => database_key_bytes,
533            "derandomize" => settings.derandomize
534        };
535        let db_value = match &settings.database {
536            Database::Unset => Option::None, // nocov
537            Database::Disabled => Some(Value::Null),
538            Database::Path(s) => Some(Value::Text(s.clone())),
539        };
540        if let Some(db) = db_value {
541            if let Value::Map(ref mut map) = run_test_msg {
542                map.push((Value::Text("database".to_string()), db));
543            }
544        }
545        if !suppress_names.is_empty() {
546            if let Value::Map(ref mut map) = run_test_msg {
547                map.push((
548                    Value::Text("suppress_health_check".to_string()),
549                    Value::Array(suppress_names),
550                ));
551            }
552        }
553
554        // The control stream is behind a Mutex because Stream requires &mut self.
555        // This only serializes the brief run_test send/receive — actual test
556        // execution happens on per-test streams without holding this lock.
557        // The lock is released before any error handling so the mutex is never
558        // poisoned by a server crash on one thread affecting other threads.
559        let run_test_response = {
560            let mut control = session.control.lock().unwrap_or_else(|e| e.into_inner());
561            let send_id = control.send_request(cbor_encode(&run_test_msg));
562            send_id.and_then(|id| control.receive_reply(id))
563        }
564        .unwrap_or_else(|e| handle_channel_error(e));
565        let _run_test_result: Value = cbor_decode(&run_test_response);
566
567        if verbosity == Verbosity::Debug {
568            eprintln!("run_test response received");
569        }
570
571        let result_data: Value;
572        let ack_null = cbor_map! {"result" => Value::Null};
573        loop {
574            let (event_id, event_payload) = receive_event(&mut test_stream, connection);
575
576            let event: Value = cbor_decode(&event_payload);
577            let event_type = map_get(&event, "event")
578                .and_then(as_text)
579                .expect("Expected event in payload");
580
581            if verbosity == Verbosity::Debug {
582                eprintln!("Received event: {:?}", event);
583            }
584
585            match event_type {
586                "test_case" => {
587                    let stream_id = map_get(&event, "stream_id")
588                        .and_then(as_u64)
589                        .expect("Missing stream id") as u32;
590
591                    let test_case_stream = connection.connect_stream(stream_id);
592
593                    // Ack the test_case event BEFORE running the test (prevents deadlock)
594                    test_stream
595                        .write_reply(event_id, cbor_encode(&ack_null))
596                        .expect("Failed to ack test_case");
597
598                    let backend = Box::new(ServerDataSource::new(
599                        Arc::clone(connection),
600                        test_case_stream,
601                        verbosity,
602                    ));
603                    run_case(backend, false);
604                }
605                "test_done" => {
606                    let ack_true = cbor_map! {"result" => true};
607                    test_stream
608                        .write_reply(event_id, cbor_encode(&ack_true))
609                        .expect("Failed to ack test_done");
610                    result_data = map_get(&event, "results").cloned().unwrap_or(Value::Null);
611                    break;
612                }
613                _ => panic!("unknown event: {}", event_type), // nocov
614            }
615        }
616
617        // Check for server-side errors before processing results
618        if let Some(error_msg) = map_get(&result_data, "error").and_then(as_text) {
619            panic!("Server error: {}", error_msg); // nocov
620        }
621
622        // Check for health check failure before processing results
623        if let Some(failure_msg) = map_get(&result_data, "health_check_failure").and_then(as_text) {
624            panic!("Health check failure:\n{}", failure_msg); // nocov
625        }
626
627        // Check for flaky test detection
628        if let Some(flaky_msg) = map_get(&result_data, "flaky").and_then(as_text) {
629            panic!("Flaky test detected: {}", flaky_msg);
630        }
631
632        let n_interesting = map_get(&result_data, "interesting_test_cases")
633            .and_then(as_u64)
634            .unwrap_or(0);
635
636        if verbosity == Verbosity::Debug {
637            eprintln!("Test done. interesting_test_cases={}", n_interesting);
638        }
639
640        // Process final replay test cases (one per interesting example)
641        let mut failure_message: Option<String> = None;
642        for _ in 0..n_interesting {
643            let (event_id, event_payload) = test_stream
644                .receive_request()
645                .expect("Failed to receive final test_case");
646
647            let event: Value = cbor_decode(&event_payload);
648            let event_type = map_get(&event, "event").and_then(as_text);
649            assert_eq!(event_type, Some("test_case"));
650
651            let stream_id = map_get(&event, "stream_id")
652                .and_then(as_u64)
653                .expect("Missing stream id") as u32;
654
655            let test_case_stream = connection.connect_stream(stream_id);
656
657            test_stream
658                .write_reply(event_id, cbor_encode(&ack_null))
659                .expect("Failed to ack final test_case");
660
661            let backend = Box::new(ServerDataSource::new(
662                Arc::clone(connection),
663                test_case_stream,
664                verbosity,
665            ));
666            let tc_result = run_case(backend, true);
667
668            if let TestCaseResult::Interesting { panic_message } = tc_result {
669                failure_message = Some(panic_message);
670            }
671
672            if connection.server_has_exited() {
673                panic!("{}", server_crash_message()); // nocov
674            }
675        }
676
677        let passed = map_get(&result_data, "passed")
678            .and_then(as_bool)
679            .unwrap_or(true);
680
681        TestRunResult {
682            passed,
683            failure_message,
684        }
685    }
686}
687
688// ─── Panic hook and backtrace ───────────────────────────────────────────────
689
thread_local! {
    /// Details of the most recent panic recorded on this thread by the hook
    /// installed in `init_panic_hook`, as
    /// `(thread_name, thread_id, location, backtrace)`.
    static LAST_PANIC_INFO: RefCell<Option<(String, String, String, Backtrace)>> = const { RefCell::new(None) };
}

/// Removes and returns this thread's recorded panic details, if any, as
/// `(thread_name, thread_id, location, backtrace)`. The slot is left empty.
fn take_panic_info() -> Option<(String, String, String, Backtrace)> {
    LAST_PANIC_INFO.with(|slot| slot.replace(None))
}
699
// nocov start
/// Returns `true` when `line` opens a backtrace frame, i.e. its first
/// non-blank character is an ASCII digit (as in `   8: function::name`).
fn is_frame_header(line: &str) -> bool {
    line.trim_start()
        .starts_with(|c: char| c.is_ascii_digit())
}

/// Format a backtrace, optionally filtering to "short" format.
///
/// Short format shows only frames between `__rust_end_short_backtrace` and
/// `__rust_begin_short_backtrace` markers, matching the default Rust panic handler.
/// Frame numbers are renumbered to start at 0.
///
/// With `full` set, the backtrace is returned exactly as `Backtrace` renders
/// it. If a marker is missing, the corresponding end of the backtrace is kept
/// unfiltered; if the markers would describe an empty or inverted range, the
/// result is empty rather than a panic.
fn format_backtrace(bt: &Backtrace, full: bool) -> String {
    let backtrace_str = bt.to_string();

    if full {
        return backtrace_str;
    }

    // Filter to short backtrace: keep lines between the markers
    // Frame groups look like:
    //    N: function::name
    //              at /path/to/file.rs:123:45
    let lines: Vec<&str> = backtrace_str.lines().collect();
    let mut start_idx = 0;
    let mut end_idx = lines.len();

    for (i, line) in lines.iter().enumerate() {
        if line.contains("__rust_end_short_backtrace") {
            // Skip past this frame: start at the next frame header, if any.
            if let Some(offset) = lines[i + 1..].iter().position(|l| is_frame_header(l)) {
                start_idx = i + 1 + offset;
            }
        }
        if line.contains("__rust_begin_short_backtrace") {
            // Stop before this frame: find the header line that opens it.
            if let Some(header) = lines[..=i].iter().rposition(|l| is_frame_header(l)) {
                end_idx = header;
            }
            break;
        }
    }

    // `get` yields `None` for an inverted range instead of panicking the way
    // direct slicing would; this runs while reporting a panic, so it must not
    // panic itself.
    let kept: &[&str] = lines.get(start_idx..end_idx).unwrap_or_default();

    // Renumber frames starting at 0
    let mut new_frame_num = 0usize;
    let renumbered: Vec<String> = kept
        .iter()
        .map(|line| {
            let trimmed = line.trim_start();
            match trimmed.find(':') {
                // A frame header like "   8: function_name": replace the
                // number, keeping everything from the colon onwards.
                Some(colon_pos) if is_frame_header(line) => {
                    // Preserve original indentation style (right-aligned numbers)
                    let renumbered_line = format!("{:>4}{}", new_frame_num, &trimmed[colon_pos..]);
                    new_frame_num += 1;
                    renumbered_line
                }
                _ => line.to_string(),
            }
        })
        .collect();

    renumbered.join("\n")
}
// nocov end
793
794// Panic unconditionally prints to stderr, even if it's caught later. This results in
795// messy output during shrinking. To avoid this, we replace the panic hook with our
796// own that suppresses the printing except for the final replay.
797//
798// This is called once per process, the first time any hegel test runs.
799fn init_panic_hook() {
800    PANIC_HOOK_INIT.call_once(|| {
801        let prev_hook = panic::take_hook();
802        panic::set_hook(Box::new(move |info| {
803            if !currently_in_test_context() {
804                // use actual panic hook outside of tests
805                prev_hook(info);
806                return;
807            }
808
809            let thread = std::thread::current();
810            let thread_name = thread.name().unwrap_or("<unnamed>").to_string();
811            // ThreadId's debug output is ThreadId(N)
812            let thread_id = format!("{:?}", thread.id())
813                .trim_start_matches("ThreadId(")
814                .trim_end_matches(')')
815                .to_string();
816            let location = info
817                .location()
818                .map(|loc| format!("{}:{}:{}", loc.file(), loc.line(), loc.column()))
819                .unwrap_or_else(|| "<unknown>".to_string());
820
821            let backtrace = Backtrace::capture();
822
823            LAST_PANIC_INFO
824                .with(|l| *l.borrow_mut() = Some((thread_name, thread_id, location, backtrace)));
825        }));
826    });
827}
828
829fn hegel_command() -> Command {
830    if let Ok(override_path) = std::env::var(HEGEL_SERVER_COMMAND_ENV) {
831        return Command::new(resolve_hegel_path(&override_path)); // nocov
832    }
833    let uv_path = crate::uv::find_uv();
834    let mut cmd = Command::new(uv_path);
835    cmd.args([
836        "tool",
837        "run",
838        "--from",
839        &format!("hegel-core=={HEGEL_SERVER_VERSION}"),
840        "hegel",
841    ]);
842    cmd
843}
844
845fn server_log_file() -> File {
846    std::fs::create_dir_all(HEGEL_SERVER_DIR).ok();
847    let pid = std::process::id();
848    let ix = LOG_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
849    let path = format!("{HEGEL_SERVER_DIR}/server.{pid}-{ix}.log");
850    *SERVER_LOG_PATH.lock().unwrap() = Some(path.clone());
851    OpenOptions::new()
852        .create(true)
853        .append(true)
854        .open(&path)
855        .expect("Failed to open server log file")
856}
857
/// Polls `child` until it exits or `timeout` has elapsed.
///
/// Returns the exit status if the process finished in time and `None`
/// otherwise. The child is polled every 10 ms; an error from `try_wait` is
/// treated like "still running" and retried until the timeout.
fn wait_for_exit(
    child: &mut std::process::Child,
    timeout: Duration,
) -> Option<std::process::ExitStatus> {
    let started = Instant::now();
    loop {
        match child.try_wait() {
            Ok(Some(status)) => break Some(status),
            _ if started.elapsed() >= timeout => break None,
            _ => std::thread::sleep(Duration::from_millis(10)),
        }
    }
}
873
874fn handle_handshake_failure(
875    child: &mut std::process::Child,
876    binary_path: Option<&str>,
877    handshake_err: impl std::fmt::Display,
878) -> ! {
879    let exit_status = wait_for_exit(child, Duration::from_millis(100));
880    let child_still_running = exit_status.is_none();
881    if child_still_running {
882        let _ = child.kill();
883        let _ = child.wait();
884        panic!(
885            "The hegel server failed during startup handshake: {handshake_err}\n\n\
886             The server process did not exit. Possibly bad virtualenv?"
887        );
888    }
889    panic!(
890        "{}",
891        startup_error_message(binary_path, exit_status.unwrap())
892    );
893}
894
895fn startup_error_message(
896    binary_path: Option<&str>,
897    exit_status: std::process::ExitStatus,
898) -> String {
899    let mut parts = Vec::new();
900
901    parts.push("The hegel server failed during startup handshake.".to_string());
902    parts.push(format!("The server process exited with {}.", exit_status));
903
904    // Version detection via --version (only when we have a binary path to check)
905    if let Some(binary_path) = binary_path {
906        let expected_version_string = format!("hegel (version {})", HEGEL_SERVER_VERSION);
907        match Command::new(binary_path).arg("--version").output() {
908            Ok(output) if output.status.success() => {
909                let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
910                if stdout != expected_version_string {
911                    parts.push(format!(
912                        "Version mismatch: expected '{}', got '{}'.",
913                        expected_version_string, stdout
914                    ));
915                }
916            }
917            Ok(_) => {
918                parts.push(format!(
919                    "'{}' --version exited unsuccessfully. Is this a hegel binary?",
920                    binary_path
921                ));
922            }
923            Err(e) => {
924                parts.push(format!(
925                    "Could not run '{}' --version: {}. Is this a hegel binary?",
926                    binary_path, e
927                ));
928            }
929        }
930    }
931
932    // Include server log contents
933    if let Some(log_path) = SERVER_LOG_PATH.lock().unwrap().clone() {
934        if let Ok(contents) = std::fs::read_to_string(&log_path) {
935            if !contents.trim().is_empty() {
936                let lines: Vec<&str> = contents.lines().collect();
937                let display_lines: Vec<&str> = lines.iter().take(3).copied().collect();
938                let mut log_section =
939                    format!("Server log ({}):\n{}", log_path, display_lines.join("\n"));
940                if lines.len() > 3 {
941                    log_section.push_str(&format!("\n... (see {} for full output)", log_path));
942                }
943                parts.push(log_section);
944            }
945        }
946    }
947
948    parts.join("\n\n")
949}
950
951fn resolve_hegel_path(path: &str) -> String {
952    let p = std::path::Path::new(path);
953    if p.exists() {
954        crate::utils::validate_executable(path);
955        return path.to_string();
956    }
957
958    // Bare name (no path separator) — try PATH lookup
959    if !path.chars().any(std::path::is_separator) {
960        if let Some(resolved) = crate::utils::which(path) {
961            crate::utils::validate_executable(&resolved);
962            return resolved;
963        }
964        panic!(
965            "Hegel server binary '{}' not found on PATH. \
966             Check that {} is set correctly, or install hegel-core.",
967            path, HEGEL_SERVER_COMMAND_ENV
968        );
969    }
970
971    panic!(
972        "Hegel server binary not found at '{}'. \
973         Check that {} is set correctly.",
974        path, HEGEL_SERVER_COMMAND_ENV
975    );
976}
977
978/// Format a server log excerpt for inclusion in error messages.
979///
980/// Returns the last 5 unindented lines and the content between them. Runs of
981/// more than 10 consecutive indented lines are truncated with a summary.
982pub fn format_log_excerpt(content: &str) -> String {
983    const MAX_UNINDENTED: usize = 5;
984    const INDENT_THRESHOLD: usize = 10;
985    const INDENT_CONTEXT: usize = 3;
986
987    let lines: Vec<&str> = content.lines().collect();
988    if lines.is_empty() {
989        return "(empty)".to_string();
990    }
991
992    // Find start: walk backwards until we've seen MAX_UNINDENTED unindented lines
993    let mut unindented_seen = 0;
994    let mut start_idx = 0;
995    for (i, line) in lines.iter().enumerate().rev() {
996        if is_log_unindented(line) {
997            unindented_seen += 1;
998            if unindented_seen >= MAX_UNINDENTED {
999                start_idx = i;
1000                break;
1001            }
1002        }
1003    }
1004
1005    // Process the relevant section, truncating long indented runs
1006    let relevant = &lines[start_idx..];
1007    let mut output: Vec<String> = Vec::new();
1008    let mut indent_run: Vec<&str> = Vec::new();
1009
1010    for &line in relevant {
1011        if is_log_unindented(line) {
1012            flush_log_indent_run(
1013                &mut indent_run,
1014                &mut output,
1015                INDENT_THRESHOLD,
1016                INDENT_CONTEXT,
1017            );
1018            output.push(line.to_string());
1019        } else {
1020            indent_run.push(line);
1021        }
1022    }
1023    flush_log_indent_run(
1024        &mut indent_run,
1025        &mut output,
1026        INDENT_THRESHOLD,
1027        INDENT_CONTEXT,
1028    );
1029
1030    output.join("\n")
1031}
1032
/// Returns `true` for a non-empty line that does not begin with a space or a tab.
fn is_log_unindented(line: &str) -> bool {
    !matches!(line.as_bytes().first(), None | Some(b' ' | b'\t'))
}
1036
/// Moves the collected run of lines into `output` and empties `run`.
///
/// A run of at most `threshold` lines is copied verbatim. A longer run is
/// shortened: `keep` leading and `keep` trailing lines are kept, where `keep`
/// is `context` capped at half the run length, and the lines in between are
/// replaced by a single `  [...N lines...]` marker.
fn flush_log_indent_run(
    run: &mut Vec<&str>,
    output: &mut Vec<String>,
    threshold: usize,
    context: usize,
) {
    let total = run.len();
    if total > threshold {
        let keep = context.min(total / 2);
        let head = &run[..keep];
        let tail = &run[total - keep..];

        output.extend(head.iter().map(|line| (*line).to_owned()));
        output.push(format!("  [...{hidden} lines...]", hidden = total - 2 * keep));
        output.extend(tail.iter().map(|line| (*line).to_owned()));
    } else {
        // Covers the empty run as well: nothing is appended.
        output.extend(run.iter().map(|line| (*line).to_owned()));
    }
    run.clear();
}
1063
1064fn server_log_excerpt() -> Option<String> {
1065    let log_path = SERVER_LOG_PATH.lock().unwrap().clone()?;
1066    let content = std::fs::read_to_string(log_path).ok()?;
1067    let trimmed = content.trim();
1068    if trimmed.is_empty() {
1069        return None;
1070    }
1071    Some(format_log_excerpt(trimmed))
1072}
1073
1074fn server_crash_message() -> String {
1075    const BASE: &str = "The hegel server process exited unexpectedly.";
1076    let log_path_owned = SERVER_LOG_PATH.lock().unwrap().clone();
1077    let log_path = log_path_owned.as_deref().unwrap_or(".hegel/server.log");
1078    match server_log_excerpt() {
1079        Some(excerpt) => format!("{BASE}\n\nLast server log entries:\n{excerpt}"),
1080        None => format!("{BASE}\n\n(No entries found in {log_path})"),
1081    }
1082}
1083
1084fn handle_channel_error(e: std::io::Error) -> ! {
1085    if e.kind() == std::io::ErrorKind::ConnectionAborted {
1086        panic!("{}", server_crash_message());
1087    }
1088    unreachable!("unexpected channel error: {e}")
1089}
1090
1091/// Kill the hegel server process and wait until the connection detects that it
1092/// has exited.  Only for use in tests — not part of the public API.
1093#[doc(hidden)]
1094pub fn __test_kill_server() {
1095    let guard = SESSION.lock().unwrap_or_else(|e| e.into_inner());
1096    if let Some(session) = guard.as_ref() {
1097        let child_arc = Arc::clone(&session.child);
1098        let conn = Arc::clone(&session.connection);
1099        drop(guard);
1100        let _ = child_arc.lock().unwrap().kill();
1101        while !conn.server_has_exited() {
1102            std::thread::yield_now();
1103        }
1104    }
1105}
1106
1107// ─── Public types ───────────────────────────────────────────────────────────
1108
/// A health check that can be suppressed for a test run.
///
/// Health checks flag common problems with how a test is set up — problems
/// that would otherwise make it run inefficiently or not at all.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HealthCheck {
    /// `assume()` is rejecting too many test cases.
    FilterTooMuch,
    /// The test takes too long to execute.
    TooSlow,
    /// The generated test cases are too large.
    TestCasesTooLarge,
    /// Even the smallest natural input is very large.
    LargeInitialTestCase,
}

impl HealthCheck {
    /// Returns every health check variant.
    ///
    /// Handy for suppressing all health checks in one go:
    ///
    /// ```no_run
    /// use hegel::HealthCheck;
    ///
    /// #[hegel::test(suppress_health_check = HealthCheck::all())]
    /// fn my_test(tc: hegel::TestCase) {
    ///     // ...
    /// }
    /// ```
    pub const fn all() -> [HealthCheck; 4] {
        [
            Self::FilterTooMuch,
            Self::TooSlow,
            Self::TestCasesTooLarge,
            Self::LargeInitialTestCase,
        ]
    }

    /// Returns the snake_case identifier of this health check.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::FilterTooMuch => "filter_too_much",
            Self::TooSlow => "too_slow",
            Self::TestCasesTooLarge => "test_cases_too_large",
            Self::LargeInitialTestCase => "large_initial_test_case",
        }
    }
}
1156
/// Selects how a test is executed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mode {
    /// A full test run: multiple test cases, with shrinking. This is the default.
    TestRun,
    /// A single test case, without shrinking or replay. Useful for
    /// Antithesis workloads and other situations that only need data
    /// generation, without the property-testing overhead.
    SingleTestCase,
}
1167
/// How much output Hegel produces while running tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Verbosity {
    /// No output at all.
    Quiet,
    /// The default amount of output.
    Normal,
    /// Additional detail about the test run.
    Verbose,
    /// Protocol-level debug information.
    Debug,
}
1180
1181/// Configuration for a Hegel test run.
1182///
1183/// Use builder methods to customize, then pass to [`Hegel::settings`] or
1184/// the `settings` parameter of `#[hegel::test]`.
1185///
1186/// In CI environments (detected automatically), the database is disabled
1187/// and tests are derandomized by default.
1188#[derive(Debug, Clone)]
1189pub struct Settings {
1190    pub(crate) mode: Mode,
1191    pub(crate) test_cases: u64,
1192    pub(crate) verbosity: Verbosity,
1193    pub(crate) seed: Option<u64>,
1194    pub(crate) derandomize: bool,
1195    pub(crate) database: Database,
1196    pub(crate) suppress_health_check: Vec<HealthCheck>,
1197}
1198
1199impl Settings {
1200    /// Create settings with defaults. Detects CI environments automatically.
1201    pub fn new() -> Self {
1202        let in_ci = is_in_ci();
1203        Self {
1204            mode: Mode::TestRun,
1205            test_cases: 100,
1206            verbosity: Verbosity::Normal,
1207            seed: None,
1208            derandomize: in_ci,
1209            database: if in_ci {
1210                Database::Disabled
1211            } else {
1212                Database::Unset // nocov
1213            },
1214            suppress_health_check: Vec::new(),
1215        }
1216    }
1217
1218    /// Set the execution mode. Defaults to [`Mode::TestRun`].
1219    pub fn mode(mut self, mode: Mode) -> Self {
1220        self.mode = mode;
1221        self
1222    }
1223
1224    /// Set the number of test cases to run (default: 100).
1225    pub fn test_cases(mut self, n: u64) -> Self {
1226        self.test_cases = n;
1227        self
1228    }
1229
1230    /// Set the verbosity level.
1231    pub fn verbosity(mut self, verbosity: Verbosity) -> Self {
1232        self.verbosity = verbosity;
1233        self
1234    }
1235
1236    /// Set a fixed seed for reproducibility, or `None` for random.
1237    pub fn seed(mut self, seed: Option<u64>) -> Self {
1238        self.seed = seed;
1239        self
1240    }
1241
1242    /// When true, use a fixed seed derived from the test name. Enabled by default in CI.
1243    pub fn derandomize(mut self, derandomize: bool) -> Self {
1244        self.derandomize = derandomize;
1245        self
1246    }
1247
1248    /// Set the database path for storing failing examples, or `None` to disable.
1249    pub fn database(mut self, database: Option<String>) -> Self {
1250        self.database = match database {
1251            None => Database::Disabled,
1252            Some(path) => Database::Path(path),
1253        };
1254        self
1255    }
1256
1257    /// Suppress one or more health checks so they do not cause test failure.
1258    ///
1259    /// Health checks detect common issues like excessive filtering or slow
1260    /// tests. Use this to suppress specific checks when they are expected.
1261    ///
1262    /// # Example
1263    ///
1264    /// ```no_run
1265    /// use hegel::{HealthCheck, Verbosity};
1266    /// use hegel::generators as gs;
1267    ///
1268    /// #[hegel::test(suppress_health_check = [HealthCheck::FilterTooMuch, HealthCheck::TooSlow])]
1269    /// fn my_test(tc: hegel::TestCase) {
1270    ///     let n: i32 = tc.draw(gs::integers());
1271    ///     tc.assume(n > 0);
1272    /// }
1273    /// ```
1274    pub fn suppress_health_check(mut self, checks: impl IntoIterator<Item = HealthCheck>) -> Self {
1275        self.suppress_health_check.extend(checks);
1276        self
1277    }
1278}
1279
1280impl Default for Settings {
1281    fn default() -> Self {
1282        Self::new()
1283    }
1284}
1285
/// Database configuration carried by [`Settings`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum Database {
    /// No explicit choice has been made; the default outside of CI.
    Unset,
    /// The database is turned off; the default in CI.
    Disabled,
    /// Failing examples are stored at this path.
    Path(String),
}
1292
1293// ─── Hegel test builder ─────────────────────────────────────────────────────
1294
1295// internal use only
1296#[doc(hidden)]
1297pub fn hegel<F>(test_fn: F)
1298where
1299    F: FnMut(TestCase),
1300{
1301    Hegel::new(test_fn).run();
1302}
1303
/// Reports whether the process appears to be running under a CI system.
///
/// Each entry names an environment variable. With `None` the variable merely
/// has to be set; with `Some(value)` it has to equal `value` exactly.
fn is_in_ci() -> bool {
    const CI_VARS: &[(&str, Option<&str>)] = &[
        ("CI", None),
        ("TF_BUILD", Some("true")),
        ("BUILDKITE", Some("true")),
        ("CIRCLECI", Some("true")),
        ("CIRRUS_CI", Some("true")),
        ("CODEBUILD_BUILD_ID", None),
        ("GITHUB_ACTIONS", Some("true")),
        ("GITLAB_CI", None),
        ("HEROKU_TEST_RUN_ID", None),
        ("TEAMCITY_VERSION", None),
        ("bamboo.buildKey", None),
    ];

    for &(name, required) in CI_VARS {
        let detected = match required {
            Some(expected) => std::env::var(name).is_ok_and(|actual| actual == expected),
            None => std::env::var_os(name).is_some(),
        };
        if detected {
            return true;
        }
    }
    false
}
1324
// internal use only
//
// Builder that pairs a test function with the configuration used to run it.
// Construct it with `Hegel::new`, adjust it with the builder methods, then
// call `run`.
#[doc(hidden)]
pub struct Hegel<F> {
    /// The test body; called with a fresh `TestCase` for every test case.
    test_fn: F,
    /// Optional database key, passed on to the test runner.
    database_key: Option<String>,
    /// Optional source location of the test; used when reporting the outcome
    /// to Antithesis.
    test_location: Option<TestLocation>,
    /// Settings for the run; defaults to `Settings::new()`.
    settings: Settings,
}
1333
1334impl<F> Hegel<F>
1335where
1336    F: FnMut(TestCase),
1337{
1338    /// Create a new test builder with default settings.
1339    pub fn new(test_fn: F) -> Self {
1340        Self {
1341            test_fn,
1342            database_key: None,
1343            settings: Settings::new(),
1344            test_location: None,
1345        }
1346    }
1347
1348    /// Override the default settings.
1349    pub fn settings(mut self, settings: Settings) -> Self {
1350        self.settings = settings;
1351        self
1352    }
1353
1354    #[doc(hidden)]
1355    pub fn __database_key(mut self, key: String) -> Self {
1356        self.database_key = Some(key);
1357        self
1358    }
1359
1360    #[doc(hidden)]
1361    pub fn test_location(mut self, location: TestLocation) -> Self {
1362        self.test_location = Some(location);
1363        self
1364    }
1365
1366    /// Run the property-based tests.
1367    ///
1368    /// Panics if any test case fails.
1369    pub fn run(self) {
1370        init_panic_hook();
1371
1372        let runner = ServerTestRunner;
1373        let mut test_fn = self.test_fn;
1374        let got_interesting = AtomicBool::new(false);
1375
1376        let result = runner.run(
1377            &self.settings,
1378            self.database_key.as_deref(),
1379            &mut |backend, is_final| {
1380                let tc_result = run_test_case(backend, &mut test_fn, is_final, self.settings.mode);
1381                if matches!(&tc_result, TestCaseResult::Interesting { .. }) {
1382                    got_interesting.store(true, Ordering::SeqCst);
1383                }
1384                tc_result
1385            },
1386        );
1387
1388        let test_failed = !result.passed || got_interesting.load(Ordering::SeqCst);
1389
1390        if is_running_in_antithesis() {
1391            #[cfg(not(feature = "antithesis"))]
1392            panic!(
1393                "When Hegel is run inside of Antithesis, it requires the `antithesis` feature. \
1394                You can add it with {{ features = [\"antithesis\"] }}."
1395            );
1396
1397            #[cfg(feature = "antithesis")]
1398            // nocov start
1399            if let Some(ref loc) = self.test_location {
1400                crate::antithesis::emit_assertion(loc, !test_failed);
1401                // nocov end
1402            }
1403        }
1404
1405        if test_failed {
1406            let msg = result.failure_message.as_deref().unwrap_or("unknown");
1407            panic!("Property test failed: {}", msg);
1408        }
1409    }
1410}
1411
1412// ─── Generic test case execution ────────────────────────────────────────────
1413
1414fn run_test_case(
1415    data_source: Box<dyn DataSource>,
1416    test_fn: &mut dyn FnMut(TestCase),
1417    is_final: bool,
1418    mode: Mode,
1419) -> TestCaseResult {
1420    let tc = TestCase::new(data_source, is_final, mode);
1421
1422    let result = with_test_context(|| catch_unwind(AssertUnwindSafe(|| test_fn(tc.clone()))));
1423
1424    let (tc_result, origin) = match &result {
1425        Ok(()) => (TestCaseResult::Valid, None),
1426        Err(e) => {
1427            let msg = panic_message(e);
1428            if msg == ASSUME_FAIL_STRING {
1429                (TestCaseResult::Invalid, None)
1430            } else if msg == STOP_TEST_STRING {
1431                (TestCaseResult::Overrun, None)
1432            } else if msg == LOOP_DONE_STRING {
1433                // `TestCase::repeat` returns `!`, so it exits via this
1434                // sentinel panic when its loop completes normally. Treat it
1435                // the same as a no-panic return.
1436                (TestCaseResult::Valid, None)
1437            } else {
1438                // Take panic info - we need location for origin, and print details on final
1439                let (thread_name, thread_id, location, backtrace) = take_panic_info()
1440                    .unwrap_or_else(|| {
1441                        // nocov start
1442                        (
1443                            "<unknown>".to_string(),
1444                            "?".to_string(),
1445                            "<unknown>".to_string(),
1446                            Backtrace::disabled(),
1447                        )
1448                        // nocov end
1449                    });
1450
1451                if is_final {
1452                    eprintln!(
1453                        "thread '{}' ({}) panicked at {}:",
1454                        thread_name, thread_id, location
1455                    );
1456                    eprintln!("{}", msg);
1457
1458                    // nocov start
1459                    if backtrace.status() == BacktraceStatus::Captured {
1460                        let is_full = std::env::var("RUST_BACKTRACE")
1461                            .map(|v| v == "full")
1462                            .unwrap_or(false);
1463                        let formatted = format_backtrace(&backtrace, is_full);
1464                        eprintln!("stack backtrace:\n{}", formatted);
1465                        if !is_full {
1466                            eprintln!(
1467                                "note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace."
1468                            );
1469                        }
1470                    }
1471                    // nocov end
1472                }
1473
1474                let origin = format!("Panic at {}", location);
1475                (
1476                    TestCaseResult::Interesting { panic_message: msg },
1477                    Some(origin),
1478                )
1479            }
1480        }
1481    };
1482
1483    // Send mark_complete via the data source.
1484    // Skip if test was aborted (StopTest) - the data source already closed.
1485    if !tc.test_aborted() {
1486        let status = match &tc_result {
1487            TestCaseResult::Valid => "VALID",
1488            TestCaseResult::Invalid | TestCaseResult::Overrun => "INVALID",
1489            TestCaseResult::Interesting { .. } => "INTERESTING",
1490        };
1491        tc.mark_complete(status, origin.as_deref());
1492    }
1493
1494    tc_result
1495}
1496
/// Returns the message carried by a panic payload.
///
/// Payloads of type `&str` or `String` are returned as text; any other
/// payload type yields `"Unknown panic"`.
fn panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
    match (
        payload.downcast_ref::<&str>(),
        payload.downcast_ref::<String>(),
    ) {
        (Some(text), _) => (*text).to_owned(),
        (None, Some(text)) => text.to_owned(),
        (None, None) => "Unknown panic".to_owned(), // nocov
    }
}
1507
1508/// Encode a ciborium::Value to CBOR bytes.
1509fn cbor_encode(value: &Value) -> Vec<u8> {
1510    let mut bytes = Vec::new();
1511    ciborium::into_writer(value, &mut bytes).expect("CBOR encoding failed");
1512    bytes
1513}
1514
1515/// Decode CBOR bytes to a ciborium::Value.
1516fn cbor_decode(bytes: &[u8]) -> Value {
1517    ciborium::from_reader(bytes).expect("CBOR decoding failed")
1518}
1519
1520#[cfg(test)]
1521#[path = "../tests/embedded/runner_tests.rs"]
1522mod tests;