1use crate::test_logging::{
65 LogRecord, ReproManifest, TestContext, TestEvent, TestLogLevel, TestLogger, TestSummary,
66};
67
/// Version number stamped into the `v` field of every [`NdjsonEvent`].
///
/// Consumers can use it to detect which revision of the line format they are
/// reading.
pub const NDJSON_SCHEMA_VERSION: u32 = 1;
77
78#[derive(Debug, Clone, serde::Serialize)]
82pub struct NdjsonEvent {
83 pub v: u32,
85 pub ts_us: u64,
87 pub level: &'static str,
89 pub category: &'static str,
91 pub event: String,
93 #[serde(skip_serializing_if = "Option::is_none")]
95 pub test_id: Option<String>,
96 #[serde(skip_serializing_if = "Option::is_none")]
98 pub seed: Option<u64>,
99 #[serde(skip_serializing_if = "Option::is_none")]
101 pub subsystem: Option<String>,
102 #[serde(skip_serializing_if = "Option::is_none")]
104 pub invariant: Option<String>,
105 pub thread_id: u64,
107 pub message: String,
109 #[serde(skip_serializing_if = "serde_json::Map::is_empty")]
111 pub data: serde_json::Map<String, serde_json::Value>,
112}
113
114impl NdjsonEvent {
115 #[must_use]
117 pub fn from_record(record: &LogRecord, ctx: Option<&TestContext>) -> Self {
118 let mut data = serde_json::Map::new();
119 populate_event_data(&record.event, &mut data);
120
121 Self {
122 v: NDJSON_SCHEMA_VERSION,
123 ts_us: record.elapsed.as_micros() as u64,
124 level: record.event.level().name(),
125 category: record.event.category(),
126 event: event_type_name(&record.event),
127 test_id: ctx.map(|c| c.test_id.clone()),
128 seed: ctx.map(|c| c.seed),
129 subsystem: ctx.and_then(|c| c.subsystem.clone()),
130 invariant: ctx.and_then(|c| c.invariant.clone()),
131 thread_id: thread_id_u64(),
132 message: format!("{}", record.event),
133 data,
134 }
135 }
136
137 #[must_use]
139 pub fn to_json_line(&self) -> String {
140 serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
141 }
142}
143
144fn event_type_name(event: &TestEvent) -> String {
146 match event {
147 TestEvent::ReactorPoll { .. } => "ReactorPoll",
148 TestEvent::ReactorWake { .. } => "ReactorWake",
149 TestEvent::ReactorRegister { .. } => "ReactorRegister",
150 TestEvent::ReactorDeregister { .. } => "ReactorDeregister",
151 TestEvent::IoRead { .. } => "IoRead",
152 TestEvent::IoWrite { .. } => "IoWrite",
153 TestEvent::IoConnect { .. } => "IoConnect",
154 TestEvent::IoAccept { .. } => "IoAccept",
155 TestEvent::WakerWake { .. } => "WakerWake",
156 TestEvent::WakerClone { .. } => "WakerClone",
157 TestEvent::WakerDrop { .. } => "WakerDrop",
158 TestEvent::TaskPoll { .. } => "TaskPoll",
159 TestEvent::TaskSpawn { .. } => "TaskSpawn",
160 TestEvent::TaskComplete { .. } => "TaskComplete",
161 TestEvent::TimerScheduled { .. } => "TimerScheduled",
162 TestEvent::TimerFired { .. } => "TimerFired",
163 TestEvent::RegionCreate { .. } => "RegionCreate",
164 TestEvent::RegionStateChange { .. } => "RegionStateChange",
165 TestEvent::RegionClose { .. } => "RegionClose",
166 TestEvent::ObligationCreate { .. } => "ObligationCreate",
167 TestEvent::ObligationResolve { .. } => "ObligationResolve",
168 TestEvent::Custom { .. } => "Custom",
169 TestEvent::Error { .. } => "Error",
170 TestEvent::Warn { .. } => "Warn",
171 }
172 .to_string()
173}
174
175#[allow(clippy::too_many_lines)]
179fn populate_event_data(event: &TestEvent, data: &mut serde_json::Map<String, serde_json::Value>) {
180 use serde_json::Value;
181 match event {
182 TestEvent::ReactorPoll {
183 events_returned, ..
184 } => {
185 data.insert("events_returned".into(), Value::from(*events_returned));
186 }
187 TestEvent::ReactorWake { source, .. } => {
188 data.insert("source".into(), Value::from(*source));
189 }
190 TestEvent::ReactorRegister {
191 token, interest, ..
192 } => {
193 data.insert("token".into(), Value::from(*token));
194 data.insert("readable".into(), Value::from(interest.readable));
195 data.insert("writable".into(), Value::from(interest.writable));
196 }
197 TestEvent::ReactorDeregister { token, .. }
198 | TestEvent::WakerClone { token, .. }
199 | TestEvent::WakerDrop { token, .. } => {
200 data.insert("token".into(), Value::from(*token));
201 }
202 TestEvent::IoRead {
203 token,
204 bytes,
205 would_block,
206 ..
207 }
208 | TestEvent::IoWrite {
209 token,
210 bytes,
211 would_block,
212 ..
213 } => {
214 data.insert("token".into(), Value::from(*token));
215 data.insert("bytes".into(), Value::from(*bytes));
216 data.insert("would_block".into(), Value::from(*would_block));
217 }
218 TestEvent::IoConnect { addr, result, .. } => {
219 data.insert("addr".into(), Value::from(addr.as_str()));
220 data.insert("result".into(), Value::from(*result));
221 }
222 TestEvent::IoAccept { local, peer, .. } => {
223 data.insert("local".into(), Value::from(local.as_str()));
224 data.insert("peer".into(), Value::from(peer.as_str()));
225 }
226 TestEvent::WakerWake { task_id, .. }
227 | TestEvent::TimerScheduled { task_id, .. }
228 | TestEvent::TimerFired { task_id, .. } => {
229 data.insert("task_id".into(), Value::from(*task_id));
230 }
231 TestEvent::TaskPoll {
232 task_id, result, ..
233 } => {
234 data.insert("task_id".into(), Value::from(*task_id));
235 data.insert("result".into(), Value::from(*result));
236 }
237 TestEvent::TaskSpawn { task_id, name, .. } => {
238 data.insert("task_id".into(), Value::from(*task_id));
239 if let Some(n) = name {
240 data.insert("name".into(), Value::from(n.as_str()));
241 }
242 }
243 TestEvent::TaskComplete {
244 task_id, outcome, ..
245 } => {
246 data.insert("task_id".into(), Value::from(*task_id));
247 data.insert("outcome".into(), Value::from(*outcome));
248 }
249 TestEvent::RegionCreate {
250 region_id,
251 parent_id,
252 ..
253 } => {
254 data.insert("region_id".into(), Value::from(*region_id));
255 if let Some(p) = parent_id {
256 data.insert("parent_id".into(), Value::from(*p));
257 }
258 }
259 TestEvent::RegionStateChange {
260 region_id,
261 from_state,
262 to_state,
263 ..
264 } => {
265 data.insert("region_id".into(), Value::from(*region_id));
266 data.insert("from_state".into(), Value::from(*from_state));
267 data.insert("to_state".into(), Value::from(*to_state));
268 }
269 TestEvent::RegionClose {
270 region_id,
271 task_count,
272 ..
273 } => {
274 data.insert("region_id".into(), Value::from(*region_id));
275 data.insert("task_count".into(), Value::from(*task_count));
276 }
277 TestEvent::ObligationCreate {
278 obligation_id,
279 kind,
280 holder_id,
281 ..
282 } => {
283 data.insert("obligation_id".into(), Value::from(*obligation_id));
284 data.insert("kind".into(), Value::from(*kind));
285 data.insert("holder_id".into(), Value::from(*holder_id));
286 }
287 TestEvent::ObligationResolve {
288 obligation_id,
289 resolution,
290 ..
291 } => {
292 data.insert("obligation_id".into(), Value::from(*obligation_id));
293 data.insert("resolution".into(), Value::from(*resolution));
294 }
295 TestEvent::Custom {
296 category, message, ..
297 }
298 | TestEvent::Error {
299 category, message, ..
300 }
301 | TestEvent::Warn {
302 category, message, ..
303 } => {
304 data.insert("category_detail".into(), Value::from(*category));
305 data.insert("detail".into(), Value::from(message.as_str()));
306 }
307 }
308}
309
/// Returns the current thread's id as a plain integer.
///
/// The number is recovered from the `Debug` rendering of `ThreadId`
/// (`ThreadId(N)`). If that text does not parse as a `u64`, the result is `0`.
fn thread_id_u64() -> u64 {
    let rendered = format!("{:?}", std::thread::current().id());
    let digits = rendered
        .trim_start_matches("ThreadId(")
        .trim_end_matches(')');
    digits.parse().unwrap_or(0)
}
319
320pub struct NdjsonLogger {
330 inner: TestLogger,
331 ctx: Option<TestContext>,
332 ndjson_enabled: bool,
333}
334
335impl NdjsonLogger {
336 #[must_use]
338 pub fn new(level: TestLogLevel, ctx: Option<TestContext>) -> Self {
339 let ndjson_enabled = std::env::var("ASUPERSYNC_TEST_NDJSON")
340 .is_ok_and(|v| v == "1" || v.eq_ignore_ascii_case("true"));
341 Self {
342 inner: TestLogger::new(level),
343 ctx,
344 ndjson_enabled,
345 }
346 }
347
348 #[must_use]
350 pub fn enabled(level: TestLogLevel, ctx: Option<TestContext>) -> Self {
351 Self {
352 inner: TestLogger::new(level),
353 ctx,
354 ndjson_enabled: true,
355 }
356 }
357
358 pub fn log(&self, event: TestEvent) {
360 self.inner.log(event.clone());
361 if self.ndjson_enabled {
362 let record = LogRecord {
363 elapsed: self.inner.elapsed(),
364 event,
365 };
366 let ndjson = NdjsonEvent::from_record(&record, self.ctx.as_ref());
367 eprintln!("{}", ndjson.to_json_line());
368 }
369 }
370
371 #[must_use]
373 pub fn inner(&self) -> &TestLogger {
374 &self.inner
375 }
376
377 #[must_use]
379 pub fn to_ndjson(&self) -> String {
380 let events = self.inner.events();
381 let mut output = String::new();
382 for record in &events {
383 let ndjson = NdjsonEvent::from_record(record, self.ctx.as_ref());
384 output.push_str(&ndjson.to_json_line());
385 output.push('\n');
386 }
387 output
388 }
389
390 pub fn write_ndjson_file(&self, path: &std::path::Path) -> std::io::Result<()> {
392 std::fs::write(path, self.to_ndjson())
393 }
394}
395
/// Builds a trace file name of the form
/// `{subsystem}_{scenario}_{seed}.trace`, where the seed is rendered as 16
/// zero-padded lowercase hex digits.
#[must_use]
pub fn trace_file_name(subsystem: &str, scenario: &str, seed: u64) -> String {
    let seed_hex = format!("{seed:016x}");
    [subsystem, "_", scenario, "_", seed_hex.as_str(), ".trace"].concat()
}
417
/// Builds an NDJSON file name of the form
/// `{subsystem}_{scenario}_{seed}.ndjson`, where the seed is rendered as 16
/// zero-padded lowercase hex digits.
#[must_use]
pub fn ndjson_file_name(subsystem: &str, scenario: &str, seed: u64) -> String {
    let seed_hex = format!("{seed:016x}");
    [subsystem, "_", scenario, "_", seed_hex.as_str(), ".ndjson"].concat()
}
425
/// Returns the bundle directory for one test run:
/// `{base_dir}/{test_id}/{seed}`, with the seed rendered as 16 zero-padded
/// lowercase hex digits.
///
/// This only computes the path; nothing is created on disk.
#[must_use]
pub fn artifact_bundle_dir(
    base_dir: &std::path::Path,
    test_id: &str,
    seed: u64,
) -> std::path::PathBuf {
    let mut bundle = base_dir.to_path_buf();
    bundle.push(test_id);
    bundle.push(format!("{seed:016x}"));
    bundle
}
443
/// Returns the root directory under which artifact bundles are written.
///
/// The `ASUPERSYNC_TEST_ARTIFACTS_DIR` environment variable overrides the
/// default of `target/test-artifacts`. The variable is read as an OS string,
/// so a directory whose name is not valid UTF-8 is honoured rather than
/// silently replaced by the default.
#[must_use]
pub fn artifact_base_dir() -> std::path::PathBuf {
    std::env::var_os("ASUPERSYNC_TEST_ARTIFACTS_DIR").map_or_else(
        || std::path::PathBuf::from("target/test-artifacts"),
        std::path::PathBuf::from,
    )
}
454
455pub fn write_artifact_bundle(
464 manifest: &ReproManifest,
465 ndjson_logger: Option<&NdjsonLogger>,
466 summary: Option<&TestSummary>,
467) -> std::io::Result<std::path::PathBuf> {
468 let base = artifact_base_dir();
469 let bundle_dir = artifact_bundle_dir(&base, &manifest.scenario_id, manifest.seed);
470 std::fs::create_dir_all(&bundle_dir)?;
471
472 let manifest_json = serde_json::to_string_pretty(manifest).map_err(std::io::Error::other)?;
474 std::fs::write(bundle_dir.join("manifest.json"), manifest_json)?;
475
476 if let Some(logger) = ndjson_logger {
478 logger.write_ndjson_file(&bundle_dir.join("events.ndjson"))?;
479 }
480
481 if let Some(s) = summary {
483 let summary_json = serde_json::to_string_pretty(s).map_err(std::io::Error::other)?;
484 std::fs::write(bundle_dir.join("summary.json"), summary_json)?;
485 }
486
487 Ok(bundle_dir)
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_logging::{Interest, TestLogLevel};
    use std::time::Duration;

    /// Sets an environment variable for the lifetime of the guard and puts the
    /// previous state back on drop, including when the test panics.
    ///
    /// The environment is process-wide, so a test that leaks an override can
    /// redirect artifact output of other tests in the same binary.
    struct EnvVarGuard {
        key: &'static str,
        previous: Option<std::ffi::OsString>,
    }

    impl EnvVarGuard {
        /// Remembers the current value of `key`, then sets it to `value`.
        fn set(key: &'static str, value: &std::path::Path) -> Self {
            let previous = std::env::var_os(key);
            std::env::set_var(key, value);
            Self { key, previous }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Shared test prologue: initializes test logging and marks the phase.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    /// A record built with a context carries the context fields and the
    /// variant-specific data, and serializes to valid JSON.
    #[test]
    fn test_ndjson_event_from_task_spawn() {
        init_test("test_ndjson_event_from_task_spawn");
        let record = LogRecord {
            elapsed: Duration::from_micros(1234),
            event: TestEvent::TaskSpawn {
                task_id: 42,
                name: Some("worker".into()),
            },
        };
        let ctx = TestContext::new("ndjson_test", 0xDEAD_BEEF).with_subsystem("scheduler");

        let ndjson = NdjsonEvent::from_record(&record, Some(&ctx));
        assert_eq!(ndjson.v, NDJSON_SCHEMA_VERSION);
        assert_eq!(ndjson.ts_us, 1234);
        assert_eq!(ndjson.level, "INFO");
        assert_eq!(ndjson.category, "task");
        assert_eq!(ndjson.event, "TaskSpawn");
        assert_eq!(ndjson.test_id.as_deref(), Some("ndjson_test"));
        assert_eq!(ndjson.seed, Some(0xDEAD_BEEF));
        assert_eq!(ndjson.subsystem.as_deref(), Some("scheduler"));
        assert_eq!(
            ndjson
                .data
                .get("task_id")
                .and_then(serde_json::Value::as_u64),
            Some(42)
        );
        assert_eq!(
            ndjson.data.get("name").and_then(|v| v.as_str()),
            Some("worker")
        );

        let json_line = ndjson.to_json_line();
        let parsed: serde_json::Value = serde_json::from_str(&json_line).expect("valid JSON");
        assert_eq!(parsed["v"], 1);
        assert_eq!(parsed["event"], "TaskSpawn");
        crate::test_complete!("test_ndjson_event_from_task_spawn");
    }

    /// Without a context the optional fields are `None` and are omitted from
    /// the serialized JSON.
    #[test]
    fn test_ndjson_event_without_context() {
        init_test("test_ndjson_event_without_context");
        let record = LogRecord {
            elapsed: Duration::from_millis(5),
            event: TestEvent::ReactorPoll {
                timeout: None,
                events_returned: 3,
                duration: Duration::from_micros(100),
            },
        };

        let ndjson = NdjsonEvent::from_record(&record, None);
        assert!(ndjson.test_id.is_none());
        assert!(ndjson.seed.is_none());
        assert!(ndjson.subsystem.is_none());
        assert_eq!(ndjson.category, "reactor");
        assert_eq!(ndjson.event, "ReactorPoll");

        let json_line = ndjson.to_json_line();
        let parsed: serde_json::Value = serde_json::from_str(&json_line).expect("valid JSON");
        assert!(parsed.get("test_id").is_none());
        assert!(parsed.get("seed").is_none());
        crate::test_complete!("test_ndjson_event_without_context");
    }

    /// Logged events come back from `to_ndjson` in order, one valid JSON
    /// object per line, each carrying the context.
    #[test]
    fn test_ndjson_logger_captures_and_exports() {
        init_test("test_ndjson_logger_captures_and_exports");
        let ctx = TestContext::new("ndjson_export", 0x42).with_subsystem("io");
        let logger = NdjsonLogger::enabled(TestLogLevel::Trace, Some(ctx));

        logger.log(TestEvent::IoRead {
            token: 5,
            bytes: 1024,
            would_block: false,
        });
        logger.log(TestEvent::IoWrite {
            token: 5,
            bytes: 512,
            would_block: false,
        });
        logger.log(TestEvent::TaskSpawn {
            task_id: 1,
            name: None,
        });

        let ndjson_output = logger.to_ndjson();
        let lines: Vec<&str> = ndjson_output.trim().lines().collect();
        assert_eq!(lines.len(), 3, "should have 3 NDJSON lines");

        for line in &lines {
            let parsed: serde_json::Value = serde_json::from_str(line).expect("valid JSON line");
            assert_eq!(parsed["v"], 1);
            assert_eq!(parsed["test_id"], "ndjson_export");
            assert_eq!(parsed["seed"], 0x42);
        }

        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["event"], "IoRead");
        let second: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second["event"], "IoWrite");
        crate::test_complete!("test_ndjson_logger_captures_and_exports");
    }

    /// File names embed the seed as 16 zero-padded lowercase hex digits.
    #[test]
    fn test_trace_file_naming() {
        init_test("test_trace_file_naming");
        assert_eq!(
            trace_file_name("scheduler", "cancel_drain", 0xDEAD_BEEF),
            "scheduler_cancel_drain_00000000deadbeef.trace"
        );
        assert_eq!(
            ndjson_file_name("obligation", "leak_check", 42),
            "obligation_leak_check_000000000000002a.ndjson"
        );
        crate::test_complete!("test_trace_file_naming");
    }

    /// Bundle directories are laid out as `{base}/{test_id}/{seed_hex}`.
    #[test]
    fn test_artifact_bundle_dir_layout() {
        init_test("test_artifact_bundle_dir_layout");
        let base = std::path::Path::new("/tmp/test-artifacts");
        let dir = artifact_bundle_dir(base, "cancel_test", 0xCAFE);
        assert_eq!(
            dir,
            std::path::PathBuf::from("/tmp/test-artifacts/cancel_test/000000000000cafe")
        );
        crate::test_complete!("test_artifact_bundle_dir_layout");
    }

    /// A written bundle contains a manifest that round-trips through JSON and
    /// an event log with one line per logged event.
    #[test]
    fn test_write_artifact_bundle_roundtrip() {
        init_test("test_write_artifact_bundle_roundtrip");

        let tmp = tempfile::TempDir::new().expect("create temp dir");
        // Declared after `tmp` so it is dropped first: the override is undone
        // before the directory it points at disappears, even on panic.
        let env_guard = EnvVarGuard::set("ASUPERSYNC_TEST_ARTIFACTS_DIR", tmp.path());

        let ctx = TestContext::new("bundle_test", 0xBEEF)
            .with_subsystem("scheduler")
            .with_invariant("quiescence");

        let logger = NdjsonLogger::enabled(TestLogLevel::Info, Some(ctx.clone()));
        logger.log(TestEvent::TaskSpawn {
            task_id: 1,
            name: Some("test_task".into()),
        });
        logger.log(TestEvent::TaskComplete {
            task_id: 1,
            outcome: "ok",
        });

        let manifest = ReproManifest::from_context(&ctx, true)
            .with_env_snapshot()
            .with_phases(vec!["setup".to_string(), "exercise".to_string()]);

        let bundle_path =
            write_artifact_bundle(&manifest, Some(&logger), None).expect("write bundle");

        assert!(bundle_path.join("manifest.json").exists());
        assert!(bundle_path.join("events.ndjson").exists());

        let manifest_str = std::fs::read_to_string(bundle_path.join("manifest.json")).unwrap();
        let loaded: ReproManifest = serde_json::from_str(&manifest_str).unwrap();
        assert_eq!(loaded.seed, 0xBEEF);
        assert_eq!(loaded.scenario_id, "bundle_test");
        assert!(loaded.passed);

        let ndjson_str = std::fs::read_to_string(bundle_path.join("events.ndjson")).unwrap();
        let lines: Vec<&str> = ndjson_str.trim().lines().collect();
        assert_eq!(lines.len(), 2);
        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["event"], "TaskSpawn");

        drop(env_guard);
        crate::test_complete!("test_write_artifact_bundle_roundtrip");
    }

    /// Every `TestEvent` variant converts to a line that parses as JSON.
    #[test]
    // The list enumerates one sample per variant and is long by nature.
    #[allow(clippy::too_many_lines)]
    fn test_all_event_types_produce_valid_ndjson() {
        init_test("test_all_event_types_produce_valid_ndjson");
        let events = vec![
            TestEvent::ReactorPoll {
                timeout: None,
                events_returned: 0,
                duration: Duration::from_micros(10),
            },
            TestEvent::ReactorWake { source: "waker" },
            TestEvent::ReactorRegister {
                token: 1,
                interest: Interest {
                    readable: true,
                    writable: false,
                },
                source_type: "tcp",
            },
            TestEvent::ReactorDeregister { token: 1 },
            TestEvent::IoRead {
                token: 1,
                bytes: 100,
                would_block: false,
            },
            TestEvent::IoWrite {
                token: 2,
                bytes: 200,
                would_block: true,
            },
            TestEvent::IoConnect {
                addr: "127.0.0.1:8080".into(),
                result: "success",
            },
            TestEvent::IoAccept {
                local: "0.0.0.0:9090".into(),
                peer: "192.168.1.1:54321".into(),
            },
            TestEvent::WakerWake {
                token: 10,
                task_id: 1,
            },
            TestEvent::WakerClone { token: 11 },
            TestEvent::WakerDrop { token: 12 },
            TestEvent::TaskPoll {
                task_id: 1,
                result: "ready",
            },
            TestEvent::TaskSpawn {
                task_id: 2,
                name: Some("bg".into()),
            },
            TestEvent::TaskComplete {
                task_id: 1,
                outcome: "ok",
            },
            TestEvent::TimerScheduled {
                deadline: Duration::from_secs(5),
                task_id: 99,
            },
            TestEvent::TimerFired { task_id: 99 },
            TestEvent::RegionCreate {
                region_id: 1,
                parent_id: Some(0),
            },
            TestEvent::RegionStateChange {
                region_id: 1,
                from_state: "open",
                to_state: "closing",
            },
            TestEvent::RegionClose {
                region_id: 1,
                task_count: 3,
                duration: Duration::from_millis(100),
            },
            TestEvent::ObligationCreate {
                obligation_id: 50,
                kind: "permit",
                holder_id: 1,
            },
            TestEvent::ObligationResolve {
                obligation_id: 50,
                resolution: "commit",
            },
            TestEvent::Custom {
                category: "test",
                message: "hello".into(),
            },
            TestEvent::Error {
                category: "test",
                message: "oops".into(),
            },
            TestEvent::Warn {
                category: "test",
                message: "hmm".into(),
            },
        ];

        for event in events {
            let record = LogRecord {
                elapsed: Duration::from_micros(100),
                event,
            };
            let ndjson = NdjsonEvent::from_record(&record, None);
            let line = ndjson.to_json_line();
            let _parsed: serde_json::Value =
                serde_json::from_str(&line).expect("all events must produce valid JSON");
        }
        crate::test_complete!("test_all_event_types_produce_valid_ndjson");
    }
}