1use crate::antithesis::{TestLocation, is_running_in_antithesis};
2use crate::backend::{DataSource, DataSourceError, TestCaseResult, TestRunResult, TestRunner};
3use crate::cbor_utils::{as_bool, as_text, as_u64, cbor_map, map_get, map_insert};
4use crate::control::{currently_in_test_context, with_test_context};
5use crate::protocol::{Connection, HANDSHAKE_STRING, Stream};
6use crate::test_case::{ASSUME_FAIL_STRING, LOOP_DONE_STRING, STOP_TEST_STRING, TestCase};
7use ciborium::Value;
8
9use std::backtrace::{Backtrace, BacktraceStatus};
10use std::cell::RefCell;
11use std::fs::{File, OpenOptions};
12use std::panic::{self, AssertUnwindSafe, catch_unwind};
13use std::process::{Command, Stdio};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, LazyLock, Mutex, Once};
16use std::time::{Duration, Instant};
17
/// Inclusive `(lowest, highest)` protocol versions accepted during the handshake.
const SUPPORTED_PROTOCOL_VERSIONS: (&str, &str) = ("0.10", "0.10");
/// `hegel-core` release requested when the server is launched through `uv`.
const HEGEL_SERVER_VERSION: &str = "0.4.7";
/// Environment variable that overrides the command used to start the server.
const HEGEL_SERVER_COMMAND_ENV: &str = "HEGEL_SERVER_COMMAND";
/// Directory (relative to the working directory) that holds server log files.
const HEGEL_SERVER_DIR: &str = ".hegel";
/// Path of the most recently opened server log file, if one has been opened.
static SERVER_LOG_PATH: Mutex<Option<String>> = Mutex::new(None);
/// Counter that gives every log file opened by this process a distinct name.
static LOG_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
/// The shared server session; replaced once its server process has exited.
static SESSION: Mutex<Option<Arc<HegelSession>>> = Mutex::new(None);

/// Ensures the panic hook is installed at most once per process.
static PANIC_HOOK_INIT: Once = Once::new();
27
/// Whether protocol debug logging was requested through the
/// `HEGEL_PROTOCOL_DEBUG` environment variable. The value is compared
/// case-insensitively against `1` and `true`, and is read once on first access.
static PROTOCOL_DEBUG: LazyLock<bool> = LazyLock::new(|| {
    let flag = std::env::var("HEGEL_PROTOCOL_DEBUG")
        .unwrap_or_default()
        .to_lowercase();
    flag == "1" || flag == "true"
});
39
/// [`DataSource`] implementation that forwards every request to the hegel
/// server over a stream.
pub(crate) struct ServerDataSource {
    /// Connection to the server; consulted to find out whether the server exited.
    connection: Arc<Connection>,
    /// Stream that carries this data source's requests.
    stream: Mutex<Stream>,
    /// Set once a request has ended the test case; later requests then fail
    /// with `StopTest` without contacting the server.
    aborted: AtomicBool,
    /// `Verbosity::Debug` enables request/response logging on stderr.
    verbosity: Verbosity,
}
48
49impl ServerDataSource {
50 pub(crate) fn new(connection: Arc<Connection>, stream: Stream, verbosity: Verbosity) -> Self {
51 ServerDataSource {
52 connection,
53 stream: Mutex::new(stream),
54 aborted: AtomicBool::new(false),
55 verbosity,
56 }
57 }
58
59 fn send_request(&self, command: &str, payload: &Value) -> Result<Value, DataSourceError> {
60 if self.aborted.load(Ordering::SeqCst) {
61 return Err(DataSourceError::StopTest);
62 }
63 let debug = *PROTOCOL_DEBUG || self.verbosity == Verbosity::Debug;
64
65 let mut entries = vec![(
66 Value::Text("command".to_string()),
67 Value::Text(command.to_string()),
68 )];
69
70 if let Value::Map(map) = payload {
71 for (k, v) in map {
72 entries.push((k.clone(), v.clone()));
73 }
74 }
75
76 let request = Value::Map(entries);
77
78 if debug {
79 eprintln!("REQUEST: {:?}", request);
80 }
81
82 let result = self
83 .stream
84 .lock()
85 .unwrap_or_else(|e| e.into_inner())
86 .request_cbor(&request);
87
88 match result {
89 Ok(response) => {
90 if debug {
91 eprintln!("RESPONSE: {:?}", response);
92 }
93 Ok(response)
94 }
95 Err(e) => {
96 let error_msg = e.to_string();
97 if error_msg.contains("UnsatisfiedAssumption") {
98 if debug {
100 eprintln!("RESPONSE: UnsatisfiedAssumption");
101 }
102 Err(DataSourceError::Assume)
103 } else if error_msg.contains("overflow")
105 || error_msg.contains("StopTest")
106 || error_msg.contains("stream is closed")
107 {
108 if debug {
109 eprintln!("RESPONSE: StopTest/overflow"); }
111 self.stream
112 .lock()
113 .unwrap_or_else(|e| e.into_inner())
114 .mark_closed();
115 self.aborted.store(true, Ordering::SeqCst);
116 Err(DataSourceError::StopTest)
117 } else if error_msg.contains("FlakyStrategyDefinition")
119 || error_msg.contains("FlakyReplay")
120 {
122 self.stream
123 .lock()
124 .unwrap_or_else(|e| e.into_inner())
125 .mark_closed();
126 self.aborted.store(true, Ordering::SeqCst);
127 Err(DataSourceError::StopTest)
128 } else if self.connection.server_has_exited() {
129 panic!("{}", server_crash_message()); } else {
131 Err(DataSourceError::ServerError(e.to_string()))
132 }
133 }
134 }
135 }
136}
137
138impl DataSource for ServerDataSource {
139 fn generate(&self, schema: &Value) -> Result<Value, DataSourceError> {
140 self.send_request("generate", &cbor_map! {"schema" => schema.clone()})
141 }
142
143 fn start_span(&self, label: u64) -> Result<(), DataSourceError> {
144 self.send_request("start_span", &cbor_map! {"label" => label})?;
145 Ok(())
146 }
147
148 fn stop_span(&self, discard: bool) -> Result<(), DataSourceError> {
149 self.send_request("stop_span", &cbor_map! {"discard" => discard})?;
150 Ok(())
151 }
152
153 fn new_collection(
154 &self,
155 min_size: u64,
156 max_size: Option<u64>,
157 ) -> Result<String, DataSourceError> {
158 let mut payload = cbor_map! {
159 "min_size" => min_size
160 };
161 if let Some(max) = max_size {
162 map_insert(&mut payload, "max_size", max);
163 }
164 let response = self.send_request("new_collection", &payload)?;
165 match response {
166 Value::Integer(i) => {
167 let n: i128 = i.into();
168 Ok(n.to_string())
169 }
170 _ => panic!(
172 "Expected integer response from new_collection, got {:?}",
173 response
174 ),
175 }
177 }
178
179 fn collection_more(&self, collection: &str) -> Result<bool, DataSourceError> {
180 let collection_id: i64 = collection.parse().unwrap();
181 let response = self.send_request(
182 "collection_more",
183 &cbor_map! { "collection_id" => collection_id },
184 )?;
185 match response {
186 Value::Bool(b) => Ok(b),
187 _ => panic!("Expected bool from collection_more, got {:?}", response), }
189 }
190
191 fn collection_reject(
193 &self,
194 collection: &str,
195 why: Option<&str>,
196 ) -> Result<(), DataSourceError> {
197 let collection_id: i64 = collection.parse().unwrap();
198 let mut payload = cbor_map! {
199 "collection_id" => collection_id
200 };
201 if let Some(reason) = why {
202 map_insert(&mut payload, "why", reason.to_string());
203 }
204 self.send_request("collection_reject", &payload)?;
205 Ok(())
206 }
208
209 fn new_pool(&self) -> Result<i128, DataSourceError> {
210 let response = self.send_request("new_pool", &cbor_map! {})?;
211 match response {
212 Value::Integer(i) => Ok(i.into()),
213 other => panic!("Expected integer response for pool id, got {:?}", other), }
215 }
216
217 fn pool_add(&self, pool_id: i128) -> Result<i128, DataSourceError> {
218 let response = self.send_request("pool_add", &cbor_map! {"pool_id" => pool_id})?;
219 match response {
220 Value::Integer(i) => Ok(i.into()),
221 other => panic!("Expected integer response for variable id, got {:?}", other), }
223 }
224
225 fn pool_generate(&self, pool_id: i128, consume: bool) -> Result<i128, DataSourceError> {
226 let response = self.send_request(
227 "pool_generate",
228 &cbor_map! {
229 "pool_id" => pool_id,
230 "consume" => consume,
231 },
232 )?;
233 match response {
234 Value::Integer(i) => Ok(i.into()),
235 other => panic!("Expected integer response for variable id, got {:?}", other), }
237 }
238
239 fn mark_complete(&self, status: &str, origin: Option<&str>) {
240 let origin_value = match origin {
241 Some(s) => Value::Text(s.to_string()),
242 None => Value::Null,
243 };
244 let mark_complete = cbor_map! {
245 "command" => "mark_complete",
246 "status" => status,
247 "origin" => origin_value
248 };
249 let mut stream = self.stream.lock().unwrap_or_else(|e| e.into_inner());
250 let _ = stream.request_cbor(&mark_complete);
251 let _ = stream.close();
252 }
253
254 fn test_aborted(&self) -> bool {
255 self.aborted.load(Ordering::SeqCst)
256 }
257}
258
/// Parses a `major.minor` version string into a `(major, minor)` pair that
/// can be compared lexicographically.
///
/// # Panics
/// Panics if `s` does not consist of exactly two dot-separated fields, or if
/// either field is not a valid `u32`.
fn parse_version(s: &str) -> (u32, u32) {
    let mut fields = s.split('.');
    // Exactly two fields: a third `next()` must come back empty.
    let (major_text, minor_text) = match (fields.next(), fields.next(), fields.next()) {
        (Some(major), Some(minor), None) => (major, minor),
        _ => panic!("invalid version string '{s}': expected 'major.minor' format"),
    };
    let major = major_text
        .parse()
        .unwrap_or_else(|_| panic!("invalid major version in '{s}'"));
    let minor = minor_text
        .parse()
        .unwrap_or_else(|_| panic!("invalid minor version in '{s}'"));
    (major, minor)
}
275
/// A running hegel server process together with the connection used to talk
/// to it.
struct HegelSession {
    /// Connection built on the server's stdin/stdout.
    connection: Arc<Connection>,
    /// Control stream, used for the handshake and for starting test runs.
    control: Mutex<Stream>,
    /// Handle to the server process; shared with the thread that watches for its exit.
    child: Arc<Mutex<std::process::Child>>,
}
293
294impl HegelSession {
295 fn get() -> Arc<HegelSession> {
298 let mut guard = SESSION.lock().unwrap_or_else(|e| e.into_inner());
299 if let Some(ref s) = *guard {
300 if !s.connection.server_has_exited() {
301 return Arc::clone(s);
302 }
303 }
304 init_panic_hook();
305 let session = Arc::new(HegelSession::init());
306 *guard = Some(Arc::clone(&session));
307 session
308 }
309
310 fn init() -> HegelSession {
311 let mut cmd = hegel_command();
312 cmd.arg("--stdio").arg("--verbosity").arg("normal");
313
314 cmd.env("PYTHONUNBUFFERED", "1");
315 let log_file = server_log_file();
316 cmd.stdin(Stdio::piped());
317 cmd.stdout(Stdio::piped());
318 cmd.stderr(Stdio::from(log_file));
319
320 let mut child = match cmd.spawn() {
321 Ok(child) => child,
322 Err(e) => panic!("Failed to spawn hegel server: {e}"), };
324
325 let child_stdin = child.stdin.take().expect("Failed to take child stdin");
326 let child_stdout = child.stdout.take().expect("Failed to take child stdout");
327
328 let connection = Connection::new(Box::new(child_stdout), Box::new(child_stdin));
329 let mut control = connection.control_stream();
330
331 let binary_path = std::env::var(HEGEL_SERVER_COMMAND_ENV).ok();
333
334 let handshake_result = control
336 .send_request(HANDSHAKE_STRING.to_vec())
337 .and_then(|req_id| control.receive_reply(req_id));
338
339 let response = match handshake_result {
340 Ok(r) => r,
341 Err(e) => handle_handshake_failure(&mut child, binary_path.as_deref(), e), };
343
344 let decoded = String::from_utf8_lossy(&response);
345 let server_version = match decoded.strip_prefix("Hegel/") {
346 Some(v) => v,
347 None => {
348 let _ = child.kill(); panic!("Bad handshake response: {decoded:?}"); }
351 };
352 let (lo, hi) = SUPPORTED_PROTOCOL_VERSIONS;
353 let version = parse_version(server_version);
354 if version < parse_version(lo) || version > parse_version(hi) {
355 let _ = child.kill();
357 panic!(
358 "hegel-rust supports protocol versions {lo} through {hi}, but \
359 the connected server is using protocol version {server_version}. Upgrading \
360 hegel-rust or downgrading hegel-core might help."
361 );
362 }
364
365 let child_arc = Arc::new(Mutex::new(child));
366 let child_for_monitor = Arc::clone(&child_arc);
367
368 let conn_for_monitor = Arc::clone(&connection);
372 std::thread::spawn(move || {
373 loop {
374 {
375 let mut guard = child_for_monitor.lock().unwrap();
376 if matches!(guard.try_wait(), Ok(Some(_))) {
377 drop(guard);
378 conn_for_monitor.mark_server_exited();
379 return;
380 }
381 }
382 std::thread::sleep(Duration::from_millis(10));
383 }
384 });
385
386 HegelSession {
387 connection,
388 control: Mutex::new(control),
389 child: child_arc,
390 }
391 }
392}
393
394fn receive_event(test_stream: &mut Stream, connection: &Connection) -> (u32, Vec<u8>) {
397 match test_stream.receive_request() {
398 Ok(event) => event,
399 Err(_) if connection.server_has_exited() => {
401 panic!("{}", server_crash_message());
402 }
404 Err(e) => unreachable!("Failed to receive event (server still running): {}", e),
405 }
406}
407
/// [`TestRunner`] that drives test runs through the shared hegel server session.
pub(crate) struct ServerTestRunner;
410
411impl ServerTestRunner {
412 fn run_single_test_case(
413 &self,
414 settings: &Settings,
415 run_case: &mut dyn FnMut(Box<dyn DataSource>, bool) -> TestCaseResult,
416 ) -> TestRunResult {
417 let session = HegelSession::get();
418 let connection = &session.connection;
419 let verbosity = settings.verbosity;
420
421 let mut test_stream = connection.new_stream();
422
423 let mut msg = cbor_map! {
424 "command" => "single_test_case",
425 "stream_id" => test_stream.stream_id
426 };
427 if let Some(seed) = settings.seed {
428 map_insert(&mut msg, "seed", seed);
429 }
430
431 let response = {
432 let mut control = session.control.lock().unwrap_or_else(|e| e.into_inner());
433 let send_id = control.send_request(cbor_encode(&msg));
434 send_id.and_then(|id| control.receive_reply(id))
435 }
436 .unwrap_or_else(|e| handle_channel_error(e));
437 let _: Value = cbor_decode(&response);
438
439 if verbosity == Verbosity::Debug {
440 eprintln!("single_test_case response received");
441 }
442
443 let ack_null = cbor_map! {"result" => Value::Null};
444 let mut failure_message: Option<String> = None;
445 let mut passed = true;
446
447 loop {
448 let (event_id, event_payload) = receive_event(&mut test_stream, connection);
449
450 let event: Value = cbor_decode(&event_payload);
451 let event_type = map_get(&event, "event")
452 .and_then(as_text)
453 .expect("Expected event in payload");
454
455 if verbosity == Verbosity::Debug {
456 eprintln!("Received event: {:?}", event);
457 }
458
459 match event_type {
460 "test_case" => {
461 let stream_id = map_get(&event, "stream_id")
462 .and_then(as_u64)
463 .expect("Missing stream id") as u32;
464
465 let test_case_stream = connection.connect_stream(stream_id);
466
467 test_stream
468 .write_reply(event_id, cbor_encode(&ack_null))
469 .expect("Failed to ack test_case");
470
471 let backend = Box::new(ServerDataSource::new(
472 Arc::clone(connection),
473 test_case_stream,
474 verbosity,
475 ));
476 let tc_result = run_case(backend, true);
477
478 if let TestCaseResult::Interesting { panic_message } = tc_result {
479 passed = false;
480 failure_message = Some(panic_message);
481 }
482 }
483 "test_done" => {
484 let ack_true = cbor_map! {"result" => true};
485 test_stream
486 .write_reply(event_id, cbor_encode(&ack_true))
487 .expect("Failed to ack test_done");
488 break;
489 }
490 _ => panic!("unknown event: {}", event_type), }
492 }
493
494 TestRunResult {
495 passed,
496 failure_message,
497 }
498 }
499}
500
impl TestRunner for ServerTestRunner {
    /// Runs a test against the server.
    ///
    /// In `Mode::SingleTestCase` this delegates to `run_single_test_case`.
    /// Otherwise it sends a `run_test` command on the control stream, serves
    /// `test_case` events (calling `run_case` with `false`) until `test_done`,
    /// and then replays `interesting_test_cases` further test cases (calling
    /// `run_case` with `true`).
    ///
    /// # Panics
    /// Panics on channel errors, on malformed or unknown events, if the
    /// results carry an `error`, `health_check_failure` or `flaky` entry, and
    /// if the server exits during the final replays.
    fn run(
        &self,
        settings: &Settings,
        database_key: Option<&str>,
        run_case: &mut dyn FnMut(Box<dyn DataSource>, bool) -> TestCaseResult,
    ) -> TestRunResult {
        if settings.mode == Mode::SingleTestCase {
            return self.run_single_test_case(settings, run_case);
        }

        let session = HegelSession::get();
        let connection = &session.connection;
        let verbosity = settings.verbosity;

        let mut test_stream = connection.new_stream();

        let suppress_names: Vec<Value> = settings
            .suppress_health_check
            .iter()
            .map(|c| Value::Text(c.as_str().to_string()))
            .collect();

        // The database key is sent as raw bytes, or null when there is none.
        let database_key_bytes =
            database_key.map_or(Value::Null, |k| Value::Bytes(k.as_bytes().to_vec()));

        let mut run_test_msg = cbor_map! {
            "command" => "run_test",
            "test_cases" => settings.test_cases,
            "seed" => settings.seed.map_or(Value::Null, Value::from),
            "stream_id" => test_stream.stream_id,
            "database_key" => database_key_bytes,
            "derandomize" => settings.derandomize
        };
        // `Unset` omits the "database" key entirely; `Disabled` sends an
        // explicit null; `Path` sends the path as text.
        let db_value = match &settings.database {
            Database::Unset => Option::None,
            Database::Disabled => Some(Value::Null),
            Database::Path(s) => Some(Value::Text(s.clone())),
        };
        if let Some(db) = db_value {
            if let Value::Map(ref mut map) = run_test_msg {
                map.push((Value::Text("database".to_string()), db));
            }
        }
        // "suppress_health_check" is only sent when at least one check is suppressed.
        if !suppress_names.is_empty() {
            if let Value::Map(ref mut map) = run_test_msg {
                map.push((
                    Value::Text("suppress_health_check".to_string()),
                    Value::Array(suppress_names),
                ));
            }
        }

        // The control lock is held only for the request/reply round trip.
        let run_test_response = {
            let mut control = session.control.lock().unwrap_or_else(|e| e.into_inner());
            let send_id = control.send_request(cbor_encode(&run_test_msg));
            send_id.and_then(|id| control.receive_reply(id))
        }
        .unwrap_or_else(|e| handle_channel_error(e));
        // The decoded reply itself is not used.
        let _run_test_result: Value = cbor_decode(&run_test_response);

        if verbosity == Verbosity::Debug {
            eprintln!("run_test response received");
        }

        // Assigned exactly once, when the `test_done` event arrives.
        let result_data: Value;
        let ack_null = cbor_map! {"result" => Value::Null};
        loop {
            let (event_id, event_payload) = receive_event(&mut test_stream, connection);

            let event: Value = cbor_decode(&event_payload);
            let event_type = map_get(&event, "event")
                .and_then(as_text)
                .expect("Expected event in payload");

            if verbosity == Verbosity::Debug {
                eprintln!("Received event: {:?}", event);
            }

            match event_type {
                "test_case" => {
                    let stream_id = map_get(&event, "stream_id")
                        .and_then(as_u64)
                        .expect("Missing stream id") as u32;

                    // Connect to the test-case stream before acknowledging the event.
                    let test_case_stream = connection.connect_stream(stream_id);

                    test_stream
                        .write_reply(event_id, cbor_encode(&ack_null))
                        .expect("Failed to ack test_case");

                    let backend = Box::new(ServerDataSource::new(
                        Arc::clone(connection),
                        test_case_stream,
                        verbosity,
                    ));
                    // The result of a non-final test case is discarded here.
                    run_case(backend, false);
                }
                "test_done" => {
                    let ack_true = cbor_map! {"result" => true};
                    test_stream
                        .write_reply(event_id, cbor_encode(&ack_true))
                        .expect("Failed to ack test_done");
                    result_data = map_get(&event, "results").cloned().unwrap_or(Value::Null);
                    break;
                }
                _ => panic!("unknown event: {}", event_type),
            }
        }

        if let Some(error_msg) = map_get(&result_data, "error").and_then(as_text) {
            panic!("Server error: {}", error_msg);
        }

        if let Some(failure_msg) = map_get(&result_data, "health_check_failure").and_then(as_text) {
            panic!("Health check failure:\n{}", failure_msg);
        }

        if let Some(flaky_msg) = map_get(&result_data, "flaky").and_then(as_text) {
            panic!("Flaky test detected: {}", flaky_msg);
        }

        let n_interesting = map_get(&result_data, "interesting_test_cases")
            .and_then(as_u64)
            .unwrap_or(0);

        if verbosity == Verbosity::Debug {
            eprintln!("Test done. interesting_test_cases={}", n_interesting);
        }

        // One further `test_case` event is expected per interesting test case.
        // When several are interesting, the last panic message is the one kept.
        let mut failure_message: Option<String> = None;
        for _ in 0..n_interesting {
            let (event_id, event_payload) = test_stream
                .receive_request()
                .expect("Failed to receive final test_case");

            let event: Value = cbor_decode(&event_payload);
            let event_type = map_get(&event, "event").and_then(as_text);
            assert_eq!(event_type, Some("test_case"));

            let stream_id = map_get(&event, "stream_id")
                .and_then(as_u64)
                .expect("Missing stream id") as u32;

            let test_case_stream = connection.connect_stream(stream_id);

            test_stream
                .write_reply(event_id, cbor_encode(&ack_null))
                .expect("Failed to ack final test_case");

            let backend = Box::new(ServerDataSource::new(
                Arc::clone(connection),
                test_case_stream,
                verbosity,
            ));
            let tc_result = run_case(backend, true);

            if let TestCaseResult::Interesting { panic_message } = tc_result {
                failure_message = Some(panic_message);
            }

            if connection.server_has_exited() {
                panic!("{}", server_crash_message());
            }
        }

        // A missing or non-boolean "passed" entry counts as passed.
        let passed = map_get(&result_data, "passed")
            .and_then(as_bool)
            .unwrap_or(true);

        TestRunResult {
            passed,
            failure_message,
        }
    }
}
687
thread_local! {
    /// Details of the most recent panic recorded on this thread by the panic
    /// hook: `(thread name, thread id, source location, backtrace)`.
    static LAST_PANIC_INFO: RefCell<Option<(String, String, String, Backtrace)>> = const { RefCell::new(None) };
}
694
695fn take_panic_info() -> Option<(String, String, String, Backtrace)> {
697 LAST_PANIC_INFO.with(|info| info.borrow_mut().take())
698}
699
/// Renders `bt` as text.
///
/// With `full == true` the backtrace is returned verbatim. Otherwise only the
/// frames between the `__rust_end_short_backtrace` and
/// `__rust_begin_short_backtrace` markers are kept, renumbered from zero.
fn format_backtrace(bt: &Backtrace, full: bool) -> String {
    let backtrace_str = bt.to_string();
    if full {
        backtrace_str
    } else {
        trim_backtrace_frames(&backtrace_str)
    }
}

/// Returns `true` when `line` starts a numbered frame, i.e. its first
/// non-whitespace character is an ASCII digit.
fn is_backtrace_frame_line(line: &str) -> bool {
    line.trim_start()
        .starts_with(|c: char| c.is_ascii_digit())
}

/// Keeps the frames between the short-backtrace markers and renumbers them.
///
/// The kept range starts at the first frame line after the last
/// `__rust_end_short_backtrace` seen before the begin marker, and ends just
/// before the frame containing the first `__rust_begin_short_backtrace`.
/// When a marker is missing, the corresponding end of the text is used.
fn trim_backtrace_frames(text: &str) -> String {
    let lines: Vec<&str> = text.lines().collect();
    let mut start_idx = 0;
    let mut end_idx = lines.len();

    for (i, line) in lines.iter().enumerate() {
        if line.contains("__rust_end_short_backtrace") {
            // The frames of interest begin at the next numbered frame line.
            if let Some(offset) = lines[i + 1..]
                .iter()
                .position(|l| is_backtrace_frame_line(l))
            {
                start_idx = i + 1 + offset;
            }
        }
        if line.contains("__rust_begin_short_backtrace") {
            // Exclude the marker frame itself: cut at the closest frame line
            // at or above the marker.
            if let Some(j) = lines[..=i]
                .iter()
                .rposition(|l| is_backtrace_frame_line(l))
            {
                end_idx = j;
            }
            break;
        }
    }

    // An inverted range (markers in an unexpected order) yields no frames
    // rather than a slice-index panic.
    let kept = lines.get(start_idx..end_idx).unwrap_or_default();

    let mut next_frame = 0usize;
    let mut rendered = Vec::with_capacity(kept.len());
    for &line in kept {
        let trimmed = line.trim_start();
        match trimmed.find(':') {
            // Swap the original frame number (everything before the first
            // colon) for the new, zero-based one.
            Some(colon) if is_backtrace_frame_line(line) => {
                rendered.push(format!("{:>4}{}", next_frame, &trimmed[colon..]));
                next_frame += 1;
            }
            _ => rendered.push(line.to_string()),
        }
    }

    rendered.join("\n")
}
792fn init_panic_hook() {
800 PANIC_HOOK_INIT.call_once(|| {
801 let prev_hook = panic::take_hook();
802 panic::set_hook(Box::new(move |info| {
803 if !currently_in_test_context() {
804 prev_hook(info);
806 return;
807 }
808
809 let thread = std::thread::current();
810 let thread_name = thread.name().unwrap_or("<unnamed>").to_string();
811 let thread_id = format!("{:?}", thread.id())
813 .trim_start_matches("ThreadId(")
814 .trim_end_matches(')')
815 .to_string();
816 let location = info
817 .location()
818 .map(|loc| format!("{}:{}:{}", loc.file(), loc.line(), loc.column()))
819 .unwrap_or_else(|| "<unknown>".to_string());
820
821 let backtrace = Backtrace::capture();
822
823 LAST_PANIC_INFO
824 .with(|l| *l.borrow_mut() = Some((thread_name, thread_id, location, backtrace)));
825 }));
826 });
827}
828
829fn hegel_command() -> Command {
830 if let Ok(override_path) = std::env::var(HEGEL_SERVER_COMMAND_ENV) {
831 return Command::new(resolve_hegel_path(&override_path)); }
833 let uv_path = crate::uv::find_uv();
834 let mut cmd = Command::new(uv_path);
835 cmd.args([
836 "tool",
837 "run",
838 "--from",
839 &format!("hegel-core=={HEGEL_SERVER_VERSION}"),
840 "hegel",
841 ]);
842 cmd
843}
844
845fn server_log_file() -> File {
846 std::fs::create_dir_all(HEGEL_SERVER_DIR).ok();
847 let pid = std::process::id();
848 let ix = LOG_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
849 let path = format!("{HEGEL_SERVER_DIR}/server.{pid}-{ix}.log");
850 *SERVER_LOG_PATH.lock().unwrap() = Some(path.clone());
851 OpenOptions::new()
852 .create(true)
853 .append(true)
854 .open(&path)
855 .expect("Failed to open server log file")
856}
857
/// Polls `child` every 10 ms until it exits or `timeout` has elapsed.
///
/// Returns the exit status, or `None` on timeout. The child is always polled
/// at least once, and errors from `try_wait` count as "not exited yet".
fn wait_for_exit(
    child: &mut std::process::Child,
    timeout: Duration,
) -> Option<std::process::ExitStatus> {
    let started = Instant::now();
    loop {
        match child.try_wait() {
            Ok(Some(status)) => return Some(status),
            _ if started.elapsed() >= timeout => return None,
            _ => std::thread::sleep(Duration::from_millis(10)),
        }
    }
}
873
874fn handle_handshake_failure(
875 child: &mut std::process::Child,
876 binary_path: Option<&str>,
877 handshake_err: impl std::fmt::Display,
878) -> ! {
879 let exit_status = wait_for_exit(child, Duration::from_millis(100));
880 let child_still_running = exit_status.is_none();
881 if child_still_running {
882 let _ = child.kill();
883 let _ = child.wait();
884 panic!(
885 "The hegel server failed during startup handshake: {handshake_err}\n\n\
886 The server process did not exit. Possibly bad virtualenv?"
887 );
888 }
889 panic!(
890 "{}",
891 startup_error_message(binary_path, exit_status.unwrap())
892 );
893}
894
895fn startup_error_message(
896 binary_path: Option<&str>,
897 exit_status: std::process::ExitStatus,
898) -> String {
899 let mut parts = Vec::new();
900
901 parts.push("The hegel server failed during startup handshake.".to_string());
902 parts.push(format!("The server process exited with {}.", exit_status));
903
904 if let Some(binary_path) = binary_path {
906 let expected_version_string = format!("hegel (version {})", HEGEL_SERVER_VERSION);
907 match Command::new(binary_path).arg("--version").output() {
908 Ok(output) if output.status.success() => {
909 let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
910 if stdout != expected_version_string {
911 parts.push(format!(
912 "Version mismatch: expected '{}', got '{}'.",
913 expected_version_string, stdout
914 ));
915 }
916 }
917 Ok(_) => {
918 parts.push(format!(
919 "'{}' --version exited unsuccessfully. Is this a hegel binary?",
920 binary_path
921 ));
922 }
923 Err(e) => {
924 parts.push(format!(
925 "Could not run '{}' --version: {}. Is this a hegel binary?",
926 binary_path, e
927 ));
928 }
929 }
930 }
931
932 if let Some(log_path) = SERVER_LOG_PATH.lock().unwrap().clone() {
934 if let Ok(contents) = std::fs::read_to_string(&log_path) {
935 if !contents.trim().is_empty() {
936 let lines: Vec<&str> = contents.lines().collect();
937 let display_lines: Vec<&str> = lines.iter().take(3).copied().collect();
938 let mut log_section =
939 format!("Server log ({}):\n{}", log_path, display_lines.join("\n"));
940 if lines.len() > 3 {
941 log_section.push_str(&format!("\n... (see {} for full output)", log_path));
942 }
943 parts.push(log_section);
944 }
945 }
946 }
947
948 parts.join("\n\n")
949}
950
951fn resolve_hegel_path(path: &str) -> String {
952 let p = std::path::Path::new(path);
953 if p.exists() {
954 crate::utils::validate_executable(path);
955 return path.to_string();
956 }
957
958 if !path.chars().any(std::path::is_separator) {
960 if let Some(resolved) = crate::utils::which(path) {
961 crate::utils::validate_executable(&resolved);
962 return resolved;
963 }
964 panic!(
965 "Hegel server binary '{}' not found on PATH. \
966 Check that {} is set correctly, or install hegel-core.",
967 path, HEGEL_SERVER_COMMAND_ENV
968 );
969 }
970
971 panic!(
972 "Hegel server binary not found at '{}'. \
973 Check that {} is set correctly.",
974 path, HEGEL_SERVER_COMMAND_ENV
975 );
976}
977
978pub fn format_log_excerpt(content: &str) -> String {
983 const MAX_UNINDENTED: usize = 5;
984 const INDENT_THRESHOLD: usize = 10;
985 const INDENT_CONTEXT: usize = 3;
986
987 let lines: Vec<&str> = content.lines().collect();
988 if lines.is_empty() {
989 return "(empty)".to_string();
990 }
991
992 let mut unindented_seen = 0;
994 let mut start_idx = 0;
995 for (i, line) in lines.iter().enumerate().rev() {
996 if is_log_unindented(line) {
997 unindented_seen += 1;
998 if unindented_seen >= MAX_UNINDENTED {
999 start_idx = i;
1000 break;
1001 }
1002 }
1003 }
1004
1005 let relevant = &lines[start_idx..];
1007 let mut output: Vec<String> = Vec::new();
1008 let mut indent_run: Vec<&str> = Vec::new();
1009
1010 for &line in relevant {
1011 if is_log_unindented(line) {
1012 flush_log_indent_run(
1013 &mut indent_run,
1014 &mut output,
1015 INDENT_THRESHOLD,
1016 INDENT_CONTEXT,
1017 );
1018 output.push(line.to_string());
1019 } else {
1020 indent_run.push(line);
1021 }
1022 }
1023 flush_log_indent_run(
1024 &mut indent_run,
1025 &mut output,
1026 INDENT_THRESHOLD,
1027 INDENT_CONTEXT,
1028 );
1029
1030 output.join("\n")
1031}
1032
/// Returns `true` for a non-empty line whose first character is neither a
/// space nor a tab.
fn is_log_unindented(line: &str) -> bool {
    matches!(line.chars().next(), Some(first) if first != ' ' && first != '\t')
}
1036
/// Moves the pending `run` of indented lines into `output` and empties `run`.
///
/// A run longer than `threshold` is abbreviated: the first and last `keep`
/// lines are emitted around a ` [...N lines...]` marker, where `keep` is
/// `context` capped at half the run's length and `N` is the number of
/// omitted lines.
fn flush_log_indent_run(
    run: &mut Vec<&str>,
    output: &mut Vec<String>,
    threshold: usize,
    context: usize,
) {
    let total = run.len();
    if total == 0 {
        return;
    }

    if total <= threshold {
        output.extend(run.iter().map(|line| line.to_string()));
    } else {
        let keep = context.min(total / 2);
        let (head, tail) = (&run[..keep], &run[total - keep..]);
        output.extend(head.iter().map(|line| line.to_string()));
        output.push(format!(" [...{} lines...]", total - 2 * keep));
        output.extend(tail.iter().map(|line| line.to_string()));
    }

    run.clear();
}
1063
1064fn server_log_excerpt() -> Option<String> {
1065 let log_path = SERVER_LOG_PATH.lock().unwrap().clone()?;
1066 let content = std::fs::read_to_string(log_path).ok()?;
1067 let trimmed = content.trim();
1068 if trimmed.is_empty() {
1069 return None;
1070 }
1071 Some(format_log_excerpt(trimmed))
1072}
1073
1074fn server_crash_message() -> String {
1075 const BASE: &str = "The hegel server process exited unexpectedly.";
1076 let log_path_owned = SERVER_LOG_PATH.lock().unwrap().clone();
1077 let log_path = log_path_owned.as_deref().unwrap_or(".hegel/server.log");
1078 match server_log_excerpt() {
1079 Some(excerpt) => format!("{BASE}\n\nLast server log entries:\n{excerpt}"),
1080 None => format!("{BASE}\n\n(No entries found in {log_path})"),
1081 }
1082}
1083
1084fn handle_channel_error(e: std::io::Error) -> ! {
1085 if e.kind() == std::io::ErrorKind::ConnectionAborted {
1086 panic!("{}", server_crash_message());
1087 }
1088 unreachable!("unexpected channel error: {e}")
1089}
1090
1091#[doc(hidden)]
1094pub fn __test_kill_server() {
1095 let guard = SESSION.lock().unwrap_or_else(|e| e.into_inner());
1096 if let Some(session) = guard.as_ref() {
1097 let child_arc = Arc::clone(&session.child);
1098 let conn = Arc::clone(&session.connection);
1099 drop(guard);
1100 let _ = child_arc.lock().unwrap().kill();
1101 while !conn.server_has_exited() {
1102 std::thread::yield_now();
1103 }
1104 }
1105}
1106
/// Health checks that can be suppressed through
/// [`Settings::suppress_health_check`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HealthCheck {
    /// Sent to the server as `filter_too_much`.
    FilterTooMuch,
    /// Sent to the server as `too_slow`.
    TooSlow,
    /// Sent to the server as `test_cases_too_large`.
    TestCasesTooLarge,
    /// Sent to the server as `large_initial_test_case`.
    LargeInitialTestCase,
}

impl HealthCheck {
    /// Returns every health check, in declaration order.
    pub const fn all() -> [HealthCheck; 4] {
        [
            Self::FilterTooMuch,
            Self::TooSlow,
            Self::TestCasesTooLarge,
            Self::LargeInitialTestCase,
        ]
    }

    /// Name of the check as sent to the server.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::FilterTooMuch => "filter_too_much",
            Self::TooSlow => "too_slow",
            Self::TestCasesTooLarge => "test_cases_too_large",
            Self::LargeInitialTestCase => "large_initial_test_case",
        }
    }
}
1156
/// Selects which server command a run uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    /// Issue a `run_test` command; the default set by `Settings::new`.
    TestRun,
    /// Issue a `single_test_case` command instead of a full run.
    SingleTestCase,
}
1167
/// Output verbosity. In this part of the file only `Debug` is inspected: it
/// enables protocol and event logging on stderr.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verbosity {
    Quiet,
    /// The default set by `Settings::new`.
    Normal,
    Verbose,
    /// Logs requests, responses and received events to stderr.
    Debug,
}
1180
/// Configuration for a test run, built with the chained setter methods.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Whether to perform a full run or a single test case.
    pub(crate) mode: Mode,
    /// Sent to the server as `test_cases`; defaults to 100.
    pub(crate) test_cases: u64,
    /// Output verbosity.
    pub(crate) verbosity: Verbosity,
    /// Optional seed forwarded to the server.
    pub(crate) seed: Option<u64>,
    /// Sent to the server as `derandomize`; defaults to `true` when CI is detected.
    pub(crate) derandomize: bool,
    /// Database configuration forwarded to the server.
    pub(crate) database: Database,
    /// Health checks whose names are sent as `suppress_health_check`.
    pub(crate) suppress_health_check: Vec<HealthCheck>,
}
1198
1199impl Settings {
1200 pub fn new() -> Self {
1202 let in_ci = is_in_ci();
1203 Self {
1204 mode: Mode::TestRun,
1205 test_cases: 100,
1206 verbosity: Verbosity::Normal,
1207 seed: None,
1208 derandomize: in_ci,
1209 database: if in_ci {
1210 Database::Disabled
1211 } else {
1212 Database::Unset },
1214 suppress_health_check: Vec::new(),
1215 }
1216 }
1217
1218 pub fn mode(mut self, mode: Mode) -> Self {
1220 self.mode = mode;
1221 self
1222 }
1223
1224 pub fn test_cases(mut self, n: u64) -> Self {
1226 self.test_cases = n;
1227 self
1228 }
1229
1230 pub fn verbosity(mut self, verbosity: Verbosity) -> Self {
1232 self.verbosity = verbosity;
1233 self
1234 }
1235
1236 pub fn seed(mut self, seed: Option<u64>) -> Self {
1238 self.seed = seed;
1239 self
1240 }
1241
1242 pub fn derandomize(mut self, derandomize: bool) -> Self {
1244 self.derandomize = derandomize;
1245 self
1246 }
1247
1248 pub fn database(mut self, database: Option<String>) -> Self {
1250 self.database = match database {
1251 None => Database::Disabled,
1252 Some(path) => Database::Path(path),
1253 };
1254 self
1255 }
1256
1257 pub fn suppress_health_check(mut self, checks: impl IntoIterator<Item = HealthCheck>) -> Self {
1275 self.suppress_health_check.extend(checks);
1276 self
1277 }
1278}
1279
1280impl Default for Settings {
1281 fn default() -> Self {
1282 Self::new()
1283 }
1284}
1285
/// Database configuration as forwarded in the `run_test` command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum Database {
    /// No preference: the `database` key is left out of the command.
    Unset,
    /// The `database` key is sent as null.
    Disabled,
    /// The `database` key is sent as this path.
    Path(String),
}
1292
1293#[doc(hidden)]
1297pub fn hegel<F>(test_fn: F)
1298where
1299 F: FnMut(TestCase),
1300{
1301 Hegel::new(test_fn).run();
1302}
1303
/// Detects whether the process appears to be running under a CI system.
///
/// Each entry names an environment variable. Entries with an expected value
/// match only when the variable equals it exactly; entries without one match
/// whenever the variable is set, even if empty.
fn is_in_ci() -> bool {
    const CI_VARS: &[(&str, Option<&str>)] = &[
        ("CI", None),
        ("TF_BUILD", Some("true")),
        ("BUILDKITE", Some("true")),
        ("CIRCLECI", Some("true")),
        ("CIRRUS_CI", Some("true")),
        ("CODEBUILD_BUILD_ID", None),
        ("GITHUB_ACTIONS", Some("true")),
        ("GITLAB_CI", None),
        ("HEROKU_TEST_RUN_ID", None),
        ("TEAMCITY_VERSION", None),
        ("bamboo.buildKey", None),
    ];

    for &(name, required) in CI_VARS {
        let detected = match required {
            Some(wanted) => std::env::var(name).map_or(false, |actual| actual == wanted),
            None => std::env::var_os(name).is_some(),
        };
        if detected {
            return true;
        }
    }
    false
}
1324
1325#[doc(hidden)]
1327pub struct Hegel<F> {
1328 test_fn: F,
1329 database_key: Option<String>,
1330 test_location: Option<TestLocation>,
1331 settings: Settings,
1332}
1333
1334impl<F> Hegel<F>
1335where
1336 F: FnMut(TestCase),
1337{
1338 pub fn new(test_fn: F) -> Self {
1340 Self {
1341 test_fn,
1342 database_key: None,
1343 settings: Settings::new(),
1344 test_location: None,
1345 }
1346 }
1347
1348 pub fn settings(mut self, settings: Settings) -> Self {
1350 self.settings = settings;
1351 self
1352 }
1353
1354 #[doc(hidden)]
1355 pub fn __database_key(mut self, key: String) -> Self {
1356 self.database_key = Some(key);
1357 self
1358 }
1359
1360 #[doc(hidden)]
1361 pub fn test_location(mut self, location: TestLocation) -> Self {
1362 self.test_location = Some(location);
1363 self
1364 }
1365
1366 pub fn run(self) {
1370 init_panic_hook();
1371
1372 let runner = ServerTestRunner;
1373 let mut test_fn = self.test_fn;
1374 let got_interesting = AtomicBool::new(false);
1375
1376 let result = runner.run(
1377 &self.settings,
1378 self.database_key.as_deref(),
1379 &mut |backend, is_final| {
1380 let tc_result = run_test_case(backend, &mut test_fn, is_final, self.settings.mode);
1381 if matches!(&tc_result, TestCaseResult::Interesting { .. }) {
1382 got_interesting.store(true, Ordering::SeqCst);
1383 }
1384 tc_result
1385 },
1386 );
1387
1388 let test_failed = !result.passed || got_interesting.load(Ordering::SeqCst);
1389
1390 if is_running_in_antithesis() {
1391 #[cfg(not(feature = "antithesis"))]
1392 panic!(
1393 "When Hegel is run inside of Antithesis, it requires the `antithesis` feature. \
1394 You can add it with {{ features = [\"antithesis\"] }}."
1395 );
1396
1397 #[cfg(feature = "antithesis")]
1398 if let Some(ref loc) = self.test_location {
1400 crate::antithesis::emit_assertion(loc, !test_failed);
1401 }
1403 }
1404
1405 if test_failed {
1406 let msg = result.failure_message.as_deref().unwrap_or("unknown");
1407 panic!("Property test failed: {}", msg);
1408 }
1409 }
1410}
1411
1412fn run_test_case(
1415 data_source: Box<dyn DataSource>,
1416 test_fn: &mut dyn FnMut(TestCase),
1417 is_final: bool,
1418 mode: Mode,
1419) -> TestCaseResult {
1420 let tc = TestCase::new(data_source, is_final, mode);
1421
1422 let result = with_test_context(|| catch_unwind(AssertUnwindSafe(|| test_fn(tc.clone()))));
1423
1424 let (tc_result, origin) = match &result {
1425 Ok(()) => (TestCaseResult::Valid, None),
1426 Err(e) => {
1427 let msg = panic_message(e);
1428 if msg == ASSUME_FAIL_STRING {
1429 (TestCaseResult::Invalid, None)
1430 } else if msg == STOP_TEST_STRING {
1431 (TestCaseResult::Overrun, None)
1432 } else if msg == LOOP_DONE_STRING {
1433 (TestCaseResult::Valid, None)
1437 } else {
1438 let (thread_name, thread_id, location, backtrace) = take_panic_info()
1440 .unwrap_or_else(|| {
1441 (
1443 "<unknown>".to_string(),
1444 "?".to_string(),
1445 "<unknown>".to_string(),
1446 Backtrace::disabled(),
1447 )
1448 });
1450
1451 if is_final {
1452 eprintln!(
1453 "thread '{}' ({}) panicked at {}:",
1454 thread_name, thread_id, location
1455 );
1456 eprintln!("{}", msg);
1457
1458 if backtrace.status() == BacktraceStatus::Captured {
1460 let is_full = std::env::var("RUST_BACKTRACE")
1461 .map(|v| v == "full")
1462 .unwrap_or(false);
1463 let formatted = format_backtrace(&backtrace, is_full);
1464 eprintln!("stack backtrace:\n{}", formatted);
1465 if !is_full {
1466 eprintln!(
1467 "note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace."
1468 );
1469 }
1470 }
1471 }
1473
1474 let origin = format!("Panic at {}", location);
1475 (
1476 TestCaseResult::Interesting { panic_message: msg },
1477 Some(origin),
1478 )
1479 }
1480 }
1481 };
1482
1483 if !tc.test_aborted() {
1486 let status = match &tc_result {
1487 TestCaseResult::Valid => "VALID",
1488 TestCaseResult::Invalid | TestCaseResult::Overrun => "INVALID",
1489 TestCaseResult::Interesting { .. } => "INTERESTING",
1490 };
1491 tc.mark_complete(status, origin.as_deref());
1492 }
1493
1494 tc_result
1495}
1496
/// Extracts a human-readable message from a panic payload.
///
/// Payloads of type `&str` or `String` are returned as an owned `String`; any
/// other payload type yields the literal `"Unknown panic"`.
fn panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_owned())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "Unknown panic".to_owned())
}
1507
1508fn cbor_encode(value: &Value) -> Vec<u8> {
1510 let mut bytes = Vec::new();
1511 ciborium::into_writer(value, &mut bytes).expect("CBOR encoding failed");
1512 bytes
1513}
1514
1515fn cbor_decode(bytes: &[u8]) -> Value {
1517 ciborium::from_reader(bytes).expect("CBOR decoding failed")
1518}
1519
1520#[cfg(test)]
1521#[path = "../tests/embedded/runner_tests.rs"]
1522mod tests;